使用Linux部署Kafka教程

news2025/1/11 0:09:39

目录

一、部署Zookeeper

1 拉取Zookeeper镜像

2 运行Zookeeper

二、部署Kafka

1 拉取Kafka镜像

2 运行Kafka

三、验证是否部署成功

1 进入到kafka容器中

2 创建topic 生产者

3 生产者发送消息

4 消费者消费消息

四、搭建kafka管理平台

五、SpringBoot整合Kafka 

1、导入依赖

2、修改配置

3、生产者

 4、消费者

5、测试发送消息

 6、测试收到消息


一、部署Zookeeper

1 拉取Zookeeper镜像

docker pull wurstmeister/zookeeper
  • 1

2 运行Zookeeper

docker run --restart=always --name zookeeper \
--log-driver json-file \
--log-opt max-size=100m \
--log-opt max-file=2  \
-p 2181:2181 \
-v /etc/localtime:/etc/localtime \
-d wurstmeister/zookeeper

二、部署Kafka

1 拉取Kafka镜像

docker pull wurstmeister/kafka

2 运行Kafka

docker run --restart=always --name kafka \
--log-driver json-file \
--log-opt max-size=100m \
--log-opt max-file=2 \
 -p 9092:9092 \
 -e KAFKA_BROKER_ID=0 \
 -e KAFKA_ZOOKEEPER_CONNECT=192.168.8.102:2181 \
 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.8.102:9092 \
 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \
 -v /etc/localtime:/etc/localtime \
 -d wurstmeister/kafka

参数说明:
-e KAFKA_BROKER_ID=0 在kafka集群中,每个kafka都有一个BROKER_ID来区分自己
-e KAFKA_ZOOKEEPER_CONNECT=172.16.0.13:2181/kafka 配置zookeeper管理kafka的路径172.16.0.13:2181/kafka
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://172.16.0.13:9092 把kafka的地址端口注册给zookeeper,如果是远程访问要改成外网IP,类如Java程序访问出现无法连接。
-e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 配置kafka的监听端口
-v /etc/localtime:/etc/localtime 容器时间同步虚拟机的时间

三、验证是否部署成功

1 进入到kafka容器中

docker exec -it kafka /bin/sh

2 创建topic 生产者

cd opt/kafka_2.13-2.8.1

bin/kafka-topics.sh --create --zookeeper 192.168.8.102:2181 --replication-factor 1 --partitions 1 --topic partopic

在这里插入图片描述

3 生产者发送消息

bin/kafka-console-producer.sh --broker-list 192.168.8.102:9092 --topic partopic

在这里插入图片描述

4 消费者消费消息

  • 新打开个ssh窗口
  • 跟前面步骤一样进入到容器
bin/kafka-console-consumer.sh --bootstrap-server 192.168.8.102:9092 --topic partopic --from-beginning

在这里插入图片描述

四、搭建kafka管理平台

 docker search kafdrop

docker run -d --rm  -p 9000:9000 \
    -e JVM_OPTS="-Xms32M -Xmx64M" \
    -e KAFKA_BROKERCONNECT=<host:port,host:port> \
    -e SERVER_SERVLET_CONTEXTPATH="/" \
    obsidiandynamics/kafdrop
 
<host:port,host:port> 为 外网集群地址 多个用逗号分隔 例如xxx.xxx.xxx.xxx:9092,yyy.yyy.yyy.yyy:9092 尖角号不留
 
上面的命令是百度的
 
以下是我自己尝试的
docker run -d --name kafdrop -p 9001:9001 \
    -e JVM_OPTS="-Xms32M -Xmx64M -Dserver.port=9001" \
    -e KAFKA_BROKERCONNECT=192.168.58.130:9092 \
    -e SERVER_SERVLET_CONTEXTPATH="/" \
    obsidiandynamics/kafdrop
 
因为我docker启动了其他东西占用了9001端口,而这个kafdrop其实就是一个springboot项目,以jar命令的形式启动

