Flink 自定义源算子之 读取MySQL

news2024/11/18 9:44:47

1、功能说明:

在Flink 自定义源算子中封装jdbc来读取MySQL中的数据

2、代码示例

Flink版本说明:flink_1.13.0、scala_2.12

自定义Source算子,这里我们继承RichParallelSourceFunction,因为要使用open方法来初始化数据库连接对象

Tips:这种实现方式为可并行算子,当并行度>1时,每个并行任务都会读取相同的数据,使用的时候需要注意

package com.baidu.bean

case class User(id: Long, name: String)
class MysqlSource extends RichParallelSourceFunction[User] {
  // 定义 Connection、PreparedStatement对象
  var connection: Connection = null
  var ps: PreparedStatement = null

  // 函数初始化方法,常用来初始化资源对象,常用来做一次性的设置
  // 当 MysqlSource对象被创建时,调用一次
  override def open(parameters: Configuration): Unit = {
    // 初始化 Connection、PreparedStatement对象
    // 加载数据库驱动
    Class.forName("com.mysql.jdbc.Driver")
    // 获取连接
    connection = DriverManager.getConnection("jdbc:mysql://worker01/flink", "root", "worker123")
    // 读取user表
    ps = connection.prepareStatement("select *  from user")
  }

  override def run(ctx: SourceFunction.SourceContext[User]): Unit = {
    // 执行查询操作,获取查询结果
    val resultSet = ps.executeQuery()
    // 将查询结果封装到user对象
    while (resultSet.next()) {
      val user = User(resultSet.getLong(1),
        resultSet.getString(2))
      ctx.collect(user)
    }
  }

  // 关闭连接资源
  override def cancel(): Unit = {
    connection.close()
    ps.close()
  }
}

使用 MysqlSource 来读取数据(作为有界流来处理):

  test("使用 自定义Source算子,读取mysql数据") {
    // 1. 获取流执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // 2. 将 自定义数据源 作为数据源
    val ds: DataStream[User] = env.addSource(new MysqlSource).setParallelism(4)

    // 3. 打印DataStream
    ds.print()

    // 4. 出发程序执行
    env.execute()
  }

执行结果:

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/700072.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Egg.js阿里JS后端框架,可以放心用。

目录 一、快速开始 二、尝试创建一个controll,修改路由,然后检查测试单元。 一、快速开始 npm install -g yarn yarn create egg --typesimple cd egg yarn install yarn devhttp://127.0.0.1:7001 二、尝试创建一个controll,修改路由,然后检查测试单…

PDF怎么转图片?PDF转图片的方法分享!​

PDF怎么转图片呢?相信很多人都会觉得PDF的非常的好用,小编也是被身边很多朋友推荐过后用了这个软件。但很多人在使用的时候有疑问,比如说PDF如何转图片?这难倒不少人,那么今天这篇文章就带你解析PDF怎么转图片&#xf…

LiangGaRy-学习笔记-Day25

1、Apache web相关 1.1、curl命令 作用:用来与服务器之间传输数据的工具 官网:https://curl.se支持很多种协议 语法:curl选项网址 选项: -A:设置代理给服务器-I(大写i):输出返…

Spring Boot 中的 WebMvc 是什么,原理,如何使用

Spring Boot 中的 WebMvc 是什么,原理,如何使用 介绍 在 Spring Boot 中,WebMvc 是非常重要的一个模块。它提供了一系列用于处理 Web 请求的组件和工具。在本文中,我们将介绍 Spring Boot 中的 WebMvc 是什么,其原理…

Python+ddt+Excel实现接口自动化测试生成完美测试报告

接口自动化测试是指通过编写代码或使用工具,模拟用户发送请求,验证接口是否符合设计规范和功能需求的过程。” 如何用 python ddtexcel 实现接口自动化测试 接口自动化测试可以提高测试效率和质量,节省测试成本和时间,保证测试覆…

一步一步学OAK之八:通过OAK相机实现视频帧拼接

帧拼接在有些场景下非常有用,比如将一个较大的帧输入到尺寸较小的神经网络中时。可以将较大的帧拆分成多个较小的帧,并将这些较小的帧输入到神经网络中。 这里我们使用 2 个 ImageManip 将原始预览帧拆分为两个帧。 这里写目录标题 涉及到的节点内容Co…

STM32实战项目—停车计费系统

文章目录 一、任务要求1.1 概述1.2 串口收发1.2.1 串口输出内容1.2.2 串口接收内容 1.3 说明 二、实现思路2.1 指令判别2.1 车辆进入2.2 车辆驶出2.3 费率调整 三、程序设计3.1 串口接收消息处理3.2 车辆驶入处理函数3.3 车辆驶出处理函数3.4 费率调整处理函数 题目原型是第十二…

