Kafka Producer 自定义 拦截器 序列化

news2025/1/15 23:43:31

Kafka Producer 拦截器 & 序列化

前言

文章中的版本信息、maven依赖如下

  • JDK17

  • kafka_2.13-3.3.1

  • pom文件

    <dependencies>
            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
                <version>3.3.1</version>
            </dependency>
            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
                <version>1.18.22</version>
                <scope>compile</scope>
            </dependency>
            <dependency>
                <groupId>com.fasterxml.jackson.core</groupId>
                <artifactId>jackson-databind</artifactId>
                <version>2.13.4.2</version>
                <scope>compile</scope>
            </dependency>
        </dependencies>
    

拦截器

Kafka拦截器可以在Producer在发送消息之前、以及Producer回调逻辑之前有机会更改应用程序的行为,通常情况下做一些定制化的需求,如修改消息。Producer生产者允许开发者指定多个Interceptor拦截器,多个拦截器按照指定顺序对同一条消息进行逻辑处理,形成一个拦截器链条,即使用责任链的模式对消息一次进行处理。

实现kafka拦截器逻辑,只需要实现ProducerInterceptor接口即可,该接口有两个方法:

  • onSend
    • 该方法在消息发送至kafka之前被调用,再严格一点是消息序列化之前调用
    • 可以捕获消息内容 并加以修改
    • 该方法返回的对象将被徐丽华 并发送至kafka
  • onAcknowledgement
    • Kafka服务器返回响应结果,发送ack确认时调用
    • 该方法不允许修改Kafka Response响应信息,但是可以增强消息头
    • 当发送到服务器的记录已被确认,或在发送记录失败时,调用此方法
    • 该方法在后台IO线程中执行,因此实现速度快;如果从连接线程发送,可能会造成延迟现象

由于Interceptor拦截器可能运行在多线程中,因此需要保证拦截器内部的线程安全。接下来自定义两个拦截器,组成拦截器链,学习下拦截器链内部的执行流程。

时间拦截器

通过该拦截器,可以在传输消息上添加统一的时间戳,相当于更改消息内容。

public class TimeStampInterceptor implements ProducerInterceptor<String,String> {
    @Override
    public ProducerRecord onSend(ProducerRecord<String,String> record) {
        System.out.println("1 ============ TimeStampInterceptor onSend : " + record.partition());
        String key = record.key();
        // 修改数据内容
        String value = record.value() + " -> " + System.currentTimeMillis();
        ProducerRecord result = new ProducerRecord(record.topic(),record.partition(),
                record.timestamp(),key,value,record.headers());
        // 添加头信息
        result.headers()
                .add("application_id", "timeStampApp".getBytes(StandardCharsets.UTF_8));
        return result;
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        System.out.println("3 ============ TimeStampInterceptor : " + metadata.offset());
    }
    //...
}

统计拦截器

public class CounterInterceptor implements ProducerInterceptor<String,String> {
    private AtomicLong successCount = new AtomicLong(0);
    private AtomicLong failCount = new AtomicLong(0);

    @Override
    public ProducerRecord onSend(ProducerRecord<String,String> record) {
        System.out.println("2 ============ CounterInterceptor onSend : " + record.partition());
        return record;
    }
    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        if(metadata != null){
            System.out.println("4 ============ CounterInterceptor onAcknowledgement : " + metadata.offset());
            successCount.addAndGet(1L);
            return;
        }
        failCount.addAndGet(1L);
    }
    @Override
    public void close() {
        String output = String.format("5 ============ CountInterceptor close , successCount: %d , failCount: %d",
                successCount.get(), failCount.get());
        System.out.println(output);
    }
    //...
}

消息生产者

public class InterceptorProducer {

    public static void main(String[] args) throws Exception{
        String topicName = "Testtopic";

        Properties props = new Properties();
        //指定kafka 服务器连接地址
        props.put("bootstrap.servers", "localhost:9092");
        // 消息发送延迟时间 默认为0 表示消息立即发送,单位为毫秒
        props.put("linger.ms",0);
        // 序列化方式
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        List<String> interceptors = new ArrayList<>();
        interceptors.add("org.kafka.example.interceptor.TimeStampInterceptor");
        interceptors.add("org.kafka.example.interceptor.CounterInterceptor");
        props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);

