本章节主要是学习RocketMQ, 目标快速入门, 能够回答或解决以下问题:
了解什么是RocketMQ
了解RocketMQ的核心概念
动手安装RocketMQ服务
快速入门,掌握RocketMQ的api使用
对producer、consumer进行详解
了解RocketMQ的存储特点
10.1 RocketMQ简介与安装
10.1.1 核心概念速通
RocketMQ是一个采用Java语言开发的一款高性能、高可靠、高实时的分布式消息中间件. 阿里开源,现在已成为 Apache 顶级孵化项目。
Rocketmq 解决哪些问题?
发布订阅, 消息队列最基本的功能.
消息优先级, 消息有序消费, 消息过滤, 消息持久化
支持分布式事务
看不懂? 没关系, 我也是从网上粘贴到, 你知道RocketMQ是一个分布式消息系统就行, 支持生产者消费者模式(发布订阅). 其他的暂时不用在意.
10.1.2 常见名词介绍
生产者消费者介绍
生产者消费者, 也就是发布订阅模型, 往往有两个角色参与, "生产者, 也就是发布方", "消费者,也就是订阅方", 举个例子, 你关注了一个公众号, 那么公众号就是生产者, Ta负责发布内容, 你就是消费者, 订阅内容.
现在你知道了生产者消费者模式, 那你应该可以理解MQ就是一个推送机制, 生产者将内容推送到消息队列中, 消费者从消息队列中取出内容. 当然实际情况远比我们上面举得例子复杂.
但是基本上都可以归类为两种, 一种是只要有一个消费者消费后队列中内容消失, 另一种是只有所有的消费者都消费后才从列表中移除, 比如:
一个小组共用一个待办列表, 任何一个代办只要有一个人完成了, 那就从代办列表消失.
多人订阅一个公众号, 也就是生产一份内容, 多个消费者订阅后消费这一份.
RocketMQ的结构图介绍
生产者消费者已经介绍过了, 接下来主要介绍下调度员和工人
nameServer cluster, 其实就是nameServer的集群.
nameServer 是RocketMQ的调度员, 当生产者想要发布内容的时候, 先对接调度员, 从而找到真正发布消息的工人,
生产者与nameServer和broker交互流程如下图所示:
然后MQ发布给所有订阅者的流程如下图:
nameServer与broker的关系, 大家懂得都懂, broker定时上报自己是可运行的(默认10s一次), 不然nameServer就认为他失效了(2mins未收到上报就认为他失效了)
其他你会用到的概念
Topic, 也就是一个主题, 生产者和消费者往往是多对多的关系, 就需要Topic进行中继连接, 就像百度贴吧或者微博超话, 生产者和消费者因为topic而聚集在一起.
Message, 这个简单, 就是一条消息, 里面还需要包含一个Topic的指向. 包含一个nameServer的指向.
10.1.3 使用docker安装.
我们使用docker安装体验下方便快捷的方式.
#拉取镜像
docker pull foxiswho/rocketmq:server-4.3.2
docker pull foxiswho/rocketmq:broker-4.3.2
#创建nameserver容器
docker create -p 9876:9876 --name rmqserver \
-e "JAVA_OPT_EXT=-server -Xms128m -Xmx128m -Xmn128m" \
-e "JAVA_OPTS=-Duser.home=/opt" \
-v /haoke/rmq/rmqserver/logs:/opt/logs \
-v /haoke/rmq/rmqserver/store:/opt/store \
foxiswho/rocketmq:server-4.3.2
#创建broker容器
docker create -p 10911:10911 -p 10909:10909 --name rmqbroker \
-e "JAVA_OPTS=-Duser.home=/opt" \
-e "JAVA_OPT_EXT=-server -Xms128m -Xmx128m -Xmn128m" \
-v /haoke/rmq/rmqbroker/conf/broker.conf:/etc/rocketmq/broker.conf \
-v /haoke/rmq/rmqbroker/logs:/opt/logs \
-v /haoke/rmq/rmqbroker/store:/opt/store \
foxiswho/rocketmq:broker-4.3.2
#启动容器
docker start rmqserver rmqbroker
#停止删除容器
docker stop rmqbroker rmqserver
docker rm rmqbroker rmqserver
如果你得/etc/rocketmq/broker.conf不存在或者是一个文件夹的话, 那么使用这个内容
brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
autoCreateTopicEnable=true
brokerIP1={替换成你的公网ip}
namesrvAddr={替换成你的公网ip}:9876
brokerName=broker_haoke
enablePropertyFilter=true
如果你使用的是云服务, 那么需要放开以下三个端口, 不然本地会无法连接MQ.
9876
10911
10909
10.1.4 使用java连接测试
第一步,创建itcast-rocketmq工程
第二步,导入依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>itcast-rocketmq</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.3.2</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- java编译插件 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.2</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
</plugins>
</build>
</project>
package cn.itcast.rocketmq;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class SyncProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("test-group");
producer.setNamesrvAddr("{替换成你的公网ip}:9876");
producer.start();
for (int i = 0; i < 100; i++) {
Message message = new Message("TopicTest11", "TagA",
("Hello RocketMQ:" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(message);
System.out.printf("%s%n", sendResult);
}
producer.shutdown();
}
}
运行测试,效果如下
10.1.5 使用UI管理
#拉取镜像
docker pull styletang/rocketmq-console-ng:1.0.0
#创建并启动容器
docker run -e "JAVA_OPTS=-Drocketmq.namesrv.addr=172.16.55.185:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false" -p 8082:8080 -t styletang/rocketmq-console-ng:1.0.0
然后打开
10.2 快速入门
10.2.1创建topic
package cn.itcast.rocketmq;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
public class TopicDemo {
public static void main(String[] args) throws MQClientException {
// 创建一个生产者
DefaultMQProducer producer = new DefaultMQProducer("HAOKE_IM");
// 生产者需要连接调度员
producer.setNamesrvAddr("{ip}:9876");
producer.start();
// 生产者去创建一个topic (一般线上topic都需要审批才能创建)
// broker_haoke是连接的broker的名字, 需要和你配置的broker名字一致
// haoke_im_topic 是创建的topic的名字
producer.createTopic("broker_haoke", "haoke_im_topic", 8);
System.out.println("创建topic成功");
producer.shutdown();
}
}
运行,如果没有报错就ok.
10.2.2 生产者发消息(同步)
package cn.itcast.rocketmq;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
/**
* 同步发送消息
*/
public class SyncProducer2 {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("HAOKE_IM");
producer.setNamesrvAddr("{你的服务器公网ip}:9876");
producer.start();
// 构建消息内容
String msgStr = "用户A发送消息给用户B";
Message msg = new Message("haoke_im_topic","SEND_MSG",
msgStr.getBytes(RemotingHelper.DEFAULT_CHARSET));
// 发送消息
SendResult sendResult = producer.send(msg);
// 这里会同步等待到服务器的响应
System.out.println("消息状态:" + sendResult.getSendStatus());
System.out.println("消息id:" + sendResult.getMsgId());
System.out.println("消息queue:" + sendResult.getMessageQueue());
System.out.println("消息offset:" + sendResult.getQueueOffset());
producer.shutdown();
}
}
/*
打印结果:
消息状态:SEND_OK
消息id:AC1037A0307418B4AAC2374062400000
消息queue:MessageQueue [topic=haoke_im_topic, brokerName=broker_haoke_im, queueId=6]
消息offset:0
*/
简单介绍下Message对象
public class Message implements Serializable {
private static final long serialVersionUID = 8445773977080406428L;
// 每条message必须指明自己所在的topic
private String topic;
// MQ为应用扩展的字段, 完全由应有自己决定
private int flag;
// 发送消息可以携带一些参数, 以便于自定义化, 当然里面也有一些系统推荐用的变量, 如tags, 这个就是MQ用来实现消息过滤需求的
//
private Map<String, String> properties;
// 消息内容, 需要转为字节数组传入
private byte[] body;
// RocketMQ支持分布式事务, 暂时不用管这个字段, 等学到对应内容
private String transactionId;
// 省略所有方法
}
10.2.3 生产者发消息(异步版)
package cn.itcast.rocketmq;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class AsyncProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("HAOKE_IM");
producer.setNamesrvAddr("{你的服务器公网ip}:9876");
// 发送失败的重试次数, 默认为2
producer.setRetryTimesWhenSendAsyncFailed(0);
producer.start();
String msgStr = "用户A发送消息给用户B";
Message msg = new Message("haoke_im_topic","SEND_MSG",
msgStr.getBytes(RemotingHelper.DEFAULT_CHARSET));
// 异步发送消息
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
// 接收到MQ发来的确认,会调用这里
System.out.println("消息状态:" + sendResult.getSendStatus());
System.out.println("消息id:" + sendResult.getMsgId());
System.out.println("消息queue:" + sendResult.getMessageQueue());
System.out.println("消息offset:" + sendResult.getQueueOffset());
producer.shutdown();
}
@Override
public void onException(Throwable e) {
System.out.println("发送失败!" + e);
}
});
System.out.println("发送成功!");
// 如果这个不注释掉, 就会发送失败, 因为是异步发送, 发送还没完成就shutdown
// producer.shutdown();
}
}
10.2.3 消费消息
package cn.itcast.rocketmq;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import java.io.UnsupportedEncodingException;
import java.util.List;
public class ConsumerDemo {
public static void main(String[] args) throws Exception {
// 创建消费者与创建生产者代码基本一致
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("HAOKE_IM");
consumer.setNamesrvAddr("{你的服务器公网ip}:9876");
// 订阅topic,接收此Topic下的所有消息
consumer.subscribe("haoke_im_topic", "*");
// 也可以写成下面这种写法, 只匹配tags中带有SEND_MSG 或者SEND_MSG1的消息, 其他消息都不接收
// consumer.subscribe("haoke_im_topic", "SEND_MSG || SEND_MSG1");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
try {
System.out.println(new String(msg.getBody(), "UTF-8"));
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
System.out.println("收到消息->" + msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
}
}
10.2.4 消费消息(自定义过滤版)
先写一个生产者
package cn.itcast.rocketmq;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class SyncProducerFilter {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("HAOKE_IM");
producer.setNamesrvAddr("{你的服务器公网ip}:9876");
producer.start();
String msgStr = "美女001";
Message msg = new Message("haoke_meinv_topic","SEND_MSG",
msgStr.getBytes(RemotingHelper.DEFAULT_CHARSET));
// 更改这里的age和sex, 可以让不同的消费者获取
msg.putUserProperty("age", "21");
msg.putUserProperty("sex", "女");
// 发送消息
SendResult sendResult = producer.send(msg);
System.out.println("消息状态:" + sendResult.getSendStatus());
System.out.println("消息id:" + sendResult.getMsgId());
System.out.println("消息queue:" + sendResult.getMessageQueue());
System.out.println("消息offset:" + sendResult.getQueueOffset());
System.out.println(sendResult);
producer.shutdown();
}
}
package cn.itcast.rocketmq;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.io.UnsupportedEncodingException;
import java.util.List;
public class ConsumerFilterDemo {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("HAOKE_IM");
consumer.setNamesrvAddr("{你的服务器公网ip}:9876");
// 订阅topic,接收此Topic下的所有消息, 过滤要求:>20岁的女性
consumer.subscribe("haoke_meinv_topic", MessageSelector.bySql("age>=20 AND sex='女'"));
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
try {
System.out.println(new String(msg.getBody(), "UTF-8"));
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
System.out.println("收到消息->" + msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
}
}
10.3 进阶知识点(部分)
10.3.1 顺序发送与消费消息
在某些业务中,consumer在消费消息时,是需要按照生产者发送消息的顺序进行消费的,比如在电商系统中,订
单的消息,会有创建订单、订单支付、订单完成,如果消息的顺序发生改变,那么这样的消息就没有意义了。
那么现在有一个简单的思考题,
思考题: 已知我们一个topic下有多台机器, 我们能保证接到的顺序, 怎么保证顺序性呢?
很简单, 只要保证生产者和消费者的一次请求都始终绑定某个机器就可以了. 当然实际情况上更灵活, 我们只需要保证一个订单的所有状态都交给同一个机器来处理就可以了,然后消费者需要串行的消费.
接下来一个demo展示具体思路
package cn.itcast.rocketmq.order;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
/**
*
*/
public class OrderProducer {
public static void main(String[] args) throws Exception{
// 创建生产者与之前的代码一致
DefaultMQProducer producer = new DefaultMQProducer("HAOKE_ORDER_PRODUCER");
producer.setNamesrvAddr("{你的服务器公网ip}:9876");
producer.start();
// 生成100条消息
for (int i = 0; i < 100; i++) {
String msgStr = "order --> " + i;
int orderId = i % 10; // 模拟生成订单id
Message message = new Message("haoke_order_topic","ORDER_MSG",
msgStr.getBytes(RemotingHelper.DEFAULT_CHARSET));
// send的三个参数:
// 1. message, 消息本体
// 2. MessageQueueSelector selector, 这是选择队列的选择器,MQ会根据你的选择器选用合适的队列
// 3. 参数,这个是你为了给 selector 提供的参数.
SendResult sendResult = producer.send(message, (mqs, msg, arg) -> {
// 队列选择器, 根据 orderId 选择处理机器, 从而保证消息的有序性
Integer id = (Integer) arg;
int index = id % mqs.size();
return mqs.get(index);
}, orderId);
// 打印结果
System.out.println(sendResult);
}
producer.shutdown();
}
}
消费者:
package cn.itcast.rocketmq.order;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class OrderConsumer {
public static void main(String[] args) throws Exception{
// 创建消费者
DefaultMQPushConsumer consumer = new
DefaultMQPushConsumer("HAOKE_ORDER_CONSUMER");
consumer.setNamesrvAddr("{你的服务器公网ip}:9876");
consumer.subscribe("haoke_order_topic", "*");
// 注意,这里使用的监听器不再是之前用的 MessageListenerConcurrently(并发的), 而是 MessageListenerOrderly(有序的)
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeOrderlyContext context) {
// orderly监听器 为了保证消费者的有序性, 这里会使用一个消费者线程会顺序消费queue
System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);
return ConsumeOrderlyStatus.SUCCESS;
}
});
consumer.start();
}
}
10.3.2 事务消息
往往我们需要在事务开始前就告知消息系统准备一条消息, 当事务完成后将消息发送给消费者. 如果事务失败那么取消发送.
针对这种MQ接收到但是需要等待生产者通知后才能发送的消息, 我们称其为Half(Prepare) Message(半消息).
如果一条半消息长时间没收到成功/失败的通知, 那么会发送一条回查,确定是发送还是取消.
接下来我用一个demo演示下.
package cn.itcast.rocketmq.tx;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
public class TransactionProducer {
public static void main(String[] args) throws Exception {
// 这里我们创建一个支持事务的生产者
TransactionMQProducer producer = new
TransactionMQProducer("transaction_producer");
producer.setNamesrvAddr("{你的Ip}:9876");
// 设置我们自定义的事务监听器, 他会根据这个监听器进行
producer.setTransactionListener(new TransactionListenerImpl());
producer.start();
// 发送消息
Message message = new Message("pay_topic",
"用户A给用户B转账500元".getBytes("UTF-8"));
// 使用事务发消息
producer.sendMessageInTransaction(message, null);
// 模拟生产者其他操作, 如果关掉会影响回查
Thread.sleep(999999);
producer.shutdown();
}
}
自定义监听器
package cn.itcast.rocketmq.tx;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.HashMap;
import java.util.Map;
// 发送二次确认和接收回查都在这里实现.
public class TransactionListenerImpl implements TransactionListener {
private static Map<String, LocalTransactionState> STATE_MAP = new HashMap<>();
/**
* 执行具体的业务逻辑
*
* @param msg 发送的消息对象
* @param arg
* @return
*/
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
try {
System.out.println("用户A账户减500元.");
Thread.sleep(500); //模拟调用服务
// 模拟抛出异常, 中断事务
// System.out.println(1/0);
System.out.println("用户B账户加500元.");
Thread.sleep(800); //模拟调用服务
// 存log日志到数据库中
STATE_MAP.put(msg.getTransactionId(),
LocalTransactionState.COMMIT_MESSAGE);
// 二次提交确认
return LocalTransactionState.COMMIT_MESSAGE;
} catch (Exception e) {
e.printStackTrace();
}
// 存log日志到数据库中
STATE_MAP.put(msg.getTransactionId(),
LocalTransactionState.ROLLBACK_MESSAGE);
// 回滚
return LocalTransactionState.ROLLBACK_MESSAGE;
}
/**
* 消息回查
*/
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
// 去数据库查log日志, 发现已经成功了
return STATE_MAP.get(msg.getTransactionId());
}
}
消费者代码如下:
package cn.itcast.rocketmq.tx;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.io.UnsupportedEncodingException;
import java.util.List;
// 事务对消费者来说应该是无感知的
public class TransactionConsumer {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new
DefaultMQPushConsumer("HAOKE_CONSUMER");
consumer.setNamesrvAddr("{ip}:9876");
// 订阅topic,接收此Topic下的所有消息
consumer.subscribe("pay_topic", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
try {
System.out.println(new String(msg.getBody(), "UTF-8"));
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
}
}
10.3.3 重试策略
在消息的发送和消费过程中,都有可能出现错误,如网络异常等,出现了错误就需要进行错误重试,这种消息的重
试需要分2种,分别是producer端重试和consumer端重试。
10.3.3.1 producer端重试
生产者端的消息失败,也就是Producer往MQ上发消息没有发送成功,比如网络抖动导致生产者发送消息到MQ失败。
package cn.itcast.rocketmq.day2;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class SyncProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("HAOKE_IM");
producer.setNamesrvAddr("{ip}:9876");
// TODO : 消息发送失败时,重试3次
producer.setRetryTimesWhenSendFailed(3);
producer.start();
String msgStr = "用户A发送消息给用户B";
Message msg = new Message("haoke_im_topic", "SEND_MSG",
msgStr.getBytes(RemotingHelper.DEFAULT_CHARSET));
// 发送消息,并且指定超时时间, 一旦超时会自动重试3次.
SendResult sendResult = producer.send(msg, 1000);
System.out.println("消息状态:" + sendResult.getSendStatus());
System.out.println("消息id:" + sendResult.getMsgId());
System.out.println("消息queue:" + sendResult.getMessageQueue());
System.out.println("消息offset:" + sendResult.getQueueOffset());
System.out.println(sendResult);
producer.shutdown();
}
}
10.3.3.2 队列timeout
比如由于网络原因导致消息压根就没有从MQ到消费者上,那么在RocketMQ内部会不断的尝试发送这条消息,直
至发送成功为止!
也就是说,服务端没有接收到消息的反馈,既不是成功也不是失败,这个时候定义为超时。
10.3.3.3 consumer上报exception
消息正常的到了消费者,结果消费者发生异常,处理失败了。例如反序列化失败,消息数据本身无法处理(例如话
费充值,当前消息的手机号被注销,无法充值)等。
如果消息消费失败,那么消息将会在1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h后重试,一直到2h后不再重试
有些时候并不需要重试这么多次,一般重试3~5次即可。这个时候就可以通过msg.getReconsumeTimes()获取重试次数进行控制。
package cn.itcast.rocketmq.day2;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.io.UnsupportedEncodingException;
import java.util.List;
public class ConsumerDemo {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("HAOKE_IM");
consumer.setNamesrvAddr("134.175.110.184:9876");
// 订阅topic,接收此Topic下的所有消息
consumer.subscribe("my-test-topic", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
try {
System.out.println(new String(msg.getBody(), "UTF-8"));
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
System.out.println("收到消息->" + msgs);
if (msgs.get(0).getReconsumeTimes() >= 3) {
// 重试3次后,不再进行重试, 也就是会收到四次消息后才会消费掉.
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
});
consumer.start();
}
}
10.4 接入SpringBoot
依赖如下:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.0.RELEASE</version>
</parent>
<groupId>org.example</groupId>
<artifactId>itcast-rocketmq</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!--RocketMQ相关-->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.2</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.4.0</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- java编译插件 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.2</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
</plugins>
</build>
</project>
application.properties 的内容如下
# Spring boot application
spring.application.name = itcast-rocketmq
rocketmq.name-server={你的ip}:9876
rocketmq.producer.group=my-group
编写启动类
package cn.itcast.rocketmq;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class MyApplication {
public static void main(String[] args) {
SpringApplication.run(MyApplication.class, args);
}
}
编写生产者代码:
package cn.itcast.rocketmq.spring;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class SpringProducer {
@Autowired
private RocketMQTemplate rocketMQTemplate;
/**
* 发送消息
*
* @param topic
* @param msg
*/
public void sendMsg(String topic, String msg) {
this.rocketMQTemplate.convertAndSend(topic, msg);
}
}
消费者代码
package cn.itcast.rocketmq.spring;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
@Component
@RocketMQMessageListener(topic = "my-topic", consumerGroup = "haoke-consumer", selectorExpression = "*")
public class SpringConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String msg) {
System.out.println("接收到消息 -> " + msg);
}
}
再test目录下编写测试类
package cn.itcast.rocketmq.spring;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
@RunWith(SpringRunner.class)
@SpringBootTest
public class TestSpringRocketMQ {
@Autowired
private SpringProducer springProducer;
// 运行第一次只会发送消息, 然后junit就结束了, 之后才能让消费者接收到消息.
@Test
public void testSendMsg(){
this.springProducer.sendMsg("my-topic", "第一个Spring消息");
}
}
10.5 IM系统接入RocketMQ
pom中新增依赖:
<!--RocketMQ相关依赖-->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.2</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.4.0</version>
</dependency>
application.properties中新增mq相关
rocketmq.name-server={ip}:9876
rocketmq.producer.group=my-group
修改后的MessageHandler, 既可以作为发送者,也是消费者.
package cn.itcast.haoke.im.websocket;
import cn.itcast.haoke.im.dao.MessageDAO;
import cn.itcast.haoke.im.pojo.Message;
import cn.itcast.haoke.im.pojo.UserData;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.bson.types.ObjectId;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.TextWebSocketHandler;
import java.util.HashMap;
import java.util.Map;
/**
* webSocket 消息处理器
*
* @author 过道
*/
@Component
// 订阅topic, 把自己设置到group中
// 这里我们选择messageModel为广播模式, 以便所有机器都可以收到消息, 然后判断自己是否是目标用户,是的话则处理掉.
@RocketMQMessageListener(topic = "haoke-im-send-message-topic",
consumerGroup = "haoke-im-consumer",
messageModel = MessageModel.BROADCASTING,
selectorExpression = "SEND_MSG")
public class MessageHandler extends TextWebSocketHandler implements RocketMQListener<String> {
@Autowired
private MessageDAO messageDAO;
// TODO 引入rocket, 如果有红色下划线不用管哦
@Autowired
private RocketMQTemplate rocketMQTemplate;
private static final ObjectMapper MAPPER = new ObjectMapper();
// 记录所有在线的终端, 并配置唯一标识.
private static final Map<Long, WebSocketSession> SESSIONS = new HashMap<>();
@Override
public void afterConnectionEstablished(WebSocketSession session) {
// 将当前用户的session放置到map中,后面会使用相应的session通信
Long uid = (Long) session.getAttributes().get("uid");
SESSIONS.put(uid, session);
}
@Override
protected void handleTextMessage(WebSocketSession session, TextMessage
textMessage) throws Exception {
// 解析消息中的发送方, 接收方, 消息内容.
Long uid = (Long) session.getAttributes().get("uid");
JsonNode jsonNode = MAPPER.readTree(textMessage.getPayload());
Long toId = jsonNode.get("toId").asLong();
String msg = jsonNode.get("msg").asText();
Message message = Message.builder()
// 假装发送用户和接受用户都是从数据库中查出来的
.from(UserData.USER_MAP.get(uid))
.to(UserData.USER_MAP.get(toId))
.msg(msg)
.build();
// 将消息保存到MongoDB
message = this.messageDAO.saveMessage(message);
// 序列化保存整个message
String msgStr = MAPPER.writeValueAsString(message);
// 判断to用户是否在线
WebSocketSession toSession = SESSIONS.get(toId);
if (toSession != null && toSession.isOpen()) {
// 具体格式需要和前端对接
toSession.sendMessage(new TextMessage(msgStr));
// 更新消息状态为已读
this.messageDAO.updateMessageState(message.getId(), 2);
} else {
// TODO 用户不在线,或者不在当前的jvm中,发送消息到RocketMQ, 让MQ去寻找对应的消费者
org.springframework.messaging.Message mqMessage = MessageBuilder
.withPayload(msgStr)
.build();
// topic:tags 设置主题和标签
this.rocketMQTemplate.send("haoke-im-send-message-topic:SEND_MSG",
mqMessage);
}
}
/**
* 收到消息后,
* @param msg
*/
@Override
public void onMessage(String msg) {
// System.out.println("接收到消息 -> " + msg);
try {
JsonNode jsonNode = MAPPER.readTree(msg);
Long toId = jsonNode.get("to").get("id").longValue();
// 判断to用户是否在线
WebSocketSession toSession = SESSIONS.get(toId);
if (toSession != null && toSession.isOpen()) {
toSession.sendMessage(new TextMessage(msg));
// 更新消息状态为已读
this.messageDAO.updateMessageState(new
ObjectId(jsonNode.get("id").asText()), 2);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}