流批一体计算引擎-10-[Flink]中的常用算子和DataStream转换

news2024/7/6 18:31:34

pyflink 处理 kafka数据
在这里插入图片描述

1 DataStream API 示例代码

从非空集合中读取数据,并将结果写入本地文件系统。

from pyflink.common.serialization import Encoder
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import StreamingFileSink

def tutorial():
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(1)
    ds = env.from_collection(
        collection=[(1, 'aaa'), (2, 'bbb')],
        type_info=Types.ROW([Types.INT(), Types.STRING()]))
    ds.add_sink(StreamingFileSink
                .for_row_format('output', Encoder.simple_string_encoder())
                .build())
    env.execute("tutorial_job")

if __name__ == '__main__':
    tutorial()

(1)DataStream API应用程序首先需要声明一个执行环境
StreamExecutionEnvironment,这是流式程序执行的上下文。
后续将通过它来设置作业的属性(例如默认并发度、重启策略等)、创建源、并最终触发作业的执行。

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)

(2)声明数据源
一旦创建了 StreamExecutionEnvironment 之后,可以使用它来声明数据源。
数据源从外部系统(如Apache Kafka、Rabbit MQ 或 Apache Pulsar)拉取数据到Flink作业里。
为了简单起见,本次使用元素集合作为数据源。
这里从相同类型数据集合中创建数据流(一个带有 INT 和 STRING 类型字段的ROW类型)。

ds = env.from_collection(
    collection=[(1, 'aaa'), (2, 'bbb')],
    type_info=Types.ROW([Types.INT(), Types.STRING()]))

(3)转换操作或写入外部系统
现在可以在这个数据流上执行转换操作,或者使用 sink 将数据写入外部系统。
本次使用StreamingFileSink将数据写入output文件目录中。

ds.add_sink(StreamingFileSink
   .for_row_format('output', Encoder.simple_string_encoder())
   .build())

(4)执行作业
最后一步是执行真实的 PyFlink DataStream API作业。
PyFlink applications是懒加载的,并且只有在完全构建之后才会提交给集群上执行。
要执行一个应用程序,只需简单地调用env.execute(job_name)。

env.execute("tutorial_job")

在这里插入图片描述

2 自定义转换函数的三种方式

三种方式支持用户自定义函数。

2.1 Lambda函数[简便]

from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)

data_stream = env.from_collection([1, 2, 3, 4, 5], type_info=Types.INT())
mapped_stream = data_stream.map(lambda x: x + 1, output_type=Types.INT())

mapped_stream.print()
env.execute("tutorial_job")

2.2 python函数[简便]

from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment

def my_map_func(value):
   return value + 1

def main():
   env = StreamExecutionEnvironment.get_execution_environment()
   env.set_parallelism(1)

   data_stream = env.from_collection([1, 2, 3, 4, 5], type_info=Types.INT())
   mapped_stream = data_stream.map(my_map_func, output_type=Types.INT())

   mapped_stream.print()
   env.execute("tutorial_job")

if __name__ == '__main__':
   main()

2.3 接口函数[复杂]

from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment, MapFunction


class MyMapFunction(MapFunction):

   def map(self, value):
      return value + 1


def main():
   env = StreamExecutionEnvironment.get_execution_environment()
   env.set_parallelism(1)

   data_stream = env.from_collection([1, 2, 3, 41, 5], type_info=Types.INT())
   mapped_stream = data_stream.map(MyMapFunction(), output_type=Types.INT())

   mapped_stream.print()
   env.execute("tutorial_job")

if __name__ == '__main__':
   main()

3 常用算子

参考官网算子

3.1 map【DataStream->DataStream】

from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)  # 将输出写入一个文件

data_stream = env.from_collection([1, 2, 3, 4, 5], type_info=Types.INT())
mapped_stream = data_stream.map(lambda x: x + 1, output_type=Types.INT())

mapped_stream.print()
env.execute("tutorial_job")

在这里插入图片描述

3.2 flat_map【DataStream->DataStream】

from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)  # 将输出写入一个文件

data_stream = env.from_collection(collection=['hello apache flink', 'streaming compute'])
out = data_stream.flat_map(lambda x: x.split(' '), output_type=Types.STRING())

