kafka-生产者事务-数据传递语义事务介绍事务消息发送(SpringBoot整合Kafka)

news2025/1/11 2:51:12

文章目录

  • 1、kafka数据传递语义
  • 2、kafka生产者事务
  • 3、事务消息发送
    • 3.1、application.yml配置
    • 3.2、创建生产者监听器
    • 3.3、创建生产者拦截器
    • 3.4、发送消息测试
    • 3.5、使用Java代码创建主题分区副本
    • 3.6、屏蔽 kafka debug 日志 logback.xml
    • 3.7、引入spring-kafka依赖
    • 3.8、控制台日志

1、kafka数据传递语义

kafka发送消息时是否需要重试

  1. 仅发送一次:生产者发送消息后不重试,只发送一次 可能丢失消息 效率最高
  2. 至少一次:生产者发送消息后重试,可能重试多次 效率差
  3. 精准一次发送:生产者发送消息后无论是否重复发送 发送了多少次,在 kafka broker 中只保存一次消息,通过幂等性 + 生产者事务来实现

kafak天然支持幂等性,每个消息头中带了一个唯一的标志 kafka broker 根据此标志判断消息是否已经发送过,生产者事务可以保证数据没有最终发送成功时,消费者不可以消费,如果生产者发送消息时出现异常会自动回滚(清除之前发送的事务中的消息)

kafka天然幂等性:但是指的是生产者事务 生产消息时的幂等性,发送消息时消息中带唯一标识、broker接收到消息时如果重复不再保存,事务没提交消费者不能消费改消息

2、kafka生产者事务

保证消息生产的幂等性
一组消息要么一起成功 被消费者消息 要么一起失败都不能被消费者消费

  1. 配置ack为-1 分区所有副本均落盘成功
  2. 配置生产者重试(发送失败可以继续发送:需要保证发送失败后再次发送消息到kafka实现 精准一次发送)
  3. 需要给事务分配事务id(区分一个事务中的多条消息)

3、事务消息发送

3.1、application.yml配置

server:
  port: 8110

# v1
spring:
  kafka:
    bootstrap-servers: 192.168.74.148:9095,192.168.74.148:9096,192.168.74.148:9097
    producer: # producer 生产者
      retries: 1 # 重试次数 0表示不重试
      acks: -1 # 应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、-1/all)
      transaction-id-prefix: tx_  # 事务id前缀:配置后producer自动开启事务
      batch-size: 16384 # 批次大小 单位byte
      buffer-memory: 33554432 # 生产者缓冲区大小 单位byte
      key-serializer: org.apache.kafka.common.serialization.StringSerializer # key的序列化器
      value-serializer: org.apache.kafka.common.serialization.StringSerializer # value的序列化器

3.2、创建生产者监听器

package com.atguigu.kafka.listener;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.kafka.support.ProducerListener;
@Component
public class MyKafkaProducerListener implements ProducerListener<String,String> {

    //生产者 ack 配置为 0 只要发送即成功
    //ack为 1  leader落盘  broker ack之后 才成功
    //ack为 -1 分区所有副本全部落盘  broker ack之后 才成功
    @Override
    public void onSuccess(ProducerRecord<String, String> producerRecord, RecordMetadata recordMetadata) {
        //ProducerListener.super.onSuccess(producerRecord, recordMetadata);
        System.out.println("MyKafkaProducerListener消息发送成功:"+"topic="+producerRecord.topic()
        +",partition = "+producerRecord.partition()
        +",key = "+producerRecord.key()
        +",value = "+producerRecord.value()
        +",offset = "+recordMetadata.offset());
    }

    //消息发送失败的回调:监听器可以接收到发送失败的消息 可以记录失败的消息
    @Override
    public void onError(ProducerRecord<String, String> producerRecord, RecordMetadata recordMetadata, Exception exception) {
        System.out.println("MyKafkaProducerListener消息发送失败:"+"topic="+producerRecord.topic()
                +",partition = "+producerRecord.partition()
                +",key = "+producerRecord.key()
                +",value = "+producerRecord.value()
                +",offset = "+recordMetadata.offset());
        System.out.println("异常信息:" + exception.getMessage());
    }
}

