大数据Flink(一百二十四):案例实践——淘宝母婴数据加速查询

news2025/4/5 20:47:49

文章目录

案例实践——淘宝母婴数据加速查询

一、​​​​​​​创建数据库表并导入数据

二、​​​​​​​​​​​​​​创建session集群

三、​​​​​​​​​​​​​​源表查询

四、​​​​​​​​​​​​​​指标计算


案例实践——淘宝母婴数据加速查询

随着“全面二孩”政策落地、居民可支配收入稳步增加等因素的刺激,中国的母婴消费市场正迎来黄金时代。与此同时,随着国民消费升级90后宝爸、宝妈人数剧增,消费需求与消费理念都发生了巨大的变化。据罗兰贝格最新公布的报告预计,已经经过了16个年头发展的母婴行业,到2020年,整体规模将达到3.6万亿元,2016-2020年复合增速高达17%,行业前景看起来一片光明。如此大好形势下,母婴人群在母婴消费上有什么特点?消费最高的项目是什么?

本场景将以阿里云实时计算Flink版为基础,使用Flink自带的 MySQL Connector连接RDS云数据库实例,并以一个淘宝母婴订单实时查询的例子尝试上手Connector的数据捕获、数据变更等功能。

本场景中订单和婴儿信息存储在MySQL中,对于订单表,为了方便进行分析,我们让它关联上其对应的婴儿信息,构成一张宽表。另一方面数据经过分组聚合后,计算出订单数量和婴儿出生的关系。

按步骤完成本次实验后,您将掌握的知识有:

  1. 使用Flink实时计算平台创建并提交作业的方法;
  2. 编写基于Flink Table API SQL语句的能力;
  3. 使用MySQL Connector对数据库进行读取的方法;

一、​​​​​​​​​​​​​​创建数据库表并导入数据

在这个例子中,我们将创建三张数据表,其中一张orders_dataset_tmp是导入数据的临时表,其他两张作为源表,体验淘宝母婴订单实时查询。

进入mysql管理平台DMS,单击数据库实例,在已登录实例中找到test数据库,并双击数据库。

在SQLConsole页签中,输入如下SQL建表语句,然后单击执行。

create table orders_dataset_tmp(
	user_id bigint comment '用户身份信息',			
	auction_id bigint comment '购买行为编号',		
	cat_id bigint comment '商品种类序列号',			
	cat1 bigint comment '商品序列号(根类别)',				
	property TEXT comment '商品属性',			
	buy_mount int comment '购买数量',			
	day TEXT comment '购买时间'				
);

create table orders_dataset(
	order_id BIGINT NOT NULL AUTO_INCREMENT PRIMARY KEY comment '订单id',
	user_id bigint comment '用户身份信息',			
	auction_id bigint comment '购买行为编号',		
	cat_id bigint comment '商品种类序列号',			
	cat1 bigint comment '商品序列号(根类别)',				
	property TEXT comment '商品属性',			
	buy_mount int comment '购买数量',			
	day TEXT comment '购买时间'				
);

--
create table baby_dataset(
	user_id bigint NOT NULL PRIMARY KEY,	
	birthday text comment '婴儿生日',
	gender int comment '0 denotes female, 1 denotes male, 2 denotes unknown'
);

在DMS数据管理平台,选择左侧的常用功能>数据导入。 

配置如下信息后单击提交申请,将 (sample)sam_tianchi_mum_baby_trade_history.csv 导入 orders_dataset_tmp 表。

点击提交申请后,等待审批完成,点击执行变更,返回如下结果,数据导入完成。

 

重复上述步骤,将(sample)sam_tianchi_mum_baby.csv 导入 baby_dataset 表。

导入完成之后,在SQLConsole页签中,输入如下SQL,然后单击执行,将订单数据导入到订单源表orders_dataset 中。

insert into orders_dataset(user_id,auction_id,cat_id,cat1,property,buy_mount,day)
select * from orders_dataset_tmp;

可以看到几张表中都有了数据。

SELECT * FROM `baby_dataset` ;

SELECT * FROM `orders_dataset` ;

查询表数据条数

