Flink 离线计算

news2024/12/27 9:03:17

文章目录

      • 一、样例一:读 csv 文件生成 csv 文件
      • 二、样例二:读 starrocks 写 starrocks
      • 三、样例三:DataSet、Table Sql 处理后写入 StarRocks
      • 四、遇到的坑

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.9.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>1.9.1</version>
        </dependency>
        <!--使用Java编程语言支持DataStream / DataSet API的Table&SQL API-->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_2.11</artifactId>
            <version>1.9.1</version>
            <!--<scope>provided</scope>-->
        </dependency>
        <!--表程序规划器和运行时-->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_2.11</artifactId>
            <version>1.9.1</version>
            <!--<scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-jdbc_2.11</artifactId>
            <version>1.9.1</version>
            <!--<scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.16.18</version>
        </dependency>

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.49</version>
        </dependency>

一、样例一:读 csv 文件生成 csv 文件

  参考:(3)Flink学习- Table API & SQL编程

import lombok.Data;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.table.sinks.CsvTableSink;

public class SQLWordCount {
    public static void main(String[] args) throws Exception {
        // 1、获取执行环境 ExecutionEnvironment (批处理用这个对象)
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment bTableEnv = BatchTableEnvironment.create(env);
//        DataSet<WC> input = env.fromElements(
//                WC.of("hello", 1),
//                WC.of("hqs", 1),
//                WC.of("world", 1),
//                WC.of("hello", 1)
//        );
        // 注册数据集
//        tEnv.registerDataSet("WordCount", input, "word, frequency");


        // 2、加载数据源到 DataSet
        DataSet<Student> csv = env.readCsvFile("D:\\tmp\\data.csv").ignoreFirstLine().pojoType(Student.class, "name", "age");

        // 3、将DataSet装换为Table
        Table students = bTableEnv.fromDataSet(csv);
        bTableEnv.registerTable("student", students);

        // 4、注册student表
        Table result = bTableEnv.sqlQuery("select name,age from student");
        result.printSchema();
        DataSet<Student> dset = bTableEnv.toDataSet(result, Student.class);
        System.out.println("count-->" + dset.count());
        dset.print();

        // 5、sink输出
        CsvTableSink sink1 = new CsvTableSink("D:\\tmp\\result.csv", ",", 1, FileSystem.WriteMode.OVERWRITE);
        String[] fieldNames = {"name", "age"};
        TypeInformation[] fieldTypes = {Types.STRING, Types.INT};
        bTableEnv.registerTableSink("CsvOutPutTable", fieldNames, fieldTypes, sink1);
        result.insertInto("CsvOutPutTable");

        env.execute("SQL-Batch");
    }

    @Data
    public static class Student {
        private String name;
        private int age;
    }
}

  准备测试文件 data.csv

name,age
zhangsan,23
lisi,43
wangwu,12

  运行程序后会生成 D:\\tmp\\result.csv 文件。

二、样例二:读 starrocks 写 starrocks

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;
import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.types.Row;

public class SQLWordCount {
    public static void main(String[] args) throws Exception {
        TypeInformation[] fieldTypes = {Types.STRING, Types.INT};

        RowTypeInfo rowTypeInfo = new RowTypeInfo(fieldTypes);

        JDBCInputFormat jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat().setDrivername("com.mysql.jdbc.Driver")
                .setDBUrl("jdbc:mysql://192.168.xx.xx:9030/dwd?characterEncoding=utf8")
                .setUsername("root").setPassword("")
                .setQuery("select * from student").setRowTypeInfo(rowTypeInfo).finish();

        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // 方式一
        DataSource s = env.createInput(jdbcInputFormat);

        s.output(JDBCOutputFormat.buildJDBCOutputFormat()
                .setDrivername("com.mysql.jdbc.Driver")
                .setDBUrl("jdbc:mysql://192.168.xx.xx:9030/dwd?characterEncoding=utf8")
                .setUsername("root").setPassword("")
                .setQuery("insert into student values(?, ?)")
                .finish()
        );
        
        // 方式二
//        DataSet<Row> dataSource = env.createInput(jdbcInputFormat);
//
//        dataSource.output(JDBCOutputFormat.buildJDBCOutputFormat()
//                .setDrivername("com.mysql.jdbc.Driver")
//                .setDBUrl("jdbc:mysql://192.168.xx.xx:9030/dwd?characterEncoding=utf8")
//                .setUsername("root").setPassword("")
//                .setQuery("insert into student values(?, ?)")
//                .finish()
//        );

        env.execute("SQL-Batch");
    }
}

  数据准备:

CREATE TABLE student (
    name STRING,
    age INT
) ENGINE=OLAP 
DUPLICATE KEY(`name`)
DISTRIBUTED BY RANDOM
PROPERTIES (
"compression" = "LZ4",
"fast_schema_evolution" = "false",
"replicated_storage" = "true",
"replication_num" = "1"
);

insert into student values('zhangsan', 23);

参考:
flink 读取mysql源 JDBCInputFormat、自定义数据源
flink1.10中三种数据处理方式的连接器说明
flink读写MySQL的两种方式

注意:如果运行 java -cp flink-app-1.0-SNAPSHOT-jar-with-dependencies.jar com.xiaoqiang.app.SQLWordCount 时报错:Exception in thread "main" com.typesafe.config.ConfigException$UnresolvedSubstitution: reference.conf @ jar:file:flink-app-1.0-SNAPSHOT-jar-with-dependencies.jar!/reference.conf: 875: Could not resolve substitution to a value: ${akka.stream.materializer}

  解决:报错:Flink Could not resolve substitution to a value: ${akka.stream.materializer}

    <build>
        <plugins>
            <!-- Java Compiler -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <transformers>
                                <!--<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>flink.KafkaDemo1</mainClass>
                                </transformer>-->
                                <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                    <resource>reference.conf</resource>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

三、样例三:DataSet、Table Sql 处理后写入 StarRocks

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;
import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.types.Row;

public class SQLWordCount {
    public static void main(String[] args) throws Exception {
        TypeInformation[] fieldTypes = {Types.STRING, Types.INT};

        RowTypeInfo rowTypeInfo = new RowTypeInfo(fieldTypes);

        JDBCInputFormat jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat().setDrivername("com.mysql.jdbc.Driver")
                .setDBUrl("jdbc:mysql://192.168.xx.xx:9030/dwd?characterEncoding=utf8")
                .setUsername("root").setPassword("")
                .setQuery("select * from student").setRowTypeInfo(rowTypeInfo).finish();

        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment bTableEnv = BatchTableEnvironment.create(env);

        DataSet<Row> dataSource = env.createInput(jdbcInputFormat);
        dataSource.print();

        Table students = bTableEnv.fromDataSet(dataSource);
        bTableEnv.registerTable("student", students);

        Table result = bTableEnv.sqlQuery("select name, age from (select f0 as name, f1 as age from student) group by name, age");
        result.printSchema();

        DataSet<Row> dset = bTableEnv.toDataSet(result, Row.class);

        dset.output(JDBCOutputFormat.buildJDBCOutputFormat()
                .setDrivername("com.mysql.jdbc.Driver")
                .setDBUrl("jdbc:mysql://192.168.xx.xx:9030/dwd?characterEncoding=utf8")
                .setUsername("root").setPassword("")
                .setQuery("insert into student values(?, ?)")
                .finish()
        );

        env.execute("SQL-Batch");
    }
}

四、遇到的坑

  坑1:Bang equal '!=' is not allowed under the current SQL conformance level
  解决:将 sql 中的 != 修改为 <>

  坑2:java.lang.RuntimeException: No new data sinks have been defined since the last execution. The last execution refers to the latest call to 'execute()', 'count()', 'collect()', or 'print()'.
  解释:在最后一行代码 env.execute() 执行的时候,没有新的数据接收器被定义,对于 Flink 批处理而前一行代码 result.print() 已经触发了代码的执行和输出,所以再执行 env.execute(),就是多余的了,因此报了上面的异常。
  解决方法:去掉最后一行代码 env.execute(); 就可以了。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2252156.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Navcat连接sqlserver报错:[IM002][Microsoft][ODBC 驱动程序管理器] 未发现数据源名称并且未指定默认驱动程序 (0)

