Flink DataStream API 介绍

news2024/11/15 21:23:25

Flink DataStream API 介绍

StreamExecutionEnvironment

StreamExecutionEnvironment
StateBackend管理
setStateBackend()
Checkpoint管理
enableCheckpointing()
Serialzer序列化管理
addDefaultKryoSerialize()
类型和序列化注册
registerTypewithKryoSerializer()
registerType()
DataStream数据源创建
addSource()
readTextFile()
fromCollection()
fromElements()
socketTextStream()
TimeCharacteristic管理
setStreamTimeCharacteristic()
Transformation存储与管理
addOperation()
StreamGraph创建和获取
getStreamGraph()
CacheFile注册与管理
registerCacheFile()
任务提交与运行
execute()
executeAsync()
重启策略
setRestartStrategy()

DataStram数据源

StreamExecutionEnvironment 数据源
基本数据源接口(直接使用)
GenerateSequence
Collection集合
Socket
File(HDFS,Local)
数据源连接器(需要依赖第三方依赖)
Kafka Connector
Es Connector
Custom DataSource
根据具体数据源决定
addSource()方法

Datastream 基本数据源

//从给定的数据元素中转换
DatastreamSource<OUT> fromElements(OUT... data)
//从指定的集合中转换成DataStream
DatastreamSource<OUT> flomCollection(Collection<OUT> data)
//读取文件并转换
DataStreamSource<OUT> readFile(FileInputFormat<OUT> inputFormat, String filePath)
//从Scocket端口中读取
DataStreamSource<String> socketTextStream(String hostname, int port, String delimiter)
//直接通过InputFormat创建
DataStreamSource<OUT> createInput(InputFormat<OUT, ?> inputFormat)

最终都是通过 ExecutionEnvironment 创建 fromSource() 方法转换成DataStreamSource

Datastream 数据源连接器

Flink 内 置 Connector:

  • Apache Kafka (source/sink)

  • Apache Cassandra (Sink)

  • Amazon Kinesis Streams (source/sink)

  • Elasticsearch(Sink)

  • Hadoop FileSystem (sink)

  • RabbitMQ (source/sink)

  • Apache NiFi (source/sink)

  • Twitter Streaming API (source)

  • Google PubSub (source/sink)

  • JDBC (sinkJ

Apache Bahir 项 目 :

  • Apache ActiveMQ (source/sink)
  • Apache Flume (sink)
  • Redis (sink)
  • Akka (sink)
  • Netty (source)

以Kafka 连接器为例 :

<dependency>
    <groupld>org.apache.flink</groupld>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>1.11.0</version>
</dependency>

Datastream 数据源连接器 - Source

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092);
properties.setProperty("group.id", "test0");

Datastream<String> stream = env.addSource(new FlinkKafkaConsumer<>("topic",new SimpleStringSchema(), properties));

Datastream 数据源连接器

以Kafka 连接器为例 :

Map<KafkaTopicPartition, Long> specificStartOffsets = new HashMap<>();

specificStartOffsets.put(new KafkaTopicPartition(“myTopic“, 0), 23L);
specificStartOffsets.put(new KafkaTopicPartition(“myTopic“, 1), 31L);
specificStartOffsets.put(new KafkaTopicPartition(“myTopic“, 2), 43L);

myConsumer.setStartFromSpecificOffsets(specificstartOffsets);

Datastream 数据源连接器 - Sink

Datastream<string> stream =Properties properties = new Properties();
properties.setpProperty("bootstrap.servers", "localhost:9092");

FlinkKafkaProducer<String> myProducer = new FlinkKafkaProducer<>("my-topic",//target topic
new SimpleStringSchema(), // serialization schema
properties, // producer config
FlinkKafkaProducer.Semantic.EXACTLY_ONCE); // fault-tolerance
stream.addsSink(myProducer);

Datastream 主要转换操作

在这里插入图片描述

DataStream转换操作
基于单数据处理
map
一对一转换
filter
过滤
flatmap
一对多转换
Window操作
NonKeyed DataStream
Keyed DataStream
timeWindowAll
时间窗口
countWindowAll
计数窗口
windowAll
自定义窗口
timeWindowAll
时间窗口
countWindowAll
计数窗口
windowAll
自定义窗口
多流合并
NonKeyed DataStream
join
关联操作
connect
连接操作
coGroup
关联操作
union
合并操作
Keyed DataStream
interval join
间隔join操作
单流切分
split
切分操作
sideOutput
旁路输出

理解Keyedstream

在这里插入图片描述

Datastream 之间的转换

在这里插入图片描述

物理分组操作

类型描述
dataStream.global();全部发往第1个task
dataStream.broadcast();广播
dataStream.forward();上下游并发度一样时一对一发送
dataStream.shuffle();随机均匀分配
dataStream.rebalance();Round-Robin(轮流分配)
dataStream.recale();Local Round-Robin(本地轮流分配)
dataStream.partitionCustom();自定义单播

