1. Addressing the availability reduction introduced by RocketMQ
RocketMQ achieves high availability through its deployment mode. There are three modes: single-node mode, master-slave mode, and sharded cluster mode.
Single-node mode
Single-node mode is Demo level: you usually only start it locally to play around with; nobody runs it in production.
docker-compose.yml
version: '3'
services:
  namesrv:
    image: foxiswho/rocketmq:4.8.0
    container_name: rmqnamesrver
    ports:
      - "9876:9876"
    environment:
      JAVA_OPT_EXT: "-Xms512M -Xmx512M -Xmn128m"
    command: sh mqnamesrv
  broker:
    image: foxiswho/rocketmq:4.8.0
    container_name: rmqbroker
    ports:
      - "10911:10911"
    environment:
      NAMESERV_ADDR: "127.0.0.1:9876"
      JAVA_OPT_EXT: "-Xms512M -Xmx512M -Xmn128m"
    volumes:
      - ./logs:/home/rocketmq/logs
      - ./store:/home/rocketmq/store
      - ./conf/broker.conf:/home/rocketmq/rocketmq-4.8.0/conf/broker.conf
    command: sh mqbroker -n namesrv:9876 -c /home/rocketmq/rocketmq-4.8.0/conf/broker.conf
    depends_on:
      - namesrv
  mqconsole:
    image: styletang/rocketmq-console-ng
    container_name: rmqconsole
    ports:
      - "10100:8080"
    environment:
      JAVA_OPTS: -Drocketmq.config.namesrvAddr=namesrv:9876 -Drocketmq.config.isVIPChannel=false
    depends_on:
      - namesrv
Pros: good for local development and testing, simple to configure; with synchronous flushing not a single message is lost.
Cons: unreliable; if the node goes down, the service becomes unavailable.
Master-slave mode (master/slave nodes can be added dynamically, e.g. multi-master multi-slave)
docker-compose.yml
version: '3'
services:
  namesrv:
    image: foxiswho/rocketmq:4.8.0
    container_name: rmqnamesrver
    ports:
      - "9876:9876"
    environment:
      JAVA_OPT_EXT: "-Xms512M -Xmx512M -Xmn128m"
    command: sh mqnamesrv
  broker-a:
    image: foxiswho/rocketmq:4.8.0
    container_name: rmqbroker-a
    ports:
      - "10911:10911"
      - "10912:10912"
    environment:
      NAMESERV_ADDR: "127.0.0.1:9876"
      JAVA_OPT_EXT: "-Xms128M -Xmx128M -Xmn128m"
    volumes:
      - ./broker-a/logs:/home/rocketmq/logs
      - ./broker-a/store:/home/rocketmq/store
      - ./broker-a/conf/broker.conf:/home/rocketmq/rocketmq-4.8.0/conf/broker.conf
    command: sh mqbroker -n namesrv:9876 -c /home/rocketmq/rocketmq-4.8.0/conf/broker.conf
    depends_on:
      - namesrv
  broker-b:
    container_name: rmqbroker-b
    image: foxiswho/rocketmq:4.8.0
    ports:
      - '10919:10919'
      - '10921:10921'
    volumes:
      - ./broker-b/logs:/home/rocketmq/logs
      - ./broker-b/store:/home/rocketmq/store
      - ./broker-b/conf/broker.conf:/home/rocketmq/rocketmq-4.8.0/conf/broker.conf
    environment:
      JAVA_OPT_EXT: "-Duser.home=/home/rocketmq -Xms128M -Xmx128M -Xmn128m"
    command: [ "sh","mqbroker","-c","/home/rocketmq/rocketmq-4.8.0/conf/broker.conf","-n","namesrv:9876","autoCreateTopicEnable=true" ]
    depends_on:
      - namesrv
  mqconsole:
    image: styletang/rocketmq-console-ng
    container_name: rmqconsole
    ports:
      - "10100:8080"
    environment:
      JAVA_OPTS: -Drocketmq.config.namesrvAddr=namesrv:9876 -Drocketmq.config.isVIPChannel=false
    depends_on:
      - namesrv
The corresponding broker.conf:
#Name of the cluster this broker belongs to
brokerClusterName=DefaultCluster
#Broker name. Note that this differs per config file: use broker-a
#in broker-a.properties and broker-b in broker-b.properties
brokerName=broker-a
#0 means Master, any value > 0 means Slave
brokerId=0
#NameServer addresses, separated by semicolons
#namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
#Broker IP. If Docker reports com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.0.120:10909> failed,
#either call producer.setVipChannelEnabled(false) on the producer, or set brokerIP1 to the host IP instead of the Docker-internal IP.
#This is very important in master-slave mode: the slave syncs data from the master via the master's brokerIP2, so without it
#master-slave replication will not work. brokerIP1 must be an IP reachable from outside; on dual-NIC servers (e.g. Alibaba Cloud)
#it has to be set explicitly. The master needs both brokerIP1 and brokerIP2; the slave only needs brokerIP1.
brokerIP1=172.16.100.81
brokerIP2=172.16.100.81
#Number of queues created by default when a non-existent topic is auto-created at send time
defaultTopicQueueNums=4
#Whether the Broker may auto-create topics; recommended on for development/testing and off in production
autoCreateTopicEnable=true
#Whether the Broker may auto-create subscription groups; recommended on for development/testing and off in production
autoCreateSubscriptionGroup=true
#Port the Broker listens on
listenPort=10911
#Hour of the day at which expired files are deleted (default: 4 AM)
deleteWhen=04
#How long files are retained, in hours
fileReservedTime=12
#Size of each commitLog file, default 1 GB
mapedFileSizeCommitLog=1073741824
#Number of entries per ConsumeQueue file, default 300,000; adjust to the workload
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#Disk usage ratio (percent) above which physical files are cleaned up
diskMaxUsedSpaceRatio=98
#Storage root path
#storePathRootDir=/home/ztztdata/rocketmq-all-4.1.0-incubating/store
#commitLog storage path
#storePathCommitLog=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/commitlog
#Consume queue storage path
#storePathConsumeQueue=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/consumequeue
#Message index storage path
#storePathIndex=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/index
#checkpoint file path
#storeCheckpoint=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/checkpoint
#abort file path
#abortFile=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/abort
#Maximum allowed message size
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Role of this Broker
#- ASYNC_MASTER: master with asynchronous replication
#- SYNC_MASTER: master with synchronous double-write
#- SLAVE
brokerRole=ASYNC_MASTER
#Disk flush mode
#- ASYNC_FLUSH: asynchronous flush
#- SYNC_FLUSH: synchronous flush
flushDiskType=ASYNC_FLUSH
#Number of threads for sending messages
#sendMessageThreadPoolNums=128
#Number of threads for pulling messages
#pullMessageThreadPoolNums=128
slaveReadEnable=true
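The broker.conf above is the master's (broker-a). As a sketch of the master-slave pairing, and assuming broker-b is meant to run as broker-a's slave rather than as a second master, its broker.conf would typically differ only in the following lines (a slave keeps the master's brokerName and uses a brokerId greater than 0):
# broker-b/conf/broker.conf - only the lines that differ from the master
brokerClusterName=DefaultCluster
# a slave shares its master's brokerName
brokerName=broker-a
# any value > 0 marks this node as a slave
brokerId=1
brokerRole=SLAVE
# matches the 10919 port mapping of the broker-b service in the compose file above
listenPort=10919
brokerIP1=172.16.100.81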
2. Addressing the increased system complexity
2.1 Ensuring messages are not consumed repeatedly
(1) Deduplicating with Redis
Producer code that sends 100 messages:
/**
 * Send plain messages (100 of them, asynchronously)
 */
public class SendMessage {
    public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
        // Create the message producer and assign it to a producer group
        DefaultMQProducer producer = new DefaultMQProducer("myproducer-group");
        // Point it at the NameServer
        producer.setNamesrvAddr("127.0.0.1:9876");
        // Start the producer
        producer.start();
        Message msg = new Message();
        msg.setBody(("RocketMQ Message").getBytes());
        msg.setTopic("topicB");
        msg.setTags("myTag");
        msg.setKeys(UUID.randomUUID().toString()); // business key; the msgId itself is assigned by the client/broker
        // Send the message
        // SendResult send = producer.send(msg, 10000); // synchronous variant
        for (int i = 0; i < 100; i++) {
            producer.send(msg, new SendCallback() { // asynchronous send with callback
                @Override
                public void onSuccess(SendResult sendResult) {
                    System.out.println("send succeeded");
                }
                @Override
                public void onException(Throwable throwable) {
                    System.out.println("send failed");
                }
            }, 10000);
        }
    }
}
Consumer code:
@Slf4j
@Configuration
public class QCCustomer {
    @Autowired
    private StringRedisTemplate redisTemplate;
    @Value("${rocketmq.producer.group:groupB}")
    private String resultsGroupName;
    @Value("${rocketmq.name-server:}")
    private String namesrvAddr;
    @Autowired
    private MQConsumeMsgListenerProcessor mqConsumeMsgListenerProcessor;

    @Bean
    public DefaultMQPushConsumer defaultConsumer() {
        log.info("ruleEngineConsumer is being created---------------------------------------");
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer();
        consumer.setNamesrvAddr(namesrvAddr);
        consumer.setConsumeThreadMin(1);
        consumer.setConsumeThreadMax(2);
        consumer.setConsumeMessageBatchMaxSize(100);
        consumer.setMessageModel(MessageModel.CLUSTERING);
        // Register the listener; every message is checked against Redis before it is processed
        consumer.registerMessageListener((MessageListenerConcurrently) (list, consumeConcurrentlyContext) -> {
            try {
                for (MessageExt messageExt : list) {
                    String msgId = messageExt.getMsgId();
                    // INCR returns 1 only the first time this msgId is seen
                    Long increment = redisTemplate.opsForValue().increment(msgId);
                    redisTemplate.expire(msgId, 2, TimeUnit.MINUTES); // keep the dedup key for 2 minutes
                    if (increment != null && increment == 1) {
                        Thread.sleep(100);
                        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                        String format = sdf.format(new Date());
                        log.info("time {}, message received from MQ: {}", format, new String(messageExt.getBody(), "utf-8"));
                    }
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            } catch (Exception e) {
                e.printStackTrace();
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        consumer.setInstanceName(System.currentTimeMillis() + "dstMQMsgConsumer");
        consumer.setConsumerGroup(resultsGroupName);
        try {
            consumer.subscribe("topicB", "*");
            consumer.start();
            log.info("ruleEngineConsumer created successfully, groupName={}", resultsGroupName);
        } catch (MQClientException e) {
            log.error("ruleEngineConsumer creation failed!");
        }
        return consumer;
    }
}
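One note on the dedup pattern above: the increment call and the expire call are two separate Redis commands, so if the consumer crashes between them the key never expires. Assuming Spring Data Redis 2.1+, the key and its TTL can be claimed in a single atomic call instead; this snippet would replace the increment/expire pair inside the listener above (Duration is java.time.Duration):
// SET key value NX EX in one round trip: true only for the first claimant within the TTL window
Boolean first = redisTemplate.opsForValue().setIfAbsent("dedup:" + msgId, "1", Duration.ofMinutes(2));
if (Boolean.TRUE.equals(first)) {
    // first time this msgId is seen -> process the message
}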
(2) Create a unique index in the database so that duplicate records can never be inserted.
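The idea is that every message carries a business key; the consumer inserts that key into a table whose key column has a unique index, and a duplicate-key error means the message was already processed. A minimal JDBC sketch of this (the table and column names are made up for illustration):
public class DbDedup {
    // assumes a table like: CREATE TABLE consumed_msg (msg_key VARCHAR(64) PRIMARY KEY)
    public static boolean tryMarkConsumed(Connection conn, String msgKey) throws SQLException {
        try (PreparedStatement ps = conn.prepareStatement(
                "INSERT INTO consumed_msg (msg_key) VALUES (?)")) {
            ps.setString(1, msgKey);
            ps.executeUpdate();
            return true;  // first time this key is seen -> go ahead and process the message
        } catch (SQLIntegrityConstraintViolationException e) {
            return false; // unique index hit -> duplicate, skip processing
        }
    }
}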
2.2 How to guarantee reliable message transmission
RocketMQ messages can be lost in the following three stages:
1. When the producer sends the message to the Broker;
2. When the Broker persists the message to disk and replicates it from master to slave;
3. When the Broker pushes the message to the consumer, or the consumer actively pulls it.
Solution strategies
Producer side: reliable sending is guaranteed through the different retry strategies below;
(1) Synchronous sending
Synchronous sending means the sender blocks the calling thread until the server returns the send result. A sender that needs to guarantee reliability and prevent lost sends can send synchronously and then check the status the Broker returns to decide whether the message was persisted. If the send times out or fails, it is retried twice by default. RocketMQ chooses an at-least-once delivery model, so duplicate delivery is possible and the consumer must be idempotent.
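A minimal synchronous-send sketch along these lines (group and topic names are placeholders): the call blocks until the Broker answers, and the returned status says whether the configured durability was actually met:
public class SyncSend {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("reliable-producer-group");
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.setRetryTimesWhenSendFailed(2); // internal retries on failure (2 is the default)
        producer.start();
        Message msg = new Message("topicB", "myTag", "reliable message".getBytes());
        SendResult result = producer.send(msg); // blocks until the Broker replies or sendMsgTimeout elapses
        if (result.getSendStatus() != SendStatus.SEND_OK) {
            // FLUSH_DISK_TIMEOUT / FLUSH_SLAVE_TIMEOUT / SLAVE_NOT_AVAILABLE: the Broker accepted the
            // message but the configured durability guarantee was not confirmed; decide whether to resend
        }
        producer.shutdown();
    }
}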
(2) Asynchronous sending
Asynchronous sending means the sender passes in a callback implementation; the send call does not block and returns immediately, the callback runs on another thread, and the send result is handed to that callback. The business code can decide from the result whether a retry is needed, and thereby guarantee reliability.
(3) One-way sending
One-way sending means the send call returns immediately after the message is handed off and no result is returned at all, so the business code cannot tell whether the send succeeded. Compared with the previous two modes it is unreliable, so it is not recommended when message reliability matters.
(4) Send retry strategy
When a send fails, the producer retries according to the following policy. The send method itself supports internal retries, with this logic:
1. It retries at most 2 times.
2. In synchronous mode a failed send rotates to the next Broker; in asynchronous mode it only retries on the current Broker. The total time spent by the method does not exceed the configured sendMsgTimeout (3 s by default in the 4.x client).
3. If the send to the Broker itself threw a timeout exception, no further retry is attempted.
These policies guarantee successful delivery to a certain degree. If the business requires higher reliability, add application-level retry logic: for example, when a synchronous send fails, store the message in a database and have a background thread retry it periodically until it reaches the Broker.
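A hedged sketch of that suggestion, with saveForLaterRetry standing in for a hypothetical DAO that writes the message into a local retry table replayed by a scheduled job:
public class ReliableSender {
    public static void sendOrStash(DefaultMQProducer producer, Message msg) {
        try {
            SendResult result = producer.send(msg); // synchronous send, internal retries included
            if (result.getSendStatus() == SendStatus.SEND_OK) {
                return; // stored by the Broker with the configured durability
            }
            saveForLaterRetry(msg); // accepted, but durability was not confirmed
        } catch (Exception e) {
            saveForLaterRetry(msg); // timeout or Broker unreachable
        }
    }

    // hypothetical: persist the message locally; a background task re-sends these rows until they succeed
    private static void saveForLaterRetry(Message msg) {
        // e.g. INSERT INTO local_msg_outbox (topic, tags, body) VALUES (...)
    }
}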
Broker side: reliable storage is guaranteed through the flush mechanism and master-slave replication;
(1) Flush mechanism
Synchronous flush: after the message is written to the PageCache, the flush thread is notified immediately and the caller waits; once the flush finishes, the waiting thread is woken up and a write-success status is returned. This guarantees that data is absolutely safe, but throughput is limited.
Asynchronous flush (default): as soon as the message is written to the PageCache, success is returned to the client; the PageCache is written to disk once enough messages have accumulated, or on a timer. This gives high throughput and performance, but data still in the PageCache can be lost, so safety is not absolute.
Use synchronous flushing when data must be absolutely safe.
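In broker.conf terms, this means switching the flush mode shown earlier (at the cost of throughput):
flushDiskType=SYNC_FLUSH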
(2) Replication
Synchronous replication: a write is reported successful to the client only after both the Master and the Slave have written it. If the Master fails, the Slave holds a full copy of the data and recovery is easy, but synchronous replication increases write latency and lowers throughput.
Asynchronous replication: a write is reported successful as soon as the Master has written it. The system has lower latency and higher throughput, but if the Master fails, data that has not yet been written to the Slave may be lost.
Consumer side: reliable consumption is guaranteed by consuming each message successfully at least once, together with the consume-retry mechanism.
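On the consumer side, a minimal sketch of that retry mechanism, pluggable into a DefaultMQPushConsumer like the one shown earlier (handleBusiness is a hypothetical business handler):
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
    try {
        for (MessageExt msg : msgs) {
            handleBusiness(msg); // hypothetical business processing
        }
        // acknowledge only after every message in the batch was processed successfully
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    } catch (Exception e) {
        // ask the Broker to redeliver later instead of silently dropping the failure
        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
    }
});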
2.3 How to send ordered messages
RocketMQ only guarantees partial (per-queue) ordering. To achieve strict ordering for a group of messages, the producer must route them to a single queue of the topic and exactly one consumer must consume that queue, i.e. producer, queue, and consumer are in a one-to-one relationship; the example below does this by selecting the queue from the order id.
Producer
public class SortProducter {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();
        String[] tags = new String[]{"TagA", "TagC", "TagD"};
        // order list
        List<OrderStep> orderList = new SortProducter().buildOrders();
        Date date = new Date();
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        String dateStr = sdf.format(date);
        for (int i = 0; i < 10; i++) {
            // prefix the body with a timestamp
            String body = dateStr + " Hello RocketMQ " + orderList.get(i);
            Message msg = new Message("TopicTest", tags[i % tags.length], "KEY" + i, body.getBytes());
            SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                @Override
                public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                    Long id = (Long) arg; // pick the queue based on the order id
                    long index = id % mqs.size();
                    return mqs.get((int) index);
                }
            }, orderList.get(i).getOrderId()); // order id
            System.out.println(String.format("SendResult status:%s, queueId:%d, body:%s",
                    sendResult.getSendStatus(),
                    sendResult.getMessageQueue().getQueueId(),
                    body));
        }
        producer.shutdown();
    }

    /**
     * A single step of an order
     */
    private static class OrderStep {
        private long orderId;
        private String desc;

        public long getOrderId() {
            return orderId;
        }

        public void setOrderId(long orderId) {
            this.orderId = orderId;
        }

        public String getDesc() {
            return desc;
        }

        public void setDesc(String desc) {
            this.desc = desc;
        }

        @Override
        public String toString() {
            return "OrderStep{" +
                    "orderId=" + orderId +
                    ", desc='" + desc + '\'' +
                    '}';
        }
    }
    /**
     * Build mock order data
     */
    private List<OrderStep> buildOrders() {
        List<OrderStep> orderList = new ArrayList<OrderStep>();
        OrderStep orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111039L);
        orderDemo.setDesc("create");
        orderList.add(orderDemo);
        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111065L);
        orderDemo.setDesc("create");
        orderList.add(orderDemo);
        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111039L);
        orderDemo.setDesc("pay");
        orderList.add(orderDemo);
        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103117235L);
        orderDemo.setDesc("create");
        orderList.add(orderDemo);
        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111065L);
        orderDemo.setDesc("pay");
        orderList.add(orderDemo);
        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103117235L);
        orderDemo.setDesc("pay");
        orderList.add(orderDemo);
        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111065L);
        orderDemo.setDesc("finish");
        orderList.add(orderDemo);
        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111039L);
        orderDemo.setDesc("push");
        orderList.add(orderDemo);
        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103117235L);
        orderDemo.setDesc("finish");
        orderList.add(orderDemo);
        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111039L);
        orderDemo.setDesc("finish");
        orderList.add(orderDemo);
        return orderList;
    }
}
Consumer implementation
public class SortCustomer {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_3");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        /**
         * Decide whether the consumer starts from the head or the tail of the queue on its very first start.<br>
         * If this is not the first start, consumption continues from the last consumed position.
         */
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.subscribe("TopicTest", "TagA || TagC || TagD");
        consumer.registerMessageListener(new MessageListenerOrderly() {
            Random random = new Random();
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                context.setAutoCommit(true);
                for (MessageExt msg : msgs) {
                    // each queue is consumed by a single dedicated thread, so the order's messages stay in order per queue (partition)
                    System.out.println("consumeThread=" + Thread.currentThread().getName() + ", queueId=" + msg.getQueueId() + ", content:" + new String(msg.getBody()));
                }
                try {
                    // simulate business processing...
                    TimeUnit.SECONDS.sleep(random.nextInt(10));
                } catch (Exception e) {
                    e.printStackTrace();
                }
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });
        consumer.start();
        System.out.println("Consumer Started.");
    }
}
3. The consistency problem
Consistency problems can be solved with transactional messages.
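A hedged sketch of a RocketMQ transactional producer (4.x client API): a half message is sent first, the local transaction runs in executeLocalTransaction, and for in-doubt cases the Broker calls back checkLocalTransaction; doLocalTransaction and isLocalTransactionDone are hypothetical stand-ins for the business logic:
public class TransactionProducerSketch {
    public static void main(String[] args) throws MQClientException {
        TransactionMQProducer producer = new TransactionMQProducer("tx-producer-group");
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.setTransactionListener(new TransactionListener() {
            @Override
            public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
                try {
                    doLocalTransaction(arg); // e.g. update the order table in a local DB transaction
                    return LocalTransactionState.COMMIT_MESSAGE; // deliver the half message to consumers
                } catch (Exception e) {
                    return LocalTransactionState.ROLLBACK_MESSAGE; // discard the half message
                }
            }
            @Override
            public LocalTransactionState checkLocalTransaction(MessageExt msg) {
                // the Broker checks back here when it never received a commit/rollback
                return isLocalTransactionDone(msg.getTransactionId())
                        ? LocalTransactionState.COMMIT_MESSAGE
                        : LocalTransactionState.UNKNOW;
            }
        });
        producer.start();
        Message msg = new Message("topicTx", "tagTx", "transactional message".getBytes());
        producer.sendMessageInTransaction(msg, null);
    }

    // hypothetical business helpers
    private static void doLocalTransaction(Object arg) { /* local DB work */ }
    private static boolean isLocalTransactionDone(String transactionId) { return true; }
}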