FLINK 基于1.15.2的Java开发-Watermark是怎么解决延迟数据唯一正确的生产级解决方案-目前市面上的例子都有问题

news2024/11/17 16:03:07

至此篇,已经完成高级生产应用,至此只剩“码需求”了。

开篇

Watermark这一块国内中文相关资料没有一篇是写完整或者写对的。源于:官网的watermark理论是对的,中文相关博客的代码和公式是错的。

很有可能是写第一篇Watermark中文博客者的手敲错了一个符号或者是根本没有理解或者上过生产环境给到了业界以误导!

因此我们结合实际生产环境,做了一个现实例子,以纠正这个犯了近4年的错误同时补足国内这一块知识的空白。

不提高深的原理先看生产需求

我们按照一个人的正常思维模式,一定是先要有:感性认识再到理性认识的指导方法论来说,先要让程序员的眼睛看得到一样东西它是怎么跑的、一步步如何分解出来结果才能有“总结”,因此我把Watermark那些晦涩难懂看似高大上的理论放在最后。先从实际生产环境需求入手说这个问题。

需求

我们有数据是按照如下情况到达我们的Flink需要做“基于时间窗口的聚合”。但是往往实际生产环境会出现“生成数据的时间是对的,到达FLINK或者下游的时间上有偏差”的情况,如下所示:

这是一个门店点赞排行榜,业务需要按照门店ID归并在同一个时间内被点赞的集合。

于是从系统上看:

  1. 我们可以看到用户在前端APP或者是小程序去点这个“赞👍”的时间即:eventTime分为“两批”,2022-10-25 8:54:54..001秒实际有4条数据,只是随机数据编号-99这一条数据比前3条101门店晚到了6秒钟送到flink(这个晚到是经常的,因为在做数据类型转换或者是网络波动情况下势必发生)。
  2. 第二批数据被点赞👍的时间即:eventTime为一批,共计3条数据,于2022-10-25 8:56:04.001秒被触发,于2022-10-25 8:56:04.010被送到Flink。

业务需求:是把这3波数据合并成:

  • 2022-10-25 8:54:54.000时可以看到:101门店,被点赞👍了(4)次;
  • 2022-10-25 8:55:59.000时可以看到:102门店,被点赞👍了(3)次;

然后每隔5秒,每个门店的点赞次数按照数据到达的窗口进行滚动的显示。

按照需求进行设计开发

从系统角度看这3波数据要按照“业务需要按照门店ID归并在同一个时间内被点赞的集合”,实际数据呢它是有延迟即:乱序到达。于是很多程序员一拍脑袋,哦。。。用Flink的Watermark不是就可以解决了吗?

对!

是用Watermark来解决上述的问题,而且flink的Watermark分成:

  1. 自带Watermark生成器,适用于大都简单场景;
  2. 自定义eventTime生成Watermark;

我们一看第2点,不错,这个“自定义eventTime生成Watermark“,不正是我们需要的吗?好。。。开始编码了,于是我们这样来开发这个代码,网上全是这个代码,所有的中文网站甚至连官网都是这个代码,各位看一下,是不是!我先告诉你们:是错的

注意一(高度重要,敲黑板了):

  • 如果要用Watermark,你必须在flink里使用“TumblingEventTimeWindows.of(Time.seconds(窗口多久一刷新))”这个窗口,TumblingEventTimeWindows就是和Watermark连在一起使用的,这边的Event指的就是Watermark的到达触发了这个Window。

注意二(高度重要,敲黑板了):

官网的原话对于:Watermark触发TumblingEventTimeWindow需要满足的条件(是对的,错不在官网错在所有中文博客博主没有理解或者是一时手误导致了这个长达近4年的错误信息的传导上):

  • watermark>窗口结束时间;
  • 并且窗口内有数据到达;

以上两点都满足的情况下,会触发TumblingEventTimeWindow进行一次计算。

于是我们来看截止这篇博客网上所有的著名的错的解决方案

