kafka-生产者拦截器(SpringBoot整合Kafka)

news2025/1/9 16:58:43

文章目录

  • 1、生产者拦截器
    • 1.1、创建生产者拦截器
    • 1.2、KafkaTemplate配置生产者拦截器
    • 1.3、使用Java代码创建主题分区副本
    • 1.4、application.yml配置----v1版
    • 1.5、屏蔽 kafka debug 日志 logback.xml
    • 1.6、引入spring-kafka依赖
    • 1.7、控制台日志

1、生产者拦截器

1.1、创建生产者拦截器

package com.atguigu.kafka.interceptor;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.stereotype.Component;
import java.util.Map;
//拦截器必须手动注册给kafka生产者(KafkaTemplate)
@Component
public class MyKafkaInterceptor implements ProducerInterceptor<String,String> {
    //kafka生产者发送消息前执行:拦截发送的消息预处理
    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> producerRecord) {
        System.out.println("生产者即将发送消息:topic = "+ producerRecord.topic()
        +",partition:"+producerRecord.partition()
        +",key = "+producerRecord.key()
        +",value = "+producerRecord.value());
        return null;
    }

    //kafka broker 给出应答后执行
    @Override
    public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {
        //exception为空表示消息发送成功
        if(e == null){
            System.out.println("消息发送成功:topic = "+ recordMetadata.topic()
                    +",partition:"+recordMetadata.partition()
                    +",offset="+recordMetadata.offset()
            +",timestamp="+recordMetadata.timestamp());
        }
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> map) {

    }
}

1.2、KafkaTemplate配置生产者拦截器

package com.atguigu.kafka.producer;

import com.atguigu.kafka.interceptor.MyKafkaInterceptor;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.Resource;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;
import java.io.IOException;

@SpringBootTest
class KafkaProducerApplicationTests {

    //装配kafka模板类: springboot启动时会自动根据配置文初始化kafka模板类对象注入到容器中
    @Resource
    KafkaTemplate kafkaTemplate;

    @Resource
    MyKafkaInterceptor myKafkaInterceptor;

    @PostConstruct
    public void init() {
        kafkaTemplate.setProducerInterceptor(myKafkaInterceptor);
    }
    @Test
    void contextLoads() throws IOException {
        kafkaTemplate.send("my_topic1", "spring-kafka-生产者拦截器");

        //回调是等kafka,ack以后才执行,需要阻塞
        System.in.read();
    }
}

1.3、使用Java代码创建主题分区副本

package com.atguigu.kafka.config;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.stereotype.Component;
@Component
public class KafkaTopicConfig {
    @Bean
    public NewTopic myTopic1() {
        //相同名称的主题 只会创建一次,后面创建的主题名称相同配置不同可以做增量更新(分区、副本数)
        return TopicBuilder.name("my_topic1")//主题名称
                .partitions(3)//主题分区
                .replicas(3)//主题分区副本数
                .build();//创建
    }
}

1.4、application.yml配置----v1版

server:
  port: 8110

# v1
spring:
  kafka:
    bootstrap-servers: 192.168.74.148:9095,192.168.74.148:9096,192.168.74.148:9097
    producer: # producer 生产者
      retries: 0 # 重试次数 0表示不重试
      acks: -1 # 应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选01-1/all)
      batch-size: 16384 # 批次大小 单位byte
      buffer-memory: 33554432 # 生产者缓冲区大小 单位byte
      key-serializer: org.apache.kafka.common.serialization.StringSerializer # key的序列化器
      value-serializer: org.apache.kafka.common.serialization.StringSerializer # value的序列化器

1.5、屏蔽 kafka debug 日志 logback.xml

<configuration>      
    <!-- 如果觉得idea控制台日志太多,src\main\resources目录下新建logback.xml
屏蔽kafka debug -->
    <logger name="org.apache.kafka.clients" level="debug" />
</configuration>

1.6、引入spring-kafka依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>3.0.5</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <!-- Generated by https://start.springboot.io -->
    <!-- 优质的 spring/boot/data/security/cloud 框架中文文档尽在 => https://springdoc.cn -->
    <groupId>com.atguigu.kafka</groupId>
    <artifactId>kafka-producer</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>kafka-producer</name>
    <description>kafka-producer</description>
    <properties>
        <java.version>17</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>


        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>


