SpringBoot集成kafka-自定义拦截器(可以在拦截器中做记录日志、安全检查等操作)

news2025/1/15 13:08:40

@TOC

在这里插入图片描述

1、kafka配置类

  • kafka配置类添加@Configuration注解,springboot启动后会自动读取该配置类;
  • 由于在application.yml文件中我们找不到kafak拦截器相关的配置项,因此需要自定义拦截器;
  • 消费者相关配置方法中添加自定义拦截器配置,这样就可以在自定义拦截器中处理个性化业务需求;
  • 配置类中需要注入消费者工厂bean和消费者监听器工厂,以替换kafak内置默认的消费者工厂和消费者监听器工厂。
package com.power.config;

import com.power.Inteceptor.CustomConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

import java.util.HashMap;
import java.util.Map;

/**
 * kafka配置类
 */
@Configuration
public class KafkaConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.consumer.key-deserializer}")
    private String keyDeserializer;

    @Value("${spring.kafka.consumer.value-deserializer}")
    private String valueDeserializer;

    /**
     * 消费者相关配置
     * @return
     */
    public Map<String,Object> consumerConfigs(){
        Map<String,Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,keyDeserializer);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,valueDeserializer);
        //添加一个消费者拦截器
        props.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, CustomConsumerInterceptor.class.getName());
        return props;
    }

    /**
     * 消费者工厂
     */
    @Bean
    public ConsumerFactory<String,String> consumerFactory(){
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public KafkaListenerContainerFactory<?> ourKafkaListenerContainerFactory(ConsumerFactory<String,String> ourConsumerFactory){
        ConcurrentKafkaListenerContainerFactory<String,String> listenerContainerFactory = new ConcurrentKafkaListenerContainerFactory<>();
        listenerContainerFactory.setConsumerFactory(ourConsumerFactory);
        return listenerContainerFactory;
    }

}

2、自定义拦截器类

package com.power.Inteceptor;

import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecords;

import java.util.Map;

/**
 * 自定义的消费者拦截器
 */
public class CustomConsumerInterceptor implements ConsumerInterceptor<String,String> {

    /**
     * 在消费消息之前执行
     * @param record
     * @return
     */
    @Override
    public ConsumerRecords onConsume(ConsumerRecords record) {
        System.out.println("onConsumer方法执行(在消费消息之前执行),record="+record);
        return record;
    }

    /**
     * 在拿到消息之后,提交offset之前执行该方法
     * @param offsets
     */
    @Override
    public void onCommit(Map offsets) {
        System.out.println("onCommit方法执行(在拿到消息之后,提交offset之前执行该方法),offsets="+offsets);
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> configs) {

    }
}

3、消费者

package com.power.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class EventConsumer {

    @KafkaListener(topics = {"interceptorTopic"}, groupId = "interceptorGroup", containerFactory = "ourKafkaListenerContainerFactory")
    public void onEvent(ConsumerRecord<String, String> record) {
        System.out.println("消费者消费消息record = " + record);
    }
}

4、生产者

package com.power.producer;

import com.power.model.User;
import com.power.util.JSONUtils;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.Date;

@Component
public class EventProducer {

    @Resource
    private KafkaTemplate<String,Object> kafkaTemplate;

    public void sendEvent(){
        User user = User.builder().id(1).phone("15676767673").birthday(new Date()).build();
        String userJson = JSONUtils.toJSON(user);
        kafkaTemplate.send("interceptorTopic","k", userJson);

    }

}

5、实体类(用于发送接收对象消息)

package com.power.model;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.util.Date;

@Builder
@AllArgsConstructor
@NoArgsConstructor
@Data
public class User {

    private Integer id;

    private String phone;

    private Date birthday;

}

6、JSON工具类

package com.power.util;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

public class JSONUtils {

    private static final ObjectMapper OBJECTMAPPER = new ObjectMapper();

    public static String toJSON(Object object){
        try {
            return OBJECTMAPPER.writeValueAsString(object);
        } catch (JsonProcessingException e) {
            throw new RuntimeException(e);
        }
    }

    public static <T> T toBean(String json,Class<T> clazz){
        try {
            return OBJECTMAPPER.readValue(json,clazz);
        } catch (JsonProcessingException e) {
            throw new RuntimeException(e);
        }
    }
}

7、启动类

package com.power;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;

import java.util.Map;

@SpringBootApplication
public class Kafka04Application {

