RocketMQ5.0–部署与实例
一、Idea调试
1.相关配置文件
在E:\rocketmq创建conf、logs、store三个文件夹。从RocketMQ distribution部署目录中将broker.conf、logback_namesrv.xml、logback_broker.xml文件复制到conf目录。如下图所示。
其中logback_namesrv.xml、logback_broker.xml分别是NameServer、Broker的日志配置文件,修改打印日志文件路径即可。broker.conf文件是Broker启动时的加载配置文件,如下代码所示。
注意:NameServer启动端口默认是9876,Broker启动端口默认10911。
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
namesrvAddr=127.0.0.1:9876
brokerIP1=192.168.156.245
# 存储路径
storePathRootDir=E:\\rocketmq\\store
#commitLog 存储路径
storePathCommitLog=E:\\rocketmq\\store\\commitlog
# 消费队列存储路径
storePathConsumeQueue=E:\\rocketmq\\store\\consumequeue
# 消息索引|存储路径
storePathindex=E:\\rocketmq\\store\\index
#checkpoint 文件存储路径
storeCheckpoint=E:\\rocketmq\\store\\checkpoint
#abort 文件存储路径
abortFile=E:\\rocketmq\\store\\abort
2.启动NameServer
org.apache.rocketmq.namesrv.NamesrvStartup启动类配置环境变量ROCKETMQ_HOME,值是配置主目录“E:\rocketmq”,如下图所示。
出现“The Name Server boot success. serializeType=JSON”时,则NameServer启动成功。
3.启动Broker
org.apache.rocketmq.broker.BrokerStartup启动类配置环境变量ROCKETMQ_HOME,值是配置主目录“E:\rocketmq”;配置启动参数:-c E:\rocketmq\conf\broker.conf。如下图所示。
出现“The broker[broker-a, 192.168.156.245:10911] boot success. serializeType=JSON and name server is 127.0.0.1:9876”时,则Broker启动成功。
二、Linux部署
1.执行Maven命令
mvn -Prelease-all -DskipTests clean install -U
查看打包的可部署文件路径:.\distribution\target\rocketmq-5.0.0\rocketmq-5.0.0,如下图所示。
2.复制rocketmq-5.0.0
3.启动NameServer
nohup sh bin/mqnamesrv &
可能出现问题,如下图所示。原因是Windows系统下打包,换行符出现问题。解决:notepad++编辑器在Windows环境下将文本转换为Unix格式,步骤为:用Notepad++打开脚本 >> 编辑 >> 档案格式转换 >> 选择转换为UNIX格式。
4.启动Broker
nohup sh bin/mqbroker -n 192.168.1.55:9876 -c /home/rocketmq-5.0.0/conf/broker.conf &
启动命令中,-n是指定NameServer地址,-c 是broker的配置文件。
5.关闭NameServer、Broker命令
关闭NameServer:sh bin/mqshutdown namesrv
关闭Broker:sh bin/mqshutdown broker
三、事务消息实例
1.事务消息监听类
实现org.apache.rocketmq.client.producer.TransactionListener,该类有两个方法:
- executeLocalTransaction():保存本地事务中间表,用于Broker回查事务状态
- checkLocalTransaction():Broker定时回查事务状态,根据事务状态提交或回滚事务消息
package com.common.instance.demo.config.rocketmq;
import com.common.instance.demo.entity.TMessageTransaction;
import com.common.instance.demo.service.TMessageTransactionService;
import com.log.util.LogUtil;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.Date;
import java.util.List;
/**
* @description 订单事务消息监听器实现类
* @author TCM
* @version 1.0
* @date 2023/1/1 16:44
**/
@Component
public class OrderTransactionListenerImpl implements TransactionListener {
@Resource
private TMessageTransactionService tMessageTransactionService;
@Override
public LocalTransactionState executeLocalTransaction(Message message, Object arg) {
// 组装事务
TMessageTransaction tMessageTransaction = packageTMessageTransaction(message);
// 保存事务中间表
tMessageTransactionService.insert(tMessageTransaction);
// 推荐返回UNKNOW状态,待事务状态回查
return LocalTransactionState.UNKNOW;
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
// 获取用户属性tabId
String tabId = messageExt.getUserProperty("tabId");
// 查询事务消息
List<TMessageTransaction> tMessageTransactions = tMessageTransactionService.queryByTabId(tabId);
if (!tMessageTransactions.isEmpty() && tMessageTransactions.size() <= 6) {
return LocalTransactionState.COMMIT_MESSAGE;
}
LogUtil.error("orderTransaction rollBack, tabId: " + tabId);
return LocalTransactionState.ROLLBACK_MESSAGE;
}
// 组装事务
private TMessageTransaction packageTMessageTransaction(Message message) {
TMessageTransaction tMessageTransaction = new TMessageTransaction();
// 获取用户属性tabId
String tabId = message.getUserProperty("tabId");
// 事务ID
String transactionId = message.getTransactionId();
tMessageTransaction.setTabId(tabId);
tMessageTransaction.setTransactionId(transactionId);
tMessageTransaction.setCreateBy("auto");
tMessageTransaction.setCreateTime(new Date());
return tMessageTransaction;
}
}
2. 事务消息生产者
package com.common.instance.demo.config.rocketmq;
import com.alibaba.fastjson.JSON;
import com.common.instance.demo.entity.WcPendantTab;
import com.log.util.LogUtil;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
/**
* @description 订单事务消息生产者
* @author TCM
* @version 1.0
* @date 2023/1/1 16:54
**/
@Component
public class OrderTransactionProducer {
@Resource
private OrderProducerProperties orderProducerProperties;
@Resource
private OrderTransactionListenerImpl orderTransactionListener;
private TransactionMQProducer orderTransactionMQProducer;
@PostConstruct
public void start() {
try {
LogUtil.info("start rocketmq: order transactionProducer");
orderTransactionMQProducer = new TransactionMQProducer(orderProducerProperties.getProducerGroup());
orderTransactionMQProducer.setNamesrvAddr(orderProducerProperties.getNameSrcAddr());
orderTransactionMQProducer.setSendMsgTimeout(orderProducerProperties.getSendMsgTimeout());
// 注册事务监听器
orderTransactionMQProducer.setTransactionListener(orderTransactionListener);
orderTransactionMQProducer.start();
} catch (MQClientException e) {
LogUtil.error("OrderTransactionProducer.start()", "start rocketmq failed!", e);
}
}
public void sendTransactionMessage(WcPendantTab data) {
sendTransactionMessage(data, orderProducerProperties.getTopic(), orderProducerProperties.getTag(), null);
}
public void sendTransactionMessage(WcPendantTab data, String topic, String tags, String keys) {
try {
// 消息内容
byte[] msgBody = JSON.toJSONString(data).getBytes(StandardCharsets.UTF_8);
// 消息对象
Message message = new Message(topic, tags, keys, msgBody);
message.putUserProperty("tabId", data.getTabId());
// 发送事务消息
orderTransactionMQProducer.sendMessageInTransaction(message, null);
} catch (Exception e) {
LogUtil.error("OrderTransactionProducer.sendMessage()","send order rocketmq error", e);
}
}
@PreDestroy
public void stop() {
if (orderTransactionMQProducer != null) {
orderTransactionMQProducer.shutdown();
}
}
}
3. 事务消息消费者
package com.common.instance.demo.config.rocketmq;
import com.log.util.LogUtil;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
import java.util.List;
/**
* @description 订单消费者
* @author TCM
* @version 1.0
* @date 2023/1/1 14:29
**/
@Component
public class OrderConsumer implements MessageListenerConcurrently {
@Resource
private OrderConsumerProperties orderConsumerProperties;
private DefaultMQPushConsumer orderMQConsumer;
@PostConstruct
public void start() {
try {
LogUtil.info("start rocketmq: order consumer");
orderMQConsumer = new DefaultMQPushConsumer(orderConsumerProperties.getConsumerGroup());
orderMQConsumer.setNamesrvAddr(orderConsumerProperties.getNameSrcAddr());
orderMQConsumer.subscribe(orderConsumerProperties.getTopic(), orderConsumerProperties.getTag() == null ? "*":orderConsumerProperties.getTag());
orderMQConsumer.setConsumeFromWhere(ConsumeFromWhere.valueOf(orderConsumerProperties.getConsumeFromWhere()));
orderMQConsumer.registerMessageListener(this); // 注册监听器
orderMQConsumer.start();
} catch (MQClientException e) {
LogUtil.error("OrderProducer.start()", "start rocketmq failed!", e);
}
}
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
int index = 0;
try {
for (; index < msgs.size(); index++) {
// 完整消息
MessageExt msg = msgs.get(index);
// 消息内容
String messageBody = new String(msg.getBody(), StandardCharsets.UTF_8);
LogUtil.info("消费组消息内容:" + messageBody);
}
} catch (Exception e) {
LogUtil.error("OrderConsumer.consumeMessage()", "consume order rocketmq error", e);
} finally {
if (index < msgs.size()) {
// 消费应答
context.setAckIndex(index + 1);
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
@PreDestroy
public void stop() {
if (orderMQConsumer != null) {
orderMQConsumer.shutdown();
}
}
}
四、参考资料
https://www.cnblogs.com/qdhxhz/p/11094624.html