kafka服务介绍

news2024/12/23 9:40:41

kafka

    • 安装使用
    • 管理 Kafka

Apache Kafka 是一个开源的分布式事件流平台,主要用于实时数据传输和流处理。它最初由 LinkedIn 开发,并在 2011 年成为 Apache 基金会的顶级项目。Kafka 设计的目标是处理大规模的数据流,同时提供高吞吐量、低延迟和高容错性

Kafka 的工作原理可以从几个关键方面来理解:

    1. 消息的生产
    • Producer:生产者是发送消息的客户端,向 Kafka 发送消息。生产者将消息发布到特定的主题(Topic)中,主题内部可以有多个分区(Partition)。
    • Partitioning:消息会被分配到不同的分区。分区是主题的逻辑分片,有助于提高并发处理能力。消息的分配通常基于某种策略,比如消息键(key)或者轮询。
    1. 消息的存储
    • Broker:Kafka 的服务器组件,负责存储和管理消息。每个 Kafka 实例都是一个 Broker。多个 Brokers 组成 Kafka 集群。
    • Log Segments:在每个分区中,消息被追加到日志(log)中。日志是一个有序的、不可变的消息序列。为了管理日志文件的大小,Kafka 会定期将日志分段成多个文件。
    1. 消息的消费
    • Consumer:消费者从 Kafka 中读取消息。消费者订阅一个或多个主题,然后拉取(fetch)数据。
    • Consumer Group:消费者可以组成一个消费组(Consumer Group)。同一消费组中的消费者会分担读取同一个主题的不同分区的任务,从而实现负载均衡。每个消息只会被消费组中的一个消费者读取。
    1. 消息的存储与复制
    • Replication:为了提高数据的可靠性,Kafka 使用副本(replica)。每个分区有一个主副本(leader)和若干个从副本(follower)。所有的读写请求都由主副本处理,从副本则从主副本同步数据。
    • Leader-Follower:在分区中,领导者负责所有的读写请求,从副本负责从领导者同步数据。领导者失败时,从副本会选举新的领导者,确保高可用性。
    1. 协调与管理
    • Zookeeper:早期 Kafka 使用 Zookeeper 来管理集群的元数据和协调集群中的 Broker 和分区状态。Zookeeper 负责领导者的选举、配置管理和状态监控。
    • Kafka 2.8.0 及以后:Kafka 开始逐渐减少对 Zookeeper 的依赖,尝试用 Kafka 自身的协议来管理集群的元数据,这种模式称为 KRaft 模式(Kafka Raft Metadata Mode)。

数据流示例

  • 生产:生产者将消息发送到一个主题,主题有多个分区。每条消息附带一个时间戳。
  • 存储:消息被追加到分区的日志中。日志分为多个段,Kafka 按顺序存储消息。
  • 消费:消费者从主题的分区中读取消息。消费者可以跟踪已处理的消息位置,以实现断点续传。
  • 复制:数据在主副本和从副本之间同步,保证数据在 Broker 失败时不会丢失。

通过这些机制,Kafka 能够实现高吞吐量、低延迟和高可靠性的消息传递和数据流处理。

安装使用

Kafka 依赖于 Java 运行环境,因此首先需要安装 Java 11 或更高版本

apt install -y openjdk-11-jdk
root@huhy:~# java --version
openjdk 11.0.23 2024-04-16
OpenJDK Runtime Environment (build 11.0.23+9-post-Ubuntu-1ubuntu1)
OpenJDK 64-Bit Server VM (build 11.0.23+9-post-Ubuntu-1ubuntu1, mixed mode, sharing)

官网下载;https://downloads.apache.org/kafka/3.7.1/kafka_2.13-3.7.1.tgz

tar -xf kafka_2.13-3.7.1.tgz
cd kafka_2.13-3.7.1/

