深入探索生产者拦截器的使用以及源码设计

news2025/1/10 21:50:01

文章目录

  • 一、介绍
  • 二、使用
    • 1. ProducerInterceptor接口
    • 2. 实现之统计
    • 3. 实现之二次处理
    • 4. 小结
  • 三、实现原理
    • 1. 初始化流程
    • 2. 生效流程
  • 四、总结

一、介绍

在软件设计中,为了方便能够应对不同的场景,一般在一些容易有差异的环节会考虑允许用户自定义逻辑,拦截器就是其中的一种实现方式,像Spring、Kafka、Pulsar等都支持这种方式。流程简化起来就如下图,客户端跟服务端的写消息请求和接收请求都要先通过一遍拦截器,因此用户都过自定义拦截器逻辑就能以一种无侵入、规范化的方式来改动消息发送以及处理响应的行为。
在这里插入图片描述

二、使用

1. ProducerInterceptor接口

ProducerInterceptor是Pulsar提供的接口,通过实现该接口用户可以在消息发送和发送成功阶段注入自定义的逻辑来扩展Pulsar客户端的能力,进而优雅的解决某些场景的问题。

@InterfaceAudience.Public
@InterfaceStability.Stable
public interface ProducerInterceptor extends AutoCloseable {

    void close();

    boolean eligible(Message message);

    Message beforeSend(Producer producer, Message message);

    void onSendAcknowledgement(
            Producer producer, Message message, MessageId msgId, Throwable exception);
  
    default void onPartitionsChange(String topicName, int partitions) {
    }
}

这里针对这五个方法大概介绍下

  • close:由于该接口实现了AutoCloseable,因此也要定义生产者关闭时要释放的资源,如果没有就空着
  • eligible:判断拦截器针对那些消息生效,默认false不生效。这个相当于Java8 Stream里的filter,属于职责分离的设计
  • beforeSend:在每条消息要发送时会调用此方法,因此如果在发送前想做点什么可以考虑在这里实现
  • onSendAcknowledgement:在每条消息消息发送服务端响应后(无论成功失败)会调用此方法
  • onPartitionsChange:在分区数有变动的时候会调用这里的逻辑。这是3.2版本新加的逻辑,2.8以及之前的版本没有此接口

2. 实现之统计

这里对生产者累计发送的消息条数进行统计,实现逻辑如下

public class SherlockCountProducerInterceptor implements ProducerInterceptor {

    private AtomicLong count = new AtomicLong(1);

    @Override
    public void close() {

    }

    @Override
    public boolean eligible(Message message) {
        return true;
    }

    @Override
    public Message beforeSend(Producer producer, Message message) {
        System.out.println("累计发送消息条数:"+count.getAndIncrement());
        return message;
    }

    @Override
    public void onSendAcknowledgement(Producer producer, Message message, MessageId msgId, Throwable exception) {

    }
}

逻辑比较简单,其实就是通过一个计数器在每次发送时进行加1即可,并且在eligible中返回true也就是对所有发送的消息都生效,每条消息在发送前都会调用一次beforeSend方法进行自增操作并打印出来。

现在拦截器的逻辑已经定义好了,接下来怎么使用呢,请继续往下看

public static void customInterceptorProducer() throws Exception {
        String serverUrl = "http://localhost:8080";
        PulsarClient pulsarClient =
                PulsarClient.builder().serviceUrl(serverUrl).build();

        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
                .topic("sherlock-api-tenant-1/sherlock-namespace-1/partition_partition_topic_3")
                .intercept(new SherlockCountProducerInterceptor())	//拦截器生效逻辑
                .create();

        for (int i = 0; i < 200; i++) {
            producer.send("hello java API pulsar:"+i+", 当前时间为:"+new Date());
        }

        producer.close();
        pulsarClient.close();
    }

上述就是使用拦截器的case,通过这种方式就能轻松的定义所需要注入的逻辑。上述代码执行后输出如下
在这里插入图片描述

