kafka使用入门案例与踩坑记录

news2025/1/11 11:01:32

每次用到kafka时都会出现各种奇怪的问题,综合实践,下面汇总下主要操作步骤:

Docker镜像形式启动

zookeeper启动

docker run -d --name zookeeper -p 2181:2181 -t wurstmeister/zookeeper

kafka启动

docker run --name kafka01 -p 9092:9092 -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT=150.158.16.123:12348 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://150.158.16.123:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 -d  wurstmeister/kafka

进入kafka容器

docker exec -it [容器id] /bin/bash

创建topic

进入容器,在/opt/kafka_2.13-2.8.1/bin 目录下创建topic

./kafka-topics.sh --create --zookeeper 150.158.16.123:12348 --replication-factor 1 --partitions 1 --topic mykafka
./kafka-topics.sh --create --zookeeper 150.158.16.123:2181 --replication-factor 1 --partitions 1 --topic mykafka

运行生产者

image-20220923112024973

运行消费者

image-20220923112035890


单机形式启动

前提

1、Linux 机器

2、环境已准备好JDK,如果还没有装,推荐用yum一键安装

yum  install  -y  java-1.8.0-openjdk.x86_64

检验:

[root@localhost ~]# java -version
openjdk version "1.8.0_362"
OpenJDK Runtime Environment (build 1.8.0_362-b08)
OpenJDK 64-Bit Server VM (build 25.362-b08, mixed mode)

3、将kafka压缩包上传到你的Linux

配置文件关注config目录下的zookeeper.propertiesserver.properties,启动服务时要指定

配置-启动

有默认配置,可不做修改(有需要可以自定义启动端口和数据存放位置等参数)

1、先启动自带的 Zookeeper:

[root@localhost bin]# ./zookeeper-server-start.sh ../config/zookeeper.properties 
OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N
[2023-02-26 14:14:52,759] INFO Reading configuration from: ../config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2023-02-26 14:14:52,766] INFO autopurge.snapRetainCount set to 3 (org.apache.zookeeper.server.DatadirCleanupManager)
[2023-02-26 14:14:52,767] INFO autopurge.purgeInterval set to 0 (org.apache.zookeeper.server.DatadirCleanupManager)
[2023-02-26 14:14:52,767] INFO Purge task is not scheduled. (org.apache.zookeeper.server.DatadirCleanupManager)
[2023-02-26 14:14:52,767] WARN Either no config or no quorum defined in config, running  in standalone mode (org.apache.zookeeper.server.quorum.QuorumPeerMain)
[2023-02-26 14:14:52,783] INFO Reading configuration from: ../config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2023-02-26 14:14:52,784] INFO Starting server (org.apache.zookeeper.server.ZooKeeperServerMain)
[2023-02-26 14:14:52,796] INFO Server environment:zookeeper.version=3.4.14-4c25d480e66aadd371de8bd2fd8da255ac140bcf, built on 03/06/2019 16:18 GMT (org.apache.zookeeper.server.ZooKeeperServer)
[2023-02-26 14:14:52,796] INFO Server environment:host.name=localhost (org.apache.zookeeper.server.ZooKeeperServer)
[2023-02-26 14:14:52,796] INFO Server environment:java.version=1.8.0_362 (org.apache.zookeeper.server.ZooKeeperServer)
[2023-02-26 14:14:52,796] INFO Server environment:java.vendor=Red Hat, Inc. (org.apache.zookeeper.server.ZooKeeperServer)
[2023-02-26 14:14:52,796] INFO Server environment:java.home=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.362.b08-1.el7_9.x86_64/jre (org.apache.zookeeper.server.ZooKeeperServer)
(省略大部分)

2、启动 Kafka

