Flink从入门到实践(一):Flink入门、Flink部署

news2024/9/22 11:33:02

文章目录

  • 系列文章索引
  • 一、快速上手
    • 1、导包
    • 2、求词频demo
      • (1)要读取的数据
      • (2)demo1:批处理(离线处理)
      • (3)demo2 - lambda优化:批处理(离线处理)
      • (4)demo3:流处理(实时处理)
      • (5)总结:实时vs离线
      • (6)demo4:批流一体
      • (7)对接Socket
  • 二、Flink部署
    • 1、Flink架构
    • 2、Standalone部署
    • 3、自运行flink-web
    • 4、通过参数传递
    • 5、通过webui提交job
    • 6、停止作业
    • 7、常用命令
    • 8、集群
  • 参考资料

系列文章索引

Flink从入门到实践(一):Flink入门、Flink部署
Flink从入门到实践(二):Flink DataStream API
Flink从入门到实践(三):数据实时采集 - Flink MySQL CDC

一、快速上手

1、导包

<!-- fink 相关依赖 -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients</artifactId>
    <version>1.18.0</version>
</dependency>

2、求词频demo

注意!自Flink 1.18以来,所有Flink DataSet api都已弃用,并将在未来的Flink主版本中删除。您仍然可以在DataSet中构建应用程序,但是您应该转向DataStream和/或Table API。

(1)要读取的数据

定义data内容:

pk,pk,pk
ruoze,ruoze
hello

(2)demo1:批处理(离线处理)

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

/**
 * 使用Flink进行批处理,并统计wc
 *
 *
 * 结果:
 * (bye,2)
 * (hello,3)
 * (hi,1)
 */
public class BatchWordCountApp {

    public static void main(String[] args) throws Exception {
        // step0: Spark中有上下文,Flink中也有上下文,MR中也有
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // step1: 读取文件内容  ==> 一行一行的字符串而已
        DataSource<String> source = env.readTextFile("data/wc.data");

        // step2: 每一行的内容按照指定的分隔符进行拆分  1:N
        source.flatMap(new FlatMapFunction<String, String>() {

                    /**
                     *
                     * @param value 读取到的每一行数据
                     * @param out 输出的集合
                     */
                    @Override
                    public void flatMap(String value, Collector<String> out) throws Exception {
                        // 使用,进行分割
                        String[] splits = value.split(",");
                        for(String split : splits) {
                            out.collect(split.toLowerCase().trim());
                        }
                    }
                })
                .map(new MapFunction<String, Tuple2<String,Integer>>() {

                    /**
                     *
                     * @param value 每一个元素 (hello, 1)(hello, 1)(hello, 1)
                     */
                    @Override
                    public Tuple2<String, Integer> map(String value) throws Exception {
                        return Tuple2.of(value, 1);
                    }
                })
                .groupBy(0)  // step4: 按照单词进行分组  groupBy是离线的api,传下标
                .sum(1)  // ==> 求词频 sum,传下标
                .print(); // 打印
    }
}

(3)demo2 - lambda优化:批处理(离线处理)

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

/**
 * lambda表达式优化
 */
public class BatchWordCountAppV2 {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSource<String> source = env.readTextFile("data/wc.data");

        /**
         * lambda语法: (参数1,参数2,参数3...) -> {函数体}
         */
//        source.map(String::toUpperCase).print();

        // 使用了Java泛型,由于泛型擦除的原因,需要显示的声明类型信息
        source.flatMap((String value, Collector<Tuple2<String,Integer>> out) -> {
            String[] splits = value.split(",");
            for(String split : splits) {
                out.collect(Tuple2.of(split.trim(), 1));
            }
        }).returns(Types.TUPLE(Types.STRING, Types.INT))
                .groupBy(0).sum(1).print();

    }
}

(4)demo3:流处理(实时处理)

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * 流式处理
 * 结果:
 * 8> (hi,1)
 * 6> (hello,1)
 * 5> (bye,1)
 * 6> (hello,2)
 * 6> (hello,3)
 * 5> (bye,2)
 */
