RocketMQ 5.1.0 在java中的使用

news2024/11/25 20:30:23

版本:

        当前测试版本:springBoot 2.3.9、 RocketMQ 5.1.0

Maven或Gradle RocketMQ的依赖项:

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>5.0.5</version>
</dependency>
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.3</version>
</dependency>

1、生产者

# RocketMQ Producer配置
rocketmq.namesrv.addr=119.3.81.109:9876
rocketmq.producer.group=BREAD_ORDER_GROUP
rocketmq.topic =Tseng-Dev

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

@Component
public class RocketMQProducer {

    @Value("${rocketmq.producer.group}")
    private String producerGroup;

    @Value("${rocketmq.namesrv.addr}")
    private String namesrvAddr;

    public void sendMessage(String topic, String message) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer(producerGroup);
        producer.setNamesrvAddr(namesrvAddr);
        producer.start();
        Message rocketMsg = new Message(topic, message.getBytes());
        producer.send(rocketMsg);
        producer.shutdown();
    }
}
    @Autowired
    private RocketMQProducer rocketMQProducer;
    @Value("${rocketmq.topic}")
    private String topic;

    @GetMapping("/send-message")
    public String sendMessage() {
        try {
            String message = "Hello, RocketMQ!";
            rocketMQProducer.sendMessage(topic, message);
            return "Message sent successfully";
        } catch (Exception e) {
            e.printStackTrace();
            return "Failed to send message";
        }
    }

 

2、消费者

# RocketMQ Consumer配置
rocketmq.namesrv.addr=119.3.81.109:9876
rocketmq.consumer.group=BREAD_ORDER_GROUP
rocketmq.topic =Tseng-Dev

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

/**
 * @author:Tseng
 * @description 消费者
 * @since: JDK1.8
 * @version: 1.0
 * @date: 2023-07-17
 * @Copyright © 2023
 */
@Component
public class RocketMQConsumer {

    @Value("${rocketmq.consumer.group}")
    private String consumerGroup;

    @Value("${rocketmq.namesrv.addr}")
    private String namesrvAddr;

    @Value("${rocketmq.topic}")
    private String topic;

    private final RocketMQMessageListener messageListener;

    @Autowired
    public RocketMQConsumer(RocketMQMessageListener messageListener) {
        this.messageListener = messageListener;
    }

    @PostConstruct
    public void start() throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);
        consumer.setNamesrvAddr(namesrvAddr);
        consumer.subscribe(topic, "*");
        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
            return messageListener.consumeMessage(msgs, context);
        });
        consumer.start();
    }
}


import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.stereotype.Component;

import java.nio.charset.StandardCharsets;
import java.util.List;

