如何使用 Flink SQL 探索 GitHub 数据集|Flink-Learning 实战营

news2025/1/12 16:15:37

为进一步帮助开发者学习使用 Flink,Apache Flink 中文社区近期发起 Flink-Learning 实战营项目。本次实战营通过真实有趣的实战场景帮助开发者实操体验 Flink,课程包括实时数据接入、实时数据分析、实时数据应用的场景实。并结合小松鼠助教模式,全方位帮助入营开发者轻松玩转 Flink,点击下方图片扫码即刻入营

Tips:点击「阅读原文」即刻入营~

82f2a1f04927e8fb1bb57e045474580a.png

本期将继续详细介绍 Flink- Learning 实战营。


想要了解如何使用 Flink 在 GitHub 中发现最热门的项目吗?本实验使用阿里云实时计算 Flink 版内置的 GitHub 公开事件数据集,通过 Flink SQL 实时探索分析 Github 公开数据集中隐藏的彩蛋!

完成本实验后,您将掌握的知识有:

  • 了解 Flink 和流式计算的优势

  • 对 Flink SQL 基础能力和 Flink 实时处理特性有初步体验

实验简介

通过 Flink 对 GitHub 的实时事件流进行分析,并通过报表直观展示,了解 GitHub 的最新热门趋势、特定仓库或者组织的活跃度。

体验此场景后,可以对 Flink SQL 基础能力和 Flink 实时处理特性有直观的初步体验。

■ 为回馈广大开源开发者对社区的支持,阿里云实时计算 Flink 版提供云原生免费试用资源

实验资源

本场景使用到的实验资源和配置如下:

阿里云实时计算 Flink 版

配置项规格

Task Manger 个数

4 个

Task Manager CPU

2 核心

Task Manager Memory

8 GiB

Job Manager CPU

1 核

Job Manager Memory

2 GiB


体验目标

对 Flink SQL 基础能力和 Flink 实时处理特性有直观的初步体验。

背景知识

GitHub 公开数据集(GitHub Archive)是 GitHub 提供的一个开放数据集合,它包含了每个公共仓库的事件数据,例如提交、拉取请求、问题和评论等。GitHub 公开数据集的数据可以用于进行各种类型的研究和分析,例如开源社区的协作情况、开发者的行为特征、编程语言的发展趋势等。使开发者们更好地了解 GitHub 上的活动和趋势,并从中获得有价值的信息和洞察。

本实验将 GitHub 公开数据集实时同步到 SLS 作为数据源,根据 Flink 对数据进行多种维度的分析并且通过报表直观展示。

前置知识

  • 了解 Flink 相关的基础知识。

  • 了解 Flink SQL 相关的基础知识。


环境搭建

创建 Session 集群。进入阿里云控制台,选择实时计算 Flink 版。然后选择已经购买的工作空间。

6b9fc78e74a8f9349b40bd6e899979a2.png

在开始阿里云实时计算 Flink 版作业编写前,需要先创建 Session 集群,只有创建了 Flink 集群,才能执行任务。

1. 点击系统管理 -> Session 集群 ->创建 Session

ef79ba422da760a89ebff174467b486a.png

2. 创建 Session 集群时设置为 SQL Preview 集群,这样无需设置 Sink, 即可将 Select 语句的结果输出成图表的格式。

05e651e5e8d27aad12bd45c9c22f967e.png


实验 1:Github 关注数排行榜

本实验统计从一周前起的 Github 关注度排行榜。

操作

1. 作业 SQL 代码。其中 startTime 尽量设置为当前此刻的一周前附近,如果设置的时间太早,前面无效计算时间比较长,不仅耗费资源,而且很久才能加载出计算结果。根据不同的地域设置相应的 project 和endPoint,如实例为上海的服务平台,因此设置'project' = 'github-events-shanghai'和'endPoint' = 'https://cn-shanghai-intranet.log.aliyuncs.com',其他地域如北京、杭州、深圳更改为对应值即可。