Kafka 的配置文件位于 config 目录中。主要的配置文件包括:

  • server.properties:Kafka 的服务器配置文件

  • zookeeper.properties:Zookeeper 的配置文件(如果使用 Zookeeper)

    • broker.id:描述:Kafka Broker 的唯一标识符。每个 Broker 必须有一个唯一的 broker.id。
      默认值:无
      示例:broker.id=0

    • listeners:描述:Kafka Broker 监听的地址和端口。指定了 Kafka 接收客户端请求的地址。
      默认值:PLAINTEXT://:9092
      示例:listeners=PLAINTEXT://localhost:9092

    • advertised.listeners:描述:Kafka 向客户端公开的地址。客户端会通过此地址与 Broker 进行通信。
      默认值:无
      示例:advertised.listeners=PLAINTEXT://your-hostname:9092

    • log.dirs:
      描述:Kafka 存储日志文件的目录。可以设置多个目录,Kafka 会将数据分散存储。
      默认值:/tmp/kafka-logs
      示例:log.dirs=/var/lib/kafka/logs

    • log.retention.hours:
      描述:日志文件的保留时间,单位是小时。超过这个时间的数据会被删除。
      默认值:168(7 天)
      示例:log.retention.hours=168

    • log.segment.bytes:
      描述:每个日志段的大小,单位是字节。日志文件会被分段存储。
      默认值:1073741824(1 GB)
      示例:log.segment.bytes=536870912(512 MB)

    • num.partitions:
      描述:默认创建的主题的分区数量。
      默认值:1
      示例:num.partitions=3

    • replication.factor:
      描述:主题的副本因子,表示每个分区有多少副本。提高副本因子可以增加数据的可靠性。
      默认值:无(主题创建时指定)
      示例:replication.factor=2

    • message.max.bytes:
      描述:Kafka 允许的最大消息大小,单位是字节。
      默认值:1000012(1 MB)
      示例:message.max.bytes=2097152(2 MB)

    • log.retention.bytes:
      描述:每个分区的日志文件最大保留大小,超过这个大小的日志会被删除。
      默认值:-1(不限制)
      示例:log.retention.bytes=1073741824(1 GB)

    • log.cleaner.enable:
      描述:启用日志清理器,用于压缩日志中的重复数据。
      默认值:false
      示例:log.cleaner.enable=true

    • security.inter.broker.protocol:
      描述:Kafka Broker 之间的通信协议,支持 PLAINTEXT、SSL、SASL_PLAINTEXT、SASL_SSL。
      默认值:PLAINTEXT
      示例:security.inter.broker.protocol=SSL

    • ssl.keystore.location:
      描述:SSL 密钥库的位置,用于 SSL/TLS 加密。
      默认值:无
      示例:ssl.keystore.location=/path/to/keystore.jks

    • zookeeper.connect:
      描述:Zookeeper 的连接字符串,包括 Zookeeper 的主机名和端口号。
      默认值:localhost:2181
      示例:zookeeper.connect=localhost:2181

    • zookeeper.connection.timeout.ms:
      描述:Zookeeper 连接超时设置,单位是毫秒。
      默认值:6000
      示例:zookeeper.connection.timeout.ms=10000

    • auto.create.topics.enable:
      描述:是否自动创建主题。如果设置为 true,当客户端向不存在的主题发送消息时,Kafka 会自动创建该主题。
      默认值:true
      示例:auto.create.topics.enable=false

    • delete.topic.enable:
      描述:是否允许删除主题。如果设置为 true,可以通过 Kafka 提供的脚本删除主题。
      默认值:false
      示例:delete.topic.enable=true

通常情况下,默认配置就可以开始使用。如果需要自定义配置,可以编辑这些文件

启动 Zookeeper;Kafka 需要 Zookeeper 来管理集群的元数据。Kafka 附带了一个简单的 Zookeeper 实例,开启后另起一个终端

bin/zookeeper-server-start.sh config/zookeeper.properties

启动 Kafka Broker;在另一个终端中,启动 Kafka Broker

bin/kafka-server-start.sh config/server.properties

另起一个终端3;Kafka 使用主题来组织消息。可以使用 Kafka 提供的脚本创建主题。例如,创建一个名为 test-topic 的主题:

bin/kafka-topics.sh --create --topic test-topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
root@huhy:~/kafka_2.13-3.7.1# bin/kafka-topics.sh --create --topic test-topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
Created topic test-topic.

生产消息:可以使用 Kafka 提供的生产者工具向主题中发送消息。打开一个终端并运行,然后在终端4中输入消息并按回车键发送消息

bin/kafka-console-producer.sh --topic test-topic --bootstrap-server localhost:9092
root@huhy:~/kafka_2.13-3.7.1# bin/kafka-console-producer.sh --topic test-topic --bootstrap-server localhost:9092
>huhy
>

消费消息;在另一个终端3中,可以运行消费者工具来读取消息,只有最开始两个终端是不能终端,后两个有交互界面可以直接用

root@huhy:~/kafka_2.13-3.7.1# bin/kafka-console-consumer.sh --topic test-topic --from-beginning --bootstrap-server localhost:9092
huhy