访问地址:Kafdrop: Broker List 

五、SpringBoot整合Kafka 

1、导入依赖

        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

2、修改配置

spring:
  kafka:
    bootstrap-servers: 192.168.58.130:9092 #部署linux的kafka的ip地址和端口号
    producer:
      # 发生错误后,消息重发的次数。
      retries: 1
      #当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。
      batch-size: 16384
      # 设置生产者内存缓冲区的大小。
      buffer-memory: 33554432
      # 键的序列化方式
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      # 值的序列化方式
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      # acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。
      # acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。
      # acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。
      acks: 1
    consumer:
      # 自动提交的时间间隔 在spring boot 2.X 版本中这里采用的是值的类型为Duration 需要符合特定的格式,如1S,1M,2H,5D
      auto-commit-interval: 1S
      # 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:
      # latest(默认值)在偏移量无效的情况下,消费者将从最新的记录开始读取数据(在消费者启动之后生成的记录)
      # earliest :在偏移量无效的情况下,消费者将从起始位置读取分区的记录
      auto-offset-reset: earliest
      # 是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量
      enable-auto-commit: false
      # 键的反序列化方式
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # 值的反序列化方式
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    listener:
      # 在侦听器容器中运行的线程数。
      concurrency: 5
      #listner负责ack,每调用一次,就立即commit
      ack-mode: manual_immediate
      missing-topics-fatal: false

本次测试:linux地址:192.168.58.130

spring.kafka.bootstrap-servers=192.168.58.130:9092

advertised.listeners=192.168.58.130:9092

3、生产者

import com.alibaba.fastjson.JSON;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

/**
 * 事件的生产者
 */
@Slf4j
@Component
public class KafkaProducer {

    @Autowired
    public KafkaTemplate kafkaTemplate;

    /** 主题 */
    public static final String TOPIC_TEST = "Test";
    /** 消费者组 */
    public static final String TOPIC_GROUP = "test-consumer-group";

    public void send(Object obj){
        String obj2String = JSON.toJSONString(obj);
        log.info("准备发送消息为:{}",obj2String);

        //发送消息
        ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(TOPIC_TEST, obj);
        //回调
        future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
            @Override
            public void onFailure(Throwable ex) {
                //发送失败的处理
                log.info(TOPIC_TEST + " - 生产者 发送消息失败:" + ex.getMessage());
            }

            @Override
            public void onSuccess(SendResult<String, Object> result) {
                //成功的处理
                log.info(TOPIC_TEST + " - 生产者 发送消息成功:" + result.toString());
            }
        });
    }



}

 4、消费者

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

import java.util.Optional;

/**
 * 事件消费者
 */
@Component
public class KafkaConsumer {
    private Logger logger = LoggerFactory.getLogger(org.apache.kafka.clients.consumer.KafkaConsumer.class);

    @KafkaListener(topics = KafkaProducer.TOPIC_TEST,groupId = KafkaProducer.TOPIC_GROUP)
    public void topicTest(ConsumerRecord<?,?> record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic){
        Optional<?> message = Optional.ofNullable(record.value());
        if (message.isPresent()) {
            Object msg = message.get();
            logger.info("topic_test 消费了: Topic:" + topic + ",Message:" + msg);
            ack.acknowledge();
        }
    }
}

5、测试发送消息

@Test
    void kafkaTest(){
        kafkaProducer.send("Hello Kafka");
    }

 6、测试收到消息

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/931866.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

求生之路2私人服务器开服搭建教程centos

求生之路2私人服务器开服搭建教程centos 大家好我是艾西&#xff0c;朋友想玩求生之路2(left4dead2)重回经典。Steam玩起来有时候没有那么得劲&#xff0c;于是问我有没有可能自己搭建一个玩玩。今天跟大家分享的就是求生之路2的自己用服务器搭建的一个心路历程。 &#xff0…

【LeetCode-面试经典150题-day15】

