Kafka - 3.x Producer 生产者最佳实践

news2024/9/22 19:38:15

文章目录

  • 生产经验_生产者提高吞吐量
    • 核心参数
    • Code
  • 生产经验_数据可靠性
    • 消息的发送流程
    • ACK应答机制
    • ack应答级别
    • 应答机制 小结
    • Code
  • 生产经验_数据去重
    • 数据传递语义
    • 幂等性
      • 幂等性原理
      • 开启幂等性配置(默认开启)
    • 生产者事务
      • kafka事务原理
      • 事务代码流程
  • 生产经验_数据有序
  • 生产经验_数据乱序

在这里插入图片描述


生产经验_生产者提高吞吐量

核心参数

在这里插入图片描述

Code

package com.artisan.pc;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

/**
 * @author 小工匠
 * @version 1.0
 * @mark: show me the code , change the world
 */
public class CustomProducerParameters {

    public static void main(String[] args) throws InterruptedException {

        // 1. 创建kafka生产者的配置对象
        Properties properties = new Properties();

        // 2. 给kafka配置对象添加配置信息:bootstrap.servers
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.126.171:9092");

        // key,value序列化(必须):key.serializer,value.serializer
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // batch.size:批次大小,默认16K
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);

        // linger.ms:等待时间,默认0
        properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);

        // RecordAccumulator:缓冲区大小,默认32M:buffer.memory
        properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);

        // compression.type:压缩,默认none,可配置值gzip、snappy、lz4和zstd
        properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");

        // 3. 创建kafka生产者对象
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);

        // 4. 调用send方法,发送消息
        for (int i = 0; i < 10; i++) {
            kafkaProducer.send(new ProducerRecord<>("artisan", "art-msg-" + i));
        }

        // 5. 关闭资源
        kafkaProducer.close();
    }
}
    

在这里插入图片描述


生产经验_数据可靠性

消息的发送流程

回顾下消息的发送流程如下:

在这里插入图

ACK应答机制

在这里插入图片描述

背景Kafka提供的解决方案
Leader收到数据,所有Follower开始同步数据,但有一个Follower因故障无法同步,导致Leader一直等待直到同步完成才发送ACK。- Leader维护了一个动态的In-Sync Replica Set (ISR)和Leader保持同步的Follower集合。
- 当ISR中的Follower完成数据同步后,Leader向Producer发送ACK。
- 如果某个Follower长时间(replica.lag.time.max.ms)未向Leader同步数据,则该Follower将被移出ISR。
- 在Leader发生故障时,将从ISR中选举新的Leader。

ack应答级别

在这里插入图片描述

对于某些不太重要的数据,对数据的可靠性要求不是很高,能够容忍数据的少量丢失,所以没必要等ISR中的follower全部接收成功。
所以Kafka为用户提供了三种可靠性级别,用户根据对可靠性和延迟的要求进行权衡,选择以下的配置

acks描述
0提供最低延迟,Leader副本接收消息后返回ack,尚未写入磁盘。可能导致数据丢失,特别是在Leader发生故障时。
1Leader副本将消息写入磁盘后返回ack,但如果Leader在Follower副本同步数据之前发生故障,可能会丢失数据。
-1或者 (all) ,Leader和所有Follower副本都将消息写入磁盘后才返回ack。如果在Follower副本同步完成后,Leader副本在发送ack之前发生故障,可能会导致数据重复。

应答机制 小结

在这里插入图片描述


Code

package com.artisan.pc;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

/**
 * @author 小工匠
 * @version 1.0
 * @mark: show me the code , change the world
 */
public class CustomProducerAck {

    public static void main(String[] args) throws InterruptedException {

        // 1. 创建kafka生产者的配置对象
        Properties properties = new Properties();

        // 2. 给kafka配置对象添加配置信息:bootstrap.servers
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.126.171:9092");

        // key,value序列化(必须):key.serializer,value.serializer
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // 设置acks
        properties.put(ProducerConfig.ACKS_CONFIG, "all");

        // 重试次数retries,默认是int最大值,2147483647
        properties.put(ProducerConfig.RETRIES_CONFIG, 3);

        // 3. 创建kafka生产者对象
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);

        // 4. 调用send方法,发送消息
        for (int i = 0; i < 10; i++) {
            kafkaProducer.send(new ProducerRecord<>("artisan", "art-msg-ack" + i));
        }

        // 5. 关闭资源
        kafkaProducer.close();
    }
}
    

