Flink自定义Source模拟数据流

news2025/1/14 18:35:46

maven依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.zxl</groupId>
    <artifactId>FlinkJoin</artifactId>
    <version>1.0-SNAPSHOT</version>


    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.17.0</flink.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-core</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.16.22</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-api-java -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.27</version>
        </dependency>
        <!--com.mysql.cj.jdbc.Driver-->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!--这个一定要加,否则会报错(5001好像是,记不清了)-->
        <dependency>
            <groupId>org.glassfish.jersey.inject</groupId>
            <artifactId>jersey-hk2</artifactId>
            <version>2.34</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>1.17.0</version>
            <scope>compile</scope>
        </dependency>
    </dependencies>
</project>

实体类

订单类

package com.zxl.bean;


// TODO: 2024/1/6 订单类 


public class Orders {
    //订单ID
    private Long order_id;
    //用户ID
    private Long user_id;
    //订单日期
    private Long order_date;
    //订单金额
    private Integer order_amount;
    //商品ID
    private Integer product_id;
    //订单数量
    private Long order_num;

    public Long getOrder_id() {
        return order_id;
    }

    public void setOrder_id(Long order_id) {
        this.order_id = order_id;
    }

    public Long getUser_id() {
        return user_id;
    }

    public void setUser_id(Long user_id) {
        this.user_id = user_id;
    }

    public Long getOrder_date() {
        return order_date;
    }

    public void setOrder_date(Long order_date) {
        this.order_date = order_date;
    }

    public Integer getOrder_amount() {
        return order_amount;
    }

    public void setOrder_amount(Integer order_amount) {
        this.order_amount = order_amount;
    }

    public Integer getProduct_id() {
        return product_id;
    }

    public void setProduct_id(Integer product_id) {
        this.product_id = product_id;
    }

    public Long getOrder_num() {
        return order_num;
    }

    public void setOrder_num(Long order_num) {
        this.order_num = order_num;
    }

    public Orders() {
    }

    public Orders(Long order_id, Long user_id, Long order_date, Integer order_amount, Integer product_id, Long order_num) {
        this.order_id = order_id;
        this.user_id = user_id;
        this.order_date = order_date;
        this.order_amount = order_amount;
        this.product_id = product_id;
        this.order_num = order_num;
    }

    @Override
    public String toString() {
        return "Orders{" +
                "order_id=" + order_id +
                ", user_id=" + user_id +
                ", order_date=" + order_date +
                ", order_amount=" + order_amount +
                ", product_id=" + product_id +
                ", order_num=" + order_num +
                '}';
    }
}

支付类

package com.zxl.bean;



// TODO: 2024/1/6 支付类 


public class Payments {
    //支付ID
    private Long payment_id;
    //订单号
    private Long order_id;
    //支付金额
    private Integer payment_amount;
    //支付类型
    private String payment_type;
    //支付日期
    private Long payment_date;

    public Long getPayment_id() {
        return payment_id;
    }

    public void setPayment_id(Long payment_id) {
        this.payment_id = payment_id;
    }

    public Long getOrder_id() {
        return order_id;
    }

    public void setOrder_id(Long order_id) {
        this.order_id = order_id;
    }

    public Integer getPayment_amount() {
        return payment_amount;
    }

    public void setPayment_amount(Integer payment_amount) {
        this.payment_amount = payment_amount;
    }

    public String getPayment_type() {
        return payment_type;
    }

    public void setPayment_type(String payment_type) {
        this.payment_type = payment_type;
    }

    public Long getPayment_date() {
        return payment_date;
    }

    public void setPayment_date(Long payment_date) {
        this.payment_date = payment_date;
    }

    public Payments() {
    }

    public Payments(Long payment_id, Long order_id, Integer payment_amount, String payment_type, Long payment_date) {
        this.payment_id = payment_id;
        this.order_id = order_id;
        this.payment_amount = payment_amount;
        this.payment_type = payment_type;
        this.payment_date = payment_date;
    }

    @Override
    public String toString() {
        return "payments{" +
                "payment_id=" + payment_id +
                ", order_id=" + order_id +
                ", payment_amount=" + payment_amount +
                ", payment_type='" + payment_type + '\'' +
                ", payment_date=" + payment_date +
                '}';
    }
}

商品类

用作维表测试

package com.zxl.bean;

// TODO: 2024/1/6 商品类


