RocketMQ安装部署+简单实战开发

news2025/1/10 1:31:46

文章目录

    • 1.简介、安装部署
    • 2.Springboot集成RocketMQ
      • 2.1.添加maven依赖:
      • 2.2.RocketMQ配置
        • 生产者配置
        • 消费者配置
      • 2.3.生产者(发送消息)
      • 2.4.消费者(接收消息)
    • 3.实战结果
      • 3.1.消费者服务
      • 3.2.生产者服务
      • 3.3.运行日志
        • 生产日志
        • 消费日志

1.简介、安装部署

此部分不做赘述,可从官网查阅

说明文档 https://rocketmq.apache.org/zh/docs/4.x/

下载地址 https://rocketmq.apache.org/zh/download (建议下载二进制包)

安装和启动教程 https://rocketmq.apache.org/zh/docs/4.x/introduction/02quickstart

其实官网讲解已经很详细了,所以这里只是详细描述一下启动和停止命令

### 启动namesrv, <mqnamesrv.path> 表示安装好的二进制包中bin目录下的 mqnamesrv 文件地址,使用命令时替换为实际地址即可
$ nohup sh <mqnamesrv.path> &

### 先启动broker <mqbroker.path> 表示安装好的二进制包中bin目录下的 mqbroker 文件地址
### -n NameServer地址
$ nohup sh <mqbroker.path> -n localhost:9876 &

### 关闭broker <mqbroker.path> 表示安装好的二进制包中bin目录下的 mqshutdown 文件地址
sh <mqshutdown.path> broker

### 关闭namesrv
sh <mqshutdown.path> namesrv

2.Springboot集成RocketMQ

点击前往 官方案例

2.1.添加maven依赖:

<!--在pom.xml中添加依赖-->
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>${RELEASE.VERSION}</version>
</dependency>

2.2.RocketMQ配置

生产者配置
# rocketmq配置
rocketmq:
  # NameServer 服务器地址
  name-server: localhost:9876
  # 见源码 org.apache.rocketmq.spring.autoconfigure.RocketMQProperties.Producer
  producer:
    # 发送同一类消息的生产者设置为同一个group,保证唯一
    group: my_producer_group
    # 发送消息失败重试次数,默认2
    retryTimesWhenSendFailed: 2
    # 异步消息重试此处,默认2
    retryTimesWhenSendAsyncFailed: 2
    # 发送消息超时时间,默认3000
    sendMessageTimeout: 3000
    # 消息最大长度,默认1024 * 1024 * 4(默认4M)
    maxMessageSize: 4096
    # 压缩消息阈值,默认4k(1024 * 4)
    compressMessageBodyThreshold: 4096
    # 是否在内部发送失败时重试另一个broker,默认false
    retryNextServer: false
    # access-key
    #accessKey: xxx
    # secret-key
    #secretKey: xxx
    # 是否启用消息跟踪,默认false
    enableMsgTrace: false
    # 消息跟踪主题的名称值。如果不进行配置,可以使用默认的跟踪主题名称
    customizedTraceTopic: RMQ_SYS_TRACE_TOPIC
消费者配置
# rocketmq配置
rocketmq:
  # NameServer 服务器地址
  name-server: localhost:9876
  #Push模式, 对应name为rocketMQTemplate的RocketMQTemplate
  # 见源码 org.apache.rocketmq.spring.autoconfigure.RocketMQProperties.Consumer
  consumer:
    # 配置指定group是否启动监听器 group.topic = false
    listeners:
      # key:group名称。 value:{key: topic名称: value: true/false}
      my_consumer_group:
        mytopic: true

2.3.生产者(发送消息)

通过 RocketMQTemplate 模版类,我们进行了二次封装,构建了一个统一的消息发送工具类 MessageHelper,目的在于核心代码抽离实现解耦合。哪怕后续如果更换了新的消息中间件,只需统一更改该工具类的结构即可。

重点方法 selectSendWay 根据处理好的 destination 和消息方式方式 way 选择 RocketMQTemplate 中对应的方法版本。

