Aerospike文档阅读翻译

Aerospike

UDF

Register a UDF

三种注册方式
  • ASCII Text File
  • ASCII Text File Embedded in a Java Resource
  • Java String
Register UDF in ASCII Text File
RegisterTask task = client.register(null, "/home/user/udf/example.lua", "example.lua", Language.LUA);
// Poll cluster for completion every second for a maximum of 10 seconds.
task.waitTillComplete(1000, 10000);

/home/user/udf/example.lua是lua脚本的本地路径

example.lua是相对于Aerospike服务器配置文件aerospike.conf的lua脚本配置路径的文件名

当配置是这样的时候,注册的脚本将放在服务器的 /opt/aerospike/usr/udf/lua/example.lua路径下

mod-lua {user-path /opt/aerospike/usr/udf/lua
}

注册脚本的过程是异步的,服务器接收完了就响应确认的信息,之后再同步到集群的其他节点,所以需要设置等待时间来等待注册脚本任务执行完成

Register UDF in Java Resource

脚本也可以放置于项目的classpath资源路径,例如src/main/resources/udf/example.lua,那么就可以使用这种方式注册

RegisterTask task = client.register(null, MyClass.class.getClassLoader(), "udf/example.lua", "example.lua", Language.LUA);
// Poll cluster for completion every second for a maximum of 10 seconds.
task.waitTillComplete(1000, 10000);
Register UDF in Java String

也可以把脚本放于Java代码中,直接注册

String newline = "\n";
String code = "-- Validate value before writing." + newline + "function writeWithValidation(r,name,value)" + newline + "    if (value >= 1 and value <= 10) then" + newline + "      if not aerospike:exists(r) then" + newline + "        aerospike:create(r)" + newline + "      end" + newline + "      r[name] = value" + newline + "      aerospike:update(r)" + newline + "    else" + newline + "        error("1000:Invalid value")" + newline +  "    end" + newline + "end" + newline + newline + "-- Set a particular bin only if record does not already exist." + newline +"function writeUnique(r,name,value)" + newline +"    if not aerospike:exists(r) then" + newline +"        aerospike:create(r)" + newline +"        r[name] = value" + newline +"        aerospike:update(r)" + newline +"    end" + newline +"end" + newline;RegisterTask task = client.registerUdfString(null, code, "example.lua", Language.LUA);
// Poll cluster for completion every second for a maximum of 10 seconds.
task.waitTillComplete(1000, 10000);

Apply UDF on a Record

对于单条记录的操作称为单记录操作,记录可能存在也可能并不存在,Aerospike将针对这种情况进行创建或更新记录

为了能调用UDF,需要使用AerospikeClient.execute()

public class AerospikeClient {public final Object execute(Policy policy,Key key,String packageName,String functionName,Value... args)   throws AerospikeException
}
  • key — 你要操作的Key,包含了namespace,setname和userKey.setname和userKey一起转成摘要
  • packageName — UDF的模块,一个模块就是一个文件,可以包含多个函数
  • functionName — 函数名
  • args — 函数形参

example

function readBin(r, name)return r[name]
end

这个例子定义了一个读取一个记录指定的Bin的函数,注册到服务器之后,就可以在客户端这样调用它

String result = (String) client.execute(null, key, "examples", "readBin", Value.get("name")
);
Multiple Arguments

UDF也可以传多个参数,例如

function multiplyAndAdd(r, a, b)return r['factor'] * a + b;
end

这个例子将记录指定Bin的系数*a+b的结果返回回来,调用和之前的基本类似,就是多传几个参数的问题

String result = (String) client.execute(null, key, "examples", "multiplyAndAdd", Value.get(10), Value.get(5)
);

Query

Manage Indexes

Aerospike允许使用Java来添加或者删除二级索引

Creating a Secondary Index

为了创建二级索引,需要调用AerospikeClient.createIndex().二级索引的创建是异步的,结果的返回是在二级索引传播到集群之前,当然也有选项来设置等待异步任务的完成

public class AerospikeClient {public final IndexTask createIndex(Policy policy,String namespace,String setName,String indexName,String binName,IndexType indexType)   throws AerospikeException
}
IndexTask task = client.createIndex(null, "foo", "bar", "idx_foo_bar_baz", "baz", IndexType.NUMERIC);task.waitTillComplete();

这个例子创建了一个二级索引

  • fool是namespace
  • bar是setname
  • iidx_foo_bar_baz是索引名称
  • baz就是Bin了

