kafka之protobuf

news2024/11/14 21:45:48

Protobuf.proto 文件是一种描述消息结构的定义文件,使用这种文件可以定义数据结构(消息),然后生成对应语言的类或代码用于序列化和反序列化数据。生成 .proto 文件涉及到编写 .proto 文件定义,然后通过 protoc 编译器生成目标语言的代码(如 Java、Python、Go 等)。

生成 .proto 文件的步骤

1. 编写 .proto 文件

首先,手动编写 .proto 文件来定义消息的结构。每个 .proto 文件定义了消息类型、字段以及字段的类型和编号。

例如,下面的 .proto 文件定义了一个图片消息的结构,包括文件名、格式和二进制数据:

// image.proto
syntax = "proto3";

message ImageRecord {
  // 文件名
  string filename = 1;

  // 文件格式
  string format = 2;

  // 二进制数据
  bytes imageData = 3;
}
2. 使用 protoc 编译 .proto 文件

protoc 是 Google 的 Protocol Buffers 编译器,负责将 .proto 文件编译成对应编程语言的类文件。这些类文件用于序列化和反序列化数据。

2.1. 安装 protoc
  • 下载并安装 protoc
    • Linux/macOS:使用包管理器安装

# macOS
brew install protobuf

# Ubuntu
sudo apt-get install -y protobuf-compiler

Windows:从 官方下载页面 获取并安装。

win 解压上述zip包:

执行的文件就在这里:

为了方便使用可以把这个bin目录配置在系统环境变量里,也可以直接进入到这个文件夹里

如果配置环境变量的话,安装完毕之后验证:

验证安装:

protoc --version
2.2. 编译 .proto 文件

通过 protoc 命令来编译 .proto 文件为目标语言代码。下面是几种常见语言的生成方式。

2.2.1. 生成 Java 代码
# 将 image.proto 编译为 Java 类,生成到指定目录
protoc --java_out=./output image.proto

上述的指令直接在win的cmd命令行里即可完成,记得提前建好 output目录

执行完毕之后就会生成一个 .java文件

编译后,会在 ./output 目录下生成相应的 Java 类(如 ImageRecord.java),你可以直接使用这些类进行 Protobuf 的序列化和反序列化。

3. 使用生成的类

编译生成的类会包含以下功能:

  • 序列化:将定义的消息对象转换为二进制格式,适合传输或存储。
  • 反序列化:将二进制格式的数据解析回消息对象。

例如,使用生成的 Java 类序列化和反序列化 ImageRecord

import com.example.proto.ImageRecord;  // 假设包名为 com.example.proto
import java.nio.file.Files;
import java.io.File;

public class ProtobufExample {
    public static void main(String[] args) throws Exception {
        // 构建 ImageRecord 消息对象
        ImageRecord image = ImageRecord.newBuilder()
            .setFilename("example.jpg")
            .setFormat("jpg")
            .setImageData(ByteString.copyFrom(Files.readAllBytes(new File("example.jpg").toPath())))
            .build();

        // 序列化为二进制数据
        byte[] serializedData = image.toByteArray();

        // 反序列化为 ImageRecord 对象
        ImageRecord deserializedImage = ImageRecord.parseFrom(serializedData);

        System.out.println("Filename: " + deserializedImage.getFilename());
    }
}

4. 定义 .proto 文件的规则

以下是 .proto 文件的常见语法:

  • syntax:定义 Protobuf 版本,推荐使用 proto3,较为简洁并且是最新的标准。

syntax = "proto3";

  • 消息(Message)定义:使用 message 关键字定义数据结构。
message ImageRecord {
    string filename = 1;  // string 类型字段,字段编号为 1
    string format = 2;    // string 类型字段,字段编号为 2
    bytes imageData = 3;  // 二进制数据,字段编号为 3
}

