详解RocketMQ使用

news2024/11/14 6:47:36

目录

1.环境

2.生产者、消费者的模式

3.顺序消息

4.广播消息

5.延迟消息

6.批量消息

7.过滤消息

8.事务消息


本文着重聊的是RocketMQ的编程模型,下载安装和概念可以移步博主的另外两篇博文:

RocketMQ基础概念__BugMan的博客-CSDN博客

RocketMQ下载安装、集群搭建保姆级教程__BugMan的博客-CSDN博客

1.环境

RocketMQ版本:4.7.1

Maven依赖:

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.7.1</version>
</dependency>

2.生产者、消费者的模式

生产者有三种生产模式:

  • 同步,生产者等待broker的响应后再往下走。

  • 异步,生产者不等待broker的响应,继续往下走,broker的响应通过事件监听的方式,触发回调函数,通知生产者。

  • 单向,生产者只管发,不管broker的响应,这种模式没办法做消息丢失后的补救

消费者有两种消费模式:

  • 主动拉取

  • 等待推送

生产者示例:

public static void main(String[] args) throws MQClientException, InterruptedException {
		//创建消费者,创建的时候可以指定该消费者属于哪个消费者组
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        //指定name server的地址
        producer.setNamesrvAddr("name-server1-ip:9876;name-server2-ip:9876");
        producer.start();
		//发送一千条信息
        for (int i = 0; i < 1000; i++) {
            try {
            	//消息,topic为TopicTest,后面跟的一串是tag
                Message msg = new Message("TopicTest" /* Topic */,
                    "TagA" /* Tag */,
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
                );
                //同步发送
                SendResult sendResult = producer.send(msg);
				/**异步发送,通过自定义回调函数的方式来触发响应
				producer.send(msg, new SendCallback() {
                    @Override
                    public void onSuccess(SendResult sendResult) {
                        countDownLatch.countDown();
                        System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());
                    }

                    @Override
                    public void onException(Throwable e) {
                        countDownLatch.countDown();
                        System.out.printf("%-10d Exception %s %n", index, e);
                        e.printStackTrace();
                    }
                }**/
                System.out.printf("%s%n", sendResult);
            } catch (Exception e) {
                e.printStackTrace();
                Thread.sleep(1000);
            }
        }
        producer.shutdown();
    }

消费者示例:

推模式:

public static void main(String[] args) throws InterruptedException, MQClientException {
		//创建消费者
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
        //设置name server
        consumer.setNamesrvAddr("name-server1-ip:9876;name-server2-ip:9876");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.subscribe("TopicTest", "*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }

拉模式:

private static final Map<MessageQueue, Long> OFFSE_TABLE = new HashMap<MessageQueue, Long>();

    public static void main(String[] args) throws MQClientException {
        DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name_5");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.start();

        Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("broker-a");
        for (MessageQueue mq : mqs) {
            System.out.printf("Consume from the queue: %s%n", mq);
            SINGLE_MQ:
            while (true) {
                try {
                    PullResult pullResult =
                        consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);
                    System.out.printf("%s%n", pullResult);
                    putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
                    switch (pullResult.getPullStatus()) {
                        case FOUND:
                            break;
                        case NO_MATCHED_MSG:
                            break;
                        case NO_NEW_MSG:
                            break SINGLE_MQ;
                        case OFFSET_ILLEGAL:
                            break;
                        default:
                            break;
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }

        consumer.shutdown();
    }

    private static long getMessageQueueOffset(MessageQueue mq) {
        Long offset = OFFSE_TABLE.get(mq);
        if (offset != null)
            return offset;

        return 0;
    }

    private static void putMessageQueueOffset(MessageQueue mq, long offset) {
        OFFSE_TABLE.put(mq, offset);
    }

}

3.顺序消息

有些场景中我们需要保证消息的有序性,比如以下场景:

1.开通会员,将积分初始化为10分

2.完成赚取积分的操作,+5分

3.完成违规操作,-10分

最后剩余积分5分

如果消息乱序为321,最后积分为10分。

生产者将消息顺序的发到一个MessageQueue上,然后消费者去一个MessageQueue上拿消息,就能保证消息的顺序性。

RocketMQ支持生产者、消费者在生产、消费的时候指定MessageQueue,但是具体怎样利用这一点来实现顺序消费,需要开发人员去手写。

生产者:

for (int i = 0; i < 100; i++) {
                for(int j=0;i<5;i++) {
                    //每一个订单用同一个orderId
                    int orderId = i;
                    Message msg =
                            new Message("TopicTest","order_"+orderId, "KEY" + orderId,
                                    ("order_" + orderId+"step"+j).getBytes(RemotingHelper.DEFAULT_CHARSET));
                    SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                        /**
                         * 回调函数
                         * @param mqs broker中的所有MessageQueue
                         * @param msg 发送的消息
                         * @param arg 就是传过来的orderId
                         * @return
                         */
                        @Override
                        public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                            Integer id = (Integer) arg;
                            //每个订单的message用同一个orderId,必然会选中同一个MessageQueue,自然会顺序存进去
                            int index = id % mqs.size();
                            return mqs.get(index);
                        }
                    }, orderId);
                    System.out.printf("%s%n", sendResult);
                }
            }

