flink1.17 eventWindow不要配置processTrigger

news2024/9/25 9:37:18

理论上可以eventtime processtime混用,但是下面代码测试发现bug,输入一条数据会一直输出.

flink github无法提bug/问题. apache jira账户新建后竟然flink又需要一个账户,放弃

bug复现操作

idea运行代码后 往source kafka发送一条数据  

a,1,1690304400000

可以看到无限输出:

理论上时间语义不建议混用,但是在rich函数中的确可以做到混用且正常使用

问题复现代码

package com.yy.flinkWindowAndTrigger

import com.yy.flinkWindow.M1
import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.configuration.{Configuration, RestOptions}
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.time.Time.seconds
import org.apache.flink.streaming.api.windowing.triggers.{ContinuousProcessingTimeTrigger, CountTrigger, ProcessingTimeTrigger}
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.joda.time.Seconds


object flinkEventWindowAndProcessTriggerBUGLearn {
  def main(args: Array[String]): Unit = {


    // flink 启动本地webui
    val conf = new Configuration
    conf.setInteger(RestOptions.PORT, 28080)

    //    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf)
    //    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.configure(conf)


    /*
    kafka输入:
        a,1,1690304400000        //对应 2023-07-26 01:00:00 (无限输出)       //如果传入 a,1,1693037756000 对应:2023-08-26 16:15:56 (1条/s)
        a,1,7200000               // 1970-01-1 10:00:00
     */
    val brokers = "172.18.105.147:9092"
    val source = KafkaSource.builder[String].setBootstrapServers(brokers)
      .setTopics("t1")
      .setGroupId("my-group-23asdf46")
      .setStartingOffsets(OffsetsInitializer.latest())
      // .setDeserializer() // 参数: KafkaRecordDeserializationSchema
      .setDeserializer(new M1())
      .build()


    val ds1 = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source")


    val s1 = ds1
      .map(_.split(","))
      .map(x => C1(x(0), x(1).toInt, x(2).toLong)) // key number 时间戳
      .assignTimestampsAndWatermarks(new OTAWatermarks(Time.seconds(0)))
      .keyBy(_.f1)
      .window(TumblingEventTimeWindows.of(seconds(10)))
      .trigger(ContinuousProcessingTimeTrigger.of[TimeWindow](seconds(10L)))
      .reduce((x, y) => C1(x.f1, x.f2 + y.f2, 100L))


    s1.print()


    env.execute("KafkaNewSourceAPi")
  }

  // 乱序流
  class OTAWatermarks(time: Time) extends BoundedOutOfOrdernessTimestampExtractor[C1](time) {
    override def extractTimestamp(element: C1): Long = {
      element.f3
    }
  }


  // key num timestamp
  case class C1(f1: String, f2: Int, f3: Long)
}

-

-

maven pom

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>FlinkLocalDemo</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>FlinkLocalDemo</name>
    <url>http://maven.apache.org</url>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.17.1</flink.version>
        <scala.binary.version>2.12</scala.binary.version>
        <scala.version>2.12.8</scala.version>
    </properties>



    <dependencies>
        <!-- https://mvnrepository.com/artifact/joda-time/joda-time -->
        <dependency>
            <groupId>joda-time</groupId>
            <artifactId>joda-time</artifactId>
            <version>2.12.5</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-avro</artifactId>
            <version>${flink.version}</version>
        </dependency>


        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/com.alibaba.fastjson2/fastjson2 -->
        <dependency>
            <groupId>com.alibaba.fastjson2</groupId>
            <artifactId>fastjson2</artifactId>
            <version>2.0.33</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/com.alibaba/fastjson -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.83</version>
<!--            <version>1.2.17</version>-->
        </dependency>


        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>3.8.1</version>
            <scope>test</scope>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-common -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- 引入flink1.13.0 scala2.12.12   -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-csv</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- Either... -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- or... -->

<!--        下面几个是代码中写sql需要的包 四个中一个都不能少 -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-planner-loader -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-loader</artifactId>
            <version>${flink.version}</version>
<!--            <scope>test</scope>-->
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-runtime -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-runtime</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <!--  https://mvnrepository.com/artifact/org.apache.flink/flink-connector-files -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-files</artifactId>
            <version>${flink.version}</version>
        </dependency>


