SpringBoot集成RocketMQ

news2025/1/16 11:11:05

SpringBoot整合RocketMQ使用非常简单,下面是一个简单的例子,作为备忘:

完整项目代码: https://github.com/dccmmtop/springBootRocketMQ

项目目录结构

依赖

<dependencies>
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-spring-boot-starter</artifactId>
        <version>2.1.1</version>
        <exclusions>
            <exclusion>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.springframework</groupId>
                <artifactId>spring-core</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.springframework</groupId>
                <artifactId>spring-webmvc</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
        <version>2.1.6.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <version>2.1.6.RELEASE</version>
    </dependency>
<dependencies>

配置类 ExtRocketMQTemplate

package com.roy.rocketmq.config;

import org.apache.rocketmq.spring.annotation.ExtRocketMQTemplateConfiguration;
import org.apache.rocketmq.spring.core.RocketMQTemplate;

@ExtRocketMQTemplateConfiguration()
public class ExtRocketMQTemplate extends RocketMQTemplate {
}

消费者 SpringConsumer

package com.roy.rocketmq.basic;

import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

@Component
// consumerGroup 相同角色的消费者需要具有完全相同的订阅和consumerGroup才能正确实现负载均衡。它是必需的并且需要是全球唯一的。
// consumeMode 控制消费模式,可以选择并发或顺序接收消息。
// 该注解还有很多其他属性,请详细查看源码
@RocketMQMessageListener(consumerGroup = "MyConsumerGroup", topic = "TestTopic",consumeMode= ConsumeMode.CONCURRENTLY)
public class SpringConsumer implements RocketMQListener<String> {
    @Override
    public void onMessage(String message) {
        System.out.println("Received message : "+ message);
    }
}

发送者示例 SpringProducer

package com.roy.rocketmq.basic;

import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

@Component
public class SpringProducer {

    @Resource
    private RocketMQTemplate rocketMQTemplate;

    // 就是如此简单
    public void sendMessage(String topic,String msg){
        this.rocketMQTemplate.convertAndSend(topic,msg);
    }
}

更多的发消息示例

带有返回数据的消费者

package com.roy.rocketmq.basic;

import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQReplyListener;
import org.springframework.stereotype.Component;

@Component
@RocketMQMessageListener(consumerGroup = "MyConsumerGroup1", topic = "TestTopicReplyString",consumeMode= ConsumeMode.CONCURRENTLY)
// 这里注意要使用与收到消息对应的类型 <String,String> 代表接收string 消息 返回的也是string类型的消息
public class SpringConsumerWithReply implements RocketMQReplyListener<String,String> {

    @Override
    public String onMessage(String message) {
        System.out.println("收到消息,并返回结果, " + , message);
        return "OK";
    }
}

接收 User 返回User

package com.roy.rocketmq.basic;

import com.roy.rocketmq.domain.User;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQReplyListener;
import org.springframework.stereotype.Component;

@Component
@RocketMQMessageListener(consumerGroup = "MyConsumerGroup2", topic = "TestTopicUser",consumeMode= ConsumeMode.CONCURRENTLY)
public class SpringConsumerReplyUser implements RocketMQReplyListener<User,User> {


    @Override
    public User onMessage(User user) {
        System.out.println("收到User: " + user.toString());
        return user;
    }
}

User

package com.roy.rocketmq.domain;

public class User {
    private String userName;
    private Byte userAge;

    public String getUserName() {
        return userName;
    }

    public User setUserName(String userName) {
        this.userName = userName;
        return this;
    }

    public Byte getUserAge() {
        return userAge;
    }

    public User setUserAge(Byte userAge) {
        this.userAge = userAge;
        return this;
    }

    @Override
    public String toString() {
        return "User{" +
                "userName='" + userName + '\'' +
                ", userAge=" + userAge +
                '}';
    }
}

OrderPaidEvent

package com.roy.rocketmq.domain;

import java.math.BigDecimal;

public class OrderPaidEvent {
    private String orderId;

    private BigDecimal paidMoney;

    public OrderPaidEvent() {
    }

    public OrderPaidEvent(String orderId, BigDecimal paidMoney) {
        this.orderId = orderId;
        this.paidMoney = paidMoney;
    }

    public String getOrderId() {
        return orderId;
    }

    public void setOrderId(String orderId) {
        this.orderId = orderId;
    }

