flink处理函数--副输出功能

news2025/1/13 17:08:19

背景

在flink中,如果你想要访问记录的处理时间或者事件时间,注册定时器,或者是将记录输出到多个输出流中,你都需要处理函数的帮助,本文就来通过一个例子来讲解下副输出

副输出

本文还是基于streaming-with-flink这本书的例子作为演示,它实现一个把温度低于32度的记录输出到副输出的功能,正常的记录还是从主输出中输出.代码如下:

package wikiedits.processfunc.job;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.OutputTag;

import wikiedits.processfunc.pojo.SensorReading;
import wikiedits.processfunc.process.FreezingMonitor;
import wikiedits.processfunc.source.SensorSource;

public class SideOutPutJob {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<SensorReading> readings = see.addSource(new SensorSource());

        SingleOutputStreamOperator<SensorReading> monitoredReadings = readings.process(new FreezingMonitor());
        // 打印附输出
        monitoredReadings.getSideOutput(new OutputTag<String>("freezing-alarms"){}).print();
        // 打印主输出
        monitoredReadings.print();
        see.execute();
    }
}


package wikiedits.processfunc.process;

import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import wikiedits.processfunc.pojo.SensorReading;

public class FreezingMonitor extends ProcessFunction<SensorReading, SensorReading> {

    private OutputTag<String> freezingAlarmOutput = new OutputTag<String>("freezing-alarms") {};


    @Override
    public void processElement(SensorReading value, Context ctx, Collector<SensorReading> out) throws Exception {
        if (value.temperature < 32.0) {
            ctx.output(freezingAlarmOutput, "freezing alarm for " + value.id + " :" + value.temperature);
        }
        out.collect(value);
    }

}
package wikiedits.processfunc.source;

/*
 * Copyright 2015 Fabian Hueske / Vasia Kalavri
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *  http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import wikiedits.processfunc.pojo.SensorReading;

import java.util.Calendar;
import java.util.Random;

/**
 * Flink SourceFunction to generate SensorReadings with random temperature values.
 *
 * Each parallel instance of the source simulates 10 sensors which emit one sensor reading every 100 ms.
 *
 * Note: This is a simple data-generating source function that does not checkpoint its state.
 * In case of a failure, the source does not replay any data.
 */
public class SensorSource extends RichParallelSourceFunction<SensorReading> {

    // flag indicating whether source is still running
    private boolean running = true;

    /** run() continuously emits SensorReadings by emitting them through the SourceContext. */
    @Override
    public void run(SourceContext<SensorReading> srcCtx) throws Exception {

        // initialize random number generator
        Random rand = new Random();
        // look up index of this parallel task
        int taskIdx = this.getRuntimeContext().getIndexOfThisSubtask();

        // initialize sensor ids and temperatures
        String[] sensorIds = new String[10];
        double[] curFTemp = new double[10];
        for (int i = 0; i < 10; i++) {
            sensorIds[i] = "sensor_" + (taskIdx * 10 + i);
            curFTemp[i] = 65 + (rand.nextGaussian() * 20);
        }

        while (running) {

            // get current time
            long curTime = Calendar.getInstance().getTimeInMillis();

            // emit SensorReadings
            for (int i = 0; i < 10; i++) {
                // update current temperature
                curFTemp[i] += rand.nextGaussian() * 0.5;
                // emit reading
                srcCtx.collect(new SensorReading(sensorIds[i], curTime, curFTemp[i]));
            }

            // wait for 100 ms
            Thread.sleep(3000);
        }
    }

    /** Cancels this SourceFunction. */
    @Override
    public void cancel() {
        this.running = false;
    }
}

程序运行结果:
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1056578.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

解决SpringBoot Configuration Annotation Processor not configured

版权声明 本文原创作者&#xff1a;谷哥的小弟作者博客地址&#xff1a;http://blog.csdn.net/lfdfhl 问题描述 在使用ConfigurationProperties注解和EnableConfigurationProperties注解时&#xff0c;IDEA报错&#xff1a;SpringBoot Configuration Annotation Processor no…

【chainlit】使用chainlit部署chatgpt

&#x1f49d;&#x1f49d;&#x1f49d;欢迎来到我的博客&#xff0c;很高兴能够在这里和您见面&#xff01;希望您在这里可以感受到一份轻松愉快的氛围&#xff0c;不仅可以获得有趣的内容和知识&#xff0c;也可以畅所欲言、分享您的想法和见解。 推荐:kuan 的首页,持续学…

微服务架构改造案例

最后一个部分&#xff0c;结合我们自己的财务共享平台项目进行了微服务架构改造案例分析。 对于改造前的应用&#xff0c;实际上存在四个方面的问题。 其一是关于高可用性方面的&#xff0c;即传统的单体应用我们在进行数据库水平扩展的时候已经很难扩展&#xff0c;已经出现…

XShell远程连接Ubuntu

环境 系统&#xff1a;Ubuntu 18.04.6 LTS IP&#xff1a;192.168.1.4 ps:查看ubuntu版本 lsb_release -a 查看ubuntu的ip地址 Ubuntu系统准备工作 root权限 打开ubuntu系统后&#xff0c;打开终端&#xff0c;切换为root权限&#xff1a;su root 如果出现su root认证失…

管理经济学基本概念(五):一些基本术语

1、理性-行动者范式 使经济学家行动一致的东西就是采用理性-行动者范式来预判人的行为。简单地说&#xff0c;这个范式认为人们的行动式理性的、优化的和自利的。 2、税后净营业利润 税后经营净利润(NOPAT)是指将公司不包括利息收支的营业利润扣除实付所得税税金之后的数额加…

Acwing 907. 区间覆盖

