使用flink编写WordCount

news2024/11/24 10:49:31

1. env-准备环境

2. source-加载数据

3. transformation-数据处理转换

4. sink-数据输出

5. execute-执行

流程图:

DataStream API开发

//nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/datastream/overview/

 添加依赖

<properties>
  <flink.version>1.13.6</flink.version>
</properties>
 
<dependencies>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>${flink.version}</version>
  </dependency>
 
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>${flink.version}</version>
  </dependency>
 
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.11</artifactId>
    <version>${flink.version}</version>
  </dependency>
 
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge_2.11</artifactId>
    <version>${flink.version}</version>
  </dependency>
 
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_2.11</artifactId>
    <version>${flink.version}</version>
  </dependency>
 
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-shaded-hadoop-2-uber</artifactId>
    <version>2.7.5-10.0</version>
  </dependency>
 
  <dependency>
    <groupId>log4j</groupId>
    <artifactId>log4j</artifactId>
    <version>1.2.17</version>
  </dependency>
 
  <dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.18.24</version>
  </dependency>
 
</dependencies>
 
<build>
  <extensions>
    <extension>
      <groupId>org.apache.maven.wagon</groupId>
      <artifactId>wagon-ssh</artifactId>
      <version>2.8</version>
    </extension>
  </extensions>
 
  <plugins>
    <plugin>
      <groupId>org.codehaus.mojo</groupId>
      <artifactId>wagon-maven-plugin</artifactId>
      <version>1.0</version>
      <configuration>
        <!--上传的本地jar的位置-->
        <fromFile>target/${project.build.finalName}.jar</fromFile>
        <!--远程拷贝的地址-->
        <url>scp://root:root@bigdata01:/opt/app</url>
      </configuration>
    </plugin>
  </plugins>
 
</build>

 

编写代码

package com.bigdata.day01;
 
 
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
 
 
public class WordCount01 {
 
    /**
     * 1. env-准备环境
     * 2. source-加载数据
     * 3. transformation-数据处理转换
     * 4. sink-数据输出
     * 5. execute-执行
     */
 
    public static void main(String[] args) throws Exception {
        // 导入常用类时要注意   不管是在本地开发运行还是在集群上运行,都这么写,非常方便
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 这个是 自动 ,根据流的性质,决定是批处理还是流处理
        //env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        // 批处理流, 一口气把数据算出来
        // env.setRuntimeMode(RuntimeExecutionMode.BATCH);
        // 流处理,默认是这个  可以通过打印批和流的处理结果,体会流和批的含义
        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
 
        // 获取数据  多态的写法 DataStreamSource 它是 DataStream 的子类
        DataStream<String> dataStream01 = env.fromElements("spark flink kafka", "spark sqoop flink", "kakfa hadoop flink");
 
        DataStream<String> flatMapStream = dataStream01.flatMap(new FlatMapFunction<String, String>() {
 
            @Override
            public void flatMap(String line, Collector<String> collector) throws Exception {
                String[] arr = line.split(" ");
                for (String word : arr) {
                    // 循环遍历每一个切割完的数据,放入到收集器中,就可以形成一个新的DataStream
                    collector.collect(word);
                }
            }
        });
        //flatMapStream.print();
        // Tuple2 指的是2元组
        DataStream<Tuple2<String, Integer>> mapStream = flatMapStream.map(new MapFunction<String, Tuple2<String, Integer>>() {
 
            @Override
            public Tuple2<String, Integer> map(String word) throws Exception {
                return Tuple2.of(word, 1); // ("hello",1)
            }
        });
        DataStream<Tuple2<String, Integer>> sumResult = mapStream.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
            @Override
            public String getKey(Tuple2<String, Integer> tuple2) throws Exception {
                return tuple2.f0;
            }
            // 此处的1 指的是元组的第二个元素,进行相加的意思
        }).sum(1);
        sumResult.print();
        // 执行
        env.execute();
    }
}

批处理结果:前面的序号代表分区

流处理结果:

也可以通过如下方式修改分区数量:

 env.setParallelism(2);

关于并行度的代码演示:

系统以及算子都可以设置并行度,或者获取并行度

package com.bigdata.day01;
 
 
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
 
 
public class WordCount01 {
 
    /**
     * 1. env-准备环境
     * 2. source-加载数据
     * 3. transformation-数据处理转换
     * 4. sink-数据输出
     * 5. execute-执行
     */
 