        Producer<String, String> producer = new KafkaProducer<>(props);
        for(int i = 0; i < 10; i++) {
            ProducerRecord<String, String> record = new ProducerRecord<>(topicName ,String.valueOf(i)," message : " + i);
            producer.send(record);
        }
        System.out.println("Message sent successfully");
        // 关闭会话连接 才会执行拦截器的close方法
        producer.close();
    }
}

验证测试

# 1. 创建topic
./bin/kafka-topics.sh --create --topic Testtopic --bootstrap-server localhost:9092

# 2. 打开消费脚本
./bin/kafka-console-consumer.sh --topic Testtopic --from-beginning --bootstrap-server localhost:9092

# 3. 执行生产者代码 日志输出如下

在这里插入图片描述

序列化机制

在Apache Kafka中传输消息的过程中,客户端和服务器同意使用相同的编码方式,才能对数据进行解析转换。序列化是将对象转换为字节的过程。反序列化是相反的过程-将字节流转换为对象。Apache Kafka提供了默认转换器(如String和Long等),同时支持自定义序列化机制。

在这里插入图片描述

如上图显示了,向Kafka主题发送消息的过程。在此过程中,自定义序列化程序在生产者将消息发送到主题之前将对象转换为字节。类似地,它还显示了反序列化程序如何将字节转换回对象,以便消费者正确地进行处理。

默认序列化

Apache Kafka为几种基本类型提供了预构建的序列化器和反序列化器:

在这里插入图片描述

很明显预留的基本类型序列化机制不能满足符合对象传输的要求,kafka预留了Serializer接口,供开发人员实现自定义序列化的业务需求,该接口有三个方法

  • configure - 实现配置详细信息
  • serialize/deserialize - 序列化、反序列化程序逻辑实现
  • close - 调用该方法关闭kafka会话session

自定义序列化

代码实现

  • 符合对象类
@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
public class MessageDto {
    private String message;
    private String version;
}
  • 序列化类
public class CustomSerializer implements Serializer<MessageDto> {
    private final ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
    }

    @Override
    public byte[] serialize(String topic, MessageDto data) {
        try {
            if (data == null){
                System.out.println("Null received at serializing");
                return null;
            }
            return objectMapper.writeValueAsBytes(data);
        } catch (Exception e) {
            throw new SerializationException("Error when serializing MessageDto to byte[]");
        }
    }

    @Override
    public void close() {
    }
}
  • 反序列化类
@Slf4j
public class CustomDeserializer implements Deserializer<MessageDto> {
    private ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
    }

    @Override
    public MessageDto deserialize(String topic, byte[] data) {
        try {
            if (data == null){
                System.out.println("Null received at deserializing");
                return null;
            }
            return objectMapper.readValue(new String(data, "UTF-8"), MessageDto.class);
        } catch (Exception e) {
            throw new SerializationException("Error when deserializing byte[] to MessageDto");
        }
    }

    @Override
    public void close() {
    }
}

验证测试

  • 创建主题

    ./bin/kafka-topics.sh --create --topic Stopic --bootstrap-server localhost:9092
    
  • 生产者代码

    public class SerializerProducer {
    
        public static void main(String[] args) throws Exception{
            String topicName = "Stopic";
    
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            // 序列化方式
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.kafka.example.serializer.CustomSerializer");
            Producer<String, MessageDto> producer = new KafkaProducer<>(props);
            for(int i = 0; i < 10; i++) {
                MessageDto msgProd = MessageDto.builder().message("test" + i).version(i + ".0").build();
                ProducerRecord<String, MessageDto> record = new ProducerRecord<>(topicName ,String.valueOf(i),msgProd);
                producer.send(record);
            }
            producer.close();
        }
    }
    
  • 消费者代码

    
    public class SerializerConsumer {
    
        public static void main(String[] args) throws Exception{
    
            String topicName = "Stopic";
    
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "SC_2");
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.kafka.example.serializer.CustomDeserializer");
    
            KafkaConsumer<String, MessageDto> consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Arrays.asList(topicName));
            try {
                while (true) {
                    ConsumerRecords<String, MessageDto> records = consumer.poll(Duration.ofSeconds(1));
                    records.forEach(record -> {
                        System.out.println("Message received " + record.value());
                    });
                }
            }finally {
                consumer.close();
            }
        }
    }
    

    先执行消费者,然后执行生产者;如下图,可以看到消费者已经接收生产者发送的对象,并输出控制台

    在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/107032.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

