java基础巩固-宇宙第一AiYWM:为了维持生计,架构知识+分+微序幕就此拉开之RocketM消息中间件~整起

news2025/1/7 6:49:58

提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档

RocketMQ

  • 一、RocketMQ概念~一览无余
    • 1.消息队列有啥用?能干啥?消息队列的应用场景?
    • 2.常见的消息队列有哪些?如何进行消息队列的技术选型?
    • 3.RocketMQ的架构中的角色:
    • 4.RocketMQ使用长轮询
    • 5.RocketMQ集群:
  • 二、RocketMQ实操~整一点
    • 1.一言不合就看官网
    • 2.使用RocketMQ
    • 3.RocketMQ与SpringBoot的整合:
  • 巨人的肩膀


一、RocketMQ概念~一览无余

我感觉消息队列或者说消息中间件就像是一个藏咱们私房钱的地方。呀呀呀,这个月的私房钱太多了花不了,先存到这个容器里面,当咱们需要用钱时从里面取出来消息去用

  • 【为什么又叫消息中间件,是因为这个消息队列更多的指的是各个微服务以及系统内部各个组件/模块之间的一种通信方式,相当于是一个中间桥梁,所以也可以叫中间件】
    • RocketMQ:阿里推出的,放到Apache中孵化
  • 中间件:
    • 消息中间件【RocketMQ、kafka】、文件中间件【FastDFS】、缓存中间件【Redis】、搜索中间件【ES】
  • 消息的本质就是数据或者待处理的命令

