如果想要保证消息不丢失就要知道,消息可能出现丢失得地方。
1.producer发送消息
2.Broker存储消息
3.Consumer消费消息
4.Broker主从切换
下面一共有9个维度可以保证消息不丢失。
目录
维度一:同步发送
维度二.异步发送
维度三.刷盘策略
维度四.Broker多副本和高可用
维度五.消息确认
维度七.事务消息
维度 9:极端情况
维度一:同步发送
public void send() throws Exception {
String message = "test producer";
Message sendMessage = new Message("topic1", "tag1", message.getBytes());
sendMessage.putUserProperty("name1","value1");
SendResult sendResult = null;
DefaultMQProducer producer = new DefaultMQProducer("testGroup");
producer.setNamesrvAddr("localhost:9876");
producer.setRetryTimesWhenSendFailed(3);
try {
sendResult = producer.send(sendMessage);
} catch (Exception e) {
e.printStackTrace();
}
if (sendResult != null) {
System.out.println(sendResult.getSendStatus());
}
}
同步发送回返回四个状态码
1.SEND_OK 消息发送成功,需要主要得是,消息发送到Broker之后,还需要有两个操作,一个刷盘,一个同步给Slave节点,默认这两个操作都是异步得,只有把这两个操作都改成同步的,才会保证这条消息真正得发送成功。
2.FLUSH_DISK_TIMEOUT 消息发送成功,但是刷盘超时
3.FLUSH_SLAVE_TIMEOUT 消息发送成功,但是同步到slave节点超时
4.SLAVE_NOT_AVAILABLE 消息发送成功,但是broker的Slave节点不可用
根据状态码,可以做消息重试,这里设置的是3次
注意:消息重试的时候,消费端一定要保证消息幂等性
维度二.异步发送
public void sendAsync() throws Exception {
String message = "test producer";
Message sendMessage = new Message("topic1", "tag1", message.getBytes());
sendMessage.putUserProperty("name1","value1");
DefaultMQProducer producer = new DefaultMQProducer("testGroup");
producer.setNamesrvAddr("localhost:9876");
producer.setRetryTimesWhenSendFailed(3);
producer.send(sendMessage, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
}
@Override
public void onException(Throwable e) {
// TODO 可以在这里加入重试逻辑
}
});
}
异步发送,在发送的时候会多传递一个SendCallback()方法,需要重写其中的onSuccess和onException方法
维度三.刷盘策略
刷盘策略默认的是异步刷盘,就是消息在写入commitlog中,并不会直接写入到磁盘,而是先写到PageCache缓存后返回成功,然后后台线程异步把消息刷入到磁盘中,异步刷盘提高了消息的吞吐量,但是有可能造成消息丢失的情况,比如断电导致机器停机,PageCache中还没来得及刷盘的消息就会丢失
同步刷盘,就是消息在写入内存后,立刻请求刷盘线程进行刷盘,如果消息没有在约定时间内(默认是5s)刷盘成功,就会返回上面所说的FLUSH_DISK_TIMEOUT,生产者收到这个响应后,可以进行重试,同步刷盘保证了消息的可靠性,同时降低了吞吐量,增加了延迟。修改刷盘策略,就是在broker配置中添加 flushDiskType=SYNC_FLUSH
维度四.Broker多副本和高可用
Borker为了保证高可用,采用一主多从的方式部署。
消息发送到 master 节点后,slave 节点会从 master 拉取消息保持跟 master 的一致。这个过程默认是异步的,即 master 收到消息后,不等 slave 节点复制消息就直接给 Producer 返回成功。
这样会有一个问题,如果 slave 节点还没有完成消息复制,这时 master 宕机了,进行主备切换后就会有消息丢失。为了避免这个问题,可以采用 slave 节点同步复制消息,即等 slave 节点复制消息成功后再给 Producer 返回发送成功。只需要增加下面的配置:brokerRole=SYNC_MASTER
改为同步复制后,消息复制流程如下:
-
slave 初始化后,跟 master 建立连接并向 master 发送自己的 offset;
-
master 收到 slave 发送的 offset 后,将 offset 后面的消息批量发送给 slave;
-
slave 把收到的消息写入 commitLog 文件,并给 master 发送新的 offset;
-
master 收到新的 offset 后,如果 offset >= producer 发送消息后的 offset,给 Producer 返回 SEND_OK。
维度五.消息确认
Consumer的消费消息的代码如下:
public void consume() throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("testGroup");
consumer.setNamesrvAddr("localhost:9876");
consumer.setMessageModel(MessageModel.CLUSTERING);
consumer.subscribe("topic1", "tag1");
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
try{
System.out.printf("Receive New Messages: %s", msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}catch (Exception e){
e.printStackTrace();
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
});
consumer.start();
}
如果消费成功就会返回CONSUMER_SUCCESS;提交offset并从Broker拉取下一批消息
维度六.Cosumer重试
Consumer消费失败有三种情况
返回RECONSUMER_LATER
返回null
抛出异常
Broker收到这个响应后,会把这条消息放入重试队列,重新发送给Consumer.
注意:1.只有在集群模式下(也就是只要有一个消费者消费成功就行)才会生效,广播模式下(就是每个消费者都需要消费这个消息)是不生效的
2.Consumer一定要做好幂等性处理
3.Broker默认最多重试16次,如果重试16次后都失败,就会放入到死信队列中,Consumer可以订阅死信队列进行消费
重试三次都失败就可以说明出现问题了,这时候我们可以把消息放到本地,给Broker返回Consumer_success来结束重试
int count = ((MessageExt) msgs).getReconsumeTimes();
if (count > 2) {
//TODO 把消息写入本地存储
System.out.println("重试次数超过3次");
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
维度七.事务消息
可以参考官网的事务消息:
事务消息发送分为两个阶段。第一阶段会发送一个半事务消息,半事务消息是指暂不能投递的消息,生产者已经成功地将消息发送到了 Broker,但是Broker 未收到生产者对该消息的二次确认,此时该消息被标记成“暂不能投递”状态,如果发送成功则执行本地事务,并根据本地事务执行成功与否,向 Broker 半事务消息状态(commit或者rollback),半事务消息只有 commit 状态才会真正向下游投递。如果由于网络闪断、生产者应用重启等原因,导致某条事务消息的二次确认丢失,Broker 端会通过扫描发现某条消息长期处于“半事务消息”时,需要主动向消息生产者询问该消息的最终状态(Commit或是Rollback)。这样最终保证了本地事务执行成功,下游就能收到消息,本地事务执行失败,下游就收不到消息。总而保证了上下游数据的一致性。
整个事务消息的详细交互流程如下图所示:
如下是官网的示例代码
public class TransactionProducer {
public static void main(String[] args) throws MQClientException, InterruptedException {
TransactionListener transactionListener = new TransactionListenerImpl();
TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("client-transaction-msg-check-thread");
return thread;
}
});
producer.setExecutorService(executorService);
producer.setTransactionListener(transactionListener);
producer.start();
String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
for (int i = 0; i < 10; i++) {
try {
Message msg =
new Message("TopicTest", tags[i % tags.length], "KEY" + i,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.sendMessageInTransaction(msg, null);
System.out.printf("%s%n", sendResult);
Thread.sleep(10);
} catch (MQClientException | UnsupportedEncodingException e) {
e.printStackTrace();
}
}
for (int i = 0; i < 100000; i++) {
Thread.sleep(1000);
}
producer.shutdown();
}
static class TransactionListenerImpl implements TransactionListener {
private AtomicInteger transactionIndex = new AtomicInteger(0);
private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
int value = transactionIndex.getAndIncrement();
int status = value % 3;
localTrans.put(msg.getTransactionId(), status);
return LocalTransactionState.UNKNOW;
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
Integer status = localTrans.get(msg.getTransactionId());
if (null != status) {
switch (status) {
case 0:
return LocalTransactionState.UNKNOW;
case 1:
return LocalTransactionState.COMMIT_MESSAGE;
case 2:
return LocalTransactionState.ROLLBACK_MESSAGE;
default:
return LocalTransactionState.COMMIT_MESSAGE;
}
}
return LocalTransactionState.COMMIT_MESSAGE;
}
}
}
维度8.消息索引
Rocketmq核心的数据文件有三个:CommitLog,index,consumerQueue
其中Index就是索引文件
查找消息的时候,首先会根据key的hashcode计算出hash槽的位置,根据这个位置计算Index条目的位置,从Index条目位置读取到消息在CommitLog文件中的offset,从而查找到消息。
在 Producer 发送消息时,可以指定一个 key,代码如下:
Message sendMessage = new Message("topic1", "tag1", message.getBytes());
sendMessage.setKeys("weiyiid");
维度 9:极端情况
如果对消息丢失零容忍,我们必须要考虑极端情况,比如整个 RocketMQ 集群挂了,这时 Producer 端发送消息一定会失败,可以考虑在 Producer 端做降级,把要发送的消息保存到本地数据库或磁盘,等 RocketMQ 恢复以后再把本地消息推送出去。