Navcat连接sqlserver报错&#xff1a;[IM002][Microsoft][ODBC 驱动程序管理器] 未发现数据源名称并且未指定默认驱动程序 (0) 原因&#xff1a;navicat没有找到sqlserver驱动 解决&#xff1a;安装sqlserver驱动&#xff0c;下载后双击安装&#xff0c;安装完重新连接就可以了…

IDEA Maven 打包找不到程序包错误或找不到符号,报错“程序包不存在“

参考文章&#xff1a;https://blog.csdn.net/yueeryuanyi/article/details/14211090 问题&#xff1a;IDEA Maven 打包找不到程序包错误或找不到符号,报错“程序包不存在“编译都没问题 解决思路 – >【清除缓存】 1. 强制刷新Maven缓存 选择 Maven 标签&#xff0c;Exe…

微信小游戏/抖音小游戏SDK接入踩坑记录

文章目录 前言问题记录1、用是否存在 wx 这个 API 来判断是微小平台还是抖小平台不生效2、微小支付的参数如何获取?3、iOS 平台不支持虚拟支付怎么办?微小 iOS 端支付时序图:抖小 iOS 端支付:4、展示广告时多次回调 onClose5、在使用单例时 this 引起的 bug6、使用 fetch 或…

vue 2 父组件根据注册事件,控制相关按钮显隐

目标效果 我不注册事件&#xff0c;那么就不显示相关的按钮 注册了事件&#xff0c;才会显示相关内容 实现思路 组件在 mounted 的时候可以拿到父组件注册监听的方法 拿到这个就可以做事情了 mounted() {console.log(this.$listeners, this.$listeners);this.show.search !…

服务熔断-熔断器设计

文章目录 服务为什么需要熔断熔断器设计思想熔断器代码实现 服务为什么需要熔断 对于服务端采用的保护机制为服务限流。 对于服务调用端是否存在保护机制&#xff1f; 假如要发布一个服务 B&#xff0c;而服务 B 又依赖服务 C&#xff0c;当一个服务 A 来调用服务 B 时&#x…

【Maven】依赖冲突如何解决?

准备工作 1、创建一个空工程 maven_dependency_conflict_demo&#xff0c;在 maven_dependency_conflict_demo 创建不同的 Maven 工程模块&#xff0c;用于演示本文的一些点。 什么是依赖冲突&#xff1f; 当引入同一个依赖的多个不同版本时&#xff0c;就会发生依赖冲突。…

代理IP地址的含义与设置指南‌

在数字化时代&#xff0c;互联网已经成为我们日常生活不可或缺的一部分。然而&#xff0c;在享受互联网带来的便利的同时&#xff0c;我们也面临着隐私泄露、访问限制等问题。代理IP地址作为一种有效的网络工具&#xff0c;能够帮助我们解决这些问题。本文将详细介绍代理IP地址…

【MyBatis】验证多级缓存及 Cache Aside 模式的应用

文章目录 前言1. 多级缓存的概念1.1 CPU 多级缓存1.2 MyBatis 多级缓存 2. MyBatis 本地缓存3. MyBatis 全局缓存3.1 MyBatis 全局缓存过期算法3.2 CacheAside 模式 后记MyBatis 提供了缓存切口&#xff0c; 采用 Redis 会引入什么问题&#xff1f;万一遇到需强一致场景&#x…

组播基础实验

当需要同时发给多个接受者或者接收者ip未知时使用组播 一、组播IP地址 1、组播IP地址范围 组播地址属于D类地址&#xff1a;224.0.0.0/4&#xff08;224.0.0.0-239.255.255.255&#xff09; 2、分类 &#xff08;1&#xff09;链路本地地址&#xff08;link-local&#xf…

智慧银行反欺诈大数据管控平台方案(二)

智慧银行反欺诈大数据管控平台建设方案&#xff0c;通过系统性整合多元化数据源&#xff0c;融合先进的大数据处理与机器学习技术&#xff0c;构建一个具备实时性、智能性和高度集成能力的反欺诈系统框架。该方案以提升银行风险管理效率与精度为目标&#xff0c;创新性地采用多…