1.消息队列有啥用?能干啥?消息队列的应用场景?

  • 通常来说,使用消息队列能为我们的系统带来下面几点好处:
    • 应用解耦:让应用之间不在相互依赖,降低系统耦合性
      • 之前咱们all in one拆开后,都在一个JVM中,方便
      • SOA【服务之间相互调用】中,A调B服务,B宕机了,怎么办
        • 服务降级,弃车保帅
        • 对B做集群,相当于做备份,一个调不通就调第二个,要是B集群中所有节点都宕机了怎么办-------->这种情况也说明A模块和B模块之间是强耦合【一个坏了会影响另一个,这就叫强耦合,不是说单一或者分布式咋咋咋】
      • 找中间商、中介、第三方来解耦和,A想干啥,把你的意图先告诉第三方,B有活力有空了来满足你------>异步调用
    • 流量削峰,或者说削峰限流【流量削峰中接入层跟第三方这俩得足够强大,这俩不能拉垮
      • 先将短时间高并发产生的事务消息存储在消息队列中,然后后端服务再慢慢根据自己的能力去消费这些消息,这样就避免直接把后端服务打垮掉。
      • 之前没用RocketMQ时,碰到双11这种流量高峰时,经常使用限流算法【令牌桶、漏桶、计数器】
        • 但用限流算法之后,你想呀,你害怕处理不了你限定,让你的餐厅连锁店一天只能进100个人,要是本来能容纳一万人你得亏多少呀,对不对,你这得丢失多少用户呀【之前用户请求来了,到接入层,你直接用service来接接入层送来的流量,此时为了安全你service上得有限流算法,这样会丢失用户】
      • 并且令牌桶你会把数据缓存进来,但是这个服务拉垮了其他服务是不能帮忙处理的
      • 使用了消息队列之后,可以使用消息中间件来缓冲大量的请求,匀速消费,当消息队列中堆积消息过多时,我们可以动态上线增加消费端,来保证不丢失重要请求
        • 显得很灵活【保证第三方里的消息不会堆积太多,之后用了Docker后,比如服务是Docker中一个一个镜像实例化出来的容器,那么第三方里流量多了我集群中处理的节点【Docker镜像实例化的容器】就增加,流量少了集群中减少【Docker镜像实例化的容器】,这就更灵活了,足够健壮【安全】】
        • 搞个接入层,不管你多少请求流量来,我都能接住你全部这么多请求,接住流量之后再送给第三方,再由service从第三方那里消费这些请求
    • 大数据处理:消息中间件可以把各个模块中产生的管理员操作日志、用户行为、系统状态等数据文件作为消息收集到主题中,数据使用方可以订阅自己感兴趣的数据内容互不影响,进行消费
    • 异构系统:跨语言
      • 并且可以 通过异步处理提高系统性能(减少响应所需时间)。因为当系统整个的请求量太大时,也就是系统太忙时,将用户的请求数据存储到消息队列之后就立即返回结果。随后,系统再对消息进行消费(请求数据在后续的业务校验、写数据库等操作中可能失败
      • 异步消息:想要快速发消息,并且可以保证消息不丢失。send方法不会阻塞去等待broker的确认,而是会采用事件监听方式接受broker返回的确认,这不就是异步嘛
        producer.send(message,new SendCallback() {
        
        	public void onSuccess(SendResult sendResult) {
        	// TODO Auto-generated method stub
        	System.out.println("ok");
        	}
        	
        	public void onException(Throwable e) {
        	// TODO Auto-generated method stub
        	e.printStackTrace();
        	System.out.println("err");
        	}
        });
        
        • 有时候上下文消息有关联时,用异步消息可能来不及,人家可能等不及你回调

2.常见的消息队列有哪些?如何进行消息队列的技术选型?

  • kafka。开源的一个分布式流式处理平台,已经成为 Apache 顶级项目,早期被用来用于处理海量的日志,后面才慢慢发展成了一款功能全面的高性能消息队列
    • 流式处理平台常见的关键功能有:作为消息队列、容错的持久方式存储记录消息流、作为一个流式处理平台在消息发布时进行处理
    • Kafka 官网:http://kafka.apache.org/
  • RocketMQ 是阿里开源的一款云原生“消息、事件、流”实时数据处理平台,借鉴了 Kafka,已经成为 Apache 顶级项目。
    • RocketMQ 官网:https://rocketmq.apache.org/ (文档很详细,推荐阅读)
  • RabbitMQ:RabbitMQ 官网:https://www.rabbitmq.com/
    • RabbitMQ 在吞吐量方面虽然稍逊于 Kafka 、RocketMQ 和 Pulsar,但是由于它基于 Erlang 开发,所以并发能力很强,性能极其好,延时很低,达到微秒级。但是也因为 RabbitMQ 基于 Erlang 开发,所以国内很少有公司有实力做 Erlang 源码级别的研究和定制。如果业务场景对并发量要求不是太高(十万级、百万级),那这几种消息队列中,RabbitMQ 或许是你的首选
  • Pulsar 官网:https://pulsar.apache.org/

3.RocketMQ的架构中的角色:

在这里插入图片描述

  • NameServer集群:相当于一个Topic路由 注册中心【来管教broker的,也就是支持Broker的动态注册与发现】,提供注册发现功能,
    • 特点:
      • 是一个无状态节点,【无状态指的是不存数据,什么属性都没有,每个节点都长得一样】。从而利用允许不同NameServer之间数据不同步来避免各节点数据强一致性带来的额外性能消耗。
        • zookeeper是有状态的,每个节点存的数据是一样的,节点间会相互通信同步,nameserver各个节点不相互通信,各玩各的
        • 那既然不做数据交互,如何保证数据一致性,其中broker启动时会向所有的nameserver节点建立长连接上报自己的topic信息和queue信息【只要你上线nameserver后】,间接的由这种方法来维持数据间的一致性,来弥补生产者和消费者未进行的数据一致性操作带来的缺陷。
      • NameServer不保证数据一致性,AP,性能极高【数据不持久化都在内存里,因为这里面的数据一般变化快】
      • 底层由netty实现,提供了路由管理、服务注册、服务发现的功能
        • 客户端【比如咱们写的一个Producer类不就算是一个客户端嘛】应该先连向nameserver,由nameserver给我找一个或者说分配一个broker给我客户端,然后客户端再向broker发起消息发送请求
      • nameserver是服务发现者,集群中各个角色(producer、broker、consumer等)都需要定时向nameserver上报自己的状态,以便互相发现彼此,超时不上报的话,nameserver会把它从列表中剔除
      • nameserver可以部署多个,当多个nameserver存在的时候,其他角色同时向他们上报信息,以保证高可用
      • NameServer集群间互不通信,没有主备的概念
      • nameserver内存式存储,nameserver中的broker、topic等信息默认不会持久化
    • 主要的功能:
      • Broker管理,NameServer接受Broker集群的注册信息并且保存下来作为路由信息的基本数据。然后提供心跳机制,检查Broker是否存活;
      • 路由信息管理,每个NameServer将保存关于Broker集群的整个路由信息【Broker是向每一台NameServer注册自己的路由信息,所以每一个NameServer实例上面都保存一份完整的路由信息】和用于Producer及Consumer这些个客户端查询的队列信息。然后Producer及Consumer这些个客户端通过NameServer就可以知道整个Broker集群的路由信息,从而进行消息的生产投递和消费了。
        • 当某个NameServer因为某种原因下线了时,Broker仍然可以向集群中的其他NameServer节点同步其路由信息
  • broker集群:
    • 每个Broker节点在启动时,都会遍历NameServer列表【不管你是真的注册中心还是高度模拟注册中心,人家到你这来注册,你不得把人家和人家要存的信息映射关系给存下来,这是你的责任】,与每个NameServer建立长连接,注册自己的信息,之后定时上报
      • 每个Broker与nameserver集群中的所有节点建立长连接,不仅向nameserver上报自己broker上线相关信息,还得定时注册Topic信息【我目前这个broker中有哪些Topic】到所有nameserver
    • 一个Master可以对应多个Slave,但是一个Slave只能对应一个Master
      • Master与Slave的对应关系通过指定相同的BrokerName不同的BrokerId来定义,BrokerId为0表示Master,非0表示Slave
    • producer发给broker消息之后,就跟producer没关系了,broker会持久化到内存,然后可以刷盘到硬盘。而consumer跟broker不管是推还是拉都是有消息成功则ack确认的
  • Producer:
    • 去连接nameserver,问nameserver我想要发送的这个Topic在哪个broker上。然后producer才能发【客户端【比如咱们写的一个Producer类不就算是一个客户端嘛】应该先连向nameserver,由nameserver给我找一个或者说分配一个broker给我客户端,然后客户端再向broker发起消息发送请求】
    • 比如咱们模拟一个Producer,代码如下:
      package com.aiminhu.rocketmq;
      
      import org.apache.rocketmq.client.exception.MQBrokerException;
      import ......
      
      /**
       * Created by HuHongBo on 2022/12/21.
       * 消息发送方
       */
      public class Producer {
          public static void main(String[] args) {
              /**
               * 客户端【比如咱们写的一个Producer类不就算是一个客户端嘛】应该先连向nameserver,由nameserver给我找一个或者说分配一个broker给我客户端,然后客户端再向broker发起消息发送请求
               */
              DefaultMQProducer producer = new DefaultMQProducer("xxooGroup");
      
              /**
               * 设置nameserver的地址和端口
               */
              producer.setNamesrvAddr("192.168.1.165:9876");
              try {
                  producer.start();
      
                  /**
                   * Message的第一个参数Topic表示消息将要发送的地址
                   * body表示Message中真正发的消息体,真实的数据
                   */
                  Message message = new Message("myTopic001", "first message".getBytes());
                  producer.send(message);
              } catch (MQClientException e) {
                  e.printStackTrace();
              } catch (MQBrokerException e) {
                  e.printStackTrace();
              } catch (RemotingException e) {
                  e.printStackTrace();
              } catch (InterruptedException e) {
                  e.printStackTrace();
              }
          }
      }
      
      
      • 然后在rocketMq的rocketmq-console的web监控页面就能看到Topic
        在这里插入图片描述
    • producer.sendOneWay(message);只发送消息,不等待服务器响应,只发送请求不等待应答。此方式发送消息的过程耗时非常短,一般在微秒级别
      • 有可能会丢失消息的
      • 一般你没回调你也没同步,非常容器丢失消息,UDP就是因为没有回调或者没有同步
    • 批量消息发送:发送多条消息,可以把消息们放到列表中打包到一起
      在这里插入图片描述
  • Consumer
    • 一个consumer只能关注一个Topic,消费消息时有没有消费到会反馈ack给broker
    • RocketMQ消费模式有几种?消费模型由consumer决定,消费维度为Topic
      • 集群消费:默认的消费模式,消息只被只消费一次
        在这里插入图片描述
         consumer.setMessageModel(MessageModel.BROADCASTING);
         consumer.setMessageModel(MessageModel.CLUSTERING);
        
        • 一组consumer同时消费一个topic,可以分配消费负载均衡策略分配consumer对应消费topic下的哪些queue
        • 多个group同时消费一个topic时,每个group都会消费到数据
        • 一条消息只会被一个group中的consumer消费
      • 广播消费
        在这里插入图片描述
        • 消息将对一 个Consumer Group 下的各个 Consumer 实例都消费一遍。即即使这些 Consumer 属于同一个Consumer Group ,消息也会被 Consumer Group 中的每个 Consumer 都消费一次。
        • 当使用广播消费模式时,MQ 会将每条消息推送给集群内所有注册过的客户端,保证消息至少被每台机器消费一次
        • 只广播一次,你没收到我不会再发
  • Topic和Queue:Topic是一个逻辑上的概念【Topic表示消息将要发送的地址,是个字符串】,实际上Message是在每个Broker上以Queue的形式记录
    在这里插入图片描述
    • 一个Topic不够你可以搞集群
    • RocketMQ的Topic是一组Message Queue的集合 ConsumeQueue,ConsumerQueue是通过消息偏移量建立的消息索引。一条消息是广播消息还是队列消息由客户端消费决定
      在这里插入图片描述
      • 针对每个Topic创建,消费逻辑队列,存储位置信息,用来快速定位CommitLog中的数据位置。
        • indexFile:消息的Key和时间戳索引
        • CommitLog:包含的是消息体,存储消息的详细内容,按照消息收到的顺序,所有消息都存储在一起。每个消息存储后都会有一个offset,代表在commitLog中的偏移量
      • 启动后会被加载到内存中,加快查找消息速度
      • 以Topic作为文件名称,每个Topic下又以queue id作为文件夹分组
  • Group
    在这里插入图片描述
  • Message
    • RocketMQ消息存储机制:
      在这里插入图片描述
      • 很多使用文件系统存储的高性能中间件都是用了零拷贝技术来发送文件数据,比如Nginx。RocketMQ需要使用内存映射MappedByteBuffer这个类来实现零拷贝。
    • 消息过滤:大数据过滤时特别重要。Message实例化时可以通过添加tag参数来过滤消费
      • 在Producer中使用Tag:Message msg = new Message(“TopicTest”,“TagA” ,("Hello RocketMQ " ).getBytes(RemotingHelper.DEFAULT_CHARSET));
      • 在Consumer中订阅Tag:consumer.subscribe(“TopicTest”, “TagA||TagB”);// * 代表订阅Topic下的所有消息
    • 也可以用SQL表达式过滤:
      MessageSelector selector = MessageSelector.bySql("order > 5");
      consumer.subscribe("xxoo3", selector);
      
    • RocketMQ事务消息:
      在这里插入图片描述
      • 用了MQ消息中间件之后,本地事务如果commit了,那么发到MQ的这条消息会被标识为真正可用,这边的consumer再从MQ拿消息消费
      • 本地事务如果rollback了,那么发到MQ的这条消息会被撤回
    • 消息重试:
      • producer端:
        在这里插入图片描述
      • consumer端:
        • 消费超时,单位分钟:consumer.setConsumeTimeout()
        • 发送ack,消费失败:RECONSUME_LATER
      • broker投递:只有在消息模式为MessageModel.CLUSTERING集群模式时,Broker才会自动进行重试,广播消息不重试
        在这里插入图片描述
        • 重投使用messageDelayLevel
    • 保证消息的顺序消费:
      在这里插入图片描述
      在这里插入图片描述
      • MessageListenerOrderly对每个queue开启一个线程,在同一个线程中进行消费,消费(消息)是有序的【每个queue中保证了顺序】
      • 或者说同一个Topic同一个queue,发消息时启动一个线程去发消息,消费时一个线程去消费,多个queue只能保证单个queue中的顺序
  • 刷盘机制:Producer把消息发到broker中后,broker中会刷盘。在CommitLog初始化时,判断配置文件 加载相应的service
    在这里插入图片描述

4.RocketMQ使用长轮询

  • RocketMq使用介于轮询和长连接之间的方式,也就是长轮询【client掌握主动权,如果没有消息供你消费则先将连接挂起来,这个消息玩完了我再去拉下一条来消费,我拉垮我就多用点时间消费消息,我牛B就少用点时间消费这条消息,而不用你server无脑推送。(server先起来,然后client再起来,跟broker建立连接,如果client有消息产生到broker,broker将消息推给server进行消费,消费完成后再建立连接请求,等待消息来,如果有消息产生进去由broker路由转发进行消费)】
    • 因为轮询太浪费资源。【优点是无延迟,响应能被及时送达】
      • 轮询最典型的实现是HTTP协议,每次请求都会建立一次三次握手连接
    • 因为长连接得维护客户端状态【信息】,还有一个重要原因是RocketMQ嫌弃Server不知道客户端这边对消息的消费速度或者说消费能力,我server发了这么多过去你嫌多还是嫌少呀,多了那你能不能消化掉这么多消息呀,容易产生消息堆积,
      • socket

5.RocketMQ集群:

  • 单Master模式:
    • 只有一个 Master节点,配置简单,方便部署。这种方式风险较大,一旦Broker重启或者宕机时,会导致整个服务不可用,不建议线上环境使用
  • 多Master模式:
    • 一个集群无 Slave,全是 Master,例如 2 个 Master 或者 3 个 Master
      在这里插入图片描述
    • 优点:配置简单,单个Master 宕机或重启维护对应用无影响,在磁盘配置为RAID10 时,即使机器宕机不可恢复情况下,由与 RAID10磁盘非常可靠,消息也不会丢(异步刷盘丢失少量消息,同步刷盘一条不丢)。性能最高。多 Master 多 Slave 模式,异步复制
    • 缺点:单台机器宕机期间,这台机器上未被消费的消息在机器恢复之前不可订阅,消息实时性会受到受到影响
  • 多Master多Slave模式(异步复制)
    • 每个 Master 配置一个 Slave,有多对Master-Slave, HA,采用异步复制方式,主备有短暂消息延迟,毫秒级。
    • 优点:即使磁盘损坏,消息丢失的非常少,且消息实时性不会受影响,因为Master 宕机后,消费者仍然可以从 Slave消费,此过程对应用透明。不需要人工干预。性能同多 Master 模式几乎一样。
    • 缺点: Master 宕机,磁盘损坏情况,会丢失少量消息。
  • 多Master多Slave模式(同步双写)
    • ​ 每个 Master 配置一个 Slave,有多对Master-Slave, HA采用同步双写方式,主备都写成功,向应用返回成功。
    • ​ 优点:数据与服务都无单点, Master宕机情况下,消息无延迟,服务可用性与数据可用性都非常高
    • 缺点:性能比异步复制模式略低,大约低 10%左右,发送单个消息的 RT会略高。目前主宕机后,备机不能自动切换为主机,后续会支持自动切换功能
      在这里插入图片描述

二、RocketMQ实操~整一点

1.一言不合就看官网

  • RocketMQ
    • 官网:https://rocketmq.apache.org/
    • https://github.com/apache/rocketmq

2.使用RocketMQ

  • 单纯简单使用MQ,用activeMQ、消息特别多流量特别大、低延迟时用面向集群cluster的RocketMQ
  • 编译安装RocketMQ、按照QuickStart运行HelloWorld
    • 学到了你该安装什么软件,该解压该配环境变量然后source /etc/profile,该弄就弄,弄完了比如咱们现在用人家RocketMQ这个软件,就得先编译一下人家的源码,编译命令是:mvn -Prelease-all -DskipTests clean install -U
      • 再强调,只有这个项目有pom.xml文件,这个项目才能被maven编译,所以看清楚找找这个pom.xml
      • java ?,里面查询之后,有个java -verbose可以查询到咱们java的安装环境
    • 编译RocketMq的源码后,进入他的bin目录下,./mqnamesrv和./mqbroker分别启动【这个namesrv相当于一个注册中心,这个broker相当于一个服务】mqnamesrv和mqbroker,mqnamesrv启动时会调用执行runserver.sh这个脚本,而mqbroker启动时会调用执行runbroker.sh这个脚本【改一下rubbroker.sh中Xms和Xma,改小一点】
      在这里插入图片描述

3.RocketMQ与SpringBoot的整合:

  • 找官方的starter或者没有官方的starter就用下面的pom.xml
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-common</artifactId>
        <version>4.6.1</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-client -->
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-client</artifactId>
        <version>4.6.1</version>
    </dependency>
    
  • Producer配置
    • Config配置类:用于在系统启动时初始化producer参数并启动
      package com.....rmq.controller;
      
      import org.apache.rocketmq.client.exception.MQClientException;
      import org.apache.rocketmq.client.producer.DefaultMQProducer;
      import org.slf4j.Logger;
      import org.slf4j.LoggerFactory;
      import org.springframework.beans.factory.annotation.Value;
      import org.springframework.context.annotation.Bean;
      import org.springframework.context.annotation.Configuration;
      
      @Configuration
      public class MQConfig {
      public static final Logger LOGGER = LoggerFactory.getLogger(MQConfig.class);
      
      @Value("${rocketmq.producer.groupName}")
      private String groupName;
      
      @Value("${rocketmq.producer.namesrvAddr}")
      private String namesrvAddr;
      
      @Bean
      public DefaultMQProducer getRocketMQProducer() {
      
      	DefaultMQProducer producer;
      	producer = new DefaultMQProducer(this.groupName);
      	
      	producer.setNamesrvAddr(this.namesrvAddr);
      	
      	try {
      	producer.start();
      	System.out.println("start....");
      	
      	LOGGER.info(String.format("producer is start ! groupName:[%s],namesrvAddr:[%s]", this.groupName,
      	this.namesrvAddr));
      	} catch (MQClientException e) {
      	LOGGER.error(String.format("producer is error {}", e.getMessage(), e));
      	}
      	return producer;
      	
      	}
      }
      
    • Service消息发送类
      package com.....rmq.service;
      
      import org.apache.rocketmq.client.producer.DefaultMQProducer;
      import org.apache.rocketmq.common.message.Message;
      import org.springframework.beans.factory.annotation.Autowired;
      import org.springframework.stereotype.Service;
      @Service
      public class MQService {
      	@Autowired
      	DefaultMQProducer producer;
      	
      	
      	public Object sendMsg(String string) {
      	
      	for (int i = 0; i < 1; i++) {
      	Message message = new Message("tpk02", "xx".getBytes());
      	
      	try {
      	return producer.send(message);
      	} catch (Exception e) {
      	// TODO Auto-generated catch block
      	e.printStackTrace();
      	} 
      	}
      	return null;
      	}
      }
      
    • 配置文件
      spring.application.name=mq01
      rocketmq.producer.namesrvAddr=192.168.150.131:9876
      rocketmq.producer.groupName=${spring.application.name}
      server.port=8081
      
  • Consumer配置
    • Config配置类
      package com.....rmq.controller;
      
      
      import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
      import org.slf4j.Logger;
      import org.slf4j.LoggerFactory;
      import org.springframework.beans.factory.annotation.Value;
      import org.springframework.context.annotation.Bean;
      import org.springframework.context.annotation.Configuration;
      
      @Configuration
      public class MQConfig {
      
          
      public static final Logger logger = LoggerFactory.getLogger(MQConfig.class);
      
      @Value("${rocketmq.consumer.namesrvAddr}")
          private String namesrvAddr;
          @Value("${rocketmq.consumer.groupName}")
          private String groupName;
          @Value("${rocketmq.consumer.topics}")
          private String topics;
      
          @Bean
          public DefaultMQPushConsumer getRocketMQConsumer() throws Exception {
          
              DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName);
              consumer.setNamesrvAddr(namesrvAddr);
              consumer.subscribe(topics, "*");
              
              consumer.registerMessageListener(new MyMessageListener() );
              consumer.start();
              
              return consumer;
          }
      }
      
    • 消息处理类
      package com.....rmq.controller;
      
      import java.util.List;
      
      import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
      import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
      import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
      import org.apache.rocketmq.common.message.MessageExt;
      
      public class MyMessageListener implements MessageListenerConcurrently {
      	@Override
      	public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
      	System.out.println("来啦!!22!");
      	for (MessageExt msg : msgs) {
      	
      	System.out.println(new String(msg.getBody()));;
      	}
      	return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
      	}
      }
      
    • 配置文件
      spring.application.name=mq02
      rocketmq.producer.namesrvAddr=192.168.150.131:9876
      rocketmq.producer.groupName=${spring.application.name}
      
      rocketmq.consumer.topics=tpk02
      

