Kafka 安装教程和基本操作

news2025/1/12 7:58:57

一、简介

Kafka 是最初由 Linkedin 公司开发,是一个分布式、分区的、多副本的、多订阅者,基于 zookeeper 协调的分布式日志系统(也可以当做 MQ 系统),常见可以用于 web/nginx 日志、访问日志,消息服务等等,Linkedin于2010年12月贡献给了 Apache基金会 并成为顶级开源项目。

应用特性

  • 分布式存储:数据被自动分区并分布在集群的节点中。
  • 消息有序性Kafka 能确保从生产者传到消费者的记录都是有序的。
  • 高容错性:允许集群中节点失败(若副本数量为n,则允许n-1个节点失败)。
  • 高吞吐量Kafka 支持单机每秒至少处理10万以上消息,通常可以达到数百万条消息。
  • 易扩展性:支持集群热扩展。
  • 高并发:支持数千个客户端同时读写。
  • 持久性:支持消息数据持久化到本地磁盘 并支持数据备份和灵活配置数据的持久化时间。
  • 实时处理/低延迟:在数据写入的同时对进行处理,消息延迟最低只有几毫秒。

应用场景

Kafka 本质是 支持分布式的消息系统/消息中间件 。分析 Kafka 的应用场景等同于分析 消息中级件 的应用场景。通常,使用 消息系统 的 发布/订阅模型 功能来连接 生产者消费者。实现以下三大功能:

  • 生产者和消费者的解耦
  • 消息持久化 / 消息冗余
  • 消息缓冲 / 流量消峰

具体应用场景有:

  • 日志收集或数据管道:作为日志收集系统或数据处理管道的一部分,以处理大量的日志数据或实时数据流。
  • 负载均衡:如果系统收到大量请求或数据流,可以使用消息队列把这些任务平均分配给多个处理器或服务,从而实现负载均衡。
  • 系统解耦:消息队列经常用作不同服务间的通信机制,以解耦系统的不同部分。
  • 分布式事务:如果一个事务需要跨多个服务进行,可以使用消息队列来协调不同服务之间的通信,确保事务的原子性。
  • 实时流数据处理:比如实时日志分析或者实时数据报警。Kafka 能接收实时数据流并保证它的可靠性和持久性,这样就可以在上游源源不断生产数据的同时,下游可以实时地进行分析。
  • 通知和实时更新:消息队列可以用作通知的中介,比如告知用户完成某个任务,或者在后端数据更新时实时通知前端。

设计目标

  • 高性能:以时间复杂度为O(1)的方式提供消息持久化能力,即使对TB级以上数据也能保证常数时间的访问性能。
  • 高吞吐率:即使在非常廉价的商用机器上也能做到单机支持每秒100K条消息的传输。
  • 消息系统:支持Kafka Server间的消息分区,及分布式消费,同时保证每个partition内的消息顺序传输。
  • 横向扩展:支持在线水平热扩展

二、kafka安装和配置

1. zookeeper安装配置

需要说明一下, 为了支持 Kafka 的集群功能, Zookeeper 必须使用集群模式部署。
本文以部署 3 个Zookeeper 实例的伪集群为例。具体安装步骤参阅之前的文章:Zookeeper 安装教程和使用指南

2. kafka安装配置

下载链接:Kafka Downloads

下载页面中包含两种下载方式

  • : kafka-[version]-src.tgz:包含 Kafka 源码和API源码,需要自己编译

a) 安装

[root@Ali ~]# wget https://downloads.apache.org/kafka/3.6.2/kafka_2.12-3.6.2.tgz
[root@Ali ~]# tar xzvf kafka_2.12-3.6.2.tgz
[root@Ali ~]# mv /usr/local/kafka_2.12-3.6.2 /usr/local/kafka

b) 配置实例

配置第一个 Kafka 实例

# broker 编号,集群内必须唯一
broker.id=1
# 监听所有ip的9091端口,PLAINTEXT表示明文传输
listeners=PLAINTEXT://:9091
# 相当于listeners=PLAINTEXT://0.0.0.0:9091
# 消息日志存放地址
log.dirs=/usr/local/kafka/logs
# ZooKeeper 地址,多个用,分隔   /kafka指定在zk上的目录
zookeeper.connect=localhost:12181/kafka,localhost:22181/kafka

配置第二个 Kafka 实例