	public static void main(String[] args) {
		ConfigurableApplicationContext context = SpringApplication.run(Kafka04Application.class, args);

		Map<String, ConsumerFactory> beansOfType = context.getBeansOfType(ConsumerFactory.class);
		beansOfType.forEach((k,v)->{
			System.out.println(k+" -- "+v);
		});

		System.out.println("----------------------------------------------------");

		Map<String, KafkaListenerContainerFactory> beansOfType2 = context.getBeansOfType(KafkaListenerContainerFactory.class);
		beansOfType2.forEach((k,v)->{
			System.out.println(k+" -- "+v);
		});
	}

}

以下红框内容用于查看SpringBoot启动后注入的类型
在这里插入图片描述
在这里插入图片描述

8、测试类

package com.power;

import com.power.producer.EventProducer;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;

import javax.annotation.Resource;

@SpringBootTest
public class SpringBoot04KafkaBaseApplication {

    @Resource
    private EventProducer eventProducer;

    @Test
    void sendInterceptor(){
        eventProducer.sendEvent();
    }

}

9、测试

  • 先启动消费者监听
  • 在启动生产者发送消息
  • 测试结果发现,消费者走了我们自定义的拦截器

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2075285.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

FORTIFY: FD_ISSET: file descriptor 1024 >= FD_SETSIZE 128 记录

问题 在开发过程中&#xff0c;遇到一个问题&#xff0c;即使用FD_ISSET时&#xff0c;当文件描述符数量超过1023&#xff0c;导致netd进程出现crash。通过代码和log分析&#xff0c;发现这是由于内核限制导致的数组越界问题。 总结&#xff1a;FD_ISSET(sock, &read_fds)…

【日记】这个月花了好多钱(1317 字)

正文 这几天都好热。热到人不想动&#xff0c;只想睡觉。 今天写文章发现自己有个很显著的特点&#xff0c;就是在有个框架之后&#xff0c;具体细节完全没有预设。我只能像马尔可夫链一样&#xff0c;形成一个比较窄的窗口&#xff0c;接着这个窗口里的情节往下写&#xff0c;…

.NET Razor类库 - 生成NuGet包

上一篇讲了Razor类库组件化&#xff1a;https://blog.csdn.net/CsethCRM/article/details/141558974 本篇说一下Razor类库生成NuGet包 1.右键Razor类库项目 - 属性 2. 输入Nuget 包信息 点击 左侧菜单 包 在生成操作期间 创建包文件 打勾 版本号 我们输入 2023.1.0 作者 Xxx…

外卖霸王餐项目是什么?怎么搭建属于自己的外卖霸王餐小程序 ?

前言&#xff1a; 外卖霸王餐项目是一种结合了优惠促销与推广合作的商业模式&#xff0c;主要针对外卖行业。这个项目的核心是通过提供低于市场价的外卖餐品&#xff08;通常是半价或者更大折扣&#xff09;来吸引新用户尝试&#xff0c;并通过用户的口碑传播来增加餐厅的知名…

降本高达30%,磁集成是电源企业的福音吗?

导语 为什么说磁集成将会是大功率电源产品趋势?因为终端价格战越来越激烈&#xff0c;只有磁集成才能同时解决电源企业的三大核心竞争需求。 终端持续“卷”价格 储能价格正式步入0.5元时代。从价格战的角度来看&#xff0c;储能领域自2023年起就已经进入“0.5元/Wh时代”&…

集群 NAT(地址转换)、TUN(IP隧道)、DR(直接路由)

一、企业群集应用概述 1、群集的含义 ①、Cluster、集群、群集 ②、由多台主机构成&#xff0c;但对外只表现为一个整体&#xff0c;只提供一个访问入口&#xff08;域名与IP地址&#xff09;&#xff0c;相当于一台大型计算机。 2、问题 ①、互联网应用中&#xff0c;随着站…

composer常用命令列表和实践使用、服务器lnmp环境自动化部署脚本及netstat命令常用选项笔记-及state各值的意义

一、composer常用命令列表和实践使用 1. composer常用的命令列表如下&#xff1a; #. composer install 命令&#xff08;composer.lock与composer.json&#xff09; 如果当前目录下存在composer.lock文件&#xff0c;则从此文件读取依赖版本&#xff0c;否则就读取compose…

计算机毕业设计选题推荐-社区康养管理系统-Java/Python项目实战

✨作者主页&#xff1a;IT研究室✨ 个人简介&#xff1a;曾从事计算机专业培训教学&#xff0c;擅长Java、Python、微信小程序、Golang、安卓Android等项目实战。接项目定制开发、代码讲解、答辩教学、文档编写、降重等。 ☑文末获取源码☑ 精彩专栏推荐⬇⬇⬇ Java项目 Python…

SQL注入漏洞WAF绕过

目录 如何检测和防范SQL注入攻击中的编码伪装&#xff1f; 检测SQL注入攻击中的编码伪装 防范SQL注入攻击中的编码伪装 WAF在处理SQL注入时为什么有时会对大小写不敏感&#xff1f; SQL注入中的联合查询注入有哪些常见的攻击方式&#xff1f; 在绕过Web应用防火墙&#xf…

