Flink编程——最小程序MiniProgram

news2024/12/24 0:48:30

最小程序MiniProgram

前面我们已经搭建起了Flink 的基础环境,这一节我们就在上一节的基础上,进行编写我们的第一个Flink 程序,开始之前我们先看一下一个完整的Flink 程序是什么样的

Flink 程序结构

为了演示Flink 程序结构,我们下面写了一个程序,这个程序我称之为 MiniProgram,也就是流程序的最小结构

package com.kingcall.examples.stream;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.ArrayList;


public class MiniProgram {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        ArrayList<String> arras = new ArrayList<>();
        arras.add("China");
        arras.add("Japan");
        arras.add("Russia");

        DataStream<String> country= env.fromCollection(arras);

        DataStream<String> filters = country.filter(new FilterFunction<String>() {
            @Override
            public boolean filter(String str) throws Exception {
                return !"Japan".equals(str);
            }
        });

        filters.print();

        env.execute();
    }
}

可以参考下面的注释,一个流程序就这样被创建出来了,一个标准的程序就是这样的结构

  1. 每个 Flink 应用都需要有执行环境,在该示例中为 env。流式应用需要用到 StreamExecutionEnvironment
  2. DataStream API 将你的应用构建为一个 job graph,并附加到 StreamExecutionEnvironment 。当调用 env.execute() 时此 graph 就被打包并发送到 JobManager 上,后者对作业并行处理并将其子任务分发给 Task Manager 来执行。每个作业的并行子任务将在 task slot 中执行。
  3. 上述示例用 filters.print() 打印其结果到 task manager 的日志中(如果运行在 IDE 中时,将追加到你的 IDE 控制台)。它会对流中的每个元素都调用 toString() 方法。

image-20240123093838466

需要特别注意的是,Flink 的流程序需要我们在最后调用env.execute() 才可以执行,在此之前都是在定义程序的执行逻辑,只有最后的这一步骤才会提交任务并执行

如果没有调用 execute(),应用就不会运行

下面就是程序的输出

image-20240123095811283

1> 和 5> 指出输出来自哪个 sub-task(即 thread),我们可以对上面的程序进行改造一下,也就是设置并行度env.setParallelism(1); 这个时候输出就没有> 这样的提示了

image-20240123100033191

基本的 stream source

上述示例用 env.fromElements(...) 方法构造 DataStream<Person> 。这样将简单的流放在一起是为了方便用于原型或测试。StreamExecutionEnvironment 上还有一个 fromCollection(Collection) 方法。因此,你可以这样做:

List<Person> people = new ArrayList<Person>();

people.add(new Person("Fred", 35));
people.add(new Person("Wilma", 35));
people.add(new Person("Pebbles", 2));

DataStream<Person> flintstones = env.fromCollection(people);

另一个获取数据到流中的便捷方法是用 socket

DataStream<String> lines = env.socketTextStream("localhost", 9999)

或读取文件

DataStream<String> lines = env.readTextFile("file:///path");

在真实的应用中,最常用的数据源是那些支持低延迟,高吞吐并行读取以及重复(高性能和容错能力为先决条件)的数据源,例如 Apache Kafka,Kinesis 和各种文件系统。REST API 和数据库也经常用于增强流处理的能力(stream enrichment)。

基本的 stream sink

上述示例用 filters.print() 打印其结果到 task manager 的日志中(如果运行在 IDE 中时,将追加到你的 IDE 控制台)。它会对流中的每个元素都调用 toString() 方法。

输出看起来类似于

1> Fred: age 35
2> Wilma: age 35
  1. 1> 和 2> 指出输出来自哪个 sub-task(即 thread)在生产中
  2. 常用的 sink 包括各种数据库和几个 pub-sub 系统

什么是流

前面我们介绍了Flink程序的结构,我们是通过一个叫做最小程序的MiniProgram 进行说明的,我们之前一直说到Flink 程序的数据流,流说明数据是实时的流动的,那这里的数据有什么格式的要求吗,或者什么样的数据才能流动。

DataStream API 得名于特殊的 DataStream 类,该类用于表示 Flink 程序中的数据集合。你可以认为 它们是可以包含重复项的不可变数据集合。这些数据可以是有界(有限)的,也可以是无界(无限)的,但用于处理它们的API是相同的。

