SpringBoot集成Kafka和avro和Schema注册表

news2024/12/27 7:44:30

Schema注册表

为了提升kafka的性能,减少网络传输和存储的数据大小,可以把数据的schema部分单独存储到外部的schema注册表中,整体架构如下图所示:
在这里插入图片描述
1)把所有数据需要用到的 schema 保存在注册表里,然后在记录里引用 schema ID。
2)消费者使用 ID 从注册表里拉取 schema 来反序列化记录。
3)序列化器和反序列化器分别负责处理 schema 的注册和拉取。
schema注册表并不属于Kafka,现在已经有一些开源的schema 注册表实现,比如Confluent Schema Registry。

安装confluent

参考:安装手册

# confluent的安装包中已经包含了zookeeper和kafka的安装包,无需单独再下载
# 下载
curl -O https://packages.confluent.io/archive/7.7/confluent-7.7.1.tar.gz
# 解压
tar -xzf confluent-7.7.1.tar.gz

解压以后目录结构如下:

文件夹描述
bin可执行文件
etc配置文件
lib服务
libexec多平台的客户端库
sharejar包和license
src源码
# 设置环境变量
vim /etc/profile
export CONFLUENT_HOME=/usr/local/confluent-7.7.1
export PATH=$CONFLUENT_HOME/bin:$PATH
# 加载环境变量
source /etc/profile
# 验证
confluent --help

启动confluent服务

启动zookeeper

cd /usr/local/confluent-7.7.1/etc/kafka
vim zookeeper.properties
# 可以调整zookeeper的端口和数据的存储目录
# 启动zookeeper
./bin/zookeeper-server-start -daemon ./etc/kafka/zookeeper.properties
# 验证
ps -ef | grep zookeeper

启动kafka

cd /usr/local/confluent-7.7.1/etc/kafka
vim server.properties
broker.id=0
# 监听地址
listeners=0.0.0.0://:9092
# 对外暴漏的地址
advertised.listeners=PLAINTEXT://192.168.200.128:9092
# zookeeper的地址
zookeeper.connect=localhost:2181
# 启动
 ./bin/kafka-server-start  -daemon ./etc/kafka/server.properties
# 验证
netstat -nap | grep 9092

启动confluent

cd /usr/local/confluent-7.7.1/etc/schema-registry
# 修改schema-registry.properties
vim schema-registry.properties
# schema-registry的监听地址
listeners=http://0.0.0.0:8081
# kafka的访问地址
kafkastore.bootstrap.servers=PLAINTEXT://localhost:9092
# 启动
./bin/schema-registry-start -daemon ./etc/schema-registry/schema-registry.properties
# 验证
netstat -nap | grep 8081

新建springboot项目

新建avro的schema文件User.avsc

{
  "namespace": "com.github.xjs.protocol",
  "type": "record",
  "name": "UserRecord",
  "fields": [
    {
      "name": "id",
      "type": "int"
    },
    {
      "name": "name",
      "type": "string"
    }
  ]
}

pom中添加avro-maven-plugin插件

<!--https://avro.apache.org/docs/1.11.1/getting-started-java/-->
<!-- 
	命令行执行:mvn generate-sources 把avsc转化成java文件 
-->
<plugin>
  <groupId>org.apache.avro</groupId>
  <artifactId>avro-maven-plugin</artifactId>
  <version>1.11.1</version>
  <executions>
    <execution>
      <phase>generate-sources</phase>
      <goals>
        <goal>schema</goal>
      </goals>
      <configuration>
        <sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>
        <outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
      </configuration>
    </execution>
  </executions>
</plugin>

添加avro和kafka的依赖

<dependency>
  <groupId>io.confluent</groupId>
  <artifactId>kafka-avro-serializer</artifactId>
  <version>7.7.1</version>
</dependency>
<dependency>
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka</artifactId>
</dependency>

添加对应的配置

server:
  port: 8080
spring:
  kafka:
    bootstrap-servers: 192.168.200.128:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      # 重点关注这里的KafkaAvroSerializer
      value-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
    consumer:
      group-id: test
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
       # 重点关注这里的.KafkaAvroDeserializer
      value-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
    properties:
      # confluent的地址
      schema.registry.url: http://192.168.200.128:8081

消息生产者

public void send(UserRecord record) {
    if (Objects.isNull(record)) {
        return;
    }
    log.info("send message, value:{}", record.toString());
    // 跟发送普通消息一样,可以直接发送UserRecord
    kafkaTemplate.send("demo-topic", record);
}