获取信息如下

在这里插入图片描述

管理 Kafka

查看主题:

bin/kafka-topics.sh --list --bootstrap-server localhost:9092
root@huhy:~/kafka_2.13-3.7.1# bin/kafka-topics.sh --list --bootstrap-server localhost:9092
__consumer_offsets
test-topic

查看主题详情:

bin/kafka-topics.sh --describe --topic test-topic --bootstrap-server localhost:9092
root@huhy:~/kafka_2.13-3.7.1# bin/kafka-topics.sh --describe --topic test-topic --bootstrap-server localhost:9092
Topic: test-topic       TopicId: rXHPQIqJRkO5lQDOsco3NQ PartitionCount: 1       ReplicationFactor: 1    Configs:
        Topic: test-topic       Partition: 0    Leader: 0       Replicas: 0     Isr: 0

删除主题;

bin/kafka-topics.sh --delete --topic test-topic --bootstrap-server localhost:9092

验证查看

root@huhy:~/kafka_2.13-3.7.1# bin/kafka-topics.sh --delete --topic test-topic --bootstrap-server localhost:9092
root@huhy:~/kafka_2.13-3.7.1# bin/kafka-topics.sh --list --bootstrap-server localhost:9092
__consumer_offsets

停止 Kafka Broker

bin/kafka-server-stop.sh

停止 Zookeeper:

bin/zookeeper-server-stop.sh

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1939622.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

C语言 通讯录管理 完整代码

这份代码,是我从网上找的。目前是能运行。我正在读。有些不懂的地方,等下再记录下来。 有些地方的命名,还需要重新写一下。 比如: PersonInfo* info &address_book->all_address[address_book->size]; 应该改为: Perso…

C#实现数据采集系统-实现功能介绍

系统介绍 我们这里主要使用C#( .Net 6)来实现一个数据采集系统,从0到1搭建数据采集系统,从系统分析,功能拆解,到一一实现 数据采集 数据采集是企业信息化和数字化转型过程中的关键环节,它涉及到从生产设备、传感器…

Microsoft Visual C++ 2010 Express 使用

Microsoft Visual C 2010 Express 使用 Microsoft Visual C 2010 Express(简称VC 2010 Express)是一款免费的集成开发环境(IDE),专为C和C语言的开发者设计。 安装 下载|本站链接【VC2010简体中文版】的安装包并解压…

2024年新手卖家该如何做好亚马逊运营?

随着电子商务的蓬勃发展,越来越多的新手卖家选择在亚马逊这一国际电商巨头平台上开展业务。然而,想要在竞争激烈的市场中脱颖而出,新手卖家需要精心规划并执行有效的运营策略。以下是为2024年新手卖家提供的关于如何做好亚马逊运营的一些建议…

C#学习-刘铁猛

文章目录 1.委托委托的具体使用-魔板方法回调方法【好莱坞方法】:通过委托类型的参数,传入主调方法的被调用方法,主调方法可以根据自己的逻辑决定调用这个方法还是不调用这个方法。【演员只用接听电话,如果通过,导演会…

刷题笔记 739. 每日温度 (单调栈),215. 数组中的第K个最大元素(堆),347.前 K 个高频元素

739. 每日温度 (单调栈). - 备战技术面试?力扣提供海量技术面试资源,帮助你高效提升编程技能,轻松拿下世界 IT 名企 Dream Offer。https://leetcode.cn/problems/daily-temperatures/description/?envTypestudy-plan-v2&envI…

Fast Planner规划算法(一)—— Fast Planner前端

本系列文章用于回顾学习记录Fast-Planner规划算法的相关内容,【本系列博客写于2023年9月,共包含四篇文章,现在进行补发第一篇,其余几篇文章将在近期补发】 一、Fast Planner前端 Fast Planner的轨迹规划部分一共分为三个模块&…

Haproxy服务

目录 一.haproxy介绍 1.主要特点和功能 2.haproxy 调度算法 3.haproxy 与nginx 和lvs的区别 二.安装 haproxy 服务 1. yum安装 2.第三方rpm 安装 3.编译安装haproxy 三.配置文件详解 1.官方地址配置文件官方帮助文档 2.HAProxy 的配置文件haproxy.cfg由两大部分组成&…

React+TypeScript 组件库开发全攻略:集成Storybook可视化与Jest测试,一键发布至npm

