Spring中基于redis stream 的消息队列实现方法

news2024/9/23 9:31:34

     本文主要介绍了消息队列的概念性质和应用场景,介绍了kafka、rabbitMq常用消息队列中间件的应用模型及消息队列的实现方式,并实战了在Spring中基于redis stream 的消息队列实现方法。

一、消息队列

      消息队列是一种进程间通信或者同一个进程中不同线程间的通信方式,主要解决异步处理、应用耦合、流量消峰、负载均衡等问题,实现高性能、高可用、可伸缩和最终一致性架构,是大型分布式系统不可缺少的中间件。

1、异步处理

收到订单消息后,各子系统(库存、支付、消息)可以同步进行。

2、应用解耦

收到订单消息后,各子系统(库存、支付、消息)可以不用被调用或按顺序进行,解决调用失败造成的数据错误

3、流量削峰

在应用和数据库操作之间设置消息队列,消息队列配置请求最大数(低于数据库最大并发数),避免数据库超负荷运行。

4、负载均衡

Kafka、rabbitMq等支持主从架构,在多台服务器进行同步和自动选主。

二、消息队列实现方法

1、四大类方法

内存队列:消息队列通常在内存中实现

文件系统队列:消息可以被写入到文件系统中,持久化存储消息,但需要额外的磁盘空间和I/O操作。

数据库队列:消息可以被添加到数据库的特定表中,然后由另一个进程或线程从表中读取并处理。例如redis、tdengine都可以实现

消息队列中间件:如RabbitMQ、 Kafka等

2、kafka概念

Kafka 的核心架构由以下几个主要组件组成:

  1. Producer(生产者):发送消息的一方,负责发布消息到 Kafka 主题(Topic)。
  2. Consumer(消费者):接受消息的一方,订阅主题并处理消息。
  3. Broker(代理):服务代理节点,Kafka 集群中的一台服务器就是一个 broker,可以水平无限扩展,同一个 Topic 的消息可以分布在多个 broker 中。
  4. Topic(主题):Kafka 中的消息以 Topic 为单位进行划分,生产者将消息发送到特定的 Topic,而消费者负责订阅 Topic 的消息并进行消费。
  5. Partition(分区):主题的物理分片,提高了并行处理能力。
  6. Replica(副本):副本,是 Kafka 保证数据高可用的方式,Kafka 同一 Partition 的数据可以在多 Broker 上存在多个副本,通常只有主副本对外提供读写服务,当主副本所在 broker 崩溃或发生网络一场,Kafka 会在 Controller 的管理下会重新选择新的 Leader 副本对外提供读写服务。
  7. ZooKeeper:管理 Kafka 集群的元数据和分布式协调。

3、rabbitMq概念

1.Message

消息,消息是不具名的,它由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可能需要持久性存储)等。

2.Publisher

消息的生产者,也是一个向交换器发布消息的客户端应用程序。

3.Exchange 交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。

4.Binding 绑定,用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。 5.Queue 消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。

6.Connection 网络连接,比如一个TCP连接。

7.Channel 信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内地虚拟连接,AMQP 命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说建立和销毁 TCP 都是非常昂贵的开销,所以引入了信道的概念,以复用一条 TCP 连接。

8.Consumer 消息的消费者,表示一个从消息队列中取得消息的客户端应用程序。

4、Redis

发布订阅、list 队列、zset 队列、Stream 队列

三、基于redis Stream的实现案例

如业务需要发送邮件和短信时,可引入消息队列,不影响业务进行。

1、依赖

<dependency>

    <groupId>org.springframework.boot</groupId>

    <artifactId>spring-boot-starter-data-redis</artifactId>

</dependency>

2、redisConfig.java连接配置

主要配置连接地址数据库

