2. springboot集成kafka入门使用教程

news2024/12/24 11:32:58

 项目demo地址 : https://mp.weixin.qq.com/s?__biz=MzkzODQyNzE3

1. 项目结构

 ─src
    ├─main
    │  ├─java
    │  │  └─org
    │  │      └─example
    │  │          │  KafkaApplication.java
    │  │          │
    │  │          └─demo
    │  │                  KafkaConsumerListener.java (监听消息类)
    │  │
    │  └─resources
    │          application.yml
    │
    └─test
        └─java
            └─org
                └─example
                    └─demo
                            KafkaProducerTest.java (发送消息测试类)

2. kafka依赖

<!--kafka-->
<dependency>
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka</artifactId>
  <version>${kafka.version}</version>
</dependency>

3. 消息的发送

3.1 同步发送

同步发送是指发送消息后等待Kafka的响应,确认消息已成功发送。这个方式的优点在于可靠性高,但缺点是会阻塞当前线程,影响系统的响应速度。

    /**
     * 同步发送
     */
    @Test
    public void synchronizeSend() {
        Map<String, Object> map = new HashMap<>();
        map.put("say", "你好, kafka........");
        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(TOPIC, JSONObject.toJSONString(map));
        try {
            SendResult<String, String> result = future.get();
            log.info("获取同步消息结果:{}", result.getRecordMetadata().topic());
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
3.2 异步发送

异步发送是指发送消息后立即返回,不等待Kafka的响应,而是通过回调函数处理发送结果。这种方式不会阻塞线程,更适合高并发的场景。

@Test
public void asynchronousSend() {
    Map<String, Object> map = new HashMap<>();
    map.put("say", "你好, kafka........");
    kafkaTemplate.send(TOPIC, JSONObject.toJSONString(map)).addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
        @Override
        public void onFailure(Throwable ex) {
            log.error("发送失败:{}", ex.getMessage());
        }

        @Override
        public void onSuccess(SendResult<String, String> result) {
            log.info("异步消息结果:{}", result.getRecordMetadata().topic());
        }
    });
}
3.3 kafka事务

Kafka事务确保一组消息要么全部成功,要么全部失败,用于实现消息的原子性。适用于需要保证一致性的场景,例如订单处理。

@Test
public void transaction() {
    kafkaTemplate.executeInTransaction(t -> {
        t.send(TOPIC, "kafka事务消息...");
        if (true) {
            throw new RuntimeException("发生异常kafka回滚事务");
        }
        t.send(TOPIC, "你好, kafka........");
        return true;
    });
}

注意 :

如果想使用kafka事务需要在配置文件中开启事务

# 开启事务
transaction-id-prefix: tx_

开启事务后, 同步/异步发送消息都需要加@Transactional注解

4. 消息的接收

Kafka消息的接收是通过监听器实现的。监听器会自动接收指定主题的消息,并处理接收到的消息。以下是一个简单的示例:

/**
 * 监听kafka数据
 */
@KafkaListener(topics = {"test-topic"})
public void consumer(ConsumerRecord<?, ?> consumerRecord) {
    log.info("监听kafka消息>>>>>>>>>>>>>>>>>主题topic={}, 分区offset={}, 信息message={}", consumerRecord.topic(), consumerRecord.offset(), consumerRecord.value());
    // 收到监听数据后面可以进行入库等业务操作
}

原文地址(阅读体验更佳) : 2. springboot集成kafka入门使用教程 (yuque.com)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2047061.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

跟李沐学AI:目标检测、锚框

边缘框 用于表示物体的位置&#xff0c;一个边缘框通过四个数字定义&#xff1a;(坐上x, 左上y, 右下x, 右下y)或&#xff08;左上x, 左上y, 宽, 高&#xff09; 通常物体检测或目标检测的数据集比图片分类的数据集小很多&#xff0c;因为物体检测数据集标注成本高很多。 目…

音视频相关知识

H.264编码格式 音频 PCM就是要把声音从模拟信号转换成数字信号的一种技术&#xff0c;他的原理简单地说就是利用一个固定的频率对模拟信号进行采样。 pcm是无损音频音频文件格式

【Qt】QWidget的font属性

QWidget的font属性 API说明 font() 获取当前 widget 的字体信息. 返回 QFont 对象. setFont(const QFont& font) 设置当前 widget 的字体信息. 关于Qfont 属性说明 family 字体家族. ⽐如 "楷体", "宋体", "微软雅⿊" 等. pointSiz…

“面试通关秘籍:高频题目与算法整理”

干货分享&#xff0c;感谢您的阅读&#xff01; &#xff08;暂存篇---后续会删除&#xff0c;完整版和持续更新见高频面试题基本总结回顾&#xff08;含笔试高频算法整理&#xff09;&#xff09; 备注&#xff1a;引用请标注出处&#xff0c;同时存在的问题请在相关博客留言…

Postman断言

目录 概述 断言工作原理 常用断言方法 Status code: Code is 200 Status code: Successful POST request Status code: Code name has string Response body: Contains string Response body: JSON value check Response body: ls equal to a string Response headers…

鸿萌数据恢复服务:SQL Server 中的 GAM、SGAM、IAM,及数据库损坏的修复方法

天津鸿萌科贸发展有限公司从事数据安全服务二十余年&#xff0c;致力于为各领域客户提供专业的数据恢复、数据备份、网络及终端数据安全等解决方案与服务。 同时&#xff0c;鸿萌是国际主流数据恢复软件(Stellar、UFS、R-Studio、ReclaiMe Pro 等)的授权代理商&#xff0c;为专…

开源的数据库增量订阅和消费的中间件——Cancl