SELECT count(1) FROM `baby_dataset` ;

二、​​​​​​​​​​​​​​创建session集群

使用之前的flink-sql-test-session集群即可。如若没有,安装下面步骤创建。

  • 登录实时计算控制台。
  • 在Flink全托管页签,单击目标工作空间名称对应应操作列下的控制台。
  • 在左侧导航栏,单击Session集群。
  • 单击创建Session集群。

表格中未提及的参数保持默认值即可,需要配置的参数说明请参见下表。

配置项

说明

配置示例

名称

Session集群名称。

flink-sql-test-session

状态

设置当前集群的期望运行状态:

  • STOPPED:当集群配置完成后保持停止状态,同样会停止所有在运行中的作业。
  • RUNNING:当集群配置完成后保持运行状态。

RUNNING

引擎版本

Session集群引擎版本号。

vvr-6.0.7-flink-1.15

Task Managers数量

默认与并行度保持一致。

4

  • 单击创建Session集群。

当Session集群状态(页面上方集群名称旁边)从启动中变为运行中后,可以进入后续步骤。

三、​​​​​​​​​​​​​​源表查询

  • 进入Flink开发平台,点击作业开发,在demo文件夹下创建monther-baby-test流作业草稿,版本选择vvr-6.0.7-flink-1.15。创建源表,代码如下
CREATE TABLE orders_dataset (
	order_id BIGINT,
  `user_id` bigint,			
	auction_id bigint,		
	cat_id bigint,			
	cat1 bigint,				
	property varchar,			
	buy_mount int,			
	`day` varchar	,
   PRIMARY KEY(order_id) NOT ENFORCED
) WITH (
    'connector' = 'mysql',
    'hostname' = 'rm-cn-g4t3gzb9789789ca.rwlb.rds.aliyuncs.com',
    'port' = '3306',
    'username' = 'itlanson',
    'password' = 'It123',
    'database-name' = 'test',
    'table-name' = 'orders_dataset'
);
CREATE TABLE baby_dataset (
	`user_id` bigint,
	birthday varchar,
	gender int,
    PRIMARY KEY(user_id) NOT ENFORCED
) WITH (
    'connector' = 'mysql',
    'hostname' = 'rm-cn-g4t3gzb9789789ca.rwlb.rds.aliyuncs.com',
    'port' = '3306',
    'username' = 'itlanson',
    'password' = 'It123',
    'database-name' = 'test',
    'table-name' = 'baby_dataset'
);

选中代码,点击左上角运行,完成表的创建。创建完之后,可以在元数据中的vvp.default下看到表。

  • 查询表数据
select * from baby_dataset;

选中代码,点击调试,提交到flink-sql-test-session集群。结果如下

 

select * from orders_dataset;

选中代码,点击调试,查询结果如下

  • 查询数据条数,代码如下
select count(1) from baby_dataset;

选择代码后,点击调试。

可以看到控制台的结果在不断增大,达到500会暂停。这是因为默认查询500条,此时需要点击左侧的绿色箭头,恢复查询。

最后结果如下,可以看到,与mysql中的对应表数据条数相同。

 

此时,在mysql中向baby_dataset表插入一条数据

insert into baby_dataset values (99999999,'20130101',1);

回到flink控制台,可以看到,计数结果也增加了。

点击红色按钮停止查询。然后查询刚才插入的数据。

SELECT * FROM `baby_dataset` 
where user_id=99999999;

此时,在mysql将此条数据的生日进行更改 

UPDATE baby_dataset SET birthday = '20140101' WHERE user_id = 99999999;

执行成功后,观察flink控制台的变化,发现数据也完成了更改。

 

四、​​​​​​​​​​​​​​指标计算

我们希望对原始数据按照 user_id 进行 JOIN,构成一张宽表。查询orders_dataset和baby_dataset表的关联结果,代码如下:

SELECT o.*,
	b.birthday,
	b.gender
FROM orders_dataset /*+ OPTIONS('server-id'='123450-123452') */ o
LEFT JOIN baby_dataset /*+ OPTIONS('server-id'='123453-123455') */ as b
    ON o.user_id = b.user_id;

