Flink学习(七)-单词统计

news2024/11/13 12:04:11

前言

Flink是流批一体的框架。因此既可以处理以流的方式处理,也可以按批次处理。

一、代码基础格式

//1st 设置执行环境
xxxEnvironment env = xxxEnvironment.getEnvironment;

//2nd 设置流
DataSource xxxDS=env.xxxx();

//3rd 设置转换
Xxx transformation =xxxDS.xxxx();

//4th 设置sink
transformation.print();

//5th 可能需要
env.execute();

二、Demo1 批处理

  • 源码

 public static void main(String[] args) throws Exception {
        //1,创建一个执行环境
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        //2,获取输入流
        DataSource<String> lineDS = env.readTextFile("input/word.txt");
        //3,处理数据
        FlatMapOperator<String, Tuple2<String, Integer>> wordDS = lineDS.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> collector) throws Exception {
                //3.1 分隔字符串
                String[] values = value.split(" ");
                //3.2 汇总统计
                for (String word : values) {
                    Tuple2<String, Integer> wordTuple = Tuple2.of(word, 1);
                    collector.collect(wordTuple);
                }
            }
        });
        //4,按单词聚合
        UnsortedGrouping<Tuple2<String, Integer>> tuple2UnsortedGrouping = wordDS.groupBy(0);
        //5,分组内聚合
        AggregateOperator<Tuple2<String, Integer>> sum = tuple2UnsortedGrouping.sum(1);

        //6,输出结果
        sum.print();
    }
  • 效果展示

三、Demo2 流处理

  • 源码

   public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<String> lineDS = env.readTextFile("input/word.txt");

        SingleOutputStreamOperator<Tuple2<String, Integer>> wordDS = lineDS.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> collector) throws Exception {
                String[] words = value.split(" ");
                for (String word : words) {
                    Tuple2<String, Integer> temp = Tuple2.of(word, 1);
                    collector.collect(temp);
                }
            }
        });

        KeyedStream<Tuple2<String, Integer>, Tuple> wordCountKeyBy = wordDS.keyBy(0);
        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = wordCountKeyBy.sum(1);
        sum.print();
        env.execute();

    }
  • 效果展示

四、Demo3 无边界流处理

  • 源码

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<String> lineDS = env.socketTextStream("192.168.3.11", 9999);

        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = lineDS.flatMap(
                        (String value, Collector<Tuple2<String, Integer>> out) -> {
                            String[] words = value.split(" ");
                            for (String word : words) {
                                out.collect(Tuple2.of(word, 1));
                            }
                        }
                ).returns(Types.TUPLE(Types.STRING, Types.INT))
                .keyBy(value -> value.f0)
                .sum(1);

        sum.print();

        env.execute();

    }
  • 效果展示 

往192.168.3.11的9999端口上持续输送数据流,程序端会出现如下统计

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1612781.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【春秋云镜】CVE-2023-43291 emlog SQL注入

靶场介绍 emlog是一款轻量级博客及CMS建站系统&#xff0c;在emlog pro v.2.1.15及更早版本中的不受信任数据反序列化允许远程攻击者通过cache.php组件执行SQL语句。 不感兴趣的可以直接拉到最后面&#xff0c;直接获取flag 备注&#xff1a;没有通过sql注入获取到flag&…

python多线程技术(Threading)

