大数据-108 Flink 快速应用案例 重回Hello WordCount!方案1批数据 方案2流数据

news2025/1/10 21:21:26

点一下关注吧!!!非常感谢!!持续更新!!!

目前已经更新到了:

  • Hadoop(已更完)
  • HDFS(已更完)
  • MapReduce(已更完)
  • Hive(已更完)
  • Flume(已更完)
  • Sqoop(已更完)
  • Zookeeper(已更完)
  • HBase(已更完)
  • Redis (已更完)
  • Kafka(已更完)
  • Spark(已更完)
  • Flink(正在更新!)

章节内容

上节完成了如下的内容:

  • Flink 基本介绍
  • 基本概述 适用场景 框架特点 核心组成 生态发展 处理模型 组件架构

在这里插入图片描述
在这里插入图片描述

再次回到最初的起点,Hello Word Count!

Flink 流处理 (Stream Processing)

定义

流处理是指对持续不断的数据流进行实时处理。Flink 的流处理模式非常适合处理持续产生的数据,例如来自传感器、日志记录系统或金融交易的数据流。

核心概念

  • 无界数据流:流处理通常处理无界数据流,即数据流没有明确的结束点,持续不断地产生。
  • 事件时间:Flink 支持基于事件时间的处理,能够处理乱序和延迟数据,使得处理更加精确。事件时间指的是数据在其产生源头的时间。
  • 窗口操作:在流处理过程中,通常需要将数据按时间窗口(如滑动窗口、滚动窗口、会话窗口)进行分组,以便执行聚合或其他操作。
  • 状态管理:Flink 支持有状态的流处理,这意味着处理每条数据时,可以记住之前的数据状态。例如,在流中计算一个累积的总和或频率。

Flink 批处理 (Batch Processing)

定义

批处理是指对静态的、有界的数据集进行处理。这种处理通常用于一次性的大规模数据分析,如定期的业务报告生成、数据转换和加载任务。

核心概念

  • 有界数据集:批处理通常处理有界数据集,即数据集是固定大小的,有明确的开始和结束点。
  • 任务并行化:在批处理模式下,Flink 会将数据集划分为多个子任务,并行执行这些任务,以加快处理速度。
  • DataSet API:Flink 的 DataSet API 提供了一组高层次的操作符,用于对批数据集执行各种操作,如映射(map)、过滤(filter)、联接(join)和聚合(aggregate)。

单词统计(批数据)

需求说明

统计一个文件中各个单词出现的次数,把统计结果输出到文件

  • 读取数据源
  • 处理数据源
  • 将读取到的数据源文件中的每一行根据空格切分
  • 将切分好的每个单词拼接1
  • 根据单词聚合(将相同的单词放到一起)
  • 累加相同的单词(单词后面的1进行累加)
  • 保存处理结果

导入依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>flink-test</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>11</maven.compiler.source>
        <maven.compiler.target>11</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.11.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.12</artifactId>
            <version>1.11.1</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.12</artifactId>
            <version>1.11.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_2.12</artifactId>
            <version>1.11.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.12</artifactId>
            <version>1.11.1</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>

</project>

编写代码

package icu.wzk;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.util.Collector;


public class WordCount {

    public static void main(String[] args) throws Exception {
        String inPath = "word-count/word-count.txt";
        String outPath = "word-count/word-count-result.csv";
        // 获取Flink批处理执行环境
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // 读取文件中的内容
        DataSet<String> text = env.readTextFile(inPath);
        // 对数据进行处理
        DataSet<Tuple2<String, Integer>> dataSet = text
                .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String line, Collector<Tuple2<String, Integer>> collector) throws Exception {
                        for (String word : line.split(" ")) {
                            collector.collect(new Tuple2<>(word, 1));
                        }
                    }
                })
                .groupBy(0)
                .sum(1);
        dataSet
                .writeAsCsv(outPath, "\n", " ", FileSystem.WriteMode.OVERWRITE)
                .setParallelism(1);
        // 触发执行程序
        env.execute("Word Count");
    }

}

测试数据

