Schema注册表
为了提升kafka的性能,减少网络传输和存储的数据大小,可以把数据的schema部分单独存储到外部的schema注册表中,整体架构如下图所示:
1)把所有数据需要用到的 schema 保存在注册表里,然后在记录里引用 schema ID。
2)消费者使用 ID 从注册表里拉取 schema 来反序列化记录。
3)序列化器和反序列化器分别负责处理 schema 的注册和拉取。
schema注册表并不属于Kafka,现在已经有一些开源的schema 注册表实现,比如Confluent Schema Registry。
安装confluent
参考:安装手册
# confluent的安装包中已经包含了zookeeper和kafka的安装包,无需单独再下载
# 下载
curl -O https://packages.confluent.io/archive/7.7/confluent-7.7.1.tar.gz
# 解压
tar -xzf confluent-7.7.1.tar.gz
解压以后目录结构如下:
文件夹 | 描述 |
---|---|
bin | 可执行文件 |
etc | 配置文件 |
lib | 服务 |
libexec | 多平台的客户端库 |
share | jar包和license |
src | 源码 |
# 设置环境变量
vim /etc/profile
export CONFLUENT_HOME=/usr/local/confluent-7.7.1
export PATH=$CONFLUENT_HOME/bin:$PATH
# 加载环境变量
source /etc/profile
# 验证
confluent --help
启动confluent服务
启动zookeeper
cd /usr/local/confluent-7.7.1/etc/kafka
vim zookeeper.properties
# 可以调整zookeeper的端口和数据的存储目录
# 启动zookeeper
./bin/zookeeper-server-start -daemon ./etc/kafka/zookeeper.properties
# 验证
ps -ef | grep zookeeper
启动kafka
cd /usr/local/confluent-7.7.1/etc/kafka
vim server.properties
broker.id=0
# 监听地址
listeners=0.0.0.0://:9092
# 对外暴漏的地址
advertised.listeners=PLAINTEXT://192.168.200.128:9092
# zookeeper的地址
zookeeper.connect=localhost:2181
# 启动
./bin/kafka-server-start -daemon ./etc/kafka/server.properties
# 验证
netstat -nap | grep 9092
启动confluent
cd /usr/local/confluent-7.7.1/etc/schema-registry
# 修改schema-registry.properties
vim schema-registry.properties
# schema-registry的监听地址
listeners=http://0.0.0.0:8081
# kafka的访问地址
kafkastore.bootstrap.servers=PLAINTEXT://localhost:9092
# 启动
./bin/schema-registry-start -daemon ./etc/schema-registry/schema-registry.properties
# 验证
netstat -nap | grep 8081
新建springboot项目
新建avro的schema文件User.avsc
{
"namespace": "com.github.xjs.protocol",
"type": "record",
"name": "UserRecord",
"fields": [
{
"name": "id",
"type": "int"
},
{
"name": "name",
"type": "string"
}
]
}
pom中添加avro-maven-plugin插件
<!--https://avro.apache.org/docs/1.11.1/getting-started-java/-->
<!--
命令行执行:mvn generate-sources 把avsc转化成java文件
-->
<plugin>
<groupId>org.apache.avro</groupId>
<artifactId>avro-maven-plugin</artifactId>
<version>1.11.1</version>
<executions>
<execution>
<phase>generate-sources</phase>
<goals>
<goal>schema</goal>
</goals>
<configuration>
<sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>
<outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
</configuration>
</execution>
</executions>
</plugin>
添加avro和kafka的依赖
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-avro-serializer</artifactId>
<version>7.7.1</version>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
添加对应的配置
server:
port: 8080
spring:
kafka:
bootstrap-servers: 192.168.200.128:9092
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
# 重点关注这里的KafkaAvroSerializer
value-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
consumer:
group-id: test
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# 重点关注这里的.KafkaAvroDeserializer
value-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
properties:
# confluent的地址
schema.registry.url: http://192.168.200.128:8081
消息生产者
public void send(UserRecord record) {
if (Objects.isNull(record)) {
return;
}
log.info("send message, value:{}", record.toString());
// 跟发送普通消息一样,可以直接发送UserRecord
kafkaTemplate.send("demo-topic", record);
}
消息消费者
@KafkaListener(topics = "demo-topic")
public void consume(ConsumerRecord<String, UserRecord> user){
// 跟接收普通消息一样,可以直接接收UserRecord
log.info("receive message, topic:{}, key:{}, value:{}", user.topic(), user.key(), user.value());
}
完整的源码下载:https://github.com/xjs1919/learning-demo/tree/master/springboot-kafka-avro