SpringBoot集成kafka-监听器手动确认接收消息(主要为了保证业务完成后再确认接收)

news2024/9/24 11:27:42

SpringBoot集成kafka-监听器手动确认接收消息

  • 1、说明
  • 2、示例
    • 2.1、application.yml
    • 2.2、消费者
    • 2.3、生产者
    • 2.4、测试类
    • 2.5、测试

1、说明

kafak中默认情况下是自动确认消息接收的,也就是说先启动消费者监听程序,再启动生产者发送消息,此时消费者监听到生产者发送的消息后,程序会自动确认接收成功,偏移量会自动下移,此时再启动消费者,偏移量会从新的位置读取数据,如果本次出现异常,业务没有处理完成,那么下次启动消费者是读取不到本次的消息的,所以可以采用手动确认的配置,确保本次消费者接收到了消息,并且业务正常处理完毕了,给kafak手动反馈接收成功。

在这里插入图片描述

2、示例

在这里插入图片描述

2.1、application.yml

在这里插入图片描述

2.2、消费者

package com.power.consumer;

import com.power.model.User;
import com.power.util.JSONUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

import java.util.function.Consumer;

@Component
public class EventConsumer {

    @KafkaListener(topics = {"${kafka.topic.name}"},groupId="${kafka.consumer.group}")
    public void onEvent4(String userJson,
                         @Header(value=KafkaHeaders.RECEIVED_TOPIC) String topic,
                         @Header(value=KafkaHeaders.RECEIVED_PARTITION_ID) String partition,
                         ConsumerRecord<String,String> record,
                         Acknowledgment ack){
        try {
            User user =JSONUtils.toBean(userJson,User.class);
            System.out.println("读取/消费到的事件,user:"+user+",topic:"+topic+",partition:"+partition);
            System.out.println("读取/消费到的事件:"+record.toString());

            int a = 10/0;

            //业务确认完成,给kafka服务器反馈确认
            ack.acknowledge();//手动确认消息,就是告诉kafka服务器,该消息我已经接收到了,默认情况下是自动确认
            //手动确认后,下次启动消费者,偏移量会从新的位置开始;没有手动确认,下次启动消费者,偏移量还是从老位置开始
        }catch (Exception e){
            e.printStackTrace();
        }

    }

}

2.3、生产者

package com.power.producer;

import com.power.model.User;
import com.power.util.JSONUtils;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.Date;

@Component
public class EventProducer {

    @Resource
    private KafkaTemplate<String,Object> kafkaTemplate;

    public void sendEvent2(){
        User user = User.builder().id(10001).phone("15676767676").birthday(new Date()).build();
        String userJson = JSONUtils.toJSON(user);
        kafkaTemplate.send("helloTopic",userJson);
    }

}

2.4、测试类

package com.power;

import com.power.producer.EventProducer;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;

import javax.annotation.Resource;

@SpringBootTest
public class SpringBoot02KafkaBaseApplication {

    @Resource
    private EventProducer eventProducer;

    @Test
    void sendEvent2(){
        eventProducer.sendEvent2();
    }

}

2.5、测试

先启动消费者监听程序
再启动生产者发送消息
程序再业务中出现了异常:

在这里插入图片描述
再次启动消费者程序,因为再上次启动时出现了异常,也没有进行手动确认接收,所以本地启动消费者后依然可以读取到上次未完成业务时接收到的数据
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2071085.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【动态规划】第 N 个泰波那契数

欢迎来到 破晓的历程的 博客 ⛺️不负时光&#xff0c;不负己✈️ 文章目录 题目讲解算法原理代码实现 题目 题目如下&#xff1a; 讲解算法原理 我们先说一下动态规划题目的整体做题思路&#xff1a; 第一步&#xff1a; 状态表示 什么是状态表示? 做动态规划类题目一般…

跟李沐学AI:样式迁移