[root@localhost kafka_2.12-2.3.0]# bin/kafka-server-start.sh config/server.properties 
OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N
[2023-02-26 14:16:00,261] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)
[2023-02-26 14:16:01,004] INFO Registered signal handlers for TERM, INT, HUP (org.apache.kafka.common.utils.LoggingSignalHandler)
[2023-02-26 14:16:01,024] INFO starting (kafka.server.KafkaServer)
[2023-02-26 14:16:01,025] INFO Connecting to zookeeper on localhost:2181 (kafka.server.KafkaServer)
[2023-02-26 14:16:01,068] INFO [ZooKeeperClient Kafka server] Initializing a new session to localhost:2181. (kafka.zookeeper.ZooKeeperClient)
[2023-02-26 14:16:01,072] INFO Client environment:zookeeper.version=3.4.14-4c25d480e66aadd371de8bd2fd8da255ac140bcf, built on 03/06/2019 16:18 GMT (org.apache.zookeeper.ZooKeeper)
[2023-02-26 14:16:01,072] INFO Client environment:host.name=localhost (org.apache.zookeeper.ZooKeeper)
[2023-02-26 14:16:01,072] INFO Client environment:java.version=1.8.0_362 (org.apache.zookeeper.ZooKeeper)
[2023-02-26 14:16:01,072] INFO Client environment:java.vendor=Red Hat, Inc. (org.apache.zookeeper.ZooKeeper)
[2023-02-26 14:16:01,072] INFO Client environment:java.home=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.362.b08-1.el7_9.x86_64/jre (org.apache.zookeeper.ZooKeeper)
(省略大部分)

上述步骤只要启动过程没有报错信息,一般是没有问题的

测试

1、创建个topic

[root@localhost bin]# ./kafka-topics.sh --create --zookeeper 192.168.154.134:2181 --replication-factor 1 --partitions 1 --topic test
Created topic test.

2、查看topic列表

[root@localhost bin]# ./kafka-topics.sh --zookeeper 192.168.154.134:2181 --list
OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N
test

3、启动生产者

[root@localhost bin]# ./kafka-console-producer.sh --broker-list 192.168.154.134:9092 --topic test
>hi
>什么意思啊

4、启动消费者

[root@localhost bin]# ./kafka-console-consumer.sh --bootstrap-server 192.168.154.134:9092 --topic test
hi
什么意思啊

正常启动,OK!

可视化:kafka-manager

镜像下载

docker pull sheepkiller/kafka-manager

运行容器

docker run -d --name kafka-manager -p 12349:9000 --link zookeeper --link kafka01 --env ZK_HOSTS=zookeeper:2181 sheepkiller/kafka-manager  

然后访问对应的IP:端口即可进入管理页面

注意:ZK_HOSTS 后面在web页面上要用到!

管理界面

进入主页面后,点击 Add Cluster 添加集群信息

image-20220818233716973

然后填写配置信息,主要填写集群名称,Zookeeper的Hosts,还有指定kafka版本(选个跟你所使用的kafka版本号最接近的就行),其他的一些配置按默认的就行。

当你正确连接上以后,就能看到你的集群啦,如:

image-20220818235108897

image-20220818235134261

更多关于kafka可视化操作就由你慢慢探索吧!这里将你引进门!

注意:

  1. 如果你在启动kafka manager这个容器时指定了 ZK_HOSTS ,那么Cluster Zookeeper Hosts这项填的内容要和 ZK_HOSTS 一致,否则会出现连接不上,连接超时等情况。如下图:

    image-20220818234605319

  2. 另外有些配置默认值是1,但是你得将其改成1以上的整数,否则不能正确保存提交。如:

    image-20220818234844587


注意

kafka版本不同,响应的api有区别

本版本是2.11

注意3.x是 --bootstrap-server localhost:9092方式新建,kafka2.x是以–zookeeper方式创建。下面查看新建的topic。

image-20220925165659756

image-20220925165713665

奇葩问题

1.重启docker失败?

[root@localhost ~]# systemctl restart docker
Job for docker.service failed because the control process exited with error code. See "systemctl status docker.service" and "journalctl -xe" for details.
[root@localhost ~]# journalctl -xe
-- The result is failed.
222 02:01:53 localhost.localdomain systemd[1]: Unit docker.service entered failed state.
222 02:01:53 localhost.localdomain systemd[1]: Unit docker.service entered failed state.
222 02:01:53 localhost.localdomain systemd[1]: docker.service failed.
222 02:01:55 localhost.localdomain systemd[1]: docker.service holdoff time over, scheduling restart.
222 02:01:55 localhost.localdomain systemd[1]: Stopped Docker Application Container Engine.
-- Subject: Unit docker.service has finished shutting down
-- Defined-By: systemd
-- Support: http://lists.freedesktop.org/mailman/listinfo/systemd-devel
-- 
-- Unit docker.service has finished shutting down.
222 02:01:55 localhost.localdomain systemd[1]: start request repeated too quickly for docker.service
222 02:01:55 localhost.localdomain systemd[1]: Failed to start Docker Application Container Engine.
-- Subject: Unit docker.service has failed
-- Defined-By: systemd
-- Support: http://lists.freedesktop.org/mailman/listinfo/systemd-devel
-- 
-- Unit docker.service has failed.
-- 
-- The result is failed.

