【flink番外篇】3、fflink的source(内置、mysql、kafka、redis、clickhouse)介绍及示例(2)- 自定义、mysql

news2024/11/24 6:32:27

Flink 系列文章

一、Flink 专栏

Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。

  • 1、Flink 部署系列
    本部分介绍Flink的部署、配置相关基础内容。

  • 2、Flink基础系列
    本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。

  • 3、Flik Table API和SQL基础系列
    本部分介绍Flink Table Api和SQL的基本用法,比如Table API和SQL创建库、表用法、查询、窗口函数、catalog等等内容。

  • 4、Flik Table API和SQL提高与应用系列
    本部分是table api 和sql的应用部分,和实际的生产应用联系更为密切,以及有一定开发难度的内容。

  • 5、Flink 监控系列
    本部分和实际的运维、监控工作相关。

二、Flink 示例专栏

Flink 示例专栏是 Flink 专栏的辅助说明,一般不会介绍知识点的信息,更多的是提供一个一个可以具体使用的示例。本专栏不再分目录,通过链接即可看出介绍的内容。

两专栏的所有文章入口点击:Flink 系列文章汇总索引


文章目录

  • Flink 系列文章
  • 一、maven依赖
  • 二、自定义Source介绍及示例
    • 1、示例-自定义数据源
      • 1)、java bean
      • 2)、自定义数据源实现
      • 3)、使用自定义的数据源
      • 4)、自定义数据源验证
  • 三、自定义数据源-MySQL
    • 1、maven依赖
    • 2、java bean
    • 3、mysql自定义数据源实现
    • 4、使用mysql自定义的数据源
    • 5、验证
      • 1)、准备mysql环境-建库表
      • 2)、启动TestCustomMySQLSourceDemo.java
      • 3)、往user表中添加数据,并观察应用程序控制台输出


本文主要介绍Flink 的自定义数据源的2种情况,即自己产生的数据或数据源以及mysql作为自定义的数据源的示例。

如果需要了解更多内容,可以在本人Flink 专栏中了解更新系统的内容。

本文除了maven依赖外,没有其他依赖。

本专题分为以下几篇文章:
【flink番外篇】3、fflink的source(内置、mysql、kafka、redis、clickhouse)介绍及示例(1) - File、Socket、Collection
【flink番外篇】3、fflink的source(内置、mysql、kafka、redis、clickhouse)介绍及示例(2)- 自定义、mysql
【flink番外篇】3、flink的source(内置、mysql、kafka、redis、clickhouse)介绍及示例(3)- kafka
【flink番外篇】3、flink的source(内置、mysql、kafka、redis、clickhouse)介绍及示例(4)- redis -异步读取
【flink番外篇】3、flink的source(内置、mysql、kafka、redis、clickhouse)介绍及示例(5)- clickhouse
【flink番外篇】3、flink的source(内置、mysql、kafka、redis、clickhouse)介绍及示例 - 完整版

一、maven依赖

本文依赖见【flink番外篇】3、flink的source介绍及示例(1)- File、Socket、Collection,不再赘述。

如果有新增的maven依赖,则会在示例时加以说明,避免篇幅的过大。

二、自定义Source介绍及示例

Flink提供了数据源接口,实现该接口就可以实现自定义数据源,不同的接口有不同的功能,分类如下:

  • SourceFunction,非并行数据源,并行度只能=1
  • RichSourceFunction,多功能非并行数据源,并行度只能=1
  • ParallelSourceFunction,并行数据源,并行度能够>=1
  • RichParallelSourceFunction,多功能并行数据源,并行度能够>=1

1、示例-自定义数据源

本示例展示如何实现自定义数据源,并通过随机数产生数据信息,比较简单。

1)、java bean

如果使用其自定义的数据结构也可以,视情况需要。

package org.datastreamapi.source.custom.bean;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * @author alanchan
 *
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class User {
	private int id;
	private String name;
	private long clicks;
	private long ranks;
	private Long createTime;
}

2)、自定义数据源实现

