Docker搭建kafka+zookeeper以及Springboot集成kafka快速入门

news2025/1/15 12:44:56

参考文章

【Docker安装部署Kafka+Zookeeper详细教程】_linux arm docker安装kafka-CSDN博客

Docker搭建kafka+zookeeper

打开我们的docker的镜像源配置

vim /etc/docker/daemon.json

配置

 {
  "registry-mirrors": ["https://widlhm9p.mirror.aliyuncs.com"]
}

 下面的那个insecure是我自己虚拟机的,不用理会

拉取镜像

然后开始拉取我们的zookeeper镜像和我们的kafka镜像

这个是我们的zookeeper镜像,没有指定版本默认就是拉取最新的版本

docker pull zookeeper

kafka镜像 

docker pull wurstmeister/kafka

因为我们的docker不同容器之间的网络是互相隔开的,所以我们要创建一个共同使用的网络

让不同容器都加入这个网络

docker network create创建我们的网络

然后那个zookeeper_network是我们自定义的网络名称

docker network create --driver bridge zookeeper_network

kafka是依赖于zookeeper的所以我们要先安装zookeeper

我们先用run来创建一个zookeeper容器

 docker run -d --name zookeeper1  --network zookeeper_network -p 2181:2181   zookeeper

-d 是后台运行

--name 是我们自定义容器的名字  我定义的名字是zookeeper1

--network 

是指定我们的网络环境,我们刚刚创建的网络环境名字叫zookeeper_network,所以我们要让容器加入这个网络

-p 是指定我们的容器暴露给外部的端口  2181:2181是指虚拟机(或服务器)的2181端口与容器内部的2181端口做映射

最后面的那个zookeeper 是我们的使用的镜像源的名称

一般是zookeeper:xxx来执行使用镜像源的版本,如果不指定版本默认用的就是最新版本

查看我们创建的网络环境的地址

docker inspect zookeeper_network

那个IPv4就是我们的网络环境的地址,这是我的网络环境的地址

我的是12.21.0.2,这个ip地址是要记住方便后面使用的
 

创建一个kafka容器

这段代码有点长,根子自己改吧

 # 启动kafka
docker run -d --name kafka1  --network zookeeper_network -p 9092:9092 -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT=<zookeeperIP地址>:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://<宿主机IP地址>:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092  wurstmeister/kafka

解释 

KAFKA_ZOOKEEPER_CONNECT 后面写的是我们的之前的网络的地址

KAFKA_ADVERTISED_LISTENERS=PLAINTEXT 我们的虚拟机(服务器)的本机的地址

不知道本机地址可以输入 ip addr来查看本机地址

这样子就搭建完成了


SpringBoot集成kafka

首先就是springboot和kafka的版本兼容了

Spring for Apache Kafka

然后我们引入两个kafka的依赖 

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>


<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.2.0</version>
</dependency>

自己对着自己的版本来

自己看b站视频,9分钟就搞定了

63-kafka-集成-Java场景-SpringBoot_哔哩哔哩_bilibili

然后开始写我们的application.yml配置文件

下面是配置文件的全部+解析

其实和普通mq差不多

也就是配置生产者和消费者和一些过期时间超时时间

重点在于那个missing-topics-fatal

主题不存在的话,我们是否还要成功启动

我自己的写的默认的主题是test,但是我还没在kafka里面创建,kafka里面还没有这个叫test的主题

所以我启动的时候,报错然后失败了 