消费者:

//MessageListenerOrderly这种类型的监听器,会让consumer一直去读一个messagequeue的内容,一直读完为止
consumer.registerMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                context.setAutoCommit(true);
                for (MessageExt msg : msgs) {
                    System.out.println("收到的消息内容:"+new String(msg.getBody()));
                }
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });

4.广播消息

默认情况下,topic中的一条消息只会被一个消费者所消费。广播模式,topic中的一条消息,可以被订阅该topic的所有消费者消费。

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_1");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
//设置为广播模式
consumer.setMessageModel(MessageModel.BROADCASTING);
consumer.subscribe("TopicTest", "TagA || TagC || TagD");
consumer.registerMessageListener(new MessageListenerConcurrently() {
	@Override
	public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
           ConsumeConcurrentlyContext context) {
           System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
             return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
    });
    consumer.start();
    System.out.printf("Broadcast Consumer Started.%n");
    }

5.延迟消息

延迟消息,即指定消息在MQ的一个驻留时间,过多少时间后,过了驻留时间,消费者才能消费到消息。

延迟消息可以用来做定时任务。

API上来说,就是在生产者端,给消息指定一个延迟级别即可:

//messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
//3即第三个等级,10s
msg.setDelayTimeLevel(3);

很明显上面的延迟级别都是定死的,如果我们要用来做定时任务,需要自定义延迟级别。开源版的RocketMQ不支持自定义延迟级别,只有商业版(阿里云上部署的RocketMQ)才支持。

开源版的RocketMQ只能自己去改源码,所以这里也是各个公司做自己定制化的RocketMQ时,一个改造的重点

6.批量消息

RocketMQ支持生产者批量发送消息,以此去减少生产者的网络IO,批量消息只和生产者有关,消费者仍然是按照一条一条的去消费的。

public class SimpleBatchProducer {

    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("BatchProducerGroupName");
        producer.start();

        //If you just send messages of no more than 1MiB at a time, it is easy to use batch
        //Messages of the same batch should have: same topic, same waitStoreMsgOK and no schedule support
        String topic = "BatchTest";
        List<Message> messages = new ArrayList<>();
        messages.add(new Message(topic, "Tag", "OrderID001", "Hello world 0".getBytes()));
        messages.add(new Message(topic, "Tag", "OrderID002", "Hello world 1".getBytes()));
        messages.add(new Message(topic, "Tag", "OrderID003", "Hello world 2".getBytes()));

        producer.send(messages);
    }
}

7.过滤消息

通过tag,消费者可以消费同一个topic下自己感兴趣的消息

生产者:

public class TagFilterProducer {

