关于KafkaTemplate与 @KafkaListener生产者与消费者功能的实现

news2024/12/23 12:13:12

1.前言:

1.1关于生产者与消费者的详细介绍请查看另一篇文章:

使用JavaApi实现模拟Kafka的消息生产者与发送者icon-default.png?t=N7T8http://t.csdnimg.cn/ukNSU

 

1.2 本文使用 KafkaTemplate与 @KafkaListener实现生产者与消费者功能:

        Kafka 是一个流行的分布式流处理平台,广泛用于构建实时数据管道和流应用程序。在 Java 应用程序中,Spring Framework 提供了对 Kafka 的集成支持,通过 spring-kafka 模块实现。KafkaTemplate@KafkaListener 是 Spring Kafka 中的两个重要组件,它们分别用于发送消息和接收消息。

所需依赖:

!-- spring-kafka --> 
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.6.0</version>
</dependency>
<!-- kafka-clients --> 
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.6.0</version>
</dependency>

2. KafkaTemplate生产者发送消息到kafka

        KafkaTemplate 是Spring Kafka提供的一个高级抽象,用于简化Kafka消息的生产。它封装了KafkaProducer生产者客户端的复杂性,并提供了一系列发送消息的方法。

2.1 主要特性:

  1. 线程安全:KafkaTemplate 是线程安全的,可以在多个线程中共享使用。
  2. 消息发送:支持同步和异步发送消息。
  3. 消息类型:支持发送键值对(Key-Value)消息和仅值消息。
  4. 事务管理:支持在事务中发送消息。

2.2 常用方法:

  • send(String topic, V data):发送一个仅值消息到指定主题。
  • send(String topic, K key, V data):发送一个键值对消息到指定主题。
  • send(Message<?> message):发送一个Spring Kafka Message对象。
  • send(String topic, Integer partition, K key, V data):发送消息到指定主题的指定分区。
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

public void sendMessage(String topic, String message) {
    kafkaTemplate.send(topic, message);
}

3.@KafkaListener消费者从kafka中获取消息进行消费

        @KafkaListener 是Spring Kafka提供的一个注解,它简化了消息的接收和处理。用于标记方法作为 Kafka 消费者来监听特定的主题。当消息到达时,被标记的方法将会被自动调用来处理这些消息。这使得消息的处理能够以事件驱动的方式进行,无需轮询或显式拉取。

3.1主要特性:

  1. 消息监听:可以监听一个或多个主题的消息。
  2. 分区监听:可以指定监听特定分区的消息。
  3. 消息类型:支持接收键值对(Key-Value)消息和仅值消息。
  4. 消息转换:支持将接收到的消息转换为具体的对象。

3.2常用属性:

  • topics:监听的Kafka主题,可以是多个。
  • topicPattern:监听的Kafka主题模式,支持正则表达式。
  • groupId:消费者组ID。
  • containerFactory:指定KafkaListenerContainerFactory,用于自定义消费者配置。
@KafkaListener(topics = "myTopic", groupId = "myGroup")
public void listen(String message) {
    System.out.println("Received message: " + message);
}

        在这个例子中,listen 方法将被自动调用,每当 Kafka 消费者从 "myTopic" 主题的 "myGroup" 消费者组接收到消息时。

4. 代码举例

        最近有个需求,记录不同系统之间的交互日志,使用拦截器将日志拦截,进行前置与后置处理,最终组装成json发送到kafka中,消费者消费kafka中的数据,最终将日志数据进行入库操作,前端进行展示。

        下面只将KafkaTemplate与 @KafkaListener举例出来,其他不过多展示。

4.1 生产者KafkaTemplate

@Slf4j
@Tag(name = "运营平台-系统日志管理")
@RestController
@RequestMapping("/log")
public class SystemInteractionLogController {

    @Autowired
    private SystemInteractionLogService systemInteractionLogService;
    @Resource
    private KafkaTemplate<String, String> kafkaTemplate;
    @PostMapping("/page")
    @Operation(summary = "日志分页查询")
    //分组设计  根据messageId分类
    public Result<PageData<SystemInteractionLogVO>> page(@RequestBody LogQueryPageDTO dto) {
        return Result.success(systemInteractionLogService.queryPage(dto));
    }


    @PostMapping("/queryInfoById/{id}")
    @Operation(summary = "日志详情查询")
    public Result<SystemInteractionLogInfoVO> queryInfoById(@PathVariable Long id) {
        Validator.validateNotNull(id, "id不能为空");
        return Result.success(systemInteractionLogService.queryInfoById(id));
    }