Stateful Computations over Data Streams
Apache Flink is a framework and distributed processing engine for stateful computations over unbounded and bounded data streams.
Flink has been designed to run in all common cluster environments, perform computations at in-memory speed and at any scale.
Correctness guarantees
Exactly-once state consistency
Event-time processing
Sophisticated late data handling
SQL on Stream & Batch Data
DataStream API & DataSet API
ProcessFunction (Time & State)
Flexible deployment
High-availability setup
Savepoints

运行测试

在这里插入图片描述

结果数据

查看 word-count/word-count-result.csv 打开即可看到以下内容:

Stateful 1
any 1
common 1
computations 2
on 1
setup 1
state 1
streams. 1
unbounded 1
& 3
Data 2
DataStream 1
High-availability 1
for 1
perform 1
run 1
to 1
Event-time 1
Flexible 1
Sophisticated 1
framework 1
is 1
scale. 1
Exactly-once 1
ProcessFunction 1
Stream 1
a 1
been 1
handling 1
in 1
late 1
processing 2
Batch 1
DataSet 1
at 2
bounded 1
consistency 1
deployment 1
distributed 1
engine 1
has 1
API 2
Apache 1
Flink 2
SQL 1
Streams 1
all 1
designed 1
over 2
Computations 1
Savepoints 1
and 3
data 2
environments, 1
in-memory 1
speed 1
stateful 1
(Time 1
Correctness 1
State) 1
cluster 1
guarantees 1

单词统计(流数据)

需求说明

Socket模拟实时发送单词,使用Flink实时接收数据,对指定时间窗口内(如5秒)的数据进行聚合统计,每隔1秒汇总计算一次,并且把时间窗口内计算结果打印出来。

编写代码

Server部分

编写一个Socket服务,提供一定的数据流。

package icu.wzk;


import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintWriter;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Random;

public class WordCountServer {

    public static void main(String[] args) throws IOException, InterruptedException {
        String ip = "localhost";
        int port = 9999;
        Random random = new Random();
        ServerSocket serverSocket = new ServerSocket();
        InetSocketAddress address = new InetSocketAddress(ip, port);
        serverSocket.bind(address);
        Socket socket = serverSocket.accept();
        OutputStream outputStream = socket.getOutputStream();
        PrintWriter writer = new PrintWriter(outputStream, true);
        for (int i = 0; i < 1000; i ++) {
            int number = random.nextInt(100);
            System.out.println(number);
            writer.println(number);
            Thread.sleep((random.nextInt(900) + 100));
        }
        socket.close();
        serverSocket.close();
    }

}

Flink部分

连接到上述的Server部分

package icu.wzk;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;


public class WordCount2 {

    public static void main(String[] args) throws Exception {
        String ip = "localhost";
        int port = 9999;

        // 获取 Flink 执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 获取 Socket 输入数据
        DataStreamSource<String> textStream = env.socketTextStream(ip, port, "\n");
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordCount = textStream
                .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                        String[] splits = value.split("\\s");
                        for (String word : splits) {
                            out.collect(new Tuple2<>(word, 1));
                        }
                    }
                });
        SingleOutputStreamOperator<Tuple2<String, Integer>> word = wordCount
                .keyBy(new KeySelector<Tuple2<String, Integer>, Object>() {
                    @Override
                    public Object getKey(Tuple2<String, Integer> value) throws Exception {
                        return value.f0;
                    }
                })
                .timeWindow(Time.seconds(5), Time.seconds(1))
                .sum(1);

        // 输出并运行
        word.print();
        env.execute("Word Count");
    }

}

观察结果

Server部分

35
18
84
72
24
51
15
13
65
98
55
68
22
84
17

Flink部分

3> (35,1)
4> (18,1)
3> (35,1)
5> (84,1)
4> (18,1)
6> (72,1)
3> (35,1)
5> (84,1)
5> (24,1)
3> (35,1)
6> (72,1)
4> (18,1)
7> (51,1)
5> (24,1)
5> (84,1)
4> (15,1)
6> (72,1)
7> (51,1)
3> (35,1)
4> (15,1)
4> (18,1)

运行结果过程截图如下所示:
在这里插入图片描述

