大数据-153 Apache Druid 案例 从 Kafka 中加载数据并分析

news2024/9/29 11:46:41

点一下关注吧!!!非常感谢!!持续更新!!!

目前已经更新到了:

  • Hadoop(已更完)
  • HDFS(已更完)
  • MapReduce(已更完)
  • Hive(已更完)
  • Flume(已更完)
  • Sqoop(已更完)
  • Zookeeper(已更完)
  • HBase(已更完)
  • Redis (已更完)
  • Kafka(已更完)
  • Spark(已更完)
  • Flink(已更完)
  • ClickHouse(已更完)
  • Kudu(已更完)
  • Druid(正在更新…)

章节内容

上节我们完成了如下的内容:

  • 通过两篇来完成 集群模式配置、集群模式启动

在这里插入图片描述

基本介绍

Apache Druid 从 Kafka 中获取数据并进行分析的流程通常分为以下几个步骤:

  • Kafka 数据流的接入: Druid 通过 Kafka Indexing Service 直接从 Kafka 中摄取实时流数据。Kafka 是一个高吞吐量的消息队列,适合处理大量实时数据。Druid 会订阅 Kafka 的 topic,每当新数据到达时,它会自动从 Kafka 中读取数据。

  • 数据解析与转换: 数据从 Kafka 进入 Druid 后,首先会进行数据解析,通常采用 JSON、Avro 或 CSV 格式。解析的过程中,Druid 可以根据预定义的 schema 进行字段映射、过滤和数据转换,比如将字符串转为数值类型、提取时间戳等。这一步允许对数据进行初步处理,比如数据清洗或格式化。

  • 实时数据摄取与索引: Druid 将解析后的数据放入一个实时索引中,同时也将数据存储在内存中。Druid 的一个核心特点是,它会为每条记录生成倒排索引和 bitmap 索引,这样可以大大加快查询速度。实时摄取的数据在内存中保存一段时间,直到满足一定条件(比如时间或数据量),然后会以段的形式写入深度存储(如 HDFS 或 S3)。

  • 批处理与历史数据合并: Druid 支持实时和批处理的混合模式。当实时摄取的数据段被持久化到深度存储后,Druid 可以自动将这些段与批处理数据合并。这种设计确保了在数据分析时,既能查询到最新的实时数据,也能访问历史数据。批处理数据可以通过 Hadoop 或 Spark 等框架预先批量加载到 Druid 中。

  • 数据分片与副本管理: Druid 支持水平扩展,通过分片将数据分布在多个节点上。每个分片可以有多个副本,这样可以保证系统的高可用性和容错性。通过负载均衡,Druid 可以有效处理大规模查询请求,尤其是在数据量非常大的情况下。

  • 查询与分析: Druid 的查询系统基于 HTTP/JSON API,支持多种类型的查询,如时间序列查询、分组聚合查询、过滤查询等。Druid 的查询引擎设计非常高效,可以处理大规模的 OLAP(在线分析处理)查询。由于 Kafka 中的数据是实时流式的,Druid 的查询结果通常可以反映出最新的业务指标和分析结果。

  • 可视化与监控: Druid 的数据可以与 BI 工具(如 Superset、Tableau)集成,生成实时的报表和仪表盘。用户可以通过这些可视化工具,实时监控业务指标,做出数据驱动的决策。

整个流程中,Druid 负责将 Kafka 中的数据转化为高效的、可查询的 OLAP 格式,并且通过索引和分布式架构实现高效查询。这个系统可以被广泛应用于实时监控、用户行为分析、金融交易分析等场景。

从Kafka中加载数据

典型架构

在这里插入图片描述

日志业务中,我们不会在Druid中处理复杂的数据转换清晰工作

案例测试

假设有以下网络流量数据:

  • ts:时间戳
  • srcip:发送端IP地址
  • srcport:发送端端口号
  • dstip:接收端IP地址
  • dstport:接收端端口号
  • protocol:协议
  • packets:传输包
  • bytes:传输的字节数
  • cost: 传输耗费的时间

数据是JSON格式,通过Kafka传输
每行数据包含:

  • 时间戳
  • 维度列
  • 指标列

需要计算的指标:

  • 记录的条数:count
  • packets:max
  • bytes:min
  • cost:sum

数据汇总粒度:分钟

测试数据

{"ts":"2020-10-01T00:01:35Z","srcip":"6.6.6.6", "dstip":"8.8.8.8", "srcport":6666,"dstPort":8888, "protocol": "tcp", "packets":1, "bytes":1000, "cost": 0.1}