原因:修改文件/etc/docker/daemon.json时不规范,可能存在空格什么的

解决:

[root@localhost ~]# cat <<EOF >/etc/docker/daemon.json
> {
> "registry-mirrors": ["https://registry.docker-cn.com"]
> }
> EOF
[root@localhost ~]# cat /etc/docker/daemon.json 
{
"registry-mirrors": ["https://registry.docker-cn.com"]
}
[root@localhost ~]# 
[root@localhost ~]# systemctl daemon-reload
[root@localhost ~]# systemctl restart docker

2.查询镜像无果?

[root@localhost ~]# docker search kafka
Error response from daemon: Get "https://index.docker.io/v1/search?q=kafka&n=25": x509: certificate has expired or is not yet valid: current time 2023-02-22T02:08:25+08:00 is before 2023-02-22T00:00:00Z

原因:虚拟机时间与外部时间不一致

解决:

[root@localhost ~]# date
2023年 02月 22日 星期三 02:09:50 CST
[root@localhost ~]# ntpdate cn.pool.ntp.org
26 Feb 13:31:38 ntpdate[44996]: step time server 119.28.206.193 offset 386475.634457 sec
[root@localhost ~]# date
2023年 02月 26日 星期日 13:31:48 CST
[root@localhost ~]# docker search kafka
NAME                                         DESCRIPTION                                     STARS     OFFICIAL   AUTOMATED
bitnami/kafka                                Apache Kafka is a distributed streaming plat…   615                  [OK]
ubuntu/kafka                                 Apache Kafka, a distributed event streaming …   25                   
bitnami/kafka-exporter                                                                       9                    
ibmcom/kafka                                 Docker Image for IBM Cloud Private-CE (Commu…   6                    
bitnami/kafka-trigger-controller             Source for this controller is in the kubeles…   5                    
ibmcom/kafka-python-console-sample           Docker image for the IBM Event Streams Pytho…   2                    
openwhisk/kafkaprovider                      Apache OpenWhisk event provider service for2                    [OK]

3.Docker容器内如何安装vim?

  1. apt-get install vim (可能提示你安装失败!继续往下)

  2. agt-get update 同步 /etc/apt/sources.list 和 /etc/apt/sources.list.d 中列出的源的索引

    配置国内镜像源:

    echo "deb http://mirrors.163.com/debian/ jessie main non-free contrib" >/etc/apt/sources.list
        echo "deb http://mirrors.163.com/debian/ jessie-proposed-updates main non-free contrib" >>/etc/apt/sources.list
        echo "deb-src http://mirrors.163.com/debian/ jessie main non-free contrib" >>/etc/apt/sources.list
        echo "deb-src http://mirrors.163.com/debian/ jessie-proposed-updates main non-free contrib" >>/etc/apt/sources.list
    
  3. 返回第一步

4.无法启动kafka?

kafka.common.KafkaException: Socket server failed to bind to 150.158.16.123:9092: 无法指定被请求的地址.
        at kafka.network.Acceptor.openServerSocket(SocketServer.scala:327)
        at kafka.network.Acceptor.<init>(SocketServer.scala:252)
        at kafka.network.SocketServer$$anonfun$startup$1.apply(SocketServer.scala:91)
        at kafka.network.SocketServer$$anonfun$startup$1.apply(SocketServer.scala:83)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at kafka.network.SocketServer.startup(SocketServer.scala:83)
        at kafka.server.KafkaServer.startup(KafkaServer.scala:222)
        at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:38)
        at kafka.Kafka$.main(Kafka.scala:65)
        at kafka.Kafka.main(Kafka.scala)

注意,上面是配置里面有个地址写得不对,listeners=PLAINTEXT://10.20.30.153:9092后接的是内网地址,通过ip addr即可查看,如我的机器

