Kafka 的消息格式:了解消息结构与序列化

news2025/1/18 21:10:25

Kafka 作为一款高性能的消息中间件系统,其消息格式对于消息的生产、传输和消费起着至关重要的作用。本篇博客将深入讨论 Kafka 的消息格式,包括消息的结构、序列化与反序列化,以及一些常用的消息格式选项。通过更丰富的示例代码和深入的解析,希望能够帮助大家更好地理解 Kafka 消息的内部机制。

1. Kafka 消息结构

Kafka 的消息结构由消息头、消息键、消息值和时间戳等组成。下面是一个典型的 Kafka 消息结构:

----------------------------------------------------------------------------------------------
| Message Header | Key | Value | Timestamp | Optional Headers |
----------------------------------------------------------------------------------------------

1.1 消息头

消息头包含一些元数据信息,例如消息的大小、压缩信息等。消息头的结构可能会根据 Kafka 版本和配置而有所不同。

1.2 消息键与消息值

  • 消息键(Key): 用于标识消息的唯一性,通常用于分区和查找消息。

  • 消息值(Value): 包含实际的消息内容。

1.3 时间戳

时间戳表示消息的产生时间,有两种类型:

  • 创建时间戳: 表示消息被创建的时间。

  • LogAppendTime 时间戳: 表示消息被追加到日志的时间。

2. 消息的序列化与反序列化

Kafka 中的消息在生产者发送和消费者接收时需要进行序列化和反序列化。这是因为 Kafka 是以字节流的形式存储和传输消息的,而实际的消息内容可能是各种不同的数据类型。以下是一些常用的序列化器和反序列化器:

2.1 字符串序列化器

// 生产者端
ProducerRecord<String, String> record = new ProducerRecord<>("my-topic", "key", "Hello, Kafka!");

// 消费者端
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
records.forEach(record -> {
    System.out.printf("Consumed record with key %s and value %s%n", record.key(), record.value());
});

2.2 Avro 序列化器

Avro 是一种高性能且紧凑的二进制序列化格式,适用于复杂数据结构的消息。

// 生产者端
GenericRecord avroRecord = new GenericData.Record(schema);
avroRecord.put("field1", "value1");
avroRecord.put("field2", 42);
ProducerRecord<String, GenericRecord> record = new ProducerRecord<>("my-topic", "key", avroRecord);

// 消费者端
ConsumerRecords<String, GenericRecord> records = consumer.poll(Duration.ofMillis(100));
records.forEach(record -> {
    GenericRecord value = record.value();
    System.out.printf("Consumed record with key %s and value %s%n", record.key(), value);
});

2.3 JSON 序列化器

// 生产者端
JsonNode jsonNode = objectMapper.createObjectNode();
((ObjectNode) jsonNode).put("field1", "value1");
((ObjectNode) jsonNode).put("field2", 42);
ProducerRecord<String, JsonNode> record = new ProducerRecord<>("my-topic", "key", jsonNode);

// 消费者端
ConsumerRecords<String, JsonNode> records = consumer.poll(Duration.ofMillis(100));
records.forEach(record -> {
    JsonNode value = record.value();
    System.out.printf("Consumed record with key %s and value %s%n", record.key(), value);
});

3. 自定义消息格式

在某些情况下,你可能需要定义自己的消息格式。Kafka 提供了 ByteArraySerializerByteArrayDeserializer,允许你将消息以字节数组的形式发送和接收,从而实现自定义的序列化和反序列化逻辑。

// 生产者端
byte[] customMessageBytes = serializeCustomMessage(customMessage);
ProducerRecord<String, byte[]> record = new ProducerRecord<>("my-topic", "key", customMessageBytes);

// 消费者端
ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofMillis(100));
records.forEach(record -> {
    byte[] value = record.value();
    CustomMessage customMessage = deserializeCustomMessage(value);
    System.out.printf("Consumed record with key %s and value %s%n", record.key(), customMessage);
});

4. 消息的压缩与解压

Kafka 支持消息的压缩,以减小网络传输的开销。以下是一些常用的压缩选项:

// 生产者端
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");
Producer<String, String> producer = new KafkaProducer<>(props);

// 消费者端
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);
Consumer<String, String> consumer = new KafkaConsumer<>(props);

5. 消息的版本控制与兼容性

在实际应用中,系统的演进和变化是不可避免的。因此,考虑到消息的版本控制和兼容性是非常重要的。以下是一些相关的注意事项和最佳实践:

5.1 消息的演进

  • 向后兼容性: 新版本的消费者能够处理旧版本的消息。

  • 向前兼容性: 旧版本的消费者能够处理新版本的消息。

5.2 Schema Registry

Schema Registry 是一个用于存储和管理 Avro、JSON 等消息格式的架构的中心化服务。通过使用 Schema Registry,可以更好地管理消息的演进,并确保向前和向后的兼容性。

// 配置 Schema Registry 地址
props.put("schema.registry.url", "http://schema-registry:8081");

6. 消息的认证与加密

Kafka 提供了安全性特性,包括消息的认证和加密。以下是一些相关的配置选项:

6.1 SSL 加密通信

// 生产者端
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");

// 消费者端
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");

6.2 认证配置

// 生产者端
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
props.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"username\" password=\"password\";");

// 消费者端
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
props.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"username\" password=\"password\";");

7. 消息的追踪与监控

追踪和监控是保障系统稳定性和性能的重要手段。以下是一些常用的追踪和监控工具:

7.1 JMX 监控

Kafka 提供了 JMX 接口,可以通过 JConsole 或其他 JMX 客户端进行监控。

7.2 Kafka Manager

Kafka Manager 是一款开源的 Kafka 集群管理和监控工具,提供了直观的 Web 界面。

7.3 Prometheus 和 Grafana

使用 Prometheus 进行指标采集,结合 Grafana 进行可视化展示,可以更全面地监控 Kafka 集群的性能和健康状况。

总结

在深入探讨Kafka消息格式、版本控制、安全性和监控等关键主题后,对构建高效、灵活的消息系统有了更为全面的认识。了解消息结构、序列化与反序列化、自定义消息格式,以及消息的压缩与解压,是确保消息传递的基础。随后,版本控制与兼容性的重要性得到了强调,Schema Registry成为管理Avro、JSON等消息格式的利器。在保障消息传递安全方面,SSL加密通信和认证配置提供了可靠的手段。最后,通过JMX监控、Kafka Manager、以及Prometheus和Grafana的运用,能够实时追踪和监控Kafka集群的健康状态。

这篇文章旨在为大家提供全方位的Kafka消息系统知识,使其能够在实际应用中根据业务需求构建稳健、高效的消息处理系统。深入理解这些关键概念,将有助于确保消息系统的可维护性、稳定性和安全性,为实际业务场景中的挑战提供可行的解决方案。继续关注更多Kafka相关的技术内容,将使大家能够不断深化对消息系统的认识,应对日益复杂的数据处理需求。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1290847.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

人工智能-编译器和解释器

编译器和解释器 命令式编程使用诸如print、“”和if之类的语句来更改程序的状态。 考虑下面这段简单的命令式程序&#xff1a; def add(a, b):return a bdef fancy_func(a, b, c, d):e add(a, b)f add(c, d)g add(e, f)return gprint(fancy_func(1, 2, 3, 4)) 10 Python…

【分布式微服务专题】从单体到分布式(一、SpringCloud项目初步升级)

目录 前言阅读对象阅读导航前置知识笔记正文一、单体服务介绍二、服务拆分三、分布式微服务升级前的思考3.1 关于SpringBoot/SpringCloud的思考【有点门槛】 四、SpringCloud升级整合4.1 新建父子项目 学习总结感谢 前言 从本节课开始&#xff0c;我将自己手写一个基于SpringC…

谷歌刚刚发布了Gemini 1.0,采用了OpenAI的GPT4

我的新书《Android App开发入门与实战》已于2020年8月由人民邮电出版社出版&#xff0c;欢迎购买。点击进入详情 对于谷歌和安卓来说&#xff0c;这是一个重要时刻。谷歌刚刚发布了 Gemini 1.0&#xff0c;这是其最新的LLM&#xff0c;它采用了 OpenAI 的 GPT4。 共有三种不同…

WPF仿网易云搭建笔记(0):项目搭建

文章目录 前言项目地址项目Nuget包搭建项目初始化项目架构App.xaml引入MateralDesign资源包 项目初步分析将标题栏去掉DockPanel初步布局 资源字典举例 结尾 前言 最近在找工作&#xff0c;发现没有任何的WPF可以拿的出手的工作经验&#xff0c;打算仿照网易云搭建一个WPF版本…

深度解析 Kafka 中的 Offset 管理与最佳实践

Kafka 中的 Offset&#xff08;偏移量&#xff09;是消息处理的关键元素&#xff0c;对于保证消息传递的可靠性和一致性至关重要。本篇博客将深度解析 Kafka 中的 Offset 管理机制&#xff0c;并提供丰富的示例代码&#xff0c;让你更全面地理解 Offset 的原理、使用方法以及最…

鸿蒙Harmony ArkUI十大开源项目

一 OH哔哩 https://gitee.com/ohos_port/ohbili 项目简介 【OH哔哩】是一款基于OpenHarmony系统ArkUI框架开发的哔哩哔哩动画第三方客户端 用到的三方库 bilibili-API-collect 哔哩哔哩-API收集整理ohos_ijkplayer 基于FFmpeg的视频播放器PullToRefresh 下拉刷新、上拉加载组件…

html通过CDN引入Vue组件抽出复用

html通过CDN引入Vue组件抽出复用 近期遇到个需求&#xff0c;就是需要在.net MVC的项目中&#xff0c;对已有的项目的首页进行优化&#xff0c;也就是写原生html和js。但是咱是一个写前端的&#xff0c;写html还可以&#xff0c;.net的话&#xff0c;开发也不方便&#xff0c;还…

CleanMyMac X4.15.0最新官方和谐版下载

