flink设置保存点和恢复保存点

news2024/9/28 18:25:47

增加了hdfs

package com.qyt;

 import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.environment.CheckpointConfig;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

/**
 * DataStreamSource API使用
 */
public class StreamWordCount {

    public static void main(String[] args) throws Exception {


        //TODO 1、获取流的类
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        System.setProperty("HADOOP_USER_NAME", "root");
        env.enableCheckpointing(3000);
        // 配置存储检查点到文件系统
        env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage("hdfs://hadoop01:9000/flink"));
        env.getCheckpointConfig().setCheckpointTimeout(2000l);
        env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        //TODO 2、获取无界流
        DataStreamSource<String> stringDataStreamSource = env.socketTextStream("192.168.1.10", 9000, "\n");

         //TODO 3 ETL
        //TODO 3.1 转换成二元数组,简单ETL的过程
        SingleOutputStreamOperator<Tuple2<String, Integer>> process = stringDataStreamSource.process(new ProcessFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void processElement(String value, ProcessFunction<String, Tuple2<String, Integer>>.Context ctx, Collector<Tuple2<String, Integer>> out) throws Exception {
                String[] words = value.split(" ");
                for (String word : words) {
                    Tuple2<String, Integer> tuple2 = Tuple2.of(word, 1);
                    out.collect(tuple2);
                }
            }
        }).uid("etl");

        //TODO 3.1 分组
        KeyedStream<Tuple2<String, Integer>, String> tuple2StringKeyedStream = process.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {

            @Override
            public String getKey(Tuple2<String, Integer> value) throws Exception {
                return value.f0;
            }
        });

        //TODO 3.2 聚合计算
        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = tuple2StringKeyedStream.sum(1);

        //TODO 4、打印
        sum.print();

        //TODO 5、无界流需要这个不断执行的方法
        env.execute();
    }
}

