kafka-生产者监听器(SpringBoot整合Kafka)

news2024/11/17 4:43:01

文章目录

  • 1、生产者监听器
    • 1.1、创建生产者监听器
    • 1.2、创建生产者拦截器
    • 1.3、发送消息测试
    • 1.4、使用Java代码创建主题分区副本
    • 1.5、application.yml配置----v1版
    • 1.6、屏蔽 kafka debug 日志 logback.xml
    • 1.7、引入spring-kafka依赖
    • 1.8、控制台日志

1、生产者监听器

1.1、创建生产者监听器

package com.atguigu.kafka.listener;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.kafka.support.ProducerListener;
@Component
public class MyKafkaProducerListener implements ProducerListener<String,String> {

    //生产者 ack 配置为 0 只要发送即成功
    //ack为 1  leader落盘  broker ack之后 才成功
    //ack为 -1 分区所有副本全部落盘  broker ack之后 才成功
    @Override
    public void onSuccess(ProducerRecord<String, String> producerRecord, RecordMetadata recordMetadata) {
        //ProducerListener.super.onSuccess(producerRecord, recordMetadata);
        System.out.println("MyKafkaProducerListener消息发送成功:"+"topic="+producerRecord.topic()
        +",partition = "+producerRecord.partition()
        +",key = "+producerRecord.key()
        +",value = "+producerRecord.value()
        +",offset = "+recordMetadata.offset());
    }

    //消息发送失败的回调:监听器可以接收到发送失败的消息 可以记录失败的消息
    @Override
    public void onError(ProducerRecord<String, String> producerRecord, RecordMetadata recordMetadata, Exception exception) {
        System.out.println("MyKafkaProducerListener消息发送失败:"+"topic="+producerRecord.topic()
                +",partition = "+producerRecord.partition()
                +",key = "+producerRecord.key()
                +",value = "+producerRecord.value()
                +",offset = "+recordMetadata.offset());
        System.out.println("异常信息:" + exception.getMessage());
    }
}

1.2、创建生产者拦截器

package com.atguigu.kafka.interceptor;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.stereotype.Component;
import java.util.Map;
//拦截器必须手动注册给kafka生产者(KafkaTemplate)
@Component
public class MyKafkaInterceptor implements ProducerInterceptor<String,String> {
    //kafka生产者发送消息前执行:拦截发送的消息预处理
    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> producerRecord) {
        System.out.println("生产者即将发送消息:topic = "+ producerRecord.topic()
        +",partition:"+producerRecord.partition()
        +",key = "+producerRecord.key()
        +",value = "+producerRecord.value());
        return null;
    }

    //kafka broker 给出应答后执行
    @Override
    public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {
        //exception为空表示消息发送成功
        if(e == null){
            System.out.println("消息发送成功:topic = "+ recordMetadata.topic()
                    +",partition:"+recordMetadata.partition()
                    +",offset="+recordMetadata.offset()
            +",timestamp="+recordMetadata.timestamp());
        }
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> map) {

    }
}


1.3、发送消息测试

package com.atguigu.kafka.producer;

import com.atguigu.kafka.interceptor.MyKafkaInterceptor;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.Resource;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;
import java.io.IOException;

@SpringBootTest
class KafkaProducerApplicationTests {

    //装配kafka模板类: springboot启动时会自动根据配置文初始化kafka模板类对象注入到容器中
    @Resource
    KafkaTemplate kafkaTemplate;

    @Resource
    MyKafkaInterceptor myKafkaInterceptor;

    @PostConstruct
    public void init() {
        kafkaTemplate.setProducerInterceptor(myKafkaInterceptor);
    }
    @Test
    void contextLoads() throws IOException {
        kafkaTemplate.send("my_topic1", "spring-kafka-生产者监听器");

        //回调是等kafka,ack以后才执行,需要阻塞
        System.in.read();
    }
}

1.4、使用Java代码创建主题分区副本

package com.atguigu.kafka.config;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.stereotype.Component;
@Component
public class KafkaTopicConfig {
    @Bean
    public NewTopic myTopic1() {
        //相同名称的主题 只会创建一次,后面创建的主题名称相同配置不同可以做增量更新(分区、副本数)
        return TopicBuilder.name("my_topic1")//主题名称
                .partitions(3)//主题分区
                .replicas(3)//主题分区副本数
                .build();//创建
    }
}


1.5、application.yml配置----v1版

server:
  port: 8110

# v1
spring:
  kafka:
    bootstrap-servers: 192.168.74.148:9095,192.168.74.148:9096,192.168.74.148:9097
    producer: # producer 生产者
      retries: 0 # 重试次数 0表示不重试
      acks: -1 # 应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选01-1/all)
      batch-size: 16384 # 批次大小 单位byte
      buffer-memory: 33554432 # 生产者缓冲区大小 单位byte
      key-serializer: org.apache.kafka.common.serialization.StringSerializer # key的序列化器
      value-serializer: org.apache.kafka.common.serialization.StringSerializer # value的序列化器