字段类型:常见的 Protobuf 字段类型包括:

  • int32, int64: 整数
  • float, double: 浮点数
  • bool: 布尔值
  • string: 字符串
  • bytes: 二进制数据(如文件、图片、视频)

字段编号:每个字段必须有唯一的编号,编号用于序列化和反序列化。编号必须是正整数,1 到 15 的编号用于最常用的字段,因为它们序列化时占用更少的空间

嵌套消息:可以在消息中定义嵌套的消息类型

message User {
  string username = 1;
  Profile profile = 2; // 嵌套消息类型

  message Profile {
      string email = 1;
      int32 age = 2;
  }
}

总结

  1. 编写 .proto 文件:定义消息结构,包括字段类型、名称和编号。
  2. 使用 protoc 编译:将 .proto 文件编译为目标语言代码,如 Java、Python、Go 等。
  3. 使用生成的类:使用生成的类进行消息的序列化(转换为二进制格式)和反序列化(解析二进制数据)。

kafka和protobuf集成例子:

要将 Protobuf 与 Kafka 集成,我们可以使用 Protobuf 定义的数据结构作为 Kafka 消息体,并通过 Kafka Producer 将序列化的 Protobuf 消息发送到 Kafka。在消费者端,通过 Kafka Consumer 接收消息并反序列化为原始的 Protobuf 对象。

步骤:
  1. 编写 .proto 文件:定义消息的结构。
  2. 使用 protoc 编译生成类:使用 Protobuf 编译器将 .proto 文件编译为 Java/Python 等语言的类。
  3. Kafka Producer 发送 Protobuf 消息:使用生成的类,构造 Protobuf 消息并通过 Kafka Producer 发送。
  4. Kafka Consumer 接收并反序列化 Protobuf 消息:在 Kafka Consumer 中接收消息,并反序列化为 Protobuf 对象。
1. 编写 Protobuf .proto 文件

例如,定义一个包含图片信息的 ImageRecord.proto 文件:

syntax = "proto3";

message ImageRecord {
  string filename = 1;
  string format = 2;
  bytes imageData = 3;
}
2. 使用 protoc 编译生成 Java 类

假设使用 Java,将 .proto 文件编译为 Java 类:

protoc --java_out=./output ImageRecord.proto
3. Kafka Producer 发送 Protobuf 消息

通过 Kafka Producer 发送 Protobuf 格式的消息:

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import java.util.Properties
import java.nio.file.Files
import java.io.File
import com.example.proto.ImageRecord

object ProtobufKafkaProducer {
    def main(args: Array[String]): Unit = {
        // Kafka Producer 配置
        val props = new Properties()
        props.put("bootstrap.servers", "localhost:9092")
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer")

        val producer = new KafkaProducer[String, Array[Byte]](props)

        // 构建 Protobuf 消息
        val imageBytes = Files.readAllBytes(new File("/path/to/image.jpg").toPath)
        val imageRecord = ImageRecord.newBuilder()
            .setFilename("image.jpg")
            .setFormat("jpg")
            .setImageData(com.google.protobuf.ByteString.copyFrom(imageBytes))
            .build()

        // 序列化并发送 Protobuf 消息到 Kafka
        val record = new ProducerRecord[String, Array[Byte]]("image_topic", "image_key", imageRecord.toByteArray)
        producer.send(record)

        producer.close()
    }
}
4. Kafka Consumer 接收并反序列化 Protobuf 消息

通过 Kafka Consumer 接收 Protobuf 消息并反序列化:

import org.apache.kafka.clients.consumer.{KafkaConsumer, ConsumerRecords}
import java.util.Properties
import com.example.proto.ImageRecord

