序列化
kafka 发送消息需要把key 和value 进行序列化处理
一般配置字符串方式,如果消息量大需要优化成其他方式。
代码配置
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("serializer.encoding", "UTF-8"); // 设置编码格式
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("deserializer.encoding", "UTF-8"); // 设置编码格式
配置文件配置
spring:
kafka:
auto:
initTopic: false
bootstrap-servers: *****
consumer:
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
enable-auto-commit: false
properties:
security:
protocol: SASL_PLAINTEXT
sasl:
mechanism: PLAIN
jaas:
config: org.apache.kafka.common.security.plain.PlainLoginModule required username="*****" password="****";
producer: # 生产者
retries: 1 # 设置大于0的值,则客户端会将发送失败的记录重新发送
# 每次批量发送消息的数量
batch-size: 16384
buffer-memory: 33554432
# 指定消息key和消息体的编解码方式
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
properties:
security:
protocol: SASL_PLAINTEXT
sasl:
mechanism: PLAIN
jaas:
config: org.apache.kafka.common.security.plain.PlainLoginModule required username="******" password="******";
listener:
missing-topics-fatal: false
ack-mode: manual
这里没有配置编码格式的地方,那么分析一下源码,发现可以这样写
参考资料:https://www.fancv.com/article/1733134314