错误的解决方案的代码

 

       long windowInterval = 5000L;
       SerializableTimestampAssigner<StoreRanking> timestampAssigner = new SerializableTimestampAssigner<StoreRanking>() {
           @Override
           public long extractTimestamp(StoreRanking element, long recordTimestamp) {
               long watermarkTime = element.getEventTime()*1000L; //因为我们的Watermark是用秒来打的因此要把毫秒变成秒否则窗口连一次都不会被触发
           }
       };
 
       KafkaSource<StoreRanking> source = KafkaSource.<StoreRanking>builder()
               .setBootstrapServers(paras.get("kafka.bootstrapservers")).setTopics(paras.get("kafka.topic"))
               .setGroupId("test01").setStartingOffsets(OffsetsInitializer.latest())
               .setDeserializer(KafkaRecordDeserializationSchema.of(new StoreRankingJSONDeSerializer())).build();
       DataStream<StoreRanking> kafkaDS = env
               .fromSource(source,
                       WatermarkStrategy.<StoreRanking>forBoundedOutOfOrderness(Duration.ofSeconds(windowInterval))
                               .withTimestampAssigner(timestampAssigner).withIdleness(Duration.ofSeconds(5)),
                       "Kafka Source");
       DataStream<Tuple2<String, Integer>> ds = kafkaDS
               .flatMap(new FlatMapFunction<StoreRanking, Tuple2<String, Integer>>() {
                   public void flatMap(StoreRanking store, Collector<Tuple2<String, Integer>> collector)
                           throws Exception {
                       StoreRanking s = new StoreRanking();
                       s.setStoreId(store.getStoreId());
                       s.setStoreNo(store.getStoreNo());
                       s.setEventTime(store.getEventTime());
                       logger.info(">>>>>>storeNo->" + store.getStoreNo() + " storeId->" + store.getStoreId());
                       collector.collect(new Tuple2<String, Integer>(store.getStoreId(), 1));
                   }
               });
       ds.keyBy(value -> value.f0).window(TumblingEventTimeWindows.of(Time.seconds(5))).sum(1).print();
       env.execute();

以上代码很简单,根据一个传入的Bean:StoreRanking中的eventTime(被点赞👍)时间以及门店ID进行归并:

package org.mk.demo.flink;
 
import java.io.Serializable;
 
public class StoreRanking implements Serializable {
 
    public StoreRanking() {
 
    }
 
    public StoreRanking(String storeId, long eventTime) {
        this.storeId = storeId;
        this.eventTime = eventTime;
    }
 
     
 
    long eventTime = 0;
    String storeId = "";
    int storeNo = 0;
 
    public int getStoreNo() {
        return storeNo;
    }
 
    public void setStoreNo(int storeNo) {
        this.storeNo = storeNo;
    }
 
    public long getEventTime() {
        return eventTime;
    }
 
    public void setEventTime(long eventTime) {
        this.eventTime = eventTime;
    }
 
    public String getStoreId() {
        return storeId;
    }
 
    public void setStoreId(String storeId) {
        this.storeId = storeId;
    }
}

样就可以把8:54:84.100秒这3条和8:55:55.000秒这一条组合在一起变成4条一起做一次归并计算,以得到:101门店被点赞👍了4次:

而实际代码运行后,99%的人没有发觉这个问题:是因为测试时用的数据是“流式”的、源源不断的、像Matrix里的黑屏绿字那样不断“滚动”着,因此看不出其中的细微差别。因此我们先把这4条包括-99延迟数据做成一批往Flink送进去,我们来看发生了什么事:

模拟kafka送数据

// 发3条数据
for (int i = 0; i < 3; i++) {
    StoreRanking stores = new StoreRanking("101", ascendingEventTime);
    stores.setStoreNo(i + 1);
    storeRankingKafka.send(TOPIC, stores);
    Thread.sleep(50);
}
// 发一条:延时数据
Thread.sleep(4000);
StoreRanking delayStore = new StoreRanking("101", ascendingEventTime);
delayStore.setStoreNo(-99);
storeRankingKafka.send(TOPIC, delayStore);

 送过去后flink端的显示

嘿嘿嘿嘿嘿!

前面一共发了两波,第一波准时到达第二波延时到达但是在逻辑上第一波和第二波应该归并成一波数据。

然后我们送第三波数据,即:门店编号102的数据有三条

