【跟小嘉学 Apache Flink】二、Flink 快速上手

news2025/1/15 23:31:56

系列文章目录

【跟小嘉学 Apache Flink】一、Apache Flink 介绍
【跟小嘉学 Apache Flink】二、Flink 快速上手

文章目录

  • 系列文章目录
    • @[TOC](文章目录)
  • 一、创建工程
    • 1.1、创建 Maven 工程
    • 1.2、log4j 配置
  • 二、批处理单词统计(DataSet API)
    • 2.1、创建 BatchWordCount 类型
    • 2.4、运行结果
  • 三、流处理单词统计(DataSet API)
    • 3.1、读取文件流
      • 3.1.1、过时的写法
      • 3.1.2、执行错误的处理
    • 3.1.3、执行结果
      • 3.1.4、readTextFile 过时问题
    • 3.2、读取 socket 网络流
      • 3.2.1、读取socket 流代码
      • 3.2.2、使用 nc 监听端口
      • 3.2.3、执行结果
      • 3.2.4、从命令行参数获取主机名和端口号

一、创建工程

1.1、创建 Maven 工程

创建 maven 工程 并且添加如下依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.xiaojia</groupId>
    <artifactId>flinkdemo</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>

    <properties>
        <maven.compiler.source>19</maven.compiler.source>
        <maven.compiler.target>19</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>1.17.1</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>1.17.1</version>
        </dependency>


    </dependencies>

</project>

1.2、log4j 配置

在 resource 目录下创建 log4j.properties 文件,写入如下内容

log4j.rootLogger=error,stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n

二、批处理单词统计(DataSet API)

2.1、创建 BatchWordCount 类型