目录 104.二叉树的最大深度 100.相同的树 226.翻转二叉树 101.对称二叉树 105.从前序与中序遍历序列构造二叉树 106.从中序与后序遍历序列构造二叉树 117.填充每个节点的下一个右侧节点指针Ⅱ 104.二叉树的最大深度 题意&#xff1a; 给定一个二叉树 root &#xff0c;返回其…

登录校验-JWT令牌-登陆后下发令牌

目录 思路 接口文档 令牌生成和下发 步骤 具体代码如下 工具类 控制类 测试 前后端联调 思路 令牌生成&#xff1a;登陆成功后&#xff0c;生成JWT令牌&#xff0c;并返回给前端令牌校验&#xff1a;在请求到达服务端后&#xff0c;对令牌进行统一拦截、校验 接口文档…

c语言练习题31:字符转换

scanf(“%[^\n]“, str)正则用法 1 ^表示"非"&#xff0c;[^\n]表示读入换行字符就结束读入。这个是scanf的正则用法&#xff0c;我们都知道scanf不能接收空格符&#xff0c;一接受到空格就结束读入&#xff0c;所以不能像gets()等函数一样接受一行字符串&#xff0…

全基因组选择:LightGBM通过提升GWAS敏感性促进基因挖掘

GWAS是识别性状相关基因和理解性状背后的遗传结构的有效方法&#xff0c;随着下一代测序技术的快速发展&#xff0c;基因分型费用显著降低&#xff0c;而在大规模人群的情况下&#xff0c;表型已成为GWAS的瓶颈。由于测序技术的快速发展&#xff0c;获取基因的成本已经显著降低…

分享几个 Selenium 自动化常用操作

最近工作会用到selenium来自动化操作一些重复的工作&#xff0c;那么在用selenium写代码的过程中&#xff0c;又顺手整理了一些常用的操作&#xff0c;分享给大家。 常用元素定位方法 虽然有关selenium定位元素的方法有很多种&#xff0c;但是对于没有深入学习&#xff0c;尤…

Python“牵手”当当网商品列表数据,关键词搜索当当网API接口数据,当当网API接口申请指南

当当网平台API接口是为开发电商类应用程序而设计的一套完整的、跨浏览器、跨平台的接口规范&#xff0c;当当网API接口是指通过编程的方式&#xff0c;让开发者能够通过HTTP协议直接访问当当网平台的数据&#xff0c;包括商品信息、店铺信息、物流信息等&#xff0c;从而实现当…

AVR128 智能风扇设计-proteus-程序

一、系统方案 模拟的电风扇的工作状态有3种&#xff1a;自然风、常风及睡眠风。使用三个按键S1-S3设置自然风、常风及睡眠风。 再使用两个按键S4和S5&#xff0c;S4用于定时电风扇定时时间长短的设置&#xff0c;每按一次S4键&#xff0c;定时时间增加10秒&#xff0c;最长60秒…

MacOS软件安装包分享(附安装教程)

目录 一、软件简介 二、软件下载 一、软件简介 MacOS是一种由苹果公司开发的操作系统&#xff0c;专门用于苹果公司的计算机硬件。它被广泛用于创意和专业应用程序&#xff0c;如图像设计、音频和视频编辑等。以下是关于MacOS的详细介绍。 1、MacOS的历史和演变 MacOS最初于…

功能强大的网站检测工具Web-Check

什么是 Web-Check &#xff1f; Web-Check是一款功能强大的一体化工具&#xff0c;用于查找有关网站/主机的信息。目前仪表版上可以显示&#xff1a;IP 信息、SSL 信息、DNS 记录、cookie、请求头、域信息、搜索爬虫规则、页面地图、服务器位置、开放端口、跟踪路由、DNS 安全扩…

UE4/5的Custom节点:在VScode使用HLSL(新手入门用)