3.3、创建生产者拦截器

package com.atguigu.kafka.interceptor;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.stereotype.Component;
import java.util.Map;
//拦截器必须手动注册给kafka生产者(KafkaTemplate)
@Component
public class MyKafkaInterceptor implements ProducerInterceptor<String,String> {
    //kafka生产者发送消息前执行:拦截发送的消息预处理
    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> producerRecord) {
        System.out.println("生产者即将发送消息:topic = "+ producerRecord.topic()
        +",partition:"+producerRecord.partition()
        +",key = "+producerRecord.key()
        +",value = "+producerRecord.value());
        return null;
    }

    //kafka broker 给出应答后执行
    @Override
    public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {
        //exception为空表示消息发送成功
        if(e == null){
            System.out.println("消息发送成功:topic = "+ recordMetadata.topic()
                    +",partition:"+recordMetadata.partition()
                    +",offset="+recordMetadata.offset()
            +",timestamp="+recordMetadata.timestamp());
        }
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> map) {

    }
}

3.4、发送消息测试

package com.atguigu.kafka.producer;

import com.atguigu.kafka.interceptor.MyKafkaInterceptor;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.Resource;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;
import java.io.IOException;

@SpringBootTest
class KafkaProducerApplicationTests {

    //装配kafka模板类: springboot启动时会自动根据配置文初始化kafka模板类对象注入到容器中
    @Resource
    KafkaTemplate kafkaTemplate;

    @Resource
    MyKafkaInterceptor myKafkaInterceptor;

    @PostConstruct
    public void init() {
        kafkaTemplate.setProducerInterceptor(myKafkaInterceptor);
    }
    @Test
    void contextLoads() throws IOException {
        kafkaTemplate.send("my_topic1", "spring-kafka-生产者监听器");
        //回调是等kafka,ack以后才执行,需要阻塞
        System.in.read();
    }

    //kafka事务支持spring-tx的事务注解
    //单元测试中的事务会自动回滚

    @Test
    void testTransaction() throws  IOException {

       //多个消息的发送在一个事务中执行
        kafkaTemplate.executeInTransaction((var1) -> {
            //通过一个事务中的operations对象来发送消息,执行事务操作
            var1.send("my_topic1",0,"", "spring-kafka-事务1");
            var1.send("my_topic1",0,"", "spring-kafka-事务2");
            int i = 1/0;
            var1.send("my_topic1",0,"", "spring-kafka-事务3");
            return "发送消息失败";
        });
        System.in.read();
    }
}

3.5、使用Java代码创建主题分区副本

package com.atguigu.kafka.config;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.stereotype.Component;
@Component
public class KafkaTopicConfig {
    @Bean
    public NewTopic myTopic1() {
        //相同名称的主题 只会创建一次,后面创建的主题名称相同配置不同可以做增量更新(分区、副本数)
        return TopicBuilder.name("my_topic1")//主题名称
                .partitions(3)//主题分区
                .replicas(3)//主题分区副本数
                .build();//创建
    }
}

3.6、屏蔽 kafka debug 日志 logback.xml

<configuration>      
    <!-- 如果觉得idea控制台日志太多,src\main\resources目录下新建logback.xml
屏蔽kafka debug -->
    <logger name="org.apache.kafka.clients" level="debug" />
</configuration>

3.7、引入spring-kafka依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>3.0.5</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <!-- Generated by https://start.springboot.io -->
    <!-- 优质的 spring/boot/data/security/cloud 框架中文文档尽在 => https://springdoc.cn -->
    <groupId>com.atguigu.kafka</groupId>
    <artifactId>kafka-producer</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>kafka-producer</name>
    <description>kafka-producer</description>
    <properties>
        <java.version>17</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>


        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

3.8、控制台日志

生产者即将发送消息:topic = my_topic1,partition:0,key = ,value = spring-kafka-事务1
生产者即将发送消息:topic = my_topic1,partition:0,key = ,value = spring-kafka-事务2
MyKafkaProducerListener消息发送失败:topic=my_topic1,partition = 0,key = ,value = spring-kafka-事务1,offset = -1
异常信息:Failing batch since transaction was aborted
MyKafkaProducerListener消息发送失败:topic=my_topic1,partition = 0,key = ,value = spring-kafka-事务2,offset = -1
异常信息:Failing batch since transaction was aborted

