RocketMQ消息发送基本示例(推送消费者)

news2025/1/10 18:50:39

消息生产者通过三种方式发送消息

1.同步发送:等待消息返回后再继续进行下面的操作  同步发送保证了消息的可靠性,适用于关键业务场景。

2.异步发送:不等待消息返回直接进入后续流程.broker将结果返回后调用callback函数,并使用

CountDownLatch计数

3.单向发送:只负责发送,不管消息是否发送成功  单向发送不保证消息的送达,仅适用于对可靠性要求不高的场景。

消费者消费消息分两种:

拉模式:消费者主动去Broker上拉取消息

推模式:消费者等待Broker把消息推送过来

事实上:尽管存在“推送消费者”(DefaultMQPushConsumer)和“拉取消费者”(DefaultMQPullConsumer)这两种消费者类型,但实际上它们都是以“拉取”模式工作的,只不过实现方式和使用场景有所不同。


<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>5.3.0</version>
</dependency>

客户端与服务器安装版本一致即可

演示1    同步发送模式   客户端推送模式

注意观察   broker是把消息分两次推送的  就是发多少条消息  推送多少次

生产者     

package com.example.rocketmqdemo.simple;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

import java.nio.charset.StandardCharsets;

/**
 * 同步发送
 * 使用场景:
 * 1.可靠性要求高,消息发送需要等待确认
 * 2.数据量较少的场景
 * 3.实时响应,消息发送需要立即得到结果
 * 小的订单系统
 * @author hrui
 * @date 2024/7/31 20:31
 */
public class SyncProducer {
    public static void main(String[] args) {
        //创建一个DefaultMQProducer实例,指定生产者组名为"group1"
        DefaultMQProducer producer = new DefaultMQProducer("group1");//生产者组和消费者组是不同概念  不需要相同

        //设置NameServer地址,RocketMQ客户端通过NameServer获取Broker的路由信息
        producer.setNamesrvAddr("xxx.xxx.xxx:9876");

        try {
            //启动生产者实例
            producer.start();

            //发送10条消息
            for (int i = 0; i < 2; i++) {
                //创建消息实例,指定主题为"Topic1",标签为"Tag1",消息内容为"Hello World"加上编号
                Message message = new Message("Topic1", "Tag1", ("Hello World" + i).getBytes(StandardCharsets.UTF_8));

                //发送消息,并同步等待发送结果 (同步发送)
                SendResult sendResult = producer.send(message);

                //打印消息发送结果
                System.out.println("第" + i + "条消息发送成功:返回---->" + sendResult);
            }
        } catch (Exception e) {
            //捕获并打印异常信息
            e.printStackTrace();
        } finally {
            //关闭生产者实例,释放资源
            producer.shutdown();
        }
    }
}

消费者

package com.example.rocketmqdemo.simple;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

/**
 * 简单消费者
 * @author hrui
 * @date 2024/7/31 20:40
 */
public class Consumer {

    public static void main(String[] args) {
        //创建一个DefaultMQPushConsumer实例,指定消费者组名为"group1"
        //采用长轮询机制,模拟推送效果,但本质上是主动拉取。适合低延迟、高实时性的场景。
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");

        //设置NameServer地址,RocketMQ客户端通过NameServer获取Broker的路由信息
        consumer.setNamesrvAddr("xxx.xxx.xxx:9876");

        try {
            //订阅主题"Topic1",过滤标签为"*",表示接收所有消息
            consumer.subscribe("Topic1", "*");

            //设置消息监听器,处理接收到的消息
            //可以传入两种类型的监听器:
            //1. MessageListenerOrderly(顺序消费):保证消息按顺序处理
            //2. MessageListenerConcurrently(并发消费):消息并发处理,不保证顺序
            consumer.setMessageListener(new MessageListenerConcurrently() {

                //consumeMessage方法用于处理接收到的消息列表
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
//                    //遍历消息列表,处理每条消息
//                    list.forEach(messageExt -> {
//                        //输出消息体内容(需要根据具体的消息编码解码,这里假设为UTF-8)
//                        System.out.println(new String(messageExt.getBody()));
//                        //消息处理成功后输出确认信息
//                        System.out.println("消息消费成功");
//                    });
                    for (int i=0;i<list.size();i++){
                        System.out.println(i+"_消息消费成功_"+new String(list.get(i).getBody()));broker是将两条消息分别发送的
                    }

                    //返回消费状态,CONSUME_SUCCESS表示消息消费成功
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });

            //启动消费者实例,开始接收消息
            consumer.start();
        } catch (Exception e) {
            //捕获并打印异常信息
            e.printStackTrace();
        }
    }
}



