Flink SQL -- 命令行的使用

news2024/9/17 7:34:14
1、启动Flink SQL
首先启动Flink的集群,选择独立集群模式或者是session的模式。此处选择是时session的模式:

yarn-session.sh -d 

在启动Flink SQL的client:
sql-client.sh
2、kafka SQL 连接器
在使用kafka作为数据源的时候需要上传jar包到flnik的lib下:

/usr/local/soft/flink-1.15.2/lib

可以去官网找对应的版本下载上传。

 

1、创建表:

再流上定义表
再flink中创建表相当于创建一个视图(视图中不存数据,只有查询视图时才会去原表中读取数据)


CREATE TABLE students (
    sid STRING,
    name STRING,
    age INT,
    sex STRING,
    clazz STRING    
) WITH (
  'connector' = 'kafka',
  'topic' = 'student',
  'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'csv'
)


2、查询数据(连续查询):

select clazz,count(1) as c from students group by clazz;


3、客户端为维护和可视化结果提供了三种的模式:

        1、表格模式(默认使用的模式),(table mode),在内存中实体化结果,并将结果用规则的分页表格可视化展示出来

SET 'sql-client.execution.result-mode' = 'table';

        2、变更日志模式,(changelog mode),不会实体化和可视化结果,而是由插入(+)和撤销(-)组成的持续查询产生结果流。

SET 'sql-client.execution.result-mode' = 'changelog';

        3、Tableau模式(tableau mode)更接近传统的数据库,会将执行的结果以制表的形式直接打在屏幕之上。具体显示的内容会取决于作业 执行模式的不同(execution.type):

SET 'sql-client.execution.result-mode' = 'tableau';

4、 Flink SQL流批一体:
        1、流处理:

                a、流处理即可以处理有界流也可以处理无界流

                b、流处理的输出的结果是连续的结果

                c、流处理的底层是持续流的模型,上游的Task和下游的Task同时启动等待数据的到达

SET 'execution.runtime-mode' = 'streaming'; 
        2、批处理:

                a、批处理只能用于处理有界流

                b、输出的是最终的结果

                c、批处理的底层是MapReduce模型,会先执行上游的Task,在执行下游的Task 

SET 'execution.runtime-mode' = 'batch';
Flink做批处理,读取一个文件:

-- 创建一个有界流的表
CREATE TABLE students_hdfs (
    sid STRING,
    name STRING,
    age INT,
    sex STRING,
    clazz STRING
)WITH (
  'connector' = 'filesystem',           -- 必选:指定连接器类型
  'path' = 'hdfs://master:9000/data/spark/stu/students.txt',  -- 必选:指定路径
  'format' = 'csv'                     -- 必选:文件系统连接器指定 format
);


select clazz,count(1) as c from 
students_hdfs
group by clazz
5、Flink SQL的连接器:
        1、kafka SQL 连接器

对于一些参数需要从官网进行了解。

                1、kafka source 

-- 创建kafka 表
CREATE TABLE students_kafka (
    `offset` BIGINT METADATA VIRTUAL, -- 偏移量
    `event_time` TIMESTAMP(3) METADATA FROM 'timestamp', --数据进入kafka的时间,可以当作事件时间使用
    sid STRING,
    name STRING,
    age INT,
    sex STRING,
    clazz STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'students', -- 数据的topic
  'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- broker 列表
  'properties.group.id' = 'testGroup', -- 消费者组
  'scan.startup.mode' = 'earliest-offset', -- 读取数据的位置earliest-offset latest-offset
  'format' = 'csv' -- 读取数据的格式
);

                2、kafka sink 

-- 创建kafka 表
CREATE TABLE students_kafka_sink (
    sid STRING,
    name STRING,
    age INT,
    sex STRING,
    clazz STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'students_sink', -- 数据的topic
  'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- broker 列表
  'properties.group.id' = 'testGroup', -- 消费者组
  'scan.startup.mode' = 'earliest-offset', -- 读取数据的位置earliest-offset latest-offset
  'format' = 'csv' -- 读取数据的格式
);

-- 将查询结果保存到kafka中
insert into students_kafka_sink
select * from students_hdfs;

kafka-console-consumer.sh --bootstrap-server  master:9092,node1:9092,node2:9092 --from-beginning --topic students_sink

        3、将更新的流写入到kafka中 

因为在Kafka是一个消息队列,是不会去重的。所以只需要将读取数据的格式改成canal-json。当数据被读取回来还是原来的流模式。

