Springboot集成rocketmq快速入门demo

news2025/1/12 1:41:20

一、rocketmq介绍

RocketMQ是一个纯Java、分布式、队列模型的开源消息中间件,前身是MetaQ,是阿里参考Kafka特点研发的一个队列模型的消息中间件,后开源给apache基金会成为了apache的顶级开源项目,具有高性能、高可靠、高实时、分布式特点。

二、rocketmq环境搭建

采用docker-compose搭建,具体配置如下

 
 
version: '3'
services:
  # rocket mq name server
  rmqnamesrv:
    image: apache/rocketmq:4.9.6
    restart: always
    container_name: rocket-server
      # environment:
      #   JAVA_OPT_EXT: "-server -Xms64m -Xmx64m -Xmn64m"
      # volumes:
      # 映射本地目录权限一定要设置为 777 权限,否则启动不成功
    # - ../volumes/data/rocket/server/logs:/home/rocketmq/logs
    networks:
      - rocketmq
    ports:
      - 9876:9876
    command: sh mqnamesrv
  # rocket mq broker
  rmqbroker:
    image: apache/rocketmq:4.9.6
    restart: always
    container_name: rocket-broker
    volumes:
      # 映射本地目录权限一定要设置为 777 权限,否则启动不成功
      # - ../volumes/data/rocket/broker/logs:/home/rocketmq/logs
      # - ../volumes/data/rocket/broker/store:/home/rocketmq/store
      - ./config/broker.conf:/opt/rocketmq-4.9.6/conf/broker.conf
    environment:
      - NAMESRV_ADDR=rmqnamesrv:9876
      # - JAVA_OPTS:=-Duser.home=/opt
      - JAVA_OPT_EXT=-server -Xms64m -Xmx64m -Xmn64m
    depends_on:
      - rmqnamesrv
    networks:
      - rocketmq
    ports:
      - 10909:10909
      - 10911:10911
    command: sh mqbroker -c /opt/rocketmq-4.9.6/conf/broker.conf
  # rocket console 这个可以不需要
  rmqdashboard:
    image: apacherocketmq/rocketmq-dashboard:1.0.0
    restart: always
    container_name: rocket-dashboard
    environment:
      - JAVA_OPTS=-Drocketmq.config.namesrvAddr=rmqnamesrv:9876 -Dserver.port=8180 -Drocketmq.config.isVIPChannel=false
      # - JAVA_OPT_EXT=-Xms128m -Xmx128m -Xmn128m
    depends_on:
      - rmqnamesrv
    networks:
      - rocketmq
    ports:
      - 8180:8180


networks:
  rocketmq:
    driver: bridge
  stack:
    driver: bridge

运行docker-compose

 
 
docker-compose -f docker-compose-rocketmq.yml -p rocketmq up -d
注:修改 xx/rocketmq/rocketmq_broker/conf/broker.conf中配置brokerIP1为宿主机IP

访问地址:http://ip地址:8180f7429e9ea14f7441c79b51fbcbb20cc7.png 

三、代码工程

pom.xml

 
 
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>springboot-demo</artifactId>
        <groupId>com.et</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>


    <artifactId>rocketmq</artifactId>


    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.1.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.8.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-common</artifactId>
            <version>4.8.0</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-test</artifactId>
            <scope>test</scope>
        </dependency>


    </dependencies>


</project>

entity

 
 
package com.et59.rocketmq.entity;


/**
 * @author liuhaihua
 * @version 1.0
 * @ClassName Person
 * @Description todo
 * @date 2023年12月29日 9:52
 */


public class Person {
    private String name;
    private int age;


    public String getName() {
        return name;
    }


    public void setName(String name) {
        this.name = name;
    }


    public int getAge() {
        return age;
    }


    public void setAge(int age) {
        this.age = age;
    }
}

listener

 
 
package com.et59.rocketmq.listener;


import com.et59.rocketmq.entity.Person;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;


/**
 * @Author: heyuhua
 * @Date: 2021/1/8 15:55
 */
