Aerospike 的使用案例

微信扫一扫,分享到朋友圈

Aerospike 的使用案例

导入依赖

Java 程序多用Maven 构建,需要导入一下依赖包:

<!--  KV 缓存 aerospike -->
<dependency>
<groupId>com.aerospike</groupId>
<artifactId>aerospike-client</artifactId>
<version>4.1.11</version>
</dependency>

Aerospike 天然支持Netty,如果需要用到 Netty 做 IO 循环事件,需要增加如下 POC 依赖:

<dependencies>
<dependency>
<groupId>com.aerospike</groupId>
<artifactId>aerospike-client</artifactId>
<version>4.1.11</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-handler</artifactId>
<version>4.1.11.Final</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport</artifactId>
<version>4.1.11.Final</version>
</dependency>
<!-- Only needed when using epoll event loops on linux -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport-native-epoll</artifactId>
<classifier>linux-x86_64</classifier>
<version>4.1.11.Final</version>
</dependency>
</dependencies>

配置客户端连接

单点连接

// 方法 1:最简单的方式,使用默认的客户端连接策略
AerospikeClient aerospikeClient = new AerospikeClient("10.57.30.214", 3000);
// 方法 2:自己创建客户端连接策略
// 创建全局写策略
WritePolicy policy = new WritePolicy();
policy.setTimeout(3000); // 单位毫秒
// 创建客户端连接策略
ClientPolicy clientPolicy = new ClientPolicy();
clientPolicy.writePolicyDefault = policy;
// 创建客户端
AerospikeClient aerospikeClient = new AerospikeClient(clientPolicy, "10.57.30.214", 3000);

集群连接

集群同步方式:

// 集群 endpoints,逗号分割
String hostStr = "10.57.30.214:3000,10.57.30.215:3000"
String[] arr = hostStr.split(",");  // 通过逗号截取字符串
Host[] hosts = new Host[arr.length];
for (int i = 0; i < arr.length; i++) {
String[] hostPort = StringUtils.split(arr[i], ":");
Host host = new Host(hostPort[0],  Integer.valueOf(hostPort[1]));
hosts[i] = host;
}
WritePolicy wp = new WritePolicy();
wp.setTimeout(3000);
ClientPolicy clientPolicy = new ClientPolicy();
clientPolicy.writePolicyDefault = wp;
AerospikeClient   aerospikeClient = new AerospikeClient(clientPolicy, hosts);

集群异步方式:

// 集群 endpoints,逗号分割
String hostStr = "10.57.30.214:3000,10.57.30.215:3000"
String[] arr = hostStr.split(",");  // 通过逗号截取字符串
Host[] hosts = new Host[arr.length];
for (int i = 0; i < arr.length; i++) {
String[] hostPort = StringUtils.split(arr[i], ":");
Host host = new Host(hostPort[0],  Integer.valueOf(hostPort[1]));
hosts[i] = host;
}
WritePolicy wp = new WritePolicy();
wp.setTimeout(3000);
wp.timeoutDelay = 50;
AsyncClientPolicy clientPolicy = new AsyncClientPolicy();
clientPolicy.writePolicyDefault = wp;
AsyncClient   aerospikeClient = new AsyncClient(clientPolicy, hosts);

Springboot 配置

#--------------- aerospike -----------------#
#as配置
aerospike.endpoints=10.57.30.214:3000
aerospike.namespace=ns1
aerospike.timeout=3000
#as的单台最大tps,压测任意调大,线上1000000,单位为一分钟
aerospike.max.tps=1000000
#as单条record最大长度限制
aerospike.data.maxLen=8388608
package cn.openmind.conf;
import com.aerospike.client.AerospikeClient;
import com.aerospike.client.Host;
import com.aerospike.client.policy.ClientPolicy;
import com.aerospike.client.policy.WritePolicy;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* AerospikeConfig
*
* @author zhoujunwen
* @date 2020-09-16 10:37
* @desc
*/
@Configuration
@Getter
@Setter
@Slf4j
public class AerospikeConf {
@Value("{aerospike.namespace}")
private String namespace;
@Value("{aerospike.endpoints}")
private String hostStr;
@Value("${aerospike.timeout}")
private Integer timeout;
@Bean
public AerospikeClient aerospikeClient() {
AerospikeClient client = new AerospikeClient(new ClientPolicy(), getHost());
return client;
}
@Bean
public WritePolicy writePolicy() {
WritePolicy policy = new WritePolicy();
policy.setTimeout(getTimeout());
return policy;
}
private Host[] getHost() {
if (StringUtils.isBlank(getHostStr())) {
log.warn(" 初始化 arerospike 异常,因为没有配置 host 列表");
return null;
}
String[] arr = getHostStr().split(",");
Host[] hosts = new Host[arr.length];
for (int i = 0; i < arr.length; i++) {
String[] hostPort = StringUtils.split(arr[i], ":");
Host host = new Host(hostPort[0], Integer.valueOf(hostPort[1]));
hosts[i] = host;
}
return hosts;
}
}

