03 数仓平台 Kafka

news2025/1/23 17:37:18

kafka概述

定义

Kafka 是一个开源的分布式事件流平台(Event Streaming Plantform),主要用于大数据实时领域。本质上是一个分布式的基于发布/订阅模式的消息队列(Message Queue)。

消息队列

在大数据场景中主要采用Kafka 作为消息队列。传统消息队列主要应用场景包括:缓存/削峰、解耦和异步通信。
消息队列的模式包含了 2 种,点对点订阅模式和发布/订阅模式。
在这里插入图片描述
Kafka采用了发布/订阅模式,这种模式有以下特点:

  • 可以有多个topic 主题
  • 消费者消费后,不会立即删除数据
  • 每个消费者组相互独立,不会影响。

Kafka 基础架构

在这里插入图片描述
为了方便扩展,提高吞吐量,一个 topic可以分为多个 partition。为了配合分区设计,提出了消费者组的概念,组内每个消费者并行消费。为提高可用性,每个 partition 增加若干可配置副本。在 2.8 之下的版本,将数据 leader提交给 Zookeeper 保管,2.8 版本之后,可以不配置 zookeeper。

Kafka 快速安装

规划

Hadoop101Hadoop102Hadoop103
ZKZKZK
KafkaKafkaKafka

集群部署

  1. 下载地址: https://archive.apache.org/dist/kafka/3.0.0/kafka_2.12-3.0.0.tgz
  2. 解压安装:[logan@hadoop101 software]$ tar -zxf kafka_2.12-3.0.0.tgz -C /opt/module
  3. 创建链:[logan@hadoop101 module]$ ln -snf kafka_2.12-3.0.0/ kafka
  4. 进入到/opt/module/kafka/config/目录,修改配置文件vim server.properties
#broker的全局唯一编号,不能重复,只能是数字。
broker.id=0
#处理网络请求的线程数量
num.network.threads=3
#用来处理磁盘IO的线程数量
num.io.threads=8
#发送套接字的缓冲区大小
socket.send.buffer.bytes=102400
#接收套接字的缓冲区大小
socket.receive.buffer.bytes=102400
#请求套接字的缓冲区大小
socket.request.max.bytes=104857600
#kafka运行日志(数据)存放的路径,路径不需要提前创建,kafka自动帮你创建,可以配置多个磁盘路径,路径与路径之间可以用","分隔
log.dirs=/opt/module/kafka/data
#topic在当前broker上的分区个数
num.partitions=1
#用来恢复和清理data下数据的线程数量
num.recovery.threads.per.data.dir=1
# 每个topic创建时的副本数,默认时1个副本
offsets.topic.replication.factor=1
#segment文件保留的最长时间,超时将被删除
log.retention.hours=168
#每个segment文件的大小,默认最大1G
log.segment.bytes=1073741824
# 检查过期数据的时间,默认5分钟检查一次是否数据过期
log.retention.check.interval.ms=300000
#配置连接Zookeeper集群地址(在zk根目录下创建/kafka,方便管理)
zookeeper.connect=hadoop101:2181,hadoop102:2181,hadoop103:2181/kafka
  1. 分发安装包xsync /opt/module/kafka
  2. 分别在hadoop103和hadoop104上修改配置文件/opt/module/kafka/config/server.properties中的broker.id=1、broker.id=2
  3. 配置环境变量,在/etc/profile.d/my_env.sh文件中增加kafka环境变量配置。增加如下内容:
#KAFKA_HOME
export KAFKA_HOME=/opt/module/kafka
export PATH=$PATH:$KAFKA_HOME/bin
  1. 刷新环境变量source /etc/profile
  2. 分发环境变量文件到其他节点,并source。
  3. 先启动 Zookeeper zk.sh start
  4. 编写 kafka 集群启动脚本vim ~/bin/kf.sh,增加执行权限chmod +x ~/bin/kf.sh
#! /bin/bash

case $1 in
"start"){
    for i in hadoop101 hadoop102 hadoop103
    do
        echo " --------启动 $i Kafka-------"
        ssh $i "/opt/module/kafka/bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties"
    done
};;
"stop"){
    for i in hadoop101 hadoop102 hadoop103
    do
        echo " --------停止 $i Kafka-------"
        ssh $i "/opt/module/kafka/bin/kafka-server-stop.sh "
    done
};;
esac
  1. 启动集群 kf.sh start