public class StreamWCApp {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> source = env.readTextFile("data/wc.data");
        source.flatMap((String value, Collector<Tuple2<String,Integer>> out) -> {
            String[] splits = value.split(",");
            for(String split : splits) {
                out.collect(Tuple2.of(split.trim(), 1));
            }
        }).returns(Types.TUPLE(Types.STRING, Types.INT))
                .keyBy(x -> x.f0) // 这种写法一定要掌握!流式的并没有groupBy,而是keyBy!根据第一个值进行sum
                .sum(1).print();

        // 需要手动开启
        env.execute("作业名字");
    }
}

(5)总结:实时vs离线

离线:结果是一次性出来的。
实时:来一个数据处理一次,数据是带状态的。

(6)demo4:批流一体

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * 采用批流一体的方式进行处理
 */
public class FlinkWordCountApp {

    public static void main(String[] args) throws Exception {
        // 统一使用StreamExecutionEnvironment这个执行上下文环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC); // 选择处理方式 批/流/自动

        DataStreamSource<String> source = env.readTextFile("data/wc.data");

        source.flatMap((String value, Collector<Tuple2<String,Integer>> out) -> {
            String[] splits = value.split(",");
            for(String split : splits) {
                out.collect(Tuple2.of(split.trim(), 1));
            }
        }).returns(Types.TUPLE(Types.STRING, Types.INT))
                .keyBy(x -> x.f0) // 这种写法一定要掌握
                .sum(1).print();

        // 执行
        env.execute("作业名字");
    }
}

(7)对接Socket

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * 使用Flink对接Socket的数据并进行词频统计
 *
 * 大数据处理的三段论: 输入  处理  输出
 *
 */
public class FlinkSocket {


    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        /**
         * 数据源:可以通过多种不同的数据源接入数据:socket  kafka  text
         *
         * 官网上描述的是 env.addSource(...)
         *
         * socket的方式对应的并行度是1,因为它来自于SourceFunction的实现
         */
        DataStreamSource<String> source = env.socketTextStream("localhost", 9527);
        System.out.println(source.getParallelism());

        // 处理
        source.flatMap((String value, Collector<Tuple2<String,Integer>> out) -> {
                    String[] splits = value.split(",");
                    for(String split : splits) {
                        out.collect(Tuple2.of(split.trim(), 1));
                    }
                }).returns(Types.TUPLE(Types.STRING, Types.INT))
                .keyBy(x -> x.f0) // 这种写法一定要掌握
                .sum(1)
                // 数据输出
                .print();  // 输出到外部系统中去

        env.execute("作业名字");
    }
}

二、Flink部署

1、Flink架构

https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/concepts/flink-architecture/
Flink是一个分布式的带有状态管理的计算框架,可以运行在常用/常见的集群资源管理器上(YARN、K8S)。

一个JobManager(协调/分配),一个或多个TaskManager(工作)。
在这里插入图片描述
在这里插入图片描述

2、Standalone部署

按照官网下载执行即可:
https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/try-flink/local_installation/

可以根据官网来安装,需要下载、解压、安装。
也可以使用docker安装。

启动之后,localhost:8081就可以访问管控台了。

3、自运行flink-web

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-runtime-web</artifactId>
    <version>1.18.0</version>
</dependency>
Configuration configuration = new Configuration();
configuration.setInteger("rest.port", 8082); // 指定web端口,开启webUI,不写的话默认8081
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);
// 新版本可以直接使用getExecutionEnvironment(conf)

以上亲测并不好使……具体原因未知,设置为flink1.16版本或许就好用了。

4、通过参数传递

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// 通过参数传递进来Flink引用程序所需要的参数,flink自带的工具类
ParameterTool tool = ParameterTool.fromArgs(args);
String host = tool.get("host");
int port = tool.getInt("port");

DataStreamSource<String> source = env.socketTextStream(host, port);
System.out.println(source.getParallelism());

可以通过命令行参数:–host localhost --port 8765

5、通过webui提交job

在这里插入图片描述
在这里插入图片描述

6、停止作业

在这里插入图片描述

7、常用命令

# 查看作业列表
flink list -a  # 所有
flink list -r  # 正在运行的
# 停止作业
flink cancel <jobid>

