极客时间Kafka - 04 Kafka生产者和消费者拦截器

news2024/9/9 6:10:33

文章目录

      • 1. 什么是拦截器?
      • 2. Kafka 拦截器
      • 3. 典型使用场景
      • 4. 案例分享

1. 什么是拦截器?

如果你用过 Spring Interceptor 或是 Apache Flume,那么应该不会对拦截器这个概念感到陌生,其基本思想就是允许应用程序在不修改逻辑的情况下,动态地实现一组可插拔的事件处理逻辑链。它能够在主业务操作的前后多个时间点上插入对应的“拦截”逻辑。下面这张图展示了 Spring MVC 拦截器的工作原理:

在这里插入图片描述

拦截器 1 和拦截器 2 分别在请求发送之前、发送之后以及完成之后三个地方插入了对应的处理逻辑。而 Flume 中的拦截器也是同理,它们插入的逻辑可以是修改待发送的消息,也可以是创建新的消息,甚至是丢弃消息。这些功能都是以配置拦截器类的方式动态插入到应用程序中的,故可以快速地切换不同的拦截器而不影响主程序逻辑。

Kafka 拦截器借鉴了这样的设计思路。你可以在消息处理的前后多个时点动态植入不同的处理逻辑,比如在消息发送前或者在消息被消费后。

2. Kafka 拦截器

Kafka 拦截器分为生产者拦截器和消费者拦截器。生产者拦截器允许你在发送消息前以及消息提交成功后植入你的拦截器逻辑;而消费者拦截器支持在消费消息前以及提交位移后编写特定逻辑。值得一提的是,这两种拦截器都支持链的方式,即你可以将一组拦截器串连成一个大的拦截器,Kafka 会按照添加顺序依次执行拦截器逻辑。

举个例子,假设你想在生产消息前执行两个“前置动作”:第一个是为消息增加一个头信息,封装发送该消息的时间,第二个是更新发送消息数字段,那么当你将这两个拦截器串联在一起统一指定给 Producer 后,Producer 会按顺序执行上面的动作,然后再发送消息。

当前 Kafka 拦截器的设置方法是通过参数配置完成的。生产者和消费者两端有一个相同的参数,名字叫 interceptor.classes,它指定的是一组类的列表,每个类就是特定逻辑的拦截器实现类。拿上面的例子来说,假设第一个拦截器的完整类路径是 com.yourcompany.kafkaproject.interceptors.AddTimeStampInterceptor,第二个类是 com.yourcompany.kafkaproject.interceptors.UpdateCounterInterceptor,那么你需要按照以下方法在 Producer 端指定拦截器:

Properties props = new Properties();
List<String> interceptors = new ArrayList<>();
interceptors.add("com.yourcompany.kafkaproject.interceptors.AddTimestampInterceptor");  
interceptors.add("com.yourcompany.kafkaproject.interceptors.UpdateCounterInterceptor");
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);
......

现在问题来了,我们应该怎么编写 AddTimeStampInterceptor 和UpdateCounterInterceptor 类呢?其实很简单,这两个类以及你自己编写的所有Producer 端拦截器实现类都要继承org.apache.kafka.clients.producer.ProducerInterceptor 接口。该接口是 Kafka 提供的,里面有两个核心的方法:

① onSend:该方法会在消息发送之前被调用。如果你想在发送之前对消息“美美容”,这个方法是你唯一的机会。

② onAcknowledgement:该方法会在消息成功提交或发送失败之后被调用。还记得发送回调通知 callback 吗?onAcknowledgement 的调用要早于 callback 的调用。值得注意的是,这个方法和 onSend 不是在同一个线程中被调用的,因此如果你在这两个方法中调用了某个共享可变对象,一定要保证线程安全哦。还有一点很重要,这个方法处在 Producer 发送的主路径中,所以最好别放一些太重的逻辑进去,否则你会发现你的 Producer TPS 直线下降。

指定消费者拦截器需要实现类org.apache.kafka.clients.consumer.ConsumerInterceptor 接口,这里面也有两个核心方法。

① onConsume:该方法在消息返回给 Consumer 程序之前调用。也就是说在开始正式处理消息之前,拦截器会先拦一道,搞一些事情,之后再返回给你。

② onCommit:Consumer 在提交位移之后调用该方法。通常你可以在该方法中做一些记账类的动作,比如打日志等。