{"ts":"2020-10-01T00:01:36Z","srcip":"6.6.6.6", "dstip":"8.8.8.8", "srcport":6666,"dstPort":8888, "protocol": "tcp", "packets":2, "bytes":2000, "cost": 0.1}

{"ts":"2020-10-01T00:01:37Z","srcip":"6.6.6.6", "dstip":"8.8.8.8", "srcport":6666,"dstPort":8888, "protocol": "tcp", "packets":3, "bytes":3000, "cost": 0.1}

{"ts":"2020-10-01T00:01:38Z","srcip":"6.6.6.6", "dstip":"8.8.8.8", "srcport":6666,"dstPort":8888, "protocol": "tcp", "packets":4, "bytes":4000, "cost": 0.1}

{"ts":"2020-10-01T00:02:08Z","srcip":"1.1.1.1", "dstip":"2.2.2.2", "srcport":6666,"dstPort":8888, "protocol": "udp", "packets":5, "bytes":5000, "cost": 0.2}

{"ts":"2020-10-01T00:02:09Z","srcip":"1.1.1.1", "dstip":"2.2.2.2", "srcport":6666,"dstPort":8888, "protocol": "udp", "packets":6, "bytes":6000, "cost": 0.2}

{"ts":"2020-10-01T00:02:10Z","srcip":"1.1.1.1", "dstip":"2.2.2.2", "srcport":6666,"dstPort":8888, "protocol": "udp", "packets":7, "bytes":7000, "cost": 0.2}

{"ts":"2020-10-01T00:02:11Z","srcip":"1.1.1.1", "dstip":"2.2.2.2", "srcport":6666,"dstPort":8888, "protocol": "udp", "packets":8, "bytes":8000, "cost": 0.2}

{"ts":"2020-10-01T00:02:12Z","srcip":"1.1.1.1", "dstip":"2.2.2.2", "srcport":6666,"dstPort":8888, "protocol": "udp", "packets":9, "bytes":9000, "cost": 0.2}

写入的数据如下所示:
在这里插入图片描述

启动Kafka

这里由于资源比较紧张,我就只启动一台Kafka了:
我在 h121 节点上启动

kafka-server-start.sh /opt/servers/kafka_2.12-2.7.2/config/server.properties

创建 Topic

kafka-topics.sh --create --zookeeper h121.wzk.icu:2181 --replication-factor 1 --partitions 1 --topic druid1

推送消息

kafka-console-producer.sh --broker-list h121.wzk.icu:9092 --topic druid1

输出我们刚才的数据,一行一行的写入输入进行(后续要用)。

提取数据

浏览器打开我们之前启动的Druid服务

http://h121.wzk.icu:8888/

LoadData

点击控制台中的 LoadData 模块:
在这里插入图片描述

Streaming

选择 Streaming:
在这里插入图片描述

Kafka

继续选择Kafka,点击 ConnectData,在右侧输入对应的信息,点级Apply:

  • h121.wzk.icu:9092
  • druid1

在这里插入图片描述

ParserData

此时可以看到右下角有:Next: Parse Data:
在这里插入图片描述
数据虽然加载了,但是格式不对,我们在右侧选择:JSON:
在这里插入图片描述

点击之后,可以看到,(如果你解析不顺利,可以用这个尝试)点击 Add column flattening
在这里插入图片描述
如果正常解析,数据应该是这个样子:
在这里插入图片描述

ParserTime

继续点击 Next Parse Time:
在这里插入图片描述

Transform

继续点击 Next Transform:

  • 不建议在Druid中进行复杂的数据变化操作,可考虑将这些操作放在数据预处理的过程中处理
  • 这里没有定义数据转换

在这里插入图片描述

Filter

继续点击 Next Filter:

  • 不建议在Druid中进行复杂的数据过滤操作,可以考虑将这些操作放在数据预处理中
  • 这里没有定义数据过滤

在这里插入图片描述

Configuration Schema

点击 Next Configuration Schema:

  • 定义指标列、维度列
  • 定义如何在维度列上进行计算
  • 定义是否在摄取数据时进行数据的合并(即RollUp),以及RollUp的粒度

在这里插入图片描述
此时点击右侧的:RollUp,会看到数据被聚合成了两条:
在这里插入图片描述
聚合结果:
在这里插入图片描述

Partition

