(七)消息队列-Kafka 序列化avro(传递)

news2025/3/3 23:13:43

(七)消息队列-Kafka 序列化avro(传递)

客从远方来,遗我双鲤鱼。呼儿烹鲤鱼,中有尺素书。
——佚名《饮马长城窟行》

在这里插入图片描述

本文已同步CSDN、掘金平台、知乎等多个平台,图片依然保持最初发布的水印(如CSDN水印)。(以后属于本人原创均以新建状态在多个平台分享发布)

前言

多年前,由于工作的性质,发现这系列没有写完,想了想,做人做事还是要有始有终。🤣实在是借口太多了,太不像话了…由于时间过得太久了,这篇开始,可能很多技术以最新或最近的几个版本为主了。

问题背景

在Kafka中,生产者与消费者之间传输消息时,通常需要对数据进行序列化和反序列化。常见的序列化方式如JSON或String存在以下问题:

  1. 数据冗余:字段名重复存储,占用带宽;
  2. 兼容性差:新增或删除字段时容易导致上下游解析失败;
  3. 类型安全缺失:动态解析易引发运行时错误。

而Avro作为一种高效的二进制序列化框架,通过Schema定义数据结构,可实现紧凑存储动态兼容性强类型校验,成为Kafka生态中推荐的序列化方案27。


核心原理
  1. Schema驱动
    Avro要求所有数据必须与预定义的Schema文件(.avsc)匹配。Schema以JSON格式描述数据结构,例如:

    {
      "type": "record",
      "name": "User",
      "namespace": "com.example.avro",
      "fields": [
        {"name": "id", "type": "int"},
        {"name": "name", "type": "string"}
      ]
    }
    

    然后使用 avro-maven-plugin 生成 Java 类:

    <plugin>
        <groupId>org.apache.avro</groupId>
        <artifactId>avro-maven-plugin</artifactId>
        <version>1.11.0</version>
        <executions>
            <execution>
                <phase>generate-sources</phase>
                <goals>
                    <goal>schema</goal>
                </goals>
            </execution>
        </executions>
    </plugin>
    

    执行 mvn clean compile 后,com.example.avro.User 类会被自动生成。

    生产者与消费者需共享同一Schema,确保序列化与反序列化的一致性。

  2. 二进制编码
    Avro将数据转换为紧凑的二进制格式,相比JSON减少约30%-50%的存储与传输开销。例如,整型字段直接以二进制存储,无需字段名冗余7。

  3. Schema Registry
    为实现Schema动态管理,通常搭配Schema Registry(如Confluent或Apicurio)使用。其核心功能包括:

    • Schema版本控制与兼容性检查;
    • 通过唯一ID关联消息与Schema,避免传输完整Schema带来的性能损耗。

实现步骤

以下以Java代码为例,展示Kafka集成Avro的配置方法:

1. 添加依赖
<dependencies>
    <!-- Spring Kafka 依赖-->
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    
    <!-- Avro 依赖 -->
    <dependency>
        <groupId>org.apache.avro</groupId>
        <artifactId>avro</artifactId>
    </dependency>
    
    <!-- Schema Registry 依赖 -->
    <dependency>
        <groupId>io.confluent</groupId>
        <artifactId>kafka-avro-serializer</artifactId>
        <version>7.2.1</version>
    </dependency>
</dependencies>

运行 HTML

2. 配置生产者
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", StringSerializer.class.getName());
props.put("value.serializer", KafkaAvroSerializer.class.getName());
props.put("schema.registry.url", "http://localhost:8081"); // Schema Registry地址

Producer<String, GenericRecord> producer = new KafkaProducer<>(props);

// 构建Avro消息
GenericRecord user = new GenericData.Record(schema);
user.put("id", 1);
user.put("name", "Alice");

producer.send(new ProducerRecord<>("user-topic", user));



------ SpringBoot框架 直接用配置application.yml 和生产者服务类--------------
spring:
  kafka:
    bootstrap-servers: localhost:9092
    properties:
      schema.registry.url: http://localhost:8081
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
      
@Service
public class UserProducer {
    private final KafkaTemplate<String, User> kafkaTemplate;
    
    @Value("${kafka.topic.user}")
    private String topic;

    public UserProducer(KafkaTemplate<String, User> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void sendUser(User user) {
        kafkaTemplate.send(topic, user.getId().toString(), user);
    }
}

在 Spring Boot 启动后,我们可以使用以下代码发送一个 User 消息:
User user = User.newBuilder()
    .setId(1)
    .setName("Alice")
    .build();
userProducer.sendUser(user);

控制台应该能够看到消费者成功接收到 User 数据
3. 配置消费者
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "avro-consumer");
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", KafkaAvroDeserializer.class.getName());
props.put("schema.registry.url", "http://localhost:8081");

Consumer<String, GenericRecord> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("user-topic"));

while (true) {
    ConsumerRecords<String, GenericRecord> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, GenericRecord> record : records) {
        System.out.println("Received: " + record.value().get("name"));
    }
}

------ SpringBoot框架 直接用配置application.yml 和消费者服务类--------------
在 application.yml 中配置消费者参数:

spring:
  kafka:
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
      properties:
        specific.avro.reader: true

然后编写 Kafka 消费者代码:

@Service
@KafkaListener(topics = "user_topic", groupId = "user_group")
public class UserConsumer {

    @KafkaHandler
    public void consume(User user) {
        System.out.println("Received user: " + user.getName());
    }
}



常见问题与解决方案
  1. Schema兼容性错误
    • 现象:生产者更新Schema后消费者无法解析旧数据。
    • 解决:在Schema Registry中配置兼容性策略(如BACKWARD),允许新增字段并设置默认值7。
  2. ClassNotFoundException
    • 现象:反序列化时提示Avro生成的类不存在。
    • 解决:通过Maven插件avro-maven-plugin自动生成Java类,并确保生成路径在编译范围内2。
  3. 性能瓶颈
    • 现象:高吞吐场景下序列化延迟较高。
    • 优化:复用DatumWriterDatumReader实例,避免重复初始化开销7。

总结

Avro通过Schema定义与二进制编码,为Kafka提供了高效、类型安全的序列化方案。结合Schema Registry可实现动态兼容性管理,适用于复杂业务场景下的数据演进需求。实践中需注意Schema版本控制与性能调优,具体工具链配置可参考Confluent官方文档27。


引用说明

  • 代码结构参考自SpringBoot RestTemplate配置方案,通过替换默认组件实现功能增强。
  • Schema兼容性问题分析借鉴了MAT工具中内存对象关联性的排查思路。

后续

下期预告,敬请关注:
(八)消息队列-Kafka 生产者

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2309222.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

springboot使用redis

springboot使用redis redis-service.exe : 服务端,启动后不要关闭 redis-cli.exe : 客户端,访问redis中的数据 redisclient-win32.x86_64.2.0.jar : redis的图形界面客户端,执行方式是在这个文件的目录执行 java -jar redisclient-win32.x86_64.2.0.jar或者在这个jar包的目录…

【实战篇】【深度解析DeepSeek:从机器学习到深度学习的全场景落地指南】

一、机器学习模型:DeepSeek的降维打击 1.1 监督学习与无监督学习的"左右互搏" 监督学习就像学霸刷题——给标注数据(参考答案)训练模型。DeepSeek在信贷风控场景中,用逻辑回归模型分析百万级用户数据,通过特征工程挖掘出"凌晨3点频繁申请贷款"这类魔…

SpringBoot高校运动会管理系统 附带详细运行指导视频

文章目录 一、项目演示二、项目介绍三、运行截图四、主要代码1.报名赛事代码2.用户登录代码3.保存成绩代码 一、项目演示 项目演示地址&#xff1a; 视频地址 二、项目介绍 项目描述&#xff1a;这是一个基于SpringBoot框架开发的高校运动会管理系统项目。首先&#xff0c;这…

【Linux网络-HTTP协议】HTTP基础概念+构建HTTP

代码定位&#xff1a;南毅c/Linux - Gitee.com HTTP协议 介绍 虽然我们说&#xff0c;应用层协议是我们程序猿自己定的.但实际上,已经有大佬们定义了一些现成的,又非常好用的应用层协议,供我们直接参考使用。HTTP(超文本传输协议)就是其中之一。 在互联网世界中&#xff0c…

高频 SQL 50 题(基础版)_626. 换座位

高频 SQL 50 题&#xff08;基础版&#xff09;_626. 换座位 select(case when mod(id,2)!0 AND counts ! id then id1when mod(id,2)!0 AND counts id then idelse id -1end) as id,student fromseat,(selectcount(*) as countsfrom seat) as seat_counts order by id asc;

python第十一课:并发编程 | 多任务交响乐团

&#x1f3af; 本节目标 理解多线程/多进程/协程的应用场景掌握threading与multiprocessing核心用法学会使用asyncio进行异步编程开发实战项目&#xff1a;高并发爬虫引擎破解GIL锁的性能迷思 1️⃣ 并发编程三剑客 &#x1f3bb; 生活化比喻&#xff1a; 多线程 → 餐厅多个…

基于 Flink CDC YAML 的 MySQL 到 Kafka 流式数据集成

本教程的演示都将在 Flink CDC CLI 中进行&#xff0c;无需一行 Java/Scala 代码&#xff0c;也无需安装 IDE。 这篇教程将展示如何基于 Flink CDC YAML 快速构建 MySQL 到 Kafka 的 Streaming ELT 作业&#xff0c;包含整库同步、表结构变更同步演示和关键参数介绍。 准备阶段…

解决 ERROR 1130 (HY000): Host is not allowed to connect to this MySQL server

当使用 MySQL 时&#xff0c;您可能会遇到错误信息“ERROR 1130 (HY000): Host ‘hostname’is not allowed to connect to this MySQL server”这是 MySQL 用于防止未经授权的访问的标准安全特性。实际上&#xff0c;服务器还没有配置为接受来自相关主机的连接。 Common Caus…

科普|无人机专业术语