过程总结

  • 获得一个执行环境
  • 加载、创建 初始化环境
  • 指定数据操作的算子
  • 指定结果数据存放位置
  • 调用Execute触发执行程序

注意:Flink程序是延迟计算的,只有最后调用execute()方法的时候才会真正的触发执行程序。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2085087.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

OpenCV绘图函数(6)绘制椭圆函数ellipse()的使用

操作系统&#xff1a;ubuntu22.04 OpenCV版本&#xff1a;OpenCV4.9 IDE:Visual Studio Code 编程语言&#xff1a;C11 算法描述 画出一个简单的或粗的椭圆弧或者填充一个椭圆扇形。 函数 cv::ellipse 使用更多的参数可以画出椭圆轮廓、填充的椭圆、椭圆弧或填充的椭圆扇形。…

复现很难吗?找我帮你解决烦恼

代码复现&#xff0c;算法复现&#xff0c;文章复现&#xff0c;科研复现 Matlab&#xff0c;Python均可 文献里的算法&#xff0c;方法均可复现&#xff0c; 提供代码改进&#xff0c;模型优化&#xff0c;增加模块&#xff0c;python代做&#xff0c;预测&#xff0c;微调&am…

潮玩宇宙无聊猿斗兽场游戏开发代码示例

明确游戏目标和定位&#xff1a;确定游戏的类型&#xff08;比如是竞技类、策略类等&#xff09;、风格、玩法规则等。设计游戏架构&#xff1a;包括服务器架构、客户端架构、数据库设计等。美术设计&#xff1a;创作游戏中的角色、场景、道具等美术资源。编程实现&#xff1a;…

五分钟本地部署Uptime Kuma运维监控结合内网穿透实现远程访问

文章目录 前言**主要功能**一、前期准备本教程环境为&#xff1a;Centos7&#xff0c;可以跑Docker的系统都可以使用本教程安装。本教程使用Docker部署服务&#xff0c;如何安装Docker详见&#xff1a; 二、Docker部署Uptime Kuma三、实现公网查看网站监控四、使用固定公网地址…

MySQL:简述多版本并发控制MVCC

一、MVCC的概念 1、MVCC 数据库并发场景有三种&#xff0c;分别为&#xff1a; &#xff08;1&#xff09;读读&#xff1a;不存在任何问题&#xff0c;也不需要并发控制。 &#xff08;2&#xff09;读写&#xff1a;有线程安全问题&#xff0c;可能会造成事务隔离性问题&am…

App弱网测试是怎么测试的!

一、网络测试的一般流程 step1&#xff1a;首先要考虑网络正常的情况 ① 各个模块的功能正常可用 ② 页面元素/数据显示正常 step2&#xff1a;其次要考虑无网络的情况 ① APP各个功能在无网络情况下是否可用 ② APP各个页面之间切换是否正常 ③ 发送网络请求时是否会…

算法力扣刷题记录 九十【739. 每日温度】

前言 单调栈第一篇。单调栈解题思路如何&#xff1f; 一、题目阅读 给定一个整数数组 temperatures &#xff0c;表示每天的温度&#xff0c;返回一个数组 answer &#xff0c;其中 answer[i] 是指对于第 i 天&#xff0c;下一个更高温度出现在几天后。如果气温在这之后都不会…

2024 Python3.10 系统入门+进阶(十):Python字典及其常用操作详解

目录 一、初始化1.1 {}--直接创建字典1.2 dict()函数--创建字典1.3 fromkeys()方法--创建一个新字典 二、元素访问2.1 使用中括号[]语法2.2 get()方法--获取字典中指定键的值2.3 setdefault()方法--获取字典中指定键的值 三、新增和修改3.1 直接赋值3.2 update()方法--更新字典…

RabbitMQ练习(Routing)

1、RabbitMQ教程 《RabbitMQ Tutorials》https://www.rabbitmq.com/tutorials 2、环境准备 参考&#xff1a;《RabbitMQ练习&#xff08;Hello World&#xff09;》和《RabbitMQ练习&#xff08;Work Queues&#xff09;》。 确保RabbitMQ、Sender、Receiver、Receiver2容器…

人工智能训练师一级(高级技师)、二级(技师)考试指南