4-Python如何创建等比数列?【视频版】

目录 问题视频解答 问题 视频解答 点击观看: 4-如何创建等比数列?

windows无法启动RemoteDesktopServices服务(位于本地计算机上)。错误126:找不到指定的模块。

win10的搜索栏输入 注册表编辑器。打开,找到如下路径 计算机\HKEY_LOCAL_MACHINE\SYSTEM\CurrentControlSet\Services\TermService\Parameters 将指定数值项ServiceDll的数值数据改成默认值: %SystemRoot%\System32\termsrv.dll 再重新尝试就好了。 …

Java(七):项目部署

项目部署 运行容器解决Centos8中yum命令遇到的问题打包项目拷贝.jar到容器中安装jdk后台运行.jar后台运行.jar并输入日志实时查看日志查看/杀死运行程序目录结构日志配置 运行容器 $ docker run -d -p 8001:8001 -p 8081:8081 -p 8082:8082 --namelocal_centos --privilegedtr…

DigiCert SSL证书有什么优势?

DigiCert是全球领先的SSL证书颁发机构,2017年收购赛门铁克数字证书业务后,成为全球市场占有率领先的SSL证书提供商,提供高保证TLS/SSL证书和自动化解决方案,也是沃通CA在全球信任数字证书业务方面的重要合作伙伴。 与全球其他品牌…

AI聊天对话工具,让沟通更简单轻松

人工智能技术的发展不断为我们带来新的惊喜和变革,其中之一就是ai聊天对话应用。这种应用利用自然语言处理、机器学习和对话管理等技术,在智能手机、电脑等设备上实现了人机对话,让人们更轻松地与计算机之间进行交流和互动。随着移动互联网的…

基于SpringBoot的电脑商城项目

基于SpringBoot的电脑商城项目 — 参考B站袁庭新老师项目 Maven多聚合项目 技术栈: Spring boot、MyBtis、Bootstrap、Layui,Redis,内网穿透等 功能: 后台:管理员登录、商品信息管理、订单管理、用户信息管理、统计图…

springbootbatis

1、maven <?xml version"1.0" encoding"UTF-8"?> <project xmlns"http://maven.apache.org/POM/4.0.0" xmlns:xsi"http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation"http://maven.apache.org/POM/4.0.…

在 Maya、ZBrush、Substance 3D 和 UE5 中创建理发椅

今天瑞云渲染小编给大家带来Kevin J. Coulman 分享的理发椅项目背后的工作流程&#xff0c;详细介绍了如何在 Maya 和 ZBrush 中为道具建模&#xff0c;分享了制作准确材质的技巧&#xff0c;并解释了为什么选择 UE5 进行渲染。 介绍 大家好! 我的名字是Mehdi Benmansour&…

如何搭建和使用minio?保姆级教程

目录 前言搜索镜像找到bitnami/minio镜像拉取镜像查看下载好的镜像创建数据卷目录&#xff0c;并提升权限根据镜像创建一个minio容器参数说明 查看 minio 容器的启动日志查看 minio 容器的信息开放安全组端口访问minio进入首页创建桶设置权限上传文件 前言 如果公司想要自己搭…

ASEMI代理艾赛斯MOS管IXFH4N100Q的性能与应用

编辑-Z 在电子元件领域&#xff0c;MOS管是一种重要的半导体器件&#xff0c;它在电子设备中起着至关重要的作用。今天&#xff0c;我们将重点介绍一款特别的MOS管——IXFH4N100Q&#xff0c;探讨其性能特点和应用领域。 首先&#xff0c;让我们了解一下什么是MOS管。MOS管&am…

Linux--进入一个路径:cd

Linux系统中&#xff0c;磁盘上的文件和目录被组成一棵目录树&#xff0c;每个节点都是目录或文件 cd是change directory的简写 语法&#xff1a; cd 目录名 功能&#xff1a; 改变工作目录。将当前工作目录改变到指定的目录下。 举例&#xff1a; cd .. : 返回上级目录&…

BFS (Java) 广度优先搜索 简单介绍、模板、案例(一)

一. BFS的简单介绍 深度优先搜索DFS和广度优先搜索BFS是经常使用的搜索算法&#xff0c;在各类题目中都有广泛的应用。 深度优先搜索算法&#xff08;英语&#xff1a;Depth-First-Search&#xff0c;DFS&#xff09;是一种用于遍历或搜索树或图的算法。其过程简要来说是对每一…

学习c++第01天

学习c的第01天 前言1、变量是声明&#xff1f;2.建议定义数据都对其进行初始化3.有符号数和无符号数4.进制间的相互转换5.原反补码6.const 、register 、volatile和typedef关键字7.数据类型的自动转换8.左移<< &右移操作>>9.将data的指定位数进行0、1转化的应用…