在需要的地方通过注解注入即可。比如:

@Autowired
private AerospikeClient aerospikeClient;

定义接口

一般而言,我们项目中存在多种缓存机制。大部分的时候,我们优先选择了 Redis,而且对 redis 的数据结构也相对熟悉。因此,在定义接口时,我们需要借鉴 redis client 的方法,方便缓存迁移。

package cn.openmind.cache;
import java.util.List;
/**
* IKVStorage
*
* @author zhoujunwen
* @date 2020-09-16 09:51
* @desc
*/
public interface IKVStorage {
/**
* 设置 key 的过期时间
*
* @param key 缓存 key
* @param ttl ttl
* @return
* @throws Exception
*/
boolean touch(String key, int ttl) throws Exception;
/**
* 自增
*
* @param key 缓存 key
* @param by  自增步长
* @param ttl
* @return
* @throws Exception
*/
long incr(String key, long by, int ttl) throws Exception;
/**
* 删除
*
* @param key
* @return
* @throws Exception
*/
boolean delete(String key) throws Exception;
/**
* 判断 key 是否存在
*
* @param key
* @return
* @throws Exception
*/
boolean exists(String key) throws Exception;
/**
* 设值
*
* @param key   key
* @param value 值, 支持对象和基本数据类型
* @param ttl   ttl
* @return
*/
boolean setString(String key, String value, int ttl);
/**
* @param key
* @return
*/
String getString(String key);
/**
* 入队
*
* @param key
* @param value
* @return
*/
<T> boolean lpush(String key, T... value) throws Exception;
/**
* 从列表中删除某个元素
*
* @param key
* @param value
* @return
*/
<T> boolean lrem(String key, T value);
/**
* 获取队列成员
*
* @param key
* @param start
* @param end
* @return
*/
List<?> lrange(String key, int start, int end);
}

实现接口