/**
 * @Name: MessageHelper
 * @Description: 消息发送工具
 * @Author: ahao
 * @Date: 2024/4/12 7:22 PM
 */
@Component
public class MessageHelper {

    /**
     * 默认标签
     */
    public static final String DEFAULT_TAG = "none";

    public static final int SYNCHRONOUSLY = 1;
    public static final int ASYNCHRONOUSLY = 2;
    public static final int ORDERLY_SYNCHRONOUSLY = 3;
    public static final int ORDERLY_ASYNCHRONOUSLY = 4;


    /**
     * 导入RocketMQ模版工具
     */
    @Resource
    private RocketMQTemplate rocketMQTemplate;

    /**
     * 同步发送消息,会阻塞等待消息发送结果
     *
     * @param destination 消息发送目的地(主题:标签)
     * @param msg         消息内容
     */
    public MsgSendResult send(@NotNull String destination, Object msg) {
        return selectSendWay(destination, msg, null, null, SYNCHRONOUSLY);
    }

    /**
     * 同步发送消息,会阻塞等待消息发送结果
     *
     * @param topic 主题
     * @param tag   标签
     * @param msg   消息内容
     * @return
     */
    public MsgSendResult send(@NotNull String topic, @Nullable String tag, Object msg) {
        return selectSendWay(topic, tag, msg, null, null, SYNCHRONOUSLY);
    }

    /**
     * 异步发送消息
     *
     * @param destination 消息发送目的地
     * @param msg         消息内容
     * @param callback    消息发送回调通知 {@link SendCallback}
     */
    public void asyncSend(@NotNull String destination, Object msg, SendCallback callback) {
        selectSendWay(destination, msg, callback, null, ASYNCHRONOUSLY);
    }

    /**
     * 异步发送消息
     *
     * @param topic    主题
     * @param tag      标签
     * @param msg      消息内容
     * @param callback 消息发送回调通知 {@link SendCallback}
     */
    public void asyncSend(@NotNull String topic, @Nullable String tag, Object msg, SendCallback callback) {
        selectSendWay(topic, tag, msg, callback, null, ASYNCHRONOUSLY);
    }

    /**
     * 同步发送顺序消息,会阻塞等待消息发送结果
     *
     * @param destination 消息发送目的地(主题:标签)
     * @param msg         消息内容
     * @param key         分区键关键字,相同key的消息发送到同一个队列中
     */
    public MsgSendResult sendOrderly(@NotNull String destination, Object msg, String key) {
        return selectSendWay(destination, msg, null, key, ORDERLY_SYNCHRONOUSLY);
    }

    /**
     * 异步发送顺序消息,会阻塞等待消息发送结果
     *
     * @param destination 消息发送目的地(主题:标签)
     * @param msg         消息内容
     * @param key         分区键关键字,相同key的消息发送到同一个队列中
     */
    public void asyncSendOrderly(@NotNull String destination, Object msg, String key, SendCallback callback) {
        selectSendWay(destination, msg, callback, key, ORDERLY_ASYNCHRONOUSLY);
    }

    private MsgSendResult selectSendWay(String destination, Object msg, SendCallback callback, String key, int way) {
        if (!destination.contains(":")) {
            destination = destination + ":" + DEFAULT_TAG;
        }
        if (way == SYNCHRONOUSLY) {
            return new MsgSendResult(rocketMQTemplate.syncSend(destination, msg));
        } else if (way == ASYNCHRONOUSLY) {
            rocketMQTemplate.asyncSend(destination, msg, callback);
        } else if (way == ORDERLY_SYNCHRONOUSLY) {
            return new MsgSendResult(rocketMQTemplate.syncSendOrderly(destination, msg, key));
        } else if (way == ORDERLY_ASYNCHRONOUSLY) {
            rocketMQTemplate.asyncSendOrderly(destination, msg, key, callback);
        }
        return null;
    }