CREATE TABLE clazz_num_kafka (
    clazz STRING,
    num BIGINT
) WITH (
  'connector' = 'kafka',
  'topic' = 'clazz_num', -- 数据的topic
  'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- broker 列表
  'properties.group.id' = 'testGroup', -- 消费者组
  'scan.startup.mode' = 'earliest-offset', -- 读取数据的位置earliest-offset latest-offset
  'format' = 'canal-json' -- 读取数据的格式
);

-- 将更新的数据写入kafka需要使用canal-json格式,数据中会带上操作类型
{"data":[{"clazz":"文科一班","num":71}],"type":"INSERT"}
{"data":[{"clazz":"理科三班","num":67}],"type":"DELETE"}


insert into clazz_num_kafka
select clazz,count(1) as num from 
students
group by clazz;


kafka-console-consumer.sh --bootstrap-server  master:9092,node1:9092,node2:9092 --from-beginning --topic clazz_num
        2、 hdfs SQL 连接器

                1、hdfs source

                        Flink读取文件可以使用有界流的方式,也可以是无界流方式。

-- 有界流
CREATE TABLE students_hdfs_batch (
    sid STRING,
    name STRING,
    age INT,
    sex STRING,
    clazz STRING
)WITH (
  'connector' = 'filesystem',           -- 必选:指定连接器类型
  'path' = 'hdfs://master:9000/data/student',  -- 必选:指定路径
  'format' = 'csv'                     -- 必选:文件系统连接器指定 format
);

select * from students_hdfs_batch;

-- 无界流
-- 基于hdfs做流处理,读取数据是以文件为单位,延迟比kafka大
CREATE TABLE students_hdfs_stream (
    sid STRING,
    name STRING,
    age INT,
    sex STRING,
    clazz STRING
)WITH (
    'connector' = 'filesystem',           -- 必选:指定连接器类型
    'path' = 'hdfs://master:9000/data/student',  -- 必选:指定路径
    'format' = 'csv' ,                    -- 必选:文件系统连接器指定 format
    'source.monitor-interval' = '5000' -- 每隔一段时间扫描目录,生成一个无界流
);


select * from students_hdfs_stream;

                2、hdfs sink

-- 1、批处理模式(使用方式和底层原理和hive类似)
SET 'execution.runtime-mode' = 'batch';

-- 创建表
CREATE TABLE clazz_num_hdfs (
    clazz STRING,
    num BIGINT
)WITH (
  'connector' = 'filesystem',           -- 必选:指定连接器类型
  'path' = 'hdfs://master:9000/data/clazz_num',  -- 必选:指定路径
  'format' = 'csv'                     -- 必选:文件系统连接器指定 format
);
-- 将查询结果保存到表中
insert into clazz_num_hdfs
select clazz,count(1) as num
from students_hdfs_batch
group by clazz;


-- 2、流处理模式
SET 'execution.runtime-mode' = 'streaming'; 

-- 创建表,如果查询数据返回的十更新更改的流需要使用canal-json格式
CREATE TABLE clazz_num_hdfs_canal_json (
    clazz STRING,
    num BIGINT
)WITH (
  'connector' = 'filesystem',           -- 必选:指定连接器类型
  'path' = 'hdfs://master:9000/data/clazz_num_canal_json',  -- 必选:指定路径
  'format' = 'canal-json'                     -- 必选:文件系统连接器指定 format
);

insert into clazz_num_hdfs_canal_json
select clazz,count(1) as num
from students_hdfs_stream
group by clazz;
3、MySQL SQL 连接器

        1、整合:

# 1、上传依赖包到flink 的lib目录下/usr/local/soft/flink-1.15.2/lib
flink-connector-jdbc-1.15.2.jar
mysql-connector-java-5.1.49.jar

# 2、需要重启flink集群
yarn application -kill [appid]
yarn-session.sh -d

# 3、重新进入sql命令行
sql-client.sh

         2、mysql   source 

-- 有界流
-- flink中表的字段类型和字段名需要和mysql保持一致
CREATE TABLE students_jdbc (
    id BIGINT,
    name STRING,
    age BIGINT,
    gender STRING,
    clazz STRING,
    PRIMARY KEY (id) NOT ENFORCED -- 主键
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://master:3306/student',
    'table-name' = 'students',
    'username' ='root',
    'password' ='123456'
);

select * from students_jdbc;

        3、mysql sink 

