Kafka基本介绍

news2024/9/23 11:23:55

消息队列

产生背景

消息队列:指的数据在一个容器中,从容器中一端传递到另一端的过程

消息(message): 指的是数据,只不过这个数据存在一定流动状态
队列(queue): 指的容器,可以存储数据,只不过这个容器具备FIFO(先进先出)特性
公共容器需要具备什么特点?
1- 公共性: 各个程序都可以与之对接
2- FIFO特性: 先进先出
3- 具备高效的并发能力: 能够承载海量数据
4- 具备一定的容错能力: 比如支持重新读取消息方案

消息队列介绍

常见的消息队列产品

MQ:message queue消息队列

activeMQ: 出现时期比较早的一款消息队列的中间件产品,在早期使用人群是非常多,目前整个社区活跃度严重下降,使用人群基本很少
rabbitMQ: 此款是目前使用人群比较多的一款消息队列的中间件的产品,社区活跃度比较高,主要是应用传统业务领域中
rocketMQ: 是阿里推出的一款消息队列的中间件的产品,目前主要是在阿里系环境中使用,目前支持的客户端比较少,主要是Java中应用较多
Kafka: Apache旗下的顶级开源消息,是一款消息队列的中间件产品项目来源于领英,是大数据体系中目前为止最为常用的一款消息队列的产品

应用场景

  • 应用解耦合
  • 异步处理
  • 限流削峰
  • 消息驱动系统

消息队列中两种消息模型

在Java中, 为了能够集成消息队列的产品, 专门提供了一个消息队列的协议: JMS(Java Message Server)  java消息服务

消息队列中两个角色: 生产者(producer) 和 消费者(consumer)
生产者: 生产/发送消息到消息队列中
消费者: 从消息队列中获取消息

在JMS规范中, 专门规定了两种消息消费模型: 
1- 点对点消费模型: 指的一条消息最终只能被一个消费者所消费。微信聊天的私聊
2- 发布订阅消费模型: 指的一条消息最终被多个消费者所消费。微信聊天的群聊

Kafka基本介绍

基本介绍

Kafka是一款消息队列的中间件产品, 来源于领英公司, 后期贡献给了Apache, 目前是Aapche旗下的顶级开源项目, 采用语言是Scala
官方地址: http://kafka.apache.org

kafka特点:

- 可靠性:Kafka集群是分布式的,并且有多副本的机制。数据可以自动复制
- 可扩展性:Kafka集群可以灵活的调整,在线扩容
- 耐用性:Kafka数据保存在磁盘上面,数据并且有多副本的机制。数据持久化,而且可以一定程度上防止数据丢失
- 高性能:Kafka可以存储海量的数据,虽然是使用磁盘进行数据存储,但是Kafka有各种优化手段(例如:磁盘的顺序读写、零拷贝等)提高数据的读写速度(吞吐量)

Kafka的架构

在这里插入图片描述

1- Kafka中集群节点叫broker,节点和节点之间没有主从之分,地位是完全一样
2- Topic:主题/话题,是业务层面对消息进行分类的。
3- 一个Topic可以设置多个Partition分区。
4- 同一个Partition分区可以设置多个副本,但是副本数不能超过(>)集群broker节点的个数
5- 虽然broker节点间没有主从之分,但是同一个Partition分区的不同副本间有主从之分,分为了Leader主副本和Follower从副本
6- 生产者将数据首先发送给到Leader主副本,接着是Leader主副本主动的往Follower从副本上同步消息
7- Zookeeper用来管理集群,以及管理元数据信息
8- ISR同步列表。该列表中存放的是与Leader主副本消息同步程度最接近的Follower从副本,也就是消息最小的一个列表。
该列表作用,当Leader主副本无法对外提供服务的时候,会从该ISR列表中选择一个Follower从副本变成Leader主副本,对外提供服务


相关名词:
Kafka Cluster: Kafka集群
Topic: 主题/话题
Broker: Kafka中的节点
Producer: 生产者,负责生产/发送消息到Kafka中
Consumer: 消费者,负责从Kafka中获取消息
Partition: 分区。一个Topic可以设置多个分区,没有数量限制

Kafka的相关使用

Kafka的shell命令使用

Kafka本质上就是一个消息队列的中间件的产品,主要负责消息数据的传递。

1- 创建Topic

./kafka-topics.sh --bootstrap-server node1.itcast.cn:9092,node2.itcast.cn:9092 --create --topic test02 --partitions 4 --replication-factor 2  

参数说明:
	--bootstrap-server: Kafka集群中broker连接信息
	--create: 指定操作类型。这里是新建Topic
	--topic: 指定要新建的Topic名称
	--partitions: 设置Topic的分区数
	--replication-factor: 设置Topic分区的副本数