二级索引是namespace,set和binName的组合,integer,string等都可以.但是只能被创建一次,不限制类型,如果第一次创建使用了integer,后面使用非integer来创建将不会有新的索引产生,因为会报错.索引只要被创建,会被传播到集群其他节点

Removing a Secondary Index

删除索引需要调用AerospikeClient.dropIndex()

public class AerospikeClient {public final void dropIndex(Policy policy,String namespace,String setName,String indexName)   throws AerospikeException
}

Query Records

Aerospike Java client APIs 使用 二级索引来进行查询记录

Defining the Query

定义查询条件,只需要使用com.aerospike.client.query.Statement

先创建一个Statement实例

Statement stmt = new Statement()

查询指定的namespace和setname,需要设置Statement的属性

stmt.setNamespace("foo");
stmt.setSetName("bar");

注意使用二级索引查询namespace是必须的,setname是可选项

为了查询二级索引,可以设置Filter

public final class Statement {void setFilters(Filter... filters);
}

这个看似是可以传入多个条件的,但是目前Aerospike服务器只支持单一的FIlter

Filter的创建是使用com.aerospike.client.query.Filter的静态方法来构造的,例如

  • Filter.equal(name, value) — 查询Bin name=指定value(integer/string)的记录.
  • Filter.range(name, begin, end) —根据name查询指定范围的记录

注意,这个只是返回符合条件的记录,记录可能含有N个bin,并不是返回符合条件的bin

Transforming Records

Aerospike 提供了使用UDF来查询并更改记录的功能,类似于SQL的update set …where…

Defining the Record UDF

定义一个脚本

function queryThenSet(r, firstName, secondName, value)local val = r[firstName]-- 如果是9的倍数,删除recordif val % 9 == 0 thenaerospike:remove(r)end-- 如果是5的倍数,删除secondName的binif val % 5 == 0 thenr[secondName] = nilaerospike:update(r)end-- 如果是2的倍数,对value进行累加if val % 2 == 0 thenr[firstName] = val + value;aerospike:update(r)end
end
Registering UDFs

将UDF注册到服务器

Initializing the Query Statement

初始化查询条件

 Statement statement = new Statement();statement.setNamespace(key.namespace);statement.setSetName(key.setName);statement.setFilter(Filter.range(bin1.name, 2, 35));

注意使用QueryAndSet需要提前准备好二级索引

Executing a UDF on a Record

执行脚本

private static void prepareQueryAndSet() {AerospikeClient client = multiClient();Key key = new Key("test", "lrjTest", "lrjSet");Bin bin1 = new Bin("first", 15);Bin bin2 = new Bin("second", 33);Bin bin3 = new Bin("bin1", 15);Bin bin4 = new Bin("bin2", 18);Bin[] bins = {bin1, bin2, bin3, bin4};client.put(null, key, bins);for (Bin bin : bins) {try {IndexTask dropIndex = client.dropIndex(null, key.namespace, key.setName, "queryAndSetIndex" + bin.name);dropIndex.waitTillComplete();} catch (AerospikeException e) {e.printStackTrace();}try {IndexTask task = client.createIndex(null, key.namespace, key.setName, "queryAndSetIndex"+bin.name, bin.name, IndexType.NUMERIC);task.waitTillComplete();} catch (AerospikeException e) {e.printStackTrace();}}Statement statement = new Statement();statement.setNamespace(key.namespace);statement.setSetName(key.setName);statement.setFilter(Filter.range(bin1.name, 2, 35));ExecuteTask execute = client.execute(null, statement, "queryAndSet", "queryThenSet",Value.get(bin1.name), Value.get(bin2.name), Value.get(22));execute.waitTillComplete();System.out.println(client.get(null, key, bin1.name));System.out.println(client.get(null, key, bin2.name));close(client);
}

Aggregate Records

对查询出来的记录的处理方式有很多种,最常见的莫过于对记录进行聚合了

使用SQL来开发的时候,我们需要定义一些查询条件来进行统计,例如对数据的某个字段等于某个值进行统计

SELECT count(*)
FROM test.demo 
WHERE d = 50

在Aerospike中,我们也可以使用UDF来实现这样的查询统计,在这之前呢,你需要具备一些知识

  • Connecting— 如何连接Aerospike集群.
  • Manage Indexes— 如何创建二级索引.
  • Register UDF — 注册UDF.
  • Query Records— 创建查询和执行查询