-- 通过DDL语句创建SLS源表,SLS中存放了Github的实时数据。
CREATE TEMPORARY TABLE gh_event(
  id STRING,                                        -- 每个事件的唯一ID。
  created_at BIGINT,                                -- 事件时间,单位秒。
  created_at_ts as TO_TIMESTAMP(created_at*1000),   -- 事件时间戳(当前会话时区下的时间戳,如:Asia/Shanghai)。
  type STRING,                                      -- Github事件类型,如:。ForkEvent, WatchEvent, IssuesEvent, CommitCommentEvent等。
  actor_id STRING,                                  -- Github用户ID。
  actor_login STRING,                               -- Github用户名。
  repo_id STRING,                                   -- Github仓库ID。
  repo_name STRING,                                 -- Github仓库名,如:apache/flink, apache/spark, alibaba/fastjson等。
  org STRING,                                       -- Github组织ID。
  org_login STRING                                 -- Github组织名,如:apache,google,alibaba等。
) WITH (
  'connector' = 'sls',                              -- 实时采集的Github事件存放在阿里云SLS中。
  'project' = 'github-events-shanghai',                     -- 存放公开数据的SLS项目。例如'github-events-hangzhou'。
  'endPoint' = 'https://cn-shanghai-intranet.log.aliyuncs.com',                   -- 公开数据仅限阿里云实时计算 Flink 版通过私网地址访问。例如'https://cn-hangzhou-intranet.log.aliyuncs.com'。
  'logStore' = 'realtime-github-events',            -- 存放公开数据的SLS logStore。
  'accessId' =  ' ',         -- 只读账号的AK。
  'accessKey' = ' ',   -- 只读账号的SK。
  'batchGetSize' = '500',                           -- 批量读取数据,每批最多拉取500条。
  'startTime' = '2023-06-01 14:00:00'              -- 开始时间,尽量设置到需要计算的时间附近,否则无效计算的时间较长。默认值为当前值
);


-- 配置开启mini-batch, 每2s处理一次。
SET 'table.exec.mini-batch.enabled'='true'; 
SET 'table.exec.mini-batch.allow-latency'='2s'; 
SET 'table.exec.mini-batch.size'='4096'; 


-- 作业设置4个并发,聚合更快。
SET 'parallelism.default' = '4';




-- 查看Github新增star数Top 5仓库。
SELECT DATE_FORMAT(created_at_ts, 'yyyy-MM-dd') as `date`, repo_name, COUNT(*) as num
FROM gh_event WHERE type = 'WatchEvent' 
GROUP BY DATE_FORMAT(created_at_ts, 'yyyy-MM-dd'), repo_name
ORDER BY num DESC
LIMIT 5;

2. 验证 SQL 是否正确并且执行

2581f994b3b96d25c585d76c5fa6cec7.png

3. 配置图表

    a. 选择 Y Bar 并且编辑标题栏为 Top 5

c7721d360e2727ac18df1320d035ef19.png

    b. 配置 group by repo_name, order by num,即根据 repo_name 分组比较数量

a3eff77701fe0546783665b66bf37cfd.png

    c. 实验可以一直运行,不断消费最新的数据。但是如果当前集群的 CPU 数配置的较少,不足以执行两个任务,又想执行下一个实验是,可以将本实验停止。点击结果左侧的红色方框即可。

536ba20334d352ef3e289f5966296276.png


结果

61d3d2cf73378fec3388ce5553f5aeeb.png

第一名:s0md3v/roop 视频换脸(最近我在b站也经常翻到)

第二名:pengzhile/pandora 潘多拉实现了网页版 ChatGPT 的主要操作

第三名:ClassmateLin/dm-ticket 大麦网抢票(疫情放开,估计上周演唱会很多)

第四名:ShishirPatil/gorilla 连接海量 API 的大型语言模型

第五名:iperov/DeepFaceLive 换脸

由此可见最近一周最流行的 repo 就是 ai 视频换脸和大模型,最流行的领域就是 ai


实验 2:统计组织活跃度变化

本实验统计 Apache 和 Alibaba 组织开源在从 24 小时前开始的活跃度趋势变化。