public class Products {
    //商品ID
    private Integer product_id;
    //商品名称
    private String product_name;
    //商品价格
    private Integer product_price;
    //商品库存
    private Long product_num;
    //商品分类
    private String product_type;

    public Integer getProduct_id() {
        return product_id;
    }

    public void setProduct_id(Integer product_id) {
        this.product_id = product_id;
    }

    public String getProduct_name() {
        return product_name;
    }

    public void setProduct_name(String product_name) {
        this.product_name = product_name;
    }

    public Integer getProduct_price() {
        return product_price;
    }

    public void setProduct_price(Integer product_price) {
        this.product_price = product_price;
    }

    public Long getProduct_num() {
        return product_num;
    }

    public void setProduct_num(Long product_num) {
        this.product_num = product_num;
    }

    public String getProduct_type() {
        return product_type;
    }

    public void setProduct_type(String product_type) {
        this.product_type = product_type;
    }

    public Products() {
    }

    public Products(Integer product_id, String product_name, Integer product_price, Long product_num, String product_type) {
        this.product_id = product_id;
        this.product_name = product_name;
        this.product_price = product_price;
        this.product_num = product_num;
        this.product_type = product_type;
    }

    @Override
    public String toString() {
        return "products{" +
                "product_id=" + product_id +
                ", product_name='" + product_name + '\'' +
                ", product_price=" + product_price +
                ", product_num=" + product_num +
                ", product_type='" + product_type + '\'' +
                '}';
    }
}

数据生成

订单数据生成

package com.zxl.datas;

import com.zxl.bean.Orders;
import org.apache.flink.streaming.api.functions.source.SourceFunction;


import java.util.Random;


public class OrdersData implements SourceFunction<Orders> {

    private static Random random = new Random();

    private static boolean isRunning = true;

    private static Integer num = 0;

    //订单ID
    private static Long getOrder_id() {
        num++;
        long aLong = Long.parseLong(num.toString());
        return aLong;
    }

    //订单日期
    private static Long getOrder_date() {

        return System.currentTimeMillis();
    }

    //用户ID
    private static Long getUser_id() {
        return random.nextLong();
    }

    //订单金额
    private static Integer getOrder_amount() {
        return random.nextInt(100);
    }

    //商品ID
    private static Integer getProduct_id() {

        return random.nextInt(100);
    }

    //订单数量
    private static Long getOrder_num() {
        return random.nextLong();
    }

    //订单类
    private static Orders getOrders() {
        Orders orders = new Orders(getOrder_id(), getUser_id(), getOrder_date(), getOrder_amount(), getProduct_id(), getOrder_num());
        return orders;
    }

    @Override
    public void run(SourceContext<Orders> sourceContext) throws Exception {
        while (isRunning) {
            sourceContext.collect(getOrders());
            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }
}

支付数据生成

package com.zxl.datas;


import com.zxl.bean.Payments;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.Date;
import java.util.Random;

public class PaymentData implements SourceFunction<Payments> {

    private static Random random = new Random();

    private static boolean isRunning = true;

    private static Integer num = 0;

    //支付ID
    private static Long getPayment_id(){
        return random.nextLong();
    }

    //订单ID
    private static Long getOrder_id() {
        num++;
        long aLong = Long.parseLong(num.toString());
        return aLong;
    }

    //支付金额
    private static Integer getPayment_amount(){
        return random.nextInt(1000);
    }

    //支付类型
    private static String getPayment_type(){
        String[] type = {"银行卡", "支付宝", "微信", "美团", "抖音", "现金"};
        int are= random.nextInt(6);
        String area=type[are];
        return area;
    }

    //支付日期
    private static Long getPayment_date(){
        return System.currentTimeMillis();
    }

    //支付类
    private static Payments getPayments(){
        Payments payments = new Payments(getPayment_id(),getOrder_id(),getPayment_amount(),getPayment_type(),getPayment_date());
        return payments;
    }