Defining the Query

创建查询条件

Statement stmt = new Statement()
stmt.setNamespace("foo");
stmt.setSetName("bar");
stmt.setFilters( Filter.range("baz", 0,100) );
stmt.setBinNames("baz");

这个例子返回namespace=foo and setName=bar 且binName=baz的值在0-100之间的的所有的记录

Defining the Stream UDF

定义流处理UDF

local function one(rec)return 1
endlocal function add(a, b)return a + b
endfunction count(stream)return stream : map(one) : reduce(add);
end

count() 传入了流对象,这个例子是从查询结果的对象流.我们对结果的流进行了两个操作.

  • map —stream中的map操作是将对象的一个输入转成另一个输出,这个例子,我们想要的是次数,所以每条记录就记录为1,即result->1.
  • reduce — Reduces阶段是将多个输入合成一个输出,我们将stream.map()的输出1累加起来.

Registering the UDF

注册UDF到服务器

client.register(null, "udf/example.lua", "example.lua", Language.LUA);

Configuring the UDF Search Path

对于流式处理,客户端方必须指定客户端本地的脚本路径,不然会报FileNotFountException

LuaConfig.SourceDirectory = "D:/aerospike-learning/src/main/resources/udf";

Executing the Query

执行聚合查询,我们使用的是AerospikeClient.queryAggregate()

public class AerospikeClient {public final ResultSet queryAggregate(QueryPolicy policy,Statement statement,String packageName,String functionName,Value... functionArgs) throws AerospikeException
}

Processing the Results

聚合结果返回的是ResultSet对象,我们可以使用迭代器来处理它

if (rs.next()) {Object result = rs.getObject();System.out.println("Count = " + result);
}

Cleaning Up Resources

所有的流对象记得用完关闭它

rs.close();

full code

private static void countByQuery() {final AerospikeClient client = multiClient();//注册脚本registerUDF("D:/aerospike-learning/src/main/resources/udf/countByQuery.lua", "countByQuery.lua");//准备10条记录final List<Key> list = new ArrayList<>();for (int i = 0; i < 10; i++) {list.add(new Key("test", "lrjTest", "record" + i));}final Random random = new Random();//插入AS中list.forEach(key -> {client.put(null, key, new Bin("userId", key.userKey), new Bin("amount", random.nextInt(500)));});//创建amount的二级索引client.createIndex(null, "test", "lrjTest", "amount-index","amount", IndexType.NUMERIC).waitTillComplete();//插入完了,先来遍历一下插入的结果System.out.println("--------after insert--------");list.forEach(key -> System.out.println(client.get(null, key)));//创建查询条件Statement statement = new Statement();statement.setNamespace("test");statement.setSetName("lrjTest");statement.setBinNames("amount");statement.setFilter(Filter.range("amount", 150, 300));LuaConfig.SourceDirectory = "D:/aerospike-learning/src/main/resources/udf";try(ResultSet resultSet = client.queryAggregate(null, statement, "countByQuery", "count");) {System.out.println("--------count using udf--------");while (resultSet.next()) {System.out.println(resultSet.getObject());}}close(client);
}

Scan

Scan Records

Aerospike也提供了扫描所有记录的功能

Scanning Records
private static void scanAllRecord() {AerospikeClient client = multiClient();ScanPolicy scanPolicy = new ScanPolicy();scanPolicy.priority = Priority.LOW;client.scanAll(scanPolicy, "test", "lrjTest", (key, record) -> System.out.println(record.bins));close(client);
}

可以定制不同的扫描策略,例如这个例子就使用低等级事务扫描,其他的设置还有是否包含具体Bin,超时时间等

扫描遭遇异常将抛出异常中断扫描线程,同时通知其他节点中断扫描线程

Apply UDF on Scan

Data Types

支持的数据类型

  • String
  • Integer
  • Blob
  • Map
  • List

Java代码设置一个值,Aerospike将自动的将数据转换成Aerospike合适的数据类型

  • Integers and longs 将倍转成Aerospike内部指定格式的数字类型
  • Strings 将转换成UTF-8编码格式的字符串.
  • Byte arrays 字节数组将使用blob存储.

Aerospike可以接受各种Java数据类型,包括各种复杂的集合对象,在存入时对对象进行序列化,取出时进行反序列化

