Flink02:Flink快速上手(Streaming WorldCount)

news2025/1/12 6:00:31

一、Flink快速上手 使用

(1)先把Flink的开发环境配置好。
(2)创建maven项目:db_flink
(3)首先在model中将scala依赖添加进来。

(4)然后创建scala目录,因为针对flink我们会使用java和scala两种语言

(5)创建包名
        在src/main/java下创建 com.imooc.java
        在src/main/scala下创建 com.imooc.scala
(6)接下来在 pom.xml 中引入flink相关依赖,前面两个是针对java代码的,后面两个是针对scala代码的,最后一个依赖是这对 flink1.11 这个版本需要添加的

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.11.1</version>
<!--            <scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.12</artifactId>
            <version>1.11.1</version>
<!--            <scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_2.12</artifactId>
            <version>1.11.1</version>
<!--            <scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.12</artifactId>
            <version>1.11.1</version>
<!--             <scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.12</artifactId>
            <version>1.11.1</version>

 二、Flink Job开发步骤

        1:获得一个执行环境
        2:加载/创建 初始化数据
        3:指定操作数据的transaction算子
        4:指定数据目的地
        5:调用execute()触发执行程序

注意:Flink程序是延迟计算的,只有最后调用execute()方法的时候才会真正触发执行程序
和Spark类似,Spark中是必须要有action算子才会真正执行。 

 三、Streaming WordCount

需求:通过socket实时产生一些单词,使用flink实时接收数据,对指定时间窗口内(例如:2秒)的数据进行聚合统计,并且把时间窗口内计算的结果打印出来

 1. scala代码

package com.imooc.scala.stream

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time

/**
 * 需求:通过Socket实时产生一些单词,
 * 使用Flink实时接收数据
 * 对指定时间窗口内(例如:2秒)的数据进行聚合统计
 * 并且把时间窗口内计算的结果打印出来
 */


object SocketWindowWordCountScala {
  /**
   * 注意:在执行代码之前,需要先在bigdata01机器上开启socket,端口为9001
   * @param args
   */
  def main(args: Array[String]): Unit = {
    //获取运行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
//    val env = StreamExecutionEnvironment.getExecutionEnvironment
    //连接socket获取输入数据
    val text = env.socketTextStream("bigdata01",9001)

    import org.apache.flink.api.scala._

    val wordCount = text.flatMap(_.split(" "))//将每一行数据根据空格切分单词
      .map((_,1))//每一个单词转换为tuple2的形式(单词,1)
      //.keyBy(0)//根据tuple2中的第一列进行分组
      .keyBy(tup=>tup._1)//官方推荐使用keyselector选择器选择数据
      .timeWindow(Time.seconds(2))//时间窗口为2秒,表示每隔2秒钟计算一次接收到的数据
      .sum(1)// 使用sum或者reduce都可以
    //.reduce((t1,t2)=>(t1._1,t1._2+t2._2))

    //使用一个线程执行打印操作
    wordCount.print().setParallelism(1)

    //执行程序
    env.execute("SocketWindowWordCountScala")

  }
}

注意:在idea等开发工具里面运行代码的时候需要把pom.xml中的scope配置注释掉

在bigdata01上面开启socket

[root@bigdata01 ~]# nc -l 9001
hello me 
hello you hello me

idea控制台可以看到如下效果

 

注意:此时代码执行的时候下面会显示一些红色的log4j的警告信息,提示缺少相关依赖和配置

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.10</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.10</version>
        </dependency>

此时再执行就没有红色的警告信息了,但是使用info日志级别打印的信息太多了,所以将log4j中的日志级别配置改为error级别 (resources下面的log4j.properties)

log4j.rootLogger=error,stdout

log4j.appender.stdout = org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target = System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout 
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} [%t] [%c] [%p] - %m%n

 2. Java代码

package com.imooc.java.stream;


import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class SocketWindowWordCountJava {
    public static void main(String[] args) throws Exception {
        //获取运行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 连接socket获取输入数据
        DataStream<String> text = env.socketTextStream("bigdata01", 9001);

        //处理数据
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordCount = text.flatMap(new FlatMapFunction<String, String>() {
            public void flatMap(String s, Collector<String> collector) throws Exception {
                String[] words = s.split(" ");
                for (String word : words) {
                    collector.collect(word);
                }
            }
        }).map(new MapFunction<String, Tuple2<String, Integer>>() {
            public Tuple2<String, Integer> map(String word) throws Exception {
                return new Tuple2<String, Integer>(word, 1);
            }
        }).keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
            public String getKey(Tuple2<String, Integer> tup) throws Exception {
                return tup.f0;
            }
        }).timeWindow(Time.seconds(2)).sum(1);

        // 使用一个线程执行打印操作
        wordCount.print().setParallelism(1);

        //执行程序
        env.execute("SocketWindowWordCountJava");
    }
}

四、Batch WordCount

需求:统计指定文件中单词出现的总次数

1. Scala代码

package com.imooc.scala.batch

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.core.fs.FileSystem.WriteMode

/**
 * 需求:对HDFS上的文件进行wordcount统计
 */