Mac系统进行文件清理&#xff0c;一般是直接将文件拖动入“废纸篓”回收站中&#xff0c;然后通过清理回收站&#xff0c;就完成了一次文件清理的操作&#xff0c;但是这么做并无法保证文件被彻底删除了&#xff0c;有些文件通过一些安全恢复手段依旧是可以恢复的&#xff0c;那…

持续集成交付CICD: Sonarqube REST API 查找与新增项目

目录 一、实验 1.SonarQube REST API 查找项目 2.SonarQube REST API 新增项目 一、实验 1.SonarQube REST API 查找项目 &#xff08;1&#xff09;Postman测试 转换成cURL代码 &#xff08;2&#xff09;Jenkins添加凭证 &#xff08;3&#xff09;修改流水线 pipeline…

解决finalshell右键选择粘贴后出现直接执行的问题

文章目录 已经找到问题原因我的问题错误的解决 已经找到问题原因 复制的时候&#xff0c;只复制名字&#xff0c;不要复制后面多出来的东西&#xff0c;不然会自动加上回车换行 我的问题 我当时是想通过 ls -l 查出jdk的文件后&#xff0c; 复制文件名就不用看着敲了&#x…

李宏毅bert记录

一、自监督学习&#xff08;Self-supervised Learning&#xff09; 在监督学习中&#xff0c;模型的输入为x&#xff0c;若期望输出是y&#xff0c;则在训练的时候需要给模型的期望输出y以判断其误差——有输入和输出标签才能训练监督学习的模型。 自监督学习在没有标注的训练…

U-boot(八):官方uboot移植

本文主要探讨从ubboot官方移植uboot到x210。 基础 确定设备的配置文件 通过board.cfg中的cpu型号(s5pc1xx)确定设备的配置文件 头文件:include/configs/s5p_goni.h cpu: u-boot-2013.10\arch\arm\cpu\armv7 board: u-boot-2013.10\b…

AI 绘画 | Stable Diffusion 动漫人物真人化

前言 如何让一张动漫人物变成真实系列人物?Stable Diffusion WebUI五步即可实现。快来使用AI绘画打开异世界的大门吧!!! 动漫真人化 首先在图生图里上传一张二次元动漫人物图片,然后选择一个真实系人物画风的大模型,最后点击DeepBooru 反推,自动填充提示词,调整重绘…

CleanMyMac x4.15软件应用程序永久使用

许多刚从Windows系统转向Mac系统怀抱的用户&#xff0c;一开始难免不习惯&#xff0c;因为Mac系统没有像Windows一样的C盘、D盘&#xff0c;分盘分区明显。因此这也带来了一些问题&#xff0c;关于Mac的磁盘的清理问题&#xff0c;怎么进行清理&#xff1f;怎么确保清理的干净&…

系统设计-缓存介绍

该图说明了我们在典型架构中缓存数据的位置。 沿着流程有多个层次。 客户端应用程序&#xff1a;HTTP 响应可以由浏览器缓存。我们第一次通过 HTTP 请求数据&#xff0c;返回时在 HTTP 标头中包含过期策略&#xff1b;我们再次请求数据&#xff0c;客户端应用程序首先尝试从浏…

04 ECharts基础入门

文章目录 一、ECharts介绍1. 简介2. 相关网站3. HTML引入方式4. 基本概念 二、常见图表1. 柱状图2. 折线图3. 饼图4. 雷达图5. 地图 三、应用1. 动画2. 交互 一、ECharts介绍 1. 简介 ECharts是一个使用JavaScript实现的开源可视化库&#xff0c;用于生成各种图表和图形。 EC…

确定TME浸润模式的TMEscore包(胃癌)

步骤学习&#xff1a; 1&#xff0c;基因筛选&#xff1a; 作者使用先前研究得出的 244 肿瘤免疫相关基因&#xff08;244里有AB两个细分亚集&#xff09;&#xff0c;对特征基因进行缩减。从多个免疫治疗队列中获取这些基因的重要性特征。&#xff08;TCGA-SKCM、GSE78220、…

layui实现下拉框多选

引用layui第三方扩展实现下拉框选择渲染 第三方插件地址xmSelect下拉多选 xmSelect 实现效果 //第三方扩展插件 <script type"text/javascript" src"${ctx }/config/layui/dist/xm-select.js"></script> //jquery渲染 <script type&qu…

微服务的利与弊

一、前言 自从大多数web架构从单体演进到服务拆分&#xff0c;到微服务一统天下的几年来&#xff0c;应该没有web应用不是微服务架构的吧。最开始是阿里的doubble分层架构&#xff0c;到后来的SpringCloud全家桶&#xff0c;还有各个大厂自己定义的一套服务治理框架。微服务无…

visual Studio MFC 平台实现拉普拉斯和拉普拉斯与直方图均衡化与中值滤波相结合实现比较

拉普拉斯变换的原理与应用 本文使用visual Studio MFC 平台实现图像增强中的拉普拉斯变换&#xff0c;同时拉普拉斯一般不会单独使用&#xff0c;与其他平滑操作相结合&#xff0c;本文使用了拉普拉斯与直方图均衡化以及与中值滤波相结合&#xff0c;也对三种方式进行了对比 关…