public void sendStoreRankingWithTimestamp() {
        long ascendingEventTime = System.currentTimeMillis();
        try {
 
            // 发5条数据
            for (int i = 0; i < 3; i++) {
                StoreRanking stores = new StoreRanking("102", ascendingEventTime);
                stores.setStoreNo(i + 1);
                storeRankingKafka.send(TOPIC, stores);
                Thread.sleep(50);
            }
            //Thread.sleep(4000);
            // 发一条:延时数据
            //StoreRanking delayStore = new StoreRanking("101", ascendingEventTime);
            //delayStore.setStoreNo(-99);
            //storeRankingKafka.send(TOPIC, delayStore);
 
        } catch (Exception e) {
            logger.error(">>>>>>sendStoreRankingWithTimestamp error: " + e.getMessage(), e);
        }
    }

 

 

咦,102号门店没有被计算,但计算了前面那个101号门店的归并计算了,4条,没错!因为我们按照eventTime如果一致就算一批。

于是我们等了10多分钟,看看网上说的withIdleness也设上了,可是窗口没有在我们设想的等5秒钟应该自动再触发一下完成102门店的点赞计算呀。

env.fromSource(source,
                       WatermarkStrategy.<StoreRanking>forBoundedOutOfOrderness(Duration.ofSeconds(windowInterval))
                               .withTimestampAssigner(timestampAssigner).withIdleness(Duration.ofSeconds(5)),
                       "Kafka Source");

我们再送一批数据,103号门店的数据同样按照102号门店那样送过去。

我们发觉103号门店送过去却触发了102号门店的这个计算窗口。此时更夸张的事开始来了。。。生产上103号门店是最后一批数据后续没有数据再送过来了,然后。。。103号门店的这一批本来应该有3条被点赞👍的结果就永远等不到了。卡死了!

于是网上又出现了很多千篇一律的”使用自定义Watermark解决最后一个窗口数据的问题“,代码是长这样的(我这边给的代码是能用的而这一块目前网上的代码竟然没有一个可以用只有理念对了,我自己根据1.15.2版纠正了一堆错误)。但我这边先告诉大家写完了依旧没有用

注意(高度注意,敲黑板了):

这一块代码也没有错,错误的原因我会在文章下面部分会给出。

