【API篇】二、源算子API

news2024/12/25 10:27:04

文章目录

  • 0、demo数据
  • 1、源算子Source
  • 2、从集合中读取数据
  • 3、从文件中读取
  • 4、从Socket读取
  • 5、从Kafka读取
  • 6、从数据生成器读取数据
  • 7、Flink支持的数据类型
  • 8、Flink的类型提示(Type Hints)

0、demo数据

准备一个实体类WaterSensor:

@Data
@AllArgsConstructor
@NoArgsConstructor
public class WaterSensor{

	private String id;   //水位传感器类型
 
 	private Long ts;     //传感器记录时间戳

	private Integer vc;  //水位记录
}
//注意所有属性的类型都是可序列化的,如果属性类型是自定义类,那要实现Serializable接口

模块下准备个文件words.txt,内容:

hello flink
hello world
hello java

1、源算子Source

Flink可以从各种来源获取数据,然后构建DataStream进行转换处理。一般将数据的输入来源称为数据源(data source),而读取数据的算子就是源算子(source operator)。

在这里插入图片描述
Flink1.12以前,添加数据源的方式是,调用执行环境对象的addSource方法

DataStream<String> stream = env.addSource(...);
//方法传入的参数是一个“源函数”(source function),需要实现SourceFunction接口

Flink1.12开始的流批统一的Source框架下,则是:

DataStreamSource<String> stream = env.fromSource()

2、从集合中读取数据

调用执行环境对象的fromCollection方法进行读取。这相当于将数据临时存储到内存中,形成特殊的数据结构后,作为数据源使用,一般用于测试

public static void main(String[] args) throws Exception {

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
	List<Integer> data = Arrays.asList(1, 22, 33);
	
    DataStreamSource<Integer> ds = env.fromCollection(data);

	stream.print();   //直接打印

    env.execute();
}

还可以直接fromElements方法:

DataStreamSource<Integer> ds = env.fromElements(1,22,33);

在这里插入图片描述

3、从文件中读取

从文件中读是批处理中最常见的读取方式,比如读取某个日志文件。首先需要引入文件连接器依赖:

<dependency>
	<groupId>org.apache.flink</groupId>
	<artifactId>flink-connector-files</artifactId>
	<version>${flink.version}</version>
</dependency>
public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		//第三个参数为自定义的sourceName
        FileSource<String> fileSource = FileSource.forRecordStreamFormat(new TextLineInputFormat(), new Path("input/word.txt")).build();

        env.fromSource(fileSource,WatermarkStrategy.noWatermarks(),"file")
    	   .print();

        env.execute();
}

  • FileSource数据源对象的创建,传参可以是目录,也可以是文件,可以相对、绝对路径,也可从HDFS目录下读,开头格式hdfs://…
  • 相对路径是从系统属性user.dir获取路径:idea下是project的根目录,standalone模式下是集群节点根目录
  • 之前的env.readTextFile方法被标记为过时是因为底层调用了addSource

4、从Socket读取

前面的文件和集合,都是有界流,而Socket常用于调试阶段模拟无界流:

DataStream<String> stream = env.socketTextStream("localhost", 9527);
# 对应的主机执行
nc -lk 9527

5、从Kafka读取

数据源是外部系统,常需要导入对应的连接器的依赖:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka</artifactId>
    <version>${flink.version}</version>
</dependency>

实例:

public class SourceKafka {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
            .setBootstrapServers("hadoop01:9092,hadoop02:9092,hadoop03:9092")   //指定Kafka节点的端口和地址
            .setTopics("topic_1")  //消费的Topic
            .setGroupId("code9527") //消费者组id
            //Flink程序做为Kafka的消费者,要进行对象的反序列化,setDeserializer对key和value都生效
            .setStartingOffsets(OffsetsInitializer.latest())  //指定Flink消费Kafka的策略
            .setValueOnlyDeserializer(new SimpleStringSchema())   //反序列化Value的反序列化器
            .build();

