Kafka 数据重复怎么办?(案例)

news2024/12/25 10:32:31

一、前言

数据重复这个问题其实也是挺正常,全链路都有可能会导致数据重复。

通常,消息消费时候都会设置一定重试次数来避免网络波动造成的影响,同时带来副作用是可能出现消息重复。

整理下消息重复的几个场景:

  1. 生产端: 遇到异常,基本解决措施都是 重试
    • 场景一:leader分区不可用了,抛 LeaderNotAvailableException 异常,等待选出新 leader 分区。
    • 场景二:Controller 所在 Broker 挂了,抛 NotControllerException 异常,等待 Controller 重新选举。
    • 场景三:网络异常、断网、网络分区、丢包等,抛 NetworkException 异常,等待网络恢复。
  2. 消费端: poll 一批数据,处理完毕还没提交 offset ,机子宕机重启了,又会 poll 上批数据,再度消费就造成了消息重复。

怎么解决?

先来了解下消息的三种投递语义:

  • 最多一次(at most once): 消息只发一次,消息可能会丢失,但绝不会被重复发送。例如: mqttQoS = 0
  • 至少一次(at least once): 消息至少发一次,消息不会丢失,但有可能被重复发送。例如: mqttQoS = 1
  • 精确一次(exactly once): 消息精确发一次,消息不会丢失,也不会被重复发送。例如: mqttQoS = 2

了解了这三种语义,再来看如何解决消息重复,即如何实现精准一次,可分为三种方法:

  1. Kafka 幂等性 Producer 保证生产端发送消息幂等。局限性,是只能保证单分区且单会话(重启后就算新会话)
  2. Kafka 事务: 保证生产端发送消息幂等。解决幂等 Producer 的局限性。
  3. **消费端幂等: ** 保证消费端接收消息幂等。蔸底方案。

1)Kafka 幂等性 Producer

**幂等性指:**无论执行多少次同样的运算,结果都是相同的。即一条命令,任意多次执行所产生的影响均与一次执行的影响相同。

幂等性使用示例:在生产端添加对应配置即可

Properties props = new Properties();
props.put("enable.idempotence", ture);  // 1. 设置幂等
props.put("acks", "all");               // 2. 当 enable.idempotence 为 true,这里默认为 all
props.put("max.in.flight.requests.per.connection", 5); // 3. 注意
复制代码
  1. 设置幂等,启动幂等。
  2. 配置 acks,注意:一定要设置 acks=all,否则会抛异常。
  3. 配置 max.in.flight.requests.per.connection 需要 <= 5,否则会抛异常 OutOfOrderSequenceException
    • 0.11 >= Kafka < 1.1, max.in.flight.request.per.connection = 1
    • Kafka >= 1.1, max.in.flight.request.per.connection <= 5

为了更好理解,需要了解下 Kafka 幂等机制:

  1. Producer 每次启动后,会向 Broker 申请一个全局唯一的 pid。(重启后 pid 会变化,这也是弊端之一)
  2. Sequence Numbe:针对每个 <Topic, Partition> 都对应一个从0开始单调递增的 Sequence,同时 Broker端会缓存这个 seq num
  3. 判断是否重复:<pid, seq num>Broker 里对应的队列 ProducerStateEntry.Queue(默认队列长度为 5)查询是否存在
    • 如果 nextSeq == lastSeq + 1,即 服务端seq + 1 == 生产传入seq,则接收。
    • 如果 nextSeq == 0 && lastSeq == Int.MaxValue,即刚初始化,也接收。
    • 反之,要么重复,要么丢消息,均拒绝。

这种设计针对解决了两个问题:

  1. 消息重复: 场景 Broker 保存消息后还没发送 ack 就宕机了,这时候 Producer 就会重试,这就造成消息重复。
  2. 消息乱序: 避免场景,前一条消息发送失败而其后一条发送成功,前一条消息重试后成功,造成的消息乱序。

那什么时候该使用幂等:

  1. 如果已经使用 acks=all,使用幂等也可以。
  2. 如果已经使用 acks=0 或者 acks=1,说明你的系统追求高性能,对数据一致性要求不高。不要使用幂等。

2)Kafka 事务

使用 Kafka 事务解决幂等的弊端:单会话且单分区幂等。

Tips 这块篇幅较长,这先稍微提及下使用,之后另起一篇。

事务使用示例:分为生产端 和 消费端

Properties props = new Properties();
props.put("enable.idempotence", ture);  // 1. 设置幂等
props.put("acks", "all");               // 2. 当 enable.idempotence 为 true,这里默认为 all
props.put("max.in.flight.requests.per.connection", 5); // 3. 最大等待数
props.put("transactional.id", "my-transactional-id");  // 4. 设定事务 id