没有错的原因是:因为这的确是属于”最后一个窗口“并且再也没有后续数据到达了,因此此时如果你不自动给它一个”定时触发“去生成这个Watermark,因此我们需要使用自定义Watermark里的:public void onPeriodicEmit(WatermarkOutput watermarkOutput) 不断的去贴合着窗口内的endtime,以使得Watermark可以贴合着Watermark公式:Watermark必须>窗口endtime。否则就永无法不会触发窗口计算。

       long windowInterval = 5000L;
       SerializableTimestampAssigner<StoreRanking> timestampAssigner = new SerializableTimestampAssigner<StoreRanking>() {
           @Override
           public long extractTimestamp(StoreRanking element, long recordTimestamp) {
                return element.getEventTime()*1000L;
           }
       };
 
       KafkaSource<StoreRanking> source = KafkaSource.<StoreRanking>builder()
               .setBootstrapServers(paras.get("kafka.bootstrapservers")).setTopics(paras.get("kafka.topic"))
               .setGroupId("test01").setStartingOffsets(OffsetsInitializer.latest())
               .setDeserializer(KafkaRecordDeserializationSchema.of(new StoreRankingJSONDeSerializer())).build();
       DataStream<StoreRanking> kafkaDS = env
               .fromSource(source,
                       WatermarkStrategy.<StoreRanking>forBoundedOutOfOrderness(Duration.ofSeconds(windowInterval))
                               .withTimestampAssigner(timestampAssigner).withIdleness(Duration.ofSeconds(5)),
                       "Kafka Source");
 
       DataStream watermarkDS = kafkaDS.assignTimestampsAndWatermarks(new WatermarkStrategy<StoreRanking>() {
           private final long maxOutOfOrderness = windowInterval;
           private long currentMaxTimestamp = System.currentTimeMillis() + 1000l;
 
           @Override
           public WatermarkGenerator<StoreRanking> createWatermarkGenerator(
                   WatermarkGeneratorSupplier.Context context) {
 
               return new WatermarkGenerator<StoreRanking>() {
                   @Override
                   public void onEvent(StoreRanking event, long l, WatermarkOutput watermarkOutput) {
                       logger.info(">>>>>>onEvent->" + event.getStoreId() + " " + event.getStoreNo());
                       currentMaxTimestamp = Math.max(currentMaxTimestamp, event.getEventTime());
                       Date dt1 = new Date(currentMaxTimestamp);
                       SimpleDateFormat sd = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
                       String currentMaxTimeStr = sd.format(dt1);
                       Date dt2 = new Date(event.getEventTime());
                       String eventTimeStr = sd.format(dt2);
 
                       logger.info(">>>>>>onEvent  currentMaxTimestamp->" + currentMaxTimeStr + "  eventTimeStr->"
                               + eventTimeStr);
                   }
 
                   @Override
                   public void onPeriodicEmit(WatermarkOutput watermarkOutput) {
                       Date dt = new Date(currentMaxTimestamp);
                       SimpleDateFormat sd = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
                       String timeStr = sd.format(dt);
 
                       // logger.info(">>>>>>onPeriodicEmit currentMaxTimestamp->" + timeStr +
                       // "maxOutOfOrderness->"
                       // + maxOutOfOrderness);
 
                       long newWatermarkTime = (currentMaxTimestamp - maxOutOfOrderness - 1) * 1000L;
                       watermarkOutput.emitWatermark(new Watermark((System.currentTimeMillis() + 100) * 1000L));
                   }
               };
           }
       });
 
       DataStream<Tuple2<String, Integer>> ds = watermarkDS
               .flatMap(new FlatMapFunction<StoreRanking, Tuple2<String, Integer>>() {
                   public void flatMap(StoreRanking store, Collector<Tuple2<String, Integer>> collector)
                           throws Exception {
                       StoreRanking s = new StoreRanking();
                       s.setStoreId(store.getStoreId());
                       s.setStoreNo(store.getStoreNo());
                       s.setEventTime(store.getEventTime());
                       logger.info(">>>>>>storeNo->" + store.getStoreNo() + " storeId->" + store.getStoreId());
                       collector.collect(new Tuple2<String, Integer>(store.getStoreId(), 1));
                   }
               });
       ds.keyBy(value -> value.f0).window(TumblingEventTimeWindows.of(Time.seconds(5))).sum(1).print();
       env.execute();

然后运行后这次就算第二波、第三波数据被不断的送到flink这,然而其结果却变成了一直不触发窗口进行计算了。这个搞得更大了!

 

不要急一起来分析问题

我们把3批数据的“被点赞👍”时间、送达flink的时间、窗口的计算时间全部用图画出来。表示成如下这样的一张图。

 

看着这个图我们同时来看flink官网的对于Watermark的理论公式,公式有两个点,这两个点满足那么窗口才会被触发

  • Watermark的时间>窗口的endtime;
  • 并且在这段窗口内要有数据;

所以网上所有资料的问题在于两个层次递进的发生的:

  1. 第一块代码里,我们没上自定义watermark时由于我们使用了“业务触发时间”去做的Watermark以满足按照时间+门店ID做归并的业务需求,因此数据到达flink时,造成了这个被我们打上的Watermark永远<了窗口的endtime,因此导致了,只有当下一批数据送到并且下一批的数据的eventTime(就是Watermark)>了当前flink的窗口时,才被触发了上一个窗口内的计算,因此才会出现送101店过去不显示计算结果,再送102店过去才触发了101门店的结果即:下一批数据触发了前一批数据的计算窗口;
  2. 第二块代码里,我们虽然正确做了自定义Watermark(这块代码网上都没有,理论是有的或者只有<flink 1.12版之前的代码),但是这个emit发出的Watermark始终<了我们的窗口的end time并且又由于第一块代码里的错误导致了连一次窗口都不会被触发;

以上两点就是全部错误的原因,按照上文中所叙述,错误不在第二块代码因此第二块代码我们保留,而实际错在第一块代码,错在哪?喏,在下面:

就这么改一下就行了?天哪!这么简单?

不信是吧?来看结果:

第一批数据送过去,看。。。结果出来了,4条,4个赞👍。

再送第二批数据3条数据,看。。。结果出来了,3条,3个赞👍。 

 

高大上的实际生产级应用的理论

