RocketMQ整合代码
一 构建Java基础环境
在maven项⽬中构建出RocketMQ消息示例的基础环境,即创建⽣产者程序和消费者程序。通过⽣产者和消费者了解RocketMQ操作消息的原⽣API
引⼊依赖
<dependencies>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.7.1</version>
</dependency>
</dependencies>
编写⽣产者程序
public class SyncProducer {
public static void main(String[] args) throws
MQClientException, UnsupportedEncodingException,
RemotingException, InterruptedException, MQBrokerException {
//Instantiate with a producer group name.
DefaultMQProducer producer = new
DefaultMQProducer("producerGroup1");
// Specify name server addresses.
producer.setNamesrvAddr("172.16.253.101:9876");
//Launch the instance.
producer.start();
for (int i = 0; i < 100; i++) {
//Create a message instance, specifying topic, tag and message body.
Message msg = new Message("TopicTest" /* Topic */, "TagA" /* Tag */,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */);
//Call send message to deliver message to one of brokers.
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
//Shut down once the producer instance is not longer in use.
producer.shutdown();
}
}
编写消费者程序
public class MyConsumer {
public static void main(String[] args) throws MQClientException {
// Instantiate with specified consumer group name.
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
// Specify name server addresses.
consumer.setNamesrvAddr("172.16.253.101:9876");
// Subscribe one more more topics to consume.
consumer.subscribe("TopicTest", "*");
// Register callback to execute on arrival of messages fetched from brokers.
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus
consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s % n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//Launch the consumer instance.
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
启动消费者和⽣产者,验证消息的收发
1. 简单消息示例
简单消息分成三种:同步消息、异步消息、单向消息。
1.1 同步消息
⽣产者发送消息后,必须等待broker返回信息后才继续之后的业务逻辑,在broker返回信息之前,⽣产者阻塞等待
同步消息的应⽤场景:如重要通知消息、短信通知、短信营销系统等。
public class SyncProducer {
public static void main(String[] args) throws
MQClientException, UnsupportedEncodingException,
RemotingException, InterruptedException, MQBrokerException {
//Instantiate with a producer group name.
DefaultMQProducer producer = new DefaultMQProducer("producerGroup1");
// Specify name server addresses.
producer.setNamesrvAddr("172.16.253.101:9876");
//Launch the instance.
producer.start();
for (int i = 0; i < 100; i++) {
//Create a message instance, specifying topic, tag and message body.
Message msg = new Message("TopicTest" /* Topic */, "TagA" /* Tag */,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
//Call send message to deliver message to one of brokers.
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
//Shut down once the producer instance is not longer in use.
producer.shutdown();
}
}
1.2 异步消息
⽣产者发完消息后,不需要等待broker的回信,可以直接执⾏之后的业务逻辑。⽣产者提供⼀个回调函数供broker调⽤,体现了异步的⽅式。
异步传输⼀般⽤于响应时间敏感的业务场景。
public class AsyncProducer {
public static void main(String[] args) throws Exception {
//Instantiate with a producer group name.
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
// Specify name server addresses.
producer.setNamesrvAddr("172.16.253.101:9876");
//Launch the instance.
producer.start();
producer.setRetryTimesWhenSendAsyncFailed(0);
int messageCount = 100;
final CountDownLatch countDownLatch = new CountDownLatch(messageCount);
for (int i = 0; i < messageCount; i++) {
try {
final int index = i;
Message msg = new Message("Jodie_topic_1023", "TagA", "OrderID188",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
countDownLatch.countDown();
System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());
}
@Override
public void onException(Throwable e) {
countDownLatch.countDown();
System.out.printf("%-10d Exception %s %n", index, e);
e.printStackTrace();
}
});
} catch (Exception e) {
e.printStackTrace();
}
}
countDownLatch.await(5, TimeUnit.SECONDS);
producer.shutdown();
}
}
1. 3 单向消息
⽣产者发送完消息后不需要等待任何回复,直接进⾏之后的业务逻辑,单向传输⽤于需要中等可靠性的情况,例如⽇志收集。
public class OnewayProducer {
public static void main(String[] args) throws Exception {
//Instantiate with a producer group name.
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
// Specify name server addresses.
producer.setNamesrvAddr("172.16.253.101:9876");
//Launch the instance.
producer.start();
for (int i = 0; i < 100; i++) {
//Create a message instance, specifying topic, tag and message body.
Message msg = new Message("TopicTest" /* Topic */,
"TagA" /* Tag */,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
//Call send message to deliver message to one of brokers.
producer.sendOneway(msg);
}
//Wait for sending to complete
Thread.sleep(5000);
producer.shutdown();
}
}
2. 顺序消息
顺序消息指的是消费者消费消息的顺序按照发送者发送消息的顺序执⾏。顺序消息分成两种:局部顺序和全局顺序。
2.1 局部顺序
局部消息指的是消费者消费某个topic的某个队列中的消息是顺序的。消费者使⽤MessageListenerOrderly类做消息监听,实现局部顺序。
public class OrderConsumer {
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("example_group_name");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("OrderTopicTest", "*");
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
context.setAutoCommit(true);
for (MessageExt msg : msgs) {
System.out.println("消息内容:" + new String(msg.getBody()));
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
2.2 全局顺序
消费者消费全部消息都是顺序的,只能通过⼀个某个topic只有⼀个队列才能实现,这种应⽤场景较少,且性能较差。
2.3 乱序消费
消费者消费消息不需要关注消息的顺序。消费者使⽤MessageListenerConcurrentl类做消息监听。
public class OrderConsumer {
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("example_group_name");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("OrderTopicTest", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.println("消息内容:" + new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
3. ⼴播消息
⼴播是向主题(topic)的所有订阅者发送消息。订阅同⼀个topic的多个消费者,能全量收到⽣产者发送的所有消息。
消费者
public class BroadcastConsumer {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("example_group_name");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
//set to broadcast mode
consumer.setMessageModel(MessageModel.BROADCASTING);
consumer.subscribe("TopicTest", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.println("消息内容:" + new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.printf("Broadcast Consumer Started.%n");
}
}
⽣产者
public class BroadcastProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
producer.start();
for (int i = 0; i < 100; i++) {
Message msg = new Message("TopicTest", "TagA", "OrderID188",
("Hello world" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
producer.shutdown();
}
}
4. 延迟消息
延迟消息与普通消息的不同之处在于,它们要等到指定的时间之后才会被传递。
消息⽣产者
public class ScheduledProducer {
public static void main(String[] args) throws Exception {
// Instantiate a producer to send scheduled messages
DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
// Launch producer
producer.start();
int totalMessagesToSend = 100;
for (int i = 0; i < totalMessagesToSend; i++) {
Message message = new Message("TestTopic", ("Hello scheduled message" + i).getBytes());
// This message will be delivered to consumer 10 seconds later.
message.setDelayTimeLevel(3);
// Send the message
producer.send(message);
}
// Shutdown producer after use.
producer.shutdown();
}
}
消息消费者
public class ScheduledConsumer {
public static void main(String[] args) throws MQClientException {
// Instantiate message consumer
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ExampleConsumer");
// Subscribe topics
consumer.subscribe("TestTopic", "*");
// Register message listener
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {
for (MessageExt message : messages) {
// Print approximate delay time period
System.out.println("Receive message[msgId=" +
message.getMsgId() + "] "
+ (System.currentTimeMillis() -
message.getStoreTimestamp()) + "ms later");
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// Launch consumer
consumer.start();
}
}
延迟等级:RocketMQ设计了18个延迟等级,分别是:
1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
5. 批量消息
批量发送消息提⾼了传递⼩消息的性能。
使⽤限制:同⼀批次的消息应该具有:相同的主题、相同的 waitStoreMsgOK 并且不⽀持延迟消息和事务消息。
使⽤批量消息
public class BatchProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
producer.start();
String topic = "BatchTest";
List<Message> messages = new ArrayList<>();
messages.add(new Message(topic, "TagA", "OrderID001", "Hello world 0".getBytes()));
messages.add(new Message(topic, "TagA", "OrderID002", "Hello world 1".getBytes()));
messages.add(new Message(topic, "TagA", "OrderID003", "Hello world 2".getBytes()));
producer.send(messages);
producer.shutdown();
}
}
超出限制的批量消息:官⽅建议批量消息的总⼤⼩不应超过1m,实际不应超过4m。如果超过4m的批量消息需要进⾏分批处理,同时设置broker的配置参数为4m(在broker的配置⽂件中修改: maxMessageSize=4194304 )
/*
* ⼤批量消息的处理
*/
public class MaxBatchProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("BatchProducerGroupName");
producer.start();
//large batch
String topic = "BatchTest";
List<Message> messages = new ArrayList<>(100);
for (int i = 0; i < 100; i++) {
messages.add(new Message(topic, "Tag", "OrderID" + i, ("Hello world " + i).getBytes()));
}
// producer.send(messages);
//split the large batch into small ones:
ListSplitter splitter = new ListSplitter(messages);
while (splitter.hasNext()) {
List<Message> listItem = splitter.next();
producer.send(listItem);
}
producer.shutdown();
}
}
ListSplitter
/*
* 批量消息
*/
public class ListSplitter implements Iterator<List<Message>> {
private int sizeLimit = 1000 * 1000;
private final List<Message> messages;
private int currIndex;
public ListSplitter(List<Message> messages) {
this.messages = messages;
}
@Override
public boolean hasNext() {
return currIndex < messages.size();
}
@Override
public List<Message> next() {
int nextIndex = currIndex;
int totalSize = 0;
for (; nextIndex < messages.size(); nextIndex++) {
Message message = messages.get(nextIndex);
int tmpSize = message.getTopic().length() +
message.getBody().length;
Map<String, String> properties =
message.getProperties();
for (Map.Entry<String, String> entry : properties.entrySet()) {
tmpSize += entry.getKey().length() + entry.getValue().length();
}
tmpSize = tmpSize + 20; //for log overhead
if (tmpSize > sizeLimit) {
//it is unexpected that single message exceeds the sizeLimit
//here just let it go, otherwise it will block the splitting process
if (nextIndex - currIndex == 0) {
//if the next sublist has no element, add this one and then break,otherwise just break nextIndex ++;
}
break;
}
if (tmpSize + totalSize > sizeLimit) {
break;
} else {
totalSize += tmpSize;
}
}
List<Message> subList = messages.subList(currIndex, nextIndex);
currIndex = nextIndex;
return subList;
}
}
6. 过滤消息
在⼤多数情况下,标签是⼀种简单⽽有⽤的设计,可以⽤来选择您想要的消息。
6.1 普通过滤
tag过滤的⽣产者
public class TagProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new
DefaultMQProducer("please_rename_unique_group_name");
producer.start();
String[] tags = new String[]{"TagA", "TagB", "TagC"};
for (int i = 0; i < 15; i++) {
Message msg = new Message("TagFilterTest", tags[i % tags.length],
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
producer.shutdown();
}
}
tag过滤的消费者
public class TagConsumer {
public static void main(String[] args) throws
MQClientException {
DefaultMQPushConsumer consumer = new
DefaultMQPushConsumer("please_rename_unique_group_name");
consumer.subscribe("TagFilterTest", "TagA || TagC");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus
consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s % n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
6.2 使用sql过滤
消费者将收到包含 TAGA 或 TAGB 或 TAGC 的消息。但是限制是⼀条消息只能有⼀个标签,这可能不适⽤于复杂的场景。在这种情况下,您可以使⽤ SQL 表达式来过滤掉消息。
使⽤SQL过滤:SQL 功能可以通过您在发送消息时输⼊的属性进⾏⼀些计算。在 RocketMQ 定义的语法下,可以实现⼀些有趣的逻辑。
使⽤注意:只有推模式的消费者可以使⽤SQL过滤。拉模式是⽤不了的。
这是⼀个例⼦:
------------
| message |
|----------| a > 5 AND b = 'abc'
| a = 10 | --------------------> Gotten
| b = 'abc'|
| c = true |
------------
------------
| message |
|----------| a > 5 AND b = 'abc'
| a = 1 | --------------------> Missed
| b = 'abc'|
| c = true |
------------
语法:RocketMQ 只定义了⼀些基本的语法来⽀持这个特性,也可以轻松扩展它。
1. 数值⽐较,如`>`, `>=`, `<`, `<=`, `BETWEEN`, `=`;
2. 字符⽐较,如`=`, `<>`, `IN`;
3. `IS NULL`或`IS NOT NULL`;
4. 逻辑`AND`, `OR`, `NOT`;
常量类型有:
1. 数字,如 123、3.1415;
2. 字符,如'abc',必须⽤单引号;
3. `NULL`,特殊常数;
4. 布尔值,`TRUE`或`FALSE`;
SQL过滤的⽣产者示例
public class SQLProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new
DefaultMQProducer("please_rename_unique_group_name");
producer.start();
String[] tags = new String[]{"TagA", "TagB", "TagC"};
for (int i = 0; i < 15; i++) {
Message msg = new Message("SqlFilterTest",
tags[i % tags.length],
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
);
// Set some properties.
msg.putUserProperty("a", String.valueOf(i));
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
producer.shutdown();
}
}
SQL过滤的消费者示例
public class SQLConsumer {
public static void main(String[] args) throws
MQClientException {
DefaultMQPushConsumer consumer = new
DefaultMQPushConsumer("please_rename_unique_group_name");
// Don't forget to set enablePropertyFilter=true in broker
consumer.subscribe("SqlFilterTest",
MessageSelector.bySql("(TAGS is not null and TAGS in('TagA', 'TagB'))"
+ "and (a is not null and a between 0 and 3) "));
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus
consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s%n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
7. 事务消息
事务消息的定义:它可以被认为是⼀个两阶段的提交消息实现,以确保分布式系统的最终⼀致性。事务性消息确保本地事务的执⾏和消息的发送可以原⼦地执⾏。
事务消息有三种状态:
-
TransactionStatus.CommitTransaction
:提交事务,表示允许消费者消费该消息。 -
TransactionStatus.RollbackTransaction
:回滚事务,表示该消息将被删除,不允许消费。 -
TransactionStatus.Unknown
:中间状态,表示需要MQ回查才能确定状态。
使⽤限制:
- 事务性消息没有调度和批处理⽀持。
- 为避免单条消息被检查次数过多,导致半队列消息堆积,我们默认将单条消息的检查次数限制为15次,但⽤户可以通过更改
transactionCheckMax
来更改此限制,参数在broker的配置中,如果⼀条消息的检查次数超过transactionCheckMax
次,broker默认会丢弃这条消息,同时打印错误⽇志。⽤户可以通过重写AbstractTransactionCheckListener
类来改变这种⾏为。 - 事务消息将在⼀定时间后检查,该时间由代理配置中的参数
transactionTimeout
确定。并且⽤户也可以在发送事务消息时通过设置⽤户属性CHECK_IMMUNITY_TIME_IN_SECONDS
来改变这个限制,这个参数优先于transactionMsgTimeout
参数。 - ⼀个事务性消息可能会被检查或消费不⽌⼀次。
- 提交给⽤户⽬标主题的消息reput可能会失败。⽬前,它取决于⽇志记录。⾼可⽤是由 RocketMQ 本身的⾼可⽤机制来保证的。如果要保证事务消息不丢失,保证事务完整性,推荐使⽤同步双写机制。
- 事务性消息的⽣产者 ID 不能与其他类型消息的⽣产者 ID 共享。与其他类型的消息不同,事务性消息允许向后查询。MQ 服务器通过其⽣产者 ID 查询客户端。
事务消息的实现流程
生产者
public class TransactionProducer {
public static void main(String[] args) throws Exception {
TransactionListener transactionListener = new
TransactionListenerImpl();
TransactionMQProducer producer = new
TransactionMQProducer("please_rename_unique_group_name");
producer.setNamesrvAddr("172.16.253.101:9876");
ExecutorService executorService = new
ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new
ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("client-transaction-msg-checkthread");
return thread;
}
});
producer.setExecutorService(executorService);
producer.setTransactionListener(transactionListener);
producer.start();
String[] tags = new String[]{"TagA", "TagB", "TagC",
"TagD", "TagE"};
for (int i = 0; i < 10; i++) {
try {
Message msg =
new Message("TopicTest", tags[i %
tags.length], "KEY" + i,
("Hello RocketMQ " +
i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult =
producer.sendMessageInTransaction(msg, null);
System.out.printf("%s%n", sendResult);
Thread.sleep(10);
} catch (MQClientException |
UnsupportedEncodingException e) {
e.printStackTrace();
}
}
for (int i = 0; i < 100000; i++) {
Thread.sleep(1000);
}
producer.shutdown();
}
}
本地事务处理-TransactionListener
public class TransactionListenerImpl implements TransactionListener {
/
* When send transactional prepare(half) message succeed, this method will be invoked to execute local transaction.
*
* @param msg Half(prepare) message
* @param arg Custom business parameter
* @return Transaction state
*/
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
String tags = msg.getTags();
if (StringUtils.contains(tags, "TagA")) {
return LocalTransactionState.COMMIT_MESSAGE;
} else if (StringUtils.contains(tags, "TagB")) {
return LocalTransactionState.ROLLBACK_MESSAGE;
} else {
return LocalTransactionState.UNKNOW;
}
}
/
* When no response to prepare(half) message. broker will send
* check message to check the transaction status, and this
* method will be invoked to get local transaction status.
*
* @param msg Check message
* @return Transaction state
*/
@Override
public LocalTransactionState checkLocalTransaction(MessageExt
msg) {
String tags = msg.getTags();
if (StringUtils.contains(tags, "TagC")) {
return LocalTransactionState.COMMIT_MESSAGE;
} else if (StringUtils.contains(tags, "TagD")) {
return LocalTransactionState.ROLLBACK_MESSAGE;
} else {
return LocalTransactionState.UNKNOW;
}
}
}
消费者
public class TransactionConsumer {
public static void main(String[] args) throws
MQClientException {
//1.创建消费者对象
DefaultMQPushConsumer consumer = new
DefaultMQPushConsumer("my-consumer-group1");
//2.指明nameserver的地址
consumer.setNamesrvAddr("172.16.253.101:9876");
//3.订阅主题:topic 和过滤消息⽤的tag表达式
consumer.subscribe("TopicTest", "*");
//4.创建⼀个监听器,当broker把消息推过来时调⽤
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus
consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext
context) {
for (MessageExt msg : msgs) {
// System.out.println("收到的消息:" + new String(msg.getBody()));
System.out.println("收到的消息:" + msg);
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//5.启动消费者
consumer.start();
System.out.println("消费者已启动");
}
}
二 SpringBoot整合RocketMQ
Springboot提供了快捷操作RocketMQ的RocketMQTemplate对象。
1.引⼊依赖
注意依赖的版本需要和RocketMQ的版本相同。
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.1.1</version>
</dependency>
2.编写配置⽂件
# 应⽤名称
spring.application.name=my-boot-rocketmq-demo
# 应⽤服务 WEB 访问端⼝
server.port=8080
# nameserver地址
rocketmq.name-server=172.16.253.101:9876
# 配置⽣产者组
rocketmq.producer.group=my-producer-boot-group1
3.编写⽣产者发送普通消息
@Component
public class MyProducer {
@Autowired
private RocketMQTemplate rocketMQTemplate;
/
* 发送消息
*
* @param msg 消息内容
* @param topic ⽬标主题
*/
public void sendMessage(String msg, String topic) {
//将msg转换成Message对象并发送
rocketMQTemplate.convertAndSend(topic, msg);
}
}
4.编写JUnit单元测试发送消息
@Test
void testSendMessage() {
String topic = "MyBootTopic";
String message = "hello spring boot rocketmq";
producer.sendMessage(message, topic);
}
5.创建消费者程序
@Component
@RocketMQMessageListener(consumerGroup = "my-boot-consumer-group", topic = "MyBootTopic")
public class MyConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String msg) {
System.out.println("收到的消息:" + msg);
}
}
6.发送事务消息
编写⽣产者⽅法
public void sendMessageInTransaction(String msg, String topic) throws Exception {
String[] tags = new String[]{"TagA", "TagB", "TagC", "TagD", "TagE"};
for (int i = 0; i < 10; i++) {
//注意该message为org.springframework.messaging.Message
Message<String> message =
MessageBuilder.withPayload(msg).build();
//topic和tag整合在⼀起,以":"隔开
String destination = topic + ":" + tags[i % tags.length];
//第⼀个destination为消息要发到的⽬的地,第⼆个destination为
消协携带的业务数据
TransactionSendResult sendResult =
rocketMQTemplate.sendMessageInTransaction(destination, message,
destination);
System.out.println(sendResult);
Thread.sleep(10);
}
}
编写事务监听器类
@RocketMQTransactionListener(rocketMQTemplateBeanName = "rocketMQTemplate")
public class MyTransactionListenerImpl implements RocketMQLocalTransactionListener {
@Override
public RocketMQLocalTransactionState
executeLocalTransaction(Message msg, Object arg) {
//获得业务参数中的数据
String destination = (String) arg;
//使⽤RocketMQUtil将spring的message转换成rocketmq的message
org.apache.rocketmq.common.message.Message message =
RocketMQUtil.convertToRocketMessage(
new StringMessageConverter(), "utf-8", destination, msg);
//获得消息中的业务数据tags
String tags = message.getTags();
if (StringUtils.contains(tags, "TagA")) {
//提交本地事务
return RocketMQLocalTransactionState.COMMIT;
} else if (StringUtils.contains(tags, "TagB")) {
//回滚
return RocketMQLocalTransactionState.ROLLBACK;
} else {
//中间状态
return RocketMQLocalTransactionState.UNKNOWN;
}
}
@Override
public RocketMQLocalTransactionState
checkLocalTransaction(Message msg) {
return null;
}
}
7.编写单元测试发送事务消息
@Test
void testSendTransactionMessage() throws Exception {
String topic = "MyBootTopic";
String message = "hello transaction spring boot rocketmq";
producer.sendMessageInTransaction(message,topic);
}
三 Spring Cloud Stream整合RocketMQ
1.Spring Cloud Stream介绍
Spring Cloud Stream 是⼀个框架,⽤于构建与共享消息系统连接的⾼度可扩展的事件驱动微服务。
该框架提供了⼀个灵活的编程模型,该模型基于已经建⽴和熟悉的 Spring 习惯⽤法和最佳实践,包括对持久 pub/sub 语义、消费者组和有状态分区的⽀持。
Spring Cloud Stream 的核⼼构建块是:
Destination Binders
:负责提供与外部消息传递系统集成的组件。Destination Bindings
:外部消息系统和最终⽤户提供的应⽤程序代码(⽣产者/消费者)之间的桥梁。Message
:⽣产者和消费者⽤来与⽬标绑定器(以及通过外部消息系统的其他应⽤程序)进⾏通信的规范数据结构。
2.编写⽣产者
引⼊依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.qf</groupId>
<artifactId>my-s-rocketmq-demo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>my-s-rocketmq-demo</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
<project.build.sourceEncoding>UTF-
8
</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-
8
</project.reporting.outputEncoding>
<spring-boot.version>2.3.7.RELEASE</spring-boot.version>
<spring-cloud-alibaba.version>2.2.2.RELEASE
</springcloud-alibaba.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-streamrocketmq</artifactId>
<exclusions>
<exclusion>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-acl</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.7.1</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-acl</artifactId>
<version>4.7.1</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>${spring-boot.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-alibabadependencies</artifactId>
<version>${spring-cloud-alibaba.version}
</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<version>2.3.7.RELEASE</version>
<configuration>
<mainClass>com.qf.my.s.rocketmq.demo.MySRocketmqDemoApplication
<
/mainClass>
</configuration>
<executions>
<execution>
<id>repackage</id>
<goals>
<goal>repackage</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
注意,Rocket官⽅维护的spring-cloud-stream依赖中rocket⽤的版本为4.4,需要排除后再加⼊4.7.1的依赖。
编写配置⽂件
# 应⽤名称
spring.application.name=my-s-rocketmq-demo
# 应⽤服务 WEB 访问端⼝
server.port=8080
# output ⽣产者
spring.cloud.stream.bindings.output.destination=TopicTest
# 配置rocketMQ
spring.cloud.stream.rocketmq.binder.name-server=172.16.253.101:9876
启动类上打上注解;其中 @EnableBinding(Source.class)
指向配置⽂件的output参数
@EnableBinding(Source.class)
@SpringBootApplication
public class MySRocketmqDemoApplication {
public static void main(String[] args) {
SpringApplication.run(MySRocketmqDemoApplication.class,
args);
}
}
编写生产者程序
@Component
public class MyProducer {
@Resource
private Source source;
public void sendMessage(String msg) {
Map<String, Object> headers = new HashMap<>();
headers.put(MessageConst.PROPERTY_TAGS, "TagA");
MessageHeaders messageHeaders = new
MessageHeaders(headers);
Message<String> message =
MessageBuilder.createMessage(msg, messageHeaders);
source.output().send(message);
}
}
编写单元测试发送消息
@SpringBootTest
class MySRocketmqDemoApplicationTests {
@Autowired
private MyProducer producer;
@Test
void testSendMessage() {
producer.sendMessage("hello spring cloud stream");
}
}
3.编写消费者
引⼊依赖;与⽣产者相同
编写配置⽂件
# 应⽤名称
spring.application.name=my-s-rocketmq-demo
# 应⽤服务 WEB 访问端⼝
server.port=8081
# input 消费者
spring.cloud.stream.bindings.input.destination=TopicTest
spring.cloud.stream.bindings.input.group=spring-cloud-strema-group
# 配置rocketMQ
spring.cloud.stream.rocketmq.binder.name-server=172.16.253.101:9876
启动类上打上注解;其中@EnableBinding(Sink.class)
指向配置⽂件的input参数
@EnableBinding(Sink.class)
@SpringBootApplication
public class MySpringCConsumerDemoApplication {
public static void main(String[] args) {
SpringApplication.run(MySpringCConsumerDemoApplication.class, args);
}
}
编写消费者程序
@Component
public class MyConsumer {
@StreamListener(Sink.INPUT)
public void onMessage(String message){
System.out.println("收到的消息:"+message);
}
}