Java版Flink使用指南——分流导出

news2024/9/30 2:44:02

大纲

  • 新建工程
  • 编码
    • Pom.xml
    • 自定义无界流
    • 分流
  • 测试
  • 工程代码

在之前的案例中,我们一直使用的是单个Sink来做数据的输出。实际上,Flink是支持多个输出流的。本文我们就来讲解如何在Flink数据输出时做分流处理。
我们将基于《Java版Flink使用指南——自定义无界流生成器》的输入流,按生成数字的奇偶性,将其分流输出到不同的RabbitMQ队列中。

新建工程

我们新建一个名字叫MultiSinkTo的工程。
Archetype:org.apache.flink:flink-quickstart-java
版本:1.19.1
在这里插入图片描述

编码

Pom.xml

因为我们要往RabbitMQ中输出,所以需要引入相关连接组件。

		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-connector-rabbitmq</artifactId>
			<version>3.0.1-1.17</version>
		</dependency>

自定义无界流

新建src/main/java/org/example/generator/UnBoundedStreamGenerator.java
这块的代码可以见《Java版Flink使用指南——自定义无界流生成器》
它会每隔1秒钟生成一个递增的数字

package org.example.generator;

import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

public class UnBoundedStreamGenerator extends RichSourceFunction<Long> {
    private volatile boolean isRunning = true;

    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        long count = 0L;
        while (isRunning) {
            Thread.sleep(1000); // Simulate delay
            ctx.collect(count++); // Emit data
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
        System.out.println("UnBoundedStreamGenerator canceled");
    }
}

分流

我们通过下面的代码生成数据流

		DataStreamSource<Long> longDataStreamSource = env.addSource(new UnBoundedStreamGenerator());

然后奇数发布到odd.data.to.rbtmq队列;偶数发布到even.data.to.rbtmq。
分流主要是通过filter来区分数据,然后针对不同的数据addSink来发布到不同的队列。
如果不需要区分数据,只是将相同的数据发布到不同的目的地,则可以直接多次addSink来达成。

		String host = "172.25.103.252"; // IP of the rabbitmq server
		int port = 5672;
		String username = "admin";
		String password = "fangliang";
		String virtualHost = "/";
		RMQConnectionConfig rmqConnectionConfig = new RMQConnectionConfig.Builder()
				.setHost(host)
				.setPort(port)
				.setUserName(username)
				.setPassword(password)
				.setVirtualHost(virtualHost)
				.build();

		int parallelism = 1;
		
		String oddSinkQueueName = "odd.data.to.rbtmq"; 
		RMQSink<String> oddRMQSink = new RMQSink<>(rmqConnectionConfig, oddSinkQueueName, new SimpleStringSchema());
		longDataStreamSource.filter(value -> value % 2 != 0).map(Object::toString).addSink(oddRMQSink).setParallelism(parallelism).name("oddSink");

		String evenSinkQueueName = "even.data.to.rbtmq";
		RMQSink<String> evenRMQSink = new RMQSink<>(rmqConnectionConfig, evenSinkQueueName, new SimpleStringSchema());
		longDataStreamSource.filter(value -> value % 2 == 0).map(Object::toString).addSink(evenRMQSink).setParallelism(parallelism).name("evenSink");

测试

执行一段时间后,我们看到两个队列相序增加
在这里插入图片描述
奇数队列
在这里插入图片描述
偶数队列
在这里插入图片描述

工程代码

https://github.com/f304646673/FlinkDemo

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1913258.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

java如何实现一个死锁 ?

