RocketMQ的简单使用

news2025/1/10 9:36:17

大家好,我是Leo!今天来和大家分享RocketMQ的一些用法。

领域模型介绍

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-80MLgVDF-1682998620720)(C:\Users\DY\AppData\Roaming\marktext\images\2023-04-03-16-02-10-image.png)]

Producer: 用于生产消息的运行实体。

Topic: 主题,用于消息传输和存储的分组容器。

MessageQueue: 消息传输和存储的实际单元容器。

Message: 消息传输的最小单元。

ConsumerGroup: 消费者组。

Consumer: 消费者。

Subscription: 订阅关系,发布订阅模式中消息过滤、重试、消费进度的规则配置。

MQ的优势

MQ的明显优势有3个。

应用解耦: 以多服务为例,用户下单,需要通知订单服务和库存服务,我们可以通过MQ消息来解除下单和库存系统的耦合。

异步提速: 以秒杀为例,我们可以先返回秒杀结果,后续再通过MQ异步消息去插入记录和扣减库存等,减少调用的链路长度。

削峰填谷: 将某一时间内的请求量分摊到更多时间处理,比如系统A一秒只能处理10000个请求,但是我有100000个请求需要处理,我可以将请求发到MQ中,再分成10秒去消费这些请求。

当然MQ也有劣势系统可用性降低系统复杂度提高一致性问题

RocketMQ的主要角色

主要包括Producer、Broker、Consumer、NameServer Cluster。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-qrKVxGrf-1682998620721)(D:\卓面\学习文件\学习内容\博客\images\2023-04-28-10-07-53-image.png)]

一对多

可以通过设置不同的消费者组

不同组通过不同的消费者组既可以实现同时收到一样数量的消息,那同一个消费者组需要怎样才能收到同样数量的消息呢?

// 消费者消费模式
consumer.setMessageModel(MessageModel.BROADCASTING);

默认是集群模式CLUSTERING,设置成广播模式

既可以实现一对多的发送。

同步消息(普通消息)

同步消息需要阻塞等待消息发送结果的返回

public class ProducerDemo {

    public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {

        DefaultMQProducer producer = new DefaultMQProducer("group1");

        producer.setNamesrvAddr("localhost:9876");
        producer.start();
        Message message = new Message();
        message.setTopic("MQLearn");
        message.setTags("1.0.0");
        message.setBody("Hello MQ!".getBytes(StandardCharsets.UTF_8));
        SendResult result = producer.send(message);
        if (result.getSendStatus().equals(SendStatus.SEND_OK)) {
            System.out.println(result);
            System.out.println("发送成功:" + message);
        }

        producer.shutdown();
    }
}

异步消息

异步消息需要实现发送成功和失败的回调函数。

public class Producer {

    public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {

        DefaultMQProducer producer = new DefaultMQProducer("group1");

        producer.setNamesrvAddr("192.168.246.140:9876");
        producer.start();
        // 异步消息
        for (int i = 0; i < 10; i++) {
            Message message = new Message();
            message.setTopic("topic7");
            message.setTags("1.0.0");

            message.setBody(("Hello World !" + i).getBytes(StandardCharsets.UTF_8));
            producer.send(message, new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    // 发送成功的回调方法
                    System.out.println(sendResult);
                }

                @Override
                public void onException(Throwable e) {
                    // 发送失败的回调方法
                    System.out.println(e);
                }
            });
        }
        TimeUnit.SECONDS.sleep(10);
        System.out.println("异步发送完成!");
    }
}

单向消息

单向消息就类似UDP,只顾单向发送,不管是否发送成功,常用于日志收集等场景。

public class SingleDirectionProducer {

    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("group1");

        producer.setNamesrvAddr("localhost:9876");
        producer.start();
        // 单向消息
        for (int i = 0; i < 10; i++) {
            Message message = new Message();
            message.setTopic("topic8");
            message.setTags("1.0.0");
            message.setBody(("Hello World !" + i).getBytes(StandardCharsets.UTF_8));
            producer.sendOneway(message);
        }
        System.out.println("带向发送完成!");
    }
}

