Flink实现数据写入MySQL

news2025/1/18 4:52:19

 先准备一个文件里面数据有:

a, 1547718199, 1000000
b, 1547718200, 1000000
c, 1547718201, 1000000
d, 1547718202, 1000000
e, 1547718203, 1000000
f, 1547718204, 1000000
g, 1547718205, 1000000
h, 1547718210, 1000000
i, 1547718210, 1000000
j, 1547718210, 1000000

 scala代码:

import java.sql.{Connection, DriverManager, PreparedStatement}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.flink.streaming.api.scala._

  case class SensorReading(name: String, timestamp: Long, salary: Double)
  object a1 {
    def main(args: Array[String]): Unit = {
      val env = StreamExecutionEnvironment.getExecutionEnvironment
      env.setParallelism(1)
      //数据源
      val dataStream: DataStream[String] = env.readTextFile("D:\\wlf.备份24.1.3\\wlf\\ideaProgram\\bbbbbb\\src\\main\\resources\\salary.txt")
      val stream = dataStream.map(data => {
        val splited = data.split(",")
        SensorReading(splited(0), splited(1).trim.toLong, splited(2).trim.toDouble)
      })
      stream.addSink( new JDBCSink() )
      env.execute("  job")
    }
  }


  class JDBCSink() extends RichSinkFunction[SensorReading]{
    // 定义sql连接、预编译器
    var conn: Connection = _
    var insertStmt: PreparedStatement = _
    var updateStmt: PreparedStatement = _
    // 初始化,创建连接和预编译语句
    override def open(parameters: Configuration): Unit = {
      super.open(parameters)
      conn = DriverManager.getConnection("jdbc:mysql://bigdata1:3306/flink?serverTimezone=UTC", "root", "123456")
      insertStmt = conn.prepareStatement("INSERT INTO salary_table (name, salary) VALUES (?,?)")
      updateStmt = conn.prepareStatement("UPDATE salary_table SET salary = ? WHERE name = ?")
    }
    override def invoke(value: SensorReading): Unit = {
      // 执行更新语句
      updateStmt.setString(1, value.name)
      updateStmt.setDouble(2, value.salary)
      updateStmt.execute()
      // 如果update没有查到数据,那么执行插入语句
      if( updateStmt.getUpdateCount == 0 ){
        insertStmt.setString(1, value.name)
        insertStmt.setDouble(2, value.salary)
        insertStmt.execute()
      }
    }
    // 关闭时做清理工作
    override def close(): Unit = {
      insertStmt.close()
      updateStmt.close()
      conn.close()
    }
}

MySQL中查看表 :

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1413546.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

数学建模-------误差来源以及误差分析

绝对误差:精确值-近似值; 举个例子:从A到B,应该有73千米,但是我们近似成了70千米;从C到D,应该是1373千米,我们近似成了1370千米,如果使用绝对误差,结果都是3…

Docker容器部署OpenCV,打造高效可移植的计算机视觉开发环境

推荐 海鲸AI-ChatGPT4.0国内站点:https://www.atalk-ai.com 前言 在计算机视觉领域,快速部署和测试算法是研究和开发的关键。OpenCV作为一个强大的开源计算机视觉库,广泛应用于各种图像处理和视频分析任务。然而,配置OpenCV环境可…

compose部署tomcat

1.部署tomcat 1.1.下载相关镜像tomcat8.5.20 $ docker pull tomcat:8.5.20 1.2 在/data目录下创建tomcat/webapps目录 mkdir -p /data/tomcat/webapps 注意:这里是准备将宿主机的/data/tomcat/webapps映射到容器的 /usr/…

HDFS的standby节点启动过慢原因分析以及应对策略

HDFS的standby节点启动过慢原因分析以及应对策略 1. NN启动大致流程2. Editlog日志清理策略2.1 为什么需要合并editlog?2.2 什么时候删除editlog? 3. NN启动的日志加载策略4. Standby启动慢应对策略5. 疑问和思考5.1 如何人工阅读editlog文件的内容&…

IDEA jdk版本切换问题

打开 IntelliJ IDEA 的 Project Structure(快捷键通常是 Ctrl Alt Shift S)。 转到 Project Settings > Modules。 选择相应的模块,然后在 Sources 标签页下,查看 Language level 是否设置为 自己需要的jdk版本语言。 接…

YOLOv8训练自己的数据集,通过LabelImg

记录下labelImg标注数据到YOLOv8训练的过程,其中容易遇到labelImg的坑 数据集处理 首先在mydata下创建4个文件夹 images文件夹下存放着所有的图片,包括训练集和测试集等。后续会根据代码进行划分。 json文件夹里存放的是labelImg标注的所有数据。需要注意的是&…

qtcreator使用qwt库