Producer<String, String> producer = new KafkaProducer<String, String>(props);

// 初始化事务
producer.initTransactions();

try{
    // 开始事务
    producer.beginTransaction();

    // 发送数据
    producer.send(new ProducerRecord<String, String>("Topic", "Key", "Value"));
 
    // 数据发送及 Offset 发送均成功的情况下,提交事务
    producer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
    // 数据发送或者 Offset 发送出现异常时,终止事务
    producer.abortTransaction();
} finally {
    // 关闭 Producer 和 Consumer
    producer.close();
    consumer.close();
}
复制代码

这里消费端 Consumer 需要设置下配置:isolation.level 参数

  • read_uncommitted 这是默认值,表明 Consumer 能够读取到 Kafka 写入的任何消息,不论事务型 Producer 提交事务还是终止事务,其写入的消息都可以读取。如果你用了事务型 Producer,那么对应的 Consumer 就不要使用这个值。

  • read_committed 表明 Consumer 只会读取事务型 Producer 成功提交事务写入的消息。当然了,它也能看到非事务型 Producer 写入的所有消息。

3)消费端幂等

“如何解决消息重复?” 这个问题,其实换一种说法:就是如何解决消费端幂等性问题。

只要消费端具备了幂等性,那么重复消费消息的问题也就解决了。

典型的方案是使用:消息表,来去重:

  • 上述栗子中,消费端拉取到一条消息后,开启事务,将消息Id 新增到本地消息表中,同时更新订单信息。
  • 如果消息重复,则新增操作 insert 会异常,同时触发事务回滚。


 

二、案例:Kafka 幂等性 Producer 使用

环境搭建可参考:链接

准备工作如下:

  1. Zookeeper:本地使用 Docker 启动

    $ docker run -d --name zookeeper -p 2181:2181 zookeeper
    a86dff3689b68f6af7eb3da5a21c2dba06e9623f3c961154a8bbbe3e9991dea4
    复制代码
  2. Kafka:版本 2.7.1,源码编译启动(看上文源码搭建启动)

  3. 启动生产者:Kafka 源码中 exmaple

  4. 启动消息者:可以用 Kafka 提供的脚本

    # 举个栗子:topic 需要自己去修改
    $ cd ./kafka-2.7.1-src/bin
    $ ./kafka-console-producer.sh --broker-list localhost:9092 --topic test_topic
    复制代码

创建 topic 1副本,2 分区

$ ./kafka-topics.sh --bootstrap-server localhost:9092 --topic myTopic --create --replication-factor 1 --partitions 2

# 查看
$ ./kafka-topics.sh --bootstrap-server broker:9092 --topic myTopic --describe
复制代码

生产者代码:

public class KafkaProducerApplication {

    private final Producer<String, String> producer;
    final String outTopic;

    public KafkaProducerApplication(final Producer<String, String> producer,
                                    final String topic) {
        this.producer = producer;
        outTopic = topic;
    }

    public void produce(final String message) {
        final String[] parts = message.split("-");
        final String key, value;
        if (parts.length > 1) {
            key = parts[0];
            value = parts[1];
        } else {
            key = null;
            value = parts[0];
        }
        final ProducerRecord<String, String> producerRecord 
            = new ProducerRecord<>(outTopic, key, value);
        producer.send(producerRecord,
                (recordMetadata, e) -> {
                    if(e != null) {
                        e.printStackTrace();
                    } else {
                        System.out.println("key/value " + key + "/" + value + "\twritten to topic[partition] " + recordMetadata.topic() + "[" + recordMetadata.partition() + "] at offset " + recordMetadata.offset());
                    }
                }
        );
    }

    public void shutdown() {
        producer.close();
    }

    public static void main(String[] args) {

        final Properties props = new Properties();

        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        props.put(ProducerConfig.ACKS_CONFIG, "all");

        props.put(ProducerConfig.CLIENT_ID_CONFIG, "myApp");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        final String topic = "myTopic";
        final Producer<String, String> producer = new KafkaProducer<>(props);
        final KafkaProducerApplication producerApp = new KafkaProducerApplication(producer, topic);

        String filePath = "/home/donald/Documents/Code/Source/kafka-2.7.1-src/examples/src/main/java/kafka/examples/input.txt";
        try {
            List<String> linesToProduce = Files.readAllLines(Paths.get(filePath));
            linesToProduce.stream().filter(l -> !l.trim().isEmpty())
                    .forEach(producerApp::produce);
            System.out.println("Offsets and timestamps committed in batch from " + filePath);
        } catch (IOException e) {
            System.err.printf("Error reading file %s due to %s %n", filePath, e);
        } finally {
            producerApp.shutdown();
        }
    }
}
复制代码

