参考资料
- https://time.geekbang.org/column/intro/167?tab=catalog
Apache Beam和其他开源项目不太一样,它并不是一个数据处理平台,本身也无法对数据进行处理。Beam所提供的是一个统一的编程模型思想,而我们可以通过这个统一出来的接口来编写符合自己需求的处理逻辑,这个处理逻辑将会被转化成为底层运行引擎相应的API去运行
beam的编程模型需要让我们根据“WWWH”这四个问题来进行数据处理逻辑的编写
- 是现在已有的各种大数据处理平台(例如Apache Spark或者Apache Flink),在Beam中它们也被称为Runner
- 是可移植的统一模型层,各个Runners将会依据中间抽象出来的这个模型思想,提供一套符合这个模型的APIs出来,以供上层转换。
- 是SDK层。SDK层将会给工程师提供不同语言版本的API来编写数据处理逻辑,这些逻辑就会被转化成Runner中相应的API来运行。
- 是可扩展库层。工程师可以根据已有的Beam SDK,贡献分享出更多的新开发者SDK、IO连接器、转换操作库等等。
- 是应用层,各种应用将会通过下层的Beam SDK或工程师贡献的开发者SDK来实现。
- 社区层。全世界的工程师可以提出问题,解决问题,实现解决问题的思路。
beam编程模型主要逻辑为What、Where、When、How
-
what,要做什么计算?得到什么样的结果?Beam SDK中各种transform操作就是用来回答这个问题的。这包括我们经常使用到批处理逻辑,训练机器学习模型的逻辑等等。
-
where,计算什么时间范围的数据?这里的“时间”指的是数据的事件时间。
-
when,何时将计算结果输出?我们可以通过使用水位线和触发器配合触发计算
-
how,后续数据的处理结果如何影响之前的处理结果呢?这个问题可以通过累加模式来解决,常见的累加模式有:丢弃(结果之间是独立且不同的)、累积(后来的结果建立在先前的结果上)等等。
Beam的编程模型将所有的数据处理逻辑都分割成了这四个纬度,统一成了Beam SDK。我们在基于Beam SDK构建数据处理业务逻辑时,只需要根据业务需求,按照这四个维度调用具体的API,即可生成符合自己要求的数据处理逻辑。Beam会自动转化数据处理逻辑,并提交到具体的Runner上去执行
Beam将数据封装为PCollection,就是Parallel Collection,意思是可并行计算的数据集(PCollection和RDD十分相似)
- PCollection的创建完全取决于需求。比如,在测试中PCollection往往来自于代码生成的伪造数据,或者从文件中读取
PCollection<String> lines = p.apply("ReadMyFile", TextIO.read().from("protocol://path/to/some/inputData.txt"));
- 需要为PCollection的元素编写Coder。计算流程最终会运行在一个分布式系统。所有的数据都有可能在网络上的计算机之间相互传递。Coder就是在告诉Beam,怎样把数据类型序列化和逆序列化以方便在网络上传输。
PipelineOptions options = PipelineOptionsFactory.create();
Pipeline p = Pipeline.create(options);
CoderRegistry cr = p.getCoderRegistry();
cr.registerCoder(Integer.class, BigEndianIntegerCoder.class);
- PCollection是无序的
- PCollection没有固定大小。PCollection可以是有界的,也可以是无界的,Beam也是用window来分割持续更新的无界数据
- PCollection具有不可变性,PCollection不提供任何修改它所承载数据的方式。Beam的PCollection都是延迟执行(deferred execution)的模式
进一步,Beam把数据转换抽象成了有向图。PCollection是有向图中的边,而Transform是有向图里的节点(不符合直觉),因为区分节点和边的关键是看一个Transform是不是会有一个多余的输入和输出
Beam中所有的数据处理逻辑都会被抽象成数据流水线(Pipeline)来运行。数据流水线是对于数据处理逻辑的一个封装,它包括了从读取数据集,将数据集转换成想要的结果和输出结果数据集这样的一整套流程
- 创建Beam数据流水线的同时,必须给这个流水线定义一个选项(Options)。告诉Beam用户的Pipeline应该如何运行。例如,是在本地的内存上运行,还是在Apache Flink上运行
PipelineOptions options = PipelineOptionsFactory.create();
Pipeline p = Pipeline.create(options);
-
数据流水线中,每次PCollection经过一个Transform之后,流水线都会新创建一个PCollection出来。而这个新的PCollection又将成为下一个Transform的新输入。也可以使三个不同的Transform应用在它之上,从而再产生出三个不同的PCollection2、PCollection3和PCollection4出来
流水线的底层思想其实还是动用了MapReduce的原理,在分布式环境下,整个数据流水线会启动N个Workers来同时处理PCollection。而在具体处理某一个特定Transform的时候,数据流水线会将这个Transform的输入数据集PCollection里面的元素分割成不同的Bundle,将这些Bundle分发给不同的Worker来处理
- 具体会分配多少个Worker,以及将一个PCollection分割成多少个Bundle都是随机的,Beam数据流水线会尽可能地让整个处理流程达到完美并行(Embarrassingly Parallel)
- 每一个Bundle在一个Worker机器里经过Transform逻辑后,也会产生出来一个新的Bundle
Beam的运行模式有直接运行模式,spark运行模式,flink运行模式等等
-
直接运行模式的时候,Beam会在单机上用多线程来模拟分布式的并行处理。
-
spark运行模式则提供了和原生spark应用相同的数据管道
The Apache Spark Runner can be used to execute Beam pipelines using Apache Spark. The Spark Runner can execute Spark pipelines just like a native Spark application; deploying a self-contained application for local mode, running on Spark’s Standalone RM, or using YARN or Mesos.
Beam目前只支持3.2.x版本spark,并且Beam和spark的版本对应关系不一致会引起让人困惑的问题
控制台启动emr-6.7.0集群,spark版本为3.2.1
在本地ide中进行如下编码
pom依赖配置
- beam依赖为2.36.0,https://beam.apache.org/documentation/runners/spark/#deploying-spark-with-your-application
- beam2.1.x版本持续出现
beam java.lang.NoSuchMethodError: org.apache.spark.api.java.JavaPairRDD.flatMapValues
报错 - 如果使用directrunner本地测试需要
beam-sdks-java-io-amazon-web-services
依赖,否则出现s3 filesystem找不到错误 - 指定awsreigon配置项需要
beam-sdks-java-io-amazon-web-services
依赖 - 通过
maven-shade-plugin
插件将代码打包为super jar
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>athenaconnect</artifactId>
<version>1.0</version>
<name>Archetype - athenaconnect</name>
<url>http://maven.apache.org</url>
<dependencies>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-spark</artifactId>
<version>2.36.0</version>
</dependency>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-amazon-web-services</artifactId>
<version>2.36.0</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>2.0.5</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-reload4j</artifactId>
<version>2.0.5</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>8</source>
<target>8</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.2.4</version>
<configuration>
<createDependencyReducedPom>false</createDependencyReducedPom>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
</configuration>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<shadedArtifactAttached>true</shadedArtifactAttached>
<shadedClassifierName>shaded</shadedClassifierName>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
仿照官方文档示例编写wordcount,https://beam.apache.org/get-started/quickstart-java/
package com.example;
//import org.apache.beam.runners.direct.DirectRunner;
import org.apache.beam.runners.spark.SparkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.aws.options.AwsOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.FlatMapElements;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;
import java.util.Arrays;
public class BeamWC {
public static void main(String[] args) {
PipelineOptions options = PipelineOptionsFactory.create();
options.setRunner(SparkRunner.class); //强制指定SparkRunner,也可以在运行时通过--runner SparkRunner指定
options.as(AwsOptions.class).setAwsRegion("cn-north-1");
//options.setRunner(DirectRunner.class); // 本地测试强制指定为direct runner
Pipeline p = Pipeline.create(options);
PCollection<String> lines = p.apply(TextIO.read().from("s3://bucketname/shakespeare/*"));
PCollection<String> words = lines.apply("ExtractWords", FlatMapElements
.into(TypeDescriptors.strings())
.via((String word) -> Arrays.asList(word.split("[^\\p{L}]+"))));
PCollection<KV<String, Long>> counts = words.apply(Count.<String>perElement());
PCollection<String> formatted = counts.apply("FormatResults", MapElements
.into(TypeDescriptors.strings())
.via((KV<String, Long> wordCount) -> wordCount.getKey() + ": " + wordCount.getValue()));
formatted.apply(TextIO.write().to("s3://bucketname/beamoutput/shakespeare"));
p.run().waitUntilFinish();
}
}
通过maven打包为jar包,并上传到master节点上,提交任务
- 由于代码中已经指定了runner,因此这里没有使用–runner
--deploy-mode client
将driver启动在master节点方便查看日志
spark-submit --class com.example.BeamWC --master yarn --deploy-mode client athenaconnect-1.0-shaded.jar
一些关键的日志
23/12/21 06:08:27 INFO FileBasedSource: Filepattern s3://bucketname/shakespeare/* matched 2 files with total size 300
23/12/21 06:08:27 INFO FileBasedSource: Splitting filepattern s3://bucketname/shakespeare/* into bundles of size 150 took 44 ms and produced 2 files and 2 bundles
确认spark任务的运行模式为yarn