Java版Flink使用指南——合流

news2025/1/2 3:13:00

大纲

  • 新建工程
  • 无界流
    • 奇数Long型无界流
    • 偶数Long型无界流
    • 奇数String型无界流
  • 合流
    • Union
    • Connect
  • 测试
  • 工程代码

在《Java版Flink使用指南——分流导出》中,我们通过addSink进行了输出分流。本文我们将介绍几种通过多个无界流输入合并成一个流来进行处理的方案。

新建工程

我们新建一个名字叫MultiSource的工程。
Archetype:org.apache.flink:flink-quickstart-java
版本:1.19.1
在这里插入图片描述

无界流

我们使用《Java版Flink使用指南——自定义无界流生成器》中的方法,我们定义3个无界流。其中两个是Long类型,一个是String类型。

奇数Long型无界流

src/main/java/org/example/generator/UnBoundedOddStreamGenerator.java
这个类每隔1秒钟产生一个Long型奇数。

package org.example.generator;

import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

public class UnBoundedOddStreamGenerator extends RichSourceFunction<Long> {
    private volatile boolean isRunning = true;

    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        long count = 1L;
        while (isRunning) {
            Thread.sleep(1000); // Simulate delay
            ctx.collect(count); // Emit data
            count = count + 2;
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
        System.out.println("UnBoundedStreamGenerator canceled");
    }
}

偶数Long型无界流

src/main/java/org/example/generator/UnBoundedEvenStreamGenerator.java
这个类每隔1秒钟产生一个Long型偶数。

package org.example.generator;

import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

public class UnBoundedEvenStreamGenerator extends RichSourceFunction<Long> {
    private volatile boolean isRunning = true;

    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        long count = 0L;
        while (isRunning) {
            Thread.sleep(1000); // Simulate delay
            ctx.collect(count); // Emit data
            count = count + 2;
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
        System.out.println("UnBoundedStreamGenerator canceled");
    }
}

奇数String型无界流

src/main/java/org/example/generator/UnBoundedOddStringStreamGenerator.java
这个类每隔1秒钟产生一个String型奇数。

package org.example.generator;

import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

public class UnBoundedOddStringStreamGenerator extends RichSourceFunction<String> {
    private volatile boolean isRunning = true;

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        long count = 1L;
        while (isRunning) {
            Thread.sleep(1000); // Simulate delay
            ctx.collect(String.valueOf(count)); // Emit data
            count = count + 2;
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
        System.out.println("UnBoundedStreamGenerator canceled");
    }
}

合流

Union

Union是最简单的算子。它可以把两个数据类型相同的流合并
上面奇数和偶数Long型流就可以使用Union去做合并。

DataStreamSource<Long> evenLongDataStreamSource = env.addSource(new UnBoundedEvenStreamGenerator());
DataStreamSource<Long> oddLongDataStreamSource = env.addSource(new UnBoundedOddStreamGenerator());

evenLongDataStreamSource.union(oddLongDataStreamSource).addSink(
	new SinkFunction<Long>() {
		@Override
		public void invoke(Long value, Context context) throws Exception {
			System.out.println("sink union value: " + value);
		}
	}
).name("union stream");

Connect

Connect可以用于连接两个不同类型的流。这就意味着它需要提供针对不同类型的处理方法。
上面这个例子,如果使用Connect实现,则如下

evenLongDataStreamSource.connect(oddLongDataStreamSource).map(new CoMapFunction<Long, Long, Long>() {
		@Override
		public Long map1(Long value) throws Exception {
			return value;
		}

		@Override
		public Long map2(Long value) throws Exception {
			return value;
		}
	}).addSink(
		new SinkFunction<Long>() {
			@Override
			public void invoke(Long value, Context context) throws Exception {
				System.out.println("sink connect value: " + value);
			}
		}
	).name("connect stream");

map方法中的CoMapFunction接口类中的map1和map2就是将两个不同类型的流归一化处理的中间方法。
IN1是Connect方法调用者的流数据类型;IN2是Connect参数的流数据类型;R是它们归一化后的类型。

@Public
public interface CoMapFunction<IN1, IN2, OUT> extends Function, Serializable {

    /**
     * This method is called for each element in the first of the connected streams.
     *
     * @param value The stream element
     * @return The resulting element
     * @throws Exception The function may throw exceptions which cause the streaming program to fail
     *     and go into recovery.
     */
    OUT map1(IN1 value) throws Exception;

    /**
     * This method is called for each element in the second of the connected streams.
     *
     * @param value The stream element
     * @return The resulting element
     * @throws Exception The function may throw exceptions which cause the streaming program to fail
     *     and go into recovery.
     */
    OUT map2(IN2 value) throws Exception;
}