public DataStream<T> shuffle(){
    return setConnectionType(new ShufflePartitioner<T>());
}

DataStream Kafka 实例

public class KafkaExample{
    public static void main(String[] args) throws Exception {
        // parse inputarg umenlts
        final ParameterTool parameterTool = ParameterTooLfromArgs(args);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000));
        env.enableCheckpointing(5000); // create a checkpoint every 5 seconds
        env.getConfig().setGlobalobParameters(parameterTool); // make parameters available in the web interface
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        DataStream<kafkaEvent> input = env
            .addSource(
            new FlinkKafKaConsumer<>(
                parameterTool.getRedquired("input-topic"),
                new KafkaEventSchema(),
                parameterTool.getPropelties())
            .assignTimestampsAndWatermarks(new CustomWatermarkExtractor()))
            .keyBy("word")
            .map(new RollingAdditionMapper())
            .shuffle();
        input.addSink(
            new FlinKafkaProduCer<>(
                parameterTool.getRequired("output-topic"),
                new KeyedSerializationSchemaWrapper<>(new KafKaEventSChema()),
                parameterTool.getProperties(),
                FlinkKafkaProducer.Semantic.EXACTLY_ONCE));
        env.execute("Modern Kafka ExamPle");
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/23189.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

使用element-ui实现树形穿梭框

<template><div class"transferTreeBox"><!-- 左侧待选内容 --><div class"SelectBox"><div class"boxTitle" click"clickAllSelect">全选 ></div><div class"boxCenter"><…

【Hack The Box】Linux练习-- Frolic

HTB 学习笔记 【Hack The Box】Linux练习-- Frolic &#x1f525;系列专栏&#xff1a;Hack The Box &#x1f389;欢迎关注&#x1f50e;点赞&#x1f44d;收藏⭐️留言&#x1f4dd; &#x1f4c6;首发时间&#xff1a;&#x1f334;2022年9月7日&#x1f334; &#x1f36d…

dumi 2,它来了它来了它来了

dumi 1.0 在 2020 年 3 月 2 日正式发布&#xff0c;到今天一共有 80 位 Contributor、提交 1100 Commit、为近 4000 个开源项目提供了组件库/站点的文档方案&#xff1b;dumi 作为一个 GitHub 数亿开源项目中的渺小一粒&#xff0c;能有这么多人共同参与、能为这么多项目提供价…

【JAVA程序设计】(C00097) 基于SSM的果树溯源可视化管理系统

基于SSM的果树溯源可视化管理系统项目简介项目获取开发环境项目技术运行截图项目简介 基于ssm框架的果树溯源可视化管理系统&#xff0c;本系统分为二种用户&#xff1a;管理员、农户 管理员角色包含以下功能&#xff1a; 登录、农户管理、商家管理、果树管理、地块管理、农资…

马上2023年了,终于发现一款颜值爆表的记账软件

不知道大家平时有没有记账的习惯&#xff0c;我是在疫情之后&#xff0c;才开始记账的。 记账之后&#xff0c;的确发现了很多问题。尤其是自己花钱大手大脚没有规划的毛病。 后来&#xff0c;在每个月第1周&#xff0c;我都会分析一下上一个月的账目&#xff0c;看看自己的收…

同花顺_代码解析_交易系统_J09_18

本文通过对同花顺中现成代码进行解析&#xff0c;用以了解同花顺相关策略设计的思想 目录 J_09 抛物线转向系统 J_10 均线系统 J_11 随机指标专家 J_12 顺势指标 J_15 动量线 J_16 心理线 J_17 变动速率 J_18 相对强弱指标 J_09 抛物线转向系统 指标标识由绿变红时为买…

LeetCode287之寻找重复数(相关话题:二分查找,快慢指针)

题目描述 给定一个包含 n 1 个整数的数组 nums &#xff0c;其数字都在 [1, n] 范围内&#xff08;包括 1 和 n&#xff09;&#xff0c;可知至少存在一个重复的整数。 假设 nums 只有 一个重复的整数 &#xff0c;返回 这个重复的数 。 你设计的解决方案必须 不修改 数组 …

【SIFT】超详详详解 - 实现细节记录

目录前言一、尺度空间的生成&#xff1a;高斯金字塔 Gaussian pyrmid1、图像的尺度空间 - 高斯金字塔 Gaussian pyramid2、高斯金字塔的组 与 组数 Octave1&#xff09;组2&#xff09;组数3&#xff09;升采样获得 base image3、高斯金字塔的层与层数 interval31&#xff09;层…

2022年轨道交通行业研究报告

第一章 行业概况 轨道交通是指运营车辆需要在特定轨道上行驶的一类交通工具或运输系统。最典型的轨道交通就是由传统火车和标准铁路所组成的铁路系统。随着火车和铁路技术的多元化发展&#xff0c;轨道交通呈现出越来越多的类型&#xff0c;不仅遍布于长距离的陆地运输&#x…

