emr上使用sparkrunner运行beam数据流水线

news2025/1/8 12:23:51

参考资料

  • https://time.geekbang.org/column/intro/167?tab=catalog

Apache Beam和其他开源项目不太一样,它并不是一个数据处理平台,本身也无法对数据进行处理。Beam所提供的是一个统一的编程模型思想,而我们可以通过这个统一出来的接口来编写符合自己需求的处理逻辑,这个处理逻辑将会被转化成为底层运行引擎相应的API去运行

beam的编程模型需要让我们根据“WWWH”这四个问题来进行数据处理逻辑的编写

在这里插入图片描述

  1. 是现在已有的各种大数据处理平台(例如Apache Spark或者Apache Flink),在Beam中它们也被称为Runner
  2. 是可移植的统一模型层,各个Runners将会依据中间抽象出来的这个模型思想,提供一套符合这个模型的APIs出来,以供上层转换。
  3. 是SDK层。SDK层将会给工程师提供不同语言版本的API来编写数据处理逻辑,这些逻辑就会被转化成Runner中相应的API来运行。
  4. 是可扩展库层。工程师可以根据已有的Beam SDK,贡献分享出更多的新开发者SDK、IO连接器、转换操作库等等。
  5. 是应用层,各种应用将会通过下层的Beam SDK或工程师贡献的开发者SDK来实现。
  6. 社区层。全世界的工程师可以提出问题,解决问题,实现解决问题的思路。

beam编程模型主要逻辑为What、Where、When、How

  • what,要做什么计算?得到什么样的结果?Beam SDK中各种transform操作就是用来回答这个问题的。这包括我们经常使用到批处理逻辑,训练机器学习模型的逻辑等等。

  • where,计算什么时间范围的数据?这里的“时间”指的是数据的事件时间。

  • when,何时将计算结果输出?我们可以通过使用水位线和触发器配合触发计算

  • how,后续数据的处理结果如何影响之前的处理结果呢?这个问题可以通过累加模式来解决,常见的累加模式有:丢弃(结果之间是独立且不同的)、累积(后来的结果建立在先前的结果上)等等。

Beam的编程模型将所有的数据处理逻辑都分割成了这四个纬度,统一成了Beam SDK。我们在基于Beam SDK构建数据处理业务逻辑时,只需要根据业务需求,按照这四个维度调用具体的API,即可生成符合自己要求的数据处理逻辑。Beam会自动转化数据处理逻辑,并提交到具体的Runner上去执行

Beam将数据封装为PCollection,就是Parallel Collection,意思是可并行计算的数据集(PCollection和RDD十分相似)

  1. PCollection的创建完全取决于需求。比如,在测试中PCollection往往来自于代码生成的伪造数据,或者从文件中读取
PCollection<String> lines = p.apply("ReadMyFile", TextIO.read().from("protocol://path/to/some/inputData.txt"));
  1. 需要为PCollection的元素编写Coder。计算流程最终会运行在一个分布式系统。所有的数据都有可能在网络上的计算机之间相互传递。Coder就是在告诉Beam,怎样把数据类型序列化和逆序列化以方便在网络上传输。
PipelineOptions options = PipelineOptionsFactory.create();
Pipeline p = Pipeline.create(options);
CoderRegistry cr = p.getCoderRegistry();
cr.registerCoder(Integer.class, BigEndianIntegerCoder.class);
  1. PCollection是无序的
  2. PCollection没有固定大小。PCollection可以是有界的,也可以是无界的,Beam也是用window来分割持续更新的无界数据
  3. PCollection具有不可变性,PCollection不提供任何修改它所承载数据的方式。Beam的PCollection都是延迟执行(deferred execution)的模式

进一步,Beam把数据转换抽象成了有向图。PCollection是有向图中的边,而Transform是有向图里的节点(不符合直觉),因为区分节点和边的关键是看一个Transform是不是会有一个多余的输入和输出

Beam中所有的数据处理逻辑都会被抽象成数据流水线(Pipeline)来运行。数据流水线是对于数据处理逻辑的一个封装,它包括了从读取数据集,将数据集转换成想要的结果和输出结果数据集这样的一整套流程

  • 创建Beam数据流水线的同时,必须给这个流水线定义一个选项(Options)。告诉Beam用户的Pipeline应该如何运行。例如,是在本地的内存上运行,还是在Apache Flink上运行
PipelineOptions options = PipelineOptionsFactory.create();
Pipeline p = Pipeline.create(options);
  • 数据流水线中,每次PCollection经过一个Transform之后,流水线都会新创建一个PCollection出来。而这个新的PCollection又将成为下一个Transform的新输入。也可以使三个不同的Transform应用在它之上,从而再产生出三个不同的PCollection2、PCollection3和PCollection4出来

    在这里插入图片描述