@Component
@RocketMQMessageListener(consumerGroup = "${rocketmq.producer.groupName}", topic = "PERSON_ADD")
public class PersonMqListener implements RocketMQListener<Person> {
    @Override
    public void onMessage(Person person) {
        System.out.println("接收到消息,开始消费..name:" + person.getName() + ",age:" + person.getAge());
    }
}

util

 
 
package com.et59.rocketmq.util;


import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;


import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;


/**
 * RocketMq助手
 *
 * @Author: heyuhua
 * @Date: 2020/1/8 10:03
 */
@Component
public class RocketMqHelper {


    /**
     * 日志
     */
    private static final Logger LOG = LoggerFactory.getLogger(RocketMqHelper.class);


    /**
     * rocketmq模板注入
     */
    @Autowired
    private RocketMQTemplate rocketMQTemplate;


    @PostConstruct
    public void init() {
        LOG.info("---RocketMq助手初始化---");
    }


    /**
     * 发送异步消息
     *
     * @param topic   消息Topic
     * @param message 消息实体
     */
    public void asyncSend(Enum topic, Message<?> message) {
        asyncSend(topic.name(), message, getDefaultSendCallBack());
    }




    /**
     * 发送异步消息
     *
     * @param topic        消息Topic
     * @param message      消息实体
     * @param sendCallback 回调函数
     */
    public void asyncSend(Enum topic, Message<?> message, SendCallback sendCallback) {
        asyncSend(topic.name(), message, sendCallback);
    }


    /**
     * 发送异步消息
     *
     * @param topic   消息Topic
     * @param message 消息实体
     */
    public void asyncSend(String topic, Message<?> message) {
        rocketMQTemplate.asyncSend(topic, message, getDefaultSendCallBack());
    }


    /**
     * 发送异步消息
     *
     * @param topic        消息Topic
     * @param message      消息实体
     * @param sendCallback 回调函数
     */
    public void asyncSend(String topic, Message<?> message, SendCallback sendCallback) {
        rocketMQTemplate.asyncSend(topic, message, sendCallback);
    }


    /**
     * 发送异步消息
     *
     * @param topic        消息Topic
     * @param message      消息实体
     * @param sendCallback 回调函数
     * @param timeout      超时时间
     */
    public void asyncSend(String topic, Message<?> message, SendCallback sendCallback, long timeout) {
        rocketMQTemplate.asyncSend(topic, message, sendCallback, timeout);
    }


    /**
     * 发送异步消息
     *
     * @param topic        消息Topic
     * @param message      消息实体
     * @param sendCallback 回调函数
     * @param timeout      超时时间
     * @param delayLevel   延迟消息的级别
     */
    public void asyncSend(String topic, Message<?> message, SendCallback sendCallback, long timeout, int delayLevel) {
        rocketMQTemplate.asyncSend(topic, message, sendCallback, timeout, delayLevel);
    }


    /**
     * 发送顺序消息
     *
     * @param message
     * @param topic
     * @param hashKey
     */
    public void syncSendOrderly(Enum topic, Message<?> message, String hashKey) {
        syncSendOrderly(topic.name(), message, hashKey);
    }




    /**
     * 发送顺序消息
     *
     * @param message
     * @param topic
     * @param hashKey
     */
    public void syncSendOrderly(String topic, Message<?> message, String hashKey) {
        LOG.info("发送顺序消息,topic:" + topic + ",hashKey:" + hashKey);
        rocketMQTemplate.syncSendOrderly(topic, message, hashKey);
    }


    /**
     * 发送顺序消息
     *
     * @param message
     * @param topic
     * @param hashKey
     * @param timeout
     */
    public void syncSendOrderly(String topic, Message<?> message, String hashKey, long timeout) {
        LOG.info("发送顺序消息,topic:" + topic + ",hashKey:" + hashKey + ",timeout:" + timeout);
        rocketMQTemplate.syncSendOrderly(topic, message, hashKey, timeout);
    }


    /**
     * 默认CallBack函数
     *
     * @return
     */
    private SendCallback getDefaultSendCallBack() {
        return new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                LOG.info("---发送MQ成功---");
            }


