【RocketMQ】基本使用:Java操作RocketMQ(rocketmq-client)

news2025/1/12 22:57:19

【RocketMQ】基本使用:Java操作RocketMQ(rocketmq-client)

1.引入依赖

<dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.3.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.0.3</version>
        </dependency>

2.Producer

public class MyProducer {

    public static void main(String[] args) throws Exception{
        // 构造Producer时,必须指定groupId
        DefaultMQProducer producer = new DefaultMQProducer("my_producer_group");
        //指定NameServer的地址  只用namesrv的地址就行,它会从namesrv上拿到broker的地址和topic信息
        producer.setNamesrvAddr("localhost:9876");
        //启动生产者
        producer.start();

        int num = 0;
        while (num < 20) {
            num++;
             /**
              * rocketmq封装了Message
                   * String topic,
                   * String tags, 标签(分类)---> 筛选
                   * byte[] body
                   */
            Message message = new Message("my_test_topic", "", ("hello rocketmq:" + num).getBytes());
            //同步发送  发送消息,拿到返回SendResult
            SendResult result = producer.send(message);
            System.out.println(result);
        }
        //关闭生产者
        producer.shutdown();
    }
}

启动并发送消成功后,返回的SendResult如下:

SendResult中,有一个sendStatus状态,表示消息的发送状态。一共有四种状态

FLUSH_DISK_TIMEOUT : 表示没有在规定时间内完成刷盘(需要Broker 的刷盘策Ill创立设置成 SYNC_FLUSH 才会报这个错误)
FLUSH_SLAVE_TIMEOUT :表示在主备方式下,并且Broker 被设置成SYNC_MASTER 方式,没有在设定时间内完成主从同步。
SLAVE_NOT_AVAILABLE : 这个状态产生的场景和FLUSH_SLAVE_TIMEOUT 类似, 表示在主备方式下,并且Broker 被设置成SYNC_MASTER ,但是没有找到被配置成Slave 的Broker 。
SEND OK :表示发送成功,发送成功的具体含义,比如消息是否已经被存储到磁盘?消息是否被同步到了Slave 上?消息在Slave 上是否被写入磁盘?需要结合所配置的刷盘策略、主从策略来定。这个状态还可以简单理解为,没有发生上面列出的三个问题状态就是SEND OK

3.Consumer


public class MyConsumer {

    public static void main(String[] args) throws MQClientException {
        // 构造Consumer时,必须指定groupId
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("my_consumer_group");
        consumer.setNamesrvAddr("localhost:9876"); // nameServer地址,用于获取broker、topic信息
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        // 指定订阅的主题与tag,通过tag可以定制性消费(*表示全部tag)
        consumer.subscribe("my_test_topic", "*"); 
                
        // 异步消费
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                            ConsumeConcurrentlyContext context) {
//                System.out.println("Receive Message:" + msgs.toString());
                    // 1 try catch(throwable)确保不会因为业务逻辑的异常,导致消息出现重复消费的现象
                // 2 org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService.ConsumeRequest.run()中会对Throwable进行捕获,
                //   并且返回ConsumeConcurrentlyStatus.RECONSUME_LATER
                    try {
                            for(MessageExt msg:msgs){
                                    String msgbody = new String(msg.getBody(), "utf-8");
                                    System.out.println(" MessageBody: "+ msgbody);//输出消息内容
                            }
                    } catch (Exception e) {
                            e.printStackTrace();
                            return ConsumeConcurrentlyStatus.RECONSUME_LATER; //稍后再试
                    }
                    
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; // 签收
            }
        });
        //启动消费者
        consumer.start();
        System.out.println("消费者启动成功。。。");
    }
}

收到消息的内容:

consumer Group:位于同一个consumer Group中的consumer实例

和producer Group中的各个produer实例承担的角色类似
同一个group中可以配置多个consumer,可以提高消费端的并发消费能力以及容灾
和kafka一样,多个consumer会对消息做负载均衡,意味着同一个topic下的不同messageQueue会分发给同一个group中的不同consumer。
同时,如果我们希望消息能够达到广播的目的,那么只需要把consumer加入到不同的group就行。