到此整个Watermark到底怎么用?为什么要用Watermark以及如何正确使用Watermark全部讲完了,全代码我附在最后。

而实际在生产环境我们很多时候如果仅仅靠flink自带的Watermark机制是不能满足我们的需求的,大量场景我们会使用“业务时间”来做Watermark以实现数据的归并,来应对“延迟”到达的数据。

而仅仅只有Watermark来应对“延迟到达”的数据只是第一道防线,依然会有漏过的有延时的数据。因此此时在我们的Window里还有一个成员函数它是:window(TumblingEventTimeWindows.of(Time.seconds(10))).allowedLateness(Time.seconds(5))来形成第二道防线,继续对漏过Watermark的延迟数据进行最后的捕捉以尽量满足业务数据的精准要求。

所以整个flink窗口它在正常的时候按照一个固定的duration进行滚动,以Watermark来做数据补偿,以allowedLateness来做漏过Watermark的延迟数据的进一步补偿。这样的话从“用户端客户侧”我们就可以达到以下这样的显示效果:

第一秒我看到的数据可能会有一定的小偏差,然后在一秒不到内发觉这个数据被修正了(Watermark触发了窗口补偿)但此时还差那么个0.01%偏差,但是又过了一秒不到我发觉:哈哈,这次是准了

就是这么么一种“短促渐变式的体验”,这个体验其实用这么一个例子来做比喻或许会更加恰当吧:手机上的图片逐步加载清晰。一开始你在手机上看到一个图有一点模糊,在1秒内这个图片自己开始一点点变清晰这么一种过程的感觉。

这也符合实时计算的特性,因为实时、实时,它由于是异步的,不可能做到像直连HTTP连接、RESTFUL API一来一回这么准时,但是我们需要在实时计算中使用这么两道修正,来及时把这个数据修正成精准的,这正是实时计算中的一个非常至关重要的“设计理念”。

有人这边如果说要抬杠:其实如果只是看板,我们不需要这么精准。

那么我告诉你,这一系列文章我写下的目的不仅仅只是针对三产一类的商业应用,它们要面对的是全行业,譬如说:工业行业中的实时计算应用,我们需要在2秒内把气压阀值偏差控制在0.1%内如果可以到达0.01%那么会延长设备的使用设备(如:涡轮发电机),那么第一道防线我就要先到达偏差值在0.1%,第二道防线我反这个精准度降到0.01%,这样我的一个涡轮发电机转子的寿命本来正常情况下只能使用5年,现在我可以使用8年,这就是巨大的一笔成本上的节省。这就是一个典型的工控上的例子。

附:全代码(git地址-flinkdemo: flink基于1.15.2 JAVA版生产级应用DEMO)

package org.mk.demo.flink;
 
import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.Date;
 
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.TimestampAssignerSupplier;
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
import org.apache.flink.api.common.eventtime.WatermarkOutput;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.TimestampAssigner;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
 
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
 
import com.alibaba.fastjson.JSON;
 
public class DemoWaterMark {
    private final static Logger logger = LoggerFactory.getLogger(DemoWaterMark.class);
 
    public static class StoreRankingJSONDeSerializer implements KafkaDeserializationSchema<StoreRanking> {
 
        private final String encoding = "UTF8";
 
        @Override
        public TypeInformation<StoreRanking> getProducedType() {
            return TypeInformation.of(StoreRanking.class);
        }
 
        @Override
        public boolean isEndOfStream(StoreRanking nextElement) {
            return false;
        }
 
        @Override
        public StoreRanking deserialize(ConsumerRecord<byte[], byte[]> consumerRecord) throws Exception {
            if (consumerRecord != null) {
                try {
                    String value = new String(consumerRecord.value(), encoding);
                    StoreRanking store = JSON.parseObject(value, StoreRanking.class);
                    return store;
                } catch (Exception e) {
                    logger.error(">>>>>>deserialize failed : " + e.getMessage(), e);
                }
            }
            return null;
        }
    }
 
