kafka如何保证消息不丢失

news2025/1/4 20:27:24

                Kafka发送消息是异步发送的,所以我们不知道消息是否发送成功,所以会可能造成消息丢失。而且Kafka架构是由生产者-服务器端-消费者三种组成部分构成的。要保证消息不丢失,那么主要有三种解决方法。

生产者(producer)端处理

生产者默认发送消息代码如下:

import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
 
public class KafkaMessageProducer {
 
    public static void main(String[] args) {
        // 配置Kafka生产者
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // Kafka集群地址
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 键的序列化器
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 值的序列化器
 
        // 创建Kafka生产者实例
        Producer<String, String> producer = new KafkaProducer<>(props);
 
        String topic = "test"; // Kafka主题
 
        try {
            // 发送消息到Kafka
            for (int i = 0; i < 10; i++) {
                String message = "Message " + i;
                ProducerRecord<String, String> record = new ProducerRecord<>(topic, message);
                producer.send(record);
                System.out.println("Sent message: " + message);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // 关闭Kafka生产者
            producer.close();
        }
    }
}

生产者端要保证消息发送成功,可以有两个方法:

1.把异步发送改成同步发送,这样producer就能实时知道消息的发送结果。

要将 Kafka 发送方法改为同步发送,可以使用 `send()` 方法的返回值Future<RecordMetadata>`, 并调用 `get()` 方法来等待发送完成。

以下是将 Kafka 发送方法改为同步发送的示例代码:

import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.producer.RecordMetadata;
 
public class KafkaMessageProducer {
 
    public static void main(String[] args) {
        // 配置 Kafka 生产者
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // Kafka 集群地址
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 键的序列化器
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 值的序列化器
 
        // 创建 Kafka 生产者实例
        Producer<String, String> producer = new KafkaProducer<>(props);
 
        String topic = "test"; // Kafka 主题
 
        try {
            // 发送消息到 Kafka
            for (int i = 0; i < 10; i++) {
                String message = "Message " + i;
                ProducerRecord<String, String> record = new ProducerRecord<>(topic, message);
                RecordMetadata metadata = producer.send(record).get(); // 同步发送并等待发送完成
                System.out.println("Sent message: " + message + ", offset: " + metadata.offset());
            }
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        } finally {
            // 关闭 Kafka 生产者
            producer.close();
        }
    }
}

在这个示例代码中,通过调用 send(record).get() 实现了同步发送,其中 get() 方法会阻塞当前线程,直到发送完成并返回消息的元数据。

2.添加异步回调函数来监听消息发送的结果,如果发送失败,可以在回调函数里重新发送。

要保持发送消息成功并添加回调函数,你可以在发送消息的时候指定一个回调函数作为参数。回调 函数将在消息发送完成后被调用,以便你可以在回调函数中处理发送结果。

import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;
 
public class KafkaMessageProducer {
 
    public static void main(String[] args) {
        // 配置 Kafka 生产者
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // Kafka 集群地址
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 键的序列化器
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 值的序列化器
 
        // 创建 Kafka 生产者实例
        Producer<String, String> producer = new KafkaProducer<>(props);
 
        String topic = "test"; // Kafka 主题
 
        try {
            // 发送消息到 Kafka
            for (int i = 0; i < 10; i++) {
                String message = "Message " + i;
                ProducerRecord<String, String> record = new ProducerRecord<>(topic, message);
 
                // 发送消息并指定回调函数
                producer.send(record, new Callback() {
                    @Override
                    public void onCompletion(RecordMetadata metadata, Exception exception) {
                        if (exception == null) {
                            System.out.println("Sent message: " + message + ", offset: " + metadata.offset());
                        } else {
                            // 这里重新发送消息
                            producer.send(record);
                            exception.printStackTrace();
                        }
                    }
                });
            }
        } finally {
            // 关闭 Kafka 生产者
            producer.close();
        }
    }
}

在这个示例代码中,我们使用了 send(record, callback) 方法来发送消息,并传递了一个实现了 Callback 接口的匿名内部类作为回调函数。当消息发送完成后,回调函数的 onCompletion() 方法会被调用。你可以根据 RecordMetadata 和 Exception 参数来处理发送结果。
另外producer还提供了一个重试参数,这个参数叫retries,如果因为网络问题或者Broker故障导致producer发送消息失败,那么producer会根据这个参数的值进行重试发送消息。

服务器端(Broker)端

Kafka Broker(服务器端)通过以下方式来确保生产者端消息发送的成功和不丢失:

1. 消息持久化(异步刷盘):Kafka Broker将接收到的消息持久化到磁盘上的日志文件中。这样即使在消息发送后发生故障,Broker能够恢复并确保消息不会丢失。(注意:持久化是由操作系统调度的,如果持久化之前系统崩溃了,那么就因为不能持久化导致数据丢失,但是Kafka没提供同步刷盘策略)

2. 复制与高可用性:Kafka支持分布式部署,可以将消息分布到多个Broker上形成一个Broker集群。在集群中,消息被复制到多个副本中,以提供冗余和高可用性。生产者发送消息时,它可以将消息发送到任何一个Broker,然后Broker将确保消息在集群中的所有副本中都被复制成功。

3. 消息提交确认:当生产者发送消息后,在收到Broker的确认响应之前,生产者会等待。如果消息成功写入并复制到了指定的副本中,Broker会发送确认响应给生产者。如果生产者在指定的时间内没有收到确认响应,它将会尝试重新发送消息,以确保消息不会丢失。

4. 可靠性设置(同步刷盘):生产者可以配置一些参数来提高消息发送的可靠性。例如,可以设置`acks`参数来指定需要收到多少个Broker的确认响应才认为消息发送成功。可以将`acks`设置为`"all"`,表示需要收到所有副本的确认响应才算发送成功。

总之,Kafka Broker通过持久化和复制机制,以及消息确认和可靠性设置,确保生产者端的消息发送成功且不丢失。同时,应注意及时处理可能的错误情况,并根据生产者端需求和场景合理配置相应的参数。

对于使用YAML文件进行Kafka配置的情况,你可以按照以下格式设置acks参数:

# Kafka生产者配置
producer:
  bootstrap.servers: your-kafka-server:9092
  acks: all        # 设置acks参数为"all"
  key.serializer: org.apache.kafka.common.serialization.StringSerializer
  value.serializer: org.apache.kafka.common.serialization.StringSerializer

消费者(Consumer)处理

        Kafka Consumer 默认会确保消息的至少一次传递(at least once delivery)。这意味着当 Consumer 完成对一条消息的处理后,会向 Kafka 提交消息的偏移量(offset),告知 Kafka 这条消息已被成功处理。如果 Consumer 在处理消息时发生错误,可以通过回滚偏移量来重试处理之前的消息。

以下是一些确保消息消费成功的方法:

  •  使用自动提交偏移量(Auto Commit Offsets)
  • 手动提交偏移量(Manual Commit Offsets)
  • 设置消费者的最大重试次数:
  • 设置适当的消费者参数

尽管 Kafka 提供了可靠的消息传递机制,但仍然需要在消费者端实现适当的错误处理和重试逻辑,以处理可能发生的错误情况。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1806205.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

qmt量化交易策略小白学习笔记第16期【qmt编程之获取北向南向资金(沪港通,深港通和港股通)】

qmt编程之获取北向南向资金 qmt更加详细的教程方法&#xff0c;会持续慢慢梳理。 也可找寻博主的历史文章&#xff0c;搜索关键词查看解决方案 &#xff01; 北向南向资金&#xff08;沪港通&#xff0c;深港通和港股通&#xff09; #北向南向资金交易日历 获取交易日列表…

Cartographer学习笔记

Cartographer 是一个跨多个平台和传感器配置提供 2D 和 3D 实时同步定位和地图绘制 (SLAM) 的系统。 1. 文件关系 2. 代码框架 common: 定义了基本数据结构和一些工具的使用接口。例如&#xff0c;四舍五入取整的函数、时间转化相关的一些函数、数值计算的函数、互斥锁工具等…

gcc源码分析 词法和语法分析

gcc源码分析 词法和语法分析 一、输入参数相关1、命令行到gcc二、词法与语法分析1、词法分析1.1 struct cpp_reader1.2 struct tokenrun/struct cpp_token/lookahead字段1.3 struct ht2.1 语法符号相关的结构体c_token定义如下:2.2在语法分析中实际上有多个API组成了其接口函数…

【Python】Selenium基础入门

Selenium基础入门 一、Selenium简介二、Selenium的安装三、Selenium的使用1.访问web网站2.元素定位根据标签 id 获取元素根据标签 name 属性的值获取元素根据 Xpath 语句获取元素根据标签名获取元素根据CSS选择器获取元素根据标签的文本获取元素&#xff08;精确定位&#xff0…

JVM学习-监控工具(一)

使用数据说明问题&#xff0c;使用知识分析问题&#xff0c;使用工具处理问题 无监控&#xff0c;不调优&#xff01; 命令行工具 在JDK安装目录下&#xff0c;可以查看到相应的命令行工具&#xff0c;如下图 jps(Java Process Status) 显示指定系统内所有的Hotpot虚拟机…

【算法刷题 | 动态规划08】6.9(单词拆分、打家劫舍、打家劫舍||)

文章目录 21.单词拆分21.1题目21.2解法&#xff1a;动规21.2.1动规思路21.2.2代码实现 22.打家劫舍22.1题目22.2解法&#xff1a;动规22.2.1动规思路22.2.2代码实现 23.打家劫舍||23.1题目23.2解法&#xff1a;动规23.2.1动规思路23.2.2代码实现 21.单词拆分 21.1题目 给你一…

如何获取当前dll或exe模块所在路径?

有时我们需要在当前运行的dll或exe模块中去动态加载当前模块同路径中的另一个库&#xff0c;或者启动当前模块同路径中的另一个exe程序&#xff0c;一般需要获取当前模块的路径&#xff0c;然后去构造同路径下目标模块的绝对路径&#xff0c;然后通过该绝对路径去加载或启动该目…

SpringBoot集成缓存功能

1. 缓存规范 Java Caching定义了五个核心接口&#xff0c;分别是&#xff1a;CachingProvider、CacheManager、Cache、Entry和Expiry。 CachingProvider&#xff1a;定义了创建、配置、获取、管理和控制多个CacheManager。一个应用可以在运行期访问多个CachingProvider。CacheM…

URL的编码解码(一),仅针对ASCII码字符

用十六进制对特定字符编码&#xff0c;利用百分号标识搜索字符串解码十六进制字符。 (笔记模板由python脚本于2024年06月09日 18:05:25创建&#xff0c;本篇笔记适合喜好探寻URL的coder翻阅) 【学习的细节是欢悦的历程】 Python 官网&#xff1a;https://www.python.org/ Free…

基于深度学习的中文语音识别模型(支持wav、mp4、m4a等所有格式音频上传)【已开源】

基于深度学习的中文语音识别模型&#xff08;支持wav、mp4、m4a等所有格式音频上传&#xff09; 前言 该开源项目旨在提供一个能够自动检测并识别中文语音的模型&#xff0c;支持wav、mp4、m4a等格式的音频文件上传。无论是从录音设备中获取的wav文件&#xff0c;还是从视频中…

Kingshard:MySQL代理,释放数据库潜能

Kingshard&#xff1a; 简化数据库管理&#xff0c;提升性能极限。- 精选真开源&#xff0c;释放新价值。 概览 kingshard是一个由Go开发的高性能MySQL Proxy项目&#xff0c;专为简化数据库管理和提升查询性能而设计。kingshard在满足基本的读写分离的功能上&#xff0c;致力…

Llama模型家族之Stanford NLP ReFT源代码探索 (四)Pyvene论文学习

LlaMA 3 系列博客 基于 LlaMA 3 LangGraph 在windows本地部署大模型 &#xff08;一&#xff09; 基于 LlaMA 3 LangGraph 在windows本地部署大模型 &#xff08;二&#xff09; 基于 LlaMA 3 LangGraph 在windows本地部署大模型 &#xff08;三&#xff09; 基于 LlaMA…

【react】react项目支持鼠标拖拽的边框改变元素宽度的组件

目录 安装使用方法示例Props 属性方法示例代码调整兄弟div的宽度 re-resizable github地址 安装 $ npm install --save re-resizable这将安装re-resizable库并将其保存为项目的依赖项。 使用方法 re-resizable 提供了一个 <Resizable> 组件&#xff0c;它可以包裹任何…

[hddm]python模块hddm安装后测试代码

测试环境&#xff1a; hddm0.8.0 测试文件test.csv subj_idx,stim,rt,response,theta,dbs,conf 0,LL,1.21,1.0,0.65627512226100004,1,HC 0,WL,1.6299999999999999,1.0,-0.32788867166199998,1,LC 0,WW,1.03,1.0,-0.480284512399,1,HC 0,WL,2.77,1.0,1.9274273452399999,1,L…

使用 Scapy 库编写 TCP 劫持攻击脚本

一、介绍 TCP劫持攻击&#xff08;TCP Hijacking&#xff09;&#xff0c;也称为会话劫持&#xff0c;是一种攻击方式&#xff0c;攻击者在合法用户与服务器之间的通信过程中插入或劫持数据包&#xff0c;从而控制通信会话。通过TCP劫持&#xff0c;攻击者可以获取敏感信息、执…

网络基础-IP协议

文章目录 前言一、IP报文二、IP报文分片重组IP分片IP分片示例MTUping 命令可以验证MTU大小Windows系统&#xff1a;Linux系统: 前言 基础不牢&#xff0c;地动山摇&#xff0c;本节我们详细介绍IP协议的内容。 一、IP报文 第一行&#xff1a; 4位版本号指定IP协议的版本&#…

C语言详解(联合和枚举)

Hi~&#xff01;这里是奋斗的小羊&#xff0c;很荣幸您能阅读我的文章&#xff0c;诚请评论指点&#xff0c;欢迎欢迎 ~~ &#x1f4a5;个人主页&#xff1a;奋斗的小羊 &#x1f4a5;所属专栏&#xff1a;C语言 &#x1f680;本系列文章为个人学习笔记&#xff0c;在这里撰写…

UltraEditUEStudio软件最新版下载及详细安装教程

UEStudio简介&#xff1a; UEStudio建立在上文本编辑器UltraEdit的功能基础上&#xff0c;并为团队和开发人员提供了其他功能&#xff0c;例如深度Git集成。您可以直接在UEStudio中克隆&#xff0c;签出&#xff0c;更新&#xff0c;提交&#xff0c;推入/拉入等操作&#xff…

[CR]厚云填补_综述整理

SAR-to-Optical Image Translation and Cloud Removal Based on Conditional Generative Adversarial Networks: Literature Survey, Taxonomy, Evaluation Indicators, Limits and Future Directions Abstract 由于光学图像的局限性&#xff0c;其波段无法穿透云层&#xff0…

《精通ChatGPT:从入门到大师的Prompt指南》附录C:专业术语表

附录C&#xff1a;专业术语表 本附录旨在为读者提供一本全面的术语表&#xff0c;帮助理解《精通ChatGPT&#xff1a;从入门到大师的Prompt指南》中涉及的各种专业术语。无论是初学者还是高级用户&#xff0c;这些术语的定义和解释将为您在使用ChatGPT时提供重要参考。 A AI&…