    private MsgSendResult selectSendWay(String topic, String tag, Object msg, SendCallback callback, String key, int way) {
        if (topic.contains(":")) {
            // 这里其实也可以抛异常,提示topic不合法,包含':'
            int i = topic.indexOf(':');
            topic = topic.substring(0, i);
        }
        if (tag == null) {
            tag = DEFAULT_TAG;
        }
        String destination = topic + ":" + tag;
        return selectSendWay(destination, msg, callback, key, way);
    }

}

消息发送结果

// 内部实现可以自定义,这里偷懒,作者只是继承RocketMQ的SendResult
public class MsgSendResult extends SendResult {
    
    private SendResult result;
    
    public MsgSendResult(SendResult result){}
    
}

2.4.消费者(接收消息)

/**
 * @Name: ConsumerDemo
 * @Description: 消费样例
 * @Author: ahao
 * @Date: 2024/4/12 7:35 PM
 */
@Slf4j
@Component
@RocketMQMessageListener(
        topic = "my_topic",
        consumerGroup = "my_consumer_group",
        selectorType = SelectorType.TAG,
        // * 表示匹配所有
        selectorExpression = "*",
        consumeMode = ConsumeMode.CONCURRENTLY,
        messageModel = MessageModel.CLUSTERING
)
public class ConsumerDemo implements RocketMQListener<String> {

    @Override
    public void onMessage(String message) {
        log.info("接收到的消息:{}",message);
    }

}

@RocketMQMessageListener 相关属性解析:

  • topic:主题
  • consumerGroup:消费分组
  • selectorType:筛选方式
    • SelectorType.TAG:根据TAG选择。仅支持表达式格式如:“tag1 || tag2 || tag3”,如果表达式为null或者“*”标识订阅所有消息
    • SelectorType.SQL92:根据SQL92表达式选择。支持类似SQL的关键词语法 AND, OR, NOT, BETWEEN, IN, TRUE, FALSE, IS,NULL
  • selectorExpression:筛选条件,与selectorType关联
  • consumeMode:消费模式
    • ConsumeMode.CONCURRENTLY:并行处理
    • ConsumeMode.ORDERLY:顺序处理
  • messageModel:消息模式
    • MessageModel.CLUSTERING:集群模式即负载均衡模式,每一个消息只会被某一个消费者消费一次
    • MessageModel.BROADCASTING:广播模式,每个消费者都会消费消息
  • consumeThreadMax:最大线程数
  • consumeTimeout:消息阻塞消费线程的最长时间(以分钟为单位)
  • enableMsgTrace:是否启用消息轨迹
  • customizedTraceTopic:自定义的消息轨迹主题
  • nameServer:命名服务器地址
  • accessKey:标识用户身份的字符串(access-key)
  • secretKey:密钥(secret-key)

3.实战结果

3.1.消费者服务

启动类

@Slf4j
@SpringBootApplication
public class MyApplication implements ApplicationRunner {

    public static void main(String[] args) {
        SpringApplication.run(MyApplication.class, args);
    }


    @Override
    public void run(ApplicationArguments args) throws Exception {
        TimeUnit.SECONDS.sleep(10000);
    }

}

目录结构

在这里插入图片描述

3.2.生产者服务

启动类

@Slf4j
@SpringBootApplication
public class MyApplication implements ApplicationRunner {

    @Resource
    private MessageHelper messageHelper;

    public static void main(String[] args) {
        SpringApplication.run(MyApplication.class, args);
    }