    @PostMapping ("/testSendLogToKafka")
    @Operation(summary = "(测试使用)往kafka中发送数据")
    public void sendMessage(@RequestBody SystemInteractionLog systemInteractionLog) {
        String topicName = "system-interaction-logs";
        if(ObjectUtils.isEmpty(systemInteractionLog)){
            log.info("数据为空,不操作");
            return;
        }
        try {
            String message = JSONUtil.toJsonStr(systemInteractionLog);
            log.info("消息为:{}",message);
            kafkaTemplate.send(topicName, message);
        }catch (Exception e){
            log.error("消息发送失败:{}",e.getMessage());
        }
    }

}

4.2 消费者@KafkaListener

/**跨系统交互日志监听
 * @author ZhaoShuhao
 * @data 2024/7/13 9:46
 */
@Slf4j
@Component
public class SystemInteractionLogListener {
    @Resource
    private OperateApi operateApi;
 @KafkaListener(topics="system-interaction-logs",groupId="system-interaction-logs-groupId")
 public void receiveTask(String message, Acknowledgment ack) {
     try {
         log.info("接收到消息:{}", message);
         if(StringUtils.isNoneBlank(message)){
             SystemInteractionLogDto logDto  = JSONUtil.toBean(message, SystemInteractionLogDto.class);
              //保存入库操作
             Result<Boolean> result  = operateApi.saveLog(logDto);
             if(!ObjectUtils.isEmpty(result)){
                 if(result.getCode() == 200){
                     log.info("消息处理成功:{}", logDto);
                     //只有保存成功后才会确认接收到消息
                     ack.acknowledge();
                 }else {
                     log.info("消息保存失败:{}", logDto);
                 }
             }
         }
     }catch (Exception e) {
         //异常数据不做处理
         log.info("消息保存失败,参数为{}", message);
         log.error(ExceptionUtils.getStackTrace(e));
     }
 }

}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1928135.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

解决RuntimeError: Couldn‘t load custom C++ ops. This can happen if your PyTorch

问题描述 刚下好yolov8的代码&#xff0c;想测一下能否成果&#xff0c;果然没成功&#xff0c;报错如下 RuntimeError: Couldnt load custom C ops. This can happen if your PyTorch and torchvision versions are incompatible, or if you had errors while compiling tor…

设计模式-创建型模式之工厂方法模式

和简单工厂模式中工厂负责生产所有产品相比&#xff0c;工厂方法模式将生成具体产品的任务分发给具体的产品工厂&#xff0c;定义一个用于创建对象的接口&#xff0c;让子类决定实例化哪个产品类对象。 工厂方法模式的主要角色: 抽象工厂(AbstractFactory):提供了创建产品的接…

C++20中的constinit说明符

constinit说明符断言(assert)变量具有静态初始化&#xff0c;即零初始化和常量初始化(zero initialization and constant initialization)&#xff0c;否则程序格式不正确(program is ill-formed)。 constinit说明符声明具有静态或线程存储持续时间(thread storage duration)的…

代谢组数据分析(十四):代谢物组间网络分析(spearman coefficient)

介绍 在代谢物网络分析领域,研究者采用斯皮尔曼系数来定量评估代谢物之间的相关性。该系数作为一种有效的非参数统计工具,能够揭示代谢物间潜在的关联模式,不受它们分布特性的限制。通过计算所有代谢物配对间的斯皮尔曼系数,研究者能够构建出反映代谢物相互关系的网络。 …

Word创建多级列表的样式

Word创建多级列表的样式 要求结果方法创建样式修改样式设置段落创建快捷键 关联多级列表 要求 创建自定义的三级列表样式&#xff0c;要求标题均为黑体&#xff0c;小四字号&#xff0c;1.5倍行距&#xff0c;有快捷键。 结果 方法 在样式中创建三个样式。 创建样式 录入名…

BL201分布式I/O耦合器连接Profinet网络

钡铼技术的BL201分布式I/O耦合器是一个用于Profinet网络的设备&#xff0c;用于连接远程输入/输出&#xff08;I/O&#xff09;设备到控制系统&#xff0c;如可编程逻辑控制器&#xff08;PLC&#xff09;&#xff0c;能够实现分布式的I/O连接和通信。 它支持标准Profinet IO …

鸿蒙语言基础类库:【@system.bluetooth (蓝牙)】

蓝牙 说明&#xff1a; 开发前请熟悉鸿蒙开发指导文档&#xff1a;gitee.com/li-shizhen-skin/harmony-os/blob/master/README.md点击或者复制转到。 从API Version 7 开始&#xff0c;该接口不再维护&#xff0c;推荐使用新接口[ohos.bluetooth]。本模块首批接口从API version…

【Python学习笔记】:Python爬取音频

【Python学习笔记】&#xff1a;Python爬取音频 背景前摇&#xff08;省流可以不看&#xff09;&#xff1a; 人工智能公司实习&#xff0c;好奇技术老师训练语音模型的过程&#xff0c;遂请教&#xff0c;得知训练数据集来源于爬取某网页的音频。 很久以前看B站同济子豪兄的《…