文章目录 前言一、多线程(Threading)是什么?二、threading库1.初识多线程2.增加新线程2.1 多线程的基本使用2.2 对多线程是同时进行的进行一个直观上的演示(非重点--理解是实时就行)2.3 thread.join()功能2.4 使用queue(队列)功能获取多线程的返回值(重要,这就是前面那…

MySql对于时间段交集的处理和通用实现方式(MyBatis-Plus)

问题&#xff1a;一般传统时间筛选是在[ 开始时间 → 结束时间 ]这个区间内的子集&#xff0c;也就是全包含查询方式&#xff0c;这种只会筛选一种情况。如果场景需要是开展一个活动&#xff0c;需要活动时间检索应该但凡包含就返回&#xff0c;也就是需要查询这个时间段有涉及…

Java的Hash算法及相应的Hmac算法

【相关知识】 加密算法知识相关博文&#xff1a;浅述.Net中的Hash算法&#xff08;顺带对称、非对称算法&#xff09;-CSDN博客 【出处与参考】 MessageDigest 类介绍、分多次调用update方法与一次性调用一致的说明引自&#xff1a; https://blog.csdn.net/cherry_chenr…

2024 IDM最新破解版及软件介绍

*IDM&#xff1a;信息时代的高效管理工具** 在快节奏的现代社会中&#xff0c;随着信息的爆炸式增长&#xff0c;如何高效、有序地管理信息成为每个人都需要面对的挑战。IDM&#xff0c;作为一种信息管理工具&#xff0c;正在逐渐受到人们的青睐。 IDM&#xff0c;全称Inform…

HAL STM32 I2C方式读取MT6701磁编码器获取角度例程

HAL STM32 I2C方式读取MT6701磁编码器获取角度例程 &#x1f4cd;相关篇《Arduino通过I2C驱动MT6701磁编码器并读取角度数据》&#x1f388;《STM32 软件I2C方式读取MT6701磁编码器获取角度例程》&#x1f4cc;MT6701当前最新文档资料&#xff1a;https://www.magntek.com.cn/u…

Scanpy(1)数据结构和样本过滤

注&#xff1a;主要讲述scanpy处理数据的结构、数据过滤&#xff08;生信领域&#xff09;和数据预处理&#xff08;和机器学习类似&#xff0c;但是又有不同。&#xff09; 1. Scanpy简介与安装 Scanpy 是一个可扩展的工具包&#xff0c;用于分析与 AnnData&#xff08;一种…

git 小记

一、 github新建仓库 git clone 。。。。。。。。。。。 &#xff08;增删查补&#xff0c;修改&#xff09; git add . git commit -m "修改” git push (git push main) 二、branch 分支 branch并不难理解&#xff0c;你只要想像将代码拷贝到不同目录…

ruoyi-vue前端的一些自定义插件介绍

文章目录 自定义列表$tab对象打开页签关闭页签刷新页签 $modal对象提供成功、警告和错误等反馈信息&#xff08;无需点击确认&#xff09;提供成功、警告和错误等提示信息&#xff08;类似于alert&#xff0c;需要点确认&#xff09;提供成功、警告和错误等提示信息&#xff08…

restful请求风格的增删改查-----修改and删除

一、修改&#xff08;和添加类似&#xff09; 前端&#xff1a; <script type"text/javascript">function update(){//创建user对象var user {id:$("#id").val(),username:$("#username").val(),password:$("#password").val…

排序 “贰” 之选择排序

目录 ​编辑 1. 选择排序基本思想 2. 直接选择排序 2.1 实现步骤 2.2 代码示例 2.3 直接选择排序的特性总结 3. 堆排序 3.1 实现步骤 3.2 代码示例 3.3 堆排序的特性总结 1. 选择排序基本思想 每一次从待排序的数据元素中选出最小&#xff08;或最大&#xff09;的一个…

又来!黄金主题LOF(161116)溢价40%开放申购,拖拉机都开冒烟了!

查看基金公告&#xff0c;黄金主题LOF(161116)下周一(4月22号)开放申购&#xff0c;限额100元&#xff0c;目前溢价40%&#xff0c;可以一拖七套利。 这熟悉的配方&#xff0c;这熟悉的套路&#xff01;一个月前的今天&#xff0c;我好像在标普500LOF上见过。又是易方达这个狗基…

数据结构_时间复杂度

✨✨所属专栏&#xff1a;数据结构✨✨ ✨✨作者主页&#xff1a;嶔某✨✨ 什么是时间复杂度&#xff1f; 时间复杂度的定义&#xff1a;在计算机科学中&#xff0c;算法的时间复杂度是一个函数&#xff0c;它定量描述了该算法的运行时间。一个算法执行所耗费的时间&#xff0…

上位机图像处理和嵌入式模块部署(树莓派4b和类muduo网络编程)

【 声明&#xff1a;版权所有&#xff0c;欢迎转载&#xff0c;请勿用于商业用途。 联系信箱&#xff1a;feixiaoxing 163.com】 既然是linux编程&#xff0c;那么自然少不了网络编程。在linux平台上面&#xff0c;有很多的网络编程库可以选择&#xff0c;大的有boost、qt&…

python3如何提取汉字

采用正则表达式的方法对字符串进行处理。 str1 "&#xff5b;我%$是&#xff0c;《速$.度\发》中 /国、人"&#xff08;1&#xff09;提取汉字 汉字的范围为”\u4e00-\u9fa5“&#xff0c;这个是用Unicode表示的。 import re res1 .join(re.findall([\u4e00-\u9fa…

如何对图片进行压缩和缩放

在手机像素越来越高的时代&#xff0c;照片的体积也在不断地膨胀&#xff0c;大部分情况下我们是不需要这么大的图片的&#xff0c;这个时候我们就需要对图片进行压缩或者缩放了&#xff0c;今天教大家如何缩小图片体积 打开智游剪辑&#xff08;官网: zyjj.cc&#xff09;&…

MySQL慢查询怎么办?需要关注Explain的哪些关键字?

目录 1-引言&#xff1a;什么是慢查询1-1 慢查询定义1-2 为什么排查慢查询 2-核心&#xff1a;慢查询排查2-1 慢查询定位2-2 慢查询解决2-2-1 Explain 排查慢查询2-2-2 Explain 重点关键字 3-总结&#xff1a;慢查询知识点小结 1-引言&#xff1a;什么是慢查询 1-1 慢查询定义…

LabelMe数据集格式问题

注意图片的通道数&#xff0c;之前我们都说RGB&#xff0c;但是在这里要看图片位深。 图像是rgba四个通道的&#xff0c;第四个通道是透明通道。 注意png格式的不只是文件名后缀是 .png &#xff0c;也可能是后缀名是 .jpg 但是图片里面的深度是为32的&#xff0c;常规的后缀是…

如何使用JSONB类型在PostgreSQL中存储和查询复杂的数据结构?

文章目录 解决方案1. 创建包含JSONB列的表2. 插入JSONB数据3. 查询JSONB数据4. 创建索引以优化查询性能 示例代码结论 在PostgreSQL中&#xff0c;JSONB是一种二进制格式的JSON数据类型&#xff0c;它允许你在数据库中存储和查询复杂的JSON数据结构。与普通的JSON类型相比&…

Matlab新手快速上手2(粒子群算法)

本文根据一个较为简单的粒子群算法框架详细分析粒子群算法的实现过程&#xff0c;对matlab新手友好&#xff0c;源码在文末给出。 粒子群算法简介 粒子群算法&#xff08;Particle Swarm Optimization&#xff0c;PSO&#xff09;是一种群体智能优化算法&#xff0c;灵感来源于…