(免费送源码)计算机毕业设计原创定制:Java+ssm+JSP+Ajax SSM棕榈校园论坛的开发

摘要 随着计算机科学技术的高速发展,计算机成了人们日常生活的必需品&#xff0c;从而也带动了一系列与此相关产业&#xff0c;是人们的生活发生了翻天覆地的变化&#xff0c;而网络化的出现也在改变着人们传统的生活方式&#xff0c;包括工作&#xff0c;学习&#xff0c;社交…

#渗透测试#红蓝攻防#HW#漏洞挖掘#漏洞复现01-笑脸漏洞(vsftpd)

免责声明 本教程仅为合法的教学目的而准备&#xff0c;严禁用于任何形式的违法犯罪活动及其他商业行为&#xff0c;在使用本教程前&#xff0c;您应确保该行为符合当地的法律法规&#xff0c;继续阅读即表示您需自行承担所有操作的后果&#xff0c;如有异议&#xff0c;请立即停…

手机控制载货汽车一键启动无钥匙进入广泛应用

移动管家载货汽车一键启动无钥匙进入手机控车系统‌&#xff0c; 该系统广泛应用于物流运输、工程作业等货车场景&#xff0c;为车主提供了高效、便捷的启动和熄火解决方案&#xff0c;体现了科技进步对物流行业的积极影响‌ 核心功能‌&#xff1a;简化启动流程&#xff0c;提…

OGRE 3D----5. OGRE和QML事件交互

在现代图形应用程序开发中,OGRE(Object-Oriented Graphics Rendering Engine)作为一个高性能的3D渲染引擎,广泛应用于游戏开发、虚拟现实和仿真等领域。而QML(Qt Modeling Language)则是Qt框架中的一种声明式语言,专注于设计用户界面。将OGRE与QML结合,可以充分利用OGR…

macos下brew安装redis

首先确保已安装brew&#xff0c;接下来搜索资源&#xff0c;在终端输入如下命令&#xff1a; brew search redis 演示如下&#xff1a; 如上看到有redis资源&#xff0c;下面进行安装&#xff0c;执行下面的命令&#xff1a; brew install redis 演示效果如下&#xff1a; …

汽车轮毂结构分析有哪些?国产3D仿真分析实现静力学+模态分析

本文为CAD芯智库原创&#xff0c;未经允许请勿复制、转载&#xff01; 之前分享了如何通过国产三维CAD软件如何实现「汽车/汽配行业产品设计」&#xff0c;兼容NX&#xff08;UG&#xff09;、Creo&#xff08;Proe&#xff09;&#xff0c;轻松降低企业上下游图纸交互成本等。…

Kafka的消费消息是如何传递的?

大家好&#xff0c;我是锋哥。今天分享关于【Kafka的消费消息是如何传递的&#xff1f;】面试题。希望对大家有帮助&#xff1b; Kafka的消费消息是如何传递的&#xff1f; 1000道 互联网大厂Java工程师 精选面试题-Java资源分享网 在Kafka中&#xff0c;消息的消费是通过消费…

常见靶场的搭建

漏洞靶场 渗透测试&#xff08;漏洞挖掘&#xff09;切忌纸上谈兵&#xff0c;学习渗透测试&#xff08;漏洞挖掘&#xff09;知识的过程中&#xff0c;我们通常需要一个包含漏洞的测试环境来进行训练。而在非授权情况下&#xff0c;对于网站进行渗透测试攻击&#xff0c;是触及…

PMP–一、二、三模、冲刺–分类–8.质量管理

文章目录 技巧五、质量管理 一模8.质量管理--质量管理计划--质量管理计划包括项目采用的质量标准&#xff0c;到底有没有满足质量需求&#xff0c;看质量标准即可。6、 [单选] 自项目开始以来&#xff0c;作为项目经理同事的职能经理一直公开反对该项目&#xff0c;在讨论项目里…

mini-spring源码分析

IOC模块 关键解释 beanFactory&#xff1a;beanFactory是一个hashMap, key为beanName, Value为 beanDefination beanDefination: BeanDefinitionRegistry&#xff0c;BeanDefinition注册表接口&#xff0c;定义注册BeanDefinition的方法 beanReference&#xff1a;增加Bean…