Flink DataSet API和DataStream API 对于WordCount的演示

news2024/12/29 14:01:58

文章目录

  • 准备工作
  • Flink DataSet API
  • Flink DataStream API
  • 结论


准备工作

pom依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.chad</groupId>
    <artifactId>guigu_learning_flink</artifactId>
    <version>1.0-SNAPSHOT</version>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>8</source>
                    <target>8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

    <properties>
        <flink.version>1.14.2</flink.version>
        <java.version>1.8</java.version>
        <scala.binary.version>2.12</scala.binary.version>
        <slf4j.version>1.7.30</slf4j.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- 日志管理 -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-to-slf4j</artifactId>
            <version>2.14.0</version>
        </dependency>
    </dependencies>
</project>

创建word.txt
在项目下创建input目录,并创建word.txt文件
在这里插入图片描述

Flink DataSet API

创建java类BatchWordCount

package org.chad.wordcount;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.operators.UnsortedGrouping;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;


public class BatchWordCount {
    public static void main(String[] args) throws Exception {
        //1. 创建执行环境
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        //2. 读取数据源,从文件中读取
        final DataSource<String> fileDS = env.readTextFile("input/word.txt");

        //3. 转换算子操作环节,将数据转换为二元组
        FlatMapOperator<String, Tuple2<String, Long>> wordAndOne = fileDS.flatMap((String line, Collector<Tuple2<String, Long>> out) -> {
            //3. 将一行文本进行拆分,将每个单词转换为二元组输出
            String[] words = line.split(" ");
            for (String word : words) {
                out.collect(Tuple2.of(word, 1L));
            }

        }).returns(Types.TUPLE(Types.STRING, Types.LONG));

        //4. 按照word进行分组
        UnsortedGrouping<Tuple2<String, Long>> wordAndOneGroup = wordAndOne.groupBy(0);

        //5. 分组内进行聚合统计
        AggregateOperator<Tuple2<String, Long>> wordAndOneSum = wordAndOneGroup.sum(1);

        //6. 对结果打印输出
        wordAndOneSum.print();

    }
}

补充说明:
以上方式为dataset api 官放在1.12之后已经将其视为软弃用状态,使用批流一体的方式,怎么运行批处理,只需要在执行任务时,

$ bin/flink run -Dexecution.runtime-mode=BATCH BatchWordCount.jar

运行结果
在这里插入图片描述

Flink DataStream API

创建java类BoundedStreamWordCount

package org.chad.wordcount;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class BoundedStreamWordCount {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> ds = env.readTextFile("input/word.txt");
        SingleOutputStreamOperator<Tuple2<String, Long>> wordAndOne = ds.flatMap((String line, Collector<Tuple2<String, Long>> out) -> {
            String[] words = line.split(" ");
            for (String word : words) {
                out.collect(Tuple2.of(word, 1L));
            }
        }).returns(Types.TUPLE(Types.STRING, Types.LONG));

        KeyedStream<Tuple2<String, Long>, String> tuple2StringKeyedStream = wordAndOne.keyBy(data -> data.f0);

        SingleOutputStreamOperator<Tuple2<String, Long>> sum = tuple2StringKeyedStream.sum(1);

        sum.print();

        env.execute("流处理");
    }
}

因为是流处理,所以最后一定要加上env.execute(), 去执行以下它

它的运行结果
在这里插入图片描述

结论

  • 我们可以看到DataStream API的形式输出的结果是一条一条的去相加的,并且每一行前面会有一个进程号
  • 批处理是一下子全部输出的
  • 既然后续DataSet API会弃用我们就只要掌握DataStream API就可以了,只需要在后续提交任务的时候,提交模式改为BATCH 就可以了。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/170707.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

如何多人配音一个作品?这3招帮你快速实现

大家平时喜欢听书吗&#xff1f;听书是一种既能释放双眼&#xff0c;又能降低压力的放松方式。那么大家平时在听书的时候&#xff0c;有没有碰到过一些多人配音的小说&#xff1f;大家有好奇过这样的小说是怎么来的吗&#xff1f;今天&#xff0c;教大家多人配音怎么制作的&…

