目录
1.环境搭建
2.命名服务器和业务服务器的启动
3.名词说明
4.执行步骤
5.示例
1.导入依赖
2.配置(至少指定下面两个)
3.代码
6.常见问题
1.环境搭建
解压缩进行安装,默认服务端口:9876
环境变量的配置
2.命名服务器和业务服务器的启动
//命名服务器的启动,默认端口:9876
cd bin的目录下
mqnamesrv.cmd
//业务服务器的启动
win+r,进入命令输入页面
//设置命名服务器的地址
set NAMESRV_ADDR 127.0.0.1:9876
mqbroker.cmd
//可能会遇到的问题
业务服务器启动报找不到或无法加载主类 Files\Java\jdk1.8.0_291\lib\dt.jar;C:\Program
解决方法:
1.启动mqbroke.cmd报错的时候,需要改一下runserver.cmd, runbroker.cmd这两个文件里面的这个地方:set CLASSPATH=.;%BASE_DIR%conf;"%CLASSPATH%",这个地方的%CLASSPATH%外面要添加上半角双引号。而set "JAVA_OPT=%JAVA_OPT% -cp %CLASSPATH%" 这里面的%CLASSPATH%的头尾不能再添加半角双引号
2.jdk是安装在Program Files目录下的,问题就出在这里。卸载JDK,重新安装到其他没有空格的目录
启动成功的结果如下
命名服务器:
业务服务器:
3.名词说明
- Producer:消息的发送者;
- Consumer:消息接收者;
- Consumer Group:消费组;每一个 consumer 实例都属于一个 consumer group,每一条消息只会被同一个 consumer group 里的一个 consumer 实例消费。(不同consumer group可以同时消费同一条消息)
- Broker:暂存和传输消息;
- NameServer:管理 Broker;
- Topic:区分消息的种类;一个发送者可以发送消息给一个或者多个 Topic;一个消息的接收者可以订阅一个或者多个 Topic 消息
- Message Queue:相当于是 Topic 的分区;用于并行发送和接收消息
- Tag可以看作子主题,它是消息的第二级类型。同一业务模块不同目的的消息就可以用相同 Topic 而不同的 Tag来标识
4.执行步骤
消息发送者步骤分析:
- 创建消息生产者 producer,并指定生产者组名
- 指定 Nameserver 地址
- 启动 producer
- 创建消息对象,指定主题 Topic、Tag 和消息体发送消息
- 关闭生产者 producer
消息消费者步骤分析:
- 创建消费者 Consumer,制定消费者组名
- 指定 Nameserver 地址
- 订阅主题 Topic 和 Tag
- 设置回调函数,处理消息
- 启动消费者 consumer
整体流程:
- NameServer 先启动
- Broker 启动时向 NameServer 注册
- 生产者在发送某个主题的消息之前先从 NamerServer 获取 Broker 服务器地址列表(有可能是集群),然后根据负载均衡算法从列表中选择一台Broker 进行消息发送。
- NameServer 与每台 Broker 服务器保持长连接,并间隔 30S 检测 Broker 是否存活,如果检测到Broker 宕机(使用心跳机制, 如果检测超120S),则从路由注册表中将其移除。
- 消费者在订阅某个主题的消息之前从 NamerServer 获取 Broker 服务器地址列表(有可能是集群),但是消费者选择从 Broker 中 订阅消息,订阅规则由 Broker 配置决定
5.示例
1.导入依赖
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.2</version>
</dependency>
2.配置(至少指定下面两个)
rocketmq:
name-server: 127.0.0.1:9876
producer:
group: rocket-order
3.代码
服务层接口
package com.example.demo.rocketmq;
public interface MessageService {
//发送消息
public void sendMessage(String msg);
//接收消息
public String doMessage();
}
服务层实现
package com.example.demo.rocketmq.impl;
import com.example.demo.rocketmq.MessageService;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
* @author linaibo
* @version 1.0
* Create by 2022/12/17 13:40
*/
@Service
public class MessageRocketServiceImpl implements MessageService {
@Autowired
private RocketMQTemplate rocketMQTemplate;
@Override
public void sendMessage(String msg) {
String mdg = msg + "88888";
System.out.println("rocketmq消息开始发送" + msg);
SendCallback callback = new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println("消息发送成功");
}
@Override
public void onException(Throwable throwable) {
System.out.println("消息发送失败" + throwable.getMessage());
}
};
rocketMQTemplate.asyncSend("order:tag", msg, callback);
// rocketMQTemplate.asyncSend("order:tag2", mdg, callback);
}
@Override
public String doMessage() {
return null;
}
}
使用异步发送方式, RocketMQTemplate发送带Tag的消息,只需要将topic和tag中间通过【:】冒号连接即可。
监听器
package com.example.demo.rocketmq.listener;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.annotation.SelectorType;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;
/**
* @author linaibo
* @version 1.0
* Create by 2022/12/17 13:50
*/
@Component
@RocketMQMessageListener(consumerGroup = "rocket-order", topic = "order",selectorType = SelectorType.TAG,selectorExpression = "tag")
public class RocketmqListener implements RocketMQListener<String> {
@Override
public void onMessage(String s) {
System.out.println("接收到发送的消息" + s);
}
}
通过selectorType属性指定消费的选择类型为Tag,这个类型也是selectorType属性的默认值
通过selectorExpression属性来选择消费的Tag。默认是"*",即会消费该topic下所有的Tag的消息。
需要注意的是:
1.如果我们的消费者指定了消费的Tag后,发送的消息如果不带tag,将会消费不到;
2.如果我们的生产者指定了Tag,但是消费者的selectorExpression没有设置,即用默认的“*”,那么这个消费者也会消费到。
6.常见问题
官方的常见问题解答地址
常见问题解答 | RocketMQ
1.rocket报错No route info of this topic: order(也可能是其他原因,这里的原因是其中的一种)
原因:命名服务器正常启动成功,业务服务器没有正常启动导致
2.在同一个group下,相同主题不同tag的消息,设置两个监听器来监听不同的tag时,两种消息都可以正常发送,但是只有一个tag的消息背消费,另一个消费不了
原因:RocketMq消费者如果针对同一个topic不同的tag配置了相同的group,会导致消息消费混乱。针对不同的tag配置不同的group即可。比如说在两个项目中,配置文件中定义不同的生产者group,两个系统发送相同topic,不同tag的消息,然后对不同group,相同topic,不同的tag进行监听处理
linux版的可参照下面的博客
RocketMQ详细配置与使用_一名小码农的博客-CSDN博客_rocketmq