大数据-156 Apache Druid 案例实战 Scala Kafka 订单统计

news2024/12/29 17:03:18

点一下关注吧!!!非常感谢!!持续更新!!!

目前已经更新到了:

  • Hadoop(已更完)
  • HDFS(已更完)
  • MapReduce(已更完)
  • Hive(已更完)
  • Flume(已更完)
  • Sqoop(已更完)
  • Zookeeper(已更完)
  • HBase(已更完)
  • Redis (已更完)
  • Kafka(已更完)
  • Spark(已更完)
  • Flink(已更完)
  • ClickHouse(已更完)
  • Kudu(已更完)
  • Druid(正在更新…)

章节内容

上节我们完成了如下的内容:

  • Apache Druid 数据存储
  • Apache Druid 数据分区
  • 索引服务
  • 压缩机制
  • 数据聚合

在这里插入图片描述

整体流程

  • Kafka 数据源: Kafka 是一个分布式流处理平台,负责接收、存储并传输数据。它支持从各类应用、日志、传感器等设备采集实时数据,将数据划分为多个主题(Topic),并将消息分发给消费者。在这个案例中,Kafka 是 Druid 的数据源。
  • Kafka Producer: 数据生产者(Producer)负责将数据发送到 Kafka 的主题中。例如,应用程序可以向 Kafka 写入日志、用户行为数据、传感器数据等。每条消息可以是 JSON、Avro 等格式的数据记录。
  • Druid Kafka Ingestion: Druid 提供了对 Kafka 的原生支持。通过 Kafka Indexing Service,Druid 可以持续从 Kafka 的某个主题中消费数据,实时地将这些数据摄取到 Druid 中。摄取过程中,Druid 会将数据拆解为小的段(Segment),并将这些段存储在 Druid 集群的深度存储中(如 HDFS、S3 等)。
  • 实时数据摄取和索引: Druid 的 Kafka 摄取任务会监听 Kafka 的分区,按照流数据的到达顺序消费数据,并在内部创建索引。这些索引结构化存储了数据,并通过分片和分区机制,保证了查询的高效性和水平扩展能力。
  • Druid 查询层: Druid 提供了非常强大的查询能力,可以通过 SQL 查询方式进行交互,也支持多维查询、聚合查询等。这些查询可以是低延迟的实时查询,也可以对历史数据进行复杂的分析。用户通过 Druid 查询接口或 BI 工具(如 Apache Superset、Tableau 等)向集群发送查询。
  • Kafka 消费者 Offset 管理: Druid 使用 Kafka 消费者模型,实时消费消息并管理 Offset(偏移量),确保数据不丢失或重复摄取。Offset 会被定期提交到 Kafka 中,保证即使任务重启,摄取进度也能从上一次的位置继续。
  • 持久化和数据存储: 数据在经过摄取和索引后,Druid 会定期将数据段(Segment)持久化到深度存储中,并对旧数据进行合并和压缩,减少存储空间的占用。Druid 的集群架构支持分布式存储和查询,并能根据数据规模进行自动扩展。

案例假设

假设我们在构建一个用户行为分析系统,通过 Kafka 采集用户点击日志,并通过 Druid 实时分析用户行为。

  • Kafka 数据生产: 电商平台的应用程序会将每次用户点击产生的日志记录(例如点击商品、页面浏览等)发送到 Kafka 中的 user-clicks 主题。每条记录都包含用户ID、商品ID、时间戳、页面信息等。
  • Druid 数据摄取: 配置 Druid 的 Kafka Indexing Service,从 user-clicks 主题消费数据。数据会实时流入 Druid 中,Druid 将数据按照时间范围切分为段,并存储到其深度存储中。
  • 实时数据查询与分析: 业务方可以通过 SQL 查询或多维查询接口,实时分析用户的点击行为。查询的例子可能是统计每个小时的页面浏览量、分析不同商品的受欢迎程度等。这些查询可以直接反映用户的当前行为,帮助业务方做出快速决策。
  • 可视化和报表: Druid 的查询结果可以通过 Apache Superset 等工具进行可视化展示,创建实时仪表盘,展示用户行为的各种关键指标。数据分析师和运营人员可以在可视化平台上直观地看到当前系统的运营状态。

需求分析

场景分析

  • 数据量大,需要在这些数据中根据业务需要灵活查询
  • 实时性要求高
  • 数据实时的推过来,要在秒级对数据进行分析并查询出结果