操作

1. SQL 代码如下。其中 startTime 尽量设置为当前此刻的 24 小时前附近,如果设置的时间太早,前面无效计算时间比较长,不仅耗费资源,而且很久才能加载出计算结果。如果想要统计 Alibaba, 改成 org_login ='alibaba' 即可

CREATE TEMPORARY TABLE gh_event(
  id STRING,                                        -- 每个事件的唯一ID。
  created_at BIGINT,                                -- 事件时间,单位秒。
  created_at_ts as TO_TIMESTAMP(created_at*1000),   -- 事件时间戳(当前会话时区下的时间戳,如:Asia/Shanghai)。
  type STRING,                                      -- Github事件类型,如:。ForkEvent, WatchEvent, IssuesEvent, CommitCommentEvent等。
  actor_id STRING,                                  -- Github用户ID。
  actor_login STRING,                               -- Github用户名。
  repo_id STRING,                                   -- Github仓库ID。
  repo_name STRING,                                 -- Github仓库名,如:apache/flink, apache/spark, alibaba/fastjson等。
  org STRING,                                       -- Github组织ID。
  org_login STRING                                 -- Github组织名,如:apache,google,alibaba等。
) WITH (
  'connector' = 'sls',                              -- 实时采集的Github事件存放在阿里云SLS中。
  'project' = 'github-events-shanghai',                     -- 存放公开数据的SLS项目。例如'github-events-hangzhou'。
  'endPoint' = 'https://cn-shanghai-intranet.log.aliyuncs.com',                   -- 公开数据仅限阿里云实时计算 Flink 版通过私网地址访问。例如'https://cn-hangzhou-intranet.log.aliyuncs.com'。
  'logStore' = 'realtime-github-events',            -- 存放公开数据的SLS logStore。
  'accessId' =  ' ',         -- 只读账号的AK。
  'accessKey' = ' ',   -- 只读账号的SK。
  'batchGetSize' = '500',                           -- 批量读取数据,每批最多拉取500条。
  'startTime' = '2023-06-07 14:00:00'               -- 开始时间,尽量设置到需要计算的时间附近,否则无效计算的时间较长
);


-- 配置开启mini-batch, 每2s处理一次。
SET 'table.exec.mini-batch.enabled'='true';
SET 'table.exec.mini-batch.allow-latency'='2s';
SET 'table.exec.mini-batch.size'='4096';


-- 作业设置4个并发,聚合更快。
SET 'parallelism.default' = '4';




-- 从一天前开始统计事件总量
SELECT NOW(), max(created_at_ts) as created_ts, COUNT(*) as event_count
FROM gh_event 
WHERE  org_login ='apache' and
created_at_ts >= NOW() - INTERVAL '1' DAY;

2. 点击执行,并且配置图表

    a. 点击图表配置

42d036208783cf0ff2bf2a38194e8507.png

    b. 编辑标题为"Apache",并且选择 X/Y Line

cc345a6c77490f04bf9580b5e0930f9b.png

    c. 配置 X 轴为 create_ts, y 轴为 event_count

86c604d747a0d429a485c9bf0a4a4b92.png

aa3ab60cb6c0426dd93194ba197ad9d8.png


结果

17cdd625fd7477912106e61f9146bb06.png

54b4b2b1e56e365ff084ab72061814b5.png

Apache 作为全球性的开源组织,一天内的活跃度比较均匀,而阿里巴巴开源基本由国内开发者关注和贡献,夜间增加比较平缓,在 9 点之后明显提升。

实验 3: 统计仓库贡献时间分布情况

本实验统计 flink 和 spark 开源仓库在从一周前前开始的贡献分布情况。贡献包括代码提交、commit 评论、issue 评论、提交 PR 请求、PR 请求的审查评论等与开发者相关的事件。

1. 作业 SQL 代码。其中 startTime 尽量设置为当前此刻的一周前附近,如果设置的时间太早,前面无效计算时间比较长,不仅耗费资源,而且很久才能加载出计算结果。如果想要统计 spark, 改成 repo_name = 'apache/flink'' 即可。

