Rocketmq简单使用

news2025/1/14 0:44:22

1.引入依赖

 <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.7.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-acl</artifactId>
            <version>4.7.0</version>
        </dependency>
  1. 生产者使用
    2.1 发送普通消息
public class SendMessage {
    public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
        // 创建消息生产者, 指定生产者所属的组名
        DefaultMQProducer producer = new DefaultMQProducer("myproducer-group");
        // 指定Nameserver地址
        producer.setNamesrvAddr("127.0.0.1:9876");
        // 启动生产者
        producer.start();
        // 创建消息对象,指定主题、标签和消息体
        Message msg = new Message("topicB", "myTag", ("RocketMQ Message").getBytes());
        // 发送消息
        SendResult sendResult = producer.send(msg, 10000);
        System.out.println(sendResult);
        // 关闭生产者
        producer.shutdown();
    }
}

2.2 发送延时消息

/**
 * 发送延时消息
 */
public class SendDelayLevel {
    public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException {
// 创建消息生产者, 指定生产者所属的组名
        DefaultMQProducer producer = new DefaultMQProducer("myproducer-group");
        // 指定Nameserver地址
        producer.setNamesrvAddr("127.0.0.1:9876");
        // 启动生产者
        producer.start();
        // 创建消息对象,指定主题、标签和消息体
        Message msg = new Message("topicB", "myTag", ("RocketMQ DelayL Message").getBytes());
        //"1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"
        msg.setDelayTimeLevel(3);
        // 发送消息
//        SendResult send = producer.send(msg, 10000);//同步的
        System.out.println();
        producer.send(msg, new SendCallback() {//异步的回调函数
            @Override
            public void onSuccess(SendResult sendResult) {
                SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                String format = sdf.format(new Date());
                System.out.println(format+":发送成功");
            }
            @Override
            public void onException(Throwable throwable) {
                System.out.println("发送失败");
            }
        }, 10000);
    }
}

2.3发送事务消息
事务消息业务流程
在这里插入图片描述
1.发送方发送半事务消息
2.Broker收到半事务消息存储后返回结果
3.发送半事务消息方处理本地事务
4.发送方把本地事务处理结果以消息形式发送到Broker
5.Broker在固定的时间内(默认60秒)未收到4的确认消息,Broker为发送方发送回查消息
6.业务发送发收到Broker回查消息后,查询本地业务执行结果
7.业务方发送回查结果消息
1-4 是同步调用,5-7是异步调用。RocketMQ事务消息使用了2PC+事后补偿机制保证了最终一致性。

public class SendTransactionMessage {
    public static void main(String[] args) throws MQClientException {
        TransactionMQProducer producer = new TransactionMQProducer ("myproducer-group");
        // 指定Nameserver地址
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.setTransactionListener(new TransactionListener() {
            private AtomicInteger transactionIndex = new AtomicInteger(1);
            private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();

            @Override
            public LocalTransactionState executeLocalTransaction(Message message, Object o) {
                String transactionId = message.getTransactionId();
                //0:执行中,状态未知 1:执行成功 2:执行失败
                localTrans.put(transactionId, 0);
                try {
                    //
                    Thread.sleep(1000);
                    localTrans.put(transactionId, 1);
                    return LocalTransactionState.COMMIT_MESSAGE;
                }catch(Exception e){
                    localTrans.put(transactionId,2);
                    return LocalTransactionState.ROLLBACK_MESSAGE;
                }
            }

            @Override
            public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
                System.out.printf("%s,事务结果回查%s%n\n", new Date(System.currentTimeMillis()),messageExt.toString());
                Integer status = localTrans.get(messageExt.getTransactionId());
                if (null != status) {
                    switch (status) {
                        case 0:
                            return LocalTransactionState.UNKNOW;
                        case 1:
                            return LocalTransactionState.COMMIT_MESSAGE;
                        case 2:
                            return LocalTransactionState.ROLLBACK_MESSAGE;
                        default:
                            return LocalTransactionState.COMMIT_MESSAGE;
                    }
                }
                return LocalTransactionState.COMMIT_MESSAGE;
            }
        });
        // 启动生产者
        producer.start();
        // 创建消息对象,指定主题、标签和消息体
        Message msg = new Message("topicB", "myTag", ("RocketMQ Transaction Message").getBytes());
        SendResult sendResult =producer.sendMessageInTransaction(msg,null);
        System.out.printf("%s,半消息发送结果%s%n",new Date(System.currentTimeMillis()),sendResult);
    }
}

  1. 消费者使用
    消费者从Broker中获取消息的方式有两种:pull拉取方式和push推动方式。
    3.1 拉取式消费
    Consumer主动从Broker中拉取消息,主动权由Consumer控制。一旦获取了批量消息,就会启动消费过程。不过,该方式的实时性较弱,即Broker中有了新的消息时消费者并不能及时发现并消费。
    特点:由于拉取时间间隔是由用户指定的,所以在设置该间隔时需要注意平稳:间隔太短,空请求比例会增加;间隔太长,消息的实时性太差
    3.2 推送式消费
    该模式下Broker收到数据后会主动推送给Consumer。该获取方式一般实时性较高。
    特点:该获取方式是典型的发布-订阅模式,即Consumer向其关联的Queue注册了监听器,一旦发现有新的消息到来就会触发回调的执行,回调方法是Consumer去Queue中拉取消息。而这些都是基于Consumer与Broker间的长连接的。长连接的维护是需要消耗系统资源的。
  @Bean
    public DefaultMQPushConsumer defaultConsumer() {
        log.info("ruleEngineConsumer 正在创建---------------------------------------");
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer();//推送式消费
//        DefaultMQPullConsumer consumer = new DefaultMQPullConsumer();//拉取式消费
        consumer.setNamesrvAddr(namesrvAddr);
        consumer.setConsumeThreadMin(1);
        consumer.setConsumeThreadMax(2);
        consumer.setConsumeMessageBatchMaxSize(100);//批量消费消息数目
       // consumer.setMessageModel(MessageModel.BROADCASTING);//广播式消费模式
        consumer.setMessageModel(MessageModel.CLUSTERING);//默认消费模式,集群消费模式,即每个Consumer Group中的Consumer均摊所有的消息。
        // 设置监听
        consumer.registerMessageListener(mqConsumeMsgListenerProcessor);
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        consumer.setInstanceName(System.currentTimeMillis()+"dstMQMsgConsumer");
        consumer.setConsumerGroup(resultsGroupName);
        try {
            consumer.subscribe("topicB", "*");
            consumer.start();
            log.info("ruleEngineConsumer 创建成功 groupName={}",resultsGroupName);
        } catch (MQClientException e) {
            log.error("ruleEngineConsumer 创建失败!");
        }
        return consumer;
    }

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/557879.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