点击 Next Partition:

  • 定义如何进行数据分区
  • Primary partitioning 有两种方式:
  • 方式1:uniform,以一个固定的时间间隔聚合函数数据,建议使用这种方式,这里将每天的数据作为一个分区
  • 方式2:arbitary,尽量保证每个 segements大小一致,时间间隔不固定
  • Secondary Partitioning
  • 参数1:Max rows per segment,每个Segment最大的数据条数
  • 参数2:Max total rows,Segment等待发布的最大数据条数

在这里插入图片描述

Tune

点击 Next Tune:

  • 定义任务执行和优化相关的参数

在这里插入图片描述

Publish

点击 Next Publish:

  • 定义Datasource的名称
  • 定义数据解析失败后采取的动作

在这里插入图片描述

Edit Special

点击 Next Edit spec:

  • JSON串为数据摄取规范,可返回之前的步骤中进行修改,也可以直接编辑规范内容,并在之前的步骤可以看到修改的结果
  • 摄取规范定义完成后,点击Submit会创建一个数据摄取的任务

在这里插入图片描述

Submit

点击 Submit 按钮:

在这里插入图片描述

数据查询

  • 数据摄取规范发布后生成Supervisor
  • Supervisor会启动一个Task,从kafka中摄取数据
    需要等待一段时间,Datasource才会创建完毕,选择 【Datasources】板块:
    在这里插入图片描述

点击末尾的三个小圆点,选择 Query With SQL:
在这里插入图片描述

会出现如下的界面,我们写入SQL,并运行:

SELECT 
	*
FROM 
	"druid1"

执行结果如下图:
在这里插入图片描述

数据摄取规范

{
  "type":"kafka",
  "spec":{
    "ioConfig":Object{...},
    "tuningConfig":Object{...},
    "dataSchema":Object{...}
  }
}
  • dataSchema:指定传入数据的Schema
  • ioConfig:指定数据的来源和去向
  • tuningConfig:指定各种摄取参数

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2176599.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【Linux学习】【Ubuntu入门】1-2 新建虚拟机ubuntu环境

1.双击打开VMware软件,点击“创建新的虚拟机”,在弹出的中选择“自定义(高级)” 2.点击下一步,自动识别ubuntu光盘映像文件,也可以点击“浏览”手动选择,点击下一步 3.设置名称及密码后&#xf…

1Panel安装部署证书(httpsok.com)

1Panel安装部署证书(httpsok.com) 购买服务器 推荐购买香港服务器,这样通过域名访问就不需要备案。 创建静态站点 申请SSL证书 进入 httpsok.com,点击申请证书 输入站点域名 根据提示,添加DNS解析记录 添加成功后,提示域名验证…

如何在AI绘画SD中调节光照?这2个超好用的方法别错过!轻松生成AI人像光感大片!

大家好,我是画画的小强 在AI绘画Stable Diffusion 摄影艺术中,灯光的运用对于照片的质量和情感表达至关重要。它不仅能够彰显主题,还能为画面增添深度与立体感,帮助传递感情,以及凸显细节之美。 下面,我将…

YD-D3无线遥控声光报警器,微波探测预警安全设备

YD-D3无线遥控声光报警器‌是一种广泛应用于工厂车间、水泥厂、起重机、叉车、仓库、门吊、港口、车站等场所的安全报警设备。它通过大分贝喇叭播报语音提示以及高亮灯光示警,为现场人员安全保驾护航。该报警器采用集成电路设计,音质优美,抗干…

航顺芯片HK32MCU受邀出席汽车芯片国产化与技术创新闭门研讨会

[中国,北京,2024年9月21日]近日,深圳市航顺芯片技术研发有限公司(以下简称“航顺芯片”)产品总监郑增忠受邀出席由中国设备管理协会新能源汽车产业发展促进中心主办的“汽车芯片国产化与技术创新闭门研讨会”。 会上航…

基于单片机电容测量仪仿真设计

文章目录 前言资料获取设计介绍设计程序具体实现截图设计获取 前言 💗博主介绍:✌全网粉丝10W,CSDN特邀作者、博客专家、CSDN新星计划导师,一名热衷于单片机技术探索与分享的博主、专注于 精通51/STM32/MSP430/AVR等单片机设计 主要对象是咱们…

Elasticsearch 8.16 和 JDK 23 中的语言环境变化

作者:来自 Elastic Simon Cooper 随着 JDK 23 即将发布,语言环境信息中有一些重大变化,这将影响 Elasticsearch 以及你提取和格式化日期时间数据的方式。首先,介绍一些背景知识。 什么是语言环境? 每次 Java 程序需要…

【Java】static-静态变量、静态方法、工具类、注意事项、args数组的使用