# 提交job
# -c,--class <classname> 指定main方法
# -C,--classpath <url> 指定classpath
# -p,--parallelism <paralle> 指定并行度
flink run -c com.demo.FlinkDemo FlinkTest.jar 

8、集群

https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/concepts/flink-architecture/#flink-application-execution

https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/deployment/overview/

单机部署Session Mode和Application Mode:
https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/deployment/resource-providers/standalone/overview/

k8s:
https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/deployment/resource-providers/native_kubernetes/

YARN:
https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/deployment/resource-providers/yarn/

参考资料

https://flink.apache.org/
https://nightlies.apache.org/flink/flink-docs-stable/

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1440995.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

深入解析Linux中HTTP代理的工作原理

亲爱的Linux探险家们&#xff0c;准备好一起探索HTTP代理背后的神秘面纱了吗&#xff1f;在这个数字世界里&#xff0c;HTTP代理就像是一个神秘的中间人&#xff0c;默默地在你和互联网之间穿梭&#xff0c;为你传递信息。那么&#xff0c;这个神秘的中间人到底是如何工作的呢&…

github拉取项目,pycharm配置远程服务器环境

拉取项目 从github上拉取项目到pycharmpycharm右下角选择远程服务器上的环境 2.1. 如图 2.2. 输入远程服务器的host&#xff0c;port&#xff0c;username&#xff0c;password连接 2.3. 选择服务器上的环境 链接第3点 注&#xff1a;如果服务器上环境不存在&#xff0c;先创建…

【动态规划】【C++算法】2188. 完成比赛的最少时间

作者推荐 【动态规划】【前缀和】【C算法】LCP 57. 打地鼠 本文涉及知识点 动态规划汇总 LeetCode2188. 完成比赛的最少时间 给你一个下标从 0 开始的二维整数数组 tires &#xff0c;其中 tires[i] [fi, ri] 表示第 i 种轮胎如果连续使用&#xff0c;第 x 圈需要耗时 fi…

微策略爆买3万枚!

号外&#xff1a;教链内参2.7《内参&#xff1a;美股续创新高》 日前&#xff0c;因囤积比特币&#xff08;BTC&#xff09;而知名的美股上市公司微策略&#xff08;Microstrategy&#xff09;放出了2023四季度财报。看了这篇财报&#xff0c;才发现在2024年初现货ETF通过之前&…

【分布式】雪花算法学习笔记

雪花算法学习笔记 来源 https://pdai.tech/md/algorithm/alg-domain-id-snowflake.html概述 雪花算法是推特开源的分布式ID生成算法&#xff0c;以划分命名空间的方式将64位分割成多个部分&#xff0c;每一个部分代表不同的含义&#xff0c;这种就是将64位划分成不同的段&…

【JavaWeb】头条新闻项目实现 基本增删改查 分页查询 登录注册校验 业务功能实现 第二期

文章目录 一、为什么使用token口令二、登录注册功能2.1 登录表单提交后端代码&#xff1a; 2.2 根据token获取完整用户信息代码实现&#xff1a; 2.3 注册时用户名占用校验代码实现&#xff1a; 2.4 注册表单提交代码实现&#xff1a; 三、头条首页功能3.1 查询所有头条分类3.2…

免费生成ios证书的方法(无需mac电脑)

使用hbuilderx的uniapp框架开发移动端程序很方便&#xff0c;可以很方便地开发出移动端的小程序和app。但是打包ios版本的app的时候却很麻烦&#xff0c;官方提供的教程需要使用mac电脑来生成证书&#xff0c;但是mac电脑却不便宜&#xff0c;一般的型号都差不多上万。 因此&a…

网络学习:数据链路层VLAN原理和配置

一、简介&#xff1a; VLAN又称为虚拟局域网&#xff0c;它是用来将使用路由器的网络分割成多个虚拟局域网&#xff0c;起到隔离广播域的作用&#xff0c;一个VLAN通常对应一个IP网段&#xff0c;不同VLAN通常规划到不同IP网段。划分VLAN可以提高网络的通讯质量和安全性。 二、…

彻底学会系列:一、机器学习之线性回归(一)

