Kafka3.0.0版本——生产者 数据去重

news2024/12/26 11:35:48

目录

    • 一、数据传递语义
      • 1.1、至少一次
      • 1.2、最多一次
      • 1.3、精确一次
    • 二、幂等性
      • 2.1、幂等性原理
      • 2.2、重复数据的判断标准
      • 2.3、如何使用幂等性
    • 三、生产者 事务
      • 3.1、Kafka事务原理
      • 3.2、Kafka事务注意事项
      • 3.3、Kafka事务的5个API
        • 3.3.1、初始化事务API
        • 3.3.2、开启事务API
        • 3.3.3、在事务内提交已经消费的偏移量API
        • 3.3.4、提交事务API
        • 3.3.5、放弃事务API
    • 3.4、单个 Producer,使用事务保证消息的仅一次发送的示例代码

一、数据传递语义

1.1、至少一次

  • 至少一次(At Least Once )的含义
    生产者发送数据到kafka集群,kafka集群至少接收到一次数据。
  • 至少一次的条件:
    ACK级别设置为-1 + 分区副本大于等于2 + ISR里应答的最小副本数量大于等于2

1.2、最多一次

  • 最多一次(At Most Once )的含义
    生产者发送数据到kafka集群,kafka集群最多接收到一次数据。
  • 最多一次的条件:
    ACK级别设置为0

1.3、精确一次

  • 精确一次( Exactly Once )的含义
    对于一些非常重要的信息,比如和钱相关的数据,要求数据既不能重复也不丢失。
  • 精确一次的条件:
    幂等性 + 至少一次( ack=-1 + 分区副本数>=2 + ISR 最小副本数量>=2)

二、幂等性

2.1、幂等性原理

  • 幂等性:指Producer不论向Broker发送多少次重复数据,Broker端都只会持久化一条,保证了不重复。

2.2、重复数据的判断标准

  • 具有<PID, Partition, SeqNumber>相同主键的消息提交时,Broker只会持久化一条。其中PID是Kafka每次重启都会分配一个新的Partition 表示分区号Sequence Number是单调自增的
  • 所以幂等性只能保证的是在单分区单会话内不重复

2.3、如何使用幂等性

  • 开启参数 enable.idempotence 默认为 true,false关闭。
  • 官网描述如下图:
    在这里插入图片描述

三、生产者 事务

3.1、Kafka事务原理

在这里插入图片描述

3.2、Kafka事务注意事项

  • Producer 在使用事务功能前,必须先自定义一个唯一的 transactional.id。有了transactional.id,即使客户端挂掉了,它重启后也能继续处理未完成的事务
  • 开启事务,必须开启幂等性 。

3.3、Kafka事务的5个API

3.3.1、初始化事务API

  • 初始化事务
    在这里插入图片描述

3.3.2、开启事务API

  • 开启事务
    在这里插入图片描述

3.3.3、在事务内提交已经消费的偏移量API

  • 在事务内提交已经消费的偏移量(主要用于消费者)
    在这里插入图片描述

3.3.4、提交事务API

  • 提交事务
    在这里插入图片描述

3.3.5、放弃事务API

  • 放弃事务(类似于回滚事务的操作)
    在这里插入图片描述

3.4、单个 Producer,使用事务保证消息的仅一次发送的示例代码

  • 示例代码

    package com.xz.kafka.producer;
    
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;
    import java.util.Properties;
    
    public class CustomProducerTranactions {
    
        public static void main(String[] args) {
    
            //1、创建 kafka 生产者的配置对象
            Properties properties = new Properties();
    
            //2、给 kafka 配置对象添加配置信息:bootstrap.servers
            properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092,hadoop103:9092");
    
            //3、指定对应的key和value的序列化类型 key.serializer value.serializer
            properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    
            //指定事务id
            properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "tranactionalId_01");
    
            //4、创建 kafka 生产者对象
            KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
    
            //初始化事务
            kafkaProducer.initTransactions();
            //开启事务
            kafkaProducer.beginTransaction();
    
            try {
                //5、调用 send 方法,发送消息
                for (int i = 0; i < 5; i++) {
                    kafkaProducer.send(new ProducerRecord<>("news", "hello kafka" + i));
                }
                //提交事务
                kafkaProducer.commitTransaction();
            } catch (Exception e) {
                //终止事务
                kafkaProducer.abortTransaction();
            } finally {
                //6、关闭资源
                kafkaProducer.close();
            }
        }
    }
    
  • 在kafka集群上开启 Kafka 消费者

    [root@localhost kafka-3.0.0]# bin/kafka-console-consumer.sh --bootstrap-server 192.168.136.27:9092 --topic news
    
  • 在 IDEA 中执行代码,观察kafka集群控制台中是否接收到消息。
    在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/462969.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