image-20221001140813960

一个写内网地址,一个写外网地址即可

image-20221001140917719

本次分享到这,下期见!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/373091.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Apk转Aab(Android-App-Bundle)

这篇文章是参考Apk转Aab(Android-App-Bundle)_YoungBillsohu的博客-CSDN博客 基本照着这个大佬的步骤来就行&#xff0c;但是要注意的是apkTool最好是下新的&#xff0c;否则&#xff0c;会出现说一堆无语的错误&#xff0c;然后导致AAPT2关联资源的时候报错 类似这样的&#…

Java自定义生成二维码(兼容你所有的需求)

1、概述作为Java开发人员&#xff0c;说到生成二维码就会想到zxing开源二维码图像处理库&#xff0c;不可否认的是zxing确实很强大&#xff0c;但是实际需求中会遇到各种各样的需求是zxing满足不了的&#xff0c;于是就有了想法自己扩展zxing满足历史遇到的各种需求&#xff0c…

STC单片机启动看门狗定时器介绍和使用

STC单片机启动看门狗定时器介绍 ✨这里以STC8系列为例。 📑看门狗复位(WDT_CONTR) WDT_FLAG:看门狗溢出标志 看门狗发生溢出时,硬件自动将此位置 1,需要软件清零。EN_WDT:看门狗使能位 0:对单片机无影响 1:启动看门狗定时器。 注意:看门狗定时器可使用软件方式启动,…

JSP网上书店系统用myeclipse定制开发mysql数据库B/S模式java编程计算机网页

一、源码特点 JSP 网上书店系统 是一套完善的系统源码&#xff0c;对理解JSP java 编程开发语言有帮助&#xff0c;系统具有完整的源代码和数据库&#xff0c;系统主要采用B/S模式开发。研究的基本内容是基于网上书店系 统&#xff0c;使用JSP作为页面开发工具。Web服务的运…

【机器学习】集成学习投票法:投票回归器(VotingRegressor) 投票分类器(VotingClassifier)

前言 投票回归器和投票分类器都属于集成学习。在【机器学习】集成学习基础概念介绍中有提到过&#xff0c;集成学习的结合策略包括&#xff1a; 平均法、投票法和学习法。sklearn.ensemble库中的Voting Classifier和Voting Regressor&#xff0c;它们分别实现了对回归任务和分…

JUC 之 Synchronized 与 锁升级

—— 对象内存布局 和 对象头 对象构成布局 1. 对象头 对象标记 Mark Word 哈希码GC 标记 & 次数 GC 年龄 采用 4 位 bit 存储&#xff0c;最大为 15&#xff08;1111&#xff09;&#xff0c;所以 MaxTenutingThreshold 参数&#xff08;分代年龄&#xff09;的参数默…

C++回顾(三)—— 函数

3.1 内联函数 3.1.1 内联函数的定义 &#xff08;1&#xff09;内联函数的作用 作用&#xff1a;不是在调用时发生控制转移&#xff0c;而是在编译时将函数体嵌入在每一个调用处&#xff0c;适用于功能简单&#xff0c;规模较小又使用频繁的函数。递归函数无法内联处理&…

Java-重排序,happens-before 和 as-if-serial 语义

目录1. 如何解决重排序带来的问题2. happens-before1. 如何解决重排序带来的问题 对于编译器&#xff0c;JMM 的编译器重排序规则会禁止特定类型的编译器重排序。对于处理器重排序&#xff0c;JMM 的处理器重排序规则会要求编译器在生成指令序列时&#xff0c;插入特定类型的内…

Android笔记(二十五):两种sdk热更插件资源加载方案

背景 在研究sdk插件化热更新方式的过程中总结出了两套插件资源加载方案&#xff0c;在此记录下 资源热更方式 方式一&#xff1a;合并所有插件资源 需要解决资源id冲突问题 资源ID值一共4个字段&#xff0c;由三部分组成&#xff1a;PackageIdTypeIdEntryId PackageId&…

Mysql 事务的隔离性(隔离级别)