1.7、控制台日志

生产者即将发送消息:topic = my_topic1,partition:null,key = null,value = spring-kafka-生产者拦截器
消息发送成功:topic = my_topic1,partition:0,offset=0,timestamp=1717490776329
[
  [
    {
      "partition": 0,
      "offset": 0,
      "msg": "spring-kafka-生产者拦截器",
      "timespan": 1717490776329,
      "date": "2024-06-04 08:46:16"
    }
  ]
]

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1788649.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Precision和Recall

Precision&#xff08;精确率 / 查准率&#xff09;和 Recall&#xff08;召回率 / 查全率&#xff09;是分类任务中常用的两种性能度量&#xff0c;它们用于评估模型在处理二分类或多分类问题时的表现。 Precision&#xff08;精确率&#xff09; 精确率衡量的是模型预测为正…

linux内存缓存占用过高分析和优化

1、什么是buffer/cache &#xff1f; buffer/cache其实是作为服务器系统的文件数据缓存使用的&#xff0c;尤其是针对进程对文件存在read/write操作的时候&#xff0c;所以当你的服务进程在对文件进行读写的时候&#xff0c;Linux内核为了提高服务的读写速度&#xff0c;则将会…

Redis页面优化

文章目录 1.Redis页面缓存1.思路分析2.首先记录一下目前访问商品列表页的QPS1.线程组配置10000次请求2.请求配置3.开始压测1.压测第一次 平均QPS为6122.压测第二次 平均QPS为6153.压测第三次 平均QPS为617 3.然后记录一下访问商品详情页的QPS1.线程组配置10000次请求2.请求配置…

SwiftUI 利用 Swizz 黑魔法为系统创建的默认对象插入新协议方法(五)

功能需求 在 SwiftUI 的开发中,我们往往需要借助底层 UIKit 的“上帝之手”来进一步实现额外的定制功能。比如,在可拖放(Dragable)SwiftUI 的实现中,会缺失拖放取消的回调方法让我们这些秃头码农们“欲哭无泪” 如上图所示,我们在拖放取消时将界面中的一切改变都恢复如初…

python怎么退出help

Python中查看帮助可以在命令提示行中输入“help()”即可。 如果想要退出帮助&#xff0c;有三种方法&#xff0c;具体如下&#xff1a; 1、直接按键盘上的“enter”键退出帮助。 2、按键盘上的“q”键退出帮助。 3、按键盘上的“CtrlZ”键退出帮助。

带DSP音效处理D类数字功放TAS5805M中文资料

国产替代D类数字功放中文资料访问下方链接 ACM8628 241W立体声182W单通道数字功放中文寄存器表 内置DSP多种音频处理效果ACM8628M-241W立体声或182W单通道数字功放 1 特性 具有增强处理能力和低功率损耗的 TAS5805M 23W、无电感器、数字输入、立体声、闭环 D 类音频放大器 …

AI网络爬虫:用GraphQL查询爬取动态网页数据

任务&#xff1a;爬取网站www.skillshare.com搜索结果页面数据&#xff1a; 查看网站的请求信息&#xff1a; 请求网址: https://www.skillshare.com/api/graphql 请求方法: POST 状态代码: 200 OK 远程地址: 127.0.0.1:10809 引荐来源网址政策: strict-origin-when-…

【微信小程序开发】小程序中的上滑加载更多,下拉刷新是如何实现的?

✨✨ 欢迎大家来到景天科技苑✨✨ &#x1f388;&#x1f388; 养成好习惯&#xff0c;先赞后看哦~&#x1f388;&#x1f388; &#x1f3c6; 作者简介&#xff1a;景天科技苑 &#x1f3c6;《头衔》&#xff1a;大厂架构师&#xff0c;华为云开发者社区专家博主&#xff0c;…

什么是泛洪攻击?DDos攻击也是泛洪攻击的一种?

在数字化时代的浪潮中&#xff0c;网络安全已成为一场没有硝烟的战争。其中&#xff0c;泛洪攻击作为一种常见的网络攻击手段&#xff0c;对个人用户、企业乃至国家网络安全构成了严重威胁。本文将对泛洪攻击进行深入剖析&#xff0c;包括其定义、原理、类型、影响以及应对策略…

计算机基础(5)——进制与进制转换