sync-stream-redis:
  host: 10.110.1.1
  password: aaaa
  database: 0
  port: 6379
  timeout: 10s
  # 连接超时时间
  lettuce:
    shutdown-timeout: 60s
    pool:
      # 连接池中的最小空闲连接
      min-idle: 0
      # 连接池中的最大空闲连接
      max-idle: 8
      # 连接池的最大数据库连接数
      max-active: 8
      # #连接池最大阻塞等待时间(使用负值表示没有限制)
      max-wait: -1ms

syncKey: stream_vir_name

@Configuration
@EnableCaching
public class RedisConfig extends CachingConfigurerSupport
{


/**
 * 数据库连接配置
 * @return
 */
@Bean(name = "syncDataRedisProperties")
@ConfigurationProperties(prefix = "spring.sync-stream-redis")
public RedisProperties syncDataRedisProperties() {
    return new RedisProperties();
}





@Bean(name = "syncDataRedisConnectionFactory")
public RedisConnectionFactory syncDataRedisConnectionFactory(@Qualifier("syncDataRedisProperties") RedisProperties redisProperties) {
    RedisProperties.Sentinel sentinel = redisProperties.getSentinel();
    RedisConfiguration redisConfig = null;
    if (sentinel == null) {
        // redis单体模式连接配置
        RedisStandaloneConfiguration standaloneConfig = new RedisStandaloneConfiguration();
        standaloneConfig.setHostName(redisProperties.getHost());
        standaloneConfig.setPort(redisProperties.getPort());
        standaloneConfig.setDatabase(redisProperties.getDatabase());
        standaloneConfig.setPassword(RedisPassword.of(redisProperties.getPassword()));
        standaloneConfig.setDatabase(redisProperties.getDatabase());
        redisConfig = standaloneConfig;
    }


    // lettuce连接池配置
    GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig();
    RedisProperties.Lettuce lettuce = redisProperties.getLettuce();
    if(lettuce.getPool() != null) {
        RedisProperties.Pool pool = redisProperties.getLettuce().getPool();
        // 连接池最大连接数
        poolConfig.setMaxTotal(pool.getMaxActive());
        // 连接池中的最大空闲连接
        poolConfig.setMaxIdle(pool.getMaxIdle());
        // 连接池中的最小空闲连接
        poolConfig.setMinIdle(pool.getMinIdle());
        // 连接池最大阻塞等待时间(使用负值表示没有限制)
        poolConfig.setMaxWaitMillis(pool.getMaxWait().toMillis());
    }
    LettucePoolingClientConfiguration.LettucePoolingClientConfigurationBuilder builder = LettucePoolingClientConfiguration.builder();
    // timeout
    if(redisProperties.getTimeout() != null) {
        builder.commandTimeout(redisProperties.getTimeout());
    }
    // shutdownTimeout
    if(lettuce.getShutdownTimeout() != null) {
        builder.shutdownTimeout(lettuce.getShutdownTimeout());
    }
    // 创建Factory对象
    LettuceClientConfiguration clientConfig = builder.poolConfig(poolConfig).build();
    return new LettuceConnectionFactory(redisConfig, clientConfig);
}

}

3、RedisStreamConfig.java 监听配置及消费者注册 启动监听

主要配置监听容器设置(最大消息数-流量削峰可重点关注)和消费者组的注册,程序运行时启动监听,配置监听的topic(streamName示例中为syncKey)

主要用到以下类和方法

StreamMessageListenerContainer 、createGroup、register、createautoAcknowledge

