Flink 从入门到实战

news2024/11/29 16:59:04

Flink中的批和流

批处理的特点是有界、持久、大量,非常适合需要访问全部记录才能完成的计算工作,一般用于离线统计

流处理的特点是无界、实时, 无需针对整个数据集执行操作,而是对通过系统 传输的每个数据项执行操作,一般用于实时统计。

一个无界流可以分解为多个有界流

性能 Flink > Spark > Hadoop

Flink的四种安装模式:

  1. Local

  2. Standalone

  3. standaloneHA

  4. Yarn

flink在使用input、output执行测试文件WordCount.jar 的时候,报出找不到文件的错误(但是文件路径存在),原因是:

因为我们的flink是task节点在执行任务的,task在三台机器上都有分布,我们当前文件只在一台服务器中,所以当其他task运行的时候,就会报出找不到文件的错误,将此文件分发到每台服务器中就不会出现这个错误。(我们以后在使用flink的时候,数据都是存放在hdfs上(一式三份),就不存在找不到文件的错误)

Flink-WordCount案例:

  1. 第一版代码

这一版代码比较简单,看代码就可以看懂

package com.bigdata;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class _01WorkCount {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> dataStream01 = env.fromElements("spark flink kafka", "spark sqoop flink", "kafka hadoop flink");

        // 首先先对字符串进行切割,形成一个新的数组
        SingleOutputStreamOperator<String> flatMapStream = dataStream01.flatMap(new FlatMapFunction<String, String>() {

            @Override
            public void flatMap(String line, Collector<String> collector) throws Exception {

                String[] arr = line.split(" ");
                for (String word : arr) {
                    collector.collect(word);
                }
            }
        });

        // 将切割好的字符串形成 (word,1)的二元组的形式
        SingleOutputStreamOperator<Tuple2<String, Integer>> map = flatMapStream.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String word) throws Exception {
                return Tuple2.of(word, 1);

            }
        });

        // 聚合
        DataStream<Tuple2<String, Integer>> sumResult = map.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
            @Override
            public String getKey(Tuple2<String, Integer> tuple2) throws Exception {
                return tuple2.f0;
            }
            // 此处的1 指的是元组的第二个元素,进行相加的意思
        }).sum(1);

        sumResult.print();
        env.execute();

    }
}
第二版代码:简化了第一版的书写形式

第一版中 SingleOutputStreamOperator、DataStreamSource的父类其实都是DataStream,所以可以连着写下来
 

package com.bigdata;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class _02WorkCount {

    /**
     *
     *  简化版案例
     * @param args
     * @throws Exception
     */

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromElements("spark flink kafka", "spark sqoop flink", "kafka hadoop flink")
                .flatMap(new FlatMapFunction<String, String>() {

            @Override
            public void flatMap(String line, Collector<String> collector) throws Exception {

                String[] arr = line.split(" ");
                for (String word : arr) {
                    collector.collect(word);
                }
            }
        }).map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String word) throws Exception {
                return Tuple2.of(word, 1);

            }
        }).keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
            @Override
            public String getKey(Tuple2<String, Integer> tuple2) throws Exception {
                return tuple2.f0;
            }
            // 此处的1 指的是元组的第二个元素,进行相加的意思
        }).sum(1).print();
        env.execute();

    }
}
第三版,使用lambda表达式,更加简化的书写

不过在使用lambda的时候,需要在后面指定一个方法的返回值,要不然会报错

package com.bigdata;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class _03WorkCount_lambda {

    /**
     * lambda 表达式简化版
     * @param args
     * @throws Exception
     */

    public static void main(String[] args) throws Exception {

        // 使用lambda简化的时候,需要指定返回值类型,不指定的话会报错

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromElements("spark flink kafka", "spark sqoop flink", "kafka hadoop flink")
                .flatMap((String line, Collector<String> collector) -> {

                String[] arr = line.split(" ");
                for (String word : arr) {
                    collector.collect(word);
                }
            }).returns(Types.STRING).map((String word) ->  Tuple2.of(word, 1)

            ).returns(Types.TUPLE(Types.STRING, Types.INT)).keyBy((Tuple2<String, Integer> tuple2) ->  tuple2.f0).sum(1).print();
            // 此处的1 指的是元组的第二个元素,进行相加的意思
        env.execute();

    }
}
复习lambda表达式:
  • lambda可以用来简化匿名内部类的书写

  • lambda只能简化函数式接口(有且仅有一个方法的接口)的匿名内部类的书写