Acwing 907. 区间覆盖 知识点题目描述思路讲解代码展示 知识点 贪心 题目描述 思路讲解 代码展示 #include <iostream> #include <algorithm>using namespace std;const int N 100010;int n;struct Range {int l, r;bool operator < (const Range &W) …

【day11.02】网络编程脑图

大小端存储&#xff1a; ip地址划分&#xff1a;

MySQL 锁分类和详细介绍

锁是计算机协调多个进程或线程并发访问某一资源的机制&#xff0c;在数据库中&#xff0c;除传统的计算资源&#xff08;CPU、RAM、I/O&#xff09;的争用以外&#xff0c;数据也是一种供许多用户共享的资源&#xff0c;锁机制是保证数据一致性和并发性的重要手段&#xff0c;它…

软件设计模式系列之二十二——状态模式

1 模式的定义 状态模式是一种行为型设计模式&#xff0c;它允许对象在内部状态发生改变时改变其行为&#xff0c;使得对象的行为看起来像是改变了其类。状态模式将对象的状态抽象成一个独立的类&#xff0c;让对象在不同状态下具有不同的行为&#xff0c;而且可以在运行时切换…

做一个优秀的博士生,时间的付出是必要条件

&#xff0a;图片来自管理学季刊 时间的付出 所有成功的科学家一定具有的共同点&#xff0c;就是他们必须付出大量的时间和心血。这是一条真理。实际上&#xff0c;无论社会上哪一种职业&#xff0c;要想成为本行业中的佼佼者&#xff0c;都必须付出比常人多的时间。有时&…

【知识点随笔分析 | 第六篇】HTTP/1.1,HTTP/2和HTTP/3的区别

前言&#xff1a; 当今互联网已成为人们生活的重要组成部分&#xff0c;而HTTP协议&#xff08;Hypertext Transfer Protocol&#xff09;是支持Web通信的基础。随着Web技术的发展和互联网应用的不断增多&#xff0c;HTTP也在不断演进。本文旨在介绍HTTP的演变过程中的三个重要…

【Godot4.1】Godot实现闪烁效果(Godot使用定时器实现定时触发的效果)

文章目录 准备工作创建Sprite2D创建Timer节点 编写脚本完整代码运行效果 准备工作 如果你希望配置C#编写脚本&#xff0c;可以查看如下教程&#xff1a; Godot配置C#语言编写脚本 创建Sprite2D 首先弄一个用于显示的Sprite2D&#xff0c;右键单击任意节点&#xff0c;然后选…

Transformer在小目标检测上的应用

本篇文章是博主在AI、无人机、强化学习等领域学习时&#xff0c;用于个人学习、研究或者欣赏使用&#xff0c;并基于博主对人工智能等领域的一些理解而记录的学习摘录和笔记&#xff0c;若有不当和侵权之处&#xff0c;指出后将会立即改正&#xff0c;还望谅解。文章分类在AI学…

mysql双主互从通过KeepAlived虚拟IP实现高可用

mysql双主互从通过KeepAlived虚拟IP实现高可用 在mysql 双主互从的基础上&#xff0c; 架构图&#xff1a; Keepalived有两个主要的功能&#xff1a; 提供虚拟IP&#xff0c;实现双机热备通过LVS&#xff0c;实现负载均衡 安装 # 安装 yum -y install keepalived # 卸载 …

反向输出一个三位数

系列文章目录 进阶的卡莎C++_睡觉觉觉得的博客-CSDN博客数1的个数_睡觉觉觉得的博客-CSDN博客双精度浮点数的输入输出_睡觉觉觉得的博客-CSDN博客足球联赛积分_睡觉觉觉得的博客-CSDN博客大减价(一级)_睡觉觉觉得的博客-CSDN博客小写字母的判断_睡觉觉觉得的博客-CSDN博客纸币(…

手把手教你完成(Java)师生信息管理系统

手把手教你完成&#xff08;Java&#xff09;师生信息管理系统 对阶段一学到的知识进行应用&#xff0c;完成练手小项目。同时&#xff0c;也可以当做学校的课设来做。项目已上传 CSDN &#xff0c;可以按需下载。 一、成果展示 添加学生&#xff08;查看学生&#xff09; 删除…

计算机毕业设计 基于SSM的宿舍管理系统的设计与实现 Java实战项目 附源码+文档+视频讲解

博主介绍&#xff1a;✌从事软件开发10年之余&#xff0c;专注于Java技术领域、Python人工智能及数据挖掘、小程序项目开发和Android项目开发等。CSDN、掘金、华为云、InfoQ、阿里云等平台优质作者✌ &#x1f345;文末获取源码联系&#x1f345; &#x1f447;&#x1f3fb; 精…

【C进阶】指针笔试题解析

做题之前我们再来回顾一下 对于数组名的理解&#xff1a;除了以下两种情况&#xff0c;数组名表示的都是数组首元素的地址 &#xff08;1&#xff09;sizeof&#xff08;数组名&#xff09;&#xff1a;这里的数组名表示整个数组 &#xff08;2&#xff09;&&#xff08;数…

关掉在vscode使用copilot时的提示音

1. 按照图示的操作File --> Preferences --> Settings 2. 搜索框输入关键字Sound&#xff0c;因为是要关掉声音&#xff0c;所以找有关声音的设置 3. 找到如下图所示的选项 Audio Cues:Line Has Inline Suggetion,将其设置为Off 这样&#xff0c;就可以关掉suggest code时…

使用 Python 给 PDF 添加目录书签

0、库的选择——pypdf 原因&#xff1a;Python Version Support Python 3.11 3.10 3.9 3.8 3.7 3.6 2.7 pypdf>3.0 YES YES YES YES YES YES PyPDF2>2.0 YES YES YES YES YES YES PyPDF2 1.20.0 - 1.28.4 YES YES YES YES YES YES P…