spring:
  kafka:
    bootstrap-servers: 192.168.88.130:9092  #Kafka 集群的地址和端口号
    producer:
      acks: all #生产者发送消息时, Kafka 集群需要确认的确认级别。all 表示需要所有 broker 确认消息已经写入
      batch-size: 16384  #生产者在发送消息时, 会先缓存一些消息, 达到 batch-size 后再批量发送。这个参数设置了批量发送的大小。
      buffer-memory: 33554432  #生产者用于缓存消息的内存大小
      key-serializer: org.apache.kafka.common.serialization.StringSerializer  #定义了消息 key 和 value 的序列化方式。
      value-serializer: org.apache.kafka.common.serialization.StringSerializer #定义了消息 key 和 value 的序列化方式。
      retries: 0
    consumer:
      group-id: test #消费者组ID
      #消费方式: 在有提交记录的时候,earliest与latest是一样的,从提交记录的下一条开始消费
      # earliest:无提交记录,表示从最早的消息开始消费
      #latest:无提交记录,从最新的消息的下一条开始消费
      auto-offset-reset: earliest  #当消费者没有提交过 offset 时, 从何处开始消费消息
      enable-auto-commit: true #是否自动提交偏移量offset
      auto-commit-interval: 1s #前提是 enable-auto-commit=true。自动提交 offset 的间隔时间
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer  #定义了消息 key 和 value 的反序列化方式
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer #定义了消息 key 和 value 的反序列化方式
      max-poll-records: 2  #一次 poll 操作最多返回的消息数量
      properties:
        #如果在这个时间内没有收到心跳,该消费者会被踢出组并触发{组再平衡 rebalance}
        #消费者与 Kafka 服务端的会话超时时间
        session.timeout.ms: 120000

        #最大消费时间。此决定了获取消息后提交偏移量的最大时间,超过设定的时间(默认5分钟),服务端也会认为该消费者失效。踢出并再平衡
        #消费者调用 poll 方法的最大间隔时间
        max.poll.interval.ms: 300000

        #消费者发送请求到 Kafka 服务端的超时时间
        #配置控制客户端等待请求响应的最长时间。
        #如果在超时之前没有收到响应,客户端将在必要时重新发送请求,
        #或者如果重试次数用尽,则请求失败。
        request.timeout.ms: 60000

        #订阅或分配主题时,允许自动创建主题。0.11之前,必须设置false
        allow.auto.create.topics: true

        #消费者向协调器发送心跳的间隔时间。
        #poll方法向协调器发送心跳的频率,为session.timeout.ms的三分之一
        heartbeat.interval.ms: 40000

        #每个分区里返回的记录最多不超max.partitions.fetch.bytes 指定的字节
        #0.10.1版本后 如果 fetch 的第一个非空分区中的第一条消息大于这个限制
        #仍然会返回该消息,以确保消费者可以进行

        #每个分区最多拉取的消息字节数。
        #max.partition.fetch.bytes=1048576  #1M

    listener:
      #当enable.auto.commit的值设置为false时,该值会生效;为true时不会生效
      #manual_immediate:需要手动调用Acknowledgment.acknowledge()后立即提交
      #ack-mode: manual_immediate   手动 ACK 的方式。

      #如果监听的主题不存在, 是否启动失败。
      missing-topics-fatal: false #如果至少有一个topic不存在,true启动失败。false忽略

      #消费方式, single 表示单条消费, batch 表示批量消费
      #type: single #单条消费?批量消费? #批量消费需要配合 consumer.max-poll-records
      type: batch

      #并发消费的线程数
      concurrency: 2 #配置多少,就为为每个消费者实例创建多少个线程。多出分区的线程空闲

      #默认的主题名称
    template:
      default-topic: "test"
      #springboot启动的端口号
server:
  port: 9999 #这个是java项目启动的端口

基本案例

这是常量类

指定了一个topic和group

主题和分组id

groupid是消费者组的唯一标识

这个视频9分钟看懂kafka

小朋友也可以懂的Kafka入门教程,还不快来学_哔哩哔哩_bilibili

生产者

我们这个Autowired自动注入,会根据我们的配置文件的配置来自动注入

@Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

produces里面指定我们前端传的是json格式

 我们往这个标题发送我们的消息,其实这个就是我们的常量类里面写的"test"

消费者

 @KafkaListener(topics = SpringBootKafkaConfig.TOPIC_TEST, groupId = SpringBootKafkaConfig.GROUP_ID)

    public void topic_test(List<String> messages, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
        for (String message : messages) {

            //因为这个String是Json,所以我们可以转回Object对象,其实是转成JsonObject对象
            final JSONObject entries = JSONUtil.parseObj(message);
            System.out.println(SpringBootKafkaConfig.GROUP_ID + " 消费了: Topic:" + topic + ",Message:" + entries.getStr("name"));
            //ack.acknowledge();
        }
    }

我们用List<String>来接收,因为可能一个消费者接收多条消息

指定消费者监听的主题topic

以及指定消费者的唯一标识GROUP_ID