-- 创建kafka 表
CREATE TABLE students_kafka (
    `offset` BIGINT METADATA VIRTUAL, -- 偏移量
    `event_time` TIMESTAMP(3) METADATA FROM 'timestamp', --数据进入kafka的时间,可以当作事件时间使用
    sid STRING,
    name STRING,
    age INT,
    sex STRING,
    clazz STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'students', -- 数据的topic
  'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- broker 列表
  'properties.group.id' = 'testGroup', -- 消费者组
  'scan.startup.mode' = 'earliest-offset', -- 读取数据的位置earliest-offset latest-offset
  'format' = 'csv' -- 读取数据的格式
);

-- 创建mysql sink表
CREATE TABLE clazz_num_mysql (
    clazz STRING,
    num BIGINT,
    PRIMARY KEY (clazz) NOT ENFORCED -- 主键
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://master:3306/student',
    'table-name' = 'clazz_num',
    'username' ='root',
    'password' ='123456'
);

--- 再mysql创建接收表
CREATE TABLE clazz_num (
    clazz varchar(10),
    num BIGINT,
    PRIMARY KEY (clazz) -- 主键
) ;

-- 将sql查询结果实时写入mysql
-- 将更新更改的流写入mysql,flink会自动按照主键更新数据
insert into clazz_num_mysql
select 
clazz,
count(1) as num from 
students_kafka
group by clazz;

kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic students 插入一条数据
        4、DataGen:用于生成随机数据,一般用在高性能测试上
-- 创建包(只能用于source表)
CREATE TABLE students_datagen (
    sid STRING,
    name STRING,
    age INT,
    sex STRING,
    clazz STRING
) WITH (
    'connector' = 'datagen',
    'rows-per-second'='5', -- 每秒随机生成的数据量
    'fields.age.min'='1',
    'fields.age.max'='100',
    'fields.sid.length'='10',
    'fields.name.length'='2',
    'fields.sex.length'='1',
    'fields.clazz.length'='4'
);

        5、print:用于高性能测试 只能用于sink表
CREATE TABLE print_table (
    sid STRING,
    name STRING,
    age INT,
    sex STRING,
    clazz STRING
) WITH (
     'connector' = 'print'
);

insert into print_table
select * from students_datagen;



结果需要在提交的任务中查看。
        6、BlackHole :是用于高性能测试使用,在后面可以用于Flink的反压的测试。
CREATE TABLE blackhole_table (
    sid STRING,
    name STRING,
    age INT,
    sex STRING,
    clazz STRING
) WITH (
  'connector' = 'blackhole'
);

insert into blackhole_table
select * from students_datagen;
6、SQL 语法
        1、Hints:

               用于提示执行,在Flink中可以动态的修改表中的属性,在Spark中可以用于广播。在修改动态表中属性后,不需要在重新建表,就可以读取修改后的需求。

CREATE TABLE students_kafka (
    `offset` BIGINT METADATA VIRTUAL, -- 偏移量
    `event_time` TIMESTAMP(3) METADATA FROM 'timestamp', --数据进入kafka的时间,可以当作事件时间使用
    sid STRING,
    name STRING,
    age INT,
    sex STRING,
    clazz STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'students', -- 数据的topic
  'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- broker 列表
  'properties.group.id' = 'testGroup', -- 消费者组
  'scan.startup.mode' = 'latest-offset', -- 读取数据的位置earliest-offset latest-offset
  'format' = 'csv' -- 读取数据的格式
);

-- 动态修改表属性,可以在查询数据时修改读取kafka数据的位置,不需要重新创建表
select * from students_kafka /*+ OPTIONS('scan.startup.mode' = 'earliest-offset') */;


-- 有界流
CREATE TABLE students_hdfs (
    sid STRING,
    name STRING,
    age INT,
    sex STRING,
    clazz STRING
)WITH (
    'connector' = 'filesystem',           -- 必选:指定连接器类型
    'path' = 'hdfs://master:9000/data/student',  -- 必选:指定路径
    'format' = 'csv'                     -- 必选:文件系统连接器指定 format
);

-- 可以在查询hdfs时,不需要再重新的创建表就可以动态改成无界流
select * from students_hdfs /*+ OPTIONS('source.monitor-interval' = '5000' )  */;
         2、WITH:

                当一段SQL语句在被多次使用的时候,就将通过with给这个SQL起一个别名,类似于封装起来,就是为这个SQL创建一个临时的视图(并不是真正的视图),方便下次使用。

CREATE TABLE students_hdfs (
    sid STRING,
    name STRING,
    age INT,
    sex STRING,
    clazz STRING
)WITH (
    'connector' = 'filesystem',           -- 必选:指定连接器类型
    'path' = 'hdfs://master:9000/data/student',  -- 必选:指定路径
    'format' = 'csv'                     -- 必选:文件系统连接器指定 format
);