<!--        注意: flink-table-planner-loader 不能和 flink-table-planner_${scala.binary.version} 共存-->
        <!--        <dependency>-->
        <!--            <groupId>org.apache.flink</groupId>-->
        <!--            <artifactId>flink-table-planner_${scala.binary.version}</artifactId>-->
        <!--            <version>${flink.version}</version>-->
        <!--            <scope>provided</scope>-->
        <!--        </dependency>-->


        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc</artifactId>
            <version>3.1.0-1.17</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.11</version>
        </dependency>


    </dependencies>
    <build>
            <plugins>
                <!-- 打jar插件 -->
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-shade-plugin</artifactId>
                    <version>2.4.3</version>
                    <executions>
                        <execution>
                            <phase>package</phase>
                            <goals>
                                <goal>shade</goal>
                            </goals>
                            <configuration>
                                <filters>
                                    <filter>
                                        <artifact>*:*</artifact>
                                        <excludes>
                                            <exclude>META-INF/*.SF</exclude>
                                            <exclude>META-INF/*.DSA</exclude>
                                            <exclude>META-INF/*.RSA</exclude>
                                        </excludes>
                                    </filter>
                                </filters>
                            </configuration>
                        </execution>
                    </executions>
                </plugin>
                <plugin>
                    <groupId>org.scala-tools</groupId>
                    <artifactId>maven-scala-plugin</artifactId>
                    <version>2.15.2</version>
                    <executions>
                        <execution>
                            <goals>
                                <goal>compile</goal>
                                <goal>testCompile</goal>
                            </goals>
                        </execution>
                    </executions>
                </plugin>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <id>scala-compile-first</id>
                        <phase>process-resources</phase>
                        <goals>
                            <goal>add-source</goal>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                </executions>
                <configuration>
                    <scalaVersion>${scala.version}</scalaVersion>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>2.5.5</version>
                <configuration>
                    <!--这部分可有可无,加上的话则直接生成可运行jar包-->
                    <!--<archive>-->
                    <!--<manifest>-->
                    <!--<mainClass>${exec.mainClass}</mainClass>-->
                    <!--</manifest>-->
                    <!--</archive>-->
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
            </plugin>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.1</version>
                    <configuration>
                        <source>11</source>
                        <target>11</target>
                    </configuration>
                </plugin>
        </plugins>
    </build>
</project>















 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/838907.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

.net 6 efcore一个model映射到多张表(非使用IEntityTypeConfiguration)

现在有两张表&#xff0c;结构一模一样&#xff0c;我又不想创建两个一模一样的model&#xff0c;就想一个model映射到两张表 废话不多说直接上代码 安装依赖包 创建model namespace oneModelMultiTable.Model {public class Test{public int id { get; set; }public string…

【C语言进阶】数据的存储----浮点型篇

&#x1f341; 博客主页:江池俊的博客 &#x1f4ab;收录专栏&#xff1a;C语言—探索高效编程的基石 &#x1f4bb; 其他专栏&#xff1a;数据结构探索 ​&#x1f4a1;代码仓库&#xff1a;江池俊的代码仓库 &#x1f3aa; 社区&#xff1a;GeekHub &#x1f341; 如果觉得博…

部分常用CSS样式

目录 1.字体样式 2.文本样式 3.鼠标样式 cursor 4.背景样式 5.列表样式 6.CSS伪类 7.盒子模型 1.字体样式 font-family 字体类型&#xff1a;隶书” “楷体” font-size 字体大小&#xff1a;像素px font-weight 字体粗细&#xff1a;bold 定义粗体字…

8月5日上课内容 nginx的优化和防盗链

全部都是面试题 nginx的优化和防盗链 重点就是优化&#xff1a; 每一个点都是面试题&#xff0c;非常重要&#xff0c;都是面试题 1、隐藏版本号&#xff08;重点&#xff0c;一定要会&#xff09; 备份 cp nginx.conf nginx.conf.bak.2023.0805 方法一&#xff1a;修改配…

拦截器在SpringBoot中使用,HandlerInterceptor,WebMvcConfigurer

拦截器在Controller之前执行。 用于权限校验&#xff0c;日志记录&#xff0c;性能监控 在SpringBoot中使用 创建拦截器类&#xff1a;首先&#xff0c;创建一个Java类来实现拦截器逻辑。拦截器类应该实现Spring提供的HandlerInterceptor接口。实现拦截器方法&#xff1a;拦…

探索PostgreSQL的新功能:最新版本更新解析

PostgreSQL作为一种强大而开源的关系型数据库管理系统&#xff0c;不断在不断进化和改进。每一次的版本更新都带来了更多功能和改进&#xff0c;让用户在处理大规模数据和复杂查询时体验更好的性能和功能。在本文中&#xff0c;我们将深入探索PostgreSQL的最新版本更新&#xf…

进程上下文切换以及应用场景

各个进程之间是共享 CPU 资源的&#xff0c;在不同的时候进程之间需要切换&#xff0c;让不同的进程可以在 CPU 执行&#xff0c;那么这个一个进程切换到另一个进程运行&#xff0c;称为进程的上下文切换。 在详细说进程上下文切换前&#xff0c;我们先来看看 CPU 上下文切换 大…

VX-API-Gateway开源网关技术的使用记录

VX-API-Gateway开源网关技术的使用记录 官网地址 https://mirren.gitee.io/vx-api-gateway-doc/ VX-API-Gateway(以下称为VX-API)是基于Vert.x (java)开发的 API网关, 是一个分布式、全异步、高性能、可扩展、轻量级的可视化配置的API网关服务官网下载程序zip包 访问 https:/…

深入浅出 Typescript

TypeScript 是 JavaScript 的一个超集&#xff0c;支持 ECMAScript 6 标准&#xff08;ES6 教程&#xff09;。 TypeScript 由微软开发的自由和开源的编程语言。 TypeScript 设计目标是开发大型应用&#xff0c;它可以编译成纯 JavaScript&#xff0c;编译出来的 JavaScript …

AtcoderABC226场

A - Round decimalsA - Round decimals 题目大意 给定一个实数X&#xff0c;它最多可以使用三位小数表示&#xff0c;而且X的小数点后有三位小数。将X四舍五入到最接近的整数并打印结果。 思路分析 可以使用round函数进行四舍五入 知识点 round(x) 是一个用来对数字进行四…

SpringIoc-个人学习笔记

Spring的Ioc、DI、AOP思想 Ioc Ioc思想&#xff1a;Inversion of Control&#xff0c;控制反转&#xff0c;在创建Bean的权利反转给第三方 DI DI思想&#xff1a;Dependency Injection&#xff0c;依赖注入&#xff0c;强调Bean之间的关系&#xff0c;这种关系由第三方负责去设…

Redis 报错 RedisConnectionException: Unable to connect to x.x.x.x:6379

文章目录 Redis报错类型可能解决方案 Redis报错类型 org.springframework.data.redis.connection. spingboot调用redis出错 PoolException: Could not get a resource from the pool; 连接池异常:无法从池中获取资源; nested exception is io.lettuce.core. 嵌套异常 RedisConn…

针对高可靠性和高性能优化的1200V碳化硅沟道MOSFET

目录 标题&#xff1a;1200V SiC Trench-MOSFET Optimized for High Reliability and High Performance摘要信息解释研究了什么文章创新点文章的研究方法文章的结论 标题&#xff1a;1200V SiC Trench-MOSFET Optimized for High Reliability and High Performance 摘要 本文详…

FPGA----UltraScale+系列的PS侧与PL侧通过AXI-HP交互(全网唯一最详)附带AXI4协议校验IP使用方法

1、之前写过一篇关于ZYNQ系列通用的PS侧与PL侧通过AXI-HP通道的文档&#xff0c;下面是链接。 FPGA----ZCU106基于axi-hp通道的pl与ps数据交互&#xff08;全网唯一最详&#xff09;_zcu106调试_发光的沙子的博客-CSDN博客大家好&#xff0c;今天给大家带来的内容是&#xff0…

获取k8s scale资源对象的命令

kubectl get --raw /apis/<apiGroup>/<apiVersion>/namespaces/<namespaceName>/<resourceKind>/<resourceName>/scale 说明&#xff1a;scale资源对象用来水平扩展k8s资源对象的副本数&#xff0c;它是作为一种k8s资源对象的子资源存在&#xf…

This function has none of DETERMINISTIC, NO SQL, or READS SQL DATA in...错误解决

在创建函数的时候报错如下&#xff1a; 解决&#xff1a; 设置如下参数即可 SET GLOBAL log_bin_trust_function_creatorsTRUE;

[CKA]考试之PersistentVolumeClaims

由于最新的CKA考试改版&#xff0c;不允许存储书签&#xff0c;本博客致力怎么一步步从官网把答案找到&#xff0c;如何修改把题做对&#xff0c;下面开始我们的 CKA之旅 题目为&#xff1a; Task 创建一个名字为pv-volume的pvc&#xff0c;指定storageClass为csi-hostpath-…

面试热题(打家窃舍)

一个专业的小偷&#xff0c;计划偷窃沿街的房屋。每间房内都藏有一定的现金&#xff0c;影响小偷偷窃的唯一制约因素就是相邻的房屋装有相互连通的防盗系统&#xff0c;如果两间相邻的房屋在同一晚上被小偷闯入&#xff0c;系统会自动报警。 给定一个代表每个房屋存放金额的非负…

宋浩概率论笔记(三)随机向量/二维随机变量

第三更&#xff1a;本章的内容最重要的在于概念的理解与抽象&#xff0c;二重积分通常情况下不会考得很难。此外&#xff0c;本次暂且忽略【二维连续型随机变量函数的分布】这一章节&#xff0c;非常抽象且难度较高&#xff0c;之后有时间再更新。

Linux系统部署Python语言开发运行环境

目录 Ubuntu自带python Debian安装python 安装 pip 库列表 安装第三方库 使用国内镜像站 实装 tkinter 库 编写运行代码 测试代码1 1. 创建项目 2. 创建源码文件 3. 写入源代码 4. 修改权限 5. 运行代码 测试代码2 本文的使用环境是Windows的Linux 子系统&…