样式迁移需要两张输入图像&#xff1a;一张是内容图像&#xff0c;另一张是样式图像。 我们将使用神经网络修改内容图像&#xff0c;使其在样式上接近样式图像&#xff0c;得到合成图片。类似手机相册中的滤镜效果。 奠基性工作&#xff1a;基于CNN的样式迁移 任务&#xff1…

vue3+vite+axios+mock从接口获取模拟数据实战

文章目录 一、安装相关组件二、在vite.config.js中配置vite-plugin-mock插件三、实现mock服务四、调用api接口请求mock数据方法一、直接使用axios 请求mock 数据方法二、对axios进行封装统一请求mock数据 五、实际运行效果 在用Vue.js开发前端应用时通常要与后端服务进行交互&a…

WPF 选择对应控件技巧

当界面控件过多&#xff0c;选择对应的控件是比较困难的。

白酒与青年文化:潮流与传统的碰撞

在时代的洪流中&#xff0c;青年文化如同一股涌动的潮流&#xff0c;不断冲击着传统的边界。而白酒&#xff0c;作为中国传统文化的瑰宝&#xff0c;也在这一潮流中找到了新的表达方式。今天&#xff0c;我们就来探讨一下白酒与青年文化之间的碰撞与整合&#xff0c;以及豪迈白…

项目问题 | vscode连接远程Linux服务器报错: “> Host key verification failed. > 过程试图写入的管道不存在”

远程连接服务器时报错&#xff1a; Please contact your system administrator. Add correct host key in C:\Users\LiHon/.ssh/known_hosts to get rid of this message. Offending ECDSA key in C:\Users\LiHon/.ssh/known_hosts:9 Host key for 124.71.71.215 has changed a…

七种有效将msvcp140.dll丢失的解决方法,快速修复msvcp140.dll错误

在使用Windows操作系统的计算机上安装或运行软件时&#xff0c;用户可能遭遇“msvcp140.dll丢失”这一常见错误。这个问题通常发生在尝试启动某些程序时&#xff0c;系统会弹出一个警告窗口&#xff0c;提示“无法继续执行代码&#xff0c;因为系统未找到msvcp140.dll”。这样的…

【学习笔记】AD实现原理图的元器件自动标号

【学习笔记】AD24实现原理图的元器件自动标号 在原理图绘制过程中&#xff0c;载入的元器件封装并不会默认标号&#xff0c;而是“&#xff1f;”的形式显示&#xff0c;为避免手动标号所带来的大量繁琐工作&#xff0c;自动标号会是一个很好的选择。 在 Altium Designer&…

【网络】传输层协议——TCP协议(初阶)

目录 1.TCP协议 1.1.什么是TCP协议 1.2.为什么TCP叫传输控制协议 1.2.TCP是面向字节流的 2.TCP协议段格式 2.1.流量控制——窗口大小&#xff08;16位&#xff09; 2.2.确认应答机制 2.2.1.什么是确认应答机制 5.2.2.推导确认应答机制 5.3.2.确认号和序列号 2.3.六位…

日志审计-graylog ssh登录超过6次告警

Apt 设备通过UDP收集日志&#xff0c;在gray创建接收端口192.168.0.187:1514 1、ssh登录失败次数大于5次 ssh日志级别默认为INFO级别&#xff0c;通过系统rsyslog模块处理&#xff0c;日志默认存储在/var/log/auth.log。 将日志转发到graylog vim /etc/rsyslog.conf 文件末…

四、前后端分离通用权限系统(4)

&#x1f33b;&#x1f33b; 目录 一、前端开发和前端开发工具1.1、前端开发介绍1.2、下载和安装 VS Code1.2.1、下载地址1.2.2、插件安装1.2.3、创建项目1.2.4、保存工作区1.2.5、新建文件夹和网页1.2.6、预览网页1.2.7、设置字体大小 二、Node.js2.1、Node.js 简介2.1.1、什么…

汇编知识MOV,MRS,MSR,PUSH和POP指令

