flink1.10中三种数据处理方式的连接器说明

news2025/1/18 8:43:29

第一种 Streaming(DataStream API)

 

流式处理的所有的连接器如上图,常用的是kafka、Elasticsearch、Hadoop FileSystem

Kafka连接器 依赖

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>

第二种 Batch(DataSet API)

 

 

批处理连接器不常用

第三种 Table API&SQL

 

这是表连接器

 

 

JDBC连接器需要的依赖

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-jdbc_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>

 

直接运行会报错

错误信息中重要提示:
Caused by: java.lang.IllegalArgumentException: JDBC driver class not found.
Caused by: java.lang.ClassNotFoundException: com.mysql.jdbc.Driver

需要添加jdbc驱动

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.44</version>
</dependency>

在这个程序中,流环境使用的是org.apache.flink.streaming.api.environment.StreamExecutionEnvironment,java的流环境

package flink_jdbc_mysql

import org.apache.flink.api.common.typeinfo.BasicTypeInfo
import org.apache.flink.api.java.io.jdbc.{JDBCAppendTableSink, JDBCInputFormat}
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.types.Row
import org.apache.flink.streaming.api.datastream.DataStreamSource
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment


object Flink_Mysql_Stream {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    val rowTypeInfo: RowTypeInfo = new RowTypeInfo(
      //和mysql中字段的数据一致
      BasicTypeInfo.INT_TYPE_INFO,
      BasicTypeInfo.INT_TYPE_INFO
    )
    val jdbc: JDBCInputFormat = JDBCInputFormat.buildJDBCInputFormat()
      .setDrivername("com.mysql.jdbc.Driver")
      .setDBUrl("jdbc:mysql://localhost/test")
      .setUsername("root")
      .setPassword("root")
      //SELECT COUNT(t.*) from accept_materials m, accept_materials_attachment t WHERE m.materials_id=t.accept_materials_id and m.take_type=2 AND apply_no='';
      .setQuery("select * from e where id = 1")//
      .setRowTypeInfo(rowTypeInfo)
      .finish()

      val datastream: DataStreamSource[Row] = env.createInput(jdbc)

    val sink: JDBCAppendTableSink = JDBCAppendTableSink.builder()
      .setDrivername("com.mysql.jdbc.Driver")
      .setDBUrl("jdbc:mysql://localhost/test")
      .setUsername("root")
      .setPassword("root")
      .setParameterTypes(
        BasicTypeInfo.INT_TYPE_INFO,
        BasicTypeInfo.INT_TYPE_INFO
      )
      .setQuery("insert into a values(?,?)")
      .build()
    /*
    *     emitDataStream需要传入的是org.apache.flink.streaming,api.datastream.DataStream
    *     而这个地方我传入的是org.apache.flink.streaming.api.scala.DataStreaming
    *     导致一致出现类型匹配错误
    *
    *     原因是创建流式环境时
    *     val env = StreamExecutionEnvironment.getExecutionEnvironment
    *     导入的是scala流环境
    *     import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    *     解决办法
    *     导入flink.streaming.api 流环境
    *     import org.apache.flink.streaming.api.datastream.DataStream
    *
    *     创建flink流环境之后
    *     val datastream: DataStreamSource[Row] = env.createInput(jdbc)
    *     返回值类型是DataStreamSource
    *
    *
    *
    * */

    /*
    *  自定义数据流
    * */

    sink.emitDataStream(datastream)

    env.execute()
  }
}

JDBC连接器导入的依赖包,实际上是java语言开发的,使用

env.createInput(jdbc),这个jdbc是JDBCInputFormat类型的

createInput需要传入的类型是 org.apache.flink.api.common.io.InputForamt

而JDBCInputFormat是InputFormat的子类,所以即使我们使用scala的流式环境,仍然可以使用java的输入流

JDBCInputFormat extends RichInputFormat implements InputFormat

但是使用JDBC连接器以流的方式输出的时候,JDBCAppendOutput类中emitDataStream方法

而方法需要的参数类型DataStream类型是 org.apache.flink.streaming.api.datastream.DataStream,java的流环境

如果我们使用scala创建流环境,使用jdbc连接器以流的方式读取mysql中数据,然后以流的方式输出到mysql中时,则会因为参数类型不匹配导致编译不成功

解决办法就是使用java创建流环境。

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/23728.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