选中代码,点击调试,结果如下

接下来,我们希望对原始数据按照 user_id 进行 JOIN,构成一张宽表。然后对宽表数据的订单时间取到月份进行分组 GROUP BY,并统计每个分组中订单的购买数量SUM和出生婴儿的数量COUNT。代码如下

SELECT 
	SUBSTRING(tmp1.`day` FROM 1 FOR 6) as year_mon,
	SUM(tmp1.buy_mount) as buy_num,
	COUNT(birthday) as baby_num
FROM(
	SELECT o.*,
		b.birthday,
		b.gender
	FROM orders_dataset /*+ OPTIONS('server-id'='123456-123457') */ o
	LEFT JOIN baby_dataset /*+ OPTIONS('server-id'='123458-123459') */ as b
		ON o.user_id = b.user_id
) tmp1
GROUP BY SUBSTRING(tmp1.`day` FROM 1 FOR 6);

选中代码,点击调试,结果如下


 

  • 📢博客主页:https://lansonli.blog.csdn.net
  • 📢欢迎点赞 👍 收藏 ⭐留言 📝 如有错误敬请指正!
  • 📢本文由 Lansonli 原创,首发于 CSDN博客🙉
  • 📢停下休息的时候不要忘了别人还在奔跑,希望大家抓紧时间学习,全力奔赴更美好的生活✨

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2162699.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【全新课程】正点原子《ESP32基础及项目实战入门》培训课程上线!

正点原子《ESP32基础及项目实战入门》全新培训课程上线啦!正点原子工程师手把手教你学!熟练掌握ESP-IDF开发,突破ESP32入门难题! 一、课程介绍 本课程针对ESP32的入门和基础外设进行系统教学,内容包括环境搭建、编程…

C#和数据库高级:密封类和方法覆盖

文章目录 一、密封类关键字:sealed方法覆盖 面向对象三大特性总结 一、密封类 关键字:sealed 方法覆盖 面向对象三大特性总结

类与对象【中】

1.类的六个默认构造函数 如果一个类中什么成员都没有简称空类 但空类中真的什么都没有吗?并不是,任何类在什么都不写时,编译器会自动生成以下6个默认成员函数。 默认成员函数:用户没有显示实现,编译器会生成的成员函数…

公安局软件管理平台建设方案和必要性,论文-3-———未来之窗行业应用跨平台架构

三、平台功能设计 四、技术架构 1. 前端界面 - 采用简洁、易用的设计风格,适应不同终端设备的访问。 - 基于 HTML5、CSS3 和 JavaScript 构建。 2. 后端服务 - 选择主流的 Web 开发框架,如 未来之窗跨平台架构,VUE。 - 数据库…

IIS HTTPS 网页可能暂时无法连接,或者它已永久性地移动到了新网址 ERR_HTTP2_INADEQUATE_TRANSPORT_SECURITY

问题描述:站点突然无法访问,经排查发现,HTTP协议的网址可以继续访问,HTTPS的网址不可以访问。 问题分析:在Windows更新和滚动之后,由于 HTTP/2,当站点启动了 HTTP/2 连接,会出现一个…

【春秋云境】CVE-2024-23897-Jenkins 2.441之前版本存在任意文件读取漏洞

一、靶场介绍 Jenkins 2.441及更早版本,以及LTS 2.426.2及更早版本没有禁用其CLI命令解析器的一个功能,该功能会将参数中’字符后跟的文件路径替换为该文件的内容,允许未经身份验证的攻击者读取Jenkins控制器文件系统上的任意文件。 二、P…

Linux——HTTPS协议

HTTPS也是一个应用层协议:只是是在 HTTP 协议的基础上引入了一个加密层而已~ 目录 概念准备 1什么是加密 2为什么要加密 3常见的加密方式 对称加密 非对称加密 数据摘要(数据指纹) 一HTTPS加密方式 方案1只使用对称加密 方案2只使用非对称加密 方案3双…

C#|.net core 基础 - 深拷贝的五大类N种实现方式

