[Flink]wordcount

news2024/10/5 20:24:26

一、有界流

1、代码

package wc;


import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class BoundedStreamWordCount {
    public static void main(String[] args) throws Exception {
        //TODO 1.创建流式的执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //TODO 2.读取文件
        DataStreamSource<String> lineDS = env.readTextFile("input/words.txt");
        //TODO 3.处理数据:切分、转换、分组、求和
        //flatMap方法的参数是一个接口,该接口需要重写flatMap方法
        //这里使用的是匿名实现类
        //value为读入的每条数据的,数据类型
        //out为采集器,用来返回数据
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = lineDS.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                String[] words = value.split(" ");
                for (String word : words) {
                    Tuple2<String, Integer> wordAndOne = Tuple2.of(word, 1); //将每个单词转换成2元组
                    out.collect(wordAndOne);//使用Collector向下游发送数据
                }
            }
        });

        //TODO 4.按照word分组
        //new KeySelector<Tuple2<String, Integer>, String> 第一个类型指的是传入的数据的类型,第二个类型指的是key的数据类型
        KeyedStream<Tuple2<String, Integer>, String> wordAndOneKS = wordAndOne.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
            @Override
            public String getKey(Tuple2<String, Integer> value) throws Exception {
                return value.f0;
            }
        });

        //TODO 5.聚合
        SingleOutputStreamOperator<Tuple2<String, Integer>> sumDS = wordAndOneKS.sum(1);

        //TODO 6.打印
        sumDS.print();

        //TODO 7.执行
        env.execute(); //默认核数为电脑的所有核数

    }
}

2、说明

假如接口A,里面有一个方法a()
1)正常写法:定义一个class B,去实现接口A,并且实现它的方法a()
B b=new B()
2)匿名实现类写法

new A(){
  实现a(){ }
}

二、无界流

1、代码

package wc;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;


public class StreamWordCount {
    public static void main(String[] args) throws Exception {
        //TODO 1.创建流式执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //TODO 2.读取数据:socket
        DataStreamSource<String> lineDataStream = env.socketTextStream("hadoop1",7777);

        //TODO 3.处理数据
        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = lineDataStream.flatMap((String value, Collector<Tuple2<String, Integer>> out) -> {
                    String[] words = value.split("\\s+");
                    for (String word : words) {
                        out.collect(Tuple2.of(word, 1));
                    }})
                .returns(Types.TUPLE(Types.STRING, Types.INT)) //存在泛型擦除的问题,需要指定flatmap
                .keyBy((value) -> value.f0)
                .sum(1);       //value:只有一个参数的时候,类型可以不写

        //TODO 4.打印
        sum.print();

        //TODO 5.启动执行
        env.execute(); //默认核数为电脑的所有核数
    }
}

2、在hadoop上启动

nc -lk 7777

3、报错

 1)报错原因:泛型擦除

没有指定Collector的类型

2)解决方法:增加returns方法,指定Collector的类型

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/698859.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【UIAutomator2相关问题】UIAutomator2初始化无法自动安装ATX插件的解决方法

UIAutomator2初始化无法自动安装ATX插件的解决方法.md 文章目录 UIAutomator2初始化无法自动安装ATX插件的解决方法.md安装步骤第一步 安装 uiautomator-server第二步 安装 atx-agent第三步 点击ATX软件测试 总结 安装步骤 第一步 安装 uiautomator-server 在链接https://git…

提升搜索引擎效率:使用 Elasticsearch 别名

​ ​Elasticsearch 因其强大的搜索能力而备受推崇&#xff0c;使其成为构建高性能搜索引擎的热门选择。其中一个关键特性是使用别名&#xff0c;Elasticsearch 的别名为优化搜索操作、提升查询性能以及启用动态索引管理提供了强大的机制。在本文中&#xff0c;我们将探讨如何使…

如何选择最适合企业的自动化测试工具?

在软件开发过程中&#xff0c;测试是不可或缺的环节。而自动化测试工具则能够提高测试效率&#xff0c;减少人力成本&#xff0c;为企业带来更好的收益。但如何选择最适合企业的自动化测试工具呢&#xff1f;下面就为大家介绍一些选择自动化测试工具的方法和要点。 1.明确测试…

新版危废标签解读及制作教程分享

《HJ 1276-2022危险废物识别标志设置技术规范》&#xff08;以下简称为《规范》&#xff09;将于7月1日起正式实施。新的《规范》具体规定了产生、收集、贮存、利用、处置危险废物单位需设置的危险废物识别标志的分类、内容要求、设置要求和制作方法。 为方便企业更好地了解《…

AGRCZO-A-10/315先导比例减压阀放大器

AGRCZO-A-10/50、AGRCZO-A-10/100、AGRCZO-A-10/210/I、AGRCZO-A-10/315、AGRCZO-A-20/50、AGRCZO-A-20/100/U0/PE、AGRCZO-A-20/210、AGRCZO-A-20/315先导式比例减压阀 在比例溢流导阀2与主阀座0中间配有压力过载保护的手调溢流阀3。此阀流量范围比较大&#xff0c;一般用在系…

基于SpringBoot+vue的旅游管理系统设计与实现

博主介绍&#xff1a; 大家好&#xff0c;我是一名在Java圈混迹十余年的程序员&#xff0c;精通Java编程语言&#xff0c;同时也熟练掌握微信小程序、Python和Android等技术&#xff0c;能够为大家提供全方位的技术支持和交流。 我擅长在JavaWeb、SSH、SSM、SpringBoot等框架…

大数据面试题:Zookeeper架构

