93、Redis 之 使用连接池管理Redis6.0以上的连接 及 消息的订阅与发布

news2025/1/10 12:18:58

★ 使用连接池管理Redis连接

从Redis 6.0开始,Redis可支持使用多线程来接收、处理客户端命令,因此应用程序可使用连接池来管理Redis连接。

上一章讲的是创建单个连接来操作redis数据库,这次使用连接池来操作redis数据库

Lettuce连接池 支持需要 Apache Commons Pool2 的支持,需要添加该依赖

接下来即可在程序中通过类似如下代码片段来创建连接池了。
var conf = new GenericObjectPoolConfig<StatefulRedisConnection<String, String>>();

conf.setMaxTotal(20); // 设置连接池中允许的最大连接数

// 创建连接池对象(其中连接由redisClient的connectPubSub方法创建)
pool = ConnectionPoolSupport.createGenericObjectPool(redisClient::connect, conf);

代码演示

创建连接池对象,创建两个消息订阅者和一个消息发布者,然后操作redis数据库

1、添加依赖
在这里插入图片描述

Subscriper 第一个消息订阅者

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
启动这个消息订阅者的程序
在这里插入图片描述

Subscriper 第二个消息订阅者

直接拷贝第一个消息订阅者,然后修改这个消息订阅者只订阅 c2 这个channel 主题
在这里插入图片描述

Publisher 消息发布者

也是拷贝消息订阅者的代码,因为创建连接池对象的代码都是一样的。
这里只需要把消息订阅的方法改成消息发布的方法就可以了,其他代码一样。

在这里插入图片描述

测试:

测试成功
消息发布者成功发布消息
消息订阅者也能接收到各自订阅的channel的消息
用小黑窗测试也没有问题
在这里插入图片描述

完整代码

Subscriper

package cn.ljh.app;


import io.lettuce.core.RedisClient;
import io.lettuce.core.RedisURI;
import io.lettuce.core.ScoredValue;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.api.sync.RedisCommands;
import io.lettuce.core.pubsub.RedisPubSubAdapter;
import io.lettuce.core.pubsub.StatefulRedisPubSubConnection;
import io.lettuce.core.pubsub.api.async.RedisPubSubAsyncCommands;
import io.lettuce.core.pubsub.api.sync.RedisPubSubCommands;
import io.lettuce.core.support.ConnectionPoolSupport;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;

import java.time.Duration;

//使用 Lettuce ,这个类是消息订阅者
//通过连接池操作redis数据库
public class Subscriper
{
    private RedisClient redisClient;
    //连接池pool对象
    private GenericObjectPool<StatefulRedisPubSubConnection<String, String>> pool;

    public void init()
    {
        //1、定义RedisURI
        RedisURI uri = RedisURI.builder()
                .withHost("127.0.0.1")
                .withPort(6379)
                //选择redis 16个数据库中的哪个数据库
                .withDatabase(0)
                .withPassword(new char[]{'1', '2', '3', '4', '5', '6'})
                .withTimeout(Duration.ofMinutes(5))
                .build();
        //2、创建 RedisClient 客户端
        this.redisClient = RedisClient.create(uri);

        //创建连接池的配置对象
        //GenericObjectPoolConfig<StatefulRedisConnection<String, String>> conf = new GenericObjectPoolConfig<StatefulRedisConnection<String, String>>();
        var conf = new GenericObjectPoolConfig<StatefulRedisPubSubConnection<String, String>>();
        //设置连接池允许的最大连接数
        conf.setMaxTotal(20);
        //3、创建连接池对象(其中连接由 redisClient 的 connectPubSub 方法创建)
        pool = ConnectionPoolSupport.createGenericObjectPool(this.redisClient::connectPubSub, conf);
    }

    //关闭资源
    public void closeResource()
    {
        //关闭连接池--先开后关
        this.pool.close();
        //关闭RedisClient 客户端------最先开的最后关
        this.redisClient.shutdown();
    }


