Java整合RocketMQ实现生产消费

news2024/11/20 15:31:02

文章目录

  • 参考文档
  • 环境搭建
  • 生产者
    • 普通消息
      • 同步发送
      • 异步发送
      • 单向传输
    • 顺序消息
    • 延迟消息
    • 批量消息
    • 事务消息
  • 消费者
    • Push消费
    • Pull 消费
  • 代码仓库

参考文档

RocketMQ作为阿里系开源项目,有非常成熟的中文文档可以快速了解并上手。

  • 环境部署
  • 控制台安装
  • RocketMQ常见问题

环境搭建

  1. 创建Maven项目。
  2. pom.xml文件中引入RocketMQ依赖。
<dependencies>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.8.0</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.13.2</version>
        </dependency>
</dependencies>

生产者

普通消息

RocketMQ可用于以三种方式发送消息:同步、异步和单向传输。前两种消息类型是可靠的,因为无论它们是否成功发送都有响应。

同步发送

在这里插入图片描述

    private final static String nameServer = "127.0.0.1:9876";

    private final static String producerGroup = "my_group";

    private final static String topic = "topic-test";
@Test
    public void syncSend() {
        try {
            // 初始化一个producer并设置Producer group name
            DefaultMQProducer producer = new DefaultMQProducer(producerGroup);
            // 设置NameServer地址
            producer.setNamesrvAddr(nameServer);
            // 启动producer
            producer.start();
            // 创建一条消息,并指定topic、tag、body等信息,tag可以理解成标签,对消息进行再归类,RocketMQ可以在消费端对tag进行过滤
            Message msg = new Message(topic, "tagA", "Hello RocketMQ".getBytes(RemotingHelper.DEFAULT_CHARSET));
            // 利用producer进行发送,并同步等待发送结果
            SendResult sendResult = producer.send(msg, 10000);
            System.out.printf("%s%n", sendResult);
            // 一旦producer不再使用,关闭producer
            producer.shutdown();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

异步发送

在这里插入图片描述

@Test
    public void asyncSend() throws IOException {
        try {
            // 初始化一个producer并设置Producer group name
            DefaultMQProducer producer = new DefaultMQProducer(producerGroup);
            // 设置NameServer地址
            producer.setNamesrvAddr(nameServer);
            // 启动producer
            producer.start();
            // 创建一条消息,并指定topic、tag、body等信息,tag可以理解成标签,对消息进行再归类,RocketMQ可以在消费端对tag进行过滤
            Message msg = new Message(topic, "tagB", "Hello RocketMQ".getBytes(RemotingHelper.DEFAULT_CHARSET));
            // 异步发送消息, 发送结果通过callback返回给客户端
            producer.send(msg, new SendCallback() {
                public void onSuccess(SendResult sendResult) {
                    System.out.printf("OK %s %n",
                            sendResult.getMsgId());
                }

                public void onException(Throwable e) {
                    System.out.printf("Exception %s %n", e);
                    e.printStackTrace();
                }
            },10000);
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.in.read();
    }

单向传输

在这里插入图片描述

@Test
    public void onewaySend()  {
        try {
            // 初始化一个producer并设置Producer group name
            DefaultMQProducer producer = new DefaultMQProducer(producerGroup);
            // 设置NameServer地址
            producer.setNamesrvAddr(nameServer);
            producer.setSendMsgTimeout(10000);
            // 启动producer
            producer.start();
            // 创建一条消息,并指定topic、tag、body等信息,tag可以理解成标签,对消息进行再归类,RocketMQ可以在消费端对tag进行过滤
            Message msg = new Message(topic, "tagB", "Hello RocketMQ".getBytes(RemotingHelper.DEFAULT_CHARSET));
            // 异步发送消息, 发送结果通过callback返回给客户端
            producer.sendOneway(msg);
            // 一旦producer不再使用,关闭producer
            //producer.shutdown();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

顺序消息

RocketMQ 通过生产者和服务端的协议保障单个生产者串行地发送消息,并按序存储和持久化。如需保证消息生产的顺序性,则必须满足以下条件:

  • 单一生产者: 消息生产的顺序性仅支持单一生产者,不同生产者分布在不同的系统,即使设置相同的分区键,不同生产者之间产生的消息也无法判定其先后顺序。
  • 串行发送:生产者客户端支持多线程安全访问,但如果生产者使用多线程并行发送,则不同线程间产生的消息将无法判定其先后顺序。
@Test
    public void orderSend() {
        try {
            // 初始化一个producer并设置Producer group name
            DefaultMQProducer producer = new DefaultMQProducer(producerGroup);
            // 设置NameServer地址
            producer.setNamesrvAddr(nameServer);
            producer.setSendMsgTimeout(10000);
            // 启动producer
            producer.start();
            String[] tags = new String[]{"TagA", "TagB", "TagC"};
            for (int i = 0; i < 10; i++) {
                int orderId = i % 10;
                Message msg = new Message(topic, tags[i % tags.length], "KEY" + i, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                        Integer id = (Integer) arg;
                        int index = id % mqs.size();
                        return mqs.get(index);
                    }
                }, orderId);
                System.out.printf("%s%n", sendResult);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

延迟消息

延迟消息发送是指消息发送到RocketMQ后,并不期望立马投递这条消息,而是延迟一定时间后才投递到Consumer进行消费。
使用 RocketMQ 的延时消息可以简化定时调度任务的开发逻辑,实现高性能、可扩展、高可靠的定时触发能力。
RocketMQ 一共支持18个等级的延迟投递,具体时间如下:

投递等级延迟时间投递等级延迟时间
11s106min
25s117min
310s128min
430s139min
51min1410min
62min1520min
73min1630min
84min171h
95min182h
@Test
    public void scheduledSend() {
        try {
            // 初始化一个producer并设置Producer group name
            DefaultMQProducer producer = new DefaultMQProducer(producerGroup);
            // 设置NameServer地址
            producer.setNamesrvAddr(nameServer);
            // 启动producer
            producer.start();
            // 创建一条消息,并指定topic、tag、body等信息,tag可以理解成标签,对消息进行再归类,RocketMQ可以在消费端对tag进行过滤
            Message msg = new Message(topic, "tagA", "Hello RocketMQ".getBytes(RemotingHelper.DEFAULT_CHARSET));
            // 消息延迟等级
            msg.setDelayTimeLevel(2);
            // 利用producer进行发送,并同步等待发送结果
            SendResult sendResult = producer.send(msg, 10000);
            System.out.printf("%s%n", sendResult);
            // 一旦producer不再使用,关闭producer
            producer.shutdown();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

批量消息

在对吞吐率有一定要求的情况下,Apache RocketMQ可以将一些消息聚成一批以后进行发送,可以增加吞吐率,并减少API和网络调用次数。

 @Test
    public void batchSend() {
        try {
            // 初始化一个producer并设置Producer group name
            DefaultMQProducer producer = new DefaultMQProducer(producerGroup);
            // 设置NameServer地址
            producer.setNamesrvAddr(nameServer);
            // 启动producer
            producer.start();
            List<Message> messages = new ArrayList<Message>();
            messages.add(new Message(topic, "Tag", "Order001", "Hello world 0".getBytes()));
            messages.add(new Message(topic, "Tag", "Order002", "Hello world 1".getBytes()));
            messages.add(new Message(topic, "Tag", "Order003", "Hello world 2".getBytes()));
            // 利用producer进行发送,并同步等待发送结果
            SendResult sendResult = producer.send(messages, 10000);
            System.out.printf("%s%n", sendResult);
            // 一旦producer不再使用,关闭producer
            producer.shutdown();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

事务消息

在一些对数据一致性有强需求的场景,可以用RocketMQ 事务消息来解决,从而保证上下游数据的一致性。

基于 RocketMQ 的分布式事务消息功能,在普通消息基础上,支持二阶段的提交能力。将二阶段提交和本地事务绑定,实现全局提交结果的一致性。

第一阶段会发送一个半事务消息,半事务消息是指暂不能投递的消息。
如果发送成功则执行本地事务,并根据本地事务执行成功与否,向 Broker 半事务消息状态(commit或者rollback)。
半事务消息只有 commit 状态才会真正向下游投递。
如果由于网络闪断、生产者应用重启等原因,导致某条事务消息的二次确认丢失,Broker 端会通过扫描发现某条消息长期处于“半事务消息”时,需要主动向消息生产者询问该消息的最终状态(Commit或是Rollback)。这样最终保证了本地事务执行成功,下游就能收到消息,本地事务执行失败,下游就收不到消息。

事务消息的详细交互流程如下图所示:
在这里插入图片描述

 @Test
    public void transactionSend() {
        try {
            // 事务消息的发送不再使用 DefaultMQProducer,而是使用 TransactionMQProducer 进行发送
            TransactionMQProducer producer = new TransactionMQProducer(producerGroup);
            // 设置NameServer地址
            producer.setNamesrvAddr(nameServer);
            // 事务回查的线程池
            ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
                public Thread newThread(Runnable r) {
                    Thread thread = new Thread(r);
                    thread.setName("client-transaction-msg-check-thread");
                    return thread;
                }
            });

            producer.setExecutorService(executorService);
            producer.setTransactionListener(new TransactionListener() {
                //半事务消息发送成功后,执行本地事务的方法
                public LocalTransactionState executeLocalTransaction(Message msg, Object o) {
                    System.out.printf("执行本地事务 %n");
                    /*
                    二次确认
                    LocalTransactionState.COMMIT_MESSAGE:提交事务,允许消费者消费该消息
                    LocalTransactionState.ROLLBACK_MESSAGE:回滚事务,消息将被丢弃不允许消费。
                    LocalTransactionState.UNKNOW:暂时无法判断状态,等待固定时间以后Broker端根据回查规则向生产者进行消息回查。
                    */
                    return LocalTransactionState.UNKNOW;
                }

                // 二次确认消息没有收到,Broker端回查事务状态的方法,默认60s
                public LocalTransactionState checkLocalTransaction(MessageExt msg) {
                    System.out.printf("二次确认失败,broker事务回查  %n");
                    return LocalTransactionState.COMMIT_MESSAGE;
                }
            });
            producer.setSendMsgTimeout(10000);
            // 启动producer
            producer.start();
            Message msg = new Message(topic, "tagA", "Hello RocketMQ".getBytes(RemotingHelper.DEFAULT_CHARSET));
            // 利用producer进行发送事务消息,并同步等待发送结果
            SendResult sendResult = producer.sendMessageInTransaction(msg, null);
            System.out.printf("%s%n", sendResult);
            // 一旦producer不再使用,关闭producer
            System.in.read();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

消费者

MQ的消费模式可以大致分为两种,一种是推Push,一种是拉Pull。

Push消费

Push是服务端主动推送消息给客户端,优点是及时性较好,但如果客户端没有做好流控,一旦服务端推送大量消息到客户端时,就会导致客户端消息堆积甚至崩溃。

    private final static String nameServer = "127.0.0.1:9876";

    private final static String consumerGroup = "my_group";

    private final static String topic = "topic-test";


    @Test
    public void consumerPush() throws MQClientException, IOException {
        // 初始化consumer,并设置consumer group name
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);
        // 设置NameServer地址
        consumer.setNamesrvAddr(nameServer);
        // 订阅一个或多个topic,并指定tag过滤条件,这里指定*表示接收所有tag的消息
        consumer.subscribe(topic, "*");
        //设置采用广播模式,广播模式下,消费组内的每一个消费者都会消费全量消息。
        //consumer.setMessageModel(MessageModel.BROADCASTING);
        //注册回调接口来处理从Broker中收到的消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                // 返回消息消费状态,ConsumeConcurrentlyStatus.CONSUME_SUCCESS为消费成功
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // 顺序消费
//        consumer.registerMessageListener(new MessageListenerOrderly() {
//            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
//                return ConsumeOrderlyStatus.SUCCESS;
//            }
//        });
        // 启动Consumer
        consumer.start();
        System.out.printf("Consumer Started.%n");
        System.in.read();
    }

Pull 消费

Pull是客户端需要主动到服务端取数据,优点是客户端可以依据自己的消费能力进行消费,但拉取的频率也需要用户自己控制,拉取频繁容易造成服务端和客户端的压力,拉取间隔长又容易造成消费不及时。

@Test
    public void consumerPull() {
        try {
            DefaultLitePullConsumer consumer = new DefaultLitePullConsumer(consumerGroup);
            consumer.setNamesrvAddr(nameServer);
            //关闭自动提交
            consumer.setAutoCommit(false);
            consumer.subscribe(topic, "*");
            consumer.setPullBatchSize(20);
            consumer.start();
            while (true) {
                List<MessageExt> messageExts = consumer.poll();
                System.out.printf("%s%n", messageExts);
                consumer.commitSync();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

代码仓库

https://gitee.com/codeWBG/learn_rocketmq

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/127715.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【现代机器人学】学习笔记八:轨迹生成

这节课的内容主要讲述如何通过插值等方式生成一条满足运动学约束的运动轨迹。这节的内容在全书中较少&#xff0c;相比前一章开链动力学而言&#xff0c;可以说内容少了许多。但是这节的内容却是目前在机械臂应用方面使用最广泛的一节。 闲话休提&#xff0c;马上开始&#xf…

ThreeJS:创建第一个三维场景

场景效果 ThreeJS与WebGL WebGL使得开发者可以直接使用显卡的计算资源,创建高性能的二维和三维计算机图形效果,然后在JavaScript脚本中进行WebGL编程,创建三维场景并生成动画。但是,原生的WebGL编程是十分复杂的,且容易出错。然而,Three.JS库可以简化WebGL的开发过程。 基…

十、Java 17 新特性

十、Java 17 新特性 JDK 17 在 2021 年 9 月 14 号正式发布了&#xff01;根据发布的规划&#xff0c;这次发布的 JDK 17 是一个长期维护的版本&#xff08;LTS)。Java 17 提供了数千个性能、稳定性和安全性更新&#xff0c;以及 14 个 JEP&#xff08;JDK 增强提案&#xff09…

示波器应用(二)

前篇我们对场景六基色色相和白平衡还有明度进行了验证&#xff0c;对黑白场做了微调。后面我们还需要对场景调光。 本篇主要涉及一些画面美术知识&#xff0c;不感兴趣可以跳过。 我们需要先了解一个摄影知识 曝光 下面要说到一种颜色模式HSB&#xff0c;HSB分别表示&#…

Redhat-ansible-合集

1.安装 2.部署ANSIBLE 2.1INVENTORY 2.2ANSIBLE配置文件 2.3AD HOC命令 3.PLAYBOOK 4.变量 5.ansible vault加密变量 6.ansible_facts 7.loop 8.条件判断 9.handler处理 10.错误处理 11.tags标签 12.管理文件 13.template模板 14.host-pattern 15.动态Inventory 16.ro…

玻纤效应对skew的影响(一)

在高速SerDes传输系统中&#xff0c;随着信号速率的提高&#xff0c;UI会越来越小&#xff0c;传输线的对内skew会越来越大。以PCIe信号来说&#xff0c;PCIe4.0速率的一个UI是62.5ps&#xff0c;当速率提高到PCIe5.0时&#xff0c;每个UI就只有31.25ps&#xff0c;更进一步&am…

SparkSQL源码分析系列01-Catalyst作用

SparkSQL 是如何将SQL语句转化为Spark任务的呢&#xff1f; 详细过程如下图 通过拉去 github 的 Spark 源码&#xff0c;查看 SparkSQL 模块的 readme.txt 文件可以看出&#xff0c;SparkSQL 包含4个方面的内容 SparkSQL源码主要包含4大模块 Catalyst (sql/catalyst)&…

Solidity之为什么 ++i 比 i++ 省gas

文章目录为什么 i 比 i 省gas测试验证demo1demo2为什么 i 比 i 省gas为什么 i 比 i 省gas i通常更昂贵&#xff0c;因为它必须增加一个值并“返回”旧值&#xff0c;因此可能需要在内存中保存两个数字我在记忆中只使用过一个数字。在许多情况下&#xff0c;在编译器优化之后&a…

【SVM】简单介绍(四)

1、Soft Margin SVM 对偶求解 构造拉格朗日函数 L12∥w∥2C∑i1nξi−∑i1nαi(yi(wTxib)−1ξi)−∑i1nγiξiαi≥0γi≥0\begin{aligned} & L\frac{1}{2}\|w\|^2C \sum_{i1}^n \xi_i-\sum_{i1}^n \alpha_i\left(y_i\left(w^T x_ib\right)-1\xi_i\right)-\sum_{i1}^n \ga…

dagum基尼系数分析全流程

Dagum系数分析 Dagum基尼系数是传统基尼gini系数的升级&#xff0c;其可分解为组内系数、组间系数和超变密度系数&#xff0c;即Dagum 组内Gw 组间Gb 超变密度Gt。 组内Gw分别反映各地区内部水平的差距、组间Gb反映各地区之间水平的差距&#xff0c;以及超变密度Gt反映各地区…

Strtus2漏洞 - Struts2-052 Struts2-057 Struts2-059

文章目录S2-052(CVE-2017-9805)环境搭建漏洞复现S2-057(CVE-2018-11776)环境搭建漏洞复现S2-059(CVE-2019-0230)环境搭建漏洞复现S2-052(CVE-2017-9805) 原理&#xff1a;Struts2 REST插件的XStream组件存在反序列化漏洞&#xff0c;使用XStream组件对XML格式的数据包进行反序…

为什么说DeFi隐私协议Unijoin.io具备趋势性

区块链技术以点对点、去中心化、公开透明、不可逆等作为其主要特点&#xff0c;而基于区块链的加密货币原生的具备了区块链技术的种种特性&#xff0c;这意味着通常每一笔链上交易都是透明可查的。虽然加密账户以“伪匿名”作为主要特点&#xff0c;但我们也同样看到&#xff0…

强大的ANTLR4(1)

以前对于《编译原理》这门课有一种恐惧&#xff0c;现在强大的工具越来越多&#xff0c;有些原理并不一定要非常清楚&#xff0c;也是可以设计一种编程语言的&#xff0c;那就是ANTLR4。 Antlr4&#xff08;全名&#xff1a;ANother Tool for Language Recognition&#xff09…

Redis集群系列六 —— 分片集群搭建

Redis 常用集群中&#xff0c;常用的几种集群方案有&#xff1a;主从集群、哨兵集群、分片集群&#xff0c;不同的集群对应着不同的场景&#xff0c;并且各种集群也都有不同的优劣&#xff0c;本篇将以 redis 分片集群为切入点。 主从和哨兵虽然解决了高可用、高并发读的问题&…

spring之IoC注解(二)

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录前言一、Spring注解的使用1、加入aop的依赖2、在配置文件中添加context命名空间3、在配置文件中指定扫描的包4、在Bean类上使用注解二、Bean的选择性实例化1、需求2、…

二十三种设计模式--系列篇(一)

一、软件设计模式的产生背景 “设计模式”最初并不是出现在软件设计中&#xff0c;而是被用于建筑领域的设计中。 1977年&#xff0c;美国著名建筑大师、加利福尼亚大学伯克利分校环境结构中心主任克里斯托夫亚历山大&#xff08;Christopher Alexander&#xff09;在他的著作《…

(五)devops持续集成开发——jenkins发布一个maven流水线项目

前言 本节内容使用jenkins的maven流水线组件发布一个springboot项目&#xff0c;实现自动化部署一个后端项目。在开始流水化部署前我们需要准备好一个git项目&#xff0c;并在jenkins所在的服务器安装好git客户端便于源码的拉取。并且需要安装一个ssh插件&#xff0c;将我们的…

C++学习:多态与运算符(Day.7~)

总结让人明白。 表明覆盖意图的限定符 override 如图&#xff1a; 说明&#xff1a;1.使用关键字const后&#xff0c;由于函数特征不同&#xff0c;派生类不会再隐藏基类方法 2.想要覆盖基类方法可使用关键字override&#xff0c;此关键字会强制覆盖基类方法&#xff0c;若…

微信功能,你知道多少

用了微信很多年&#xff0c;选择才发现微信收藏的正确打开方式&#xff01;感觉之前白用了那么久微信收藏&#xff0c;只用来保存消息实在是太可惜了&#xff0c;原来它还有那么多实用功能&#xff01;协助日常安排如果每天需要做的事情比较多&#xff0c;可以在这里记录日常工…

谷粒商城之高级篇知识补充

谷粒商城高级篇之知识补充 前言 本篇主要是完成谷粒商城高级篇开发时&#xff0c;我们需要了解并学习一部分补充的知识&#xff0c;才能更好的完成商城业务。 以后我们将商城任务和额外知识分开来编写&#xff0c;方便商城业务的连贯性。 下面是本篇文章各个章节对应的相应…