SpringBoot Kafka发送消息与接收消息实例

news2024/9/20 1:52:29

前言 

Kafka的基本工作原理 

 我们将消息的发布(publish)称作 producer(生产者),将消息的订阅(subscribe)表述为 consumer(消费者),将中间的存储阵列称作 broker(代理),这样就可以大致描绘出这样一个场面:

生产者将数据生产出来,交给 broker 进行存储,消费者需要消费数据了,就从broker中去拿出数据来,然后完成一系列对数据的处理操作。 

 1.引入spring-kafka的jar包

在pom.xml里面导入spring-kafka包

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.4</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <artifactId>SpringBootKafka</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>SpringBootKafka</name>
    <description>SpringBootKafka</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <!-- pom.xml -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
        </dependency>
    </dependencies>

    <repositories>
        <repository>
            <id>central</id>
            <name>aliyun maven</name>
            <url>https://maven.aliyun.com/repository/public/</url>
            <layout>default</layout>
            <!-- 是否开启发布版构件下载 -->
            <releases>
                <enabled>true</enabled>
            </releases>
            <!-- 是否开启快照版构件下载 -->
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </repository>
    </repositories>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

2.编写配置文件

在src/main/resources/application.yml里面编写kafka的配置,包括生成者和消费者 

spring:
  kafka:
    bootstrap-servers: 192.168.110.105:9092
    #streams:
      #application-id: my-streams-app
    consumer:
      group-id: myGroupId
      auto-offset-reset: latest
      enable-auto-commit: true
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      retries: 5

3.编写生产者

使用org.springframework.kafka.core.KafkaTemplate来发送消息,这里采用了异步方式,获取了消息的处理结果

package com.example.springbootkafka.service;

import com.example.springbootkafka.entity.User;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

@Slf4j
@Service
public class KafkaProducer {

    private final KafkaTemplate<String, String> kafkaTemplate;

    private final ObjectMapper objectMapper;

    @Autowired
    public KafkaProducer(KafkaTemplate<String, String> kafkaTemplate, ObjectMapper objectMapper) {
        this.kafkaTemplate = kafkaTemplate;
        this.objectMapper = objectMapper;
    }

    public void sendMessage(String message) {
        log.info("KafkaProducer message:{}", message);
        //kafkaTemplate.send("test", message).addCallback();
        Future<SendResult<String, String>> future = kafkaTemplate.send("test", message);
        CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> {
            try {
                future.get(); // 等待原始future完成
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        });

// 使用whenComplete方法
        completableFuture.whenComplete((result, ex) -> {
            if (ex != null) {
                System.out.println("Error occurred: " + ex.getMessage());
                // 成功发送
            } else {
                System.out.println("Completed successfully");
            }
        });

        /*future.whenComplete((result, ex) -> {
            if (ex == null) {
                // 成功发送
                RecordMetadata metadata = result.getRecordMetadata();
                System.out.println("Message sent successfully with offset: " + metadata.offset());
            } else {
                // 发送失败
                System.err.println("Failed to send message due to: " + ex.getMessage());
            }
        });*/


    }

