【Apache Flink】Flink DataStream API的基本使用

news2025/1/23 2:17:11

Flink DataStream API的基本使用

文章目录

  • 前言
  • 1. 基本使用方法
  • 2. 核心示例代码
  • 3. 完成工程代码
    • pom.xml
    • WordCountExample
    • 测试验证
  • 4. Stream 执行环境
  • 5. 参考文档

前言

Flink DataStream API主要用于处理无界和有界数据流 。
无界数据流是一个持续生成数据的数据源,它没有明确的结束点,例如实时的交易数据或传感器数据。这种类型的数据流需要使用Apache Flink的实时处理功能来连续地处理和分析。

有界数据流是一个具有明确开始和结束点的数据集,例如一个文件或数据库表。这种类型的数据流通常在批处理场景中使用,其中所有数据都已经可用,并可以一次性处理。

Flink的DataStream API提供了一套丰富的操作符,如map、filter、reduce、aggregations、windowing、join等,以支持各种复杂的数据处理和分析需求。此外,DataStream API还提供了容错保证,能确保在发生故障时,应用程序能从最近的检查点(checkpoint)恢复,从而实现精确一次(exactly-once)的处理语义。

1. 基本使用方法

  1. 创建执行环境:

    每一个Flink程序都需要创建一个StreamExecutionEnvironment(执行环境),它可以被用来设置参数和创建从外部系统读取数据的流。

    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
  2. 创建数据流:

    你可以从各种数据源中创建数据流,如本地集合,文件,socket等。下面的代码是从本地集合创建数据流的示例:

    DataStream<String> dataStream = env.fromElements("hello", "flink");
    
  3. 转换操作:

    Flink提供了丰富的转换操作,如mapfilterreduce等。以下代码首先将每个字符串映射为其长度,然后过滤出长度大于5的元素:

    DataStream<Integer> transformedStream = dataStream
        .map(s -> s.length())
        .filter(l -> l > 5);
    
  4. 数据输出:

    Flink支持将数据流输出到各种存储系统,如文件,socket,数据库等。下面的代码将数据流输出到标准输出:

    transformedStream.print();
    
  5. 执行程序:

    将上述所有步骤放在main函数中,并在最后调用env.execute()方法来启动程序。Flink程序是懒加载的,只有在调用execute方法时才会真正开始执行。

    env.execute("Flink Basic API Usage");
    

2. 核心示例代码

使用Flink DataStream API构建一个实时Word Count程序,它会从一个socket端口读取文本数据,统计每个单词的出现次数,并将结果输出到标准输出。

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class WordCountExample {
    public static void main(String[] args) throws Exception {
        // 1. 创建执行环境
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2. 创建数据流,从socket接收数据,需要在本地启动一个端口为9000的socket服务器
        DataStream<String> textStream = env.socketTextStream("localhost", 9000);

        // 3. 转换操作
        DataStream<Tuple2<String, Integer>> wordCountStream = textStream
                .flatMap(new LineSplitter()) // 将文本行切分为单词
                .keyBy(0) // 按单词分组
                .sum(1); // 对每个单词的计数求和

        // 4. 数据输出
        wordCountStream.print();

        // 5. 执行程序
        env.execute("Socket Word Count Example");
    }

    // 自定义一个FlatMapFunction,将输入的每一行文本切分为单词,并输出为Tuple2,第一个元素是单词,第二个元素是计数(初始值为1)
    public static class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
            for (String word : line.split(" ")) {
                out.collect(new Tuple2<>(word, 1));
            }
        }
    }
}

3. 完成工程代码

下面是一个基于Apache Flink的实时单词计数应用程序的完整工程代码,包括Pom.xml文件和所有Java类。

pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">

    <modelVersion>4.0.0</modelVersion>
    <groupId>com.example</groupId>
    <artifactId>flink-wordcount-example</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>

    <properties>
        <flink.version>1.13.2</flink.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

WordCountExample

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class WordCountExample {
    public static void main(String[] args) throws Exception {
        // 1. 创建执行环境
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2. 创建数据流,从socket接收数据,需要在本地启动一个端口为9000的socket服务器
        DataStream<String> textStream = env.socketTextStream("localhost", 9000);

        // 3. 转换操作
        DataStream<Tuple2<String, Integer>> wordCountStream = textStream
                .flatMap(new LineSplitter())  // 将文本行切分为单词
                .keyBy(0)  // 按单词分组
                .sum(1);  // 对每个单词的计数求和

        // 4. 数据输出
        wordCountStream.print();

        // 5. 执行程序
        env.execute("Socket Word Count Example");
    }

    // 自定义一个FlatMapFunction,将输入的每一行文本切分为单词,并输出为Tuple2,第一个元素是单词,第二个元素是计数(初始值为1)
    public static class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
            for (String word : line.split(" ")) {
                out.collect(new Tuple2<>(word, 1));
            }
        }
    }
}

