【大数据学习 | kafka】producer的参数与结构

news2024/12/23 18:20:25

1. producer的结构

producer:生产者

它由三个部分组成

interceptor:拦截器能拦截到数据,处理完毕以后发送给下游,它和过滤器不同并不是丢弃数据,而是将数据处理完毕再次发送出去,这个默认是不存在的

serialiazer:序列化器kafka中存储的数据是二进制的,所以数据必须经过序列化器进行处理,这个是必须要有的,将用户的数据转换为byte[]的工具类,其中k和v要分别指定

partitioner: 分区器主要是控制发送的数据到topic的哪个分区中,这个默认也是存在的

record accumulator

本地缓冲累加器 默认32M

producer的数据不能直接发送到kafka集群中,因为producer和kafka集群并不在一起,远程发送的数据不是一次发送一条这样太影响发送的速度和性能,所以我们发送都是攒一批数据发一次,record accumulator就是一个本地缓冲区,producer将发送的数据放入到缓冲区中,另外一个线程会去拉取其中的数据,远程发送给kafka集群,这个异步线程会根据linger.msbatch-size进行拉取数据。如果本地累加器中的数据达到batch-size或者是linger.ms的大小阈值就会拉取数据到kafka集群中,这个本地缓冲区不仅仅可以适配两端的效率,还可以批次形式执行任务,增加效率

batch-size 默认16KB

linger.ms 默认为0

生产者部分的整体流程

首先producer将发送的数据准备好

经过interceptor的拦截器进行处理,如果有的话

然后经过序列化器进行转换为相应的byte[]

经过partitioner分区器分类在本地的record accumulator中缓冲

sender线程会自动根据linger.ms和batch-size双指标进行管控,复制数据到kafka

2. producer的简单代码

2.1 准备:

引入maven依赖:

<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>3.3.2</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.30</version>
    </dependency>
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.17</version>
    </dependency>
</dependencies>

在resources文件中创建log4j.properties

log4j.rootLogger=info,console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.out
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c %M(): %m%n

2.2 生产者中的设定参数

参数含义
bootstrap.serverskafka集群的地址
key.serializerkey的序列化器,这个序列化器必须和key的类型匹配
value.serializervalue的序列化器,这个序列化器必须和value的类型匹配
batch.size批次拉取大小默认是16KB
linger.ms拉取的间隔时间默认为0,没有延迟
partitioner分区器存在默认值
interceptor拦截器选的

2.3 全部代码

public class producer_test {
    public static void main(String[] args) {
        Properties pro = new Properties();
        pro.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop106:9092");
        //设定集群地址
        pro.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        pro.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        //设定两个序列化器,其中StringSerializer是系统自带的序列化器,要和数据的类型完全一致
        pro.put(ProducerConfig.BATCH_SIZE_CONFIG, 16*1024);
        //batch-size默认是16KB,参数的单位是byte
        pro.put(ProducerConfig.LINGER_MS_CONFIG, 0);
        //默认等待批次时长是0
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(pro);
        ProducerRecord<String, String> record = new ProducerRecord<>("topic_a", "this is hainiu");
        //发送数据的时候有kv两个部分,但是一般k我们什么都不放,只放value的值
        producer.send(record);
        producer.close();
    }
}

在x-shell中观察消费的数据

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2230319.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Java基本语法和基础数据类型——针对实习面试

目录 Java基本语法和基础数据类型标识符和关键字有什么区别&#xff1f;Java关键字有哪些&#xff1f;Java基本数据类型有哪些&#xff1f;什么是自动装箱和拆箱&#xff1f;自动装箱&#xff08;Autoboxing&#xff09;自动拆箱&#xff08;Unboxing&#xff09; 自动装箱和拆…

自动化测试工具Ranorex Studio(十九)-其他编辑选项