            @Override
            public void onException(Throwable throwable) {
                throwable.printStackTrace();
                LOG.error("---发送MQ失败---"+throwable.getMessage(), throwable.getMessage());
            }
        };
    }




    @PreDestroy
    public void destroy() {
        LOG.info("---RocketMq助手注销---");
    }


}

配置文件

 
 
server:
  port: 8088
#rocketmq配置
rocketmq:
  name-server: 10.11.68.77:9876
  # 生产者配置
  producer:
    isOnOff: on
    # 发送同一类消息的设置为同一个group,保证唯一
    group: hyh-rocketmq-group
    groupName: hyh-rocketmq-group
    # 服务地址
    namesrvAddr: 10.11.68.77:9876
    # 消息最大长度 默认1024*4(4M)
    maxMessageSize: 4096
    # 发送消息超时时间,默认3000
    sendMsgTimeout: 3000
    # 发送消息失败重试次数,默认2
    retryTimesWhenSendFailed: 2

启动类

 
 
package com.et59.rocketmq;


import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;


@SpringBootApplication
public class DemoApplication {


   public static void main(String[] args) {
      SpringApplication.run(DemoApplication.class, args);
   }
}

code repository

  • https://github.com/Harries/springboot-demo

四、测试

 
 
package com.et59.rocketmq;




import com.et59.rocketmq.entity.Person;
import com.et59.rocketmq.util.RocketMqHelper;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.test.context.junit4.SpringRunner;


@RunWith(SpringRunner.class)
@SpringBootTest
public class MQTests {
   @Autowired
   private RocketMqHelper rocketMqHelper;


   @Test
   public void testProducter() throws InterruptedException {
      for(int i=0;i<1000000;i++){
         Person person = new Person();
         person.setName("heyuhua");
         person.setAge(25);
         rocketMqHelper.asyncSend("PERSON_ADD", MessageBuilder.withPayload(person).build());
         Thread.sleep(1000);
      }
   }


}

运行结果

 
 
2024-02-02 15:20:27.101 INFO 11188 --- [ublicExecutor_3] com.et59.rocketmq.util.RocketMqHelper : ---发送MQ成功---
2024-02-02 15:20:28.116 INFO 11188 --- [ublicExecutor_4] com.et59.rocketmq.util.RocketMqHelper : ---发送MQ成功---
2024-02-02 15:20:29.130 INFO 11188 --- [ublicExecutor_5] com.et59.rocketmq.util.RocketMqHelper : ---发送MQ成功---
2024-02-02 15:20:30.131 INFO 11188 --- [ublicExecutor_7] com.et59.rocketmq.util.RocketMqHelper : ---发送MQ成功---
2024-02-02 15:20:31.142 INFO 11188 --- [ublicExecutor_8] com.et59.rocketmq.util.RocketMqHelper : ---发送MQ成功---
2024-02-02 15:20:32.156 INFO 11188 --- [ublicExecutor_9] com.et59.rocketmq.util.RocketMqHelper : ---发送MQ成功---
2024-02-02 15:20:33.167 INFO 11188 --- [blicExecutor_10] com.et59.rocketmq.util.RocketMqHelper : ---发送MQ成功---
接收到消息,开始消费..name:heyuhua,age:25
接收到消息,开始消费..name:heyuhua,age:25
接收到消息,开始消费..name:heyuhua,age:25
接收到消息,开始消费..name:heyuhua,age:25
接收到消息,开始消费..name:heyuhua,age:25
2024-02-02 15:20:34.182 INFO 11188 --- [ublicExecutor_3] com.et59.rocketmq.util.RocketMqHelper : ---发送MQ成功---
接收到消息,开始消费..name:heyuhua,age:25
接收到消息,开始消费..name:heyuhua,age:25
接收到消息,开始消费..name:heyuhua,age:25
接收到消息,开始消费..name:heyuhua,age:25
接收到消息,开始消费..name:heyuhua,age:25
接收到消息,开始消费..name:heyuhua,age:25

五、引用

  •  https://rocketmq.apache.org/

  •  http://www.liuhaihua.cn/archives/710205.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1436633.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

PyTorch 2.2 中文官方教程(十九)