演示2  异步发送

package com.example.rocketmqdemo.simple;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/**
 * 异步发送消息
 * 并发流量高的场景下,使用异步发送消息可以提高吞吐量。
 * @author hrui
 * @date 2024/7/31 21:53
 */
public class AsyncProducer {

    public static void main(String[] args) {
        //创建一个DefaultMQProducer实例,指定生产者组名为"group2"
        DefaultMQProducer producer = new DefaultMQProducer("group1");//生产者组和消费者组是不同概念  不需要相同

        //设置NameServer地址,RocketMQ客户端通过NameServer获取Broker的路由信息
        producer.setNamesrvAddr("xxx.xxx.xxx:9876");

        //计数器,用于跟踪异步消息发送的完成情况
        CountDownLatch countDownLatch = new CountDownLatch(100);

        try {
            // 启动生产者实例
            producer.start();

            //发送100条消息
            for (int i = 0; i < 100; i++) {
                final int index = i;
                //创建消息实例,指定主题为"Topic2",标签为"Tag2",消息内容为"Hello World"加上编号
                Message message = new Message("Topic1", "Tag1", ("Hello World" + i).getBytes(StandardCharsets.UTF_8));

                //发送消息,异步发送。第二个参数是SendCallback回调函数
                producer.send(message, new SendCallback() {

                    @Override
                    //发送成功时,Broker回调此方法
                    public void onSuccess(SendResult sendResult) {
                        //将CountDownLatch计数器减一,表示一个消息发送任务完成
                        countDownLatch.countDown();
                        System.out.println("消息发送成功_" + sendResult);
                    }

                    @Override
                    //发送失败时,Broker回调此方法
                    public void onException(Throwable throwable) {
                        // 将CountDownLatch计数器减一,表示一个消息发送任务完成
                        countDownLatch.countDown();
                        System.out.println("消息发送失败_" + throwable.getStackTrace());
                    }
                });
            }

            //等待所有消息发送完成
            //countDownLatch.await();
            boolean await = countDownLatch.await(5, TimeUnit.SECONDS);
            if (!await) {
                System.out.println("消息发送超时");
            }
        } catch (Exception e) {
            //捕获并打印异常信息
            e.printStackTrace();
        } finally {
            //关闭生产者实例,释放资源
            producer.shutdown();
        }
    }
}

演示3  单向发送

package com.example.rocketmqdemo.simple;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;

import java.nio.charset.StandardCharsets;

/**
 * 单向发送
 * 试用场景
 * 日志收集
 * @author hrui
 * @date 2024/7/31 22:27
 */