@SuppressWarnings({"rawtypes", "unchecked"})
public class RedisStreamConfig {

//  监听性质配置

@Bean( name = "syncListenerContainer", initMethod = "start", destroyMethod = "stop")
public StreamMessageListenerContainer syncListenerContainer(@Qualifier("syncDataRedisConnectionFactory") RedisConnectionFactory factory) {
    StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> options =
            StreamMessageListenerContainer.StreamMessageListenerContainerOptions
                    .builder()
                    // Stream 中没有消息时,阻塞多长时间,需要比 `spring.redis.timeout` 的时间小, 设置为0会导致CPU飙升
                    .pollTimeout(Duration.ofSeconds(2))
                    // 一次最多获取多少条消息
                    .batchSize(10)
                    // 运行 Stream 的 poll task
                    .executor(emsThreadPoolTaskExecutor)
                    // 获取消息的过程或获取到消息给具体的消息者处理的过程中,发生了异常的处理
                    .errorHandler(e -> {
                        logger.error("streamMessageListenerContainer异常", e);
                    })
                    .build();
    return StreamMessageListenerContainer.create(factory, options);
}

@Bean
public Subscription syncDeviceDataSubscription(@Qualifier("syncListenerContainer") StreamMessageListenerContainer listenerContainer) {
    String groupName = syncKey + "ems";
    StreamOperations streamOperations = syncDataRedisTemplate.opsForStream();
    RecordId recordId = null;
    // 如果队列不存在,则创建队列
    if (Boolean.FALSE.equals(syncDataRedisTemplate.hasKey(syncKey))) {
        recordId = streamOperations.add(syncKey, Collections.singletonMap("_up", "up"));
        // 删除创建队列时的测试消息
        streamOperations.delete(syncKey, recordId);
    }
    // 如果分组不存在,则创建分组
    StreamInfo.XInfoGroups groups = streamOperations.groups(syncKey);
    long groupCount = groups.stream().filter(xInfoGroup -> xInfoGroup.groupName().equals(groupName)).count();
    if (groupCount <= 0) {
        streamOperations.createGroup(syncKey, groupName);
    }

    StreamMessageListenerContainer.StreamReadRequest<String> readRequest =
            StreamMessageListenerContainer.StreamReadRequest
                    .builder(StreamOffset.create(syncKey, ReadOffset.lastConsumed()))
                    .consumer(Consumer.from(groupName, "consumer_" + System.currentTimeMillis()))
                    .cancelOnError(t -> false)
                    // 自动确认消息
                    .autoAcknowledge(true)
                    .build();
    return  listenerContainer.register(readRequest, syncDataStreamListener);

}


}

4、消费者接受消息

主要实现接口StreamListener,并重写onMessage, 在onMessage可调用其他业务方法进行处理(如短信邮箱发送等),接受到的消息格式MapRecord,<id, map<string,data>>.

@Component
public class SyncDataStreamListener implements StreamListener<String, MapRecord<String,String,String>> {
    private final Logger logger = LoggerFactory.getLogger(SyncDataStreamListener.class);


    @Override
    public void onMessage(MapRecord<String, String, String> message) {
        try {
            String stream = message.getStream();
            RecordId messageId = message.getId();
            Map<String, String> value = message.getValue();
            //   业务处理(如短信、邮箱发送)
        } catch (Exception e) {
            logger.error("处理异常", e);
        }

    }
}

5、生产者