使用Java序列化的时候千万要小心,尽管使用序列化能让你存储各种各样的数据类型,但是如果你的对象改了,在反序列化的时候将很容易出问题,在Java中改变一个类是很常见的,推荐使用不可变的Integer,String数组等

貌似不推荐面向对象编程,NoSql还是map比较好,随时添加/删除字段,没有这些问题

Logging

Aerospike java客户端提供了对于debug来说非常有用的日志接口

日志功能默认是禁用的

Log Callback

为了能使用日志,需要提供对于每条日志的回调.回调的类需要是 com.aerospike.client.Log.Callback的实例

public interface Callback {public void log(Log.Level level, String message);
}

为了能够记录日志,需要创建Callback的实例,然后调用Log.setCallback()来设置它

class Log {public static void setCallback(Log.Callback callback);
}

example

public class MyLogging implements Log.Callback {@Overridepublic void log(Log.Level level, String message) {Date date = new Date();System.out.println(date.toString() + ' ' + level + ' ' + message);}
}

现在可以创建日志回调实例,然后使用它了

Log.Callback callback = new MyLogging();
Log.setCallback(callback);

Log Level

为了能控制日记级别,需要调用Log.setLevel()来设置

Log.setLevel(Log.Level.INFO);

full code

private static void useLogging() {Log.Callback callback = new MyLogging();Log.setCallback(callback);Log.setLevel(Log.Level.DEBUG);registerUDF("D:/aerospike-learning/src/main/resources/udf/useLogging.lua", "useLogging.lua");AerospikeClient client = multiClient();Key key = new Key("test", "lrjTest", "userKey");client.put(null, key, new Bin("len", 4), new Bin("width", 5), new Bin("height", 6));Object result = client.execute(null, key, "useLogging", "volume", Value.get("len"), Value.get("width"), Value.get("height"));System.out.println("result:" + result);close(client);
}
-- useLogging.lua
function volume(r,len, width, height)info("len:{},width:{},height:{}", len, width, height)local result = r[len] * r[width] * r[height]info("result:{}", result)return result
end

输出:

Mon Dec 09 11:01:33 CST 2019 DEBUG Update partition map for node BB90565D7290C00 host1 3000
Mon Dec 09 11:01:33 CST 2019 DEBUG Update partition map for node BB91E9DA3290C00 host2 3000
Mon Dec 09 11:01:33 CST 2019 DEBUG Update partition map for node BB95F52F6290C00 host3 3000
Mon Dec 09 11:01:33 CST 2019 DEBUG Add seed host1 3000
Mon Dec 09 11:01:33 CST 2019 DEBUG Add seed host2 3000
Mon Dec 09 11:01:33 CST 2019 DEBUG Add seed host3 3000

这个日志其实没啥意思,lua脚本的日志根本都打印不出来…

Error Handling

Aerospike抛出异常有以下几类

ExceptionDescription
AerospikeException基本的异常,异常码可以参考com.aerospike.client.ResultCode
AerospikeException.Timeout事务超时异常.
AerospikeException.SerializeError using java or messagepack encoding/decoding.
AerospikeException.Parse响应信息解析异常.
AerospikeException.Connection连接服务器异常.
AerospikeException.InvalidNode使用非正常运行节点异常.
AerospikeException.ScanTerminated扫描提前终止了.
AerospikeException.QueryTerminated查询提前终止了.
AerospikeException.CommandRejected异步操作命令被拒绝,超过了允许的最大并发异步命令数.

异常处理example

AerospikeClient client = new AerospikeClient("192.168.1.150", 3000);
try {try {client.put(null, new Key("test", "demo", "key"), new Bin("testbin", 32));}catch (AerospikeException.Timeout aet) {// Handle timeouts differently.retryTransaction();}catch (AerospikeException ae) {throw new MyException("AerospikeException " + ae.getResultCode() + ": " + ae.getMessage());}
}
finally {client.close();
}

Asynchronous API

AerospikeClient 提供了异步函数,事件队列事件回调监听作为函数的额外参数,异步函数将把事件队列注册到服务器上,异步事件线程会处理命令,并将结果返回给回调监听.

  • 特点

    异步函数只使用少量的线程,线程的利用率笔同步函数高得多.

  • 不足

    异步编程模型很难实现,调试和维护

Create Event Loops

Eventloop果然很头疼…懒得看了


本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!

相关文章

立即
投稿

微信公众账号

微信扫一扫加关注

返回
顶部