SpringBoot基础Kafka示例

news2025/4/21 21:06:31

这里将生产者和消费者放在一个应用中

使用的Boot3.4.3

引入Kafka依赖

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

yml配置


spring:
  application:
    name: kafka-1

  #kafka连接地址
  kafka:
    bootstrap-servers: 127.0.0.1:9092
    #配置生产者
    producer:
      #消息发送失败重试次数
      retries: 0
      #一个批次可以使用内存的大小
      batch-size: 16384
      #一个批次消息数量
      buffer-memory: 33554432
      #键的序列化方式
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      #值的序列化方式
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      acks: all


    consumer:
      #是否自动提交
      enable-auto-commit: false
      #自动提交的频率
      auto-commit-interval: 1000
      #earliest	从分区的最早偏移量开始消费	需要消费所有历史消息  latest	从分区的最新偏移量开始消费,忽略历史消息	只关心新消息
      #none	如果没有有效的偏移量,抛出异常	严格要求偏移量必须存在
      #exception spring-kafka不支持
      auto-offset-reset: latest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

    listener:
      #用于配置消费者如何处理消息的确认  ack配置方式  这里指定由消费者手动提交偏移量
      #Acknowledgment.acknowledge() 方法来提交偏移量
      ack-mode: MANUAL_IMMEDIATE
      concurrency: 4
test-1: group-1
test-2: group-2
test-3: group-3

server:
  port: 8099

生产者示例,一般可能是一个MQTT接收消息入口

package com.hrui.kafka1.producer;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

/**
 * @author hrui
 * @date 2025/3/10 14:56
 */
@RestController
public class EventProducer {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @RequestMapping("/sendMessage")
    public String sendMessage(String topic, String message) {
        kafkaTemplate.send(topic, message);
        return "Message sent to topic '" + topic + "': " + message;
    }


    @RequestMapping("/sendMessage2")
    public String sendMessage2() {
        //通过构建器模式创建Message对象
        Message<String> message = MessageBuilder.withPayload("Hello, Kafka!")
                .setHeader(KafkaHeaders.TOPIC, "ceshi")
                .build();
        kafkaTemplate.send(message);
        return "Message sent to topic";
    }

}

消费者示例

注意:如果配置了手动提交ack,那么

主要目的不仅仅是避免重复消费,而是为了确保消息的可靠处理和偏移量(offset)的正确提交。它可以避免重复消费,但更重要的是保证消息不会丢失,并且在消息处理失败时能够重新消费。

package com.hrui.kafka1.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.boot.autoconfigure.jms.AcknowledgeMode;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

/**
 * @author hrui
 * @date 2025/3/10 15:57
 */
@Component
public class EventConsumer {

    @KafkaListener(topics = {"ceshi"},groupId = "#{'${test-1}'}")
    public void onMessage(ConsumerRecord<String,String> message){
        System.out.println("接收到消息1:"+message.value());
    }

    @KafkaListener(topics = {"ceshi"},groupId = "#{'${test-2}'}")
    public void onMessage(String message){
        System.out.println("接收到消息2:"+message);
    }

    @KafkaListener(topics = {"ceshi"}, groupId = "#{'${test-3}'}")
    public void onMessage(ConsumerRecord<String, String> message, Acknowledgment ack,
                          @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                          @Header(KafkaHeaders.GROUP_ID) String groupId) {
        try {
            System.out.println("接收到消息3:" + message + ", ack:" + ack + ", topic:" + topic + ", groupId:" + groupId);
            // 处理消息逻辑
            // ...


        } catch (Exception e) {
            // 处理异常,记录日志
            System.err.println("处理消息失败: " + e.getMessage());
            // 可以根据业务需求决定是否重新抛出异常
        }finally {
            // 手动提交偏移量
            ack.acknowledge();
        }
    }
}

生产者可选择异步或者同步发送消息

生产者发送消息有同步异步之说 那么消费者在消费消息时候 有没有同步异步之说呢???

在 Kafka 消费者中,消费消息的方式本质上是由 Kafka 的设计决定的,而不是由消费者代码显式控制的。Kafka 消费者在消费消息时,通常是以拉取(poll)的方式从 Kafka 服务器获取消息,然后处理这些消息。从这个角度来看,消费者的消费行为是同步的,因为消费者需要主动调用 poll 方法来获取消息。