    public static void main(String[] args) throws Exception {
        long windowInterval = 5000L;
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        ParameterTool argParas = ParameterTool.fromArgs(args);
        String propertiesFilePath = argParas.get("config_path");
        if (logger.isDebugEnabled()) {
            logger.debug(">>>>>>start to load properties from {}", propertiesFilePath);
        }
        ParameterTool paras = ParameterTool.fromPropertiesFile(propertiesFilePath);
        env.getConfig().setGlobalJobParameters(paras);
        SerializableTimestampAssigner<StoreRanking> timestampAssigner = new SerializableTimestampAssigner<StoreRanking>() {
            @Override
            public long extractTimestamp(StoreRanking element, long recordTimestamp) {
                 
                // return element.getEventTime()*1000L; //错误的代码
                long watermarkTime = ((element.getEventTime() + windowInterval) * 1000L);//正确的Watermark计算
                return watermarkTime;
            }
        };
 
        KafkaSource<StoreRanking> source = KafkaSource.<StoreRanking>builder()
                .setBootstrapServers(paras.get("kafka.bootstrapservers")).setTopics(paras.get("kafka.topic"))
                .setGroupId("test01").setStartingOffsets(OffsetsInitializer.latest())
                .setDeserializer(KafkaRecordDeserializationSchema.of(new StoreRankingJSONDeSerializer())).build();
        DataStream<StoreRanking> kafkaDS = env
                .fromSource(source,
                        WatermarkStrategy.<StoreRanking>forBoundedOutOfOrderness(Duration.ofSeconds(windowInterval))
                                .withTimestampAssigner(timestampAssigner).withIdleness(Duration.ofSeconds(5)),
                        "Kafka Source");
 
        DataStream watermarkDS = kafkaDS.assignTimestampsAndWatermarks(new WatermarkStrategy<StoreRanking>() {
            private final long maxOutOfOrderness = windowInterval; // 3.5 秒
            private long currentMaxTimestamp = System.currentTimeMillis() + 1000l;
 
            @Override
            public WatermarkGenerator<StoreRanking> createWatermarkGenerator(
                    WatermarkGeneratorSupplier.Context context) {
 
                return new WatermarkGenerator<StoreRanking>() {
                    @Override
                    public void onEvent(StoreRanking event, long l, WatermarkOutput watermarkOutput) {
                        logger.info(">>>>>>onEvent->" + event.getStoreId() + " " + event.getStoreNo());
                        currentMaxTimestamp = Math.max(currentMaxTimestamp, event.getEventTime());
                        Date dt1 = new Date(currentMaxTimestamp);
                        SimpleDateFormat sd = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
                        String currentMaxTimeStr = sd.format(dt1);
                        Date dt2 = new Date(event.getEventTime());
                        String eventTimeStr = sd.format(dt2);
 
                        logger.info(">>>>>>onEvent  currentMaxTimestamp->" + currentMaxTimeStr + "  eventTimeStr->"
                                + eventTimeStr);
                    }
 
                    @Override
                    public void onPeriodicEmit(WatermarkOutput watermarkOutput) {
                        Date dt = new Date(currentMaxTimestamp);
                        SimpleDateFormat sd = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
                        String timeStr = sd.format(dt);
 
                        // logger.info(">>>>>>onPeriodicEmit currentMaxTimestamp->" + timeStr +
                        // "maxOutOfOrderness->"
                        // + maxOutOfOrderness);
 
                        long newWatermarkTime = (currentMaxTimestamp - maxOutOfOrderness - 1) * 1000L;
                        watermarkOutput.emitWatermark(new Watermark((System.currentTimeMillis() + 100) * 1000L));
                    }
                };
            }
        });
 
        DataStream<Tuple2<String, Integer>> ds = watermarkDS
                .flatMap(new FlatMapFunction<StoreRanking, Tuple2<String, Integer>>() {
                    public void flatMap(StoreRanking store, Collector<Tuple2<String, Integer>> collector)
                            throws Exception {
                        StoreRanking s = new StoreRanking();
                        s.setStoreId(store.getStoreId());
                        s.setStoreNo(store.getStoreNo());
                        s.setEventTime(store.getEventTime());
                        logger.info(">>>>>>storeNo->" + store.getStoreNo() + " storeId->" + store.getStoreId());
                        collector.collect(new Tuple2<String, Integer>(store.getStoreId(), 1));
                    }
                });
        ds.keyBy(value -> value.f0).window(TumblingEventTimeWindows.of(Time.seconds(10))).sum(1).print();
        env.execute();
    }
}

 

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1199.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【模电实验】【验证性实验——基本差动放大电路实验】

