状态的一致性和FlinkSQL

news2025/1/12 18:07:11

状态一致性

一致性其实就是结果的正确性。精确一次是指数据有可能被处理多次,但是结果只有一个。
三个级别:

  1. 最多一次:1次或0次,有可能丢数据
  2. 至少一次:1次或n次,出错可能会重试
    • 输入端只要可以做到数据重放,即在出错后,可以重新发送一样的数据
  3. 精确一次:数据只会发送1次
    • 幂等写入:多次重复操作不影响结果,有可能出现某个值由于数据重放,导致结果回到原先的值,然后逐渐恢复。
    • 预写日志:
      1. 先把结果数据作为日志状态保存起来
      2. 进行检查点保存时,也会将这些结果数据一并做持久化存储
      3. 在收到检查点完成的通知时,将所有结果数据一次性写入外部系统
    • 预写日志缺点:这种再次确认的方式,如果写入成功返回的ack出现故障,还是会出现数据重复。
    • 两阶段提交(2PC):数据写入过程和数据提交分为两个过程,如果写入过程没有发生异常,就将事务进行提交。
      • 算子节点在收到第一个数据时,就开启一个事务,然后提交数据,在下一个检查点到达前都是预写入,如果下一个检查点正常,再进行最终提交。
      • 对外部系统有一定的要求,要能够识别事务ID,事务的重复提交应该是无效的。
      • 即barrier到来时,如果结果一致,就提交事务,否则进行事务回滚

Flink和Kafka连接时的精确一次保证

  • 开启检查点
  • 开启事务隔离级别,读已提交
  • 注意设置kafka超时时间为10分钟
public class Flink02_KafkaToFlink {
    public static void main(String[] args) {
        //1.创建运行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //默认是最大并行度
        env.setParallelism(1);

        //开启检查点
        env.enableCheckpointing(1000L);

        //kafka source
        KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
                .setBootstrapServers("hadoop102:9092,hadoop103:9092")
                .setGroupId("flinkb")
                .setTopics("topicA")
                //优先使用消费者组 记录的Offset进行消费,如果offset不存在,根据策略进行重置
                .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST))
                .setValueOnlyDeserializer(new SimpleStringSchema())
                //如果还有别的配置需要指定,统一使用通用方法
                .setProperty("isolation.level", "read_committed")
                .build();

        DataStreamSource<String> ds = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafkasource");

        //处理过程


        //kafka Sink
        KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
                .setBootstrapServers("hadoop102:9092,hadoop103:9092")
                .setRecordSerializer(
                        KafkaRecordSerializationSchema.<String>builder()
                                .setTopic("first")
                                .setValueSerializationSchema(new SimpleStringSchema())
                                .build()
                )

                //语义
                //AT_LEAST_ONCE:至少一次,表示数据可能重复,需要考虑去重操作
                //EXACTLY_ONCE:精确一次
                //kafka transaction timeout is larger than broker
                //kafka超时时间:1H
                //broker超时时间:15分钟

//                .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)//数据传输的保障
                .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)//数据传输的保障
                .setTransactionalIdPrefix("flink"+ RandomUtils.nextInt(0,100000))
//                .setProperty(ProducerConfig.RETRIES_CONFIG,"10")
                .setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG,"60*1000*10")//10分钟
                .build();

        ds.map(
                JSON::toJSONString
        ).sinkTo(kafkaSink);//写入到kafka 生产者

        ds.sinkTo(kafkaSink);

        try {
            env.execute();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}

FlinkSQL1.17

FlinkSQL不同版本的接口仍在变化,有变动查看官网。
在官网这个位置可以查看Flink对于以来的一些官方介绍。
在这里插入图片描述
Table依赖剖析
三个依赖:
1. flink-table-api-java-uber-1.17.2.jar (所有的Java API)
2. flink-table-runtime-1.17.2.jar (包含Table运行时)
3. flink-table-planner-loader-1.17.2.jar (查询计划器,即SQL解析器)

静态导包:在import后添加static,并在类后面加上*导入全部。主要是为了方便使用下面的 $ 方法,否则 $ 方法前面都要添加Expressions的类名前缀