文章目录 一、静态变量特点调用方式 二、静态方法特点调用方式 三、类的类型1.Javabean类2.测试类3.工具类 四、注意事项从代码方面解释1. 上下文清晰2. 静态变量的访问例子注意 3. 静态方法中没有this关键字原因 4. 静态方法只能访问静态变量和静态方法错误原因解决方法 4.非静…

如何获取钉钉webhook

第一步打开钉钉并登录 第二步创建团队 并且 添加自定义 机器人 即可获取webhook

【流计算】流计算概论

前言 作者在之前写过一个大数据的专栏,包含GFS、BigTable、MapReduce、HDFS、Hadoop、LSM树、HBase、Spark,专栏地址: https://blog.csdn.net/joker_zjn/category_12631789.html?fromshareblogcolumn&sharetypeblogcolumn&sharerI…

待办事项应用SideQuests

赶在国庆长假前,自驾🚗出去玩了几天。 国庆前的错峰出游简直是太香了!一路上🛣️畅通无阻,停车🅿️不用抢,吃饭🍔不用等,景点🏞️不用排队,拍照&…

Flume实战--Flume中的拦截器详解与操作

在处理大规模数据流时,Apache Flume 是一款功能强大的数据聚合工具,它可以通过拦截器在运行时对Event进行修改或丢弃。本文将详细讲解Flume中的拦截器,包括时间戳拦截器、Host添加拦截器、静态拦截器以及如何自定义拦截器。 拦截器 拦截器的…

《HelloGitHub》第 102 期

兴趣是最好的老师,HelloGitHub 让你对编程感兴趣! 简介 HelloGitHub 分享 GitHub 上有趣、入门级的开源项目。 github.com/521xueweihan/HelloGitHub 这里有实战项目、入门教程、黑科技、开源书籍、大厂开源项目等,涵盖多种编程语言 Python、…

LeetCode - #124 二叉树中的最大路径和(Top 100)

文章目录 前言1. 描述2. 示例3. 答案关于我们前言 本题为 LeetCode 前 100 高频题 我们社区陆续会将顾毅(Netflix 增长黑客,《iOS 面试之道》作者,ACE 职业健身教练。)的 Swift 算法题题解整理为文字版以方便大家学习与阅读。 LeetCode 算法到目前我们已经更新到 123 期…

Electron 隐藏顶部菜单

隐藏前: 隐藏后: 具体设置代码: 在 main.js 中加入这行即可: // 导入模块 const { app, BrowserWindow ,Menu } require(electron) const path require(path)// 创建主窗口 const createWindow () > {const mainWindow ne…

Qemu开发ARM篇-6、emmc/SD卡AB分区镜像制作并通过uboot进行挂载启动

文章目录 1、AB分区镜像制作2、uboot修改3、镜像启动 在上一篇 Qemu开发ARM篇-5、buildroot制作根文件系统并挂载启动中,我们通过buildroot制作了根文件系统,并通过 SD卡的形式将其挂载到设备并成功进行了启动,但上一章中,我们的…

启动 Ntopng 服务前需先启动 redis 服务及 Ntopng 常用参数介绍

启动Ntopng服务之前需要先启动redis服务,因为Ntopng服务依赖于redis服务的键值存储。 服务重启 服务启动 Ntopng常用参数: -d 将 Ntopng 进程放入后台执行。默认情况下,Ntop 在前台运行。 -u 指定启动Ntopng执行的用户,默认为…

[论文精读]TorWard: Discovery, Blocking, and Traceback of Malicious Traffic Over Tor

期刊名称:IEEE Transactions on Information Forensics and Security 发布链接:TorWard: Discovery, Blocking, and Traceback of Malicious Traffic Over Tor | IEEE Journals & Magazine | IEEE Xplore 中文译名:TorWard:…

2024大二上js高级+ES6学习9.26(闭包,递归函数)

9.26.2024 1.闭包 什么是闭包: 闭包的作用: Return 的函数作为fn的子函数,可以使用fn的局部变量num,局部变量num要等所有使用它的函数调用完毕后才销毁 2.闭包的案例 点击li会发现输出4 在 JavaScript 中,事件处理器&…

C语言 | Leetcode C语言题解之第443题压缩字符串

题目&#xff1a; 题解&#xff1a; void swap(char *a, char *b) {char t *a;*a *b, *b t; }void reverse(char *a, char *b) {while (a < b) {swap(a, --b);} }int compress(char *chars, int charsSize) {int write 0, left 0;for (int read 0; read < charsSi…