CREATE TEMPORARY TABLE gh_event(
    id STRING,                                        -- 每个事件的唯一ID。
    created_at BIGINT,                                -- 事件时间,单位秒。
    created_at_ts as TO_TIMESTAMP(created_at*1000),   -- 事件时间戳(当前会话时区下的时间戳,如:Asia/Shanghai)。
    type STRING,                                      -- Github事件类型,如:。ForkEvent, WatchEvent, IssuesEvent, CommitCommentEvent等。
    actor_id STRING,                                  -- Github用户ID。
    actor_login STRING,                               -- Github用户名。
    repo_id STRING,                                   -- Github仓库ID。
    repo_name STRING,                                 -- Github仓库名,如:apache/flink, apache/spark, alibaba/fastjson等。
    org STRING,                                       -- Github组织ID。
    org_login STRING                                 -- Github组织名,如:apache,google,alibaba等。
) WITH (
  'connector' = 'sls',                              -- 实时采集的Github事件存放在阿里云SLS中。
  'project' = 'github-events-shanghai',                     -- 存放公开数据的SLS项目。例如'github-events-hangzhou'。
  'endPoint' = 'https://cn-shanghai-intranet.log.aliyuncs.com',                   -- 公开数据仅限阿里云实时计算 Flink 版通过私网地址访问。例如'https://cn-hangzhou-intranet.log.aliyuncs.com'。
  'logStore' = 'realtime-github-events',            -- 存放公开数据的SLS logStore。
  'accessId' =  ' ',         -- 只读账号的AK。
  'accessKey' = ' ',   -- 只读账号的SK。
  'batchGetSize' = '500',                           -- 批量读取数据,每批最多拉取500条。
  'startTime' = '2023-06-01 14:00:00'              -- 开始时间,尽量设置到需要计算的时间附近,否则无效计算的时间较长
);


-- 配置开启mini-batch, 每2s处理一次。
SET 'table.exec.mini-batch.enabled'='true';
SET 'table.exec.mini-batch.allow-latency'='2s';
SET 'table.exec.mini-batch.size'='4096';


-- 作业设置4个并发,聚合更快。
SET 'parallelism.default' = '4';




-- 配置开启mini-batch, 每2s处理一次。
SET 'table.exec.mini-batch.enabled'='true'; 
SET 'table.exec.mini-batch.allow-latency'='2s'; 
SET 'table.exec.mini-batch.size'='4096'; 


-- 作业设置4个并发,聚合更快。
SET 'parallelism.default' = '4';


-- 统计从上周起的贡献量
SELECT  DATE_FORMAT(created_at_ts, 'yyyy-MM-dd') as comment_date, HOUR(created_at_ts) AS comment_hour ,COUNT(*) AS comment_count
FROM gh_event
WHERE created_at_ts >= NOW() - INTERVAL '7' DAY 
        AND repo_name = 'apache/flink'
       AND (type ='CommitCommentEvent' OR 
            type='IssueCommentEvent' or 
            type = 'PullRequestReviewCommentEvent'or 
            type = 'PushEvent' or 
            type = 'PullRequestEvent' or 
            type = 'PullRequestReviewEvent')
GROUP BY DATE_FORMAT(created_at_ts, 'yyyy-MM-dd'), HOUR(created_at_ts) ;

2. 点击执行,并且配置图表。选择 Heatmap, 设置 Group by comment_date, Spli By comment_hour,Color为 Sum(comment_count), 即 X 轴为天,Y 周为小时,根据总数量显示颜色深浅。

4a6396b3a257da85219894ec8883e758.png

想要了解更多关于如何在 GitHub 中发现最热门的项目的知识吗?快来尝试一下吧!

8fc5866afd3fdc494fed2d66979b1b13.png

往期精选

bb644773b37689cfcbd2705d41823d91.png

622a14e3626d07c12736893188707d87.jpeg

8928221c3ee34e3b34ab5900f52db507.jpeg

1cf0a60fa4aa8b1ce707cbddf4dc0bc2.jpeg