DataStream 在用法上类似于常规的 Java 集合,但在某些关键方面却大不相同。它们是不可变的,这意味着一旦它们被创建,你就不能添加或删除元素。你也不能简单地察看内部元素,而只能使用 DataStream API 操作来处理它们,DataStream API 操作也叫作转换(transformation)。

你可以通过在 Flink 程序中添加 source 创建一个初始的 DataStream。然后,你可以基于 DataStream 派生新的流,并使用 map、filter 等 API 方法把 DataStream 和派生的流连接在一起。

Flink 的 Java DataStream API 可以将任何可序列化的对象转化为流。Flink 自带的序列化器有

  • 基本类型,即 String、Long、Integer、Boolean、Array
  • 复合类型:Tuples、POJOs 和 Scala case classes

而且 Flink 会交给 Kryo 序列化其他类型。也可以将其他序列化器和 Flink 一起使用。特别是有良好支持的 Avro。

Java tuples 和 POJOs

Flink 的原生序列化器可以高效地操作 tuples 和 POJOs

Tuples

对于 Java,Flink 自带有 Tuple0Tuple25 类型。

Tuple2<String, Integer> person = Tuple2.of("Fred", 35);

// zero based index!  
String name = person.f0;
Integer age = person.f1;
POJOs

如果满足以下条件,Flink 将数据类型识别为 POJO 类型(并允许“按名称”字段引用):

  • 该类是公有且独立的(没有非静态内部类)
  • 该类有公有的无参构造函数
  • 类(及父类)中所有的所有不被 static、transient 修饰的属性要么是公有的(且不被 final 修饰),要么是包含公有的 getter 和 setter 方法,这些方法遵循 Java bean 命名规范。

示例:

public class Person {
    public String name;  
    public Integer age;  
    public Person() {}
    public Person(String name, Integer age) {  
        . . .
    }
}  

Person person = new Person("Fred Flintstone", 35);
完整程序

下面的程序将关于人的记录流作为输入,并且过滤后只包含成年人。

package com.kingcall.examples.stream;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.api.common.functions.FilterFunction;

public class Adults {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Person> flintstones = env.fromElements(
                new Person("Fred", 35),
                new Person("Wilma", 35),
                new Person("Pebbles", 2));

        DataStream<Person> adults = flintstones.filter(new FilterFunction<Person>() {
            @Override
            public boolean filter(Person person) throws Exception {
                return person.age >= 18;
            }
        });

        adults.print();

        env.execute();
    }

    public static class Person {
        public String name;
        public Integer age;

        public Person() {
        }

        public Person(String name, Integer age) {
            this.name = name;
            this.age = age;
        }

        @Override
        public String toString() {
            return this.name.toString() + ": age " + this.age.toString();
        }
    }
}

下面就是程序的输出

image-20240123095457015

Stream 执行环境

每个 Flink 应用都需要有执行环境,在该示例中为 env。流式应用需要用到 StreamExecutionEnvironment

DataStream API 将你的应用构建为一个 job graph,并附加到 StreamExecutionEnvironment 。当调用 env.execute() 时此 graph 就被打包并发送到 JobManager 上,后者对作业并行处理并将其子任务分发给 Task Manager 来执行。每个作业的并行子任务将在 task slot 中执行。

注意,如果没有调用 execute(),应用就不会运行。

image-20240123101650818

调试

在生产中,应用程序将在远程集群或一组容器中运行。如果集群或容器挂了,这就属于远程失败。JobManager 和 TaskManager 日志对于调试此类故障非常有用,但是更简单的是 Flink 支持在 IDE 内部进行本地调试。你可以设置断点,检查局部变量,并逐行执行代码。如果想了解 Flink 的工作原理和内部细节,查看 Flink 源码也是非常好的方法。

总结

Flink 中的 DataStream 程序是对数据流(例如过滤、更新状态、定义窗口、聚合)进行转换的常规程序。数据流的起始是从各种源(例如消息队列、套接字流、文件)创建的。结果通过 sink 返回,例如可以将数据写入文件或标准输出(例如命令行终端)。Flink 程序可以在各种上下文中运行,可以独立运行,也可以嵌入到其它程序中。任务执行可以运行在本地 JVM 中,也可以运行在多台机器的集群上。

