kafka安装配置及集成springboot

news2025/1/12 20:47:02

1. 安装

单机安装kafka
Kafka对于zookeeper是强依赖,保存kafka相关的节点数据,所以安装Kafka之前必须先安装zookeeper
dockerhub网址: https://hub.docker.com

  • Docker安装zookeeper

下载镜像:

docker pull zookeeper:3.4.14

创建容器

docker run -d --name zookeeper -p 2181:2181 zookeeper:3.4.14
  • Docker安装kafka

下载镜像:

docker pull wurstmeister/kafka:latest
docker pull bitnami/kafka:3.6.2 (用这个会有问题,因为创建容器时参数设置与wurstmeister/kafka不同)

创建容器

docker run -d --name kafka \
--env KAFKA_ADVERTISED_HOST_NAME=192.168.200.131 \
--env KAFKA_ZOOKEEPER_CONNECT=192.168.200.131:2181 \
--env KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.200.131:9092 \
--env KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \
--env KAFKA_HEAP_OPTS="-Xmx256M -Xms256M" \
--net=host wurstmeister/kafka:latest

  • 测试
    终端窗口A
[root@192 ~]# docker exec -it kafka /bin/bash
bash-5.1# kafka-topics.sh --create --topic test --partitions 1 --replication-factor 1 --zookeeper 192.168.200.131:2181   (创建主题)
Created topic test.
bash-5.1# kafka-console-producer.sh --broker-list localhost:9092 --topic test   (创建生产者)
>hello    (发送消息)
>haha

终端窗口B

[root@192 ~]# docker exec -it kafka /bin/bash
bash-5.1# kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning   (创建接收者)
hello    (收到了消息)
haha
  • 安装kafka可视化工具(运行容器后打不开,不知道为啥)
docker run -d --name kafka-eagle -p 8048:8048 -e EFAK_CLUSTER_ZK_LIST="192.168.200.131:2181" nickzurich/efak:latest

集群安装

  1. kafka.yml
version: '3.8'
services:
  zookeeper:
    image: zookeeper:3.7.0
    restart: always
    hostname: 192.168.200.131
    container_name: zookeeper
    privileged: true
    ports:
      - 2181:2181
    volumes:
      - /usr/local/server/zookeeper/data/:/data
    build:
      context: .
      network: host

  kafka1:
    container_name: kafka1
    restart: always
    image: wurstmeister/kafka:latest
    privileged: true
    ports:
      - 9092:9092
      - 19092:19092
    environment:
      KAFKA_BROKER_ID: 1
      HOST_IP: 192.168.200.131
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.200.131:9092    ## 宿主机IP
      KAFKA_ZOOKEEPER_CONNECT: 192.168.200.131:2181
      #docker部署必须设置外部可访问ip和端口,否则注册进zk的地址将不可达造成外部无法连接
      KAFKA_ADVERTISED_HOST_NAME: 192.168.200.131
      KAFKA_ADVERTISED_PORT: 9092
      KAFKA_PORT: 9092
      KAFKA_delete_topic_enable: 'true'
      KAFKA_JMX_OPTS: "-Dcom.sun.management.jmxremote=true -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=192.168.200.131 -Dcom.sun.management.jmxremote.rmi.port=19092"
      JMX_PORT: 19092
      volumes:
        /etc/localtime:/etc/localtime
      depends_on:
        zookeeper
  kafka2:
    container_name: kafka2
    restart: always
    image: wurstmeister/kafka:latest
    privileged: true
    ports:
      - 9093:9093
      - 19093:19093
    environment:
      KAFKA_BROKER_ID: 2
      HOST_IP: 192.168.200.131
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9093
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.200.131:9093    ## 宿主机IP
      KAFKA_ZOOKEEPER_CONNECT: 192.168.200.131:2181
      #docker部署必须设置外部可访问ip和端口,否则注册进zk的地址将不可达造成外部无法连接
      KAFKA_ADVERTISED_HOST_NAME: 192.168.200.131
      KAFKA_ADVERTISED_PORT: 9093
      KAFKA_PORT: 9093
      KAFKA_delete_topic_enable: 'true'
      KAFKA_JMX_OPTS: "-Dcom.sun.management.jmxremote=true -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=192.168.200.131 -Dcom.sun.management.jmxremote.rmi.port=19093"
      JMX_PORT: 19093
      volumes:
        /etc/localtime:/etc/localtime
      depends_on:
        zookeeper
  kafka3:
    container_name: kafka3
    restart: always
    image: wurstmeister/kafka:latest
    privileged: true
    ports:
      - 9094:9094
      - 19094:19094
    environment:
      KAFKA_BROKER_ID: 3
      HOST_IP: 192.168.200.131
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9094
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.200.131:9094    ## 宿主机IP
      KAFKA_ZOOKEEPER_CONNECT: 192.168.200.131:2181
      #docker部署必须设置外部可访问ip和端口,否则注册进zk的地址将不可达造成外部无法连接
      KAFKA_ADVERTISED_HOST_NAME: 192.168.200.131
      KAFKA_ADVERTISED_PORT: 9094
      KAFKA_PORT: 9094
      KAFKA_delete_topic_enable: 'true'
      KAFKA_JMX_OPTS: "-Dcom.sun.management.jmxremote=true -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=192.168.200.131 -Dcom.sun.management.jmxremote.rmi.port=19094"
      JMX_PORT: 19094
      volumes:
        /etc/localtime:/etc/localtime
      depends_on:
        zookeeper

  eagle:
    image: gui66497/kafka_eagle
    container_name: eagle_monitor
    restart: always
    depends_on:
      - kafka1
      - kafka2
      - kafka3
    ports:
      - "8048:8048"
    environment:
      ZKSERVER: "192.168.200.131:2181"
  1. 命令

