文章目录
- 一、介绍
- 二、使用
- 1. ProducerInterceptor接口
- 2. 实现之统计
- 3. 实现之二次处理
- 4. 小结
- 三、实现原理
- 1. 初始化流程
- 2. 生效流程
- 四、总结
一、介绍
在软件设计中,为了方便能够应对不同的场景,一般在一些容易有差异的环节会考虑允许用户自定义逻辑,拦截器就是其中的一种实现方式,像Spring、Kafka、Pulsar等都支持这种方式。流程简化起来就如下图,客户端跟服务端的写消息请求和接收请求都要先通过一遍拦截器,因此用户都过自定义拦截器逻辑就能以一种无侵入、规范化的方式来改动消息发送以及处理响应的行为。
二、使用
1. ProducerInterceptor接口
ProducerInterceptor是Pulsar提供的接口,通过实现该接口用户可以在消息发送和发送成功阶段注入自定义的逻辑来扩展Pulsar客户端的能力,进而优雅的解决某些场景的问题。
@InterfaceAudience.Public
@InterfaceStability.Stable
public interface ProducerInterceptor extends AutoCloseable {
void close();
boolean eligible(Message message);
Message beforeSend(Producer producer, Message message);
void onSendAcknowledgement(
Producer producer, Message message, MessageId msgId, Throwable exception);
default void onPartitionsChange(String topicName, int partitions) {
}
}
这里针对这五个方法大概介绍下
- close:由于该接口实现了AutoCloseable,因此也要定义生产者关闭时要释放的资源,如果没有就空着
- eligible:判断拦截器针对那些消息生效,默认false不生效。这个相当于Java8 Stream里的filter,属于职责分离的设计
- beforeSend:在每条消息要发送时会调用此方法,因此如果在发送前想做点什么可以考虑在这里实现
- onSendAcknowledgement:在每条消息消息发送服务端响应后(无论成功失败)会调用此方法
- onPartitionsChange:在分区数有变动的时候会调用这里的逻辑。这是3.2版本新加的逻辑,2.8以及之前的版本没有此接口
2. 实现之统计
这里对生产者累计发送的消息条数进行统计,实现逻辑如下
public class SherlockCountProducerInterceptor implements ProducerInterceptor {
private AtomicLong count = new AtomicLong(1);
@Override
public void close() {
}
@Override
public boolean eligible(Message message) {
return true;
}
@Override
public Message beforeSend(Producer producer, Message message) {
System.out.println("累计发送消息条数:"+count.getAndIncrement());
return message;
}
@Override
public void onSendAcknowledgement(Producer producer, Message message, MessageId msgId, Throwable exception) {
}
}
逻辑比较简单,其实就是通过一个计数器在每次发送时进行加1即可,并且在eligible中返回true也就是对所有发送的消息都生效,每条消息在发送前都会调用一次beforeSend方法进行自增操作并打印出来。
现在拦截器的逻辑已经定义好了,接下来怎么使用呢,请继续往下看
public static void customInterceptorProducer() throws Exception {
String serverUrl = "http://localhost:8080";
PulsarClient pulsarClient =
PulsarClient.builder().serviceUrl(serverUrl).build();
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.topic("sherlock-api-tenant-1/sherlock-namespace-1/partition_partition_topic_3")
.intercept(new SherlockCountProducerInterceptor()) //拦截器生效逻辑
.create();
for (int i = 0; i < 200; i++) {
producer.send("hello java API pulsar:"+i+", 当前时间为:"+new Date());
}
producer.close();
pulsarClient.close();
}
上述就是使用拦截器的case,通过这种方式就能轻松的定义所需要注入的逻辑。上述代码执行后输出如下
可以看到我们通过拦截器完成了消息发送的统计功能,可以发散设想一想,像根据不同key进行分组统计、统计某个时间段消息发送失败的条数等功能也同样可以通过拦截器实现。
3. 实现之二次处理
实现统计感觉还不得劲,再折腾一个。假设咱们的生产者中会发送很多地区的消息,这些消息有些是中国的,有些是新加坡的,有些是巴西的,这个时候它们的时间就有歧义了,因为不同时区的时间是有差异的,那咱们尝试用拦截器来实现一下
public class SherlockAdapterTimeProducerInterceptor implements ProducerInterceptor {
@Override
public void close() {
}
@Override
public boolean eligible(Message message) {
// if ("V3".equals(String.valueOf(message.getSchemaVersion()))) {
// return true;
// }
if ("Singapore".equals(message.getKey())) {
System.out.println("这条消息是新加坡地区的,进行处理!");
return true;
}
System.out.println("这条消息是中国地区的,不进行处理!");
return false;
}
@Override
public Message beforeSend(Producer producer, Message message) {
System.out.println("拦截到一条新加坡地区的消息,现在进行处理,消息内容为:"+message.getValue());
return message;
}
@Override
public void onSendAcknowledgement(Producer producer, Message message, MessageId msgId, Throwable exception) {
}
}
上面就是demo,继续将这个拦截器应用于生产者
public static void customInterceptorProducer() throws Exception {
String serverUrl = "http://localhost:8080";
PulsarClient pulsarClient =
PulsarClient.builder().serviceUrl(serverUrl).build();
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.topic("sherlock-api-tenant-1/sherlock-namespace-1/partition_partition_topic_3")
.intercept(new SherlockAdapterTimeProducerInterceptor())
.create();
producer.newMessage().key("China").value("下单动作").send();
producer.newMessage().key("Singapore").value("收藏动作").send();
producer.newMessage().key("China").value("取消动作").send();
producer.newMessage().key("Singapore").value("订阅动作").send();
producer.close();
pulsarClient.close();
}
执行可以看到下面的输出
通过输出的结果可以分析出来eligible的逻辑是生效的,针对新加坡地区的消息会进行处理,而中国的消息保持不变,所有地区的时间通过此拦截器来统一成东八区的时间。
4. 小结
通过上述例子可以看到我们可以通过拦截器实现任意的逻辑,但是这里需要注意的是,拦截器里面尽量不要放过多的逻辑,因为这可能会影响生产者发送消息的速度,并且也容易造成处理逻辑的分散。拦截器最好是做一些校验、适配、状态记录等一些需要前置完成并且轻量级的操作。
三、实现原理
1. 初始化流程
通过使用我们可以看到在创建生产者对象是只要通过.intercept方法传入拦截器对象即可生效,那么我们就先通过这个方法来看看实现逻辑
ProducerBuilder<T> intercept(org.apache.pulsar.client.api.interceptor.ProducerInterceptor... interceptors);
public ProducerBuilder<T> intercept(ProducerInterceptor... interceptors) {
if (this.interceptorList == null) {
this.interceptorList = new ArrayList();
}
this.interceptorList.addAll(Arrays.asList(interceptors));
return this;
}
通过代码跟踪可以看到ProducerBuilderImpl方法中会先将拦截器对象集合赋值给自己的成员变量,也就是它先保存一份在后面使用。在最终调用create方法来创建Producer时,最终会走到该类的createAsync方法,核心逻辑如下
public CompletableFuture<Producer<T>> createAsync() {
....
return this.interceptorList != null && this.interceptorList.size() != 0 ? this.client.createProducerAsync(this.conf, this.schema, new ProducerInterceptors(this.interceptorList)) : this.client.createProducerAsync(this.conf, this.schema, (ProducerInterceptors)null);
}
}
如果用户通过.intercept方法传入了自定义的拦截器,则会调用PulsarClientImpl带有拦截器对象的构造方法
public <T> CompletableFuture<Producer<T>> createProducerAsync(ProducerConfigurationData conf, Schema<T> schema, ProducerInterceptors interceptors) {
....
//这个方法的核心逻辑就这一行,继续往下跟踪
return this.createProducerAsync(topic, conf, schema, interceptors);
}
private <T> CompletableFuture<Producer<T>> createProducerAsync(String topic, ProducerConfigurationData conf, Schema<T> schema, ProducerInterceptors interceptors) {
....
//同理,核心逻辑就这一行
producer = this.newProducerImpl(topic, -1, conf, schema, interceptors, producerCreatedFuture);
}
protected <T> ProducerImpl<T> newProducerImpl(....) {
return new ProducerImpl(this, topic, conf, producerCreatedFuture, partitionIndex, schema, interceptors);
}
public ProducerImpl(PulsarClientImpl client, String topic, ProducerConfigurationData conf, CompletableFuture<Producer<T>> producerCreatedFuture, int partitionIndex, Schema<T> schema, ProducerInterceptors interceptors) {
//只有这里有用到,继续跟踪
super(client, topic, conf, producerCreatedFuture, schema, interceptors);
....
}
protected ProducerBase(PulsarClientImpl client, String topic, ProducerConfigurationData conf, CompletableFuture<Producer<T>> producerCreatedFuture, Schema<T> schema, ProducerInterceptors interceptors) {
....
//对父类的成员变量进行赋值
this.interceptors = interceptors;
}
通过上面的代码跟踪我们可以知道,当我们通过拦截器创建的Producer对象,它是有在内部维护一个ProducerInterceptors对象来存储我们所指定的拦截器集合的逻辑
那么我们来看看ProducerInterceptors的实现
public class ProducerInterceptors implements Closeable {
....
private final List<ProducerInterceptor> interceptors; //存储拦截器集合逻辑
....
//在消息发送前进行触发
public Message beforeSend(Producer producer, Message message) {
Message interceptorMessage = message;
for (ProducerInterceptor interceptor : interceptors) {
//调用拦截器的eligible方法来判断是否要对当前这条消息进行拦截处理,这个就是咱们上面实现的eligible接口
if (!interceptor.eligible(message)) {
continue;
}
try {
//循环调用拦截器集合里的每个拦截器对这条消息进行处理
interceptorMessage = interceptor.beforeSend(producer, interceptorMessage);
} catch (Throwable e) {
....
}
}
return interceptorMessage;
}
//逻辑跟beforeSend基本一致
public void onSendAcknowledgement(Producer producer, Message message, MessageId msgId, Throwable exception) {
for (ProducerInterceptor interceptor : interceptors) {
if (!interceptor.eligible(message)) {
continue;
}
try {
interceptor.onSendAcknowledgement(producer, message, msgId, exception);
} catch (Throwable e) {
log.warn("Error executing interceptor onSendAcknowledgement callback ", e);
}
}
}
public void onPartitionsChange(String topicName, int partitions) {
for (ProducerInterceptor interceptor : interceptors) {
try {
interceptor.onPartitionsChange(topicName, partitions);
} catch (Throwable e) {
log.warn("Error executing interceptor onPartitionsChange callback ", e);
}
}
}
@Override
public void close() throws IOException {
for (ProducerInterceptor interceptor : interceptors) {
try {
interceptor.close();
} catch (Throwable e) {
log.error("Fail to close producer interceptor ", e);
}
}
}
}
通过上述逻辑可以看到ProducerInterceptors本质上就是个批量管理对象,符合高内聚低耦合的设计,解耦了业务逻辑循环处理的逻辑,将这些循环处理的逻辑都封装在ProducerInterceptors类里面,然后ProducerInterceptors仅对外提供触发某几个动作的api,业务只需要在哪个阶段调用这些api即可。
2. 生效流程
在生产者消息发送阶段,最终都会走到ProducerImpl类的internalSendAsync方法,可以看到这里会调用拦截器进行处理
CompletableFuture<MessageId> internalSendAsync(Message<?> message) {
//核心方法,跟踪进去
MessageImpl<?> interceptorMessage = (MessageImpl) beforeSend(message);
....
}
protected Message<?> beforeSend(Message<?> message) {
if (interceptors != null) {
//如果配置了拦截器则调用ProducerInterceptors类的beforeSend方法
return interceptors.beforeSend(this, message);
} else {
//如果没有配置拦截器则直接返回原消息
return message;
}
}
这是消息发送的处理逻辑,那如果是再消息发送结束后触发呢?一起来跟踪看下吧,首先还是从ProducerImp类的internalSendAsync方法开始看
@Override
CompletableFuture<MessageId> internalSendAsync(Message<?> message) {
sendAsync(interceptorMessage, new SendCallback() {
....
@Override
public void sendComplete(Exception e) {
try {
if (e != null) {
stats.incrementSendFailed();
//从这里跟踪进去看看
onSendAcknowledgement(interceptorMessage, null, e);
future.completeExceptionally(e);
} else {
onSendAcknowledgement(interceptorMessage, interceptorMessage.getMessageId(), null);
future.complete(interceptorMessage.getMessageId());
stats.incrementNumAcksReceived(System.nanoTime() - createdAt);
}
} finally {
interceptorMessage.getDataBuffer().release();
}
}
protected void onSendAcknowledgement(Message<?> message, MessageId msgId, Throwable exception) {
if (interceptors != null) {
//可以看到最终也是调用的ProducerInterceptors类的onSendAcknowledgement方法
interceptors.onSendAcknowledgement(this, message, msgId, exception);
}
}
这里的设计是异步回调的方式,将调用拦截器处理逻辑封装成参数传给下一层,在消息发送完成后再调用参数里指定的回调逻辑。那么什么时候触发呢,由于Pulsar客户端跟服务端是通过Netty的TCP通信的,因此直接看看PulsarDecoder的channelRead方法
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
....
switch (cmd.getType()) {
....
case PRODUCER_SUCCESS:
//写入消息被Broker处理后会忘生产者客户端通过TCP发送一条PRODUCER_SUCCESS类型的消息也就是这里,跟踪进去看看处理逻辑
checkArgument(cmd.hasProducerSuccess());
handleProducerSuccess(cmd.getProducerSuccess());
break;
}
}
protected void handleProducerSuccess(CommandProducerSuccess success) {
....
//生产者会在队列维护每条未被ack的写入请求消息,在Broker ack时会从这个队列中移除并获取回调处理逻辑
CompletableFuture<ProducerResponse> requestFuture =
(CompletableFuture<ProducerResponse>) pendingRequests.remove(requestId);
if (requestFuture != null) {
ProducerResponse pr = new ProducerResponse(success.getProducerName(),
success.getLastSequenceId(),
success.getSchemaVersion(),
success.hasTopicEpoch() ? Optional.of(success.getTopicEpoch()) : Optional.empty());
//调用回调逻辑
requestFuture.complete(pr);
} else {
....
}
}
四、总结
通过使用和跟踪原理,我们对Pulsar生产者拦截器有了进一步的认识,除了生产者拦截器,Pulsar还支持Broker侧以及Bookkeeper侧的拦截器,这些放到后面再跟大家一起学习。