【Flink快速入门-1.Flink 简介与环境配置】

news2025/2/8 21:44:16

Flink 简介与环境配置

实验介绍

在学习一门新的技术之前,我们首先要了解它的历史渊源,也就是说它为什么会出现,它能够解决什么业务痛点。所以本节我们的学习目的是了解 Flink 的背景,并运行第一个 Flink 程序,对它有一个初步的印象。

知识点
  • 流处理概述
  • Flink 简介
  • Flink 批处理 WordCount
  • Flink 流处理 WordCount

流处理简介

流处理并不是一个新概念,但是要做好并不是一件容易的事情。提到流处理,我们最先想到的可能是金融交易、信号检测以及地图导航等领域的应用。但是近年来随着信息技术的发展,除了前面提到的三个领域,其它方向对数据时效性的要求也越来越高。随着 Hadoop 生态的崛起,Storm、Spark Streaming、Samza、MillWheel 等一众流处理技术开始走入大众视野,但是我们最熟悉的应该还是 Storm 和 Spark Streaming。

我们知道,“高吞吐”、“低延迟”和”exactly-once“是衡量一个流处理框架的重要指标。 Storm 虽然提供了低延迟的流处理,但是在高吞吐方面的表现并不算佳,可以说基本满足不了日益暴涨的数据量,而且也没办法保证精准一次消费。Spark Streaming 中通过微批次的批处理来模拟流处理,只要将批处理的批次分的足够小,那么从宏观上来看就是流处理,这也是 Spark Streaming 的核心思想。通过微观批处理的方式,Spark Streaming 也实现了高吞吐和 exactly-once 语义,时效性也有了大幅提升,在很长一段时间里占据流处理榜首。但是受限于其实现方式,依然存在几秒的延迟,对于那些实时性要求较高的领域来说依然不够完美。
在这样的背景下,Flink 应运而生,接下来我们正式进入 Flink 的学习。

Flink 简介

Apache Flink 是为分布式、高性能、随时可用以及准确 的流处理应用程序打造的开源流处理框架,用于对无界和有界数据流进行有状态计算。Flink 最早起源于在 2010 ~ 2014 年,由 3 所地处柏林的大学和欧洲的一些其它大学共同进行研究的名为 Stratosphere 的项目。2014 年 4 月 Stratosphere 将其捐赠给 Apache 软件基 金会, 初始成员是 Stratosphere 系统的核心开发人员,2014 年 12 月,Flink 一跃成为 Apache 软件基金会的顶级项目。在 2015 年,阿里也加入到了 Flink 的开发工作中,并贡献了至少 150 万行代码。

Flink 一词在德语中有着“灵巧”、“快速”的意思,它的 logo 原型也是柏林常见的一种松鼠,以身材娇小、灵活著称,为该项目取这样的名字和选定这样的 logo 也正好符合 Flink 的特点和愿景。

在这里插入图片描述

注意,虽然我们说 Flink 是一个流处理框架,但是它同样可以进行批处理。因为在 Flink 的世界观里,批处理是流处理的一种特殊形式,这和 Spark 不同,在 Spark 中,流处理是通过大批量的微批处理实现的。

运行第一个 Flink 程序

接下来我们运行第一个 Flink 程序,感受一下它的魅力,从而对它有一个初步的印象。

搭建开发环境

本课程使用的是本地环境。后续实验不再提示。

首先我们需要在环境中搭建 Flink 运行环境,总共可以分为下面这几步:

  • 安装 jdk 并配置环境变量 【jdk 1.8】
  • 安装 scala 并配置环境变量 【Scala 2.11.12】
  • 安装 maven 并修改中心仓库为阿里云地址
  • 安装 IDEA 开发工具 【IDEA 2022.2.1或更新版】

我们的实验环境已经为大家安装了 jdk、scala、maven 和 IDEA,只需要在 IDEA 里配置使用即可。

双击桌面的 IDEA 程序,启动之后,点击 File -> New -> Project 创建一个新的 Maven 工程 FlinkLearning

【或者New Project->Maven Archetype】
在这里插入图片描述

创建好之后点击左上角 File > Settings 中,将 Maven 的配置文件修改为 D:\programs\apahe-maven-3.6.3\conf\settings.xml,配置之后的 Maven 中心仓库为阿里云,加载依赖会快很多:

在这里插入图片描述

项目中点击File-》Project Structure -> Libraries-》加号-》添加Scala SDK-》选择所需要的scala版本-》ok进行下载
在这里插入图片描述