目录 custom节点 VSCode环境安装 将VSCode里面的代码放入Custom中 custom节点 可以看到这是一个简单的Custom节点&#xff1a; 而里面是可以填写代码的&#xff1a; 但是在这里面去写代码会发现十分的繁琐【按下enter后&#xff0c;不会换行&#xff0c;也不会自动缩进】 …

关于UG/NX二次开发的历史和发展前景

UG/NX是一款广泛应用于计算机辅助设计与制造领域的软件&#xff0c;具有强大的二次开发能力。本文将介绍UG/NX二次开发的历史和发展前景。 一、UG/NX二次开发的历史 UG/NX最初由美国UGS公司&#xff08;后被西门子收购&#xff09;开发&#xff0c;是一款集成了CAD、CAM和CAE…

docker 重装提示 Exising installation is up to date 解决方法

Windows Docker 重装提示 Exising installation is up to date 解决方法 出现这个问题是因为卸载Docker没有卸载干净&#xff0c;导致无法重装 解决方法&#xff1a; 按下WindowR唤起命令输入界面&#xff0c;输入 regedit 打开注册表编辑在地址栏输入HKEY_LOCAL_MACHINE\SOFTW…

大数据之Maven

一、Maven的作用 作用一&#xff1a;下载对应的jar包 避免jar包重复下载配置&#xff0c;保证多个工程共用一份jar包。Maven有一个本地仓库&#xff0c;可以通过pom.xml文件来记录jar所在的位置。Maven会自动从远程仓库下载jar包&#xff0c;并且会下载所依赖的其他jar包&…

【MOS管的作用和工作原理】

数电/模电知识学习与分享001 MOS管的作用和工作原理1、MOS管基本概念2、MOS管基本原理3、MOS管广泛作用4、MOS管特点4、参考文献 MOS管的作用和工作原理 1、MOS管基本概念 MOS管&#xff08;Metal-Oxide-Semiconductor Field-Effect Transistor&#xff09;是一种常用的半导体…

100个Python小游戏,上班摸鱼我能玩一整年【附源码】

哈喽铁子们 表弟最近在学Python&#xff0c;总是跟我抱怨很枯燥无味&#xff0c;其实&#xff0c;他有没有认真想过&#xff0c;可能是自己学习姿势不对&#xff1f; 比方说&#xff0c;可以通过打游戏来学编程&#xff01; 今天给大家分享100个Python小游戏&#xff0c;一定…

个人记录:划分

原始数据展示 每五个大图移动一次所有的大图名称的小图片。 读取指定图片格式的图片名称&#xff0c;内置函数map执行,文件移动 图片01-17[:27] 图片17-70要改27为25 import os import shutil # source dataset/sat_train/ source_path "/mnt/sdb1/fenghaixia/dsm/da…

考虑储能电池参与一次调频技术经济模型的容量配置方法(matlab代码)

目录 1 主要内容 储能参与调频原理 储能参与一次调频的充放电策略 2 部分代码 3 程序结果 4 下载链接 1 主要内容 该程序复现文献《考虑储能电池参与一次调频技术经济模型的容量配置方法》模型&#xff0c;以调频效果最优为目标&#xff0c;考虑储能参与一次调频的充放电…

Tcp 协议的接口测试

首先明确 Tcp 的概念&#xff0c;针对 Tcp 协议进行接口测试&#xff0c;是指基于 Tcp 协议的上层协议比如 Http &#xff0c;串口&#xff0c;网口&#xff0c; Socket 等。这些协议与 Http 测试方法类似&#xff08;具体查看接口自动化测试章节&#xff09;&#xff0c;但在测…

函数式编程-Stream流学习第二节-中间操作

1 Stream流概述 java8使用的是函数式编程模式,如同它的名字一样&#xff0c;它可以用来对集合或者数组进行链状流式操作&#xff0c;让我们更方便的对集合或者数组进行操作。 2 案例准备工作 我们首先创建2个类一个作家类&#xff0c;一个图书类 package com.stream.model;…