    public BigDecimal getPaidMoney() {
        return paidMoney;
    }

    public void setPaidMoney(BigDecimal paidMoney) {
        this.paidMoney = paidMoney;
    }
}

发送

package com.roy.rocketmq;

import com.roy.rocketmq.domain.OrderPaidEvent;
import com.roy.rocketmq.domain.User;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQLocalRequestCallback;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.util.MimeTypeUtils;

import javax.annotation.Resource;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.List;

@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringRocketTest {

    @Resource
    private RocketMQTemplate rocketMQTemplate;

    @Test
    public void sendMessageTest(){
        String springTopic="TestTopic";
        String springTopicReplyString="TestTopicString";
        String springTopicUser="TestTopicUser";
        //发送字符消息
        SendResult sendResult = rocketMQTemplate.syncSend(springTopic, "你好!");
        System.out.printf("syncSend1 to topic %s sendResult=%s %n", springTopic, sendResult);

        sendResult = rocketMQTemplate.syncSend(springTopic, new User().setUserAge((byte) 18).setUserName("Kitty"));
        System.out.printf("syncSend1 to topic %s sendResult=%s %n", springTopic, sendResult);

        sendResult = rocketMQTemplate.syncSend(springTopic, MessageBuilder.withPayload(
                new User().setUserAge((byte) 21).setUserName("Lester")).setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON_VALUE).build());
        System.out.printf("syncSend1 to topic %s sendResult=%s %n", springTopic, sendResult);

        //发送对象消息
        rocketMQTemplate.asyncSend(springTopic, new OrderPaidEvent("T_001", new BigDecimal("88.00")), new SendCallback() {
            @Override
            public void onSuccess(SendResult var1) {
                System.out.printf("async onSucess SendResult=%s %n", var1);
            }

            @Override
            public void onException(Throwable var1) {
                System.out.printf("async onException Throwable=%s %n", var1);
            }
        });

        //发送指定TAG的消息
        // 注意这里指定tag的方式,topic + : + tag
        rocketMQTemplate.convertAndSend(springTopic + ":tag0", "I'm from tag0");  // tag0 will not be consumer-selected
        System.out.printf("syncSend topic %s tag %s %n", springTopic, "tag0");
        rocketMQTemplate.convertAndSend(springTopic + ":tag1", "I'm from tag1");
        System.out.printf("syncSend topic %s tag %s %n", springTopic, "tag1");

        //同步发送消息并且返回一个String类型的结果。
        String replyString = rocketMQTemplate.sendAndReceive(springTopicReplyString, MessageBuilder.withPayload("同步发送测试").build(), String.class);
        System.out.printf("send %s and receive %s %n", "request string", replyString);

        //同步发送消息并且返回一个Byte数组类型的结果。
        byte[] replyBytes = rocketMQTemplate.sendAndReceive(springTopicReplyString, MessageBuilder.withPayload("request byte[]").build(), byte[].class, 3000);
        System.out.printf("send %s and receive %s %n", "request byte[]", new String(replyBytes));

        // 同步发送一个带hash参数的请求(排序消息),并返回一个User类型的结果
        User requestUser = new User().setUserAge((byte) 9).setUserName("requestUserName");
        User replyUser = rocketMQTemplate.sendAndReceive(springTopicUser, requestUser, User.class, "order-id");
        System.out.printf("send %s and receive %s %n", requestUser, replyUser);

        //同步发送一个带延迟级别的消息(延迟消息)
        String replyGenericObject = rocketMQTemplate.sendAndReceive(springTopicReplyString, "request generic",
                String.class, 30000, 2);
        System.out.printf("send %s and receive %s %n", "request generic", replyGenericObject);


        //异步发送消息,返回String类型结果。
        rocketMQTemplate.sendAndReceive(springTopicReplyString, "request string", new RocketMQLocalRequestCallback<String>() {
            @Override public void onSuccess(String message) {
                System.out.printf("send %s and receive %s %n", "request string", message);
            }

            @Override public void onException(Throwable e) {
                e.printStackTrace();
            }
        });
        //异步发送消息,并返回一个User类型的结果。
        rocketMQTemplate.sendAndReceive(springTopicUser, new User().setUserAge((byte) 9).setUserName("requestUserName"), new RocketMQLocalRequestCallback<User>() {
            @Override public void onSuccess(User message) {
                System.out.printf("send user object and receive %s %n", message.toString());
            }

            @Override public void onException(Throwable e) {
                e.printStackTrace();
            }
        }, 5000);
        //发送批量消息
        List<Message> msgs = new ArrayList<Message>();
        for (int i = 0; i < 10; i++) {
            msgs.add(MessageBuilder.withPayload("Hello RocketMQ Batch Msg#" + i).
                    setHeader(RocketMQHeaders.KEYS, "KEY_" + i).build());
        }

        SendResult sr = rocketMQTemplate.syncSend(springTopic, msgs, 60000);

        System.out.printf("--- Batch messages send result :" + sr);
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/810237.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Linux下进程特性总结:工作目录,环境变量,标准输出转命令行参数,O_CLOEXEC标志作用,读写锁控制进程互斥

进程是运行中的程序&#xff0c;是资源分配的最小单位&#xff0c;其有一些特性对于实际开发很有帮助&#xff0c;本篇博客将进程的相关特性进行梳理总结&#xff0c;包含工作目录&#xff0c;环境变量&#xff0c;标准输出转命令行参数&#xff0c;读写锁控制进程互斥。 目录…

vo 2 输出helloworld

vo 2 输出helloworld 目录概述需求&#xff1a; 设计思路实现思路分析1.code 拓展实现性能参数测试&#xff1a; 参考资料和推荐阅读 Survive by day and develop by night. talk for import biz , show your perfect code,full busy&#xff0c;skip hardness,make a better r…

HCIP重发布实验

目录 实验要求&#xff1a; 步骤一&#xff1a;拓扑设计IP地址规划 拓扑设计 R1 R2 R3 R4 发布路由 R1 R2 R3 R4 双向重发布 在R2和R4 上进行 R2 R4 检查R1 修改开销值选路 择优选择去4.0网段的路径 测试&#xff1a;​编辑 择优选择去32网段的路径 测试&…

redis(四)—— java如何操作redis、springboot集成redis

一、java如何操作redis——Jedis jedis的“j”就是javajedis是java官方推荐的java操作redis工具&#xff0c;是一个非可视化的客户端redis-clientspringboot的redisTemplate对象就相当于这里的jedis对象&#xff08;redisTemplate去调用一系列方法不就相当于jedis这个client去…

使用java将个人微信打造成得力助手

本文提供一个通过java编程将微信打造成得力助手的方案, 先看效果&#xff1a; 查看支持的功能与对该功能开放的用户 接入人工智能 下载BiliBili 视频 原理 这个方案最难的地方就是如何把个人账号安全的接入微信&#xff0c;不会被封号。网上主流的有逆向web端微信&#xff0c…

KY258 日期累加

一、题目 二、代码 #include <iostream> using namespace std; class Date {public:Date(int year 0, int month 0, int day 0) {_year year;_month month;_day day;}Date(const Date& _d);int GetDay(int year, int month);Date& operator(int d);Date o…

this is incompatible with sql_mode=only_full_group_by

查看配置 select global.sql_mode 在sql命令行中输入select sql_mode 能够看到sql_mode配置,如果有ONLY_FULL_GROUP_BY&#xff0c;则需要修改 在mysql5.7.5后&#xff0c;ONLY_FULL_GROUP_BY是默认选项&#xff0c;所以就会导致group by的问题 set sql_mode‘复制去掉ONLY_F…

Python(五十)获取列表中指定的元素

❤️ 专栏简介&#xff1a;本专栏记录了我个人从零开始学习Python编程的过程。在这个专栏中&#xff0c;我将分享我在学习Python的过程中的学习笔记、学习路线以及各个知识点。 ☀️ 专栏适用人群 &#xff1a;本专栏适用于希望学习Python编程的初学者和有一定编程基础的人。无…

嵌入式linux之OLED显示屏SPI驱动实现(SH1106,ssd1306)

周日业余时间太无聊&#xff0c;又不喜欢玩游戏&#xff0c;大家的兴趣爱好都是啥&#xff1f;我觉得敲代码也是一种兴趣爱好。正巧手边有一块儿0.96寸的OLED显示屏&#xff0c;一直在吃灰&#xff0c;何不把玩一把&#xff1f;于是说干就干&#xff0c;最后在我的imax6ul的lin…

BUU [BJDCTF2020]The mystery of ip

BUU [BJDCTF2020]The mystery of ip 再hint的源码里面找到这个东西。 这题一定和IP有关系&#xff0c;试了一下伪造IP还真是。 分析一下&#xff0c;这题可能存在SSTI漏洞&#xff0c;先用模板算式子{{9*‘9’}}测一下 那SSTI稳了&#xff0c;应该是Twig模板。 但是报错测出来是…

Android 面试题 线程间通信 六

&#x1f525; 主线程向子线程发送消息 Threadhandler&#x1f525; 子线程中定义Handler&#xff0c;Handler定义在哪个线程中&#xff0c;就跟那个线程绑定&#xff0c;在线程中绑定Handler需要调用Looper.prepare(); 方法&#xff0c;主线程中不调用是因为主线程默认帮你调用…

编写脚本,使用mysqldump实现分库分表备份。

一、实现分库备份&#xff1a; #!/bin/bash #分库备份 bak_userroot-----------备份用户 bak_password513721ykp--------备份密码 bak_path/backup/db_bak---------备份路径 bak_cmd"-u$bak_user -p$bak_password"-------登录命令&#xff0c;以便后面重复编写 exc_…

分布式锁漫谈

简单解释一下个人理解的分布式锁以及主要的实现手段。 文章目录 什么是分布式锁常用分布式锁实现 什么是分布式锁 以java应用举例&#xff0c;如果是单应用的情况下&#xff0c;我们通常使用synchronized或者lock进行线程锁&#xff0c;主要为了解决多线程或者高并发场景下的共…

3ds MAX绘制摄像机动画

之前&#xff0c;我们已经绘制了山地、山间小路、以及树林&#xff1a; 这里我们添加一个自由摄像机&#xff1a;&#xff08;前视图&#xff09; 在动作窗口&#xff0c;给摄像机添加一个按路径移动的设定&#xff1a; 这样&#xff0c;我们只要把指定的路径绘制出来&#xff…

UE4/5C++多线程插件制作(0.简介)

目录 插件介绍 插件效果 插件使用 English 插件介绍 该插件制作&#xff0c;将从零开始&#xff0c;由一个空白插件一点点的制作&#xff0c;从写一个效果到封装&#xff0c;层层封装插件&#xff0c;简单粗暴的对插件进行了制作&#xff1a; 插件效果 更多的是在cpp中去…

Cpp04 — 默认成员函数

前言&#xff1a;本文章主要用于个人复习&#xff0c;追求简洁&#xff0c;感谢大家的参考、交流和搬运&#xff0c;后续可能会继续修改和完善。 因为是个人复习&#xff0c;会有部分压缩和省略。 一、默认成员函数 当类里面成员函数什么都不写的时候&#xff0c;编译器会自动…

AutoSAR系列讲解(实践篇)10.3-BswM配置

目录 一、ECU State Handing(ESH) 二、Module Initialization 三、Communication Control 说起BswM的配置,其实博主问过很多朋友了,大家基本都只用自动配置;很少有用到手动配置的时候,对于刚刚入门的大家来说,掌握自动配置基 本也就足够了。 一、ECU State Handing(…

【雕爷学编程】MicroPython动手做(12)——掌控板之Hello World

知识点&#xff1a;什么是掌控板&#xff1f; 掌控板是一块普及STEAM创客教育、人工智能教育、机器人编程教育的开源智能硬件。它集成ESP-32高性能双核芯片&#xff0c;支持WiFi和蓝牙双模通信&#xff0c;可作为物联网节点&#xff0c;实现物联网应用。同时掌控板上集成了OLED…

内网隧道代理技术(十四)之 Earthworm的使用(一级代理)

Earthworm的使用(一级代理) ew 全称是EarchWorm,是一套轻量便携且功能强大的网络穿透工具,基于标准C开发,具有socks5代理、端口转发和端口映射三大功能,可在复杂网络环境下完成网络穿透,且支持全平台(Windows/Linux/Mac)。该工具能够以“正向”、“反向”、“多级级联”…

谷粒商城第七天-商品服务之分类管理下的删除、新增以及修改商品分类

目录 一、总述 1.1 前端思路 1.2 后端思路 二、前端部分 2.1 删除功能 2.2 新增功能 2.3 修改功能 三、后端部分 3.1 删除接口 3.2 新增接口 3.3 修改接口 四、总结 一、总述 1.1 前端思路 删除和新增以及修改的前端无非就是点击按钮&#xff0c;就向后端发送请求…