Kafka入门,这一篇就够了(安装,topic,生产者,消费者)

news2025/3/1 18:40:46

目录

  • Kafka的安装
  • 文件与配置
    • 目录
      • bin
      • config
    • 配置文件
      • server.properties
      • producer.properties
      • consumer.properties
  • 命令行简单使用
    • kafka-topics.sh
      • 新增
      • 查看列表
      • 查看详情
      • 修改
      • 删除
    • kafka-console-producer.sh
    • kafka-console-consumer.sh
  • 概念
    • 集群
    • 代理broker
    • 主题topic
    • 分区partition
    • 偏移量offset
    • 生产者producer
    • 消费者组consumer group
    • 消费者consumer
  • FAQ
    • 如何保证一个主题下的数据,一定是有序的(生产与消费的顺序一致)?
    • 如何设置分区和消费者数?
  • 参考


Kafka的安装

前提,已安装docker和docker-compose。
拉取镜像

docker pull bitnami/zookeeper:latest
docker pull bitnami/kafka:latest

docker-compose.yaml如下

version: '3'
services:
  zookeeper:
    image: 'bitnami/zookeeper:latest'
    ports:
      - '2181:2181'
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
  kafka:
    image: 'bitnami/kafka:latest'
    ports:
      - '9092:9092'
    environment:
      - KAFKA_BROKER_ID=1
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092
      - KAFKA_CFG_ADVERTISED_LISTENER=PLAINTEXT://127.0.0.1:9092
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - ALLOW_PLAINTEXT_LISTENER=yes
    depends_on:
      - zookeeper

启动命令

docker-compose up -d

截图
在这里插入图片描述
之后的相关命令若涉及容器id,请自行更换

文件与配置

目录

docker exec -it a0 ls /opt/bitnami/kafka

查看目录命令

截图
在这里插入图片描述
重要目录解释如下:

  • bin: 脚本目录
  • config:配置目录
  • libs:第三方依赖库目录
  • logs:日志

bin

重要的shell脚本加粗了,之后会用

connect-distributed.sh kafka-dump-log.sh kafka-storage.sh
connect-mirror-maker.sh kafka-features.sh kafka-streams-application-reset.sh
connect-standalone.sh kafka-get-offsets.sh kafka-topics.sh
kafka-acls.sh kafka-leader-election.sh kafka-transactions.sh
kafka-broker-api-versions.sh kafka-log-dirs.sh kafka-verifiable-consumer.sh
kafka-cluster.sh kafka-metadata-shell.sh kafka-verifiable-producer.sh
kafka-configs.sh kafka-mirror-maker.sh trogdor.sh
kafka-console-consumer.sh kafka-producer-perf-test.sh windows
kafka-console-producer.sh kafka-reassign-partitions.sh zookeeper-security-migration.sh
kafka-consumer-groups.sh kafka-replica-verification.sh zookeeper-server-start.sh
kafka-consumer-perf-test.sh kafka-run-class.sh zookeeper-server-stop.sh
kafka-delegation-tokens.sh kafka-server-start.sh zookeeper-shell.sh
kafka-delete-records.sh kafka-server-stop.sh

config

connect-console-sink.properties connect-mirror-maker.properties server.properties
connect-console-source.properties connect-standalone.properties tools-log4j.properties
connect-distributed.properties consumer.properties trogdor.conf
connect-file-sink.properties kraft zookeeper.properties
connect-file-source.properties log4j.properties
connect-log4j.properties producer.properties

配置文件

server.properties

  • broker.id: 唯一id值,通过环境变量设置为了1
  • log.dirs: kafka集群日志目录,默认是log.dirs=/bitnami/kafka/data
  • zookeeper.connect:zookeeper地址端口,格式域名/ip:port,这块是zookeeper:2181,在docker的网络中可以解析为另一容器的ip

更多配置可以查看参考中Dockerhub链接的Configuration部分
在这里插入图片描述

producer.properties

  • bootstrap.servers:kafka的ip:port,这里是localhost:9092
  • compression.type:压缩类型,默认是none, 一共有四种,none, gzip, snappy, lz4, zstd,推荐排序LZ4 > GZIP > Snappy,详见腾讯云压缩算法对比

consumer.properties

  • group.id:消费者组id,默认为test-consumer-group
  • auto.offset.reset:offset设置,三种latest, earliest, none,看情况设置

命令行简单使用

kafka-topics.sh

对主题topic进行增删改查的工具
在这里插入图片描述

