Flink学习-单词统计WordCount

news2025/1/11 12:50:27

WordCount(流处理)

通过socket数据源,去请求一个socket服务(9999),得到数据流然后统计数据流中出现的单词及其个数

1.创建一个编程入口,生成环境

StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// 批处理入口环境
StreamExecutionEnvironment env = StreamExecutionEnvironment. getExecutionEnvironment(); //流批一体的入口环境

2.设置该环境的默认并行度

streamEnv.setParallelism(1);
本地运行模式时,程序的默认并行度为CPU的逻辑核数

3.通过source算子,把socket数据源加载为一个dataStream(数据流)

DataStreamSource<String> source = env.socketTextStream("localhost", 9999);

4.通过算子对数据流进行各种转换(计算逻辑)

SingleOutputStreamOperator<Tuple2<String, Integer>> words = source.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
    @Override
     public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
        //切单词
        String[] split = s.split("//s+");
        for (String word : split) {
          //返回每一对(单词,1)
          collector.collect(Tuple2.of(word, 1));
        }
     }
});
输入数据流由文本行组成,flatMap 函数将每行拆分为单独的单词,并将它们作为单独的输出元素发出。生成的数据流包含所有输入行中的所有单词。
KeyedStream<Tuple2<String, Integer>, String> Keyed = words.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
            @Override
            public String getKey(Tuple2<String, Integer> tuple2) throws Exception {

                return tuple2.f0;
            }
        });
SingleOutputStreamOperator<Tuple2<String, Integer>> resultStream = Keyed.sum("f1");
将输入数据流 words 转换成一个键控流 (KeyedStream),使用 .keyBy() 方法指定按照元组中的第一个元素 f0 进行键控,也就是相同的 f0 值将被分到同一个分区中。这里用了匿名类实现了 KeySelector 接口,用于从元组中提取出键值。然后,调用 .sum() 方法对键控流进行求和操作。由于之前已经根据元组中的第一个元素进行了分区,所以对每个分区内所有元组的第二个元素 f1 求和得到的结果是每个不同键的数量总和。最后,将结果流 resultStream 作为输出。

5.通过sink算子,将结果输出

resultStream.print();

6.触发程序运行

env.execute();

7.程序测试

通过netcat来创建一个socket连接

监听端口9999,当有实体连接时,就可以相互发送socket信息
当连接后,我们输入词以后,程序会对这些词进行分区统计

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/400964.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

嵌入式Linux驱动开发(二)LED驱动

1. Linux下LED驱动原理 与裸机区别在于&#xff0c;编写驱动要符合linux驱动框架规范。裸机直接对寄存器物理地址进行读写&#xff0c;linux下需要经过MMU。 1.1 地址映射相关概念 1&#xff09;MMU&#xff08;Memory Manage Unit - 内存管理单元&#xff09;&#xff1a; …

新星计划·第四季·Python赛道报名入口 -〖你就是下一个新星〗

↓↓↓报名方式&#xff1a;&#xff08;下滑到本页面底部&#xff09;重要提醒&#xff1a;这里是新星计划第四季Python赛道报名入口&#xff0c;一经报名&#xff0c;不可更换。报名入口点击此处跳转 一、新星计划 新星计划是一个以发掘潜力新人、培养优质博主为目标的创作…

css3动画属性

边框弧度 border-radius:value // 四角 border-radius:value value // 左上右下 右上左下 border-radius:value value value value // 左上 右上 右下 左下 text-shadow:value value value color; // 水平 垂直 模糊度 颜色 线性渐变&#xff1a;background-image:linear-…

oracle的时间戳获取不含中文内容的方式

背景&#xff1a; 在做oracle的数据库同步时发现&#xff0c;创建的行级触发器获取表的时间戳数据时含有中文&#xff0c;导致入库时转义乱码&#xff0c;条件匹配失败。 调试过程&#xff1a; 写了一个declare脚本测试&#xff1a; declare --类型定义 cursor c_job IS sele…

java反射机制及其详解

反射反射机制反射调用优化有时候我们做项目的时候不免需要用到大量配置文件&#xff0c;就拿框架举例&#xff0c;通过这些外部文件配置&#xff0c;在不修改的源码的情况下&#xff0c;来控制文件&#xff0c;就要用到我们的反射来解决 假设有一个Cat对象 public class Cat …

堆的应用(topk问题)

文章目录1.堆排序1.1代码实现2. TOP-K问题2.1原理2.2实例分析1.堆排序 堆排序即利用堆的思想来进行排序&#xff0c;总共分为两个步骤&#xff1a; 1.建堆 升序&#xff1a;大堆 降序&#xff1a;小堆 2.利用堆删除思想来排序 1.1代码实现 void Heapsort(int* a, int n) {f…

C#中通过HttpClient发送Post请求

C#中HttpClient进行各种类型的传输我们可以看到, 尽管PostAsync有四个重载函数, 但是接受的都是HttpContent, 而查看源码可以看到, HttpContent是一个抽象类那我们就不可能直接创建HttpContent的实例, 而需要去找他的实现类, 经过一番研究, 发现了, 如下四个:MultipartFormData…