java.lang.ArithmeticException: / by zero

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1805905.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

vip学员作业--横屏下面怎么自适应展示竖屏应用方案征集(排除原生letterbox方式)

背景&#xff1a; 昨天相关的视频和公众号blog已经讲解了&#xff0c;横屏下如何正确显示竖屏app相关android原生的处理方案具体链接如下&#xff1a; https://mp.weixin.qq.com/s/P95rN7dBOSCENc38KZ4Ulg 采用了原生letterbox即装相框模式画面变成如下: letterbox上面画面…

Zabbix6.0自定义监控项

文章目录 一、自定义监控整体流程二、自定义监控案例1、监控TCP 443端口案例2、监控服务器异地登入(带参监控项) 一、自定义监控整体流程 操作端流程备注Agent端1️⃣ linux&#xff1a;通过命令、脚本取出对应的值2️⃣ linux&#xff1a;根据zbx要求按照格式、编写配置文件、…

Polar Web【中等】search

Polar Web【中等】search Contents Polar Web【中等】search思路&探索首页一般注入方式 EXP&效果Payload 总结 思路&探索 见到题目标题&#xff0c;预测可能有目录扫描或者输入框查询数据之类情况&#xff0c;具体细节在破解过程中才能清楚 打开站点&#xff0c;显…

less---20-28

less-20 这关登陆成功会显示cookie,所以抓包在cookie处注入 less-21 这关登陆成功会显示cookie,所以抓包在cookie处注入&#xff0c;发现不成功&#xff0c;查看代码发现被编码 先对注入语句进行base64编码再注入 less-22 闭合字符",同21关 less-23 这关查看代码发现…

什么是 AOF 重写?AOF 重写机制的流程是什么?

引言&#xff1a;在Redis中&#xff0c;持久化是确保数据持久性和可恢复性的重要机制之一。除了常见的RDB&#xff08;Redis Database&#xff09;持久化方式外&#xff0c;AOF&#xff08;Append Only File&#xff09;也是一种常用的持久化方式。AOF持久化通过记录Redis服务器…

总结七大排序算法

插入排序 直接插入排序是一种简单的插入排序法&#xff0c;其基本思想是&#xff1a;把待排序的记录按其关键码值的大小逐个插入到一个已经排好序的有序序列中&#xff0c;直到所有的记录插入完为止&#xff0c;得到一个新的有序序列 。实际中我们玩扑克牌时&#xff0c;就用了…

10.邮票问题

上海市计算机学会竞赛平台 | YACSYACS 是由上海市计算机学会于2019年发起的活动,旨在激发青少年对学习人工智能与算法设计的热情与兴趣,提升青少年科学素养,引导青少年投身创新发现和科研实践活动。https://www.iai.sh.cn/problem/625 题目描述 有四种面值的邮票,分别是 …

【文件导出2】导出html文件数据

导出html文件数据 文章目录 导出html文件数据前言一、实现代码1.controller层2.接口层3.接口实现类4.FileUtil 工具类 二、文件导出效果总结 前言 springBoot项目实现在线导出html文件数据的功能。 一、实现代码 1.controller层 GetMapping("/record/_export") Ap…

Java——简单图书管理系统

前言&#xff1a; 一、图书管理系统是什么样的&#xff1f;二、准备工作分析有哪些对象&#xff1f;画UML图 三、实现三大模块用户模块书架模块管理操作模块管理员操作有这些普通用户操作有这些 四、Test测试类五、拓展 哈喽&#xff0c;大家好&#xff0c;我是无敌小恐龙。 写…

评书下载到u盘,下载到内存卡,下载到手机或电脑的方法

评书下载的方法有很多种&#xff0c;无论是通过什么方法&#xff0c;我们都可以快速的获取喜爱的评书。下面将详细介绍常见的评书下载方法&#xff0c;帮助您快速上手。 1、搜索“十方评书网”。 2、要下载那个评书家的选择那个评书家就可以。 3、点击进去后可以一键下载单部评…