消息消费者

@KafkaListener(topics = "demo-topic")
public void consume(ConsumerRecord<String, UserRecord> user){
    // 跟接收普通消息一样,可以直接接收UserRecord
    log.info("receive message, topic:{}, key:{}, value:{}", user.topic(), user.key(), user.value());
}

完整的源码下载:https://github.com/xjs1919/learning-demo/tree/master/springboot-kafka-avro

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2252884.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

c++领域展开第一幕——入门基础(命名空间、iostream、缺省参数、函数重载、nullptr、inline(内联函数))超详细!!!!

文章目录 前言一、c的第一个程序二、命名空间2.1 namespace 的价值2.2 namespace 的定义2.3 命名空间的使用 三、c的输入和输出四、缺省参数五、函数重载六、nullptr七、inline总结 前言 今天小编带着大家进入c的大门&#xff0c;虽然c难&#xff0c;但好事多磨&#xff0c;一起…

Java Web 1HTML快速入门

目录 一、Web开发介绍 1.什么是Web&#xff1f; 2.初识Web前端 二、HTML快速入门 1.什么是HTML、CSS&#xff1f; 2、案例练习 3.小结 三、VS Code开发工具 四、基础标签&样式&#xff08;HTML&#xff09; 2、实现标题--样式1&#xff08;新闻标题的颜色&#xff0…

【Python网络爬虫笔记】7-网络爬虫的搜索工具re模块

目录 一、网络爬虫中的正则表达式和re模块&#xff08;一&#xff09;数据提取的精确性&#xff08;二&#xff09;处理复杂的文本结构&#xff08;三&#xff09;提高数据处理效率 二、正则表达式的内涵&#xff08;一&#xff09;、常用元字符&#xff08;二&#xff09;、量…

42_GAN网络详解(2)---常见的GAN

DCGAN CGAN 条件生成对抗网络&#xff08;Conditional Generative Adversarial Networks, CGAN&#xff09;是生成对抗网络&#xff08;Generative Adversarial Networks, GAN&#xff09;的一种变体&#xff0c;由Mehdi Mirza和Simon Osindero在2014年提出。CGAN的主要改进在…

PC端阅读器--koodo reader

官网&#xff1a;请在必应搜索引擎上输入 koodo reader GitHub&#xff1a;GitHub - koodo-reader/koodo-reader: Windows, macOS, Linux and Web 123云windows版&#xff1a;Koodo-Reader-1.5.1.exe下载 提取码&#xff1a;4455 优&#xff1a; 1.开源&#xff0c;懂&#x…

PyQt设计界面优化 #qss #ui设计 #QMainWindow

思维导图 通过qss实现ui界面设计优化 Qss是Qt程序界面中用来设置控件的背景图片、大小、字体颜色、字体类型、按钮状态变化等属性&#xff0c;它是用来美化UI界面。实现界面和程序的分离&#xff0c;快速切换界面。 首先我们在Pytchram创建一个新目录 然后将我们所需要的图片打…

多维数组及其应用————13

1. 二维数组 如果我们把 ⼀维数组做为数组的元 素&#xff0c;这时候就是⼆维数组&#xff0c; ⼆维数组作为数组元素的数组被为三维数组&#xff0c;⼆维数组以上的数组统称 为多维数组。 1.1 二维数组的创建 先行后列 其实也可以这样理解&#xff1a;把二维数组当成特殊的一维…

基于Java Springboot校园导航微信小程序

一、作品包含 源码数据库设计文档万字PPT全套环境和工具资源部署教程 二、项目技术 前端技术&#xff1a;Html、Css、Js、Vue、Element-ui 数据库&#xff1a;MySQL 后端技术&#xff1a;Java、Spring Boot、MyBatis 三、运行环境 开发工具&#xff1a;IDEA/eclipse微信开发…

k8s,声明式API对象理解

命令式API 比如&#xff1a; 先kubectl create&#xff0c;再replace的操作&#xff0c;我们称为命令式配置文件操作 kubectl replace的执行过程&#xff0c;是使用新的YAML文件中的API对象&#xff0c;替换原有的API对象&#xff1b;而kubectl apply&#xff0c;则是执行了一…

【北京迅为】iTOP-4412全能版使用手册-第三十五章 WEB控制LED

