Flink将数据写入MySQL(JDBC)

news2025/1/12 6:46:34

一、写在前面

在实际的生产环境中,我们经常会把Flink处理的数据写入MySQL、Doris等数据库中,下面以MySQL为例,使用JDBC的方式将Flink的数据实时数据写入MySQL。

二、代码示例

2.1 版本说明

        <flink.version>1.14.6</flink.version>
        <spark.version>2.4.3</spark.version>
        <hadoop.version>2.8.5</hadoop.version>
        <hbase.version>1.4.9</hbase.version>
        <hive.version>2.3.5</hive.version>
        <java.version>1.8</java.version>
        <scala.version>2.11.8</scala.version>
        <mysql.version>8.0.22</mysql.version>
        <scala.binary.version>2.11</scala.binary.version>

2.2 导入相关依赖

 <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
<!--mysql连接器依赖-->
<dependency>
   <groupId>mysql</groupId>
   <artifactId>mysql-connector-java</artifactId>
   <version>8.0.22</version>
</dependency>

2.3 连接数据库,创建表

mysql> CREATE TABLE `ws` ( 
      `id` varchar(100) NOT NULL
      ,`ts` bigint(20) DEFAULT NULL
      ,`vc` int(11) DEFAULT NULL, PRIMARY KEY (`id`) 
 ) ENGINE=InnoDB DEFAULT CHARSET=utf8

2.4 创建POJO类

package com.flink.POJOs;


import java.util.Objects;

/**
 * TODO POJO类的特点
 * 类是公有(public)的
 * 有一个无参的构造方法
 * 所有属性都是公有(public)的
 * 所有属性的类型都是可以序列化的
 */
public class WaterSensor {
    //类的公共属性
    public String id;
    public Long ts;
    public Integer vc;

    //无参构造方法
    public WaterSensor() {
        //System.out.println("调用了无参数的构造方法");
    }

    public WaterSensor(String id, Long ts, Integer vc) {
        this.id = id;
        this.ts = ts;
        this.vc = vc;
    }

    //生成get和set方法
    public void setId(String id) {
        this.id = id;
    }

    public void setTs(Long ts) {
        this.ts = ts;
    }

    public void setVc(Integer vc) {
        this.vc = vc;
    }

    public String getId() {
        return id;
    }

    public Long getTs() {
        return ts;
    }

    public Integer getVc() {
        return vc;
    }

    //重写toString方法
    @Override
    public String toString() {
        return "WaterSensor{" +
                "id='" + id + '\'' +
                ", ts=" + ts +
                ", vc=" + vc +
                '}';
    }

    //重写equals和hasCode方法
    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        WaterSensor that = (WaterSensor) o;
        return id.equals(that.id) && ts.equals(that.ts) && vc.equals(that.vc);
    }

    @Override
    public int hashCode() {
        return Objects.hash(id, ts, vc);
    }
}
//scala的case类?

2.5 自定义map函数

package com.flink.POJOs;

import org.apache.flink.api.common.functions.MapFunction;

public class WaterSensorMapFunction implements MapFunction<String, WaterSensor> {
    @Override
    public WaterSensor map(String value) throws Exception {
        String[] datas = value.split(",");
        return new WaterSensor(datas[0], Long.valueOf(datas[1]), Integer.valueOf(datas[2]));
    }
}

2.5 Flink2MySQL

package com.flink.DataStream.Sink;

import com.flink.POJOs.WaterSensor;
import com.flink.POJOs.WaterSensorMapFunction;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

import java.sql.PreparedStatement;
import java.sql.SQLException;

/**
 * Flink 输出到 MySQL(JDBC)
 */
public class flinkSinkJdbc {
    public static void main(String[] args) throws Exception {
        //TODO 创建Flink上下文执行环境
        StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        streamExecutionEnvironment.setParallelism(1);
        //TODO Source
        DataStreamSource<String> dataStreamSource = streamExecutionEnvironment.socketTextStream("localhost", 8888);
        //TODO Transfer
        SingleOutputStreamOperator<WaterSensor> waterSensorSingleOutputStreamOperator = dataStreamSource.map(new WaterSensorMapFunction());
        /**TODO 写入 mysql
         * 1、只能用老的 sink 写法
         * 2、JDBCSink 的 4 个参数:
         *   第一个参数: 执行的 sql,一般就是 insert into
         *   第二个参数: 预编译 sql, 对占位符填充值
         *   第三个参数: 执行选项 ---->攒批、重试
         *   第四个参数: 连接选项---->url、用户名、密码
         */
        SinkFunction<WaterSensor> sinkFunction = JdbcSink.sink("insert into ws values(?,?,?)",
                new JdbcStatementBuilder<WaterSensor>() {
                    @Override
                    public void accept(PreparedStatement preparedStatement, WaterSensor waterSensor) throws SQLException {
                        preparedStatement.setString(1, waterSensor.getId());
                        preparedStatement.setLong(2, waterSensor.getTs());
                        preparedStatement.setInt(3, waterSensor.getVc());
                        System.out.println("数据写入成功:"+'('+waterSensor.getId()+","+waterSensor.getTs()+","+waterSensor.getVc()+")");
                    }
                }
                , JdbcExecutionOptions
                        .builder()
                        .withMaxRetries(3)         // 重试次数
                        .withBatchSize(100)        // 批次的大小:条数
                        .withBatchIntervalMs(3000) // 批次的时间
                        .build(),
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl("jdbc:mysql://localhost:3306/dw?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8")
                        .withUsername("root")
                        .withPassword("********")
                        .withConnectionCheckTimeoutSeconds(60) // 重试的超时时间
                        .build()
        );
        //TODO 写入到Mysql
        waterSensorSingleOutputStreamOperator.addSink(sinkFunction);

        streamExecutionEnvironment.execute();
    }
}