数据描述

{"ts":1607499629841,"orderId":"1009388","userId":"807134","orderStatusId":1,"orderStatus":"已支付","payModeId":0,"payMode":"微信","payment":"933.90","products":
[{"productId":"102163","productName":"贝合xxx+粉","price":18.7,"productNum":3,"categoryid":"10360","catname1":"厨卫清洁、纸制用品","catname2":"生活日用","catname3":"浴室用品"},{"productId":"100349","productName":"COxxx0C","price":877.8,"productNum":1,"categoryid":"10302","catname1":"母婴、玩具乐器","catname2":"西洋弦乐器","catname3":"吉他"}]}
  • ts 交易时间
  • orderId 订单编号
  • userId 用户id
  • orderStatusId 订单状态Id
  • orderStatus 订单状态 0-11:未支付,已支付,发货中,已发货,发货失败,已退款,已关单,订单过期,订单已失效,产品已失效,代付拒绝,支付中
  • payModelId 支付方式id
  • payMode 支付方式:0-6:微信,支付宝,信用卡,银联,货到付款,现金,其他
  • payment:支付金额
  • products:购买商品 (一个订单可能包含多个商品,这里是嵌套结构)
  • productId 商品Id
  • productName 商品名称
  • price 单价
  • productNum 购买数量
  • categoryid 商品分类Id
  • catname1 商品一级分类名称
  • catname2 商品二级分类名称
  • catname3 商品三级分类名称

以上的嵌套的json数据格式,Druid不好处理,需要对数据进行预处理,将数据拉平,处理后的数据格式:

{"ts":1607499629841,"orderId":"1009388","userId":"807134","orderStatusId":1,"orderStatus":"已支付","payModeId":0,"payMode":"微信","payment":"933.90","product":
{"productId":"102163","productName":"贝合xxx+粉","price":18.7,"productNum":3,"categoryid":"10360","catname1":"厨卫清洁、纸制用品","catname2":"生活日用","catname3":"浴室用品"}}
{"ts":1607499629841,"orderId":"1009388","userId":"807134","orderStatusId":1,"orderStatus":"已支付","payModeId":0,"payMode":"微信","payment":"933.90","product":
{"productId":"100349","productName":"COxxx0C","price":877.8,"productNum":1,"categoryid":"10302","catname1":"母婴、玩具乐器","catname2":"西洋弦乐器","catname3":"吉他"}}

Kafka生产者

好久没用Scala了,用Scala写一个:

package icu.wzk.kafka

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer

import java.util.Properties
import scala.io.BufferedSource

object KafkaProducerForDruid {
  def main(args: Array[String]): Unit = {
    val brokers = "h121.wzk.icu:9092"
    val topic = "druid2"
    val prop = new Properties()
    prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
    prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])
    prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])

    val producer = new KafkaProducer[String, String](prop);
    val source: BufferedSource = scala.io.Source.fromFile("orders1.json")
    val iter: Iterator[String] = source.getLines();
    iter.foreach {
      line => val msg = new ProducerRecord[String, String](topic, line);
        producer.send(msg)
        println(msg)
        Thread.sleep(10)
    }
    producer.close()
    source.close()
  }
}

运行结果如下图:
在这里插入图片描述

Druid导入数据

这里就不详细描述了,之前入门阶段已经走过完整的流程了:

  • JSON数据要拉平
  • 不定义 RollUp

加载数据源:
在这里插入图片描述
JSON 拉平:
在这里插入图片描述
时间戳:
在这里插入图片描述
不要进行 RollUp:
在这里插入图片描述
最终结果如下图所示:
在这里插入图片描述
计算结果如下图所示:
在这里插入图片描述
运行测试的SQL,一切正常!
在这里插入图片描述

查询计算

订单总数

-- 查询订单总数
SELECT COUNT(distinct orderId) as orderscount
FROM druid2

运行结果如下图所示:
在这里插入图片描述

用户总数

-- 查询用户总数
SELECT COUNT(distinct userId) as usercount
FROM druid2

运行结果如下图:
在这里插入图片描述

统计结果状态订单数

-- 统计各种订单状态的订单数
SELECT orderStatus, count(*)
FROM (
  SELECT orderId, orderStatus
  FROM druid2
  GROUP BY orderId, orderStatus
)
GROUP BY orderStatus

执行结果如下图所示:
在这里插入图片描述

统计各种支付方式的订单数