out.print()
env.execute("tutorial_job")

在这里插入图片描述

3.3 filter【DataStream->DataStream】

from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)  # 将输出写入一个文件


def my_func(value):
    if value % 2 == 0:
        return value
    
    
data_stream = env.from_collection([1, 2, 3, 4, 5], type_info=Types.INT())
filtered_stream = data_stream.filter(my_func)

filtered_stream.print()
env.execute("tutorial_job")

3.4 window_all【DataStream->AllWindowedStream】

根据某些特征(例如,最近 100毫秒秒内到达的数据)对所有流事件进行分组。
所有的元素。

data_stream = env.from_collection(collection=[(1, 'm'), (3, 'n'), (2, 'm'), (4,'m')])
all_window_stream = data_stream.window_all(TumblingProcessingTimeWindows.of(Time.milliseconds(100)))

3.4.1 apply【AllWindowedStream->DataStream】

将通用 function 应用于整个窗口。

from typing import Iterable

from pyflink.common import Time
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.functions import AllWindowFunction
from pyflink.datastream.window import TumblingProcessingTimeWindows, TimeWindow

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)  # 将输出写入一个文件

class MyAllWindowFunction(AllWindowFunction[tuple, int, TimeWindow]):
    def apply(self, window: TimeWindow, inputs: Iterable[tuple]) -> Iterable[int]:
        sum = 0
        for input in inputs:
            sum += input[0]
        yield sum


data_stream = env.from_collection(collection=[(1, 'm'), (3, 'n'), (2, 'm'), (4,'m')])
all_window_stream = data_stream.window_all(TumblingProcessingTimeWindows.of(Time.milliseconds(100)))
out = all_window_stream.apply(MyAllWindowFunction())

out.print()
env.execute("tutorial_job")

3.5 key_by【DataStream->KeyedStream】

需要结合reduce或window算子使用。

data_stream = env.from_collection(collection=[(1, 'm'), (3, 'n'), (2, 'm'), (4,'m')])
key_stream = data_stream.key_by(lambda x: x[1], key_type=Types.STRING())

3.6 reduce【KeyedStream->DataStream】增量

from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)  # 将输出写入一个文件

data_stream = env.from_collection(collection=[(1, 'm'), (3, 'n'), (2, 'm'), (4,'m')])
key_stream = data_stream.key_by(lambda x: x[1], key_type=Types.STRING())
out = key_stream.reduce(lambda a, b: (a[0]+b[0], a[1]))

out.print()
env.execute("tutorial_job")

在这里插入图片描述
在相同 key 的数据流上“滚动”执行 reduce。
将当前元素与最后一次 reduce 得到的值组合然后输出新值。

3.7 window【KeyedStream->WindowedStream】

在已经分区的 KeyedStreams 上定义 Window。

data_stream = env.from_collection(collection=[(1, 'm'), (3, 'n'), (2, 'm'), (4,'m')])
key_stream = data_stream.key_by(lambda x: x[1], key_type=Types.STRING())
window_stream = key_stream.window(TumblingProcessingTimeWindows.of(Time.milliseconds(100)))

3.7.1 apply【WindowedStream->DataStream】

将通用 function 应用于整个窗口。

from typing import Iterable

from pyflink.common import Time, Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.functions import WindowFunction
from pyflink.datastream.window import TumblingProcessingTimeWindows, TimeWindow

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)  # 将输出写入一个文件

class MyWindowFunction(WindowFunction[tuple, int, int, TimeWindow]):
    def apply(self, key: int, window: TimeWindow, inputs: Iterable[tuple]) -> Iterable[int]:
        sum = 0
        for input in inputs:
            sum += input[0]
        yield key, sum


data_stream = env.from_collection(collection=[(1, 'm'), (3, 'n'), (2, 'm'), (4,'m')])
key_stream = data_stream.key_by(lambda x: x[1], key_type=Types.STRING())
window_stream = key_stream.window(TumblingProcessingTimeWindows.of(Time.milliseconds(10)))
out = window_stream.apply(MyWindowFunction())

out.print()
env.execute("tutorial_job")

在这里插入图片描述

3.7.2 reduce【WindowedStream->DataStream】

from pyflink.common import Time
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.window import TumblingEventTimeWindows,TumblingProcessingTimeWindows

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)  # 将输出写入一个文件