-- 可以在查询hdfs时,不需要再重新的创建表就可以动态改成无界流
select * from students_hdfs /*+ OPTIONS('source.monitor-interval' = '5000' )  */;



-- tmp别名代表的时子查询的sql,可以在后面的sql中多次使用
with tmp as (
    select * from students_hdfs 
    /*+ OPTIONS('source.monitor-interval' = '5000' )  */
    where clazz='文科一班'
)
select * from tmp
union all
select * from tmp;
        3、DISTINCT:

在flink 的流处理中,使用distinct,flink需要将之前的数据保存在状态中,如果数据一直增加,状态会越来越大 状态越来越大,checkpoint时间会增加,最终会导致flink任务出问题

select 
count(distinct sid) 
from students_kafka /*+ OPTIONS('scan.startup.mode' = 'earliest-offset') */;

select 
    count(sid)  
from (
    select 
    distinct *
    from students_kafka /*+ OPTIONS('scan.startup.mode' = 'earliest-offset') */
);

注意事项:

       1、 当Flink Client客户端退出来以后,里面创建的动态表就不存在了。这些表结构是元数据,是存储在内存中的。

        2、当在进行where过滤的时候,字符串会出现三种情况:空的字符串、空格字符串、null的字符串,三者是有区别的:

        这三者是不同的概念,在进行where过滤的时候过滤的条件是不同的。

1、过滤空的字符串:
 where s!= ‘空字符串’

2、过滤空格字符串:
 where s!= ‘空格’

3、过滤null字符串:

where s!= null
Flink SQL中常见的函数:

from_unixtime: 
   
    以字符串格式 string 返回数字参数 numberic 的表示形式(默认为 ‘yyyy-MM-dd HH:mm:ss’

to_timestamp:  
    
    将格式为 string2(默认为:‘yyyy-MM-dd HH:mm:ss’)的字符串 string1 转换为 timestamp




本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1194178.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Flutter笔记:光影动画按钮、滚动图标卡片组等

Flutter笔记 scale_design更新:光影动画按钮、滚动图标卡片组 作者:李俊才 (jcLee95):https://blog.csdn.net/qq_28550263 邮箱 :291148484163.com 本文地址:https://blog.csdn.net/qq_28550263…

又一重要合作,创邻科技华为云联营产品正式发布

近日,创邻科技旗下的“Galaxybase高性能图平台”正式入驻华为云云商店联营商品,创邻科技成为华为云在数据库与缓存领域的联营联运合作伙伴。通过联营联运模式,双方合作能够深入产品、生态、解决方案等多个领域,助力各行业用户数字…

农业大棚智能化改造升级与远程视频监管方案,助力智慧农业建设发展

一、需求分析 随着现代化技术的发展,农业大棚的智慧化也成为当前备受关注的智慧农业发展手段。利用先进的信息化手段来对农业大棚进行管理,采集和掌握作物的生长状况、作业监督、生态环境等信息数据,实现精准操作、精细管理,远程…

SMART PLC模拟量上下限报警功能块(梯形图代码)

博途PLC模拟量偏差报警功能块请参考下面的文章链接: 模拟量偏差报警功能块(SCL代码)_RXXW_Dor的博客-CSDN博客文章浏览阅读594次。工业模拟量采集的相关基础知识,可以查看专栏的系列文章,这里不再赘述,常用链接如下:PLC模拟量采集算法数学基础(线性传感器)_plc傳感器數…

Leetcode—637.二叉树的层平均值【简单】

2023每日刷题(二十五) Leetcode—637.二叉树的层平均值 BFS实现代码 /*** Definition for a binary tree node.* struct TreeNode {* int val;* struct TreeNode *left;* struct TreeNode *right;* };*/ /*** Note: The returned array mu…

归并分治 归并排序的应用 + 图解 + 笔记

归并分治 前置知识:讲解021-归并排序 归并排序 图解 递归 非递归 笔记-CSDN博客https://blog.csdn.net/weixin_41987016/article/details/134338789?spm1001.2014.3001.5501原理: (1)思考一个问题在大范围上的答案,是否等于&…

c语言break和continue语句用法

作用 break语句:可用于循环结构和开关结构(switch)中,在开关语句中的作用是执行完当前case后立即跳出switch结构。在循环语句中的作用是终止当前层的循环。continue语句:作用是跳过循环体中剩余的语句而强行执行下一次循环。 区别 continue…

golang 库之「依赖注入」

文章目录 1. 写在最前面2. 依赖注入2.1 使用场景2.2 框架对比 3. fx 框架使用场景示例3.1 示例3.2 golang 原生的库3.3 fx 库3.4 对比3.4.1 如上两种实现方式对比3.4.2 关于过度设计3.4.3 感悟 4. 碎碎念5. 参考资料 1. 写在最前面 同事在技术分享的时候用了 golang 的 fx 框架…

chatglm3-6b记录问答对

# 打开文件,第二个参数是打开文件的模式,a代表追加,也就是说,打开这个文件之后直接定位到文件的末尾 file open(chatlog.txt, "a") # 写入数据 file.write(ask:prompt_text\n) file.write(response:response\n) # 关闭文件 fil…

基于SSM的水果网上商城设计与实现

末尾获取源码 开发语言:Java Java开发工具:JDK1.8 后端框架:SSM 前端:Vue 数据库:MySQL5.7和Navicat管理工具结合 服务器:Tomcat8.5 开发软件:IDEA / Eclipse 是否Maven项目:是 目录…

Ubuntu 安装常见问题

1. 安装oh my zsh 搜狗输入法不能用 vim /etc/environmentexport XIM_PROGRAMfcitx export XIMfcitx export GTK_IM_MODULEfcitx export QT_IM_MODULEfcitx export XMODIFIERS“imfcitx” export LANG“zh_CN.UTF-8”配置完后重启,稍等一会,右上角会有个…

35 字段类型不匹配 影响 使用索引?

前言 这是一个经常能够看到的问题, 又或者 经常在面试中碰到 如果 索引字段类型 不匹配, 然后 不会使用索引 这里 我们来看一下 具体的情况 测试表结构如下 CREATE TABLE tz_test (id int(11) unsigned NOT NULL AUTO_INCREMENT,field1 varchar(128) DEFAULT NULL,PRIMA…

接口自动化测试之Fiddler使用教程

一、Fiddler 简介 Fiddler工具介绍 Fiddler是一个通过代理的方式来进行抓包工具,运行时会在本地建立一个代理服务,默认地址:127.0.0.1:8888。Fiddler开启之后,配置本机代理,再打开IE浏览器,IE的PROXY会自…

C语言与C++的区别和联系

C语言和C到底是什么关系? 首先C和C语言本来就是两种不同的编程语言,但C确实是对C语言的扩充和延伸,并且对C语言提供后向兼容的能力。对于有些人说的“C完全就包含了C语言”的说法也并没有错。 C一开始被本贾尼斯特劳斯特卢普(Bja…

身份证读取器手持机 二代证核验手持终端 身份证核查手持机

身份证手持机外观比较小巧,方便携带,支持条码识别、人脸识别、NFC卡刷卡、内置二代证加密模块,可离线采集和识别二代身份证,进行身份识别。信息读取更便捷、安全高效。采用IP65高防护等级,1.5M防摔,可以适应…

SmartBear正式收购Stoplight,并计划在核心API设计、文档和门户产品中集成其功能

不久前,软件开发和可视化工具提供商SmartBear正式宣布收购全球领先的API设计公司Stoplight。这一收购是为了打造业内最全面的API开发平台,为寻求现代化API实践的开发团队提供更好的透明度、自动化与生产力。将Stoplight在API方面的优势(包括治…

Linux驱动应用层与内核层之间的数据传递

摘要 本文将深入探讨在Linux驱动中,应用层与内核层之间数据传递的机制和优化策略。我们将详细解析这一过程中的各个步骤,包括数据从应用层到内核层的传输,以及从内核层返回应用层的过程。此外,我们将提出并评估一系列可能的优化策…

Docker+K8s基础(重要知识点总结)

目录 一、Docker的核心1,Docker引擎2,Docker基础命令3,单个容器运行多个服务进程4,多个容器运行多个服务进程5,备份在容器中运行的数据库6,在宿主机和容器之间共享数据7,在容器之间共享数据8&am…

已解决:TypeError: ‘NoneType‘ object is not callable 问题

🌷🍁 博主猫头虎(🐅🐾)带您 Go to New World✨🍁 🦄 博客首页: 🐅🐾猫头虎的博客🎐《面试题大全专栏》 🦕 文章图文并茂&#x1f996…

GC5958低压三相无刷直流驱动芯片,无感,正弦,低压,PWM调速可替代APX9358/茂达

GC5958提供了无传感器的三相无刷直流电机的速度控制的所有电路。正弦波驱动器的方法将是更好的低噪声。控制器功能包括启动电路、反电动势换向控制。脉宽调制) 速度控制。锁定保护和热关断电路GC5958既适合游戏机器,也适用于需要无声驱动的CPU冷却器。它以DFN3x3-10…