继承RichParallelSourceFunction,并源源不断的产生数据,直到自己手动停止其运行或将启停标志flag设置成false。

package org.datastreamapi.source.custom;

import java.util.Random;
import java.util.UUID;

import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.datastreamapi.source.custom.bean.User;

/**
 * @author alanchan
 *
 */
public class CustomUserSource extends RichParallelSourceFunction<User> {
	private boolean flag = true;

	// 生产数据
	@Override
	public void run(SourceContext<User> ctx) throws Exception {
		Random random = new Random();
		while (flag) {
			ctx.collect(new User(random.nextInt(100000001), "alanchan" + UUID.randomUUID().toString(), random.nextInt(9000001), random.nextInt(10001), System.currentTimeMillis()));
			Thread.sleep(1000);
		}
	}

	@Override
	public void cancel() {
		flag = false;
	}

}

3)、使用自定义的数据源

本部分是使用上面定义的数据源。和使用其他数据源就一句代码的不同,即如下

DataStream<User> userDS = env.addSource(new CustomUserSource());

完整的示例如下

package org.datastreamapi.source.custom;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.datastreamapi.source.custom.bean.User;

/**
 * @author alanchan
 *
 */
public class TestCustomSourceDemo {

	/**
	 * @param args
	 */
	public static void main(String[] args) throws Exception {
		// env
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

		// source,设置并行度是看每次产生多少条数据
		DataStream<User> userDS = env.addSource(new CustomUserSource()).setParallelism(2);

		// transformation

		// sink
		userDS.print();

		// execute
		env.execute();
	}

}

4)、自定义数据源验证

运行TestCustomSourceDemo.java查看运行结果,本示例运行结果如下(每次运行结果均不同,供参考)

7> User(id=14317087, name=alanchan4ea184fd-472d-429c-b1b5-8fc16ba8c373, clicks=1815669, ranks=9648, createTime=1701824325507)
9> User(id=35956856, name=alanchanb715dab7-d3cb-40f3-b9e8-e0ba76d8b998, clicks=8888642, ranks=8143, createTime=1701824325507)
10> User(id=46536055, name=alanchan2aa31683-1af3-4d0b-81b6-c89265ba63f4, clicks=3766665, ranks=3282, createTime=1701824326536)
8> User(id=55937607, name=alanchan2ef54410-eaa4-47ed-8531-478957143051, clicks=5684939, ranks=8006, createTime=1701824326536)
9> User(id=92498863, name=alanchan1a324cdf-7ee6-4c6b-8059-7db01648428d, clicks=7199973, ranks=8005, createTime=1701824327552)
11> User(id=67794502, name=alanchanc73d8f15-abeb-4b86-9809-98094439e25c, clicks=654744, ranks=6799, createTime=1701824327552)

以上就完整的介绍了一个自定义数据源的过程,其中使用场景视情况而定。

三、自定义数据源-MySQL

上述示例中展示了自定义数据源的一种方式,就是自己产生数据或者其他的数据源获取,本示例展示的是以mysql数据源作为flink的数据源的实现。flink本身没有实现mysql数据源,需要自己实现。

1、maven依赖

本示例需要新增mysql的依赖

<dependency>
	<groupId>mysql</groupId>
	<artifactId>mysql-connector-java</artifactId>
	<version>5.1.38</version>
	<!--<version>8.0.20</version> -->
</dependency>

2、java bean

package org.datastreamapi.source.custom.mysql.bean;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * @author alanchan
 *
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class User {
	private int id;
	private String name;
	private String pwd;
	private String email;
	private int age;
	private double balance;
}

3、mysql自定义数据源实现

package org.datastreamapi.source.custom.mysql;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.datastreamapi.source.custom.mysql.bean.User;

/**
 * @author alanchan
 *
 */
public class CustomMySQLSource extends RichParallelSourceFunction<User> {
	private boolean flag = true;
	private Connection conn = null;
	private PreparedStatement ps = null;
	private ResultSet rs = null;