失败继续运行和禁用 表中列出的每个Action条目&#xff0c;都可以设置为禁用或 “失败继续运行”。 设置action条目为“失败继续运行”时&#xff0c;如果遇到错误&#xff0c;模块不会停在那个位置&#xff0c;而是继续执行。 您可以通过右键菜单或属性窗口设置这两个选项。 设…

知识见闻 - Workday公司介绍

人力资源“一哥”Workday的前世今生 01 Duffield 既然要聊Workday&#xff0c;我们首先要认识一个人。David Duffield&#xff0c;又一位企业软件服务行业的绝对大神。 大卫杜菲尔德&#xff08;David Duffield&#xff09;出生于1941年。 40岁&#xff0c;很多职场人都已经认命…

VS2022配置调试Qt源代码

需要保证源代码和项目使用的版本匹配&#xff0c;符号需要注意是64位还是32位&#xff0c;并且用msvc。 1. 设置源代码路径 2. 设置调试PDB路径 这里最好把4个地方都加进去&#xff0c;防止某些不常用PDB被漏掉。 D:\Qt\5.15.2\msvc2019_64\bin D:\Qt\5.15.2\msvc2019_64\lib…

利用腾讯元器构建商业化AI智能体——【快递100 AI智能体实战教学】

写在开头 随着人工智能技术的不断进步&#xff0c;腾讯元器作为一项强大的工具&#xff0c;使得构建商业化的AI智能体变得更加便捷和高效。本文将带你深入了解如何利用腾讯元器搭建快递100 AI智能体的全过程&#xff0c;从前期规划到最终实现&#xff0c;为你提供一份详尽的实…

「C/C++」C++ 设计模式 之 单例模式(Singleton)

✨博客主页何曾参静谧的博客&#x1f4cc;文章专栏「C/C」C/C程序设计&#x1f4da;全部专栏「VS」Visual Studio「C/C」C/C程序设计「UG/NX」BlockUI集合「Win」Windows程序设计「DSA」数据结构与算法「UG/NX」NX二次开发「QT」QT5程序设计「File」数据文件格式「PK」Parasoli…

【前端基础】CSS基础

目标&#xff1a;掌握 CSS 属性基本写法&#xff0c;能够使用文字相关属性美化文章页。 01-CSS初体验 层叠样式表 (Cascading Style Sheets&#xff0c;缩写为 CSS&#xff09;&#xff0c;是一种 样式表 语言&#xff0c;用来描述 HTML 文档的呈现&#xff08;美化内容&#…

【Oracle】空格单字符通配符查询匹配失败

问题 在进行模糊查询的时候&#xff0c;通过全局任意字符串匹配出含有两个字刘姓的人&#xff0c;但是通过刘_不能匹配出结果。 解决 检查后发现&#xff0c;姓名中包含空格 SELECT * FROM student WHERE TRIM(sname) LIKE 刘_;第一种解决方案就是查询的时候进行去空格处理&a…

查看多个通道32bit音频pcm数据

本文分析一个32位多通道pcm数据&#xff0c;一方面简单解释一下pcm数据格式&#xff0c;另外一方面看清楚实际数据的精度是多少。 说明&#xff1a;这是一个alsa采集到的10路32bit的pcm数据。 使用bc打开&#xff0c;16进制数据显示如下图&#xff1a; 图中左边是一个10通道32…

【语义分割|代码解析】CMTFNet-2: CNN and Multiscale Transformer Fusion Network 用于遥感图像分割!

【语义分割|代码解析】CMTFNet-2: CNN and Multiscale Transformer Fusion Network 用于遥感图像分割&#xff01; 【语义分割|代码解析】CMTFNet-2: CNN and Multiscale Transformer Fusion Network 用于遥感图像分割&#xff01; 文章目录 【语义分割|代码解析】CMTFNet-2: …

Webpack 配置module.css报错Uncaught TypeError: Cannot read properties of undefined

