Apache Kafka - ConsumerInterceptor 实战(2)

news2024/10/7 8:31:30

文章目录

  • Pre
  • 思路
  • 示例
    • 配置文件
    • 自定义 拦截器
    • 使用
    • 测试
  • 小结

在这里插入图片描述


Pre

Apache Kafka - ConsumerInterceptor 实战 (1) 用代码的方式实现了ConsumerInterceptor , 接下来我们用 配置的方式来实现一下 。


思路

如何找配置类
KafkaProperties

在这里插入图片描述

有些属性是很明显的有的,其他没有的一般都在 Map里

在这里插入图片描述

那map的 key value 从哪里找呢?

找原生的配置 Kafka Consumer的 都在 ConsumerConfig

在这里插入图片描述
找到

    public static final String INTERCEPTOR_CLASSES_CONFIG = "interceptor.classes";

在这里插入图片描述

OK,继续


示例

配置文件

在这里插入图片描述

自定义 拦截器

package net.zf.module.system.kafka.interceptor;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.springframework.stereotype.Component;

import java.util.Map;


/**
 * @author artisan
 */

@Slf4j
@Component
public class FailureRateInterceptor implements ConsumerInterceptor<Object, Object> {

    /**
     * 消息消费前的拦截处理
     *
     * @param consumerRecords
     * @return
     */
    @Override
    public ConsumerRecords<Object, Object> onConsume(ConsumerRecords<Object, Object> consumerRecords) {
        // TODO
        log.info("FailureRateInterceptor#onConsume");

        // 根据设定的规则计算失败率,并进行判断是否跳过消息的消费
        // 返回ConsumerRecords对象, 继续执行下游的消费逻辑或者直接返回空的ConsumerRecords对象

        return consumerRecords;
    }

    /**
     * 消息提交前进行拦截处理
     *
     * @param map
     */
    @Override
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> map) {
        log.info("FailureRateInterceptor#onCommit");
    }


    /**
     * 拦截器关闭前进行拦截处理(如果有的话)
     */
    @Override
    public void close() {
        log.info("FailureRateInterceptor#close");
    }

    /**
     * 初始化配置(如果有的话)
     *
     * @param map
     */
    @Override
    public void configure(Map<String, ?> map) {
        log.info("FailureRateInterceptor#configure");
    }
}


使用

在这里插入图片描述


测试

启动服务,发送消息,进行消费

在这里插入图片描述


小结

在Spring Boot中配置Kafka消费者的拦截器需要进行以下步骤:

  1. 首先,创建一个拦截器类,实现Kafka的ConsumerInterceptor接口,定义拦截器的逻辑。
  2. 在应用的配置文件(例如application.propertiesapplication.yml)中,添加拦截器相关的配置项,其中包括设置interceptor.class属性为拦截器类的全限定名。

下面是一个示例,演示如何在Spring Boot中配置Kafka消费者的拦截器:

  1. 创建拦截器类:
@Slf4j
@Component
public class MyConsumerInterceptor implements ConsumerInterceptor<Object, Object> {

    @Override
    public ConsumerRecords<Object, Object> onConsume(ConsumerRecords<Object, Object> records) {
        // 在消息消费前的处理逻辑
        // ...
        return records;
    }

    @Override
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
        // 在消息提交前的处理逻辑
        // ...
    }

    @Override
    public void close() {
        // 拦截器关闭前的处理逻辑
        // ...
    }

    @Override
    public void configure(Map<String, ?> configs) {
        // 初始化配置的处理逻辑
        // ...
    }
}
  1. 在应用的配置文件中设置拦截器相关的配置项:
spring.kafka.consumer.properties.interceptor.classes=com.example.MyConsumerInterceptor

或者在application.yml文件中:

spring:
  kafka:
    consumer:
      properties:
        interceptor.classes: com.example.MyConsumerInterceptor

这样配置之后,Spring Boot会自动创建Kafka消费者,并将指定的拦截器应用于消费者。在消费者处理消息的过程中,拦截器的方法将会被调用,可以在这些方法中编写自定义的逻辑来处理消息或拦截操作。

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/569374.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