在工程 src/main 目录中创建 scala 文件夹,然后右键,选择 Mark Directory as,并将其标记为 Sources Root。

在这里插入图片描述

在 scala 目录里创建 com.vlab.wc 包,并分别创建 BatchWordCountStreamWordCount 两个 Scala Object,分别代表 Flink 批处理和 Flink 流处理。
在这里插入图片描述

至此,我们的准备工作已经完成,接下来正式进入编码阶段。

Flink 批处理 WordCount

修改 pom.xml 文件中的代码为如下:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>pblh123.lh</groupId>
    <artifactId>FlinkLearning</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

<!--    配置国内 Maven 依赖库的镜像源镜-->
    <repositories>
        <repository>
            <id>aliyun</id>
            <url>https://maven.aliyun.com/repository/public</url>
        </repository>
    </repositories>
<!--配置插件的镜像源-->
    <pluginRepositories>
        <pluginRepository>
            <id>aliyun</id>
            <url>https://maven.aliyun.com/repository/public</url>
        </pluginRepository>
    </pluginRepositories>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_2.12</artifactId>
            <version>1.17.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.12</artifactId>
            <version>1.17.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>1.17.2</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <!-- 该插件用于将 Scala 代码编译成 class 文件 -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.4.6</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.6.0</version>
                <configuration>
                    <archive>
                        <manifest>
                            <mainClass></mainClass>
                        </manifest>
                    </archive>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

</project>

修改完成后注意点击加载图标重新 Load。
在这里插入图片描述

BatchWordCount.scala 中的代码如下:

package com.vlab.wc

import org.apache.flink.api.scala._

/**
 * @projectName FlinkLearning
 * @package com.vlab.wc
 * @className com.vlab.wc.BatchWordCount
 * @description Flink Batch Word Count Example
 * @author pblh123
 * @date 2025/2/7 14:41
 * @version 1.17.2
 */
object BatchWordCount {
  def main(args: Array[String]): Unit = {
    // 判断输入参数数量是否正确
    if (args.length != 1) {
      System.err.println("Usage: BatchWordCount <input path>")
      System.exit(5)
    }
    // 获取输入路径
    val inputPath = args(0)
    // 创建执行环境
    val env = ExecutionEnvironment.getExecutionEnvironment
    // 读取输入数据
    val inputDS: DataSet[String] = env.readTextFile(inputPath)
    // 计算词频
    val wordCountDS: DataSet[(String, Int)] = inputDS
      .flatMap(_.split("\\s+")) // 扁平化,并且处理多个空格作为分隔符
      .map((_, 1)) // 转换为 (word, 1)
      .groupBy(0) // 按第一个字段(word)进行分组
      .sum(1) // 对第二个字段(计数)求和
    // 打印输出
    wordCountDS.print()
  }
}

datas/words.txt 路径下创建 words.txt 文件并在其中加入如下内容(注意每个单词之间使用空格分隔):

在这里插入图片描述

hello world
hello flink
hello spark
hello java

右键选中 BatchWordCount.scala,点击 Run 运行,将会看到如下输出:

如果出现一些其他的报错和警告可以忽略。

(java,1)
(world,1)
(flink,1)
(hello,4)
(spark,1)
Flink 流处理 WordCount

StreamWordCount.scala 中加入如下代码:

package com.vlab.wc

import org.apache.flink.streaming.api.scala._

/**
 * @projectName FlinkLearning  
 * @package com.vlab.wc  
 * @className com.vlab.wc.StreamWordCount  
 * @description ${description}  
 * @author pblh123
 * @date 2025/2/7 14:41
 * @version 1.0
 *
 */
    
object StreamWordCount {
  def main(args: Array[String]): Unit = {
    // 创建执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    //  监控Socket数据
    val textDstream: DataStream[String] = env.socketTextStream("localhost", 9999)
    // 导入隐式转换
    import org.apache.flink.api.scala._
    // 计算逻辑
    val dataStream: DataStream[(String, Int)] = textDstream
      .flatMap(_.split(" "))
      .filter(_.nonEmpty)
      .map((_, 1))
      .keyBy(0)
      .sum(1)

    // 设置并行度
    dataStream.print().setParallelism(1)

    // 执行
    env.execute("Socket stream word count")
  }
}

打开终端,并输入 nc -l -p 9999,然后输入以下内容:

hello world
hello flink
hello spark
hello java