@Slf4j
@Component
public class RocketMQMessageListener implements MessageListenerConcurrently {

    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        for (MessageExt msg : msgs) {
            String topic = msg.getTopic();
            String message = new String(msg.getBody(), StandardCharsets.UTF_8);
            String tags = msg.getTags();
            String keys = msg.getKeys();
            log.info("Received message: topic={}, tags={}, keys={}, message={}", topic, tags, keys, message);
            // 处理接收到的消息
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
}

3、测试

发送

结果

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/764931.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

国内开源框架(快速开发,避免重复造轮子)

若依开源框架&#xff08;最容易上手&#xff0c;轻巧简洁&#xff09; 若依开源框架是一款基于SpringBoot2.x和Vue.js的前后端分离的权限管理系统。它采用了前后端分离的架构&#xff0c;使得系统更加灵活、易扩展。同时&#xff0c;它还集成了多种常见的功能模块&#xff0c…

UEC++: 接口

1. 2. 3. 4.一般接口的源文件是不用写逻辑的&#xff0c;一般是在接口头文件中编写 5.被C类继承&#xff1a; 写完函数&#xff0c;千万不允许定义&#xff01;&#xff01;&#xff01; 添加标记宏 找到一个类&#xff1a;继承I开头的接口&#xff1a;引用头文件 错误重写&…

移 动 端

移动端 国内的UC和QQ&#xff0c;百度等手机浏览器都是根据 Webkit 修改过来的内核 兼容移动端主流浏览器 处理 webkit 内核浏览器即可 常见移动端屏幕尺寸 调式 Chrome DevTools&#xff08;谷歌浏览器&#xff09;的模拟手机调试搭建本地 web 服务器&#xff0c; 手机和服…

嵌入式开发--STM32用DMA+IDLE中断方式串口接收不定长数据

回顾 之前讲过用 利用IDLE空闲中断来接收不定长数据 &#xff0c;但是没有用到DMA&#xff0c;其实用DMA会更加的高效&#xff0c;MCU也可以腾出更多的性能去处理应该做的事情。 原理简介 IDLE顾名思义&#xff0c;就是空闲的意思&#xff0c;即当监测到串口空闲超过1个串口…

Java---第五章(类和对象,方法带参)

Java---第五章 一 类和对象类的由来&#xff1a;二者之间的关系this关键字&#xff1a;构造方法 二 方法带参构造方法带参&#xff1a;方法带参对象数组引用数据类型作为方法参数方法重载面向对象说明面向对象和面向过程的区别 一 类和对象 类的由来&#xff1a; 人们在日常生…

【HCIA】11.ACL与NAT地址转换

ACL 通过ACL可以实现对网络中报文流的精确识别和控制&#xff0c;达到控制网络访问行为、防止网络攻击和提高网络带宽利用率的目的。 ACL是由permit或deny语句组成的一系列有顺序的规则的集合&#xff1b;它通过匹配报文的相关字段实现对报文的分类。ACL是能够匹配一个IP数据包…

结合ChatGPT制作PPT

今天看到圈友的一个AI分享&#xff0c;然后自己本身需要做一个分享的PPT。刚好那着帖子实战一下。先说下整体感受。 优点&#xff1a;制作成本确实会比较低&#xff0c;很熟练的话大概就是1分钟一个都有可能。整体流程是先找个第三方PPT制作网站&#xff0c;看下支不支持文本转…

Unity游戏源码分享-Third Person Controller - Shooter Template v1.3.1

Unity游戏源码分享-Third Person Controller - Shooter Template v1.3.1 功能非常齐全 AI格斗 2.5D 完整工程地址&#xff1a;https://download.csdn.net/download/Highning0007/88057824

兴达易控modbus转profinet网关与流量变送器兼容转modbusTCP网口协议

本案例演示电磁流量计通过兴达易控modbus转profinet网关&#xff08;XD-MDPN100&#xff09;连接西门子1200PLC实现Profinet转ModbusTCP&#xff0c;协议网关MDPN100兼容转ModbusTCP网口协议&#xff0c;大大减少了对plc编程的工作 网络拓展图 打开博图&#xff0c;添加PLC并加…

django报错设置auth User

1.报错&#xff1a;auth.User.groups... auth.User.user_permissions... 我们的用户组、用户权限只能关联一个用户 &#xff0c;我们自己定义了一个用户表&#xff0c;系统还有一个用户表&#xff0c;这时候就会出问题。 解决办法&#xff1a; 让给我们自己定义的user替换系…

(学习笔记-TCP连接建立)TCP 为什么是三次握手?不是两次、四次?

常规回答&#xff1a;“因为三次握手才能保证双方具有接收和发送的能力” 原因一&#xff1a;避免历史连接 三次握手的首要原因是为了防止旧的重复连接初始化造成混乱。 假设&#xff1a;客户端先发送了SYN(seq90)报文&#xff0c;然后客户端宕机了&#xff0c;而且这个SYN报…

七夕杯—密码签到

0x00 前言 CTF 加解密合集&#xff1a;CTF 加解密合集 0x01 题目 大牛在今年的hvv过程中&#xff0c;渗透进某业务系统&#xff0c;发现了密文,可是不知道如何解出明文&#xff0c;你能帮他找出来吗&#xff1f;已知密文由两部分组成&#xff0c;后面的推测为秘钥密文如下&a…

nginx基本2——配置文件详解(网页配置篇)

文章目录 一、基本了解二、nginx.conf配置参数2.1 调试参数2.2 必配参数2.3 优化性能参数2.4 event{}段配置参数2.5 网络连接参数2.6 fastcgi配置参数2.7 总结常配参数 三、http{}段配置参数3.1 配置结构3.2 精简配置网页3.3 location定义网页3.3.1 root path用法3.3.1 alias p…

编码器旋转圈数和单圈计数值计算功能块(SMART PLC梯形图)

有关 PLC的编码器更多应用请参看专栏其它文章,常用链接如下: 西门子SMART PLC高速脉冲计数采集编码器速度(RC滤波)_编码器频率采集计算速度采用什么滤波方法_RXXW_Dor的博客-CSDN博客这篇文章主要讲解西门子 SMART PLC高速计数采集编码器脉冲信号计算速度,根据编码器脉冲数…

数据结构--时间复杂度与空间复杂度

数据结构–时间复杂度与空间复杂度 文章目录 数据结构--时间复杂度与空间复杂度时间复杂度一、什么是时间复杂度二、具体实例1.大O的渐进表示法2.二分查找的时间复杂度 空间复杂度一、什么是空间复杂度二、具体实例总结 时间复杂度 一、什么是时间复杂度 在计算机科学中&…

字符串相加(力扣)

Problem: 415. 字符串相加 文章目录 思路Code复杂度运行结果 思路 创建一个StringBuilder对象使用append方法追加每位数字相加&#xff0c;使用双指针的方式&#xff0c;指针i&#xff0c;j分别指向num1和num2的每位数字&#xff0c;从后往前&#xff0c;进位用carry存储着。 …

什么是DevOps监控以及如何在组织中实施?

如今的软件开发商经常面临两大挑战——快速交付和大规模创新。DevOps通过在软件开发生命周期(SDLC)中引入自动化来开发和交付高质量的软件&#xff0c;从而帮助解决这些挑战。 持续集成(CI)/持续部署&#xff08;CD)是DevOps实践中自动化的关键组件。它可以自动化代码构建、测试…

【UE4 塔防游戏系列】06-炮塔发射子弹攻击敌人

效果 步骤 1. 新建一个Actor蓝图类&#xff0c;命名为“TotalBulletsCategory”&#xff0c;用来表示子弹蓝图总类&#xff0c;后面会有很多不同类型的子弹会继承该类 打开“TotalBulletsCategory”&#xff0c;添加粒子系统组件、盒体碰撞组件和发射物移动组件 调整发射物重力…

GitHub上整理的一些实用的工具

1. Visual Studio Code 简称VScode&#xff0c;是一个轻量且强大的跨平台开源代码编辑器&#xff08;IDE&#xff09;&#xff0c;支持Windows&#xff0c;OS X和Linux。内置JavaScript、TypeScript和Node.js支持&#xff0c;而且拥有丰富的插件生态系统&#xff0c;可通过安装…

【明解STM32】一文读懂STM32芯片总线

目录 一、前言 二、总线基础知识概述 (1)、总线在芯片中的角色 (2)、总线的类型 (3)、总线的指标 (4)、AHB和APB 三、总线框架结构 (1)、结构类型 (2)、总线模块 (3)、总线交互 四、总结 一、前言 本篇介绍STM32芯片内部的总线系统结构&#xff0c;嵌入式芯片内部的…