现在,你可以使用Maven编译并运行这个程序。在启动程序之前,你需要在本地启动一个端口为9000的Socket服务器。这可以通过使用Netcat工具 (nc -lk 9000) 或者其他任何能打开端口的工具实现。然后,你可以输入文本行,Flink程序会统计每个单词出现的次数,并实时打印结果。

测试验证

用py在本地启动一个socket服务器,监听9000端口,

python比较简单实现一个socket通信 。写一个Python来验证上面写的例子。

import socket

server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.bind(("localhost", 9000))
server_socket.listen(1)

print("Waiting for connection...")
client_socket, client_address = server_socket.accept()
print("Connected to:", client_address)

while True:
    data = input("Enter text: ")
    client_socket.sendall(data.encode())

运行Flink程序和Python socket服务器,然后在Python程序中输入文本, 会看到Flink程序实时统计每个单词出现的次数并输出到控制台。

4. Stream 执行环境

开发学习过程中,不需要关注。每个 Flink 应用都需要有执行环境,在该示例中为 env。流式应用需要用到 StreamExecutionEnvironment。
在这里插入图片描述

DataStream API 将你的应用构建为一个 job graph,并附加到 StreamExecutionEnvironment 。当调用 env.execute() 时此 graph 就被打包并发送到 JobManager 上,后者对作业并行处理并将其子任务分发给 Task Manager 来执行。每个作业的并行子任务将在 task slot 中执行。

注意,如果没有调用 execute(),应用就不会运行。

Flink runtime: client, job manager, task managers
此分布式运行时取决于你的应用是否是可序列化的。它还要求所有依赖对集群中的每个节点均可用。

5. 参考文档

https://nightlies.apache.org/flink/flink-docs-release-1.18/zh/docs/learn-flink/datastream_api/

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1153215.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

如何理解 Spring Boot 中的 Starter?

Starter 是 Spring Boot 的四大核心功能特性之一&#xff0c;除此之外,Spring Boot 还有自动装配、Actuator 监控等特性。Spring Boot 里面的这些特性&#xff0c;都是为了让开发者在开发基于 Spring 生态下的企业级应用时&#xff0c;只需要关心业务逻辑&#xff0c;减少对配置…

初识FFmpeg

前言 无意间见到群里的小伙伴展示视频工具。功能比较多&#xff0c;包括视频编码修改&#xff0c;画质处理&#xff0c;比例处理、名称提取&#xff0c;剪辑、标题拆解。因此开始了FFmpeg学习。以下摘自百度百科的解释。 FFmpeg是一套可以用来记录、转换数字音频、视频&#xf…

【LVS实战】02 搭建一个LVS-NAT实验

一、网络结构 用虚拟机搭建如下的几台机器&#xff0c;并配置如下的ip 关于虚拟机网卡和网络的配置&#xff0c;可以参考 iptables章节&#xff0c;05节&#xff1a;网络转发实验 主机A模拟外网的机器 B为负载均衡的机器 C和D为 RealServer 二、C和D主机的网关设置 C和D机…

vue项目引入elementui样式组件05

vue前端开发&#xff0c;关于样式部分&#xff0c;不需要自己去写&#xff0c;可以引用现有的一些组件&#xff0c;比如elemtnui&#xff0c;可官网查看 1、下载对应的包到vue项目中 通过npm进行安装 npm i element-ui -S2、引入到项目中&#xff0c;官网也提供了例子 3、运…

Shopee流量和销量不佳?或许你没有掌握正确的引流方法

很多卖家做了很久&#xff0c;但是发现流量和销量都没怎么增长&#xff0c;今天陈哥就分享一下如何正确的引流。 以下是一些有效的引流策略&#xff1a; 1. 站内引流&#xff1a;选择高性价比的潮流商品&#xff0c;根据目标客户群和重点品类进行选品。优化商品名称和描述&am…

顺序表练习

顺序表练习 图解插入与删除&#xff0c;详见相关内容&#xff1a;顺序存储结构的插入与删除 //顺序表的定义、创建、插入、删除、查找 //定义&#xff1a;结构体中数组、表长 //创建:输入元素&#xff0c;表长 //插入&#xff1a;判断表是否已满、判断位序合法性 //插入位序k…

好用的视频下载工具推荐

我不允许还有人不知道这款视频下载工具, 真的太好用了! &#xff01;! 随着视频行业的崛起&#xff0c;如今网络上各种各样的视频层出不穷, 那我们看到喜欢的视频该如何下载呢&#xff1f;今天小编来给大家分享一款非常实用的视频下载工具——Downni, 它兼容国内外大多数视频网…

网络编程服务端与客户端存在的端口问题