1.6、屏蔽 kafka debug 日志 logback.xml

<configuration>      
    <!-- 如果觉得idea控制台日志太多,src\main\resources目录下新建logback.xml
屏蔽kafka debug -->
    <logger name="org.apache.kafka.clients" level="debug" />
</configuration>


1.7、引入spring-kafka依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>3.0.5</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <!-- Generated by https://start.springboot.io -->
    <!-- 优质的 spring/boot/data/security/cloud 框架中文文档尽在 => https://springdoc.cn -->
    <groupId>com.atguigu.kafka</groupId>
    <artifactId>kafka-producer</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>kafka-producer</name>
    <description>kafka-producer</description>
    <properties>
        <java.version>17</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>


        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>



1.8、控制台日志

生产者即将发送消息:topic = my_topic1,partition:null,key = null,value = spring-kafka-生产者监听器
消息发送成功:topic = my_topic1,partition:0,offset=0,timestamp=1717573749549
MyKafkaProducerListener消息发送成功:topic=my_topic1,partition = null,key = null,value = spring-kafka-生产者监听器,offset = 0

在这里插入图片描述

在这里插入图片描述

[
  [
    {
      "partition": 0,
      "offset": 0,
      "msg": "spring-kafka-生产者监听器",
      "timespan": 1717573749549,
      "date": "2024-06-05 07:49:09"
    }
  ]
]

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1800044.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

计算机组成结构—IO方式

目录 一、程序查询方式 1. 程序查询基本流程 2. 接口电路 3. 接口工作过程 二、程序中断方式 1. 程序中断基本流程 2. 接口电路 3. I/O 中断处理过程 三、DMA 方式 1. DMA 的概念和特点 2. DMA 与 CPU 的访存冲突 3. DMA 接口的功能 4. DMA 接口的组成 5. DMA 的…

阿里 Qwen2 模型开源,教你如何将 Qwen2 扩展到百万级上下文

本次开源的 Qwen2 模型包括 5 个尺寸&#xff0c;分别是 0.5B、1.5B、7B、72B、57B&#xff0c;其中 57B 的属于 MoE 模型&#xff08;激活参数 14B&#xff09;&#xff0c;其余为 Dense 模型&#xff0c;本篇文章会快速介绍下各个尺寸模型的情况&#xff0c;然后重点介绍下如…

数据结构初阶 堆(一)

一. 堆的概念和性质 我们在上一篇博客介绍存储二叉树的两种方式 分别是顺序结构和链式结构 数据结构初阶 初识二叉树-CSDN博客 1. 堆的概念 这里注意&#xff01;&#xff01;&#xff01; 这里说的堆和操作系统里面的堆没有半点关系&#xff01;&#xff01;&#xff01; …

环 境 变 量

如果希望某一个文件在 CMD 窗口的任意路径下都可以打开&#xff0c;则需要将该文件的路径存放在环境变量中。 在 CMD 中运行该文件时&#xff0c;优先查看当前路径下的文件&#xff0c;如果没有找到&#xff0c;则进入环境变量中记录的路径下寻找该文件&#xff0c;如果能找到…

kafka的副本机制

目录 Producer的ACKs参数 配置 acks配置为0 acks配置为1 acks配置为-1或者all 副本的目的就是冗余备份&#xff0c;当某个Broker上的分区数据丢失时&#xff0c;依然可以保障数据可用。因为在其他的Broker上的副本是可用的。 Producer的ACKs参数 对副本关系较大的是&…

Go源码--sync库(2)

简介 这边文章主要讲解 Sync.Cond和Sync.Rwmutex Sync.Cond 简介 sync.Cond 经常用来处理 多个协程等待 一个协程通知 这种场景&#xff0c; 主要 是阻塞在某一协程中 等待被另一个协程唤醒 继续执行 这个协程后续的功能。cond经常被用来协调协程对某一资源的访问 ants协程池…

Android存储空间不足?试试这8个快速解决方案!

在当今的科技时代&#xff0c;Android智能手机已成为我们日常生活的重要组成部分&#xff0c;因为它们保存着我们大量的关键数据。然而&#xff0c;随着我们的使用模式不断扩大&#xff0c;手机内部存储的可用性经常变得有限。手机存储空间不足不仅会损害设备的功能和响应能力&…

数据库系统概论(超详解!!!)第十节 过程化SQL