package org.xiaojia.demo.wc;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.operators.UnsortedGrouping;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class BatchWordCount {
    public static void main(String[] args) {
        // 1、创建执行环境
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();

        // 2、 从文件读取数据
        DataSource<String> lineDataSource = executionEnvironment.readTextFile("input/words.txt");

        // 3、将每一行数据进行分词,转换为二元组类型
        FlatMapOperator<String, Tuple2<String, Long>>  wordAndOneTuple = lineDataSource.flatMap((String line, Collector<Tuple2<String, Long>> collector) -> {
            String[] words = line.split(" ");
            for (String word : words) {
                collector.collect(Tuple2.of(word, 1L));
            }
        }).returns(Types.TUPLE(Types.STRING, Types.LONG));

        // 4、按照 word 进行分组
        UnsortedGrouping<Tuple2<String, Long>> wordAndOneGroup = wordAndOneTuple.groupBy(0);

        // 5、分组内进行聚合统计
        AggregateOperator<Tuple2<String, Long>> sum = wordAndOneGroup.sum(1);

        try {
            sum.print();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}

2.4、运行结果

运行结果
实际上在 Flink 里面已经做到流批处理统一,官方推荐使用 DateStream API,在跳任务时通过执行模式设置为 Batch 来进行批处理

bin/fliink run -Dexecution.runtime-mode=BATCH BatchWordCount.jar

三、流处理单词统计(DataSet API)

使用 DataSet API可以很容易实现批处理。对于Flink而言,流处理才是处理逻辑的底层核心,所以流批统一之后的 DataStream API 更加强大,可以直接处理批处理和流处理的所有场景。

在 Flink 的视角里,一切数据都可以认为是流,流数据是无界流,而批数据是有界流。所以批处理,其实可以看作是有界流的处理。

3.1、读取文件流

3.1.1、过时的写法

package org.xiaojia.demo.wc.stream;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class BoundStreamWordCount {
    public static void main(String[] args) throws Exception {
        // 1、创建流式的执行环境
        StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2、读取文件
        DataStreamSource<String> lineDataStreamSource = streamExecutionEnvironment.readTextFile("input/words.txt");

        // 3、将每一行数据进行分词,转换为二元组类型
        SingleOutputStreamOperator<Tuple2<String, Long>> wordAndOneTuple = lineDataStreamSource.flatMap((String line, Collector<Tuple2<String, Long>> collector) -> {
            String[] words = line.split(" ");
            for (String word : words) {
                collector.collect(Tuple2.of(word, 1L));
            }
        }).returns(Types.TUPLE(Types.STRING, Types.LONG));

        // 4、按照 word 进行分组
        KeyedStream<Tuple2<String, Long>, String> wordAndOneKeyStream = wordAndOneTuple.keyBy((data) -> data.f0);

        // 5、求和
        SingleOutputStreamOperator<Tuple2<String, Long>> sum = wordAndOneKeyStream.sum(1);

        // 6、打印
        sum.print();

        // 7、执行等待
        streamExecutionEnvironment.execute();

    }
}

3.1.2、执行错误的处理

Unable to make field private final byte[] java.lang.String.value accessible: module java.base does not "opens java.lang" to unnamed module
    @7ce6a65d

如果出现上述类似错误,解决方案,通过添加 VM参数打开对应模块的对应模块包

--add-opens java.base/java.lang=ALL-UNNAMED 
--add-opens java.base/java.util=ALL-UNNAMED

添加vm参数

3.1.3、执行结果

执行结果

3.1.4、readTextFile 过时问题

过时问题
解决方案可以按照提示给出的 使用 FileSource(需要用到Flink的连接器)


<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-files</artifactId>
  <version>${flink.version}</version>
</dependency>

3.2、读取 socket 网络流

3.2.1、读取socket 流代码

package org.xiaojia.demo.wc.stream;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class StreamWordCount {
    public static void main(String[] args) throws Exception {
        // 1、创建流式的执行环境
        StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2、读取socket流
        String hostname = "127.0.0.1";
        int port = 8888;
        DataStreamSource<String> lineDataStreamSource = streamExecutionEnvironment.socketTextStream(hostname, port);

        // 3、将每一行数据进行分词,转换为二元组类型
        SingleOutputStreamOperator<Tuple2<String, Long>> wordAndOneTuple = lineDataStreamSource.flatMap((String line, Collector<Tuple2<String, Long>> collector) -> {
            String[] words = line.split(" ");
            for (String word : words) {
                collector.collect(Tuple2.of(word, 1L));
            }
        }).returns(Types.TUPLE(Types.STRING, Types.LONG));

        // 4、按照 word 进行分组
        KeyedStream<Tuple2<String, Long>, String> wordAndOneKeyStream = wordAndOneTuple.keyBy((data) -> data.f0);

        // 5、求和
        SingleOutputStreamOperator<Tuple2<String, Long>> sum = wordAndOneKeyStream.sum(1);

        // 6、打印
        sum.print();

        // 7、执行等待
        streamExecutionEnvironment.execute();
    }
}

3.2.2、使用 nc 监听端口

(base) xiaojiadeMacBook-Pro:~ xiaojia$ nc -lk 8888
hello java
hello flink
hello world

3.2.3、执行结果

执行结果
此时,只要有数据进来,就会统计

3.2.4、从命令行参数获取主机名和端口号

package org.xiaojia.demo.wc.stream;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class StreamWordCount {
    public static void main(String[] args) throws Exception {
        // 1、创建流式的执行环境
        StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2、读取socket流

        ParameterTool parameterTool = ParameterTool.fromArgs(args);

        String hostname = parameterTool.get("host");
        int port = parameterTool.getInt("port");
        DataStreamSource<String> lineDataStreamSource = streamExecutionEnvironment.socketTextStream(hostname, port);

        // 3、将每一行数据进行分词,转换为二元组类型
        SingleOutputStreamOperator<Tuple2<String, Long>> wordAndOneTuple = lineDataStreamSource.flatMap((String line, Collector<Tuple2<String, Long>> collector) -> {
            String[] words = line.split(" ");
            for (String word : words) {
                collector.collect(Tuple2.of(word, 1L));
            }
        }).returns(Types.TUPLE(Types.STRING, Types.LONG));

        // 4、按照 word 进行分组
        KeyedStream<Tuple2<String, Long>, String> wordAndOneKeyStream = wordAndOneTuple.keyBy((data) -> data.f0);

        // 5、求和
        SingleOutputStreamOperator<Tuple2<String, Long>> sum = wordAndOneKeyStream.sum(1);

        // 6、打印
        sum.print();

        // 7、执行等待
        streamExecutionEnvironment.execute();
    }
}

命令行参数传递
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1000520.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

今日话题:解决Linux中可以识别但无法连接Airpods问题(亲测且实用)

今日话题&#xff1a;解决Linux中可以识别但无法连接Airpods问题 起因经过结果方式一方式二 起因经过 在根据“Linux启动黑屏卡住Logo登录界面无法进入系统的终极解决方式”博客解决掉gdm3以及lightdm图形界面之间冲突的问题后&#xff0c;准备设置打开蓝牙连接Airpods&#x…

Java | 多线程

不爱生姜不吃醋⭐️ 如果本文有什么错误的话欢迎在评论区中指正 与其明天开始&#xff0c;不如现在行动&#xff01; 文章目录 &#x1f334;前言&#x1f334;一、什么是多线程&#xff1f;1.进程2.线程3.多线程作用 &#x1f334;二、多线程中的两个概念1. 并发2. 并行3.举例…

python-面向运行时性能优化-threading

python-面向运行时性能优化-threading 一:线程基础1> 线程状态2> 线程同步1. 锁的状态3> 线程通信-条件变量4> 线程阻塞-之间转换1. 阻塞分类二:threading类1> threading介绍2> Thread类1. Thread的生命周期2. 实例化Thread类3. 继承Thread类4. Thread构造…

9.11作业

实现一个对数组求和的函数&#xff0c;数组通过实参传递给函数 sum0 arr(11 22 33 44 55) Sum() {for i in ${arr[*]}do$((sumi))donereturn $sum } Sum ${arr[*]} var$? echo $var写一个函数&#xff0c;输出当前用户的uid和gid&#xff0c;并使用变量接收结果 Sum() {aid -…

C高级作业 【使用shell脚本】 实现一个对数组求和的函数,数组通过实参传递给函数+写一个函数输出当前用户的uid和gid,并使用变量接收结果

作业 1、实现一个对数组求和的函数&#xff0c;数组通过实参传递给函数 #!/bin/bash # 定义求和函数 function sum() {local arr("$") # 将传入的参数保存到一个数组中local sum0 # 初始化求和为0# 遍历数组元素进行求和for num in "${arr[]}";dosum$…

零代码编程:用ChatGPT批量合并ts文件

文件夹中有很多个ts后缀的视频文件&#xff0c;要合并成一个视频文件&#xff0c;在ChatGPT中可以这样输入提示词&#xff1a; 你是一个Python编程专家&#xff0c;要完成一个批量合并ts文件的任务&#xff0c;具体步骤如下&#xff1a; 打开文件夹&#xff1a;C:\Users\dell\…

亚马逊测评下单怎么操作?有没有自动下单软件?

想要进行亚马逊的测评下单&#xff0c;可以按照以下步骤进行操作&#xff1a; 首先&#xff0c;在亚马逊官方网站上搜索你感兴趣的产品&#xff0c;选择你喜欢的产品并查看其详细信息、价格以及其他用户的评价&#xff0c;确认你的购买意向后&#xff0c;点击“加入购物车”将…

C高级 shell指令分支和循环

实现一个对数组求和的函数&#xff0c;数组通过实参传递给函数 #!/bin/bash s0 function sum() {local brr($*)for i in ${brr[*]}do((si))doneecho $s } arr(1 2 3 4 5 6 7 8 9 ) sum ${arr[*]}写一个函数&#xff0c;输出当前用户的uid和gid&#xff0c;并使用变量接收结果 #…

Grafana配置邮件告警

1、创建一个监控图 2、grafana邮件配置 vim /etc/grafana/grafana.ini [smtp] enabled true host smtp.163.com:465 user qinziteng05163.com password xxxxx # 授权码 from_address qinziteng05163.com from_name Grafanasystemctl restart grafana-serv…

DAY03_瑞吉外卖——公共字段自动填充新增分类分类信息分页查询删除分类修改分类

目录 1. 公共字段自动填充1.1 问题分析1.2 基本功能实现1.2.1 思路分析1.2.2 代码实现1.2.3 功能测试 1.3 功能完善1.3.1 思路分析1.3.2 ThreadLocal1.3.3 操作步骤1.3.4 代码实现1.3.5 功能测试 2. 新增分类2.1 需求分析2.2 数据模型2.3 前端页面分析2.4 代码实现2.5 功能测试…

【算法】二分查找算法——leetcode二分查找、搜索插入位置

文章目录 二分查找704. 二分查找35. 搜索插入位置 二分查找 二分查找算法是一种在有序数组中查找特定元素的搜索算法。算法的工作原理是&#xff0c;通过比较数组中间元素和目标值&#xff0c;如果目标值等于中间元素&#xff0c;那么查找结束。如果目标值小于或大于中间元素&a…

单元测试界的高富帅,Pytest框架 (三) 用例标记和测试执行篇

pytest用例标记和测试执行篇 上一篇文章入门篇咱们介绍了pytest的前后置方法和fixture机制&#xff0c;这个章节主要给大家介绍pytest中的标记机制和用例执行的方法。pytest可以通过标记将数据传入于测试函数中&#xff0c;也可以通过标记中对执行的用例做筛选&#xff0c;接下…

NTC 温度采样 二分查表及公式法

NTC 温度采样&#xff1a; 本文记录对NTC 温度采样&#xff0c;分别采用二分查表法及公式法进行描述 资源下载链接&#xff1a;Excel 生成数组表 https://download.csdn.net/download/qq_41359157/88326839?spm1001.2014.3001.5503 NTC参数&#xff1a; NTC采样电路&#xf…

2023 年 Vue 最流行的动画库

数字世界以短暂的注意力和激烈的竞争为主导&#xff0c;因此必须立即将受众的注意力吸引到您的网站上。使用 Vue 动画库&#xff0c;您可以毫不费力地实现这一目标。据报道&#xff0c;VueJs 是 JavaScript 类别中第 7 大最受欢迎的&#xff0c;来自世界各地的开发人员使用它来…

arthas基本应用

下载 arthas curl https://arthas.aliyun.com/arthas-boot.jar启动 arthas&#xff08;启动之前确保有一个 java进程服务&#xff09; java -jar arthas-boot.jar输入3&#xff0c;再输入回车/enter。Arthas 会 attach 到目标进程上&#xff0c;并输出志&#xff1a; 输入das…

如何抢占3020亿美元市场先机?送你一份指南

印度电商市场规模正在快速增长。预计到2023年&#xff0c;印度电商市场规模将达到2000亿美元。其中&#xff0c;B2C电商市场规模将占据主导地位&#xff0c;预计将增长至1000亿美元。 此外&#xff0c;印度政府也在积极推动数字化发展&#xff0c;为电商企业提供更多机会。政府…

Python语言:算术运算符知识点讲解

前言&#xff1a;学了几天python&#xff0c;可把我折磨坏了。为什么呢&#xff0c;就是python语言都特别爱空格&#xff0c;我有时候就忘了&#xff0c;就报错了啦。就比如这个&#xff1a;a 8&#xff0c;等于号前面和后面都需要空格&#xff0c;有点不习惯&#xff0c;在慢…

Spring中的事务与事务传播机制

事务 在学习MySQL时我们学习过事务&#xff0c;而到了Spring的学习时&#xff0c;同样会学习到事务&#xff0c;两个东西的事务的定义都是一样的&#xff1a;将一组操作封装成一个执行单元&#xff0c;要么全部成功&#xff0c;要么全部失败 在Spring中使用事务有两种方式 一…

GitHub星标超70K,阿里大佬的架构总结“分布式全解”笔记霸榜

分布式架构与微服务平台是当今IT界的关键技术&#xff0c;也是资深软件工程师和系统架构师必须掌握的核心技术。 因此小编为各位粉丝朋友带来这份阿里大佬的分布式笔记&#xff1a;从传统分布式架构迁移到基于容器技术的微服务架构为主线&#xff0c;全面、透彻地介绍了与分布…

十七、Webpack搭建本地服务器

一、为什么要搭建本地服务器&#xff1f; 目前我们开发的代码&#xff0c;为了运行需要有两个操作&#xff1a; 操作一&#xff1a;npm run build&#xff0c;编译相关的代码&#xff1b;操作二&#xff1a;通过live server或者直接通过浏览器&#xff0c;打开index.html代码…