Spring boot整合rocketmq(windows)

news2025/1/12 9:49:40

目录

1.环境搭建

2.命名服务器和业务服务器的启动

3.名词说明

4.执行步骤

5.示例

1.导入依赖

2.配置(至少指定下面两个)

3.代码

6.常见问题


1.环境搭建

下载地址: https://rocketmq.apache.org /

 解压缩进行安装,默认服务端口:9876

环境变量的配置

ROCKETMQ_HOME
PATH
NAMESRV_ADDR 建议): 127.0.0.1:9876

2.命名服务器和业务服务器的启动

//命名服务器的启动,默认端口:9876
cd bin的目录下
mqnamesrv.cmd 
//业务服务器的启动
win+r,进入命令输入页面
//设置命名服务器的地址
set NAMESRV_ADDR 127.0.0.1:9876
mqbroker.cmd

//可能会遇到的问题
业务服务器启动报找不到或无法加载主类 Files\Java\jdk1.8.0_291\lib\dt.jar;C:\Program

解决方法:
1.启动mqbroke.cmd报错的时候,需要改一下runserver.cmd, runbroker.cmd这两个文件里面的这个地方:set CLASSPATH=.;%BASE_DIR%conf;"%CLASSPATH%",这个地方的%CLASSPATH%外面要添加上半角双引号。而set "JAVA_OPT=%JAVA_OPT% -cp %CLASSPATH%" 这里面的%CLASSPATH%的头尾不能再添加半角双引号
2.jdk是安装在Program Files目录下的,问题就出在这里。卸载JDK,重新安装到其他没有空格的目录

 启动成功的结果如下

命名服务器:

 业务服务器:

3.名词说明

  • Producer:消息的发送者;
  • Consumer:消息接收者;
  • Consumer Group:消费组;每一个 consumer 实例都属于一个 consumer group,每一条消息只会被同一个 consumer group 里的一个 consumer 实例消费。(不同consumer group可以同时消费同一条消息)
  • Broker:暂存和传输消息;
  • NameServer:管理 Broker;
  • Topic:区分消息的种类;一个发送者可以发送消息给一个或者多个 Topic;一个消息的接收者可以订阅一个或者多个 Topic 消息
  • Message Queue:相当于是 Topic 的分区;用于并行发送和接收消息
  • Tag可以看作子主题,它是消息的第二级类型。同一业务模块不同目的的消息就可以用相同 Topic 而不同的 Tag来标识

4.执行步骤

消息发送者步骤分析:

  • 创建消息生产者 producer,并指定生产者组名
  • 指定 Nameserver 地址
  • 启动 producer
  • 创建消息对象,指定主题 Topic、Tag 和消息体发送消息
  • 关闭生产者 producer

消息消费者步骤分析:

  • 创建消费者 Consumer,制定消费者组名
  • 指定 Nameserver 地址
  • 订阅主题 Topic 和 Tag
  • 设置回调函数,处理消息
  • 启动消费者 consumer

整体流程:

  • NameServer 先启动
  • Broker 启动时向 NameServer 注册
  • 生产者在发送某个主题的消息之前先从 NamerServer 获取 Broker 服务器地址列表(有可能是集群),然后根据负载均衡算法从列表中选择一台Broker 进行消息发送。
  • NameServer 与每台 Broker 服务器保持长连接,并间隔 30S 检测 Broker 是否存活,如果检测到Broker 宕机(使用心跳机制, 如果检测超120S),则从路由注册表中将其移除。
  • 消费者在订阅某个主题的消息之前从 NamerServer 获取 Broker 服务器地址列表(有可能是集群),但是消费者选择从 Broker 中 订阅消息,订阅规则由 Broker 配置决定

5.示例

1.导入依赖

        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.2.2</version>
        </dependency>

2.配置(至少指定下面两个)

rocketmq:
  name-server: 127.0.0.1:9876
  producer:
    group: rocket-order