    //订阅消息的方法
    public void subscribe() throws Exception
    {

        //从连接池中取出连接
        StatefulRedisPubSubConnection<String, String> conn = this.pool.borrowObject();

        //4、创建 RedisPubSubCommands -- 作用相当与 RedisTemplate 这种,有各种操作redis的方法
        RedisPubSubCommands cmd = conn.sync();

        //监听消息:消息到来时,是通过监听器来实现的
        conn.addListener(new RedisPubSubAdapter<>()
        {
            //匿名内部类重写这3个方法:收到消息、订阅主题、取消订阅主题

            //接收来自普通的channel的消息,就用这个方法(就是没带模式的,比如那些主从、集群模式,点进RedisPubSubAdapter类里面看)
            //接收消息的方法
            @Override
            public void message(String channel, String message)
            {
                System.err.printf("从 %s 收到消息 : %s\n " , channel , message);
            }

            //订阅普通channel激发的方法,
            //订阅主题的方法--下面有这个订阅的方法cmd.subscribe("c1", "c2");
            //不太清楚这个 subscribed方法 和 下面的 cmd.subscribe 方法的关联 todo
            @Override
            public void subscribed(String channel, long count)
            {
                System.err.println("完成订阅 :" + count);
            }

            //不订阅普通的channel所使用方法--取消订阅
            //取消订阅的方法
            @Override
            public void unsubscribed(String channel, long count)
            {
                System.err.println("取消订阅");
            }
        });

        //订阅消息------订阅了 c1 和 c2 这两个主题 channel
        cmd.subscribe("c1", "c2");

    }

    public static void main(String[] args) throws Exception
    {
        Subscriper subscriper = new Subscriper();
        subscriper.init();
        subscriper.subscribe();
        //改程序只订阅了60分钟,超过60分钟就程序就退出不订阅了
        Thread.sleep(600000);
        //关闭资源
        subscriper.closeResource();
    }
}


Subscriper2

package cn.ljh.app;


import io.lettuce.core.RedisClient;
import io.lettuce.core.RedisURI;
import io.lettuce.core.pubsub.RedisPubSubAdapter;
import io.lettuce.core.pubsub.StatefulRedisPubSubConnection;
import io.lettuce.core.pubsub.api.sync.RedisPubSubCommands;
import io.lettuce.core.support.ConnectionPoolSupport;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;

import java.time.Duration;

//使用 Lettuce ,这个类是消息订阅者2
//通过连接池操作redis数据库
public class Subscriper2
{
    private RedisClient redisClient;
    //连接池pool对象
    private GenericObjectPool<StatefulRedisPubSubConnection<String, String>> pool;

    public void init()
    {
        //1、定义RedisURI
        RedisURI uri = RedisURI.builder()
                .withHost("127.0.0.1")
                .withPort(6379)
                //选择redis 16个数据库中的哪个数据库
                .withDatabase(0)
                .withPassword(new char[]{'1', '2', '3', '4', '5', '6'})
                .withTimeout(Duration.ofMinutes(5))
                .build();
        //2、创建 RedisClient 客户端
        this.redisClient = RedisClient.create(uri);

        //创建连接池的配置对象
        //GenericObjectPoolConfig<StatefulRedisConnection<String, String>> conf = new GenericObjectPoolConfig<StatefulRedisConnection<String, String>>();
        var conf = new GenericObjectPoolConfig<StatefulRedisPubSubConnection<String, String>>();
        //设置连接池允许的最大连接数
        conf.setMaxTotal(20);
        //3、创建连接池对象(其中连接由 redisClient 的 connectPubSub 方法创建)
        pool = ConnectionPoolSupport.createGenericObjectPool(this.redisClient::connectPubSub, conf);
    }

    //关闭资源
    public void closeResource()
    {
        //关闭连接池--先开后关
        this.pool.close();
        //关闭RedisClient 客户端------最先开的最后关
        this.redisClient.shutdown();
    }