    @Override
    public void run(SourceContext<Payments> sourceContext) throws Exception {
        while (isRunning) {
            sourceContext.collect(getPayments());
            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }
}

测试数据打印

package com.zxl.flink;


import com.zxl.bean.Orders;
import com.zxl.bean.Payments;
import com.zxl.datas.OrdersData;
import com.zxl.datas.PaymentData;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class flinkWorks {
    public static void main(String[] args) throws Exception {
        //创建Flink流处理执行环境
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        //设置并行度为1
        environment.setParallelism(1);
        //调用Flink自定义Source
        // TODO: 2024/1/6 订单数据 
        DataStreamSource<Orders> ordersDataStreamSource = environment.addSource(new OrdersData());
        // TODO: 2024/1/6 支付数据 
        DataStreamSource<Payments> paymentsDataStreamSource = environment.addSource(new PaymentData());
        //打印数据
        paymentsDataStreamSource.print();
        ordersDataStreamSource.print();
        //启动程序
        environment.execute();
    }
}

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1363132.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

CSS3 边框border、outline、box-shadow

1 border 语法&#xff1a;border: width style color 2 outline 语法&#xff1a;outline: width style color 2.1 outline-offet MDN解释&#xff1a;用于设置outline与一个元素边缘或边框之间的间隙 即&#xff1a;设置outline相对border外边缘的偏移&#xff0c;可以为…

Python 全栈体系【四阶】(十一)

第四章 机器学习 机器学习&#xff1a; 传统的机器学习&#xff1a;以算法为核心深度学习&#xff1a;以数据和计算为核心 感知机 perceptron&#xff08;人工神经元&#xff09; 可以做简单的分类任务掀起了第一波 AI 浪潮 感知机不能解决线性不可分问题&#xff0c;浪潮…

RouterOS L2TP安装与配置

申明&#xff1a;本文仅针对国内L2TP/PPTP&#xff0c;适用于国内的游戏加速或学术研究&#xff0c;禁止一切利用该技术的翻墙行为。 1. L2TP介绍 L2TP&#xff08;Layer 2 Tunneling Protocol&#xff09;是一种在计算机网络中广泛使用的隧道协议&#xff0c;它被设计用于通过…

java火车查询管理系统Myeclipse开发mysql数据库web结构java编程计算机网页项目

一、源码特点 java Web火车查询管理系统是一套完善的java web信息管理系统&#xff0c;对理解JSP java编程开发语言有帮助&#xff0c;系统具有完整的源代码和数据库&#xff0c;系统主要采用B/S模式开发。开发环境为 TOMCAT7.0,Myeclipse8.5开发&#xff0c;数据库为Mysql…

2023年12月 C/C++(一级)真题解析#中国电子学会#全国青少年软件编程等级考试

C/C++编程(1~8级)全部真题・点这里 第1题:数的输入和输出 输入一个整数和双精度浮点数,先将浮点数保留2位小数输出,然后输出整数。 时间限制:1000 内存限制:65536 输入 一行两个数,分别为整数N(不超过整型范围),双精度浮点数F,以一个空格分开。 输出 一行两个数,分…

电子学会C/C++编程等级考试2023年12月(二级)真题解析

C/C++编程(1~8级)全部真题・点这里 第1题:统计指定范围里的数 给定一个数的序列S,以及一个区间[L, R], 求序列中介于该区间的数的个数,即序列中大于等于L且小于等于R的数的个数。 时间限制:1000 内存限制:65536 输入 第一行1个整数n,分别表示序列的长度。(0 < n ≤…

使用Enterprise Architect绘制架构图

如何使用Enterprise Architect绘制架构图 之前没有使用过Enterprise Architect软件绘制&#xff0c;目前由于工作需求&#xff0c;需要使用Enterprise Architect绘制一些架构图&#xff0c;现在只使用Enterprise Architect绘制过简单的Flow Chart&#xff0c;想请教一下大神们…

如何批量自定义视频画面尺寸

在视频制作和编辑过程中&#xff0c;对于视频画面尺寸的调整是一项常见的需求。有时候&#xff0c;为了适应不同的播放平台或满足特定的展示需求&#xff0c;我们需要对视频尺寸进行批量调整。那么&#xff0c;如何实现批量自定义视频画面尺寸呢&#xff1f;本文将为您揭示这一…

纳尼??Rabbitmq居然被一个逗号给坑了??

转载说明&#xff1a;如果您喜欢这篇文章并打算转载它&#xff0c;请私信作者取得授权。感谢您喜爱本文&#xff0c;请文明转载&#xff0c;谢谢。 前言 这个问题发生在部署一套新的环境。搭建一个单节点的Rabbitmq&#xff0c;按照小伙伴写的部署文档搭建的。其中搭建步骤和我…

JetCache源码解析——配置加载

JetCache自动化配置加载 JetCache的配置加载主要是在jetcache-autoconfigure模块中完成的&#xff0c;无论是使用内存缓存LinkedHashMap和caffeine&#xff0c;亦或是通过lettuce、redisson和spring-data-redis来操作Redis服务缓存数据&#xff0c;其自动加载配置的操作基本上…

JSON Crack数据可视化工具结合内网穿透实现公网访问

文章目录 1. 在Linux上使用Docker安装JSONCrack2. 安装Cpolar内网穿透工具3. 配置JSON Crack界面公网地址4. 远程访问 JSONCrack 界面5. 固定 JSONCrack公网地址 JSON Crack 是一款免费的开源数据可视化应用程序&#xff0c;能够将 JSON、YAML、XML、CSV 等数据格式可视化为交互…

redis介绍与数据类型(一)

redis介绍与数据类型 1、redis1.1、数据库分类1.2、NoSQL分类1.3、redis简介1.4、redis应用1.5、如何学习redis 2、redis的安装2.1、Windows安装2.2.1、客户端redis管理工具 2.2、Linux安装&#x1f525;2.2.1、redis核心文件2.2.2、启动方式2.2.3、redis桌面客户端1、redis命令…

windows下载官方正版notepad++

一、前言 notepad是一款非常好用的编辑器&#xff0c;简洁、快速、高效。可是很多时候我们想去官网下载时&#xff0c;百度出来的都是一堆第三方下载地址&#xff0c;捆绑流氓软件&#xff0c;要么就是付费&#xff0c;作为一款优秀开源软件&#xff0c;我们必须要知道正确的下…

反射UnityEditor.GameView设置GamePlayMode分辨率

现在很有游戏考虑横屏适配、竖屏适配、阿拉伯语适配&#xff08;横竖屏&#xff09;导致拼界面变得越来越繁琐。 有很多时候需要记录各个控件的状态。 为了减少操作&#xff0c;特意制作了这个工具&#xff0c;点击用x配置可以自动切换到 宽高分辨率&#xff0c;如果当前没有则…

[计算机提升] 通过任务管理器管理任务

4.3 通过任务管理器管理任务 4.3.1 查看任务状态 通过任务管理器可以查看各种打开的任务的状态&#xff0c;比如&#xff0c;任务名称&#xff0c;PID、进程名、任务内存、CPU、磁盘、网络占用情况。 4.3.2 运行新任务 1、打开任务管理器&#xff0c;点击文件运行新任务&a…

解析数据链路层——组帧

组帧是数据链路层的重要功能之一&#xff0c;它将较长的数据分割成较小的帧以便在网络中传输。在本文中&#xff0c;我们将深入探讨组帧的概念、目的以及常见的组帧技术。 组帧是将数据封装成具有一定格式的帧的过程。帧是数据链路层传输的基本单位&#xff0c;它包含了有效数…

stm32引脚输入输出设置寄存器操作汇总

下图时正点原子i2c时使用的宏定义 下面的代码是对PA0-PH15的引进行了穷举法代码&#xff0c;使用的时候只需要拷贝三行相应的引脚即可。 //IO方向设置 #define IIC_SDA PAout(0) //SDA #define SDA_IN() {GPIOA->CRL&0XFFFFFFF0;GPIOA->CRL|(u32)8<<0…

servlet+jdbc+jsp实现登录界面的验证(基于MVC思想)

一、MVC的概念 MVC是模型(Model)和视图(View)以及控制器(Controller)的简写&#xff0c;是一种将数据、界面显示和业务 逻辑进行分离的组织方式&#xff0c;这样在改进界面及用户交互时&#xff0c;不需要重新编写业务逻辑&#xff0c;从而提高了 代码的可维护性。 M&#xf…

第 121 场 LeetCode 双周赛题解

A 大于等于顺序前缀和的最小缺失整数 模拟&#xff1a;先求最长顺序前缀的和 s s s &#xff0c;然后从 s s s 开始找没有出现在 n u m s nums nums 中的最小整数 class Solution { public:int missingInteger(vector<int> &nums) {unordered_set<int> vis(…

嵌入式(四)定时器 | 定时器功能 分类 定时器工作模式 寄存器全介绍

文章目录 1 定时器工作原理2 定时器功能3 定时器分类3.1 定时器13.2 定时器23.3 定时器3和定时器43.4 睡眠定时器3.5 看门狗定时器 4 定时器工作模式4.1 自由运行模式4.2 模模式4.3 正计数/倒计数模式 5 定时器1寄存器5.1 计数寄存器5.2 计数控制寄存器 6 定时器的两种使用方式…