b457a549bab893f4c913a4d255f6f026.jpeg


▼ 活动推荐▼

fced91f17fd8d84d486a4b4f7c5880e5.png

▼ 关注「Apache Flink」,获取更多技术干货 ▼

2b8f0f4d6bde152d292d2a9aa2010291.png

 86e37ffe32ed624105bab2f03e3a48c5.gif  点击「阅读原文」,即刻入营

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/690197.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

USR-C216 WIIF连接手机

复位后连接USR-C216无线 浏览器输入10.10.100.254 账号密码为admin 客户端模式服务器地址无效,默认就行 打开手机网络调试助手选择客户端模式,输入10.10.100.254,端口8899 可以透传了 关于AT指令,先发“”,然后3s内发…

【数据管理架构】什么是 OLTP?

OLTP(在线事务处理)支持在 ATM 和在线银行、收银机和电子商务以及我们每天与之交互的许多其他服务背后进行快速、准确的数据处理。 什么是 OLTP? OLTP 或在线事务处理允许大量人员(通常通过 Internet)实时执行大量数据…

基于Vue+Node.js的宠物领养网站的设计与开发-计算机毕设 附源码83352

基于VueNode.js的宠物领养网站的 摘 要 随着互联网大趋势的到来,社会的方方面面,各行各业都在考虑利用互联网作为媒介将自己的信息更及时有效地推广出去,而其中最好的方式就是建立网络管理系统,并对其进行信息管理。由于现在网络…

【国产FPGA应用】紫光Pango Design联合 Modelsim 仿真方法

Modelsim 是 FPGA 开发中重要的 EDA 设计仿真工具,主要用于验证数字电路设计是否正确。我们经常用Xilinx的ISE或者Vivado与Modelsim进行联合仿真,其实国产FPGA开发工具也可以与Modelsim进行联合仿真,对于设计比较复杂的应用还是非常方便的&am…

创邻科技与浪潮信息KOS完成澎湃技术认证

近日,浙江创邻科技有限公司(简称:创邻科技)自主研发的Galaxybase图数据库系统与浪潮信息服务器操作系统KOS V5完成澎湃技术认证。创邻科技作为国内首个成熟的商业图数据库供应商,在同类厂商中率先完成认证。测试结果显…

vue3通过render函数实现一个菜单下拉框

背景说明 鼠标移动到产品服务上时,出现标红的下拉框。 使用纯css的方案实现最简单,但是没什么技术含量,弃之;使用第三方组件库,样式定制麻烦弃之。因此,我们使用vue3直接在页面创建一个dom作为下拉框吧。…

【经验分享】Docker容器部署方法说明

前 言 本案例适用开发环境: Windows开发环境:Windows 7 64bit、Windows 10 64bit Linux开发环境:Ubuntu 18.04.4 64bit 虚拟机:VMware15.1.0 Docker是一个开源的应用容器引擎,让开发者可打包他们的应用以及依赖包…

rust持续学习 声明宏

学习记录,都是学自圣经,macrobook啥的 https://doc.rust-lang.org/reference/macros-by-example.html macro_rules! bar {(3) > {println!("3");};(4) > {println!("4");}; }这个是入门例子,有点像match 调用就是…

【Java|多线程与高并发】线程池详解

文章目录 1. 线程池简介2. 创建线程池3. 工厂模式简介4. 线程池的使用5. 实现线程池6. ThreadPoolExecutor的构造方法讲解7. 线程池的线程数量,如何确定? 1. 线程池简介 Java线程池是一种用于管理和重用线程的机制,它可以在需要执行任务时,从线程池中获…

二叉树遍历方法——前、中、后序遍历(java)

二叉树结构: static class TreeNode{public char val;public TreeNode left;public TreeNode right;public TreeNode(char val) {this.val val;}Overridepublic String toString() {return this.val"";}} 一、前序遍历 前序遍历是一种访问二叉树的每一…

【shell脚本】沐风晓月跟你聊聊shell脚本中的case实战