docker-compose -f kafka.yml up -d
docker-compose -f kafka.yml down
docker-compose -f kafka.yml ps

[root@192 images]#  ls
kafka.yml
[root@192 images]# docker-compose -f kafka.yml up -d
[+] Running 6/6
 ⠿ Network images_default   Created                                                                                        0.1s
 ⠿ Container kafka2         Started                                                                                        1.0s
 ⠿ Container kafka3         Started                                                                                        1.0s
 ⠿ Container zookeeper      Started                                                                                        1.0s
 ⠿ Container kafka1         Started                                                                                        1.0s
 ⠿ Container eagle_monitor  Started                                                                                        1.5s
[root@192 images]# 

// 但是还是用不了eagle,不知道为啥,防火墙是已经关了

2. springboot集成

目前springboot整合后的kafka,因为序列化器是StringSerializer,这个时候如果需要传递对象可以有两种方式

方式一:可以自定义序列化器,对象类型众多,这种方式通用性不强,这里不介绍

方式二:可以把要传递的对象进行转json字符串,接收消息后再转为对象即可,这里采用这种方式

2.1 创建单点kafka和topic

[root@192 images]# docker run -d --name zookeeper -p 2181:2181 zookeeper:3.4.14
700f01ad38e99df4a8a7979a66cb88e6b629dccc29820c18dd3213ebc60c5814
[root@192 images]# docker run -d --name kafka \
> --env KAFKA_ADVERTISED_HOST_NAME=192.168.200.131 \
> --env KAFKA_ZOOKEEPER_CONNECT=192.168.200.131:2181 \
> --env KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.200.131:9092 \
> --env KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \
> --env KAFKA_HEAP_OPTS="-Xmx256M -Xms256M" \
> --net=host wurstmeister/kafka:latest
5884d54092ede091c2572e6420158529de29cf8e98da3706a572e1fa1408182e
[root@192 images]# docker exec -it kafka /bin/bash
bash-5.1# kafka-topics.sh --create --topic test --partitions 1 --replication-factor 1 --zookeeper 192.168.200.131:2181
Created topic test.
bash-5.1# kafka-topics.sh --create --topic user-topic --partitions 1 --replication-factor 1 --zookeeper 192.168.200.131:2181
Created topic user-topic.

2.2 创建生产者

dependencies