流水线的底层思想其实还是动用了MapReduce的原理,在分布式环境下,整个数据流水线会启动N个Workers来同时处理PCollection。而在具体处理某一个特定Transform的时候,数据流水线会将这个Transform的输入数据集PCollection里面的元素分割成不同的Bundle,将这些Bundle分发给不同的Worker来处理

  • 具体会分配多少个Worker,以及将一个PCollection分割成多少个Bundle都是随机的,Beam数据流水线会尽可能地让整个处理流程达到完美并行(Embarrassingly Parallel)
  • 每一个Bundle在一个Worker机器里经过Transform逻辑后,也会产生出来一个新的Bundle

Beam的运行模式有直接运行模式,spark运行模式,flink运行模式等等

  • 直接运行模式的时候,Beam会在单机上用多线程来模拟分布式的并行处理。

  • spark运行模式则提供了和原生spark应用相同的数据管道

    The Apache Spark Runner can be used to execute Beam pipelines using Apache Spark. The Spark Runner can execute Spark pipelines just like a native Spark application; deploying a self-contained application for local mode, running on Spark’s Standalone RM, or using YARN or Mesos.

Beam目前只支持3.2.x版本spark,并且Beam和spark的版本对应关系不一致会引起让人困惑的问题

控制台启动emr-6.7.0集群,spark版本为3.2.1

在本地ide中进行如下编码