延时(定时)消息

RocketMQ提供的定时消息并不能指定在什么时间点去投递消息。而是根据设定的等待时间,起到延时到达的缓冲作用在RocketMQ中,延时消息的delayTimeLevel支持以下级别:

1 1s 2 5s 3 10s 4 30s 5 1m 6 2m 7 3m 8 4m 9 5m 10 6m 11 7m 12 8m 13 9m 14 10m 15 20m 16 30m 17 1h 18 2h

// 设置消息延时级别
message.setDelayTimeLevel(3);

批量消息

批量消息支持一次发送多条消息。

注意:

  • 批量消息需要有相同的topic

  • 不能是延时消息

  • 消息内容不能超过4M,可以通过producer.setMaxMessageSize()和broker进行设置设置(可以通过拆分多次发送)

public class BatchProducer {

    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("group1");

        producer.setNamesrvAddr("localhost:9876");
        producer.start();
        List<Message> list = new ArrayList<>();
        // 批量消息
        for (int i = 0; i < 10; i++) {
            Message message = new Message();
            message.setTopic("topic10");
            message.setTags("1.0.0");
            message.setBody(("Hello World !" + i).getBytes(StandardCharsets.UTF_8));
            list.add(message);
        }
        SendResult result = producer.send(list);
        System.out.println(result);
        TimeUnit.SECONDS.sleep(2);
        System.out.println("发送完成!");
    }
}

顺序消息

顺序消息支持按照消息的发送消息先后获取消息。

比如:我的一笔订单有多个流程需要处理,比如创建->付款->推送->完成。

通过同一笔订单放到一个队列中,这样就可以解决消费的无序问题。

通过实现MessageQueueSelector来选择一个队列。

public class Producer {

    public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {

        DefaultMQProducer producer = new DefaultMQProducer("group1");

        producer.setNamesrvAddr("localhost:9876");
        producer.start();

            Message message = new Message();
            // 模拟业务ID
            int step = 10;
            message.setTopic("topic12");
            message.setTags("1.0.0");
            message.setBody(("Hello World !").getBytes(StandardCharsets.UTF_8));
            producer.send(message, new MessageQueueSelector() {
                @Override
                public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                    // 队列数
                    int size = mqs.size();
                    // 取模
                    int orderId = step;
                    return mqs.get(orderId % size);
                }
            }, null);

        System.out.println("发送完成!");
    }
}
public class Consumer {

    public static void main(String[] args) throws Exception{

        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer();
        consumer.setConsumerGroup("group1");
        consumer.setNamesrvAddr("localhost:9876");
        consumer.subscribe("topic12", "*");
        // 消费者,起一个顺序监听,一个线程,只监听一个队列
        consumer.registerMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println(msg);
                    byte[] body = msg.getBody();
                    System.out.println(new String(body));
                }
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });
        consumer.start();
        System.out.println("消费者启动了!");

    }
}

事务消息

RocketMQ中的事务消息支持在分布式场景下消息生产和本地事务的最终一致性。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-9yAZWR00-1682998620722)(D:\卓面\学习文件\学习内容\博客\images\2023-05-02-11-03-40-image.png)]

大致流程为,

  1. 生产者先将消息发送至RocketMQ。

  2. RocketMQBroker将消息持久化成功后,向生产者返回ACK消息确认已经返回成功,消息状态为暂时不能投递状态。

  3. 执行本地事务逻辑。

  4. 生产者根据事务执行结果向Broker提交commit或者rollback结果。

  5. 如果在断网或者重启情况下,未收到4的结果,或者返回Unknown未知状态,在固定时间对消息进行回查。

  6. 生产者收到消息回查后,需要本地事务执行的最终结果。

  7. 生产者对本地事务状态进行二次提交或确认。

public class Producer {