public class OnewayProducer {
    public static void main(String[] args) {
        //创建一个DefaultMQProducer实例,指定生产者组名为"group1"
        DefaultMQProducer producer = new DefaultMQProducer("group1");//生产者组和消费者组是不同概念  不需要相同

        //设置NameServer地址,RocketMQ客户端通过NameServer获取Broker的路由信息
        producer.setNamesrvAddr("xxx.xxx.xxx:9876");

        try {
            //启动生产者实例
            producer.start();

            //发送10条消息
            for (int i = 0; i < 2; i++) {
                //创建消息实例,指定主题为"Topic1",标签为"Tag1",消息内容为"Hello World"加上编号  topic要和消费者相同
                Message message = new Message("Topic1", "Tag1", ("Hello World" + i).getBytes(StandardCharsets.UTF_8));

                //发送消息,单向发送,不管发送成功与否
                producer.sendOneway(message);
                System.out.println(i+"_消息发送了");

            }
        } catch (Exception e) {
            //捕获并打印异常信息
            e.printStackTrace();
        } finally {
            //关闭生产者实例,释放资源
            producer.shutdown();
        }
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1964739.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【过题记录】7.31(树形dp,根号分治)

先补一下昨天没来得及写的题目 延时操控 分析&#xff1a; 由于是延时操控 所以敌人的前面几步跟我们走的是一样的 所不一样的是我们比敌人多走了k步 所以我们可以先让我们和敌人同步行走&#xff0c;最后让我们随机游走k步即可。 由于这里n和m的范围都很小&#xff0c;所以我…

力扣 位运算

位运算基础的异或运算&#xff0c;线性时间复杂度和常数空间复杂度。 题目 class Solution {public int singleNumber(int[] nums) {int ans 0;for (int i: nums) {ans ^ i;}return ans;} }

Linux下用户组练习

目录 建立用户组 shengcan&#xff0c;其id 为 2000 建立用户组 caiwu&#xff0c;其id为 2001 建立用户组 jishu&#xff0c;其id 为 2002 建立用户lee&#xff0c;指定其主组id为shengchan&#xff0c;附加组为jishu和 caiwu&#xff0c;确保 lee 用户的uid 和 gid 保持一…

C++客户端Qt开发——界面优化(绘图)

2.绘图 Qt提供了画图相关的APL&#xff0c;可以允许我们在窗口上绘制任意的图形形状&#xff0c;来完成更复杂的界面设计 所谓的"控件"&#xff0c;本质上也是通过画图的方式画上去的 画图AP|和控件之间的关系&#xff0c;可以类比成机器指令和高级语言之间的关系…

大模型“挣钱”新方法!用GPT-4优化众筹文稿,提高筹款成功率11.9%!

怎么才能在大模型时代&#xff0c;更好地通过大模型&#xff08;LLM&#xff09;来挣钱&#xff1f;写软文拿打赏&#xff0c;画海报给甲方&#xff0c;或者制作视频来打造个人IP&#xff1f;不够&#xff0c;还想要更直接一点的方式&#xff1f;那有没有一种可能&#xff0c;直…

密码学基础-为什么使用真随机数(True Random Number Generators)

密码学基础-为什么使用真随机数&#xff08;True Random Number Generators&#xff09; 概述 随机的意义很重要&#xff0c;就像你的银行密码如果是亲朋好友的生日&#xff0c;结婚纪念日&#xff08;可预测的&#xff09;&#xff0c;那么就容易被人测试出来&#xff1b;而…

Centos 7配置问题

在VMWare12上面安装Centos 7 Linux虚拟机&#xff0c;在切换到命令界面时&#xff0c;需要登录用户名和密码&#xff0c;但发现输入用户后有字符显示&#xff0c;但是密码没有。 经过一系列查看后&#xff0c;发现这个是Linux的一种机制&#xff0c;即当你输入密码时不显示&…

Python批量移除Word文档水印

Word文档被广泛用于各种正式与非正式的沟通场合。有时候这些文档中可能包含着不再需要的水印&#xff0c;比如早期的草稿标记、保密声明或是仅供预览的信息等。这些水印的存在可能会干扰文档的阅读体验&#xff0c;甚至在某些情况下导致信息传达的不准确或产生误解。移除Word文…

QT:多版本同时使用(5.15.2在线安装教程)

前言 根据不同项目的需要有时候不得不安装多个版本的QT&#xff0c;新版本的QT都需要在线安装&#xff0c;以下为QT5.15.2的在线安装办法&#xff08;5.15.2为LTS版本相对更稳定&#xff09;&#xff0c;老版本可参考之前的离线安装&#xff0c; 版本选择 比如 5.15.2 是完整的…

昇思25天学习打卡营第XX天|Diffusion扩散模型

扩散模型自DDPM论文提出后&#xff0c;在图像生成领域取得了显著进展&#xff0c;特别是在文本条件图像生成方面。重要发展包括改进的去噪模型&#xff0c;级联扩散模型以提高图像分辨率&#xff0c;以及无需分类器的扩散模型指导。DALL-E 2和ImageGen等模型展示了结合语言模型…

7.29 Day11 LVM逻辑卷管理

LVM逻辑卷管理&#xff1a; 优点&#xff1a;将多个磁盘进行统一管理&#xff0c;易于扩容 缺点&#xff1a;不支持容错&#xff08;任意一个磁盘坏了&#xff0c;整个磁盘都会坏&#xff09; 实现步骤&#xff1a; 对磁盘进行分区--改为8e--PV&#xff08;物理卷&#xff0…

pythonGame-实现羊了个羊简易字母版

通过python简单复现羊了个羊游戏。 使用到的库函数&#xff1a; import pygame import random 游戏源码&#xff1a; import pygame import random# 初始化pygame pygame.init()# 设置窗口大小 WIDTH 800 HEIGHT 600 screen pygame.display.set_mode((WIDTH, HEIGHT)) p…

基于N32L406MB EasyFlash参数(key-value)记录库移植

EasyFlash 感谢作者的分享https://github.com/armink/EasyFlash EasyFlash是一款开源的轻量级嵌入式Flash存储器库&#xff0c;方便开发者更加轻松的实现基于Flash存储器的常见应用开发 三大实用功能 ENV快速保存产品参数(key-value)&#xff0c;支持 写平衡&#xff08;磨…

文心智能体零代码开发实践,创建一个智能体:从理论到实践AI技术落地

文心智能体引领零代码智能体开发新风尚&#xff0c;诚邀您一同探索这前沿科技的魅力&#xff01;以下为实践创建一个叫”从理论到实践AI技术落地“智能体的步骤。 首先登录官网&#xff1a;文心智能体平台AgentBuilder | 想象即现实 登录后点击&#xff1a;创建智能体 输入“…

《660》+《880》强化带刷计划‼️45天吃透所有核心知识点

如果把660吃透再去做880&#xff0c;肯定会轻松一些&#xff01; 因为660题对于基础的考查很深入&#xff0c;每一道题都有难度&#xff0c;都需要认真思考才能做出来&#xff0c;所以&#xff0c;660建议在基础结束之后再开始做&#xff0c;因为基础阶段本身对基础知识的理解…

240731-一图解释LM-Studio如何设置模型的国内下载

感谢微信公众号作者数翼分享的文章 — 本地 LLM 可视化工具 LM Studio 突破国内网络限制使用 A. 图文指南 B. 具体步骤 Step 1. 安装软件并通过VSCode等编辑打开软件所在目录 官网下载并安装。随后找到软件的安装路径&#xff0c;并通过VSCode打开。 Step 2. 全局替换 被替…

2024年8月1日 十二生肖 今日运势

小运播报&#xff1a;2024年8月1日&#xff0c;星期四&#xff0c;农历六月廿七 &#xff08;甲辰年辛未月丁酉日&#xff09;&#xff0c;法定工作日。今天建军节&#xff0c;祝保家卫国、英勇无畏的解放军战士们节日快乐&#xff01; 红榜生肖&#xff1a;龙、牛、猪 需要注…

JavaSE基础 (认识String类)

一&#xff0c;什么是String类 在C语言中已经涉及到字符串了&#xff0c;但是在C语言中要表示字符串只能使用字符数组或者字符指针&#xff0c;可以使用标准库提 供的字符串系列函数完成大部分操作&#xff0c;但是这种将数据和操作数据方法分离开的方式不符合面相对象的思想&…

vue中scoped详解以及样式穿透>>>、/deep/、::v-deep

1、scoped scoped属性用于限制样式仅应用于当前组件。当一个style标签拥有scoped属性时&#xff0c;它的CSS样式就只能作用于当前的组件&#xff0c;通过该属性&#xff0c;可以使得组件之间的样式不互相污染。 原理&#xff1a;当样式中加了scoped属性时候&#xff0c;编译的…

解决 Python 中 AttributeError: module ‘typing‘ has no attribute ‘_ClassVar‘

启动 Flask 的时候遇到&#xff1a; AttributeError: module typing has no attribute _ClassVar 卸载 dataclasses pip uninstall dataclasses 启动 Flask 正常