一定要注意的是,指定拦截器类时要指定它们的全限定名,即 full qualified name。通俗点说就是要把完整包名也加上,不要只有一个类名在那里,并且还要保证你的 Producer 程序能够正确加载你的拦截器类。

3. 典型使用场景

Kafka 拦截器都能用在哪些地方呢?其实,跟很多拦截器的用法相同,Kafka 拦截器可以应用于包括客户端监控、端到端系统性能检测、消息审计等多种功能在内的场景。

我以端到端系统性能检测和消息审计为例来展开介绍下。

今天 Kafka 默认提供的监控指标都是针对单个客户端或 Broker 的,你很难从具体的消息维度去追踪集群间消息的流转路径。同时,如何监控一条消息从生产到最后消费的端到端延时也是很多 Kafka 用户迫切需要解决的问题。

从技术上来说,我们可以在客户端程序中增加这样的统计逻辑,但是对于那些将 Kafka 作为企业级基础架构的公司来说,在应用代码中编写统一的监控逻辑其实是很难的,毕竟这东西非常灵活,不太可能提前确定好所有的计算逻辑。另外,将监控逻辑与主业务逻辑耦合也是软件工程中不提倡的做法。

现在,通过实现拦截器的逻辑以及可插拔的机制,我们能够快速地观测、验证以及监控集群间的客户端性能指标,特别是能够从具体的消息层面上去收集这些数据。这就是 Kafka 拦截器的一个非常典型的使用场景。

我们再来看看消息审计的场景。设想你的公司把 Kafka 作为一个私有云消息引擎平台向全公司提供服务,这必然要涉及多租户以及消息审计的功能。

作为私有云的 PaaS 提供方,你肯定要能够随时查看每条消息是哪个业务方在什么时间发布的,之后又被哪些业务方在什么时刻消费。一个可行的做法就是你编写一个拦截器类,实现相应的消息审计逻辑,然后强行规定所有接入你的 Kafka 服务的客户端程序必须设置该拦截器。

4. 案例分享

下面我以一个具体的案例来说明一下拦截器的使用。在这个案例中,我们通过编写拦截器类来统计消息端到端处理的延时,非常实用,我建议你可以直接移植到你自己的生产环境中。

我曾经给一个公司做 Kafka 培训,在培训过程中,那个公司的人提出了一个诉求。他们的场景很简单,某个业务只有一个 Producer 和一个 Consumer,他们想知道该业务消息从被生产出来到最后被消费的平均总时长是多少,但是目前 Kafka 并没有提供这种端到端的延时统计。

学习了拦截器之后,我们现在知道可以用拦截器来满足这个需求。既然是要计算总延时,那么一定要有个公共的地方来保存它,并且这个公共的地方还是要让生产者和消费者程序都能访问的。在这个例子中,我们假设数据被保存在 Redis 中。

Okay,这个需求显然要实现生产者拦截器,也要实现消费者拦截器。我们先来实现前者:

public class AvgLatencyProducerInterceptor implements ProducerInterceptor {
    private Jedis jedis; // 省略 Jedis 初始化
    
    @Override
    public ProducerRecord onSend(ProducerRecord record) {
        // 发送消息前更新总的已发送消息数
        jedis.incr("totalSentMessage");
        return record;
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {

    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> configs) {

    }
}

下面是消费者端的拦截器实现,代码如下:

public class AvgLatencyConsumerInterceptor implements ConsumerInterceptor<String, String> {

    private Jedis jedis; // 省略 Jedis 初始化

    @Override
    public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
        long lantency = 0L;
        // 在真正消费一批消息前首先更新了它们的总延时
        for (ConsumerRecord<String, String> record : records) {
            // 用当前的时钟时间减去封装在消息中的创建时间,然后累计得到这批消息总的端到端处理延时并更新到 Redis 中
            lantency += (System.currentTimeMillis() - record.timestamp());
        }
        jedis.incrBy("totalLatency", lantency);
        // 从 Redis 中读取更新过的总延时和总消息数
        long totalLatency = Long.parseLong(jedis.get("totalLatency"));
        // 从 Redis 中读取更新过的总消息数
        long totalSentMsgs = Long.parseLong(jedis.get("totalSentMessage"));
        // 两者相除即得到端到端消息的平均处理延时。
        jedis.set("avgLatency", String.valueOf(totalLatency / totalSentMsgs));
        return records;
    }