    //订阅消息的方法
    public void subscribe() throws Exception
    {

        //从连接池中取出连接
        StatefulRedisPubSubConnection<String, String> conn = this.pool.borrowObject();

        //4、创建 RedisPubSubCommands -- 作用相当与 RedisTemplate 这种,有各种操作redis的方法
        RedisPubSubCommands cmd = conn.sync();

        //监听消息:消息到来时,是通过监听器来实现的
        conn.addListener(new RedisPubSubAdapter<>()
        {
            //接收来自普通的channel的消息,就用这个方法(就是没带模式的,比如那些主从、集群模式,点进RedisPubSubAdapter类里面看),
            @Override
            public void message(String channel, String message)
            {
                System.err.printf("从 %s 收到消息 : %s\n " , channel , message);
            }

            //订阅普通channel激发的方法,
            @Override
            public void subscribed(String channel, long count)
            {
                System.err.println("完成订阅 :" + count);
            }

            //不订阅普通的channel所使用方法
            @Override
            public void unsubscribed(String channel, long count)
            {
                System.err.println("取消订阅");
            }
        });

        //订阅消息------订阅了 c2 这个主题 channel
        cmd.subscribe( "c2");
    }

    public static void main(String[] args) throws Exception
    {
        Subscriper2 subscriper2 = new Subscriper2();
        subscriper2.init();
        subscriper2.subscribe();
        //改程序只订阅了60分钟,超过60分钟就程序就退出不订阅了
        Thread.sleep(600000);
        //关闭资源
        subscriper2.closeResource();
    }


}



Publisher

package cn.ljh.app;


import io.lettuce.core.RedisClient;
import io.lettuce.core.RedisURI;
import io.lettuce.core.pubsub.RedisPubSubAdapter;
import io.lettuce.core.pubsub.StatefulRedisPubSubConnection;
import io.lettuce.core.pubsub.api.sync.RedisPubSubCommands;
import io.lettuce.core.support.ConnectionPoolSupport;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;

import java.time.Duration;

//消息发布者

//通过连接池操作redis数据库
public class Publisher
{
    private RedisClient redisClient;
    //连接池pool对象
    private GenericObjectPool<StatefulRedisPubSubConnection<String, String>> pool;

    public void init()
    {
        //1、定义RedisURI
        RedisURI uri = RedisURI.builder()
                .withHost("127.0.0.1")
                .withPort(6379)
                //选择redis 16个数据库中的哪个数据库
                .withDatabase(0)
                .withPassword(new char[]{'1', '2', '3', '4', '5', '6'})
                .withTimeout(Duration.ofMinutes(5))
                .build();
        //2、创建 RedisClient 客户端
        this.redisClient = RedisClient.create(uri);

        //创建连接池的配置对象
        //GenericObjectPoolConfig<StatefulRedisConnection<String, String>> conf = new GenericObjectPoolConfig<StatefulRedisConnection<String, String>>();
        var conf = new GenericObjectPoolConfig<StatefulRedisPubSubConnection<String, String>>();
        //设置连接池允许的最大连接数
        conf.setMaxTotal(20);
        //3、创建连接池对象(其中连接由 redisClient 的 connectPubSub 方法创建)
        pool = ConnectionPoolSupport.createGenericObjectPool(this.redisClient::connectPubSub, conf);
    }

    //关闭资源
    public void closeResource()
    {
        //关闭连接池--先开后关
        this.pool.close();
        //关闭RedisClient 客户端------最先开的最后关
        this.redisClient.shutdown();
    }


    //订阅消息的方法
    public void publish() throws Exception
    {

        //从连接池中取出连接
        StatefulRedisPubSubConnection<String, String> conn = this.pool.borrowObject();

        //4、创建 RedisPubSubCommands -- 作用相当与 RedisTemplate 这种,有各种操作redis的方法
        RedisPubSubCommands cmd = conn.sync();

        //向这两个channel主题各自发布了一条消息
        cmd.publish("c2","c2 c2 c2 这是一条来自 c2 这个channel 里面的消息");
        cmd.publish("c1","c1 c1 c1 这是一条来自 c1 这个channel 里面的消息");


        //关闭资源
        redisClient.shutdown();

    }

    //发送消息,消息发出去,程序就退出了
    public static void main(String[] args) throws Exception
    {
        Publisher subscriper2 = new Publisher();
        subscriper2.init();
        subscriper2.publish();
        subscriper2.closeResource();
    }


}



pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>cn.ljh</groupId>
    <artifactId>Lettucepool</artifactId>
    <version>1.0.0</version>


    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <!-- 引入 Lettuce 这个操作redis的框架的依赖 -->
        <dependency>
            <groupId>io.lettuce</groupId>
            <artifactId>lettuce-core</artifactId>
            <version>6.1.4.RELEASE</version>
        </dependency>
        <!-- 创建连接池对象的依赖 -->
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-pool2</artifactId>
            <version>2.9.0</version>
        </dependency>
    </dependencies>