要增加hadoop客户端的使用

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xmlns="http://maven.apache.org/POM/4.0.0"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>flink-demo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.17.0</flink.version>
    </properties>


    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

     <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
         <scope>provided</scope>
     </dependency>

      <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka</artifactId>
        <version>${flink.version}</version>
    </dependency>


        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>3.3.4</version>
         </dependency>
    </dependencies>


    <build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>3.2.4</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <artifactSet>
                            <excludes>
                                <exclude>com.google.code.findbugs:jsr305</exclude>
                                <exclude>org.slf4j:*</exclude>
                                <exclude>log4j:*</exclude>
                            </excludes>
                        </artifactSet>
                        <filters>
                            <filter>
                                <!-- Do not copy the signatures in the META-INF folder.
                                Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                <artifact>*:*</artifact>
                                <excludes>
                                    <exclude>META-INF/*.SF</exclude>
                                    <exclude>META-INF/*.DSA</exclude>
                                    <exclude>META-INF/*.RSA</exclude>
                                </excludes>
                            </filter>
                        </filters>
                        <transformers combine.children="append">
                            <transformer
                                    implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer">
                            </transformer>
                        </transformers>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
</project>

提交flink集群

#生成对应的任务
./flink run -m 192.168.1.161:8081 -c com.qyt.StreamWordCount /root/soft/flink-demo-1.0-SNAPSHOT.jar
# 恢复上一次保存点,bc5fae2e282247486003ed259f2f37a7为jobID
./flink run -s hdfs://hadoop01:9000/flink/bc5fae2e282247486003ed259f2f37a7/chk-33 -m 192.168.1.161:8081 -c com.qyt.StreamWordCount /root/soft/flink-demo-1.0-SNAPSHOT.jar

查看对应的jobId
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2174393.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

C++友元和运算符重载

目录 一. 友元 friend 1.1 概念 1.2 友元函数 1.3 友元类 1.4 友元成员函数 二. 运算符重载 2.1 概念 2.2成员函数运算符重载 2.3 成员函数运算符重载 2.4 特殊运算符重载 2.4.1 赋值运算符重载 2.4.2 类型转换运算符重载 2.5 注意事项 三、std::string 字符串类…

什么是期望最大化算法?

一、期望最大化算法 期望最大化&#xff08;EM&#xff09;算法是一种在统计学和机器学习中广泛使用的迭代方法&#xff0c;它特别适用于含有隐变量的概率模型参数估计问题。在统计学和机器学习中&#xff0c;有很多不同的模型&#xff0c;例如高斯混合模型&#xff08;GMM&…

NSSCTF [HNCTF 2022 Week1]超级签到

查看主函数 看到遍历 Str2&#xff0c;如果字符为 o&#xff0c;则替换为 0 int __fastcall main_0(int argc, const char **argv, const char **envp) {char *v3; // 指向 v7 的指针__int64 i; // 循环计数器size_t v5; // 存储 Str2 的长度char v7; // 存储输入字符int j; …

如何快速自定义一个Spring Boot Starter!!

目录 引言&#xff1a; 一. 我们先创建一个starter模块 二. 创建一个自动配置类 三. 测试启动 引言&#xff1a; 在我们项目中&#xff0c;可能经常用到别人的第三方依赖&#xff0c;又是引入依赖&#xff0c;又要自定义配置&#xff0c;非常繁琐&#xff0c;当我们另一个项…

mysql8.0安装后没有my.ini

今天安装mysql后想改一下配置文件看了一下安装路径 C:\Program Files\MySQL\MySQL Server 8.0 发现根本没有这个文件查看隐藏文件也没用查了之后才知道换地方了和原来的5.7不一样 新地址是C:\ProgramData\MySQL\MySQL Server 8.0 文件也是隐藏的记得改一下配置

【Redis 源码】7RDB持久化

1 功能说明 RDB (Redis Database Backup) 是 Redis 的一种持久化方式&#xff0c;它通过将某一时刻的内存快照&#xff08;snapshot&#xff09;以二进制格式保存到磁盘上。这种持久化方式提供了高性能和紧凑的数据存储&#xff0c;但相对于 AOF (Append Only File) 来说&…

充电桩安装-理想充电桩如何安装全流程-从准备到材料准备全流程

充电桩安装 Willya 2023年3月6日 新能源车出行成本低&#xff0c;那肯定是要在便利的条件下&#xff0c;得有自己的充电桩才行&#xff0c;实在安装不了自己的充电桩&#xff0c;那也要保证居住周边有充足的充电站&#xff0c;这样才能保证用车的便捷。 理想汽车充电桩安装一般…

智能化转型新篇章:EasyCVR引领大型连锁超市视频监控进入AI时代

随着科技的飞速发展&#xff0c;视频监控系统在各行各业中的应用日益广泛&#xff0c;大型连锁超市作为人员密集、商品繁多的公共场所&#xff0c;其安全监控显得尤为重要。为了提升超市的安全管理水平、减少损失、保障顾客和员工的安全&#xff0c;引入高效、全面的视频监控系…

胤娲科技:AI界的超级充电宝——忆阻器如何让LLM告别电量焦虑

当AI遇上“记忆橡皮擦”&#xff0c;电量不再是问题&#xff01; 嘿&#xff0c;朋友们&#xff0c;你们是否曾经因为手机电量不足而焦虑得像个无头苍蝇&#xff1f;想象一下&#xff0c;如果这种“电量焦虑”也蔓延到了AI界&#xff0c; 特别是那些聪明绝顶但“耗电如喝水”的…

逃离陷阱:如何巧妙避免机器学习中的过拟合与欠拟合

逃离陷阱&#xff1a;如何巧妙避免机器学习中的过拟合与欠拟合 前言过拟合&#xff1a;定义与识别定义表现原因示例&#xff1a;决策树模型的过拟合 欠拟合&#xff1a;定义与识别定义表现原因示例&#xff1a;线性回归模型的欠拟合 避免过拟合的策略减少模型复杂度使用正则化…

基于nodejs+vue的校园二手物品交易系统

作者&#xff1a;计算机学姐 开发技术&#xff1a;SpringBoot、SSM、Vue、MySQL、JSP、ElementUI、Python、小程序等&#xff0c;“文末源码”。 专栏推荐&#xff1a;前后端分离项目源码、SpringBoot项目源码、Vue项目源码、SSM项目源码 精品专栏&#xff1a;Java精选实战项目…

SSM超市售卖管理系统-计算机毕业设计源码23976

目 录 摘要 Abstract 1 绪论 1.1研究的背景和意义 1.2研究内容 1.3论文结构与章节安排 2 开发技术介绍 2.1 SSM框架 2.2 MySQL数据库 3 超市售卖管理系统系统分析 3.1 可行性分析 3.2 系统流程分析 3.2.1 数据流程 3.3.2 业务流程 3.3 系统功能分析 3.3.1 功…

低代码可视化-UniApp二维码可视化-代码生成器

市面上提供了各种各样的二维码组件&#xff0c;做了一简单的uniapp二维码组件&#xff0c;二维码实现依赖davidshimjs/qrcodejs。 组件特点 跨浏览器支持&#xff1a;利用Canvas元素实现二维码的跨浏览器兼容性&#xff0c;兼容微信小程序、h5、app。 无依赖性&#xff1a;QR…

留学生如何适应海外生活以及应对文化差异

对于即将出国学习和生活的留学生来说&#xff0c;文化差异和生活方式的变化常常是一个紧迫的问题。那么&#xff0c;如何应对这些文化差异&#xff0c;以及如何适应新的学习环境和社交生活呢&#xff1f;本文将分享一些具体可行的建议和方法&#xff0c;助您顺利跨越这道难关&a…

数据结构:队列及其应用

队列&#xff08;Queue&#xff09;是一种特殊的线性表&#xff0c;它的主要特点是先进先出&#xff08;First In First Out&#xff0c;FIFO&#xff09;。队列只允许在一端&#xff08;队尾&#xff09;进行插入操作&#xff0c;而在另一端&#xff08;队头&#xff09;进行删…

Hadoop三大组件之YARN(一)

YARN架构与任务提交流程详解 1. YARN的组成架构 YARN&#xff08;Yet Another Resource Negotiator&#xff09;是Hadoop生态系统中的一个重要组成部分&#xff0c;主要用于资源管理和调度。YARN的架构主要由以下几个关键组件构成&#xff1a; 1.1 ResourceManager&#xff…

vue3结合 vue-router和keepalive实现路由跳转保持滚动位置不改变(超级简易清晰)

1.首先我们在路由跳转页面设置keepalive(Seeall是我想实现结果的页面) 2. 想实现结果的页面中如果不是全屏实现滚动而是有单独的标签实现滚动效果

Spring Boot技术栈:打造高效在线商城

2 相关技术 2.1 Springboot框架介绍 Spring Boot是由Pivotal团队提供的全新框架&#xff0c;其设计目的是用来简化新Spring应用的初始搭建以及开发过程。该框架使用了特定的方式来进行配置&#xff0c;从而使开发人员不再需要定义样板化的配置。通过这种方式&#xff0c;Spring…

AI大模型对我国劳动力市场潜在影响研究报告(2024)|附19页PDF文件下载

前言 北京大学国家发展研究院与智联招聘日前联合发布《AI大模型对我国劳动力市场潜在影响研究》。该研究显示&#xff0c;2024年上半年&#xff0c;招聘职位数同比增速前五的人工智能职业&#xff0c;包括大语言模型方面的自然语言处理&#xff08;111%&#xff09;、深度学习…

STM32 RTC实时时钟学习总结

STM32 RTC实时时钟学习总结 写于2024/9/25下午 文章目录 STM32 RTC实时时钟学习总结1. 简介2. 流程框图介绍3. 相关寄存器介绍4. 代码解析 1. 简介 STM32F103 的实时时钟&#xff08;RTC&#xff09;是一个独立的定时器。STM32 的 RTC 模块拥有一组连续计数的计数器&#xff…