    public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
        TransactionMQProducer producer = new TransactionMQProducer("group1");
        producer.setNamesrvAddr("localhost:9876");
        // 设置事务监听
        producer.setTransactionListener(new TransactionListener() {
            // 正常事务监听
            @Override
            public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
                // 把消息保存到mysql数据库
                boolean ok = false;

                if (ok) {
                    System.out.println("正常执行事务过程");
                    return LocalTransactionState.COMMIT_MESSAGE;
                } else {
                    System.out.println("事务补偿过程");
                    return LocalTransactionState.UNKNOW;
                    //return LocalTransactionState.ROLLBACK_MESSAGE;
                }

            }
            // 事务补偿事务
            @Override
            public LocalTransactionState checkLocalTransaction(MessageExt msg) {
                System.out.println("事务补偿过程");
                // sql select
                if (true) {

                } else {

                }
                return LocalTransactionState.COMMIT_MESSAGE;
            }
        });
        producer.start();
        String msg = "Hello Transaction Message!";
        Message message = new Message("topic13", "tag", msg.getBytes(StandardCharsets.UTF_8));
        TransactionSendResult transactionSendResult = producer.sendMessageInTransaction(message, null);
        TimeUnit.SECONDS.sleep(2);
        System.out.println(transactionSendResult);
        System.out.println("发送完成!");
    }
}

消息的过滤

在RocketMQ中的消息过滤功能能通过生产者和消费者对消息的属性和Tag进行定义,在消费端可以根据过滤条件进行筛选匹配,将符合条件的消息投递给消费者进行消费。

支持两种方式:Tag标签过滤和SQL属性过滤。

Message message = new Message();
message.setTopic("topic11");
message.setTags("tag");
message.setBody(("Hello World !" + "tag").getBytes(StandardCharsets.UTF_8));
message.putUserProperty("name", "zhangsan");
message.putUserProperty("age", "16");
SendResult result = producer.send(message);

subscribe方法subExpression参数也支持Tag过滤

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer();
consumer.setConsumerGroup("group1");
consumer.setNamesrvAddr("112.74.125.184:9876");
consumer.subscribe("topic11", MessageSelector.bySql("age > 16"));

SpringBoot整合RocketMQ的使用

在SpringBoot项目中主要通过RocketMQTemplate进行消息的发送。

