Java版Flink使用指南——自定义无界流生成器

news2024/11/13 9:39:05

大纲

  • 新建工程
    • 自定义无界流
  • 使用
  • 打包、提交、运行

在《Java版Flink使用指南——从RabbitMQ中队列中接入消息流》一文中,我们让外部组件RabbitMQ充当了无界流的数据源,使得Flink进行了流式处理。在《Java版Flink使用指南——将消息写入到RabbitMQ的队列中》一文中,我们使用了Flink自带的数据生成器,生成了有限数据,从而让Flink以批处理形式运行了该任务。
本文我们将自定义一个无界流生成器,以方便后续测试。

新建工程

我们新建一个名字叫UnboundedStreamGenerator的工程。
Archetype:org.apache.flink:flink-quickstart-java
版本:1.19.1
在这里插入图片描述

自定义无界流

新建src/main/java/org/example/generator/UnBoundedStreamGenerator.java
然后UnBoundedStreamGenerator实现RichSourceFunction接口

public abstract class RichSourceFunction<OUT> extends AbstractRichFunction
        implements SourceFunction<OUT> {

    private static final long serialVersionUID = 1L;
}

主要实现SourceFunction接口的run和cancel方法。run方法用来获取获取,cancel方法用于终止任务。

package org.example.generator;

import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

public class UnBoundedStreamGenerator extends RichSourceFunction<Long> {
    private volatile boolean isRunning = true;

    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        long count = 0L;
        while (isRunning) {
            Thread.sleep(1000); // Simulate delay
            ctx.collect(count++); // Emit data
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
        System.out.println("UnBoundedStreamGenerator canceled");
    }
}

在run方法中,我们每隔一秒产生一条数据,且这个数字自增。

使用

我们使用addSource方法,将该无界流生成器添加成数据源。然后将其输出到日志。

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.example;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.example.generator.UnBoundedStreamGenerator;

/**
 * Skeleton for a Flink DataStream Job.
 *
 * <p>For a tutorial how to write a Flink application, check the
 * tutorials and examples on the <a href="https://flink.apache.org">Flink Website</a>.
 *
 * <p>To package your application into a JAR file for execution, run
 * 'mvn clean package' on the command line.
 *
 * <p>If you change the name of the main class (with the public static void main(String[] args))
 * method, change the respective entry in the POM.xml file (simply search for 'mainClass').
 */
public class DataStreamJob {

	public static void main(String[] args) throws Exception {
		// Sets up the execution environment, which is the main entry point
		// to building Flink applications.
		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.addSource(new UnBoundedStreamGenerator()).name("Custom Stream Source")
            .setParallelism(1) 
            .print(); // For demonstration, print the stream to stdout

		// Execute program, beginning computation.
		env.execute("Flink Java API Skeleton");
	}
}

打包、提交、运行

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
使用下面命令查看日志输出