可以看到我们通过拦截器完成了消息发送的统计功能,可以发散设想一想,像根据不同key进行分组统计、统计某个时间段消息发送失败的条数等功能也同样可以通过拦截器实现。

3. 实现之二次处理

实现统计感觉还不得劲,再折腾一个。假设咱们的生产者中会发送很多地区的消息,这些消息有些是中国的,有些是新加坡的,有些是巴西的,这个时候它们的时间就有歧义了,因为不同时区的时间是有差异的,那咱们尝试用拦截器来实现一下

public class SherlockAdapterTimeProducerInterceptor implements ProducerInterceptor {
    @Override
    public void close() {

    }

    @Override
    public boolean eligible(Message message) {
//        if ("V3".equals(String.valueOf(message.getSchemaVersion()))) {
//            return true;
//        }
        if ("Singapore".equals(message.getKey())) {
            System.out.println("这条消息是新加坡地区的,进行处理!");
            return true;
        }
        System.out.println("这条消息是中国地区的,不进行处理!");
        return false;
    }

    @Override
    public Message beforeSend(Producer producer, Message message) {
        System.out.println("拦截到一条新加坡地区的消息,现在进行处理,消息内容为:"+message.getValue());
        return message;
    }

    @Override
    public void onSendAcknowledgement(Producer producer, Message message, MessageId msgId, Throwable exception) {

    }
}

上面就是demo,继续将这个拦截器应用于生产者

 public static void customInterceptorProducer() throws Exception {
        String serverUrl = "http://localhost:8080";
        PulsarClient pulsarClient =
                PulsarClient.builder().serviceUrl(serverUrl).build();

        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
                .topic("sherlock-api-tenant-1/sherlock-namespace-1/partition_partition_topic_3")
                .intercept(new SherlockAdapterTimeProducerInterceptor())
                .create();

        producer.newMessage().key("China").value("下单动作").send();
        producer.newMessage().key("Singapore").value("收藏动作").send();
        producer.newMessage().key("China").value("取消动作").send();
        producer.newMessage().key("Singapore").value("订阅动作").send();

        producer.close();
        pulsarClient.close();
    }

执行可以看到下面的输出
在这里插入图片描述

通过输出的结果可以分析出来eligible的逻辑是生效的,针对新加坡地区的消息会进行处理,而中国的消息保持不变,所有地区的时间通过此拦截器来统一成东八区的时间。

4. 小结

通过上述例子可以看到我们可以通过拦截器实现任意的逻辑,但是这里需要注意的是,拦截器里面尽量不要放过多的逻辑,因为这可能会影响生产者发送消息的速度,并且也容易造成处理逻辑的分散。拦截器最好是做一些校验、适配、状态记录等一些需要前置完成并且轻量级的操作。

三、实现原理

1. 初始化流程

通过使用我们可以看到在创建生产者对象是只要通过.intercept方法传入拦截器对象即可生效,那么我们就先通过这个方法来看看实现逻辑

ProducerBuilder<T> intercept(org.apache.pulsar.client.api.interceptor.ProducerInterceptor... interceptors);

public ProducerBuilder<T> intercept(ProducerInterceptor... interceptors) {
  if (this.interceptorList == null) {
    this.interceptorList = new ArrayList();
  }

  this.interceptorList.addAll(Arrays.asList(interceptors));
  return this;
}

通过代码跟踪可以看到ProducerBuilderImpl方法中会先将拦截器对象集合赋值给自己的成员变量,也就是它先保存一份在后面使用。在最终调用create方法来创建Producer时,最终会走到该类的createAsync方法,核心逻辑如下

    public CompletableFuture<Producer<T>> createAsync() {
						....
            return this.interceptorList != null && this.interceptorList.size() != 0 ? this.client.createProducerAsync(this.conf, this.schema, new ProducerInterceptors(this.interceptorList)) : this.client.createProducerAsync(this.conf, this.schema, (ProducerInterceptors)null);
        }
    }