	@Override
	public void open(Configuration parameters) throws Exception {
		conn = DriverManager.getConnection("jdbc:mysql://192.168.10.44:3306/test?useUnicode=true&characterEncoding=UTF-8", "root", "123456");
		String sql = "select id,name,pwd,email,age,balance from user";
		ps = conn.prepareStatement(sql);
	}

	@Override
	public void run(SourceContext<User> ctx) throws Exception {
		while (flag) {
			rs = ps.executeQuery();
			while (rs.next()) {
				User user = new User(rs.getInt("id"), rs.getString("name"), rs.getString("pwd"), rs.getString("email"), rs.getInt("age"), rs.getDouble("balance"));
				ctx.collect(user);
			}
			//每5秒查询一次数据库
			Thread.sleep(5000);
		}

	}

	@Override
	public void cancel() {
		flag = false;
	}

}

4、使用mysql自定义的数据源

本部分是使用上面定义的数据源。和使用其他数据源就一句代码的不同,即如下

DataStream<User> userDS = env.addSource(new CustomMySQLSource()).setParallelism(1);

完整示例如下:

package org.datastreamapi.source.custom.mysql;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.datastreamapi.source.custom.mysql.bean.User;

/**
 * @author alanchan
 *
 */
public class TestCustomMySQLSourceDemo {

	public static void main(String[] args) throws Exception {
		// env
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

		// source
		DataStream<User> userDS = env.addSource(new CustomMySQLSource()).setParallelism(1);

		// transformation

		// sink
		userDS.print();

		// execute
		env.execute();
	}

}

5、验证

1)、准备mysql环境-建库表

DROP TABLE IF EXISTS `user`;
CREATE TABLE `user`  (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `name` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `age` smallint(6) NULL DEFAULT NULL,
  `balance` double(255, 0) NULL DEFAULT NULL,
  `email` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `pwd` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 5001 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;

2)、启动TestCustomMySQLSourceDemo.java

启动TestCustomMySQLSourceDemo.java程序,并观察控制台输出。

3)、往user表中添加数据,并观察应用程序控制台输出

  • user 表数据
    在这里插入图片描述
  • 程序控制台输出(每5秒输出一次)
2> User(id=1, name=alan, pwd=123, email=alan.chan.chn@163.com, age=18, balance=20.0)
4> User(id=3, name=alanchanchn, pwd=123, email=alan.chan.chn@163.com, age=20, balance=30.0)
6> User(id=5, name=alan_chan_chn, pwd=123, email=alan.chan.chn@163.com, age=20, balance=46.0)
3> User(id=2, name=alanchan, pwd=123, email=alan.chan.chn@163.com, age=19, balance=25.0)
5> User(id=4, name=alan_chan, pwd=123, email=alan.chan.chn@163.com, age=19, balance=36.0)

以上,本文主要介绍Flink 的自定义数据源的2种情况,即自己产生的数据或数据源以及mysql作为自定义的数据源的示例。

如果需要了解更多内容,可以在本人Flink 专栏中了解更新系统的内容。

本专题分为以下几篇文章:
【flink番外篇】3、fflink的source(内置、mysql、kafka、redis、clickhouse)介绍及示例(1) - File、Socket、Collection
【flink番外篇】3、fflink的source(内置、mysql、kafka、redis、clickhouse)介绍及示例(2)- 自定义、mysql
【flink番外篇】3、flink的source(内置、mysql、kafka、redis、clickhouse)介绍及示例(3)- kafka
【flink番外篇】3、flink的source(内置、mysql、kafka、redis、clickhouse)介绍及示例(4)- redis -异步读取
【flink番外篇】3、flink的source(内置、mysql、kafka、redis、clickhouse)介绍及示例(5)- clickhouse
【flink番外篇】3、flink的source(内置、mysql、kafka、redis、clickhouse)介绍及示例 - 完整版

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1304238.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

图解Redis,谈谈Redis的持久化,RDB快照与AOF日志

