大数据-玩转数据-Flink 自定义Sink(Mysql)

news2024/11/26 17:50:12

一、说明

如果Flink没有提供给我们可以直接使用的连接器,那我们如果想将数据存储到我们自己的存储设备中,mysql 的安装使用请参考
mysql-玩转数据-centos7下mysql的安装
创建表

CREATE TABLE `sensor` (
  `id` int(10)
) ENGINE=InnoDB DEFAULT CHARSET=utf8

二、pom.xml 导入驱动

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.33</version>
</dependency>

三、编写程序

package com.lyh.flink06;

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

public class SinkMysql {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);
        DataStreamSource<Integer> dataStreamSource = env.fromElements(1, 2, 3, 4, 5, 6);
        KeyedStream<Integer, Integer> keyedStream = dataStreamSource.keyBy(new KeySelector<Integer, Integer>() {
            @Override
            public Integer getKey(Integer value) throws Exception {
                return value.intValue();
            }
        });
        keyedStream.addSink(new MysqlSink());
        env.execute();

    }
    public static class  MysqlSink extends RichSinkFunction<Integer>{

        private Connection sunbo;

        @Override
        public void open(Configuration parameters) throws Exception {
            Class.forName("com.mysql.cj.jdbc.Driver");
            sunbo = DriverManager.getConnection("jdbc:mysql://192.168.220.100:3306/test?useSSL=false", "sunbo", "Mysql123456#");

        }

        @Override
        public void close() throws Exception {
            if (sunbo != null) {
                sunbo.close();
            }
        }

        @Override
        public void invoke(Integer value, Context context) throws Exception {
            String sql = "insert into sensor(id)values(?)";
            PreparedStatement ps = sunbo.prepareStatement(sql);
            ps.setInt(1,value.intValue());
            ps.execute();
            ps.close();

        }
    }
}

四、运行测试

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/872565.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

使用日志来监控应用

根据提取规则运行的位置可以分为两类做法&#xff0c;一个是在中心端&#xff0c;一个是在日志端。 中心端就是把要处理的所有机器的日志都统一传到中心&#xff0c;比如通过 Kafka 传输&#xff0c;最终落到 Elasticsearch&#xff0c;指标提取规则可以作为流计算任务插到 Ka…

3.解构赋值

解构赋值是一种快速为变量赋值的简洁语法&#xff0c;本质上仍然是为变量赋值。 3.1数组解构 数组解构是 将数组的单元值快速批量赋值给一系列变量 的简洁语法 1.基本语法: &#xff08;1&#xff09;赋值运算符左侧的[ ]用于批量声明变量&#xff0c;右侧数组的单元值将被赋…

免费开源的多种人工智能项目,比如:训练一个模型,让人工智能玩王者荣耀

免费开源的多种人工智能项目&#xff0c;比如&#xff1a;训练一个模型&#xff0c;让人工智能玩王者荣耀。 全文大纲 PULSE - 该开源项目可以通过给图片增加像素点来实现去马赛克或高清化。 Depix - 给打了马赛克的文字去码。 TecoGAN - 给视频去马赛克或者进行超分辨率。 Sk…

python -- 函数闭包

1. LEGB规则 L: local 是局部作用域 E: Enclosed 是嵌套函数的外层函数作用域 G: Global 全局作用域 B:Build-In 内置作用域 变量的使用权重&#xff1a;局部变量 > 外层作用域变量 > 全局变量 > 内置变量 下面代码执行后&#xff0c;x变量的值分别为多少&#xff1…

【JavaEE基础学习打卡03】Java EE 平台有哪些内容?

目录 前言一、Java EE平台说明二、Java EE平台容器及组件1.平台容器2.平台组件 三、JavaEE平台API服务1.API服务概览2.平台API 总结 前言 &#x1f4dc; 本系列教程适用于Java Web初学者、爱好者&#xff0c;小白白。我们的天赋并不高&#xff0c;可贵在努力&#xff0c;坚持不…

Opencv特征检测之ORB算法原理及应用详解

Opencv特征检测之ORB算法原理及应用详解 特征是图像信息的另一种数字表达形式。一组好的特征对于在指定 任务上的最终表现至关重要。视觉里程 &#xff08;VO&#xff09; 的主要问题是如何根据图像特征来估计相机运动。但是&#xff0c;整幅图像用来计算分析通常比较耗时&…

算法通过村第三关-数组基础笔记|爱不起的数组

文章目录 前言线性表的概念什么是线性表从语言实现的角度看从存储的角度看从访问限制的角度看从扩容的角度看数组的概念数组元素的特征 数组的基本操作数组的创建和初始化查找一个元素增加一个元素删除一个元素 总结 前言 提示&#xff1a;孩子们有时候挺伤人的&#xff0c;他…

两个数组的交集-C语言/Java