data_stream = env.from_collection(collection=[(1, 'm'), (3, 'n'), (2, 'm'), (4,'m')])
key_stream = data_stream.key_by(lambda x: x[1], key_type=Types.STRING())
window_stream = key_stream.window(TumblingProcessingTimeWindows.of(Time.milliseconds(10)))
out = window_stream.reduce(lambda a, b: (a[0]+b[0], a[1]))

out.print()
env.execute("tutorial_job")

在这里插入图片描述
方式二

from pyflink.common import Time
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment, ReduceFunction
from pyflink.datastream.window import TumblingProcessingTimeWindows

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)  # 将输出写入一个文件

class MyReduceFunction(ReduceFunction):
    def reduce(self, value1, value2):
        return value1[0] + value2[0], value1[1]

data_stream = env.from_collection(collection=[(1, 'm'), (3, 'n'), (2, 'm'), (4,'m')])
key_stream = data_stream.key_by(lambda x: x[1], key_type=Types.STRING())
window_stream = key_stream.window(TumblingProcessingTimeWindows.of(Time.milliseconds(10)))
out = window_stream.reduce(MyReduceFunction())

out.print()
env.execute("tutorial_job")

3.8 union【DataStream*->DataStream】

将两个或多个数据流联合来创建一个包含所有流中数据的新流。
注意:如果一个数据流和自身进行联合,这个流中的每个数据将在合并后的流中出现两次。

from pyflink.datastream import StreamExecutionEnvironment
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)  # 将输出写入一个文件

data_stream1 = env.from_collection(collection=[(1, 'm'), (3, 'n'), (2, 'm'), (4,'m')])
data_stream2 = env.from_collection(collection=[(1, 'a'), (3, 'b'), (2, 'a'), (4,'a')])
out = data_stream2.union(data_stream1)

out.print()
env.execute("tutorial_job")

在这里插入图片描述

3.9 connect【DataStream,DataStream->ConnectedStream】

stream_1 = ...
stream_2 = ...
connected_streams = stream_1.connect(stream_2)

3.9.1 CoMap【ConnectedStream->DataStream】

from pyflink.datastream import StreamExecutionEnvironment, CoMapFunction

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)  # 将输出写入一个文件

data_stream1 = env.from_collection(collection=[(1, 'm'), (3, 'n'), (2, 'm'), (4,'m')])
data_stream2 = env.from_collection(collection=[(1, 'a'), (3, 'b'), (2, 'a'), (4,'a')])
connected_stream = data_stream1.connect(data_stream2)


class MyCoMapFunction(CoMapFunction):

    def map1(self, value):
        return value[0] *100, value[1]

    def map2(self, value):
        return value[0], value[1] + 'flink'

out = connected_stream.map(MyCoMapFunction())

out.print()
env.execute("tutorial_job")

在这里插入图片描述

3.9.2 CoFlatMap【ConnectedStream->DataStream】

from pyflink.datastream import StreamExecutionEnvironment, CoFlatMapFunction

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)  # 将输出写入一个文件

data_stream1 = env.from_collection(collection=[(1, 'm'), (3, 'n'), (2, 'm'), (4,'m')])
data_stream2 = env.from_collection(collection=[(1, 'a'), (3, 'b'), (2, 'a'), (4,'a')])
connected_stream = data_stream1.connect(data_stream2)


class MyCoFlatMapFunction(CoFlatMapFunction):

    def flat_map1(self, value):
        for i in range(value[0]):
            yield value[0]*100

    def flat_map2(self, value):
        yield value[0] + 10

out = connected_stream.flat_map(MyCoFlatMapFunction())

out.print()
env.execute("tutorial_job")

在这里插入图片描述

4 对接kafka输入json输出json

输入{“name”:“中文”}
输出{“name”:“中文结果”}

from pyflink.common import SimpleStringSchema, WatermarkStrategy, Types
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode
from pyflink.datastream.connectors.kafka import KafkaSource, KafkaOffsetsInitializer, KafkaSink, \
    KafkaRecordSerializationSchema
import json

env = StreamExecutionEnvironment.get_execution_environment()
env.set_runtime_mode(RuntimeExecutionMode.STREAMING)
env.set_parallelism(1)