巨人的肩膀

RocketMQ官网
码农翻身
javaGuide
凤凰架构
官网:https://rocketmq.apache.org/

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/118625.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

2、MySQL支持的数据类型

目录 1、整数类型 &#xff08;1&#xff09;fillzero&#xff1a;根据整数类型的长度自动添加0 &#xff08;2&#xff09;unsigned&#xff1a;非负整数 &#xff08;3&#xff09;bin&#xff08;m&#xff09;&#xff1a;将十进制数转为m进制 2、日期时间类型 &#x…

【MySQL基础教程】函数的介绍与使用

前言 本文为 【MySQL基础教程】函数的介绍与使用 相关知识&#xff0c;下边具体将对字符串函数&#xff0c;数值函数&#xff0c;日期函数&#xff0c;流程函数等进行详尽介绍~ &#x1f4cc;博主主页&#xff1a;小新要变强 的主页 &#x1f449;Java全栈学习路线可参考&…

MAXHUB+腾讯会议:为未来办公造一部动力引擎

科技领域有个规律&#xff0c;我们经常高估一年的变化&#xff0c;而低估了十年或者更长时间所可能发生的变化。不信可以做个测试&#xff0c;你觉得未来线上办公会怎么发展&#xff1f;不少朋友会说&#xff0c;既然线上办公是疫情到来之后的PlanB&#xff0c;那么随着疫情结束…

