Flink Lookup Join(维表 Join)

news2025/1/16 20:51:50

Lookup Join 定义(支持 Batch\Streaming)

Lookup Join 其实就是维表 Join,比如拿离线数仓来说,常常会有用户画像,设备画像等数据,而对应到实时数仓场景中,这种实时获取外部缓存的 Join 就叫做维表 Join。

应用场景:

Lookup Join 是流与 Redis,Mysql,HBase 这种存储介质的 Join。Lookup 的意思就是实时查找,而实时的画像数据一般都是存储在 Redis,Mysql,HBase 中,这就是 Lookup Join 的由来;

实际案例

kafka流表和mysql维表的关联:
使用曝光用户日志流(show_log)关联用户画像维表(user_profile)关联到用户的维度之后,提供给下游计算分性别,年龄段的曝光用户数使用。

mysql端处理:

[root@spop007~]# mysql -uroot -p123456


mysql> create database test;
mysql> CREATE TABLE `user_profile` (
  `user_id` varchar(100) NOT NULL,
  `age` varchar(100) DEFAULT NULL,
  `sex` varchar(100) DEFAULT NULL,
  PRIMARY KEY (`user_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
INSERT INTO test.user_profile (user_id,age,sex) VALUES
	 ('a','12-18','男'),
	 ('b','18-24','女'),
	 ('c','18-24','男');

mysql>select * from test.user_profile; 

kafka端处理:

# 1.创建Kafka主题 test_k,指定分区数量为1,副本数量为1
kafka-topics.sh \
--create \
--topic test_k \
--bootstrap-server localhost:9092 \
--partitions 1 \
--replication-factor 1


# 2.向 test_k 中写入JSON格式的样例数据
./kafka-console-producer.sh \
--topic test_k \
--bootstrap-server localhost:9092

!!!!!这个错误是因为你使用的 Kafka 版本较旧,不支持 --bootstrap-server 参数。旧版本的 Kafka 使用
参数代替 --broker-list 
----------------------------------
./kafka-console-producer.sh \
--topic test_k \
--broker-list localhost:9092
-----------------------------------
#输入完上面脚本,直接粘贴复制json
{"log_id": "1", "timestamp": "1635696063","user_id":"a"}
{"log_id": "2", "timestamp": "1635696180","user_id":"b"}
{"log_id": "3", "timestamp": "1635696300","user_id":"c"}
{"log_id": "4", "timestamp": "1635696360","user_id":"b"}
{"log_id": "5", "timestamp": "1635696420","user_id":"c"}
{"log_id": "6", "timestamp": "1635696420","user_id":"d"}

# 3.创建一个消费者组 group_k1 来消费 test_k 数据
kafka-console-consumer.sh \
--topic test_k \
--bootstrap-server localhost:9092 \
--group group_k1 \
--from-beginning


Flinksql代码:

前提:

jdbc的jar包和mysql的驱动包,都需要事先放入$FLINK_HOME/lib目录下。

flink-connector-jdbc-1.15.2.jar

mysql-connector-java-8.0.29.jar
cd $FLINK_HOME/bin
./sql-client.sh    

CREATE TABLE click_log_table (
  log_id BIGINT, 
  `timestamp` bigint,
  user_id string,
  proctime AS PROCTIME()
) WITH (
    'connector' = 'kafka',
    'topic' = 'test_k',
    'properties.bootstrap.servers' = '192.168.77.88:9092',
    'properties.group.id' = 'group_k1',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'json'
);

 CREATE TABLE user_profile (
  `user_id` string, 
  `age` string,
  `sex` string
)
WITH (
   'connector' = 'jdbc',
   'url' = 'jdbc:mysql://192.168.77.88:3306/test',
   'table-name' = 'user_profile',
   'username'='root',
   'password'='root'
);

SELECT 
    s.log_id as log_id
    , s.`timestamp` as `timestamp`
    , s.user_id as user_id
    , s.proctime as proctime
    , u.sex as sex
    , u.age as age
FROM click_log_table AS s
LEFT JOIN user_profile FOR SYSTEM_TIME AS OF s.proctime AS u
ON s.user_id = u.user_id;


查看flinksql输出窗口显示:


               log_id            timestamp                        user_id                proctime                            sex                            age
                    1           1635696063                              a 2024-11-19 00:28:14.40412-18
                    2           1635696180                              b 2024-11-19 00:28:14.40718-24
                    3           1635696300                              c 2024-11-19 00:28:14.40918-24
                    4           1635696360                              b 2024-11-19 00:28:14.41218-24
                    5           1635696420                              c 2024-11-19 00:28:14.42218-24
                    6           1635696420                              d 2024-11-19 00:28:14.424                         (NULL)                         (NULL)

在这里插入图片描述

修改mysql的数据 查看动态表的变化
UPDATE user_profile
SET age = '99-99', sex = 0
WHERE user_id = "a";


kafka端输入:
{"log_id": "11111111111", "timestamp": "1635696063","user_id":"a"}
结果对应下图一

kafka端再输入:
{"log_id": "222222", "timestamp": "1635696063","user_id":"a"}
结果对应下图二

在这里插入图片描述
在这里插入图片描述


删除和新增有空再写


总结: Lookup Join 使用left join关联 ,左表全部输出,右表能匹配上的输出,匹配不上的用null填充。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2243937.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Azure Kubernetes Service (AKS)资源优化策略

针对Azure Kubernetes Service (AKS)的资源优化策略,可以从多个维度进行考虑和实施,以提升集群的性能、效率和资源利用率。以下是一些关键的优化策略: 一、 Pod资源请求和限制 设置Pod请求和限制:在YAML清单中为所有Pod设置CPU和…

RabbitMQ1:初识MQ

欢迎来到“雪碧聊技术”CSDN博客! 在这里,您将踏入一个专注于Java开发技术的知识殿堂。无论您是Java编程的初学者,还是具有一定经验的开发者,相信我的博客都能为您提供宝贵的学习资源和实用技巧。作为您的技术向导,我将…

AI 提示词(Prompt)入门 十:最佳实践|详细询问,提供细节!

1、原则解释 当与 ChatGPT 交流时,提供具体和详细的信息非常重要。 这样做可以帮助 ChatGPT 更准确地理解你的需求和上下文,从而生成更相关和有用的回答 明确的信息可以包括具体的问题背景、相关领域的说明、你所期望的答案类型等。 2、如何实践 明…

实验十三 生态安全评价

1 背景及目的 生态安全是生态系统完整性和健康性的整体反映,完整健康的生态系统具有调节气候净化污染、涵养水源、保持水土、防风固沙、减轻灾害、保护生物多样性等功能。维护生态安全对于人类生产、生活、健康及可持续发展至关重要。随着城市化进程的不断推进&…

怎样实现跨部门和跨地区的数据共享?

随着企业规模的扩大和业务的多样化,不同部门和地区之间的数据共享变得越来越重要。实时数据同步作为保证数据准确性和完整性的重要手段,被广泛应用于各行各业。那不同部门和不同地区怎么实现共享数据呢? 一、前期数据准备 前期数据上需要建…

国家工信安全中心:公共数据授权运营平台技术要求(附下载)

2023年11月23日,第二届全球数字贸易博览会“数据要素治理与市场化论坛”于杭州成功召开,国家数据局党组书记、局长刘烈宏,浙江省委常委、常务副省长徐文光出席会议并致辞。会上,国家工业信息安全发展研究中心(以下简称…

C语言数据结构——详细讲解 双链表

从单链表到双链表:数据结构的演进与优化 前言一、单链表回顾二、单链表的局限性三、什么是双链表四、双链表的优势1.双向遍历2.不带头双链表的用途3.带头双链表的用途 五、双链表的操作双链表的插入操作(一)双链表的尾插操作(二&a…

【ArcGISPro】地理配准-影像校正

由于大部分数据安全性,以下是随意下载的图片,仅展示配置操作 地图-地理配准 添加控制点 修改控制点 可以导入、导出、添加和删除控制点 保存 关闭地理配准

ReNamer Pro 7.5 中文绿色便携专业版-文件重命名工具

前言 我们日常生活和工作中所涉及的文件数量日益增多。无论是图片、音频、视频还是各种文档,这些文件在存储、管理和分享时,都需要有一个清晰、有序的文件命名规则。然而,手动重命名大量文件不仅耗时耗力,而且容易出错&#xff0c…

PgSQL即时编译JIT | 第1期 | JIT初识

PgSQL即时编译JIT | 第1期 | JIT初识 JIT是Just-In-Time的缩写,也就是说程序在执行的时候生成可以执行的代码,然后执行它。在介绍JIT之前,需要说下两种执行方式:解释执行和编译执行。其中解释执行是通过解释器,将代码逐…

力扣-Hot100-数组【算法学习day.37】

前言 ###我做这类文档一个重要的目的还是给正在学习的大家提供方向(例如想要掌握基础用法,该刷哪些题?)我的解析也不会做的非常详细,只会提供思路和一些关键点,力扣上的大佬们的题解质量是非常非常高滴&am…

DataStream编程模型之数据源、数据转换、数据输出

Flink之DataStream数据源、数据转换、数据输出(scala) 0.前言–数据源 在进行数据转换之前,需要进行数据读取。 数据读取分为4大部分: (1)内置数据源; 又分为文件数据源; socket…

爬虫开发工具与环境搭建——使用Postman和浏览器开发者工具

第三节:使用Postman和浏览器开发者工具 在网络爬虫开发过程中,我们经常需要对HTTP请求进行测试、分析和调试。Postman和浏览器开发者工具(特别是Network面板和Console面板)是两种最常用的工具,能够帮助开发者有效地捕…

vue2侧边导航栏路由

<template><div><!-- :default-active"$route.path" 和index对应其路径 --><el-menu:default-active"active"class"el-menu-vertical-demo"background-color"#545c64"text-color"#fff"active-text-col…

时代变迁对传统机器人等方向课程的巨大撕裂

2020年之后&#xff0c;全面转型新质课程规划&#xff0c;传统课程规划全部转为经验。 农耕-代表性生产关系-封建分配制度主要生产力-人力工业-代表性生产关系-资本分配制度工业分为机械时代&#xff0c;电气时代&#xff0c;信息时代&#xff1b;主要生产力-人力转为人脑&…

JVM类加载过程-Loading

一、Class对象的生命周期 .class文件是如何加载到内存中:.class文件是ClassLoader通过IO将文件读到内存,再通过双亲委派的模式进行Loading,再Linking、以及Initializing,代码调用等一系列操作后,进行GC,组成完整的生命周期; 二、双亲委派模式(Loading的过程): 1、类…

BERT--公认的里程碑

前言 如果说&#xff0c;让我选Transformer架构的哪个模型最深入人心&#xff0c;我将毫不犹豫的选择BERT&#xff01; BERT 的意义在于&#xff0c;从大量无标记的数据集中训练得到的深度模型&#xff0c;可以限制提高各项自然语言处理任务的准确率。 BERT 在当时&#xff0…

<项目代码>YOLOv8 瞳孔识别<目标检测>

YOLOv8是一种单阶段&#xff08;one-stage&#xff09;检测算法&#xff0c;它将目标检测问题转化为一个回归问题&#xff0c;能够在一次前向传播过程中同时完成目标的分类和定位任务。相较于两阶段检测算法&#xff08;如Faster R-CNN&#xff09;&#xff0c;YOLOv8具有更高的…

每日OJ题_牛客_天使果冻_递推_C++_Java

目录 牛客_天使果冻_递推 题目解析 C代码 Java代码 牛客_天使果冻_递推 天使果冻 描述&#xff1a; 有 n 个果冻排成一排。第 i 个果冻的美味度是 ai。 天使非常喜欢吃果冻&#xff0c;但她想把最好吃的果冻留到最后收藏。天使想知道前 x个果冻中&#xff0c;美味…

果韵 2.0.1| 听歌神器,双端支持,支持下载歌曲和歌词

果韵是一款支持Windows和安卓双端的音乐播放器&#xff0c;支持自定义音源&#xff0c;界面简洁。用户可以通过缓存下载歌曲和歌词。为了使用这些功能&#xff0c;需要先进行音源导入。通过设置中的存储设置&#xff0c;将缓存文件夹移动到download目录下&#xff0c;之后缓存的…