3.代码

服务层接口

package com.example.demo.rocketmq;

public interface MessageService {

    //发送消息
    public void sendMessage(String msg);

    //接收消息
    public String doMessage();
}

服务层实现

package com.example.demo.rocketmq.impl;


import com.example.demo.rocketmq.MessageService;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;


/**
 * @author linaibo
 * @version 1.0
 * Create by 2022/12/17 13:40
 */
@Service
public class MessageRocketServiceImpl implements MessageService {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    @Override
    public void sendMessage(String msg) {
        String mdg = msg + "88888";
        System.out.println("rocketmq消息开始发送" + msg);
        SendCallback callback = new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                System.out.println("消息发送成功");
            }

            @Override
            public void onException(Throwable throwable) {
                System.out.println("消息发送失败" + throwable.getMessage());
            }
        };
        rocketMQTemplate.asyncSend("order:tag", msg, callback);
//        rocketMQTemplate.asyncSend("order:tag2", mdg, callback);
    }

    @Override
    public String doMessage() {
        return null;
    }
}

使用异步发送方式, RocketMQTemplate发送带Tag的消息,只需要将topic和tag中间通过【:】冒号连接即可。

监听器

package com.example.demo.rocketmq.listener;

import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.annotation.SelectorType;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;

/**
 * @author linaibo
 * @version 1.0
 * Create by 2022/12/17 13:50
 */
@Component
@RocketMQMessageListener(consumerGroup = "rocket-order", topic = "order",selectorType = SelectorType.TAG,selectorExpression = "tag")
public class RocketmqListener implements RocketMQListener<String> {

    @Override
    public void onMessage(String s) {
        System.out.println("接收到发送的消息" + s);

    }
}

通过selectorType属性指定消费的选择类型为Tag,这个类型也是selectorType属性的默认值

通过selectorExpression属性来选择消费的Tag。默认是"*",即会消费该topic下所有的Tag的消息。

需要注意的是:

1.如果我们的消费者指定了消费的Tag后,发送的消息如果不带tag,将会消费不到;

2.如果我们的生产者指定了Tag,但是消费者的selectorExpression没有设置,即用默认的“*”,那么这个消费者也会消费到。

6.常见问题

官方的常见问题解答地址

常见问题解答 | RocketMQ

1.rocket报错No route info of this topic: order(也可能是其他原因,这里的原因是其中的一种)

原因:命名服务器正常启动成功,业务服务器没有正常启动导致

2.在同一个group下,相同主题不同tag的消息,设置两个监听器来监听不同的tag时,两种消息都可以正常发送,但是只有一个tag的消息背消费,另一个消费不了

原因:RocketMq消费者如果针对同一个topic不同的tag配置了相同的group,会导致消息消费混乱。针对不同的tag配置不同的group即可。比如说在两个项目中,配置文件中定义不同的生产者group,两个系统发送相同topic,不同tag的消息,然后对不同group,相同topic,不同的tag进行监听处理

 linux版的可参照下面的博客

RocketMQ详细配置与使用_一名小码农的博客-CSDN博客_rocketmq

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/98635.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

执行 select ... for update 语句,如果查询条件没有索引字段的话,是加行锁还是加表锁?

大家好&#xff0c;我是小林。 昨天在群里看到大家在讨论一个 MySQL 锁的问题&#xff0c;就是执行 select … for update 语句&#xff0c;如果查询条件没有索引字段的话&#xff0c;是加「行锁」还是加「表锁」&#xff1f; 如果你做过这个实验的话&#xff0c;你会发现执行…

数据结构刷题训练营1

开启蓝桥杯备战计划&#xff0c;每日练习算法一题&#xff01;&#xff01;坚持下去&#xff0c;想必下一年的蓝桥杯将会有你&#xff01;&#xff01; 笔者是在力扣上面进行的刷题&#xff01;&#xff01;由于是第一次刷题&#xff01;找到的题目也不咋样&#xff01;所以&a…