# broker 编号,集群内必须唯一
broker.id=1
# 监听所有ip的9092端口,PLAINTEXT表示明文传输
listeners=PLAINTEXT://:9092
# 消息日志存放地址
log.dirs=/opt/kafka/logs
# ZooKeeper 地址,多个用,分隔
zookeeper.connect=localhost:12181/kafka,localhost:22181/kafka

注:两个客户端的listeners中的port不能一样

4) 服务管理

# 启动服务 -daemon 表示后台启动
$KAFKA_HOME/bin/kafka-server-start.sh -daemon config/server.properties


# 查看服务
jps -l
	43330 org.apache.zookeeper.server.quorum.QuorumPeerMain
	14356 org.elasticsearch.bootstrap.Elasticsearch
	14583 org.logstash.Logstash
	45976 kafka.Kafka  # kafka服务进程
	
netstat -anlpt | grep 9091
	tcp6       0      0 :::9091                 :::*                    LISTEN      45976/java
	tcp6       0      0 192.168.18.128:9091     192.168.18.128:49356    TIME_WAIT   -

# 关闭服务
$KAFKA_HOME/bin/kafka-server-stop.sh

3. 常用操作

1) 创建topic

 #两条命令效果一样
bin/kafka-topics.sh --create --bootstrap-server localhost:9091 --partitions 2 --replication-factor 2 --topic yumu
bin/kafka-topics.sh --create --zookeeper localhost:2181/kafka --partitions 2 --replication-factor 2 --topic yumu 

在kafka1上创建一个topic,会自动同步到其他客户端

  • --create表示创建操作
  • --zookeeper 指定了 Kafka 连接的 ZooKeeper
  • --partitions 表示每个主题4个分区
  • --replication-factor 表示创建每个分区创建2个副本(副本因子)
  • --topic 表示主题名称。
    注:副本因子不能超过存活的broker数量,否则报错:Replication factor: 20 larger than available brokers: xxx.

2) 查看topic

# 查看topic列表     #两条命令效果一样
bin/kafka-topics.sh --list --bootstrap-server localhost:9092
bin/kafka-topics.sh --list --zookeeper localhost:2181/kafka 
	__consumer_offsets
	topic-demo
	yumu

# 查看topic详细信息   #两条命令效果一样
bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic yumu
bin/kafka-topics.sh --zookeeper localhost:2181/kafka --describe --topic yumu 
	Topic: yumu	PartitionCount: 2	ReplicationFactor: 2	Configs:
		Topic: yumu	Partition: 0	Leader: 1	Replicas: 1,2	Isr: 1,2
		Topic: yumu	Partition: 1	Leader: 1	Replicas: 2,1	Isr: 1,2

3) 测试通信

# 窗口1,启动生产者,向yumu主题发送消息
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic yumu

# 窗口2,启动消费者,订阅yumu主题
bin/kafka-console-consumer.sh --bootstrap-server localhost:9091 --topic yumu

# 窗口3,启动消费者,订阅yumu主题
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic yumu

=====结果=====
# 生产者
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic yumu
>hello, kafka!
>once again.
>
# 消费者1
bin/kafka-console-consumer.sh --bootstrap-server localhost:9091 --topic yumu
hello, kafka!
once again.

# 消费者2
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic yumu
hello, kafka!
once again.

# 查看所有消息
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic yumu --from-beginning

# 删除topic
bin/kafka-topics.sh --delete --bootstrap-server localhost:9091  --topic yumu

三、遇到的问题

1. 第一次启动kafka成功后,关闭kafka并修改配置,再次启动失败,报错如下:

[2020-11-07 20:43:00,866] INFO Cluster ID = MChFWWMBT9GJClVEriND5A (kafka.server.KafkaServer)
[2020-11-07 20:43:00,873] ERROR Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
kafka.common.InconsistentClusterIdException: The Cluster ID MChFWWMBT9GJClVEriND5A doesn't match stored clusterId Some(c6QPfvqlS6C3gtsYZptQ8Q) in meta.properties. The broker is trying to join the wrong cluster. Configured zookeeper.connect may be wrong.
        at kafka.server.KafkaServer.startup(KafkaServer.scala:235)
        at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:44)
        at kafka.Kafka$.main(Kafka.scala:82)
        at kafka.Kafka.main(Kafka.scala)