运行 StreamWordCount.scala 将会看到如下输出:

(hello,1)
(flink,1)
(hello,2)
(spark,1)
(java,1)
(hello,3)
...

至此,我们的第一个 Flink 实验就已经完成了。

实验总结

本节实验中,我们介绍了 Flink 出现的背景,并和 Storm、Spark Streaming 做了简单对比,然后在实验环境下安装了 idea 开发工具并运行了第一个 Flink 程序。至此,相信大家已经对 Flink 已经有了一个初步的认识。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2294953.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

WPF基础 | 初探 WPF:理解其核心架构与开发环境搭建

WPF基础 | 初探 WPF&#xff1a;理解其核心架构与开发环境搭建 一、前言二、WPF 核心架构2.1 核心组件2.2 布局系统2.3 数据绑定机制2.4 事件处理机制 三、WPF 开发环境搭建3.1 安装 Visual Studio3.2 创建第一个 WPF 应用程序 结束语优质源码分享 WPF基础 | 初探 WPF&#xff…

JVM 四虚拟机栈

虚拟机栈出现的背景 由于跨平台性的设计&#xff0c;Java的指令都是根据栈来设计的。不同平台CPU架构不同&#xff0c;所以不能设计为基于寄存器的。优点是跨平台&#xff0c;指令集小&#xff0c;编译器容易实现&#xff0c;缺点是性能下降&#xff0c;实现同样的功能需要更多…

深入理解小波变换:信号处理的强大工具

引言 在科学与工程领域&#xff0c;信号处理一直是关键环节&#xff0c;傅里叶变换与小波变换作为重要的分析工具&#xff0c;在其中发挥着重要作用。本文将深入探讨小波变换&#xff0c;阐述其原理、优势以及与傅里叶变换的对比&#xff0c;并通过具体案例展示其应用价值。 一…

【大数据技术】搭建完全分布式高可用大数据集群(Kafka)

搭建完全分布式高可用大数据集群(Kafka) kafka_2.13-3.9.0.tgz注:请在阅读本篇文章前,将以上资源下载下来。 写在前面 本文主要介绍搭建完全分布式高可用集群 Kafka 的详细步骤。 注意: 统一约定将软件安装包存放于虚拟机的/software目录下,软件安装至/opt目录下。 安…

关于ESP-IDF 5.4 中添加第三方组件esp32-camera找不到文件,编译错误解决办法(花了一天时间解决)

最近需要使用ESP32-S3-CAM 的OV2640摄像头采集图像&#xff0c;为了加速开发进度&#xff0c;于是选择了esp32-camera组件&#xff0c;该组件不是官方组件&#xff0c;需要自己git clone。但在为项目添加esp32-camera组件时&#xff0c;一直编译错误&#xff0c;找不到头文件&a…

Android LifecycleOwner 闪退,java 继承、多态特性!

1. 闪退 同意隐私政策后&#xff0c;启动进入游戏 Activity 闪退 getLifecycle NullPointerException 空指针异常 FATAL EXCEPTION: main Process: com.primer.aa.gg, PID: 15722 java.lang.RuntimeException: Unable to instantiate activity ComponentInfo{com.primer.aa.…

[LeetCode]day16 242.有效的字母异位词

242. 有效的字母异位词 - 力扣&#xff08;LeetCode&#xff09; 题目描述 给定两个字符串 s 和 t &#xff0c;编写一个函数来判断 t 是否是 s 的 字母异位词 示例 1: 输入: s "anagram", t "nagaram" 输出: true示例 2: 输入: s "rat"…

基于SpringBoot养老院平台系统功能实现五

一、前言介绍&#xff1a; 1.1 项目摘要 随着全球人口老龄化的不断加剧&#xff0c;养老服务需求日益增长。特别是在中国&#xff0c;随着经济的快速发展和人民生活水平的提高&#xff0c;老年人口数量不断增加&#xff0c;对养老服务的质量和效率提出了更高的要求。传统的养…

【3分钟极速部署】在本地快速部署deepseek

第一步&#xff0c;找到网站&#xff0c;下载&#xff1a; 首先找到Ollama &#xff0c; 根据自己的电脑下载对应的版本 。 我个人用的是Windows 我就先尝试用Windows版本了 &#xff0c;文件不是很大&#xff0c;下载也比较的快 第二部就是安装了 &#xff1a; 安装完成后提示…

Linux ftrace 内核跟踪入门

