SpringBoot集成系列--Kakfa

news2025/2/24 3:28:24

文章目录

  • 一、代码
    • 1、添加依赖
    • 2、配置kafka
    • 3、创建生产者
    • 4、创建消费者
    • 5、测试
  • 二、遇到问题
    • 1、could not be established. Broker may not be available
    • 2、Error while fetching metadata with correlation id xxx

一、代码

1、添加依赖

在pom.xml文件中添加Kafka的依赖

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

2、配置kafka

生产者项目的application.properties文件中配置kafka

spring.kafka.bootstrap-servers=192.168.56.100:9092
spring.kafka.producer.client-id=forlan-client-id
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

参数说明:

  • spring.kafka.bootstrap-servers:指定Kafka集群的地址。这是一个以逗号分隔的服务器列表,每个服务器都以IP地址和端口号的形式表示。
  • spring.kafka.producer.client-id:指定Kafka生产者的客户端ID。这是一个用于标识生产者的唯一字符串。在诊断和监控时,这个ID可以帮助识别发送给Kafka的哪些消息是由哪个生产者发送的。
  • spring.kafka.producer.key-serializer:定义Kafka的消息的键的序列化器。
  • spring.kafka.producer.value-serializer:定义Kafka的消息的值的序列化器。

消费者项目的application.properties文件中配置kafka消费者

spring.kafka.bootstrap-servers=192.168.56.100:9092
spring.kafka.consumer.client-id=forlan-client-id
spring.kafka.consumer.group-id=forlan-group-id
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

参数说明:

  • spring.kafka.bootstrap-servers:指定Kafka集群的地址。这是一个以逗号分隔的服务器列表,每个服务器都以IP地址和端口号的形式表示。
  • spring.kafka.consumer.client-id:指定Kafka消费者的客户端ID。这是一个用于标识消费者的唯一字符串。在诊断和监控时,这个ID可以帮助识别从Kafka接收到的哪些消息是由哪个消费者接收的。
  • spring.kafka.consumer.group-id:指定Kafka消费者的组ID。在消费者组中,成员之间可以共享消息消费进度,以便在需要时进行重新消费或进行故障转移。
  • spring.kafka.consumer.key-deserializer:指定用于反序列化从Kafka接收的消息的键的类。
  • spring.kafka.consumer.value-deserializer:指定用于反序列化从Kafka接收的消息的值的类。

3、创建生产者

设置发送消息服务类

@Component
public class KafkaProducer {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void testSend(String topic, String message) throws ExecutionException, InterruptedException {
        SendResult<String, String> stringStringSendResult = kafkaTemplate.send(topic, message).get();
        System.out.println(stringStringSendResult);
    }
}

4、创建消费者

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class KafkaConsumer {
	@KafkaListener(topics = "forlan_topic")
	public void listen(String message) {
		System.out.println("Kafka收到消息:" + message);
	}
}

5、测试

编写测试类,发送消息

@Autowired
private KafkaProducer kafkaProducer;

@Test
public void testKafka() throws ExecutionException, InterruptedException {
	kafkaProducer.testSend("forlan_topic", "Forlan测试发送Kafka消息"+ LocalDateTime.now());
}

执行测试类,效果如下:
在这里插入图片描述
消费者后,会收到消息,如下:
在这里插入图片描述

二、遇到问题

1、could not be established. Broker may not be available

[Consumer clientId=consumer-forlan-group-id-1, groupId=forlan-group-id] Connection to node 1001 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.

由于我们的kafka是装在容器内,使用下面这种方式,localhost指的是容器的ip,会有问题

KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092

正确的做法,应该是改为我们宿主机的ip,如下:

KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.56.100:9092

其实就是把localhost设置为具体的ip,宿主机的ip。

2、Error while fetching metadata with correlation id xxx

 [Producer clientId=forlan-client-id-1] Error while fetching metadata with correlation id 3 : {forlan-topic=LEADER_NOT_AVAILABLE}

重启kafka即可

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1305080.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