省略规则:

  • 只拿小括号里面的 加上 -> 指向大括号

  • 只有一个参数的时候,参数类型可以省略

  • 如果方法体中的代码只有一行,大括号和return等都可以省略(但是需要同时省略)

没省略之前的 (第一版)

省略后(第三版)

第四版,自定义输入与输出的路径地址

可以打包到集群中使用,使用的时候在jar包的后面跟上input路径以及output路径即可

package com.bigdata;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class _04WorkCount_zidingyipass {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 这个是 自动 ,根据流的性质,决定是批处理还是流处理
        //env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        // 批处理流, 一口气把数据算出来
        // env.setRuntimeMode(RuntimeExecutionMode.BATCH);
        // 流处理,默认是这个  可以通过打印批和流的处理结果,体会流和批的含义
        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
        // 将任务的并行度设置为2
        // env.setParallelism(2);

        // 通过args传参
        DataStreamSource<String> dataStream01 = null;
        if (args.length == 0){
            dataStream01 = env.fromElements("spark flink kafka", "spark sqoop flink", "kafka hadoop flink");
        }else {
            String input = args[0];
            dataStream01 = env.readTextFile(input);
        }


        // 首先先对字符串进行切割,形成一个新的数组
        SingleOutputStreamOperator<Tuple2<String, Integer>> sumResult = dataStream01
                .flatMap((String line, Collector<String> collector) -> {

                String[] arr = line.split(" ");
                for (String word : arr) {
                    collector.collect(word);
                }

        }).map((String word) -> Tuple2.of(word, 1)


        ).keyBy((Tuple2<String, Integer> tuple2) -> tuple2.f0

         // 此处的1 指的是元组的第二个元素,进行相加的意思
        ).sum(1);

        if (args.length == 0){
            sumResult.print();
        }else {
            String output = args[1];
            sumResult.writeAsText(output, FileSystem.WriteMode.OVERWRITE).setParallelism(1);
        }

        env.execute();

    }
}

打包后执行结果如下:

第五版,采用和flink中相同的书写方式 即带(--input 以及 --output)

也可以打包到集群中使用,使用的时候在jar包的后面跟上 --input +路径以及 -output + 路径即可