NR HARQ (四)Type-2 HARQ-ACK codebook

微信同步更新欢迎关注同名modem协议笔记 上篇提到type-1 HARQ-ACK codebook&#xff0c;即semi-static codebook&#xff0c;UE要为每个PDSCH候选位置生成反馈&#xff0c;也会包含实际没有下行传输的PDSCH&#xff0c;再加上配置CBG的场景&#xff0c;HARQ-ACK 码本中包含的无…

【Linux 内核 内存管理】物理内存组织结构

一、 UMA和NUMA两种模型 共享存储型多处理机有两种模型 一致内存访问&#xff08;Uniform-Memory-Access&#xff0c;简称UMA&#xff09;模型 非一致内存访问&#xff08;Nonuniform-Memory-Access&#xff0c;简称NUMA&#xff09;模型 UMA模型 物理存储器被所有处理器件均…

超标量处理器设计——第八章_发射

超标量处理器设计——第八章_发射 参考《超标量处理器》姚永斌著 文章目录超标量处理器设计——第八章_发射8.1 简述8.1.1 集中式 VS. 分布式8.1.2 数据捕捉 VS. 非数据捕捉8.1.3 压缩 VS. 非压缩8.2 发射过程的流水线8.2.1 非数据捕捉结构的流水线8.2.2 数据捕捉结构的流水线8…

随手写系列——写一个凯撒密码转换页面

文章目录先展示效果H5编写C3编写JS编写——方法一&#xff1a;过程版JS编写——方法二&#xff1a;对象版代码获取先展示效果 &#xff08;因为主要是实现功能&#xff0c;所以CSS写的很粗糙&#xff09; H5编写 基础结构如下&#xff1a; 先构成最外面的大盒子.box&#…

【Flutter】之便于提高开发效率的周边库和轮子

GetX 状态管理 GetX包含很多功能&#xff0c;各种弹出widget、路由管理、国际化、Utils、状态管理等。 基于路由管理 1. 添加到项目中 1.1. 将此添加到pubspec.yaml文件中。 get: 4.1.4 1.2. 在命令行中运行 flutter packages get 1.3. 在MaterialApp前面加上 “Get”&…

centos7 yum安装postgreSQL

安装环境centos7.6 安装步骤&#xff1a; 1、安装postgresql&#xff1a; yum install postgresql-server 2、安装postgresql 扩展包&#xff1a; yum install postgresql-contrib 3、初始化&#xff1a; postgresql-setup initdb 4、启动开机自启动&#xff1a; systemc…

说话人识别神经网络推理方式

概述 说话人识别是一个序列总结&#xff08;Sequence Summarization&#xff09;任务&#xff0c;输入是音频&#xff08;或者说&#xff0c;声学特征的序列&#xff09;&#xff0c;输出是说话人的嵌入码&#xff0c;有的神经网络可以输入一对音频&#xff0c;直接输出这对音…

java微信支付v3系列——9.微信支付之商家转账API

这个功能就比较复杂了&#xff0c;首先是得有90天的资金流水才能开通&#xff0c;其次开通后还需要在官网进行配置&#xff0c;不能直接调用&#xff0c;并且限制了IP地址。 如下图所示&#xff0c;首先需要进行产品设置&#xff0c;将里面都设置好后才能进行开发&#xff0c;…

feign 调用常见问题避坑指南!

摘要&#xff1a;主要是总结了一下这段时间在使用 feign 的过程中的遇到的一些坑点。一、Get请求自动转化成POST的问题1、client 请求参数没有加上 RequestParam 注解问题代码&#xff1a;GetMapping("/showName") String showName(String name);错误提示&#xff1a…

让 APISpace 告诉你什么场景使用什么API

Q1&#xff1a;某商家打算搞年底促销活动&#xff0c;需要将活动信息通过短信的形式通知给用户&#xff0c;这个场景可以用什么接口&#xff1f; 发送通知类的短信&#xff0c;可以使用 通知短信 API~ 通知短信&#xff0c;支持三大运营商&#xff0c;虚拟运营商短信发送&…

第14章 并发控制与恢复