在这里插入图片描述

生产经验_数据去重

数据传递语义

在这里插入图片描述


幂等性

幂等性原理

在这里插入图片描述

开启幂等性配置(默认开启)

在prudocer的配置对象中,添加参数enable.idempotence,参数值默认为true,设置为false就关闭了。


生产者事务

kafka事务原理

在这里插入图片描述

事务代码流程

// 1初始化事务
void initTransactions();
// 2开启事务
void beginTransaction() throws ProducerFencedException;
// 3在事务内提交已经消费的偏移量(主要用于消费者)
void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
                              String consumerGroupId) throws ProducerFencedException;
// 4提交事务
void commitTransaction() throws ProducerFencedException;
// 5放弃事务(类似于回滚事务的操作)
void abortTransaction() throws ProducerFencedException;

生产经验_数据有序

在这里插入图片描述


生产经验_数据乱序

在这里插入图片描述


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1141649.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Leetcode—121.买卖股票的最佳时机【简单】

2023每日刷题&#xff08;十一&#xff09; Leetcode—17.电话号码的字母组合 枚举法题解 参考自灵茶山艾府 枚举法实现代码 int maxProfit(int* prices, int pricesSize){int i;int max 0;int minPrice prices[0];for(i 1; i < pricesSize; i) {int tmp prices[i] -…

拼多多根据ID取商品详情 API 返回值说明

item_get-根据ID取商品详情 pinduoduo.item_get 公共参数 名称类型必须描述keyString是调用key&#xff08;必须以GET方式拼接在URL中&#xff09;获取key和secret接入secretString是调用密钥api_nameString是API接口名称&#xff08;包括在请求地址中&#xff09;[item_sea…

Small Tip: 如何实现从Eclipse里面直接跳转到Analysis for Office

查看ADSO或者CP的数据时&#xff0c;一般情况下&#xff0c;预览只能有这两个选项。 可以扩展成以下这样&#xff1a; 方法&#xff1a; SPRO-> 选参数&#xff0c;填文本。然后重启Eclipse.

Web:探索 SpreadJS强大的在线电子表格库

1、概述 SpreadJS 是葡萄城结合 40 余年专业控件技术和在电子表格应用领域的经验而推出的纯前端表格控件,基于 HTML5,兼容 450 多种 Excel 公式,具备“高性能、跨平台、与 Excel 高度兼容”的产品特性,SpreadJS 在界面和功能上与 Excel 高度类似,但又不局限于 Excel,而是…

模拟算法及其优化

第一题 替换所有问号 力扣&#xff08;LeetCode&#xff09;官网 - 全球极客挚爱的技术成长平台 class Solution { public:string modifyString(string s) {string ret;for(int i0;i<s.size();i){if(i0){if(s[i]?&&i1<s.size()){for(char aa;a<z;a){if(a!s…

搭建产品技术说明书,借助工具10分钟搞定!

说明书是新产品上市销售时所配套的非常重要的文件&#xff0c;是新产品的产品包非常重要的组成部分&#xff0c;其编写质量的好坏&#xff0c;直接影响了后续客户和用户对新产品的满意度&#xff0c;是新产品好口碑的重要来源&#xff0c;同时也是后续售后服务工作的起始点。 业…

大数据Flink(一百零四):SQL任务参数配置

文章目录 SQL任务参数配置 一、参数设置方式

bitlocker 加密锁定的固态硬盘,更换到别的电脑上,怎么把原密钥写进新电脑TPM芯片内,开启无需手动填密钥

环境: Win11 专业版 联想E14笔记本 512G ssd 问题描述: 一台笔记本因充电故障,需要拿去维修,不想重装系统,将bitlocker 加密锁定的固态硬盘拆下更换到别的笔记本电脑上,现在开机要手动填密钥,怎么把原密钥写进新电脑TPM芯片内,开启无需手动填密钥和之前那台电脑一…

广州华锐互动:VR虚拟现实物理学习平台,开启数字化教学新格局

随着虚拟现实(VR)技术的不断发展&#xff0c;越来越多的领域开始应用这一技术。广州华锐互动开发的VR虚拟现实物理学习平台就得到了广泛应用&#xff0c;平台涉及力学、光学、热学等初中物理知识&#xff0c;还包含了物理名人、实验器具、物理现象的还原和学习&#xff0c;相比…

AI与Prompt:解锁软件开发团队的魔法咒语,在复杂任务上生成正确率更高的代码