软件工程造价师习题练习 22

1.公文管理系统可以设置公文处理提示的方式和频率。系统缺省设置为邮件方式及每天提醒。则对于公文管理系统,“公文处理提示方式及频率的缺省设置信息”配置信息缺省默认值是业务数据。 正确 错误 要判断“公文处理提示方式及频率的缺省设置信息”配置信息缺省默认值是否是业务…

Nginx 反向代理实现 Tomcat 高可用性负载均衡详解

Tomcat 简介 Tomcat是Apache软件基金会&#xff08;Apache Software Foundation&#xff09;的Jakarta 项目中的一个核心项目&#xff0c;由Apache、Sun和其他一些公司及个人共同开发而成。 Tomcat服务器是一个免费的开放源代码的Web应用服务器&#xff0c;属于轻量级应用服务…

vue项目关于ERR_OSSL_EVP_UNSUPPORTED的问题

opensslErrorStack: [ error:03000086:digital envelope routines::initialization error ], library: digital envelope routines, reason: unsupported, code: ERR_OSSL_EVP_UNSUPPORTED 该问题通常与 OpenSSL 库版本不兼容或配置问题有关&#xff0c;特别是在使用 No…

Docusign IAM|5 种方式优化团队的协议管理工作流!

本文将介绍 Docusign IAM 如何帮助你的团队自定义协议工作流程并改进端到端协议流程的五种方式。 团队创建、承诺和管理协议的传统方式充满漏洞。这些流程涉及过多的技术系统、繁复的步骤&#xff0c;以及员工在不同工具间手动转移文档和数据的繁琐操作。我们在与企业讨论其协议…

【运维高级内容--MySQL】

目录 一、mysql安装 二、MySQL主从复制 一、mysql安装 yum install cmake gcc-c openssl-devel ncurses-devel.x86_64 rpcgen.x86_64 #安装依赖性 #在root路径下下载mysql-boost-5.7.44、libtirpc-devel-1.3.3-8.el9_4.x86_64.rpm安装包 yum install libtirpc-devel…

基于深度学习的道路缺陷检测系统(含UI界面、yolov5、Python代码、数据集)

项目介绍 项目中所用到的算法模型和数据集等信息如下&#xff1a; 算法模型&#xff1a;     yolov5、yolov5 SE注意力机制&#xff0c;两个模型都已训练好&#xff0c;可直接使用。 数据集&#xff1a;     网上下载的数据集&#xff0c;格式都已转好&#xff0c;可…

下载官方llama

1.官网.pth格式 去官网&#xff08;Download Llama (meta.com)&#xff09;申请 具体可以看这个B站视频 Llama2模型申请与本地部署详细教程_哔哩哔哩_bilibili&#xff08;视频是llama2&#xff0c;下载llama3是另外一个git&#xff09; 相关代码如下 git clone https://g…

【那些年错过的好书】——Python数据可视化:科技图表绘制

正文开始 前言推荐理由作者简介书籍介绍章节介绍实书示例写在最后 前言 读万卷书&#xff0c;行万里路。 书籍免费获取方式&#xff1a;小程序搜索【中二少年工具箱】&#xff0c;找到抽奖功能&#xff08;如果已经做出来的话&#xff09;&#xff0c;直接抽奖获取。或者私信…

五、数组、排序和查找

文章目录 一、数组1.1 数组介绍1.2 数组的使用1.3 数组使用注意事项和细节1.4 数组赋值机制1.5 数组反转1.6 数组添加 二、排序2.1 排序的介绍2.2 冒泡排序 三、查找四、多维数组4.1 二维数组的使用4.2 二维数组的应用案例4.3 二维数组使用细节和注意事项4.4 课堂练习 细节知识…

车企重新审视「自研」

一直以来&#xff0c;“全栈自研”成了车企布局智能化赛道的关键词&#xff0c;尤其是「软件定义汽车」被视为行业游戏规则的改变者。然而&#xff0c;在很多供应商看来&#xff0c;“运营一个高效的内部软件开发团队&#xff0c;极具挑战性。” 至少到目前为止&#xff0c;软件…

PHP多商户跨店统一消费券系统程序源码

&#x1f389;【购物新风尚】多商户跨店统一消费券系统&#xff0c;省钱新体验来袭&#xff01;&#x1f4b0; &#x1f6cd;️ 开篇&#xff1a;告别单一&#xff0c;拥抱多元优惠 还在为错过心仪店铺的优惠券而懊恼吗&#xff1f;&#x1f614; 告别那份遗憾&#xff0c;多…