SPRING-了解3-注解

IOC容器操作Bean 注解格式&#xff1a;注解名称(属性名称属性值,属性名称属性值) 放在类&#xff0c;方法&#xff0c;属性都可以 目的&#xff1a;简化XML配置 对象创建四大注解 1&#xff09;用的位置不是强制的 Component 最普通 Service 用在service层 Controlle…

接口测试(十)—— telnet和python代码测试dubbo接口

目录 一、传智健康项目介绍 1、项目描述 2、目标用户群体 3、项目模块 4、系统框架 二、Dubbo接口测试 1、RPC 2、Dubbo 3、查阅API文档 三、Telnet工具远程调用 1、启用telnet 2、telnet远程连接服务 3、telnet调用服务接口 四、python借助dubbo远程调用 1、安…

MySQL~JDBC

10、JDBC&#xff08;重点&#xff09; 10.1、数据库驱动 驱动&#xff1a;声卡、显卡、数据库 我们的程序会通过 数据库 驱动&#xff0c;和数据库打交道&#xff01; 10.2、JDBC SUN公司为了简化 开发人员的&#xff08;对数据库的统一&#xff09;操作&#xff0c;提供了…

剑指offer常见题 - 链表问题(一)

二叉树相关算法 链表相关知识点&#xff1a; 链表是一种物理存储单元上非连续、非顺序的存储结构&#xff0c;数据元素的逻辑顺序是通过链表中的指针链接次序实现的。 知识点一&#xff1a;链表由一系列结点&#xff08;链表中每一个元素称为结点&#xff09;组成&#xff0c;…

IDEA中如何使用Vim?看完本教程,让你用IDEA用到爽~(建议收藏)

目录 前言 Vim有什么特点&#xff1f; 为什么我要安利你在 IEAD 中使用Vim? Vim 一、环境配置 二、Vim的使用 2.1、方向键 hjkl 2.2、​编辑复制&粘贴 2.3、选择代码块并删除 2.4、块级删除 2.5、各种插入模式 2.5.1、以下是gif演示 2.6、jump&#xff08;解放鼠…

毕业设计 stm32智能电子秤系统 - 物联网 嵌入式 单片机

文章目录0 前言1 简介2 主要器件3 实现效果4 设计原理4.1 STM32F103C8T64.2 HX711压力传感器5 部分核心代码6 最后0 前言 &#x1f525; 这两年开始毕业设计和毕业答辩的要求和难度不断提升&#xff0c;传统的毕设题目缺少创新和亮点&#xff0c;往往达不到毕业答辩的要求&…

【OpenCV】Ubuntu配置OpenCV环境

1.从官网下载opencv包拷贝到虚拟机Ubuntu中&#xff0c; 虚拟机与主机传输文件可以采用 vmware tool、共享文件夹或者远程连接工具 2.解压得到对应版本号文件夹&#xff0c;我的是opencv-3.4.2 3.修改文件权限chmod -R 777 opencv-3.4.2 从win10进入Ubuntu中的文件压缩包解…

2022年云南省—信息安全管理与评估赛项竞赛规程

2022年云南省职业院校技能大赛 信息安全管理与评估赛项竞赛规程 一、赛项名称 赛项编号&#xff1a;No.11 赛项名称&#xff1a;信息安全管理与评估 英语翻译&#xff1a;Information Security Management and Evaluation 赛项组别&#xff1a;高职组 赛项归属产业&a…

本周大新闻|John Carmack从Meta离职,OPPO发布双目AR一体机仅38g

本周大新闻&#xff0c;AR方面&#xff0c;微软已向客户承诺新款HoloLens&#xff1b;NASA成立Joint AR项目&#xff0c;计划在宇航服头盔中加入AR功能&#xff1b;OPPO Air Glass 2发布&#xff0c;双目光波导仅38g&#xff1b;Rokid开设全球首家品牌旗舰店&#xff1b;谷歌为…