    public void sendUser(User user) throws JsonProcessingException {
        //final ProducerRecord<String, String> record = createRecord(data);

        //ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send("test", message);

        //ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send("test", user);

        String userJson = objectMapper.writeValueAsString(user);
        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send("test", userJson);
        /*future.addCallback(
                success -> System.out.println("Message sent successfully: " + userJson),
                failure -> System.err.println("Failed to send message: " + failure.getMessage())
        );*/
        CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> {
            try {
                future.get(); // 等待原始future完成
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
        completableFuture.whenComplete((result, ex) -> {
            if (ex != null) {
                System.out.println("Error occurred: " + ex.getMessage());
                // 成功发送
            } else {
                System.out.println("Completed successfully");
            }
        });
    }
}

 4.编写消费者

通过org.springframework.kafka.annotation.KafkaListener来监听消息

package com.example.springbootkafka.service;

import lombok.extern.slf4j.Slf4j;
import org.apache.log4j.Logger;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;


@Slf4j
@Service
public class KafkaConsumer {


    @KafkaListener(topics = "test", groupId = "myGroupId")
    public void consume(String message) {
        System.out.println("Received message: " + message);
        log.info("KafkaConsumer message:{}", message);
    }
}

5.测试消息的生成与发送

package com.example.springbootkafka.controller;

import com.example.springbootkafka.entity.User;
import com.example.springbootkafka.service.KafkaProducer;
import com.fasterxml.jackson.core.JsonProcessingException;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@Slf4j
@RestController
public class MessageController {

    private final KafkaProducer producer;

    @Autowired
    public MessageController(KafkaProducer producer) {
        this.producer = producer;
    }

    @GetMapping("/send-message")
    public String sendMessage() {
        log.info("MessageController sendMessage start!");
        producer.sendMessage("hello, Kafka!");
        log.info("MessageController sendMessage end!");
        return "Message sent successfully.";
    }

    @GetMapping("/send")
    public String sendMessage1() {
        log.info("MessageController sendMessage1 start!");
        User user = User.builder().name("xuxin").dept("IT/DevlopMent").build();
        try {
            producer.sendUser(user);
        } catch (JsonProcessingException e) {
            throw new RuntimeException(e);
        }
        log.info("MessageController sendMessage1 end!");
        return "Message sendMessage1 successfully.";
    }
}

 6.查看结果:

 

详细代码见https://gitee.com/dylan_2017/springboot-kafka.git

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2139833.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

酷炫的航模直升机技术详解

1. 分类与级别&#xff08;400级至90级&#xff09; 航模直升机以其独特的飞行魅力和高难度的操作技巧&#xff0c;吸引了众多飞行爱好者。根据模型的尺寸、重量、动力系统及飞行性能&#xff0c;航模直升机大致可分为多个级别&#xff0c;从入门级的400级到专业级的90级及以上…

简单接口自动化框架实现(Python+requests+pytest)

1、接口自动化流程 1.需求分析2.挑选需要做自动化测试的功能3.设计测试用例4.搭建自动化测试环境[可选]5.设计自动化测试项目的架构[可选]6.编写代码7.执行测试用例8.生成测试报告并分析结果 2、框架结构 --api -->封装请求 --scripts -->编写测试脚本…

15. Springboot集成Redis

目录 1、前言 2、为什么选择Spring Boot集成Redis&#xff1f; 3、快速上手 3.1、引入依赖 3.2、 配置连接信息 3.3、自定义配置类 4、RedisTemplate的使用 4.1、String类型操作 4.2、 Hash类型操作 4.3、List类型操作 4.4、Set类型操作 4.5、SortedSet类型操作 4…

有了数据中台,是否需要升级到数据飞轮?怎么做才能升级到数据飞轮?

在数字化转型的时代&#xff0c;企业纷纷建设了“数据中台”&#xff0c;把各种业务数据整合在一起&#xff0c;仿佛是将所有材料都整理进了厨房的储物柜。 但是问题是&#xff1a;光有储物柜&#xff0c;能做出好吃的菜吗&#xff1f;答案显然是否定的。想要真正利用这些数据…

华为HarmonyOS地图服务 -- 如何实现地图呈现?-- HarmonyOS自学8

如何使用地图组件MapComponent和MapComponentController呈现地图&#xff0c;效果如下图所示。 MapComponent是地图组件&#xff0c;用于在您的页面中放置地图。MapComponentController是地图组件的主要功能入口类&#xff0c;用来操作地图&#xff0c;与地图有关的所有方法从此…

项目实现:云备份③(配置文件加载模块、数据管理模块的实现)

云备份 前言配置文件加载模块配置信息的设计单例文件配置类设计 数据管理模块数据信息类设计数据管理类实现数据持久化存储初始化加载实现 前言 书接上回&#xff1a;云备份&#xff08;实用工具类&#xff09;实现后&#xff0c;接下来会逐步实现不同模块的功能。会频繁用到工…

更主动的对话规划者:PPDPP论文解读

摘要 主动对话在大语言模型&#xff08;LLMs&#xff09;时代中是一个实际且具有挑战性的对话问题&#xff0c;其中对话策略规划是提升LLMs主动性的重要关键。现有的大多数研究通过各种提示方案或使用口头AI反馈迭代增强处理特定案例的能力&#xff0c;以实现LLMs的对话策略规…

元宇宙+教育:打造个性化、互动化学习生态系统!

近两年来&#xff0c;元宇宙风在全球迅速掀起了一股新浪潮。“元宇宙”成为各个行业的热门发展方向&#xff0c;各个行业正试图通过元宇宙寻求新的发展突破口&#xff0c;教育行业也不例外。 教育培训元宇宙作为一种前沿的教育模式&#xff0c;深度融合了虚拟现实、增强现实及…

SAP HCM HR_MAINTAIN_MASTERDATA自带解锁功能

导读 锁功能&#xff1a;在SAP HCM模块有针对人的加锁功能&#xff0c;今天遇到的一个问题是&#xff0c;人员无法被锁住&#xff0c;给我第一反应就是代码没有加锁&#xff0c;代码有问题&#xff0c;但是去看代码系统确实已经加锁&#xff0c;但是系统还是提示这个&#xff…

BPF 调度器 sched_ext 实现机制、调度流程及样例

本文地址&#xff1a;https://www.ebpf.top/post/bpf_sched_ext_dive_into 在文章 Linus 强势拍板合入: BPF 赋能调度器终成正果中&#xff0c;我们回顾了 BPF 在调度器在合入社区过程中的历程&#xff0c;补丁 V7 已经在为合并到 6.11 做好了准备&#xff0c;后续代码仓库也变…

4. 认识 LoRA:从线性层到注意力机制

如果你有使用过 AI 生图&#xff0c;那你一定对 LoRA 有印象&#xff0c;下图来自Civitai LoRA&#xff0c;上面有很多可供下载的LoRA模型。 你可能也曾疑惑于为什么只导入 LoRA 模型不能生图&#xff0c;读下去&#xff0c;你会解决它。 文章目录 为什么需要 LoRA&#xff1f;…

预训练数据指南:衡量数据年龄、领域覆盖率、质量和毒性的影响

前言 原论文&#xff1a;A Pretrainer’s Guide to Training Data: Measuring the Effects of Data Age, Domain Coverage, Quality, & Toxicity 摘要 预训练是开发高性能语言模型&#xff08;LM&#xff09;的初步和基本步骤。尽管如此&#xff0c;预训练数据的设计却严…

STM32 HAL freertos零基础(十一)中断管理

1、简介 在FreeRTOS中,中断管理是一个重要的方面,尤其是在嵌入式系统中。正确地处理中断可以确保系统的实时响应能力,并且能够在中断服务程序(ISR)中执行关键操作。FreeRTOS提供了一些机制来帮助开发者管理中断,并确保在多任务环境下中断处理的安全性和高效性。 任何中…

【AI大模型】Transformer模型:Postion Embedding概述、应用场景和实现方式的详细介绍。

一、位置嵌入概述 \1. 什么是位置嵌入&#xff1f; 位置嵌入是一种用于编码序列中元素位置信息的技术。在Transformer模型中&#xff0c;输入序列中的每个元素都会被映射到一个高维空间中的向量表示。然而&#xff0c;传统的自注意力机制并不包含位置信息&#xff0c;因此需要…

3CCD的工作原理

昨天看编辑送的一本《计算机视觉》中3CCD的工作原理错了&#xff0c;其实是百度百科错了&#xff0c;所以我想有人就照搬照抄错了。专业问题不要问百度&#xff0c;百度就是骗子一样的存在&#xff0c;这么多年就从来没有把心思放在做事上。3CCD通过光学棱镜分光后就已经是单色…

智能摄像头MP4格式化恢复方法

如果说生孩子扎堆&#xff0c;那很显然最近智能摄像头多碎片的恢复也扎堆了&#xff0c;这次恢复的是一个不知名的小品牌。其采用了mp4视频文件方案&#xff0c;不过这个案例的特殊之处在于其感染了病毒且不只一次&#xff0c;我们来看看这个小品牌的智能恢复头格式化的恢复方法…

Oracle发邮件功能:设置的步骤与注意事项?

Oracle发邮件配置教程&#xff1f;如何实现Oracle发邮件功能&#xff1f; Oracle数据库作为企业级应用的核心&#xff0c;提供了内置的发邮件功能&#xff0c;使得数据库管理员和开发人员能够通过数据库直接发送邮件。AokSend将详细介绍如何设置Oracle发邮件功能。 Oracle发邮…

基于web的 BBS论坛管理系统设计与实现

博主介绍&#xff1a;专注于Java .net php phython 小程序 等诸多技术领域和毕业项目实战、企业信息化系统建设&#xff0c;从业十五余年开发设计教学工作 ☆☆☆ 精彩专栏推荐订阅☆☆☆☆☆不然下次找不到哟 我的博客空间发布了1000毕设题目 方便大家学习使用 感兴趣的可以…

Linux 基本使用和 web 程序部署 ( 8000 字 Linux 入门 )

一&#xff1a;Linux 背景知识 1.1. Linux 是什么 Linux 是一个操作系统. 和 Windows 是 “并列” 的关系&#xff0c;经过这么多年的发展, Linux 已经成为世界第一大操作系统&#xff0c;安卓系统本质上就是 Linux. 1.2 Linux 发行版 Linux 严格意义来说只是一个 “操作系…

【楚怡杯】职业院校技能大赛 “云计算应用” 赛项样题三

某企业根据自身业务需求&#xff0c;实施数字化转型&#xff0c;规划和建设数字化平台&#xff0c;平台聚焦“DevOps开发运维一体化”和“数据驱动产品开发”&#xff0c;拟采用开源OpenStack搭建企业内部私有云平台&#xff0c;开源Kubernetes搭建云原生服务平台&#xff0c;选…