随着经济快速发展&#xff0c;人工智能技术在制造业、交通运输、农业、医疗健康、金融服务、物流配送以及城市服务等多个领域得到了广泛的应用。不仅带来产业的转型升级&#xff0c;更是对具备相应技能的人工智能训练师需求的激增。 根据教育部发布的《关于做好职业教育“…

BugKu练习记录:把猪困在猪圈里

题目&#xff1a; 用base64解码 再对应猪圈密码解码&#xff0c;得到答案 t h i s i s p i g p a s s w o r d

house of pig

文章目录 house of pig介绍&#xff1a;利用条件&#xff1a;利用流程&#xff1a; 例题&#xff1a;利用&#xff1a; 总结&#xff1a; house of pig 介绍&#xff1a; House of Pig 是一个将 Tcache Stash Unlink Attack 和 FSOP 结合的攻击&#xff0c;同时使用到了 Larg…

MQ专题:事务消息的实现方式

方案 事务消息投递的过程 step1&#xff1a;开启本地事务step2&#xff1a;执行本地业务step3&#xff1a;消息表t_msg写入记录&#xff0c;status为0&#xff08;待投递到MQ&#xff09;step4&#xff1a;提交本地事务step5&#xff1a;若事务提交成功&#xff0c;则投递消息…

【原创教程】电气电工13:按钮开关指示灯篇

按钮开关在电气电工工作中,看似简单,但是有些细节问题,我们还是要注意的。电气电工工作是一个完整的体系,任何一件事疏忽,都会埋下安全隐患。 首先我们来看下 开关按钮的定义: 按钮开关是指利用按钮推动传动机构,使动触点与静触点按通或断开并实现电路换接的开关。按…

软件测试面试题!收藏起来,每天看一看,月薪20K!

初级测试总结题&#xff01;必背&#xff01;必背&#xff01;必背&#xff01; 1&#xff09;软件的概念&#xff1f; 软件是计算机系统中与硬件相互依存的一部分&#xff0c;包括程序、数据以及与其相关文档的完整集合。 2&#xff09;软件测试的概念&#xff1f; 使用人…

讲透一个强大的算法模型,Transformer

Transformer 模型是一种基于注意力机制的深度学习模型&#xff0c;广泛应用于自然语言处理&#xff08;NLP&#xff09;任务&#xff0c;如机器翻译、文本生成和语义理解。 它最初由 Vaswani 等人在2017年的论文《Attention is All You Need》中提出。它突破了传统序列模型&am…

搬运5款实用工具,帮你更好地完成各种任务

​ 在日常工作和生活中&#xff0c;使用各种工具来提升效率和简化任务变得尤为重要。本文将介绍几款实用的工具&#xff0c;帮助你更好地完成各种任务。 1. 自动化脚本——AutoHotkey ​ AutoHotkey是一款功能强大的自动化脚本编写工具&#xff0c;可以用来自动执行日常任务&…

【网络安全】漏洞挖掘

漏洞描述 Spring框架为现代基于java的企业应用程序(在任何类型的部署平台上)提供了一个全面的编程和配置模型。 Spring Cloud 中的 serveless框架 Spring Cloud Function 中的 RoutingFunction 类的 apply 方法将请求头中的“spring.cloud.function.routing-expression”参数…

餐饮_零售_麻辣烫_水果店_零食店_生鲜店等收银系统

介绍 多商户多门店的Sass收银系统。适用于餐饮_零售_麻辣烫_水果店_零食店_生鲜店等收银系统&#xff0c;包含windows收银pos端、商家小程序管理端、商家运营端、电子会员、电子小票 软件架构 收银Pos&#xff1a;vue2、node、electron、sqlite、antd 后端服务&#xff1a;spri…

【如何用本机的Navicat远程连接到ubuntu服务器上的mysql】

文章目录 版本一、ubuntu服务器安装mysql5二、远程连接——mysql配置1.创建新mysql用户2.修改配置文件3.查看端口是否开启 三、远程连接——Navicat 版本 mysql:5.7.32 服务器&#xff1a;ubuntu20.04 PC:win10 一、ubuntu服务器安装mysql5 因为ubuntu20.04默认mysql其实是my…