SpringBoot3 整合Kafka

news2025/2/3 10:08:56

官网:https://kafka.apache.org/documentation/

消息队列-场景

1. 异步

img

2. 解耦

img

3. 削峰

img

4. 缓冲

img

消息队列-Kafka

1. 消息模式

img

消息发布订阅模式,MessageQueue中的消息不删除,会记录消费者的偏移量

2. Kafka工作原理

img

同一个消费者组里的消费者是队列竞争模式:Consumer1消费Broker-0的news消息,Consumer2消费Broker-1的news消息,Consumer3消费Broker-2的news消息。如果有Consumer4,那他哪个分区都不能消费,就是消费的饥饿问题。

不同消费组中的消费者是发布/订阅模式:Consumer1和Consumer4都能消费0分区(Broker0)。

3. SpringBoot整合

参照:https://docs.spring.io/spring-kafka/docs/current/reference/html/#preface

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

配置

spring.kafka.bootstrap-servers=172.20.128.1:9092

修改C:\Windows\System32\drivers\etc\hosts文件,配置8.130.32.70 kafka

4. 消息发送

kafkaTemplate发送消息的内容是对象时,需要用json序列化,在配置文件中加上:

spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer

@SpringBootTest
class Boot07KafkaApplicationTests {

    @Autowired
    KafkaTemplate kafkaTemplate;
    @Test
    void contextLoads() throws ExecutionException, InterruptedException {
        StopWatch watch = new StopWatch();
        watch.start();
        CompletableFuture[] futures = new CompletableFuture[10000];
        for (int i = 0; i < 10000; i++) {
            CompletableFuture send = kafkaTemplate.send("order", "order.create."+i, "订单创建了:"+i);
            futures[i]=send;
        }
        CompletableFuture.allOf(futures).join();
        watch.stop();
        System.out.println("总耗时:"+watch.getTotalTimeMillis());
    }
}
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public MyBean(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void someMethod() {
        this.kafkaTemplate.send("someTopic", "Hello");
    }

}

5. 消息监听

@Component
public class OrderMsgListener {

    @KafkaListener(topics = "order",groupId = "order-service")
    public void listen(ConsumerRecord record){
        System.out.println("收到消息:"+record); //可以监听到发给kafka的新消息,以前的拿不到
    }

    @KafkaListener(groupId = "order-service-2",topicPartitions = {
            @TopicPartition(topic = "order",partitionOffsets = {
                    @PartitionOffset(partition = "0",initialOffset = "0")
            })
    })
    public void listenAll(ConsumerRecord record){
        System.out.println("收到partion-0消息:"+record);
    }
}

6. 参数配置

消费者

spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties[spring.json.value.default.type]=com.example.Invoice
spring.kafka.consumer.properties[spring.json.trusted.packages]=com.example.main,com.example.another

生产者

spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.properties[spring.json.add.type.headers]=false

7. 自动配置原理

kafka 自动配置在KafkaAutoConfiguration

  1. 容器中放了 KafkaTemplate 可以进行消息收发
  2. 容器中放了KafkaAdmin 可以进行 Kafka 的管理,比如创建 topic 等
  3. kafka 的配置在KafkaProperties
  4. @EnableKafka可以开启基于注解的模式