[2020-11-07 20:43:00,875] INFO shutting down (kafka.server.KafkaServer)
[2020-11-07 20:43:00,877] INFO [ZooKeeperClient Kafka server] Closing. (kafka.zookeeper.ZooKeeperClient)
[2020-11-07 20:43:00,986] INFO Session: 0x1000da0dde2000c closed (org.apache.zookeeper.ZooKeeper)
[2020-11-07 20:43:00,986] INFO EventThread shut down for session: 0x1000da0dde2000c (org.apache.zookeeper.ClientCnxn)
[2020-11-07 20:43:00,987] INFO [ZooKeeperClient Kafka server] Closed. (kafka.zookeeper.ZooKeeperClient)
[2020-11-07 20:43:00,992] INFO shut down completed (kafka.server.KafkaServer)[2020-11-07 20:43:00,992] ERROR Exiting Kafka. (kafka.server.KafkaServerStartable)
[2020-11-07 20:43:00,993] INFO shutting down (kafka.server.KafkaServer)

原因:
kafka启动之后会生成一些日志和配置,导致这个问题的原因是第一次启动之后生成了log/meta.properties文件

cat meta.properties
#
#Sat Nov 07 21:43:51 CST 2020
broker.id=1
version=0
cluster.id=MChFWWMBT9GJClVEriND5A

第二次改完配置后再去启动的时候生成应该会生成一个新的id,新的id和旧的ID不一致导致无法启动,删除log/meta.properties文件后重新启动即可(疑问:是不是我关闭的方法不对呢?)

推荐阅读:

  • Kafka介绍
  • ELK介绍
  • Kafka安装
  • C语言操作kafka以及安装librdkafka库

下一篇:Kafka消息系统原理

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1698868.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【数据结构】快速排序详解!

文章目录 1. 快速排序的非递归版本2. 快速排序2.1 hoare 版本一2.2 挖坑法 🐧版本二2.3 前后指针 版本三2.4 调用以上的三个版本的快排 3. 快速排序的优化 1. 快速排序的非递归版本 🆒🐧关键思路: 🍎① 参数中的begin…

安装mamba时报错bare_metal_version

原因:缺少cuda118的环境版本,直接安装 nvidia/label/cuda-11.8.0 可解决,代码如下: conda install -c "nvidia/label/cuda-11.8.0" cuda-nvcc

八种单例模式