        DataStreamSource<String> stream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka-source");

        stream.print("Kafka");

        env.execute();
    }
}

//很多传参Ctrl+P看源码类型、Ctrl+H实现类自行分析

Kafaka的消费者参数:

  • earliest:有offset,就从offset继续消费,没offset,就从最早开始消费
  • latest:有offset,就从offset继续消费,没offset,就从最新开始消费

Flink下的KafkaSource,offset消费策略有个初始化器OffsetInitializer,默认是earliest:

  • earliest:一定从最早消费
  • latest:一定从最新消费

注意和Kafka自身的区别。

6、从数据生成器读取数据

Flink从1.11开始提供了一个内置的DataGen 连接器,主要是用于生成一些随机数来调试。1.17版本提供了新写法,导入依赖:

<dependency>
	<groupId>org.apache.flink</groupId>
	<artifactId>flink-connector-datagen</artifactId>
	<version>${flink.version}</version>
</dependency>
public class DataGeneratorDemo {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //env.setParallelism(1);

        DataGeneratorSource<String> dataGeneratorSource =
                new DataGeneratorSource<>(
                        new GeneratorFunction<Long, String>() {
                            @Override
                            public String map(Long value) throws Exception {
                                return "Number:"+value;
                            }
                        },
                        Long.MAX_VALUE,
                        RateLimiterStrategy.perSecond(10),
                        Types.STRING
                );


        env.fromSource(dataGeneratorSource, WatermarkStrategy.noWatermarks(), "datagenerator")
           .print();

        env.execute();
    }
}

new数据生成器源对象,有四个参数:

  • 第一个为GeneratorFunction接口,key为Long型,value为需要map转换后的类型。需要实现map方法,输入方法固定是Long类型
  • 第二个为自动生成数字的最大值,long型,到这个值就停止生成
  • 第三个为限速策略,比如每秒生成几个
  • 第四个为返回的数据类型,Types.xx,Types类是Flink包下的

在这里插入图片描述

嘶,并行度默认为CPU核心数了,输出算子6个子任务,且是每个并行度上是各自自增的(先按总数/并行度划分,再各自执行,比如最大值100,并行度2,那一个从0开始,另一个从50到99)。数字打印出来看着有点乱了,改下并行度

env.setParallelism(1);

在这里插入图片描述

可以看到程序结束了,相当于有界流了,想模拟无界流,可以第二个参数传Long.MAX_VALUE,这就一直输出了

7、Flink支持的数据类型

Flink使用类型信息(TypeInformation)来统一表示数据类型。TypeInformation类是Flink中所有类型描述符的基类。它涵盖了类型的一些基本属性,并为每个数据类型生成特定的序列化器、反序列化器和比较器

在这里插入图片描述

对于Java和Scale常见的数据类型,Flink都支持,在Types工具类中可以看到:

在这里插入图片描述

Flink支持所有自定义的Java类和Scala类,但要符合以下要求:

  • 类是公有(public)的
  • 有一个无参的构造方法
  • 所有属性都是可访问的,即公有public或private+getter、setter
  • 类中所有属性的类型都是可以序列化的

不满足以上要求的类,会被Flink当作泛型类来处理。Flink会把泛型类型当作黑盒,无法获取它们内部的属性,它们也不是由Flink本身序列化的,而是由Kryo序列化的。

8、Flink的类型提示(Type Hints)

Flink还具有一个类型提取系统,可以分析函数的输入和返回类型,自动获取类型信息,从而获得对应的序列化器和反序列化器。但是,由于Java中泛型擦除的存在,在某些特殊情况下(比如Lambda表达式中),自动提取的信息是不够精细的,需要我们手动显示提供类型信息。