C语言详解(结构体)

Hi~&#xff01;这里是奋斗的小羊&#xff0c;很荣幸各位能阅读我的文章&#xff0c;诚请评论指点&#xff0c;欢迎欢迎~~ &#x1f4a5;个人主页&#xff1a;小羊在奋斗 &#x1f4a5;所属专栏&#xff1a;C语言 本系列文章为个人学习笔记&#xff0c;在这里撰写成文一…

英语学习笔记33——A fine day

A fine day 风和日丽 词汇 Vocabulary day n. 日子&#xff0c;白天 复数&#xff1a;days 常见节日&#xff1a;Mothers’ Day 母亲节      Fathers’ Day 父亲节      Teachers’ Day 教师节      Children’s Day 儿童节      Women’s Day 妇女节 c…

美银美林:看好铜价涨到12000美元,这类铜矿企业弹性更大

美银美林指出&#xff0c;考虑到能源转型以及AI投资热潮对铜的需求巨大&#xff0c;到2026年铜供需缺口有望扩大一倍。给予紫金矿业、江西铜业等多家巨头买入评级&#xff0c;并认为一旦铜价上行&#xff0c;KGHM等规模较小、成本较高的企业的利润增长可能更为显著。 高盛、花…

OpenCV-绘制虚线

作者&#xff1a;翟天保Steven 版权声明&#xff1a;著作权归作者所有&#xff0c;商业转载请联系作者获得授权&#xff0c;非商业转载请注明出处 功能函数 // 绘制虚线 void DrawDottedLine(cv::Mat &input, cv::Point p1, cv::Point p2, cv::Scalar color, int thickne…

9.2 Go 接口的实现

&#x1f49d;&#x1f49d;&#x1f49d;欢迎莅临我的博客&#xff0c;很高兴能够在这里和您见面&#xff01;希望您在这里可以感受到一份轻松愉快的氛围&#xff0c;不仅可以获得有趣的内容和知识&#xff0c;也可以畅所欲言、分享您的想法和见解。 推荐:「stormsha的主页」…

Pytorch 从零实现 Transformer

前言 之前虽然了解过 Transformer 架构&#xff0c;但是没有自己实现过。 最近阅读 transformers 库中 Llama 模型结构&#xff0c;于是想试着亲手实现一个简单的 Transformer。 在实现过程中加深了理解&#xff0c;同时发现之前阅读 Llama 中一些错误的地方&#xff0c;因此…

在Anaconda中安装keras-contrib库

文章目录 1. 有git2. 无git2.1 步骤12.2 步骤22.3 步骤3 1. 有git 如果环境里有git&#xff0c;直接运行以下命令&#xff1a; pip install githttps://www.github.com/farizrahman4u/keras-contrib.git2. 无git 2.1 步骤1 打开网址&#xff1a;https://github.com/keras-tea…

Application UI

本节包含关于如何用DevExpress控件模拟许多流行的应用程序ui的教程。 Windows 11 UI Windows 11和最新一代微软Office产品启发的UI。 Office Inspired UI Word、Excel、PowerPoint和Visio等微软Office应用程序启发的UI。 如何&#xff1a;手动构建Office风格的UI 本教程演示…

微服务第二轮

学习文档 背景 由于每个微服务都有不同的地址或端口&#xff0c;入口不同 请求不同数据时要访问不同的入口&#xff0c;需要维护多个入口地址&#xff0c;麻烦 前端无法调用nacos&#xff0c;无法实时更新服务列表 单体架构时我们只需要完成一次用户登录、身份校验&#xff…

Uber 提升 Presto 集群稳定性的 GC 调优方法

Presto at Uber Uber 利用开源的 Presto 查询各种数据源&#xff0c;无论是流式还是归档数据。Presto 的多功能性赋予我们做出基于数据的明智商业决策的能力。我们在两个地区运行了大约20个 Presto 集群&#xff0c;总共超过10,000个节点。我们有大约12,000个每周活跃用户&…