请问想考软考,零基础的话,哪个证书最好考呢

可以直接考中级&#xff0c;软考中级中也有适合零基础报考的&#xff0c;中级的含金量也比初级的高&#xff0c;初级的用途不太大&#xff0c;建议直接中级。 系统集成项目管理工程师&#xff0c;软考中级比较热门的一个科目&#xff0c;零基础的也适合相比较容易通过。 软考…

Fisher确切概率基本原理详解

Fisher确切概率 基本原理 比较两组有效率是否有差异。 在周边合计不变的情况下&#xff0c;计算实际频率变动时的Pi&#xff08;概率&#xff09;。然后计算累积概率&#xff0c;依据检验水平做推断。 累积概率的计算 以a从小到大的概率排序 左侧概率&#xff1a;现有样本…

【SpringCloud17】SpringCloud Alibaba入门简介

1.为什么会出现SpringCloud Alibaba Spring Cloud Netflix项目进入维护模式官网 1.1 什么是维护模式 将模块置于维护模式&#xff0c;意味着 Spring Cloud 团队将不会再向模块添加新功能。我们将修复 block 级别的 bug 以及安全问题&#xff0c;我们也会考虑并审查社区的小型 …

shell处理多盘跑fio(minimal)的结果脚本编写

作为一个专业测试storage的测试人员&#xff0c;除了对服务器&#xff0c;硬盘熟悉之外&#xff0c;还要对测试工具fio特别熟悉才行。如果在OEM或者专门的HDD&SSD厂家测试&#xff0c;会经常看到测试脚本里边&#xff0c;开发喜欢用fio minimal 模式&#xff0c;这样解析lo…

【GD32F427开发板试用】利用SPI驱动ADS8354

本篇文章来自极术社区与兆易创新组织的GD32F427开发板评测活动&#xff0c;更多开发板试用活动请关注极术社区网站。作者&#xff1a;STY 前言 本文期望通过板载硬件SPI外加DMA传输的方式来实现对全差分同步采样模数转换器ADS8354的控制&#xff0c;并且将采集数据进行初步脉冲…

2.1、进程的定义、组成、组织方式、特征

整体框架 1、进程的定义 程序\color{red}程序程序&#xff1a;就是一个指令序列 早期的计算机&#xff08;只支持单道\color{red}单道单道程序&#xff09; 引入多道\color{red}多道多道程序之后&#xff1a; 为了方便操作操作系统管理&#xff0c;完成各个程序并发执行、 引…

Go map 实现原理

Go map 实现原理 go map 源码路径在&#xff1a; src/runtime/map.gogo 源码结构 |– AUTHORS — 文件&#xff0c;官方 Go语言作者列表 |– CONTRIBUTORS — 文件&#xff0c;第三方贡献者列表 |– LICENSE — 文件&#xff0c;Go语言发布授权协议 |– PATENTS — 文件&…

如何解决Prometheus的数据回填问题

去年10月底的时候&#xff0c;我们的监控系统因为一个偶然的问题&#xff0c;出乎意料地发生了重大的故障&#xff0c;这次故障暴露了当前监控系统存在的一下重大隐患。故障背景及现象我们的监控系统基于Thanos构建&#xff0c;基本架构如下&#xff08;箭头表示数据流向&#…

Docker安装Mysql8.0主从复制

1使用portainer快速创建mysql 2.mysql-master version: 3.3 services:mysql-app:image: mysql:8.0container_name: mysqlrestart: alwaysports:- 3307:3306environment:MYSQL_ROOT_PASSWORD: 123456 # root用户的密码MYSQL_ROOT_HOST: % # 访问权限# MYSQL_USER: test …

[leetcode]刷题--关于位运算的几道题