CMake Tutorial Step1

CMake Tutorial Step1 参考资料&#xff1a;Step 1: A Basic Starting Point — CMake 3.26.3 Documentation Tutorial工程&#xff1a;官方Tutorial工程 开发环境&#xff1a;CLion CMake简介 方便起见直接问New Bing。 为什么要学习CMake&#xff1f; CMake的最大特点和…

微服务---分布式搜索引擎 elasticsearch基础

分布式搜索引擎 elasticsearch基础 0.学习目标 1.初识elasticsearch 1.1.了解ES 1.1.1.elasticsearch的作用 elasticsearch是一款非常强大的开源搜索引擎&#xff0c;具备非常多强大功能&#xff0c;可以帮助我们从海量数据中快速找到需要的内容 例如&#xff1a; 在GitH…

centos7操作yum命令失败

前言设置网卡开机自动启动设置国内dns服务器系统修改CentOS-Base.repo中的地址 前言 刚安装完的CentOS7的系统&#xff0c;发现无法使用yum命令进行更新&#xff0c;在更新的时候会出现下面这种内容&#xff0c;为此问题有以下这些解决方案可以尝试。 One of the configured r…

两段视频合成一个视频用什么软件 怎么把两段视频合成一段看不出来

两段视频合成一个视频用什么软件&#xff1f;无论是两段视频的合成&#xff0c;还是三段视频的合成&#xff0c;用视频编辑软件都能轻松搞定。但怎么把两段视频合成一段看不出来&#xff1f;这就比较考验制作者的功力了&#xff0c;不过我们还是有捷径的&#xff0c;下面一起来…

new和delete

目录 malloc: 开辟失败&#xff1a;返回值为空指针 new: 内置类型&#xff1a; 申请一个int对象&#xff08;开辟一块存储int类型数据的空间&#xff0c;只能存储一个int数据&#xff09;&#xff1a; 申请5个int对象&#xff08;开辟一块存储int类型数据的空间&#xff…

Blender3.5 边的操作

目录 1. 边操作1.1 边的细分 Subdivide1.2 边的滑移 Edge Slide1.3 边的删除1.4 边的溶解 Dissolve1.5 边线倒角 Bevel1.6 循环边 Loop Edges1.7 并排边 Ring Edges1.8 桥接循环边 1. 边操作 1.1 边的细分 Subdivide 在边选择模式&#xff0c;选中一条边&#xff0c;右键&…

JVM系列(十一) 垃圾收集器之 Concurrent Mark Sweep 并发标记清除

垃圾收集器之 Concurrent Mark Sweep 并发标记清除 上几篇文章我们讲解了单线程垃圾收集器 Serial/SerialOld ,多线程垃圾收集器 Parallel Scavenge/Old, 本文我们讲解下 Concurrent Mark Sweep 简称CMS垃圾收集器 垃圾收集器 新生代收集器&#xff1a; Serial、ParNew、Par…

图解 | 原来这就是网络

​​ 你是一台电脑&#xff0c;你的名字叫 A 很久很久之前&#xff0c;你不与任何其他电脑相连接&#xff0c;孤苦伶仃。 ​ 直到有一天&#xff0c;你希望与另一台电脑 B 建立通信&#xff0c;于是你们各开了一个网口&#xff0c;用一根网线连接了起来。 ​ 用一根网线连接起来…

[晕事]今天做了件晕事7

今天在使用iptables与grep的时候碰到一件晕事&#xff1b; 第一步添加了一条rule到OUTPUT&#xff1a; iptables -A OUTPUT --source 10.87.51.2 --destination 10.87.51.10 -p tcp --sport 5060 -j DROP 第二步使用&#xff1a;iptables -nL | grep DROP 发现这条记录跑到了FO…

玩转ESP32 PWM输出,制作炫酷呼吸灯效果