文章目录 ftrace介绍开启ftraceftrace使用ftrace跟踪指定内核函数ftrace跟踪指定pid ftrace原理ftrace与stracetrace-cmd 工具KernelShark参考 ftrace介绍 Ftrace is an internal tracer designed to help out developers and designers of systems to find what is going on i…

[Day 16]螺旋遍历二维数组

今天我们看一下力扣上的这个题目&#xff1a;146.螺旋遍历二维数组 题目描述&#xff1a; 给定一个二维数组 array&#xff0c;请返回「螺旋遍历」该数组的结果。 螺旋遍历&#xff1a;从左上角开始&#xff0c;按照 向右、向下、向左、向上 的顺序 依次 提取元素&#xff0c…

【教程】docker升级镜像

转载请注明出处&#xff1a;小锋学长生活大爆炸[xfxuezhagn.cn] 如果本文帮助到了你&#xff0c;欢迎[点赞、收藏、关注]哦~ 目录 自动升级 手动升级 无论哪种方式&#xff0c;最重要的是一定要通过-v参数做数据的持久化&#xff01; 自动升级 使用watchtower&#xff0c;可…

使用jmeter进行压力测试

使用jmeter进行压力测试 jmeter安装 官网安装包下载&#xff0c;选择二进制文件&#xff0c;解压。 tar -xzvf apache-jmeter-x.tgz依赖jdk安装。 yum install java-1.8.0-openjdk环境变量配置&#xff0c;修改/etc/profile文件&#xff0c;添加以下内容。 export JMETER/…

链表和 list

一、单链表的模拟实现 1.实现方式 链表的实现方式分为动态实现和静态实现两种。 动态实现是通过 new 申请结点&#xff0c;然后通过 delete 释放结点的形式构造链表。这种实现方式最能体 现链表的特性&#xff1b; 静态实现是利用两个数组配合来模拟链表。一个表示数据域&am…

【AI大模型】Ubuntu18.04安装deepseek-r1模型+服务器部署+内网访问

以下内容主要参考博文&#xff1a;DeepSeek火爆全网&#xff0c;官网宕机&#xff1f;本地部署一个随便玩「LLM探索」 - 程序设计实验室 - 博客园 安装 ollama Download Ollama on Linux curl -fsSL https://ollama.com/install.sh | sh 配置 ollama 监听地址 ollama 安装后…

cmd执行mysql命令

安装mysql之后如果想使用cmd执行mysql命令&#xff0c;需要怎么操作呢&#xff0c;下面一起看一下。 安装mysql之后&#xff0c;如果直接去cmd窗口执行MySQL命令&#xff0c;窗口可能会提示mysql不是可执行命令。 需要配置系统的环境变量&#xff0c;将mysql的安装路径配置系…

网络安全威胁框架与入侵分析模型概述

引言 “网络安全攻防的本质是人与人之间的对抗&#xff0c;每一次入侵背后都有一个实体&#xff08;个人或组织&#xff09;”。这一经典观点概括了网络攻防的深层本质。无论是APT&#xff08;高级持续性威胁&#xff09;攻击、零日漏洞利用&#xff0c;还是简单的钓鱼攻击&am…

详细教程 | 如何使用DolphinScheduler调度Flink实时任务

Apache DolphinScheduler 非常适用于实时数据处理场景&#xff0c;尤其是与 Apache Flink 的集成。DolphinScheduler 提供了丰富的功能&#xff0c;包括任务依赖管理、动态调度、实时监控和日志管理&#xff0c;能够有效简化 Flink 实时任务的管理和部署。通过 DolphinSchedule…

【通俗易懂说模型】线性回归(附深度学习、机器学习发展史)

&#x1f308; 个人主页&#xff1a;十二月的猫-CSDN博客 &#x1f525; 系列专栏&#xff1a; &#x1f3c0;深度学习_十二月的猫的博客-CSDN博客 &#x1f4aa;&#x1f3fb; 十二月的寒冬阻挡不了春天的脚步&#xff0c;十二点的黑夜遮蔽不住黎明的曙光 目录 1. 前言 2. …

【R语言】apply函数族

在R语言中使用循环操作时是使用自身来实现的&#xff0c;效率较低。所以R语言有一个符合其统计语言出身的特点&#xff1a;向量化。R语言中的向量化运用了底层的C语言&#xff0c;而C语言的效率比高层的R语言的效率高。 apply函数族主要是为了解决数据向量化运算的问题&#x…