【转载原文地址】:【RocketMQ】基本使用:Java操作RocketMQ(rocketmq-client)

【RocketMq在windows下的环境配置】:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1052972.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

公司知识库搭建步骤,知识库建设与运营的四个步骤分享

在知识管理方面&#xff0c;团队中的每一员&#xff0c;都像是一名独行侠&#xff0c;自己的知识&#xff0c;满足自己的需要&#xff0c;这其中&#xff0c;就造成了很多无意义的精力消耗。 公司知识库搭建必要性 比如&#xff0c;一名员工撰写一QA文档&#xff0c;并没有将它…

CDH 6.3.2升级Flink到1.17.1版本

CDH&#xff1a;6.3.2 原来的Flink&#xff1a;1.12 要升级的Flink&#xff1a;1.17.1 操作系统&#xff1a;CentOS Linux 7 一、Flink1.17编译 build.sh文件&#xff1a; #!/bin/bash set -x set -e set -vFLINK_URLsed /^FLINK_URL/!d;s/.*// flink-parcel.properties FLI…

【模板语法+数据绑定+el与data的两种写法+MVVM模型】

模板语法数据绑定el与data的两种写法MVVM模型 1 模板语法1.1 插值语法1.2 指令语法 2 数据绑定2.1 单向数据绑定2.2 双向数据绑定 3 el与data的两种写法4 MVVM模型 1 模板语法 1.1 插值语法 双大括号表达式功能&#xff1a;用于解析标签体内容语法&#xff1a;{{xxx}}&#x…

Postman的高级用法—Runner的使用

1.首先在postman新建要批量运行的接口文件夹&#xff0c;新建一个接口&#xff0c;并设置好全局变量。 2.然后在Test里面设置好要断言的方法 如&#xff1a; tests["Status code is 200"] responseCode.code 200; tests["Response time is less than 10000…

分子相互作用的人工智能

8 分子相互作用的人工智能 正如在第 5、6 和 7 节中所描述的那样&#xff0c;人工智能已经彻底改变了分子学习、蛋白质科学和材料科学领域。尽管已经广泛研究了用于单个分子的人工智能&#xff0c;但分子的物理和生物功能通常是由它们与其他分子的相互作用驱动的。在本节中&am…

iPhone数据丢失怎么办?9 佳免费 iPhone 数据恢复软件可收藏

您是否知道有多种原因可能导致 iPhone 上存储的数据永久丢失&#xff1f;然而&#xff0c;使用一些最好的免费 iPhone 数据恢复软件&#xff0c;您仍然可以恢复它。 由于我们几乎总是保存手机上的所有内容&#xff08;从联系人到媒体文件&#xff09;&#xff0c;因此 iPhone …

MySQL学习笔记27

MySQL主从复制的核心思路&#xff1a; 1、slave必须安装相同版本的mysql数据库软件。 2、master端必须开启二进制日志&#xff0c;slave端必须开启relay log 日志。 3、master主服务器和slave从服务器的server-id号不能一致。 4、slave端配置向master端来同步数据。 master…

【Java 进阶篇】MySQL 数据控制语言(DCL):管理用户权限

MySQL 是一个强大的关系型数据库管理系统&#xff0c;提供了丰富的功能和选项来管理数据库和用户。数据库管理员&#xff08;DBA&#xff09;通常使用数据控制语言&#xff08;Data Control Language&#xff0c;简称 DCL&#xff09;来管理用户的权限和访问。 本文将详细介绍…

基于php+MySql实现简易留言板(超级详细!)

基于phpMySql实现简易留言板&#xff08;超级详细&#xff01;&#xff09; 这篇文章是基于PHP和MySQL实现的一个简易留言板。该留言板具有用户注册、登录、发表留言以及查看留言等功能。首先&#xff0c;用户可以通过注册功能创建自己的账号&#xff0c;然后使用该账号进行登…

WPF 实现点击按钮跳转页面功能

方法1. 配置环境 首先添加prism依赖项&#xff0c;配置好所有文件。需要配置的有两个文件&#xff1a;App.xaml.cs和App.xaml App.xaml.cs using System.Data; using System.Linq; using System.Threading.Tasks; using System.Windows;namespace PrismDemo {/// <summa…