brokers = "IP:9092"

# 读取kafka
source = KafkaSource.builder() \
    .set_bootstrap_servers(brokers) \
    .set_topics("flink_source") \
    .set_group_id("my-group") \
    .set_starting_offsets(KafkaOffsetsInitializer.latest()) \
    .set_value_only_deserializer(SimpleStringSchema()) \
    .build()

ds1 = env.from_source(source, WatermarkStrategy.no_watermarks(), "Kafka Source")
ds1.print()

# 处理流程
def process_fun(line):
    data_dict = json.loads(line)
    result_dict = {"result": data_dict.get("name", "无")+"结果"}
    return json.dumps(result_dict, ensure_ascii=False)

ds2 = ds1.map(process_fun, Types.STRING())
ds2.print()

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1798816.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

什么是阴道菌群CST分型,不同的分型代表哪些女性健康问题

谷禾健康 人体内的各个部位,如皮肤、口腔、肠道和阴道等,都是微生物的重要栖息地,这些微生物与人体健康紧密相关,并能反映人体的疾病状态。这些部位因受基因、环境和生活方式等影响,具有独特的菌群特征。 女性生殖系统…

Linux磁盘分区(fdisk)和卷管理详解(VG-LV-PV)

先看整体图,再讲解概念 一、磁盘分区 一个磁盘disk可以分多个区part,用fdisk命令,举例把/dev/vdb划分为/dev/vdb1和/dev/vdb2 二、创建虚拟卷 LVM是逻辑盘卷管理(Logical Volume Manager)的简称,他是磁盘…

Cortex-M7——NVIC

Cortex-M7——NVIC 小狼http://blog.csdn.net/xiaolangyangyang 一、NVIC架构 二、中断及异常编号 三、中断屏蔽寄存器(__disable_irq和__enable_irq操作的是PRIMASK寄存器) 四、中断分组寄存器(SCB->AIRCR[10:8]) 五、NVIC寄…

「动态规划」如何求最小路径和?

64. 最小路径和https://leetcode.cn/problems/minimum-path-sum/description/ 给定一个包含非负整数的m x n网格grid,请找出一条从左上角到右下角的路径,使得路径上的数字总和为最小。说明:每次只能向下或者向右移动一步。 输入:…

DNF手游攻略:主C职业推荐,云手机强力辅助!

在《地下城与勇士》手游中,你是否厌倦了重复刷图和无休止的手动操作?利用VMOS云手机,你可以一键解决这些烦恼,实现自动打怪、一机多开,让游戏变得更加轻松愉快。下面我们将介绍如何使用VMOS云手机,以及推荐…

servlet实现图片上传和下载