Flink 程序看起来像一个转换 DataStream 的常规程序。每个程序由相同的基本部分组成:

  1. 获取一个执行环境(execution environment)
  2. 加载/创建初始数据;
  3. 指定数据相关的转换;
  4. 指定计算结果的存储位置;
  5. 触发程序执行。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1406520.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【TEE论文】Confidential Serverless Made Efficient with Plug-In Enclaves (2021 ISCA)

Confidential Serverless Made Efficient with Plug-In Enclaves ipads.se.sjtu.edu.cn/chinasys21/vedios/Confidential Serverless Made Efficient with Plug-In Enclaves-李明煜.mp4 问题&#xff1a;在SGX飞地中运行现有的无服务器应用程序&#xff0c;并观察到性能下降可…

【ASOC全解析(一)】ASOC架构简介和欲解决的问题

【ASOC全解析&#xff08;一&#xff09;】ASOC架构简介和欲解决的问题 一、什么是ASOC以及ASOC解决的三个问题二、ASOC的组成与功能解决第一个问题解决第二个问题解决第三个问题 三、ASOC基本工作原理 /********************************************************************…

使用Sobel算子把视频转换为只剩边缘部分

效果展示 原始视频 修改后的视频 整体代码 import cv2vc cv2.VideoCapture(test.mp4)if vc.isOpened():open, frame vc.read() else:open Falsei 0 while open:ret, frame vc.read()if frame is None:breakif ret True:i 1# 转换为灰度图gray cv2.cvtColor(frame, cv…

RabbitMQ进阶篇【理解➕应用】

&#x1f973;&#x1f973;Welcome 的Huihuis Code World ! !&#x1f973;&#x1f973; 接下来看看由辉辉所写的关于RabbitMQ的相关操作吧 目录 &#x1f973;&#x1f973;Welcome 的Huihuis Code World ! !&#x1f973;&#x1f973; 一.什么是交换机 1.概念释义 2.例…

聚观早报 | 苹果将开放第三方NFC支付;华为P70系列参数曝光

聚观早报每日整理最值得关注的行业重点事件&#xff0c;帮助大家及时了解最新行业动态&#xff0c;每日读报&#xff0c;就读聚观365资讯简报。 整理丨Cutie 1月23日消息 苹果将开放第三方NFC支付 华为P70系列参数曝光 Celestiq已正式开始量产 岚图汽车官宣与华为合作 美…

LLM + RecSys 初体验(上)

最近在逛小红书的时候&#xff0c;发现了一个新的GPU算力租赁平台&#xff0c;与AutoDL和恒源云等平台类似。正巧&#xff0c;官网有活动&#xff0c;注册即送RTX 4090三个小时&#xff0c;CPU 5 小时。正巧最近在测试 LLM推荐系统的 OpenP5 平台&#xff0c;果断入手测试! 用…

力扣精选算法100道——x的平方根(二分查找专题)

x的平方根 首先看到这个题目的时候&#xff0c;我们需要对上一个二分查找专题的题目进行深度理解&#xff0c;然后了解模板&#xff0c;这题是完全利用的上一题的模板知识进行&#xff0c;如果直接看这个题目可能是有点懵的&#xff0c;因为我这里直接利用模板进行解题。力扣…

nexus清理docker私库

下载nexus-cli客户端&#xff0c;并非必须下载到服务器&#xff0c;理论上只要能访问到nexus就行 wget https://s3.eu-west-2.amazonaws.com/nexus-cli/1.0.0-beta/linux/nexus-cli这个链接下载不了了&#xff0c;末尾有资源下载&#xff0c;里面包含了完整包和脚本&#xff0…

Mysql主从复制、读写分离、分库分表

大数据处理 1.主从复制1.1 概述1.2 原理1.3 搭建 1.主从复制 主从复制 1.1 概述 主从复制指: 将主数据库的DDL和DML操作通过二进制日志传递到从库服务器中, 然后从库根据日志重新执行(也叫重做), 从而使从库和主库的数据保存同步 MYSQL支持一台主库同时向多台从库进行复制,…

Kafka-服务端-KafkaController

Broker能够处理来自KafkaController的LeaderAndIsrRequest、StopReplicaRequest、UpdateMetadataRequest等请求。 在Kafka集群的多个Broker中&#xff0c;有一个Broker会被选举为Controller Leader,负责管理整个集群中所有的分区和副本的状态。 例如&#xff1a;当某分区的Le…