Mysql 中的事务分为手动提交和自动提交&#xff0c;默认是自动提交&#xff0c;所以我们在Mysql每输入一条语句&#xff0c;其实就会被封装成一个事务提交给Mysql服务端。 手动提交需要先输入begin&#xff0c;表示要开始处理事务&#xff0c;然后就是常见的sql语句操作了&…

C++之入门之命名空间、缺省参数、函数重载

一、前言 我们知道c是对c语言的完善以及再发展&#xff0c;所以C中的很多东西是与C语言十分修饰的&#xff0c;并且C也是兼容C的&#xff0c;学习了C之后&#xff0c;相信学C也不在困难&#xff0c;对我们来说&#xff0c;唯一感到不解和陌生就只有 using namespace std; 这条…

【c++】STL1—STL初识

文章目录STL的基本概念STL六大组件STL中容器、算法、迭代器容器算法迭代器容器算法迭代器初识vector存放内置数据类型vector存放自定义数据类型容器嵌套容器c的面向对象和泛型编程思想&#xff0c;目的就是复用性的提升。 为了建立数据结构和算法的一套标准&#xff0c;诞生了S…

并查集(13张图解)--擒贼先擒王

目录 前言 故事 &#x1f33c;思路 &#x1f33c;总结 &#x1f33c;代码 &#x1f44a;观察过程代码 &#x1f44a;正确代码 &#x1f44a;细节代码 来自《啊哈算法》 前言 刚学了树在优先队列中的应用--堆的实现 那么树还有哪些神奇的用法呢&#xff1f;我们从一…

前端卷算法系列(二)

前端卷算法系列&#xff08;二&#xff09; 回文数 给你一个整数 x &#xff0c;如果 x 是一个回文整数&#xff0c;返回 true &#xff1b;否则&#xff0c;返回 false 。 回文数是指正序&#xff08;从左向右&#xff09;和倒序&#xff08;从右向左&#xff09;读都是一样…

zookeeper集群的搭建,菜鸟升级大神必看

一、下载安装zookeeperhttp://archive.apache.org/dist/zookeeper/下载最新版本2.8.1http://archive.apache.org/dist/zookeeper/zookeeper-3.8.1/二、上传安装包到服务器上并且解压&#xff0c;重命名tar -zxvf apache-zookeeper-3.8.1-bin.tar.gzmv apache-zookeeper-3.8.1-b…

设计环形队列

文章目录1.思路分析1.1队列空满分析1.2出队分析2.循环队列设计1.思路分析 1.1队列空满分析 首先我们假设一个长度为4的环形队列 队头front 队尾rear 当队列为空时 frontrear 当队列满时 frontrear 所以我们无法判断队列是满的或者空的 因此我们多加入一个空间使队列长度为5&am…

什么是自适应平台服务?

总目录链接==>> AutoSAR入门和实战系列总目录 文章目录 什么是自适应平台服务?1.1 自适应平台服务包含哪些功能簇呢?1.1.1 ara::sm 状态管理 (SM)1.1.2 ara::diag 诊断管理 (DM)1.1.3 ara::s2s 信号到服务映射1.1.4 ara::nm 网络管理 (NM)1.1.5 ara::ucm 更新和配置管…

数据结构期末复习总结(前章)

作者的话 作为一名计算机类的学生&#xff0c;我深知数据结构的重要性。在期末复习前&#xff0c;我希望通过这篇博客给大家一些复习建议。希望能帮助大家夯实数据结构的基础知识&#xff0c;并能够更好地掌握数据结构和算法的应用。 一、绪论 数据&#xff1a;信息的载体&am…

【测试】loadrunner安装

努力经营当下&#xff0c;直至未来明朗&#xff01; 文章目录备注一、下载安装包二、安装loadrunner三、修改浏览器配置今天搬砖不努力&#xff0c;明天地位不稳定&#xff01; 备注 电脑最好有IE浏览器&#xff0c;但是没有也没事儿。&#xff08;注意&#xff1a;IE浏览器不…

Bootstrap系列之栅格系统

Bootstrap栅格系统 bootatrap提供了一套响应式&#xff0c;移动设备优先的流式网格系统&#xff0c;随着屏幕或者视口尺寸的增加&#xff0c;系统会自动分为最多12列&#xff0c;多出12列的将不再此行显示&#xff08;换行显示&#xff09; bootstrap网格系统有以下六个类 重点…