文章目录
PyFlink的作业开发入门案例
一、批处理的入门案例
1、示例
2、开发步骤
3、参考代码:基于DataStreamAPI编程
二、流处理的入门案例
1、示例
2、开发步骤
3、参考代码:基于DataStreamAPI编程
PyFlink的作业开发入门案例
一、批处理的入门案例
1、示例
编写Flink程序,读取表中的数据,并根据表中的字段信息进行统计每个单词出现的数量。
2、开发步骤
- 创建批式处理的运行环境
- 构建数据源
- 对数据进行处理
- 对处理后的结果输出打印
- 启动执行
- 准备测试数据
3、参考代码:基于DataStreamAPI编程
在pyflink_study项目下新建data文件夹,在data下新建input文件夹,在input下新建wordcount.txt文件,内容如下:
Total,time,BUILD,SUCCESS
Final,Memory,Finished,at
Total,time,BUILD,SUCCESS
Final,Memory,Finished,at
Total,time,BUILD,SUCCESS
Final,Memory,Finished,at
BUILD,SUCCESS
BUILD,SUCCESS
BUILD,SUCCESS
BUILD,SUCCESS
BUILD,SUCCESS
BUILD,SUCCESS
在pyflink_study项目下新建Easy_Case_Batch_DataStream.py文件,代码如下:
from pyflink.common import Encoder, Types
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode
from pyflink.datastream.connectors import (FileSink, OutputFileConfig,RollingPolicy)
def word_count():
# 1. 创建流式处理环境
env = StreamExecutionEnvironment.get_execution_environment()
env.set_runtime_mode(RuntimeExecutionMode.BATCH)
# write all the data to one file
env.set_parallelism(1)
# 3. 将source添加到环境中,环境会生成一个datastream,也就是我们进行操作的数据类
ds = env.read_text_file("D:\work_space\pyflink_study\data\input\wordcount.txt")
# 4. transform
def split(line):
yield from line.split(",")
# compute word count
ds = ds.flat_map(split) \
.map(lambda i: (i, 1), output_type=Types.TUPLE([Types.STRING(), Types.INT()])) \
.key_by(lambda i: i[0]) \
.reduce(lambda i, j: (i[0], i[1] + j[1]))
# 5. sink
ds.print()
# 6. 真正执行代码
env.execute("word_count")
if __name__ == '__main__':
word_count()
注意read_text_file后的地址要与实际地址对应。
运行结果如下:
二、流处理的入门案例
1、示例
编写Flink程序,接收socket的单词数据,并以逗号进行单词拆分打印。
2、开发步骤
- 获取流处理运行环境
- 构建socket流数据源,并指定IP地址和端口号
- 对接收到的数据进行拆分
- 对拆分后的单词,每个单词记一次数
- 对拆分后的单词进行分组
- 根据单词的次数进行聚合
- 打印输出
- 在云服务器中,使用nc -lk 端口号 监听端口
- 启动执行
- 向监听端口发送单词
3、参考代码:基于DataStreamAPI编程
新建Easy_Case_Stream_DataStream.py文件,代码如下:
from pyflink.common import Types
from pyflink.datastream import StreamExecutionEnvironment, DataStream, RuntimeExecutionMode
if __name__ == '__main__':
# 1. 创建流式处理环境
env = StreamExecutionEnvironment.get_execution_environment()
env.set_runtime_mode(RuntimeExecutionMode.STREAMING)
# write all the data to one file
env.set_parallelism(1)
# 3. 创建source
ds = DataStream(env._j_stream_execution_environment.socketTextStream('8.140.192.198', 9999))
# 4. transform
def split(line):
yield from line.split(",")
# compute word count
ds = ds.flat_map(split) \
.map(lambda i: (i, 1), output_type=Types.TUPLE([Types.STRING(), Types.INT()])) \
.key_by(lambda i: i[0]) \
.reduce(lambda i, j: (i[0], i[1] + j[1]))
# 5. sink
ds.print()
# 6. 真正执行代码
env.execute("word_count")
注意:socketTextStream后的ip是云服务器ecs的公网ip。
先进入云服务器ECS,开启netcat,监听9999端口号。(如果没有安装可以使用yum安装nc: yum install -y nc)
nc -lk 9999
然后运行本地程序
在ecs依次发送单词
flink,spark,hadoop
flink,flink,spark
flink,hadoop
查看结果
- 📢博客主页:https://lansonli.blog.csdn.net
- 📢欢迎点赞 👍 收藏 ⭐留言 📝 如有错误敬请指正!
- 📢本文由 Lansonli 原创,首发于 CSDN博客🙉
- 📢停下休息的时候不要忘了别人还在奔跑,希望大家抓紧时间学习,全力奔赴更美好的生活✨