【STM32F4系列】【HAL库】【自制库】ps2手柄模块驱动

外观和电气连接 外观 手柄外观如下 接收器外观 这是接收器和底座 电气连接 需要4根连接线 单片机输出是CLK DO CS 单片机输入是DI 电源电压是3.3-5v 注意模块和单片机共地 模块不支持高速,最大时钟周期约为4us左右 因此使用软件模拟时序的方式来与模块通信 只需要将模块的4根线…

Golang Context 的几种应用场景

Golang context主要用于定义超时取消&#xff0c;取消后续操作&#xff0c;在不同操作中传递值。本文通过简单易懂的示例进行说明。 超时取消 假设我们希望HTTP请求在给定时间内完成&#xff0c;超时自动取消。 首先定义超时上下文&#xff0c;设定时间返回取消函数&#xff…

Apache POI操作百万数据excel实战方案及JDK性能监控工具Jvisualvm实战

百万数据报表概述 文章目录**百万数据报表概述****1、** **概述****2、 JDK性能监控工具介绍****2.1、 Jvisualvm概述****2.2、 Jvisualvm的位置****2.3、 Jvisualvm的使用****3、** **解决方案分析****4**、**百万数据报表导出****4.1** **需求分析****4.2** **解决方案****4.…

玩转门店管理新方法,促进营收利润加倍