    @Override
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {

    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> configs) {

    }
}

创建好生产者和消费者拦截器后,我们按照上面指定的方法分别将它们配置到各自的 Producer 和 Consumer 程序中,这样就能计算消息从 Producer 端到 Consumer 端平均的处理延时了。这种端到端的指标监控能够从全局角度俯察和审视业务运行情况,及时查看业务是否满足端到端的 SLA 目标。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/68727.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

无人机边缘计算中的计算卸载——Stackelberg博弈方法论文复现附matlab代码

✅作者简介&#xff1a;热爱科研的Matlab仿真开发者&#xff0c;修心和技术同步精进&#xff0c;matlab项目合作可私信。 &#x1f34e;个人主页&#xff1a;Matlab科研工作室 &#x1f34a;个人信条&#xff1a;格物致知。 更多Matlab仿真内容点击&#x1f447; 智能优化算法 …

不会还有人不会热修复吧?

Class流派原理 基本原理:加载类的时候是找element&#xff0c;每个element对于一个dex。我要把我修复的那个类单独放到dex插入dexlist前面&#xff0c;在你做类加载从前往后找优先从你的dex加载加载的就是你修复后的class.这就是 实现代码 通过context拿到pathClassLoader&am…

Qt跨平台截图工具

Qt跨平台截图工具 文章目录Qt跨平台截图工具1、概述2、实现效果3、软件构成4、关键代码5、源代码更多精彩内容&#x1f449;个人内容分类汇总 &#x1f448;&#x1f449;Qt自定义模块、工具&#x1f448; 1、概述 Qt版本&#xff1a;V5.12.5兼容系统&#xff1a; Windows&…

2022,记录与华为的这场会议

一、数据治理团体标准发布会 11月26日&#xff0c;中国计算机用户协会信息科技审计分会联合华为与擎创科技共同举办了“金融行业运维数据治理团体标准应用研讨暨2022年度调研报告线上发布会”。来自国家开发银行、中国建设银行、中国邮政储蓄银行、招商银行、兴业银行、中信银行…

【LeetCode_字符串_逻辑分析】9. 回文数

目录考察点第一次&#xff1a;2022年12月7日10:16:33解题思路代码展示题目描述给你一个整数 x &#xff0c;如果 x 是一个回文整数&#xff0c;返回 true &#xff1b;否则&#xff0c;返回 false 。回文数是指正序&#xff08;从左向右&#xff09;和倒序&#xff08;从右向左…

340页11万字智慧政务大数据资源平台大数据底座数据治理建设方案

目 录 第一章 项目概况 1.1 项目名称 1.2 项目单位 1.3 项目建设依据 1.4 项目建设内容和目标 1.4.1 建设内容 1.4.2 建设目标 1.5 项目投资估算及建设周期 1.5.1 项目投资估算 1.5.2 服务周期 第二章 现状 2.1 项目单位概况 2.1.1 单位职责、内设及下属机构、人员…

【配准图像】

MU-Net: A MULTISCALE UNSUPERVISED NETWORK FOR REMOTE SENSING IMAGE REGISTRATION &#xff08;MU-Net&#xff1a;一种多尺度无监督遥感图像配准网络&#xff09; 多传感器或多模态图像对的配准是许多遥感应用的基础性任务。为了实现高精度、低成本的遥感图像配准&#x…

彻底搞懂JS原型与原型链

说到JavaScript的原型和原型链&#xff0c;相关文章已有不少&#xff0c;但是大都晦涩难懂。本文将换一个角度出发&#xff0c;先理解原型和原型链是什么&#xff0c;有什么作用&#xff0c;再去分析那些令人头疼的关系。 一、引用类型皆为对象 原型和原型链都是来源于对象而…

浅谈Linux内核编程规范与代码风格

1 缩进 Tab的宽度是八个字符&#xff0c;因此缩进的宽度也是八个字符。有些异教徒想让缩进变成四个字符&#xff0c;甚至是两个字符的宽度&#xff0c;这些人和那些把 PI 定义为 3 的人是一个路子的。 注意&#xff1a;缩进的全部意义在于清晰地定义语句块的开始与结束&#…

《MongoDB》Mongo Shell中的基本操作-删除操作一览