object ProtobufKafkaConsumer {
    def main(args: Array[String]): Unit = {
        // Kafka Consumer 配置
        val props = new Properties()
        props.put("bootstrap.servers", "localhost:9092")
        props.put("group.id", "test")
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
        props.put("auto.offset.reset", "earliest")

        val consumer = new KafkaConsumer[String, Array[Byte]](props)
        consumer.subscribe(java.util.Arrays.asList("image_topic"))

        // 消费并反序列化消息
        while (true) {
            val records: ConsumerRecords[String, Array[Byte]] = consumer.poll(100)
            records.forEach { record =>
                // 反序列化 Protobuf 消息
                val imageRecord = ImageRecord.parseFrom(record.value())
                println(s"Filename: ${imageRecord.getFilename}, Format: ${imageRecord.getFormat}")
            }
        }
    }
}
Kafka 与 Protobuf 集成的优势:
  1. 高效序列化:Protobuf 生成的二进制格式非常紧凑,适合大数据量和高吞吐场景。
  2. 跨语言支持:Protobuf 支持多种语言,因此 Kafka 与 Protobuf 的集成能轻松跨多语言系统工作。
  3. Schema 支持:通过 Protobuf,数据结构的变化可以通过 .proto 文件的模式演进进行管理。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2138436.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Acrobat XI 安装教程

软件介绍 Adobe Acrobat 是由Adobe公司开发的一款PDF(Portable Document Format,便携式文档格式)编辑软件。借助它,可以以PDF格式制作和保存文档,以便于浏览和打印,同时还可以使用一些高级工具来创建、编辑…

Linux PTP 测量实操 (IEEE 1588)

测量 IEEE 1588 需要使用 linuxptp 这个工程, 官网是 https://linuxptp.sourceforge.net/ 获取代码可以通过git git clone git://git.code.sf.net/p/linuxptp/code linuxptp 如果是当前环境本地编译的话, 直接在下载好的代码路径make就可以. 如果需要在开发板上面使用的话…

通信工程学习:什么是接入网(AN)中的TF传送功能

接入网(AN)中的TF传送功能 在通信工程中,TF(Transfer Function)传送功能是指为接入网(AN)不同位置之间提供通道和传输介质,以实现数据的有效传输。以下是关于TF传送功能的详细解释&a…

水滴式粉碎机:辣椒粉碎轻松搞定

在食品加工行业中,辣椒作为一种重要的调味品,其加工方式直接影响到产品的口感。水滴式粉碎机的粉碎方式不仅保留了辣椒原有的色泽、香味和营养成分,还减少了加工过程中的热损失和氧化反应,确保了辣椒粉的品质。 精细度与均匀度&am…

Kafka 消息丢失如何处理?

今天给大家分享一个在面试中经常遇到的问题:Kafka 消息丢失该如何处理? 这个问题啊,看似简单,其实里面藏着很多“套路”。 来,咱们先讲一个面试的“真实”案例。 面试官问:“Kafka 消息丢失如何处理&#x…

基于SSM+Vue+MySQL的在线医疗服务系统

系统展示 用户前台界面 管理员后台界面 系统背景 随着医疗信息化的快速发展和患者对便捷医疗服务需求的日益增长,开发一个高效、可靠的在线医疗服务系统显得尤为重要。基于SSM(SpringSpring MVCMyBatis)框架、前端采用Vue.js、后端连接MySQL数…

CrossOver24.0.5破解版免费下载和永久激活图文教程,苹果电脑怎么玩《黑神话:悟空》

CrossOver24可以玩《黑神话:悟空》么?答案是可以的。 1、首先我们需要下载CrossOver24软件。 CrossOver24安装包夸克网盘链接:https://pan.quark.cn/s/35e64d746778 2、下载完成后,我们双击CrossOver.pkg开始安装,然…

LeetCode[简单] 141.环形链表