门店管理的好坏是门店是否可以运营下去的重要因素&#xff0c;决定了门店的存亡与兴衰。以往很多门店管理者为了更简单方便&#xff0c;采用的是传统方式进行管理。即运用手工的方式记录和计算门店的各种信息。但是随着门店规模的扩大、商品种类的丰富、客户需求的增加以及员工…

普惠微光汇聚暖阳,招联携手奋斗者筑梦前行

撰稿 | 多客 来源 | 贝多财经 近年来&#xff0c;受疫情反复带来的经济下行压力&#xff0c;收入减少生活难以保障成了社会一大难题。值此艰难时刻&#xff0c;一大批爱心企业出资出力&#xff0c;纷纷用实际行动诠释企业的使命和担当。口罩、防护服、矿泉水、食品、药物、免费…

vue配置webpack生产环境.env.production、测试环境.env.development(配置不同环境的打包访问地址)

vue-cli区分办法 vue配置生产环境.env.production、测试环境.env.development vue配置webpack生产环境、测试环境 在使用webpack创建完vue2项目的时候&#xff0c;为了解决生产打包、测试打包对应的全局变量不一致的问题。 首先看一下package.json的改动&#xff1a; "…

MARL算法系列(1):IQL【原理+代码实现】

原文题目&#xff1a;Multiagent cooperation and competition with deep reinforcement learning 作者&#xff1a;Tampuu, Ardi and Matiisen, Tambet and Kodelja, Dorian等 发表时间&#xff1a;2017年 主要内容&#xff1a;相互独立的两个DQN智能体&#xff0c;竞争任务下…