-- 统计各种支付方式订单数
SELECT payMode, count(1)
FROM (
  SELECT orderId, payMode
  FROM druid2
  GROUP BY orderId, payMode
)
GROUP BY payMode

执行结果如下图所示:
在这里插入图片描述

订单金额最大的前10名

-- 订单金额最大的前10名
SELECT orderId, payment, count(1) as productcount, sum("product.productNum") as products
FROM druid2
GROUP BY orderId, payment

执行结果如下图所示:
在这里插入图片描述

案例小节

  • 在配置摄入源时要设置为True从流的开始进行消费数据,否则在数据源中可能查不到数据
  • Druid的JOIN能力非常有限,分组或者聚合多的场景推荐使用
  • SQL支持能力非常受限
  • 数据的分区组织只有时间序列一种方式

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2179714.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Linux 学习笔记(二):深入理解用户管理、运行级别与命令行操作

Linux 学习笔记(二):深入理解用户管理、运行级别与命令行操作 前置学习内容:Linux学习(一) 1. 用户管理 1.1 用户密码管理 创建用户密码 使用 passwd 命令可以为指定用户设置密码: sudo pas…

AWS Network Firewall - IGW方式配置只应许白名单域名出入站

参考链接 https://repost.aws/zh-Hans/knowledge-center/network-firewall-configure-domain-ruleshttps://aws.amazon.com/cn/blogs/networking-and-content-delivery/deployment-models-for-aws-network-firewall/ 1. 创建防火墙 选择防火墙的归属子网(选择公有…

Unity给物体添加网格(Wire)绘制的方法参考

先看效果&#xff1a; 再看代码&#xff1a; using System.Collections.Generic; using UnityEngine;public class WireMesh : MonoBehaviour {[SerializeField]Material material;void Start(){Mesh mesh OptimizeMesh(GetComponent<MeshFilter>().mesh);GameO…

这 5 个自动化运维场景,可能用 Python 更香?

许多运维工程师会使用 Python 脚本来自动化运维任务。Python 是一种流行的编程语言&#xff0c;具有丰富的第三方库和强大的自动化能力&#xff0c;适用于许多不同的领域。 这里插播一条粉丝福利&#xff0c;如果你正在学习Python或者有计划学习Python&#xff0c;想要突破自我…

需求6:如何写一个后端接口?

这两天一直在对之前做的工作做梳理总结&#xff0c;不过前两天我都是在总结一些bug的问题。尽管有些bug问题我还没写文章&#xff0c;但是&#xff0c;我今天不得不先停下对bug的总结了。因为在国庆之后&#xff0c;我需要自己开发一个IT资产管理的功能&#xff0c;这个功能需要…

IDEA:Properties in parent definition are prohibited

问题背景 如果你在POM.xml中使用了自定义版本&#xff0c;那么IDEA就没办法很动态检测&#xff08;其实可以做到的&#xff0c;不是吗&#xff09;&#xff0c;就会有一个Properties in parent definition are prohibited 的错误信息&#xff08;禁止使用父级定义中的属性&…

2024 八九月份国内外CTF 散装re 部分wp

CTFZone silentDRM 附件拖入ida 最后部分很明显是比较。mmap和munmap函数的块大小为0x23280&#xff0c;比较大&#xff0c;暂时不管它。下断点动调&#xff0c;跳过v6和v7的分析部分&#xff0c;因为它是根据每五个字节的第一个字节生成的。直接看call v7 做运算后分为…

【博弈强化学习】——UAV-BS 的联合功率分配和 3D 部署:基于博弈论的深度强化学习方法

【论文】&#xff1a;Joint Power Allocation and 3D Deployment for UAV-BSs: A Game Theory Based Deep Reinforcement Learning Approach 【引用】&#xff1a;Fu S, Feng X, Sultana A, et al. Joint power allocation and 3D deployment for UAV-BSs: A game theory based…

基于Node.js+Express+MySQL+VUE科研成果网站发布查看科研信息科研成果论文下载免费安装部署

目录 1.技术选型‌ ‌2.功能设计‌ ‌3.系统架构‌ ‌4.开发流程‌ 5.开发背景 6.开发目标 7.技术可行性 8.功能可行性 8.1功能图 8.2 界面设计 8.3 部分代码 构建一个基于Spring Boot、Java Web、J2EE、MySQL数据库以及Vue前后端分离的科研成果网站&#xff0c;可…

PACS系统的延伸:三维重建后处理

