概念
消费者消费完消息后,向_consumer_offset主题发送消息,用来保存每个分区的偏移量。
流程说明
- consumer发送JoinGroup请求;
- coordinator选出一个consumer作为leader,并将topics发送给leader消费者;
- leader consumer负责制定消费方案;
- leader consumer将消费方案发送给coordinator;
- coordinator将消费方案发送给CG中的每个consumer;
- 每个consumer与coordinator保持心跳(默认3s),一旦超时(session.timeout.ms=45s),该consumer被移除,触发再平衡,或者消费者处理消息过长(max.poll.interval.ms=300s),也会触发再平衡;
适用场景
消费者数量发生变化、消费者订阅主题发生变化或者分区数量发生变化时,会触发kafka的再平衡(Rebalance),再平衡后,消费者可能被分到新的分区,为保证高可用和伸缩性,消费者需要读取每个分区最后一次偏移量。
注意:再平衡期间,群组不可用,消费者无法读取消息。
再平衡(Rebalance)
再平衡(Rebalance),是Kafka中确保Consumer group下所有的consumer如何达成一致,分配订阅的topic的每个分区的机制。
触发场景
- 消费者个数发生变化,有新的消费者或分组中的消费者停止消费;
- 订阅的主题(topic)个数发生变化;
- 订阅的主题分区发生变化(partition);
影响
- 再平衡时,消费者组下的所有消费者都会协调在一起共同参与,Kafka使用分配策略尽可能达到最公平的分配;
- 再平衡过程会对消费者组产生非常严重的影响,所有的消费者都将停止工作,直到再平衡执行完成;
分区分配策略
Range范围分配策略
参数配置
partition.assignment.strategy = org.apache.kafka.clients.consumer.RangeAssignor
算法
n = 分区数量 / 消费者数量
m = 分区数量 % 消费者数量
前m个消费者消费n+1个,剩余消费者消费n个
图解
n = 2 = 8/3
m = 2 = 8%3
前2个消费者消费(2+1)个,剩余消费者消费2个。
RoundRobin轮询策略
将消费组内所有消费者以及消费者所订阅的所有topic的partition按照字典序排序(topic和分区的hashcode进行排序),然后通过轮询方式逐个将分区以此分配给每个消费者。
参数配置
partition.assignment.strategy = org.apache.kafka.clients.consumer.RoundRobinAssignor
图解
Stricky粘性分配策略
在执行一次新的分配之前,考虑上一次分配的结果,尽量少的调整分配的变动,可以节省大量的开销。
参数配置
partition.assignment.strategy = org.apache.kafka.clients.consumer.StickyAssignor
图解
- 故障前
- 故障后
代码示例
// 设置消费者组再平衡回调
// 注册该函数会关闭 rdkafka 的自动分区赋值和再分配
class ConsumerRebalanceCb : public RdKafka::RebalanceCb
{
public:
// 消费者组再平衡回调
void rebalance_cb(RdKafka::KafkaConsumer *consumer, RdKafka::ErrorCode err,
std::vector<RdKafka::TopicPartition *> &partitions)
{
if (RdKafka::ERR__ASSIGN_PARTITIONS == err) // 分区分配成功
{
// 消费者订阅这些分区
consumer->assign(partitions);
// 获取消费者组本次订阅的分区数量,可以属于不同的topic
m_partitionCount = (int)partitions.size();
}
else // 分区分配失败
{
// 消费者取消订阅所有的分区
consumer->unassign();
// 消费者订阅分区的数量为0
m_partitionCount = 0;
}
}
private:
int m_partitionCount; // 消费者组本次订阅的分区数量
};
RdKafka::Conf* t_config = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
if(NULL == t_config)
{
printf("create conf failed\n");
return;
}
std::string errorStr = "";
RdKafka::RebalanceCb* rebalance_cb = new ConsumerRebalanceCb;
RdKafka::Conf::ConfResult errorCode = t_config->set("rebalance_cb", rebalance_cb, errorStr);
if (RdKafka::Conf::CONF_OK != errorCode)
{
printf("set conf(rebalance_cb) failed, err:%s\n", errorStr.c_str());
delete t_config;
return;
}
提交方式
自动提交
参数配置
# 默认自动提交,消费者close时也会自动提交
enable.auto.comnit=true
# 自动提交周期,默认5s
auto.commit.interval.ms=5000
代码示例
RdKafka::Message *msg = m_consumer->consume(1000); // 1000ms超时
if(NULL != msg)
{
// 消费消息
ConsumeMsg_(msg);
// 消息消费完后无需手动处理,kafka自动提交偏移
delete msg;
}
存在的问题
如果在周期5s内发生再平衡,导致偏移量未提交,未提交的消息会被重复消费。
手动提交
参数配置
RdKafka::Conf* t_config = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
if(NULL == t_config)
{
printf("create conf failed\n");
return;
}
RdKafka::Conf* topicConfig = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
if (NULL == topicConfig)
{
printf("create topic conf failed\n");
delete t_config;
return;
}
std::string errorStr = "";
RdKafka::Conf::ConfResult errorCode = topicConfig->set("enable.auto.commit", " false", errorStr);
if(RdKafka::Conf::CONF_OK != errorCode)
{
printf("set topic conf(enable.auto.commit) failed, err:%s\n", errorStr.c_str());
delete topicConfig;
delete t_config;
return;
}
// 设置新到来消费者的消费起始位置,latest 消费最新的数据,earliest 从头开始消费
errorCode = topicConfig->set("auto.offset.commit", " earliest", errorStr);
if(RdKafka::Conf::CONF_OK != errorCode)
{
printf("set topic conf(auto.offset.commit) failed, err:%s\n", errorStr.c_str());
delete topicConfig;
delete t_config;
return;
}
// 默认 topic 配置,用于自动订阅 topics
errorCode = t_config->set("default_topic_conf", topicConfig, errorStr);
if (RdKafka::Conf::CONF_OK != errorCode)
{
printf("set conf(default_topic_conf) failed, err:%s\n", errorStr.c_str());
delete topicConfig;
delete t_config;
return;
}
同步提交
- 消息消费完,手动调用commitSync;
- 在同步提交未完成的情况下发生再平衡,消息会被重复消费;
- commitSync会阻塞直到偏移提交成功;
RdKafka::Message *msg = m_consumer->consume(1000); // 1000ms超时
if(NULL != msg)
{
// 消费消息
ConsumeMsg_(msg, NULL);
// 开启手动提交
m_consumer->commitSync();
delete msg;
}
异步提交
- 消息消费完,手动调用commitAsync;
- commitAsync不会重试提交偏移量;
RdKafka::Message *msg = m_consumer->consume(1000); // 1000ms超时
if(NULL != msg)
{
// 消费消息
ConsumeMsg_(msg, NULL);
// 开启手动提交
m_consumer->commitAsync();
delete msg;
}
存在的问题
重复消费(同步提交)
- auto.offset.commit参数设置为earliest;
- 上次提交的偏移量为1;
- 由于网络故障、超时等原因,2~7已消费完的情况下,8未提交成功,由于设置了参数auto.offset.commit=earliest,分区再平衡后会继续从2开始消费,会导致消息重复消费的问题;
消息丢失(异步提交)
- auto.offset.commit参数设置为latest;
- 上次提交的偏移量为1;
- 本次消费的偏移量范围为27,消费者立马提交了偏移量8,由于网络故障、超时等原因,27未消费完,由于设置了参数auto.offset.commit=latest,再平衡后会继续从8开始消费,会导致消息重复丢失的问题;
解决方案
根据实际场景选择同步提交还是异步提交。如果对消息可靠性要求比较高,不允许数据丢失,建议选择同步提交+“auto.offset.commit=earliest”,性能略差。