常用选项如下:

  • --bootstrap-server:kafka服务器ip:port,必须的
  • --create:创建主题
  • --delete:删除主题
  • --describe:描述主题
  • --list:查看主题列表
  • --alter:修改主题的 partitions等
  • --topic <String: topic>:主题名
  • --topic-id <String: topic-id>:主题id
  • --partitions <Integer: # of partitions>:主题的partition

新增

命令

docker exec -it a0 /opt/bitnami/kafka/bin/kafka-topics.sh --bootstrap-server=127.0.0.1:9092 --create --topic lady_killer9

截图
在这里插入图片描述

查看列表

命令

docker exec -it a0 /opt/bitnami/kafka/bin/kafka-topics.sh --bootstrap-server=127.0.0.1:9092 --list

截图
在这里插入图片描述

查看详情

命令

docker exec -it a0 /opt/bitnami/kafka/bin/kafka-topics.sh --bootstrap-server=127.0.0.1:9092 --describe --topic lady_killer9

截图
在这里插入图片描述

修改

命令
以修改主题partiion数量为例

docker exec -it a0 /opt/bitnami/kafka/bin/kafka-topics.sh --bootstrap-server=127.0.0.1:9092 --alter --topic lady_killer9 --partitions 3

截图
在这里插入图片描述

删除

命令

docker exec -it a0 /opt/bitnami/kafka/bin/kafka-topics.sh --bootstrap-server=127.0.0.1:9092 --delete --topic lady_killer9

截图
在这里插入图片描述

kafka-console-producer.sh

标准输入读数据,发送到Kafka的工具
在这里插入图片描述
常用选项如下:

  • --bootstrap-server:kafka服务器ip:port,必须的
  • --topic <String: topic> :Kafka主题,必须的
  • --sync:同步发送
  • --compression-codec [String: compression-codec] :压缩方式,‘none’,‘gzip’, ‘snappy’, ‘lz4’, , ‘zstd’,默认gzip.

命令

docker exec -it a0 /opt/bitnami/kafka/bin/kafka-topics.sh --bootstrap-server=127.0.0.1:9092 --create --topic demo
docker exec -it a0 /opt/bitnami/kafka/bin/kafka-console-producer.sh  --bootstrap-server=127.0.0.1:9092 --topic demo

截图
在这里插入图片描述

kafka-console-consumer.sh

在这里插入图片描述
常用选项如下:

  • --bootstrap-server:kafka服务器ip:port,必须的
  • --topic <String: topic> :Kafka主题,必须的
  • --group <String: consumer group id>:消费者组id
  • --key-deserializer <String: deserializer for key>:key反序列化,默认是org.apache.kafka.common.serialization.StringDeserializer
  • --value-deserializer <String: deserializer for values>:value反序列化,默认是org.apache.kafka.common.serialization.StringDeserializer
  • --offset <String: consume offset>:消费的offset
  • --partition <Integer: partition>:消费的分区

命令

docker exec -it a0 /opt/bitnami/kafka/bin/kafka-console-consumer.sh  --bootstrap-server=127.0.0.1:9092 --topic demo

截图
在这里插入图片描述
命令

docker exec -it a0 /opt/bitnami/kafka/bin/kafka-console-consumer.sh  --bootstrap-server=127.0.0.1:9092 --topic demo --partition 0 --offset 2

截图
在这里插入图片描述
上手之后我们再来了解一些概念。

概念

集群

已发布的消息保存在一组服务器中,称为Kafka集群。

代理broker

集群中的每一个服务器都是一个代理。

主题topic

每条发布到kafka集群的消息都有一个主题,这个主题被称为topic。每个topic都由一个或者多个分区构成。

分区partition

topic的partition数量可以在创建时配置,partition数量决定了每个消费者组中并发消费者的最大数量

分区的原则:

  • 生产者指定了partition,则直接使用
  • 未指定partition但指定了key,通过对key的value进行hash出一个partition
  • partition和key都未指定,使用轮询选出一个partition

偏移量offset

任何发布到partition的消息都会被直接追加到partition尾部,每条消息的位置称为offset,offset是一个long型数字,它唯一标记一条消息。消费者可以通过(topic、partition、offset)跟踪记录。

生产者producer

push消息到topc的叫生产者,push后可以获得offset。生产者可以指定partition,但不建议这么做。

消费者组consumer group

包含多个消费者,有一个 group id,可以订阅topic进行消费。消费偏移以消费者组为单位。

消费者consumer