Android 10.0 SystemUI下拉状态栏固定展开QsPanel不收缩功能实现

1. 前言 在10.0的系统ROM产品定制化开发中,在systemUi的原生下拉状态栏中,首次下拉展开quickQsPanel,第二次展开就显示 QsPanel,在产品开发中,需要下拉状态栏固定展开QsPanel,不需要二次展开,接下来分析下相关功能的实现,如图: 2.SystemUI下拉状态栏固定展开QsPanel不收…

原创音乐小程序的设计

管理员账户功能包括&#xff1a;系统首页&#xff0c;个人中心&#xff0c;用户管理&#xff0c;歌曲类型管理&#xff0c;歌曲信息管理&#xff0c;热门歌手管理&#xff0c;音乐资讯管理&#xff0c;系统管理 微信端账号功能包括&#xff1a;系统首页&#xff0c;歌曲信息&am…

教你用服务器部署欧洲卡车模拟器2开服

1、购买后登录服务器&#xff08;百度莱卡云&#xff09; 进入控制面板后会出现正在安装的界面&#xff0c;安装大约5分钟&#xff08;如长时间处于安装中请联系我们的客服人员&#xff09; 2、修改查询端口 点击网络&#xff0c;两个端口已经创建完成 复制不是首选的端口&am…

pycharm+pyqt6配置

1、pip install pyqt6 pyqt6-toools 2、pycharm配置 配置&#xff1a;designer Program&#xff1a;&#xff1a;D:\Python39\Lib\site-packages\qt6_applications\Qt\bin\designer.exe Working directory: $ProjectFileDir$ 配置&#xff1a;pyuic6.exe Program&#xff1a…

高频面试题基本总结回顾2(含笔试高频算法整理)

目录 一、基本面试流程回顾 二、基本高频算法题展示 三、基本面试题总结回顾 &#xff08;一&#xff09;Java高频面试题整理 &#xff08;二&#xff09;JVM相关面试问题整理 &#xff08;三&#xff09;MySQL相关面试问题整理 &#xff08;四&#xff09;Redis相关面试…

uniapp中使用uni-ui组件库

src目录下新建components目录从uni-ui引入对应的组件目录&#xff0c;如下图 直接使用组件&#xff0c;demo <template><view id"my" data-name"王五" data-age"18">my页面</view><uni-data-select :localdata"local…

POLYGCL GRAPH CONTRASTIVE LEARNING VIA LEARNABLE SPECTRAL POLYNOMIAL FILTERS

发表于:ICLR24 推荐指数: #paper/⭐⭐⭐ 写作动机 作者之前写过一篇文章:ChebNetII,但是,作者那个时候只考虑了低通滤波器,这在异配图中是有限制的,因此作者写了这篇文章.是spot light文章,证明严谨,值得一读(但是需要阅读相关文章) 相关工作 对比学习 图增强基础的对比学…

JVM的三种垃圾回收算法

目录 1、标记-清除算法 2、标记-复制算法 3、标记-整理算法 总结&#xff1a; jvm先判断哪些需要回收&#xff0c;哪些需要保留&#xff0c;通常采用可达性分析算法标记存活对象&#xff1a;判断对象能否回收的两种方法&#xff0c;以及JVM引用-CSDN博客 1、标记-清除算法 …

数据结构(复杂度)

复杂度 算法在编写成可执行程序后&#xff0c;运⾏时需要耗费时间资源和空间(内存)资源。因此衡量⼀个算法的好 坏&#xff0c;⼀般是从时间和空间两个维度来衡量的&#xff0c;即时间复杂度和空间复杂度。 时间复杂度主要衡量⼀个算法的运⾏快慢&#xff0c;⽽空间复杂度主要…

1. 黑盒测试

黑盒测试 1. 黑盒测试定义 黑盒测试是一种软件测试技术&#xff0c;它可以检查软件的功能&#xff0c;而不会窥视其内部结构或编码。黑盒测试的主要来源是客户声明的要求规范。 黑盒测试的特点&#xff1a; 黑盒测试与软件的具体实现过程无关&#xff0c;如果实现过程发生了…

NetSuite RPA技术实践

近期有同学提出一个需求。 “需要存取的報表是存貨分類帳(stock ledger)&#xff0c;將查到的各個[Item|Location]作為一組key&#xff0c;分別將報表中的「期末庫存量」「期末平均成本」「期末庫存量價值」這三欄的值&#xff0c;在每個月月底的時候自動將這個報表的這三欄數…

毕设项目springboot+vue实现的在线求职平台

一、前言 随着信息技术的飞速发展和互联网的普及&#xff0c;线上求职已成为众多求职者和企业招聘的重要渠道。为满足市场需求&#xff0c;我们利用Spring Boot和Vue技术栈&#xff0c;开发了一款功能全面、用户友好的在线求职平台。本文将对该平台的设计、实现及关键技术进行详…