package com.bigdata;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class _05WorkCount_zidingyipass_input {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 这个是 自动 ,根据流的性质,决定是批处理还是流处理
        //env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        // 批处理流, 一口气把数据算出来
        // env.setRuntimeMode(RuntimeExecutionMode.BATCH);
        // 流处理,默认是这个  可以通过打印批和流的处理结果,体会流和批的含义
        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
        // 将任务的并行度设置为2
        // env.setParallelism(2);
        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        String input = "";
        String output = "";
        if (parameterTool.has("output") && parameterTool.has("input")) {
            input = parameterTool.get("input");
            output = parameterTool.get("output");
        } else {
            output = "hdfs://bigdata01:9820/home/wordcount/output";
        }

        // 通过args传参
        DataStreamSource<String> dataStream01 = null;
        if (args.length == 0){
            dataStream01 = env.fromElements("spark flink kafka", "spark sqoop flink", "kafka hadoop flink");
        }else {
            dataStream01 = env.readTextFile(input);
        }
        // 首先先对字符串进行切割,形成一个新的数组
        SingleOutputStreamOperator<Tuple2<String, Integer>> sumResult = dataStream01
                .flatMap((String line, Collector<String> collector) -> {
                
                String[] arr = line.split(" ");
                for (String word : arr) {
                    collector.collect(word);
                }
        }).returns(Types.STRING).map((String word) -> Tuple2.of(word, 1)

        ).returns(Types.TUPLE(Types.STRING, Types.INT)).keyBy((Tuple2<String, Integer> tuple2) -> tuple2.f0

         // 此处的1 指的是元组的第二个元素,进行相加的意思
        ).sum(1);

        if (args.length == 0){
            sumResult.print();
        }else {
            sumResult.writeAsText(output, FileSystem.WriteMode.OVERWRITE).setParallelism(1);
        }
        env.execute();
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2249881.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Ubuntu20.04运行LARVIO

文章目录 1.运行 Toyish 示例程序2.运行 ROS Nodelet参考 1.运行 Toyish 示例程序 LARVIO 提供了一个简化的toyish示例程序&#xff0c;适合快速验证和测试。 编译项目 进入 build 文件夹并通过 CMake 编译项目&#xff1a; mkdir build cd build cmake -D CMAKE_BUILD_TYPER…

小程序-基于java+SpringBoot+Vue的戏曲文化苑小程序设计与实现

项目运行 1.运行环境&#xff1a;最好是java jdk 1.8&#xff0c;我们在这个平台上运行的。其他版本理论上也可以。 2.IDE环境&#xff1a;IDEA&#xff0c;Eclipse,Myeclipse都可以。推荐IDEA; 3.tomcat环境&#xff1a;Tomcat 7.x,8.x,9.x版本均可 4.硬件环境&#xff1a…

mybatis plus如何使用mybatis xml拼接sql

在 MyBatis Plus 中&#xff0c;如果你想使用 MyBatis 的 XML 文件来拼接 SQL&#xff0c;可以结合使用 MyBatis 和 MyBatis Plus 的功能。MyBatis Plus 是一个增强 MyBatis 的工具&#xff0c;它提供了很多便捷的操作&#xff0c;但有时你可能需要使用 XML 文件来定义更复杂的…

【uniapp】轮播图

前言 Uniapp的swiper组件是一个滑块视图容器组件&#xff0c;可以在其中放置多个轮播图或滑动卡片。它是基于微信小程序的swiper组件进行封装&#xff0c;可以在不同的平台上使用&#xff0c;如微信小程序、H5、App等。 效果图 前端代码 swiper组件 <template><vi…

Python爬虫爬取数据报错

报错&#xff1a; Error fetching the URL: (Connection aborted., ConnectionResetError(10054, 远程主机强迫关闭了一个现有的连接。, None, 10054, None)) 报错原因&#xff1a; 目标服务器限制&#xff1a; 目标网站可能已经检测到你的请求来自自动化工具&#xff08;如爬虫…

人工智能与传统控制系统的融合发展

在这个科技快速迭代的时代&#xff0c;人工智能技术正以前所未有的速度改变着我们的生活。在控制系统领域&#xff0c;AI技术的引入为传统控制带来了新的发展机遇和挑战。然而&#xff0c;这并不意味着传统控制将被完全取代&#xff0c;相反&#xff0c;AI与传统控制的深度融合…

shell综合

声明&#xff01; 学习视频来自B站up主 泷羽sec 有兴趣的师傅可以关注一下&#xff0c;如涉及侵权马上删除文章&#xff0c;笔记只是方便各位师傅的学习和探讨&#xff0c;文章所提到的网站以及内容&#xff0c;只做学习交流&#xff0c;其他均与本人以及泷羽sec团队无关&#…

什么是串联谐振

比如有一个由电阻、电容和电感的串联电路中&#xff0c;存在一个频率能使这个电路的电流最大&#xff0c;这个现象就叫谐振。 那么这个频率是多少呢&#xff1f; 交流电频率与电路固有频率一致时&#xff0c;它就能发生谐振&#xff0c;此时这个电路的电流是最大的 这个固有频…

韦东山stm32hal库--定时器喂狗模型按键消抖原理+实操详细步骤

一.定时器按键消抖的原理: 按键消抖的原因: 当我们按下按键的后, 端口从高电平变成低电平, 理想的情况是, 按下, 只发生一次中断, 中断程序只记录一个数据. 但是我们使用的是金属弹片, 实际的情况就是如上图所示, 可能会发生多次中断,难道我们要记录3/4次数据吗? 答:按键按下…

雨云服务器搭建docker且用docker部署kali服务器教程

雨云 - 新一代云服务提供商 介绍 大家好今天教大家如何使用雨云的服务器安装docker并且用docker搭建kali服务器&#xff0c;实现大家做黑客的梦。 性价比比较高的云服务器提供参考&#xff1a;雨云 - 新一代云服务提供商 优惠码&#xff1a;MzkxODI4 什么是kali Kali L…

SQL进阶——JOIN操作详解

在数据库设计中&#xff0c;数据通常存储在多个表中。为了从这些表中获取相关的信息&#xff0c;我们需要使用JOIN操作。JOIN操作允许我们通过某种关系&#xff08;如相同的列&#xff09;将多张表的数据结合起来。它是SQL中非常重要的操作&#xff0c;广泛应用于实际开发中。本…

分析JHTDB数据库的Channel5200数据集的数据(SciServer服务器)

代码来自https://github.com/idies/pyJHTDB/blob/master/examples/channel.ipynb %matplotlib inline import numpy as np import math import random import pyJHTDB import matplotlib.pyplot as plt import time as ttN 3 T pyJHTDB.dbinfo.channel5200[time][-1] time …

数据分析:彩票中奖号码分析与预测

预测双色球彩票的中奖号码是一个典型的随机事件&#xff0c;因为每个号码的出现概率是独立的&#xff0c;且历史数据并不能直接用于预测未来的开奖结果。然而&#xff0c;我们可以通过统计分析来了解号码的分布规律&#xff0c;从而提供一些可能的参考。 样例数据【点击下载】…

详细分析 npm run build 基本知识 | 不同环境不同命令

目录 前言1. 基本知识2. 构建逻辑 前言 关于部署服务器的知识推荐阅读&#xff1a;npm run build部署到云服务器中的Nginx&#xff08;图文配置&#xff09; 1. 基本知识 npm run 是 npm 的一个命令&#xff0c;用于运行 package.json 中定义的脚本&#xff0c;可以通过 “s…

Jpype调用jar包

需求描述 ​   公司要求使用python对接口做自动化测试&#xff0c;接口的实现是Java&#xff0c;部分接口需要做加解密&#xff0c;因此需要使用python来调用jar包来将明文加密成密文&#xff0c;然后通过http请求访问接口进行测试。 如何实现 1.安装Jpype ​   首先我…

Realtek网卡MAC刷新工具PG8168.exe Version:2.34.0.4使用说明

本刷新工具虽然文件名叫PG8168.EXE&#xff0c;但不是只有RTL8168可用&#xff0c;是这一个系列的产品都可以使用。实验证明RTL8111也可以使用。 用法&#xff1a; PG8168 [/h][/?][/b][/c HexOffsetHexValue][/d NICNumber][/l][/r][/w][/v] [/# NICNumber] [/nodeidHexNOD…

【Unity】Unity编辑器扩展,替代预制体上重复拖拽赋值

今天做游戏时有个需求&#xff0c;游戏中需要给不同年份不同月份的奖牌制定不一样的非规则形状&#xff0c;其中形状为100个像素组成的不同图形&#xff0c;并且按照从1-100路径一个个解锁&#xff0c;所以需要全部手动放置。但是手动放置好后&#xff0c;发现再一个个挂到脚本…

c语言的qsort函数理解与使用

介绍&#xff1a;qsort 函数是 C 标准库中用于排序的快速排序算法函数。它的用法非常灵活&#xff0c;可以对任意类型的元素进行排序&#xff0c;只要提供了比较函数即可。 qsort 函数原型及参数解释&#xff1a; void qsort ( void* base, //指向要排序的数组的首元素…

【力扣】125. 验证回文串

问题描述 思路详情 本题目的重点是对java中字符串的各种API用法的掌握理解 1.首先只保留字母和数字 1.1可以使用正则表达式1.2 Character.isLetterOrDight(ch) &#xff0c;但是这个只能单个字符判断2.将大写字母全部小写3.验证是否为回文串 代码 通过正则表达式 &#xff…

JavaEE---计算机是如何工作的?

1.了解冯诺依曼体系结构 2.CPU的核心概念,CPU的两个重要指标(核心数和频率) 3.CPU执行指令的流程(指令表,一条一条指令,取指令,解析指令,执行指令) 4.操作系统核心概念(管理硬件,给软件提供稳定的运行环境) 5.进程的概念(运行起来的程序和可执行文件的区别) 6.进程的管理(…