【数据结构】二叉树的节点总个数、叶子节点个数、第K层节点个数、二叉树的深度

目录 1.结点总个数 1.1 局部静态变量法 思维 代码 不足之处 2.传指针法 程序代码 3.递归法 思想 程序代码 详细过程 2.叶子节点个数 思想 程序代码 3.第K层节点个数 思想 程序代码 4.二叉树深度 思想 程序代码 求二叉树节点总个数、叶子节点个数、第k层节点…

汀丶的创作纪念日

机缘 csdn的博龄5年了&#xff0c;但实际创作时间只有两年&#xff1b;第一次接触csdn主要是用来查找代码bug并收藏一些有价值博客&#xff0c;但渐渐地自己也就习惯把自己学到的知识和技术分享出来&#xff0c;一起共建。 主要是关于机器学习、强化学习、数据挖掘、强化学习以…

ADI Blackfin DSP处理器-BF533的开发详解62:DSP控制ADXL345三轴加速度传感器-贪食蛇游戏(含源码)

硬件准备 ADSP-EDU-BF533&#xff1a;BF533开发板 AD-HP530ICE&#xff1a;ADI DSP仿真器 软件准备 Visual DSP软件 硬件链接 MEMS三轴加速度传感器 我做了一个三轴加速度传感器的子卡&#xff0c;插在这个板子上&#xff0c;然后写了一些有意思的应用程序。 代码实现功能…

Bootstrap5 侧边栏导航(Offcanvas)

Bootstrap5 侧边栏侧边栏类似于模态框&#xff0c;在移动端设备中比较常用。 创建滑动导航 我们可以通过 JavaScript 来设置是否在 .offcanvas 类后面添加 .show 类&#xff0c;从而控制侧边栏的显示与隐藏&#xff1a; .offcanvas 隐藏内容 (默认).offcanvas.show 显示内容…

JVM之native关键字与PC寄存器

native关键字&#xff1a; native关键字主要用于修饰方法&#xff1a; 被native关键字修饰的方法叫做本地方法&#xff0c;一个native方法就是一个Java调用非Java代码的接口&#xff0c;该方法的实现由非Java语言实现&#xff0c;而是使用C或C等其他编程语言实现 native方法…

Compose 和 Android 传统View 互相调用

1. 前言 Compose 具有超强的兼容性&#xff0c;兼容现有的所有代码&#xff0c;Compose 能够与现有 View 体系并存&#xff0c;可实现渐进式替换。这就很有意义了&#xff0c;我们可以在现有项目中一小块一小块逐步地替换Compose&#xff0c;或者在旧项目中实现新的需求的时候…

设计模式之外观模式

Facade design pattern 外观模式的概念、外观模式的结构、外观模式的优缺点、外观模式的使用场景、外观模式的实现示例、外观模式的源码分析 1、外观模式的概念 外观模式&#xff0c;为多个复杂的子系统提供一个统一的接口&#xff0c;使得这些子系统更加容易被访问。在现有的…

【AI with ML】第 11 章 :对序列模型使用卷积和递归方法

&#x1f50e;大家好&#xff0c;我是Sonhhxg_柒&#xff0c;希望你看完之后&#xff0c;能对你有所帮助&#xff0c;不足请指正&#xff01;共同学习交流&#x1f50e; &#x1f4dd;个人主页&#xff0d;Sonhhxg_柒的博客_CSDN博客 &#x1f4c3; &#x1f381;欢迎各位→点赞…

SAP Gateway Foundation 里的 batch 操作

SAP Gateway Foundation (SAP_GWFND) 是一个在 SAP NetWeaver 中可用的软件组件。 SAP Gateway Foundation 提供开发和生成工具来为各种客户端开发工具创建 OData 服务。 简而言之&#xff0c;它在应用程序或 SAP Business Suite 数据与目标客户、平台和编程框架之间建立连接。…