pom依赖配置

  • beam依赖为2.36.0,https://beam.apache.org/documentation/runners/spark/#deploying-spark-with-your-application
  • beam2.1.x版本持续出现beam java.lang.NoSuchMethodError: org.apache.spark.api.java.JavaPairRDD.flatMapValues报错
  • 如果使用directrunner本地测试需要beam-sdks-java-io-amazon-web-services依赖,否则出现s3 filesystem找不到错误
  • 指定awsreigon配置项需要beam-sdks-java-io-amazon-web-services依赖
  • 通过maven-shade-plugin插件将代码打包为super jar
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>org.example</groupId>
    <artifactId>athenaconnect</artifactId>
    <version>1.0</version>
    <name>Archetype - athenaconnect</name>
    <url>http://maven.apache.org</url>

    <dependencies>

        <dependency>
            <groupId>org.apache.beam</groupId>
            <artifactId>beam-runners-spark</artifactId>
            <version>2.36.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.beam</groupId>
            <artifactId>beam-sdks-java-io-amazon-web-services</artifactId>
            <version>2.36.0</version>
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>2.0.5</version>
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-reload4j</artifactId>
            <version>2.0.5</version>
        </dependency>

    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>8</source>
                    <target>8</target>
                </configuration>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.2.4</version>
                <configuration>
                    <createDependencyReducedPom>false</createDependencyReducedPom>
                    <filters>
                        <filter>
                            <artifact>*:*</artifact>
                            <excludes>
                                <exclude>META-INF/*.SF</exclude>
                                <exclude>META-INF/*.DSA</exclude>
                                <exclude>META-INF/*.RSA</exclude>
                            </excludes>
                        </filter>
                    </filters>
                </configuration>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <shadedArtifactAttached>true</shadedArtifactAttached>
                            <shadedClassifierName>shaded</shadedClassifierName>
                            <transformers>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

仿照官方文档示例编写wordcount,https://beam.apache.org/get-started/quickstart-java/

package com.example;

//import org.apache.beam.runners.direct.DirectRunner;
import org.apache.beam.runners.spark.SparkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.aws.options.AwsOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.FlatMapElements;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;

import java.util.Arrays;

public class BeamWC {
    public static void main(String[] args) {
        PipelineOptions options = PipelineOptionsFactory.create();
        options.setRunner(SparkRunner.class); //强制指定SparkRunner,也可以在运行时通过--runner SparkRunner指定
        options.as(AwsOptions.class).setAwsRegion("cn-north-1");
		//options.setRunner(DirectRunner.class); // 本地测试强制指定为direct runner
        Pipeline p = Pipeline.create(options);

        PCollection<String> lines = p.apply(TextIO.read().from("s3://bucketname/shakespeare/*"));

        PCollection<String> words = lines.apply("ExtractWords", FlatMapElements
                .into(TypeDescriptors.strings())
                .via((String word) -> Arrays.asList(word.split("[^\\p{L}]+"))));

        PCollection<KV<String, Long>> counts = words.apply(Count.<String>perElement());

        PCollection<String> formatted = counts.apply("FormatResults", MapElements
                .into(TypeDescriptors.strings())
                .via((KV<String, Long> wordCount) -> wordCount.getKey() + ": " + wordCount.getValue()));

        formatted.apply(TextIO.write().to("s3://bucketname/beamoutput/shakespeare"));

        p.run().waitUntilFinish();
    }
}

通过maven打包为jar包,并上传到master节点上,提交任务

  • 由于代码中已经指定了runner,因此这里没有使用–runner
  • --deploy-mode client将driver启动在master节点方便查看日志
spark-submit --class com.example.BeamWC --master yarn --deploy-mode client athenaconnect-1.0-shaded.jar

一些关键的日志

23/12/21 06:08:27 INFO FileBasedSource: Filepattern s3://bucketname/shakespeare/* matched 2 files with total size 300
23/12/21 06:08:27 INFO FileBasedSource: Splitting filepattern s3://bucketname/shakespeare/* into bundles of size 150 took 44 ms and produced 2 files and 2 bundles

确认spark任务的运行模式为yarn

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2237525.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

github高分项目 WGCLOUD - 运维实时管理工具

GitHub - tianshiyeben/wgcloud: Linux运维监控工具&#xff0c;支持系统硬件信息&#xff0c;内存&#xff0c;CPU&#xff0c;温度&#xff0c;磁盘空间及IO&#xff0c;硬盘smart&#xff0c;GPU&#xff0c;防火墙&#xff0c;网络流量速率等监控&#xff0c;服务接口监测&…

MyBatisPlus 用法详解

文章目录 一、快速入门1.1 引入依赖&#xff1a;1.2 定义 Mappper&#xff1a;1.3 使用演示&#xff1a;1.4 常见注解&#xff1a;1.4.1 TableName:1.4.2 TableId&#xff1a;1.4.3 TableField&#xff1a; 1.5 常见配置&#xff1a; 二、核心功能2.1 条件构造器&#xff1a;2.…

Python小游戏23——捕鱼达人

首先&#xff0c;你需要安装Pygame库。如果你还没有安装&#xff0c;可以使用以下命令进行安装&#xff1a; 【bash】 pip install pygame 运行效果展示 接下来是示例代码&#xff1a; 【python】 import pygame import random # 初始化Pygame pygame.init() # 屏幕尺寸 SCREEN…

库打包工具 rollup

库打包工具 rollup 摘要 **概念&#xff1a;**rollup是一个模块化的打包工具 注&#xff1a;实际应用中&#xff0c;rollup更多是一个库打包工具 与Webpack的区别&#xff1a; 文件处理&#xff1a; rollup 更多专注于 JS 代码&#xff0c;并针对 ES Module 进行打包webpa…

基于SSM+VUE小学生素质成长记录平台JAVA|VUE|Springboot计算机毕业设计源代码+数据库+LW文档+开题报告+答辩稿+部署教+代码讲解

源代码数据库LW文档&#xff08;1万字以上&#xff09;开题报告答辩稿 部署教程代码讲解代码时间修改教程 一、开发工具、运行环境、开发技术 开发工具 1、操作系统&#xff1a;Window操作系统 2、开发工具&#xff1a;IntelliJ IDEA或者Eclipse 3、数据库存储&#xff1a…

【架构设计常见技术】

EJB EJB是服务器端的组件模型&#xff0c;使开发者能够构建可扩展、分布式的业务逻辑组件。这些组件运行在EJB容器中&#xff0c;EJB将各功能模块封装成独立的组件&#xff0c;能够被不同的客户端应用程序调用&#xff0c;简化开发过程&#xff0c;支持分布式应用开发。 IOC …

优选算法 - 1 ( 双指针 移动窗口 8000 字详解 )

一&#xff1a;双指针 1.1 移动零 题目链接&#xff1a;283.移动零 class Solution {public void moveZeroes(int[] nums) {for(int cur 0, dest -1 ; cur < nums.length ; cur){if(nums[cur] 0){}else{dest; // dest 先向后移动⼀位int tmp nums[cur];nums[cur] num…

鸿蒙操作系统是什么?与安卓系统有什么区别?

鸿蒙操作系统 鸿蒙操作系统&#xff08;HarmonyOS&#xff09;是华为公司发布的一款基于微内核的面向全场景的分布式操作系统。 发展历程&#xff1a; 早期规划&#xff1a;华为从2012 年开始规划自有操作系统&#xff0c;并在芬兰赫尔辛基设立智能手机研发中心&#xff0c;招…

现场工程师日记-MSYS2迅速部署PostgreSQL主从备份数据库

文章目录 一、概要二、整体架构流程1. 安装 MSYS2 环境2. 安装postgresql 三、技术名词解释1.MSYS22.postgresql 四、技术细节1. 创建主数据库2.添加从数据库复制权限3. 按需修改参数&#xff08;1&#xff09;WAL保留空间&#xff08;2&#xff09;监听地址 4. 启动主服务器5.…

第二届计算机网络技术与电子信息工程国际学术会议(CNTEIE 2024,12月6-8日)

第二届计算机网络技术与电子信息工程国际学术会议&#xff08;CNTEIE 2024&#xff09; 2024 2nd International Conference on Computer Network Technology and Electronic and Information Engineering 重要信息 会议官网&#xff1a;www.cnteie.org 2024 2nd Internation…

Git 入门篇(一)

前言 操作系统&#xff1a;win11 64位 与gitee搭配使用 Git 入门篇&#xff08;一&#xff09; Git 入门篇&#xff08;二&#xff09; Git 入门篇&#xff08;三&#xff09; 目录 git下载、安装与配置 下载 安装 配置 git下载、安装与配置 下载 官网&#xff1a;git-…

WPS文档中的“等线”如何删除

如何删除“等线”占用的行如何删除表格之间的空行WPS文档中的“等线”是什么如果删除脚注文本占用的行 如下这种&#xff0c;在文档中添加了表格和脚注&#xff0c;发现上下表格之间有多行空行&#xff0c;鼠标选中&#xff0c;显示是“等线”&#xff0c;那么如何去除等线占用…

题目讲解15 合并两个排序的链表

原题链接&#xff1a; 合并两个排序的链表_牛客题霸_牛客网 思路分析&#xff1a; 第一步&#xff1a;写一个链表尾插数据的方法。 typedef struct ListNode ListNode;//申请结点 ListNode* BuyNode(int x) {ListNode* node (ListNode*)malloc(sizeof(ListNode));node->…

计算机网络基本概念总结

IP地址 概念 使网络中的设备都有唯一的地址标识&#xff0c;用于表示其在网络中的位置。 格式 IP地址是一个32位的二进制数&#xff0c;通常被分割为4个8位二进制数&#xff08;也就是4个字节&#xff09;&#xff0c;如&#xff1a;01100100.00001000.00001010.00000110。通常…

Pandas | 特征列大量数据异常需要填充数据时注意事项

问题描述 一组数据如下&#xff1a; df.isnull().sum()城市 0 名称 0 星级 1529 评分 0 价格 1 销量 1 省/市/区 0 坐标 0 简介 41 是否免费 0 具体地址 3 dtype: int64df[星级]0…

Science Robotics 综述揭示演化研究新范式,从机器人复活远古生物!

在地球46亿年的漫长历史长河中&#xff0c;生命的演化过程充满着未解之谜。如何从零散的化石证据中还原古生物的真实面貌&#xff1f;如何理解关键演化节点的具体过程&#xff1f;10月23日&#xff0c;Science Robotics发表重磅综述&#xff0c;首次系统性提出"古生物启发…

[编译报错]ImportError: No module named _sqlite3解决办法

1. 问题描述&#xff1a; 在使用python进行代码编译时&#xff0c;提示下面报错&#xff1a; "/home/bspuser/BaseTools/Source/Python/Workspace/WorkspaceDatabase.py", line 18, in <module>import sqlite3File "/usr/local/lib/python2.7/sqlite3/_…

EasyExcel的AbstractColumnWidthStyleStrategy注入CellStyle不生效

设置背景色 CellStyle style workbook.createCellStyle();style.setFillForegroundColor(IndexedColors.RED.getIndex()); // 是设置前景色不是背景色style.setFillPattern(FillPatternType.SOLID_FOREGROUND)EasyExcel.writerTable(0).head(Head1.class).registerWriteHandl…

iphone怎么删除重复的照片的新策略

Phone用户常常面临存储空间不足的问题&#xff0c;其中一个主要原因是相册中的重复照片。这些重复项不仅占用了大量的存储空间&#xff0c;还会影响设备的整体性能。本文将向您展示iphone怎么删除重复的照片的方法&#xff0c;包括一些利用工具来自动化这个过程的创新方法。 识…

AI4SCIENSE(鄂维南院士:再谈AI for Science)

鄂维南院士&#xff1a;再谈AI for Science_哔哩哔哩_bilibili 以往处理高维问题 量子力学&#xff1a;单变量乘积 统计学&#xff1a;旋转 AI4S 处理数据 蛋白质折叠&#xff1f; 不是纯粹的数据驱动 物理学等学科基本原理 例&#xff1a;分子动力学 数据模型 流程图 这…