kafka(四)——生产者流程分析(c++)

news2024/11/25 20:54:58

前言

  • kafka生产者负责将数据发布到kafka集群的主题;
  • kafka生产者消息发送方式有两种:
    • 同步发送
    • 异步+回调发送

流程

在这里插入图片描述

流程说明:

  • Kafka Producer整体可看作是一个异步处理操作;
  • 消息发送过程中涉及两个线程:main线程和sender线程;
  • main线程负责将消息发送至一个双端队列,sender线程负责从双端队列取消息并发送至kafka broker;

消息可靠性

producer的acks参数表示生产者生产消息时,写入到副本的严格程度。决定了生产者的性能与可靠性

  • 0:生产者发送过来的数据,不等待broker确认,直接发送下一条数据,性能最高,但可能存在丢数据;

在这里插入图片描述

  • 1:生产者发送过来的数据,等待Leader副本确认后发送下一条数据,性能中等;
    在这里插入图片描述

  • -1(all):生产者发送过来的数据,等待所有副本将数据同步后发送下一条数据,性能最慢,安全性最高;

在这里插入图片描述

消息有序性

消息保序策略:按key分区,可以实现局部有序,但这又可能会导致数据倾斜,可根据实际情况选择。

示例:

// 指定消息key,即倒数第二个参数,当有相同的两条消息先后存储同一个key,消费者可按顺序消费到

RdKafka::ErrorCode errorCode = m_producer->produce(
		m_topic,                      // 指定发送到的主题
		RdKafka::Topic::PARTITION_UA, // 指定分区,如果为PARTITION_UA则通过
		// partitioner_cb的回调选择合适的分区
		RdKafka::Producer::RK_MSG_COPY, // 消息拷贝
		payload,                        // 消息本身
		len,                            // 消息长度
		&key,                           // 消息key
		NULL
		);

Main线程与Sender线程

Main线程

流程

  • 创建消息
// librdkafka源码 rdkafka_msg.c

/* Create message */
rkm = rd_kafka_msg_new0(rkt, force_partition, msgflags, payload, len,
                        key, keylen, msg_opaque, &err, &errnox, NULL, 0,
                        rd_clock());
if (unlikely(!rkm)) {
    /* errno is already set by msg_new() */
    rd_kafka_set_last_error(err, errnox);
    return -1;
}
  • 选择分区
/* Partition the message */
err = rd_kafka_msg_partitioner(rkt, rkm, 1);
if (likely(!err)) {
    rd_kafka_set_last_error(0, 0);
    return 0;
}
  • 调用拦截器
/* Interceptor: unroll failing messages by triggering on_ack.. */
rkm->rkm_err = err;
rd_kafka_interceptors_on_acknowledgement(rkt->rkt_rk,
                                         &rkm->rkm_rkmessage);

Sender线程

参数说明

batch.size缓冲区一批数据最大值,默认16k。适当增加该值,可以提高吞吐量,但是如果该值设置太大,会导致数据传输延迟增加。
linger.ms如果数据迟迟未达到batch.size,sender等待linger.time之后就会发送数据。单位ms,默认值是0ms,表示没有延迟。生产环境建议该值大小为5-100ms之间。
acks见“消息可靠性”章节
max.in.flight.requests.per.connection允许最多没有返回ack的次数,默认为5,开启幂等性要保证该值是 1-5的数字。
retries当消息发送出现错误的时候,系统会重发消息。retries表示重试次数。默认是int最大值,2147483647。 如果设置了重试,还想保证消息的有序性,需要设置 MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION=1否则在重试此失败消息的时候,其他的消息可能发送成功了。
retry.backoff.ms两次重试之间的时间间隔,默认是100ms。
enable.idempotence是否开启幂等性,默认true,开启幂等性。

流程

  • 达到batch.size大小或满足linger.ms时间发送消息;
  • 消息发送至的kafka服务器后,如果kafka没有应答,默认每个broker节点队列最多缓存 5 个请求,与“max.in.flight.requests.per.connection”参数有关;
  • 如配置了“retries”、“ retry.backoff.ms”参数,消息发送失败由kafka内部自动重试,无需手动在回调函数中重试;

同步和异步流程

同步流程

流程说明

  • 通过produce方法将消息推送至双端队列;
  • 通过flush方法等待发送结果,如outq_len()大于0,说明存在未发送成功的消息;

