理论上可以eventtime processtime混用,但是下面代码测试发现bug,输入一条数据会一直输出.
flink github无法提bug/问题. apache jira账户新建后竟然flink又需要一个账户,放弃
bug复现操作
idea运行代码后 往source kafka发送一条数据
a,1,1690304400000
可以看到无限输出:
理论上时间语义不建议混用,但是在rich函数中的确可以做到混用且正常使用
问题复现代码
package com.yy.flinkWindowAndTrigger
import com.yy.flinkWindow.M1
import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.configuration.{Configuration, RestOptions}
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.time.Time.seconds
import org.apache.flink.streaming.api.windowing.triggers.{ContinuousProcessingTimeTrigger, CountTrigger, ProcessingTimeTrigger}
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.joda.time.Seconds
object flinkEventWindowAndProcessTriggerBUGLearn {
def main(args: Array[String]): Unit = {
// flink 启动本地webui
val conf = new Configuration
conf.setInteger(RestOptions.PORT, 28080)
// val env = StreamExecutionEnvironment.getExecutionEnvironment
val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf)
// val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
env.configure(conf)
/*
kafka输入:
a,1,1690304400000 //对应 2023-07-26 01:00:00 (无限输出) //如果传入 a,1,1693037756000 对应:2023-08-26 16:15:56 (1条/s)
a,1,7200000 // 1970-01-1 10:00:00
*/
val brokers = "172.18.105.147:9092"
val source = KafkaSource.builder[String].setBootstrapServers(brokers)
.setTopics("t1")
.setGroupId("my-group-23asdf46")
.setStartingOffsets(OffsetsInitializer.latest())
// .setDeserializer() // 参数: KafkaRecordDeserializationSchema
.setDeserializer(new M1())
.build()
val ds1 = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source")
val s1 = ds1
.map(_.split(","))
.map(x => C1(x(0), x(1).toInt, x(2).toLong)) // key number 时间戳
.assignTimestampsAndWatermarks(new OTAWatermarks(Time.seconds(0)))
.keyBy(_.f1)
.window(TumblingEventTimeWindows.of(seconds(10)))
.trigger(ContinuousProcessingTimeTrigger.of[TimeWindow](seconds(10L)))
.reduce((x, y) => C1(x.f1, x.f2 + y.f2, 100L))
s1.print()
env.execute("KafkaNewSourceAPi")
}
// 乱序流
class OTAWatermarks(time: Time) extends BoundedOutOfOrdernessTimestampExtractor[C1](time) {
override def extractTimestamp(element: C1): Long = {
element.f3
}
}
// key num timestamp
case class C1(f1: String, f2: Int, f3: Long)
}
-
-
maven pom
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>FlinkLocalDemo</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>jar</packaging>
<name>FlinkLocalDemo</name>
<url>http://maven.apache.org</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.17.1</flink.version>
<scala.binary.version>2.12</scala.binary.version>
<scala.version>2.12.8</scala.version>
</properties>
<dependencies>
<!-- https://mvnrepository.com/artifact/joda-time/joda-time -->
<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
<version>2.12.5</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-avro</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime-web</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.alibaba.fastjson2/fastjson2 -->
<dependency>
<groupId>com.alibaba.fastjson2</groupId>
<artifactId>fastjson2</artifactId>
<version>2.0.33</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.alibaba/fastjson -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.83</version>
<!-- <version>1.2.17</version>-->
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-common -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- 引入flink1.13.0 scala2.12.12 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-csv</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Either... -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- or... -->
<!-- 下面几个是代码中写sql需要的包 四个中一个都不能少 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-planner-loader -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-loader</artifactId>
<version>${flink.version}</version>
<!-- <scope>test</scope>-->
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-runtime -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-runtime</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-files -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-files</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- 注意: flink-table-planner-loader 不能和 flink-table-planner_${scala.binary.version} 共存-->
<!-- <dependency>-->
<!-- <groupId>org.apache.flink</groupId>-->
<!-- <artifactId>flink-table-planner_${scala.binary.version}</artifactId>-->
<!-- <version>${flink.version}</version>-->
<!-- <scope>provided</scope>-->
<!-- </dependency>-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc</artifactId>
<version>3.1.0-1.17</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.11</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- 打jar插件 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.4.3</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.scala-tools</groupId>
<artifactId>maven-scala-plugin</artifactId>
<version>2.15.2</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.2</version>
<executions>
<execution>
<id>scala-compile-first</id>
<phase>process-resources</phase>
<goals>
<goal>add-source</goal>
<goal>compile</goal>
</goals>
</execution>
</executions>
<configuration>
<scalaVersion>${scala.version}</scalaVersion>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.5.5</version>
<configuration>
<!--这部分可有可无,加上的话则直接生成可运行jar包-->
<!--<archive>-->
<!--<manifest>-->
<!--<mainClass>${exec.mainClass}</mainClass>-->
<!--</manifest>-->
<!--</archive>-->
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>11</source>
<target>11</target>
</configuration>
</plugin>
</plugins>
</build>
</project>