object BatchWordCountScala {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    val input_hdfs = "hdfs://bigdata01:9000/add_partition.sh"
    val out_path = "hdfs://bigdata01:9000/out"

    //读取文件中的数据
    val text = env.readTextFile(input_hdfs)
    //处理数据
    import org.apache.flink.api.scala._
    val wordCount = text.flatMap(_.split(" ")).map(x => (x, 1)).groupBy(0).sum(1)
      .setParallelism(2)

    //将结果保存到HDFS
    wordCount.writeAsCsv(out_path, "\n", " ", WriteMode.OVERWRITE)
//    wordCount.print()

    //执行程序
    env.execute("BatchWordCountScala")
  }
}

注意:

        (1)这里面执行setParallelism(1)设置并行度为1是为了将所有数据写到一个文件里面,我们查
看结果的时候比较方便

        (2)还有就是flink在windows中执行代码,使用到hadoop的时候,需要将hadoop-client的依赖添加到项目中,否则会提示不支持hdfs这种文件系统。在pom.xml文件中增加

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.7.2</version>
</dependency>

2.Java代码 

package com.imooc.java.batch;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

/**
 * 统计HDFS文件上的词频
 */
public class BatchWordCountJava {
    public static void main(String[] args) throws Exception {
        //获取执行环境
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        String input_hdfs = "hdfs://bigdata01:9000/add_partition.sh";
        String out_path = "hdfs://bigdata01:9000/out2";
        
        //读取文件中的数据
        DataSource<String> text = env.readTextFile(input_hdfs);
        
        //处理数据
        DataSet<Tuple2<String, Integer>> worcCount = text.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            public void flatMap(String line, Collector<Tuple2<String, Integer>> collector) throws Exception {
                String[] words = line.split(" ");
                for (String word : words) {
                    collector.collect(new Tuple2<String, Integer>(word, 1));
                }
            }
        }).groupBy(0).sum(1).setParallelism((2));

        //将结果保存到HDFS上
        worcCount.writeAsCsv(out_path, "\n", "");

        //执行程序
        env.execute("");
    }
}

 五、Flink Streaming和Batch的区别

流处理Streaming
执行环境:StreamExecutionEnvironment
数据类型:DataStream


批处理Batch
执行环境:ExecutionEnvironment
数据类型:DataSet

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/357103.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Lesson5---NumPy科学计算库

5.1 多维数组 Python拥有出色的第三方库生态系统在机器学习中&#xff0c;需要把所有的输入数据&#xff0c;都转变为多为数组的形式。score[i, j]二维数组i,j都从0开始 score[5] [85, 72, 61, 92, 80] score[2,5] [[85, 72, 61, 92, 80],[85, 72, 61, 92, 80]] score[30,5…

Linux系统之iptables应用SNAT与DNAT

目录 SNAT 一.SNAT的原理介绍 1.应用环境 2.SNAT原理 3.SNAT转换前提条件 二.开启SNAT 1.临时打开 2.永久打开 三.SNAT的转换 1.固定的公网IP地址 2.非固定的公网IP地址(共享动态IP地址) 四.SNAT实验 1.实验环境准备 2.配置web服务器&#xff08;192.168.100.100…

测试3.测试方法的分类

3.测试分类 系统测试包括回归测试和冒烟测试 回归测试&#xff1a;修改了旧的代码后&#xff0c;重新测试功能是否正确&#xff0c;有没有引入新的错误或导致其它代码产生错误 冒烟测试&#xff1a;目的是确认软件基本功能正常&#xff0c;可以进行后续的正式测试工作 按是否…

什么是 RESTful 风格?

一、什么是 REST &#xff1f; REST即表述性状态传递&#xff08;英文&#xff1a;Representational State Transfer&#xff0c;简称REST&#xff09;是Roy Thomas Fielding博士在2000年他的博士论文中提出来的一种软件架构风格。它是一种针对网络应用的设计和开发方式&#…

游戏开发 - 开发流程 - 收集

1.应用场景 主要用于了解&#xff0c;掌握游戏开发的整个流程。 2.学习/操作 1.文档阅读 复习课 | 带你梳理客户端开发的三个重点-极客时间 2.整理输出 2.1 游戏开发流程 -- 参考 按照游戏开发中的三大模块策划、程序、美术&#xff0c;画了一个图。 开发游戏的时候&#xff…

LeetCode171-Excel表列序号(进制转换问题)

LeetCode171-Excel表列序号1、问题描述2、解题思路&#xff1a;进制转换3、代码实现1、问题描述 给你一个字符串columnTitle,表示Excel表格中得列名称。返回该列名称对应得列序号。 例如&#xff1a; A -> 1 B -> 2 C -> 3 ... Z -> 26 AA -> 27 AB -> 28 …

linux shell 入门学习笔记3 shebang

shebang 计算机程序中&#xff0c;shebang指的是出现在文本文件的第一行前两个字符#! 在Unix系统中&#xff0c;程序会分析shebang后面的内容&#xff0c;作为解释器的指令&#xff0c;例如 以#!/bin/sh 开头的文件&#xff0c;程序在执行的时候会调用/bin/sh&#xff0c;也就…

