RocketMQ的消息类型

news2024/11/5 12:14:29

RocketMQ的消息类型

文章目录

  • RocketMQ的消息类型
    • 一、顺序消息
    • 二、广播消息
      • 应用场景:
      • 示例代码:
      • 实现思路:
      • 注意点:
    • 三、延时消息
      • 应用场景:
      • 核心方法:
    • 四、批量消息
      • 应用场景:
      • 示例代码:
      • 注意点:
    • 五、过滤消息
      • 应用场景:
      • 示例代码:
        • 简单过滤:
        • SQL过滤:
      • 实现思路:
    • 六、事务消息
      • 应用场景:
      • 注意点:

一、顺序消息

顺序消息指生产者局部有序发送一个queue,但是多个queue之间时全局无序的。

  • 顺序消息生产者样例:通过MessageQueueSelector将消息有序发送到同一个queue中。

    import org.apache.rocketmq.client.exception.MQBrokerException;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.MessageQueueSelector;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.common.message.MessageQueue;
    import org.apache.rocketmq.remoting.exception.RemotingException;
    
    import java.nio.charset.StandardCharsets;
    import java.util.List;
    
    /**
     * 顺序消息生产者
     * @author 
     * @date 2024年10月23日 11:00
     */
    public class OrderProducer {
        public static void main(String[] args) throws MQBrokerException, RemotingException, InterruptedException, MQClientException ,MQBrokerException, RemotingException, InterruptedException {
            //创建一个生产者
            DefaultMQProducer producer = new DefaultMQProducer("orderProducer");
            producer.setNamesrvAddr("localhost:9876");
            try {
                producer.start();
            } catch (MQClientException e) {
                throw new RuntimeException(e);
            }
            //一个外循环对应十条内循环消息,以便观察消息的有序性
            for (int i = 0; i < 5; i++) {
                for (int j = 0; j < 10; j++) {
                    Message message = new Message("TopicOrder","TagA",("order_"+ i +"_step_"+ j).getBytes(StandardCharsets.UTF_8));
                    SendResult send = producer. send(message, new MessageQueueSelector() {
                        @Override
                        public MessageQueue select(List<MessageQueue> list, Message message, Object o) {
                            Integer id = (Integer) o;
                            int index = id % list.size();
                            return list.get(index);
                        }
                    }, i);
                    System.out.println("消息发送成功_"+ send);
                }
    
            }
            producer.shutdown();
        }
    }
    
  • 顺序消息消费者样例:通过MessageListenerOrderly消费者每次读取消息都只从一个queue中获取(通过加锁的方式实现)

    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.*;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.common.message.MessageExt;
    
    import java.util.List;
    
    /**
     * 顺序消息消费者
     * @author 
     * @date 2024年10月23日 11:00
     */
    public class OrderConsumer {
        public static void main(String[] args) throws MQClientException {
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order_consumer");
            consumer.setNamesrvAddr("localhost:9876");
            consumer.subscribe("TopicOrder", "*");
    
    //        consumer.setMessageListener(new MessageListenerOrderly() {
    //            @Override
    //            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {
    //                for (int i = 0; i < list.size(); i++) {
    //                    System.out.println(i+"_消息消费成功_"+ new String(list.get(i).getBody()));
    //                }
    //                return ConsumeOrderlyStatus.SUCCESS;
    //            }
    //        });
            consumer.setMessageListener(new MessageListenerConcurrently() {
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext context) {
                    for (int i = 0; i < list.size(); i++) {
                        System.out.println(i+"_消息消费成功_"+ new String(list.get(i).getBody()));
                    }
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
            consumer. start();
            System.out.println("consumer started.%n");
        }
    }
    

二、广播消息

应用场景:

​ 广播模式和集群模式是RocketMQ的消费端处理消息最基本的两种模式,集群模式下,一个消息,只会被一个消费者组中的多个消费者实例共同处理一次。广播模式下,一个消息,则会推送所有消费者实例处理,不再关心消费者组。

示例代码:

consumer.setMessageModel(MessageModel.BROADCASTING);

实现思路:

​ 默认模式也就是集群模式下,Broker端会给每个ConsumerGroup委会一个同意的Offset,这样,打给你Consumer来拉取消息是,就可以通过Offset保证一个消息,在同一个ConsumerGroup内只会被消费一次,而广播模式的本质,是将Offset转移到Consumer端自行保管,包括Offset的记录以及更新,全都放在客户端。这样Broker推送消息是,就不再管ConsumerGroup,只要Consumer来拉取消息,就返回对应的消息。

注意点:

  1. Broker端不维护消息消费进度,意味着,如果消费者处理消息失败了,将无法进行消息重试。

  2. Consumer端维护Offset的作用是可以在服务重启时,按照上一次消费的进度,处理后面没有消费过的消息。如果Offset丢了,Consumer依然可以拉取消息。

    比如生产者发送了1~10号消息。消费者当前消费到第6个时宕机了。当他重启时,Broker端已经把第十个消息都推送完成了。如果消费者维护好了自己的Offset,那么他就可以在服务重启时,重新向Broker申请6号~10号的消息。但是,如果消费者端的Offset丢失了,消费者服务依然可以正常运行,但是6~10号消息就无法再申请。后续这个消息者就只能获取10号以后的消息。

三、延时消息

应用场景:

​ 延时消息发送是指消息发送到Apache RocketMQ后,并不期望立马投递这条消息,而是延迟一定时间后才投递到Consumer进行消费。

​ 对比RabbitMQ和Kafka。RabbitMQ中只能通过死信队列变相实现延迟消息,或者加装一个插件来支持延迟消息。Kafka则不太好实现延迟消息。

核心方法:

//指定固定的延迟级别
Message message = new Message(TOPIC,("Hello scheduled message"+i).getBytes(StandardsCharsets.UTF_8));
message.setDelayTimelevel(3);//10秒之后发送
//指定消息发送时间
Message message = new Message(TOPIC,("Hello scheduled message"+i).getBytes(StandardsCharsets.UTF_8));
message.setDeliverTimeMs(System.currentTimeMillis()+10_000L);//10秒之后的时间点

在这里插入图片描述

四、批量消息

应用场景:

​ 生产者要发送的消息比较多时,可以将多条消息合并成一个批量消息,一次性发送出去。这样可以减少网络IO,提升消息发送的吞吐量。

示例代码:

List<Message> message = new ArraayList<>(MESSAGE_COUNT);
for(int i = 0;i<MESSAGE_COUNT;i++){
	message.add(new Message(TOPIC,TAG,("Hello world"+i).getBytes(StandardsCharsets.UTF_8)))
}
ListSplitter splitter = new ListSplitter(message);
while(splitter.hasNext()){
	List<Message> listItem = splitter.next();
	SendResult sendResult = producer.send(listItem);
	System.oout.printf("%s",sendResult);
}

注意点:

​ 批量消息的使用非常简单,但是要注意RocketMQ做了限制。同一批消息的Topic必须相同,另外,不支持延迟消息。还有;;idling消息的大小不要超过1M,如果太大就需要自行分割。

五、过滤消息

应用场景:

​ 同一个Topic下有多种不同的消息,消费者只希望关注某一类消息。

​ 例如,某系统中给仓储系统分配一个Topic,在Topic下,会传递过来入库,出库等不同的消息,仓储系统的不同业务消费者就需要过滤出自己感兴趣的消息,进行不同的业务操作。

在这里插入图片描述

示例代码:

简单过滤:

​ 生产者端需要在发送消息时,增加Tag属性。比如我们上面举例当中的入库、出库。核心代码:

String[] tags = new String[] {"TagA","TagB","TagC"};
for(int i = 0;i< 15;i++){
	Message msg = new Message("TagFilterTest",tags[i % tags.length],"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
	SendResult sendResult = producer.send(msg);
	System.out.printf("%s%n",sendResult);
}

​ 消费者端就可以通过这个Tag属性订阅自己感兴趣的内容,核心代码:

consumer.subscribe("TagFilterTest","TagA");

​ 这样,后续Consumer就只会出处理TagA的消息

SQL过滤:

​ 通过Tag属性,只能进行简单的消息匹配。如果要进行更复杂的消息过滤,比如数字比较,模糊匹配等,就需要使用SQL过滤方式可以通过Tag属性以及用户自定义的属性一起,以标准SQL的方式进行消息过滤。

​ 生产者端就在发送消息时,除了Tag属性外,还可以增加自定义属性。核心代码:

String[] tags = new String[] {"TagA","TagB","TagC"};
for(int i = 0; i < 15; i++){
	Message msg = new Message("SqlFilterTest",tag[i%tags.length],("Hello RocketMQ" + i).getBytes(RemotingHelper.DEFAILT_CHARSET));
	msg.putUserProperty("a",String.valueOf(i));
	SendResult sendResult = producer.sen(msg);
	System.out.printf("%s%n",sendResult);
}

消费者端在进行过滤时,可以指定一个标准的SQL语句,定制复杂的过滤规则。核心代码:

consumer.subscribe("SqlFilterTest",MessageSelector.bySql("(TAGS is not null and TAG in('TagA','TagB'))"+"and(a is not and a between 0 and 3)"));

实现思路:

​ 实际上,Tags和用户自定义的属性,都是随着消息一起传递的,所以,消费者端是可以拿到Tags和自定义属性的。比如“

consumer.registerMessageListener(new MessageListenerConcurrently(){
	@Override
	public ConsumeConcurrentlyStatus consumeMessag(List<MessageExt> msgd,
	ConsumeConcurrentlyContext context{
		for(MessageExt msg:msgs){
			System.out.println(msg.getTags());
			System.out.println(msg.getProperTties());
		}
		System.out.println("%s Receive New Message: %s %n",Thread.currenThread().getName(),msg);
		return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
	})
})

​ 这样,剩下的就是在Consumer中对消息进行过滤了。Broker会在往Consumer推送消息时,在Broker端进行消息过滤。是Consumer感兴趣的消息,就往Consumer推送。

六、事务消息

应用场景:

​ 事务消息时RocketMQ非常特色的一个高级功能。他的基础诉求是通过RocketMQ的事务机制,来保证上下游的数据一致性。

​ 以电商为例,用户支付订单这一核心操作的同时会涉及到下游物流发货、积分变更、购物车状态清空等多个子系统的变更。这种场景,非常适合使用RocketMQ的解藕功能来进行串联。

在这里插入图片描述

​ 考虑到事务的安全性,即要保证相关联的这几个业务一定是同时成功或者同时失败的。如果要将四个服务一起作为一个分布式事务来控制,可以做到,但是会非常麻烦。而使用RocketMQ在中间串联了之后,事情可以得到一定程度的简化。由于RocketMQ与消费者端有失败重试机制,所以,只要消息成功发送到RocketMQ了,那么可以认为Branch2.1,Branch2.2,Branch2.3这几个分支步骤,是可以保证最终的数据一致性的。这样,一个复杂的分布式事务问题,就变成了MinBranch1和Branch2两个步骤的分布式事务问题。

​ 在此基础上,RocketMQ提出了事务消息机制,采用两阶段提交的思路,保证Main Branch1和Branch2之间的事务一致性。

在这里插入图片描述

注意点:

  1. 半消息是对消费者不可见的一种消息。实际上,RocketMQ的做法是将消息转到了一个系统Topic,RMQ_SYS_TRANS_HALF_TOPIC.
  2. 事务消息中,本地事务回查次数通过参数transactionCheckMax设定,默认15次。本地事务回查的间隔通过参数transactionCheckInterval设定,默认60秒。超过回查次数后,消息将会被丢弃。
  3. 在具体执行时,可以对事务流程进行适当的调整。

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2232560.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Selective Generation for Language Models 语言模型的选择性生成

生成式语言模型&#xff08;Generative Language Models, GLMs&#xff09;在文本生成任务中取得了显著进展。然而&#xff0c;生成内容的“幻觉”现象&#xff0c;即生成内容与事实或真实语义不符的问题&#xff0c;仍是GLMs在实际应用中的一个重大挑战。为了解决这一问题&…

git clone,用https还是ssh

前言 在使用Git去克隆项目时&#xff0c;会遇到https和ssh等形式&#xff0c;这两种又有何种区别呢&#xff0c;本文将重点讨论在具体使用中的问题。 注:第一次使用Git 时&#xff0c;需要先设置全局用户名和邮箱&#xff0c;否则后续使用命令时会报错&#xff0c;也是提醒先添…

最新整理:Selenium自动化测试面试题

1.selenium中如何判断元素是否存在? find_elements查找到的元素个数为0&#xff0c;find_element报错意味着元素不存在 2.如何判断元素是否出现? 判断元素是否出现&#xff0c;存在两种情况&#xff0c;一种是该元素压根就没有&#xff0c;自然不会出现;另外一种是有这样的…

业绩代码查询实战——php

一、一级代码显示职员 foreach($data_职员信息 as $key > $value){//$where_查询分类$where_查询通用;//$dat分类one $业绩提成->where($where_查询分类)->order("CreateDate desc")->select();if($value[haschildname]0 && $value[key] !"…

如何彻底删除gitbash中所有的命令记录、以及彻底删除Windows powerShell或者cmd中的所有命令记录

文章目录 1. 文章引言2. 彻底删除gitbash中所有的命令记录3. 彻底删除Windows powerShell或者cmd中的所有命令记录1. 文章引言 有时,我们使用外部电脑从gitbash中下载代码,假设使用history -c命令: 可以清除当前弹框的历史记录,但也无法彻底删除命令记录。打开gitbash后,通…

工作管理实战指南:利用Jira、Confluence等Atlassian工具打破信息孤岛,增强团队协作【含免费指南】

如果工作场所存在超级反派&#xff0c;其中之一可能会被命名为“信息孤岛”&#xff0c;因为它们能够对公司的生产力和协作造成严重破坏。当公司决定使用太多互不关联的工具来完成工作时&#xff0c;“信息孤岛”就会出现&#xff0c;导致团队需要耗费大量时间才能就某件事情达…

OceanBase V4.3.3,首个面向实时分析场景的GA版本发布

在10月23日举办的 OceanBase年度发布会 上&#xff0c;我们怀着激动之情&#xff0c;正式向大家宣布了 OceanBase 4.3.3 GA 版的正式发布&#xff0c;这也是OceanBase 为实时分析&#xff08;AP&#xff09;场景打造的首个GA版本。 2024 年初&#xff0c;我们推出了 4.3.0 版本…

最新最全面的JAVA面试题免费下载

面对求职市场的激烈竞争&#xff0c;掌握全面且深入的Java知识已成为每一位Java开发者必不可少的技能。《2023最新版Java面试八股文》是一份精心整理的面试准备资料&#xff0c;旨在帮助广大开发者系统复习&#xff0c;从容应对Java及相关技术栈的面试挑战。这份文档不仅汇聚了…

Spring Security 框架篇-深入了解 Spring Security 的授权核心功能(RBAC 权限模型、自定义异常处理器、校验权限方法)

&#x1f525;博客主页&#xff1a; 【小扳_-CSDN博客】 ❤感谢大家点赞&#x1f44d;收藏⭐评论✍ 文章目录 1.0 权限系统 1.1 引入 1.2 RBAC 权限模型 1.3 数据库设计 2.0 Spring Security 核心功能-授权 2.1 思路分析 2.2 编写 SQL 语句 2.3 将用户权限进行封装 2.4 获取用户…

博捷芯MIP专机:精密划片技术的革新者

BJX8160 精密划片机作为MINI行业的专用机&#xff0c;凭借其全自动上下料、高精度高速度um级无膜切割以及兼容多种上下料方式等特点&#xff0c;成为了工厂无人值守自动化的理想选择。同时&#xff0c;MIP专机作为博捷芯的独创产品&#xff0c;展现了博捷芯在精密划片机领域的领…

【嵌入式】STM32中的SPI通信

SPI是由摩托罗拉公司开发的一种通用数据总线&#xff0c;其中由四根通信线&#xff0c;支持总线挂载多设备&#xff08;一主多从&#xff09;&#xff0c;是一种同步全双工的协议。主要是实现主控芯片和外挂芯片之间的交流。这样可以使得STM32可以访问并控制各种外部芯片。本文…

Android 虚拟化框架(AVF)指南

Android 虚拟化框架&#xff08;AVF&#xff09;指南 一、项目介绍二、项目特色三、如何使用AVF四、总结 随着移动设备的普及和应用场景的多样化&#xff0c;安全性和隐私保护成为了移动操作系统的重要课题。Android作为全球最广泛使用的移动操作系统之一&#xff0c;一直在不断…

explain执行计划分析 ref_

这里写目录标题 什么是ExplainExplain命令扩展explain extendedexplain partitions 两点重要提示本文示例使用的数据库表Explain命令(关键字)explain简单示例explain结果列说明【id列】【select_type列】【table列】【type列】 【possible_keys列】【key列】【key_len列】【ref…

1.2 图像处理基本操作

在本实战中&#xff0c;我们将学习如何使用OpenCV进行基本的图像处理操作。首先&#xff0c;我们将通过cv2.imread()函数读取图像&#xff0c;并使用cv2.imshow()在窗口中显示它。接着&#xff0c;我们将探索如何通过cv2.imwrite()保存图像&#xff0c;并设置不同的参数以控制图…

[Unity Demo]从零开始制作空洞骑士Hollow Knight第十八集:制作UI系统的主菜单界面和选择存档界面

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 前言一、制作UI系统的主菜单界面 1.选择存档界面制作 2.代码的逻辑处理二、制作UI系统的选择存档界面 1.选择存档界面制作2.代码的逻辑处理总结 前言 hello大家好久…

Unity照片墙效果

Unity照片墙效果&#xff0c;如下效果展示 。 工程源码

华为HarmonyOS打造开放、合规的广告生态 - 贴片广告

场景介绍 贴片广告是一种在视频播放前、视频播放中或视频播放结束后插入的视频或图片广告。 接口说明 接口名 描述 loadAd(adParam: AdRequestParams, adOptions: AdOptions, listener: AdLoadListener): void 请求单广告位广告&#xff0c;通过AdRequestParams、AdOptions…

基于 Transformer 的语言模型

基于 Transformer 的语言模型 Transformer 是一类基于注意力机制&#xff08;Attention&#xff09;的模块化构建的神经网络结构。给定一个序列&#xff0c;Transformer 将一定数量的历史状态和当前状态同时输入&#xff0c;然后进行加权相加。对历史状态和当前状态进行“通盘…

【天线&运输】冲浪者检测系统源码&数据集全套:改进yolo11-DySnakeConv

改进yolo11-SCConv等200全套创新点大全&#xff1a;冲浪者检测系统源码&#xff06;数据集全套 1.图片效果展示 项目来源 人工智能促进会 2024.11.03 注意&#xff1a;由于项目一直在更新迭代&#xff0c;上面“1.图片效果展示”和“2.视频效果展示”展示的系统图片或者视频可…

计算机毕业设计Hadoop+Spark大模型微博情感分析 微博舆情分析 微博爬虫 微博可视化 微博大数据分析 微博大数据 大数据毕业设计 Hive数据仓库

温馨提示&#xff1a;文末有 CSDN 平台官方提供的学长联系方式的名片&#xff01; 温馨提示&#xff1a;文末有 CSDN 平台官方提供的学长联系方式的名片&#xff01; 温馨提示&#xff1a;文末有 CSDN 平台官方提供的学长联系方式的名片&#xff01; 作者简介&#xff1a;Java领…