NFC物联网解决方案应用实例:基于NFC的通用物流链防伪溯源

NFC物联网系统解决方案已在某局进行推广应用&#xff0c;给出了某省内出口蔬菜水果检验检疫监管的物联网解决方案。 依据相关法规&#xff0c;出口蔬菜必须在质检总局注册种植基地进行种植&#xff0c;出口前按批次向产地检验检疫部门进行申报&#xff0c;按时在集中监管区统一…

Leetcode—2961.双模幂运算【中等】

2023每日刷题&#xff08;五十六&#xff09; Leetcode—2961.双模幂运算 实现代码 class Solution { public:int func(int a, int b) {int ans 1;for(int i 0; i < b; i) {ans * a;ans % 10;}return ans;}int func2(int a, int b, int m) {int ans 1;for(int i 0; i …

通过pull request执行结果运行自动化测试脚本

前提条件 已安装 Jenkins&#xff0c;并且安装插件Generic Webhook Trigger Plugin 配置 Jenkins创建一个 pipeline项目如图所示在Jenkins任务的build triggers中勾选Generic Webhook Trigger, 并且填写token在代码仓库管理平台&#xff08;截图的是bitbucket)&#xff0c;配…

Python之Requests库使用总结

概述 Requests是python中一个很Pythonic的HTTP库&#xff0c;用于构建HTTP请求与解析响应 Requests开发哲学 Beautiful is better than ugly.(美丽优于丑陋) Explicit is better than implicit.(直白优于含蓄) Simple is better than complex.(简单优于复杂) Complex is bett…

充电宝详解及推荐

一、充电宝选购攻略 二、充电宝的分类 &#xff08;1&#xff09;常规充电宝&#xff08;慢充&#xff09; 就是输出电压5V &#xff0c;输出电流1A或者2A的这种。 按照功率计算公示&#xff1a;PU*I&#xff0c; 这种充电宝给手机充电最快也就是&#xff1a;5.1V2.1A10.71W…

SpringBoot+Netty+Websocket实现消息推送

这样一个需求&#xff1a;把设备异常的状态每10秒推送到页面并且以弹窗弹出来&#xff0c;这个时候用Websocket最为合适&#xff0c;今天主要是后端代码展示。 添加依赖 <dependency><groupId>io.netty</groupId><artifactId>netty-all</artifact…

springboot3.2.x支持虚拟线程

背景&#xff1a; 大家都知道jdk21已经发布一段时间了&#xff0c;springboot3.2开始正式支持虚拟线程了&#xff1b; 支持虚拟线程&#xff1a; 1、spring.threads.virtual.enabledtrue 开启虚拟线程 2、Servlet Web 服务器 当启用虚拟线程时&#xff0c;Tomcat和Jetty将使…

KubeSphere应用【二】Docker安装

一、Docker安装 1.下载Docker安装包 【地址】Index of linux/static/stable/x86_64/ 2.上传至服务器 # 解压文件 tar -xvf docker-20.10.10.tgz# 将docker 目录中的所有文件复制至/usr/bin/目录下 cp docker/* /usr/bin 3.配置docker.service文件 vim /usr/lib/systemd/sy…

Make pixels dance:high-dynamic video generation

1.Introduction 大多数视频生成主要关注文本到视频的生成&#xff0c;PixelDance在文本指令的基础上&#xff0c;将图像指令分别用于视频剪辑的第一帧和最后一帧&#xff0c;第一帧图像指令描绘了视频剪辑的主要场景&#xff0c;最后一帧图像是可选的&#xff0c;描述了剪辑的…

2024年AI云计算专题研究报告:智算带来的变化

今天分享的人工智能系列深度研究报告&#xff1a;《2024年AI云计算专题研究报告&#xff1a;智算带来的变化》。 &#xff08;报告出品方&#xff1a;华泰证券&#xff09; 报告共计&#xff1a;32页 Al 云计算 2024:关注智算带来的新变化 通过对海内外主要云厂商及其产业链…

HarmonyOS 的应用开发语言:ArkTS