给你一个链表的头节点 head ,判断链表中是否有环。 如果链表中有某个节点,可以通过连续跟踪 next 指针再次到达,则链表中存在环。 为了表示给定链表中的环,评测系统内部使用整数 pos 来表示链表尾连接到链表中的位置(…

C++初阶学习——探索STL奥秘——模拟实现list类

1、基本框架 list 由三个类构建而成: 节点类:每个节点必须的三部分(指向前一个节点的指针、指向后一个节点的指针、当前节点存储的数据) 迭代器类:此时的迭代器为双向迭代器,比较特殊,需要对其进行封装,如 it并非使迭代器单纯向后移动&…

QT添加图标标题和打包项目

QT项目打包 项目的标题和图标标题项目图标exe图标 可执行文件——生成exeexe运行报错“找不到qt6gui.dll”等 相关库文件——生成zip安装包打包程序——生成exe安装包 项目的标题和图标 项目打包要好看点,得有个好点的标题和图标,这次打包的项目是我上一…

excel如何快速选中某个数字或者某串数字

鼠标光标放在某个数字或者某串数字的末尾,进行双击鼠标左键即可 (就会选中当前鼠标光标前相邻的所有数字):

【Node.js】RabbitMQ 延时消息

概述 在 RabbitMQ 中实现延迟消息通常需要借助插件(如 RabbitMQ 延迟队列插件),因为 RabbitMQ 本身不原生支持延迟消息。 延迟消息的一个典型场景是,当消息发布到队列后,等待一段时间再由消费者消费。这可以通过配置…

【拦截导弹】

​ 题目 ​​ 题解 题解:1010. 拦截导弹(dp与贪心) - AcWing 我谈几点: 第一,由此复习了upper_bound和lower_bound函数 第二,由此学习了贪心方式求“最多分割不严格递减子序列的数目”和“最长不严格递…

算法参数对拥塞控制的影响

来看看参数对公平收敛的影响。仅假象一下就知道应该是个加权公平,但事实如何,还是要具体看一下。 首先看 aimd,标准的 reno 算法是每 round 之后 cwnd 加 1,但如果有些流加 1,有些流加 2,会如何&#xff1…

踩坑【已解决】:使用maven打印结果是控制台输出中文乱码

报错原图: 解决方案: 1、修改maven->runner中的配置添加如下信息: -Dfile.encodingUTF-8 2、检查编码的配置信息: 3、检查窗口右下角的配置信息: 解决结果:

SEGGERS实时系统embOS推出Linux端模拟器

SEGGER 发布了两个新的 embOS 仿真模拟器:embOS Sim Linux 和 embOS-MPU Sim Linux。 通过模拟 Linux 主机系统上的硬件,取代物理硬件,为开发人员提供了一种无缝的方式来构建原型和测试应用程序。 embOS Sim Linux 端口支持 32 位和 64 位系…

【在Linux世界中追寻伟大的One Piece】网络命令|验证UDP

目录 1 -> Ping命令 2 -> Netstat命令 3 -> Pidof命令 4 -> 验证UDP-Windows作为client访问Linux 4.1 -> UDP client样例 1 -> Ping命令 Ping命令是一种网络诊断工具,它使用ICMP(Internet Control Message Protocol,互联网控制消…

CAN BUS

CAN BUS 原理 网上资料非常丰富,是车载系统主要BUS之一。 我们关注如下方面 can bus 是什么网络结构CAN BUS 协议ECU node实现其他 What is CAN Bus? Control Area Network (CAN) bus is a serial communication protocol that allows devices to exchange dat…

MySQL:视图【详解】

1、视图 1.1 视图的定义 视图是在数据库中定义的虚拟表。它是一个基于一个或多个实际表的查询结果集,可以像实际表一样被查询和操作。视图可以看作是一个动态生成的数据表,其内容是从其他表中选择、过滤和计算得到的。 视图通过使用SQL查询语句来定义…

Framebuffer应用编程

目录 前言 LCD操作原理 涉及的 API 函数 open函数 ioctl 函数 mmap 函数 Framebuffer程序分析 源码 1.打开设备 2.获取LCD参数 3.映射Framebuffer 4.描点函数 5.随便画几个点 上机实验 前言 本文介绍LCD的操作原理和涉及到的API函数,分析Framebuffer…