手拉手springboot整合kafka发送消息

news2025/4/17 3:03:57
环境介绍
技术栈springboot+mybatis-plus+mysql+rocketmq
软件版本
mysql8
IDEAIntelliJ IDEA 2022.2.1
JDK17
Spring Boot3.1.7
kafka2.13-3.7.0

创建topic时,若不指定topic的分区(Partition主题分区数)数量使,则默认为1个分区(partition)

springboot加入依赖kafka

<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>

加入spring-kafka依赖后,springboot自动装配好kafkaTemplate的Bean

application.yml配置连接kafka

spring:
kafka:
bootstrap-servers: 192.168.68.133:9092

生产者

发送消息

@Resource
private KafkaTemplate<String,String> kafkaTemplate;

@Test
void kafkaSendTest(){
kafkaTemplate.send("kafkamsg01","hello kafka");
}

消费者

接收消息

@Component
public class KafkaConsumer {

@KafkaListener(topics = {"kafkamsg01","test"},groupId = "123")
public void consume(String message){
System.out.println("接收到消息:"+message);
}

}

若没有配置groupid

Failed to start bean 'org.springframework.kafka.config.internalKafkaListenerEndpointRegistry'; nested exception is java.lang.IllegalStateException: No group.id found in consumer config, container properties, or @KafkaListener annotation; a group.id is required when group management is used.

@Component
public class KafkaConsumer {

@KafkaListener(topics = {"kafkamsg01","test"},groupId = "123")
public void consume(String message){
System.out.println("接收到消息:"+message);
}

}

想从第一条消息开始读取(若同组的消费者已经消费过该主题,并且kafka已经保存了该消费者组的偏移量,则设置auto.offset.reset设置为earliest不生效,需要手动修改偏移量或使用新的消费者组)

application.yml需要将auto.offset.reset设置为earliest

spring:
kafka:
bootstrap-servers: 192.168.68.133:9092
consumer:
auto-offset-reset: earliest

Earliest:将偏移量重置为最早的偏移量

Latest: 将偏移量重置为最新的偏移量

None: 没有为消费者组找到以前的偏移量,向消费者抛出异常

Exception: 向消费者抛出异常

重置消费者组偏移量

./kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --group 123 --topic kafkamsg01 --reset-offsets --to-earliest –execute

重置完成

Spring-kafka生产者发送消息

.send与sendDefault()方法都返回CompletableFuture<String<k,v>>;

CompletableFuture类用于异步编程,表示异步计算结果。该特征使得调用者不必等待操作完成就可以继续执行其他任务,从而提高引用的响应速度和吞吐量

@Resource
private KafkaTemplate<String,String> kafkaTemplate;

@Test
void kafkaSendTest(){
kafkaTemplate.send("kafkamsg01","hello kafka");
}

发送Message

@Test
void kafkaSendMessageTest1(){
//通过构建器模式创建Message
Message<String> message = MessageBuilder.withPayload("hello kafka send message")
.setHeader(KafkaHeaders.TOPIC,"kafkamsg01")
.build();
kafkaTemplate.send(message);
}

SendProducerRecord

String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers

@Test
void kafkaSendProducerRecordTest1() {
//参数 String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers
Headers headers = new RecordHeaders();
headers.add("msg","123".getBytes(StandardCharsets.UTF_8));
ProducerRecord<String,String> record = new ProducerRecord(
"kafkaTopic01",
0,
System.currentTimeMillis(),
"key",
"hello kafka send message");
kafkaTemplate.send(record);
}

默认主题发送消息

yml配置默认主题

template:
default-topic: default-topic

@Test
void kafkaSendDefaultTest01(){
kafkaTemplate.sendDefault(0,System.currentTimeMillis(),"key01","hello ");
}

发送Object消息

序列化默认为String