使用 RPC 进行分布式管道并行 原文&#xff1a;pytorch.org/tutorials/intermediate/dist_pipeline_parallel_tutorial.html 译者&#xff1a;飞龙 协议&#xff1a;CC BY-NC-SA 4.0 作者&#xff1a;Shen Li 注意 在github中查看并编辑本教程。 先决条件&#xff1a; PyTorc…

JVM 性能调优 - 四种引用(4)

为什么会有四种引用 我们先回顾下在 Java 虚拟机内存体系(1) 中提到了的垃圾回收算法 1、引用计数法 原理:给对象添加一个引用计数器,每当有一个地方引用它,计数器的值就加一。每当有一个引用失效,计数器的值就减一。当计数器值为零时,这个对象被认为没有其他对象引用,…

专业排版设计软件:QuarkXPress 2024 for mac中文激活版

QuarkXPress 2024 for Mac是一款功能强大、易于使用、高质量输出的专业排版软件。无论您是出版业的专家还是初学者&#xff0c;都可以通过QuarkXPress 2024轻松创建出令人惊叹的出版物。 软件下载&#xff1a;QuarkXPress 2024 for mac中文激活版下载 QuarkXPress 2023 for Mac…

mvn常见报错:Failed to read artifact descriptor for 解决

问题&#xff1a; mvn打包时报错&#xff1a;Failed to read artifact descriptor for 产生原因&#xff1a; 项目打包时所需的依赖包不存在本地仓库&#xff0c;或本地仓库文件存在问题。 解决方法&#xff1a; 检查仓库可用性&#xff1a; 确保在Maven设置或pom.xml中指定…

居然在Web上就可以体验下MacOS啦?

发现一款宝藏项目 MacOS &#xff0c;在Web上打造一款原汁原味的 MacOS系统&#xff0c;不同于以外的仿操作系统的web应用&#xff0c;该应用底层基于 HTML5的 FileSystem 和 IndexedDB 构建了文件系统&#xff0c;理论上可以基于这套系统实现任何的上层应用。作者还制定了可以…

nacos越权漏洞复现

1.低版本(nacos<1.4.1)默认白名单UA 开启鉴权功能后&#xff0c;服务端之间的请求也会通过鉴权系统的影响。考虑到服务端之间的通信应该是可信的&#xff0c;因此在1.2~1.4.0版本期间&#xff0c;通过User-Agent中是否包含Nacos-Server来进行判断请求是否来自其他服务端。 但…

程序员为什么不喜欢关电脑?

目录 标题&#xff1a;程序员为何乐见电脑长时间处于关闭状态&#xff1f; 引言&#xff1a; 一、思维的延续性&#xff1a; 二、环境的连续性&#xff1a; 三、长时间开机的原因&#xff1a; 四、恢复成本的考量&#xff1a; 结论&#xff1a; 特别的&#xff1a; 不是…

Vue3快速上手(二)VSCode官方推荐插件安装及配置

一、VSCode官方插件安装&#xff0c;如下图2款插件 在用vite创建的程序里&#xff0c;提示提安装推荐的插件了&#xff0c;如下图&#xff1a; 二、配置 在设置-扩展里找到Volar插件&#xff0c;将Dot Value勾选上。这样在ref()修改变量时&#xff0c;会自动填充.value,无需…

电力负荷预测 | 基于ARIMA的电力负荷预测(Python)

文章目录 效果一览文章概述源码设计参考资料效果一览 文章概述 电力负荷预测 | 基于ARIMA的电力负荷预测(Python) 源码设计 #------------

第6节、T型加减速转动【51单片机+L298N步进电机系列教程】

↑↑↑点击上方【目录】&#xff0c;查看本系列全部文章 摘要&#xff1a;本章介绍步进电机T型加减速的控制方法&#xff0c;分三个小节&#xff0c;本小节主要内容为该控制方法的推导与计算。目前各平台对该控制方法介绍的文章目前较多&#xff0c;但部分关键参数并未给出推导…

图解GPT2