tail -f log/*

在这里插入图片描述
然后我们在后台点击Cancel Job
在这里插入图片描述
可以看到输出
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1905964.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

AntV X6 图编辑引擎速通

前言&#xff1a;参考 [AntV X6 官网](https://x6.antv.antgroup.com/) 一、简介 X6 可以快速搭建 DAG 图、ER 图、流程图、血缘图等应用。 二、快速上手 1. 安装 npm install antv/x6 --save# oryarn add antv/x6# orpnpm add antv/x6 2. 使用 2.1 初始画布 在页面中创…

利用面向AWS的Thales Sovereign解决方案保护AI之旅

亚马逊网络服务(AWS)是全球最大的云服务提供商。众所周知&#xff0c;他们致力于提供工具、解决方案和最佳实践&#xff0c;使其客户能够安全地利用AWS上的生成式人工智能 (GenAI) 工作负载。组织正在迅速使用GenAI为企业带来更高的生产力和创造力。在GenAI的几乎所有用途中&am…

Python实战项目:外星人入侵(源码分享)(文章较短,直接上代码)

✌ 作者名字&#xff1a;高峰君主 &#x1f482; 作者个人网站&#xff1a;高峰君主 - 一个数码科技爱好者 &#x1f4eb; 如果文章知识点有错误的地方&#xff0c;请指正&#xff01;和大家一起学习&#xff0c;一起进步&#x1f440; &#x1f4ac; 人生格言&#xff1a;没有…

vb.netcad二开自学笔记7:绘图命令的改进与扩展

1、在笔记6中创建了一个绘制直线的命令&#xff0c;若其他基本绘图命令都按那个写法会麻烦得要死&#xff0c;所以要总结其中的公共复用部分包封到一个统一的类中&#xff0c;这样在以后使用起来&#xff0c;特别是移植VBA代码时更方便&#xff0c;代码如下&#xff1a; Publi…

通过SimU-Net进行同时深度学习体素分类的纵向CECT扫描肝病灶变化分析| 文献速递-深度学习自动化疾病检查

Title 题目 Liver lesion changes analysis in longitudinal CECT scans by simultaneous deep learning voxel classification with SimU-Net 通过SimU-Net进行同时深度学习体素分类的纵向CECT扫描肝病灶变化分析 01 文献速递介绍 影像学随访是对影像学研究的解读&#x…

【C++高阶】深入理解红黑树:数据结构与算法之美

&#x1f4dd;个人主页&#x1f339;&#xff1a;Eternity._ ⏩收录专栏⏪&#xff1a;C “ 登神长阶 ” &#x1f921;往期回顾&#x1f921;&#xff1a;了解 AVL 树 &#x1f339;&#x1f339;期待您的关注 &#x1f339;&#x1f339; ❀红黑树 &#x1f4d2;1. 红黑树的概…

加油站税控云平台:税务合规新标杆,引领油站高效运营

在当今数字化快速发展的时代&#xff0c;加油站行业也面临着前所未有的变革。税务管理的合规性与运营的高效性成为了加油站发展的两大核心要素。 而加油站税控云平台的出现&#xff0c;无疑为这一传统行业注入了新的活力&#xff0c;它不仅是税务合规的新标杆&#xff0c;更是…

图形编辑器基于Paper.js教程07:鼠标画直线或移动路径

探索Paper.js: 使用鼠标绘制直线和轨迹 在数字图形设计和Web应用开发中&#xff0c;提供一个直观和互动的界面供用户绘制图形是极为重要的。Paper.js是一款功能强大的JavaScript库&#xff0c;它使得在HTML5 Canvas上绘制矢量图形变得简单快捷。本文将介绍如何使用Paper.js实现…

昇思MindSpore25天学习Day19:CycleGAN图像风格迁移互换

(TOC)[CycleGAN图像风格迁移呼唤] 模型介绍 模型简介 CycleGAN(Cycle Generative Adversaial Network)即循环对抗生成网络&#xff0c;来自论文Link:Unpaired lmage-to-mage Translation using Cycle-Consistent AdvesairalNetworks该模型实现了—种在没有配对示例的情况下学…

【单片机毕业设计选题24049】-基于STM32单片机的智能手表设计

系统功能: 显示时间&#xff0c;温湿度&#xff0c;体温信息&#xff0c;播放音乐及控制红外小夜灯&#xff0c;通过蓝牙模块连接手机APP。 系统上电后OLED显示“欢迎使用智能手表系统请稍后”&#xff0c;两秒后进入正常页面显示 第一行显示获取到的当前时间 第二行显示获…

汽车数据应用构想(六)

今天接着说车辆独有的数据信息&#xff0c;对于车辆本身的故障、损耗&#xff0c;原理上都会有相应的数据特征&#xff0c;举个例子&#xff1a; 刹车对于安全无比重要&#xff0c;但刹车性能的下降却并不会引发仪表告警。一般都是保养的时候&#xff0c;工人肉眼观察一下刹车…

文章解读与仿真程序复现思路——太阳能学报EI\CSCD\北大核心《考虑碳效益和运行策略的风电场储能优化配置》

本专栏栏目提供文章与程序复现思路&#xff0c;具体已有的论文与论文源程序可翻阅本博主免费的专栏栏目《论文与完整程序》 论文与完整源程序_电网论文源程序的博客-CSDN博客https://blog.csdn.net/liang674027206/category_12531414.html 电网论文源程序-CSDN博客电网论文源…

【scau统计学】期末考试——Excel数据分析加载项

首先机房只有MicrosoftExcel没有WPS&#xff0c;所以需要熟悉Microsoft Excel的使用。 如果没有数据分析这块选项 如果没有数据分析这个加载项&#xff0c;请先下载。 左上角点击文件&#xff1a; 点击左下角&#xff08;选项&#xff09;&#xff0c;选择加载项 点击下方&a…

【C语言】C语言编译链接和Win32API简单介绍

目录 翻译环境和运行环境翻译环境编译器预处理&#xff08;预编译&#xff09;编译链接 执行环境 Win32API是什么控制台程序控制台获取坐标COORDGetStdHandle函数GetConsoleCursorinfo函数CONSOLE_CURSOR_INFOSetConsoleCursorInfo函数SetConsoleCursorPostion函数GetAsyncKeyS…

完美解决ValueError: not enough values to unpack (expected 2, got 1)的正确解决方法,亲测有效!!!

完美解决ValueError: not enough values to unpack (expected 2, got 1)的正确解决方法&#xff0c;亲测有效&#xff01;&#xff01;&#xff01; 亲测有效 完美解决ValueError: not enough values to unpack (expected 2, got 1)的正确解决方法&#xff0c;亲测有效&#xf…

算法设计与分析 实验5 并查集法求图论桥问题

目录 一、实验目的 二、问题描述 三、实验要求 四、实验内容 &#xff08;一&#xff09;基准算法 &#xff08;二&#xff09;高效算法 五、实验结论 一、实验目的 1. 掌握图的连通性。 2. 掌握并查集的基本原理和应用。 二、问题描述 在图论中&#xff0c;一条边被称…

Three.js机器人与星系动态场景(四):封装Threejs业务组件

实际在写业务的时候不会在每个组件里都写几十行的threejs的初始化工作。我们可以 将通用的threejs的场景、相机、render、轨道控制器等进行统一初始化。同时将非主体的函数提到组件外部&#xff0c;通过import导入进组件。将业务逻辑主体更清晰一些。下面的代码是基于reactthre…

(附源码)springboot共享单车管理系统-计算机毕设 65154

springboot共享单车管理系统 摘 要 随着科学技术的飞速发展&#xff0c;各行各业都在努力与现代先进技术接轨&#xff0c;通过科技手段提高自身的优势&#xff1b;对于共享单车管理系统当然也不能排除在外&#xff0c;随着网络技术的不断成熟&#xff0c;带动了共享单车管理系…

Leetcode3194. 最小元素和最大元素的最小平均值

Every day a Leetcode 题目来源&#xff1a;3194. 最小元素和最大元素的最小平均值 解法1&#xff1a;排序遍历 将数组 nums 排序后&#xff0c;利用双指针计算每一对 (minElement maxElement) / 2&#xff0c;最小值即为答案。 代码&#xff1a; /** lc appleetcode.cn …

多线程网络实战之仿qq群聊的服务器和客户端

目录 一、前言 二、设计需求 1.服务器需求 2.客户端需求 三、服务端设计 1.项目准备 2.初始化网络库 3.SOCKET创建服务器套接字 4. bind 绑定套接字 5. listen监听套接字 6. accept接受客户端连接 7.建立套接字数组 8. 建立多线程与客户端通信 9. 处理线程函数&…