我的项目结构如下: 入口文件是index.jsx&#xff0c;组件Button.jsx使用了样式button.module.css .btn {background-color: #4CAF50;border: none;color: white; padding: 15px 32px;text-align: center;text-decoration: none;display: inline-block;font-size: 16px;margin:…

Redis(2):内存模型

一、Redis内存统计 工欲善其事必先利其器&#xff0c;在说明Redis内存之前首先说明如何统计Redis使用内存的情况。 在客户端通过redis-cli连接服务器后&#xff08;后面如无特殊说明&#xff0c;客户端一律使用redis-cli&#xff09;&#xff0c;通过info命令可以查看内存使用情…

Docker打包自己项目推到Docker hub仓库(windows10)

一、启用Hyper-V和容器特性 1.应用和功能 2.点击程序和功能 3.启用或关闭Windows功能 4.开启Hyper-V 和 容器特性 记得重启生效&#xff01;&#xff01;&#xff01; 二、安装WSL2&#xff1a;写文章-CSDN创作中心https://mp.csdn.net/mp_blog/creation/editor/143057041 三…

python如何安装扩展包

1、扩展包 Python安装额外的扩展包&#xff0c;一般使用anconda进行管理。 1、1安装命令 一般我们在anconda中通过pip install 包名 的方式进行安装&#xff0c;不过由于这些包在国外下载&#xff0c;因此需要配置合适的镜像促使其下载更快。 1、2 镜像源配置 1、2、1 一次…

【风冷换热器设计及计算

某空气调节工程需要将33.3kg/s的空气&#xff0c;由干球温度t123℃、相对湿度ψ160&#xff05;&#xff0c;冷却到干球温度t212℃、相对湿度ψ290&#xff05;&#xff0c;冷凝器出口为40℃的饱和液体&#xff0c;制冷剂采用R22&#xff0c;试设计一台直接蒸发式空气冷却器。 …

LabVIEW程序员赚钱不仅限于上班

LabVIEW程序员拥有多种途径来实现财富增值&#xff0c;而不仅仅局限于传统的全职工作。以下是一些他们可以利用自身技能和专业知识实现更高财务收益的方法&#xff1a; 1. 专注领域的自由职业与合同工作 制造、科研、医疗技术等行业都需要LabVIEW的专业知识。通过自由职业&…

C++ shared_ptr 动态内存

C shared_ptr 动态内存 智能指针 为了更加容易的使用动态内存&#xff0c;新的标准库提供了智能指针&#xff0c;用来管理动态对象&#xff0c;智能指针的行为类似常规指针&#xff0c;重要的区别是他负责自动释放所指的对象。新标准库提供的智能指针的区别在于管理底层指针的…

动态代理:面向接口编程,屏蔽RPC处理过程

RPC远程调用 使用 RPC 时&#xff0c;一般的做法是先找服务提供方要接口&#xff0c;通过 Maven把接口依赖到项目中。在编写业务逻辑的时候&#xff0c;如果要调用提供方的接口&#xff0c;只需要通过依赖注入的方式把接口注入到项目中&#xff0c;然后在代码里面直接调用接口…

Spring Boot:打造动态定时任务,开启灵活调度之旅

一、描述 在 Spring Boot 中设置动态定时任务是一种非常实用的功能&#xff0c;可以根据实际需求在运行时动态地调整定时任务的执行时间、频率等参数。以下是对 Spring Boot 设置动态定时任务的简单介绍&#xff1a; 1、传统定时任务的局限性 在传统的 Spring Boot 定时任务…

Chrome与夸克谁更节省系统资源

在当今数字化时代&#xff0c;浏览器已经成为我们日常生活中不可或缺的一部分。无论是工作、学习还是娱乐&#xff0c;我们都依赖于浏览器来访问互联网。然而&#xff0c;不同的浏览器在性能和资源消耗方面存在差异。本文将探讨Chrome和夸克两款浏览器在系统资源消耗方面的表现…