设置副本数超过了集群broker节点个数

./kafka-topics.sh --bootstrap-server node1.itcast.cn:9092,node2.itcast.cn:9092 --create --topic test04 --partitions 4 --replication-factor 4

在这里插入图片描述
2- 查看Topic

./kafka-topics.sh --bootstrap-server node1.itcast.cn:9092,node2.itcast.cn:9092 --list

参数说明:
	--bootstrap-server: Kafka集群中broker连接信息
	--list: 指定操作类型。这里是查看Kafka集群上所有可用的Topic列表

在这里插入图片描述
3- 查看具体Topic

./kafka-topics.sh --bootstrap-server node1.itcast.cn:9092,node2.itcast.cn:9092 --describe --topic test04
参数说明:
	--bootstrap-server: Kafka集群中broker连接信息
	--describe: 指定操作类型。这里是查看具体Topic信息

在这里插入图片描述
4- 模拟生产者Producer

./kafka-console-producer.sh --broker-list node1.itcast.cn:9092,node2.itcast.cn:9092 --topic test04
参数说明:
	--broker-list: Kafka集群中broker连接信息
	--topic: 指定要将消息发送到哪个具体的Topic

在这里插入图片描述
5- 模拟消费者Consumer

./kafka-console-consumer.sh --bootstrap-server node1.itcast.cn:9092,node2.itcast.cn:9092 --topic test04

参数说明:
	--bootstrap-server: Kafka集群中broker连接信息
	--topic: 指定要从哪个Topic中消费消息
	--from-beginning: 指定该参数以后,会从最旧的地方开始消费
	latest: 消费者(默认)从最新的地方开始消费
	--max-messages: 最多消费的条数。满足条数后,就会自动结束
	--group: 指定消费组名称。一个消费者只能属于一个消费组;一个消费组里面可以有多个消费者。同一个Topic中的同一条数据,只能被同一个消费组中的一个消费者所消费
	
参数一般如何使用?
答: 推荐latest、--max-messages、--group一同使用。因为实际中Topic的数据量是特别大的,消费、打印都需要消耗服务器的资源,如果不限定消费的最大条数,可能造成服务器宕机。

在这里插入图片描述
6- 修改Topic

./kafka-topics.sh --bootstrap-server node1.itcast.cn:9092,node2.itcast.cn:9092 --alter --topic test01 --partitions 10

分区: 只能增大,不能减小。而且没有数量限制
副本: 既不能增大,也不能减小
减小分区:
./kafka-topics.sh --bootstrap-server node1.itcast.cn:9092,node2.itcast.cn:9092 --alter --topic test01 --partitions 1

在这里插入图片描述

修改副本数:
./kafka-topics.sh --bootstrap-server node1.itcast.cn:9092,node2.itcast.cn:9092 --alter --topic test01 --replication-factor 2 --partitions 11

在这里插入图片描述
7- 删除Topic

./kafka-topics.sh --bootstrap-server node1.itcast.cn:9092,node2.itcast.cn:9092 --delete --topic test01

参数说明:
	--bootstrap-server: Kafka集群中broker连接信息
	--delete: 指定操作类型。这里是删除Topic
	--topic: 指定要删除哪个Topic

8- 查看消费组中有多少个消费者

./kafka-consumer-groups.sh --bootstrap-server node1.itcast.cn:9092,node2.itcast.cn:9092 --group g_01 --members --describe

在这里插入图片描述

Kafka基准测试

Kafka的基准测试, 主要是用于测试Kafka集群的吞吐量, 每秒钟最大可以生产多少条数据, 以及每秒钟最大可以消费多少条数据

测试生产的效率

1- 创建Topic

./kafka-topics.sh --create --bootstrap-server node1.itcast.cn:9092,node2.itcast.cn:9092,node3.itcast.cn:9092 --partitions 3 --replication-factor 2 --topic test01

2- 执行生产测试命令: 测试后,会增加4GB磁盘占用

./kafka-producer-perf-test.sh --topic test01 --num-records 4000000 --throughput -1 --record-size 1000 --producer-props bootstrap.servers=node1.itcast.cn:9092,node2.itcast.cn:9092,node3.itcast.cn:9092 acks=1

在这里插入图片描述
3- 测试结果
在这里插入图片描述

测试消费的效率

1- 执行消费测试命令

./kafka-consumer-perf-test.sh --broker-list node1.itcast.cn:9092,node2.itcast.cn:9092,node3.itcast.cn:9092 --topic test01 --fetch-size 1048576 --messages 500000

在这里插入图片描述
2- 测试结果:
在这里插入图片描述

Kafka的Python API的操作

纯Python的方式操作Kafka。
准备工作:在node1的节点上安装一个python用于操作Kafka的库