AI与Prompt&#xff1a;解锁软件开发团队的魔法咒语 写在最前面论文&#xff1a;基于ChatGPT的自协作代码生成将团队协作理论应用于代码生成的研究自协作框架原理1、DOL任务分配2、共享黑板协作3、Instance实例化 案例说明简单任务&#xff1a;基本操作&#xff0c;生成的结果1…

竞赛 深度学习大数据物流平台 python

文章目录 0 前言1 课题背景2 物流大数据平台的架构与设计3 智能车货匹配推荐算法的实现**1\. 问题陈述****2\. 算法模型**3\. 模型构建总览 **4 司机标签体系的搭建及算法****1\. 冷启动**2\. LSTM多标签模型算法 5 货运价格预测6 总结7 部分核心代码8 最后 0 前言 &#x1f5…

力扣:141. 环形链表(Python3)

题目&#xff1a; 给你一个链表的头节点 head &#xff0c;判断链表中是否有环。 如果链表中有某个节点&#xff0c;可以通过连续跟踪 next 指针再次到达&#xff0c;则链表中存在环。 为了表示给定链表中的环&#xff0c;评测系统内部使用整数 pos 来表示链表尾连接到链表中的…

Android笔记(八):基于CameraX库结合Compose和传统视图组件PreviewView实现照相机画面预览和照相功能

CameraX是JetPack库之一&#xff0c;通过CameraX可以向应用增加相机的功能。在下列内容中&#xff0c;将介绍一个结合CameraX实现一个简单的拍照应用。本应用必须采用Android SDK 34。并通过该简单示例&#xff0c;了解传统View层次组件的UI组件如何与Compose组件结合实现移动应…

【pwn入门】使用python打二进制

声明 本文是B站你想有多PWN学习的笔记&#xff0c;包含一些视频外的扩展知识。 程序网络交互初体验 将程序部署成可以远程访问的 socat tcp-l:8877,fork exec:./question_1_plus_x64,reuseaddr通过网络访问程序 nc 127.0.0.1 8877攻击脚本 import socket import telnetli…

PHP简单实现预定义钩子和自定义钩子

在PHP中&#xff0c;钩子&#xff08;Hooks&#xff09;是一种机制&#xff0c;允许开发人员在特定的时机插入自定义代码。通过使用钩子&#xff0c;开发人员可以在应用程序的特定事件发生时执行自定义的功能或逻辑 钩子有两种类型&#xff1a;预定义钩子和自定义钩子。 预定…

shell的执行流控制

目录 1.for语句 2.条件语句 while...do语句 until...do 语句 if...then...elif...then...else...fi 语句 3.case语句 4.expect 5.break,continue,exit 1.for语句 作用&#xff1a;为循环执行动作 for语句结构 for //定义变量 do //使用变量&#xff0…

系列十八、请描述下bean的生命周期

一、概述 bean的生命周期是指bean从创建到销毁的整个过程。 二、生命周期 bean的生命周期是指bean从创建到销毁的整个过程&#xff0c;大致可以分为如下四个过程&#xff1a; 2.1、实例化 实例化可以通过如下几种方式完成&#xff1a;&#xff08;参考系列十五&#xff09…

区块链物联网中基于属性的私有数据共享与脚本驱动的可编程密文和分散密钥管理

Attribute-Based Private Data Sharing With Script-Driven Programmable Ciphertext and Decentralized Key Management in Blockchain Internet of Things 密钥生成算法 第 1 步&#xff1a;对于属性集A 的用户IDk&#xff0c;他首先将属性集A发送给Pi并且计算 &#xff0c…

MySQL大揭秘:深入探索Linux数据库管理

1 数据库的基本介绍 1.1 数据库的基本介绍 数据库的本质就是高级的excle表格。常见的数据库有&#xff1a;MySQL、Oracle 、sqlservermariadb就是MySQL的一种。数据库常用的名词 1.字段 &#xff1a;表格中的表头,每一列就是一个字段 2.表 &#xff1a;表格 3.库 &#xff1a…

Corel Products Keygen-X-FORCE 2023(Corel会声会影2023注册机)

Corel All Products Universal Keygens通用注册机是一款非常实用的激活工具&#xff0c;专门用于激活Corel全系列产品。尤其是被广泛使用的CorelDRAW作图软件和Corel VideoStudio会声会影视频编辑处理软件。小编也是一直关注由X-Force团队制作的注册机&#xff0c;目前已更新至…