&#x1f497;计算机基础系列文章&#x1f497; &#x1f449;&#x1f340;计算机基础&#xff08;1&#xff09;——计算机的发展史&#x1f340;&#x1f449;&#x1f340;计算机基础&#xff08;2&#xff09;——冯诺依曼体系结构&#x1f340;&#x1f449;&#x1f34…

用增之Firebase

目录 简介 开发准备&#xff1a; 1、在Firebase平台创建项目 2、将项目关联到应用 3、项目配置 简介 前面讲了google ddl部分&#xff0c;本篇为Firebase的事件上报部分&#xff0c;包括在FireBase平台创建应用 &#xff0c; 如果有用到ddl…

element-plus日历组件el-calendar自定义内容,每天绑定不同的值

效果 代码 <template><el-calendar v-model"calendarDate"><template #date-cell"{ data }"><p :class"data.isSelected ? is-selected : ">{{ data.day.split("-").slice(1).join("-") }}{{ d…

使用Redis常遇到的问题

文章目录 概述缓存雪崩、穿透、击穿大key问题热Key问题缓存和数据库双写一致性问题缓存并发竞争Redis线上阻塞要如何排查Redis 常见的性能问题都有哪些Redis 如何做内存优化Redis数据倾斜 概述 在使用Redis时&#xff0c;有几个常见的问题可能会出现&#xff0c;包括但不限于以…

计算机网络ppt和课后题总结(上)

试在下列条件下比较电路交换和分组交换。要传送的报文共 x(bit)。从源点到终点共经过 k 段链路&#xff0c;每段链路的传播时延为 d(s)&#xff0c;数据率为 b(b/s)。在电路交换时电路的建立时间为 s(s)。在分组交换时分组长度为 p(bit)&#xff0c;且各结点的排队等待时间可忽…

进程同步的基本元素

目录 临界资源 临界区 信号量机制 整形信号量 记录型信号量 AND信号量 信号量集 信号量的应用 实现进程互斥 实现前驱关系 管程机制 总结 临界资源 I/O设备属于临界资源。著名的生产者-消费者问题就是关于临界资源的争夺产生的进程同步的问题。 生产者-消费者 描…

阅读 Spring(SpringBoot)源码的一些实用技巧

前言 我们在阅读Spring&#xff08;SpringBoot&#xff09;源码的时候&#xff0c;有可能会被一下前置知识点卡住&#xff0c;影响继续阅读的动力。根据我对Spring的理解&#xff0c;整理一些实用的技巧&#xff0c;减少大家的阅读障碍。如果有什么不正确的地方欢迎大家指正、…

辞职后,如何理性面对公司的挽留?我的职场选择之路

辞职后&#xff0c;面对公司的挽留&#xff0c;你会决定留下还是离开呢&#xff1f;这是一个让人犹豫不决的问题。 让我们来分析一下个人在职场中的价值和期望。每个人都有自己的职业规划和发展目标&#xff0c;这是非常正常的。在工作中&#xff0c;我们希望自己能够得到充分的…

React路由(React笔记之五)

本文是结合实践中和学习技术文章总结出来的笔记(个人使用),如有雷同纯属正常((✿◠‿◠)) 喜欢的话点个赞,谢谢! React路由介绍 现在前端的项目一般都是SPA单页面应用,不再是以前多个页面多套HTML代码项目了,应用内的跳转不需要刷新页面就能完成页面跳转靠的就是路由系统 R…

IPFS节点部署及连接java服务接口

文章目录 引言前言&#xff1a;IPFS网络部署1.下载安装文件2.安装及初始化3.测试上传文件 引入IPFS 依赖包初始化IPFS创建接口类以及实现类创建前端访问的控制类前端设计及验证 引言 该篇文章是记录使用IPFS存储文件与java的Springboot项目连接的过程&#xff0c;前端简单地用…

傲医医疗集成引擎 Rhapsody 在超融合信创平台表现如何?

作者&#xff1a;SmartX 商业团队 黄玉辉 随着越来越多的医疗用户基于超融合基础设施实现 IT 基础架构信创转型&#xff0c;超融合信创架构在医疗业务场景中的实际表现也得到更多关注。尤其是集成平台业务场景——作为三甲医院互联互通评级中不可缺少的核心业务系统&#xff0…