第14章 并发控制与恢复 考试范围&#xff1a; 14.1-14.3, 14.8-14.11 考试题型&#xff1a; 事务操作 考试内容&#xff1a; 1、锁/共享锁/排它锁的概念 2、多粒度锁 Multiple Granularity 3、两阶段封锁协议 The Two-Phase Locking Protocol 两段锁协议是指同一事务对任何…

2.前端笔记-JS-JS3种书写位置、注释、输入输出

书写位置 行内式嵌入式外部文件引入 1、行内式JS 可以将单行或少量的JS代码写在HTML标签的事件属性中&#xff08;以on开头的属性&#xff09;&#xff0c;如onclick单双引号使用&#xff1a;HTML中推荐双引号&#xff0c;JS中推荐单引号&#xff0c;如 <input type&quo…

vue实现将自己网站(h5链接)分享到微信中形成小卡片(超详细)

大家好&#xff0c;我是雄雄。 前言 我们在分享公众号信息到微信或者群中的时候&#xff0c;会出现一个小卡片&#xff0c;如下所示&#xff1a; 但是呢&#xff0c;这种小卡片只能走微信的接口来实现&#xff0c;比如我们从公众号、小程序中分享的内容可以是这样的。如果我们…

0基础转行,四个月,改变了我的人生!

转行对于很多人而言都是一个新的开始&#xff0c;但有的人是决定了立马去做&#xff0c;而有的人则是犹犹豫豫&#xff0c;我考虑考虑吧、还没有决定好、过段时间再说吧…… 就这样&#xff0c;相似情况的两个人&#xff0c;最后有了不同的结果。 很多人总是以我很忙、学历不高…

自学100天,零基础转行软件测试,我要以更好的姿态奔赴下一场山海!

三年大专一场空 专业是电子商务&#xff0c;18年毕业&#xff0c;当时在报考时时觉得电子商务挺高大上的&#xff0c;觉得电商肯定会有前途&#xff0c;以后毕业肯定好找工作&#xff0c;跟大多数人一样&#xff0c;我开始幻想我以后毕业以后的纸醉金迷的生活&#xff0c;我以…

模数转换器ADC

模数转换器ADC F28335内部的ADC模块是一个12位分辨率的、具有流水线结构的模数转换器,其结构框图如图11-1所示。从图11-1可以看到,F28335的ADC模块一共具有16个采样通道,分成了两组,一组为ADCINAO~ ADCINA7,另一组为ADCINBO~ADCINB7。A组的通道使用采样保持器A,也就是图…

【SCI论文解读复现NO.1】基于Transformer-YOLOv5的侧扫声纳图像水下海洋目标实时检测

前言 此前出了目标改进算法专栏&#xff0c;但是对于应用于什么场景&#xff0c;需要什么改进方法对应与自己的应用场景有效果&#xff0c;并且多少改进点能发什么水平的文章&#xff0c;为解决大家的困惑&#xff0c;此系列文章旨在给大家解读最新目标检测算法论文&#xff0…

【小学信息技术教资面试】《制作通讯录》教案

1.题目&#xff1a;制作通讯录 2.内容&#xff1a; 3.基本要求&#xff1a; &#xff08;1&#xff09;使用任务驱动法进行教学。 &#xff08;2&#xff09;掌握表格的插入和信息的填写。 &#xff08;3&#xff09;试讲时间是10分钟。 《制作通讯录》教案 一、教学目标&am…

我国核桃种植深加工行业供给较为充足 未来大健康消费时代将带来广阔市场

根据观研报告网发布的《2022年中国核桃种植深加工行业分析报告-行业竞争策略与发展动向研究》显示&#xff0c;核桃加工分为初加工与深加工。其中核桃深加工包括核桃油的压榨、核桃蛋白粉的制备、以核桃仁为原料生产休闲食品、以核桃青皮、壳等为原料生产加工日化产品等&#x…

Java agent 使用

一、前言 于一个即将上线的应用来说&#xff0c;系统监控是必不可少的&#xff0c;为什么需要监控呢&#xff1f;应用是跑在服务器上的&#xff0c;应用在运行过程中会发生各自意想不到的问题&#xff0c;像大家熟知的OOM&#xff0c;mysql故障&#xff0c;服务器宕机&#xff…