flink left join消费kafka数据

news2024/11/26 2:30:27

left join会产生回车流数据

在控制台数据


import com.sjfood.sjfood.gmallrealtime.app.BaseSQLAPP;
import com.sjfood.sjfood.gmallrealtime.util.SQLUtil;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * @Author: YSKSolution
 * @Date: 2022/11/8/19:16
 * @Package_name: PACKAGE_NAME
 */
public class LeftJoin extends BaseSQLAPP {

    public static void main(String[] args) {
        new LeftJoin().init(
                2003,
                    2,
                "BaseSQLAPP"
        );
    }
    @Override
    protected void handle(StreamExecutionEnvironment env, StreamTableEnvironment tEnv) {

        //join的时候,这种数据在状态中保存的时间
//        tEnv.getConfig().setIdleStateRetention(Duration.ofSeconds(20));
        tEnv.executeSql("create table t1 (" +
                " id int, "+
                " name string "+
                ")"+ SQLUtil.getKafkaSourceDDL("t1","t1","csv")
        );

        tEnv.executeSql("create table t2 (" +
                " id int, "+
                " age int "+
                ")"+ SQLUtil.getKafkaSourceDDL("t2","t2","csv")
        );

        Table table = tEnv.sqlQuery(" select " +
                "t1.id," +
                "t1.name," +
                "t2.age" +
                " from t1 " +
                " left join t2 " +
                " on t1.id = t2.id "
        );

//        tEnv.createTemporaryView("result",table);


        table.execute().print();
    }
}

先输入t1数据
在这里插入图片描述
控制台数据 ,左表数据输出,右表数据为null
在这里插入图片描述
再输入右表数据
在这里插入图片描述
控制台产生两条数据,一条是回撤流,一条是join得到的数据
在这里插入图片描述
2.写入upsertkakfa消费


import com.sjfood.sjfood.gmallrealtime.app.BaseSQLAPP;
import com.sjfood.sjfood.gmallrealtime.util.SQLUtil;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * @Author: YSKSolution
 * @Date: 2022/11/8/19:16
 * @Package_name: PACKAGE_NAME
 */
public class LeftJoin extends BaseSQLAPP {

    public static void main(String[] args) {
        new LeftJoin().init(
                2003,
                    2,
                "BaseSQLAPP"
        );
    }
    @Override
    protected void handle(StreamExecutionEnvironment env, StreamTableEnvironment tEnv) {

        //join的时候,这种数据在状态中保存的时间
//        tEnv.getConfig().setIdleStateRetention(Duration.ofSeconds(20));
        tEnv.executeSql("create table t1 (" +
                " id int, "+
                " name string "+
                ")"+ SQLUtil.getKafkaSourceDDL("t1","t1","csv")
        );

        tEnv.executeSql("create table t2 (" +
                " id int, "+
                " age int "+
                ")"+ SQLUtil.getKafkaSourceDDL("t2","t2","csv")
        );

        Table table = tEnv.sqlQuery(" select " +
                "t1.id," +
                "t1.name," +
                "t2.age" +
                " from t1 " +
                " left join t2 " +
                " on t1.id = t2.id "
        );

//        tEnv.createTemporaryView("result",table);

        tEnv.executeSql("create table t3(" +
                "id int," +
                "name string," +
                "age int," +
                "primary key (id) not enforced"+
                ")"
                +SQLUtil.getUpsertKafkaDDL("t3","json"));

        table.executeInsert("t3");

    }
}

先写左表,消费到的数据如下,右表数据为null
在这里插入图片描述
再写右表,产生两条数据,第一条是null,表示删除上面那条数据,第二条是left join得到的结果
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1717833.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

ARM32开发——库与包

🎬 秋野酱:《个人主页》 🔥 个人专栏:《Java专栏》《Python专栏》 ⛺️心若有所向往,何惧道阻且长 文章目录 嵌入式软件通用架构库SPL库HAL库MSP 嵌入式软件通用架构 库 标准外设库(Standard Peripheral Library)和H…

一分钟学习数据安全——自主管理身份SSI基本概念

之前我们已经介绍过数字身份的几种模式。其中,分布式数字身份模式逐渐普及演进的结果就是自主管理身份(SSI,Self-Sovereign Identity)。当一个人能够完全拥有和控制其数字身份,而无需依赖中心化机构,这就是…

《STM32Cube高效开发教程基础篇》- 单片机知识准备

文章目录 正点原子视频P1 单片机介绍P2 Cortex-M系列介绍P3 初识STM32P4 学会查看数据手册P5 最小系统和IO分配晶振电源复位BOOT启动电路下载调试 正点原子视频 视频链接 P1 单片机介绍 P2 Cortex-M系列介绍 P3 初识STM32 P4 学会查看数据手册 P5 最小系统和IO分配 晶振 电源…

外贸怎么开发国外客户呢?

外贸开发国外客户是一个系统性的过程,需要采取多种策略来确保成功。以下是一些清晰、分点表示的方法,用于帮助外贸企业开发国外客户: 深入研究目标市场 了解目标市场的文化、商业习惯、法律法规和消费者需求。通过市场调查和分析&#xff0c…

墨天轮《2023年中国数据库行业年度分析报告》正式发布!

为明晰发展脉络,把握未来趋势,墨天轮于5月29日正式发布 《2023年中国数据库年度行业分析报告》。该报告由墨天轮联合业界专家学者共同编写,共330页,旨在梳理和洞察中国数据库行业的发展趋势、技术创新、市场动态以及面临的挑战&am…