安装命令:
python -m pip install kafka-python -i https://pypi.tuna.tsinghua.edu.cn/simple

API使用的参考文档:
https://kafka-python.readthedocs.io/en/master/usage.html#kafkaproducer

在这里插入图片描述

完成生产者代码
import time

from kafka import KafkaProducer

# 同步发送
def sync_send():
    global topic, partition, offset
    # 2.1- 同步发送数据/消息
    metadata = producer.send("test01", value=f"hello_java_{i}".encode("UTF-8")).get()
    # metadata = producer.send("test03",value=f"hello_spark_{i}".encode("UTF-8")).get()
    # 2.2- 获取元信息中的内容
    topic = metadata.topic
    partition = metadata.partition
    """
        offset消息偏移量,从0开始编号。也就是一条消息在分区中的序号/索引
        在不同分区间,消息偏移量是无序
        在同一个分区里面,消息偏移量是有序
    """
    offset = metadata.offset
    print(f"{topic},{partition},{offset},{metadata}")


if __name__ == '__main__':

    # 1- 创建生产者
    producer = KafkaProducer(
        bootstrap_servers=["node1.itcast.cn:9092","node2.itcast.cn:9092"]
    )

    # 2- 发送消息
    for i in range(10):
        # 同步发送
        # sync_send()

        # 2.3- 异步发送
        """
            异步发送,需要等待一下,或者明确关闭Producer生产者
        """
        producer.send("test01", value=f"hello_hive_{i}".encode("UTF-8"))

    time.sleep(1)

    # 3- 释放资源/关闭生产者
    # producer.close()

在这里插入图片描述
可能遇到的错误:
在这里插入图片描述

原因: 服务器环境有问题。是因为服务器上既安装了kafka-python的第三方依赖,同时还安装kafka的第三方依赖。可以通过pip list | grep kafka进行确定
解决办法: 先将这两个第三方依赖全部卸载,然后再重新执行如下命令
python -m pip install kafka-python -i https://pypi.tuna.tsinghua.edu.cn/simple
完成消费者代码
from kafka import KafkaConsumer

if __name__ == '__main__':

    # 1- 创建消费者
    consumer = KafkaConsumer(
        "test01",
        bootstrap_servers=["node1.itcast.cn:9092", "node2.itcast.cn:9092"]
    )

    # 2- 消费消息
    for msg in consumer:
        topic = msg.topic
        partition = msg.partition
        offset = msg.offset
        # key和value消费出来都是bytes数据类型,需要进行解码
        key = msg.key
        value = msg.value

        print(f"{topic}{partition}{offset}{key}{value.decode('UTF-8')}{msg}")

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1380944.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Mysql事务的处理