这些其实都是自己在常量类里面自己写好的



@Header(KafkaHeaders.RECEIVED_TOPIC) String topic

 这个是得到我们的主题topic的名字

我用apifox调试之后,成功执行了


kafka的图形化工具

这里介绍一个免费的开源项目KafkaKing

Releases · Bronya0/Kafka-King (github.com)

里面还能指定中文

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1911325.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

mysql查询语句执行流程

流程图 连接器&#xff1a;建立连接&#xff0c;管理连接、校验用户身份&#xff1b;查询缓存&#xff1a;查询语句如果命中查询缓存则直接返回&#xff0c;否则继续往下执行。MySQL 8.0 已删除该模块&#xff1b;解析 SQL&#xff0c;通过解析器对 SQL 查询语句进行词法分析、…

构造二进制字符串

目录 LeetCode3221 生成不含相邻零的二进制字符串 #include <iostream> #include <vector> using namespace std;void dfs(string s,int n,vector<string>& res){if(s.size()n){res.push_back(s);return;}dfs(s"0",n,res);dfs(s"1"…

Zabbix Sia Zabbix 逻辑漏洞(CVE-2022-23134)

前言 CVE-2022-23134是一个中等严重度的漏洞&#xff0c;影响Zabbix Web前端。这个漏洞允许未经身份验证的用户访问setup.php文件的某些步骤&#xff0c;这些步骤通常只对超级管理员开放。利用这个漏洞&#xff0c;攻击者可以通过跳过某些步骤来重新配置Zabbix前端&#xff0c…

代码随想录算法训练营第四十八天| 115.不同的子序列、583. 两个字符串的删除操作、 72. 编辑距离

115.不同的子序列 题目链接&#xff1a;115.不同的子序列 文档讲解&#xff1a;代码随想录 状态&#xff1a;不会 思路&#xff1a; dp[i][j] 表示在 s 的前 j 个字符中&#xff0c;t 的前 i 个字符作为子序列出现的次数。 匹配的情况&#xff1a; 1.当 s[j-1] 与 t[i-1] 匹配…

软考高级里《系统架构设计师》容易考吗?

我还是22年通过的架构考试。系统架构设计师属于软考高级科目&#xff0c;难度比初级和中级都要大&#xff0c;往年的通过率也比较低&#xff0c;一般在10-20%左右。从总体来说&#xff0c;这门科目确实是不好过的&#xff0c;大家如果想要备考系统架构设计师的话&#xff0c;还…

营销宝典:4P理论全方位解读,营销人的必修课

这么说吧&#xff0c;4P是营销的一切&#xff0c;也是一切的营销&#xff01; 4P理论产生于20世纪60年代的美国&#xff0c;随着营销组合理论的提出而出现的。 1953年&#xff0c;尼尔博登&#xff08;NeilBorden&#xff09;在美国市场营销学会的就职演说中创造了“市场营销…

【Python】已解决:SyntaxError: invalid character in identifier

文章目录 一、分析问题背景二、可能出错的原因三、错误代码示例四、正确代码示例五、注意事项 已解决&#xff1a;SyntaxError: invalid character in identifier 一、分析问题背景 在Python编程中&#xff0c;SyntaxError: invalid character in identifier是一个常见的编译…

java Web 优秀本科毕业论文系统用eclipse定制开发mysql数据库BS模式java编程jdbc

一、源码特点 JSP 优秀本科毕业论文系统是一套完善的web设计系统&#xff0c;对理解JSP java serlvet 编程开发语言有帮助&#xff0c;系统具有完整的源代码和数据库&#xff0c;系统主要采用B/S模式开发。开发环境为TOMCAT7.0,eclipse开发&#xff0c;数据库为Mysql5.0&a…

苍穹外卖--导入分类模块功能代码

把各层代码拷贝到所需文件夹下&#xff0c; 进行编译 在运行 提交和推送仓库

教程系列1 | 趋动云『社区项目』极速部署 SD WebUI

在上周&#xff0c;趋动云新推出的『社区项目』功能&#xff0c;以“一键克隆”的极致便捷与“省时省力”的高效体验&#xff0c;赢得了广大用户的关注。 随后&#xff0c;启动趋动云『社区项目』教程系列&#xff0c;旨在从零开始&#xff0c;全方位、手把手地引领您深入探索…