</project>

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1058998.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Flutter笔记:手写并发布一个人机滑动验证码插件

Flutter笔记 手写一个人机滑块验证码 作者&#xff1a;李俊才 &#xff08;jcLee95&#xff09;&#xff1a;https://blog.csdn.net/qq_28550263 邮箱 &#xff1a;291148484163.com 本文地址&#xff1a;https://blog.csdn.net/qq_28550263/article/details/133529459 写 Flut…

【深蓝学院】手写VIO第4章--基于滑动窗口算法的 VIO 系统:可观性和 一致性--作业

0. 内容 T1. 参考SLAM14讲P247直接可写&#xff0c;注意 ξ 1 , ξ 2 \xi_1,\xi_2 ξ1​,ξ2​之间有约束&#xff08;关系&#xff09;。 套用舒尔补公式&#xff1a; marg掉 ξ 1 \xi_1 ξ1​之后&#xff0c;信息被传递到 L 1 和 L 2 L_1和L_2 L1​和L2​之间了。 T2. …

同学苹果ios的ipa文件应用企业代签选择签名商看看这篇文章你再去吧

同学我们要知道随着互联网的发展&#xff0c;苹果应用市场的火爆&#xff0c;越来越多的开发者加入到苹果应用开发行业中来。同时&#xff0c;苹果应用市场上的应用也在不断增多&#xff0c;用户数量也在不断增加&#xff0c;苹果应用代签是指通过第三方公司为开发者的应用进行…

谋道翻译逆向

文章目录 前文crypto模块分析完整代码结尾 前文 本文章中所有内容仅供学习交流&#xff0c;严禁用于商业用途和非法用途&#xff0c;否则由此产生的一切后果均与作者无关&#xff0c;若有侵权&#xff0c;请联系我立即删除&#xff01; crypto模块 Crypto是加密的简称&#…

nodejs+vue校园跑腿系统elementui

购物车品结算,管理个人中心&#xff0c;订单管理&#xff0c;接单处理&#xff0c;商品维护&#xff0c;用户管理&#xff0c;系统管理等功育食5&#xff09;要求系统运行可靠、性能稳定、界面友好、使用方便。 第三章 系统分析 10 3.1需求分析 10 3.2可行性分析 10 3.2.1技术…

Lagrange插值法实验:求拉格朗日插值多项式和对应x的近似值matlab实现(内附代码)

一、实验要求 已知函数表&#xff1a; 求出Lagrange 插值多项式&#xff0c;并计算x1.2处的y的近似值。 二、MATLAB代码 求解多项式&#xff1a; X input(请输入横坐标向量X:\nX); % 获取用户输入的横坐标向量 Y input(请输入纵坐标向量Y:\nY); % 获取用户输入的纵坐标…

MySQL-MVCC(Multi-Version Concurrency Control)

MySQL-MVCC&#xff08;Multi-Version Concurrency Control&#xff09; MVCC&#xff08;多版本并发控制&#xff09;&#xff1a;为了解决数据库并发读写和数据一致性的问题&#xff0c;是一种思想&#xff0c;可以有多种实现方式。 核心思想&#xff1a;写入时创建行的新版…

基于 Netty + RXTX 的无协议 COM 通讯案例实现

参考 Netty集成串口RXTX编程&#xff0c;为什么过时了&#xff1f; Java版本 java version "1.8.0_231" Java(TM) SE Runtime Environment (build 1.8.0_231-b11) Java HotSpot(TM) 64-Bit Server VM (build 25.231-b11, mixed mode) RXTX版本 # 官网 http://rx…

第 4 章 串(图书关键字索引表实现)

1. 背景说明 需要从书目文件中获取其关键字及对应的书号索引 bookInfo.txt 005 Computer Data Structures 010 Introduction to Data Structures 023 Fundamentals of Data Structures 034 The Design and Analysis of Computer Algorithms 050 Introduction to Numerical Anal…

STM32复习笔记(六):STM32远程升级BootLoader相关