Linux 集锦 之 最常用的几个命令

Linux最常用的几个命令 ​ Linux系统中的命令那是相当地丰富&#xff0c;不同的版本可能还有不同的命令&#xff0c;不过Linux核心自带的命令大概有几百个&#xff0c;这个不管是什么发行版一般都是共用的。 ​ 如果希望探索Linux的所有命令&#xff0c;可能不太实际&#xf…

【Unity Build-In管线的SurfaceShader剖析_PBS光照函数】

Unity Build-In管线的SurfaceShader剖析 在Unity Build-In 管线&#xff08;Universal Render Pipeline&#xff09;新建一个Standard Surface Shader文件里的代码如下&#xff1a;选中"MyPBR.Shader"&#xff0c;在Inspector面板&#xff0c;打开"Show generat…

力扣刷题-哈希表-求两个数组的交集

349 求两个数组的交集 题意&#xff1a;给定两个数组&#xff0c;编写一个函数来计算它们的交集。注意&#xff1a;输出结果中的每个元素一定是 唯一 的。我们可以 不考虑输出结果的顺序 。 提示&#xff1a; 1 < nums1.length, nums2.length < 1000 0 < nums1[i], …

TensorFlow-Federated简介与安装

1、简介 TensorFlow Federated&#xff08;TFF&#xff09;是一个用于机器学习和其他分布式数据计算的开源框架。TFF 的开发旨在促进联邦学习 &#xff08;FL&#xff09;的开放研究和实验。联邦学习是一种机器学习方法&#xff0c;其中一个共享的全局模型在许多参与的客户之间…

集线器与交换机有什么区别?一文带你了解

作者&#xff1a;Insist-- 个人主页&#xff1a;insist--个人主页 梦想从未散场&#xff0c;传奇永不落幕&#xff0c;博主会持续更新优质网络知识、Python知识、Linux知识以及各种小技巧&#xff0c;愿你我共同在CSDN进步 目录 一、集线器 1. 集线器是什么&#xff1f; 2. …

高并发时代到底是Go还是Java?

作为一名用过Java和Go开发过微服务架构程序的在校学生的角度思考&#xff0c;本文将从以下几个方便来讲述Go和Java的区别。 前言 小明&#xff1a;听说Go在天然情况下支持并发 小红&#xff1a;我不管Java就是最好的语言 小明&#xff1a;不行&#xff0c;我要学以下神秘的Go…

vue3 element-ui-plus Carousel 跑马灯 的使用 及 踩坑记录

vue3 element-ui-plus Carousel 跑马灯 的踩坑记录 Carousel 跑马灯首页跑马灯demo Carousel 跑马灯 首先&#xff0c;打开其官网-跑马灯案例 跑马灯代码&#xff1a; <el-carousel :interval"5000" arrow"always"><el-carousel-item v-for"…

“智慧时代的引领者:探索人工智能的无限可能性“

目录 一.背景 二.应用 2.1金融领域 2.2医疗领域 2.3教育领域 三.发展 四.总结: 一.背景 人工智能&#xff08;Artificial Intelligence&#xff0c;简称AI&#xff09;&#xff0c;是指通过计算机程序模拟人类智能的一种技术。它是计算机科学、工程学、语言学、哲学等多…

Halcon 从基础到精通-01- 基本概念

1 HALCON Architecture 【图一】 HALCON的架构如上&#xff0c;其主要的部分&#xff0c;就是图像处理库。 2 HALCON的基本架构 2.1 Operators HALCON库功能的使用都是通过【operators】操作符来实现的。绝大多数的操作符由多种方法构成&#xff0c;具体可以参考给出的下面…

【开发篇】十、Spring缓存:手机验证码的生成与校验

文章目录 1、缓存2、用HashMap模拟自定义缓存3、SpringBoot提供缓存的使用4、手机验证码案例完善 1、缓存 缓存是一种介于数据永久存储介质与数据应用之间的数据临时存储介质使用缓存可以有效的减少低速数据读取过程的次数&#xff08;例如磁盘IO&#xff09;&#xff0c;提高…