<!-- kafkfa -->
   <dependency>
       <groupId>org.springframework.kafka</groupId>
       <artifactId>spring-kafka</artifactId>
       <exclusions>
           <exclusion>
               <groupId>org.apache.kafka</groupId>
               <artifactId>kafka-clients</artifactId>
           </exclusion>
       </exclusions>
   </dependency>
   <dependency>
       <groupId>org.apache.kafka</groupId>
       <artifactId>kafka-clients</artifactId>
   </dependency>
   <dependency>
       <groupId>com.alibaba</groupId>
       <artifactId>fastjson</artifactId>
       <version>1.2.83</version>
   </dependency>

application.yml

server:
  port: 8080
spring:
  application:
    name: kafka-producer
  kafka:
    bootstrap-servers: 192.168.200.131:9092
    producer:
      retries: 10
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: ${spring.application.name}-test
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

controller-发送消息

@RestController
public class HelloController {

    @Autowired
    private KafkaTemplate<String,String> kafkaTemplate;

    @GetMapping("/hello")
    public String hello(){
        kafkaTemplate.send("test","springboot发的第一条消息");
        return "ok";
    }

    @GetMapping("/helloUser")
    public String helloUser(){
        User user = new User();
        user.setName("xiaowang");
        user.setAge(18);

        kafkaTemplate.send("user-topic", JSON.toJSONString(user));

        return "ok";
    }
}

User

public class User {
    private String name;
    private int age;

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }

    @Override
    public String toString() {
        return "User{" +
                "name='" + name + '\'' +
                ", age=" + age +
                '}';
    }
}

2.3 创建消费者

dependencies

<!-- kafkfa -->
   <dependency>
       <groupId>org.springframework.kafka</groupId>
       <artifactId>spring-kafka</artifactId>
       <exclusions>
           <exclusion>
               <groupId>org.apache.kafka</groupId>
               <artifactId>kafka-clients</artifactId>
           </exclusion>
       </exclusions>
   </dependency>
   <dependency>
       <groupId>org.apache.kafka</groupId>
       <artifactId>kafka-clients</artifactId>
   </dependency>
   <dependency>
       <groupId>com.alibaba</groupId>
       <artifactId>fastjson</artifactId>
       <version>1.2.83</version>
   </dependency>

application.yml

server:
  port: 8081
spring:
  application:
    name: kafka-consumer
  kafka:
    bootstrap-servers: 192.168.200.131:9092
    producer:
      retries: 10
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: ${spring.application.name}-test
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

User

public class User {
    private String name;
    private int age;

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }

    @Override
    public String toString() {
        return "User{" +
                "name='" + name + '\'' +
                ", age=" + age +
                '}';
    }
}

消息监听器

@Component
public class HelloListener {

    @KafkaListener(topics = "test")
    public void onMessage1(String message){
        if(!StringUtils.isEmpty(message)){
            System.out.println(message);
        }

    }

    @KafkaListener(topics = "user-topic")
    public void onMessage(String message){
        if(!StringUtils.isEmpty(message)){
            User user = JSON.parseObject(message, User.class);
            System.out.println(user.toString());
        }

    }
}

启动生产者和消费者项目,浏览器输入http://127.0.0.1:8080/hello,发现消费者收到消息
在这里插入图片描述
浏览器输入http://127.0.0.1:8080/helloUser,发现消费者收到消息
在这里插入图片描述
项目结构
在这里插入图片描述

3.其它

通常在监听类直接调用service方法

@Component
@Slf4j
public class ArtilceIsDownListener {

    @Autowired
    private ApArticleConfigService apArticleConfigService;

