【Redis】Redis高级客户端Lettuce详解

news2025/10/25 21:25:55

文章目录

  • 前提
  • Lettuce简介
  • 连接Redis
    • 定制的连接URI语法
    • 基本使用
  • API
    • 同步API
    • 异步API
    • 反应式API
    • 发布和订阅
    • 事务和批量命令执行
    • Lua脚本执行
  • 高可用和分片
    • 普通主从模式
    • 哨兵模式
    • 集群模式
  • 动态命令和自定义命令
  • 高阶特性
    • 配置客户端资源
    • 使用连接池
  • 几个常见的渐进式删除例子
  • 在SpringBoot中使用Lettuce
  • 小结
  • 链接

前提

Lettuce,读音[ˈletɪs],是一个Redis的Java驱动包,初识她的时候是使用RedisTemplate的时候遇到点问题Debug到底层的一些源码,发现spring-data-redis的驱动包在某个版本之后替换为Lettuce。Lettuce翻译为生菜,没错,就是吃的那种生菜,所以它的Logo长这样:
在这里插入图片描述
既然能被Spring生态所认可,Lettuce想必有过人之处,于是笔者花时间阅读她的官方文档,整理测试示例,写下这篇文章。编写本文时所使用的版本为Lettuce 5.1.8.RELEASE,SpringBoot 2.1.8.RELEASE,JDK [8,11]。超长警告:这篇文章断断续续花了两周完成,超过4万字…

Lettuce简介

Lettuce是一个高性能基于Java编写的Redis驱动框架,底层集成了Project Reactor提供天然的反应式编程,通信框架集成了Netty使用了非阻塞IO,5.x版本之后融合了JDK1.8的异步编程特性,在保证高性能的同时提供了十分丰富易用的API,5.1版本的新特性如下:

  • 支持Redis的新增命令ZPOPMIN, ZPOPMAX, BZPOPMIN, BZPOPMAX。
  • 支持通过Brave模块跟踪Redis命令执行。
  • 支持Redis Streams。
  • 支持异步的主从连接。
  • 支持异步连接池。
  • 新增命令最多执行一次模式(禁止自动重连)。
  • 全局命令超时设置(对异步和反应式命令也有效)。
  • …等等

注意一点:Redis的版本至少需要2.6,当然越高越好,API的兼容性比较强大。

只需要引入单个依赖就可以开始愉快地使用Lettuce:

  • Maven
<dependency>
    <groupId>io.lettuce</groupId>
    <artifactId>lettuce-core</artifactId>
    <version>5.1.8.RELEASE</version>
</dependency>
  • Gradle
dependencies {
  compile 'io.lettuce:lettuce-core:5.1.8.RELEASE'
}

连接Redis

单机、哨兵、集群模式下连接Redis需要一个统一的标准去表示连接的细节信息,在Lettuce中这个统一的标准是RedisURI。可以通过三种方式构造一个RedisURI实例:

  • 定制的字符串URI语法:
RedisURI uri = RedisURI.create("redis://localhost/");
  • 使用建造器(RedisURI.Builder):
RedisURI uri = RedisURI.builder().withHost("localhost").withPort(6379).build();
  • 直接通过构造函数实例化:
RedisURI uri = new RedisURI("localhost", 6379, 60, TimeUnit.SECONDS);