本心、输入输出、结果 文章目录 HarmonyOS 的应用开发语言&#xff1a;ArkTS前言ArkTS 产生背景ArkTS 语言特点ArkTS 基本语法ArkTS 声明式 UIArkTS 状态管理ArkTS 渲染控制 ArkTS 轻量化并发机制ArkTS 相关文档花有重开日&#xff0c;人无再少年实践是检验真理的唯一标准 Harm…

mysql语句大全及用法

常用的MySQL语句和简要用法&#xff0c;以帮助你开始学习和使用MySQL。 连接数据库 mysql -u username -p在命令行中使用以上命令来连接到MySQL数据库服务器。username 是你的MySQL用户名&#xff0c;执行后会提示输入密码。 显示数据库 SHOW DATABASES;列出数据库服务器上…

Nginx首页修改及使用Nginx实现端口转发

按照我之前博客给的方法搭建好这样一个CTF靶场 但是呢它默认是在8000端口 如何直接访问IP地址或者域名就可以实现直接访问到靶场呢 我们需要将80端口的内容转发到8000&#xff0c;使用nginx实现端口转发功能 首先我们安装nginx&#xff1a; 安装工具和库 yum -y install gc…

数据结构二维数组计算题,以行为主?以列为主?

1.假设以行序为主序存储二维数组Aarray[1..100,1..100]&#xff0c;设每个数据元素占2个存储单元&#xff0c;基地址为10&#xff0c;则LOC[5,5]&#xff08; &#xff09;。 A&#xff0e;808 B&#xff0e;818 C&#xff0e;1010 D&…

【数字信号处理】DFT

DFT 2023年11月18日 #elecEngeneer 文章目录 DFT1. 离散傅里叶变换-DFT2. 离散傅里叶反变换-IDFT3. DFT的误差下链 1. 离散傅里叶变换-DFT 离散傅里叶变换&#xff08;Discrete Fourier Transform&#xff0c;DFT&#xff09;&#xff0c;是当有 N {N} N 个信号采样点&#…

1,使用IDLE开启我们第一个Python程序

前面我们已经安装好了Python&#xff0c;安装了Python后&#xff0c;他会自动帮我们安装一个IDLE。IDLE是一个Python自带的非常简洁的集成开发环境&#xff08;IDE&#xff09;。他是一个Python Shell&#xff0c;我们可以利用Python Shell与Python交互。下面我们就利用IDLE开发…

10基于matlab的悬臂梁四节点/八节点四边形单元有限元编程(平面单元)

悬臂梁&#xff0c;有限元编程。基于matlab的悬臂梁四节点/八节点四边形单元有限元编程&#xff08;平面单元&#xff09;&#xff0c;程序有详细注解&#xff0c;可根据需要更改参数&#xff0c;包括长度、截面宽度和高度、密度、泊松比、均布力、集中力、单元数量等。需要就拍…

水の数列

这题目没有修改&#xff0c;所以可以考虑预处理 显然\(x\)从大到小或者从小到大&#xff0c;被选中的数字是单调的(尽管区间变化个数没有单调性) 所以我们可以考虑枚举\(x\) 我最开始想的是从大到小枚举\(x\)&#xff0c;但是维护有一点复杂&#xff0c;因为是删除 这个时候就要…

12.12 作业

1&#xff0c; 源代码&#xff1a; #include "widget.h" #include "ui_widget.h"Widget::Widget(QWidget *parent): QWidget(parent), ui(new Ui::Widget) {ui->setupUi(this);speerornew QTextToSpeech(this);idstartTimer(1000);//每隔一秒&#xf…

Linux Ubuntu 手动搭建webDav

1、安装 因为需要跟 zotero 进行交互&#xff0c;因此需要在服务器搭建一个webDav 以下是搭建步骤&#xff1a; sudo apt-get update sudo apt-get install apache2 Ubuntu 安装apache2来实现 不同于Centos 安装好了之后&#xff0c;运行 a2enmod dav_fs a2enmod dav 激…