2022年威胁隐私和安全的数个“罪魁祸首”

随着互联网技术的不断发展&#xff0c;我们对网络的信任也在不断增加&#xff0c;甚至将自己的私人数据委托给各种在线平台&#xff0c;如个人数字身份信息、银行账户、各种机密信息。网络一方面的确带来变革型的进步&#xff0c;但另一方面&#xff0c;频频曝光的数据泄露事件…

VueJs中setup的使用(下)

前言在Vue当中,父组件想要向子组件传值,是通过在父组件标签上通过自定义属性实现的,而在子组件中是通过props进行接收在Vue2.0里面,在子组件中的选项式API配置项选项中props进行接收就可以了的,在子组件中的模板中可以直接使用但在Vue3里面与Vue2.0存在一些差异,这个主要是针对…

excel文件管理:如何进行密码保护和破解? 下篇

在上篇文章中&#xff0c;我们提到了设置工作簿的打开权限密码、修改权限密码、保护工作簿的密码、允许编辑区域的密码&#xff0c;并且讲到了两种破解excel密码的方式。今天&#xff0c;我们书接上回&#xff0c;继续讲解excel中常见的密码保护和破解方式&#xff0c;一起来看…

浅谈屏幕适配

文章目录1. 概述2. 屏幕尺寸3. 屏幕分辨率4. 屏幕像素密度5. dp、sp、px6. mdpi、hdpi、xdpi..7. 屏幕分辨率限定符8. 最小宽度限定符8.1 获取设计图最小宽度(dp)8.2 生成对应的dimens.xml文件8.3 尺寸限定符8.4 其它9. 今日头条相关9.1 系统状态栏获取不对问题9.2 autosize1. …

