kafka发送消息-自定义消息发送的拦截器

news2025/1/17 9:00:46

在这里插入图片描述

1、自定义拦截器

创建自定义拦截器类,实现ProducerInterceptor接口。对消息进行拦截,可以在拦截中对消息做些处理,记录日志等操作…
在这里插入图片描述

package com.power.config;

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Map;

public class CustomerProducerInterceptor implements ProducerInterceptor<String,Object> {

    /**
     * 发送消息时,会调用该方法,对消息进行拦截,可以在拦截中对消息做些处理,记录日志等操作......
     * @param record
     * @return
     */
    @Override
    public ProducerRecord<String,Object> onSend(ProducerRecord record) {
        System.out.println("拦截消息:"+record.toString());
        return record;
    }

    /**
     * 服务器收到消息后的一个确认
     * @param metadata
     * @param exception
     */
    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        if(null!=metadata){
            System.out.println("服务器接收到该消息:"+metadata.toString());
        }else {
            System.out.println("消息发送失败了,exception = "+exception);
        }
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> configs) {

    }
}

2、kafak配置类

在这里插入图片描述

package com.power.config;

import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.RoundRobinPartitioner;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class KafkaConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.producer.key-serializer}")
    private String keySerializer;

    @Value("${spring.kafka.producer.value-serializer}")
    private String valueSerializer;

    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer);
        props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomerPartitioner.class);
        props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,CustomerProducerInterceptor.class.getName());
        return props;
    }

    public ProducerFactory<String, ?> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, ?> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    //第二次创建
    @Bean
    public NewTopic newTopic9() {
        return new NewTopic("heTopic", 9, (short) 1);
    }
}

3、生产者

在这里插入图片描述

package com.power.producer;

import com.power.model.User;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.support.SendResult;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;

import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
import java.util.Date;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

@Component
public class EventProducer {

    @Resource
    private KafkaTemplate<String,Object> kafkaTemplate2;

    public void send10(){
        User user = User.builder().id(1208).phone("16767667676").birthday(new Date()).build();
        //分区是null,让kafka自己去决定把消息发送到哪个分区
        kafkaTemplate2.send("heTopic",user);
    }
}

4、测试类

在这里插入图片描述

package com.power;

import com.power.model.User;
import com.power.producer.EventProducer;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;

import javax.annotation.Resource;
import java.util.Date;

@SpringBootTest
public class SpringBoot01KafkaBaseApplication {

    @Resource
    private EventProducer eventProducer;

    @Test
    void sendInterceptor(){
        eventProducer.send10();
    }

}

5、执行测试类

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2070389.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

会话技术:Cookie、Session、JWT的优缺点分析与实践

登录认证 会话技术方案一&#xff1a;Cookie方案二&#xff1a;Session方案三&#xff1a;令牌技术JWT令牌介绍生成和校验登录下发令牌案例 会话技术 会话&#xff1a;用户打开浏览器&#xff0c;访问web服务器的资源&#xff0c;会话建立&#xff0c;直到有一方断开连接&…

Java语言程序设计——篇十七(2)

&#x1f33f;&#x1f33f;&#x1f33f;跟随博主脚步&#xff0c;从这里开始→博主主页&#x1f33f;&#x1f33f;&#x1f33f; 欢迎大家&#xff1a;这里是我的学习笔记、总结知识的地方&#xff0c;喜欢的话请三连&#xff0c;有问题可以私信&#x1f333;&#x1f333;&…

vscode tailwind插件无提示

说明 Tailwind CSS IntelliSense插件 版本: v0.12.7 问题: 无代码提示 解决 降版本

string的常用函数

目录 一.string 的 构造函数 二. iterator&#xff08;迭代器&#xff09;&#xff08;类似指针&#xff0c;但不是指针&#xff09; 2.1 begin 和 end 2.2 rbegin 和 rend 三. Capacity 函数 3.1 size 函数 3.2 capacity 函数 3.3 resize 函数 3.4 reserve函数 …

MySQL 相关知识笔记

一、字符编码 MySQL 默认的编码是 utf8&#xff0c;仅支持三个字节的存储&#xff1b;utf8mb4 支持四个字节的存储。 二、数据库操作 查询 查询所有数据库&#xff1a;SHOW DATABASES;查询当前数据库&#xff1a;SELECT DATABASE(); 创建 CREATE DATABASE [IF NOT EXISTS] 数据…

搜维尔科技:Manus Prime 3 Mocap 数据手套VR手套动作捕捉手套

通过在设置中添加手动捕捉功能&#xff0c;轻松创建动画内容。非常适合独立Mocap、预可视化、现场表演流和V-tubing。即插即用符合行业标准具有无与伦比的易用性。 通过添加精确的手部和手指动作捕捉数据&#xff0c;为您的虚拟角色加入情感和个性。Manus Prime 3 Mocap 数据手…

欧拉函数.

欧拉函数 给定 n个正整数 ai&#xff0c;请你求出每个数的欧拉函数。 欧拉函数的定义 输入格式 第一行包含整数 n。 接下来 n行&#xff0c;每行包含一个正整数 ai。 输出格式 输出共 n行&#xff0c;每行输出一个正整数 ai的欧拉函数。 数据范围 1≤n≤100, 1≤ai≤2109 输入…