package cn.openmind.cache.impl;
import cn.openmind.conf.AerospikeConf;
import cn.tongdun.kara.biz.service.dal.IKVStorage;
import com.aerospike.client.*;
import com.aerospike.client.cdt.*;
import com.aerospike.client.policy.RecordExistsAction;
import com.aerospike.client.policy.WritePolicy;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.Collections;
import java.util.List;
/**
* AspKVStorageImpl
*
* @author zhoujunwen
* @date 2020-09-16 09:52
* @desc
*/
@Slf4j
@Component("aspKVStorage")
public class AspKVStorageImpl implements IKVStorage {
@Autowired
@Setter
private AerospikeClient aerospikeClient;
@Autowired
@Setter
private AerospikeConf aerospikeConf;
// as 的数据集名称,相当于 mysql 表名
private static final String dataSetName = "kara";
private static final String BIN = "data";
private static final Gson gson = new GsonBuilder().excludeFieldsWithoutExposeAnnotation().create();
/**
* 设置 key 的过期时间
*
* @param key 缓存 key
* @param ttl ttl
* @return
* @throws Exception
*/
@Override
public boolean touch(String key, int ttl) throws Exception {
Key k = new Key(aerospikeConf.getNamespace(), dataSetName, key);
try {
WritePolicy wp = new WritePolicy();
wp.expiration = ttl;
wp.setTimeout(aerospikeClient.getWritePolicyDefault().totalTimeout);
aerospikeClient.touch(wp, k);
} catch (AerospikeException e) {
log.warn("it happens error when set ttl {} for key {} ", ttl, key);
throw new Exception(e);
} finally {
}
return true;
}
/**
* 自增
*
* @param key 缓存 key
* @param by  自增步长
* @param ttl
* @return
* @throws Exception
*/
@Override
public long incr(String key, long by, int ttl) throws Exception {
Key k = new Key(aerospikeConf.getNamespace(), dataSetName, key);
long res;
try {
WritePolicy wp = new WritePolicy();
wp.expiration = ttl;
wp.setTimeout(aerospikeClient.getWritePolicyDefault().totalTimeout);
Operation[] opt = new Operation[]{Operation.add(new Bin(BIN, by)), Operation.get(BIN)};
Record record = aerospikeClient.operate(wp, k, opt);
if (record == null) {
return 0L;
}
res = record.getLong(BIN);
} catch (AerospikeException e) {
throw new Exception(e);
} finally {
}
return res;
}
/**
* 删除
*
* @param key
* @return
* @throws Exception
*/
@Override
public boolean delete(String key) throws Exception {
Key k = new Key(aerospikeConf.getNamespace(), dataSetName, key);
boolean res;
try {
res = aerospikeClient.delete(null, k);
} catch (AerospikeException e) {
throw new Exception(e);
} finally {
}
return res;
}
/**
* 判断 key 是否存在
*
* @param key
* @return
*/
@Override
public boolean exists(String key) {
if (StringUtils.isBlank(key)) {
return false;
}
Key k = new Key(aerospikeConf.getNamespace(), dataSetName, key);
return aerospikeClient.exists(null, k);
}
/**
* 设值
*
* @param key   key
* @param value 值, 支持对象和普通字符串
* @param ttl   ttl
* @return
*/
@Override
public boolean setString(String key, String value, int ttl) {
if (value == null) {
return false;
}
Key k = new Key(aerospikeConf.getNamespace(), dataSetName, key);
Bin bin = new Bin(BIN, value);
WritePolicy wp = new WritePolicy();
wp.expiration = ttl;
wp.maxRetries = 1;
wp.recordExistsAction = RecordExistsAction.REPLACE;
aerospikeClient.put(wp, k, bin);
return true;
}
/**
* @param key
* @return
*/
public String getString(String key) {
if (key == null) {
return null;
}
try {
Key k = new Key(aerospikeConf.getNamespace(), dataSetName, key);
Record r = aerospikeClient.get(null, k);
return r != null ? r.getString(BIN) : null;
} catch (Throwable ex) {
throw ex;
}
}
/**
* lpush, push 时去重,支持多个值
*
* @param key
* @param value
* @return
*/
public <T> boolean lpush(String key, T... value) {
if (value == null || value.length == 0) {
return false;
}
WritePolicy policy = new WritePolicy();
Key k = new Key(aerospikeConf.getNamespace(), dataSetName, key);
ListPolicy listPolicy = new ListPolicy(ListOrder.UNORDERED, ListWriteFlags.ADD_UNIQUE);
for (T v : value) {
if (v == null) {
continue;
}
Value val = Value.get(v);
Operation opt = ListOperation.append(listPolicy, BIN, val);
try {
Record rec = aerospikeClient.operate(policy, k, opt);
log.info("asp lpush return record: [{}]", rec);
} catch (AerospikeException e) {
if (e.getResultCode() == ResultCode.ELEMENT_EXISTS) {
log.warn("element [{}] already exists in [{}] key", v, key);
continue;
} else {
return false;
}
} finally {
}
}
return true;
}
@Override
public <T> boolean lrem(String key, T value) {
Key k = new Key(aerospikeConf.getNamespace(), dataSetName, key);
Operation opt = ListOperation.removeByValue(BIN, Value.get(value), ListReturnType.COUNT);
Record r = aerospikeClient.operate(null, k, opt);
int count = r != null ? r.getInt(BIN) : 0;
log.info("asp lrem from key:[{}] remove value:[{}] count:[{}]", key, Value.getAsBlob(value), count);
return true;
}
/**
* 获取队列成员
*
* @param key
* @param start
* @param end
* @return
*/
@Override
public List<?> lrange(String key, int start, int end) {
Key k = new Key(aerospikeConf.getNamespace(), dataSetName, key);
Operation opt = ListOperation.getRange(BIN, start);
Record r = aerospikeClient.operate(null, k, opt);
return r != null ? r.getList(BIN) : Collections.emptyList();
}
}

点击量:1

微信扫一扫,分享到朋友圈

Aerospike 的使用案例

如何让“白嫖党”心甘情愿为产品掏钱?

上一篇

伽马数据发布《“传奇”IP影响力报告》:已产生900亿+流水 未来三年有望增至1300亿+

下一篇

你也可能喜欢

Aerospike 的使用案例

长按储存图像,分享给朋友