2.6 启动necat、Flink,观察数据库写入情况

nc -lk 9999 #启动necat、并监听8888端口,写入数据

在这里插入图片描述
启动Flink程序
在这里插入图片描述
查看数据库写入是否正常
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1137450.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

2000-2021年上市公司内部薪酬差距数据(原始数据+计算代码Stata do文档+计算结果)

2000-2021年上市公司内部薪酬差距数据&#xff08;原始数据计算代码Stata do文档计算结果&#xff09; 1、时间&#xff1a;2000-2021年 2、来源&#xff1a;上市公司年报 3、指标&#xff1a; 原始数据指标&#xff1a;code、year、证券代码、应付职工薪酬、员工人数、支付…

美国IP代理如何获取?适用于哪些场景?

美国代理IP可以是静态&#xff08;不会改变&#xff09;或动态&#xff08;周期性更改&#xff09;&#xff0c;并且可以由专业的代理服务提供商提供。不同的代理IP服务提供商可能提供不同类型的代理&#xff0c;包括数据中心代理、住宅代理和移动代理&#xff0c;以满足不同用…

RT-Thread 7. RT-Thread Studio ENV修改MCU型号

1. 修改MCU型号 2.在ENV界面输入 scons -c scons --dist3. dist下为更新后完整源代码 4.导入RT-Thread Studio 发现GD32F330已经生效了。 5. 自己编写startup_gd32f3x0.S&#xff0c;准确性待验证 ;/* ; * Copyright (c) 2006-2021, RT-Thread Development Team ; * ; * SPD…

Vue组件样式设置,解决样式冲突问题

如果我们在不同的组件内&#xff0c;使用相同的类名&#xff0c;就会发生样式的冲突&#xff0c;使用后引入的组件中的样式&#xff1a; 一、scoped属性 在一个组件的style标签上添加scoped属性&#xff0c;该组件的所有样式都是该组件独有的&#xff0c;即使其他组件中有相同…

详解傅立叶变换,看这一文足矣!

从听到傅立叶变换这个名词后到现在已经四年了&#xff0c;这次终于对傅立叶变换有了一个基本的初步了解。记录一下&#xff0c;这个傅立叶变换也同时记录了我本科到研究生的四年&#xff0c;一路以来跌跌撞撞&#xff0c;没想到最后还是入了图像的坑 数字图像处理——傅立叶变换…

freeRTOS内部机制——创建任务的内部细节

创建任务的两个核心&#xff1a;栈和任务结构体 函数运行过程中的局部变量保存在哪里&#xff1f;他自己的栈中 任务被切换过后&#xff0c;在切换的瞬间&#xff0c;哪些寄存器的值保存在哪里&#xff1f;保存在任务结构体中 在任务创建函数中&#xff0c;会malloc动态分配…

【Javascript】函数隐藏参数ar

function test(a,b,c){console.log(abc);}test(1,2,3);这里的形参有三个分别是a&#xff0c;b&#xff0c;c分别对应实参1&#xff0c;2&#xff0c;3 假如在调用函数的时候多传入几个实参会怎么样&#xff1f; function test(a,b,c){console.log(arguments);console.log(a…

吃瓜教程3|决策树

ID3算法 假定当前样本集合D中第k类样本所占比例为pk&#xff0c;则样本集合D的信息熵定义为 信息增益 C4.5算法 ID3算法存在一个问题&#xff0c;就是偏向于取值数目较多的属性&#xff0c;因此C4.5算法使用了“增益率”&#xff08;gain ratio&#xff09;来选择划分属性 CA…

第四章 文件管理 六、文件的基本操作

