Flink学习2

news2024/9/20 4:10:00

创建一个无界流

package com.qyt;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
/**
 * DataStreamSource API使用
 */
public class StreamWordCount {

    public static void main(String[] args) throws Exception {
        //TODO 1、获取流的类
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //TODO 2、获取无界流
        DataStreamSource<String> stringDataStreamSource = env.socketTextStream("127.0.0.1", 9000, "\n");

         //TODO 3 ETL
        //TODO 3.1 转换成二元数组,简单ETL的过程
        SingleOutputStreamOperator<Tuple2<String, Integer>> process = stringDataStreamSource.process(new ProcessFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void processElement(String value, ProcessFunction<String, Tuple2<String, Integer>>.Context ctx, Collector<Tuple2<String, Integer>> out) throws Exception {
                String[] words = value.split(" ");
                for (String word : words) {
                    Tuple2<String, Integer> tuple2 = Tuple2.of(word, 1);
                    out.collect(tuple2);
                }
            }
        });

        //TODO 3.1 分组
        KeyedStream<Tuple2<String, Integer>, String> tuple2StringKeyedStream = process.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {

            @Override
            public String getKey(Tuple2<String, Integer> value) throws Exception {
                return value.f0;
            }
        });

        //TODO 3.2 聚合计算
        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = tuple2StringKeyedStream.sum(1);

        //TODO 4、打印
        sum.print();

        //TODO 5、无界流需要这个不断执行的方法
        env.execute();
    }
}