文章目录 1.单例模式基本介绍1.介绍2.单例模式八种方式 2.饿汉式(静态常量,推荐)1.基本步骤1.构造器私有化(防止new)2.类的内部创建对象3.向外暴露一个静态的公共方法 2.代码实现3.优缺点分析 3.饿汉式(静态…

Shell字符串变量

目标 能够使用字符串的3种方式 掌握Shell字符串拼接 掌握shell字符串截取的常用格式 能够定义Shell索引数组和关联数组 能够使用内置命令alias,echo,read,exit,declare操作 掌握Shell的运算符操作 Shell字符串变量 介绍 字符串(String)就是一系…

嵌入式进阶——数码管

🎬 秋野酱:《个人主页》 🔥 个人专栏:《Java专栏》《Python专栏》 ⛺️心若有所向往,何惧道阻且长 文章目录 数码管结构移位寄存器原理图移位寄存器数据流程移位寄存器控制流程移位寄存器串联实现数码管显示 数码管结构 共阴与共阳 共阳数码…

从程序被SQL注入来MyBatis 再谈 #{} 与 ${} 的区别

缘由 最近在的一个项目上面,发现有人在给我搞 SQL 注入,我真的想说我那么点资源测试用的阿里云服务器,个人估计哈,估计能抗住他的请求。狗头.png 系统上面的截图 数据库截图 说句实在的,看到这个之后我立马就是在…

架构师系列-定时任务解决方案

定时任务概述 在很多应用中我们都是需要执行一些定时任务的,比如定时发送短信,定时统计数据,在实际使用中我们使用什么定时任务框架来实现我们的业务,定时任务使用中会遇到哪些坑,如何最大化的提高定时任务的性能。 我…

设计模式深度解析:分布式与中心化,IT界两大巨头“华山论剑”

​🌈 个人主页:danci_ 🔥 系列专栏:《设计模式》《MYSQL应用》 💪🏻 制定明确可量化的目标,坚持默默的做事。 ✨IT界的两大巨头交锋✨ 👋 在IT界的广阔天地中,有两座…

【Linux 网络】网络基础(三)(网络层协议:IP 协议)

在复杂的网络环境中确定一个合适的路径。 一、TCP 与 IP 的关系 IP 层的核心作用是定位主机,具有将数据从主机 A 发送到主机 B 的能力,但是能力并不能保证一定能够做到,所以这时就需要 TCP 起作用了,TCP 可以通过超时重传、拥塞控…

DBAPI怎么进行数据格式转换

DBAPI如何进行数据格式的转换 假设现在有个API,根据学生id查询学生信息,访问API查看数据格式如下 {"data":[{"name":"Michale","phone_number":null,"id":77,"age":55}],"msg"…

JVM学习-Class文件结构②

访问标识(access_flag) 在常量池后,紧跟着访问标记,标记使用两个字节表示,用于识别一些类或接口层次的访问信息,包括这个Class是类还是接口,是否定义为public类型,是否定义为abstract类型,如果…

[vue error] vue3中使用同名简写报错 ‘v-bind‘ directives require an attribute value

错误详情 错误信息 ‘v-bind’ directives require an attribute value.eslintvue/valid-v-bind 错误原因 默认情况下,ESLint 将同名缩写视为错误。此外,Volar 扩展可能需要更新以支持 Vue 3.4 中的新语法。 解决方案 更新 Volar 扩展 安装或更新 …

Java 泛型基础

目录 1. 为什么使用泛型 2. 泛型的使用方式 2.1. 泛型类 2.2. 泛型接口 2.3. 泛型方法 3. 泛型涉及的符号 3.1. 类型通配符"?" 3.2. 占位符 T/K/V/E 3.3. 占位符T和通配符?的区别。 4. 泛型不变性 5. 泛型编译时擦除 1. 为什么使用泛型 Java 为…

Django 里的静态资源调用

静态资源:图片,CSS, JavaScript 一共有两种方法 第一种方法 在项目的文件夹里创建名为 static 文件夹 在该文件夹里,添加静态资源 在 settings.py 里添加路径 import os# Static files (CSS, JavaScript, Images) # https://docs.djan…

Git基础命令:带图整理

基础命令 Git 安装 Git下载地址 https://git-scm.com/downloads Git安装(Window/Mac) 选择不同系统安装包安装 检验是否安装成功 出现Git Bash命令行工具或Git GUI工具git --version 查看git安装版本 Git 结构 工作区(Working Direct…

干货收藏 | 掌握ChatGPT提示词的精髓:从小白到高手!!

前言 提示决定了 ChatGPT 的输出。也就是说:GPT 生成的答案质量,完全取决于你“问它”,以及“引导它”的方式,如果你能问得好,引导的好,那么它就会帮你生成让你惊喜的答案,反之则无价值&#x…

国际版Tiktok抖音运营流量实战班:账号定位/作品发布/热门推送/等等-13节

课程目录 1-tiktok账号定位 1.mp4 2-tiktok作品发布技巧 1.mp4 3-tiktok数据功能如何开通 1.mp4 4-tiktok热门视频推送机制 1.mp4 5-如何发现热门视频 1.mp4 6-如何发现热门音乐 1.mp4 7-如何寻找热门标签 1.mp4 8-如何寻找垂直热门视频 1.mp4 9-如何发现热门挑战赛 1…

从垃圾识别到收集器:详细聊聊Java的GC

个人博客 从垃圾识别到收集器:详细聊聊Java的GC | iwts’s blog 前言 聊GC,自然离不开JVM内存模型,建议先了解JVM内存模型相关内容,或者最起码了解堆相关的内容,GC主要处理的就是堆。 这里会从垃圾识别算法->GC算法->JV…

OWASP top10--SQL注入(二)

目录 06:SQL注入提交方式 6.1、get提交 6.2、post提交 6.3、cookie提交 6.4、HTTP Header头提交 07:注入攻击支持类型 7.1、union注入: 7.1.1、union操作符一般与order by语句配合使用 7.1.2、information_schema注入 7.2、基于函数…

【云原生--K8S】K8S python接口研究

文章目录 前言一、搭建ubuntu运行环境1.运行ubuntu容器2.拷贝kubeconfig文件二、python程序获取k8s信息1.获取node信息2.获取svc信息3.常用kubernetes API总结前言 在前面的文章中我们都是通过kubectl命令行来访问操作K8S,但是在实际应用中可能需要提供更方便操作的图形化界面…