[实战-11] FlinkSql 设置时区对TIMESTAMP和TIMESTAMP_LTZ的影响

news2024/11/7 17:25:48

table.local-time-zone

  • table.local-time-zone
  • DataStream-to-Table Conversion(拓展知识)
  • 代码测试
  • flinksql代码
  • 执行结果截图
    • 1. Asia/Shanghai 结果如下
    • 2. UTC结果如下

table.local-time-zone

table.local-time-zone可用于设置flinksql的时区。
flink的内置数据类型TIMESTAMP(n)或者是TIMESTAMP_LTZ(n), 我们设置水位线都是基于这两种类型,不同的是前者本质是字符串形势,后者本质是long,也因此前者不受时区影响,后者受时区影响类型。(n指的毫秒级的精度取值范围是 0~9)
原始数据库如果不是时间类型,可能要用TO_TIMESTAMP(字符串格式的时间)或者TO_TIMESTAMP_LTZ(long数字,n)

如果原始数据库是string则需要用TO_TIMESTAMP(字符串格式的时间字段)转成TIMESTAMP(n)
如果原始数据库中是long则需要用TO_TIMESTAMP_LTZ(long数字,n) 转成TIMESTAMP_LTZ(n)

DataStream-to-Table Conversion(拓展知识)

datastream API到Table Api转换的时候,是以后string的形式传递event_time, 并且这个string在DataStream Api是以UTC时区转换的,如果你的原始数据中是long, 如果不做处理展示出来的string就是UTC字符串,为了在东八区展示,则需要将long再加上8小时

        // 水位线 允许乱序
        WatermarkStrategy<String> waterStrategy = WatermarkStrategy.<String>forMonotonousTimestamps() //ofSeconds(20)
                .withTimestampAssigner(new SerializableTimestampAssigner<String>() {
                    @Override
                    public long extractTimestamp(String element, long recordTimestamp) {
                        try {
                            Mybook book= JSON.parseObject(element,Mybook.class);
                            return boo.time+8*60*60*1000  //转成东八区
                        }catch (Exception e){
                            return recordTimestamp;
                        }

                    }
                }).withIdleness(Duration.ofSeconds(timeWindowIdleness));

        SingleOutputStreamOperator<UserSlotGame> processStream = env
                .fromSource(source, waterStrategy, "readKafka")
                .process(new ProcessFunction<String, UserSlotGame>() {
                             @Override
                             public void processElement(String value, Context ctx, Collector<UserSlotGame> out) throws Exception {
                                // 省略

                             }
                    }) ;

代码测试

mysql时区是Asia/Shanghai