实验4-1 验证性实验——基本差动放大电路实验 1. 静态工作点的测试 按照下图连接电路&#xff0c;检查无误后将A, B两端短接&#xff0c;接通电源12 V&#xff0c; 分别测量三极管各极对地的电压值&#xff0c;推算静态电流&#xff0c;记入下表&#xff0c;并与仿真结果&…

【JavaWeb】之富文本编辑器

【JavaWeb】富文本编辑器前言一、富文本编辑器介绍二、富文本编辑器使用1.引入编辑器&#xff08;多种引入方式&#xff09;2.使用编辑器三、主流富文本编辑器推荐1.TinyMCE2.CKEditor3.UEditor4.wangEditor5.kindeditor6.simditor7.bootstrap-wysiwyg8.summernote9.Froala10.Q…

C++ Reference: Standard C++ Library reference: C Library: cstdlib: wctomb

C官网参考链接&#xff1a;https://cplusplus.com/reference/cstdlib/wctomb/ 函数 <cstdlib> wctomb int wctomb (char* pmb, wchar_t wc); 宽字符wc被转换为其等效多字节&#xff0c;并存储在pmb指向的数组中。函数在调用后返回由pmb指向的等效多字节序列的字节长度。…

倡议“1024区块链活动日”第三系列活动在京主会场和全球21个分会场成功举办

10月24日下午&#xff0c;倡议“1024区块链活动日”第三次系列活动暨乡村产业链改大会乡村振兴链改助农大会通过线上的形式&#xff0c;在北京主会场和全球21个分会场&#xff0c;1024个视频直播节点联动成功举办&#xff0c;本次活动由中国通信工业协会区块链专业委员会&#…

关于蓝桥杯单片机组自学的经验分享

这篇文章主题如标题所示。先说一下经验分享&#xff0c;文章末再写一些碎碎念。 蓝桥杯单片机组 客观题 程序设计题 数模电 C语言 单片机。 先说第一个等式&#xff0c;是从考题结构看的&#xff0c;程序设计题只要好好练&#xff0c;基本都能实…

SpringBoot常用注解

文章目录组件添加SpringBootApplicationConfigurationBeanConditionImprotImportSelectorImportBeanDefinitionRegistrar原生配置文件引入ImportResource配置绑定Component ConfigurationPropertiesConfigurationProperties EnableConfigurationProperties自动配置原理入门引…

SCI论文降重技巧盘点 - 易智编译EaseEditing

要想顺利发布SCI论文&#xff0c;首先就是要保证论文的原创性和创新性。要知道论文写作当中对于文献和资料的引用是必不可少的&#xff0c;所以论文的重复率很有可能会超标&#xff0c;对于这点要留意。 免费的查重网站有PaperYY、百度学术查重、Freecheck、Paperpass等等&…

上市公司信息透明度数据(1991-2019年)包含stata源代码和数据

上市公司信息透明度数据&#xff08;1991-2019年&#xff09;包含stata源代码和数据 1、数据来源&#xff1a;附在文件内 2、时间跨度&#xff1a;1991-2019年 3、区域范围&#xff1a;全国 4、指标说明&#xff1a; 股价同步性&#xff08;SYNCH&#xff09;&#xff0c;S…

自学网络安全的三个必经阶段(含路线图)

一、为什么选择网络安全&#xff1f; 这几年随着我国《国家网络空间安全战略》《网络安全法》《网络安全等级保护2.0》等一系列政策/法规/标准的持续落地&#xff0c;网络安全行业地位、薪资随之水涨船高。 未来3-5年&#xff0c;是安全行业的黄金发展期&#xff0c;提前踏入…

jquery导航图片全屏滚动、首页全屏轮播图,各式相册

1.目录结构 源码 project cssjsimageindex1index2index3index4index.html index1到index4分为四个iframe标签引入的可单独分离的主页&#xff0c;相当于组件的原理&#xff0c;其中index作为主页&#xff0c;index1是首页全屏轮播图&#xff0c;其他都是单独的相册风格&…

图形学-反走样/抗锯齿