先配置好.pro文件,再去ui界面拖拽控件 ui界面会更改配置,故顺序错一个,就凉了,重来吧 准备:库,库头文件 库文件:路径如下 头文件:路径如下 鼠标->右键 (有些不用勾…

读元宇宙改变一切笔记13_治理与管理

1. 元宇宙的经济价值 1.1. 元宇宙的价值最终将“超过”物理世界 1.2. 人们之所以对低延迟网络进行投资,是因为有一些体验需要元宇宙:同步实时渲染的虚拟世界、AR和云游戏流 1.3. 在大多数情况下,数字经济并不是什么新鲜事 1.3.1. 数字经济…

【算法】北极通讯网络(Kruskal)

题目 北极的某区域共有 n 座村庄,每座村庄的坐标用一对整数 (x,y) 表示。 为了加强联系,决定在村庄之间建立通讯网络,使每两座村庄之间都可以直接或间接通讯。 通讯工具可以是无线电收发机,也可以是卫星设备。 无线电收发机有…

【shell-10】shell实现的各种kafka脚本

kafka-shell工具 背景日志 log一.启动kafka->(start-kafka)二.停止kafka->(stop-kafka)三.创建topic->(create-topic)四.删除topic->(delete-topic)五.获取topic列表->(list-topic)六. 将文件数据 录入到kafka->(file-to-kafka)七.将kafka数据 下载到文件-&g…

Oracle RAC集群日志

文章目录 一、DB日志1、日志所在位置介绍2、知识介绍 二、ASM日志1、日志所在位置介绍2、知识介绍 三、CRS日志1、日志所在位置介绍2、知识介绍 四、RAC相关日志详细总结 一、DB日志 DB日志也就是数据库日志,全称Oracle Database Logs 1、日志所在位置介绍 日志位…

【计算机图形学】实验五 一个简单的交互式绘图系统(实验报告分析+截图+源码)

可以先看一看这篇呀~【计算机图形学】专栏前言-CSDN博客https://blog.csdn.net/m0_55931547/article/details/135863062 目录 一、实验目的 二、实验内容

Transformer and Pretrain Language Models3-6

Pretrain Language Models预训练语言模型 content: language modeling(语言模型知识) pre-trained langue models(PLMs)(预训练的模型整体的一个分类) fine-tuning approaches GPT and BERT(…

银行数据仓库体系实践(3)--数据架构

狭义的数据仓库数据架构用来特指数据分布,广义的数据仓库数据架构还包括数据模型、数据标准和数据治理。即包含相对静态部分如元数据、业务对象数据模型、主数据、共享数据,也包含相对动态部分如数据流转、ETL、整合、访问应用和数据全生命周期管控治理。…

Angular组件(一) 分割面板ShrinkSplitter

Angular组件(一) 分割面板ShrinkSplitter 前言 分割面板在日常开发中经常使用,可将一片区域,分割为可以拖拽整宽度或高度的两部分区域。模仿iview的分割面板组件,用angular实现该功能,支持拖拽和[(ngModel)]双向绑定的方式控制区…

为什么 FPGA 比 CPU 和 GPU 快?

FPGA、GPU 与 CPU——AI 应用的硬件选择 现场可编程门阵列 (FPGA) 为人工智能 (AI) 应用带来许多优势。图形处理单元 (GPU) 和传统中央处理单元 (CPU) 相比如何? 人工智能(AI)一词是指能够以类似于人类的方式做出决策的非人类机器智能。这包…

Excel 2019 for Mac/Win:商务数据分析与处理的终极工具

在当今快节奏的商业环境中,数据分析已经成为一项至关重要的技能。从市场趋势预测到财务报告,再到项目管理,数据无处不在。而作为数据分析的基石,Microsoft Excel 2019 for Mac/Win正是一个强大的工具,帮助用户高效地处…

77 C++对象模型探索。虚函数- 从静态联编,动态联编出发,分析 虚函数调用问题探究

什么叫做单纯的类: 比较简单的类,尤其不包括 虚函数 和虚基类。 什么叫不单纯的类: 从上一章的学习我们知道,在某些情况下,编译器会往类内部增加一些我们看不见但是真实存在的成员变量,例如vptr&#xff…

matlab appdesigner系列-图窗工具2-工具栏

工具栏,就是一般在任意软件界面上方的工具菜单栏 示例:工具菜单绘制正弦函数 操作步骤如下: 1)将坐标区和工具栏拖拽到画布上 2)点击工具栏的号,可以看到可以添加2种工具,按钮工具和切换工具&#xff0c…

【JavaScript权威指南第七版】读书笔记速度

JavaScript权威指南第七版 序正文前言:图中笔记重点知识第1章 JavaScript简介第一章总结 第2章 词法结构注释字面量标识符和保留字Unicode可选的分号第二章总结 第3章 类型、值和变量【重要】原始类型特殊类型第三章总结 第4章 表达式与操作符表达式操作符条件式调用…