启动生产者后,控制台输出如下:

启动消费者:

$ ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic myTopic
复制代码

修改配置 acks

启用幂等的情况下,调整 acks 配置,生产者启动后结果是怎样的:

  • 修改配置 acks = 1
  • 修改配置 acks = 0

会直接报错:

Exception in thread "main" org.apache.kafka.common.config.ConfigException: Must set acks to all in order to use the idempotent producer. 
Otherwise we cannot guarantee idempotence.
复制代码

修改配置 max.in.flight.requests.per.connection

启用幂等的情况下,调整此配置,结果是怎样的:

  • max.in.flight.requests.per.connection > 5 会怎样?

当然会报错:

Caused by: org.apache.kafka.common.config.ConfigException: Must set max.in.flight.requests.per.connection to at most 5 to use the idempotent producer.
复制代码

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/63976.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Scanner、Random、stirng

API的使用 API : Application Programming Interface [应用程序编程接口] -> 帮助文档,词典 [对JDK的翻译文档][外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-aWo9KVNQ-1670235353300)(https://cdn.staticaly.com/gh/quinhua/picsmain/markdown/…

RabbitMQ发布确认高级

在生产环境中由于一些不明原因&#xff0c;导致 RabbitMQ 重启&#xff0c;在 RabbitMQ 重启期间生产者消息投递失败&#xff0c;导致消息丢失&#xff0c;需要手动处理和恢复。 在这样比较极端的情况&#xff0c;RabbitMQ 集群不可用的时候&#xff0c;无法投递的消息该如何处…

Chrome安装油猴插件详细教程

Chrome安装油猴插件详细教程 一、油猴安装方法 方法一&#xff1a;Google官方商店安装&#xff08;推荐&#xff0c;需要科学上网&#xff09; 方法二&#xff1a;本地安装&#xff08;无需科学上网&#xff0c;不会科学上网的适用&#xff09; 二、安装油猴插件 方法一&am…

FineReport数据图表制作-标签控件

1. 概述 1.1 版本 报表服务器版本 功能变更 11.0 -- 1.2 应用场景 「标签控件」不支持填报应用&#xff0c;只能在参数页面下使用。如下图所示&#xff1a; 注&#xff1a;移动端不识别「标签控件」&#xff0c;识别的是普通控件的「标签名称」属性&#xff0c;所以移动端…

我不说你知道 DotImage SDK之DotViewer 在线文档查看编辑?

关注我 日日新&#xff0c;关注我 一手料&#xff0c;关注我 懂国外&#xff0c;关注我 更新快&#xff0c;关注我 不玩假&#xff01;&#xff01;&#xff01;&#xff01; 开发团队被请求淹没&#xff0c;并被迫用更少的资源做更多的事情。DotViewer 旨在帮助解决这一挑战&…

sequencer和sequence

●了解了sequencer与driver之间传递sequence item的握手过程,同时也掌握了sequence与item之间的关系。 ●接下来需要就sequence挂载到sequencer的常用方法做出总结,大家可以通过对这些常用方法和宏的介绍&#xff0c;了解到它们不同的使用场景。 ●面对多个sequence如果需要同时…

Python学习----静态web服务器

开发静态web服务器 开发步骤&#xff1a; 1、编写一个TCP服务端程序 2、获取浏览器发送的HTTP请求报文数据 3、读取固定页面数据&#xff0c;把页面数据组装HTTP响应报文数据发送给浏览器 4、HTTP响应报文数据发送完成之后&#xff0c;关闭服务于客户端的套接字 import socke…

(Java)Mybatis学习笔记(四)

前言 继续学习自定义映射&#xff0c;今天便是mybatis初步学完的最后一天了&#xff0c;加油&#xff0c;奥里给~ 搭建MyBatis框架 步骤说明 创建表时把email打成了eamil&#xff0c;导致报了下错 1️⃣在mybatis下创建一个module 2️⃣配置pom.xml文件&#xff0c;导入相…

[附源码]计算机毕业设计基于springboot的桌游信息管理系统

项目运行 环境配置&#xff1a; Jdk1.8 Tomcat7.0 Mysql HBuilderX&#xff08;Webstorm也行&#xff09; Eclispe&#xff08;IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持&#xff09;。 项目技术&#xff1a; SSM mybatis Maven Vue 等等组成&#xff0c;B/S模式 M…

深度解析布谷鸟过滤器(上篇)

深度解析布谷鸟过滤器 0 引言 布隆过滤器&#xff08;Bloom Filter&#xff09;&#xff0c;诞生于UNIX元年&#xff08;1970年&#xff09;的一个老牛逼的过滤器&#xff0c;与时间戳同寿&#xff0c;经久不衰老而弥坚&#xff0c;查重性能至今令人非常满意。美中不足的是有…

Windows任务计划程序Task Scheduler笔记

微软文档居然搜不到了 Windows任务计划程序已经存在许多年了&#xff0c;原来在微软的TechNet上有详细的操作介绍的&#xff0c;现在发现网站改版&#xff0c;原来的介绍居然搜索不到了&#xff0c;微软的平台上出现这种事情&#xff0c;也是比较吃惊了。 添加任务计划不难 …

ORB-SLAM2 ---- Tracking::Relocalization函数

目录 1.函数作用 2.步骤 3.code 4.函数解释 4.1 将当前帧的描述子转化为BoW向量 4.2 用词袋找到与当前帧相似的候选关键帧 4.3 遍历所有的候选关键帧&#xff0c;通过词袋进行快速匹配&#xff0c;用匹配结果初始化PnP Solver 4.4 通过一系列操作&#xff0c;直到找到…

项目成本管理软件能为你做什么?

成本管理与企业在当前以及未来项目中取得成功的能力密切相关。投资可靠的项目成本管理软件可以带来巨大的节约。一个好的成本管理解决方案不会把它当作一个孤立的功能&#xff0c;而是把它作为项目和投资组合绩效的关键因素加以利用&#xff0c;并在各个项目之间进行数据关联。…

金山表单结果如何自动通知企业微信

金山表单内置了丰富的模版&#xff0c;从表单、接龙、问卷、投票&#xff0c;可以满足你各种表单数据数据收集的需求。但是很多用户经常也会有一个痛点&#xff0c;通过金山表单收集的信息&#xff0c;如何才能实时通知企业微信/钉钉/飞书呢&#xff1f; 比如防疫登记、安全复工…

1.2 异步相关概念:深入了解

1.同步(Synchronous) VS 异步(Asynchronous) 所谓同步&#xff0c;可以理解为每当系统执行完一段代码或者函数后&#xff0c;系统将一直等待该段代码或函数返回的值或消息&#xff0c;直到系统接收到返回的值或消息后才继续往下执行下一段代码或者函数&#xff0c;在等待返回值…

汽车行业大趋势——软件定义汽车

文章目录 前言一、软件定义汽车的驱动力二、SOA架构在软件定义汽车中的作用三、车载软件架构&#xff08;内核、中间件、应用层&#xff09;长期趋势总结前言 最早在2007年4月份的IEEE会议中提出“软件定义汽车”&#xff08;SDV&#xff0c;Software Define Vehicle&#xff…

flask-admin菜鸟学习笔记

近期在工作中需要维护若干个信息表&#xff0c;在这个过程中需要经常对表格进行操作、交叉操作、各个表格同步&#xff0c;和某平台信息同步。。。在此过程中需要建立一个“隐性”的流程&#xff0c;要第一步同步A和B&#xff0c;再同步B和C&#xff0c;。。。而检索更是痛苦&a…

Python 中 4 个高效的技巧!

今天我想和大家分享 4 个省时的 Python 技巧&#xff0c;可以节省 10~20% 的 Python 执行时间。 反转列表 Python 中通常有两种反转列表的方法&#xff1a;切片或 reverse() 函数调用。这两种方法都可以反转列表&#xff0c;但需要注意的是内置函数 reverse() 会更改原始列表…

最近面了12个人,发现这个测试基础题都答不上来...

一般面试我都会问一两道很基础的题目&#xff0c;来考察候选人的“地基”是否扎实&#xff0c;有些是操作系统层面的&#xff0c;有些是 python语言方面的&#xff0c;还有些… 深耕IT行业多年&#xff0c;我们发现&#xff0c;对于一个程序员而言&#xff0c;能去到一线互联网…

托管海外服务器有哪些要求?

对于成长中的企业来说&#xff0c;需要使用自己的Web 服务器来有效地控制网站运营安全以及关键数据管理。但与此同时&#xff0c;创建一个高效的数据中心需要巨大的基础设施成本&#xff0c;24小时现场经验丰富的技术和管理支持团队来管理所有数据中心需求、服务器和数据等等。…