然而,消费者的消息处理逻辑可以是同步异步的,具体取决于业务实现。以下是对消费者消费消息的同步和异步行为的详细分析:

 消费者的同步消费

在默认情况下,Kafka 消费者的消费行为是同步的,即:

  • 消费者通过 poll 方法从 Kafka 拉取一批消息。

  • 消费者逐条处理这些消息。

  • 每条消息处理完成后,消费者提交偏移量(offset)。

  • 消费者继续调用 poll 方法获取下一批消息。

特点:
  • 消息处理是顺序的,即一条消息处理完成后才会处理下一条消息。

  • 如果某条消息处理时间较长,会影响后续消息的处理速度。

  • 适合消息处理逻辑简单、处理时间较短的场景。

@KafkaListener(topics = {"ceshi"}, groupId = "#{'${test-3}'}")
public void onMessage(ConsumerRecord<String, String> message, Acknowledgment ack) {
    try {
        System.out.println("接收到消息:" + message.value());
        // 同步处理消息逻辑
        processMessage(message);
    } catch (Exception e) {
        System.err.println("处理消息失败: " + e.getMessage());
    } finally {
        ack.acknowledge(); // 手动提交偏移量
    }
}

private void processMessage(ConsumerRecord<String, String> message) {
    // 模拟消息处理逻辑
    try {
        Thread.sleep(1000); // 假设处理一条消息需要 1 秒
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
}

2. 消费者的异步消费

在某些场景下,消费者可能需要以异步的方式处理消息,即:

  • 消费者通过 poll 方法拉取一批消息。

  • 将每条消息提交到一个线程池或异步任务中处理。

  • 消费者继续调用 poll 方法获取下一批消息,而不等待上一条消息处理完成。

特点:
  • 消息处理是并发的,可以提高消息处理的吞吐量。

  • 需要额外的线程池或异步任务管理机制。

  • 适合消息处理逻辑复杂、处理时间较长的场景。

示例代码:

@Autowired
private ExecutorService executorService; // 注入线程池

@KafkaListener(topics = {"ceshi"}, groupId = "#{'${test-3}'}")
public void onMessage(ConsumerRecord<String, String> message, Acknowledgment ack) {
    if (!StringUtils.hasText(message.value())) {
        ack.acknowledge();
        return;
    }

    // 提交异步任务处理消息
    executorService.submit(() -> {
        try {
            System.out.println("接收到消息:" + message.value());
            processMessage(message); // 异步处理消息
        } catch (Exception e) {
            System.err.println("处理消息失败: " + e.getMessage());
        } finally {
            ack.acknowledge(); // 手动提交偏移量
        }
    });
}

private void processMessage(ConsumerRecord<String, String> message) {
    // 模拟消息处理逻辑
    try {
        Thread.sleep(1000); // 假设处理一条消息需要 1 秒
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
}

同步代码示例

@RequestMapping("/sendMessage2")
    public String sendMessage2(){
        //通过构建器模式创建Message对象
        Message<String> message = MessageBuilder.withPayload("Hello, Kafka!")
                .setHeader(KafkaHeaders.TOPIC, "ceshi")
                .build();

        CompletableFuture<SendResult<String, String>> send = kafkaTemplate.send(message);
        try {
            //阻塞等待拿结果
            SendResult<String, String> sendResult = send.get();
            System.out.println("说明消息发送成功,如果不成功会抛出异常");
        } catch (Exception e) {
            throw new RuntimeException(e);
        }

        return "Message sent to topic";
    }

异步注册回调的方式

 @RequestMapping("/sendMessage2")
    public String sendMessage2(){
        //通过构建器模式创建Message对象
        Message<String> message = MessageBuilder.withPayload("Hello, Kafka!")
                .setHeader(KafkaHeaders.TOPIC, "ceshi")
                .build();

        CompletableFuture<SendResult<String, String>> send = kafkaTemplate.send(message);
        //非阻塞  异步 注册回调异步通知
        send.thenAccept(result -> {
            System.out.println("消息发送成功");
        }).exceptionally(e->{
            System.out.println("发送失败");
            e.printStackTrace();
            return null;
        });


        return "Message sent to topic";
    }

如果需要发送的不是String类型 

那么要发送的不是String类型

KafkaTemplate<String,Object> kafkaTemplate;

一般来说可以专成JSON字符串发送

在引入spring-kafka的时候     KafkaAutoConfiguration中  配置了KafkaTemplate

Kafka<Object,Object>

如果需要用KafkaTemplate发送对象的时候

默认用的String序列化   会报错   除非将对象转为JSON字符串(一般可以这么做)

如果用对象的话   改成JsonSerializer  这样自动转JSON字符串

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2314330.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Spring 的三种注入方式?

1. 实例的注入方式 首先来看看 Spring 中的实例该如何注入&#xff0c;总结起来&#xff0c;无非三种&#xff1a; 属性注入 set 方法注入 构造方法注入 我们分别来看下。 1.1 属性注入 属性注入是大家最为常见也是使用最多的一种注入方式了&#xff0c;代码如下&#x…

STM32第一天建立工程

新建一个工程 1&#xff1a;新建一个文件&#xff0c;添加文件 a:DOC工程说明 》doc说明文档 b&#xff1a;Libraries固件库 》cmsis内核文件 &#xff08;一般这就是stm32内核文件&#xff09; 》FWLIB外设文件 &#xff08;这种就是stm32外设文件不全&#xff09; 》start…

搭建本地化笔记AI:用Copilot+deepseek+nomic-embed-text构建本地智能知识系统

安装Ollama https://ollama.com/ 下载模型 下载大语言模型 根据自己电脑的配置选择模型的大小 ollama run deepseek-r1:8b 下载向量处理模型 创建向量数据库时需要使用Embedding模型对文本进行向量化处理 ollama pull nomic-embed-text 查看安装的模型 ollama listNAME …

【蓝桥杯单片机】第十一届省赛

一、真题 二、创建工程 1.在C盘以外的盘新建文件夹&#xff0c;并在文件夹里面创建两个文件夹Driver 和Project 2.打开keil软件&#xff0c;在新建工程并选择刚刚建好的project文件夹&#xff0c;以准考证号命名 3.选择对应的芯片型号 4.选择否&#xff0c;即不创建启动文件 …

【存储中间件】Neo4J图数据库超详细教程(一):相关介绍、特点及优势、数据模型、软件安装

文章目录 Neo4J超详细教程一、Neo4J相关介绍1.为什么需要图数据库方案1&#xff1a;Google方案2&#xff1a;Facebook 2.特点和优势3.什么是Neo4j4.Neo4j数据模型图论基础属性图模型Neo4j的构建元素 5.软件安装 个人主页&#xff1a;道友老李 欢迎加入社区&#xff1a;道友老李…

xxl-job部署在docker-destop,实现定时发送预警信息给指定邮箱

XXL-JOB XXL-JOB是一个分布式任务调度平台&#xff08;XXL是作者徐雪里姓名拼音的首字母&#xff09;&#xff0c;其核心设计目标是开发迅速、学习简单、轻量级、易扩展。 源码仓库地址&#xff1a;https://github.com/xuxueli/xxl-job 源码结构&#xff1a; 系统架构 在xxl-j…

【QT】QScrollBar设置样式:圆角、隐藏箭头、上边距等

目录 0.简介 1.原理 2.具体代码 0.简介 环境&#xff1a;Ubuntu22.04、qtDesigner绘制UI 项目需要&#xff0c;按照UI修改滚动条样式&#xff0c;滚动条我使用的是QScrollBar&#xff0c;默认样式和修改之后的样式如下&#xff1a; 1.原理 2.具体代码 我是用qtDesigner绘制…

trae中文版AI搭建完整可用的项目框架

Trae 是由字节跳动推出的 AI 原生集成开发环境&#xff08;AI IDE&#xff09;&#xff0c;号称可以搭建完整项目&#xff0c;个人试用后体验确实比Cursor或cline更便捷&#xff0c;因为他多个文件关联准确率更高。 正式版的trae不支持大陆使用&#xff0c;不过目前已经推出了…

cfi网络安全 网络安全hcip

目录 RIP (路由信息协议) 算法 开销 版本 开销值的计算方式 RIPV1和RIPV2的区别 RIP的数据包 Request(请求)包 Reponse(应答)包 RIP的特征 周期更新 RIP的计时器 1&#xff0c;周期更新计时器 2&#xff0c;失效计时器 3&#xff0c;垃圾回收计时器 RIP的核心思…

Banana Pi 与瑞萨电子携手共同推动开源创新:BPI-AI2N

2025年3月11日&#xff0c; Banana Pi 开源硬件平台很高兴宣布&#xff0c;与全球知名半导体解决方案供应商瑞萨电子&#xff08;Renesas Electronics&#xff09;正式达成技术合作关系。此次合作标志着双方将在开源技术、嵌入式系统和物联网等领域展开深度合作&#xff0c;为全…

linux 命令 ls

ls 是 Linux 系统中用于列出目录内容的核心命令&#xff0c;几乎所有日常操作都会用到。以下是其详细用法和常见场景说明 1. 基础语法 ls [选项] [目录/文件] 不指定目录时&#xff0c;默认列出当前目录的内容。 可以指定文件或目录路径&#xff0c;支持通配符&#xff08;如…

C#-扩展方法-Linq

密封类 sealed&#xff0c;无法被继承 var 可以定义匿名对象 static void test1() {var t 1;t "jack";//报错&#xff0c;类型已经确定好了var s new{id 1,name "tom"};Console.WriteLine(s.id s.name); } 扩展方法 对现有类型做方法的扩展&am…

Go红队开发—web网络编程

文章目录 web网络编程Req快速请求 调试DevModeDebugLogTraceInfo瓶颈分析 控制请求与响应控制请求的字段内容控制调试打印的内容分开dump请求与响应部分请求体设置 作用范围级别设置参数查询URL 路径参数表单请求设置请求头设置 判断响应状态码解析数据SetSuccessResultgjson响…

轻量级模块化前端框架:快速构建强大的Web界面

轻量级模块化前端框架&#xff1a;快速构建强大的Web界面 在当今快节奏的Web开发环境中&#xff0c;选择一个高效且灵活的前端框架至关重要。UIkit 是一个轻量级的模块化前端框架&#xff0c;旨在帮助开发者快速构建功能强大且响应迅速的Web界面。 UIkit提供了丰富的组件和工…

qt+opengl 播放yuv视频

一、实现效果 二、pro文件 Qt widgets opengl 三、主要代码 #include "glwidget.h"GLWidget::GLWidget(QWidget *parent) : QOpenGLWidget(parent) {connect(&m_timer, &QTimer::timeout, this,[&](){this->update();});m_timer.start(1000/33); }v…

5G基本概念

作者:私语茶馆 1. 5G应用场景概述 1.1.5G应用场景 ITU域2015年定义了三大应用场景:eMBB(增强型移动宽带)、uRLLC(低时延高可靠通信)、mMTC(海量物联网通信); emBB:Enhanced Mobile Broadband ,移动互联网应用,是4G MBB(移动宽带)的升级,主要侧重于网络速率、带…

PH热榜 | 2025-03-12

1. Fluently 标语&#xff1a;开始说英语&#xff0c;就像说你的母语一样流利。 介绍&#xff1a;想象一下&#xff0c;有一个像人类一样的英语教练&#xff0c;全天候在线、价格却便宜15倍。这就是 Fluently &#x1f680; 纠正你的错误&#xff0c;提升你的词汇量、发音和语…

Python Web项目的服务器部署

一.部署运行 1.虚拟环境的安装&#xff1a;&#xff08;一行一行运行&#xff09; wget https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh -O miniconda.sh bash miniconda.sh -b -p /opt/miniconda3 echo export PATH"/opt/miniconda3/bin:$PAT…

[项目]基于FreeRTOS的STM32四轴飞行器: 八.遥控器摇杆

基于FreeRTOS的STM32四轴飞行器: 八.遥控器摇杆 一.摇杆数据的扫描二.处理摇杆数据三.微调按键处理 一.摇杆数据的扫描 下面摇杆初始化时&#xff0c;启动了ADC-DMA进行了采集&#xff0c;已经开始转换直接将数据通过DMA存入buff数组中&#xff1a; static uint16_t buff[4] …

附下载 | 2024 OWASP Top 10 基础设施安全风险.pdf

《2024 OWASP Top 10 基础设施安全风险》报告&#xff0c;由OWASP&#xff08;开放网络应用安全项目&#xff09;发布&#xff0c;旨在提升企业和组织对基础设施安全风险、威胁与漏洞的意识&#xff0c;并提供高质量的信息和最佳实践建议。报告列出了2024年最重要的10大基础设施…