服务端的窗口不能再次使用的原因如下&#xff1a; 服务器端的窗口不能再次使用的原因可能有以下几点&#xff1a; 1. 窗口已经关闭&#xff1a;如果服务器端的窗口已经被关闭&#xff0c;那么就无法再次使用。关闭窗口后&#xff0c;服务器会释放相关资源&#xff0c;包括与该…

深度学习入门(二)之神经网络

文章目录 从感知机到神经网络神经网络的例子复习感知机激活函数 激活函数sigmoid函数阶跃函数的实现阶跃函数的图形sigmoid函数的图形sigmoid函数与阶跃函数比较非线性函数ReLU函数 多维数组的运算多维数组矩阵乘法神经网络的内积 三层神经网络的实现符号确认各层间信号传递的实…

视频剪辑达人教您:如何运用嵌套合并技巧制作固定片尾

在视频剪辑的过程中&#xff0c;嵌套合并技巧是一种非常实用的技术&#xff0c;可以帮助您将多个素材叠加在一起&#xff0c;制作出更加丰富多彩的视频。本文将由视频剪辑达人为您详细介绍如何运用云炫AI智剪嵌套合并技巧制作固定片尾&#xff0c;让您的视频剪辑水平更上一层楼…

场景交易额超40亿,海尔智家三翼鸟开始收获

文 | 螳螂观察 作者 | 余一 随着双十一的到来&#xff0c;国内的消费情绪再次被点燃。在这类大促之下&#xff0c;品牌们就像一个个天体&#xff0c;不断引动着市场潮汐&#xff0c;期待自己能触发更大的“海潮效应”。 所谓“海潮效应”是指&#xff0c;海水因天体的引力而…

总结之数据分析工具cube.js通过Docker部署

cube.js介绍 官网地址&#xff1a;https://cube.dev/ Cube.js是一个开源的模块化框架&#xff0c;用于构建分析web应用程序。它主要用于构建内部业务智能工具或向现有应用程序添加面向客户的分析。 Cube.js设计用于无服务器查询引擎&#xff0c;如AWS Athena和谷歌BigQuery。…

一张动图告诉你,输入网址之后,发生了什么事情?

让我们一步一步地来看这个过程。 步骤1&#xff1a; 用户在浏览器中输入一个URL&#xff08;比如www.bytebytego.com&#xff09;&#xff0c;然后按下回车键。首先&#xff0c;我们需要将这个URL转换成一个IP地址。通常&#xff0c;这个映射关系会被存储在缓存中&#xff0…

【设计模式】第6节:创建型模式之“原型模式”

由于本人现在所使用的语言主要是golang&#xff0c;所以后面的代码主要使用golang编写。语言实现应该不是障碍&#xff0c;主要是理解每种设计模式它的思想。 如果对象的创建成本比较大&#xff0c;而同一个类的不同对象之间差别不大&#xff08;大部分字段都相同&#xff09;…

企业 Tomcat 运维 部署tomcat反向代理集群

一、Tomcat 简介 Tomcat服务器是一个免费的开放源代码的Web应用服务器&#xff0c;属于轻量级应用服务器&#xff0c; Tomcat和Nginx、Apache(httpd)、Web服务器一样&#xff0c;具有处理HTML页面的功能不过Tomcat处理静态HTML的能力不如Nginx/Apache服务器 一个tomcat默认并…

我的ChatGPT的几个使用场景

示例一&#xff0c;工作辅助、写函数代码&#xff1a; 这里展示了一个完整的代码&#xff0c;修正&#xff0c;然后最终输出的过程。GPT具备足够丰富的相关的小型代码生成能力&#xff0c;语法能力也足够好。这类应用场景&#xff0c;在我的GPT使用中&#xff0c;能占到65%以上…

快速入门:使用 Spring Boot 构建 Web 应用程序

前言 本文将讨论以下主题&#xff1a; 安装 Java JDK、Gradle 或 Maven 和 Eclipse 或 IntelliJ IDEA创建一个新的 Spring Boot 项目运行 Spring Boot 应用程序编写一个简单的 Web 应用程序打包应用程序以用于生产环境 通过这些主题&#xff0c;您将能够开始使用 Spring Boo…

使用logback按天生成日志并按等级进行分类

先看效果---->>>> 按照&#xff1a;error、info、warn进行分类&#xff1a; 每个文件里面按日期进行分类&#xff1a; 其中对应的Maven如下&#xff1a; <?xml version"1.0" encoding"UTF-8"?> <project xmlns"http://maven…

微服务框架SpringcloudAlibaba+Nacos集成RabbiMQ

目前公司使用jeepluscloud版本&#xff0c;这个版本没有集成消息队列&#xff0c;这里记录一下&#xff0c;集成的过程&#xff1b;这个框架跟ruoyi的那个微服务版本结构一模一样&#xff0c;所以也可以快速上手。 1.项目结构图&#xff1a; 配置类的东西做成一个公共的模块 …