Elasticsearch8.X入门实战(二)Elasticsearch集群架构

Elasticsearch集群由一个或多个节点(服务器)组成,这些节点一起保存Elasticsearch的所有数据,并提供跨所有节点的联合索引和搜索功能。集群由一个唯一的名称来标识,该名称默认为“elasticsearch”(可以在配置文件中修改)。当某个节点被设置为相同的集群名称时,该节点才能…

Docker容器的简单介绍与使用

前言&#xff1a;大家好&#xff0c;我是小威&#xff0c;24届毕业生&#xff0c;曾经在某央企公司实习&#xff0c;目前入职某税务公司。本篇文章将记录和分享docker容器相关的知识点。 本篇文章记录的基础知识&#xff0c;适合在学Java的小白&#xff0c;也适合复习中&#x…

如何更好地进行 Android 组件化开发——路由原理篇

前言 组件化开发的会实现代码隔离&#xff0c;在开发时访问不到模块的代码&#xff0c;降低代码耦合度。那么如何跳转组件的页面、如何进行组件间的通信是个问题。这通常会使用到 ARouter、TheRouter、WMRouter 等路由框架。可能有不少人只知道怎么去调用&#xff0c;并不知道…

Pod内容详情梳理

本篇是笔者的一篇读书笔记&#xff0c;用于梳理pod的详情&#xff0c;方便理解和学习&#xff0c;也方便后续自己查询。一、Pod的概述Pod是k8s里面典型的CR&#xff0c;从它的元数据来看&#xff0c;具有所有CR的基本数据构成&#xff0c;分别是 version、kind&#xff0c;以及…

迅为RK3568开发板支持多屏同显/异显动态方案

iTOP-RK3568开发板采用四核Cortex-A55处理器&#xff0c;芯片内置VOP控制器&#xff0c;支持HDMI、LVDS、MIPI、EDP四种显示接口的多屏同显、异显和异触&#xff0c;可有效提高行业定制的拓展性。 三屏同显&#xff1a; 三屏异显&#xff1a; 双屏同显&#xff1a; 双屏异显&am…

Docker容器里进程的 pid 是如何申请出来的?

大家好&#xff0c;我是飞哥&#xff01;如果大家有过在容器中执行 ps 命令的经验&#xff0c;都会知道在容器中的进程的 pid 一般是比较小的。例如下面我的这个例子。# ps -ef PID USER TIME COMMAND1 root 0:00 ./demo-ie13 root 0:00 /bin/bash21 root …