从topic中pull数据,可以指定partition和offset。

FAQ

如何保证一个主题下的数据,一定是有序的(生产与消费的顺序一致)?

Kafka每个partition中的消息在写入是都是有序的,消费时,每个partition只能被每一个group中的消费者消费,因此,topic下只有一个partition时一定有序。

如何设置分区和消费者数?

建议分区数与消费者数一致,防止消费不过来。

参考

dockerhub-bitnami/kafka
腾讯云CKafka 压缩算法对比
python-kafka客户端封装

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/996509.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

用递归实现字符串逆序(不使用库函数)

文章目录 前言一、题目要求二、解题步骤1.大概框架2.如何反向排列&#xff1f;3.模拟实现strlen4.实现反向排列5.递归实现反向排列 总结 前言 嗨&#xff0c;亲爱的读者们&#xff01;我是艾老虎尤&#xff0c;今天&#xff0c;我们将探索一个题目&#xff0c;这个题目对新手非…

【计算机基础】揭露办公软件WPS、Offfice好用但又很少去做的便捷操作

&#x1f4e2;&#xff1a;如果你也对机器人、人工智能感兴趣&#xff0c;看来我们志同道合✨ &#x1f4e2;&#xff1a;不妨浏览一下我的博客主页【https://blog.csdn.net/weixin_51244852】 &#x1f4e2;&#xff1a;文章若有幸对你有帮助&#xff0c;可点赞 &#x1f44d;…

【memcpy函数的介绍与使用和模拟实现】

memcpy函数的介绍与使用和模拟实现 1.memcpy函数的介绍 资源来源于cplusplus网站 它的作用是&#xff1a; 将数字字节的值从源指向的位置直接复制到目标指向的内存块。 源指针和目标指针指向的对象的基础类型与此函数无关; 结果是数据的二进制副本。 该函数不检查源代码中是否…

uboot顶层Makefile前期所做工作说明四

一. uboot顶层 Makefile文件 uboot 顶层 Makefile&#xff0c;就是 uboot源码工程的根目录下的 Makefile文件。 本文继续对 uboot顶层 Makefile的前期准备工作进行介绍。续上一篇文章内容的学习&#xff0c;如下&#xff1a; uboot顶层Makefile前期所做工作说明三_凌肖战的博…

信息系统项目管理师(第四版)教材精读思维导图-第十二章项目质量管理

请参阅我的另一篇文章&#xff0c;综合介绍软考高项&#xff1a; 信息系统项目管理师&#xff08;软考高项&#xff09;备考总结_计算机技术与软件专业技术_铭记北宸的博客-CSDN博客 本章思维导图源文件 ​ 12.1 管理基础 12.2 管理过程 12.3 规划质量管理 12.4 管理质量 12.5…

增强 CAD Exchanger SDK 中 B-rep 表示的渲染性能

增强 CAD Exchanger 中 B-rep 表示的渲染性能 在这篇博文中&#xff0c;我们将深入探讨增强 CAD Exchanger 产品中 B-rep 表示的渲染性能的主题&#xff0c;探讨此过程中面临的挑战&#xff0c;并讨论 CAD Exchanger 所采用的创新技术来优化它。 在 版本 3.20中&#xff0c;我…

第7篇 vue的模块化与label的转换

一 label的转换 1.1 label的转换 二 模块化 2.1 模块化 前端中&#xff0c;js文件调用js文件&#xff0c;js文件之间的调用&#xff0c;即就是模块化。 2.2 案例1 1.新建工程并初始化 2. 编写脚本 1.js // 定义成员&#xff1a; const sum function(a,b){return parseIn…

持安零信任加入PKS体系生态联盟,共创办公安全新生态

近日&#xff0c;PKS体系生态联盟公布最新一期会员单位名单&#xff0c;零信任办公安全领域的明星企业持安科技成为其网络安全领域新增会员&#xff0c;未来将与众多合作伙伴一同建设网络安全强国。 PKS体系生态联盟是在中国电子信息产业集团有限公司的倡议下&#xff0c;广泛联…

Redis数据库安装、使用、数据类型、常用命令(详解)

安装 Releases tporadowski/redis GitHub 直接去选择msi格式的&#xff0c;窗口式的安装&#xff0c;一步一步。 安装过程中有一个选项是问你需不需要配置到环境变量中&#xff0c;选上这个选项&#xff0c;不选的话&#xff0c;需要自己去配环境变量。 检查是否安装配置…