平时我除了业务需求,偶尔会投入到UI组件的开发中,大多数时候只会负责自己业务场景相关或者一小部分公共组件,极少有从创建项目、集成可视化、测试到发布的整个过程的操作,这篇文章就是记录组件开发全流程,UI组件在此仅…

RabbitMQ学习实践二:MQ的实现

文章是本人在学习springboot实现消息队列功能时所经历的过程的记录,仅供参考,如有侵权请随时指出。 参考文章地址: RabbitMQ安装与入门_rabbitmq win11配置-CSDN博客 RabbitMQ入门到实战一篇文章就够了-CSDN博客 RabbitMQ系列&#xff08…

AI跟踪报道第48期-新加坡内哥谈技术-本周AI新闻:Open AI 和 Mistral的小型模型

每周跟踪AI热点新闻动向和震撼发展 想要探索生成式人工智能的前沿进展吗?订阅我们的简报,深入解析最新的技术突破、实际应用案例和未来的趋势。与全球数同行一同,从行业内部的深度分析和实用指南中受益。不要错过这个机会,成为AI领…

华为路由器SSH登录实验

概念 SSH全称安全外壳(Secure Shell)协议,这个协议的目的就是为了取代缺乏机密性保障的远程管理协议,SSH基于TCP协议的加密通道,让客户端使用服务器的RSA公钥来验证SSHv2服务器的身份。 创建密钥对 在充当SSH服务器的…

UE4-获得角色控制权的两种方法

方法一: 方法二: 注意此方法不能有多个玩家出生点,如果有多个玩家出生点,会随机的选择一个玩家出生点进行生成。

C++的map和set介绍

系列文章目录 二叉树搜索树 map和set习题 文章目录 系列文章目录前言一、关联式容器键值对二、树形结构的关联式容器2.1 set2.1.1 set的介绍2.1.3 set的使用删除节点find的不同效率count举例lower_bound 和 upper_bound 2.2 multiset2.2.1 区别:find查找erase删除e…

Deepin系统,中盛科技温湿度模块读温度纯c程序(备份)

#include <stdio.h> #include <fcntl.h> #include <unistd.h> #include <termios.h>int main() {int fd;struct termios options;// 打开串口设备fd open("/dev/ttyMP0", O_RDWR | O_NOCTTY|O_NDELAY); //O_NDELAY:打开设备不阻塞//O_NOCTT…

http请求网址或网页的全流程

客户端通过浏览器请求网址或网页资源的步骤如下&#xff1a; http请求网址或网页的全流程 1.首先&#xff0c;浏览器做的第一步就是解析 URL 得到里面的参数2.浏览器封装 HTTP 请求报文3.DNS 域名解析获取 IP 地址4. 建立 TCP 连接5.浏览器发送请求6.负责传输的 IP 协议7.使用 …

基于Llama Index构建RAG应用(Datawhale AI 夏令营)

前言 Hello&#xff0c;大家好&#xff0c;我是GISer Liu&#x1f601;&#xff0c;一名热爱AI技术的GIS开发者&#xff0c;本文参与活动是2024 DataWhale AI夏令营&#xff1b;&#x1f632; 在本文中作者将通过&#xff1a; Gradio、Streamlit和LlamaIndex介绍 LlamaIndex 构…

【初阶数据结构】5.栈和队列

文章目录 1.栈1.1 概念与结构1.2 栈的实现2.队列2.1 概念与结构2.2 队列的实现3.栈和队列算法题3.1 有效的括号3.2 用队列实现栈3.3 用栈实现队列3.4 设计循环队列 1.栈 1.1 概念与结构 栈&#xff1a;一种特殊的线性表&#xff0c;其只允许在固定的一端进行插入和删除元素操…

从零开始实现大语言模型(八):Layer Normalization

1. 前言 Layer Normalization是深度学习实践中已经被证明非常有效的一种解决梯度消失或梯度爆炸问题,以提升神经网络训练效率及稳定性的方法。OpenAI的GPT系列大语言模型使用Layer Normalization对多头注意力模块,前馈神经网络模块以及最后的输出层的输入张量做变换,使shap…

android13 默认输入法配置分析rom默认配置修改分析

总纲 android13 rom 开发总纲说明 目录 1.前言 2.解决方法 3.方法分析 3.1方法1 3.2方法2 4.彩蛋 1.前言 Android13上需要预装中文输入法, 但是直接预装输入法的话,会出现默认使能的问题,点击TextEdit输入框, 弹出的是默认英文输入法LatinIME, 而不是谷歌拼音输入…