解密.dataru被困的数据:如何应对.dataru勒索病毒威胁

导言&#xff1a; 在数字时代&#xff0c;勒索病毒如.dataru正在不断演变&#xff0c;威胁着用户的数据安全。本文91数据恢复将深入介绍.dataru勒索病毒的特点、被加密数据的恢复方法&#xff0c;以及预防措施&#xff0c;帮助您更好地了解并对抗这一数字威胁。当面对被勒索病…

Armv8-M的TrustZone技术之SAU寄存器总结

每个SAU寄存器是32位宽。下表显示了SAU寄存器概要。 5.1 SAU_CTRL register SAU_CTRL寄存器的特征如下图和表所示&#xff1a; 5.2 SAU_TYPE register 5.3 SAU_RNR register 5.4 SAU_RBAR register 5.5 SAU_RLAR register 5.6 SAU区域配置 当SAU启用时&#xff0c;未由已启用…

深度学习技巧应用33-零门槛实现模型在多个GPU的分布式流水线训练的应用技巧

大家好&#xff0c;我是微学AI&#xff0c;今天给大家介绍一下深度学习技巧应用33零门槛实现模型在多个GPU的分布式流水线训练的应用技巧&#xff0c;本文将帮助大家零门槛的实现模型在多个GPU的并行训练&#xff0c;如果你手头上没有GPU资源&#xff0c;根据本文的介绍也可实现…

Redis应用(1)缓存(1.2)------Redis三种缓存问题

三者出现的根本原因是&#xff1a;Redis缓存命中率下降&#xff0c;请求直接打到DB上了。 一、 缓存穿透&#xff1a; 1、定义&#xff1a; 缓存穿透是指客户端请求的数据在缓存中和数据库中都不存在&#xff0c;这样缓存永远不会生效&#xff0c;这些请求都会打到数据库。…

C++面试宝典第24题:袋鼠过河

题目 一只袋鼠要从河这边跳到河对岸,河很宽,但是河中间打了很多桩子。每隔一米就有一个桩子,每个桩子上都有一个弹簧,袋鼠跳到弹簧上就可以跳得更远。每个弹簧力量不同,用一个数字代表它的力量,如果弹簧力量为5,就代表袋鼠下一跳最多能够跳5米;如果为0,就会陷进去无法…

《WebKit 技术内幕》学习之十一(3):多媒体

3 音频 3.1 音频元素 说完视频之后&#xff0c;接下来就是HTML5中对音频的支持情况。音频支持不仅指对声音的播放&#xff0c;还包括对音频的编辑和合成&#xff0c;以及对乐器数字接口&#xff08;MIDI&#xff09;等的支持&#xff0c;下面逐次介绍并分析它们。 3.1.1 H…

更改wpf原始默认按钮的样式

样式 代码 <Window x:Class"WpfApp4.Window1"xmlns"http://schemas.microsoft.com/winfx/2006/xaml/presentation"xmlns:x"http://schemas.microsoft.com/winfx/2006/xaml"xmlns:d"http://schemas.microsoft.com/expression/blend/2008…

【linux】Debian挂起和休眠

一、挂起和休眠 在Debian桌面系统中&#xff0c;挂起和休眠是两种不同的状态&#xff0c;它们之间有一些区别。 挂起&#xff08;Suspend&#xff09;是将当前系统的状态保存到RAM&#xff08;内存&#xff09;中&#xff0c;然后关闭所有硬件设备&#xff0c;除了RAM之外。在…

详细分析Java中的list.foreach()和list.stream().foreach()

目录 前言1. 基本知识2. 差异之处2.1 执行顺序2.2 串行并行2.3 复杂数据处理2.4 CRUD集合2.5 迭代器 3. 总结4. 彩蛋 前言 典故来源于项目中使用了两种方式的foreach&#xff0c;后面尝试体验下有何区别&#xff01; 先看代码示例&#xff1a; 使用List的forEach&#xff1a…

风丘车辆热管理测试方案

车辆热管理是在能源危机出现、汽车排放法规日益严格以及人们对汽车舒适性要求更高的背景下应运而生的。将各个系统或部件如冷却系统、润滑系统和空调系统等集成一个有效的热管理系统&#xff1b;控制和优化车辆的热量传递过程&#xff0c;保证各关键部件和系统安全高效运行&…