Kafka Schema-Registry

news2025/1/16 1:36:05

一、为什么需要Schema-Registry

1.1、注册表

无论是 使用传统的Avro API自定义序列化类和反序列化类 还是 使用Twitter的Bijection类库实现Avro的序列化与反序列化,这两种方法都有一个缺点:在每条Kafka记录里都嵌入了schema,这会让记录的大小成倍地增加。但是不管怎样,在读取记录时仍然需要用到整个 schema,所以要先找到 schema。有没有什么方法可以让数据共用一个schema?

我们遵循通用的结构模式并使用"schema注册表"来达到目的。"schema注册表"的原理如下:
在这里插入图片描述

  1. 把所有写入数据需要用到的 schema 保存在注册表里,然后在记录里引用 schema ID。
  2. 负责读取数据的应用程序使用 ID 从注册表里拉取 schema 来反序列化记录。
  3. 序列化器和反序列化器分别负责处理 schema 的注册和拉取。

schema注册表并不属于Kafka,现在已经有一些开源的schema 注册表实现。比如本文要讨论的Confluent Schema Registry。

1.2、为什么使用 Avro

Avro 序列化相比常见的序列化(比如 json)会更快,序列化的数据会更小。相比 protobuf ,它可以支持实时编译,不需要像 protobuf 那样先定义好数据格式文件,编译之后才能使用。

1.3、Confluent Schema-Registry

Confluent公司为了能让 Kafka 支持 Avro 序列化,创建了 Kafka Schema Registry 项目,项目地址为 https://github.com/confluentinc/schema-registry 。对于存储大量数据的 kafka 来说,使用 Avro 序列化,可以减少数据的存储空间提高了存储量,减少了序列化时间提高了性能。 Kafka 有多个topic,里面存储了不同种类的数据,每种数据都对应着一个 Avro schema 来描述这种格式。Registry 服务支持方便的管理这些 topic 的schema,它还对外提供了多个 `restful 接口,用于存储和查找。

二、Confluent Schema-Registry 安装与使用

2.1、安装

  1. Schema Registry的各个发现行版本的下载链接
  2. 上传到linux系统进行解压安装。
  3. 本教程使用外部以安装好的Kafka集群不使用内部默认的。
  4. 修改confluent-5.3.1/etc/schema-registry/schema-registry.properties配置文件
# 注册服务器的监听地址及其端口号
listeners=http://0.0.0.0:8081

# 有关连接外部集群的地址有两种方式:1 通过zk连接 2 通过kafka的控制器 。 本教程采用zk连接
kafkastore.connection.url=henghe-042:2181

# The name of the topic to store schemas in
kafkastore.topic=_schemas

# If true, API requests that fail will include extra debugging information, including stack traces
debug=false
  1. 注册服务器的启动…/…/bin/schema-registry-start -daemon …/…/etc/schema-registry/schema-registry.properties

2.2、RestAPI使用

# Register a new version of a schema under the subject "Kafka-key"
$ curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
    --data '{"schema": "{\"type\": \"string\"}"}' \
    http://localhost:8081/subjects/Kafka-key/versions
  {"id":1}

# Register a new version of a schema under the subject "Kafka-value"
$ curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
    --data '{"schema": "{\"type\": \"string\"}"}' \
     http://localhost:8081/subjects/Kafka-value/versions
  {"id":1}

# List all subjects
$ curl -X GET http://localhost:8081/subjects
  ["Kafka-value","Kafka-key"]

# List all schema versions registered under the subject "Kafka-value"
$ curl -X GET http://localhost:8081/subjects/Kafka-value/versions
  [1]

# Fetch a schema by globally unique id 1
$ curl -X GET http://localhost:8081/schemas/ids/1
  {"schema":"\"string\""}

# Fetch version 1 of the schema registered under subject "Kafka-value"
$ curl -X GET http://localhost:8081/subjects/Kafka-value/versions/1
  {"subject":"Kafka-value","version":1,"id":1,"schema":"\"string\""}

