目录
1.环境
2.生产者、消费者的模式
3.顺序消息
4.广播消息
5.延迟消息
6.批量消息
7.过滤消息
8.事务消息
本文着重聊的是RocketMQ的编程模型,下载安装和概念可以移步博主的另外两篇博文:
RocketMQ基础概念__BugMan的博客-CSDN博客
RocketMQ下载安装、集群搭建保姆级教程__BugMan的博客-CSDN博客
1.环境
RocketMQ版本:4.7.1
Maven依赖:
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.7.1</version> </dependency>
2.生产者、消费者的模式
生产者有三种生产模式:
-
同步,生产者等待broker的响应后再往下走。
-
异步,生产者不等待broker的响应,继续往下走,broker的响应通过事件监听的方式,触发回调函数,通知生产者。
-
单向,生产者只管发,不管broker的响应,这种模式没办法做消息丢失后的补救
消费者有两种消费模式:
-
主动拉取
-
等待推送
生产者示例:
public static void main(String[] args) throws MQClientException, InterruptedException {
//创建消费者,创建的时候可以指定该消费者属于哪个消费者组
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
//指定name server的地址
producer.setNamesrvAddr("name-server1-ip:9876;name-server2-ip:9876");
producer.start();
//发送一千条信息
for (int i = 0; i < 1000; i++) {
try {
//消息,topic为TopicTest,后面跟的一串是tag
Message msg = new Message("TopicTest" /* Topic */,
"TagA" /* Tag */,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
//同步发送
SendResult sendResult = producer.send(msg);
/**异步发送,通过自定义回调函数的方式来触发响应
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
countDownLatch.countDown();
System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());
}
@Override
public void onException(Throwable e) {
countDownLatch.countDown();
System.out.printf("%-10d Exception %s %n", index, e);
e.printStackTrace();
}
}**/
System.out.printf("%s%n", sendResult);
} catch (Exception e) {
e.printStackTrace();
Thread.sleep(1000);
}
}
producer.shutdown();
}
消费者示例:
推模式:
public static void main(String[] args) throws InterruptedException, MQClientException {
//创建消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
//设置name server
consumer.setNamesrvAddr("name-server1-ip:9876;name-server2-ip:9876");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("TopicTest", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
}
拉模式:
private static final Map<MessageQueue, Long> OFFSE_TABLE = new HashMap<MessageQueue, Long>();
public static void main(String[] args) throws MQClientException {
DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name_5");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.start();
Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("broker-a");
for (MessageQueue mq : mqs) {
System.out.printf("Consume from the queue: %s%n", mq);
SINGLE_MQ:
while (true) {
try {
PullResult pullResult =
consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);
System.out.printf("%s%n", pullResult);
putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
switch (pullResult.getPullStatus()) {
case FOUND:
break;
case NO_MATCHED_MSG:
break;
case NO_NEW_MSG:
break SINGLE_MQ;
case OFFSET_ILLEGAL:
break;
default:
break;
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
consumer.shutdown();
}
private static long getMessageQueueOffset(MessageQueue mq) {
Long offset = OFFSE_TABLE.get(mq);
if (offset != null)
return offset;
return 0;
}
private static void putMessageQueueOffset(MessageQueue mq, long offset) {
OFFSE_TABLE.put(mq, offset);
}
}
3.顺序消息
有些场景中我们需要保证消息的有序性,比如以下场景:
1.开通会员,将积分初始化为10分
2.完成赚取积分的操作,+5分
3.完成违规操作,-10分
最后剩余积分5分
如果消息乱序为321,最后积分为10分。
生产者将消息顺序的发到一个MessageQueue上,然后消费者去一个MessageQueue上拿消息,就能保证消息的顺序性。
RocketMQ支持生产者、消费者在生产、消费的时候指定MessageQueue,但是具体怎样利用这一点来实现顺序消费,需要开发人员去手写。
生产者:
for (int i = 0; i < 100; i++) {
for(int j=0;i<5;i++) {
//每一个订单用同一个orderId
int orderId = i;
Message msg =
new Message("TopicTest","order_"+orderId, "KEY" + orderId,
("order_" + orderId+"step"+j).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
/**
* 回调函数
* @param mqs broker中的所有MessageQueue
* @param msg 发送的消息
* @param arg 就是传过来的orderId
* @return
*/
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Integer id = (Integer) arg;
//每个订单的message用同一个orderId,必然会选中同一个MessageQueue,自然会顺序存进去
int index = id % mqs.size();
return mqs.get(index);
}
}, orderId);
System.out.printf("%s%n", sendResult);
}
}
消费者:
//MessageListenerOrderly这种类型的监听器,会让consumer一直去读一个messagequeue的内容,一直读完为止
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
context.setAutoCommit(true);
for (MessageExt msg : msgs) {
System.out.println("收到的消息内容:"+new String(msg.getBody()));
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
4.广播消息
默认情况下,topic中的一条消息只会被一个消费者所消费。广播模式,topic中的一条消息,可以被订阅该topic的所有消费者消费。
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_1");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
//设置为广播模式
consumer.setMessageModel(MessageModel.BROADCASTING);
consumer.subscribe("TopicTest", "TagA || TagC || TagD");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.printf("Broadcast Consumer Started.%n");
}
5.延迟消息
延迟消息,即指定消息在MQ的一个驻留时间,过多少时间后,过了驻留时间,消费者才能消费到消息。
延迟消息可以用来做定时任务。
API上来说,就是在生产者端,给消息指定一个延迟级别即可:
//messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
//3即第三个等级,10s
msg.setDelayTimeLevel(3);
很明显上面的延迟级别都是定死的,如果我们要用来做定时任务,需要自定义延迟级别。开源版的RocketMQ不支持自定义延迟级别,只有商业版(阿里云上部署的RocketMQ)才支持。
开源版的RocketMQ只能自己去改源码,所以这里也是各个公司做自己定制化的RocketMQ时,一个改造的重点
6.批量消息
RocketMQ支持生产者批量发送消息,以此去减少生产者的网络IO,批量消息只和生产者有关,消费者仍然是按照一条一条的去消费的。
public class SimpleBatchProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("BatchProducerGroupName");
producer.start();
//If you just send messages of no more than 1MiB at a time, it is easy to use batch
//Messages of the same batch should have: same topic, same waitStoreMsgOK and no schedule support
String topic = "BatchTest";
List<Message> messages = new ArrayList<>();
messages.add(new Message(topic, "Tag", "OrderID001", "Hello world 0".getBytes()));
messages.add(new Message(topic, "Tag", "OrderID002", "Hello world 1".getBytes()));
messages.add(new Message(topic, "Tag", "OrderID003", "Hello world 2".getBytes()));
producer.send(messages);
}
}
7.过滤消息
通过tag,消费者可以消费同一个topic下自己感兴趣的消息
生产者:
public class TagFilterProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.start();
String[] tags = new String[] {"TagA", "TagB", "TagC"};
for (int i = 0; i < 60; i++) {
Message msg = new Message("TagFilterTest",
tags[i % tags.length],
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
producer.shutdown();
}
消费者:
public class TagFilterConsumer {
public static void main(String[] args) throws InterruptedException, MQClientException, IOException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
consumer.subscribe("TagFilterTest", "TagA || TagC");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
8.事务消息
事务消息,即发送到MQ的消息能和本地事务一起被回滚。
MQ消息和业务之间其实会存在类似于分布式事务的一致性问题,举个例子:
以电商交易场景为例,用户支付订单这一核心操作的同时会涉及到下游物流发货、积分变更、购物车状态清空等多个子系统的变更。很明显这几个步骤的调用都应该是在一个事务中的,这些调用都可以用MQ做成移步的,那么问题就来了。这些业务中一旦有一个业务失败,其它业务应该同时失败,但是消息已经发出去了怎么办?
这时候事务消息就派上用场了。
事务消息的生产者在发送消息时,会将消息转为一个half(半消息),并存入RocketMQ内部的一个RMQ_SYS_TRANS_HALF_TOPIC这个Topic,这个Topic对消费者是不可见的,当本地事务执行成功后会向MQ发送commit信号,MQ再将消息转为原本的Topic,当本地事务执行失败后会向MQ发送roll_back信号,MQ会丢弃对应消息。如果本地事务没有执行完,可以向MQ发送一个unknown信号,MQ收到unknow信号后,会让对应的消息等待,并且定期回查状态为unknown的消息。
自定义本地事务的提交逻辑和检查逻辑:
public class TransactionListenerImpl implements TransactionListener {
private AtomicInteger transactionIndex = new AtomicInteger(0);
private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
int value = transactionIndex.getAndIncrement();
int status = value % 3;
localTrans.put(msg.getTransactionId(), status);
return LocalTransactionState.UNKNOW;
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
Integer status = localTrans.get(msg.getTransactionId());
if (null != status) {
switch (status) {
case 0:
return LocalTransactionState.UNKNOW;
case 1:
return LocalTransactionState.COMMIT_MESSAGE;
case 2:
return LocalTransactionState.ROLLBACK_MESSAGE;
default:
return LocalTransactionState.COMMIT_MESSAGE;
}
}
return LocalTransactionState.COMMIT_MESSAGE;
}
}
生产者:
public class TransactionProducer {
public static void main(String[] args) throws MQClientException, InterruptedException {
TransactionListener transactionListener = new TransactionListenerImpl();
TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("client-transaction-msg-check-thread");
return thread;
}
});
producer.setExecutorService(executorService);
producer.setTransactionListener(transactionListener);
producer.start();
String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
for (int i = 0; i < 10; i++) {
try {
Message msg =
new Message("TopicTest1234", tags[i % tags.length], "KEY" + i,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.sendMessageInTransaction(msg, null);
System.out.printf("%s%n", sendResult);
Thread.sleep(10);
} catch (MQClientException | UnsupportedEncodingException e) {
e.printStackTrace();
}
}
for (int i = 0; i < 100000; i++) {
Thread.sleep(1000);
}
producer.shutdown();
}
}