toConfiguration`

  1. 容器中放了 KafkaTemplate 可以进行消息收发
  2. 容器中放了KafkaAdmin 可以进行 Kafka 的管理,比如创建 topic 等
  3. kafka 的配置在KafkaProperties
  4. @EnableKafka可以开启基于注解的模式

image.png

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1337664.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

手机之变@2023:高端化之“殇”、技术革新与新生机

【潮汐商业评论/原创】 消费者越来越不爱换手机了。 “我的手机用3年了&#xff0c;没坏也没卡&#xff0c;使用需求基本都能满足&#xff0c;没什么可换的。现在的手机出再高的配置&#xff0c;但我的需求没那么高&#xff0c;换一次成本也不小&#xff0c;实在换不动了。”…

Bytebase:统一数据库 CI/CD 解决方案 | 开源日报 No.128

bytebase/bytebase Stars: 7.9k License: NOASSERTION Bytebase 是一个数据库 CI/CD 解决方案&#xff0c;为开发人员和 DBA 提供统一的工具来管理不同数据库系统的开发生命周期。其主要功能包括标准化操作流程、SQL 代码审查、GitOps 集成以及数据访问控制等。关键特性和核心…

【Jmeter】Jmeter基础9-BeanShell介绍

3、BeanShell BeanShell是一种完全符合Java语法规范的脚本语言,并且又拥有自己的一些语法和方法。 3.1、Jmeter中使用的BeanShell 在Jmeter中&#xff0c;除了配置元件&#xff0c;其他类型的元件中都有BeanShell。BeanShell 是一种完全符合Java语法规范的脚本语言,并且又拥…

ServletConfig和ServletContext对象

目录 1.ServletConfig对象 1.1ServletConfig对象是什么 1.2ServletConfig对象里的方法 1.3ServletConfig的配置方式 1.4ServletConfig实现步骤 2. ServletContext对象 2.1ServletContext对象是什么 2.2ServletContext对象里的方法 2.3ServletContext对象的配置方式 2.…

图像处理控件Aspose.page功能演示:在 C# 中将 TIFF 转换为 EPS

TIFF&#xff08;标记图像文件格式&#xff09;和EPS&#xff08;封装 PostScript&#xff09;是两种常见的图像文件格式。TIFF 通常用于存储高质量图像&#xff0c;而 EPS 广泛用于存储矢量图像。您可能出于多种原因想要将 TIFF 图像转换为 EPS 格式。例如&#xff0c;如果您想…

负载均衡——Ribbon

文章目录 Ribbon和Eureka配合使用项目引入RibbonRestTemplate添加LoadBalanced注解注意自定义均衡方式代码注册方式配置方式 Ribbon脱离Eureka使用 Ribbon&#xff0c;Nexflix发布的负载均衡器&#xff0c;有助于控制HTTP和TCP客户端的行为。基于某种负载均衡算法&#xff08;轮…

大数据-Hive练习-环比增长率、同比增长率、复合增长率

目录 &#x1f959;12.1 环比增长率 1. 概述 2. 公式 3. 示例 4.练习-需求:计算各类商品的月环比增长率 &#x1f959;12.2 同比增长率 1. 概述 2. 公式 3. 示例 4. 练习-需求:计算各类商品的月同比增长率 &#x1f959;12.3 复合增长率 1. 概述 2. 公式 3. 示例…

Unity中Shader裁剪空间推导(正交相机到裁剪空间的转化矩阵)

文章目录 前言一、正交相机视图空间 转化到 裁剪空间 干了什么1、正交相机裁剪的范围主要是这个方盒子2、裁剪了之后&#xff0c;需要把裁剪范围内的坐标值化到[-1,1]之间&#xff0c;这就是我们的裁剪空间。3、在Unity中&#xff0c;设置相机为正交相机4、在这里设置相机的近裁…

Ubuntu16.04下载安装藏文字体详细教程(附图)

Ubuntu16.04下安装藏文字体详细教程&#xff08;附图&#xff09; 你是不是也被ubuntu系统中藏文或者中文总是不显示且乱码的问题困扰呢&#xff0c;那么你可以看看我的解决方法。 在没有装藏文或中文字体前你在打开一个文本文件的时候是不是下面这样的 安装步骤 上传或下载若…

学习笔记13——Spring整合Mybatis、junit、AOP、事务

学习笔记系列开头惯例发布一些寻亲消息 链接&#xff1a;https://baobeihuijia.com/bbhj/ Mybatis - Spring&#xff08;使用第三方包new一个对象bean&#xff09; 原始的Mybatis与数据库交互【通过sqlmapconfig来配置和连接】 初始化SqlSessionFactory获得连接获取数据层接口…

人工智能_机器学习078_聚类算法_概念介绍_聚类升维_降维_各类聚类算法_有监督机器学习_无监督机器学习---人工智能工作笔记0118

首先看一下什么是聚类,我们可以进入sklearn的官网去看看 可以看到这里,首先classification 这个分类我们学完了,然后就是regression回归我们也学完了对吧,其实我们现实生活中的,大部分问题就是 这两种问题就可以解决了. 然后我们再来看一个: clustering,这个就是聚类对吧.聚类算…

【JVM】对象

一、对象的内存布局 以Hotspot虚拟机为例&#xff0c;对象在内存中的结构可以分为三部分&#xff1a;对象头&#xff08;header&#xff09;、实例数据&#xff08;instance data&#xff09;、对齐填充&#xff08;padding&#xff09;。 1.1.对象头 对象头的结构大体相似&…

LeetCode day31

LeetCode day31 被创新实践的机器学习大作业和数据库作业折磨力&#xff0c;临近期末&#xff0c;各种大作业以及ddl&#xff0c;搞的咱只能偶尔刷刷力扣&#xff0c;但是csdn就挺难去发布了,大家期末也好好复习过个好年啦&#xff0c;O(∩_∩)O 409. 最长回文串 给定一个包含…

Unity so文件的问题

文章目录 问题在面板上无法显示子节点如何保存继承于so的类必须放置在单个脚本so类文件名和类名要一致 问题 最近自己在写一个行为树出现一些问题记录一下首先NodeTree肯定是so文件但是node可以是单纯的类&#xff0c;也可以是so。后来我发现只能是so 在面板上无法显示 第一…

2024年大学计算机等级考试报名注意事项及照片处理方法

计算机等级考试&#xff08;National Computer Rank Examination&#xff0c;简称NCRE&#xff09;是由国家教育部主管&#xff0c;全国计算机技术与软件专业技术资格&#xff08;水平&#xff09;考试中心主办的全国性计算机技术与应用能力水平考试。该考试旨在评估和认证考生…

C++继承与派生——(3)公有继承的访问权限的变化

归纳编程学习的感悟&#xff0c; 记录奋斗路上的点滴&#xff0c; 希望能帮到一样刻苦的你&#xff01; 如有不足欢迎指正&#xff01; 共同学习交流&#xff01; &#x1f30e;欢迎各位→点赞 &#x1f44d; 收藏⭐ 留言​&#x1f4dd; 缺乏明确的目标&#xff0c;一生将庸庸…

利用Jmeter做接口测试(功能测试)全流程分析!

利用Jmeter做接口测试怎么做呢&#xff1f;过程真的是超级简单。 明白了原理以后&#xff0c;把零碎的知识点填充进去就可以了。所以在学习的过程中&#xff0c;不管学什么&#xff0c;我一直都强调的是要循序渐进&#xff0c;和明白原理和逻辑。这篇文章就来介绍一下如何利用…

中小工厂更适合什么样的自动仓储管理系统?

阅读本文你将了解中小工厂更适合什么样的自动仓储管理系统&#xff1a;一、确定自身规模&#xff1b;二、考虑功能配置&#xff1b;三、寻求拓展能力。 “我们工厂年产值3亿左右&#xff0c;算是个中小工厂吧&#xff0c;但是上了精细化的仓库管理系统之后&#xff0c;为了适应…

工具系列:TimeGPT_(2)使用外生变量时间序列预测

文章目录 TimeGPT使用外生变量时间序列预测导入相关工具包预测欧美国家次日电力价格案例 TimeGPT使用外生变量时间序列预测 外生变量在时间序列预测中非常重要&#xff0c;因为它们提供了可能影响预测的额外信息。这些变量可以包括假日标记、营销支出、天气数据或与你正在预测…

一个卖美妆的 一个月招了数十万代理!月销售额破亿 你敢相信吗?

商业模式永不过时 大家好&#xff0c;我是吴军&#xff0c;一家软件公司的产品经理 今天我们来聊一下这个纪炫商城 其实&#xff0c;说这个纪炫商城之前&#xff0c;我想跟各位企业家老板聊几句实在话 作为公司两百多号技术的&#xff0c;一个拥有五年软件开发经验的产品经理…