kafka命令行操作

  1. topic操作命令
操作指令
查看kafka-topics.sh --bootstrap-server hadoop101:9092 --list
创建kafka-topics.sh --bootstrap-server hadoop101:9092 --create --partitions 1 --replication-factor 3 --topic first
查看 topic 详情kafka-topics.sh --bootstrap-server hadoop101:9092 --describe --topic first
修改分区数kafka-topics.sh --bootstrap-server hadoop101:9092 --alter --topic first --partitions 3
删除 topickafka-topics.sh --bootstrap-server hadoop101:9092 --delete --topic first

说明:

  • –topic 定义topic名
  • –replication-factor 定义副本数
  • –partitions 定义分区数(分区数在修改时只能增加,不能减少)
  1. 生产者命令行
kafka-console-producer.sh --bootstrap-server hadoop101:9092 --topic first

3.消费者命令行

# 消费first主题中的数据。
kafka-console-consumer.sh --bootstrap-server hadoop101:9092 --topic first
# 从头开始消费主题所有数据
kafka-console-consumer.sh --bootstrap-server hadoop101:9092 --from-beginning --topic first

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1279502.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

右值引用和移动语句(C++11)

左值引用和右值引用 回顾引用 我们之前就了解到了左值引用,首先我们要了解引用在编译器底层其实就是指针。具体来说,当声明引用时,编译器会在底层生成一个指针来表示引用,但在代码编写和使用时,我们可以像使用变量类…

鸿蒙绘制折线图基金走势图

鉴于鸿蒙下一代剥离aosp,对于小公司而言,要么用h5重构,要么等大厂完善工具、等华为出转换工具后跟进,用鸿蒙重新开发一套代码对于一般公司而言成本会大幅增加。但对于广大开发者来说,暂且不论未来鸿蒙发展如何&#xf…

中国消费电子行业发展趋势及消费者需求洞察|徐礼昭

一、引言 近年来,随着科技的飞速发展,消费电子行业面临着前所未有的挑战与机遇。本文将从行业发展趋势、消费者需求洞察以及企业数字化转型的方向和动作三个方面,对消费电子行业进行深入剖析。 二、消费电子行业发展趋势 5G技术的普及和应…

WEB安全之Python

WEB安全之python python-pyc反编译 python类似java一样,存在编译过程,先将源码文件*.py编译成 *.pyc文件,然后通过python解释器执行 生成pyc文件 创建一个py文件随便输入几句代码(1.py) 通过python交互终端 >>>import py_compil…

【C++练级之路】【Lv.1】C++,启动!(命名空间,缺省参数,函数重载,引用,内联函数,auto,范围for,nullptr)

目录 引言入门须知一、命名空间1.1 作用域限定符1.2 命名空间的意义1.3 命名空间的定义1.4 命名空间的使用 二、C输入&输出2.1 cout输出2.2 cin输入2.3 std命名空间的使用惯例 三、缺省参数3.1 缺省参数概念3.2 缺省参数分类 四、函数重载4.1 函数重载概念4.2 函数重载分类…

JavaSE自定义验证码图片生成器

设计项目的时候打算在原有的功能上补充验证码功能,在实现了邮箱验证码之后想着顺便把一个简单的图片验证码生成器也实现一下,用作分享。 注意,实际开发中验证码往往采用各种组件,通过导入依赖来在后端开发时使用相关功能&#xf…

泊车功能专题介绍 ———— 汽车全景影像监测系统性能要求及试验方法(国标未公布)

文章目录 术语和定义一般要求功能要求故障指示 性能要求响应时间图像时延单视图视野范围平面拼接视图视野平面拼接效果总体要求行列畸变拼接错位及拼接无效区域 试验方法环境条件仪器和设备车辆条件系统响应时间试验图像时延试验单视图视野范围试验平面拼接视图视野试验平面拼接…

【大学英语视听说上】Mid-term Test 2

Section A 【短篇新闻1】 You probably think college students are experts at sleeping, but parties, preparations for tests, personal problems and general stress can rack a students sleep habits, which can be bad for the body and the mind. Texas Tech Univer…

51爱心流水灯32灯炫酷代码