代码示例

int KafkaProducer::PushMessage(const std::string &str, const std::string &key)
{
	int32_t len = (int32_t)str.length();
	void *payload = const_cast<void *>(static_cast<const void *>(str.data()));

	// produce 方法,生产和发送单条消息到 Broker
	// 如果不加时间戳,内部会自动加上当前的时间戳
	RdKafka::ErrorCode errorCode = m_producer->produce(
		m_topic,                      // 指定发送到的主题
		RdKafka::Topic::PARTITION_UA, // 指定分区,如果为PARTITION_UA则通过
		// partitioner_cb的回调选择合适的分区
		RdKafka::Producer::RK_MSG_COPY, // 消息拷贝
		payload,                        // 消息本身
		len,                            // 消息长度
		&key,                           // 消息key
		NULL
		);

	if (RdKafka::ERR_NO_ERROR != errorCode) 
	{
		// kafka 队列满,等待 100 ms
		if (RdKafka::ERR__QUEUE_FULL == errorCode) 
		{
			m_producer->poll(100);
		}
        
        return -1;
	}
    
    // 同步等待200ms
    m_producer->flush(200);
    if(m_producer->outq_len() > 0)  // 用于调试
    {
        printf("Existed not send message.size:%d\n", m_producer->outq_len());
        return -1;
    }
    
    return 0;
}

异步流程

流程说明

  • 设置生产者投递报告回调
  • 设置生产者自定义分区策略回调
  • 消息发送

代码示例

  • 设置生产者投递回调
// 生产者投递报告回调
class ProducerDeliveryReportCb : public RdKafka::DeliveryReportCb 
{
public:
	void dr_cb(RdKafka::Message& message)
	{	
		if (message.err())   // 出错回调
		{
			// TODO
		} 
		else                 // 正常回调
		{  
			// TODO
		}
	}
};

// 设置生产者投递报告回调
m_dr_cb = new ProducerDeliveryReportCb; // 创建投递报告回调
errCode = m_config->set("dr_cb", m_dr_cb, errorStr);    // 异步方式发送数据
if (RdKafka::Conf::CONF_OK != errCode) 
{
    printf("Conf set(dr_cb) failed, errorStr:%s", errorStr.c_str());
    break;
}
  • 设置生产者自定义分区策略回调
// 生产者自定义分区策略回调:partitioner_cb
class HashPartitionerCb : public RdKafka::PartitionerCb 
{
public:
	// @brief 返回 topic 中使用 key 的分区,msg_opaque 置 NULL
	// @return 返回分区,(0, partition_cnt)
	int32_t partitioner_cb(const RdKafka::Topic *topic, const std::string *key,
		int32_t partition_cnt, void *msg_opaque) 
	{
		// 用于自定义分区策略:这里用 hash。例:轮询方式:p_id++ % partition_cnt
		int32_t partition_id = generate_hash(key->c_str(), key->size()) % partition_cnt;
		return partition_id;
	}

private:
	// 自定义哈希函数 
	static inline unsigned int generate_hash(const char *str, size_t len) 
	{
		unsigned int hash = 5381;
		for (size_t i = 0; i < len; i++)
			hash = ((hash << 5) + hash) + str[i];
		return hash;
	}
};

// 设置生产者自定义分区策略回调
m_partitioner_cb = new HashPartitionerCb; // 创建自定义分区投递回调
errCode = m_topicConfig->set("partitioner_cb", m_partitioner_cb, errorStr);
if (RdKafka::Conf::CONF_OK != errCode) 
{
    printf("Conf set(partitioner_cb) failed, errorStr:%s", errorStr.c_str());
    break;
}
  • 消息发送

注意:此处produce执行成功不代表消息发送成功,需根据dr_cb消息回调结果判断消息是否发送成功。