如果用户通过.intercept方法传入了自定义的拦截器,则会调用PulsarClientImpl带有拦截器对象的构造方法

 public <T> CompletableFuture<Producer<T>> createProducerAsync(ProducerConfigurationData conf, Schema<T> schema, ProducerInterceptors interceptors) {
   			....
        //这个方法的核心逻辑就这一行,继续往下跟踪
				return this.createProducerAsync(topic, conf, schema, interceptors);
    }


    private <T> CompletableFuture<Producer<T>> createProducerAsync(String topic, ProducerConfigurationData conf, Schema<T> schema, ProducerInterceptors interceptors) {
      	....
        //同理,核心逻辑就这一行
        producer = this.newProducerImpl(topic, -1, conf, schema, interceptors, producerCreatedFuture);
    }

    protected <T> ProducerImpl<T> newProducerImpl(....) {
        return new ProducerImpl(this, topic, conf, producerCreatedFuture, partitionIndex, schema, interceptors);
    }


    public ProducerImpl(PulsarClientImpl client, String topic, ProducerConfigurationData conf, CompletableFuture<Producer<T>> producerCreatedFuture, int partitionIndex, Schema<T> schema, ProducerInterceptors interceptors) {
      	//只有这里有用到,继续跟踪
        super(client, topic, conf, producerCreatedFuture, schema, interceptors);
      	....
    }

protected ProducerBase(PulsarClientImpl client, String topic, ProducerConfigurationData conf, CompletableFuture<Producer<T>> producerCreatedFuture, Schema<T> schema, ProducerInterceptors interceptors) {
        ....
        //对父类的成员变量进行赋值
        this.interceptors = interceptors;
    }

通过上面的代码跟踪我们可以知道,当我们通过拦截器创建的Producer对象,它是有在内部维护一个ProducerInterceptors对象来存储我们所指定的拦截器集合的逻辑
在这里插入图片描述

那么我们来看看ProducerInterceptors的实现

public class ProducerInterceptors implements Closeable {
		....
    private final List<ProducerInterceptor> interceptors;	//存储拦截器集合逻辑
		....
    //在消息发送前进行触发
    public Message beforeSend(Producer producer, Message message) {
        Message interceptorMessage = message;
        for (ProducerInterceptor interceptor : interceptors) {
          	//调用拦截器的eligible方法来判断是否要对当前这条消息进行拦截处理,这个就是咱们上面实现的eligible接口
            if (!interceptor.eligible(message)) {
                continue;
            }
            try {
              	//循环调用拦截器集合里的每个拦截器对这条消息进行处理
                interceptorMessage = interceptor.beforeSend(producer, interceptorMessage);
            } catch (Throwable e) {
                ....
            }
        }
        return interceptorMessage;
    }

  	//逻辑跟beforeSend基本一致
    public void onSendAcknowledgement(Producer producer, Message message, MessageId msgId, Throwable exception) {
        for (ProducerInterceptor interceptor : interceptors) {
            if (!interceptor.eligible(message)) {
                continue;
            }
            try {
                interceptor.onSendAcknowledgement(producer, message, msgId, exception);
            } catch (Throwable e) {
                log.warn("Error executing interceptor onSendAcknowledgement callback ", e);
            }
        }
    }

    public void onPartitionsChange(String topicName, int partitions) {
        for (ProducerInterceptor interceptor : interceptors) {
            try {
                interceptor.onPartitionsChange(topicName, partitions);
            } catch (Throwable e) {
                log.warn("Error executing interceptor onPartitionsChange callback ", e);
            }
        }
    }

    @Override
    public void close() throws IOException {
        for (ProducerInterceptor interceptor : interceptors) {
            try {
                interceptor.close();
            } catch (Throwable e) {
                log.error("Fail to close producer interceptor ", e);
            }
        }
    }
}

通过上述逻辑可以看到ProducerInterceptors本质上就是个批量管理对象,符合高内聚低耦合的设计,解耦了业务逻辑循环处理的逻辑,将这些循环处理的逻辑都封装在ProducerInterceptors类里面,然后ProducerInterceptors仅对外提供触发某几个动作的api,业务只需要在哪个阶段调用这些api即可。

2. 生效流程