    public static void main(String[] args) throws Exception {

        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        producer.start();

        String[] tags = new String[] {"TagA", "TagB", "TagC"};

        for (int i = 0; i < 60; i++) {
            Message msg = new Message("TagFilterTest",
                tags[i % tags.length],
                "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));

            SendResult sendResult = producer.send(msg);
            System.out.printf("%s%n", sendResult);
        }

        producer.shutdown();
    }

消费者:

public class TagFilterConsumer {

    public static void main(String[] args) throws InterruptedException, MQClientException, IOException {

        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");

        consumer.subscribe("TagFilterTest", "TagA || TagC");

        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();

        System.out.printf("Consumer Started.%n");
    }
}

8.事务消息

事务消息,即发送到MQ的消息能和本地事务一起被回滚。

MQ消息和业务之间其实会存在类似于分布式事务的一致性问题,举个例子:

以电商交易场景为例,用户支付订单这一核心操作的同时会涉及到下游物流发货、积分变更、购物车状态清空等多个子系统的变更。很明显这几个步骤的调用都应该是在一个事务中的,这些调用都可以用MQ做成移步的,那么问题就来了。这些业务中一旦有一个业务失败,其它业务应该同时失败,但是消息已经发出去了怎么办?

这时候事务消息就派上用场了。

事务消息的生产者在发送消息时,会将消息转为一个half(半消息),并存入RocketMQ内部的一个RMQ_SYS_TRANS_HALF_TOPIC这个Topic,这个Topic对消费者是不可见的,当本地事务执行成功后会向MQ发送commit信号,MQ再将消息转为原本的Topic,当本地事务执行失败后会向MQ发送roll_back信号,MQ会丢弃对应消息。如果本地事务没有执行完,可以向MQ发送一个unknown信号,MQ收到unknow信号后,会让对应的消息等待,并且定期回查状态为unknown的消息。

 自定义本地事务的提交逻辑和检查逻辑:

public class TransactionListenerImpl implements TransactionListener {
    private AtomicInteger transactionIndex = new AtomicInteger(0);

    private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();

    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        int value = transactionIndex.getAndIncrement();
        int status = value % 3;
        localTrans.put(msg.getTransactionId(), status);
        return LocalTransactionState.UNKNOW;
    }

    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        Integer status = localTrans.get(msg.getTransactionId());
        if (null != status) {
            switch (status) {
                case 0:
                    return LocalTransactionState.UNKNOW;
                case 1:
                    return LocalTransactionState.COMMIT_MESSAGE;
                case 2:
                    return LocalTransactionState.ROLLBACK_MESSAGE;
                default:
                    return LocalTransactionState.COMMIT_MESSAGE;
            }
        }
        return LocalTransactionState.COMMIT_MESSAGE;
    }
}

 生产者:

public class TransactionProducer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        TransactionListener transactionListener = new TransactionListenerImpl();
        TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
        ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setName("client-transaction-msg-check-thread");
                return thread;
            }
        });

        producer.setExecutorService(executorService);
        producer.setTransactionListener(transactionListener);
        producer.start();

        String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
        for (int i = 0; i < 10; i++) {
            try {
                Message msg =
                    new Message("TopicTest1234", tags[i % tags.length], "KEY" + i,
                        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                SendResult sendResult = producer.sendMessageInTransaction(msg, null);
                System.out.printf("%s%n", sendResult);

                Thread.sleep(10);
            } catch (MQClientException | UnsupportedEncodingException e) {
                e.printStackTrace();
            }
        }

        for (int i = 0; i < 100000; i++) {
            Thread.sleep(1000);
        }
        producer.shutdown();
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/758124.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

dede编辑器修改成纯文本编辑器的方法

我在做优秀啦网站大全的时候需要的正文内容都不需要设置什么文字样式&#xff0c;所以我需要把编辑器上的工具全部取消掉&#xff0c;包括会员投稿中的编辑器工具栏全部取消掉或者屏蔽隐藏掉&#xff0c;所以我需要把DEDE编辑器修改成纯文本编辑器的方法如下&#xff1a;如图&a…

一文教你如何优雅地配置树莓派的静态IP、中文环境

引言&#xff1a; 树莓派的静态IP配置与ubuntu这些都是类似的&#xff0c;毕竟都是linux&#xff0c;只要会一个&#xff0c;其他的看一遍就会了。 目录 配置树莓派的静态IP 1、确定树莓派的网络接口 2、编辑网络配置文件&#xff1a; 3、设置静态IP地址&#xff1a; 4、…

JavaWeb 速通Tomcat

目录 一、拾枝杂谈 1.web服务器说明 : 2.常用web服务软件 : 二、Tomcat服务 1.Tomcat下载和安装 : 2.启动Tomcat服务 : 3.Tomcat启动的注意事项 : 4.关闭Tomcat服务 : 三、Tomcat部署 1.Tomcat目录结构说明 : 1 bin 2.conf 3 lib 4 logs 5 temp 6 webapps 7 work 2.关…

golang单元测试及mock总结

文章目录 一、前言1、单测的定位2、vscode中生成单测 二、构造测试case的注意事项1、项目初始化2、构造空interface{}3、构造结构体的time.Time类型4、构造json格式的test case 三、运行单测文件1、整体运行单测文件2、运行单个单测文件报错&#xff08;1&#xff09;command-l…

fileclude

背景知识 文件包含漏洞 题目 分析上述代码 file2被放入file_get_contents()函数&#xff0c;且要求返回值为hello ctf file1是要包含的文件&#xff0c;放在include函数中 用php://filter伪协议读取源代码 构造payload&#xff1a; file1php://filter/readconvert.base64-…

Loki+Promtail+Grafana 监控 K8s 日志

Loki 架构&#xff1a; 1、loki&#xff1a;服务端&#xff0c;负责存储日志和处理查询 2、promtail&#xff1a;采集端&#xff0c;负责采集日志发送给loki 3、grafana&#xff1a;负责采集日志的展示 创建 yaml 文件 cat loki-rbac.yaml apiVersion: v1 kind: ServiceAccount…

HarmonyOS应用开发-第一章-DevEco Studio的安装

一、前言 本栏可以帮助正在学习HarmonyOS应用开发的开发者快速上手和掌握。 二、安装步骤 首先下载DevEco Studio&#xff08;HarmonyOS应用的集成开发境&#xff09;&#xff0c;点击&#xff1a;IDE下载页面&#xff0c;点击立即下载。 下载完成后&#xff0c;双击运行安装程…

消息队列——RabbitMQ基本概念+容器化部署和简单工作模式程序

目录 基本概念 MQ 的优势 1.应用解耦 2.异步提速 3.削峰填谷 MQ 的劣势 使用mq的条件 常见MQ产品 RabbitMQ简介 RabbitMQ的六种工作模式 JMS RabbitMQ安装和配置。 RabbitMQ控制台使用。 RabbitMQ快速入门——生产者 需求: RabbitMQ快速入门——消费者 小结 基本概…

golang slice参数传递

在介绍slice函数参数传递之前&#xff0c;先介绍一下slice的结构 type slice struct {array unsafe.Pointerlen intcap int }这个应该周知了&#xff0c;也不必多解释&#xff0c;需要注意两个问题 1、如何初始化slice 我们知道初始化slice有几种方式&#xff0c;注意以…

Spring Batch之读数据—读多文件(三十三)

一、读多文件 前面的所有文件的读取基本上是对单文件执行的&#xff0c;在实际应用中&#xff0c;我们经常操作批量的文件。 Spring Batch框架提供了现有的组件MultiResourceItemReader支持对多文件的读取&#xff0c;通过MultiResourceItemReader读取批量文件非常简单。MultiR…

【算法与数据结构】144、145、94LeetCode二叉树的前中后遍历

文章目录 一、题目二、递归算法三、完整代码 所有的LeetCode题解索引&#xff0c;可以看这篇文章——【算法和数据结构】LeetCode题解。 一、题目 二、递归算法 思路分析&#xff1a;这道题比较简单&#xff0c;不多说了&#xff0c;大家直接看代码就行。注意前中后遍历是指中间…

01Matlab编程基础

回忆你所学过的数学函数并给出x3.56 时以下函数的值 s i g n ( x ) , x a ( a 3 ) , sin ⁡ ( x ) , cos ⁡ ( x ) , tan ⁡ ( x 2 ) , 2 tan ⁡ ( x ) \begin{aligned}sign\left( x\right) ,x^{a}\left( a3\right) ,\sin \left( x\right) ,\cos \left( x\right) ,\tan \left(…

MiniGPT4系列之二推理篇命令行方式:在RTX-3090 Ubuntu服务器推理详解

MiniGPT4系列之一部署篇&#xff1a;在RTX-3090 Ubuntu服务器部署步骤详解_seaside2003的博客-CSDN博客 MiniGPT4系列之二推理篇命令行方式&#xff1a;在RTX-3090 Ubuntu服务器推理详解_seaside2003的博客-CSDN博客 MiniGPT4系列之三模型推理 (Web UI)&#xff1a;在RTX-309…

如何更简洁查看接口返回的树状图信息

首先&#xff0c;你的接口返回的得是树状图信息。在浏览器上访问接口&#xff1a; 按下f12 刷新页面 点击就可以看到层级关系了。当然也可以使用下面这个插件对数据进行格式化。

行列式计算

举例&#xff1a; 1.暴力计算 2.通过代数余子式计算 相关理论&#xff1a; 这个C就是上图的Aij哈&#xff0c;我拷的别人的图。 可以得出&#xff0c;行列式的值可以按照某行展开&#xff0c;展开后余子式即为一个新的行列式&#xff0c;就是原行列式删除某一行一列之后得到的…

Java 设计模式——适配器模式

目录 1.概述2.结构3.类适配器模式3.1.目标接口3.2.被适配类3.3.适配器类3.4.测试 4.对象适配器模式5.优缺点6.应用场景7.JDK 源码解析——InputStreamReader 1.概述 &#xff08;1&#xff09;如果去欧洲国家去旅游的话&#xff0c;他们的插座如下图最左边&#xff0c;是欧洲标…

vue3- 02vue3的变化

1. main.js 创建实例不再使用构造函数&#xff0c;而是使用createApp使用插件时不再通过构造函数&#xff0c;而是通过实例 2. 组件 1. this指向不同 vue2的this指向是组件vue3的this指向是proxy&#xff08;代理&#xff0c;代理的是组件实例&#xff09; <template&…

分布式软件架构——传输链路

传输链路 链路指无源的点到点的物理连接。链路是计算机网络中的一个重要概念&#xff0c;它指的是连接两个网络设备的物理或逻辑路径。简单来说&#xff0c;链路就是电信号或数据在网络中传输的路径。在计算机网络中&#xff0c;链路可以分为物理链路和逻辑链路两种。物理链路…

传承与进取的力量-节选

只简单谈如下两点&#xff1a; 传承&#xff1a;家族各类关系网总和 进取&#xff1a;个人提升获取资源和 少数人的晚餐 之前&#xff0c;每一届都会在交流中谈及&#xff0c;时间才是真正的公平公正&#xff0c;生命只有一次&#xff0c;至少在目前还没有公开报道的永生人。…

动态内存分配(2)——经典例题的讲解

前言&#xff1a; 在前面我们已经学习动态分配内存&#xff0c;今天我们就来做一做它的几道经典例题&#xff0c;加深巩固我们所学的知识。 知识复习&#xff1a;动态内存管理&#xff08;1&#xff09;_从前慢&#xff0c;现在也慢的博客-CSDN博客 题目1&#xff1a; 下面代码…