@Resource
private KafkaTemplate<String,Object> kafkaTemplate1;
@Test
void kafkaSendObject(){
MessageM messageM =MessageM.builder().userID(123).sn("xo1111").desc("测试").build();
//分区是null,kafka自行决定消息发送到哪个分区
kafkaTemplate1.sendDefault(null,System.currentTimeMillis(),"key01",messageM);
}

Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据。 这种动作(网页,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素。 这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。 对于像Hadoop一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka的目的是通过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了通过集群来提供实时的消息。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1717158.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

MySQL排序字段无法唯一标识一条数据,导致分页查询结果出现重复或遗漏问题

问题描述 MySQL排序字段无法唯一标识一条数据&#xff0c;且排序字段可能有多个&#xff0c;多个排序字段无法唯一标识一条数据&#xff0c;即排序字段标识的相同数据过多&#xff0c;可能导致分页查询出现重复或遗漏的情况。 具体说明&#xff1a;      如果在排序条件中…

Stable Diffusion AI绘画:从创意词汇到艺术图画的魔法之旅

文章目录 一、Stable Diffusion的工作原理二、从提示词到模型出图的过程三、Stable Diffusion在艺术创作中的应用《Stable Diffusion AI绘画从提示词到模型出图》内容简介作者简介楚天 目录前言/序言本书特色特别提示 获取方式 在科技的飞速发展中&#xff0c;Stable Diffusion…

江协科技STM32学习-1 购买24Mhz采样逻辑分析仪

前言&#xff1a; 本文是根据哔哩哔哩网站上“江协科技STM32”视频的学习笔记&#xff0c;在这里会记录下江协科技STM32开发板的配套视频教程所作的实验和学习笔记内容。本文大量引用了江协科技STM32教学视频和链接中的内容。 引用&#xff1a; STM32入门教程-2023版 细致讲…

机器人系统ros2-开发学习实践16-RViz 用户指南

RViz 是 ROS&#xff08;Robot Operating System&#xff09;中的一个强大的 3D 可视化工具&#xff0c;用于可视化机器人模型、传感器数据、路径规划等。以下是RViz用户指南&#xff0c;帮助你了解如何使用RViz来进行机器人开发和调试。 启动可视化工具 ros2 run rviz2 rviz2…

【React篇】组件错误边界处理(组件错误引起的页面白屏)

我们知道在生产环境react错误会导致整个页面崩溃&#xff0c;显示为空白页面。 比如下图的错误&#xff0c;导致了左侧页面直接白屏&#xff1a; 由于某一个组件报错导致整个页面崩溃是很严重的问题&#xff0c;那么我们应该如何去降低代码报错带来的影响呢&#xff1f; 我们…

Rockchip芯片 写SN,IMEI,Mac等 写attenstation key 写Remote Key Provisioning

下载AP 写SN等 关机下 按住“音量” 插入USB线 进入loader 方式&#xff0c;在该模式下面写号&#xff0c;设备必须是已经有烧写过固件。 输入sn&#xff0c;点写入&#xff0c;成功。 点读取&#xff0c;成功。 两种设备模式&#xff1a;maskrom 和 loader 模式 maskrom 进…

灶新趋势,跌下神坛的电磁炉,为何被人嫌弃

在厨电市场的广阔天地中&#xff0c;电燃灶与电磁炉作为两种截然不同的烹饪工具&#xff0c;其间的竞争与演变始终牵动着消费者的心弦。近年来&#xff0c;电燃灶以其独特的优势崭露头角&#xff0c;而电磁炉则似乎从昔日的辉煌中跌下神坛&#xff0c;遭到越来越多人的嫌弃。这…

[排序算法]插入排序+希尔排序全梳理!

目录 1.排序是什么&#xff1f;1.1排序的概念1.2排序运用1.3常见的排序算法 2.插入排序分类3.直接插入排序基本思想具体步骤&#xff1a;动图演示代码实现直接插入排序的特性总结&#xff1a; 4. 希尔排序基本思想具体步骤动图演示代码实现希尔排序的特性总结&#xff1a; 5.总…

一种改进的经验小波变换方法(Python环境)

经验小波变换EWT是Gilles基于小波分析理论提出的一种新的自适应信号分解方法&#xff0c;该方法主要分为三个步骤&#xff1a;1.根据傅里叶谱的特性自适应划分频谱&#xff0c;获得一组边界&#xff1b;2.根据边界序列和Meyer小波构造滤波器组&#xff1b;3.滤波重构&#xff0…

Django——Admin站点(Python)

#前言&#xff1a; 该博客为小编Django基础知识操作博客的最后一篇&#xff0c;主要讲解了关于Admin站点的一些基本操作&#xff0c;小编会继续尽力更新一些优质文章&#xff0c;同时欢迎大家点赞和收藏&#xff0c;也欢迎大家关注等待后续文章。 一、简介&#xff1a; Djan…

Midjourney应用:电商模特换装

今天我们应用的是Midjourney应用&#xff1a;电商模特换装 网上找到一件衣服&#xff0c;没有模特 方法一&#xff1a;两图片融合&#xff0c;BLEND命令&#xff0c;效果不是很理想失真 方法二&#xff1a;服装图片垫图说明细节缺失https://cdn.discordapp.com/attachments/1…

map/set和unordered_map/unordered_set的区别及其适用情况

本专栏内容为&#xff1a;C学习专栏&#xff0c;分为初阶和进阶两部分。 通过本专栏的深入学习&#xff0c;你可以了解并掌握C。 &#x1f493;博主csdn个人主页&#xff1a;小小unicorn ⏩专栏分类&#xff1a;C &#x1f69a;代码仓库&#xff1a;小小unicorn的代码仓库&…

我喜欢的vscode插件

有个更全的&#xff1a;提高编程效率的30个VScode插件 Image preview&#xff08;图片预览&#xff09; any-rule&#xff08;正则表达式大全&#xff09; px to rem & rpx & vw(cssrem)&#xff08;px和rem之间转换&#xff09; 小程序开发助手 Auto Close Tag A…

【Vulhub】Fastjson 1.2.24_rce复现

文章目录 一&#xff0c;Fastjson是什么&#xff1f;二&#xff0c;fastjson漏洞原理三&#xff0c;判断是否有fastjson反序列化四&#xff0c;复现Fastjson 1.2.24_rce(vulhub)环境配置1.判断是否存在Fastjson反序列化2.反弹shell3.启动RMI服务器4.构造恶意POST请求 一&#x…

【赠书第26期】AI绘画教程:Midjourney使用方法与技巧从入门到精通

文章目录 前言 1 Midjourney入门指南 1.1 注册与登录 1.2 界面熟悉 1.3 基础操作 2 Midjourney进阶技巧 2.1 描述词优化 2.2 参数调整 2.3 迭代生成 3 Midjourney高级应用 3.1 创意启发 3.2 团队协作 3.3 商业应用 4 总结与展望 5 推荐图书 6 粉丝福利 前言 在…

使用QT可视化操作信号与槽函数详解

新书上架~&#x1f447;全国包邮奥~ python实用小工具开发教程http://pythontoolsteach.com/3 欢迎关注我&#x1f446;&#xff0c;收藏下次不迷路┗|&#xff40;O′|┛ 嗷~~ 目录 一、引言 二、QT信号与槽机制概述 三、实际操作步骤 四、案例演示 五、总结 一、引言 在…

防火墙技术基础篇:eNSP配置防火墙主备备份的双机热备

防火墙技术基础篇&#xff1a;配置主备备份的双机热备 防火墙双机热备&#xff08;High Availability, HA&#xff09;技术是网络安全中的一个关键组成部分&#xff0c;通过它&#xff0c;我们可以确保网络环境的高可靠性和高可用性。下面我们一起来了解防火墙双机热备的基本原…

在CentOS系统上安装Oracle JDK(华为镜像)

在CentOS系统上安装Oracle JDK(华为镜像) 先爱上自己&#xff0c;再遇见爱情&#xff0c;不庸人自扰&#xff0c;不沉溺过去&#xff0c;不为自己的敏感而患得患失&#xff0c;不为别人的过失而任性&#xff0c;这才是终身浪漫的开始。 https://repo.huaweicloud.com/java/jdk …

详解 Spark 核心编程之 RDD 算子

RDD 算子就是 RDD 的方法 一、转换算子 根据数据处理方式的不同可以分为单 Value 类型、双 Value 类型和 Key-Value 类型 1. map /**单 Value 类型算子函数签名&#xff1a;def map[U: ClassTag](f: T > U): RDD[U]功能&#xff1a;将处理的数据逐条进行映射转换&#xff0…

【CGAL】Region_Growing 检测平面并保存

目录 说明一、算法原理二、代码展示三、结果展示 说明 本篇博客主要介绍CGAL库中使用Region_Growing算法检测平面的算法原理、代码以及最后展示结果。其中&#xff0c;代码部分在CGAL官方库中提供了例子。我在其中做了一些修改&#xff0c;使其可以读取PLY类型的点云文件&…