Flink DataStream之创建执行环境

news2024/10/6 1:44:15

新建project:

  • pom文件
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>test.wyh</groupId>
    <artifactId>Flink117_Test_01</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <flink.version>1.17.0</flink.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>

        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>

</project>
  • Main类(先用批的方式测试)
package test01;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class EnvDemo {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        //指定端口,默认8081
        conf.set(RestOptions.BIND_PORT, "8082");

        //会自动识别是远程集群还是本地IDEA环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);

        //设置流/批,默认是流
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
        env.readTextFile("test_input/test_word.txt")
                .flatMap(
                        (String value, Collector< Tuple2<String, Integer>> out) -> {
                            String[] words = value.split(" ");
                            for (String word : words){
                                out.collect(Tuple2.of(word, 1));
                            }
                        })
                .returns(Types.TUPLE(Types.STRING, Types.INT))
                .keyBy(value -> value.f0)
                .sum(1)
                .print();

        env.execute();
    }
}
  • 创建测试文件

  • 运行程序

------------------------------------------------- 

  •  Main类(用批的方式测试)
package test01;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class EnvDemo {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        //指定端口,默认8081
        conf.set(RestOptions.BIND_PORT, "8082");

        //会自动识别是远程集群还是本地IDEA环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);

        //设置流/批,默认是流
        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
        env.readTextFile("test_input/test_word.txt")
                .flatMap(
                        (String value, Collector< Tuple2<String, Integer>> out) -> {
                            String[] words = value.split(" ");
                            for (String word : words){
                                out.collect(Tuple2.of(word, 1));
                            }
                        })
                .returns(Types.TUPLE(Types.STRING, Types.INT))
                .keyBy(value -> value.f0)
                .sum(1)
                .print();

        env.execute();
    }
}
  • 运行程序

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/698787.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

基于springboot实现的博客系统(免费)

1.1 项目概述 开发语言&#xff1a;Java8 数据库&#xff1a;MySQL5.7以上版本 前端技术&#xff1a;template模板引擎 后端技术&#xff1a;Springboot SpringMVC MyBaties shiro 数据库连接池&#xff1a;Druid 服务器&#xff1a;Tomcat 开发工具&#xff1a;idea…

如何优雅的实现跨应用的代码共享

在 2020 年上半年&#xff0c;Webpack 发布了一项非常激动人心的特性&#xff1a;Module Federation(译为模块联邦)&#xff0c;这个特性一经推出就获得了业界的广泛关注&#xff0c;甚至被称为前端构建领域的Game Changer。实际上&#xff0c;这项技术确实很好地解决了多应用模…

一步一步指导如何使用 ESP 深度学习在 ESP32-S3 上进行手势识别

在本文中,我们将了解如何使用ESP-DL并在ESP32-S3上部署深度学习模型。文末附免费源代码下载链接 人工智能改变了计算机与现实世界交互的方式。决策是通过将微型低功耗设备和传感器的数据获取到云端来进行的。连接性、高成本和数据隐私是这种方法的一些缺点。边缘人工智…

Character类(Java)

文章目录 1. 介绍2. 分析3. 方法3.1 isDigit()方法 --- isLetter()方法3.2 xxxx()方法3.2 xxxx()方法 1. 介绍 A. 类介绍&#xff1a;   Character 类在对象中包装一个基本类型 char 的值。Character 类型的对象包含类型为 char 的单个字段。 2. 分析 A. 类包结构&#xff1a…

SpringBoot整合redis并使用缓存注解

SpringBoot整合redis并使用缓存注解 直接上代码 添加Redis依赖&#xff0c;在pom.xml文件中添加以下依赖&#xff1a; <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId> &l…

《找 bug 的活动》VIP 会员免费的视频,PC Web 端无法观看

《找 bug 的活动》VIP 会员免费的视频&#xff0c;PC Web 端无法观看 文章目录 《找 bug 的活动》VIP 会员免费的视频&#xff0c;PC Web 端无法观看问题描述期望 问题描述 CSDN 学习中心的视频课&#xff0c;有部分是 VIP 会员免费的课程&#xff0c;但是会重复跳转到 确认订…

支持CT、彩色超声、内窥镜检查的医院影像PACS系统源码(三维重建技术)

首先&#xff0c;PACS影像存取与传输系统是以实现医学影像数字化存储、诊断为核心任务&#xff0c;从医学影像设备&#xff08;如CT、CR、DR、MR、DSA、RF等&#xff09;获取影像&#xff0c;集中存储、综合管理医学影像及病人相关信息&#xff0c;建立数字化工作流程。 其次&…

思必驰:以对话式语言计算大模型为核心的大模型体系,才是未来!

2023年6月26日&#xff0c;思必驰联合创始人、首席科学家俞凯在第五届全球智博会发表主题演讲《对话式通用人工智能与专业化语言大模型》&#xff0c;他表示大模型是人工智能的新时代&#xff0c;语言大模型、对话式的语言大模型是整个人工智能大模型进一步突破的核心。而专业化…