# Fetch the most recently registered schema under subject "Kafka-value"
$ curl -X GET http://localhost:8081/subjects/Kafka-value/versions/latest
  {"subject":"Kafka-value","version":1,"id":1,"schema":"\"string\""}

# Delete version 3 of the schema registered under subject "Kafka-value"
$ curl -X DELETE http://localhost:8081/subjects/Kafka-value/versions/3
  3

# Delete all versions of the schema registered under subject "Kafka-value"
$ curl -X DELETE http://localhost:8081/subjects/Kafka-value
  [1, 2, 3, 4, 5]

# Check whether a schema has been registered under subject "Kafka-key"
$ curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
    --data '{"schema": "{\"type\": \"string\"}"}' \
    http://localhost:8081/subjects/Kafka-key
  {"subject":"Kafka-key","version":1,"id":1,"schema":"\"string\""}

# Test compatibility of a schema with the latest schema under subject "Kafka-value"
$ curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
    --data '{"schema": "{\"type\": \"string\"}"}' \
    http://localhost:8081/compatibility/subjects/Kafka-value/versions/latest
  {"is_compatible":true}

# Get top level config
$ curl -X GET http://localhost:8081/config
  {"compatibilityLevel":"BACKWARD"}

# Update compatibility requirements globally
$ curl -X PUT -H "Content-Type: application/vnd.schemaregistry.v1+json" \
    --data '{"compatibility": "NONE"}' \
    http://localhost:8081/config
  {"compatibility":"NONE"}

# Update compatibility requirements under the subject "Kafka-value"
$ curl -X PUT -H "Content-Type: application/vnd.schemaregistry.v1+json" \
    --data '{"compatibility": "BACKWARD"}' \
    http://localhost:8081/config/Kafka-value
  {"compatibility":"BACKWARD"}

2.2、Java 代码

2.2.0、注册

curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data '{"schema": "{\"type\": \"record\", \"name\": \"User\", \"fields\": [{\"name\": \"id\", \"type\": \"int\"}, {\"name\": \"name\",  \"type\": \"string\"}, {\"name\": \"age\", \"type\": \"int\"}]}"}' \
http://localhost:8081/subjects/chb_test/versions

2.2.1、添加依赖

我们需要 confluent-common 目录下的common-config-4.1.1.jar、common-utils-4.1.1.jar和全部以jackson开头的 jar 包以及 kafka-serde-tools 目录下的kafka-schema-registry-client-4.1.1.jar和kafka-avro-serializer-4.1.1.jar

将本地jar包导入到本地仓库

mvn install:install-file -Dfile=G:\迅雷下载\kafka-avro-serializer-6.2.0.jar -DgroupId=io.confluent -DartifactId=kafka-avro-serializer -Dversion=6.2.0  -Dpackaging=jar

添加到pom.xml

<dependency>
			<groupId>org.apache.kafka</groupId>
			<artifactId>kafka-clients</artifactId>
			<version>2.3.0</version>
		</dependency>
		<!--此依赖是通过本地依赖库导入的,有关如何把jar放入本地依赖库自行搜索-->
		<!--本人的jar文件是在编译源码时自动到依赖库中的所以直接引用-->
		<dependency>
			<groupId>io.confluent</groupId>
			<artifactId>kafka-avro-serializer</artifactId>
			<version>5.3.2</version>
		</dependency>
		<dependency>
			<groupId>io.confluent</groupId>
			<artifactId>kafka-schema-registry-client</artifactId>
			<version>5.3.2</version>
		</dependency>
		<dependency>
			<groupId>io.confluent</groupId>
			<artifactId>common-config</artifactId>
			<version>5.3.2</version>
		</dependency>
		<dependency>
			<groupId>io.confluent</groupId>
			<artifactId>common-utils</artifactId>
			<version>5.3.2</version>
		</dependency>

		<dependency>
			<groupId>org.apache.avro</groupId>
			<artifactId>avro</artifactId>
			<version>1.8.2</version>
		</dependency>

		<!-- jaskson start -->
		<dependency>
			<groupId>com.fasterxml.jackson.core</groupId>
			<artifactId>jackson-core</artifactId>
			<version>2.9.10</version>
		</dependency>
		<dependency>
			<groupId>com.fasterxml.jackson.core</groupId>
			<artifactId>jackson-databind</artifactId>
			<version>2.9.10</version>
		</dependency>
		<!-- jaskson end -->