图片上传 前端 后端 protected void dopost(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {Part filePart request.getPart("file"); // 获取上传的文件String fileName filePart.getSubmittedFileName(); …

2009年408真题解析

2009年408真题解析 【2009.1】为解决计算机主机与打印机之间速度不匹配问题,通常设置一个打印数据缓冲区,主机将要输出的数据依次写入该缓冲区,而打印机则依次从该缓冲区中取出数据。该缓冲区的逻辑结构应该是。 A.栈 B.队列 C.树 D.图 …

动手学深度学习29 残差网络ResNet

动手学深度学习29 残差网络ResNet ResNet代码ReLU的两种调用1. 使用 torch.nn.ReLU 模块2. 使用 torch.nn.functional.relu 函数总结 QA29.2 ResNet 为什么能训练处1000层的模型ResNet的梯度计算怎么处理梯度消失的 QA ResNet 更复杂模型包含小模型,不一定改进&…

CNN依旧能战:nnU-Net团队新研究揭示医学图像分割的验证误区,设定先进的验证标准与基线模型

这篇论文研究了在3D医学图像分割领近年引入了许多新的架构和方法,但大多数方法并没有超过2018年的原始nnU-Net基准。作者指出,许多关于新方法的优越性的声称在进行严格验证后并不成立,这揭示了当前在方法验证上存在的不严谨性。 揭示验证短板…

MySQL将错乱的水果信息,截取展示为 品名 英文名 价格 三列展示

将错乱的水果信息,截取展示为 品名 英文名 价格 三列展示 idname1苹果Apple72Plum6李子3Pineapple8菠萝4Mango5芒果5龙吐珠5Buddha’sHand6Olive9橄榄7Raspberry4树莓8Apricot5杏子9Grapefruit9柚子10火龙果Dragonfruit911倒挂金钟Hanging6LobsterClaw12巨峰葡萄Co…

论道数字化:2024年企业增长密码在哪里?

企业微信正在成为一个中国TO B数字化工具中的特殊个体。 它既具备TO B服务的能力,能帮助企业构建从办公到内部协同管理,帮助企业修炼内功;同时它更是企业面向C端的连接器,基于自身足够显著的C端标签,其几乎可以算是国…

企业必备技能导航栏的写法

创建一个导航栏是网页设计中的一个重要环节,它不仅有助于用户快速找到他们需要的信息,还能提升整个网站的用户体验。以下是一些基本步骤和技巧,可以帮助你快速制作一个高效且美观的导航栏: 确定导航栏位置:导航栏通常位…

Stable Diffusion WebUI 各操作系统安装教程

最近几天在 2 台 Mac、2 台 PC、一台云无 GPU 的 Linux 安装了 Stable Diffusion WebUI,这里记录下如何安装,以及一些注意点和坑。 以下内容针对 Windows(N 卡)、MacOS(m 系列芯片)、Linux(Ubu…

打造精美电子画册,提升企业形象的方法

在当今数字化时代,企业形象的表达方式正在发生深刻变革。精美电子画册作为一种新兴的传播媒介,不仅能够展现企业风采、提升品牌价值,还能够吸引潜在客户、增强市场竞争力。 接下来告诉大家一些简单的制作方法,可以收藏起来哦 1.首…

vue3+vite插件开发

插件开发目的:由于我司使用的前端技术栈为vue3tsvite2.Xaxios,在前端代码框架设计初期,做了把axios挂载到proxy对象上的操作,具体可见我的另一篇文章vue3TS自动化封装全局api_ts 封装腾讯位置api-CSDN博客 现在可以实现vue2的类似this.$api.xxx去调用接口,但是vue2源码使用的是…

Visual C++ Redistributable下载

安装程序的时候提示丢失mfc140u.dll 如下图,查了资料说可以下载Visual C Redistributable来进行处理 下载Visual C Redistributable 1.打开网站 https://www.microsoft.com/zh-cn/download/details.aspx?id48145&751be11f-ede8-5a0c-058c-2ee190a24fa6True) 2.点击下载 …

Python 机器学习 基础 之 处理文本数据 【处理文本数据/用字符串表示数据类型/将文本数据表示为词袋】的简单说明

Python 机器学习 基础 之 处理文本数据 【处理文本数据/用字符串表示数据类型/将文本数据表示为词袋】的简单说明 目录 Python 机器学习 基础 之 处理文本数据 【处理文本数据/用字符串表示数据类型/将文本数据表示为词袋】的简单说明 一、简单介绍 二、处理文本数据 三、用…

linux 服务器上离线安装 node nvm

因为是离线环境 如果你是可以访问外网的 下面内容仅供参考 也可以继续按步骤来 node 安装路径 Node.js — Download Node.js nvm 安装路径 Tags nvm-sh/nvm GitHub 后来发现 nvm安装后 nvm use 版本号 报错 让我去nvm install 版本 我是内网环境 install不了 下面 你要 把安…

K210视觉识别模块学习笔记4: (MaixHub)训练与使用自己的模型_识别字母

今日开始学习K210视觉识别模块: 模型训练与使用_识别字母 亚博智能的K210视觉识别模块...... 固件库: maixpy_v0.6.2_52_gb1a1c5c5d_minimum_with_ide_support.bin 文章提供测试代码讲解、完整代码贴出、测试效果图、测试工程下载 这里也算是正式开始进入到视觉识别的领域了…

问题:1、彩色餐巾可以渲染就餐气氛,下列说法错误的是 #知识分享#其他

问题:1、彩色餐巾可以渲染就餐气氛,下列说法错误的是 A.如艳红、大红餐巾给人以庄重热烈的感觉; B.橘黄、鹅黄色餐巾给人以高贵典雅的感觉; C.湖蓝色在夏天能给人以凉爽、舒适之感&#xff1…