int KafkaProducer::PushMessage(const std::string &str, const std::string &key)
{
	int32_t len = (int32_t)str.length();
	void *payload = const_cast<void *>(static_cast<const void *>(str.data()));

	// produce 方法,生产和发送单条消息到 Broker
	// 如果不加时间戳,内部会自动加上当前的时间戳
	RdKafka::ErrorCode errorCode = m_producer->produce(
		m_topic,                      // 指定发送到的主题
		RdKafka::Topic::PARTITION_UA, // 指定分区,如果为PARTITION_UA则通过
		// partitioner_cb的回调选择合适的分区
		RdKafka::Producer::RK_MSG_COPY, // 消息拷贝
		payload,                        // 消息本身
		len,                            // 消息长度
		&key,                           // 消息key
		NULL
		);

	// 轮询处理
	m_producer->poll(0);
	if (RdKafka::ERR_NO_ERROR != errorCode) 
	{
		// kafka 队列满,等待 100 ms
		if (RdKafka::ERR__QUEUE_FULL == errorCode) 
		{
			m_producer->poll(100);
		}
        
        return -1;
	}
    
    return 0;
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1578779.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

一文读懂RISC-V与ARM

RISC-V和ARM是近年来备受关注的两种处理器架构。RISC-V是一种基于精简指令集计算(RISC)原理的开源指令集架构(ISA)&#xff0c;而ARM是一种专有ISA&#xff0c;由于其长期存在于嵌入式系统和移动设备中&#xff0c;已成为嵌入式系统和移动设备的主导选择。市场以及多年积累的信…

【网络】什么是RPC

RPC 是Remote Procedure Call的缩写&#xff0c;译为远程过程调用。是一个计算机通信协议。 1、为什么需要远程调用 在如何给女朋友解释什么是分布式这一篇文章中介绍过&#xff0c;为了提升饭店的服务能力&#xff0c;饭店从一开始只有一个负责所有事情的厨师发展成有厨师、切…

前端二维码工具小程序产品使用说明书

一、产品概述 前端二维码工具小程序是一款便捷实用的二维码生成与识别工具&#xff0c;通过本小程序&#xff0c;用户可以轻松根据文本或链接生成二维码&#xff0c;并支持扫一扫功能识别二维码内容&#xff0c;同时提供复制识别内容的功能。此外&#xff0c;本小程序还具备美…

如何使用Java和RabbitMQ实现延迟队列(方式二)?

前言 昨天写了一篇关于Java和RabbitMQ使用插件实现延迟队列功能的文章&#xff0c;今天来讲下另外一种方式&#xff0c;不需要RabbitMQ的插件。 前期准备&#xff0c;需要安装好docker、docker-compose的运行环境。 需要安装RabbitMQ的可以看下面这篇文章。 如何使用PHP和R…

React - 请你说一说setState是同步的还是异步的

难度级别:中高级及以上 提问概率:70% 在React项目中,使用setState可以更新状态数据,而不能直接使用为this.state赋值的方式。而为了避免重复更新state数据,React首先将state添加到状态队列中,此时我们可以通过shouldComponentUpdate这个钩…

WWDC24定档6月 | 崩坏3将推Mac系统版 苹果AI启航 visionOS 2.0将系数登场WWDC24

这几天又有一件苹果用户圈大事发生了&#xff01;WWDC24正式定档&#xff0c;将在6月10日-14日召开&#xff0c;届时一众软件系统&#xff0c;包括iOS18&#xff0c;iPadOS&#xff0c;WatchOS&#xff0c;VisionOS等等&#xff0c;都将迎来更新。另外就是手游崩坏3官宣&#x…

vector的使用和底层模拟实现

爱吃喵的鲤鱼 个人主页 文章目录 前言一、pandas是什么&#xff1f;二、使用步骤 1.引入库2.读入数据总结 前言 我们已经 学习了string在来实现vector会发现他们两的结构很像&#xff0c;而string只支持存储字符串&#xff0c;vector支持任意类型&#xff1b; 一、vector是什么…

3. Django 初探路由

3. 初探路由 一个完整的路由包含: 路由地址, 视图函数(或者视图类), 可选变量和路由命名. 本章讲述Django的路由编写规则与使用方法, 内容分为: 路由定义规则, 命名空间与路由命名, 路由的使用方式.3.1 路由定义规则 路由称为URL (Uniform Resource Locator, 统一资源定位符)…

-bash: cd: /etc/hadoop: 没有那个文件或目录

解决办法&#xff1a;source /etc/profile 运行 source /etc/profile 命令会重新加载 /etc/profile 文件中的配置&#xff0c;这样做的目的是使任何更改立即生效&#xff0c;而不需要注销并重新登录用户。通常&#xff0c;/etc/profile 文件包含系统范围的全局 Shell 配置&…

电商社交新零售:创新引领新趋势,变革新零售思维格局-亿发

新零售O2O模式是如何颠覆传统零售商业模式&#xff1f; 传统电商出现瓶颈&#xff1a; 传统电商在发展过程中逐渐出现了瓶颈&#xff0c;主要表现在市场竞争激烈、用户获取成本上升、用户黏性下降等问题。传统电商往往只能通过价格竞争或促销活动来吸引用户&#xff0c;而这种…

hexo接入github Discussions评论系统

评论存储仓 可以是你的博客项目的(github)仓库&#xff0c;也可以单独新建一个评论存储仓库。 我的博客项目在gitee上&#xff0c;就以新建存储仓为例&#xff1a; 使用Discussions评论系统必须开通Discussions模块&#xff01; 安装giscus插件 https://github.com/apps/…

数据仓库发展历史与架构演进

从1990年代Bill Inmon提出数据仓库概念后经过四十多的发展&#xff0c;经历了早期的PC时代、互联网时代、移动互联网时代再到当前的云计算时代&#xff0c;但是数据仓库的构建目标基本没有变化&#xff0c;都是为了支持企业或者用户的决策分析&#xff0c;包括运营报表、企业营…

vscode中vue插件

在Visual Studio Code (VSCode) 中&#xff0c;有许多插件可以帮助Vue开发者提高工作效率和代码质量。以下是一些针对Vue开发的必备VSCode插件&#xff0c;结合了多篇搜索结果中的信息&#xff0c;以提供详尽的介绍。 Volar Volar是Vue.js开发者的官方推荐插件&#xff0c;专门…

python基于opencv实现数籽粒

千粒重是一个重要的农艺性状&#xff0c;通过对其的测量和研究&#xff0c;我们可以更好地理解作物的生长状况&#xff0c;优化农业生产&#xff0c;提高作物产量和品质。但数籽粒数目是一个很繁琐和痛苦的过程&#xff0c;我们现在用一个简单的python程序来数水稻籽粒。代码的…

React之基础项目搭建

前言 React的生态系统非常庞大&#xff0c;拥有大量的第三方库和工具&#xff0c;如React Native&#xff08;用于构建原生移动应用&#xff09;、Next.js&#xff08;用于构建服务器渲染应用&#xff09;、Create React App&#xff08;用于快速搭建React应用的脚手架&#x…

【机器学习300问】62、若想将逻辑回归用于多分类有哪些常见做法?

逻辑回归算法在设计之初是用于二分类问题的&#xff0c;但若想把它用在多分类上也不是不行&#xff0c;这得看你具体面临的多分类问题是什么样的&#xff08;问题的定义&#xff09;。不同的问题就有不同的应对之策&#xff1a; 一、一对一 &#xff08;1&#xff09;方法的原…

【JavaEE】_Spring MVC项目获取Header

目录 1. 使用Servlet原生方法获取Header 2. 使用Spring注解获取Header 1. 使用Servlet原生方法获取Header .java文件内容如下&#xff1a; package com.example.demo.controller;import com.example.demo.Person; import org.springframework.web.bind.annotation.*; impor…

Linux-等待子进程

参考资料&#xff1a;《Linux环境编程&#xff1a;从应用到内核》 僵尸进程 进程退出时会进行内核清理&#xff0c;基本就是释放进程所有的资源&#xff0c;这些资源包括内存资源、文件资源、信号量资源、共享内存资源&#xff0c;或者引用计数减一&#xff0c;或者彻底释放。…

14届蓝桥杯 C/C++ B组 T5 接龙排序 (最长上升子序列DP+优化)

不难发现这是一个LIS问题&#xff0c;但是如果直接套用LIS的模版&#xff0c;在数据范围到达 1 e 5 1e5 1e5 的情况下&#xff0c;就只能够得到一半的分数&#xff0c;所以我们需要对其进行优化。 首先给出暴力的代码&#xff1a; #include<iostream> using namespace…

python+django教师业绩考评考核评分系统flask

在设计过程中&#xff0c;将参照一下国内外的一些同类网站&#xff0c;借鉴下他们的一些布局框架&#xff0c;将课题要求的基本功能合理地组织起来&#xff0c;形成友好、高效的交互过程。开发的具体步骤为&#xff1a;   第一步&#xff0c;进行系统的可行性分析&#xff0c…