2.2.2、Kafka 客户端使用原理

Kafka Schema Registry 提供了 KafkaAvroSerializer 和 KafkaAvroDeserializer 两个类。Kafka 如果要使用 Avro 序列化, 在实例化 KafkaProducer 和 KafkaConsumer 时, 指定序列化或反序列化的配置。

客户端发送数据的流程图如下所示:

在这里插入图片描述

2.2.2.1、KafkaProducer
package com.chb.common.kafka.schema;


import java.util.Properties;
import java.util.Random;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ConfluentProducer {
    public static final String USER_SCHEMA = "{\"type\": \"record\", \"name\": \"User\", " +
            "\"fields\": [{\"name\": \"id\", \"type\": \"int\"}, " +
            "{\"name\": \"name\",  \"type\": \"string\"}, {\"name\": \"age\", \"type\": \"int\"}]}";

    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:6667");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // 使用Confluent实现的KafkaAvroSerializer
        props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
        // 添加schema服务的地址,用于获取schema
        props.put("schema.registry.url", "http://localhost:8081");
        Producer<String, GenericRecord> producer = new KafkaProducer<>(props);
        Schema.Parser parser = new Schema.Parser();
        Schema schema = parser.parse(USER_SCHEMA);
        Random rand = new Random();
        int id = 0;
        while (id < 100) {
            id++;
            String name = "name" + id;
            int age = rand.nextInt(40) + 1;
            GenericRecord user = new GenericData.Record(schema);
            user.put("id", id);
            user.put("name", name);
            user.put("age", age);
            ProducerRecord<String, GenericRecord> record = new ProducerRecord<>("test-topic", user);
            System.out.println(user);
            producer.send(record);
            Thread.sleep(1000);
        }

        producer.close();
    }
}
2.2.2.2、KafkaConsumer
package com.chb.common.kafka.schema;

import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;


public class ConfluentConsumer {


    public static void main(String[] args) throws Exception {

        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:6667");
        props.put("group.id", "test1");
        props.put("enable.auto.commit", "false");
        // 配置禁止自动提交,每次从头消费供测试使用
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // 使用Confluent实现的KafkaAvroDeserializer
        props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
        // 添加schema服务的地址,用于获取schema
        props.put("schema.registry.url", "http://localhost:8081");
        KafkaConsumer<String, GenericRecord> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("test-topic"));

        try {
            while (true) {
                ConsumerRecords<String, GenericRecord> records = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, GenericRecord> record : records) {
                    GenericRecord user = record.value();
                    System.out.println("value = [user.id = " + user.get("id") + ", " + "user.name = "
                            + user.get("name") + ", " + "user.age = " + user.get("age") + "], "
                            + "partition = " + record.partition() + ", " + "offset = " + record.offset());
                }
            }
        } finally {
            consumer.close();
        }
    }
}
2.2.2.3、Consumer的消费结果
value = [user.id = 98, user.name = name98, user.age = 38], partition = 0, offset = 97
value = [user.id = 99, user.name = name99, user.age = 30], partition = 0, offset = 98
value = [user.id = 100, user.name = name100, user.age = 39], partition = 0, offset = 99

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/597956.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

持续集成部署-微前端 镜像可以有多小?

微前端 镜像可以有多小&#xff1f; 1. 需求2. 开整 1. 需求 目前项目前端的镜像大小基本在 150M 左右&#xff0c;试下能不能缩小到 20M&#xff1f; 看了下前端打包后的压缩包只有 几 兆&#xff1b; 想着有空调试下&#xff0c;第一反应应该是使用 alpine 镜像&#xff0…