【鸿蒙学习笔记】Stage模型

官方文档&#xff1a;Stage模型开发概述 目录标题 Stage模型好处Stage模型概念图ContextAbilityStageUIAbility组件和ExtensionAbility组件WindowStage Stage模型-组件模型Stage模型-进程模型Stage模型-ArkTS线程模型和任务模型关于任务模型&#xff0c;我们先来了解一下什么是…

【Python】一文向您详细介绍 np.inner()

【Python】一文向您详细介绍 np.inner() 下滑即可查看博客内容 &#x1f308; 欢迎莅临我的个人主页 &#x1f448;这里是我静心耕耘深度学习领域、真诚分享知识与智慧的小天地&#xff01;&#x1f387; &#x1f393; 博主简介&#xff1a;985高校的普通本硕&#xff0c;曾…

突破传统,实时语音技术的革命。Livekit 开源代理框架来袭

🚀 突破传统,实时语音技术的革命!Livekit 开源代理框架来袭! 在数字化时代,实时通信已成为我们日常生活的一部分。但你是否曾想象过,一个能够轻松处理音视频流的代理框架,会如何改变我们的沟通方式?今天,我们就来一探究竟! 🌟 什么是 Livekit 代理框架? Live…

【微信小程序开发】微信小程序界面弹窗,数据存储相关操作代码逻辑实现

✨✨ 欢迎大家来到景天科技苑✨✨ &#x1f388;&#x1f388; 养成好习惯&#xff0c;先赞后看哦~&#x1f388;&#x1f388; &#x1f3c6; 作者简介&#xff1a;景天科技苑 &#x1f3c6;《头衔》&#xff1a;大厂架构师&#xff0c;华为云开发者社区专家博主&#xff0c;…

【Stable Diffusion】(基础篇三)—— 关键词和参数设置

提示词和文生图参数设置 本系列笔记主要参考B站nenly同学的视频教程&#xff0c;传送门&#xff1a;B站第一套系统的AI绘画课&#xff01;零基础学会Stable Diffusion&#xff0c;这绝对是你看过的最容易上手的AI绘画教程 | SD WebUI 保姆级攻略_哔哩哔哩_bilibili 本文主要讲…

tk Text文本框赋值,清空

import tkinter as tk# 创建主窗口 root tk.Tk() root.title("文本框内容赋值示例")# 创建一个Text小部件 text_area tk.Text(root, height10, width50) text_area.pack()# 将内容赋值给Text小部件 text_area.insert(tk.END, "这是文本框中的内容。\n")#…

【多线程】wait()和notify()

&#x1f970;&#x1f970;&#x1f970;来都来了&#xff0c;不妨点个关注叭&#xff01; &#x1f449;博客主页&#xff1a;欢迎各位大佬!&#x1f448; 文章目录 1. 为什么需要wait()方法和notify()方法&#xff1f;2. wait()方法2.1 wait()方法的作用2.2 wait()做的事情2…

App UI性能测试 - PerfDog使用全教程

App 性能测试指标: 响应、内存、CPU、FPS、GPU渲染、耗电、耗流等。 PerfDog的性能数据更加全面,所以下面以PerfDog来介绍安装使用流程及测试数据的获取与分析。 官网: PerfDog | 全平台性能测试分析专家 第一步,先访问官网进行注册, 注册好账号后,点击下载PerfDog,下…

微信公众平台、公众号、小程序联动

欢迎来到我的博客&#xff0c;代码的世界里&#xff0c;每一行都是一个故事 &#x1f38f;&#xff1a;你只管努力&#xff0c;剩下的交给时间 &#x1f3e0; &#xff1a;小破站 微信公众平台、公众号、小程序联动 如何通过unionid获取到微信公众openid如何根据code获取微信公…

UNDO 表空间使用率高 active段占用高 无对应事务执行

UNDO表空间使用率告警&#xff0c;查看占用情况 active段占比很高 select tablespace_name,status,sum(bytes/1024/1024) mb from dba_undo_extents group by tablespace_name,status;不同状态的含义&#xff1a;**ACTIVE **&#xff1a;有活动事务在使用 Undo&#xff0c;这…