iTOP-4412全能版采用四核Cortex-A9&#xff0c;主频为1.4GHz-1.6GHz&#xff0c;配备S5M8767 电源管理&#xff0c;集成USB HUB,选用高品质板对板连接器稳定可靠&#xff0c;大厂生产&#xff0c;做工精良。接口一应俱全&#xff0c;开发更简单,搭载全网通4G、支持WIFI、蓝牙、…

轻量的基于图结构的RAG方案LightRAG

LightRAG出自2024年10月的论文《LIGHTRAG: SIMPLE AND FASTRETRIEVAL-AUGMENTED GENERATION》(github)&#xff0c;也是使用图结构来索引和搜索相关文本。 LightRAG作者认为已有的RAG系统有如下两个限制&#xff0c;导致难以回答类似"How does the rise of electric vehi…

分布式cap

P&#xff08;分区安全&#xff09;都能保证&#xff0c;就是在C&#xff08;强一致&#xff09;和A&#xff08;性能&#xff09;之间做取舍。 &#xff08;即立马做主从同步&#xff0c;还是先返回写入结果等会再做主从同步。类似的还有&#xff0c;缓存和db之间的同步。&am…

AD7606使用方法

AD7606是一款8通道最高16位200ksps的AD采样芯片。5V单模拟电源供电&#xff0c;真双极性模拟输入可以选择10 V&#xff0c;5 V两种量程。支持串口与并口两种读取方式。 硬件连接方式&#xff1a; 配置引脚 引脚功能 详细说明 OS2 OS1 OS2 过采样率配置 000 1倍过采样率 …

[VUE]框架网页开发02-如何打包Vue.js框架网页并在服务器中通过Tomcat启动

在现代Web开发中&#xff0c;Vue.js已经成为前端开发的热门选择之一。然而&#xff0c;将Vue.js项目打包并部署到生产环境可能会让一些开发者感到困惑。本文将详细介绍如何将Vue.js项目打包&#xff0c;并通过Tomcat服务器启动运行。 1. 准备工作 确保你的项目能够正常运行,项…

服务器与普通电脑有什么区别?

服务器和普通电脑&#xff08;通常指的是个人计算机&#xff0c;即PC&#xff09;有众多相似之处&#xff0c;主要构成包含&#xff1a;CPU&#xff0c;内存&#xff0c;芯片&#xff0c;I/O总线设备&#xff0c;电源&#xff0c;机箱及操作系统软件等&#xff0c;鉴于使用要求…

2.2 线性表的顺序表示

2.2.1 顺序表的定义 一、顺序表的基本概念 线性表的顺序存储又称顺序表。 它是用一组地址连续的存储单元依次存储线性表中的数据元素&#xff0c;从而使得逻辑上相邻的连个元素在物理上也相邻。 第1个元素存储在顺序表的起始位置&#xff0c;第i个元素存储位置后面紧接着存…

游戏引擎学习第30天

仓库: https://gitee.com/mrxiao_com/2d_game 回顾 在这段讨论中&#xff0c;重点是对开发过程中出现的游戏代码进行梳理和进一步优化的过程。 工作回顾&#xff1a;在第30天&#xff0c;回顾了前一天的工作&#xff0c;并提到今天的任务是继续从第29天的代码开始&#xff0c…

探索HarmonyOS:一键掌握Router与NavPathStatck的传参和页面回调技巧

路由的选择 HarmonyOS提供两种路由实现的方式&#xff0c;分别是 Router 和 NavPatchStack。两者使用场景和特效各有优劣。 组件适用场景特点备注Router模块间与模块内页面切换通过每个页面的url实现模块间解耦NavPathStack模块内页面切换通过组件级路由统一路由管理 什么时候使…

每日计划-1203

1. 完成 236. 二叉树的最近公共祖先 ​ /*** Definition for a binary tree node.* struct TreeNode {* int val;* TreeNode *left;* TreeNode *right;* TreeNode(int x) : val(x), left(NULL), right(NULL) {}* };*/ class Solution {public:TreeNode* lowe…

【AI系统】Auto-Tuning 原理

Auto-Tuning 原理 在硬件平台驱动算子运行需要使用各种优化方式来提高性能&#xff0c;然而传统的手工编写算子库面临各种窘境&#xff0c;衍生出了自动生成高性能算子的的方式&#xff0c;称为自动调优。在本文我们首先分析传统算子库面临的挑战&#xff0c;之后介绍基于 TVM…