死锁(Deadlock)是指在并发系统中,两个或多个线程(或进程)因争夺资源而互相等待,导致它们都无法继续执行的一种状态。 一、简易代码 public class DeadlockExample {private static final Object lock1 = new Object();private

Python面试宝典第9题:买卖股票

题目 给定一个整型数组&#xff0c;它的第i个元素是一支给定股票第i天的价格。如果最多只允许完成一笔交易&#xff08;即买入和卖出一支股票一次&#xff09;&#xff0c;设计一个算法来计算你所能获取的最大利润。注意&#xff1a;你不能在买入股票前卖出股票。 示例 1&#…

前端面试题36(js栈和堆)

在JavaScript中&#xff0c;内存管理是自动进行的&#xff0c;主要通过栈(stack)和堆(heap)两种方式来分配和管理内存。理解这两者对于深入学习JavaScript以及优化代码性能非常关键。 栈 (Stack) 栈是一种后进先出&#xff08;Last In, First Out, LIFO&#xff09;的数据结构…

U盘启动快捷键查询

电脑开机一般默认自身硬盘启动系统&#xff0c;如需要U盘重装系统&#xff0c;开机时一直按对应机型的U盘启动快捷键&#xff0c;选择对应USB设备即可U盘启动。 一、品牌台式 二、品牌笔记本 三、组装电脑

Go语言---Json

JSON (JavaScript Object Notation)是一种比XML 更轻量级的数据交换格式&#xff0c;在易于人们阅读和编写的同时&#xff0c;也易于程序解析和生成。尽管JSON是 JavaScript的一个子集&#xff0c;但 JSON采用完全独立于编程语言的文本格式&#xff0c;且表现为键/值对集合的文…

红日靶场----(三)漏洞利用

上期已经信息收集阶段已经完成&#xff0c;接下来是漏洞利用。 靶场思路 通过信息收集得到两个吧靶场的思路 1、http://192.168.195.33/phpmyadmin/&#xff08;数据库的管理界面&#xff09; root/root 2、http://192.168.195.33/yxcms/index.php?radmin/index/login&am…

深入探索大语言模型

深入探索大语言模型 引言 大语言模型&#xff08;LLM&#xff09;是现代人工智能领域中最为重要的突破之一。这些模型在自然语言处理&#xff08;NLP&#xff09;任务中展示了惊人的能力&#xff0c;从文本生成到问答系统&#xff0c;无所不包。本文将从多个角度全面介绍大语…

在vue3中,手写父子关联,勾选子级父级关联,取消只取消当前子级,父节点不动

树形控件选择子级勾选父级&#xff0c;以及所有子级&#xff0c; 取消勾选仅取消子级 在项目中&#xff0c;可能会遇到这种场景&#xff0c;比如权限配置的时候&#xff0c;页面权限和菜单权限以tree的形式来配置&#xff0c;而且不用半选&#xff0c;菜单在页面的下面&#xf…

OR-3H7-4晶体管光耦,可对标替代TLP281-4等

提供隔离反馈 逻辑电路之间的接口 提供1通道和4通道 电平转换 DC和AC输入 SMPS中的调节反馈电路 消除接地环路 特征 电流传输比&#xff1a;IF 1mA&#xff0c;VCE 5V&#xff0c;Ta 25 C时最小50% 高输入输出隔离电压。&#xff08;VISO3&#xff0c;750Vrms&#xf…

基于Java中的SSM框架实现暖心家装平台系统项目【项目源码+论文说明】

基于Java中的SSM框架实现暖心家装平台系统演示 摘要 自从互联网技术得到大规模的应用以后&#xff0c;传统家装企业面临全新的竞争激烈的市场环境。要想占得当前家装营销与管理的先机&#xff0c;除了要加强内部管理&#xff0c;提高企业内部运营效率&#xff0c;更要积极推进…

【漏洞复现】时空智友ERP——uploadStudioFile——任意文件上传

声明&#xff1a;本文档或演示材料仅供教育和教学目的使用&#xff0c;任何个人或组织使用本文档中的信息进行非法活动&#xff0c;均与本文档的作者或发布者无关。 文章目录 漏洞描述漏洞复现测试工具 漏洞描述 时空智友ERP是专为医药等行业设计的综合性企业资源规划系统&…

Camera Raw:蒙版 - 蒙版叠加

Camera Raw “蒙版”模块中的蒙版叠加 Calibration功能可以帮助用户在调整照片时更好地可视化和管理所选区域&#xff0c;提高照片局部处理过程中的效率。 ◆ ◆ ◆ 使用方法与技巧 1、自动切换叠加 默认情况下启用“自动切换叠加”选项&#xff0c;这样可以使得在绘制蒙版时…

谷粒商城学习笔记-23-分布式组件-SpringCloud Alibaba-Nacos配置中心-简单示例

之前已经学习了使用Nacos作为注册中心&#xff0c;这一节学习Nacos另外一个核心功能&#xff1a;配置中心。 一&#xff0c;Nacos配置中心简介 Nacos是一个易于使用的平台&#xff0c;用于动态服务发现和配置管理。作为配置中心&#xff0c;Nacos提供了以下核心功能和优势&am…

全终端自动化测试框架wyTest

突然有一些觉悟&#xff0c;程序猿不能只会吭哧吭哧的低头做事&#xff0c;应该学会怎么去展示自己&#xff0c;怎么去宣传自己&#xff0c;怎么把自己想做的事表述清楚。 于是&#xff0c;这两天一直在整理自己的作品&#xff0c;也为接下来的找工作多做点准备。接下来…

详细分析Spring中的@Configuration注解基本知识(附Demo)

目录 前言1. 基本知识2. 详细分析3. Demo3.1 简单Bean配置3.2 属性配置3.3 多条件配置 4. 实战拓展 前言 Java的基本知识推荐阅读&#xff1a; java框架 零基础从入门到精通的学习路线 附开源项目面经等&#xff08;超全&#xff09;Spring框架从入门到学精&#xff08;全&am…

2-1静态库

静态库制作 编写库文件 test.c #include<stdio.h> int main(void){printf("%d\n",add(3,5));return 0; }add.c int add(int a,int b){return ab; }生成.o(目标文件) 用nm查看.o文件 T代表add这个函数的链接性是外部链接&#xff0c;即全局可见&#xff0c;…

03_Shell变量

【Shell】03_Shell变量 一、环境变量 Linux系统配置文件&#xff08;全局配置文件和用户个人配置文件&#xff09;中定义的变量&#xff0c;提供给所有Shell程序使用 1.1、全局环境变量 1.1.1、配置文件位置 /etc/environment /etc/bashrc&#xff08;或者/etc/bash.bashrc…

tensorflow张量生成以及常用函数

张量tensor&#xff1a;多维数组&#xff08;列表&#xff09; 阶&#xff1a;张量的维数 维数 阶 名字 例子 0-D 0 标量 scalar s 1&#xff0c; 2&#xff0c; 3 1-D 1 向量 vector…

Mac VSCode 突然闪退、崩溃、打不开了

1、 思路历程 VSCode 作为前端常用开发工具&#xff0c;其重要性就不一一描述了。 所以 VSCode 突然打不开了&#xff0c;真的是让我一脸懵逼。 本来以为问题不大&#xff0c;于是 &#xff1a; 1、重启了一下VSCode 2、关机重启了一下电脑&#xff1b; 3、清理了一下缓存&am…

阈值分割后配合Connection算子和箭头工具快速知道区域的ID并选择指定区域

代码 dev_close_window () read_image (Image, E:/机器视觉学习/海康视觉平台/二期VM视觉学习/二期VM视觉学习/机器视觉程序/标定相机找圆心和焊头修正相机找圆心之算法软件/标定相机找圆心和焊头修正相机找圆心之算法软件/03 标定相机找圆心/S2/1号机/1.bmp) get_image_size …