概述:
- 使用规范主要从,生产、可靠性、和消费为轴线定义使用规范;
- kafka使用核心:削峰、解耦、向下游并行广播通知(无可靠性保证)和分布式事务,本规范仅从削峰、解耦、向下游并行广播通知论述;
1、可靠性(强制):
可靠性包括Producer发送消息机制的可靠性,RocketMQ Server(Broker)消息持久化刷盘机制和Broker主从节点消息同步机制,Consumer消息的消费机制。
1.1、Producer发送消息的可靠性:
1.1.1、核心参数设置:
生产端(Producer):
- sendMsgTimeout:消息发送超时时长,默认:3000,单位毫秒;
- retryTimesWhenSendFailed:同步发送重试次数,默认:2;
- retryTimesWhenSendAsyncFailed:异步发送重试次数,默认:2;
- compressMsgBodyOverHowmuc:消息body需要压缩的阈值,默认:4K;
- maxMessageSize:客户端验证,允许发送的最大消息体大小,默认:4M;
注:
- rocketmq 的 client 端及 broker 端均有对消息体大小是否超出 maxMessageSize 进行校验;
- client 端的 DefaultMQProducer 定义了 maxMessageSize,默认是 4M 大小;
- send 方法及 batch 方法都会校验消息的大小;
- 服务端 conf/broker.conf 可以指定 maxMessageSize 大小,如果需要修改 maxMessageSize 大小需要跟服务端配合一起修改,否则可能投递失败;
消费端(Consumer):
- pullBatchSize:每批次从broker拉取消息的最大个数,默认值是32;
- consumeMessageBatchMaxSize:单次消费时一次性消费多少条消息;
- consumeFromWhere:指定消息消费读取策略,CONSUME_FROM_FIRST_OFFSET:初次从消息队列头部开始消费,后续再启动接着上次消费的进度开始消费;CONSUME_FROM_LAST_OFFSET:默认策略,初次从该队列最尾开始消费,即跳过历史消息,后续再启动接着上次消费的进度开始消费;CONSUME_FROM_TIMESTAMP:从某个时间点开始消费,默认是半个小时以前,后续再启动着上次消费的进度开始消费;
-
consumeThreadMin:最小消费线程池数量;
- consumeThreadMax:最大消费线程池数量;
- messageModel:消费者消费模式,CLUSTERING:集群模式,默认是CLUSTERING;BROADCASTING:广播模式;
注:
- pullBatchSize 的大小受制于 Broker 配置文件中 maxTransferCountOnMessageInMemory 参数的设置,该参数默认设置为 32,也即是每次从服务端拉取的最大的数量不能超过 32,因此即使设置 pullBatchSize 超过32,最后也只返回 32。因此,若要每次拉取的消息量超过 32,可以修改 broker 配置文件里该参数的值,并重启 broker 服务;
- 当从 Broker 拉取消息的大小超过 consumeMessageBatchMaxSize 的大小时,将会对消息进行拆分,然后提交到线程池进行处理;
1.1.2、刷盘机制:
rocketmq 刷盘机制分为同步刷盘,和异步刷盘。
1.1.2.1、同步刷盘:
同步刷盘数据可靠性更高,主要是防止异常断电消息丢失,但消息发送效率不高。
1.1.2.2、异步刷盘:
异步刷盘数据可靠性不高,异常断电消息可能会丢失,但消息发送效率高。
注: 刷盘方式可以通过Broker配置文件里的flushDiskType参数设置,这个参数有两种值:
- SYNC_FLUSH (同步刷盘);
- ASYNC_FLUSH (异步刷盘);
这个参数开发人员无法决定,运维人员确定。
1.1.3、同步机制:
同步机制主要指,Broker 主从节点之间同步消息,防止单点故障消息丢失,同步机制有同步复制、异步复制两种策略。通过 Broker 配置文件里的 brokerRole 参数设置,有三种选项:sync_master、async_master 和 slave。sync_master 和 async_master 用于 Master 角色 Broker 的配制,sync_master 同步复制,async_master 异步复制。slave 则是在 slave 的 Broker 中指定。
- 在 SYNC_MASTER 场景下:消息发送到 Master 后,暂时不返回成功/失败,而是等待 slave 拉取,若在规定时间内(默认3s)没有拉取到该消息,则 Master 会返回一个 FLUSH_SLAVE_TIMEOUT 异常给发送方,此时该消息发送即算作失败;
- 在 ASYNC_MASTER 场景下:消息发送到 Master 后,不管 slave 有没有拉取到该消息,Master 都会返回成功;
注:不管是哪一种策略,底层同步逻辑是一致的:均是由slave不断轮询master拉取消息,并提交同步offset。
1.1.4、消息生产(producer):
Rocketmq 投递消息有三种方式:单向消息、同步消息、异步消息。
1.1.4.1、单向消息:
单向(Oneway)发送特点只负责发送消息,不等待服务器回应,且没有回调函数触发。即:只发送请求不等待应答。发送效率极高,但极易丢失消息。
1.1.4.2、同步消息:
同步发送指消息发出后,会阻塞工作线程,直致成功,或者失败返回。发送效率极低,但数据可靠性极高。
1.1.4.3、异步消息:
异步发送指消息发出后,不会阻塞当前工作线程。异步发送实现发送回调接口,异步处理响应结果,成功、失败、或异常。
1.1.5、消息消费(consumer):
RocketMq 消息的消费机制可分为分组消费、广播消费、消费模式、消费可靠性和死信队列。
1.1.5.1、分组消费:
分组消费,指多个消费端通过同一消费组ID去消费,此种消费同一组,一对一消费模式。分组消费初次订阅Topic时,可以指定Offset从哪消费,即从Topic头开始消费,还是末端消费(最新),消费以后会以消费组维度记录Topic的消费Offset。
1.1.5.2、广播消费:
广播消费所有 Consumer 都能收到订阅以后最新的 Topic 消息,即:只消费最新的,Consumer 停了以后也不会去记录 Consumer 消费的 Offset。而且失败不会进入重试和死信队列。
1.1.5.3、消费模式:
RocketMq 的消费模式分为,push 和 pull 消费两种,pull 即:主动从消息服务器拉取信息,push 即 Broker 主动推送消息到 Counsumer (其实 RocketMq 没有做到,本质上还是拉取,仅是拉取的频率高,近似推送。)
1.1.5.4、消费可靠性:
消息的可靠性分为消息消费的提交方式和重试机制。
提交方式指:
- 先提交后消费;
- 先消费,消费成功后再提交;
1可以解决重复消费的问题但是会丢失消息(不可靠),2会导制消息重复(可靠),得去从幂等。
重试机制:
消费者消费消息后,需要给 Broker 返回消费状态,Topic 消息队列的 Offset 才会下移,否则会重试,重试分为:
- 异常重试:由于 Consumer 端逻辑出现了异常,导致返回了 RECONSUME_LATER 状态,那么 Broker 就会在一段时间后尝试重试;
- 超时重试:如果 Consumer 端处理时间过长,或者由于某些原因线程挂起,导致迟迟没有返回消费状态,Broker 就会认为 Consumer 消费超时,此时会发起超时重试;
RocketMQ 可在 broker.conf 文件中配置 Consumer 端的重试次数和重试时间间隔,如下:
messageDelayLevel = 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
但是在大部分情况下,如果 Consumer 端逻辑出现异常,重试太多次也没有很大的意义,我们可以在代码中指定最大的重试次数。
RocketMQ 会有一个针对消费组创建重试队列,当消费失败后会放入重试队列,后续消息周期间隔性消费是通过重试队列实现的,达到最大次数会放入死信队列。
1.1.5.5、死信队列:
当一条消息初次消费失败,消息队列 RocketMQ 会自动进行消息重试,达到最大重试次数后,若消费依然失败,则表明消费者在正常情况下无法正确地消费该消息。此时消息队列 RocketMQ 不会立刻将消息丢弃,而是将其发送到该消费者对应的特殊队列中,即死信队列,具有以下特性:
- 一个死信队列对应一个 Group ID, 而不是对应单个消费者实例;
- 如果一个 Group ID 未产生死信消息,消息队列 RocketMQ 不会为其创建相应的死信队列。一个死信队列包含了对应 Group ID 产生的所有死信消息,不论该消息属于哪个 Topic;
- 死信队列是一个特殊的 Topic,名称为%DLQ%consumerGroup;
死信队列中的消息需要人工干预,在 RocketMQ 中,可以通过使用 console 控制台对死信队列的权限更改为读写,然后对消息进行重发,或者订阅对应的 Topic 使得消费者实例再次进行消费。