STM32单片机(二)STM32环境搭建

❤️ 专栏简介&#xff1a;本专栏记录了从零学习单片机的过程&#xff0c;其中包括51单片机和STM32单片机两部分&#xff1b;建议先学习51单片机&#xff0c;其是STM32等高级单片机的基础&#xff1b;这样再学习STM32时才能融会贯通。 ☀️ 专栏适用人群 &#xff1a;适用于想要…

LLMs开源模型们的分布式训练和量化

前一篇博文整理了&#xff1a; LLMs开源模型们和数据集简介 这篇博文主要整理一下目前流行的训练方法和量化。 &#xff08;图自Towards a Unified View of Parameter-Efficient Transfer Learning&#xff09; Tuning Strategies 使通用LLMs适应下游任务的最常见方法是微调…

电气器件系列三十七:多路温度测试仪、温度巡检仪

巡检仪适用于多点测量显示及控制&#xff0c;集多台仪表功能于一体&#xff0c;一般可巡检1&#xff5e;64路测量信号,可巡回检测和显示多路信号&#xff0c;与各类传感器、变送器配合使用&#xff0c;现已开发出八路巡检仪\十六路巡检仪\24路巡检仪-64路可对多路温度、压力、液…

项目总结 车牌识别

代码贴&#xff1a;OpenCV实战5 车牌号识别_opencv车牌字符识别_爱钓鱼的歪猴的博客-CSDN博客 目录 1、效果 2、代码思路 0、准备车配字符模板图片以及字符文件 1、对整图进行预处理 得到突出车牌的cany边缘图 2、车牌字体联通在一起&#xff0c;形成一个区域 3、筛选出车…

【华为OD机试】太阳能板最大面积【2023 B卷|100分】

【华为OD机试】-真题 !!点这里!! 【华为OD机试】真题考点分类 !!点这里 !! 题目描述 给航天器一侧加装长方形或正方形的太阳能板(图中的红色斜线区域),需要先安装两个支柱(图中的黑色竖条), 再在支柱的中间部分固定太阳能板。 但航天器不同位置的支柱长度不同,太阳…

URLConnection(一)

文章目录 1. 简介2. 打开URLConnection3. 读取服务器的数据4. 读取首部5. 获取任意首部字段 1. 简介 URLConnection是一个抽象类&#xff0c;表示指向URL指定资源的活动连接。URLConnection有两个不同但相关的用途。首先&#xff0c;与URL类相比&#xff0c;它对服务器&#x…

chatgpt赋能python:简介:什么是PythonShapiro?

简介&#xff1a;什么是Python Shapiro&#xff1f; Python Shapiro是一种用来进行正态性检验的工具&#xff0c;也就是说&#xff0c;它可以帮助我们检验一个给定的数据集是否符合正态分布的要求。它是从R语言中的Shapiro-Wilk测试方法改编而来的。 如何使用Python Shapiro&…

继瑞吉外卖后的又一个项目——SpringBoot+Vue的前后端博客系统

文章目录 博客系统项目介绍前言项目演示前台演示后台演示 组织结构后端组织结构前端组织结构 技术选型前端技术后端技术架构图系统架构图业务架构图 模块介绍前端模块后端模块 环境搭建开发工具开发环境项目运行 未完待续结语 博客系统项目介绍 前言 本项目已开源在Gitee 后端…

谈谈linux网络编程中的应用层协议定制、Json序列化与反序列化那些事

linux【网络编程】之协议定制、序列化与反序列化 一、序列化与反序列化二、应用层协议如何定制三、网络通信中数据流动的本质四、网络版计算器编写4.1 业务流程4.2 核心代码 一、序列化与反序列化 由于socket api的接口&#xff0c;在读写数据的时候是以字符串的方式发送接收的…

电子科技大学编译原理复习笔记(三):控制结构

目录 前言 重点一览 语句级控制结构 单元级控制结构 四种单元级控制结构 本章小结 前言 本复习笔记基于张老师的课堂PPT&#xff0c;供自己期末复习与学弟学妹参考用。 重点一览 语句级控制结构 定义&#xff1a;用来构造各种语句执行顺序的机制 传统三种语句级控制结…