目录 专栏导读一、RDB持久化1、自动触发2、手动触发3、设置保存条件4、加解密5、RDB持久化优缺点6、哪些情况会触发RDB持久化?二、AOF持久化1、AOF持久化过程2、appendfsync的选项值3、AOF持久化优缺点4、数据恢复顺序和加载流程三、Redis事务</

Leetcode—2962.统计最大元素出现至少 K 次的子数组【中等】

2023每日刷题&#xff08;五十六&#xff09; Leetcode—2962.统计最大元素出现至少 K 次的子数组 滑动窗口算法思想 参考的灵神思路 实现代码 class Solution { public:long long countSubarrays(vector<int>& nums, int k) {int n nums.size();long long ans…

深度学习 Day12——P1实现mnist手写数字识别

&#x1f368; 本文为&#x1f517;365天深度学习训练营 中的学习记录博客&#x1f356; 原作者&#xff1a;K同学啊 | 接辅导、项目定制 文章目录 前言1 我的环境2 代码实现与执行结果2.1 前期准备2.1.1 引入库2.1.2 设置GPU&#xff08;如果设备上支持GPU就使用GPU,否则使用C…

【计算机网络】URL概念及组成

目录 一、什么是URL 二、URL格式 示例&#xff1a; 1. Scheme&#xff08;协议&#xff09;&#xff1a; 2. Host&#xff08;主机&#xff09;&#xff1a; 3. Port&#xff08;端口&#xff09;&#xff1a; 4. Path&#xff08;路径&#xff09;&#xff1a; 5. Quer…

容器化升级对服务有哪些影响?

容器技术是近几年计算机领域的热门技术&#xff0c;特别是随着各种云服务的发展&#xff0c;越来越多的服务运行在以 Docker 为代表的容器之内。 本文我们就来分享一下容器化技术相关的知识。 容器化技术简介 相比传统虚拟化技术&#xff0c;容器技术是一种更加轻量级的操作…

如何使用玻璃材质制作3D钻石模型

在线工具推荐&#xff1a; 3D数字孪生场景编辑器 - GLTF/GLB材质纹理编辑器 - 3D模型在线转换 - Three.js AI自动纹理开发包 - YOLO 虚幻合成数据生成器 - 三维模型预览图生成器 - 3D模型语义搜索引擎 当谈到游戏角色的3D模型风格时&#xff0c;有几种不同的风格&#xf…

Python数值类型(整形、浮点型和复数)及其用法

数值类型是计算机程序最常用的一种类型&#xff0c;既可用于记录各种游戏的分数、游戏角色的生命值、伤害值等&#xff0c;也可记录各种物品的价格、数量等&#xff0c;Python 提供了对各种数值类型的支持&#xff0c;如支持整型、浮点型和复数。 Python整型 Python 3 的整型…

Python入门第6篇(FastApi、uvicorn)

前言 FastApi用来开发webapi&#xff0c;可以定义路由、定义get、post方法等 但是要让浏览器可以访问webapi&#xff0c;还需要用到uvicorn&#xff0c;即web服务器&#xff0c;类似Tomcat、iis这种的 pip安装相关包 FastApi uvicorn 注意&#xff1a;可能一次安装不成功…

二叉树--基础OJ

1.对称二叉树 题目链接&#xff1a;101. 对称二叉树 - 力扣&#xff08;LeetCode&#xff09; 题解&#xff1a; 我们可以用递归的方法去做&#xff1a; 如果两个树互为镜像&#xff08;1.根节点的值相同&#xff0c;2.左子树的值与右子树的值对称&#xff09;则为对称二叉树&a…

win中查看MD5、Linux中查看MD5

win中的MD5计算 1、用GitBash Git Bash Here md5sum.exe 我记得-孙燕姿.mp32、win自带命令 certutil -hashfile 我记得-孙燕姿.mp3 MD5Linux中MD5计算 md5sum 我记得-孙燕姿.mp3

使用paddleocr识别图片文本的一种方案

pdf文本分为两种&#xff0c;一种是标准的pdf格式的文本&#xff0c;这种无需利用ocr识别&#xff0c;另外一种就是图片文本&#xff0c;这种需要进行ocr的识别。 OCR 识别文本和文本区域 ppstructure是paddleocr里面的一个子库&#xff0c;可以识别文档的页眉页脚、正文、标…