处理器做得最多的事情就是在处理器内部来回的进行数据传递 1) 将数据从一个寄存器传递到另一个寄存器中 2) 将数据从一个寄存器传递到特殊寄存器&#xff0c;例如CPSR,SPSR寄存器 3) 将立即数传递到寄存器。 数据传输常用的三个指令&#xff1a;MOV,MRS,MSR指令 常用的…

微信小程序模板与配置(三)app.json对小程序进行全局性配置

全局配置文件及常用的配置项 小程序根目录下的app.json文件是小程序的全局配置文件。常用的配置项如下&#xff1a; pages 记录当前小程序所有页面的存放路径window 全局设置小程序窗口的外观tabBar 设置小程序底部的tabBar效果style 是否启用新版的组件样式 一、全局配置-w…

Python测试框架Pytest的使用

pytest基础功能 pytset功能及使用示例1.assert断言2.参数化3.运行参数4.生成测试报告5.获取帮助6.控制用例的执行7.多进程运行用例8.通过标记表达式执行用例9.重新运行失败的用例10.setup和teardown函数 pytset功能及使用示例 1.assert断言 借助python的运算符号和关键字实现不…

解决 VMware 中 Ubuntu文件系统磁盘空间不足

目录 问题引入 解决方案 第一步、在VMware中扩展容量&#xff1a; 第二步、查看磁盘空间使用情况&#xff1a; 第三步、安装分区工具&#xff1a; 第四步、启动该分区工具&#xff1a; 第五步、操作分区&#xff1a; 第六步、修改挂载文件夹的读写权限&#xff1a; 第七…

全网最全的yolo系列转换工具,从txt转xml,再从xml转txt,亲自测试好用

在训练yolo的过程中&#xff0c;难免涉及标注的数据格式转化&#xff0c;经过了几次修改和迭代&#xff0c;最终把转化代码跟大家一起分享。 先把xml转txt部分的代码分享一下&#xff0c;py_convert_xml2txt.py&#xff1a; # -*- coding:utf-8 -*-import os import shutil im…

GRAPHCARE:双向图神经网络 + 个性化知识图谱 + 大模型,打开医疗保健预测领域之门

GRAPHCARE&#xff1a;双向图神经网络 个性化知识图谱 大模型&#xff0c;医疗保健预测领域 关系图双向图神经网络个性化知识图谱GRAPHCARE框架创意视角 如果取消双向图神经网络&#xff0c;直接用医学大模型分析&#xff0c;还能做医疗保健预测领域吗&#xff1f;使用双向图…

防患未然:构建AIGC时代下开发团队应对突发技术故障与危机的全面策略

文章目录 一、快速响应与精准问题定位1. 实时监控与预警系统2. 高效的日志管理和分析3. 分布式追踪与调用链分析4. 紧急响应机制 二、建立健全的应急预案与备份机制1. 制定详尽的应急预案2. 定期应急演练3. 数据备份与快速恢复4. 冗余部署与负载均衡 三、事后总结与持续改进1. …

MATLAB 低版本Matlab-读取LAS格式点云文件并可视化(78)

las格式的文件属于标准的激光点云文件,也是最常见的点云文件,下面是读取并可视化方法 MATLAB 低版本Matlab-读取LAS格式点云文件并可视化(78) 一、LAS文件简介二、算法实现1.代码2.下载地址总结之前介绍过MATLAB自带的Las文件读取函数:(稳定,推荐使用该方法) MATLAB 读取…

第135天:内网安全-横向移动非约束委派约束委派数据库攻防

案例一: 横向移动-原理利用-非约束委派 该案例建立了解即可&#xff0c;真实环境应该不可能有人这样配置 非约束委派的原理和利用场景 原理&#xff1a; 机器 A &#xff08;域控&#xff09;访问具有非约束委派权限的机器 B 的服务&#xff0c;会把当前认证用户&#x…