&#xff08;1&#xff09;位运算的本质&#xff0c;其实是对二进制补码储存形式的修改。 位运算常见的运算符为 <<左移n个位置&#xff08;算数移位&#xff0c;符号位不变&#xff09; >>右移动n个位置&#xff08;采用直接丢弃末尾数字的方法&#xff0c;符号…

Android Raphael使用(专治native 内存泄漏)

1.前期准备 在项目根目录build.gradle中,添加仓库地址&#xff1a; allprojects {repositories {maven { url https://jitpack.io }} }2.案例实践 构建一个新的Library Module&#xff0c;其中build.gradle中添加依赖&#xff1a; dependencies {implementation com.github…

gitlab-runner搭建CI/CD

1. 背景 每次发布代码&#xff0c;需要连接服务器更新代码&#xff0c;进行部署&#xff0c;比较繁琐&#xff0c;浪费时间。方案有jenkins或gitlab-runner。由于代码仓库是gitlab并且只需要自动部署&#xff0c;不需要其他额外功能&#xff0c;这里选择使用gitlab-runner。 …

【React】三.React组件基础学习

目录 React组件介绍 React组件的两种创建方式 使用函数创建组件 函数组件 渲染函数组件 示例 使用类创建组件 抽离为独立的JS文件 步骤 问题记录 React事件处理 事件绑定 记录问题 事件对象 有状态组件和无状态组件 无状态组件&#xff08;木偶组件&#xff09;…

XSS(Cross Site Scripting)攻击简介

环境 Ubuntu 22.04IntelliJ IDEA 2022.1.3JDK 17.0.3.1Spring Boot 3.0.1Firefox 108.0.2 问题和分析 在IntelliJ IDEA中创建Spring Boot项目 test0116 &#xff0c;并选中 Spring Web 依赖。 在 src/main/java 下创建 MyController.java 如下&#xff1a; package com.ex…

Redis缓冲区不会还有人不知道吧?

1 简介 缓冲区&#xff0c;用一块内存空间暂时存放命令数据&#xff0c;以免因 数据和命令的处理速度&#xff1c;发送速度而导致数据丢失和性能问题。但缓冲区的内存空间有限&#xff0c;若持续&#xff1a; 往里写数据速度&#xff1e;从里读数据速度会导致缓冲区需越来越…

ATGM332D-5N卫星导航模块介绍

ATGM332D-5N卫星导航模块简介ATGM332D-5N系列模块是12X16 尺寸的高性能BDS/GNSS 全星座定位导航模块系列的总称。该系列模块产品都是基于中科微第四代低功耗GNSS SOC单芯片—AT6558&#xff0c;支持多种卫星导航系统&#xff0c;包括中国的BDS&#xff08;北斗卫星导航系统&…

BFS的入门与应用

目录 一、前言 二、BFS原理 二、BFS与最短路径 1、最短路径问题用BFS 2、迷宫&#xff08;2019年省赛&#xff0c;填空题&#xff0c;lanqiaoOJ题号602&#xff09; &#xff08;1&#xff09;字典序最小的最短路径 &#xff08;2&#xff09;输出路径的两种方法 三、B…

拉伯证券|今年首批游戏版号发放,机构看好春节行业景气度恢复

2023年第一批游戏版号发放。 昨日晚间&#xff0c;国家新闻出版署发布1月国产网络游戏审批信息&#xff0c;共88款游戏获批&#xff0c;其我国内各大游戏龙头均有所收成&#xff0c;包含腾讯《黎明觉悟&#xff1a;活力》、网易《逆水寒》&#xff08;移动版&#xff09;、完美…

数据治理:数据治理之道-组织机制-敏捷的治理组织

参考《一本书讲透数据治理》、《数据治理》等 组织机制&#xff1a;敏捷的治理组织 数据、组织、软件平台&#xff0c;是企业数字化转型面临的三座大山 数据&#xff1a;数据是企业数字化转型的根本驱动力之一&#xff0c;数字化转型中的企业必须做好数据治理与应用&#xff…