    @Override
    public void run(ApplicationArguments args) throws Exception {
        new Thread(() -> {
            SimpleDateFormat format = new SimpleDateFormat("hh:MM:ss");
            while (true){
                String curDate = format.format(new Date());
                try {
                    log.info("发送消息:{}",curDate);
                    messageHelper.send("my_topic", curDate);
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        },"producer_test").start();
        TimeUnit.SECONDS.sleep(10000);
    }
    
}

目录结构

在这里插入图片描述

3.3.运行日志

生产日志

在这里插入图片描述

消费日志

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1591368.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【SpringBoot】-- 使用minio对象存储服务实现上传图片

目录 一、安装minio 拉取镜像 启动 查看 进入登录页面 创建bucket 二、安装miniomc 三、代码 application.yml MinioUtil Controller 四、拓展 以下基于云服务和docker使用minio服务 一、安装minio Minio 是一个开源的对象存储服务器。它允许用户在私有云环境中建…

【vue】watch 侦听器

watch&#xff1a;可监听值的变化&#xff0c;旧值和新值 <!DOCTYPE html> <html lang"en"><head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-scale1.0"><titl…

蓝桥杯嵌入式(G431)备赛——最后一晚查漏补缺

蓝桥杯嵌入式&#xff08;G431&#xff09;备赛笔记——初始化cubeMX 蓝桥杯嵌入式&#xff08;G431&#xff09;备赛笔记——LED 蓝桥杯嵌入式&#xff08;G431&#xff09;备赛笔记——按键模块设计 蓝桥杯嵌入式&#xff08;G431&#xff09;备赛笔记——LCD按键 蓝桥杯…

2023年度编程语言将花落谁家

2023年度编程语言将花落谁家 TIOBE的预测你预测年度最受欢迎的编程语言会是什么&#xff1f;TIOBE 认为 C# 最有可能成为年度编程语言&#xff0c;你同意吗&#xff1f;为什么&#xff1f;AI时代已经到来&#xff0c;你有学习新语言的打算吗&#xff1f; 以下是来自年度编程语言…

Android 纵向双选日历

这个日历的布局分两部分&#xff0c;一部分是显示星期几的LinearLayout&#xff0c;另外就是一个RecyclerView&#xff0c;负责纵向滚动了。 工具类&#xff1a; implementation com.blankj:utilcode:1.17.3上activity_calendar代码&#xff1a; <?xml version"1.0&…

Springboot+Vue项目-基于Java+MySQL的在线视频教育平台系统(附源码+演示视频+LW)

大家好&#xff01;我是程序猿老A&#xff0c;感谢您阅读本文&#xff0c;欢迎一键三连哦。 &#x1f49e;当前专栏&#xff1a;Java毕业设计 精彩专栏推荐&#x1f447;&#x1f3fb;&#x1f447;&#x1f3fb;&#x1f447;&#x1f3fb; &#x1f380; Python毕业设计 &…

中仕公考:非师范生能考教师编吗?

非师范生能考教师编吗?关于这个问题&#xff0c;首要条件是必须持有教师资格证。只要符合招聘条件&#xff0c;非师范专业背景的考生同样有资格报名参加教师编制考试。具体要求如下&#xff1a; 1. 年龄要求&#xff1a;申请人需年满35岁以下&#xff0c;对于特定职位&#x…

Elastic 线下 Meetup 将于 2024 年 4 月 27 号在重庆举办

2024 Elastic Meetup 重庆站活动&#xff0c;由 Elastic、新智锦绣联合举办&#xff0c;现诚邀广大技术爱好者及开发者参加。 活动时间 2024年4月27日 13:30-18:00 活动地点 中国重庆 沙坪坝区学城大道62-1号研发楼一期b3栋1楼(瑞幸咖啡旁&#xff09; 活动流程 14:00-14:50…

多因子模型的数据处理

优质博文&#xff1a;IT-BLOG-CN 数据处理的基本目的是从多量的、可能是杂乱无章的、难以理解的数据中抽取并推导出有价值、有意义的数据。特别是金融数据&#xff0c;存在数据缺失&#xff0c;不完整以及极端异常值等问题&#xff0c;对于我们的分析和建模影响很多。 对于我…

【新版HI3559AV100开发注意事项(四)】

新版HI3559AV100开发注意事项&#xff08;四&#xff09; 三十、HI3559A参数中对输入分辨率限制的原因是&#xff1f; 答&#xff1a;分辨率限制有两个来源&#xff1a; 一个是时钟频率最高为600M&#xff0c;开启一拍两像素之后相当于1200M。你这个数据量太大了&#xff0c;6…

(学习日记)2024.04.17:UCOSIII第四十五节:中断管理

写在前面&#xff1a; 由于时间的不足与学习的碎片化&#xff0c;写博客变得有些奢侈。 但是对于记录学习&#xff08;忘了以后能快速复习&#xff09;的渴望一天天变得强烈。 既然如此 不如以天为单位&#xff0c;以时间为顺序&#xff0c;仅仅将博客当做一个知识学习的目录&a…

微信小程序开发学习笔记——4.9【小案例】开启下拉刷新页面enablePullDownRefresh

>>跟着b站up主“咸虾米_”学习微信小程序开发中&#xff0c;把学习记录存到这方便后续查找。 课程连接&#xff1a;4.9.【小案例】开启下拉刷新页面enablePullDownRefresh_哔哩哔哩_bilibili 一、api2.json 小程序配置 / 页面配置 (qq.com) {"usingComponents&q…

抖音小店是什么?个人店、个体店、企业店,新手商家该如何选择?

大家好&#xff0c;我是电商花花。 抖音小店这两年来说都是一个发展不错的电商项目&#xff0c;凭借着直播电商的快速发展&#xff0c;让更多人看到了抖音小店其中的红利&#xff0c;吸引着商家入驻。 抖音小店是什么&#xff1f; 很多人会把抖音小店和达人橱窗搞混&#xff…

Harmony鸿蒙南向驱动开发-Regulator接口使用

功能简介 Regulator模块用于控制系统中某些设备的电压/电流供应。在嵌入式系统&#xff08;尤其是手机&#xff09;中&#xff0c;控制耗电量很重要&#xff0c;直接影响到电池的续航时间。所以&#xff0c;如果系统中某一个模块暂时不需要使用&#xff0c;就可以通过Regulato…

LC 515.在每个树行中找最大值

515. 在每个树行中找最大值 给定一棵二叉树的根节点 root &#xff0c;请找出该二叉树中每一层的最大值。 示例1&#xff1a; 输入: root [1,3,2,5,3,null,9] 输出: [1,3,9] 示例2&#xff1a; 输入: root [1,2,3] 输出: [1,3] 提示&#xff1a; 二叉树的节点个数的范围是…

ubuntu下NTFS分区无法访问挂载-解决办法!

Ubuntu系统下&#xff0c;有的时候发现&#xff0c;挂载的NTFS文件系统硬盘无法访问。点击弹出类似问题&#xff1a; Error mounting /dev/sda1 at /media/root/新加卷: Command-line mount -t "ntfs" -o "uhelperudisks2,nodev,nosuid,uid0,gid0" "/…

LeetCode-416. 分割等和子集【数组 动态规划】

LeetCode-416. 分割等和子集【数组 动态规划】 题目描述&#xff1a;解题思路一&#xff1a;01背包问题&#xff0c;动规五部曲解题思路二&#xff1a;0解题思路三&#xff1a;0 题目描述&#xff1a; 给你一个 只包含正整数 的 非空 数组 nums 。请你判断是否可以将这个数组分…

云计算:OVS 集群 使用 Geneve 流表

目录 一、实验 1.环境 2.OVS 集群 使用 Geneve 流表 二、问题 1.VXLAN与Geneve区别 一、实验 1.环境 (1) 主机 表1 宿主机 主机架构软件IP网卡备注ovs_controller控制端 karaf 0.7.3 192.168.204.63 1个NAT网卡 &#xff08;204网段&#xff09; 已部署ovs_server01服务…

【无标题】nodejs+mogoodb数据库写注册接口

描述 本篇文章主要记录使用nodejs express搭建服务器&#xff0c;并链接mogoodb数据来书写简单的后台接口&#xff1b;前端项目使用的vue2的一个酒店管理项目。阅读本文章&#xff0c;可以了解如何连接mogoodb数据库&#xff0c;和一些对数据库进行操作的命令。前端如何进行跨…

2024年Mathorcup数学建模竞赛A题思路

可以关注下&#xff0c;代码已出 【金山文档 | WPS云文档】 2024年Mathorcup思路代码在线文档https://link.zhihu.com/?targethttps%3A//kdocs.cn/l/cdlol5FlRAdE 整体来说这个题就两个步骤&#xff0c;第一是训练一个响应面模型&#xff0c;输入是附表1邻区的PCI值&#xff0…