在生产者消息发送阶段,最终都会走到ProducerImpl类的internalSendAsync方法,可以看到这里会调用拦截器进行处理

CompletableFuture<MessageId> internalSendAsync(Message<?> message) {
  //核心方法,跟踪进去
	MessageImpl<?> interceptorMessage = (MessageImpl) beforeSend(message);
	....
}

protected Message<?> beforeSend(Message<?> message) {
  if (interceptors != null) {
    //如果配置了拦截器则调用ProducerInterceptors类的beforeSend方法
    return interceptors.beforeSend(this, message);
  } else {
    //如果没有配置拦截器则直接返回原消息
    return message;
  }
}

这是消息发送的处理逻辑,那如果是再消息发送结束后触发呢?一起来跟踪看下吧,首先还是从ProducerImp类的internalSendAsync方法开始看

 @Override
CompletableFuture<MessageId> internalSendAsync(Message<?> message) {
    sendAsync(interceptorMessage, new SendCallback() {
						....
            @Override
            public void sendComplete(Exception e) {
                try {
                    if (e != null) {
                        stats.incrementSendFailed();
                      	//从这里跟踪进去看看
                        onSendAcknowledgement(interceptorMessage, null, e);
                        future.completeExceptionally(e);
                    } else {
                        onSendAcknowledgement(interceptorMessage, interceptorMessage.getMessageId(), null);
                        future.complete(interceptorMessage.getMessageId());
                        stats.incrementNumAcksReceived(System.nanoTime() - createdAt);
                    }
                } finally {
                    interceptorMessage.getDataBuffer().release();
                }
  
}
      

protected void onSendAcknowledgement(Message<?> message, MessageId msgId, Throwable exception) {
    if (interceptors != null) {
      	//可以看到最终也是调用的ProducerInterceptors类的onSendAcknowledgement方法
       	interceptors.onSendAcknowledgement(this, message, msgId, exception);
    }
}

这里的设计是异步回调的方式,将调用拦截器处理逻辑封装成参数传给下一层,在消息发送完成后再调用参数里指定的回调逻辑。那么什么时候触发呢,由于Pulsar客户端跟服务端是通过Netty的TCP通信的,因此直接看看PulsarDecoder的channelRead方法

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
		....
    switch (cmd.getType()) {
        
       	....
        case PRODUCER_SUCCESS:
        	//写入消息被Broker处理后会忘生产者客户端通过TCP发送一条PRODUCER_SUCCESS类型的消息也就是这里,跟踪进去看看处理逻辑
          checkArgument(cmd.hasProducerSuccess());
          handleProducerSuccess(cmd.getProducerSuccess());
          break;
    }
}


protected void handleProducerSuccess(CommandProducerSuccess success) {
        ....
        //生产者会在队列维护每条未被ack的写入请求消息,在Broker ack时会从这个队列中移除并获取回调处理逻辑
        CompletableFuture<ProducerResponse> requestFuture =
                (CompletableFuture<ProducerResponse>) pendingRequests.remove(requestId);
        if (requestFuture != null) {
            ProducerResponse pr = new ProducerResponse(success.getProducerName(),
                    success.getLastSequenceId(),
                    success.getSchemaVersion(),
                    success.hasTopicEpoch() ? Optional.of(success.getTopicEpoch()) : Optional.empty());
          	//调用回调逻辑
            requestFuture.complete(pr);
        } else {
            ....
        }
    }

四、总结

通过使用和跟踪原理,我们对Pulsar生产者拦截器有了进一步的认识,除了生产者拦截器,Pulsar还支持Broker侧以及Bookkeeper侧的拦截器,这些放到后面再跟大家一起学习。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1607041.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

数据结构(图)

定义 G (V, E) 图 (点&#xff0c;边) 图&#xff0c;Graph 点&#xff0c;Vertex 边&#xff0c;edge 有空表&#xff0c;空树&#xff0c;但没有空图 图可以没有边|E| 0&#xff0c;但不能没有一个点 稠密图 &稀疏图 是边的多少决定的 &#xff08;见Ex…

今日早报 每日精选15条新闻简报 每天一分钟 知晓天下事 4月19日,星期五

每天一分钟&#xff0c;知晓天下事&#xff01; 2024年4月19日 星期五 农历三月十一 谷雨 1、 景海鹏获颁特级航天功勋奖章&#xff0c;朱杨柱、桂海潮被授予英雄航天员称号。 2、 工信部&#xff1a;加快6G、万兆光网研发力度&#xff0c;加速推进大数据、AI等研发应用。 3、…

NAND数据恢复的方案

NAND Flash是固态硬盘&#xff08;SSD&#xff09;的核心数据存储。然而&#xff0c;NAND Flash因其物理特性和工作原理&#xff0c;存在一定的内在脆弱性&#xff0c;尤其是在数据存储的长期可靠性方面。 比特错误是指在读取NAND Flash时&#xff0c;原本存储的二进制位&#…

单输入多输出(SIMO)和多输入多输出(MIMO)模型是什么?

当谈到单输入多输出&#xff08;SIMO&#xff09;和多输入多输出&#xff08;MIMO&#xff09;模型时&#xff0c;通常指的是工程和信号处理领域中的系统和算法。我列举了一些我研究过的实例&#xff1a; 单输入多输出&#xff08;SIMO&#xff09;&#xff1a; 多标签分类器&a…

RK3588 开发板的魅力所在!

处理器是计算机硬件系统的核心部件&#xff0c;其性能的提升对于设备功能和用户体验起着重要的作用。处理器也是开发板的核心&#xff0c;它决定了其计算性能和图形性能。那么&#xff0c;RK3588处理器属于什么档次&#xff1f;其性能和市场定位如何&#xff1f;市场上有哪些用…

C++信奥教学PPT:CSP_J_算法之双指针算法(中)

1、⼀个⻓度为 n-1 的递增排序数组中的所有数字都是唯⼀的&#xff0c;并且每个数字都在范围0&#xff5e;n-1 之内。在范围 0&#xff5e; n-1 内的 n 个数字中有且只有⼀个数字不在该数组中&#xff0c;请找出这个数字。 2、循环最大值&#xff08;Maximum in the Cycle of 1…

TSINGSEE青犀算法中台消防通道堵塞/占压AI检测算法的介绍及应用

消防通道是建筑物内用于紧急疏散的通道&#xff0c;其畅通无阻对于保障人员生命安全至关重要。然而&#xff0c;由于各种原因&#xff0c;消防通道经常会被杂物、车辆等堵塞&#xff0c;一旦发生火灾等紧急情况&#xff0c;后果不堪设想。为了有效解决这一问题&#xff0c;我们…

去除【关注博主即可阅读全文】插件

这两天闲着没事看csdn&#xff0c;看到好多博主弄这个关注才可以看文章 正好好久没写过那个油猴的插件&#xff0c;今天就用油猴写个这玩意。大家可以试着玩 代码我贴下面了&#xff0c;想用自取啊 // UserScript // name 去除关注才可以阅读 // namespace http:/…

项目实战:Qt获取CTP量化交易接口测试数据工具 v1.0.0(获取深度行情数据、订阅取消订阅)

若该文为原创文章&#xff0c;转载请注明出处 本文章博客地址&#xff1a;https://hpzwl.blog.csdn.net/article/details/137937666 红胖子(红模仿)的博文大全&#xff1a;开发技术集合&#xff08;包含Qt实用技术、树莓派、三维、OpenCV、OpenGL、ffmpeg、OSG、单片机、软硬结…

Python教学入门:函数

在 Python 中&#xff0c;def 关键字用于定义函数。函数是一段可重用的代码块&#xff0c;用于执行特定的任务或操作。通过定义函数&#xff0c;可以将一段代码封装起来&#xff0c;使其可以在程序中被多次调用&#xff0c;提高代码的复用性和可维护性。 下面是 def 函数定义的…

安装docker的PHP环境NLMP环境在国产deepin操作系统上

1: 先安装docker 安装完后执行,权限设置 sudo usermod -aG docker $USER或者sudo usermod -aG docker kentrl#添加当前用户到Docker用户组中 sudo newgrp docker#更新用户组数据,必须执行否则无效 sudo systemctl restart docker 先看目录结构: 2:按照目录结构挂载磁盘,…

MLP/CNN/RNN/Transformer主流深度学习模型的区别

1. 多层感知机(MLP) 核心特征: 结构:MLP 是一种基本的前馈神经网络,包含一个输入层、一个或多个隐藏层以及一个输出层。每层由全连接的神经元组成。用途:适用于简单的分类和回归任务。限制:不适用于处理序列数据或图像数据,因为它不具备处理输入之间时间或空间关系的能…

IDEA @Autowired不显示红线

IDEA 中&#xff0c;Autowired 显示红线一般情况是注入 Mapper 或者 Dao 时出现的&#xff0c;如下图&#xff1a; 这个报错是因为 Mapper 或者 Dao 上没有加 Repository 或者 Mapper&#xff0c;Autowired 注入时就判断为这不是一个 Bean。 不建议通过加上面两个注解的方式解…

python自动化之网易自动点歌

这个代码是是使用的pyautogui库和pyperclip库完成的&#xff0c;这个库是开源的地址如下&#xff1a;https://github.com/asweigart/pyautogui这里详细的用法想学习的可以到这看看 下面是代码&#xff1a; import pyautogui import subprocess import pyperclip import time i…

ubuntu设置扩充swap交换空间

Swap是指Linux系统中的交换分区,类似于Windows的虚拟内存,当内存不足的时候,把一部分硬盘空间虚拟成内存来使用,从而解决内存不足的问题。交换分区,它的功能就是在内存不够的情况下,操作系统先把内存中暂时不用的数据,存到硬盘的交换空间,腾出内存来让别的程序运行! …

【uniapp】微信小程序2024手机号快速验证及无感登录教程(内附代码)

组件&#xff1a;手机号快速验证组件 适用对象&#xff1a;企业/个体 费用&#xff1a;0.03元/次 目录 前言思路前端后端代码无感登录onload事件无感登录方法登录判断后端mini_login2 最后 前言 最近注册了公司&#xff0c;可以注册具有支付能力的小程序了&#xff0c;各种材料…

用了 18 个月时间,做 AI 应用从 0 到 200 万用户,从亏损到盈利(4000 字全面复盘)

前言 距离上次《离职一年&#xff0c;收入10倍增长》总结到现在已经过去了 22 个月。在这段时间里我经历了从高峰跌到谷底又慢慢回弹。组建团队后经历了 10 个月的连续亏损&#xff0c;目前已经连续 12 个月盈利&#xff0c;专注于 AI 应用小程序方向&#xff0c;已累计 200 多…

Hadoop大数据处理技术-Linux相关命令

​7.Linux常用命令 1&#xff09;Windows中的dir&#xff1a;列出当前目录下所有的文件和目录 2&#xff09;cd&#xff1a;改变当前目录 cd命令并不能直接实现这种跳跃转换目录的功能 它只能让你在当前目录和其子目录之间来回切换 就像在一张平面地图上移动一样 如果想跨目录…

【Excel】使用VBA宏简单自定义Excel软件界面

改行做经济师学习Excel&#xff0c;偶有心得&#xff0c;摘录于此&#xff0c;备忘。 言简意赅&#xff0c;仅供自用。 1 实现效果 在Excel的左上角可添加按钮&#xff0c;该按钮的功能可由我们自己通过编写代码定义&#xff0c;能实现特定功能&#xff0c;并且在所有打开的…

Web端Webrtc,SIP,RTSP/RTMP,硬件端,MCU/SFU融合视频会议系统方案分析

Web端视频融合&#xff0c;会议互通已经是视频会议应用的大趋势&#xff0c;一是目前企业有大量的老视频会议硬件设&#xff0c;二新业务又需要Web端支持视频会议监控直播需求&#xff0c;迫切需要一个融合对接的方案&#xff0c;即能把老的设备用起来&#xff0c;又能对接新的…