目录 工作原理 MySQL主备复制原理 Canal 工作原理 主要功能和特点 应用场景 实验准备 安装JDK11 下载MySQL8.0 配置canal.admin 配置canal-deployer 测试数据读取 新增一台主机用做被同步的目标机器测试 官方地址&#xff1a;https://github.com/alibaba/canal?ta…

极狐 GitLab 依赖扫描:助力开发者管理软件供应链

极狐GitLab 是 GitLab 在中国的发行版&#xff0c;专门面向中国程序员和企业提供企业级一体化 DevOps 平台&#xff0c;用来帮助用户实现需求管理、源代码托管、CI/CD、安全合规&#xff0c;而且所有的操作都是在一个平台上进行&#xff0c;省事省心省钱。可以一键安装极狐GitL…

LeetCode.22。括号生成

题目描述&#xff1a; 数字 n 代表生成括号的对数&#xff0c;请你设计一个函数&#xff0c;用于能够生成所有可能的并且 有效的 括号组 输入输出实例&#xff1a; 思路&#xff1a;对于这道题目我们可以用回溯法&#xff0c;创建一个函数backtrack(当前字符&#xff0c;左括…

解锁 Starknet 的深层洞察:利用 Dune 构建动态数据可视化

原文&#xff1a;https://dev.to/lordghostx/queries-to-insights-visualizing-starknet-data-with-dune-j8p 作者&#xff1a;LordGhostX 编译&#xff1a;TinTinLand Starknet 的链上数据为其区块链生态系统提供了丰富的洞察。它为用户活动、交易模式和网络交互提供了全面…

【k8s从节点报错】error: You must be logged in to the server (Unauthorized)

k8s主节点可以获取nodes节点信息&#xff0c;但是从节点无法获取&#xff0c;且报错“error: You must be logged in to the server (Unauthorized)” 排查思路&#xff1a; 当时证书过期了&#xff0c;只处理的主节点的证书过期&#xff0c;没有处理从节点的 kubeadm alpha …

ctfshow-web入门-sql注入(web221、web222、web223)limit 注入与 group 注入

目录 1、web221 2、web222 3、web223 1、web221 limit 注入 分页 sql 格式&#xff1a;select * from table limit (start-1)*pageSize,pageSize; 其中 start 是页码&#xff0c;pageSize 是每页显示的条数。 比如&#xff1a; 查询第1条到第10条的数据的sql是&#xff…

倒计时启动!2024东北医院信息网络大会即将在这里举办!

随着全球医疗行业步入信息化转型的新时代&#xff0c;2024年8月24日至25日&#xff0c;以“科技赋能&#xff0c;重塑未来医疗”为主题的2024东北医院信息网络大会将在长春开曼宴都酒店&#xff08;长春市高新区海外街1号&#xff09;隆重举行。此次大会与国家卫健委、中医药管…

Python青少年简明教程:输入输出

Python青少年简明教程&#xff1a;输入输出 Python的输入输出是编程中的基本操作。Python的标准输入输出主要通过内置的input()函数和print()函数来实现。这两个函数使得从用户那里接收输入和向用户展示输出变得非常简单。 输入&#xff08;Input&#xff09;函数 input()函数…

Denosing RayDN-对同一射线的误检测优化

Denosing操作理解 DN-DETR增加denosing操作&#xff0c;帮助快速拟合&#xff0c;提高了目标检测任务中的效率和精度。通过这种方式&#xff0c;DN-DETR 克服了原始 DETR 的一些限制&#xff0c;使其在实际应用中具有更好的性能和鲁棒性。 GTBoxes通过随机偏移H, L,W进行偏移&…

Nuxt3【路由中间件】middleware

路由中间件类似路由守卫&#xff0c;即在导航到特定路由之前运行一段代码 内联路由中间件 在页面中定义的路由中间件&#xff0c;因没有名称&#xff0c;所以也叫匿名路由中间件 definePageMeta({middleware: [function (to, from) {console.log("执行了内联路由中间件&q…

[Meachines] [Medium] Popcorn SQLI+Upload File+PAM权限提升

信息收集 IP AddressOpening Ports10.10.10.6TCP:22&#xff0c;80 $ nmap -p- 10.10.10.6 --min-rate 1000 -sC -sV PORT STATE SERVICE VERSION 22/tcp open ssh OpenSSH 5.1p1 Debian 6ubuntu2 (Ubuntu Linux; protocol 2.0) | ssh-hostkey: …

2024新型数字政府综合解决方案(七)

新型数字政府综合解决方案通过集成人工智能、大数据、区块链和云计算技术&#xff0c;创建了一个高度智能化和互联互通的政府服务平台&#xff0c;旨在全面提升行政效率、服务质量和透明度。该平台实现了跨部门的数据整合与实时共享&#xff0c;利用人工智能进行智能决策支持和…

RockerMQ学习

消息中间件以前常用RabbitMQ和ActiveMQ&#xff0c;由于业务需要&#xff0c;后期业务偏向大数据&#xff0c;现着重学习一下RocketMQ&#xff08;RocketqMQ原理同ctg-mq&#xff09;&#xff0c;后续更新Kafka 一、RocketMQ特性 Kafka特性 &#xff08;高性能分布式&#xff…

MySQL数据库进阶知识(三)《优化》

学习目标&#xff1a; 一周掌握SQL优化知识 学习内容&#xff1a; 一、插入数据 1.insert优化 批量插入 insert into tb_test values(1,Tom),(2,Cat),(3,Jerry);手动提交事务 start transaction; insert into tb_test values(1,Tom),(2,Cat),(3,Jerry); insert into tb_te…