面试题来源&#xff1a; 《大数据面试题 V4.0》 大数据面试题V3.0&#xff0c;523道题&#xff0c;679页&#xff0c;46w字 可回答&#xff1a; 1&#xff09;说一说Zookeeper中的角色 问过的一些公司&#xff1a;京东提前批(2020.07)&#xff0c;蘑菇街实习(2020.03) 参…

ML@集成学习中结合策略

文章目录 集成学习中结合策略结合策略平均法简单平均法加权平均法 投票法绝对多数投票法MV相对多数投票法PV加权投票法WV小结其他投票法技巧 学习法Stacking伪代码次级训练集的生成&#x1f388; refs更多集成学习相关参考 Skearn中的集成学习引言摘要翻译1翻译2 集成学习中结合…

界面开发框架Qt新手入门教程:如何使用Calendar组件创建日历(二)

Qt 是目前最先进、最完整的跨平台C开发工具。它不仅完全实现了一次编写&#xff0c;所有平台无差别运行&#xff0c;更提供了几乎所有开发过程中需要用到的工具。如今&#xff0c;Qt已被运用于超过70个行业、数千家企业&#xff0c;支持数百万设备及应用。 本文中的CalendarWi…

基于区块链技术的旅游积分通兑系统设计

基于区块链技术的旅游积分通兑系统设计 贵向泉1,2, 郭志礼1, 杨裔2,3&#xff0c;秦炳峰4 1 兰州理工大学计算机与通信学院&#xff0c;甘肃 兰州 730050 2 兰州大学旅游信息融合处理与数据权属保护文化和旅游部重点实验室&#xff0c;甘肃 兰州 730000 3 兰州大学信息科学与工…

Linux驱动学习(4) MTD字符驱动和块驱动3

系列文章目录 Linux驱动学习&#xff08;4&#xff09; MTD字符驱动和块驱动1 Linux驱动学习&#xff08;4&#xff09; MTD字符驱动和块驱动2 Linux驱动学习&#xff08;4&#xff09; MTD字符驱动和块驱动3 文章目录 目录 系列文章目录 文章目录 前言 二、MTD块设备驱…

NXP i.MX 6ULL工业核心板硬件说明书( ARM Cortex-A7,主频792MHz)

1 硬件资源 创龙科技SOM-TLIMX6U是一款基于NXP i.MX 6ULL的ARM Cortex-A7高性能低功耗处理器设计的低成本工业级核心板&#xff0c;主频792MHz&#xff0c;通过邮票孔连接方式引出Ethernet、UART、CAN、LCD、USB等接口。核心板经过专业的PCB Layout和高低温测试验证&…

一篇文章带你从入门都入土 RocketMQ 消息中间件

目录 一、下载、安装 二、基本演示 2.1 创建项目导入依赖 2.2 生产者发送消息 2.3 消费者消费消息 三、topic 、broker、messageQueue之间的关系 四、普通消息 4.1 普通消息生命周期 4.2 可靠同步发送 4.3 可靠异步发送 4.4 单向发送 五、顺序消息 5.1 如何保证消…

机器学习之朴素贝叶斯(Naive Bayes)

1 朴素贝叶斯算法介绍 朴素贝叶斯是经典的机器学习算法之一&#xff0c;也是为数不多的基于概率论的分类算法。朴素贝叶斯分类器(Naive Bayes Classifier 或 NBC)发源于古典数学理论&#xff0c;有着坚实的数学基础&#xff0c;以及稳定的分类效率&#xff0c;是应用最为广泛的…

python发送邮件,超简单!

1、我的使用场景 最近有几个爬虫定时任务在运行&#xff0c;需要及时知道发生异常&#xff0c;以便于处理&#xff0c;于是&#xff0c;想到了邮件&#xff0c;记录一下。 2、邮箱服务设置 我是专门为这个脚本注册的&#xff0c;如果有&#xff0c;也可以不注册。 注册完&a…

Transformer(四)--实现验证:transformer 机器翻译实践

转载请注明出处&#xff1a;https://blog.csdn.net/nocml/article/details/125711025 本系列传送门&#xff1a; Transformer(一)–论文翻译&#xff1a;Attention Is All You Need 中文版 Transformer(二)–论文理解&#xff1a;transformer 结构详解 Transformer(三)–论文实…

sql server 触发器往链接服务器同步数据的坑

链接服务器无法启动分布式服务 检查数据库是否勾选了一下按钮&#xff0c;双方都要检查 链接服务器属性&#xff0c;需要检查这些地方是否已经设置为true 该伙伴事务管理器已经禁止了它对远程/网络事务的支持 双方启动MSDTC服务 (1)在windows控制面版–>windows 工具->…

Sourcetree: The host key is not cached for this server:

使用Sourcetree 出现提示&#xff1a;The host key is not cached for this server: 工具>选项>一般 确认完成&#xff0c;解决问题。

【嵌入式Qt开发入门】使用 UI 设计器开发程序

本文我们继续学习如何使用 Qt Designer 开发程序&#xff0c;Qt Designer 是属于 Qt Creator 的一个功能而已&#xff0c;大家不要搞混了。Qt Designer 也叫 UI 设计师或者 UI 设计器&#xff0c;这都是指的同一 个东西。下面简单介绍如何使用 UI 设计器开发程序&#xff0c;以…

webassembly如何在js侧使用注册的容器类型

需求&#xff1a;是有个C接口&#xff0c;参数是vector<string>类型&#xff0c;那么如何在js端调用这个接口呢&#xff1f; #include <emscripten/emscripten.h> #include <emscripten/bind.h> #include <vector> #include <string> #include …