目录 一、创建文件(create系统调用) 1、进行Create系统调用时&#xff0c;需要提供的几个主要参数: 2、操作系统在处理Create系统调用时&#xff0c;主要做了两件事: 二、删除文件(delete系统调用) 1、进行Delete系统调用时&#xff0c;需要提供的几个主要参数: 2、操作系…

【20年VIO梳理】

19-20年VIO 梳理 1. 开源代码介绍&#xff1a; DSM2. FMD Stereo SLAM&#xff1a;融合MVG和直接方法&#xff0c;实现准确&#xff0c;快速的双目SLAM3. 基于VINS-Mono开发的SPVIS4. 改进&#xff1a;一种基于光流的动态环境移动机器人定位方案5. PVIO:基于先验平面约束的高效…

第四章 文件管理 五、文件存储空间管理

目录 一、逻辑结构和物理结构的比较 二、空闲表法 1、磁盘中的空闲块表 2、例子 3、如何回收空闲区间 ①回收区的前后都没有相邻空闲区; ②回收区的前后都是空闲区; ③回收区前面是空闲区; ④回收区后面是空闲区; 三、空闲链表法 1、分类 2、空闲盘块链 &#xff…

【顺序栈的表示和实现,顺序栈的初始化,是否为空,清空顺序栈,销毁顺序栈,】

文章目录 一、栈和队列的定义和特点1.1顺序栈的表示和实现1.2顺序栈的基本操作1.2.1顺序栈的初始化1.2.2判断顺序栈是否为空1.2.3清空顺序栈1.2.4销毁顺序栈1.2.5顺序栈的入栈 一、栈和队列的定义和特点 栈和队列是限定插入和删除只能在表的“端点”进行的线性表。 栈是先进后…

【Linux】安装与配置虚拟机及虚拟机服务器坏境配置与连接---超详细教学

一&#xff0c;操作系统介绍 1.1.什么是操作系统 操作系统&#xff08;Operating System&#xff0c;简称OS&#xff09;是一种系统软件&#xff0c;它是计算机硬件和应用软件之间的桥梁。它管理计算机的硬件和软件资源&#xff0c;为应用程序提供接口和服务&#xff0c;并协…

VS搭建32位和64位汇编开发环境

VS搭建32位和64位汇编开发环境 1 VS2017软件安装2 创建汇编工程3 配置X86汇编环境&#xff08;32位&#xff09;4 配置X64汇编环境&#xff08;64位&#xff09;5 调试技巧 本文属于《 X86架构指令基础系列教程》之一&#xff0c;欢迎查看其它文章。 1 VS2017软件安装 安装过…

根据键名解析特定属性的值相关API

mycdev.c #include <linux/init.h> #include <linux/module.h> #include <linux/of.h>struct device_node *node; //解析得到的设备树节点对象指针 struct property *pr; //属性结构体指针 int len; u32 a; u32 b[2]; const char *str; u8 c[6]; static i…

MFI芯片I2C地址转换(写读转7位传入API接口)

是否需要申请加入数字音频系统研究开发交流答疑群(课题组)&#xff1f;可加我微信hezkz17, 本群提供音频技术答疑服务 MFI芯片I2C地址转换(写读转7位传入API接口&#xff09; #define MFI_I2C_CHIP_ADDR 0x10// 芯片写/读 0x20/0x21(写/读) 七位地址 0x10 //zk 使用读地址…

jdbc 对事务的支持

MySQL中默认开启事务自动提交功能&#xff0c;即 每个SQL语句都会自动开启一个事务并提交&#xff0c;如果没有显式地使用COMMIT或者ROLLBACK语句&#xff0c;则所有的修改都将被保存到数据库中。这种情况下&#xff0c;如果某个操作出现错误&#xff0c;就无法回滚事务&#x…

使用Python实现文字的声音播放

winsound 是 Python 的一个内置模块&#xff0c;它提供了访问 Windows 操作系统的声音播放功能的接口。这个模块可以用来播放简单的声音&#xff0c;例如提示音或者短促的音效。 # Author : 小红牛 # 微信公众号&#xff1a;WdPython import win32com.client import winsound#…

【已解决】axios post请求body为字符串

文章目录 现在需要的参数格式正常post请求参数 解决方法代码示例axiosfetch![在这里插入图片描述](https://img-blog.csdnimg.cn/9372f6efae13432896368aa3e25194cf.png) 现在需要的参数格式 正常post请求参数 解决方法 修改 Content-Type 为 text/plain 参数直接给一个字符串…

RT-Thread 8. RT-Thread Studio arm-gcc使用10.2.1编译

1. gcc编译器下载 E:\RT-ThreadStudio\repo\Extract\ToolChain_Support_Packages\ARM\GNU_Tools_for_ARM_Embedded_Processors2. 把5.4.1 改为5.4.11 再“全部构建”&#xff0c;提示错误 3. 把工具链版本改为10.2.1&#xff0c;再“全部构建”