描述 给定两个数组 nums1 和 nums2 &#xff0c;返回 它们的交集 。输出结果中的每个元素一定是 唯一 的。我们可以 不考虑输出结果的顺序。&#xff08;1 < nums1.length, nums2.length < 1000&#xff0c;0 < nums1[i], nums2[i] < 1000&#xff09; 示例1 输入…

Linux源码剖析匿名共享内存shmem原理

如下问题如果都清楚了就不用看本文了&#xff1a; 1. shmem ram文件系统的初始化流程是怎样的 2. shmem思想上想复用基于文件的操作流程&#xff0c;实现上shmem也引入了一个文件&#xff0c;那么类似文件open会生成struct file&#xff0c;shmem的struct file怎么生成的 3.…

C语言 棱形图案

目录 一、问题分析 上部分&#xff1a; 下部分&#xff1a; 二、代码演示 一、问题分析 如上图所示&#xff0c;我们可以将棱形进行拆解&#xff0c;分为上下两个部分。 上部分&#xff1a; 通过观察&#xff0c;我们得到 单边空格数 上半部分总行数 - 行数 - 1 …

graphab 教程 ——安装

graphab 软件致力于从图论的框架对生态网络进行建模。Graphab是基于图论原理建立生态网络模型的软件,它可以实现景观组分可视化、连通性分析等,且易于与地理信息系统兼容。Graphab 是基于Java平台开发的,可直接在 Windows、Linux,Mac等操作系统中运行,界面友好且易于使用。Grap…

HCIP学习--BGP实验

一、实验拓扑 二、实验需求 除R5的5.5.5.0环回外&#xff0c;其他所有的环回均可互相访问 三、实验步骤 首先配置IP&#xff0c;配置好IBGP 建立直连的EBGP邻居关系 R1和R2建立直连的EBGP邻居关系 [r1]bgp 1 [r1-bgp]router-id 1.1.1.1 [r1-bgp]peer 12.1.1.2 as-number …

MyBatis插件开发

目录 一、项目简单搭建二 、一个接口了、两大注解、四大对象三、脱敏插件开发 一、项目简单搭建 demo结构&#xff0c;已经搭建了无数次了&#xff0c;懒的粘贴了 o(╥﹏╥)o pom文件 <dependency><groupId>org.springframework.boot</groupId><artifa…

AIGC商用实例—大模型技术助力AI测谎仪,实现视频通话实施测谎!

大家好&#xff0c;我是千寻哥&#xff0c;最近一段时间&#xff0c;给大家分享了不少的AI绘画相关的项目教程&#xff0c;很多星友都反映真的不错&#xff0c;我自己也是感觉很有意义&#xff01; 哈哈哈&#xff0c;今天我在看到了一个项目柑感觉是一个不错的idea&#xff0c…

下一代深度学习的思考与若干问题

下一代深度学习的思考和若干问题

OpenCV基本操作——图像的基础操作

目录 图像的IO操作读取图像显示图像保存图像 绘制几何图形绘制直线绘制圆形绘制矩形向图像中添加文字效果展示 获取并修改图像中的像素点获取图像的属性图像通道的拆分与合并色彩空间的改变 图像的IO操作 读取图像 cv2.imread()import numpy as np import cv2 imgcv2.imread(…

Postman: 前端必备工具还是后端独享利器

目录 Postman 的使用场景&#xff1a;适用于前端和后端 Postman 适用于前端的场景 Postman 适用于后端的场景 结论 Postman 的使用场景&#xff1a;适用于前端和后端 Postman 是一个流行的 API 测试与开发工具。它被广泛地应用在前后端开发的过程中&#xff0c;但是很多人…

SCAU操作系统知识点之(八)虚拟内存

1、虚拟地址概念&#xff0c;实地址概念 实存储器&#xff08;实存&#xff09;&#xff1a;内存 虚存储器&#xff08;虚存&#xff09;&#xff1a;磁盘 虚拟地址&#xff1a;在虚拟内存中分配给某一位置的地址&#xff0c;它使得该位置可被访问&#xff0c;就好像是主内的一…

机器学习---对数几率回归

1. 逻辑回归 逻辑回归&#xff08;Logistic Regression&#xff09;的模型是一个非线性模型&#xff0c; sigmoid函数&#xff0c;又称逻辑回归函数。但是它本质上又是一个线性回归模型&#xff0c;因为除去sigmoid映射函 数关系&#xff0c;其他的步骤&#xff0c;算法都是…

网络中的一些基本概念整理总结

1.IP地址 是用来定位主机的网络地址,主要是用于标识主机和其他的一些网络设备. 比如路由器是用点分十进制来表示的 2.端口号 用于标识网络协议中不同的服务或应用程序。 3.协议 这里主要说网络协议,是网络通信时,所有经过的网络设备都必须遵守的一套规定,包含怎么建立连接…