之前的word count流处理程序,我们在将String类型的每个词转换成(word, count)二元组后,就明确地用returns指定了返回的类型。因为对于map里传入的Lambda表达式,系统只能推断出返回的是Tuple2类型,而无法得到Tuple2<String, Long>。只有显式地告诉系统当前的返回类型,才能正确地解析出完整数据。

//....
.map(word -> Tuple2.of(word, 1L))
.returns(Types.TUPLE(Types.STRING, Types.LONG));

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1093716.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

导航栏参考代码

导航栏参考代码 <!DOCTYPE html> <html> <head> <meta charset"utf-8"> <title>导航栏参考代码</title> </head> <body> <table width"858" border"0" align"center"><tr&g…

ASO优化之应用评分评论对APP下载量增长的重要性

APP应用评分和评论对于下载转化率的效果是显著的&#xff0c;是ASO优化重要的因素之一。应用评分是对应用性能的衡量预判&#xff0c;而应用评论是用户向应用提供的反馈总结。 1、评分的重要性。 应用评分显示在应用商店的搜索结果&#xff0c;特色页面&#xff0c;热门图表中…

第五十七章 学习常用技能 - 查看Globals

文章目录 第五十七章 学习常用技能 - 查看Globals查看Globals测试查询并查看查询计划 第五十七章 学习常用技能 - 查看Globals 查看Globals 要查看一般Globals&#xff0c;可以使用 ObjectScript ZWRITE 命令或管理门户中的全局页面。如果正在寻找存储类数据的Globals&#x…

docker搭建nginx+php-fpm

docker run --name nginx -p 8898:80 -d nginx:1.20.2-alpine# 将容器nginx.conf文件复制到宿主机 docker cp nginx:/etc/nginx/nginx.conf /usr/local/nginx/conf/nginx.conf# 将容器conf.d文件夹下内容复制到宿主机 docker cp nginx:/etc/nginx/conf.d /usr/local/nginx/conf…

Linemod算法研究

转载&#xff0c;这篇博客写的比较详细&#xff0c;分析也到位. https://www.cnblogs.com/aoru45/p/16810996.html

Linux查看端口号及进程信息

Linux查看端口号及进程 Linux查看端口号 netstat netstat -tuln显示当前正在监听的端口号以及相关的进程信息 ss ss -tuln与netstat类似&#xff0c;ss也可以用于显示当前监听的端口以及相关信息 isof isof -i :端口号端口号替换为具体要查找的端口号&#xff0c;显示该端…

Mysql数据库 4.图形化界面工具DataGrip

DataGrip工具 选择进入官网安装 ataGrip: The Cross-Platform IDE for Databases && SQL by JetBrains 下载最新版本的可以直接点击 Download 下载&#xff0c;下载其他版本的点击 Other versions 下载其他版本 .选择对应的版本进行下载即可 下载完&#xff0c;得到…

Redis中的BigKey如何发现和处理

文章目录 什么是BigKey?大键的存在通常被认为是不好的,主要原因:常见的bigkey原因: BigKey危害&#xff1f;占用大量内存空间阻塞服务器进程加长持久化时间延长复制时间增加内存碎片加重AOF重写压力降低查找效率 如何发现BigKey&#xff1f;info命令scan命令Redis-cli第三方工…

002数据安全传输-多端协议传输平台:配置Oracle数据库-19c及导入数据信息

002多端协议传输平台&#xff1a;配置Oracle数据库-19c及导入数据信息 文章目录 002多端协议传输平台&#xff1a;配置Oracle数据库-19c及导入数据信息1. 数据库准备2. 导入sql脚本2.1 原版Oracle-11g脚本2.2 新版Oracle-19c脚本2.3 命令行导入脚本 3. 删除系统中数据库信息sql…

在雷电模拟器9上安装magisk并安装LSPosed模块以及其Manager管理器(二)之LSPosed的使用