22.vue插槽

目录 1 基本使用 2 name属性与v-slot 3 插槽放默认内容(后备内容) 4 插槽的自定义属性(作用域插槽) 4.1 简单使用 4.2 传data 4.3 支持解构 插槽操作就是写在组件中间的东西&#xff0c;其目的是增加组件在UI结构上的复用性&#xff0c;就像下面这样 直接写是渲染…

chatgpt赋能Python-python_ipynb

Python Ipython Notebook: 大数据时代的完美解决方案 在大数据时代&#xff0c;数据处理和分析是许多组织必须面对的挑战。Python Ipython Notebook (IPYNB) 可以提高数据探索性分析的效率&#xff0c;并能够使您更好地理解和评估数据。本文将介绍Python IPYNB是什么、以及为什…

C++设计手段的智慧:从基础到前沿

C设计手段的智慧&#xff1a;从基础到前沿 一、C基础设计手段&#xff08;Basic Design Techniques in C&#xff09;1.1 C 类和对象设计1.1.1 类的定义1.1.2 对象的创建和使用1.1.3 类的封装1.1.4 类的继承1.1.5 类的多态 1.2 RAII of C design tools (resource acquisition i…

ROS学习笔记(九):MoveIt!与机械臂控制

ROS学习笔记&#xff08;九&#xff09;&#xff1a;MoveIt&#xff01;与机械臂控制 MoveIt&#xff01;简介MoveIt&#xff01;系统架构MoveIt&#xff01;编程与机械臂控制关节空间规划工作空间规划笛卡尔运动规划避障规划 Pick and Place示例 MoveIt&#xff01;简介 Move…

一图看懂!RK3568与RK3399怎么选?

▎简介 RK3568和RK3399都是Rockchip公司的处理器&#xff0c;具有不同的特点和适用场景。以下是它们的主要区别和应用场景。 ▎RK3568 RK3568是新一代的高性能处理器&#xff0c;采用了22nm工艺&#xff0c;具有更高的性能和更低的功耗。它支持4K视频解码和编码&#xff0c;支持…

某程序员辞职后,接6份兼职,月入3w+

对于程序员来说&#xff0c;35岁真的是很关键。 如果成为架构师或者是成为管理方面的人才&#xff0c;还是不用担心失业。要是你30多岁还在一线写代码&#xff0c;那被裁的可能性很大。即使你现在没有失业&#xff0c;也说明你能力很一般。 最近在职场论坛上看到这样一个帖子…

互联网广告丨行业知识储备

文章状态&#xff1a;持续更新中 更新时间&#xff1a;2023.05.22 本文不同于专业咨询机构输出的专业行业调研报告&#xff0c;仅作为产品经理对互联网广告行业的一些基础知识储备。文章会以产品经理的角度&#xff0c;从行业概述、行业目标与愿景、行业生态、行业的发展、行业…

数仓中指标-标签,维度-度量,自然键-代理键等各名词深度解析

作为一个数据人&#xff0c;是不是经常被各种名词围绕&#xff0c;是不是对其中很多概念认知模糊。有些词虽然只有一字之差&#xff0c;但是它们意思完全不同&#xff0c;今天我们就来了解下数仓建设及数据分析时常见的一些概念含义及它们之间的关系。 本文首发于公众号【五分钟…

LiveNVR视频平台接收无人机等移动终端RTMP推流后转成GB28181协议输出级联到GB28181视频平台的操作说明...