文章目录 前言一、飞控二、电调三、通道四、2S、3S、4S电池五、电池后面C是什么意思?六、电机的型号七、什么是电机的KV值?八、螺旋桨的型号九、电机与螺旋桨的搭配 前言 无人机飞控系统控制飞行姿态&#xff0c;电调控制电机转速&#xff0c;遥控器通道控制飞行动作。电池C…

Qt:窗口

目录 菜单栏 QMenuBar 菜单添加快捷键 添加子菜单 添加分割线和添加图标 QMenuBar创建方式 工具栏 QToolBar 和菜单栏搭配 创建多个工具栏 状态栏 QStatusBar 状态栏中添加其他控件 浮动窗口 QDockWidget 对话框 对话框的内存释放问题 自定义对话框界面 模态对话…

深入浅出 Go 语言:协程(Goroutine)详解

深入浅出 Go 语言&#xff1a;协程(Goroutine)详解 引言 Go 语言的协程&#xff08;goroutine&#xff09;是其并发模型的核心特性之一。协程允许你轻松地编写并发代码&#xff0c;而不需要复杂的线程管理和锁机制。通过协程&#xff0c;你可以同时执行多个任务&#xff0c;并…

Python从0到100(八十九):Resnet、LSTM、Shufflenet、CNN四种网络分析及对比

前言&#xff1a; 零基础学Python&#xff1a;Python从0到100最新最全教程。 想做这件事情很久了&#xff0c;这次我更新了自己所写过的所有博客&#xff0c;汇集成了Python从0到100&#xff0c;共一百节课&#xff0c;帮助大家一个月时间里从零基础到学习Python基础语法、Pyth…

实验:k8s+keepalived+nginx+iptables

1、创建两个nginx的pod&#xff0c;app都是nginx nginx1 nginx2 2、创建两个的pod的service 3、配置两台keepalived的调度器和nginx七层反向代理&#xff0c;VIP设置192.168.254.110 keepalived调度器master keepalived调度器backup 两台调度器都配置nginx七层反向代理&#…

LlamaFactory-webui:训练大语言模型的入门级教程

LlamaFactory是一个开源框架&#xff0c;支持多种流行的语言模型&#xff0c;及多种微调技术&#xff0c;同时&#xff0c;以友好的交互式界面&#xff0c;简化了大语言模型的学习。 本章内容&#xff0c;从如何拉取&#xff0c;我已经搭建好的Llamafactory镜像开始&#xff0…

手机打电话时如何识别对方按下的DTMF按键的字符-安卓AI电话机器人

手机打电话时如何识别对方按下的DTMF按键的字符 --安卓AI电话机器人 一、前言 前面的篇章中&#xff0c;使用蓝牙电话拦截手机通话的声音&#xff0c;并对数据加工&#xff0c;这个功能出来也有一段时间了。前段时间有试用的用户咨询说&#xff1a;有没有办法在手机上&#xff…

基于SpringBoot和PostGIS的省域“地理难抵点(最纵深处)”检索及可视化实践

目录 前言 1、研究背景 2、研究意义 一、研究目标 1、“地理难抵点”的概念 二、“难抵点”空间检索实现 1、数据获取与处理 2、计算流程 3、难抵点计算 4、WebGIS可视化 三、成果展示 1、华东地区 2、华南地区 3、华中地区 4、华北地区 5、西北地区 6、西南地…

【Qt】详细介绍如何在Visual Studio Code中编译、运行Qt项目

Visual Studio Code一只用的顺手&#xff0c;写Qt的时候也能用VS Code开发就方便多了。 理论上也不算困难&#xff0c;毕竟Qt项目其实就是CMake&#xff08;QMake的情况这里就暂不考虑了&#xff09;项目&#xff0c;VS Code在编译、运行CMake项目还是比较成熟的。 这里笔者打…

本地部署大语言模型-DeepSeek

DeepSeek 是国内顶尖 AI 团队「深度求索」开发的多模态大模型&#xff0c;具备数学推理、代码生成等深度能力&#xff0c;堪称"AI界的六边形战士"。 Hostease AMD 9950X/96G/3.84T NVMe/1G/5IP/RTX4090 GPU服务器提供多种计费模式。 DeepSeek-R1-32B配置 配置项 规…

SQLAlchemy系列教程:SQLAlchemy快速入门示例项目

SQLAlchemy是与数据库交互的Python开发人员不可或缺的库。这个强大的ORM允许使用python结构进行简单的数据库操作。设置过程很简单&#xff0c;并且允许可扩展的数据库应用程序开发。本文通过入门项目完整介绍SQLAlchemy的应用过程&#xff0c;包括安装依赖包&#xff0c;创建连…

【Linux网络#10】:Https协议原理

&#x1f4c3;个人主页&#xff1a;island1314 &#x1f525;个人专栏&#xff1a;Linux—登神长阶 ⛺️ 欢迎关注&#xff1a;&#x1f44d;点赞 &#x1f442;&#x1f3fd;留言 &#x1f60d;收藏 &#x1f49e; &#x1f49e; &#x1f49e; 生活总是不会一帆风顺&#xf…