Delving into Shape-aware Zero-shot Semantic Segmentation(CVPR2023)

文章目录 摘要本文方法Pixel-wise Vision-Language AlignmentShape ConstraintSelf-supervised Spectral Decomposition推理 实验结果 摘要 由于大规模视觉语言预处理取得了令人瞩目的进展&#xff0c;最近的识别模型可以以零样本和开放集的方式对任意对象进行分类&#xff0c…

WIKIBON:大模型炒作中,有哪些云与AI的新趋势?

进入2023年以来&#xff0c;以ChatGPT为代表的大模型喧嚣引发了AI的新一轮炒作热潮&#xff0c;堪比当年的加密货币。不同的是&#xff0c;以微软、NVIDIA、AWS、Google等为代表的云与芯片大厂纷纷实质性入局大模型&#xff0c;为大模型AI注入持续的生命力。因此ChatGPT可类比于…

【网络原理】TCP/IP四层模型中的重点网络协议

目录 &#x1f31f;一、应用层协议 &#x1f308;1、XML协议 &#x1f308; 2、JSON &#x1f308; 3、其他协议 &#x1f31f;二、传输层协议&#xff08;UDP与TCP重点&#xff09; &#x1f308;1、UDP协议格式 &#x1f308; 2、TCP协议格式 &#x1f389;TCP的10条…

字母钥匙圈

钥匙圈&#xff0c;字母&#xff01; 项目概况&#xff1a; 在这个项目中&#xff0c;您将学习使用字母制作钥匙圈&#xff01; 放置字母 是时候发挥创意了。为您的朋友或您自己设计一个。 指示 首先将字母拖到红色底座上&#xff08;位于“设计入门”>“字母和数字”下…

RESTful Python

RESTful Python是一种使用Python编程语言实现RESTful API的方法。下面是一些常用的Python库和框架&#xff0c;可以用来创建RESTful API: Flask&#xff1a; Flask是一个轻量级的Python Web框架&#xff0c;可以用来创建RESTful API。它具有灵活、易于使用和快速开发的特点。 …

[数据库]关于数据库设计的原则

数据表设计原则: 自动编号的ID应该设计为bigint&#xff0c;因为int可能不够用&#xff0c;并且&#xff0c;为了便于统一管理&#xff0c;写的舒心不出错&#xff0c;建议所有表的自增ID全部使用bigint 。&#xff08;缺点是占空间&#xff0c;如果有20亿条数据&#xff0c;浪…

地震勘探基础(二)之地震分辨率

地震分辨率 分辨率&#xff08;resolution&#xff09;表示分离出两个十分靠近的物体的能力&#xff0c;一般用距离表示。如果两个物体之间的距离大于某个特定距离时可以分辨出是两个分离的物体&#xff0c;小于这个特定距离时就不再能分辨出是两个物体&#xff0c;那么这个特…

电压放大器在超声波测距仪中的应用实例研究

超声波测距仪是一种用于测量距离的设备&#xff0c;其原理是利用超声波在空气中传播的速度和反射特性来计算距离。而电压放大器则是超声波测距仪中的一个重要组成部分&#xff0c;它可以将超声波信号放大到足够强的水平&#xff0c;以便于后续处理和显示。本安泰电子将为大家介…

【学习日记2023.6.1】数据库隔离级别

1. 数据库隔离级别 1.1 事务 事务只是一个改变&#xff0c;是一些操作的集合&#xff1b;用专业的术语讲&#xff0c;他就是一个程序的执行单元&#xff1b;事务本身其实并不包含这4个特性&#xff0c;只是我们需要通过某些手段&#xff0c;尽可能的让这个执行单元满足这四个特…

【C语言进阶笔记】

文章目录 1. const&#xff08;常量指针、指针常量&#xff09;2. static3. extern4. 指针数组和数组指针5. 结构体对齐6. int / uint取值范围、二进制形式与转换、负数表示7. \0&#xff0c;0&#xff0c;"0"&#xff0c;0之间的区别8. 类型自动转换9. 内存结构10. …