文章目录 什么是PWM软硬件使用ESP32实现PWM输出代码讲解结语 什么是PWM PWM&#xff08;Pulse Width Modulation&#xff09;是一种常用的模拟信号产生技术&#xff0c;它通过对一个定时器的计数值进行调整来改变输出信号的占空比&#xff0c;从而控制输出信号的平均电压值&am…

idea使用 ( 二 ) 创建java项目并导入依赖jar

3.创建java项目 3.1.创建普通java项目 3.1.1.打开创建向导 接 2.3.1.创建新的项目 也可以 从菜单选择建立项目 会打开下面的选择界面 3.1.2.不使用模板 3.1.3.设置项目名 Project name : 项目名 Project location : 项目存放的位置 确认创建 3.1.4.关闭tips 将 Dont s…

Spring Boot集成ShardingSphere实现数据分片(一) | Spring Cloud 40

一、背景 传统的将数据集中存储至单一节点的解决方案&#xff0c;在性能、可用性和运维成本这三方面已经难于满足海量数据的场景。 从性能方面来说&#xff0c;由于关系型数据库大多采用 B 树类型的索引&#xff0c;在数据量超过阈值的情况下&#xff0c;索引深度的增加也将使…

Mail 邮件服务

~ Postfix ~ sdskill.com 的邮件发送服务器 ~~ 支持smtps(465)协议连接,使用Rserver颁发的证书,证书路径/CA/cacert.pem ~ 创建邮箱账户“user1~user99”(共99个用户),密码为Chinaskill20!; ~ Dovecot ~ sdskill.com 的邮件接收服务器; ~ 支持imap…

6.微服务项目实战---Sleuth--链路追踪

6.1 链路追踪介绍 在大型系统的微服务化构建中&#xff0c;一个系统被拆分成了许多模块。这些模块负责不同的功能&#xff0c;组合成 系统&#xff0c;最终可以提供丰富的功能。在这种架构中&#xff0c;一次请求往往需要涉及到多个服务。互联网应用构建在不同的软件模块集上…

Docker compose-实现多服务、nginx负载均衡、--scale参数解决端口冲突问题

Docker compose-实现多服务、nginx负载均衡、--scale参数解决端口冲突问题 问题&#xff1a;scale参数端口冲突解决方法&#xff1a;nginx实现多服务、负载均衡修改docker-compose.yml配置新增nginx本地配置文件验证启动容器查看容器状态访问web应用 问题&#xff1a;scale参数…

《二》HTTP 请求报文和响应报文、请求方法、状态码

请求报文和响应报文&#xff1a; 请求报文: 客户端向服务器发送的请求信息&#xff0c;就叫做请求报文。 客户端发送一个 HTTP 请求到服务器&#xff0c;请求信息包含四部分&#xff1a;请求行、请求头、空行、请求体。 请求行&#xff1a;包含三部分&#xff0c;分别是请…

查看库文件是32位还是64位|查看lib是静态库还是导入库|判断是debug模式还是release模式

文章目录 dll位数查看lib位数查看查看lib库是静态库还是导入库dll库文件信息查看lib库文件内容查看dll库查看编译模式是debug还是release方法一方法二方法三 lib静态库查看编译模式是debug还是release方法一方法二 lib导入库查看编译模式是debug还是release查看Linux下的.a库&a…

ROS学习第十五节——常用API(C++)

由于时间问题&#xff0c;从这一节开始只记录C实现效果&#xff0c;加油 以下附上这一节调试用的程序 https://download.csdn.net/download/qq_45685327/87708069 1.初始化函数 void init(int &argc, char **argv, const std::string& name, uint32_t options 0); …

openEuler NFS+协议全新发布:实现NAS存储性能与可靠性倍增

4月21日&#xff0c;在openEuler Developer Day 2023上&#xff0c;openEuler发布NFS协议&#xff0c;实现单客户端访问NAS存储可靠性提升3倍、性能提升6倍&#xff0c;助力NAS存储全面满足新型生产核心场景下苛刻要求。 传统NFS面临挑战 网络文件系统&#xff08;NFS&#xf…

vue打包如何开启gzip压缩

文章目录 场景gzip压缩有两种方案&#xff1a;个人实践 场景 本人前端打包的js达到了6.9M,导致网站加载很慢&#xff0c;想了下可以用gzip的方式压缩&#xff0c;减少文件大小。 “前些天发现了一个巨牛的人工智能学习网站&#xff0c;通俗易懂&#xff0c;风趣幽默&#xff0c…