1.Transact-SQL概述 SQL(Structure Query Language的简称&#xff0c;即结构化查询语言) 是被国际标准化组织(ISO)采纳的标准数据库语言&#xff0c;目前所有关系数据库管理系统都以SQL作为核心&#xff0c;在JAVA、VC、VB、Delphi等程序设计语言中也可使用SQL&#xff0c;它是…

Angular 由一个bug说起之六:字体预加载

浏览器在加载一个页面时&#xff0c;会解析网页中的html和css&#xff0c;并开始加载字体文件。字体文件可以通过css中的font-face规则指定&#xff0c;并使用url()函数指定字体文件的路径。 比如下面这样: css font-face {font-family: MyFont;src: url(path/to/font.woff2…

Docker的安装、启动和配置镜像加速

前言&#xff1a; Docker 分为 CE 和 EE 两大版本。CE 即社区版&#xff08;免费&#xff0c;支持周期 7 个月&#xff09;&#xff0c;EE 即企业版&#xff0c;强调安全&#xff0c;付费使用&#xff0c;支持周期 24 个月。 而企业部署一般都是采用Linux操作系统&#xff0c;而…

电脑找不到mfc140udll怎么修复,总结几种靠谱的修复方法

在日常使用电脑娱乐工作的过程中&#xff0c;我们可能会遇到一些错误提示或程序无法正常运行的情况。其中&#xff0c;一个常见的问题是计算机缺失mfc140u.dll文件。这个问题可能会导致某些应用程序无法启动或运行&#xff0c;给工作和学习带来不便。本文将介绍5种解决计算机缺…

RPC框架原理(一)

RPC框架原理 网络和IO的关系&#xff0c;IO&#xff08;input和output&#xff09;面向的是谁&#xff1f;OSI 7层参考模型&#xff0c;TCP/IP协议为什么会出现一个会话层三次握手socket心跳keep alive四次挥手 网络IO&#xff08;IO模型&#xff09; IO框架底层 学习顺序&…

SpringCloud整合Seata简易使用(注册中心Nacos)

SpringCloud整合Seata解决分布式事务&#xff08;注册中心Nacos&#xff09; Seata下载与配置在Nacos中配置seata相关配置持久化为db时&#xff0c;需要提前在数据库中创建seata数据库&#xff0c;SpringCloud整合Seata服务GlobalTransactional注解使用 本案例是在windows中运行…

抢人!抢人!抢人! IT行业某岗位已经开始抢人了!

所谓抢滩鸿蒙&#xff0c;人才先行。鸿蒙系统火力全开后&#xff0c;抢人已成鸿蒙市场的主题词&#xff01; 智联招聘数据显示&#xff0c;春节后首周&#xff0c;鸿蒙相关职位数同比增长163%&#xff0c;是去年同期的2.6倍&#xff0c;2023年9-12月鸿蒙相关职位数同比增速为3…

长文预警:自动驾驶の核燃料库!Tesla数据标注系统解析

长文预警&#xff1a;自动驾驶の核燃料库&#xff01;Tesla数据标注系统解析 前言 本文整理自原文链接&#xff0c;写的非常好&#xff0c;给了博主很多启发&#xff0c;投原创是因为平台机制&#xff0c;希望能被更多人看到。 掐指一算&#xff0c;又到了该学习的时间&#…

【权威出版/投稿优惠】2024年机器视觉与自动化技术国际会议(MVAT 2024)

2024 International Conference on Machine Vision and Automation Technology 2024年机器视觉与自动化技术国际会议 【会议信息】 会议简称&#xff1a;MVAT 2024截稿时间&#xff1a;(以官网为准&#xff09;大会地点&#xff1a;中国重庆会议官网&#xff1a;www.icmvat.co…

北京崇文门中医医院贾英才与行业共进——第二届海峡两岸中西医结合肾脏病学术大会

第二届海峡两岸中西医结合肾脏病学术大会授牌仪式于2024年6月7号在北京前门国医堂举行。 第二届海峡两岸中西医结合肾脏病学术大会的主要议程可能包括以下内容&#xff1a; 学术讲座&#xff1a;来自海峡两岸的专家学者发表演讲&#xff0c;分享肾脏病防治、透析技术等方面的研…

鸿蒙全栈开发-浅谈鸿蒙~线程模型

前言 如果你现在正巧在找工作&#xff0c;或者琢磨着换个职业跑道&#xff0c;鸿蒙开发绝对值得你考虑一下。 为啥&#xff1f;理由很简单&#xff1a; 市场需求大&#xff1a;鸿蒙生态还在持续扩张&#xff0c;应用开发、系统优化、技术支持等岗位需求旺盛&#xff0c;找工作…

【Text2SQL 论文】C3:使用 ChatGPT 实现 zero-shot Text2SQL

论文&#xff1a;C3: Zero-shot Text-to-SQL with ChatGPT ⭐⭐⭐⭐ arXiv:2307.07306&#xff0c;浙大 Code&#xff1a;C3SQL | GitHub 一、论文速读 使用 ChatGPT 来解决 Text2SQL 任务时&#xff0c;few-shots ICL 的 setting 需要输入大量的 tokens&#xff0c;这有点昂贵…

LabVIEW阀性能试验台测控系统

本项目开发的阀性能试验台测控系统是为满足国家和企业相关标准而设计的&#xff0c;主要用于汽车气压制动系统控制装置和调节装置等产品的综合性能测试。系统采用工控机控制&#xff0c;配置电器控制柜&#xff0c;实现运动控制、开关量控制及传感器信号采集&#xff0c;具备数…