    public static void main(String[] args) throws Exception {
        // 导入常用类时要注意   不管是在本地开发运行还是在集群上运行,都这么写,非常方便
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 这个是 自动 ,根据流的性质,决定是批处理还是流处理
        //env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        // 批处理流, 一口气把数据算出来
        // env.setRuntimeMode(RuntimeExecutionMode.BATCH);
        // 流处理,默认是这个  可以通过打印批和流的处理结果,体会流和批的含义
        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
        // 将任务的并行度设置为2
        // env.setParallelism(2);
        // 通过这个获取系统的并行度
        int parallelism = env.getParallelism();
        System.out.println(parallelism);
 
        // 获取数据  多态的写法 DataStreamSource 它是 DataStream 的子类
        DataStream<String> dataStream01 = env.fromElements("spark flink kafka", "spark sqoop flink", "kakfa hadoop flink");
 
        DataStream<String> flatMapStream = dataStream01.flatMap(new FlatMapFunction<String, String>() {
 
            @Override
            public void flatMap(String line, Collector<String> collector) throws Exception {
                String[] arr = line.split(" ");
                for (String word : arr) {
                    // 循环遍历每一个切割完的数据,放入到收集器中,就可以形成一个新的DataStream
                    collector.collect(word);
                }
            }
        });
        // 每一个算子也有自己的并行度,一般跟系统保持一致
        System.out.println("flatMap的并行度:"+flatMapStream.getParallelism());
        //flatMapStream.print();
        // Tuple2 指的是2元组
        DataStream<Tuple2<String, Integer>> mapStream = flatMapStream.map(new MapFunction<String, Tuple2<String, Integer>>() {
 
            @Override
            public Tuple2<String, Integer> map(String word) throws Exception {
                return Tuple2.of(word, 1); // ("hello",1)
            }
        });
        DataStream<Tuple2<String, Integer>> sumResult = mapStream.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
            @Override
            public String getKey(Tuple2<String, Integer> tuple2) throws Exception {
                return tuple2.f0;
            }
            // 此处的1 指的是元组的第二个元组,进行相加的意思
        }).sum(1);
        sumResult.print();
        // 执行
        env.execute();
    }
}
  1. 打包、上传

 文件夹需要提前准备好

提交我们自己开发打包的任务

flink run -c com.bigdata.day01.WordCount01 /opt/app/FlinkDemo-1.0-SNAPSHOT.jar

 

去界面中查看运行结果:

因为你这个是集群运行的,所以标准输出流中查看,假如第一台没有,去第二台查看,一直点。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2246614.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

uniop触摸屏维修eTOP40系列ETOP40-0050

在现代化的工业与商业环境中&#xff0c;触摸屏设备已成为不可或缺的人机交互界面。UNIOP&#xff0c;作为一个知名的触摸屏品牌&#xff0c;以其高性能、稳定性和用户友好性&#xff0c;广泛应用于各种自动化控制系统、自助服务终端以及高端展示系统中。然而&#xff0c;即便如…

基于AXI PCIE IP的FPGA PCIE卡示意图

创作不易&#xff0c;转载请注明出处&#xff1a;https://blog.csdn.net/csdn_gddf102384398/article/details/143926217 上图中&#xff0c;在FPGA PCIE卡示意图内&#xff0c;有2个AXI Master设备&#xff0c;即&#xff1a;PCIE到AXI4-Full-Master桥、AXI CDMA IP&#xff1…

学习与理解LabVIEW中多列列表框项名和项首字符串属性

多列列表框控件在如下的位置&#xff1a; 可以对该控件右击&#xff0c;如下位置&#xff0c;即可设置该控件的显示项&#xff1a; 垂直线和水平线指的是上图中组成单元格的竖线和横线&#xff08;不包括行首列首&#xff09; 现在介绍该多列列表框的两个属性&#xff0c;分别…

使用 前端技术 创建 QR 码生成器 API1

前言 QR码&#xff08;Quick Response Code&#xff09;是一种二维码&#xff0c;于1994年开发。它能快速存储和识别数据&#xff0c;包含黑白方块图案&#xff0c;常用于扫描获取信息。QR码具有高容错性和快速读取的优点&#xff0c;广泛应用于广告、支付、物流等领域。通过扫…

UE5材质篇5 简易水面

不得不说&#xff0c;UE5里搞一个水面实在是相比要自己写各种反射来说太友好了&#xff0c;就主要是开启一堆开关&#xff0c;lumen相关的&#xff0c;然后稍微连一些蓝图就几乎有了 这里要改一个shading model&#xff0c;要这个 然后要增加一个这个node 并且不需要连接base …

计算机网络 实验六 组网实验

一、实验目的 通过构造不同的网络拓扑结构图并进行验证&#xff0c;理解分组转发、网络通信及路由选择的原理&#xff0c;理解交换机和路由器在子网划分中的不同作用。 二、实验原理 组网实验是指将多个计算机通过网络连接起来&#xff0c;实现数据的共享和通信。 组网需要考虑…

LeetCode 力扣 热题 100道(八)相交链表(C++)

给你两个单链表的头节点 headA 和 headB &#xff0c;请你找出并返回两个单链表相交的起始节点。如果两个链表不存在相交节点&#xff0c;返回 null 。 图示两个链表在节点 c1 开始相交&#xff1a; 题目数据 保证 整个链式结构中不存在环。 注意&#xff0c;函数返回结果后&…

Spring |(四)IoC/DI配置管理第三方bean