2022-11-21 mysql列存储引擎-架构实现缺陷梳理-P1

1. 前言 发现和指出问题为了&#xff1a;更好的解决问题和避免问题的再次发生 项目在演进&#xff0c;代码不停地在堆砌。如果代码的质量一直不被重视&#xff0c;代码总是会往越来越混乱的方向演进。当混乱到一定程度之后&#xff0c;量变引起质变&#xff0c;项目的维护成本…

二叉树和堆

二叉树和堆什么是树树的一些专业术语树的表示二叉树的概念什么是二叉树特殊的二叉树二叉树的性质堆的概念堆的表示方式堆的实现堆的初始化及销毁堆的插入堆的删除堆的判空与获取堆顶元素堆的主要应用堆排序利用堆数据结构建堆利用向上调整算法来建堆利用向下调整算法建堆TopK问…

计算机毕业设计——校园二手市场

一.项目介绍 系统包含 普通用户 和 管理员 两种角色 普通用户 浏览其他用户发布的二手物品&#xff0c;修改个人信息、查看订单&#xff08;买的、卖的&#xff09;、 发布闲置、查看我的闲置、查看我的关注、跳转后台、更改用户名等功能 管理员用户登录 操作用户管理、商品管理…

中学物理教学参考杂志社中学物理教学参考编辑部2022年第21期目录

前沿导航_课改在线《中学物理教学参考》投稿&#xff1a;cn7kantougao163.com 问题导向式学习 提升学生思维品质——以“自由落体运动”教学为例 汪欣; 1-2 物理实验探究式复习模式研究——以“机械能守恒定律”为例 王栋; 3-4 初中物理作业设计类型及其实施策略 马…

阿里8年测试经验,手工测试转变为自动化测试,送给正在迷茫的你

前言 随着软件测试技术的发展&#xff0c;人们已经从最初的纯粹的手工测试转变为手工与自动化测试技术相结合的测试方法。近年来&#xff0c;自动化测试越来越受到人们的重视&#xff0c;对于自动化测试的研究也越来越多。 项目版本功能日趋增加&#xff0c;系统模块越来越多…

2009-2013、2018-2020计算机网络考研408真题

文章目录2009年选择综合应用2010年选择综合应用2011年选择综合应用2012年选择综合应用2009年 选择 在OSI参考模型中&#xff0c;自下而上第一个提供端到端服务的层次是……。 A.数据链路层 B.传输层 C.会话层 D.应用层 考查OSI模型中传输层的功能。 传输层提供应用进程间的逻辑…

3-3、python中内置数据类型(集合和字典)

文章目录集合集合的创建集合的特性集合的常用操作增加删除查看练习-对集合的排序frozenset 不可变的集合字典字典的创建字典的特性字典的常用方法查看增加和修改删除遍历字典 &#xff08;for&#xff09;defaultdict默认字典&#xff08;给字典设置默认值&#xff09;内置数据…

echarts+vue实现柱状图分页显示

非常感谢这篇博客&#xff1a;ECharts柱状图分页显示&#xff08;数据循环&#xff09;_s小布丁的博客-CSDN博客 大家好&#xff0c;我是南宫&#xff0c;很久没有更新博客了。 本文参考了这篇博客里面的思路和分割数据的代码&#xff0c;不过我用的是vue页面&#xff0c;后面…

【Kubernetes系列】工作负载资源之Deployment

文章目录概述Deployment用例创建 DeploymentDeployment 状态更新 Deployment回滚 DeploymentDeployment 规约Pod 模板副本选择算符策略进度期限秒数最短就绪时间修订历史限制paused&#xff08;暂停的&#xff09;概述 Deployment 很适合用来管理你的集群上的无状态应用&#…

数据结构--不定长顺序表

1.不定长顺序表 与定长顺序表相比不定长顺序表的区别在于我们可以通过扩容来进行增添元素的存储单元。 2.结构 是对定长顺序表的一种改进&#xff0c;在初始时开辟内存被利用完后&#xff0c;还要继续插入数据时&#xff0c;这时候据需要扩容。故顺序表的结构设计就要发生变…

BHQ-1羧酸是一种黑暗淬灭剂BHQ-1 acid,1190431-95-8