上一篇已经安装好LSPosed模块及其Manager管理器&#xff0c;参考文章 在雷电模拟器9上安装magisk并安装LSPosed模块以及其Manager管理器&#xff08;一&#xff09;-CSDN博客 安装完成后&#xff0c;在模拟器上出现图标如下&#xff1a; 一、运行LSPosed 二、仓库模块 内容非…

《向量数据库指南》——Milvus Cloud和Elastic Cloud 特性对比

随着以 Milvus 为代表的向量数据库在 AI 产业界越来越受欢迎,诸如 Elasticsearch 之类的传统数据库和检索系统也开始行动起来,纷纷在快速集成专门的向量检索插件方面展开角逐。 例如,在提供类似插件的传统数据库中,Elasticsearch 8.0 首屈一指,推出了包括向量插入和最相似…

蓝桥杯 第 1 场算法双周赛 第1题 三带一 c++ map 巧解 加注释

题目 三带一【算法赛】https://www.lanqiao.cn/problems/5127/learning/?contest_id144 问题描述 小蓝和小桥玩斗地主&#xff0c;小蓝只剩四张牌了&#xff0c;他想知道是否是“三带一”牌型。 所谓“三带一”牌型&#xff0c;即四张手牌中&#xff0c;有三张牌一样&#…

字符串进行 URL 编码处理

// URL_test.cpp : Defines the entry point for the console application. //#include "stdafx.h"#include <stdio.h> #include <stdlib.h> #include <string.h>// 判断字符是否需要进行 URL 编码 static bool needs_encoding(char c) {if ((c &g…

力扣第538题 把二叉搜索树转换为累加树 c++

题目 538. 把二叉搜索树转换为累加树 中等 相关标签 树 深度优先搜索 二叉搜索树 二叉树 给出二叉 搜索 树的根节点&#xff0c;该树的节点值各不相同&#xff0c;请你将其转换为累加树&#xff08;Greater Sum Tree&#xff09;&#xff0c;使每个节点 node 的新值…

相似性搜索:第 3 部分--混合倒排文件索引和产品量化

接续前文&#xff1a;相似性搜索&#xff1a;第 2 部分&#xff1a;产品量化 SImilarity 搜索是一个问题&#xff0c;给定一个查询的目标是在所有数据库文档中找到与其最相似的文档。 一、介绍 在数据科学中&#xff0c;相似性搜索经常出现在NLP领域&#xff0c;搜索引擎或推…

【RocketMQ系列二】通过docker部署单机RocketMQ

您好&#xff0c;我是码农飞哥&#xff08;wei158556&#xff09;&#xff0c;感谢您阅读本文&#xff0c;欢迎一键三连哦。 &#x1f4aa;&#x1f3fb; 1. Python基础专栏&#xff0c;基础知识一网打尽&#xff0c;9.9元买不了吃亏&#xff0c;买不了上当。 Python从入门到精…

goland安装教程

安装版本&#xff1a; goland-2023.2.3.exe

(滑动窗口) 76. 最小覆盖子串 ——【Leetcode每日一题】

❓76. 最小覆盖子串 难度&#xff1a;困难 给你一个字符串 s 、一个字符串 t 。返回 s 中涵盖 t 所有字符的最小子串。如果 s 中不存在涵盖 t 所有字符的子串&#xff0c;则返回空字符串 "" 。 注意&#xff1a; 对于 t 中重复字符&#xff0c;我们寻找的子字符串…

C语言 sizeof

定义 sizeof是C语言的一种单目操作符。它并不是函数。sizeof操作符以字节形式给出了其操作数的存储大小。操作数可以是一个表达式或括在括号内的类型名。操作数的存储大小由操作数的类型决定。 使用方法 用于数据类型 sizeof(type) 数据类型必须用括号括住 用于变量 size…

C#,数值计算——数据建模Proposal的计算方法与源程序

1 文本格式 using System; namespace Legalsoft.Truffer { public class Proposal { public Normaldev gau { get; set; } null; private double logstep { get; set; } public Proposal(int ranseed, double lstep) { this.gau…