影像中心PACS系统源代码&#xff0c;C#语言三发的PACS源码&#xff0c;三甲以下医院都能满足。 PACS系统即医学影像存档与通信系统&#xff0c;是医疗领域中不可或缺的信息技术系统。它主要负责医院内医学影像的数字化存储、管理、传输和显示&#xff0c;极大地促进了医疗影像信…

在线PDF怎么转换成JPG图片?分享14种转换操作!

作为一名社畜&#xff0c;俺也经常要将PDF转换为图片格式&#xff01; 如何进行快速转换&#xff0c;包括电脑端、在线端和手机端&#xff0c;今天俺就测评了50款工具&#xff0c;给你得出了下面这些渠道&#xff0c;不少也是免费的&#xff0c;相信对你有帮助哦&#xff01; …

springboot基于Vue的电影在线预定与管理系统

目录 毕设制作流程功能和技术介绍系统实现截图开发核心技术介绍&#xff1a;使用说明开发步骤编译运行代码执行流程核心代码部分展示可行性分析软件测试详细视频演示源码获取 毕设制作流程 &#xff08;1&#xff09;与指导老师确定系统主要功能&#xff1b; &#xff08;2&am…

VS Code调整字体大小

##在工程目录底下.vscode/settings.json添加设置参数 {"editor.fontSize": 15,"window.zoomLevel": 1.5 }

Coursera_ Algorithms I 学习笔记:Module_3_Analysis_of_Algorithm_Introduction

Coursera_ Algorithms I 学习笔记&#xff1a;Module_3_Analysis_of_Algorithm_Introduction scientific method applied to analysis of algorithms data analysis log-log plot doubling hypothesis experimental alogrithmics System independent effectsSystem dependen…

【CSS in Depth 2 精译_040】6.3 CSS 定位技术之:相对定位(下)—— 用纯 CSS 绘制一个三角形

当前内容所在位置&#xff08;可进入专栏查看其他译好的章节内容&#xff09; 第一章 层叠、优先级与继承&#xff08;已完结&#xff09;第二章 相对单位&#xff08;已完结&#xff09;第三章 文档流与盒模型&#xff08;已完结&#xff09;第四章 Flexbox 布局&#xff08;已…

Clocking System

文章目录 1. 介绍2. 时钟源2.1 scillator Circuit (OSC)2.1.1 外部时钟输入模式2.1.2 外部晶体/陶瓷谐振器模式2.1.3 振荡器的配置2.1.4 Oscillator Watchdog 2.2 Back-up Clock 3. 锁相环&#xff08;PLL&#xff09;3.1 系统锁相环3.1.1 Features3.1.2 框图 3.2.外设锁相环3.…

JAVA云洋系统聚合快递打造一站式快递系统小程序源码

云洋系统聚合快递 —— 打造一站式快递管理新体验 &#x1f680; 一站式快递管理新时代 在快节奏的现代生活中&#xff0c;快递已经成为我们日常不可或缺的一部分。然而&#xff0c;面对众多快递公司和复杂的物流信息&#xff0c;如何高效管理快递成为了许多人的难题。幸运的是…

基于SpringBoot大学生就业管理系统设计与实现

1.1 研究背景 科学技术日新月异的如今&#xff0c;计算机在生活各个领域都占有重要的作用&#xff0c;尤其在信息管理方面&#xff0c;在这样的大背景下&#xff0c;学习计算机知识不仅仅是为了掌握一种技能&#xff0c;更重要的是能够让它真正地使用到实践中去&#xff0c;以…

从“抄袭”到“原创”:5个超实用的论文降重技巧!

AIPaperGPT&#xff0c;论文写作神器~ https://www.aipapergpt.com/ 每当写完一篇论文&#xff0c;松了一口气准备庆祝时&#xff0c;突然想到还有一个名叫“查重”的终极大Boss等着你&#xff0c;瞬间心情从云端跌入谷底。 是不是你&#xff1f; 很多同学在提交之前&#…

CDGA|利用人工智能与边缘计算显著提升数据治理效率与效果的实践案例

在当今数字化转型的浪潮中&#xff0c;数据已成为企业最宝贵的资产之一。然而&#xff0c;随着数据量的爆炸性增长&#xff0c;如何高效、安全地治理这些数据成为企业面临的重要挑战。人工智能&#xff08;AI&#xff09;与边缘计算技术的融合&#xff0c;为数据治理带来了前所…