flinkjar开发 自定义函数

news2024/12/23 17:52:15

编写自定义加密函数,继承ScalarFunction类,实现eval方法,参数个数类型和返回值根据业务来自定义。

import org.apache.flink.table.functions.ScalarFunction;
import javax.crypto.Cipher;
import javax.crypto.KeyGenerator;
import javax.crypto.SecretKey;
import javax.crypto.spec.SecretKeySpec;
import java.nio.charset.StandardCharsets;
import java.security.Key;
import java.security.NoSuchAlgorithmException;
import java.security.SecureRandom;
import java.util.Base64;

public class AESUtil extends ScalarFunction {
    private static String DEFAULT_CIPHER_ALGORITHM = "SHA1PRNG";
    private static String KEY_ALGORITHM = "AES";
    private static String key = "AD42F6697B035B75";

//必须有这个方法,在这个方法里实现业务逻辑
    public String eval(String str) {
        return encrypt(str);
    }
    /**
     * 加密
     *
     * @param key
     * @param messBytes
     * @return
     */
    private static byte[] encrypt(Key key, byte[] messBytes) throws Exception {
        if (key != null) {

            Cipher cipher = Cipher.getInstance(KEY_ALGORITHM);
            cipher.init(Cipher.ENCRYPT_MODE, key);
            return cipher.doFinal(messBytes);
        }
        return null;
    }

    /**
     * AES(256)解密
     *
     * @param key
     * @param cipherBytes
     * @return
     */
    private static byte[] decrypt(Key key, byte[] cipherBytes) throws Exception {
        if (key != null) {

            Cipher cipher = Cipher.getInstance(KEY_ALGORITHM);
            cipher.init(Cipher.DECRYPT_MODE, key);
            return cipher.doFinal(cipherBytes);
        }
        return null;
    }


    /**
     * 生成加密秘钥
     *
     * @return
     * @throws NoSuchAlgorithmException
     */
    private static KeyGenerator getKeyGenerator() {
        KeyGenerator keygen = null;
        try {
            keygen = KeyGenerator.getInstance(KEY_ALGORITHM);
            SecureRandom secureRandom = SecureRandom.getInstance(DEFAULT_CIPHER_ALGORITHM);
            secureRandom.setSeed(key.getBytes());
            keygen.init(128, secureRandom);
        } catch (NoSuchAlgorithmException e) {

        }

        return keygen;
    }

    public static String encrypt(String message) {
        try {
            KeyGenerator keygen = getKeyGenerator();
            SecretKey secretKey = new SecretKeySpec(keygen.generateKey().getEncoded(), KEY_ALGORITHM);
            return Base64.getEncoder().encodeToString(encrypt(secretKey, message.getBytes(StandardCharsets.UTF_8)));
        } catch (Exception e) {

        }
        return null;
    }

    public static String decrypt(String ciphertext) {
        try {
            KeyGenerator keygen = getKeyGenerator();
            SecretKey secretKey = new SecretKeySpec(keygen.generateKey().getEncoded(), KEY_ALGORITHM);
            return new String(decrypt(secretKey, Base64.getDecoder().decode(ciphertext)), StandardCharsets.UTF_8);
        } catch (Exception e) {

        }
        return null;
    }
  

FlinkCDC mysql到mysql 业务代码


import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.example.util.AESUtil;

public class FlinkMysqlToMysql {

    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 5000));
        env.enableCheckpointing(5000);
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 创建Table环境
        EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

// 注册源表和目标表
        tEnv.executeSql("create table sourceTable(id bigint,test VARCHAR, PRIMARY KEY (id) NOT ENFORCED ) WITH (\n" +
//源表连接器一定得是mysql-cdc
                "'connector' = 'mysql-cdc'," +
                "'hostname' = 'localhost',\n" +
                " 'port' = '3306',\n" +
                " 'database-name' = 'testdb',\n" +
                " 'table-name' = 'flinktest',\n" +
                " 'username' = 'root',\n" +
                " 'password' = 'admin'\n" +
                ")");