1.基本概念(basic concept) 线性回归&#xff1a; 有监督学习的一种算法。主要关注多个因变量和一个目标变量之间的关系。 因变量&#xff1a; 影响目标变量的因素&#xff1a; X 1 , X 2 . . . X_1, X_2... X1​,X2​... &#xff0c;连续值或离散值。 目标变量&#xff1a; …

C++Linux网络编程day02:select模型

本文是我的学习笔记&#xff0c;学习路线跟随Github开源项目&#xff0c;链接地址&#xff1a;30dayMakeCppServer 文章目录 select模型fd_set结构体 timeval结构体文件描述符的就绪条件带外数据与普通数据socket的状态 select模型 select是Linux下的一个IO复用模型&#xff…

用HTML5实现灯笼效果

本文介绍了两种实现效果&#xff1a;一种使用画布&#xff08;canvas&#xff09;标签/元素&#xff0c;另一种不用画布&#xff08;canvas&#xff09;标签/元素主要使用CSS实现。 使用画布&#xff08;canvas&#xff09;标签/元素实现&#xff0c;下面&#xff0c;在画布上…

Idea里自定义封装数据警告解决 Spring Boot Configuration Annotation Processor not configured

我们自定对象封装指定数据&#xff0c;封装类上面一个红色警告&#xff0c;虽然不影响我们的执行&#xff0c;但是有强迫症看着不舒服&#xff0c; 去除方式&#xff1a; 在pom文件加上坐标刷新 <dependency><groupId>org.springframework.boot</groupId><…

Android AOSP源码研究之万事开头难----经验教训记录

文章目录 1.概述2.Android源下载1.配置环境变量2.安装curl3.下载repo并授权4.创建一个文件夹保存源码5.设置repo的地址并配置为清华源6.初始化仓库7.指定我们需要下载的源码分支并初始化 2.1 使用移动硬盘存放Android源码的坑2.2 解决方法 3.Android源码编译4.Android源烧录 1.…

v-if 和v-for的联合规则及示例

第073个 查看专栏目录: VUE ------ element UI 专栏目标 在vue和element UI联合技术栈的操控下&#xff0c;本专栏提供行之有效的源代码示例和信息点介绍&#xff0c;做到灵活运用。 提供vue2的一些基本操作&#xff1a;安装、引用&#xff0c;模板使用&#xff0c;computed&a…

SpringBoot WebSocket客户端与服务端一对一收发信息

依赖 <!--websocket--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-websocket</artifactId></dependency>配置类 Configuration public class WebSocketConfig {Bean //方法返回值交…

跟着小德学C++之TCP基础

嗨&#xff0c;大家好&#xff0c;我是出生在达纳苏斯的一名德鲁伊&#xff0c;我是要立志成为海贼王&#xff0c;啊不&#xff0c;是立志成为科学家的德鲁伊。最近&#xff0c;我发现我们所处的世界是一个虚拟的世界&#xff0c;并由此开始&#xff0c;我展开了对我们这个世界…

FATFA文件系统

一.文件系统基本知识 1.文件系统是什么&#xff1f; 文件系统是一种用于组织和存储计算机上的文件和目录的方法。它是操作系统中的一个重要组成部分&#xff0c;负责管理磁盘或其他存储介质上的文件&#xff0c;使其易于访问和使用。文件系统提供了一种结构化的方式来组织文件…

three.js 匀速动画(向量表示速度)

效果&#xff1a; 代码&#xff1a; <template><div><el-container><el-main><div class"box-card-left"><div id"threejs" style"border: 1px solid red"></div>1. 匀速动画(向量表示速度)</div…

RibbonOpenFeign源码(待完善)

Ribbon流程图 OpenFeign流程图

Window环境下使用go编译grpc最新教程

网上的grpc教程都或多或少有些老或者有些问题&#xff0c;导致最后执行生成文件时会报很多错。这里给出个人实践出可执行的编译命令与碰到的报错与解决方法。&#xff08;ps:本文代码按照煎鱼的教程编写&#xff1a;4.2 gRPC Client and Server - 跟煎鱼学 Go (gitbook.io)&…