    @KafkaListener(topics = WmNewsMessageConstants.WM_NEWS_UP_OR_DOWN_TOPIC)
    public void onMessage(String message){
        if(StringUtils.isNotBlank(message)){
            Map map = JSON.parseObject(message, Map.class);
            apArticleConfigService.updateByMap(map);
            log.info("article端文章配置修改,articleId={}",map.get("articleId"));
        }
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1669038.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

《C++学习笔记---初阶篇6》---string类 上

目录 1. 为什么要学习string类 1.1 C语言中的字符串 2. 标准库中的string类 2.1 string类(了解) 2.2 string类的常用接口说明 2.2.1. string类对象的常见构造 2.2.2. string类对象的容量操作 2.2.3.再次探讨reserve与resize 2.2.4.string类对象的访问及遍历操作 2.2.5…

PPMP_char3

PMPP char3 – Multidimensional grids and data ​ 五一过后&#xff0c;有些工作要赶&#xff0c;抽出时间更新一下。这一章基本都熟练掌握&#xff0c;在做习题过程中有一些思考。这里涉及到了一点点GEMM&#xff08;矩阵乘&#xff09;&#xff0c;GEMM有太多可深挖的了&a…

Ubuntu24 文件目录结构——用户——权限 详解

目录 权限 用户 文件目录结构 一个目录可以有程序&#xff0c;目录&#xff0c;文件&#xff0c;以及这三者的链接。可以看到还分别有使用者和权限信息。 每个文件和目录都有与之关联的三个主要属性&#xff1a;所有者&#xff08;owner&#xff09;、组&#xff08;group&a…

【ESP32接入ATK-MO1218 GPS模块】

【ESP32接入ATK-MO1218 GPS模块】 1. 引言2. ATK-MO1218 GPS模块概述3. 接入ATK-MO1218 GPS模块的步骤4. 示例代码5. 结论1. 引言 在现代的嵌入式系统和物联网项目中,精确的位置信息是至关重要的。ATK-MO1218 GPS模块作为一款高性能的GPS/北斗双模定位模块,为开发者提供了强…

【Qt 学习笔记】Qt常用控件 | 容器类控件 | Tab Widget的使用及说明

博客主页&#xff1a;Duck Bro 博客主页系列专栏&#xff1a;Qt 专栏关注博主&#xff0c;后期持续更新系列文章如果有错误感谢请大家批评指出&#xff0c;及时修改感谢大家点赞&#x1f44d;收藏⭐评论✍ Qt常用控件 | 容器类控件 | Tab Widget的使用及说明 文章编号&#xf…

实现红黑树

目录 红黑树的概念 红黑树的节点结构定义 红黑树的插入 红黑树的验证 实现红黑树完整代码 红黑树的概念 红黑树 &#xff0c;是一种 二叉搜索树 &#xff0c;但 在每个结点上增加一个存储位表示结点的颜色&#xff0c;可以是 Red 或 Black 。 通过对 任何一条从根到叶子的…

You Only Cache Once:YOCO 基于Decoder-Decoder 的一个新的大语言模型架构

这是微软再5月刚刚发布的一篇论文提出了一种解码器-解码器架构YOCO&#xff0c;因为只缓存一次KV对&#xff0c;所以可以大量的节省内存。 以前的模型都是通过缓存先前计算的键/值向量&#xff0c;可以在当前生成步骤中重用它们。键值(KV)缓存避免了对每个词元再次编码的过程&…

基于SSM的“网约车用户服务平台”的设计与实现(源码+数据库+文档)

基于SSM的“网约车用户服务平台”的设计与实现&#xff08;源码数据库文档) 开发语言&#xff1a;Java 数据库&#xff1a;MySQL 技术&#xff1a;SSM 工具&#xff1a;IDEA/Ecilpse、Navicat、Maven 系统展示 系统功能 首页 站内新闻浏览 打车信息查询功能 在线打车功能…

Linux 服务器配置共享文件夹(NFS)

一、准备三台 linux 服务器 三台服务器: manger:172.16.11.178 ap1:172.16.11.179 ap2:172.16.11.180 /root/serverfiles/ 为共享目录 二、配置步骤 1、在服务端01的机器上安装nfs和rpcbind程序 yum -y install nfs* yum -y install rpcbind* 2、在安装完nfs以及rpcb…

MySQL查询篇-聚合函数-窗口函数

文章目录 distinct 关键字聚合函数常见的聚合函数group by和having 分组过滤 窗口函数with as窗口聚合函数排名窗口函数值窗口函数 distinct 关键字 distinct 去重数据&#xff0c;ps:null值也会查出来 select distinct column from table;聚合函数 常见的聚合函数 select …

【保姆级教程】VMware Workstation Pro的虚拟机导入vritualbox详细教程

解决方案 1、OVF格式2、VMX格式 1、OVF格式 选定需要导出的虚拟机&#xff08;关闭或者挂起状态下&#xff09;依次选择文件-导出为ovf 在Vritualbox导入刚刚导出的.ovf文件 更改路径&#xff0c;按实际需要修改 成功导入 2、VMX格式 如果在VMware Workstation Pro导出的…

rs6(vmp)瑞某,药某局,商某局,专某局,维某网,cookie + 后缀 的分析解析

文章目录 说在前面rs vmp 特征 介绍解决方法算法补环境运行报错 代码联调补环境框架 补环境导出结果导出cookie导出后缀 效果展示 vx lyj_txd qq 1416279170 # 加我备注来意说在前面 免责声明&#xff1a; 本篇文章只做学习讨论&#xff0c;无商务用途&#xff0c; 未对目标…

APP反抓包 - 服务端证书验证

案例引入: app:泡泡聊天 版本:v1.7.4 发送登录请求,抓包发现提示:403 Forbidden 这里就是使用了服务端证书校验,因为charles没有安装证书,所以到达服务器的响应没有通过验证,返回异常。 美之图: 一,校验逻辑 在安卓开发时,在客户端预设证书(p12/bks),客户端…

C++基础与深度解析 | C++初探 | Hello World | 系统I/O | 控制流 | 结构体与自定义数据类型

文章目录 一、从Hello World谈起二、系统I/O三、控制流四、结构体与自定义数据类型 一、从Hello World谈起 #include <iostream>void fun(const char *pInfo) {std::cout << pInfo << std::endl; }int main() {fun("Hello World!");fun("Hel…

从 Oracle 到 TiDB,国有大行打造本地生活 APP 新体验

导读 本文介绍了某国有大行推出的本地生活服务类 APP 在数字时代的创新应用实践。该 APP 利用金融科技和互联网平台模式&#xff0c;打造“金融非金融”的线上生态服务平台&#xff0c;满足了用户多样化的生活需求。为应对用户增长和数据量增加带来的挑战&#xff0c;该 APP 决…

【网络编程】Servlet的前后端练习 | 表白墙 | 前后端交互 | 提交消息 | 获取消息

文章目录 一、Servlet的前后端练习1.表白墙服务器要实现的逻辑&#xff1a;1.获取消息 &#xff1a;2.提交消息&#xff1a;完整前端代码&#xff1a;完整后端代码&#xff1a; 一、Servlet的前后端练习 1.表白墙 服务器要实现的逻辑&#xff1a; 1.页面加载时&#xff0c;网…

47-Qt控件详解:Buttons Containers1

一 QPushButton (命令按钮) #ifndef MAINWINDOW_H #define MAINWINDOW_H#include <QMainWindow> #include <QPushButton>//引入QPushButton类对应的头文件class MainWindow : public QMainWindow {Q_OBJECTpublic:MainWindow(QWidget *parent nullptr);~MainWind…

YOLOv8独家原创改进: AKConv(可改变核卷积)

1.AKConv原理介绍 地址:2311.11587 (arxiv.org) 摘要:基于卷积运算的神经网络在深度学习领域取得了令人瞩目的成果,但标准卷积运算存在两个固有的缺陷。一方面,卷积运算仅限于局部窗口,无法捕获其他位置的信息, 并且它的采样形状是固定的。 另一方面,卷积核的大小固定为…

vue3土味情话pinia可以持久保存再次修改App样式

我是不是你最疼爱的人-失去爱的城市 <template><div class"talk"><button click"getLoveTalk">土味情话</button><ul><li v-for"talk in talkStore.talkList" :key"talk.id">{{ talk.title }}<…

HarmonyOS开发案例:【UIAbility内和UIAbility间页面的跳转】

UIAbility内和UIAbility间页面的跳转&#xff08;ArkTS&#xff09; 介绍 基于Stage模型下的UIAbility开发&#xff0c;实现UIAbility内和UIAbility间页面的跳转。包含如下功能&#xff1a; UIAbility内页面的跳转。跳转到指定UIAbility的首页。跳转到指定UIAbility的指定页…