//这里注册加密函数
        tEnv.createTemporarySystemFunction("encrypt", new AESUtil());
//sql里面使用自定义函数加密
        Table result = tEnv.sqlQuery("SELECT id,encrypt(test) FROM sourceTable");
        tEnv.registerTable("sourceTable", result);
          //创建skink表
        tEnv.executeSql("create table targetTable(id bigint,test VARCHAR ,PRIMARY KEY (id) NOT ENFORCED ) WITH (\n" +
//目标表连接器是jdbc
                "'connector' = 'jdbc'," +
                "'url' = 'jdbc:mysql://localhost:3306/testdb?serverTimezone=UTC&useUnicode=true&characterEncoding=utf-8&useSSL=false',\n" +
                " 'table-name' = 'flinktest2',\n" +
                " 'username' = 'root',\n" +
                " 'driver' = 'com.mysql.cj.jdbc.Driver',\n" +
                " 'password' = 'admin'\n" +
                ")");
// 执行CDC过程
        String query = "INSERT INTO targetTable SELECT * FROM sourceTable";
        tEnv.executeSql(query).print();
    }
}

运行结果,加密成功

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1428973.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Axure 动态面板初使用 - 实现简单的Banner图轮播效果

使用工具版本 Axure 9 实现的效果 步骤过程 1、打开Axure工具,从元件库拖个动态面板到空白页; 2、给面板设置一个常用的banner尺寸,举个栗子:343151(移动端我常用的banner尺寸),顺便给它起个名字,就叫…

探讨深浅拷贝在js加密中的运用

深浅拷贝是JavaScript中常用的概念,用于复制对象或数组。它们在处理数据时有不同的用途,适用于不同的场景。在本文中,我们将详细介绍深浅拷贝的概念,提供案例代码,并探讨它们在JavaScript中的应用场景,以及…

演讲回顾:如何为大规模研发团队加速CI构建,实现高效流水线

近日,龙智联合Atlassian举办的DevSecOps研讨会年终专场”趋势展望与实战探讨:如何打好DevOps基础、赋能创新”在上海圆满落幕。龙智Atlassian技术与顾问咨询团队,以及清晖、JamaSoftware、CloudBees等生态伙伴的嘉宾发表了主题演讲&#xff0…

Mysql的BufferPool

Mysql的BufferPool Mysql是一个存储数据到磁盘的进程,但是磁盘的速度难以与CPU相比,所以InnoDB存储引擎在处理客户端的请求时,当需要访问某个页的数据时,就会把完整的页的数据全部加载到内存中。将整个页加载到内存中后就可以进行…

C++初阶之类与对象(上)详细解析

个人主页:点我进入主页 专栏分类:C语言初阶 C语言进阶 数据结构初阶 Linux C初阶 欢迎大家点赞,评论,收藏。 一起努力,一起奔赴大厂 目录 一.前言 二.类的定义和使用 2.1类的引入 2.2类的定义和访问限定…

进程中线程使用率偏高问题排查

1. top命令查看CPU使用率高的进程 2. top -H -p 15931 查看进程下的线程 3. printf "%x\n" 17503 线程PID 10进制转16进制 0x445f 4. jstack -l 15931 导出java进程栈信息,里面包含线程nid0x445f和所在的类 第二种: Arthas方式快捷明了 结合代…

069:vue中EventBus的使用方法(图文示例)

第069个 查看专栏目录: VUE ------ element UI 本文章目录 示例背景示例效果图示例源代码父组件:子组件A:子组件B:eventbus/index.js: EventBus的基本使用方法: 示例背景 在Vue中,使用EventBus可以实现组件…

布隆过滤器的概述和使用