1.反走样 1.1 什么是走样 在上一篇文章中&#xff0c;我们通过采样的方式把一个三角形变成离散的点显示在屏幕上。在采样过程中&#xff0c;我们会产生很多锯齿&#xff0c;这些锯齿的学名就叫做走样 1.2 反走样 如何消除锯齿(走样),我们就要引入反走样技术&#xff0c;之所…

UNet详细解读(一)论文技术要点归纳

UNet 论文技术要点归纳UNet摘要简介Over-tile策略网络架构训练数据增强小结UNet 摘要 2015年诞生&#xff0c;获得当年的ISBI细胞追踪挑战比赛第一名&#xff0c;在GPU上推理512x512的图像不到1秒钟&#xff0c;开创图像分割的先河。 简介 在当时&#xff0c;卷积神经网络是主…

Win10-GPU服务器-深度学习从零配置环境

1.装anaconda 下载安装anaconda&#xff08;conda也一并装了&#xff09; https://www.anaconda.com/products/distribution 配系统变量 将类似这个位置放进path里面“C:\ProgramData\Anaconda3” 2.安装1.5.0版本的pytorch GPU版 2.1确定的你的显卡型号 https://jingyan.…

Redis持久化之AOF

AOF&#xff08;Append Only File&#xff09; 将我们所有的命令记录下来, history, 恢复的时候就把这个文件全部执行一遍 以日志的形式来记录每个写操作, 将redis执行过的所有指令记录下来(读操作不记录), 只许追加文件但不可以改写文件, 启动之初会读取该文件重新构建数据…

木犀草素修饰人血清白蛋白(Luteolin-HSA),山柰酚修饰人血清白蛋白(Kaempferol-HSA)

产品名称&#xff1a;木犀草素修饰人血清白蛋白 英文名称&#xff1a;Luteolin-HSA 用途&#xff1a;科研 状态&#xff1a;固体/粉末/溶液 产品规格&#xff1a;1g/5g/10g 保存&#xff1a;冷藏 储藏条件&#xff1a;-20℃ 储存时间&#xff1a;1年 温馨提醒&#xff1a;仅供科…

花2个月时间学习,面华为测开岗要30k,面试官竟说:你不是在....

【文章末尾给大家留下了大量的.。。。。】 背景介绍 计算机专业&#xff0c;代码能力一般&#xff0c;之前有过两段实习以及一个学校项目经历。第一份实习是大二暑期在深圳的一家互联网公司做前端开发&#xff0c;第二份实习由于大三暑假回国的时间比较短&#xff08;小于两个…

要如何才能抑制局部放电试验干扰?

局部放电产生的信号在微伏量级。就信号而言&#xff0c;很容易被外界干扰信号淹没。因此&#xff0c;必须考虑抑制干扰信号的影响&#xff0c;采取有效的抗干扰措施。局部放电测试仪测试中一些干扰的抑制方法如下: (1)电源的干扰可以通过滤波器来抑制。滤波器应该能够抑制…

Linux进程控制

文章目录进程创建fork函数进一步探讨写时拷贝进程终止进程退出场景进程终止时&#xff0c;操作系统做了什么&#xff1f;三大终止进程函数进程等待&#xff08;阻塞&#xff09;进程等待的必要性进程等待的两种函数获取子进程参数status如何通过status获取子进程的退出码。为什…

数字IC设计 - 逻辑综合简介与Design Compiler使用(GUI方式)

逻辑综合 定义 逻辑综合就是将前端设计工程师编写的RTL代码&#xff0c;映射到特定的工艺库上&#xff0c;通过添加约束信息&#xff0c;对RTL代码进行逻辑优化&#xff0c;形成门级网表。约束信息包括时序约束&#xff0c;线载模型约束&#xff0c;面积约束&#xff0c;功耗…

我的Mysql突然挂了(Communications link failure)

在一个风和日丽的下午&#xff0c;我照常继续做着我的项目&#xff0c;今天的主题是一个涉及多表的分页查询 老复杂了&#xff01;写了半天才搞好。当我满怀期待运行项目&#xff0c;进入页面后发现登陆后台却怎么也登陆不上&#xff0c;吓得我连忙回去查看后台日志&#xff0…