// 普通消息
rocketMQTemplate.convertAndSend("topic10", user);
rocketMQTemplate.send("topic10", MessageBuilder.withPayload(user).
SendResult result = rocketMQTemplate.syncSend("topic10", user);
// 异步消息
rocketMQTemplate.asyncSend("topic10", user, new SendCallback() {
    @Override
    public void onSuccess(SendResult sendResult) {
        System.out.println("成功!");
    }
    @Override
    public void onException(Throwable e) {
        System.out.println(e);
    }
}, 1000L);
// 单向消息
rocketMQTemplate.sendOneWay("topic10", user);
// 延时消息
rocketMQTemplate.syncSend("topic10", MessageBuilder.withPayload(user).build(), 2000L, 3); 
// 批量消息
rocketMQTemplate.syncSend("topic10", list, 1000);

消费者:在注解中可以实现根据Tag和SQL进行属性的过滤。

@Service
//@RocketMQMessageListener(
//        consumerGroup = "group1",
//        topic = "topic10",
//        selectorExpression = "tag1 || tag2"
//)
@RocketMQMessageListener(
        consumerGroup = "group1",
        topic = "topic10",
        selectorType = SelectorType.SQL92,
        selectorExpression = "age > 16",
        messageModel = MessageModel.BROADCASTING
)
public class UserConsumer implements RocketMQListener<User> {


    @Override
    public void onMessage(User message) {

    }
}

总结

今天主要分享了一下RocketMQ的一些基础使用,包括各种类型的消息的使用,偏向于代码实现部分,对于原理篇没有过多涉及。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/482815.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

如何让你的 WebSocket 接口测试更高效?拯救你的接口测试工作

目录 引言 WebSocket介绍 HTTP与WebSocket的区别 WebSocket测试方法 使用在线工具 使用Postman 使用Jmeter 使用Python 结语 引言 你是否曾经为 WebSocket 接口测试中复杂的协议和难以捕获的数据而感到束手无策&#xff1f;WebSocket 协议与传统的 HTTP 协议不同&…

【牛客网】美国节日与因式分解

目录 一、编程题 1.美国节日 2.因式分解 一、编程题 1.美国节日 链接&#xff1a;美国节日__牛客网 (nowcoder.com) 和中国的节日不同&#xff0c;美国的节假日通常是选择某个月的第几个星期几这种形式&#xff0c;因此每一年的放假日期都不相同。具体规则如下&#xff1a…

volatile与synchronized

文章目录 前言一、简介volatilesynchronized 二、名词解释可见性原子性指令重排临界区对象锁类锁 二、实战使用1 Volatile可以解决的问题2 volatile无法解决非原子性操作问题--synchronized 总结 前言 volatile与synchronized 都是java的关键字 volatile一般修饰变量,被修饰的…

做了一年csgo搬砖项目,还清所有债务:会赚钱的人都在做这件事 !

前段時间&#xff0c;在网上看到一句话&#xff1a;有什么事情&#xff0c;比窮更可怕&#xff1f; 有人回答说&#xff1a;“又忙又窮。” 很扎心&#xff0c;却是绝大多数人的真实写照。 每天拼死拼活的996&#xff0c;你有算过你的時间值多少钱&#xff1f; 我们来算一笔…

操作系统——死锁

0.关注博主有更多知识 操作系统入门知识合集 目录 5.1死锁概念 5.2死锁的起因 5.3预防死锁的策略 思考题&#xff1a; 5.1死锁概念 在介绍死锁之前&#xff0c;先来探究一个问题&#xff1a;哲学家就餐问题。五个哲学家围坐在圆桌边&#xff0c;有5支筷子&#xff0c;哲…

免费的绘图工具DrowIO下载及安装

还在为论文绘图而烦恼吗&#xff1f;还在为如何选择画图工具而烦恼吗&#xff1f;没事&#xff0c;本期就给你们推荐一款超级好用且免费的绘图工具——DrawIO。 目前使用比较多的绘图工具有&#xff1a;Visio、亿图图示、Word、PPT、DrawIO等 其中DrawIO由于其既实用又免费的…

使用 Esp32 和 TinyML 进行手势分类

介绍 手势分类是机器学习可以做什么的一个简单但同时又很好的例子。它使用大量“混乱”的数据来对事物进行分类。 在这个项目中,我们将制作一个包含 4 个类的分类器,idle、up_down、left_right 和 circle。 数据采集 要将数据上传到 Edge Impulse,我们需要使用 Edge Imp…

199. 二叉树的右视图【111】

难度等级&#xff1a;中等 上一篇算法&#xff1a; 236. 二叉树的最近公共祖先【190】 力扣此题地址&#xff1a; 199. 二叉树的右视图 - 力扣&#xff08;Leetcode&#xff09; 1.题目&#xff1a;199. 二叉树的右视图 给定一个二叉树的 根节点 root&#xff0c;想象自己站在…

JavaScript事件

事件流描述的是从页面接收事件的顺序。比如说单击了某个按钮&#xff0c;但是单击事件不仅发生在按钮上&#xff0c;在单击按钮的同时&#xff0c;也单击了按钮的容器元素&#xff0c;甚至是 <body> 、<html> 、document。 事件传播的顺序不同导致存在两种事件流机…

初识CPU(二)

目录 一、控制器的功能与工作原理 1.控制器的设计思路 2.控制器的分类 3.微程序 3.1微命令 3.2微操作 3.3微指令 3.4微程序 3.5微地址 4.控制方式 4.1同步控制方式 4.2异步控制方式 4.3联合控制方式 4.4人工控制方式 二、微指令 5.微指令的编码方式 5.1直接编码…

基于springboot的家政服务管理平台(源码,设计文档等)

摘要 随着家政服务行业的不断发展&#xff0c;家政服务在现实生活中的使用和普及&#xff0c;家政服务行业成为近年内出现的一个新行业&#xff0c;并且能够成为大众广为认可和接受的行为和选择。设计家政服务管理平台的目的就是借助计算机让复杂的销售操作变简单&#xff0c;…

kafka的安装与使用

文章目录 kafka安装1 上传安装包2 解压安装包3 创建logs文件夹4 修改配置文件5 分发kafka6 启动kafka kafka使用1 启动kafka2 关闭kafka3 查看topic4 创建topic,名称为test5 删除名称为test的topic6 向topic发送数据7 从topic里消费数据 kafka安装 kafka安装前需要确认zookeep…

《程序员面试金典(第6版)面试题 16.09. 运算

题目描述 请实现整数数字的乘法、减法和除法运算&#xff0c;运算结果均为整数数字&#xff0c;程序中只允许使用加法运算符和逻辑运算符&#xff0c;允许程序中出现正负常数&#xff0c;不允许使用位运算。 你的实现应该支持如下操作&#xff1a; Operations() 构造函数minus…

Linux【模拟实现C语言文件流】

✨个人主页&#xff1a; 北 海 &#x1f389;所属专栏&#xff1a; Linux学习之旅 &#x1f383;操作环境&#xff1a; CentOS 7.6 阿里云远程服务器 文章目录 &#x1f307;前言&#x1f3d9;️正文1、FILE 结构设计2、函数使用及分析3、文件打开 fopen4、文件关闭 fclose5、缓…

4.3 实施部署Nginx 高可用负载均衡集群

部署大致可分为&#xff1a;准备工作、配置、验证与交付几个步骤&#xff0c;接下来按顺序逐一介绍。 4.3.1 准备工作 Nginx高可以负载均衡集群准备工作分两个层面&#xff1a;前端负载均衡器的准备工作与后端真实服务器的准备工作。根据长期实践出来的经验&#xff0c;先准备后…

更轻更好用的蓝牙耳机,日常佩戴更舒适,QCY Crossky Link体验

平时为了方便接打电话&#xff0c;我经常会戴上一副蓝牙耳机&#xff0c;不过戴久了入耳式的耳机&#xff0c;总感觉不舒服&#xff0c;上个月我看到一款设计很新颖的开放式耳机&#xff0c;来自我之前用过的一个国产品牌&#xff0c;最近到手后试了试&#xff0c;感觉质量不错…

Pandoc 从入门到精通,你也可以学会这一个文本转换利器

Pandoc 简介 如果你需要在不同的文件格式之间相互转换&#xff0c;多半听说或使用过文档转换的瑞士军刀——Pandoc。事实上&#xff0c;不仅人类知道 Pandoc&#xff0c;最近很火的人工智能 ChatGPT 也知道「将 Markdown 转换为 docx」&#xff0c;首选方案是使用 Pandoc。 ​…

Codeforces Round 868 (Div. 2)

Problem - D - Codeforces 思路&#xff1a; 首先&#xff0c;一个位置至多贡献1&#xff0c;不然就是0.如[l1,r]与[l2,r]都是回文串&#xff08;l1<l2) 若(l1r)/2<l2,即[l2,r]本身就是[l1,r]回文串右边的一部分&#xff0c;那么他的贡献在[l1,r]左边已经计算过。如果(…

Python程序的执行过程

哈喽&#xff0c;大家好&#xff0c;五一快乐呀&#xff0c;都去哪里旅游了呢&#xff1f;再游玩之余也花点时间来学习学习&#xff0c;让自己更强哟。这期就给大家分享的是Python程序执行的过程学习。 之前已经给大家介绍了Python语言的简介、Python环境的安装、IDE的选择与安…

4 斐波那契数列

4 斐波那契数列 作者: Turbo时间限制: 1S章节: 递归 问题描述 : 斐波那契数列的排列是&#xff1a;0&#xff0c;1&#xff0c;1&#xff0c;2&#xff0c;3&#xff0c;5&#xff0c;8&#xff0c;13&#xff0c;21&#xff0c;34&#xff0c;55&#xff0c;89&#xff0c;…