多线程 - 学习笔记

前置知识 什么是线程和进程? 进程: 是程序的一次执行,一个在内存中运行的应用程序。每个进程都有自己独立的一块内存空间&#xff0c;一个进程可以有多个线程&#xff0c;比如在Windows系统中&#xff0c;一个运行的xx.exe就是一个进程。 线程: 进程中的一个执行流&#xff0…

我的软考拿证之路

我的软考拿证之路 前言2021年上半年2022年下半年2023年上半年2023年下半年 前言 在2020年8月份&#xff0c;准备参加软考高级项目管理师考试&#xff0c;经历了疫情三年&#xff0c;停考3次&#xff0c;经历了教材更新&#xff0c;经历了考试形式由纸考换机考&#xff0c;共参…

实现TensorBoard可视化网络的参数

前言 最近在做神经网络相关的实验&#xff0c;为了方便神经网络参数是否变化&#xff0c;学习一下TensorBoard可视化网络的参数的方法&#xff0c;这里使用pytorch实现。 实现 当使用PyTorch训练一个简单的神经网络时&#xff0c;可以使用TensorBoardX来可视化网络的参数。以…

机器学习---KNN最近邻算法

1、KNN最近邻算法 K最近邻(k-Nearest Neighbor&#xff0c;KNN)分类算法&#xff0c;是一个理论上比较成熟的方法&#xff0c;也是最简单的机器学习算法之一&#xff0c;有监督算法。该方法的思路是&#xff1a;如果一个样本在特征空间中的k个最相似的样本中的大多数属于某一个…

小米耳机定制音效选项灰色无法开启使用_开启定制音效_音效模式设置

使用环境&#xff1a;Redmi K50 Ultra &#xff0b;MIUI 14.0.11&#xff0b;定制音效选项是灰色的无法开启及音效模式无法选择 定制音效无法开启 音效模式无法选择&#xff08;需下载小米耳机APP才能设置&#xff09;&#xff0c;根据提示解决问题即可 解决方法&#xff1a;关…

ICC2:low power与pg strategy(pg_mesh)

我正在「拾陆楼」和朋友们讨论有趣的话题,你⼀起来吧? 拾陆楼知识星球入口 用pg_strategy创建power stripe,示例如下: set pd_list {{DEFAULT_VA VDD_DIG VDD_DIG VSS} {PD_DSP VDD_DIG VDD_DSP VSS} } ;#两个电源域,DEFAULT_VA和PD_DSP是对应voltage area名字,其中D…

硬件开发笔记(十六):RK3568底板电路mipi摄像头接口原理图分析、mipi摄像头详解

若该文为原创文章&#xff0c;转载请注明原文出处 本文章博客地址&#xff1a;https://hpzwl.blog.csdn.net/article/details/134922307 红胖子网络科技博文大全&#xff1a;开发技术集合&#xff08;包含Qt实用技术、树莓派、三维、OpenCV、OpenGL、ffmpeg、OSG、单片机、软硬…

【神行百里】pandas查询加速之行索引篇

最近进行大数据处理的时候&#xff0c;发现我以前常用的pandas查询方法太慢了&#xff0c;太慢了&#xff0c;真是太慢了&#xff0c;查阅资料&#xff0c;遂发现了一种新的加速方法&#xff0c;能助力我飞上天&#xff0c;和太阳肩并肩&#xff0c;所以记录下来。 1. 场景说明…

章鱼网络进展月报 | 2023.11.1-11.30

章鱼网络大事摘要 1、2023年12月&#xff0c;Octopus 2.0 将会正式启动。 2、隐私协议 Secret Network 宣布使用 Octopus Network 构建的 NEAR-IBC 连接 NEAR 生态。 3、Louis 受邀作为嘉宾&#xff0c;在 NEARCON2023 的多链网络主题沙龙中发言&#xff1a;我们依然处于区…