1、事务,就是一组命令的操作。 不过这一组命令,我们有时候需要使用手动提交; 1、使用这组命令可以查询出来现在的提交方式:自动提交(就是命令输入,点击enter后,会不会直接对表格产生修改&#x…

x-cmd pkg | csview - 美观且高性能的 csv 数据查看工具

目录 介绍首次用户功能特点类似工具与竞品进一步阅读 介绍 csview 是一个用于在命令行中查看 CSV 文件的工具,采用 Rust 语言编写的,支持中日韩/表情符号。它允许用户在终端中以表格形式查看 CSV 数据,可以对数据进行排序、过滤、搜索等操作…

关于html导出word总结一

总结 测试结果不理想,html-to-docx 和 html-docx-js 最终导出的结果 都 差强人意,效果可以见末尾的附图 环境 "electron": "24.3.0" 依赖库 html-docx-js html-docx-js - npm html-to-docx html-to-docx - npm file-saver…

Linux技术,winSCP连接服务器超时故障解决方案

知识改变命运,技术就是要分享,有问题随时联系,免费答疑,欢迎联系! 故障现象 使用 sftp 协议连接主机时, 明显感觉缓慢且卡顿,并且时常出现如下报错: 点击重新连接后,又有概率重新连接上; 总之在"连接上"和&…

前端js调用Lodop实现云打印

一、下载Lodop控件 官网:下载中心 - Lodop和C-Lodop官网主站 二、解压后安装 双击进行安装,里面有些页面文件是一些教程案例 勾选云服务工作模式 安装成功会自动启动 浏览器访问地址:http://localhost:8000/ 首页最下面有个教程案例跳转地址&…

kali_linux换源教程

vim /etc/apt/sources.list #阿里云deb http://mirrors.aliyun.com/kali kali-rolling main non-free contribdeb-src http://mirrors.aliyun.com/kali kali-rolling main non-free contrib#清华大学源deb http://mirrors.tuna.tsinghua.edu.cn/kali kali-rolling main contrib…

navicat for oracle

前言 Oracle中的概念并不是创建数据库,而是创建一个表空间,然后再创建一个用户,设置该用户的默认表空间为我们新创建的表空间,这些操作之后,便和你之前用过的mysql数据库创建完数据库一模一样了。 创建数据库 使用O…

PHP信息分类网源码带手机端和文档

PHP信息分类网源码带手机端和文档 安装简易说明: 上传 → 安装 → 进入后台 → 恢复数据 → 修改cookie记录值(第3点有说明) 1.上传程序到网站根目录,访问http://域名/install/index.php 进行安装,不要直接打开网址,先直接安装&am…

Java学习手册——第六篇输入输出

几乎所有的开发语言都会有输入输出,程序的定义里面也有输入输出,可以见得输入输出是何等的重要。如果没有输入,人类如何与计算机交流?如果没有输出,一切努力都是白费,因为我们看不到结果。 这里的输入输出你…

常用计算电磁学算法特性与电磁软件分析

常用计算电磁学算法特性与电磁软件分析 参考网站: 计算电磁学三大数值算法FDTD、FEM、MOM ADS、HFSS、CST 优缺点和应用范围详细教程 ## 基于时域有限差分法的FDTD的计算电磁学算法(含Matlab代码)-框架介绍 参考书籍:The finite…

Multi-Concept Customization of Text-to-Image Diffusion——【代码复现】

本文是发表于CVPR 2023上的一篇论文:[2212.04488] Multi-Concept Customization of Text-to-Image Diffusion (arxiv.org) 一、引言 本文主要做的工作是对stable-diffusion的预训练模型进行微调,需要的显存相对较多,论文中测试时是在两块GP…

原子操作类原理剖析

UC包提供了一系列的原子性操作类,这些类都是使用非阻塞算法CAS实现的,相比使用锁实现原子性操作这在性能上有很大提高。 由于原子性操作类的原理都大致相同,所以只讲解最简单的AtomicLong类的实现原理以及JDK8中新增的LongAdder和LongAccumu…

领导风格测试

领导风格指的是管理者在开展管理工作时的思维和行为模式,通常我们也称之为习惯,或者是人格特征。这种习惯是固化的,是长期的经历所形成的,其中包含了个人的知识,经验,人际关系等。领导风格测试是企业人才选…

必练的100道C语言程序设计练习题(上)

前言: 在计算机编程的世界中,C语言一直是一门备受推崇的语言。它的简洁性、高效性以及广泛应用使得学习C语言成为每一位程序员的必由之路。然而,掌握这门语言并不是一蹴而就的事情,它需要不断的练习和实践。为了帮助各位编程爱好者更好地理解…

进程和线程的比较

目录 一、前言 二、Linux查看进程、线程 2.1 Linux最大进程数 2.2 Linux最大线程数 2.3 Linux下CPU利用率高的排查 三、线程的实现 四、上下文切换 五、总结 一、前言 进程是程序执行相关资源(CPU、内存、磁盘等)分配的最小单元,是一…

Halcon边缘滤波器edges_image 算子

Halcon边缘滤波器edges_image 算子 基于Sobel滤波器的边缘滤波方法是比较经典的边缘检测方法。除此之外,Halcon也提供了一些新式的边缘滤波器,如edges_image算子。它使用递归实现的滤波器(如Deriche、Lanser和Shen)检测边缘&…

【无标题】关于异常处理容易犯的错

一般项目是方法打上 try…catch…捕获所有异常记录日志,有些会使用 AOP 来进行类似的“统一异常处理”。 其实,这种处理异常的方式非常不可取。那么今天,我就和你分享下不可取的原因、与异常处理相关的坑和最佳实践。 捕获和处理异常容易犯…

Java研学-分页查询

一 分页概述 1 介绍 将大量数据分段显示,避免一次性加载造成的内存溢出风险 2 真假分页 ① 真分页   一次性查询出所有数据存到内存,翻页从内存中获取数据,性能高但易造成内存溢出 ② 假分页   每次翻页从数据库中查询数据&#xff0c…

day16 二叉树的最大深度 n叉树的最大深度 二叉树的最小深度 完全二叉树的节点数

题目1:104 二叉树的最大深度 题目链接:104 二叉树的最大深度 题意 二叉树的根节点是root,返回其最大深度(从根节点到最远叶子节点的最长路径上的节点数) 递归 根节点的的高度就是二叉树的最大深度 所以使用后序遍…

【Node.js学习 day4——模块化】

模块化介绍 什么是模块化与模块? 将一个复杂的程序文件依据一定规则(规范)拆分成多个文件的过程称之为模块化 其中拆分的每个文件就是一个模块,模块的内部数据是私有的,不过模块可以暴露内部数据以便其他模块使用。什…