【逐行注释】MATLAB下的IMM-EKF代码

IMM-EKF 基于EKF的多模型交互。以CV和CT两个模型进行交互&#xff0c;这里对代码进行逐行注释。 注释较多&#xff0c;个人理解的时候如果有误&#xff0c;欢迎指正。 每一行都有注释&#xff1a; 模型概况 二维平面上的运动模型&#xff0c;由CV和CT构成&#xff0c;基于…

【机器学习-监督学习】支持向量机

【作者主页】Francek Chen 【专栏介绍】 ⌈ ⌈ ⌈Python机器学习 ⌋ ⌋ ⌋ 机器学习是一门人工智能的分支学科&#xff0c;通过算法和模型让计算机从数据中学习&#xff0c;进行模型训练和优化&#xff0c;做出预测、分类和决策支持。Python成为机器学习的首选语言&#xff0c;…

Linux:Bash中的命令介绍(简单命令、管道以及命令列表)

相关阅读 Linuxhttps://blog.csdn.net/weixin_45791458/category_12234591.html?spm1001.2014.3001.5482 在Bash中&#xff0c;命令执行的方式可以分为简单命令、管道和命令列表组成。这些结构提供了强大的工具&#xff0c;允许用户组合命令并精确控制其执行方式。以下是对这…

分享一个基于文本挖掘的微博舆情分析系统Python网络舆情监控系统Flask爬虫项目大数据(源码、调试、LW、开题、PPT)

&#x1f495;&#x1f495;作者&#xff1a;计算机源码社 &#x1f495;&#x1f495;个人简介&#xff1a;本人 八年开发经验&#xff0c;擅长Java、Python、PHP、.NET、Node.js、Android、微信小程序、爬虫、大数据、机器学习等&#xff0c;大家有这一块的问题可以一起交流&…

Comsol TPMS_Diamond多孔构型吸声性能仿真

TPMS_Diamond多孔构型是一种新型的吸声材料结构&#xff0c;它采用了三维打印技术制备而成。该构型的设计灵感来自于结晶体的晶格结构&#xff0c;通过将正方形的单元堆积并沿着特定方向旋转&#xff0c;形成了一种类似于钻石的多孔结构。 TPMS_Diamond多孔构型具有以下特点&a…

systemback制作Ubuntu自己的系统镜像

systemback制作Ubuntu自己的系统镜像 目录1.安装、制作2.解决镜像大于4GB的问题3.写入镜像到U盘&#xff14;.安装 目录 systemback制作Ubuntu自己的系统镜像 最近需要备份自己的系统&#xff0c;同时制作安装镜像在另一台笔记本上安装&#xff0c;这里找到了一款很不错的软件…

excel表格输入数据生成函数曲线图

1.新建一个excel表格&#xff0c;横向或者纵向输入x轴点的数据 下图以横向数据为例子&#xff0c;两个y轴数据&#xff0c;生成两个函数曲线 2.右键选中上面输入的数据&#xff0c;点击插入&#xff0c;选择你要构造的函数样式 3.选中带平滑散点图即可生成前面数据生成的函数 …

内存管理篇-06Per-CPU页帧缓存

per-CPU缓存是对伙伴系统的完善&#xff0c;也是伙伴系统中的一部分。再回顾一下zone结构体的内容&#xff0c;这里的__percpu *pageset实际上就是Per-CPU的实现机制&#xff0c;所以这里的内存实际上最少有三部分&#xff0c;&#xff08;1&#xff09;free_area管理了大部分的…

数学建模之Matlab快速入门--全

前言&#xff1a; 本文是之前学Matlab时候做的笔记&#xff0c;很适合快速入门数学建模中matlab和python是最常用的两个软件&#xff0c;现在本人更喜欢python去做数学建模 文章目录 界面介绍与操作快捷操作 数据类型数值型整型浮点型复型逻辑型字符型struct数组cell数组函数句…

区块链国赛第六套样题(关于运维)

任务1-2&#xff1a;区块链系统部署与运维 围绕食品安全溯源区块链平台部署与运维需求&#xff0c;进行项目相关系统、节点以及管理工具的部署工作。通过监控工具完成对网络、节点服务的监控。最终利用业务需求规范&#xff0c;完成系统日志、网络参数、节点服务等系统结构的维…

Jetson安装Archiconda3全过程

1. 下载Archiconda3 下载网址&#xff1a; 发布 Archiconda/build-tools --- Releases Archiconda/build-tools (github.com)​​​​​​ 2. 执行命令 bash ./Archiconda3-0.2.2-Linux-aarch64.sh 3. conda换源 conda config --add channels https://mirrors.tuna.tsing…

计算机网络-PIM-SM组播实验

一、概述 目前为止我们学习了组播转发网络中的PIM协议&#xff0c;PIM模型有两种&#xff1a; PIM-DM主要使用在网络规模较小&#xff0c;用户集中的组播网络中。 PIM-SM主要使用在网络规模较大&#xff0c;用户较为分散的组播网络中。PIM-SM基于组播模型又可以分为PIM-SM&…

5.Lab four —— Trap

首先切换traps分支 git checkout traps make clean RISC-V assembly 代码&#xff1a; #include "kernel/param.h" #include "kernel/types.h" #include "kernel/stat.h" #include "user/user.h"int g(int x) {return x3; }int f(…