1、需求介绍 目前很多移动终端设备(如无人机等)只支持RTMP推流输出&#xff0c;不支持GB28181协议。但是又有需要通过GB28181协议接入到视频平台的需求。比如有些大疆无人机产品不能直接注册国标平台&#xff0c;只能rtmp推流。那么&#xff0c;项目中如果将无人机的rtmp的推流…

Stablediffusion模型diffusesr格式和ckpt格式相互转换

参考资料&#xff1a; diffusers的源码 [github] 因为小博客可能看的人很少&#xff0c;所以我写的啰嗦一点&#xff0c;想直接看如何互相转换的朋友可以直接转到文末的代码段。 当你在学习Stablediffusion这个开源的t2i模型时&#xff0c;不可避免地会碰到两种模型权重的存储格…

在rk3568移植rtl8723du,配置成wifi ap模式

1、在路径添加rtl8723du模块代码 kernel/drivers/net/wireless/rockchip_wlan 添加rtl8723du 2、修改Makefile 修改对应的路径 修改交叉编译的工具的路径和内核路径 3、修改rockchip_wlan目录下的Makefile 添加这个 obj-$(CONFIG_RTL8723DU) rtl8723du/ 4、修改rockchip_w…

淘宝按关键字搜索淘宝商品 API 参数及返回值说明 翻页展示 含调用示例

淘宝关键字搜索接口&#xff0c;是复原我们在淘宝购物时&#xff0c;在搜索栏内输入关键字&#xff0c;即可获取到相关商品列表&#xff0c;商品信息齐全&#xff0c;支持翻页展示。同时&#xff0c;传入参数sort可按价格排序&#xff0c;也可筛选响应价格段的商品。商品信息是…

关于【Stable-Diffusion WEBUI】基础模型对应VAE的问题

文章目录 &#xff08;零&#xff09;前言&#xff08;一&#xff09;什么是VAE&#xff08;二&#xff09;模型嵌入VAE了么&#xff08;三&#xff09;我们能做什么&#xff08;3.1&#xff09;准备常见的VAE&#xff08;3.2&#xff09;下载模型对应的VAE&#xff08;3.3&…

小航编程题库GoC南海区小学四年级模拟测试题(含题库教师账号)

需要在线模拟训练的题库账号请点击 小航助学编程在线模拟试卷系统&#xff08;含题库答题软件账号&#xff09;_程序猿下山的博客-CSDN博客 填空题8.0分 删除编辑 答案:100 第1题画一条高度为100&#xff0c;粗为5的竖线。 //程序名:直线 //作者: int main() { pen.size(5)…

Python 面向对象高级--继承,方法重写,权限,类成员,实例成员

1.继承入门 class 子类名(父类名): 面向对象中的继承: 指的是多个类之间的所属关系&#xff0c;即子类默认继承父类的所有属性和方法. 面向对象中继承的作用: 提高代码的复用率, 减少重复代码的书写. class Animal():def __init__(self,name,age):self.name nameself.age a…

聚焦珠宝产业数字化变革 世界珠宝数字化发展论坛在厦门举办

家庭周报厦门讯 5月18日&#xff0c;世界珠宝数字化发展论坛在厦门举办。本活动由北京北大宝石鉴定中心作为指导单位&#xff0c;中国广告主协会品牌建设与营销专业委员会、世界珠宝数字化发展论坛组委会主办&#xff0c;北京真心红珠宝有限公司承办。这是在数字中国建设整体布…

S20330-SRS步进电机最简单的驱动方法

​ S20330-SRS步进电机最简单的驱动方法 步进电机最简单的驱动方法&#xff0c;了解四轴步进电机驱动器原理 四轴步进电机驱动器原理-简介四轴步进电机驱动器&#xff0c;其实就是一种将电脉冲转化为角位移的执行机构。首先步进驱动器会接收到一个脉冲信号&#xff0c;然后它按…

小航编程题库2022年NOC决赛图形化(小高组)(含题库教师学生账号)

需要在线模拟训练的题库账号请点击 小航助学编程在线模拟试卷系统&#xff08;含题库答题软件账号&#xff09;_程序猿下山的博客-CSDN博客 单选题3.0分 删除编辑 答案:A 第1题运行下面的程序&#xff0c;最终“我的变量”的值是多少&#xff1f; A、5B、10C、25D、30 答案…

排序算法——冒泡排序详解及优化

冒泡排序 排序的稳定性冒泡排序优化后的冒泡排序冒泡排序的复杂度 排序的稳定性 对于一个排序算法&#xff0c;假设两个相同的元素Ai和Aj 在排序前这两个元素满足条件i<j&#xff0c;即Ai在Aj之前 在排序后Ai仍在Aj之前&#xff0c;则称为排序算法为稳定排序 否则称这个算法…

【Linux】多进程实现并发服务器

多进程实现并发服务器 1.此时来一个客户端请求时 创建cfd1 属于是阻塞状态 如果再有一个客户端就没办法提取了 2.所以要fork一个子进程&#xff0c;此时关闭父进程的cfd1&#xff0c;关闭子进程的lfd;不影响其它客户端请求连接 具体流程&#xff1a; 创建套接字 绑定 监听 …