英文名称&#xff1a;BHQ-1 acid 中文名称&#xff1a;BHQ-1酸 丁酸&#xff0c;4-[[4-[2-[2-甲氧基-5-甲基-4-[2-&#xff08;4-甲基-2-硝基苯基&#xff09;二氮基]苯基]二氮基]苯基]甲氨基]- 分子量&#xff1a;504.55 化学式&#xff1a;C26H28N6O5 CAS&#xff1a;119…

8. 微服务之消息队列RabbitMQ以及五种消息队列模式

8.1 同步调用 即客户端向服务端请求做数据处理&#xff0c;客户端需要一直等待服务端处理直到返回结果给客户端 同步调用存在的问题&#xff1a; 耦合度高&#xff1a;每次加入新的需求&#xff0c;都要修改原来的代码性能下降&#xff1a;调用者需要等待服务提供者响应&…

C++模拟OpenGL库——图形学状态机接口封装(二):基于状态机接口的画线画三角形

目录 画线操作 画三角形操作 按区间取点来进行绘制 加入纹理 画线操作 上次我们定义了一系列状态机接口&#xff0c;并遗留了&#xff1a; void Canvas::gtDrawArray(DRAW_MODE _mode, int _first, int _count) 没有去实现&#xff0c;这次对他进行一个实现&#xff0c;…

计算机网络-传输层(TCP协议特点和TCP报文段格式,TCP连接管理)

文章目录1. TCP协议特点&#xff0c;报文段格式2. TCP连接管理1. TCP协议特点&#xff0c;报文段格式 TCP是面向连接&#xff08;虚连接&#xff09;的传输层协议每一条TCP连接只能有两个端点&#xff0c;每一条TCP连接只能是点对点的。TCP提供可靠交付的服务&#xff0c;无差…

代码随想录61——额外题目【数组】:1365有多少小于当前数字的数字、941有效的山脉数组、1207独一无二的出现次数

文章目录1.1365有多少小于当前数字的数字1.1.题目1.2.解答2.941有效的山脉数组2.1.题目2.2.解答3.1207独一无二的出现次数3.1.题目3.2.解答1.1365有多少小于当前数字的数字 参考&#xff1a;代码随想录&#xff0c;1365有多少小于当前数字的数字&#xff1b;力扣题目链接 1.1…

MySQl(六):日志

Mysql&#xff1a;日志日志错误日志二进制日志查询日志慢查询日志事务日志日志 错误日志 Linux 实时查看尾部内容 创建一个错误&#xff0c;进入log去看 二进制日志 查询日志 慢查询日志 事务日志 数据库都具有事务日志&#xff0c;用于记录所有事务以及每个事务对数据库所做…

IAST技术进阶系列(五):共生进化,自适应云原生

伴随着云计算带来的基础设施变革以及应用技术架构的转变&#xff0c;云原生和云原生应用已经成为耳熟能详的词汇。Gartner预测&#xff0c;到2025年&#xff0c;云原生平台将成为95%以上新数字化计划的基础。伴随云原生场景的普及&#xff0c;云原生应用将引领下一个应用时代&a…

day2【代码随想录】移除元素

文章目录一、移除数据元素1、暴力求解2、双指针法3、相向双指针法二、删除有序数组中的重复项三、删除有序数组中的重复项II四、移动零一、移除数据元素 一个数组 nums 和一个值 val&#xff0c;需要原地移除所有数值等于 val 的元素&#xff0c;并返回移除后数组的新长度。 不…

GitHub热榜 这份《亿级流量并发手册》彻底揭开阿里高流量的秘密

我们知道&#xff0c;高并发代表着大流量&#xff0c;高并发系统设计的魅力就在于我们能够凭借自己的聪明才智设计巧妙的方案&#xff0c;从而抵抗巨大流量的冲击&#xff0c;带给用户更好的使用体验。这些方案好似能操纵流量&#xff0c;让流量更加平稳得被系统中的服务和组件…

从零开始学前端:DOM、BOM、焦点事件、键盘事件 --- 今天你学习了吗?(JS:Day20)

从零开始学前端&#xff1a;程序猿小白也可以完全掌握&#xff01;—今天你学习了吗&#xff1f;&#xff08;JS&#xff09; 复习&#xff1a;从零开始学前端&#xff1a;CSSOM视图模式 — 今天你学习了吗&#xff1f;&#xff08;JS&#xff1a;Day19&#xff09; 文章目录从…