定制的连接URI语法

  • 单机(前缀为redis://)
格式:redis://[password@]host[:port][/databaseNumber][?[timeout=timeout[d|h|m|s|ms|us|ns]]
完整:redis://mypassword@127.0.0.1:6379/0?timeout=10s
简单:redis://localhost
  • 单机并且使用SSL(前缀为rediss://) <== 注意后面多了个s
格式:rediss://[password@]host[:port][/databaseNumber][?[timeout=timeout[d|h|m|s|ms|us|ns]]
完整:rediss://mypassword@127.0.0.1:6379/0?timeout=10s
简单:rediss://localhost
  • 单机Unix Domain Sockets模式(前缀为redis-socket://)
格式:redis-socket://path[?[timeout=timeout[d|h|m|s|ms|us|ns]][&_database=database_]]
完整:redis-socket:///tmp/redis?timeout=10s&_database=0
  • 哨兵(前缀为redis-sentinel://)
格式:redis-sentinel://[password@]host[:port][,host2[:port2]][/databaseNumber][?[timeout=timeout[d|h|m|s|ms|us|ns]]#sentinelMasterId
完整:redis-sentinel://mypassword@127.0.0.1:6379,127.0.0.1:6380/0?timeout=10s#mymaster

超时时间单位:

  • d 天
  • h 小时
  • m 分钟
  • s 秒钟
  • ms 毫秒
  • us 微秒
  • ns 纳秒

个人建议使用RedisURI提供的建造器,毕竟定制的URI虽然简洁,但是比较容易出现人为错误。鉴于笔者没有SSL和Unix Domain Socket的使用场景,下面不对这两种连接方式进行列举。

基本使用

Lettuce使用的时候依赖于四个主要组件:

  • RedisURI:连接信息。
  • RedisClient:Redis客户端,特殊地,集群连接有一个定制的RedisClusterClient。
  • Connection:Redis连接,主要是StatefulConnection或者StatefulRedisConnection的子类,连接的类型主要由连接的具体方式(单机、哨兵、集群、订阅发布等等)选定,比较重要。
  • RedisCommands:Redis命令API接口,基本上覆盖了Redis发行版本的所有命令,提供了同步(sync)、异步(async)、反应式(reative)的调用方式,对于使用者而言,会经常跟RedisCommands系列接口打交道。

一个基本使用例子如下:

@Test
public void testSetGet() throws Exception {
    RedisURI redisUri = RedisURI.builder()                    // <1> 创建单机连接的连接信息
            .withHost("localhost")
            .withPort(6379)
            .withTimeout(Duration.of(10, ChronoUnit.SECONDS))
            .build();
    RedisClient redisClient = RedisClient.create(redisUri);   // <2> 创建客户端
    StatefulRedisConnection<String, String> connection = redisClient.connect();     // <3> 创建线程安全的连接
    RedisCommands<String, String> redisCommands = connection.sync();                // <4> 创建同步命令
    SetArgs setArgs = SetArgs.Builder.nx().ex(5);
    String result = redisCommands.set("name", "throwable", setArgs);
    Assertions.assertThat(result).isEqualToIgnoringCase("OK");
    result = redisCommands.get("name");
    Assertions.assertThat(result).isEqualTo("throwable");
    // ... 其他操作
    connection.close();   // <5> 关闭连接
    redisClient.shutdown();  // <6> 关闭客户端
}

注意:

  • <5>:关闭连接一般在应用程序停止之前操作,一个应用程序中的一个Redis驱动实例不需要太多的连接(一般情况下只需要一个连接实例就可以,如果有多个连接的需要可以考虑使用连接池,其实Redis目前处理命令的模块是单线程,在客户端多个连接多线程调用理论上没有效果)。
  • <6>:关闭客户端一般应用程序停止之前操作,如果条件允许的话,基于后开先闭原则,客户端关闭应该在连接关闭之后操作。

API

Lettuce主要提供三种API:

  • 同步(sync):RedisCommands。
  • 异步(async):RedisAsyncCommands。
  • 反应式(reactive):RedisReactiveCommands。

先准备好一个单机Redis连接备用:

private static StatefulRedisConnection<String, String> CONNECTION;
private static RedisClient CLIENT;

@BeforeClass
public static void beforeClass() {
    RedisURI redisUri = RedisURI.builder()
            .withHost("localhost")
            .withPort(6379)
            .withTimeout(Duration.of(10, ChronoUnit.SECONDS))
            .build();
    CLIENT = RedisClient.create(redisUri);
    CONNECTION = CLIENT.connect();
}

@AfterClass
public static void afterClass() throws Exception {
    CONNECTION.close();
    CLIENT.shutdown();
}

Redis命令API的具体实现可以直接从StatefulRedisConnection实例获取,见其接口定义:

public interface StatefulRedisConnection<K, V> extends StatefulConnection<K, V> {

    boolean isMulti();

    RedisCommands<K, V> sync();

    RedisAsyncCommands<K, V> async();

    RedisReactiveCommands<K, V> reactive();
}    

值得注意的是,在不指定编码解码器RedisCodec的前提下,RedisClient创建的StatefulRedisConnection实例一般是泛型实例StatefulRedisConnection<String,String>,也就是所有命令API的KEY和VALUE都是String类型,这种使用方式能满足大部分的使用场景。当然,必要的时候可以定制编码解码器RedisCodec<K,V>。

同步API

先构建RedisCommands实例:

private static RedisCommands<String, String> COMMAND;

@BeforeClass
public static void beforeClass() {
    COMMAND = CONNECTION.sync();
}

基本使用:

@Test
public void testSyncPing() throws Exception {
   String pong = COMMAND.ping();
   Assertions.assertThat(pong).isEqualToIgnoringCase("PONG");
}


@Test
public void testSyncSetAndGet() throws Exception {
    SetArgs setArgs = SetArgs.Builder.nx().ex(5);
    COMMAND.set("name", "throwable", setArgs);
    String value = COMMAND.get("name");
    log.info("Get value: {}", value);
}

// Get value: throwable

同步API在所有命令调用之后会立即返回结果。如果熟悉Jedis的话,RedisCommands的用法其实和它相差不大。

异步API

先构建RedisAsyncCommands实例:

private static RedisAsyncCommands<String, String> ASYNC_COMMAND;

@BeforeClass
public static void beforeClass() {
    ASYNC_COMMAND = CONNECTION.async();
}

基本使用:

@Test
public void testAsyncPing() throws Exception {
    RedisFuture<String> redisFuture = ASYNC_COMMAND.ping();
    log.info("Ping result:{}", redisFuture.get());
}
// Ping result:PONG

RedisAsyncCommands所有方法执行返回结果都是RedisFuture实例,而RedisFuture接口的定义如下:

public interface RedisFuture<V> extends CompletionStage<V>, Future<V> {

    String getError();

    boolean await(long timeout, TimeUnit unit) throws InterruptedException;
}    

也就是,RedisFuture可以无缝使用Future或者JDK1.8中引入的CompletableFuture提供的方法。举个例子:

@Test
public void testAsyncSetAndGet1() throws Exception {
    SetArgs setArgs = SetArgs.Builder.nx().ex(5);
    RedisFuture<String> future = ASYNC_COMMAND.set("name", "throwable", setArgs);
    // CompletableFuture#thenAccept()
    future.thenAccept(value -> log.info("Set命令返回:{}", value));
    // Future#get()
    future.get();
}
// Set命令返回:OK

@Test
public void testAsyncSetAndGet2() throws Exception {
    SetArgs setArgs = SetArgs.Builder.nx().ex(5);
    CompletableFuture<Void> result =
            (CompletableFuture<Void>) ASYNC_COMMAND.set("name", "throwable", setArgs)
                    .thenAcceptBoth(ASYNC_COMMAND.get("name"),
                            (s, g) -> {
                                log.info("Set命令返回:{}", s);
                                log.info("Get命令返回:{}", g);
                            });
    result.get();
}
// Set命令返回:OK
// Get命令返回:throwable

如果能熟练使用CompletableFuture和函数式编程技巧,可以组合多个RedisFuture完成一些列复杂的操作。

反应式API

Lettuce引入的反应式编程框架是Project Reactor,如果没有反应式编程经验可以先自行了解一下Project Reactor。

构建RedisReactiveCommands实例:

private static RedisReactiveCommands<String, String> REACTIVE_COMMAND;

@BeforeClass
public static void beforeClass() {
    REACTIVE_COMMAND = CONNECTION.reactive();
}

根据Project Reactor,RedisReactiveCommands的方法如果返回的结果只包含0或1个元素,那么返回值类型是Mono,如果返回的结果包含0到N(N大于0)个元素,那么返回值是Flux。举个例子:

@Test
public void testReactivePing() throws Exception {
    Mono<String> ping = REACTIVE_COMMAND.ping();
    ping.subscribe(v -> log.info("Ping result:{}", v));
    Thread.sleep(1000);
}
// Ping result:PONG

@Test
public void testReactiveSetAndGet() throws Exception {
    SetArgs setArgs = SetArgs.Builder.nx().ex(5);
    REACTIVE_COMMAND.set("name", "throwable", setArgs).block();
    REACTIVE_COMMAND.get("name").subscribe(value -> log.info("Get命令返回:{}", value));
    Thread.sleep(1000);
}
// Get命令返回:throwable

@Test
public void testReactiveSet() throws Exception {
    REACTIVE_COMMAND.sadd("food", "bread", "meat", "fish").block();
    Flux<String> flux = REACTIVE_COMMAND.smembers("food");
    flux.subscribe(log::info);
    REACTIVE_COMMAND.srem("food", "bread", "meat", "fish").block();
    Thread.sleep(1000);
}
// meat
// bread
// fish

举个更加复杂的例子,包含了事务、函数转换等:

@Test
public void testReactiveFunctional() throws Exception {
    REACTIVE_COMMAND.multi().doOnSuccess(r -> {
        REACTIVE_COMMAND.set("counter", "1").doOnNext(log::info).subscribe();
        REACTIVE_COMMAND.incr("counter").doOnNext(c -> log.info(String.valueOf(c))).subscribe();
    }).flatMap(s -> REACTIVE_COMMAND.exec())
            .doOnNext(transactionResult -> log.info("Discarded:{}", transactionResult.wasDiscarded()))
            .subscribe();
    Thread.sleep(1000);
}
// OK
// 2
// Discarded:false

这个方法开启一个事务,先把counter设置为1,再将counter自增1。

发布和订阅

非集群模式下的发布订阅依赖于定制的连接StatefulRedisPubSubConnection,集群模式下的发布订阅依赖于定制的连接StatefulRedisClusterPubSubConnection,两者分别来源于RedisClient#connectPubSub()系列方法和RedisClusterClient#connectPubSub():

  • 非集群模式:
// 可能是单机、普通主从、哨兵等非集群模式的客户端
RedisClient client = ...
StatefulRedisPubSubConnection<String, String> connection = client.connectPubSub();
connection.addListener(new RedisPubSubListener<String, String>() { ... });

// 同步命令
RedisPubSubCommands<String, String> sync = connection.sync();
sync.subscribe("channel");

// 异步命令
RedisPubSubAsyncCommands<String, String> async = connection.async();
RedisFuture<Void> future = async.subscribe("channel");

// 反应式命令
RedisPubSubReactiveCommands<String, String> reactive = connection.reactive();
reactive.subscribe("channel").subscribe();

reactive.observeChannels().doOnNext(patternMessage -> {...}).subscribe()
  • 集群模式:
    // 使用方式其实和非集群模式基本一致
RedisClusterClient clusterClient = ...
StatefulRedisClusterPubSubConnection<String, String> connection = clusterClient.connectPubSub();
connection.addListener(new RedisPubSubListener<String, String>() { ... });
RedisPubSubCommands<String, String> sync = connection.sync();
sync.subscribe("channel");
// ...

这里用单机同步命令的模式举一个Redis键空间通知(Redis Keyspace Notifications)的例子:

@Test
public void testSyncKeyspaceNotification() throws Exception {
    RedisURI redisUri = RedisURI.builder()
            .withHost("localhost")
            .withPort(6379)
            // 注意这里只能是0号库
            .withDatabase(0)
            .withTimeout(Duration.of(10, ChronoUnit.SECONDS))
            .build();
    RedisClient redisClient = RedisClient.create(redisUri);
    StatefulRedisConnection<String, String> redisConnection = redisClient.connect();
    RedisCommands<String, String> redisCommands = redisConnection.sync();
    // 只接收键过期的事件
    redisCommands.configSet("notify-keyspace-events", "Ex");
    StatefulRedisPubSubConnection<String, String> connection = redisClient.connectPubSub();
    connection.addListener(new RedisPubSubAdapter<>() {

        @Override
        public void psubscribed(String pattern, long count) {
            log.info("pattern:{},count:{}", pattern, count);
        }

        @Override
        public void message(String pattern, String channel, String message) {
            log.info("pattern:{},channel:{},message:{}", pattern, channel, message);
        }
    });
    RedisPubSubCommands<String, String> commands = connection.sync();
    commands.psubscribe("__keyevent@0__:expired");
    redisCommands.setex("name", 2, "throwable");
    Thread.sleep(10000);
    redisConnection.close();
    connection.close();
    redisClient.shutdown();
}
// pattern:__keyevent@0__:expired,count:1
// pattern:__keyevent@0__:expired,channel:__keyevent@0__:expired,message:name

实际上,在实现RedisPubSubListener的时候可以单独抽离,尽量不要设计成匿名内部类的形式。

事务和批量命令执行

事务相关的命令就是WATCH、UNWATCH、EXEC、MULTI和DISCARD,在RedisCommands系列接口中有对应的方法。举个例子:

// 同步模式
@Test
public void testSyncMulti() throws Exception {
    COMMAND.multi();
    COMMAND.setex("name-1", 2, "throwable");
    COMMAND.setex("name-2", 2, "doge");
    TransactionResult result = COMMAND.exec();
    int index = 0;
    for (Object r : result) {
        log.info("Result-{}:{}", index, r);
        index++;
    }
}
// Result-0:OK
// Result-1:OK

Redis的Pipeline也就是管道机制可以理解为把多个命令打包在一次请求发送到Redis服务端,然后Redis服务端把所有的响应结果打包好一次性返回,从而节省不必要的网络资源(最主要是减少网络请求次数)。Redis对于Pipeline机制如何实现并没有明确的规定,也没有提供特殊的命令支持Pipeline机制。Jedis中底层采用BIO(阻塞IO)通讯,所以它的做法是客户端缓存将要发送的命令,最后需要触发然后同步发送一个巨大的命令列表包,再接收和解析一个巨大的响应列表包。Pipeline在Lettuce中对使用者是透明的,由于底层的通讯框架是Netty,所以网络通讯层面的优化Lettuce不需要过多干预,换言之可以这样理解:Netty帮Lettuce从底层实现了Redis的Pipeline机制。但是,Lettuce的异步API也提供了手动Flush的方法:

@Test
public void testAsyncManualFlush() {
    // 取消自动flush
    ASYNC_COMMAND.setAutoFlushCommands(false);
    List<RedisFuture<?>> redisFutures = Lists.newArrayList();
    int count = 5000;
    for (int i = 0; i < count; i++) {
        String key = "key-" + (i + 1);
        String value = "value-" + (i + 1);
        redisFutures.add(ASYNC_COMMAND.set(key, value));
        redisFutures.add(ASYNC_COMMAND.expire(key, 2));
    }
    long start = System.currentTimeMillis();
    ASYNC_COMMAND.flushCommands();
    boolean result = LettuceFutures.awaitAll(10, TimeUnit.SECONDS, redisFutures.toArray(new RedisFuture[0]));
    Assertions.assertThat(result).isTrue();
    log.info("Lettuce cost:{} ms", System.currentTimeMillis() - start);
}
// Lettuce cost:1302 ms

上面只是从文档看到的一些理论术语,但是现实是骨感的,对比了下Jedis的Pipeline提供的方法,发现了Jedis的Pipeline执行耗时比较低:

@Test
public void testJedisPipeline() throws Exception {
    Jedis jedis = new Jedis();
    Pipeline pipeline = jedis.pipelined();
    int count = 5000;
    for (int i = 0; i < count; i++) {
        String key = "key-" + (i + 1);
        String value = "value-" + (i + 1);
        pipeline.set(key, value);
        pipeline.expire(key, 2);
    }
    long start = System.currentTimeMillis();
    pipeline.syncAndReturnAll();
    log.info("Jedis cost:{} ms", System.currentTimeMillis()  - start);
}
// Jedis cost:9 ms

个人猜测Lettuce可能底层并非合并所有命令一次发送(甚至可能是单条发送),具体可能需要抓包才能定位。依此来看,如果真的有大量执行Redis命令的场景,不妨可以使用Jedis的Pipeline。

注意:由上面的测试推断RedisTemplate的executePipelined()方法是假的Pipeline执行方法,使用RedisTemplate的时候请务必注意这一点。

Lua脚本执行

Lettuce中执行Redis的Lua命令的同步接口如下:

public interface RedisScriptingCommands<K, V> {

    <T> T eval(String var1, ScriptOutputType var2, K... var3);

    <T> T eval(String var1, ScriptOutputType var2, K[] var3, V... var4);

    <T> T evalsha(String var1, ScriptOutputType var2, K... var3);

    <T> T evalsha(String var1, ScriptOutputType var2, K[] var3, V... var4);

    List<Boolean> scriptExists(String... var1);

    String scriptFlush();

    String scriptKill();

    String scriptLoad(V var1);

    String digest(V var1);
}

异步和反应式的接口方法定义差不多,不同的地方就是返回值类型,一般我们常用的是eval()、evalsha()和scriptLoad()方法。举个简单的例子:

private static RedisCommands<String, String> COMMANDS;
private static String RAW_LUA = "local key = KEYS[1]\n" +
        "local value = ARGV[1]\n" +
        "local timeout = ARGV[2]\n" +
        "redis.call('SETEX', key, tonumber(timeout), value)\n" +
        "local result = redis.call('GET', key)\n" +
        "return result;";
private static AtomicReference<String> LUA_SHA = new AtomicReference<>();

@Test
public void testLua() throws Exception {
    LUA_SHA.compareAndSet(null, COMMANDS.scriptLoad(RAW_LUA));
    String[] keys = new String[]{"name"};
    String[] args = new String[]{"throwable", "5000"};
    String result = COMMANDS.evalsha(LUA_SHA.get(), ScriptOutputType.VALUE, keys, args);
    log.info("Get value:{}", result);
}
// Get value:throwable

高可用和分片

为了Redis的高可用,一般会采用普通主从(Master/Replica,这里笔者称为普通主从模式,也就是仅仅做了主从复制,故障需要手动切换)、哨兵和集群。普通主从模式可以独立运行,也可以配合哨兵运行,只是哨兵提供自动故障转移和主节点提升功能。普通主从和哨兵都可以使用MasterSlave,通过入参包括RedisClient、编码解码器以及一个或者多个RedisURI获取对应的Connection实例。

这里注意一点,MasterSlave中提供的方法如果只要求传入一个RedisURI实例,那么Lettuce会进行拓扑发现机制,自动获取Redis主从节点信息;如果要求传入一个RedisURI集合,那么对于普通主从模式来说所有节点信息是静态的,不会进行发现和更新。

拓扑发现的规则如下:

  • 对于普通主从(Master/Replica)模式,不需要感知RedisURI指向从节点还是主节点,只会进行一次性的拓扑查找所有节点信息,此后节点信息会保存在静态缓存中,不会更新。
  • 对于哨兵模式,会订阅所有哨兵实例并侦听订阅/发布消息以触发拓扑刷新机制,更新缓存的节点信息,也就是哨兵天然就是动态发现节点信息,不支持静态配置。

拓扑发现机制的提供API为TopologyProvider,需要了解其原理的可以参考具体的实现。

对于集群(Cluster)模式,Lettuce提供了一套独立的API。

另外,如果Lettuce连接面向的是非单个Redis节点,连接实例提供了数据读取节点偏好(ReadFrom)设置,可选值有:

  • MASTER:只从Master节点中读取。
  • MASTER_PREFERRED:优先从Master节点中读取。
  • SLAVE_PREFERRED:优先从Slavor节点中读取。
  • SLAVE:只从Slavor节点中读取。
  • NEAREST:使用最近一次连接的Redis实例读取。

普通主从模式

假设现在有三个Redis服务形成树状主从关系如下:

  • 节点一:localhost:6379,角色为Master。
  • 节点二:localhost:6380,角色为Slavor,节点一的从节点。
  • 节点三:localhost:6381,角色为Slavor,节点二的从节点。

首次动态节点发现主从模式的节点信息需要如下构建连接:

@Test
public void testDynamicReplica() throws Exception {
    // 这里只需要配置一个节点的连接信息,不一定需要是主节点的信息,从节点也可以
    RedisURI uri = RedisURI.builder().withHost("localhost").withPort(6379).build();
    RedisClient redisClient = RedisClient.create(uri);
    StatefulRedisMasterSlaveConnection<String, String> connection = MasterSlave.connect(redisClient, new Utf8StringCodec(), uri);
    // 只从从节点读取数据
    connection.setReadFrom(ReadFrom.SLAVE);
    // 执行其他Redis命令
    connection.close();
    redisClient.shutdown();
}

如果需要指定静态的Redis主从节点连接属性,那么可以这样构建连接:

@Test
public void testStaticReplica() throws Exception {
    List<RedisURI> uris = new ArrayList<>();
    RedisURI uri1 = RedisURI.builder().withHost("localhost").withPort(6379).build();
    RedisURI uri2 = RedisURI.builder().withHost("localhost").withPort(6380).build();
    RedisURI uri3 = RedisURI.builder().withHost("localhost").withPort(6381).build();
    uris.add(uri1);
    uris.add(uri2);
    uris.add(uri3);
    RedisClient redisClient = RedisClient.create();
    StatefulRedisMasterSlaveConnection<String, String> connection = MasterSlave.connect(redisClient,
            new Utf8StringCodec(), uris);
    // 只从主节点读取数据
    connection.setReadFrom(ReadFrom.MASTER);
    // 执行其他Redis命令
    connection.close();
    redisClient.shutdown();
}

哨兵模式

由于Lettuce自身提供了哨兵的拓扑发现机制,所以只需要随便配置一个哨兵节点的RedisURI实例即可:

@Test
public void testDynamicSentinel() throws Exception {
    RedisURI redisUri = RedisURI.builder()
            .withPassword("你的密码")
            .withSentinel("localhost", 26379)
            .withSentinelMasterId("哨兵Master的ID")
            .build();
    RedisClient redisClient = RedisClient.create();
    StatefulRedisMasterSlaveConnection<String, String> connection = MasterSlave.connect(redisClient, new Utf8StringCodec(), redisUri);
    // 只允许从从节点读取数据
    connection.setReadFrom(ReadFrom.SLAVE);
    RedisCommands<String, String> command = connection.sync();
    SetArgs setArgs = SetArgs.Builder.nx().ex(5);
    command.set("name", "throwable", setArgs);
    String value = command.get("name");
    log.info("Get value:{}", value);
}
// Get value:throwable

集群模式

鉴于笔者对Redis集群模式并不熟悉,Cluster模式下的API使用本身就有比较多的限制,所以这里只简单介绍一下怎么用。先说几个特性:

下面的API提供跨槽位(Slot)调用的功能:

  • RedisAdvancedClusterCommands。
  • RedisAdvancedClusterAsyncCommands。
  • RedisAdvancedClusterReactiveCommands。

静态节点选择功能:

  • masters:选择所有主节点执行命令。
  • slaves:选择所有从节点执行命令,其实就是只读模式。
  • all nodes:命令可以在所有节点执行。

集群拓扑视图动态更新功能:

  • 手动更新,主动调用RedisClusterClient#reloadPartitions()。
  • 后台定时更新。
  • 自适应更新,基于连接断开和MOVED/ASK命令重定向自动更新。

Redis集群搭建详细过程可以参考官方文档,假设已经搭建好集群如下(192.168.56.200是笔者的虚拟机Host):

  • 192.168.56.200:7001 => 主节点,槽位0-5460。
  • 192.168.56.200:7002 => 主节点,槽位5461-10922。
  • 192.168.56.200:7003 => 主节点,槽位10923-16383。
  • 192.168.56.200:7004 => 7001的从节点。
  • 192.168.56.200:7005 => 7002的从节点。
  • 192.168.56.200:7006 => 7003的从节点。

简单的集群连接和使用方式如下:

@Test
public void testSyncCluster(){
    RedisURI uri = RedisURI.builder().withHost("192.168.56.200").build();
    RedisClusterClient redisClusterClient = RedisClusterClient.create(uri);
    StatefulRedisClusterConnection<String, String> connection = redisClusterClient.connect();
    RedisAdvancedClusterCommands<String, String> commands = connection.sync();
    commands.setex("name",10, "throwable");
    String value = commands.get("name");
    log.info("Get value:{}", value);
}
// Get value:throwable

节点选择:

@Test
public void testSyncNodeSelection() {
    RedisURI uri = RedisURI.builder().withHost("192.168.56.200").withPort(7001).build();
    RedisClusterClient redisClusterClient = RedisClusterClient.create(uri);
    StatefulRedisClusterConnection<String, String> connection = redisClusterClient.connect();
    RedisAdvancedClusterCommands<String, String> commands = connection.sync();
//  commands.all();  // 所有节点
//  commands.masters();  // 主节点
    // 从节点只读
    NodeSelection<String, String> replicas = commands.slaves();
    NodeSelectionCommands<String, String> nodeSelectionCommands = replicas.commands();
    // 这里只是演示,一般应该禁用keys *命令
    Executions<List<String>> keys = nodeSelectionCommands.keys("*");
    keys.forEach(key -> log.info("key: {}", key));
    connection.close();
    redisClusterClient.shutdown();
}

定时更新集群拓扑视图(每隔十分钟更新一次,这个时间自行考量,不能太频繁):

@Test
public void testPeriodicClusterTopology() throws Exception {
    RedisURI uri = RedisURI.builder().withHost("192.168.56.200").withPort(7001).build();
    RedisClusterClient redisClusterClient = RedisClusterClient.create(uri);
    ClusterTopologyRefreshOptions options = ClusterTopologyRefreshOptions
            .builder()
            .enablePeriodicRefresh(Duration.of(10, ChronoUnit.MINUTES))
            .build();
    redisClusterClient.setOptions(ClusterClientOptions.builder().topologyRefreshOptions(options).build());
    StatefulRedisClusterConnection<String, String> connection = redisClusterClient.connect();
    RedisAdvancedClusterCommands<String, String> commands = connection.sync();
    commands.setex("name", 10, "throwable");
    String value = commands.get("name");
    log.info("Get value:{}", value);
    Thread.sleep(Integer.MAX_VALUE);
    connection.close();
    redisClusterClient.shutdown();
}

自适应更新集群拓扑视图:

@Test
public void testAdaptiveClusterTopology() throws Exception {
    RedisURI uri = RedisURI.builder().withHost("192.168.56.200").withPort(7001).build();
    RedisClusterClient redisClusterClient = RedisClusterClient.create(uri);
    ClusterTopologyRefreshOptions options = ClusterTopologyRefreshOptions.builder()
            .enableAdaptiveRefreshTrigger(
                    ClusterTopologyRefreshOptions.RefreshTrigger.MOVED_REDIRECT,
                    ClusterTopologyRefreshOptions.RefreshTrigger.PERSISTENT_RECONNECTS
            )
            .adaptiveRefreshTriggersTimeout(Duration.of(30, ChronoUnit.SECONDS))
            .build();
    redisClusterClient.setOptions(ClusterClientOptions.builder().topologyRefreshOptions(options).build());
    StatefulRedisClusterConnection<String, String> connection = redisClusterClient.connect();
    RedisAdvancedClusterCommands<String, String> commands = connection.sync();
    commands.setex("name", 10, "throwable");
    String value = commands.get("name");
    log.info("Get value:{}", value);
    Thread.sleep(Integer.MAX_VALUE);
    connection.close();
    redisClusterClient.shutdown();
}

动态命令和自定义命令

自定义命令是Redis命令有限集,不过可以更细粒度指定KEY、ARGV、命令类型、编码解码器和返回值类型,依赖于dispatch()方法:

// 自定义实现PING方法
@Test
public void testCustomPing() throws Exception {
    RedisURI redisUri = RedisURI.builder()
            .withHost("localhost")
            .withPort(6379)
            .withTimeout(Duration.of(10, ChronoUnit.SECONDS))
            .build();
    RedisClient redisClient = RedisClient.create(redisUri);
    StatefulRedisConnection<String, String> connect = redisClient.connect();
    RedisCommands<String, String> sync = connect.sync();
    RedisCodec<String, String> codec = StringCodec.UTF8;
    String result = sync.dispatch(CommandType.PING, new StatusOutput<>(codec));
    log.info("PING:{}", result);
    connect.close();
    redisClient.shutdown();
}
// PING:PONG

// 自定义实现Set方法
@Test
public void testCustomSet() throws Exception {
    RedisURI redisUri = RedisURI.builder()
            .withHost("localhost")
            .withPort(6379)
            .withTimeout(Duration.of(10, ChronoUnit.SECONDS))
            .build();
    RedisClient redisClient = RedisClient.create(redisUri);
    StatefulRedisConnection<String, String> connect = redisClient.connect();
    RedisCommands<String, String> sync = connect.sync();
    RedisCodec<String, String> codec = StringCodec.UTF8;
    sync.dispatch(CommandType.SETEX, new StatusOutput<>(codec),
            new CommandArgs<>(codec).addKey("name").add(5).addValue("throwable"));
    String result = sync.get("name");
    log.info("Get value:{}", result);
    connect.close();
    redisClient.shutdown();
}
// Get value:throwable

动态命令是基于Redis命令有限集,并且通过注解和动态代理完成一些复杂命令组合的实现。主要注解在io.lettuce.core.dynamic.annotation包路径下。简单举个例子:

public interface CustomCommand extends Commands {

    // SET [key] [value]
    @Command("SET ?0 ?1")
    String setKey(String key, String value);

    // SET [key] [value]
    @Command("SET :key :value")
    String setKeyNamed(@Param("key") String key, @Param("value") String value);

    // MGET [key1] [key2]
    @Command("MGET ?0 ?1")
    List<String> mGet(String key1, String key2);
    /**
     * 方法名作为命令
     */
    @CommandNaming(strategy = CommandNaming.Strategy.METHOD_NAME)
    String mSet(String key1, String value1, String key2, String value2);
}


@Test
public void testCustomDynamicSet() throws Exception {
    RedisURI redisUri = RedisURI.builder()
            .withHost("localhost")
            .withPort(6379)
            .withTimeout(Duration.of(10, ChronoUnit.SECONDS))
            .build();
    RedisClient redisClient = RedisClient.create(redisUri);
    StatefulRedisConnection<String, String> connect = redisClient.connect();
    RedisCommandFactory commandFactory = new RedisCommandFactory(connect);
    CustomCommand commands = commandFactory.getCommands(CustomCommand.class);
    commands.setKey("name", "throwable");
    commands.setKeyNamed("throwable", "doge");
    log.info("MGET ===> " + commands.mGet("name", "throwable"));
    commands.mSet("key1", "value1","key2", "value2");
    log.info("MGET ===> " + commands.mGet("key1", "key2"));
    connect.close();
    redisClient.shutdown();
}
// MGET ===> [throwable, doge]
// MGET ===> [value1, value2]

高阶特性

Lettuce有很多高阶使用特性,这里只列举个人认为常用的两点:

  • 配置客户端资源。
  • 使用连接池。

更多其他特性可以自行参看官方文档。

配置客户端资源

客户端资源的设置与Lettuce的性能、并发和事件处理相关。线程池或者线程组相关配置占据客户端资源配置的大部分(EventLoopGroups和EventExecutorGroup),这些线程池或者线程组是连接程序的基础组件。一般情况下,客户端资源应该在多个Redis客户端之间共享,并且在不再使用的时候需要自行关闭。笔者认为,客户端资源是面向Netty的。注意:除非特别熟悉或者花长时间去测试调整下面提到的参数,否则在没有经验的前提下凭直觉修改默认值,有可能会踩坑。

客户端资源接口是ClientResources,实现类是DefaultClientResources。

构建DefaultClientResources实例:

// 默认
ClientResources resources = DefaultClientResources.create();

// 建造器
ClientResources resources = DefaultClientResources.builder()
                        .ioThreadPoolSize(4)
                        .computationThreadPoolSize(4)
                        .build()

使用:

ClientResources resources = DefaultClientResources.create();
// 非集群
RedisClient client = RedisClient.create(resources, uri);
// 集群
RedisClusterClient clusterClient = RedisClusterClient.create(resources, uris);
// ......
client.shutdown();
clusterClient.shutdown();
// 关闭资源
resources.shutdown();

客户端资源基本配置:

属性描述默认值
ioThreadPoolSizeI/O线程数Runtime.getRuntime().availableProcessors()
computationThreadPoolSize任务线程数Runtime.getRuntime().availableProcessors()

客户端资源高级配置:

属性描述默认值
eventLoopGroupProviderEventLoopGroup提供商-
eventExecutorGroupProviderEventExecutorGroup提供商-
eventBus事件总线DefaultEventBus
commandLatencyCollectorOptions命令延时收集器配置DefaultCommandLatencyCollectorOptions
commandLatencyCollector命令延时收集器DefaultCommandLatencyCollector
commandLatencyPublisherOptions命令延时发布器配置DefaultEventPublisherOptions
dnsResolverDNS处理器JDK或者Netty提供
reconnectDelay重连延时配置Delay.exponential()
nettyCustomizerNetty自定义配置器-
tracing轨迹记录器-

非集群客户端RedisClient的属性配置:

Redis非集群客户端RedisClient本身提供了配置属性方法:

RedisClient client = RedisClient.create(uri);
client.setOptions(ClientOptions.builder()
                       .autoReconnect(false)
                       .pingBeforeActivateConnection(true)
                       .build());

非集群客户端的配置属性列表:

属性描述默认值
pingBeforeActivateConnection连接激活之前是否执行PING命令false
autoReconnect是否自动重连true
cancelCommandsOnReconnectFailure重连失败是否拒绝命令执行false
suspendReconnectOnProtocolFailure底层协议失败是否挂起重连操作false
requestQueueSize请求队列容量2147483647(Integer#MAX_VALUE)
disconnectedBehavior失去连接时候的行为DEFAULT
sslOptionsSSL配置-
socketOptionsSocket配置10 seconds Connection-Timeout, no keep-alive, no TCP noDelay
timeoutOptions超时配置-
publishOnScheduler发布反应式信号数据的调度器使用I/O线程

集群客户端属性配置:

Redis集群客户端RedisClusterClient本身提供了配置属性方法:

RedisClusterClient client = RedisClusterClient.create(uri);
ClusterTopologyRefreshOptions topologyRefreshOptions = ClusterTopologyRefreshOptions.builder()
                .enablePeriodicRefresh(refreshPeriod(10, TimeUnit.MINUTES))
                .enableAllAdaptiveRefreshTriggers()
                .build();

client.setOptions(ClusterClientOptions.builder()
                       .topologyRefreshOptions(topologyRefreshOptions)
                       .build());

集群客户端的配置属性列表:

属性描述默认值
enablePeriodicRefresh是否允许周期性更新集群拓扑视图false
refreshPeriod更新集群拓扑视图周期60秒
enableAdaptiveRefreshTrigger设置自适应更新集群拓扑视图触发器RefreshTrigger-
adaptiveRefreshTriggersTimeout自适应更新集群拓扑视图触发器超时设置30秒
refreshTriggersReconnectAttempts 自适应更新集群拓扑视图触发重连次数5
dynamicRefreshSources是否允许动态刷新拓扑资源true
closeStaleConnections是否允许关闭陈旧的连接true
maxRedirects集群重定向次数上限5
validateClusterNodeMembership是否校验集群节点的成员关系true

使用连接池

引入连接池依赖commons-pool2:

<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-pool2</artifactId>
    <version>2.7.0</version>
</dependency

基本使用如下:

@Test
public void testUseConnectionPool() throws Exception {
    RedisURI redisUri = RedisURI.builder()
            .withHost("localhost")
            .withPort(6379)
            .withTimeout(Duration.of(10, ChronoUnit.SECONDS))
            .build();
    RedisClient redisClient = RedisClient.create(redisUri);
    GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig();
    GenericObjectPool<StatefulRedisConnection<String, String>> pool
            = ConnectionPoolSupport.createGenericObjectPool(redisClient::connect, poolConfig);
    try (StatefulRedisConnection<String, String> connection = pool.borrowObject()) {
        RedisCommands<String, String> command = connection.sync();
        SetArgs setArgs = SetArgs.Builder.nx().ex(5);
        command.set("name", "throwable", setArgs);
        String n = command.get("name");
        log.info("Get value:{}", n);
    }
    pool.close();
    redisClient.shutdown();
}

其中,同步连接的池化支持需要用ConnectionPoolSupport,异步连接的池化支持需要用AsyncConnectionPoolSupport(Lettuce5.1之后才支持)。

几个常见的渐进式删除例子

渐进式删除Hash中的域-属性:

@Test
public void testDelBigHashKey() throws Exception {
    // SCAN参数
    ScanArgs scanArgs = ScanArgs.Builder.limit(2);
    // TEMP游标
    ScanCursor cursor = ScanCursor.INITIAL;
    // 目标KEY
    String key = "BIG_HASH_KEY";
    prepareHashTestData(key);
    log.info("开始渐进式删除Hash的元素...");
    int counter = 0;
    do {
        MapScanCursor<String, String> result = COMMAND.hscan(key, cursor, scanArgs);
        // 重置TEMP游标
        cursor = ScanCursor.of(result.getCursor());
        cursor.setFinished(result.isFinished());
        Collection<String> fields = result.getMap().values();
        if (!fields.isEmpty()) {
            COMMAND.hdel(key, fields.toArray(new String[0]));
        }
        counter++;
    } while (!(ScanCursor.FINISHED.getCursor().equals(cursor.getCursor()) && ScanCursor.FINISHED.isFinished() == cursor.isFinished()));
    log.info("渐进式删除Hash的元素完毕,迭代次数:{} ...", counter);
}

private void prepareHashTestData(String key) throws Exception {
    COMMAND.hset(key, "1", "1");
    COMMAND.hset(key, "2", "2");
    COMMAND.hset(key, "3", "3");
    COMMAND.hset(key, "4", "4");
    COMMAND.hset(key, "5", "5");
}

渐进式删除集合中的元素:

@Test
public void testDelBigSetKey() throws Exception {
    String key = "BIG_SET_KEY";
    prepareSetTestData(key);
    // SCAN参数
    ScanArgs scanArgs = ScanArgs.Builder.limit(2);
    // TEMP游标
    ScanCursor cursor = ScanCursor.INITIAL;
    log.info("开始渐进式删除Set的元素...");
    int counter = 0;
    do {
        ValueScanCursor<String> result = COMMAND.sscan(key, cursor, scanArgs);
        // 重置TEMP游标
        cursor = ScanCursor.of(result.getCursor());
        cursor.setFinished(result.isFinished());
        List<String> values = result.getValues();
        if (!values.isEmpty()) {
            COMMAND.srem(key, values.toArray(new String[0]));
        }
        counter++;
    } while (!(ScanCursor.FINISHED.getCursor().equals(cursor.getCursor()) && ScanCursor.FINISHED.isFinished() == cursor.isFinished()));
    log.info("渐进式删除Set的元素完毕,迭代次数:{} ...", counter);
}

private void prepareSetTestData(String key) throws Exception {
    COMMAND.sadd(key, "1", "2", "3", "4", "5");
}

渐进式删除有序集合中的元素:

@Test
public void testDelBigZSetKey() throws Exception {
    // SCAN参数
    ScanArgs scanArgs = ScanArgs.Builder.limit(2);
    // TEMP游标
    ScanCursor cursor = ScanCursor.INITIAL;
    // 目标KEY
    String key = "BIG_ZSET_KEY";
    prepareZSetTestData(key);
    log.info("开始渐进式删除ZSet的元素...");
    int counter = 0;
    do {
        ScoredValueScanCursor<String> result = COMMAND.zscan(key, cursor, scanArgs);
        // 重置TEMP游标
        cursor = ScanCursor.of(result.getCursor());
        cursor.setFinished(result.isFinished());
        List<ScoredValue<String>> scoredValues = result.getValues();
        if (!scoredValues.isEmpty()) {
            COMMAND.zrem(key, scoredValues.stream().map(ScoredValue<String>::getValue).toArray(String[]::new));
        }
        counter++;
    } while (!(ScanCursor.FINISHED.getCursor().equals(cursor.getCursor()) && ScanCursor.FINISHED.isFinished() == cursor.isFinished()));
    log.info("渐进式删除ZSet的元素完毕,迭代次数:{} ...", counter);
}

private void prepareZSetTestData(String key) throws Exception {
    COMMAND.zadd(key, 0, "1");
    COMMAND.zadd(key, 0, "2");
    COMMAND.zadd(key, 0, "3");
    COMMAND.zadd(key, 0, "4");
    COMMAND.zadd(key, 0, "5");
}

在SpringBoot中使用Lettuce

个人认为,spring-data-redis中的API封装并不是很优秀,用起来比较重,不够灵活,这里结合前面的例子和代码,在SpringBoot脚手架项目中配置和整合Lettuce。先引入依赖:

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-dependencies</artifactId>
            <version>2.1.8.RELEASE</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
            <dependency>
        <groupId>io.lettuce</groupId>
        <artifactId>lettuce-core</artifactId>
        <version>5.1.8.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.10</version>
        <scope>provided</scope>
    </dependency>
</dependencies>        

一般情况下,每个应用应该使用单个Redis客户端实例和单个连接实例,这里设计一个脚手架,适配单机、普通主从、哨兵和集群四种使用场景。对于客户端资源,采用默认的实现即可。对于Redis的连接属性,比较主要的有Host、Port和Password,其他可以暂时忽略。基于约定大于配置的原则,先定制一系列属性配置类(其实有些配置是可以完全共用,但是考虑到要清晰描述类之间的关系,这里拆分多个配置属性类和多个配置方法):

@Data
@ConfigurationProperties(prefix = "lettuce")
public class LettuceProperties {

    private LettuceSingleProperties single;
    private LettuceReplicaProperties replica;
    private LettuceSentinelProperties sentinel;
    private LettuceClusterProperties cluster;

}

@Data
public class LettuceSingleProperties {

    private String host;
    private Integer port;
    private String password;
}

@EqualsAndHashCode(callSuper = true)
@Data
public class LettuceReplicaProperties extends LettuceSingleProperties {

}

@EqualsAndHashCode(callSuper = true)
@Data
public class LettuceSentinelProperties extends LettuceSingleProperties {

    private String masterId;
}

@EqualsAndHashCode(callSuper = true)
@Data
public class LettuceClusterProperties extends LettuceSingleProperties {

}

配置类如下,主要使用@ConditionalOnProperty做隔离,一般情况下,很少有人会在一个应用使用一种以上的Redis连接场景:

@RequiredArgsConstructor
@Configuration
@ConditionalOnClass(name = "io.lettuce.core.RedisURI")
@EnableConfigurationProperties(value = LettuceProperties.class)
public class LettuceAutoConfiguration {

    private final LettuceProperties lettuceProperties;

    @Bean(destroyMethod = "shutdown")
    public ClientResources clientResources() {
        return DefaultClientResources.create();
    }

    @Bean
    @ConditionalOnProperty(name = "lettuce.single.host")
    public RedisURI singleRedisUri() {
        LettuceSingleProperties singleProperties = lettuceProperties.getSingle();
        return RedisURI.builder()
                .withHost(singleProperties.getHost())
                .withPort(singleProperties.getPort())
                .withPassword(singleProperties.getPassword())
                .build();
    }

    @Bean(destroyMethod = "shutdown")
    @ConditionalOnProperty(name = "lettuce.single.host")
    public RedisClient singleRedisClient(ClientResources clientResources, @Qualifier("singleRedisUri") RedisURI redisUri) {
        return RedisClient.create(clientResources, redisUri);
    }

    @Bean(destroyMethod = "close")
    @ConditionalOnProperty(name = "lettuce.single.host")
    public StatefulRedisConnection<String, String> singleRedisConnection(@Qualifier("singleRedisClient") RedisClient singleRedisClient) {
        return singleRedisClient.connect();
    }

    @Bean
    @ConditionalOnProperty(name = "lettuce.replica.host")
    public RedisURI replicaRedisUri() {
        LettuceReplicaProperties replicaProperties = lettuceProperties.getReplica();
        return RedisURI.builder()
                .withHost(replicaProperties.getHost())
                .withPort(replicaProperties.getPort())
                .withPassword(replicaProperties.getPassword())
                .build();
    }

    @Bean(destroyMethod = "shutdown")
    @ConditionalOnProperty(name = "lettuce.replica.host")
    public RedisClient replicaRedisClient(ClientResources clientResources, @Qualifier("replicaRedisUri") RedisURI redisUri) {
        return RedisClient.create(clientResources, redisUri);
    }

    @Bean(destroyMethod = "close")
    @ConditionalOnProperty(name = "lettuce.replica.host")
    public StatefulRedisMasterSlaveConnection<String, String> replicaRedisConnection(@Qualifier("replicaRedisClient") RedisClient replicaRedisClient,
                                                                                     @Qualifier("replicaRedisUri") RedisURI redisUri) {
        return MasterSlave.connect(replicaRedisClient, new Utf8StringCodec(), redisUri);
    }

    @Bean
    @ConditionalOnProperty(name = "lettuce.sentinel.host")
    public RedisURI sentinelRedisUri() {
        LettuceSentinelProperties sentinelProperties = lettuceProperties.getSentinel();
        return RedisURI.builder()
                .withPassword(sentinelProperties.getPassword())
                .withSentinel(sentinelProperties.getHost(), sentinelProperties.getPort())
                .withSentinelMasterId(sentinelProperties.getMasterId())
                .build();
    }

    @Bean(destroyMethod = "shutdown")
    @ConditionalOnProperty(name = "lettuce.sentinel.host")
    public RedisClient sentinelRedisClient(ClientResources clientResources, @Qualifier("sentinelRedisUri") RedisURI redisUri) {
        return RedisClient.create(clientResources, redisUri);
    }

    @Bean(destroyMethod = "close")
    @ConditionalOnProperty(name = "lettuce.sentinel.host")
    public StatefulRedisMasterSlaveConnection<String, String> sentinelRedisConnection(@Qualifier("sentinelRedisClient") RedisClient sentinelRedisClient,
                                                                                      @Qualifier("sentinelRedisUri") RedisURI redisUri) {
        return MasterSlave.connect(sentinelRedisClient, new Utf8StringCodec(), redisUri);
    }

    @Bean
    @ConditionalOnProperty(name = "lettuce.cluster.host")
    public RedisURI clusterRedisUri() {
        LettuceClusterProperties clusterProperties = lettuceProperties.getCluster();
        return RedisURI.builder()
                .withHost(clusterProperties.getHost())
                .withPort(clusterProperties.getPort())
                .withPassword(clusterProperties.getPassword())
                .build();
    }

    @Bean(destroyMethod = "shutdown")
    @ConditionalOnProperty(name = "lettuce.cluster.host")
    public RedisClusterClient redisClusterClient(ClientResources clientResources, @Qualifier("clusterRedisUri") RedisURI redisUri) {
        return RedisClusterClient.create(clientResources, redisUri);
    }

    @Bean(destroyMethod = "close")
    @ConditionalOnProperty(name = "lettuce.cluster")
    public StatefulRedisClusterConnection<String, String> clusterConnection(RedisClusterClient clusterClient) {
        return clusterClient.connect();
    }
}

最后为了让IDE识别我们的配置,可以添加IDE亲缘性,/META-INF文件夹下新增一个文件spring-configuration-metadata.json,内容如下:

{
  "properties": [
    {
      "name": "lettuce.single",
      "type": "club.throwable.spring.lettuce.LettuceSingleProperties",
      "description": "单机配置",
      "sourceType": "club.throwable.spring.lettuce.LettuceProperties"
    },
    {
      "name": "lettuce.replica",
      "type": "club.throwable.spring.lettuce.LettuceReplicaProperties",
      "description": "主从配置",
      "sourceType": "club.throwable.spring.lettuce.LettuceProperties"
    },
    {
      "name": "lettuce.sentinel",
      "type": "club.throwable.spring.lettuce.LettuceSentinelProperties",
      "description": "哨兵配置",
      "sourceType": "club.throwable.spring.lettuce.LettuceProperties"
    },
    {
      "name": "lettuce.single",
      "type": "club.throwable.spring.lettuce.LettuceClusterProperties",
      "description": "集群配置",
      "sourceType": "club.throwable.spring.lettuce.LettuceProperties"
    }
  ]
}

如果想IDE亲缘性做得更好,可以添加/META-INF/additional-spring-configuration-metadata.json进行更多细节定义。简单使用如下:

@Slf4j
@Component
public class RedisCommandLineRunner implements CommandLineRunner {

    @Autowired
    @Qualifier("singleRedisConnection")
    private StatefulRedisConnection<String, String> connection;

    @Override
    public void run(String... args) throws Exception {
        RedisCommands<String, String> redisCommands = connection.sync();
        redisCommands.setex("name", 5, "throwable");
        log.info("Get value:{}", redisCommands.get("name"));
    }
}
// Get value:throwable

小结

本文算是基于Lettuce的官方文档,对它的使用进行全方位的分析,包括主要功能、配置都做了一些示例,限于篇幅部分特性和配置细节没有分析。Lettuce已经被spring-data-redis接纳作为官方的Redis客户端驱动,所以值得信赖,它的一些API设计确实比较合理,扩展性高的同时灵活性也高。个人建议,基于Lettuce包自行添加配置到SpringBoot应用用起来会得心应手,毕竟RedisTemplate实在太笨重,而且还屏蔽了Lettuce一些高级特性和灵活的API。

参考资料:

Lettuce Reference Guide

链接

Github Page:http://www.throwable.club/2019/09/28/redis-client-driver-lettuce-usage
Coding Page:http://throwable.coding.me/2019/09/28/redis-client-driver-lettuce-usage

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/387068.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

C/C++每日一练(20230304)

目录 1. 计数质数 ☆ 2. 筛选10到1000的回文数 ☆ 3. 计算位于矩阵边缘的元素之和 ★ 1. 计数质数 统计所有小于非负整数 n 的质数的数量。 示例 1&#xff1a; 输入&#xff1a;n 10 输出&#xff1a;4 解释&#xff1a;小于 10 的质数一共有 4 个, 它们是 2, 3, 5, 7…

【HomeKit】从HomeKit架构层细化到HomeKit ADK集成

前言&#xff1a;这篇文章是对于苹果协议文件《HomeKit ADK Integration Guide - Addendum for Televisions》的学习&#xff0c;针对版本为ADK 6.0电视。描述了将HomeKit ADK的电视简介集成到目标平台中所需的步骤。 总说明 此配置文件用于控制启用Airplay的电视&#xff0c;…

高通Android 13默认切换免提功能

1、测试部反馈 由于平板本身没有听筒功能 因此考虑工厂直接切换到免提功能 2、修改路径 frameworks/av/services/audiopolicy/enginedefault/src/Engine.cpp 3、编译源码ok 拨打紧急号码 可以正常切换到免提功能 其他mtk平台可能不一样 具体以项目实际为准 相关链接 构建…

ESP32编译及运行错误记录

1、打印格式不对 一般都是因为日志中某个参数打印格式不匹配造成。 ESP_LOGI(TAG, "[APP] Free memory: %lu bytes", esp_get_free_heap_size());//将之前的%d 改为%lu 2、配置载不对 这里选择了蓝牙模块需要引入蓝牙组件才能编译通过 idf.py menuconfig Component…

项目中的MD5、盐值加密

首先介绍一下MD5&#xff0c;而项目中用的是MD5和盐值来确保密码的安全性&#xff1b; 1. md5简介 md5的全称是md5信息摘要算法&#xff08;英文&#xff1a;MD5 Message-Digest Algorithm &#xff09;&#xff0c;一种被广泛使用的密码散列函数&#xff0c;可以产生一个128位…

css-盒模型

巧妙运用margin负值盒模型和怪异盒模型(border padding 包含在内)display: block 能让textarea input 水平尺寸自适应父容器? – 不能 * {box-sizing: border-box; // bs: bb }<textarea/> 是替换元素,尺寸由内部元素决定,不受display水平影响. 当然可以直接设置宽度10…

React(三):脚手架、组件化、生命周期、父子组件通信、插槽、Context

React&#xff08;三&#xff09;一、脚手架安装和创建1.安装脚手架2.创建脚手架3.看看脚手架目录4.运行脚手架二、脚手架下从0开始写代码三、组件化1.类组件2.函数组件四、React的生命周期1.认识生命周期2.图解生命周期&#xff08;1&#xff09;Constructor&#xff08;2&…

Allegro如何导入第一方网表操作指导

Allegro如何导入第一方网表操作指导 在启动PCB设计之前,网表的导入是首要的流程,第一方网表内容如下图 如何将第一方网表导入到PCB中,具体操作如下 点击File点击Import

【项目】用户管理系统

一、需求分析完成一个简单的用户信息管理系统&#xff0c;超级管理员可以登录本系统&#xff0c;查询用户信息、实现用户的管理功能。1.1功能&#xff1a;主要操作和管理的对象&#xff1a;用户。用户分为两类&#xff1a;超级管理员/普通用户。登录功能&#xff08;只针对超管…

深入理解多进程

多进程 一、进程状态 二、创建子进程 - fork 1、函数接口 #include <unistd.h>pid_t fork(void);2、基本概述 成功后&#xff0c;子进程的 PID 在父进程中返回&#xff0c;在子进程中返回 0。 失败时&#xff0c;在父进程中返回 -1&#xff0c;不创建子进程&#xff0c…

MyBatis——进阶操作(2)

标签 if标签 当提交的表单中有些为非必填项&#xff0c;用户并没有上传这些属性的值&#xff0c;那么程序可以上传NUll&#xff0c;也可以用if标签判断用户有没有上传这个值 <if test"参数!null">操作 </if>其中test中填写一条语句&#xff0c;如果得…

uniapp实现地图点聚合功能

前言 在工作中接到的一个任务&#xff0c;在app端实现如下功能&#xff1a; 地图点聚合地图页面支持tab切换&#xff08;设备、劳务、人员&#xff09;支持人员搜索显示分布 但是uniapp原有的map标签不支持点聚合功能&#xff08;最新的版本支持了点聚合功能&#xff09;&am…

爬虫碎碎念

20230304 - &#xff08;非专业人士&#xff0c;简单记录自己的需求和思考&#xff09; 0. 引言 平时看到一些网站的照片什么的&#xff0c;有那种批量下载的需求&#xff0c;当然有些也是视频网站的图片介绍什么的&#xff0c;也即是说&#xff0c;我需要把这些网站的照片批…

剑指 Offer II 013. 二维子矩阵的和

题目链接 剑指 Offer II 013. 二维子矩阵的和 mid 题目描述 给定一个二维矩阵 matrix&#xff0c;以下类型的多个请求&#xff1a; 计算其子矩形范围内元素的总和&#xff0c;该子矩阵的左上角为 (row1, col1)&#xff0c;右下角为 (row2, col2)。 实现 NumMatrix类&#xf…

测开:前端基础-css

一、CSS介绍和引用 1.1 css概述 层叠样式表&#xff0c;是一种样式表语言&#xff0c;用来描述HTML和XML文档的呈现。 CSS 用于简化HTML标签&#xff0c;把关于样式部分的内容提取出来&#xff0c;进行单独的控制&#xff0c;使结构与样式分离开发。 CSS 是以HTML为基础&…

docker环境下安装jenkins

前言 废话不多说&#xff0c;上来就是干&#xff0c;jenkins是干嘛用的&#xff0c;小白的话&#xff0c;自己去查&#xff0c;首先我的环境时centos7&#xff0c;自己在vmware建立的一套centos虚拟机环境。docker版本如图所示: 第一步 其实可以先查看一下又那些镜像jenkin…

DC-5 靶场学习

文章目录环境配置&#xff1a;信息搜集&#xff1a;漏洞测试&#xff1a;漏洞利用&#xff1a;提权&#xff1a;得到flag&#xff1a;下载地址&#xff1a;环境配置&#xff1a; 直接将其与攻击机放在同一网段。 信息搜集&#xff1a; arp-scan -l nmap -sP 192.168.28.0/24漏…

基于SSH的网上图书俱乐部的设计与实现

技术&#xff1a;Java、JSP等摘要&#xff1a;网上图书俱乐部是一个虚拟的书友会&#xff0c;该平台是为了给那些爱好读书的人提供一个网上交流的场所。以服务广大读者朋友为主&#xff0c;强调互动性、知识性、趣味性&#xff0c;是读书、会友的好去处&#xff0c;读者可以在线…

【Spring学习】Spring自定义标签详细步骤

目录标题前言一、自定义标签步骤1、定义属性POJO2、定义XSD文件描述组件内容3、定义标签解析器4、注册标签解析器5、定义spring.handlers和spring.schemas文件6、user.xml文件配置7、测试类二、仓库位置总结前言 Spring中除了http://www.springframework.org/schema/beans命名…

pytorch-在竞赛中去摸索用法,用房价预测比赛了解数据处理流程

实战Kaggle比赛&#xff1a;房价预测 让我们动手实战一个Kaggle比赛&#xff1a;房价预测House Prices - Advanced Regression Techniques | Kaggle。本文将提供未经调优的数据的预处理、模型的设计和超参数的选择。通过动手操作、仔细观察实验现象、认真分析实验结果并不断调…