TiKV学习5:TiDB SQL执行流程

目录 1. DML语句读流程概要 2. DML语句写流程概要 3. DDL 流程概要 4. SQL的Parse和Compile 5. 读取的执行 6. 写入的执行 7. DDL的执行 8. 小结 1. DML语句读流程概要 TiDB Server接收sql并处理,TiKV负责持久化数据,PD提供TSO和Region的数据字典…

推荐系统三十六式学习笔记:02|个性化推荐系统有哪些绕不开的经典问题?

目录 推荐系统的问题模式评分预测行为预测 几个常见顽疾1、冷启动问题2、探索与利用问题安全问题 总结 推荐系统的问题模式 推荐系统的使命是为用户和物品建立连接,建立的方式是提前找出哪些隐藏的连接呈现给用户,这是一个预测问题;所以推荐…

Echarts 实现自定义曲线的弧度

文章目录 问题分析问题 分析 在 ECharts 中,可以通过控制数据点的位置来调整曲线的弧度。具体来说,可以通过设置数据项的控制点来调整曲线的形状。ECharts 中的折线图和曲线图都是通过控制点来绘制曲线的,可以通过设置数据项的控制点来调整曲线的弧度。 以下是一个简单的示…

EG2106 原装正品 贴片SOP-8 大功率MOS管栅极驱动芯片耐压600V

EG2106 在电机控制中的应用非常广泛,下面是一些典型的应用案例: 1. 无刷直流电机(BLDC)控制:EG2106 可以用于驱动无刷直流电机的功率MOSFET或IGBT。在无刷电机控制器中,通常会用到H桥电路来控制电机的正…

你认识nginx吗,nginx是做什么的,nginx可以做什么 --2)nginx配置

hello大家今天教大家如何用nginx实验tomcat的负载均衡,同理其他的也可以,如httpd等 首先需要准备一个nginx和tomcat包,这里用到的是版本号为 然后需要准备最少三台linux虚拟机,然后我们开始吧 1.安装tomcat 解包 tar zxf /mnt/…

图数据集的加载

原文参考官方文档: https://pytorch-geometric.readthedocs.io/en/latest/modules/loader.html torch_geometric.loader 库中, 该库中包含了多种 图数据集的 加载方式, 这里主要介绍 DenseDataLoader and DataLoader 这两者之间的区别&#…

2024最新群智能优化算法:人工原生动物优化器(Artificial Protozoa Optimizer ,APO))求解23个函数,MATLAB代码

一、人工原生动物优化器 人工原生动物优化器(Artificial Protozoa Optimizer ,APO)由Xiaopeng Wang等人于2024年提出,其灵感来自自然界中的原生动物。APO 模拟了原生动物的觅食、休眠和繁殖行为。 参考文献 [1]Wang X, Snšel V…

【强化学习】DPO(Direct Preference Optimization)算法学习笔记

【强化学习】DPO(Direct Preference Optimization)算法学习笔记 RLHF与DPO的关系KL散度Bradley-Terry模型DPO算法流程参考文献 RLHF与DPO的关系 DPO(Direct Preference Optimization)和RLHF(Reinforcement Learning f…

哪款桌面便签软件安全好用?2024好用便签app推荐

桌面便签软件已经成为许多人日常生活和工作中不可或缺的工具,它们实用、灵活,能够帮助我们快速记录重要信息,提醒任务事项。随着科技的进步,市面上的便签软件层出不穷,功能也越发强大和实用。在众多的便签软件中&#…

5、css3 自动动画渐变背景

效果例图&#xff1a;&#xff08;因gif图片太大&#xff0c;而csdn只能上传小于5m图片&#xff0c;所以无法上传&#xff09; 1、首先上传html代码&#xff1a; <!DOCTYPE html> <html lang"zh"> <head><meta charset"UTF-8">&l…

TDesign环境搭建-后台模板的安装和使用-构建web管理端后台系统

TDesign环境搭建-后台模板的安装和使用-构建web管理端后台系统 一、安装Nodejs 下载&#xff1a;nodejs 注意&#xff0c;目前前端代码使用的vue3做为脚手架进行开发&#xff0c;需要强制依赖node版本为16及以上 二、模板安装 2.1 安装脚手架&#xff1a;cli nodejs安装好之…

基于51单片机多功能防盗报警proteus仿真( proteus仿真+程序+设计报告+原理图+讲解视频)

基于51单片机多功能防盗报警系统 1. 主要功能&#xff1a;2. 讲解视频&#xff1a;3. 仿真4. 程序代码5. 设计报告6. 原理图7. 设计资料内容清单&&下载链接 基于51单片机多功能防盗报警系统( proteus仿真程序设计报告原理图讲解视频&#xff09; 仿真图proteus8.9及以上…

Linux下的Git应用

1、卸载 2、安装 3、创建并初始化 4、配置 (附加删除语句) 5、查看(tree .git/) 6、增加和提交

关于d3js生成节点画布的个人笔记

实现功能 根据鼠标位置生成节点根据节点位置通过鼠标拖拽生成连线实现自定义线段颜色功能删除节点以及连线功能实现单个节点拖动功能实现整条线路的拖动功能 界面如下&#xff1a; 主要模块介绍 绘制连线 const line svg.selectAll(".line").data(links, d >…