假如我们将Long型偶数流和String型奇数流合并,并生成一个Double类型的流,则可以如下

evenLongDataStreamSource.connect(oddStringDataStreamSource).map(new CoMapFunction<Long, String, Double>() {
	@Override
	public Double map1(Long value) throws Exception {
		return Double.valueOf(value);
	}

	@Override
	public Double map2(String value) throws Exception {
		return Double.valueOf(value);
	}

}).addSink(
	new SinkFunction<Double>() {
		@Override
		public void invoke(Double value, Context context) throws Exception {
			System.out.println("sink union connect value: " + value);
		}
	}
).name("union connect stream");

map1方法将evenLongDataStreamSource中的Long型数据转成了Double;map2将oddStringDataStreamSource中的String型数据转换成了Double。

测试

在这里插入图片描述

工程代码

https://github.com/f304646673/FlinkDemo

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1912632.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

ArcGIS实战—等高线绘制

今天分享一个使用ArcGIS Pro制作等高线地图的教程&#xff0c;等高线是用来表达地形最常见的形式之一。那么如何制作一个效果比较好的等高线地形图呢&#xff1f;让我们开始今天的教程。 1 DEM数据 第一步&#xff1a;获取DEM地形数据&#xff0c;网址&#xff08;https://dwt…

贴脸细看Mixtral 8x7B- 稀疏混合专家模型(MoE)的创新与推动

贴脸细看Mixtral 8x7B- 稀疏混合专家模型&#xff08;MoE&#xff09;的创新与推动 原创 一路到底孟子敬 上堵吟 2024年01月15日 20:05 美国 I. 引言 A. Mixtral 8x7B的背景和目的 • 背景&#xff1a;随着大型语言模型在自然语言处理&#xff08;NLP&#xff09;领域的广泛…

本地 HTTP 文件服务器的简单搭建 (deno/std)

首发日期 2024-06-30, 以下为原文内容: 在本地局域网搭建一个文件服务器, 有很多种方式. 本文介绍的是窝觉得比较简单的一种. 文件直接存储在 btrfs 文件系统之中, 底层使用 LVM 管理磁盘, 方便扩容. 使用 btrfs RAID 1 进行镜像备份 (一个文件在 2 块硬盘分别存储一份), 防止…

es是如何处理索引数据的变动的?

1 概述 es是如何处理索引数据的变动的&#xff1f; 或者说索引数据变动时&#xff0c;es会执行哪些操作&#xff1f; refresh、fsync、merge 和 flush 操作有何作用&#xff1f; es是如何确保即使es发生宕机数据也不丢失的&#xff1f; 在回答上述问题前&#xff0c;可以先…

【Linux】多线程_1

文章目录 九、多线程1. 线程概念2. 线程的控制 未完待续 九、多线程 1. 线程概念 我们知道&#xff1a;进程 内核数据结构 进程代码和数据 。那什么是线程呢&#xff1f;线程是进程内部的一个执行分支。一个进程内部可以有多个执行流&#xff08;内核数据结构&#xff09;&…

[高频 SQL 50 题(基础版)]第一千七百五十七题,可回收且低脂产品

题目&#xff1a; 表&#xff1a;Products ---------------------- | Column Name | Type | ---------------------- | product_id | int | | low_fats | enum | | recyclable | enum | ---------------------- product_id 是该表的主键&#xff08;具有唯…

大数据专业创新人才培养体系的探索与实践

一、引言 随着大数据技术的迅猛发展&#xff0c;其在各行各业中的应用日益广泛&#xff0c;对大数据专业人才的需求也日益增长。我国高度重视大数据产业的发展&#xff0c;将大数据作为国家战略资源&#xff0c;推动大数据与各行业的深度融合。教育部也积极响应国家战略&#…

C语言编程4:复合赋值,递增递减运算符,局部变量与全局变量,本地变量,转义字符

一篇文章带你玩转C语言基础语法4&#xff1a;复合赋值&#xff0c;递增递减运算符&#xff0c;局部变量与全局变量&#xff0c;本地变量&#xff0c;转义字符 一、复合赋值&#x1f33f; 1.1&#x1f4a0;定义 赋值就是给任意一个变量或者常量赋一个值&#xff0c;这个值可以…

在亚马逊云科技AWS上利用SageMaker机器学习模型平台搭建生成式AI应用(附Llama大模型部署和测试代码)