前端博主&#xff0c;热衷各种前端向的骚操作&#xff0c;经常想到哪就写到哪&#xff0c;如果有感兴趣的技术和前端效果可以留言&#xff5e;博主看到后会去代替大家踩坑的&#xff5e; 主页: oliver尹的主页 格言: 跌倒了爬起来就好&#xff5e; 来个关注吧&#xff0c;点个赞…

分布式事务,单JVM进程与多数据库,分布式事务技术选型,0-1过程,代码全。

由于很多小白程序员在单一JVM进程配合多数据库的架构环境中,总是考虑一主多从的mysql集群环境。还不知道mysql集群数据库按照表纵向分割以后,也是可以走数据库使用事务的。那么这里使用到的就是分布式事务,XA协议。现在大部分主流的数据库都支持XA协议。这里不用太多废话,直…

【Web智能聊天客服】之JavaScript、jQuery、AJAX讲解及实例(超详细必看 附源码)

觉得有帮助请点赞关注收藏~~~ 一、JavaScript基础 Javascript是网页编程语言&#xff0c;决定网页元素的动作。HTML页面中通过<script></script>指定Javascript内容&#xff0c;通过//或者 /* */执行代码的备注功能&#xff0c;并且区分大小写。 1&#xff1a;变…

《视觉SLAM十四讲》示例程序编译报错处理(上)

高翔博士《视觉SLAM十四讲》这本书中的代码很不错&#xff0c;适合初学者。可惜有一些可能因为版本的问题会报错&#xff0c;本文总结一下我遇到的问题。 在slambook2/3rdparty文件夹git submodule update&#xff0c;这个版本是和书中的版本一致的。但我已经重新安装了新版Ei…

Webpack中的高级特性

自从webpack4以后&#xff0c;官方帮我们集成了很多特性&#xff0c;比如在生产模式下代码压缩自动开启等&#xff0c;这篇文章我们一起来探讨一下webpack给我们提供的高级特性助力开发。 探索webpack的高级特性 特性&#xff1a;treeShaking 顾名思义treeShaking&#xff0…

C++ Reference: Standard C++ Library reference: Containers: deque: deque: swap

C官网参考里链接&#xff1a;https://cplusplus.com/reference/deque/deque/swap-free/ 函数模板 <deque> std::swap (deque) template <class T, class Alloc> void swap (deque<T,Alloc>& x, deque<T,Alloc>& y); 交换两个双端队列容器的…

【ESP32调试-快速入门】

文章目录ESP32调试一. 环境搭建二. 运行OpenOCD1. 烧入blink2. 找到芯片型号对应的脚本文件&#xff0c;并运行脚本命令三. 运行GDBESP32调试 一. 环境搭建 ESP_IDF环境搭建 二. 运行OpenOCD 1. 烧入blink 如&#xff1a;安装环境中的examples中的blink 路劲&#xff1a;安装…

华为机试 - 探索地块建立

目录 题目描述 输入描述 输出描述 用例 题目解析 算法源码 题目描述 给一块n*m的地块&#xff0c;相当于n*m的二维数组&#xff0c;每个元素的值表示这个小地块的发电量&#xff1b; 求在这块地上建立正方形的边长为c的发电站&#xff0c;发电量满足目标电量k的地块数量…

汽车保养app开发,扩充汽车服务市场发展商机

从汽车市场规模来看&#xff0c;从2017年开始始终保持增长的发展趋势&#xff0c;在2021年市场规模达到140877.18亿元。互联网时代发展下&#xff0c;汽车后市场大力推广电子商务&#xff0c;将互联网技术与汽车保养服务相结合是汽车服务行业强大的必由之路。二者的结合可以让消…

centos7下搭建rabbitmq的开发环境

我们在项目开发的时候都不可避免的会有异步化的问题,比较好的解决方案就是使用消息队列,可供选择的队列产品也有很多,比如轻量级的redis, 当然还有重量级的专业产品rabbitmq,rabbitmq好就好在是用erlang(二郎神)开发的,它那天生的OTP并行计算框架,轻而易举的进程间通…

阿里云ssl免费证书申请

目录为什么申请SSL证书SSL证书申请支持的域名类型ssl免费证书申请过程为什么申请SSL证书 由于web服务部署需要使用https安全协议&#xff0c;因此需要申请相应域名的SSL证书用于部署。测试阶段&#xff0c;为节省成本&#xff0c;使用阿里云提供的免费SSL证书。 SSL证书申请支…