Kafka Producer Interceptors & Serialization
Preface
The versions and Maven dependencies used in this article are as follows:
- JDK 17
- kafka_2.13-3.3.1
- pom file
<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>3.3.1</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.22</version>
        <scope>compile</scope>
    </dependency>
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
        <version>2.13.4.2</version>
        <scope>compile</scope>
    </dependency>
</dependencies>
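Interceptors
A Kafka interceptor gives the application a chance to change behavior at two points: before the Producer sends a message, and before the Producer's callback logic runs. It is typically used for customization, such as modifying messages. The Producer lets developers specify multiple interceptors, which process the same message in the configured order and form an interceptor chain; in other words, each message passes through the interceptors one after another, following the chain-of-responsibility pattern.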
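The ordering behavior of such a chain can be sketched in plain Java, without any Kafka dependency. This is only an illustration of the chain-of-responsibility idea: the class name `InterceptorChainSketch` and the method `applyChain` are hypothetical, and each step is a simple string transformation rather than a real ProducerInterceptor.

```java
import java.util.List;
import java.util.function.UnaryOperator;

public class InterceptorChainSketch {
    // Applies each step in list order, feeding one step's output into the next.
    static String applyChain(List<UnaryOperator<String>> chain, String message) {
        String current = message;
        for (UnaryOperator<String> step : chain) {
            current = step.apply(current);
        }
        return current;
    }

    public static void main(String[] args) {
        // Hypothetical steps: the first appends a marker, the second wraps the result.
        List<UnaryOperator<String>> chain = List.of(
                value -> value + " -> stamped",
                value -> "[" + value + "]");
        // prints "[message : 0 -> stamped]"
        System.out.println(applyChain(chain, "message : 0"));
    }
}
```

Swapping the two steps changes the result, which is why the order in which interceptors are configured matters.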
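To implement a Kafka interceptor, implement the ProducerInterceptor interface. Its two main methods are:
- onSend
  - Called before the message is sent to Kafka; more precisely, before the message is serialized
  - Can capture the message content and modify it
  - The object returned by this method is what gets serialized and sent to Kafka
- onAcknowledgement
  - Called when the Kafka server returns its response, that is, when the ack arrives
  - This method cannot modify the response returned by Kafka; it receives only the record metadata and the exception, so changes to the record itself, such as adding headers, belong in onSend
  - Called when the record sent to the server has been acknowledged, or when sending the record fails
  - Generally runs in the background I/O thread, so the implementation should be fast; otherwise, sending of messages from other threads may be delayed
Because interceptors may run in multiple threads, their internal state must be thread-safe. Next, we define two custom interceptors, combine them into an interceptor chain, and look at how the chain executes.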
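As a side note on the thread-safety requirement, here is a minimal sketch in plain Java of why shared counters in an interceptor are usually kept in an AtomicLong. The class `ThreadSafeCounterSketch` and the method `countConcurrently` are hypothetical names used only for this illustration; no Kafka API is involved.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class ThreadSafeCounterSketch {
    // Increments one shared AtomicLong from several threads and returns the final count.
    static long countConcurrently(int threads, int incrementsPerThread) {
        AtomicLong counter = new AtomicLong();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            pool.submit(() -> {
                for (int i = 0; i < incrementsPerThread; i++) {
                    counter.incrementAndGet(); // atomic read-modify-write
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for workers", e);
        }
        return counter.get();
    }

    public static void main(String[] args) {
        // prints 40000: no increments are lost
        System.out.println(countConcurrently(4, 10_000));
    }
}
```

With a plain `long` field and `count++` instead, concurrent updates could be lost, because the increment is not a single atomic operation.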
Timestamp interceptor
This interceptor appends a timestamp to every outgoing message, which amounts to changing the message content.
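import java.nio.charset.StandardCharsets;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class TimeStampInterceptor implements ProducerInterceptor<String, String> {
    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        System.out.println("1 ============ TimeStampInterceptor onSend : " + record.partition());
        // Modify the message content by appending the current time
        String value = record.value() + " -> " + System.currentTimeMillis();
        ProducerRecord<String, String> result = new ProducerRecord<>(record.topic(), record.partition(),
                record.timestamp(), record.key(), value, record.headers());
        // Add a header
        result.headers()
                .add("application_id", "timeStampApp".getBytes(StandardCharsets.UTF_8));
        return result;
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        // Guard against null metadata before reading the offset
        if (metadata != null) {
            System.out.println("3 ============ TimeStampInterceptor : " + metadata.offset());
        }
    }

    @Override
    public void close() {
    }

    @Override
    public void configure(Map<String, ?> configs) {
    }
}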
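Counter interceptor
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class CounterInterceptor implements ProducerInterceptor<String, String> {
    private final AtomicLong successCount = new AtomicLong(0);
    private final AtomicLong failCount = new AtomicLong(0);

    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        System.out.println("2 ============ CounterInterceptor onSend : " + record.partition());
        return record;
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        // A null exception means the record was acknowledged. On failure the
        // metadata can still be non-null, so it is not a reliable success signal.
        if (exception == null && metadata != null) {
            System.out.println("4 ============ CounterInterceptor onAcknowledgement : " + metadata.offset());
            successCount.incrementAndGet();
            return;
        }
        failCount.incrementAndGet();
    }

    @Override
    public void close() {
        String output = String.format("5 ============ CounterInterceptor close , successCount: %d , failCount: %d",
                successCount.get(), failCount.get());
        System.out.println(output);
    }

    @Override
    public void configure(Map<String, ?> configs) {
    }
}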
Message producer
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

public class InterceptorProducer {
    public static void main(String[] args) throws Exception {
        String topicName = "Testtopic";
        Properties props = new Properties();
        // Address of the Kafka server
        props.put("bootstrap.servers", "localhost:9092");
        // Delay before sending, in milliseconds; 0 (the default in 3.3.1) sends immediately
        props.put("linger.ms", 0);
        // Serializers
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Interceptors run in the order they are listed here
        List<String> interceptors = new ArrayList<>();
        interceptors.add("org.kafka.example.interceptor.TimeStampInterceptor");
        interceptors.add("org.kafka.example.interceptor.CounterInterceptor");
        props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);
        Producer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 10; i++) {
            ProducerRecord<String, String> record =
                    new ProducerRecord<>(topicName, String.valueOf(i), " message : " + i);
            producer.send(record);
        }
        System.out.println("Message sent successfully");
        // The interceptors' close() method runs only when the producer is closed
        producer.close();
    }
}
Verification
# 1. Create the topic
./bin/kafka-topics.sh --create --topic Testtopic --bootstrap-server localhost:9092
# 2. Start the console consumer
./bin/kafka-console-consumer.sh --topic Testtopic --from-beginning --bootstrap-server localhost:9092
# 3. Run the producer code and check the log output
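Serialization
When messages are transmitted through Apache Kafka, the producer and the consumer must agree on the same encoding so that the data can be parsed and converted correctly; the broker itself stores and forwards the payload as bytes. Serialization is the process of converting an object into bytes. Deserialization is the reverse: converting a byte stream back into an object. Apache Kafka ships with default converters (for types such as String and Long) and also supports custom serialization.
The figure above shows the process of sending a message to a Kafka topic. In this process, a custom serializer converts the object into bytes before the producer sends the message to the topic. Similarly, the figure shows how a deserializer converts the bytes back into an object so that the consumer can process it correctly.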
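Why both sides have to agree on the encoding can be seen in a small plain-Java sketch. The class `EncodingAgreementSketch` and its methods are hypothetical helpers for this illustration and do not use the Kafka API: the same bytes decode to the original string with UTF-8, but to a different string with ISO-8859-1.

```java
import java.nio.charset.StandardCharsets;

public class EncodingAgreementSketch {
    // "Producer side": object -> bytes, using UTF-8.
    static byte[] serialize(String value) {
        return value.getBytes(StandardCharsets.UTF_8);
    }

    // "Consumer side" that agrees with the producer on UTF-8.
    static String deserializeUtf8(byte[] data) {
        return new String(data, StandardCharsets.UTF_8);
    }

    // "Consumer side" that wrongly assumes ISO-8859-1.
    static String deserializeLatin1(byte[] data) {
        return new String(data, StandardCharsets.ISO_8859_1);
    }

    public static void main(String[] args) {
        String original = "caf\u00e9"; // contains one non-ASCII character
        byte[] bytes = serialize(original);
        System.out.println(original.equals(deserializeUtf8(bytes)));   // prints true
        System.out.println(original.equals(deserializeLatin1(bytes))); // prints false
    }
}
```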
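Default serialization
Apache Kafka provides prebuilt serializers and deserializers for several basic types, such as String and Long.
The built-in serializers for basic types clearly cannot cover composite objects, so Kafka exposes the Serializer interface (with a matching Deserializer interface) for custom serialization. The interface has three methods:
- configure - receives the configuration details
- serialize/deserialize - the serialization or deserialization logic
- close - called when the Kafka client is closed, to release any resources the serializer holds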
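To make the serialize/deserialize pair concrete, the following plain-Java sketch encodes two string fields into one byte array and reads them back. It is only an illustration under stated assumptions: the class `LengthPrefixedCodec`, its methods, and the length-prefixed layout are hypothetical and are not Kafka's wire format; a JSON-based approach is equally valid.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class LengthPrefixedCodec {
    // Layout: [int length][UTF-8 bytes of message][int length][UTF-8 bytes of version]
    static byte[] encode(String message, String version) {
        byte[] m = message.getBytes(StandardCharsets.UTF_8);
        byte[] v = version.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES + m.length + Integer.BYTES + v.length);
        buffer.putInt(m.length).put(m).putInt(v.length).put(v);
        return buffer.array();
    }

    // Reads the layout written by encode and returns {message, version}.
    static String[] decode(byte[] data) {
        ByteBuffer buffer = ByteBuffer.wrap(data);
        byte[] m = new byte[buffer.getInt()];
        buffer.get(m);
        byte[] v = new byte[buffer.getInt()];
        buffer.get(v);
        return new String[] {
            new String(m, StandardCharsets.UTF_8),
            new String(v, StandardCharsets.UTF_8)
        };
    }

    public static void main(String[] args) {
        String[] fields = decode(encode("test0", "0.0"));
        // prints "test0 / 0.0"
        System.out.println(fields[0] + " / " + fields[1]);
    }
}
```

A real Serializer implementation would place logic like `encode` in its serialize method, and the matching Deserializer would place logic like `decode` in deserialize.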
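Custom serialization
Code implementation
- Composite object class
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
public class MessageDto {
    private String message;
    private String version;
}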
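- Serializer class
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Map;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;

public class CustomSerializer implements Serializer<MessageDto> {
    private final ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
    }

    @Override
    public byte[] serialize(String topic, MessageDto data) {
        try {
            if (data == null) {
                System.out.println("Null received at serializing");
                return null;
            }
            // Write the object as JSON bytes
            return objectMapper.writeValueAsBytes(data);
        } catch (Exception e) {
            // Keep the original exception as the cause
            throw new SerializationException("Error when serializing MessageDto to byte[]", e);
        }
    }

    @Override
    public void close() {
    }
}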
- Deserializer class
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Map;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;

public class CustomDeserializer implements Deserializer<MessageDto> {
    private final ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
    }

    @Override
    public MessageDto deserialize(String topic, byte[] data) {
        try {
            if (data == null) {
                System.out.println("Null received at deserializing");
                return null;
            }
            // Parse the JSON bytes back into a MessageDto
            return objectMapper.readValue(data, MessageDto.class);
        } catch (Exception e) {
            // Keep the original exception as the cause
            throw new SerializationException("Error when deserializing byte[] to MessageDto", e);
        }
    }

    @Override
    public void close() {
    }
}
Verification
- Create the topic
./bin/kafka-topics.sh --create --topic Stopic --bootstrap-server localhost:9092
- Producer code
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

public class SerializerProducer {
    public static void main(String[] args) throws Exception {
        String topicName = "Stopic";
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        // Serializers: String for the key, the custom serializer for the value
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.kafka.example.serializer.CustomSerializer");
        Producer<String, MessageDto> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 10; i++) {
            MessageDto msgProd = MessageDto.builder().message("test" + i).version(i + ".0").build();
            ProducerRecord<String, MessageDto> record =
                    new ProducerRecord<>(topicName, String.valueOf(i), msgProd);
            producer.send(record);
        }
        // close() flushes any records that are still buffered
        producer.close();
    }
}
- Consumer code
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class SerializerConsumer {
    public static void main(String[] args) throws Exception {
        String topicName = "Stopic";
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "SC_2");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // Deserializers: String for the key, the custom deserializer for the value
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.kafka.example.serializer.CustomDeserializer");
        KafkaConsumer<String, MessageDto> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(topicName));
        try {
            // Poll until the process is stopped
            while (true) {
                ConsumerRecords<String, MessageDto> records = consumer.poll(Duration.ofSeconds(1));
                records.forEach(record -> {
                    System.out.println("Message received " + record.value());
                });
            }
        } finally {
            consumer.close();
        }
    }
}
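Start the consumer first, then run the producer. As the figure below shows, the consumer receives the objects sent by the producer and prints them to the console.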