[软件工程导论(第六版)]第5章 总体设计(复习笔记)

文章目录5.1 设计过程5.2 设计原理5.2.1 模块化5.2.2 抽象5.2.3 逐步求精5.2.4 信息隐藏和局部化5.2.5 模块独立5.3 启发规则5.4 描绘软件结构的图形工具5.4.1 层次图5.4.2 HIPO图5.4.3 结构图5.5 面向数据流的设计方法目的 总体设计的基本目的就是回答“概括地说&#xff0c;系…

2.19 索引和事务

一.联合查询面试问题:聚合查询与联合查询的区别聚合查询是行与行之间的数据加工聚合函数 :count,sum,avg...group by 进行分组,指定列的值,相同的记录合并到同一个组,每个组又可以分别进行聚合查询分组还可以指定条件筛选,如果分组之前指定条件 用where,如果对分组之后指定条件…

< CSDN周赛解析:第 28 期 >

CSDN周赛解析&#xff1a;第 27 期&#x1f449; 第一题&#xff1a; 小Q的鲜榨柠檬汁> 题目解析> 解决方案&#x1f449; 第二题&#xff1a; 三而竭> 解析> 解决方案> 拓展知识&#x1f449; 第三题&#xff1a; 隧道逃生> 解析> 解决方案&#x1f449;…

【人工智能AI】四、NoSQL进阶《NoSQL 企业级基础入门与进阶实战》

帮我写一篇介绍NoSQL的技术文章&#xff0c;文章的标题是《四、NoSQL进阶》&#xff0c;不少于3000字。帮我细化到三级目录&#xff0c;使用markdown格式。这篇文章的目录是&#xff1a; 四、NoSQL 进阶 4.1 NoSQL 高可用 4.2 NoSQL 数据安全 4.3 NoSQL 性能优化 4.4 总结 四、…

Vue:extends继承组件复用性

提到extends继承&#xff0c;最先想到的可能是ES6中的class、TS中的interface、面向对象编程语言中中的类和接口概念等等&#xff0c;但是我们今天的关注点在于&#xff1a;如何在Vue中使用extends继承特性。 目录 Vue&#xff1a;创建Vue实例的方式 构造函数方式&#xff1…

3D点云处理:点云聚类--FEC: Fast Euclidean Clustering for Point Cloud Segmentation

文章目录 聚类结果一、论文内容1.1 Ground Surface Removal1.2 Fast Euclidean Clustering题外:欧几里得聚类Fast Euclidean Clustering二、参考聚类结果 原始代码中采用的是pcl中的搜索方式,替换为另外第三方库,速度得到进一步提升。 一、论文内容 论文中给出的结论:该…

java基础学习 day42(继承中构造方法的访问特点,this、super的使用总结)

继承中&#xff0c;构造方法的访问特点 父类的构造方法不会被子类继承&#xff0c;但可以通过super()调用父类的构造方法&#xff0c;且只能在子类调用&#xff0c;在测试类中是不能手动单写构造方法的。子类中所有的构造方法默认先调用父类的无参构造&#xff0c;再执行自己构…

vue3+ts+node个人博客系统(三)

一.主页顶部和中心面板布局 &#xff08;1&#xff09; 首先先去element-plus选择合适的布局el-container (2)在头部处编写相应的菜单栏el-menu,在这里要注意动态绑定路由的问题:default-active"$route.path"。将default-active设置为$route.path&#xff0c;el-me…

Java File类、IO流、Properties属性类

文章目录一、补充二、File类File类的含义创建多级文件File类的常见方法三、IO流IO流分类输入输出流FileOutputStreamInputStreamInputStream与OutputStream的实例ReaderWriterFileReader和FileWriter的实例缓冲流转换流序列化与ObjectInputStream、ObjectOutputStream打印流Pro…

MySQL 10:MySQL事务

MySQL 中的事务是由存储引擎实现的。在 MySQL 中&#xff0c;只有 InnoDB 存储引擎支持事务。事务处理可用于维护数据库的完整性&#xff0c;确保批处理的 SQL 语句要么执行要么根本不执行。事务用于管理 DDL、DML 和 DCL 操作&#xff0c;例如插入、更新和删除语句&#xff0c…

JVM10垃圾回收算法

1.什么是垃圾&#xff1f; 垃圾是指在运行程序中没有任何指针指向的对象&#xff0c;这个对象就是需要被回收的垃圾。 如果不及时对内存中的垃圾进行清理&#xff0c;那么&#xff0c;这些垃圾对象所占的内存空间会一直保留到应用程序的结束&#xff0c;被保留的空间无法被其…

XLink 和 XPointer 简介

XLink 定义了一套标准的在 XML 文档中创建超级链接的方法。 XPointer 使超级链接可以指向 XML 文档中更多具体的部分&#xff08;片断&#xff09;。 您应当具备的基础知识 学习本教程前您应当具备的基础知识: HTML / XHTMLXML / XML 命名空间XPath 如果您希望首先学习这些项…