腾讯云CVM S5服务器性能如何?CPU计算性能测评

腾讯云服务器CVM标准型S5实例具有稳定的计算性能&#xff0c;CVM 2核2G S5活动优惠价格280.8元一年自带1M带宽&#xff0c;15个月313.2元、2核4G配置748.2元15个月&#xff0c;CPU内存配置还可以选择4核8G、8核16G等配置&#xff0c;公网带宽可选1M、3M、5M或10M&#xff0c;百…

如何修改jupyter notebook默认打开路径

1、用jupyter notebook在其他位置打开自己的ipython项目&#xff1a; jupyter notebook是一个很好用的工具&#xff0c;可以保存运行结果&#xff0c;还可以给项目添加很多可视化操作与介绍文字。安装anaconda后&#xff0c;jupyter notebook就会自动安装&#xff0c;点开它会…

进入大厂测试一年后的经历和感触

从去年决定跳出舒适区&#xff0c;应聘大厂&#xff0c;截止到目前已经将近一年&#xff0c;值此之际&#xff0c;总结下自己近一年在大厂的经历。希望通过我的感触&#xff0c;能够帮助你们进一步了解大厂的测试工作。 1、维护上下游合作关系 在大厂&#xff0c;人际关系非常…

山西电力市场日前价格预测【2023-09-11】

日前价格预测 山西日前电力价格预测 预测说明&#xff1a; 如上图所示&#xff0c;预测明日&#xff08;2023-09-11&#xff09;山西电力市场全天平均日前电价为346.35元/MWh。其中&#xff0c;最高日前电价为383.36元/MWh&#xff0c;预计出现在19: 15。最低日前电价为313.95…

力扣 8049. 判断能否在给定时间到达单元格

Problem: 8049. 判断能否在给定时间到达单元格 文章目录 思路复杂度Code 思路 数学思维去写这道题 复杂度 时间复杂度: 添加时间复杂度, 示例&#xff1a; O ( 1 ) O(1) O(1) Code class Solution { public:bool isReachableAtTime(int sx, int sy, int fx, int fy, int t)…

MYSQL的慢查询

通过查询SQL的执行频次&#xff0c;我们就能够知道当前数据库到底是增删改为主&#xff0c;还是查询为主。 那假如说是以查询为主&#xff0c;次数我们可以借助于慢查询日志。接下来&#xff0c;我们就来介绍一下MySQL中的慢查询日志。 慢查询日志 慢查询日志记录了所有执行时间…

代码随想录二刷回溯算法-组合问题总结

回溯算法实际上也是一种暴力算法&#xff0c;利用树型结构的回溯与剪枝从而解决问题 解题步骤主要分三步&#xff1a;1.确立回溯函数的参数 2.确立终止条件 3.确立单层遍历逻辑 组合问题 77. 组合 这道题目就是经典的组合问题 如果我们使用for循环来进行暴力求解&#xff…

spring boot-Resolved element must not contain multiple elements 警告

首先强调一下&#xff0c;此问题不影响程序运行。 报错信息&#xff1a; package org.springframework.util; ...public abstract class Assert ...public static void state(boolean expression, String message) {if (!expression) {throw new IllegalStateException(messa…

融合康养产业、乐享宜居灞桥,西安市灞桥康养论坛即将举办

随着我国人口老龄化进程的不断加速以及人们的健康意识不断提高&#xff0c;我国康养产业逐步发展壮大。9月15日&#xff0c;以“融合康养产业、乐享宜居灞桥”为主题的灞桥康养论坛将在西安市灞桥区盛大召开。 据悉&#xff0c;此次论坛由西安市人民政府、陕西省民政厅主办&am…

2024浙大MEM提面拿优秀笔试如何冲刺备考

浙大工程师学院对于参加浙大提前批面试并获得优秀资格的考生&#xff0c;提供了一个“笔试达到联考国家线即可拟录取”的优惠政策。这确实是吸引很多MEM考生参加提前批面试的原因之一。但是&#xff0c;即使获得了优秀资格&#xff0c;考生仍然需要在后续的联考笔试中达到一定的…

JAVASE 窗口按钮

本文目录 1、前言2、JFrame、JButton3、JLabl4、ImageIcon 1、前言 java提供了很多已经写好了的类供我们使用&#xff0c;而我们没必要去细腻研究它的构成原理&#xff0c;就好比我们让我们编程让机器人动起来&#xff0c;没必要细腻研究机器人每个器件是怎么做出来的一样&…