k8s基础命令及Linux上用Kubectl(k8s)部署Nginx

k8s基础命令及Linux上用Kubectl(k8s)部署Nginx 不懂K8s搭建的可以看我这篇文章 Linux上部署Kubectl(k8s) 1.k8s简介 1.1 Kubernetes 概念 在 k8s 上进行部署前&#xff0c;首先需要了解一个基本概念 Deployment Deployment 译名为 部署。在k8s中&#xff0c;通过发布 Depl…

积分商城小程序的作用_分享积分商城小程序开发的效果

积分商城系统带来的6点作用分别是&#xff1a;对商家的依赖性、提升转化和复购、运营模式多元化、提升收益、进行积分营销、进行口碑传播&#xff0c;下面我们就来详细的了解一下。 积分商城系统带来的作用一&#xff1a;对商家的依赖性 积分商城系统是进行积分兑换的&#xf…

渗透测试CTF-图片隐写的详细教程2(干货)

上篇文章我们介绍了这7个工具&#xff0c;这里简单的介绍一下。 Binwalk 用来检测图片中是否有隐藏的文件。 Foremost 将图片中的隐藏文件拆分出来。 010Editor ①修改图片的参数来查看隐藏信息。 ②查看压缩包是否是伪加密。 Stegsolve.jar 图片隐写查看神器。 OurSecret 1个图…

公众号免费查题功能搭建

公众号免费查题功能搭建 本平台优点&#xff1a; 多题库查题、独立后台、响应速度快、全网平台可查、功能最全&#xff01; 1.想要给自己的公众号获得查题接口&#xff0c;只需要两步&#xff01; 2.题库&#xff1a; 题库&#xff1a;题库后台&#xff08;点击跳转&#xf…

Word处理控件Aspose.Words功能演示:在 Java 中将文本转换为 PNG、JPEG 或 GIF 图像

在各种情况下&#xff0c;通常需要进行文本到图像的转换&#xff0c;例如&#xff0c;使文本成为只读。在上一篇文章中&#xff0c;我们写过如何将TXT文件中的文本转换为 Java 中的 PDF。在本文中&#xff0c;您将学习如何在 Java 中以编程方式将文本转换为PNG、JPEG或GIF图像。…

B. Catching Cheaters(最长公共子序列变形)

Problem - 1446B - Codeforces 给你两个字符串A和B&#xff0c;代表两个涉嫌作弊的学生的论文。对于任何两个字符串C&#xff0c;D&#xff0c;我们将其相似性分数S(C,D)定义为4⋅LCS(C,D)-|C|-|D|&#xff0c;其中LCS(C,D)表示字符串C和D的最长公共子序列。 你认为只有部分文…

三次握手与四次挥的问题,怎么回答?

在面试中&#xff0c;三次握手和四次挥手可以说是问的最频繁的一个知识点了&#xff0c;我相信大家也都看过很多关于三次握手与四次挥手的文章&#xff0c;今天的这篇文章&#xff0c;重点是围绕着面试&#xff0c;我们应该掌握哪些比较重要的点&#xff0c;哪些是比较被面试官…

大一学生网页课程作业 南京介绍网页设计 学生家乡网页设计作品静态 HTML网页模板源码 html我的家乡网页作业

家乡旅游景点网页作业制作 网页代码运用了DIV盒子的使用方法&#xff0c;如盒子的嵌套、浮动、margin、border、background等属性的使用&#xff0c;外部大盒子设定居中&#xff0c;内部左中右布局&#xff0c;下方横向浮动排列&#xff0c;大学学习的前端知识点和布局方式都有…

uniapp自动识别并切换到pad端、pc端【不断更新】【伸手党福利】

目录uniapp自动切换到pad、pc端&#xff08;框架方法&#xff09;1. 新建文件&#xff1a;index为主页面&#xff08;代理页面&#xff09;detail为主页面的引用页面&#xff08;业务页面&#xff09;leftwindow为左边栏【名字随便起】topwindow为顶部栏【名字随便起】2. pages…

【LeetCode】808.分汤

题目描述 有 A 和 B 两种类型 的汤。一开始每种类型的汤有 n 毫升。有四种分配操作&#xff1a; 提供 100ml 的 汤A 和 0ml 的 汤B 。 提供 75ml 的 汤A 和 25ml 的 汤B 。 提供 50ml 的 汤A 和 50ml 的 汤B 。 提供 25ml 的 汤A 和 75ml 的 汤B 。 当我们把汤分配给某人之后&a…

【selenium】多 frame 切换定位元素

frame 简介 frame 是 html 中的框架导航。同一个框架集中&#xff0c;点击某一框架的超链接&#xff0c;内容会在另一个框架的窗口中展示。 比如后台管理页面&#xff0c;点击左侧导航栏按钮&#xff0c;在右侧区域展示加载的内容&#xff0c;而不是打开一个新的窗口。 fram…