前言 前面我们已经介绍了while及for循环,结合if语句可以构建一些简单的控制面板及菜单脚本,今天我们来探讨下case语句。 case选择语句,主要用于对多个选择条件进行匹配输出,与if elif语句结构类似,通常用于脚本传递输…

阵列模式合成第 I 部分:清零、窗口化和细化(附源码)

一、前言 本示例说明如何使用相控阵系统工具箱解决一些阵列合成问题。在相控阵设计应用中,通常需要找到一种方法来逐渐减小晶片响应,以使最终的阵列阵列模式满足某些性能标准。典型的性能标准包括主瓣位置、零位置和旁瓣电平。 二、使用旁瓣消除器消除干…

两个进程定时通过共享内存进行通信

进程1-client #include <stdio.h> #include <stdlib.h> #include <sys/ipc.h> #include <sys/shm.h> #include <unistd.h> #include <string.h>#define SHM_SIZE 10 * 1024 * 1024 // 共享内存大小为10M #define WRITE_INTERVAL 1 …

PHP 基础知识

目录 PHP基础 2 PHP代码标记 2 PHP注释 2 PHP语句分隔符 2 PHP变量 3 常量 3 数据类型 4 流程控制 6 文件 7 函数 9 闭包 11 常用系统函数 12 错误处理 13 错误显示设置 15 字符串类型 17 字符串相关函数 19 数组 21 遍历数组 22 数组的相关函数 25 PHP基础 PHP是一种运行在服务…

通过netty源码带你一步步剖析NioEventLoop 的任务队列原理

NioEventLoop 的异步任务队列成员: NioEventLoop 中对newTaskQueue 接口的实现,返回的是JCTools工具包Mpsc队列(多生产者单一消费者无锁队列,(无界和有界都有实现) private static Queue<Runnable> newTaskQueue0(int maxPendingTasks) {// newMpscQueue 无界对列,newM…

10万元存款是年轻人的一个“坎”?存款超过10万就会超过53.7%的人?不要焦虑,以过来人的身份帮你分析分析!

&#x1f337; 古之立大事者&#xff0c;不惟有超世之才&#xff0c;亦必有坚忍不拔之志 &#x1f390; 个人CSND主页——Micro麦可乐的博客 &#x1f425;《Docker实操教程》专栏以最新的Centos版本为基础进行Docker实操教程&#xff0c;入门到实战 &#x1f33a;《RabbitMQ》…

ChatGPT最新版实现多样化聚合文章的批量生成文章

随着人工智能技术的不断发展&#xff0c;ChatGPT最新版在多样化聚合文章的批量生成方面取得了重要突破。本文将从随机选取的8个方面&#xff0c;对ChatGPT最新版的构建思想进行详细阐述。这些方面包括&#xff1a;自然语言处理、大规模数据集、迁移学习、多模态输入、生成模型优…

JS将图片转pdf,jspdf的使用

Hi I’m Shendi 最近做转换工具&#xff0c;需要将图片转pdf&#xff0c;这里记录下来 JS将图片转pdf&#xff0c;jspdf的使用 简介 A library to generate PDFs in JavaScript. 一个用JavaScript生成PDF的库。 下载 在网站或github下载 https://parall.ax/products/jspdf …

图像增强之图像锐化(边缘增强)之sobel算子

note matx (-1,0,1;-2,0,2;-1,0,1) maty (-1,-2,-1;0,0,0;1,2,1) code // 图像增强之图像锐化(边缘增强)之sobel算子 void GetSobelMat(Mat& sobelX, Mat& sobelY) {sobelX (Mat_<int>(3,3) << -1,0,1,-2,0,2,-1,0,1);sobelY (Mat_<int>(3,3…

【面试】数据仓库

数据分层 维度建模 (0) 什么是维度建模&#xff1f; 维度建模以分析决策的需求出发构建模型&#xff0c;构建的数据模型为分析需求&#xff08;也就是我们通常所说的数据分析&#xff09;服务。它重点解决如何更快速完成分析需求&#xff0c;同时还有较好的大规模复杂查询的响…