table.where($("vc").isGreaterOrEqual(100))
                .select($("id"),$("vc"),$("ts"))
                .execute()
                .print();

程序架构

  1. 准备环境
    • 流表环境:基于流创建表环境
    • 表环境:从操作层面与流独立,底层处理还是流
  2. 创建表
    • 基于流:将流转换为表
    • 连接器表
  3. 转换处理
    • 基于Table对象,使用API进行处理
    • 基于SQL的方式,直接写SQL处理
  4. 输出
    • 基于Table对象或连接器表,输出结果
    • 表转换为流,基于流的方式输出

流处理中的表

  • 处理的数据对象
    • 关系:字段元组的有界集合
    • 流处理:字段元组的无限序列
  • 对数据的访问
    • 关系:可以得到完整的
    • 流处理:数据是动态的

因此处理过程中的表是动态表,必须要持续查询。

流表转换

持续查询

  • 追加查询:窗口查询的结果通过追加的方式添加到表的末尾,使用toDataStream
  • 更新查询:窗口查询的结果会对原有的结果进行修改, 使用toChangeLogStream
  • 如果不清楚是什么类型,直接使用toChangeLogSteam()将表转换为流
public class Flink04_TableToStreamQQ {
    public static void main(String[] args) {
        //1.创建运行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //默认是最大并行度
        env.setParallelism(1);

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        SingleOutputStreamOperator<Event> ds = env.socketTextStream("hadoop102", 8888)
                .map(
                        line -> {
                            String[] fields = line.split(",");
                            return new Event(fields[0].trim(), fields[1].trim(), Long.valueOf(fields[2].trim()));
                        }
                );

        Table table = tableEnv.fromDataStream(ds);

        tableEnv.createTemporaryView("t1", table);

        //SQL
        String appendSQL = "select user, url, ts from t1 where user <> 'zhangsan'";
        //需要在查询过程中更新上一次的值
        String updateSQL = "select user, count(*) cnt from t1 group by user";

        Table resultTable = tableEnv.sqlQuery(updateSQL);

        //表转换为流
        //doesn't support consuming update changes which is produced by node GroupAggregate(groupBy=[user], select=[user, COUNT(*) AS cnt])
//        DataStream<Row> rowDs = tableEnv.toDataStream(resultTable);

        //有更新操作时,使用toChangelogStream(),它即支持追加,也支持更新查询
        DataStream<Row> rowDs = tableEnv.toChangelogStream(resultTable);

        rowDs.print();

        try {
            env.execute();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}

将动态表转换为流

  • 仅追加流:如果表的结果都是追加查询
  • Retract撤回流:
    • 包含两类消息,添加消息和撤回消息
    • 下游需要根据这两类消息进行处理
  • 更新插入流:
    • 两种消息:更新插入消息(带key)和删除消息

连接器

  • DataGen和Print连接器
public class Flink01_DataGenPrint {
    public static void main(String[] args) {
        //TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.newInstance().build());
        //1. 准备表环境, 基于流环境,创建表环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);



        //DataGen
        String createTable =
                " create table t1 ( " +
                        "  id STRING , " +
                        "  vc INT ," +
                        "  ts BIGINT " +
                        " ) WITH (" +
                        "  'connector' = 'datagen' ,"  +
                        "  'rows-per-second' = '1' ," +
                        "  'fields.id.kind' = 'random' , " +
                        "  'fields.id.length' = '6' ," +
                        "  'fields.vc.kind' = 'random' , " +
                        "  'fields.vc.min' = '100' , " +
                        "  'fields.vc.max' = '1000' ," +
                        "  'fields.ts.kind' = 'sequence' , " +
                        "  'fields.ts.start' = '1000000' , " +
                        "  'fields.ts.end' = '100000000' " +
                        " )" ;


        tableEnv.executeSql(createTable);

        //Table resultTable = tableEnv.sqlQuery("select * from t1 where vc >= 200");
        //.execute().print();

        //print
        String sinkTable =
                "create table t2(" +
                        "id string," +
                        "vc int," +
                        "ts bigint" +
                        ") with (" +
                        "   'connector' = 'print', " +
                        "   'print-identifier' = 'print>' " +
                       ")";
        tableEnv.executeSql(sinkTable);
        tableEnv.executeSql("insert into t2 select id, vc, ts from t1 where vc >= 200");
    }
}
  • 文件连接器
public class Flink02_FileConnector {
    public static void main(String[] args) {
        TableEnvironment tableEnvironment = TableEnvironment.create(EnvironmentSettings.newInstance().build());

        //FileSource
        String sourceTable =
                " create table t1 ( " +
                        "  id STRING , " +
                        "  vc INT ," +
                        "  ts BIGINT," +
                        //"  `file.name` string not null METADATA," + 文件名字由于系统原因无法识别盘符后面的冒号
                        "  `file.size` bigint not null METADATA" +
                        " ) WITH (" +
                        "  'connector' = 'filesystem' ,"  +
                        "  'path' = 'input/ws.txt' ,"  +
                        "  'format' = 'csv' "  +
                        " )" ;

        tableEnvironment.executeSql(sourceTable);

        //tableEnvironment.sqlQuery(" select * from t1 ").execute().print();

        //转换处理...

        //File sink
        String sinkTable =
                " create table t2 ( " +
                        "  id STRING , " +
                        "  vc INT ," +
                        "  ts BIGINT," +
                        //"  `file.name` string not null METADATA," + 文件名字由于系统原因无法识别盘符后面的冒号
                        "  file_size bigint" +
                        " ) WITH (" +
                        "  'connector' = 'filesystem' ,"  +
                        "  'path' = 'output' ,"  +
                        "  'format' = 'json' "  +
                        " )" ;

        tableEnvironment.executeSql(sinkTable);

        tableEnvironment.executeSql("insert into t2 " +
                "select id, vc, ts, `file.size` from t1");
    }
}
  • kafka连接器
public class Flink03_KafkaConnector {
    public static void main(String[] args) {
        TableEnvironment tableEnvironment = TableEnvironment.create(EnvironmentSettings.newInstance().build());

        //kafka source
        String sourceTable =
                " create table t1 ( " +
                        "  id STRING , " +
                        "  vc INT ," +
                        "  ts BIGINT," +
                        "  `topic` string not null METADATA," +
                        "  `partition` int not null METADATA," +
                        "  `offset` bigint not null METADATA" +
                        " ) WITH (" +
                        "  'connector' = 'kafka' ,"  +
                        "  'properties.bootstrap.servers' = 'hadoop102:9092,hadoop103:9092' ,"  +
                        "  'topic' = 'topicA', "  +
                        "  'properties.group.id' = 'flinksql', "  +
                        "  'value.format' = 'csv', "  +
                        "  'scan.startup.mode' = 'group-offsets',"  +
                        "  'properties.auto.offset.reset' = 'latest' "  +
                        " )" ;

        //创建表
        tableEnvironment.executeSql(sourceTable);

        //打印查询结果
        //tableEnvironment.sqlQuery(" select * from t1 ").execute().print();

        //转换处理...

        //kafka Sink
        String sinkTable =
                " create table t2 ( " +
                        "  id STRING , " +
                        "  vc INT ," +
                        "  ts BIGINT," +
                        "  `topic` string " +
                        " ) WITH (" +
                        "  'connector' = 'kafka' ,"  +
                        "  'properties.bootstrap.servers' = 'hadoop102:9092,hadoop103:9092' ,"  +
                        "  'topic' = 'topicB', "  +
                        "  'sink.delivery-guarantee' = 'at-least-once', "  +
                        //"  'properties.transaction.timeout.ms' = '', "  +
                        //"  'sink.transactional-id-prefix' = 'xf', "  +
                        //"  'properties.group.id' = 'flinksql', "  +
                        "  'value.format' = 'json' "  +
                        //"  'scan.startup.mode' = 'group-offsets',"  +
                        //"  'properties.auto.offset.reset' = 'latest' "  +
                        " )" ;

        tableEnvironment.executeSql(sinkTable);

        tableEnvironment.executeSql("insert into t2 " +
                "select id, vc, ts, `topic` from t1");


    }
}
  • Jdbc连接器

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1313839.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

怎么选择合适的3ds Max云渲染农场?

3ds Max 用户日常面临的一个共同挑战便是漫长的渲染周期。作为一个强大的三维建模和渲染软件&#xff0c;3ds Max 势必需处理大量的光照、材质和阴影计算任务&#xff0c;因此&#xff0c;良好的渲染方案对从业者而言尤为重口。 一、为何考虑3ds Max云渲染? 云渲染成为了解决…

mysql中DML数据操作的增 修 删

现有如下的表 DML 数据操作 增(四种方法) 修 删 -- 增 INSERT INTO student (NAME,birthday,phone,height,reg_time)VALUES(詹姆斯,1985-2-3,15222233,1.98,NOW()) INSERT INTO student SET NAME科比 ,birthday19863,phone15622333 -- 插入多个 INSERT INTO student (NAME…

使用C++11开发一个半同步半异步线程池

半同步半异步线程池介绍 在处理大量并发任务的时候&#xff0c;如果按照传统的方式&#xff0c;一个请求一个线程来处理请求任务&#xff0c;大量的线程创建和销毁将消耗过多的系统资源&#xff0c;还增加了线程上下文切换的开销&#xff0c;而通过线程池技术就可以很好的解决这…

命令执行 [BUUCTF 2018]Online Tool1

打开题目 我们代码审计一下 if (isset($_SERVER[HTTP_X_FORWARDED_FOR])) { $_SERVER[REMOTE_ADDR] $_SERVER[HTTP_X_FORWARDED_FOR]; } 如果存在xxf头且不为空&#xff0c;则将xxf头内容&#xff08;真实的客户端ip&#xff09;赋给ROMOTE_ADDR&#xff08;代理服务器传过…

[pluginviteimport-analysis] vite 提示jsx语法报错

参考文章 https://segmentfault.com/q/1010000043499356https://blog.csdn.net/kkkkkkgg/article/details/131168224 报错内容 内容类似如下&#xff1a; 03:16:26 [vite] Internal server error: Failed to parse source for import analysis because the content contains…

上海亚商投顾:沪指收复3000点,房地产板块集体走强

上海亚商投顾前言&#xff1a;无惧大盘涨跌&#xff0c;解密龙虎榜资金&#xff0c;跟踪一线游资和机构资金动向&#xff0c;识别短期热点和强势个股。 一.市场情绪 沪指昨日窄幅震荡&#xff0c;创业板指走势较弱&#xff0c;科创50指数跌近1%。房地产板块集体走强&#xff0…

Linux服务器开发太麻烦? 试试IntelliJ IDEA公网远程访问开发极大提升开发效率

文章目录 1. 检查Linux SSH服务2. 本地连接测试3. Linux 安装Cpolar4. 创建远程连接公网地址5. 公网远程连接测试6. 固定连接公网地址7. 固定地址连接测试 本文主要介绍如何在IDEA中设置远程连接服务器开发环境&#xff0c;并结合Cpolar内网穿透工具实现无公网远程连接&#xf…

C语言刷题数组------数组交换

输入一个长度为 10的整数数组 X[10]&#xff0c;将里面的非正整数全部替换为 1&#xff0c;输出替换完成后的数组。 输入格式 输入包含 10个整数&#xff0c;每个整数占一行。输出格式 输出新数组中的所有元素&#xff0c;每个元素占一行。输出格式为 X[i] x&#xff0c;其中…

spring面试:二、bean的生命周期和循环引入问题(三级缓存、@Lazy)

bean的生命周期 Spring容器在进行实例化时&#xff0c;会将xml配置的的信息封装成一个BeanDefinition对象&#xff0c;Spring根据BeanDefinition来创建Bean对象&#xff0c;里面有很多的属性用来描述Bean。 其中比较重要的是&#xff1a; beanClassName&#xff1a;bean 的类…

焊盘:十字连接VS全覆盖 铺铜

在铺铜规则中&#xff0c;焊盘连接方式有两种&#xff1a; 十字连接 优点&#xff1a;较好焊接&#xff1a;因铺铜面积减少&#xff0c;温度下降速度降低&#xff0c;较好焊接&#xff0c;不易虚焊。 缺点&#xff1a;载流能力较弱&#xff1a;铺铜面积↓ → 载流能力↓全连接…

python如何发送企业微信群消息

一、创建机器人&#xff0c;并获取webhook 1.1 进入企业微信中&#xff0c;添加群机器人&#xff0c;添加完成后可以获取到一个webhook的地址 1.2 群机器人企业微信接口的调用可以参考这个文件 https://developer.work.weixin.qq.com/document/path/99110#%E5%A6%82%E4%BD%…

问答区故意在结题前回答混赏金的狗

此贴专记录CSDN问答社区里面&#xff0c;一些回答者在临近结题时胡乱回答&#xff0c;只为分取结题赏金的人。 所有图片均为事实&#xff0c;绝无半点虚假。各位看官可以自行搜索问题题目或者通过查看此人回答求证 所有图片均为事实&#xff0c;绝无半点虚假。各位看官可以自行…

服务器数据恢复-EqualLogic PS存储硬盘坏道导致存储不可用的数据恢复案例

服务器数据恢复环境&#xff1a; 一台DELL EqualLogic PS系列存储&#xff0c;存储中有一组由16块SAS硬盘组成的RAID5。上层是VMFS文件系统&#xff0c;存放虚拟机文件。存储上层分了4个卷。 服务器故障&检测&#xff1a; 存储上有2个硬盘指示灯显示黄色&#xff0c;磁盘出…

学习Linux(3)-Linux软件安装之yum

什么是yum yum&#xff08; Yellow dog Updater, Modified&#xff09;是一个在 Fedora 和 RedHat 以及 SUSE 中的 Shell 前端软件包管理器。 假设&#xff0c;在一台window系统的电脑上要用qq&#xff0c;那么我们回去下载qq的安装包&#xff0c;然后执行qq.exe文件在本机上进…

csrf和ssrf的区别,攻击如何防护

CSRF&#xff08;跨站请求伪造&#xff09;和SSRF&#xff08;服务器端请求伪造&#xff09;都是网络安全中的常见攻击类型&#xff0c;但它们的目标和攻击方式有所不同。理解这两种攻击的区别对于有效地防御它们至关重要。 CSRF和SSRF的主要区别在于攻击的发起者和目标。CSRF…

Linux-----3、物理机安装Linux

# 物理机安装Linux # 系统镜像获取 http://isoredirect.centos.org/centos/7/isos/ 例如&#xff1a; CentOS7.9.2009 arch (opens new window) 阿里云镜像 CentOS7.9.2009 x86 (opens new window) # 华为Atlas 500pro 表 2-1 系统版本及适配信息 名称内容操作系统型号CentO…

Ubuntu18.04.6下安装opencv库及OpenCV安装libjasper-dev依赖包错误

目录 01 解压安装包 02 安装cmake和依赖库 03 配置编译环境 01 解压安装包 创建一个名为Opencv的文件夹 mkdir opencv 将源码的压缩包复制到opencv目录下 将压缩包解压到opencv文件夹&#xff08;指定一个文件夹&#xff09; unzip opencv-3.4.11.zip -d opencv02 安装cm…

解决nuxt3子路由router-view中出现的document not defined错误

之前讲过几种解决document not defined错误的方法&#xff0c;但是今天碰到一种新情况&#xff1a; 就是访问根路由/ , 然后再跳转到子路由没有问题: 但是如果直接访问子路由时router-view会报这个错误。 我怀疑原因是&#xff1a; 直接访问子路由时&#xff0c;有可能dom树还…

Tableau快速入门-下载安装加载数据与仪表盘构建

官网介绍 官网连接如下&#xff1a; https://www.tableau.com/zh-cn tableau的产品包括如下&#xff1a; 参考:https://zhuanlan.zhihu.com/p/341882097 Tableau是功能强大、灵活且安全些很高的端到端的数据分析平台&#xff0c;它提供了从数据准备、连接、分析、协作到查阅…

【Matlab】三角函数的周期性图像可视化(附完整MATLAB代码)

三角函数的周期性图像可视化 前言三角函数:MATLAB对三角函数的理解和帮助: 正文思考步骤 代码实现结果 前言 三角函数: 三角函数是数学中一类描述角度和周期性变化的特殊函数。常见的三角函数包括正弦函数 ( sin ⁡ ) (\sin ) (sin) &#xff0c;余弦函数 ( cos ⁡ ) (\cos…