系列一、AliyunOSS开通及使用

一、对象存储OSS服务开通及配置 1.1、开通OSS 1.2、进入管理控制台 1.3、控制台使用 1.3.1、创建Bucket 命名&#xff1a;20230309-oss 读写权限&#xff1a;公共读 1.3.2、上传默认头像 创建文件夹 avater&#xff0c;上传默认的用户头像 1.4、使用RAM子用户 1.4.1、添加…

设计模式3——结构型模式

结构型模式描述如何将类或对象按某种布局组成更大的结构&#xff0c;它分为类结构型和对象结构型模式&#xff0c;前者采用继承机制来组织接口和类&#xff0c;后者采用组合或聚合来组合对象。 由于组合关系或聚合关系比继承关系耦合度低&#xff0c;满足“合成复用原则”&…

哈希表的实现

哈希表概念 二叉搜索树具有对数时间的表现&#xff0c;但这样的表现建立在一个假设上&#xff1a;输入的数据有足够的随机性。哈希表又名散列表&#xff0c;在插入、删除、搜索等操作上具有「常数平均时间」的表现&#xff0c;而且这种表现是以统计为基础&#xff0c;不需依赖…

CMU15-445 Project.4总结

在线测试 Project #4 - Concurrency Control 以下是Project #4的网址&#xff0c;2022FALL的Project #4是实现并发控制&#xff0c;可以分为以下三个任务&#xff1a; 我们首先需要实现一个锁管理器&#xff0c;能够支持 READ_UNCOMMITED、READ_COMMITTED、REPEATABLE_READ…

layui表格合并

先看一下最终合并之后的效果&#xff0c;能对单选、复选框进行按照某一列的合并 最开始的解决方案来自于这篇博客介绍的方法&#xff1a;https://blog.csdn.net/guishifoxin/article/details/81480136 但是还是存在没能解决的问题。 在完善之后达到的效果&#xff1a; 一&…

移动硬盘格式化?想要恢复硬盘那就看这里!

案例&#xff1a;移动硬盘无法打开&#xff0c;提示格式化&#xff1f; “怎么办啊&#xff01;&#xff01;&#xff01;今天下午给同学重装系统&#xff0c;插上自己的移动硬盘&#xff0c;却发现读不出来&#xff0c;提示需要格式化&#xff01;里面有很多东西&#xff0c;…

第十四章 opengl之高级OpenGL(深度测试)

OpenGL深度测试深度测试函数深度值精度深度缓冲的可视化深度冲突防止深度冲突深度测试 前面我们渲染一个3D图片中运用了深度缓冲&#xff1a;防止被阻挡的面渲染到其他面的前面。 深度缓冲就像颜色缓冲(Color Buffer)&#xff08;储存所有的片段颜色&#xff1a;视觉输出&…

JAVA开发(JAVA中的异常)

在java开发与代码运行过程中&#xff0c;我们经常会遇到需要处理异常的时候。有时候是在用编辑器写代码&#xff0c;点击保存的时候&#xff0c;编辑器就提示我们某块代码有异常&#xff0c;强制需要处理。有时候是我们启动&#xff0c;运行JAVA代码的时候的&#xff0c;日志里…

案例06-没有复用思想的接口和sql--mybatis,spring

目录一、背景二、思路&方案问题1优化问题2优化三、总结四、升华一、背景 写这篇文章的目的是通过对没有复用思想接口的代码例子优化告诉大家&#xff0c;没有复用思想的代码不要写&#xff0c;用这种思维方式和习惯来指导我们写代码。 项目中有两处没有复用思想代码&#…

R语言基础(三):运算

接前文 R语言基础(一)&#xff1a;注释、变量 R语言基础(二)&#xff1a;常用函数 4.运算 4.1 数学运算 R语言中支持加减乘除四则运算、乘方运算、求余数(取模)运算&#xff1a; 符号含义示例加法11 结果是2-减法2-1 结果是1*乘法4*5 结果是20/除法4/5 结果是0.8%/%整除(只要…

MySQL 事务隔离

MySQL 事务隔离事务隔离实现事务的启动ACID : 原子(Atomicity)、一致(Consistency)、隔离(Isolation)、永久(Durability) 多个事务可能出现问题 : 脏读 (dirty read) , 不可重复读 (non-repeatable read) , 幻读 (phantom read) 事务隔离级别 : 读未提交 (read uncommitted)…

一篇学习ES

文章目录ES简介1.什么是ElasticSearch2.ElasticSearch的使用案例3.ElasticSearch对比SolrElasticSearch环境搭建1. 下载ES压缩包2. 安装ES服务3. 启动ES服务3. 安装ES的图形化界面插件ES术语1.概述2.索引 index3.类型 type4.字段Field5.映射 mapping6.文档 document7. 接近实时…

制造业数字化转型要注重哪些方面?

近年来&#xff0c;制造业企业数字化转型的话题一直处于行业高热位置。中央经济工作会议作出“大力发展数字经济”的部署&#xff0c;工信部提出要深化产业数字化转型&#xff0c;建设一批全球领先的智能工厂、智慧供应链&#xff0c;并向中小企业场景化、标准化复制推广。 随…