【前端之旅】nvm-Node版本管理工具

一名软件工程专业学生的前端之旅,记录自己对三件套(HTML、CSS、JavaScript)、Jquery、Ajax、Axios、Bootstrap、Node.js、Vue、小程序开发(Uniapp)以及各种UI组件库、前端框架的学习。 【前端之旅】Web基础与开发工具 【前端之旅】手把手教你安装VS Code并附上超实用插件…

【HttpRunnerManager】搭建接口自动化测试平台操作流程

一、需要准备的知识点 1. linux: 安装 python3、nginx 安装和配置、mysql 安装和配置 2. python: django 配置、uwsgi 配置 二、我搭建的环境 1. Centos7 &#xff08;配置 rabbitmq、mysql 、Supervisord&#xff09; 2. python 3.6.8 &#xff08;配置 django、uwsgi&am…

自然语言处理实战9-大语言模型的训练与文本生成过程

大家好&#xff0c;我是微学AI&#xff0c;今天给大家介绍一下自然语言处理实战9-大语言模型的训练与文本生成过程&#xff0c;以下是本文的目录结构&#xff1a; 文章目录 1.引言 2.大语言模型概述 3.大语言模型的应用项目 3.1 语言生成 3.2 机器翻译 3.3 问答系统 3.4 自动…

Express框架从入门到如土

目录 前言一&#xff0c;初体验二&#xff0c;路由2.1 什么是路由2.2. 路由的使用2.3 获取请求报文参数2.4 id的通配2.5 响应的设置 三&#xff0c;中间件3.1 中间件概述3.2 全局中间件与路由中间件的比对3.3 全局中间件的使用3.4 局部中间件的使用3.5 静态资源中间件&#xff…

“邮件营销新趋势,这个平台让你收获颇丰!

随着各媒体平台的迅速发展&#xff0c;2023年大家更专注于视频营销、网红营销、直播营销等营销方式。可以见得&#xff0c;数字媒介手段的发展&#xff0c;对于营销方式也产生了巨大的影响。但是&#xff0c;企业在拥抱新兴的营销方式的同时&#xff0c;也不要忽视传统的营销方…

好用的Chrome浏览器插件推荐(不定期更新)

好用的Chrome浏览器插件推荐 1.1 CSDN-浏览器助手1.2 Google 翻译1.3 JSON Viewer1.4 ModHeader - Modify HTTP headers1.5 Octotree - GitHub code tree 1.1 CSDN-浏览器助手 CSDN-浏览器助手 是一款集成本地书签、历史记录与 CSDN搜索(so.csdn.net) 的搜索工具 推荐&#x…

碳中和背景下我国空调系统发展趋势2022(李先庭)

碳中和背景下我国空调系统发展趋势 摘要 我国建筑空调系统在运行阶段的年碳排放量约为&#xff19;&#xff0e;&#xff19;亿&#xff54;二氧化碳&#xff0c;降低其碳排放是实现碳达峰碳中和目标的重要挑战之一。本文对我国当前空调系统碳排放量进行了拆解&#xff0c;分…

node-exporter,prometheus,grafana三者之间的联系

一、node-exporter与节点机 用于收集节点机器的数据信息&#xff0c;那么node-exporter与节点机器的连接在哪里&#xff1f; node-exporter.yaml apiVersion: apps/v1 kind: DaemonSet metadata:name: node-exporternamespace: kube-systemlabels:k8s-app: node-exporter spe…

调整直线导轨间隙有什么方法?

直线导轨作为机械行业中非常重要的传动部件&#xff0c;应用范围当然相当广泛&#xff0c;尤其是自动化设备&#xff0c;基本上我们都能看到它的作用。 在机械行业待得久的人都知道&#xff0c;直线导轨在使用的过程中&#xff0c;为了保证直线导轨的正常工作&#xff0c;直线导…