源代码摘自远眺883的文章,大佬是30个灯的,感兴趣的铁汁们可以去看看哦~(已取得原作者的许可):基于STC89C51单片机设计的心形流水灯软件代码部分_单片机流水灯代码_远眺883的博客-CSDN博客 由于博主是个小菜鸡&#xff…

【Python从入门到进阶】43.验证码识别工具结合requests的使用

接上篇《42、使用requests的Cookie登录古诗文网站》 上一篇我们介绍了如何利用requests的Cookie登录古诗文网。本篇我们来学习如何使用验证码识别工具进行登录验证的自动识别。 一、图片验证码识别过程及手段 上一篇我们通过requests的session方法,带着原网页登录…

前缀和 LeetCode1423. 可获得的最大点数

几张卡牌 排成一行,每张卡牌都有一个对应的点数。点数由整数数组 cardPoints 给出。 每次行动,你可以从行的开头或者末尾拿一张卡牌,最终你必须正好拿 k 张卡牌。 你的点数就是你拿到手中的所有卡牌的点数之和。 给你一个整数数组 cardPoi…

一个网站,四种创建制作电子期刊的方法

想象一下,你正在走进一家神奇的商店,里面陈列着各种精美的杂志和期刊。但是,这些杂志和期刊并不是印刷品,而是可以直接在网站上制作和发布的电子期刊。 但是像这样能在网上发的电子期刊该怎么制作呢?不知道如何制作的小…

数字媒体技术基础之:常见字体类型

字体 Font在数字设计和排版中起着至关重要的作用,不同的字体类型为文本呈现和创意表达提供了丰富多样的可能性。 .fon 字体 .fon 文件是 Windows 早期系统中使用的一种字体文件格式。 特点: 1、基于像素的位图字体。 2、不支持无损缩放,主要用…

国际语音通知是什么?国际语音通知系统有哪些功能?

一、国际语音通知是什么? 如同国际短信通知,国际语音通知也在多种生活场景中扮演着重要的角色,如会议通知、商品发货通知、物流更新通知、快递取件通知、外卖取餐通知等。那么什么是语音通知呢? 国际语音通知可将通知的文本信息使…

算法复习,数据结构 ,算法特性,冒泡法动态演示,复杂度,辗转相除法*,寻找最大公因数

算法复习 知识点 1. 程序 数据结构 算法 2. 算法: 求解问题的策略数据结构:问题的数学模型程序:微计算机处理问题编制的一组指令 3. **特性 ** 有穷性:算法在执行有穷步后能结束确定性:每一指令有确切的含义&a…

规则引擎专题---5、Groovy环境搭建和基础语法

概述 Groovy是用于Java虚拟机的一种敏捷的动态语言,它是一种成熟的面向对象编程语言,既可以用于面向对象编程,又可以用作纯粹的脚本语言。使用该种语言不必编写过多的代码,同时又具有闭包和动态语言中的其他特性。 Groovy是从Jav…

线程安全的问题以及解决方案

线程安全 线程安全的定义 线程安全:某个代码无论是在单线程上运行还是在多线程上运行,都不会产生bug. 线程不安全:单线程上运行正常,多线程上运行会产生bug. 观察线程不安全 看看下面的代码: public class ThreadTest1 {public static int count 0;public static void main…

<JavaEE> synchronized关键字和锁机制 -- 锁的特点、锁的使用、锁竞争和死锁、死锁的解决方法

目录 一、synchronized 关键字简介 二、synchronized 的特点 -- 互斥 三、synchronized 的特点 -- 可重入 四、synchronized 的使用示例 4.1 修饰代码块 - 锁任意实例 4.2 修饰代码块 - 锁当前实例 4.3 修饰普通方法 - 锁方法所在实例 4.4 修饰代码块 - 锁指定类对象 …

threadlocal - 黑马程序员

目录 1、ThreadLocal介绍1.2 ThreadLocal基本使用1.2.1、常用方法1.2.2 使用案例 1.3 ThreadLocal类与synchronized关键字 2、运用场景_事务案例3、ThreadLocal的内部结构4、 ThreadLocal的核心方法源码5、ThreadLocalMap源码分析5.2 弱引用和内存泄漏 课程地址: ht…

【大学英语视听说上】“智力”口语问答练习

题目: book 2, page 9, question 4 回答: 1: What do you think of the view “Intelligence must be bred, not trained”? I think this view is biased. The view suggests that intelligence is primarily determined by genetic factors and inh…