文章目录 &#x1f4da;数据源对象管理&#x1f407;环境准备&#x1f407;实现Druid管理&#x1f407;实现C3P0管理 &#x1f4da;加载properties文件&#x1f407;第三方bean属性优化&#x1f407;读取单个属性 学习来源&#xff1a;黑马程序员SSM框架教程_SpringSpringMVCMa…

鸿蒙NEXT开发案例:随机数生成

【引言】 本项目是一个简单的随机数生成器应用&#xff0c;用户可以通过设置随机数的范围和个数&#xff0c;并选择是否允许生成重复的随机数&#xff0c;来生成所需的随机数列表。生成的结果可以通过点击“复制”按钮复制到剪贴板。 【环境准备】 • 操作系统&#xff1a;W…

[译]Elasticsearch Sequence ID实现思路及用途

原文地址:https://www.elastic.co/blog/elasticsearch-sequence-ids-6-0 如果 几年前&#xff0c;在Elastic&#xff0c;我们问自己一个"如果"问题&#xff0c;我们知道这将带来有趣的见解&#xff1a; "如果我们在Elasticsearch中对索引操作进行全面排序会怎样…

小米14升级澎湃OS 2.0.6.VNCCNXM 记录

简介 11.23 小米14凌晨推送了澎湃2.0,还真是11月压轴的,不是内测申请的。 btw,什么时候才能有红米耳机连接的弹窗啊??为什么13都有,但是14没有? 系统更新推送 版本介绍 1.0.47 更新到 2.0.6.VNCCNXM,记录一些界面变化,应用问题和内存情况。 澎湃OS 2 更新 - 功能介…

【单点知识】基于PyTorch进行模型部署

文章目录 0. 前言1. 模型导出1.1 TorchScript1.1.1 使用 torch.jit.trace1.1.2 使用 torch.jit.script 1.2 ONNX1.2.1 导出为 ONNX 格式 1.3 导出后的模型加载1.3.1 加载 TorchScript 模型1.3.2 加载 ONNX 模型 2. 模型优化2.1 模型量化2.2 模型剪枝 3. 服务化部署3.1 Flask 部…

FreeRTOS——互斥信号量

一、为什么需要互斥信号量 前面的学习中&#xff1a; 调度锁、临界段不可避免的破坏了实时性&#xff0c;还有二值信号量存在这样的隐患——“优先级翻转” 优先级翻转 简单来说&#xff0c;就是由于信号量被低优先级任务占用&#xff0c;即使遇到高优先级任务&#xff0c;它…

前端-react(class组件和Hooks)

文章主要以Hooks为主,部分涉及class组件方法进行对比 一.了解react 1.管理组件的方式 在React中&#xff0c;有两种主要的方式来管理组件的状态和生命周期&#xff1a;Class 组件和 Hooks。 Class 组件&#xff1a; Class 组件是 React 最早引入的方式&#xff0c;它是基于…

Ngrok实现内网穿透(Windows)

Ngrok实现内网穿透&#xff08;Windows&#xff09; 什么是内网穿透&#xff0c;内网穿透有什么用 内网穿透&#xff08;NAT traversal&#xff09;是一种技术手段&#xff0c;使得位于内网或防火墙后面的设备能够通过外网访问。例如&#xff0c;如果你的计算机、服务器等设备…

如何使用Jest测试你的React组件

在本文中&#xff0c;我们将了解如何使用Jest&#xff08;Facebook 维护的一个测试框架&#xff09;来测试我们的React组件。我们将首先了解如何在纯 JavaScript 函数上使用 Jest&#xff0c;然后再了解它提供的一些开箱即用的功能&#xff0c;这些功能专门用于使测试 React 应…

力扣 三数之和-15

三数之和-15 class Solution { public:vector<vector<int>> threeSum(vector<int>& nums) {int temp 0;//定义一个二维vector数组vector<vector<int>> ans;int n nums.size();//对nums数组进行排序sort(nums.begin(), nums.end());//固定…

深度学习每周学习总结J6(ResNeXt-50 算法实战与解析 - 猴痘识别)

&#x1f368; 本文为&#x1f517;365天深度学习训练营 中的学习记录博客&#x1f356; 原作者&#xff1a;K同学啊 | 接辅导、项目定制 目录 0. 总结ResNeXt基本介绍 1. 设置GPU2. 导入数据及处理部分3. 划分数据集4. 模型构建部分5. 设置超参数&#xff1a;定义损失函数&…

Transformer架构笔记

Attention is All You Need. 3.Model Architecture 3.1 整体架构如图 3.2 Encoder与Decoder Encoder&#xff1a;由 N 6 N6 N6个相同的Block/Layer堆叠而成。每个Block有两个子层sub-layer&#xff1a;多头注意力和MLP&#xff08;FFN&#xff0c;前馈神经网络&#xff09;&…

【大数据学习 | Spark-Core】spark-shell开发

spark的代码分为两种 本地代码在driver端直接解析执行没有后续 集群代码&#xff0c;会在driver端进行解析&#xff0c;然后让多个机器进行集群形式的执行计算 spark-shell --master spark://nn1:7077 --executor-cores 2 --executor-memory 2G sc.textFile("/home/ha…