@GetMapping("/redis/ps")
public String redisPublish(String content,Integer count){
    StreamOperations streamOperations = redisTemplate.opsForStream();
    for (int i = 0; i < count; i++) {
        AtomicInteger num = new AtomicInteger(i);
        Map msgMap = new HashMap();
        msgMap.put("count", i);
        msgMap.put("sID", num);
        //新增消息
        streamOperations.add(syncKey,msgMap);
    
    return "success";
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2101341.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Netlify 为静态站点部署 Waline 评论系统

目录 1 准备工作2 简介2.1 Netlify2.2 Waline2.3 Leancloud 3 开始搭建3.1 Fork 仓库3.2 设置 Leancloud3.3 部署 Netlify3.4 查看评论系统 从我建成个人网站以来&#xff0c;一个月了&#xff0c;竟然还没配置过评论系统&#xff0c;一直用的别人的 awa。 那么今天就稍微研究…

B站up主全程教学趋动云部署大模型:Meta新开源【Llama3.1-70B-Instruct】!

Llama 3.1 的指令调优版本&#xff08;8B、70B、405B&#xff09;针对多语言对话用例进行了优化&#xff0c;在比8种支持语言更广泛的语言集合上进行了训练&#xff0c;预训练模型可以适应多种自然语言生成任务。 Llama 3.1 模型集合还支持利用其模型的输出来改进其他模型&…

基于yolov8的红绿灯目标检测训练与Streamlit部署(代码+教程)

项目背景 随着智能交通系统的快速发展&#xff0c;自动驾驶技术逐渐成为研究的热点。在自动驾驶领域中&#xff0c;准确识别道路上的交通信号灯是确保车辆安全行驶的关键技术之一。近年来&#xff0c;深度学习技术的发展为交通信号灯的识别提供了强大的支持。YOLO&#xff08;…

云微客短视频矩阵系统,如何让企业赢在起跑线?

在这个信息爆炸的时代&#xff0c;传统的营销方式已经无法满足现代企业的快速发展的需求了。那么如何让企业的品牌和产品脱颖而出呢&#xff1f;云微客短视频矩阵系统&#xff0c;就是这样一个创新的解决方法。 但是很多企业认为&#xff0c;在这个短视频盛行的时代&#xff0c…

cr2怎么转换成jpg?分享这五款好用软件!

在数字摄影时代&#xff0c;CR2作为佳能相机常用的RAW格式&#xff0c;虽然能够保留更多的图像细节和色彩信息&#xff0c;但在日常分享和编辑中&#xff0c;JPG格式因其兼容性和便捷性而更受欢迎。今天&#xff0c;我们就来分享五款好用的软件&#xff0c;帮助你轻松将CR2格式…

数据中心代理IP的使用指南:提升网络体验的秘密武器

在互联网的广阔海洋中&#xff0c;数据中心代理IP是一种常见且实用的工具。无论是个人用户还是企业&#xff0c;使用数据中心代理IP都能带来诸多好处。本文将详细介绍数据中心代理IP的概念、优势以及使用技巧&#xff0c;让你在网络世界中游刃有余。 什么是数据中心代理IP&…

用自定义类级注解校验两字段不能同时为空

背景&#xff1a; 有下面这么一个类&#xff0c;要校验两字段query、image不能同时为空&#xff0c;应该怎么实现&#xff1f;已知的NotBlank等标签都只能检验单个字段。 import jakarta.validation.constraints.NotBlank; import lombok.Data; import org.springframework.h…

无人机之载重篇

无人机的载重能力是一个复杂且多样化的参数&#xff0c;它受到多种因素的影响&#xff0c;包括无人机的类型、设计、技术规格以及用途等。以下是对无人机载重能力的详细解析&#xff1a; 一、无人机载重能力的差异 无人机的载重能力差异很大&#xff0c;从几百克到几十千克不等…

全网都在学,2024最新大模型畅销的三本书!千言万语,尽在书中

哈咯各位&#xff0c;我们都知道2024年最火爆的技术非大模型莫属&#xff0c;而今天就给大家分享几本大模型方向2024年新出的书籍&#xff0c;主要分享这三本非常畅销的书----《Transformer自然语言处理实战》&#xff0c;《实战AI大模型》&#xff0c; 《精通Transformer&…

草原灭火车的功能与性能_鼎跃安全

在内蒙古的草原火灾中&#xff0c;水陆两栖全地形草原灭火车曾多次用于紧急救援。其强大的越野能力和高速反应&#xff0c;使其在广袤的草原上能够迅速到达火场&#xff0c;并使用集成的多功能灭火设备进行灭火作业&#xff0c;有效防止了火灾的进一步蔓延。 水陆两栖全地形草原…

SpringDataJPA系列(4)Repository 中的方法返回值使用

SpringDataJPA系列(4)Repository 中的方法返回值使用 Repository 返回结构有哪些&#xff1f; 打开 SimpleJpaRepository 直接看它的 Structure 就可以知道&#xff0c;它实现的方法&#xff0c;以及父类接口的方法和返回类型包括&#xff1a;Optional、Iterable、List、Page…

S-MLAG-简单跨设备链路聚合

资料&#xff1a; https://www.h3c.com/cn/Service/Document_Software/Document_Center/Home/Switches/00-Public/Configure/Practice/H3C_S_MLAG-BP_Long/#_Toc115363852https://www.h3c.com/cn/d_202010/1348323_30005_0.htm#_Toc53156759 S-MLAG概述 S-MLAG简介 通过S-M…

CSS 高级区块效果——WEB开发系列25

CSS提供了多种工具和属性&#xff0c;使我们能够创建视觉上引人注目的效果。今天我们继续将深入了解几种高级CSS效果&#xff1a;盒子阴影、滤镜、混合模式和文本背景裁剪&#xff0c;提升网页设计的质感和深度。 一、盒子阴影&#xff08;Box Shadow&#xff09; 对于盒子元素…

申请联通卡时,为什么需要上传身份证呢?

我们在后台收到了很多朋友的私信&#xff0c;除了咨询流量卡套餐方面的问题之外&#xff0c;也有不少朋友是咨询流量卡申请方面的问题&#xff0c;今天&#xff0c;小编就回答一个很有代表性的问题&#xff0c;为什么申请联通卡时要上传身份证信息&#xff0c;而其他的运营商就…

局域网IP地址老是冲突/环路?这个关键点绝对被你忽略了

号主&#xff1a;老杨丨11年资深网络工程师&#xff0c;更多网工提升干货&#xff0c;请关注公众号&#xff1a;网络工程师俱乐部 下午好&#xff0c;我的网工朋友。 在现代企业环境中&#xff0c;局域网(LAN)是支撑日常业务运营的关键基础设施之一。 但随着网络规模的不断扩…

12、Django Admin在列表视图页面上显示计算字段

两种方法&#xff1a; 注册模型有两种方式&#xff0c;需要首先添加或者修改admin中的注册模型如下方式 admin.register(Origin) class OriginAdmin(admin.ModelAdmin):list_display ("name",) 1、在models的模型类中添加函数 def hero_count(self,):return sel…

大学生就业必备的2款软件!助你轻松化身Offer收割机!

金秋九月&#xff0c;又是一年开学季&#xff0c;随之而来也有许多大学生要开始找工作&#xff0c;但现在的情况是&#xff0c;大部分岗位有很多人同时申请&#xff0c;要想从激烈的竞争中脱颖而出&#xff0c;少不了在求职的每一个环节上下功夫。 最受求职的企业注意的&#…

ubuntu linux搭建lvgl

记录一下ubuntu linux搭建 lvgl的过程 本地环境:ubuntu 16.04 ubuntu lvgl sdl2 1 获取源码 git clone https://github.com/lvgl/lv_port_linux.git cd lv_port_linux/ git submodule update --init --recursive查看分支: git branch -a 我选择的是9.2(master分支一直在变动…

DeepWalk【图神经网络论文精读】笔记

链接: DeepWalk【图神经网络论文精读】_哔哩哔哩_bilibili [内容总结::] - deep walk 解决图嵌入问题&#xff1a;将结点压缩成低维向量随机游走&#xff08;类似NLP生成句子&#xff09; 优点与缺点: 相关学习资料 - word2vec 开山必读论文 - DeepWalk 论文阅读 - 代…

erlang学习:用OTP构建系统4,监控树学习

监控树学习 之前写的server程序&#xff0c;如果产生了错误&#xff0c;server会立刻停止。本次学习使用了监控树&#xff0c;能够让server崩溃时打印错误信息并重启服务器&#xff0c;能够使服务器正常使用 -module(sellaprime_supervisor). -behaviour(supervisor). -expor…