在实际应用中经常会有这样的需求:获取一个与原对象数据相同但是独立于原对象的精准副本,简单来说就是克隆一份,拷贝一份,复制一份和原对象一样的对象,但是两者各种修改不能互相影响。这一行为也叫深克隆,深…

第6章 常用UI组件库

一.Element Plus组件库 1. 安装Element Plus 什么是Element Plus? Element Plus是基于Vue 3开发的优秀的PC端开源UI组件库,它是Element的升级版,对于习惯使用Element的人员来说,在学习Element Plus时,不用花费太多的…

CC面试准备

半导体基础 半导体是介于导体和绝缘体之间的一种介质,在不同条件下表现出不同的导电性或者不导电特性, 电子半导体器件材料大部分为硅,锗等元素 本征半导体:完全不含杂质的纯净半导体,因为不含杂质,其中…

[笔记]一组电缆、定位相关产品的技术参数

csdn不允许做广告,这里的那家定位供应商的技术看起来是可以的。很有希望。它的原理并不复杂,这家企业在处理业务领域以外的新型产品时,是查过资料的,这就超过了60%的同行。 1.电缆 仅给出现在市面供应的铠装电缆结构&#xff0c…

七层负载均衡和四层负载均衡的区别

文章目录 什么是七层负载均衡?一、定义与工作原理二、优点与缺点三、应用场景四、常见七层负载均衡器五、负载均衡算法 什么是四层负载均衡?一、定义与原理定义:原理: 二、特点与应用场景特点:应用场景: 三…

STM32基础学习笔记-ADC面试基础题6

第六章、ADC 常见问题 1、基本概念:什么是ADC ?作用 ?逐次逼近型 2、传感器本质 ?传感器、电压、ADC数值转化 ? 3、ADC的特征 ? 转化时间、分辨率、精度、量化误差 ? 4、ADC框图组成部分 &…

华为云发布全栈可观测平台AOM,以AI赋能应用运维可观测

9月19日,华为全联接大会2024举办期间,在“AI赋能应用现代化,加速软件生产力跃升”为主题的论坛上,华为云发布全栈可观测平台AOM,以AI赋能应用运维可观测,提升企业应用可用性与稳定性。 该平台发布标志着华…

针对国产化--离线安装Nginx rpm包下载 ARM64(.aarch64.rpm) 版本下载

源地址:https://nginx.org/packages/centos/7/aarch64/RPMS/ 可以选择系统分别进行下载对应的rmp包

公安局软件管理平台建设方案和必要性,论文-2-———未来之窗行业应用跨平台架构

一、平台方略 随着gov信息化建设的不断推进,各类ZW软件的应用需求日益增加。为了提高ZW软件的获取便利性、AQ性和规范性,建设一个专门的GOV软件管理平台具有重要意义。 集中提供各类ZW软件,方便工作人员快速获取和安装,减少因软…

基于DAMODEL——Faster-RCNN 训练与测试指南

Faster-RCNN 训练与测试指南 前言 今天我们要来实现一个经典的目标检测模型:Faster-Rcnn。我们使用DAMODEL云平台来实现,这是个很强大的云端平台,功能众多,你可以投你所好去进行你想做的事情。 1. 环境与工具准备 1.1 远程连接…

经颅磁刺激技术,脑科学研究——精神患者治疗方案

经颅磁刺激(Transcranial Magnetic Stimulation ,TMS)技术是一种利用脉冲磁场作用于中枢神经系统(主要是大脑),改变皮层神经细胞的膜电位,使之产生感应电流,影响脑内代谢和神经电活动…

开放原子开源基金会OPENATOM

AtomGit_开放原子开源基金会代码托管平台-AtomGit 开放原子开源基金会是致力于推动全球开源事业发展的非营利机构,于 2020 年 6 月在北京成立,由阿里巴巴、百度、华为、浪潮、360、腾讯、招商银行等多家龙头科技企业联合发起。 精选项目: 比…

HDFS_API文件和文件夹

代码: Beforepublic void init() throws URISyntaxException, IOException {URI uri new URI("hdfs://master:9000");// 创建一个配置文件Configuration entries new Configuration();// 获取到了客户端对象 // entries.set("dfs.replicat…