1 布隆过滤器概述 1.1 概述 布隆过滤器(Bloom Filter)是1970年由布隆提出的。它实际上是由一个很长的二进制向量(数组)和一系列随机映射函数(hash函数)组成,它不存放数据的明细内容&#xff0…

C语言中的指针详解

大家好,今天给大家介绍C语言中的指针详解,文章末尾附有分享大家一个资料包,差不多150多G。里面学习内容、面经、项目都比较新也比较全!可进群免费领取。 **指针是C语言中的一个重要概念,它提供了一种直接访问内存地址…

【学员分享-考试心得】国产数据库潜力无限,云贝教育OBCP认证培训帮您解难题

近年来,随着国产化转型的推进,国外数据库的岗位需求逐渐减少,让许多IT从业者倍感压力。在这种情况下,了解国产数据库成为了求职市场上的竞争力。云贝老师们将聚焦于OceanBase、PostgreSQL、TDSQL等IT培训,探讨其对国产…

LED点阵屏(基于51单片机)

师从江科大 LED点阵屏 LED点阵屏由若干个独立的LED组成,LED以矩阵的形式排列,以灯珠亮灭来显示文字、图片、视频等。 LED点阵屏分类 按颜色:单色、双色、全彩 按像素:8*8、16*16等(大规模的LED点阵通常由很多个…

SpringBoot实战2

目录 1.如何返回两个类型的数据?User和Booth 2.如何使用MyBatis遍历一个数组进行查询? 3.前端要的数据太多太杂,我们拼接多个List,前端找数据困难,浪费时间。因此我们进行三表联表查询。 1.首先创建一个vo包&#x…

【数据结构】双向链表 超详细 (含:何时用一级指针或二级指针;指针域的指针是否要释放)

目录 一、简介 二. 双链表的实现 1.准备工作及其注意事项 1.1 先创建三个文件 1.2 注意事项:帮助高效记忆 1.3 关于什么时候 用 一级指针接收,什么时候用 二级指针接收? 1.4 释放节点时,要将节点地址 置为NULL&#xff0…

FANUC机器人开机时无法进入系统,示教器黑屏故障处理总结

FANUC机器人开机时无法进入系统,示教器黑屏故障处理总结 故障描述: FANUC机器人开机时,示教器在初始化时显示:EMAC initial call failed(示教器上电时会进入boot画面,左上角会出现一些白色的英文提示&#…

项目安全-----加密算法实现

目录 对称加密算法 AES (ECB模式) AES(CBC 模式)。 非对称加密 对称加密算法 对称加密算法,是使用相同的密钥进行加密和解密。使用对称加密算法来加密双方的通信的话,双方需要先约定一个密钥,加密方才能加密&#…

openGauss学习笔记-212 openGauss 数据库运维-日志参考

文章目录 openGauss学习笔记-212 openGauss 数据库运维-日志参考212.1 日志类型简介212.2 系统日志212.3 操作日志212.4 审计日志212.5 WAL日志212.6 性能日志 openGauss学习笔记-212 openGauss 数据库运维-日志参考 212.1 日志类型简介 在数据库运行过程中,会出现…

Java面试——计网篇

一、基础篇 1、 TCP/IP 网络模型 对于同一台设备上的进程间通信,有很多种方式,比如有管道、消息队列、共享内存、信号等方式,而对于不同设备上的进程间通信,就需要网络通信,而设备是多样性的,所以要兼容多…

RDBMS-MySQL高级

数据操作语句(DML)多表/关联查询Mysql中的函数事务执行流程数据库的备份与还原数据库表设计三范式 一、数据操作语句(DML) 插入数据 语法: 1.1插入(insert [into])或添加一条数据 -- 指定列…

excel统计分析——卡方独立性检验(下)

参考资料:生物统计学 书接上文:https://blog.csdn.net/maizeman126/article/details/135893731 2、配对列联表 配对设计的数据,进行列联表检验时,采用McNemar-Bowker检验法进行检验。检验统计量为: 自由度dfk(k-1)/2…