Redis Java 客户端 - Lettuce
今天学习下Redis Java客户端开源项目 - Lettuce,Lettuce支持同步、异步通信的方式 API调用,也支持响应式编程API,包括发布/订阅消息、高可用性服务部署架构。
开始之旅
Maven依赖
<dependency>
<groupId>io.lettuce</groupId>
<artifactId>lettuce-core</artifactId>
<version>6.2.2.RELEASE</version>
</dependency>
第一个示例
public static void main(String[] args) {
//连接第0个数据库
RedisClient redisClient = RedisClient.create("redis://localhost:6379/0");
StatefulRedisConnection<String, String> connection = redisClient.connect();
RedisCommands<String, String> syncCommands = connection.sync();
syncCommands.set("key", "Hello, Redis!");
connection.close();
redisClient.shutdown();
}
执行以上代码后,在redis服务端存储 key => Hello, Redis! 数据。
创建连接
Redis Standalone、Sentinel、Cluster 模式创建Redis连接需要指定详细的连接参数,如Redis数据库地址、密码、超时信息等,最终形成RedisURI对象,最终通过RedisClient.create方法创建连接对象。使用方式有以下几种
public static RedisClient getRedisClient(){
//直接指定连接信息
RedisURI uri = RedisURI.create("redis://localhost:6379/0");
return RedisClient.create(uri);
}
public static RedisClient getRedisClient(){
RedisURI uri = RedisURI.Builder.redis("localhost", 6379)
.withDatabase(1)
.withPassword(new StringBuffer("password"))
.build();
return RedisClient.create(uri);
}
public static RedisClient getRedisClient(){
RedisURI uri = new RedisURI("localhost",6379, Duration.ofSeconds(5));
return RedisClient.create(uri);
}
URI 语法
Redis 部署方式不同,URI连接的语法也略有区别
-
单点部署
redis :// [[username :] password@] host [:port][/database] [?[timeout=timeout[d|h|m|s|ms|us|ns]]
-
单点- SSL部署
rediss :// [[username :] password@] host [: port][/database] [?[timeout=timeout[d|h|m|s|ms|us|ns]]
-
单点- Unix Domain Sockets
redis-socket :// [[username :] password@]path [?[timeout=timeout[d|h|m|s|ms|us|ns]] [&database=database]]
-
Redis Sentinel
redis-sentinel :// [[username :] password@] host1[:port1] [, host2[:port2]] [, hostN[:portN]] [/database] [?[timeout=timeout[d|h|m|s|ms|us|ns]] [&sentinelMasterId=sentinelMasterId]
异常捕获
Redis服务发出异常、错误响应的情况下,客户端将收到 RedisException 异常信息(包含其子类),该异常是一种运行时异常。
# redis.conf 配置密码
requirepass andy
public static RedisClient getRedisClient(){
RedisURI uri = RedisURI.Builder.redis("localhost", 6379)
.withDatabase(1)
.withSsl(false)
// 指定错误的密码
.withPassword(new StringBuffer("password"))
.build();
return RedisClient.create(uri);
}
同步API
public static void main(String[] args) {
RedisClient redisClient = getRedisClient();
StatefulRedisConnection<String, String> connection = redisClient.connect();
// RedisCommands 为连接生成的同步对象
RedisCommands<String, String> syncCommands = connection.sync();
// 所有依赖 RedisCommands 的方法都同步返回 (即直接返回结果)
String value = syncCommands.get("key");
System.out.println(value);
connection.close();
redisClient.shutdown();
}
同步API调用的方式非常简单,不做过多的介绍,提一下主要是为了区别后续的异步API
异步API
Lettuce 从4.x的版本开始提供的异步API,Lettuce的异步API是构建在Netty Reactor模型的基础之上,分为连接线程、工作线程。所有的通信过程都是异步处理。目的是为了更好地利用系统资源,而不浪费线程等待网络、磁盘IO的时间。使用异步API将提高系统的吞吐量,但是其异步处理模式会比同步调用复杂,也会给开发者带来一定的难度。
异步示例
Lettuce异步API上的每个命令都会创建一个RedisFuture<T>对象,该提供了取消、等待、订阅、监听相关的功能。RedisFuture是指向初始未知结果的指针,因为其值的计算可能尚未完成,RedisFuture<T>提供同步等待操作。接下来将通过多种方式异步获取结果
public class AsynchronousAPI {
static RedisClient client = null;
Executor sharedExecutor = Executors.newSingleThreadExecutor();
@BeforeAll
public static void initClient(){
client = RedisClient.create("redis://andy@localhost:6379/0");
}
@AfterAll
public static void shutdown(){
client.shutdown();
}
@Test
public void test1() throws Exception{
RedisAsyncCommands<String, String> commands = client.connect().async();
RedisFuture<String> future = commands.get("key");
// 方式1
System.out.println("同步等待结果: " + future.get());
// 方式二
System.out.println("同步超时等待结果: " + future.get(1,TimeUnit.SECONDS));
}
}
@Test
public void test2() throws Exception{
RedisAsyncCommands<String, String> commands = client.connect().async();
RedisFuture<String> future = commands.get("key");
// 方式三
future.thenAccept(new Consumer<>() {
@Override
public void accept(String value) {
System.out.println(value);
}
});
// 方式四 lambda 写法
future.thenAccept(System.out::println);
//方式五
future.thenAcceptAsync(new Consumer<String>() {
@Override
public void accept(String value) {
System.out.println(value);
}
}, sharedExecutor);
}
如果对JDK juc包比较熟悉的童鞋,相信对以上方式理解起来不会太困难。
@Test
public void test3() throws Exception{
RedisAsyncCommands<String, String> commands = client.connect().async();
RedisFuture<String> future = commands.get("key");
// 等待指定时间
if(!future.await(1, TimeUnit.MINUTES)) {
System.out.println("Could not complete within the timeout");
}
System.out.println(future.get());
}
异步阻塞调用
如果需要对系统的某些部分执行批处理/添加并发,则异步阻塞是一个非常好的选择。批处理的一个示例可以是设置/检索多个值,并在处理的某一点之前等待结果。
@Test
public void test(){
RedisAsyncCommands<String, String> commands = client.connect().async();
List<RedisFuture<String>> futures = new ArrayList<RedisFuture<String>>();
for (int i = 0; i < 10; i++) {
futures.add(commands.set("key-" + i, "value-" + i));
}
//批量等待结果
LettuceFutures.awaitAll(1, TimeUnit.MINUTES, futures.toArray(new RedisFuture[futures.size()]));
//获取返回 直接输出原始协议
futures.stream().forEach(System.out::println);
}
错误处理
错误处理是每一个应用程序不可或缺的组成部分 ,需要在设计之处就应该考虑,一种通用的错误处理机制。一般情况下,错误处理有一下几种方式:
- 返回默认值
- 使用备份机制
- 重试
先来看一下Lettuce遇到错误时,如何使用默认值进行处理。测试步骤
- 首先在redis 设置一个map 结构数据
首先在redis 设置一个map 结构数据
127.0.0.1:6379> hset mapkey key1 value1 key2 value2
(integer) 2
- 使用 get string的方式获取数据,触发异常
-
方式一
@Test public void test4() throws Exception{ RedisAsyncCommands<String, String> commands = client.connect().async(); RedisFuture<String> future = commands.get("mapkey"); future.handle(new BiFunction<String, Throwable, String>() { @Override public String apply(String value, Throwable throwable) { if(throwable != null) { return "default value"; } return value; } }).thenAccept(new Consumer<String>() { @Override public void accept(String value) { System.out.println("Got value: " + value); } }); }
-
方式二
@Test public void test5() throws Exception{ RedisAsyncCommands<String, String> commands = client.connect().async(); RedisFuture<String> future = commands.get("mapkey"); future.exceptionally(new Function<Throwable, String>() { @Override public String apply(Throwable throwable) { if (throwable instanceof IllegalStateException) { String message = "default value"; System.out.println(message); return message; } return "other default value"; } }); }
发布订阅
Lettuce支持发布/订阅Redis独立和Redis群集连接。订阅通道建立后,将在消息/订阅/未订阅事件中通知连接。提供了同步、异步和反应式API,以与Redis发布/订阅功能交互。
同步订阅
public class SubAPI {
static RedisClient client = null;
@BeforeAll
public static void initClient(){
client = RedisClient.create("redis://andy@localhost:6379/0");
}
@AfterAll
public static void shutdown(){
client.shutdown();
}
@Test
public void test0() throws Exception{
StatefulRedisPubSubConnection<String, String> connection = client.connectPubSub();
connection.addListener(new RedisPubSubAdapter<String, String>() {
@Override
public void message(String channel, String message) {
System.out.println("sub message :" + message);
}
});
RedisPubSubCommands<String, String> sync = connection.sync();
sync.subscribe("channel");
TimeUnit.MINUTES.sleep(1);
}
}
异步订阅
从开发角度来看,异步订阅值需要替换相关的API即可
//RedisPubSubCommands<String, String> sync = connection.sync();
RedisPubSubAsyncCommands<String, String> async = connection.async();
响应式API
@Test
public void test2() throws Exception{
StatefulRedisPubSubConnection<String, String> connection = client.connectPubSub();
RedisPubSubReactiveCommands<String, String> reactive = connection.reactive();
reactive.subscribe("channel").subscribe();
reactive.observeChannels().doOnNext(patternMessage -> {
System.out.println(patternMessage.getMessage());
}).subscribe();
TimeUnit.MINUTES.sleep(1);
}
Cluster 集群
Redis群集支持发布/订阅,但通常需要注意。用户发布消息(调用PUBLISH)在整个集群中广播,而不考虑对特定频道/模式的订阅。此行为允许连接到任意群集节点并注册订阅。客户端不需要连接到发布消息的节点。关于单机集群搭建 请参考
因此,订阅代码跟之前一样。
事务处理
Redis事务允许在一个步骤中执行一组命令。可以使用WATCH、UNWATCH、EXEC、MULTI和DISCARD命令控制事务。
同步事务
@Test
public void test0(){
StatefulRedisConnection<String, String> connect = client.connect();
RedisCommands<String, String> sync = connect.sync();
sync.multi();
sync.set("hello", "world");
sync.set("java", "world");
TransactionResult exec = sync.exec();
//返回多个命令的执行结果 两个OK
exec.stream().forEach(t -> System.out.println(t));
}
异步事务
@Test
public void test1(){
RedisAsyncCommands<String, String> async = client.connect().async();
async.multi();
async.set("key", "value");
async.set("java", "world");
RedisFuture<TransactionResult> future = async.exec();
future.thenAccept(new Consumer<TransactionResult>() {
@Override
public void accept(TransactionResult objects) {
objects.forEach(t -> System.out.println(t));
}
});
}
响应式API
@Test
public void test2(){
RedisReactiveCommands<String, String> reactive = client.connect().reactive();
reactive.multi().subscribe(multiResponse -> {
reactive.set("key", "1").subscribe();
reactive.incr("key").subscribe();
reactive.exec().subscribe();
});
}
集群事务
默认情况下,群集连接执行路由。这意味着,开发者无法真正确定命令在哪个主机上执行。如果事务中的多个KEY不会分配到同一个哈希槽 由于多点网络通信的问题,无法保证事务的一致性。这一点请特别注意。
集成Spring Boot
- 添加依赖
集成Spring Boot需要在pom文件中增加如下依赖
<dependencies>
<dependency>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-redis</artifactId>
<version>3.0.0</version>
</dependency>
</dependencies>
-
新增配置属性
# 在 application.yml 中添加以下依赖 spring: redis: port: 6379 password: andy host: localhost timeToLive: 60
-
自动注入
@Configuration public class RedisConfiguration { @Value("${spring.redis.host}") private String url; @Value("${spring.redis.port}") private int port; @Value("${spring.redis.password}") private String password; @Bean public RedisStandaloneConfiguration redisStandaloneConfiguration() { RedisStandaloneConfiguration redisStandaloneConfiguration = new RedisStandaloneConfiguration(url, port); redisStandaloneConfiguration.setPassword(password); return redisStandaloneConfiguration; } @Bean public ClientOptions clientOptions() { return ClientOptions.builder() .disconnectedBehavior(ClientOptions.DisconnectedBehavior.REJECT_COMMANDS) .autoReconnect(true) .build(); } @Bean public RedisConnectionFactory connectionFactory(RedisStandaloneConfiguration redisStandaloneConfiguration) { LettuceClientConfiguration configuration = LettuceClientConfiguration.builder() .clientOptions(clientOptions()).build(); return new LettuceConnectionFactory(redisStandaloneConfiguration, configuration); } @Bean @ConditionalOnMissingBean(name = "redisTemplate") @Primary public RedisTemplate<String, String> redisTemplate(RedisConnectionFactory redisConnectionFactory) { StringRedisTemplate template = new StringRedisTemplate(); template.setConnectionFactory(redisConnectionFactory); return template; } }
-
实例代码
@Autowired
private StringRedisTemplate redisTemplate;
@GetMapping("/getkey")
public String getKey(){
return redisTemplate.opsForValue().get("hello");
}