GPT是一种Transformer Decoder架构&#xff0c;Decoder通过自回归方式生成下一个词&#xff0c;所以擅长文本生成任务。 本文将图解GPT2&#xff0c;本系列还有图解Tokenization&#xff0c;Word2Vec&#xff0c;Transformer&#xff0c;Bert。 这篇文章主要来自下面这篇博客。…

【C++】内存管理深入解析

目录 1. 内存的五大区域1.1 栈区&#xff08;Stack&#xff09;1.2 堆区&#xff08;Heap&#xff09;1.3 全局/静态存储区1.4 常量存储区1.5 代码区 2. 回顾c语言的动态内存管理2.1 malloc/calloc/realloc2.2 free 3. C中的新旧对话3.1 new3.2 delete 4. new/delete的实现原理…

IDEA生成可执行jar包

1. 进入需要打包的项目&#xff0c;选择 最上方菜单栏的 File → Project Structure 2. 选择 左侧菜单栏 Artifacts → 加号 → JAR → from modules with dependencies 3. 选择入口类 Main Class&#xff08;点击文件夹图标可以快速选择&#xff09;&#xff0c;点击 OK&#…

(四)elasticsearch 源码之索引流程分析

https://www.cnblogs.com/darcy-yuan/p/17024341.html 1.概览 前面我们讨论了es是如何启动&#xff0c;本文研究下es是如何索引文档的。 下面是启动流程图&#xff0c;我们按照流程图的顺序依次描述。 其中主要类的关系如下: 2. 索引流程 (primary) 我们用postman发送请求&…

【漏洞复现】EPON上行A8-C政企网关未授权下载漏洞

Nx01 产品简介 EPON上行A8-C政企网关是一款终端产品&#xff0c;提供企业网络解决方案。 Nx02 漏洞描述 EPON上行A8-C政企网关配置文件未授权下载漏洞&#xff0c;攻击者在未授权状态下下载配置文件&#xff0c;获取配置文件内敏感信息。 Nx03 产品主页 fofa-query: "Z…

龙宝宝头像怎么做?分享3个AI生成工具!

龙宝宝头像怎么做&#xff1f;分享3个AI生成工具&#xff01; 在这个数字化时代&#xff0c;社交媒体已成为我们展示个性和表达自我的重要舞台。龙宝宝头像不仅有传统的中国龙形象&#xff0c;还有各种创意十足的卡通头像。然而&#xff0c;设计一个理想的头像并非易事&#x…

【面试】零基础自学Java路线及课程推荐,适用各阶段人群

目前各个平台上都有着各种各样的教程&#xff0c;给我们带来了更多选择的同时&#xff0c;也为新手小白带来了一丢丢的麻烦&#xff1a;如何系统学习Java?选择什么样的方式学习&#xff1f;本篇就来解答这些问题。 一 学习方式的选择 学习计算机的主流方式主要是视频课程、书…

22.HarmonyOS App(JAVA)位置布局PositionLayout使用方法

不常用 在PositionLayout中&#xff0c;子组件通过指定准确的x/y坐标值在屏幕上显示。(0, 0)为左上角&#xff1b;当向下或向右移动时&#xff0c;坐标值变大&#xff1b;允许组件之间互相重叠 布局方式 PositionLayout以坐标的形式控制组件的显示位置&#xff0c;允许组件相…

小白买新电脑保姆验机教程

目录 前言&#xff1a; 正文&#xff1a; 7&#xff0c;查看硬件信息 8&#xff0c;检查硬盘 9&#xff0c;检查屏幕 10&#xff0c;烤机检测 总结&#xff1a; 前言&#xff1a; 最近因为学习的需求&#xff0c;购入了一台新的笔记本&#xff0c;正巧趁这次给身边的朋友…

《动手学深度学习(PyTorch版)》笔记6.2

注&#xff1a;书中对代码的讲解并不详细&#xff0c;本文对很多细节做了详细注释。另外&#xff0c;书上的源代码是在Jupyter Notebook上运行的&#xff0c;较为分散&#xff0c;本文将代码集中起来&#xff0c;并加以完善&#xff0c;全部用vscode在python 3.9.18下测试通过&…