目录 Preface&#xff1a; &#xff08;一&#xff09;STM32上电启动流程 &#xff08;二&#xff09;BootLoader相关 &#xff08;三&#xff09;Clion配置 Preface&#xff1a; 有关STM32的BootLoader主要还是参考了许多大佬的文章&#xff0c;这里只是简单地列举一下&am…

postgresql新特性之Merge

postgresql新特性之Merge 创建测试表测试案例 创建测试表 create table cps.public.test(id integer primary key,balance numeric,status varchar(1));测试案例 官网介绍 merge into test t using ( select 1 id,0 balance,Y status) s on(t.id s.id) -- 当匹配上了,statu…

STM32复习笔记(二):GPIO

目录 &#xff08;一&#xff09;Demo流程 &#xff08;二&#xff09;工程配置 &#xff08;三&#xff09;代码部分 &#xff08;四&#xff09;外部中断&#xff08;EXTI&#xff09; &#xff08;一&#xff09;Demo流程 首先&#xff0c;板子上有4个按键&#xff0c;…

网络技术在学校是学习网络协议吗

大家好&#xff0c;我是网络工程师成长日记实验室的郑老师&#xff0c;您现在正在查看的是网络工程师成长日记专栏&#xff0c;记录网络工程师日常生活的点点滴滴 一个同学说他现在也在学这个计算机专业&#xff0c;他以后他现在目前规划是把软考和华为的HIP1套。我自己本身也做…

AI时代给我们公司模式带来的改变

最近和几个朋友聊天&#xff0c;都说公司卷的更厉害了&#xff0c;老板趁着经济形势不好压榨的更厉害了&#xff01;但确实没办法&#xff0c;我们部分老板还是喜欢以人为本&#xff08;成本&#xff01;&#xff09;&#xff0c;更喜欢雇更多的人&#xff0c;看着他们低效而痛…

【Java】权限修饰符

目录 权限修饰符 权限修饰符-示例代码 权限修饰符 Java有四种访问权限&#xff0c;其中三种有访问权限修饰符&#xff0c;分别为 private&#xff0c;public 和 protected&#xff0c;还有一种不带任何修饰符&#xff1a; private&#xff1a;Java语言中对访问权限限制的最窄…

【C语言经典100例题-68】有n个整数,使其前面各数顺序向后移m个位置,最后m个数变成最前面的m个数

方法一 将原数组拆成两部分&#xff0c;前面n-m个数和后面m个数。首先将前面n-m个数逆序&#xff0c;然后将后面的m个数逆序。最后将整个数组逆序即可。 #include <stdio.h>void reverse(int arr[], int start, int end) {for (int i start, j end; i < (start en…

快排三种递归及其优化,非递归和三路划分

个人主页&#xff1a;Lei宝啊 愿所有美好如期而遇 目录 快排简介&#xff1a; 快排的三种递归实现&#xff1a; Hoare&#xff1a; 挖坑&#xff1a; 双指针&#xff1a; 小区间优化&#xff1a; 三数取中优化&#xff1a; 快排非递归实现&#xff1a; 快排的三路划…

嵌入式Linux应用开发-驱动大全-第一章同步与互斥④

嵌入式Linux应用开发-驱动大全-第一章同步与互斥④ 第一章 同步与互斥④1.5 自旋锁spinlock的实现1.5.1 自旋锁的内核结构体1.5.2 spinlock在UP系统中的实现1.5.3 spinlock在SMP系统中的实现 1.6 信号量semaphore的实现1.6.1 semaphore的内核结构体1.6.2 down函数的实现1.6.3 u…

Android etc1tool之png图片转换pkm 和 zipalign简介

关于作者&#xff1a;CSDN内容合伙人、技术专家&#xff0c; 从零开始做日活千万级APP。 专注于分享各领域原创系列文章 &#xff0c;擅长java后端、移动开发、商业变现、人工智能等&#xff0c;希望大家多多支持。 目录 一、导读二、etc1tool2.1、用法 三、zipalign3.1 使用 四…

day49 ARM

.text .globl _start _start:mov r1,#1mov r2,#0mov r3,#100 fun2:cmp r2,r3bcc fun1 stop:b stop fun1: ADD r2,r2,r1add r4,r4,r2b fun2 .end