Unity UGUI Canvas Overlay模式获取屏幕坐标

UGUI Canvas Overlay模式获取屏幕坐标 &#x1f354;效果&#x1f371;获取 &#x1f354;效果 &#x1f371;获取 ui的position就是屏幕坐标(●’◡’●) var screenPos new Vector2(transform.position.x, transform.position.y);

什么是直放站

直放站是无线通信系统中信号向地下空间延伸覆盖设备的总称&#xff0c;直放站分近端机和远端机两部分&#xff0c;近端机通过基站或空间耦合信号并进行选频、滤波处理后变换为光信号&#xff0c;通过光纤传输到地下空间&#xff08;隧道&#xff09;内&#xff0c;由光直放站远…

LeetCode_Day7 | 三数之和、四数之和

LeetCode_哈希表 15.三数之和1.题目描述2.双指针法2.1思路及注意点2.2代码实现 3.哈希法(有待修正)3.1 思路3.2 代码实现 18.四数之和1. 题目描述 15.三数之和 1.题目描述 详情leetcode链接 2.双指针法 2.1思路及注意点 将数组排序&#xff0c;有一层for循环&#xff0c;i从…

uniapp 中 引入vant组件 和 vant 报错Unclosed bracket 的问题解决

在uniapp 中引入vant组件&#xff0c;遇到一个报错&#xff0c;所以在此记录一下完整过程 一、引入vant组件 方式一&#xff1a;前往 GitHub官网 Vant 下载压缩文件&#xff0c;获取下载中的dist 文件 方式二&#xff1a;通过npm install 方式引入 npm i vant/weapp -S --pr…

【Android开发日常】一文弄懂桌面图标快捷菜单 桌面小组件

本文将介绍如何创建和管理应用快捷方式、如何创建和管理应用桌面小组件。 目录 一、桌面菜单1.1 概览1.2、为什么需要桌面图标快捷菜单1.3、如何实现桌面图标快捷菜单1.3.1 创建静态快捷方式1.3.2 创建动态快捷方式1.3.3 创建固定快捷方式1.3.4 使用快捷方式的最佳做法 1.4 注…

DOTA-c(RGDyK)和DOTA-cyclo(RGDyK)对αVβ3的亲和力和选择性的影响

&#xff08;文章编辑来源于&#xff1a;西安凯新生物科技有限公司小编WMJ&#xff09; ●英文名&#xff1a;DOTA-cyclo(RGDyK)&#xff0c;DOTA-c(RGDyK) ●外观以及性质&#xff1a; DOTA-cyclo(RGDyK)中Cyclo(RGDyK)作用于αVβ3比作用于αVβ5和αIIbβ3表现更高的亲和力…

Prometheus 指标存储 观测 dubbo /windows_exporter指标 windows 版本 其他系统换个语法思路一样

目录 下载 Prometheus 访问Prometheus Targets 发现服务 对应的 dubbo 指标就出来了 Dubbo脚手架生成个最简单的项目 导入 Prometheus 相关包 或者使用这个包即可 启动后就自动上报指标了 Windows_exporter or node_exporter 端口 9182 Prometheus 配置 windows_exp…

第37节:cesium 下雪效果(含源码+视频)

结果示例: 完整源码: <template><div class="viewer"><vc-viewer @ready="ready" :logo="false"><!

SpringBoot - 在IDEA中经常发现:Could not autowire. No beans of ‘xxx‘ type found的错误

错误描述 在SPRINGBOOT的项目中&#xff0c;使用IDEA时经常会遇到Could not autowire. No beans of ‘xxxx’ type found的错误提示&#xff0c;但是程序的编译和运行都没有问题&#xff0c;这个错误提示并不影响项目的生产。 解决方案

Unity3D:专属 Inspector

推荐&#xff1a;将 NSDT场景编辑器 加入你的3D工具链 3D工具集&#xff1a; NSDT简石数字孪生 专属 Inspector 专属 Inspector 是专门用于特定游戏对象、Unity 组件或资源的 Inspector 窗口。 它始终显示为其打开的项目的属性&#xff0c;即使您在场景或项目中选择了其他内容…

Flask框架之WTForms(详解)

目录 WTForms介绍和基本使用 WTForms介绍 基本使用 WTForms表单验证的基本使用 WTForms常用验证器 WTForms自定义验证器 场景&#xff1a;验证码实现 WTForms渲染模版 常用的方法 代码示例 WTForms介绍和基本使用 WTForms介绍 这个插件库主要有两个作用。 第一个是做…

十九、socket套接字编程(一)——UDP

文章目录 一、socket套接字编程接口&#xff08;一&#xff09;socket头文件&#xff08;二&#xff09;socket 常见API&#xff08;套接字编程接口&#xff09;1. 创建 socket 文件描述符 (TCP/UDP, 客户端 服务器 )2.绑定网络信息 (TCP/UDP, 服务器 )3.开始监听 socket (TCP…