CREATE TABLE `versioned_rates` (
  `operation_code` int DEFAULT NULL,
  `update_time` varchar(255) DEFAULT NULL, -- 注意这是字符串
  `product_id` varchar(255) DEFAULT NULL,
  `product_name` varchar(255) DEFAULT NULL,
  `price` float DEFAULT NULL,
  `time_long` bigint NOT NULL DEFAULT '0' -- 注意这是long
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci



INSERT INTO flink.versioned_rates
(operation_code, update_time, product_id, product_name, price, time_long)
VALUES(1, '2024-01-01 00:01:00', 'p_001', 'scooter', 11.11, 1730346179000);
INSERT INTO flink.versioned_rates
(operation_code, update_time, product_id, product_name, price, time_long)
VALUES(1, '2024-01-01 00:02:00', 'p_002', 'basketball', 23.11, 1730346179000);
INSERT INTO flink.versioned_rates
(operation_code, update_time, product_id, product_name, price, time_long)
VALUES(2, '2024-01-01 12:00:00', 'p_001', 'scooter', 11.11, 1730346179000);
INSERT INTO flink.versioned_rates
(operation_code, update_time, product_id, product_name, price, time_long)
VALUES(3, '2024-01-01 12:00:00', 'p_001', 'scooter', 12.99, 1730346179000);
INSERT INTO flink.versioned_rates
(operation_code, update_time, product_id, product_name, price, time_long)
VALUES(2, '2024-01-01 12:00:00', 'p_002', 'basketball', 23.11, 1730346179000);
INSERT INTO flink.versioned_rates
(operation_code, update_time, product_id, product_name, price, time_long)
VALUES(3, '2024-01-01 12:00:00', 'p_002', 'basketball', 19.99, 1730346179000);
INSERT INTO flink.versioned_rates
(operation_code, update_time, product_id, product_name, price, time_long)
VALUES(4, '2024-01-01 18:00:00', 'p_001', 'scooter', 12.99, 1730346179000);

flinksql代码

package com.pg.TableAndDataStreamApi;



import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.api.config.TableConfigOptions;


/*
*
* */
public class version_table {
    private static final String SOURCE="CREATE TABLE source_table(\n" +
            "\toperation_code int,\n" +
            "\tupdate_time string,\n" +
            "\tup_t AS TO_TIMESTAMP(update_time),\n" +
            "\ttime_long bigint,\n" +
            "\tbbb AS TO_TIMESTAMP_LTZ(time_long,3) \n" +
            "    ) WITH (\n" +
            "   'connector' = 'jdbc',\n" +
            "   'url' = 'jdbc:mysql://ip:3306/flink',\n" +
            "   'driver'='com.mysql.cj.jdbc.Driver',\n "+
            "   'username'='用户名',\n"+
            "   'password'='密码',\n"+
            "   'table-name' = 'versioned_rates'\n" +
            ")";

    public static void main(String[] args) throws Exception {
        // 创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        tableEnv.executeSql(SOURCE);
        Configuration configuration = new Configuration();
//        configuration.set(TableConfigOptions.LOCAL_TIME_ZONE, "UTC");
        configuration.set(TableConfigOptions.LOCAL_TIME_ZONE, "Asia/Shanghai");

        tableEnv.getConfig().addConfiguration(configuration);
        // 从 MySQL 表中选择所有行
        Table t = tableEnv.sqlQuery("select * from source_table");
        t.execute().print();


    }
}

执行结果截图

TO_TIMESTAMP_LTZ 受时区影响
而TO_TIMESTAMP()意味着原始数据中本就是string, 是不会受到时区影响的

  1. 下方第一个红色列不管是UTC还是 Asia/Shanghai 我们看大的string都是一样的
  2. 下方第一个红色列UTC比 Asia/Shanghai 少了8个小时

1. Asia/Shanghai 结果如下

在这里插入图片描述

2. UTC结果如下

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2235164.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Bypassuac之白名单结合注册表方式

参考 Bypass UAC 原来这么简单 本章记录一下系统白名单文件结合注册表bypassuac&#xff0c;uac这个东西并不是Windows设置的防御机制而是相当于保护机制&#xff0c;只是用来控制用户行为的&#xff0c;弹个窗来提醒一下用户的行为&#xff0c;和直接的杀软是不一样的性质&am…

【力扣打卡系列】单调栈

坚持按题型打卡&刷&梳理力扣算法题系列&#xff0c;语言为go&#xff0c;Day20 单调栈 题目描述 解题思路 单调栈 后进先出 记录的数据加在最上面丢掉数据也先从最上面开始 单调性 记录t[i]之前会先把所有小于等于t[i]的数据丢掉&#xff0c;不可能出现上面大下面小的…

如何通过CDN加速提升电商网站双十一购物节用户体验

随着双十一购物节的到来&#xff0c;电商平台迎来了一年中流量的高峰。各大电商平台如天猫、京东和抖音等纷纷推出了全新的促销活动和玩法。在这场购物狂欢中&#xff0c;用户体验成为了电商网站能否脱颖而出的关键。而CDN&#xff08;内容分发网络&#xff09;加速服务&#x…

Linux信号_信号的产生

信号概念 信号是进程之间事件异步通知的一种方式&#xff0c;属于软中断。 异步&#xff1a;在异步操作中&#xff0c;任务可以独立执行。一个任务的开始或完成不依赖于其他任务的状态。 同步&#xff1a;在同步操作中&#xff0c;任务之间的执行是相互依赖的。一个任务必须等待…

Docker学习—Docker核心概念总结

核心概念总结 容器&#xff1a;容器就是将应用运行所需的所有内容比如代码、运行时环境&#xff0c;进行打包和隔离。 容器和虚拟机的对比 虚拟机是在同一个硬件上虚拟化出多个操作系统&#xff08;OS&#xff09;实例。 容器是在操作系统上进行虚拟化&#xff0c;用于隔离…

51单片机教程(六)- LED流水灯

1 项目分析 基于点亮LED灯、LED灯闪烁&#xff0c;扩展到构成最简单、花样流水灯。 2 技术准备 1 流水灯硬件及原理图 流水灯是由多个LED灯组成的 2 C语言知识点 数组 数组声明&#xff1a;长度不可变 数据类型 数组名称[长度n] // 整数型默认为0&#xff0c;小数型默认…

供热的一些基础技术数据

1、应该了解的几个实用数据:(1)室内采暖达标温度182℃(2)建筑面积采暖热负荷 4060kcal/h㎡(4570W/㎡)(3)建筑面积采暖所需合理流量 2.53.5kg/h㎡(节能建筑12 kg/h㎡)(4)一次网严寒期外网总供、回水温度5570℃(5)热网的补水量应小于热网循环量的1%(6)1蒸吨的热量可供11.5 万平方…

【1个月速成Java】基于Android平台开发个人记账app学习日记——第7天,申请阿里云SMS短信服务SDK

系列专栏链接如下&#xff0c;方便跟进&#xff1a; https://blog.csdn.net/weixin_62588253/category_12821860.html?fromshareblogcolumn&sharetypeblogcolumn&sharerId12821860&sharereferPC&sharesourceweixin_62588253&sharefromfrom_link 同时篇幅…

A02、JVM性能监测调优

1、JVM内存模型 1.1、介绍 JVM 自动内存分配管理机制的好处很多&#xff0c;但实则是把双刃剑。这个机制在提升 Java 开发效率的同时&#xff0c;也容易使 Java 开发人员过度依赖于自动化&#xff0c;弱化对内存的管理能力&#xff0c;这样系统就很容易发生 JVM 的堆内存异常&…

钉钉调试微应用整理2

第一步 新建应用 钉钉开放平台](https://open-dev.dingtalk.com/) 去新增应用 第二步 配置应用信息 把本地代码运行起来&#xff0c;并设置本地地址 第三步 在本地代码添加调试命令 这里有2中添加方式 哪一种都可以 方式一&#xff1a; index.html页面中 <!DOCTYPE h…

《TCP/IP网络编程》学习笔记 | Chapter 3:地址族与数据序列

《TCP/IP网络编程》学习笔记 | Chapter 3&#xff1a;地址族与数据序列 《TCP/IP网络编程》学习笔记 | Chapter 3&#xff1a;地址族与数据序列分配给套接字的IP地址和端口号网络地址网络地址分类和主机地址边界用于区分套接字的端口号数据传输过程示例 地址信息的表示表示IPv4…

飞牛fnOs内网穿透-使用Lucky实现ipv6动态解析+HTTPS访问NAS服务

&#x1f9ed;Lucky官方介绍 Lucky最初是作为一个小工具&#xff0c;由开发者为自己的个人使用而开发&#xff0c;用于替代socat&#xff0c;在小米路由AX6000官方系统上实现公网IPv6转内网IPv4的功能。Lucky的设计始终致力于让更多的Linux嵌入式设备运行&#xff0c;以实现或…

《安富莱嵌入式周报》第345期:开源蓝牙游戏手柄,USB3.0 HUB带电压电流测量,LCR电桥前端模拟,开源微型赛车,RF信号扫描仪,开源无线电收发器

周报汇总地址&#xff1a;嵌入式周报 - uCOS & uCGUI & emWin & embOS & TouchGFX & ThreadX - 硬汉嵌入式论坛 - Powered by Discuz! 本周更新一期视频教程 第5期&#xff1a;RTX5/FreeRTOS全家桶源码工程综合实战模板集成CANopen组件&#xff08;2024-1…

微服务中常用分布式锁原理及执行流程

1.什么是分布式锁 分布式锁是一种在分布式系统环境下实现的锁机制&#xff0c;它主要用于解决&#xff0c;多个分布式节点之间对共享资源的互斥访问问题&#xff0c;确保在分布式系统中&#xff0c;即使存在有多个不同节点上的进程或线程&#xff0c;同一时刻也只有一个节点可…

三:LoadBalancer负载均衡服务调用

LoadBalancer负载均衡服务调用 1.LB负载均衡(Load Balance)是什么2.loadbalancer本地负载均衡客户端 与 Nginx服务端负载均衡区别3.实现loadbalancer负载均衡实例3-1.首先应模拟启动多个服务提供者应用实例&#xff1a;3-2.在服务消费项目引入LoadBalancer3-3&#xff1a;测试用…

简单入门Git

Git作用 Git简介 作用&#xff1a;版本控制多人协作 集中式 典型代表&#xff1a;SVN 特点&#xff1a;所有的版本库都存在中央服务器&#xff0c;本地备份动作必须依赖中央服务器&#xff0c;如果一旦服务器挂掉&#xff0c;或者网络状况不好&#xff0c;没法提交版本。…

解决echarts桑基图为0时tooltip不显示的问题

关键代码 formatter: function (params) {console.log("params",params)if (params.value 0) {// 如果值为0&#xff0c;返回空字符串&#xff0c;不显示任何内容return params.name : params.value;// return ;} else {// 否则返回标准的格式化信息return par…

DevOps业务价值流:版本规划的最佳实践

初入公司&#xff0c;面对瀑布研发模式下的冗长周期与频繁返工&#xff0c;我率先尝试局部敏捷迭代&#xff0c;但成效有限。随后&#xff0c;推动全面敏捷化&#xff0c;从需求阶段即开始规划&#xff0c;虽方向正确&#xff0c;却遭遇版本规划难题。项目经理与产品经理对敏捷…

NewStar CTF 2024 misc WP

decompress 压缩包套娃&#xff0c;一直解到最后一层&#xff0c;将文件提取出来 提示给出了一个正则&#xff0c;按照正则爆破密码&#xff0c;一共五位&#xff0c;第四位是数字 ^([a-z]){3}\d[a-z]$ 一共就五位数&#xff0c;直接ARCHPR爆破&#xff0c;得到密码 xtr4m&…

2020年美国总统大选数据分析与模型预测

数据集取自&#xff1a;2020年&#x1f1fa;&#x1f1f8;&#x1f1fa;&#x1f1f8;美国大选数据集 - Heywhale.com 前言 对2020年美国总统大选数据的深入分析&#xff0c;提供各州和县层面的投票情况及选民行为的可视化展示。数据预处理阶段将涉及对异常值的处理&#xff0…