maven

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>org.example</groupId>
    <artifactId>flink-demo</artifactId>
    <version>1.0-SNAPSHOT</version>
    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.17.0</flink.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

     <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
         <scope>provided</scope>
     </dependency>
    </dependencies>


    <build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>3.2.4</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <artifactSet>
                            <excludes>
                                <exclude>com.google.code.findbugs:jsr305</exclude>
                                <exclude>org.slf4j:*</exclude>
                                <exclude>log4j:*</exclude>
                            </excludes>
                        </artifactSet>
                        <filters>
                            <filter>
                                <!-- Do not copy the signatures in the META-INF folder.
                                Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                <artifact>*:*</artifact>
                                <excludes>
                                    <exclude>META-INF/*.SF</exclude>
                                    <exclude>META-INF/*.DSA</exclude>
                                    <exclude>META-INF/*.RSA</exclude>
                                </excludes>
                            </filter>
                        </filters>
                        <transformers combine.children="append">
                            <transformer
                                    implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer">
                            </transformer>
                        </transformers>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
</project>

创建好项目后,开始进行打包,打包完后
在这里插入图片描述
将jar包上传上WEBUI后
在这里插入图片描述
可以看到对应的job任务,这个时候选中view taskmanage log
在这里插入图片描述
就可以查看到输出的结果了

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2142991.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

《微信小程序实战(2) · 组件封装》

&#x1f4e2; 大家好&#xff0c;我是 【战神刘玉栋】&#xff0c;有10多年的研发经验&#xff0c;致力于前后端技术栈的知识沉淀和传播。 &#x1f497; &#x1f33b; CSDN入驻不久&#xff0c;希望大家多多支持&#xff0c;后续会继续提升文章质量&#xff0c;绝不滥竽充数…

138、Java内部类源码

01.代码如下&#xff1a; package TIANPAN;class Outer { // 外部类private String msg "Hello World !";class Inner { // 定义一个内部类private String info "世界&#xff0c;你好&#xff0…

【深度分析】OpenAI o1是最强的推理模型,却不是最强模型!

大家好&#xff0c;我是木易&#xff0c;一个持续关注AI领域的互联网技术产品经理&#xff0c;国内Top2本科&#xff0c;美国Top10 CS研究生&#xff0c;MBA。我坚信AI是普通人变强的“外挂”&#xff0c;专注于分享AI全维度知识&#xff0c;包括但不限于AI科普&#xff0c;AI工…

磁盘写操作压力测试工具的设计与实现

磁盘写操作压力测试工具的设计与实现 1. 设计概述2. 关键技术点3. 伪代码设计4. C代码实现5. 运行与测试6. 结论在进行磁盘性能评估时,写操作压力测试是不可或缺的一部分。本篇文章将介绍如何使用C语言结合系统调用,设计并实现一个针对磁盘写操作的压力测试工具。这个工具将模…

【设计模式-桥接】

定义 桥接模式&#xff08;Bridge Pattern&#xff09;是一种结构型设计模式&#xff0c;它通过将抽象部分与实现部分分离&#xff0c;使它们都可以独立地变化。桥接模式的关键在于将类的抽象部分与其实现部分解耦&#xff0c;以便两者可以独立地变化。这种设计模式的一个主要…

湖北产教融合教育研究院成功协办武汉工程大学2024年同等学力申硕开学典礼

9月7日&#xff0c;武汉工程大学&#xff08;流芳校区&#xff09;教育教学综合楼102报告厅内庄严肃穆&#xff0c;近百位怀揣梦想、追求卓越的学子与校领导、教师代表汇聚一堂&#xff0c;共同迎接“乘风破浪 逐光前行”武汉工程大学2024年同等学力申请硕士学位人员开学典礼的…

【觅图网-注册安全分析报告-无验证方式导致安全隐患】

前言 由于网站注册入口容易被黑客攻击&#xff0c;存在如下安全问题&#xff1a; 1. 暴力破解密码&#xff0c;造成用户信息泄露 2. 短信盗刷的安全问题&#xff0c;影响业务及导致用户投诉 3. 带来经济损失&#xff0c;尤其是后付费客户&#xff0c;风险巨大&#xff0c;造…

搜索二叉树的认识以及底层实现

如果说到对一个数组进行查找相应的数据&#xff0c;要求效率最高&#xff0c;大家会想到什么方式呢&#xff1f;二分查找&#xff1f;二分查找的效率确实很高&#xff0c;时间复杂度为O(logN)。但是如果我们想要在数组当中添加新的数据呢&#xff1f;加上这一功能之后二分查找的…

KVM创建的虚拟机无法访问外网

基础环境如下&#xff1a; [rootlocalhost ~]# virsh domifaddr CentOS7_YFName MAC address Protocol Address -------------------------------------------------------------------------------vnet0 52:54:00:cb:a6:0d ipv4 192.168.…

后台数据管理系统 - 项目架构设计-Vue3+axios+Element-plus(0917)

十一、登录注册页面 [element-plus 表单 & 表单校验] 注册登录 静态结构 & 基本切换 安装 element-plus 图标库 pnpm i element-plus/icons-vue静态结构准备 <script setup> import { User, Lock } from element-plus/icons-vue import { ref } from vue cons…

P2865 [USACO06NOV] Roadblocks G

*原题链接* 次短路模版题 在刚学最短路时&#xff0c;我做过这道题集合位置&#xff0c;那时博客上写的是枚举删除最短路上的边&#xff0c;然后求解。不过这种做法最坏时间复杂度可以有&#xff0c;对于这道题数据范围较大&#xff0c;所以可以用更好写&#xff0c;思维难度…

Linux学习记录十四----------线程的创建和回收

文章目录 五、Linux线程1.守护进程1.1.守护进程的特点1.2.进程组1.3会话1.4创建守护进程模型 2.线程的概念3.线程的创建及相关函数3.1.创建线程‐‐pthread_create3.2.单个线程退出 --pthread_exit3.3.阻塞等待线程退出&#xff0c;获取线程退出状态--pthread_join3.4.线程分离…

python怎么运行cmd命令

使用os.system(“cmd”) 该方法在调用完shell脚本后,返回一个16位的二进制数,低位为杀死所调用脚本的信号号码,高位为脚本的退出状态码,即脚本中“exit 1”的代码执行后,os.system函数返回值的高位数则是1,如果低位数是0的情况下,则函数的返回值是0100,换算为10进制得到256。 …

JavaScript web API完结篇---多案例

BOM window对象 >包含docment Browser Object Model 定时器–延时函数 之前学的是间歇函数 让代码延迟执行 仅执行一次 setTimeout(回调函数&#xff0c;等待毫秒数) 消除 clearTimeout(timer) > 用于递归时需要进行去除 JS执行机制 单线程 > 一个任务结束&…

ROS组合导航笔记2:使用外部定位系统

在上一单元中&#xff0c;我们了解了如何合并不同传感器的数据以生成机器人的姿势估计。因此&#xff0c;基本上&#xff0c;我们介绍了图表的以下部分&#xff0c;其中向 robot_localization 节点提供了不同的传感器&#xff0c;以便通过卡尔曼滤波器进行合并。 但是...图表的…

【浅水模型MATLAB】尝试复刻SCI论文中的溃坝流算例

【浅水模型MATLAB】尝试复刻SCI论文中的溃坝流算例 前言问题描述控制方程及数值方法浅水方程及其数值计算方法边界条件的实现 代码框架与关键代码模拟结果 更新于2024年9月17日 前言 这篇博客算是学习浅水方程&#xff0c;并利用MATLAB复刻Liang (2004)1中溃坝流算例的一个记录…

【FreeRL】Rainbow_DQN的实现和测试

文章目录 前言环境1 PER note2 C51 note3 Noisy note4 Rainbow note其他 前言 具体代码实现见&#xff1a;https://github.com/wild-firefox/FreeRL/blob/main/DQN_file/DQN_with_tricks.py 将其中所有的trick都用上即为Rainbow_DQN。 效果如下&#xff1a;&#xff08;学习曲…

word文档的写入(1)

Word文档的写入 我们手动复制Excel信息&#xff0c;再粘贴进Word&#xff0c;进行文件保存的整个操作。属于机械性的重复劳动&#xff0c;并不能带来太大价值。在Excel和Word的操作内&#xff0c;也没有能很好解决此类问题的方法。如果遇到信息一多&#xff0c;几十上百个文件&…

Win11小技巧之调节音量

无意中发现&#xff0c;鼠标悬停在喇叭&#x1f508;处可通过滚轮调节音量&#xff0c;无需每次都点开音量面板&#xff0c;再悬停在音量滚动条处通过滚轮调节&#xff01;&#xff08;设计师……怎么不早告诉我……&#xff09; 不用点开&#xff0c;之前一直都是这么调节音量…

c++—多态【万字】【多态的原理】【重写的深入学习】【各种继承关系下的虚表查看】

目录 C—多态1.多态的概念2.多态的定义及实现2.1多态的构成条件2.2虚函数的重写2.2.1虚函数重写的两个例外&#xff1a;2.2.1.1协变2.2.1.2析构函数的重写 2.3 c11的override和final2.3.1final2.3.2override 2.4 重载、重写、重定义的对比 3.抽象类3.1抽象类的概念3.2接口继承和…