Hyperledger Fabric explorer区块链浏览器搭建

https://github.com/hyperledger-labs/blockchain-explorer 官方浏览器的github地址 根据文档&#xff0c;采用docker容器的方法搭建explorer。 首先创建explorer的项目&#xff0c; mkdir explorer根据官方提供的文件&#xff0c;需要创建的目录结构如下&#xff1a; 这是官…

【计算机网络复习之路】网络层(谢希仁第八版)万字详解 主打基础

专栏&#xff1a;计算机网络复习之路 目录&#xff1a; 一、网络层的几个重要概念 1.1 网络层提供的两种服务 1.2 网络层的两个层面 二、网际协议 IP 2.1 虚拟互连网络 2.2 IP地址 2.2.1 IP地址及其表示方法 2.2.2 分类的IP地址 2.2.3 无分类编址CIDR &#xff08…

【2023 · CANN训练营第一季】应用开发(初级)第四章——模型推理

AscendCL运行资源管理 申请运行管理资源时&#xff0c;需按顺序依次申请: Device、Context、Stream&#xff0c;然后根据实际需求调用aclrtGetRunMode接口获取软件栈的运行模型(当同一个应用既支持在Host运行&#xff0c;也支持在Device运行时&#xff0c;在编程时需要就需要根…

k8s进阶5——AppArmor、Seccomp、ImagePolicyWebhook

文章目录 一、AppArmor限制容器对资源访问1.1 实现步骤1.1.1 定义策略1.1.2 加载策略1.1.3 引用策略 2.2 案例 二、Seccomp 限制容器进程系统调用案例一&#xff1a;使用自定义策略案例二&#xff1a;使用容器运行时默认策略 三、动态准入控制Webhook3.1 ImagePolicyWebhook控制…

PowerPoint输出图片分辨率设置

最近想用ppt画几张图&#xff0c;但是输出的分辨率有点不够意思&#xff0c;然后就想着改一下输出分辨率&#xff0c;这里记录一下方便以后查阅。 PowerPoint输出图片分辨率设置

PS-调色

图片的储存格式 JPEG格式&#xff1a;日常作图存储格式、只有一个图层、通用图片格式、内存小画质高 RAW格式&#xff1a;CR2、CR3佳能、NEF尼康、ARW索尼、IIQ哈苏、RAF富士、RW2松下 PNG格式&#xff1a;图片透明格式、用于抠出来的素材用这个格式保存可以是透明底没有背景 …

Linux——操作系统详解

目录 一.操作系统的含义 1.操作系统是什么&#xff1f; 2.那么操作系统为什么要对软硬件资源进行管理呢&#xff1f;这样做的好处在哪里&#xff1f; 3.操作系统又是怎么进行管理的&#xff1f; 如何理解“先描述&#xff0c;再组织”&#xff1f; 二.总结&#xff1a; …

conda环境安装使用教程

conda&#xff0c;anaconda&#xff0c;miniconda傻傻分得清楚 Conda是一个开源的包管理系统和环境管理系统&#xff0c;可以用于安装、管理和卸载软件包以及创建和管理虚拟环境。Anaconda是一个基于Python的数据科学平台&#xff0c;包括Python解释器、Conda包管理器、Jupyte…

Linux:为xfs文件系统卷 设置磁盘配额

首先准备一个xfs文件系统的 卷 || 分区 可以是逻辑卷 &#xff0c;也可以是普通卷&#xff0c;等等……但是他们的文件格式都要是xfs格式 我这里选择的是逻辑卷&#xff0c;普通卷也是一样的道理 开始前要有两个软件包需要安装 如果已安装直接看下一步 Linux&#xff1a;rpm…

一个人的旅行

说一下&#xff0c;两次的旅行。一次是三月底四月初&#xff0c;一次是四月底五月初。 我的第一站&#xff0c;帝都&#xff01;&#xff01;&#xff01; 31号晚上八点半的高铁去北京&#xff0c;到达北京已经快十二点了。武局的G528次列车。 来到北京后&#xff0c;这是我第…