项目简介&#xff1a; 接下来&#xff0c;小李哥将会每天介绍一个基于亚马逊云科技AWS云计算平台的全球前沿AI技术解决方案&#xff0c;帮助大家快速了解国际上最热门的云计算平台亚马逊云科技AWS AI最佳实践&#xff0c;并应用到自己的日常工作里。本次介绍的是如何在Amazon …

Jmeter在信息头中设置Bearer与 token 的拼接值

思路&#xff1a;先获取token&#xff0c;将token设置成全局变量&#xff0c;再与Bearer拼接。 第一步&#xff1a;使用提取器将token值提取出来&#xff0c;使用setProperty函数将提取的token值设置成全局变量&#xff0c;在登录请求后面添加BeanShell取样器 或者 BeanShell后…

嘉立创EDA学习笔记

嘉立创EDA学习笔记 PCB引线一、设计规则间距安全间距其他间距 物理导线网络长度差分对过孔尺寸 平面铺铜 PCB布线 作为一个嵌入式开发潜力工程师&#xff0c;咱们必须得学会如何绘制开发板以满足顾客各种功能的需求&#xff0c;因此小编去学习了一下嘉立创&#xff0c;写这篇文…

配网行波故障预警与定位装置:配电线路安全性与可靠性的保障

配网行波故障预警与定位装置&#xff1a;配电线路安全性与可靠性的保障 一、传统配网故障排查的困境 1. 巡检效率低下&#xff1a;在二十世纪80年代及以前&#xff0c;电力线路故障的排查主要依赖于人工巡检&#xff0c;这种方式效率低下&#xff0c;特别是在故障区间较大的情…

Seata解决分布式事务

我举的例子是&#xff1a;在网上购物时&#xff0c;我们支付后&#xff0c;订单微服务会更新订单状态&#xff0c;同时会远程调用购物车微服务清空购物车&#xff0c;和调用商品微服务完成商品库存减一。 我们曾经说的事务是只能在本微服务完成回滚&#xff0c;意思就是如果过…

PHP全域旅游景区导览系统源码小程序

&#x1f30d;【探索无界&#xff0c;畅游无忧】全域旅游景区导览系统小程序全攻略 &#x1f4f1;【一键启动&#xff0c;智能导览在手】 告别纸质地图的繁琐&#xff0c;迎接全域旅游景区导览系统小程序的便捷时代&#xff01;只需轻轻一点&#xff0c;手机瞬间变身私人导游…

如何快速将Excel定义的表结构转换为MySQL的建表语句

目录 引言 方法一&#xff1a;使用Python编程 步骤一&#xff1a;安装必要的库 步骤二&#xff1a;读取Excel文件 步骤三&#xff1a;编写函数生成建表语句 注意事项 方法二&#xff1a;使用Excel VBA 步骤一&#xff1a;启用VBA编辑器 步骤二&#xff1a;编写VBA代码…

通过git将文件push到github 远程仓库

1.先git clone 代码地址 git clone htttp://github.com/用户名/test.git 2. 添加文件 例如&#xff1a;touch 1.txt 3.将文件添加到暂存区 git add 1.txt 4.提交 git commit -m "commit 1.txt" 5.与远程仓库建立关联 git remote add 远程仓库名 远程仓库…

文件操作和IO流(Java版)

前言 我们无时无刻不在操作文件。可以说&#xff0c;我们在电脑上能看到的图片、视频、音频、文档都是一个又一个的文件&#xff0c;我们需要从文件中读取我们需要的数据&#xff0c;将数据运算后也需要将结果写入文件中长期保存。可见文件的重要性&#xff0c;今天我们就来简…

泛微E-Cology getFileViewUrl SSRF漏洞复现

0x01 产品简介 泛微协同管理应用平台e-cology是一套兼具企业信息门户、知识文档管理、工作流程管理、人力资源管理、客户关系管理、项目管理、财务管理、资产管理、供应链管理、数据中心功能的企业大型协同管理平台。 0x02 漏洞概述 泛微E-Cology getFileViewUrl 接口处存在…

[安洵杯 2019]easy_serialize_php

源码&#xff1a; <?php$function $_GET[f];function filter($img){$filter_arr array(php,flag,php5,php4,fl1g);$filter /.implode(|,$filter_arr)./i;return preg_replace($filter,,$img); }if($_SESSION){unset($_SESSION); }$_SESSION["user"] guest; …

0010基于免疫遗传算法的配送中心选址

免疫优化算法&#xff08;Immune Optimization Algorithm, IOA&#xff09;在物流配送中心选址中的应用是通过模拟免疫系统的进化过程来解决选址优化问题。物流配送中心选址问题涉及到如何在给定区域内选择最优的位置&#xff0c;以最大化服务覆盖并最小化运输成本。 免疫优化…