Flink定制化功能开发,demo代码

news2024/9/26 3:27:25

前言:

       这是一个Flink自定义开发的基础教学。本文将通过flink的DataStream模块API,以kafka为数据源,构建一个基础测试环境;包含一个kafka生产者线程工具,一个自定义FilterFunction算子,一个自定义MapFunction算子,用一个flink任务的代码逻辑,将实时读kafka并多层处理串起来;让读者体会通过Flink构建自定义函数的技巧。

一、Flink的开发模块分析

Flink提供四个基础模块:核心SDK开发API分别是处理实时计算的DataStream和处理离线计算的DataSet;基于这两个SDK,在其上包装了TableAPI开发模块的SDK;在Table API之上,定义了高度抽象可用SQL开发任务的FlinkSQL。在核心开发API之下,还有基础API的接口,可用于对时间,状态,算子等最细粒度的特性对象做操作,如包装自定义算子的ProcessWindowFunction和ProcessFunction等基础函数以及内置的对象状态StateTtlConfig;

FLINK开发API关系结构如下:

二、定制化开发Demo演示

2.1 场景介绍

Flink实时任务的的通用技术架构是消息队列中间件+Flink任务:

将数据采集到Kafka或pulser这类队列中间件的Topic,然后使用Flink内置的kafkaSource,监控Topic的数据情况,做实时处理。

  1. 这里提供一个kafka的生产者线程,可以自定义构建需要的数据和上传时间,用于控制写入kafka的数据源;
  2. 重写两个DataStream的基础算子:FilterFunction和MapFunction,用于让读者体会,如何对FLINK函数的重新包装,后续更基础的函数原理一样;我这里用String数据对象做处理,减少对象转换的SDK引入,通常要基于业务做数据polo的加工,这个自己处理,将对象换成业务对象;
  3. 然后使用Flink将整个业务串起来,从kafka读数据,经过两层处理,最终输出需要的结果;

2.2 本地demo演示

2.2.1 pom文件

这里以flink1.14.6+scala1.12版本为例:

2.2.2 kafka生产者线程方法

package org.example.util;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.*;

/**
 * 向kafka生产数据
 *
 * @author i7杨
 * @date 2024/01/12 13:02:29
 */

public class KafkaProducerUtil extends Thread {

    private String topic;

    public KafkaProducerUtil(String topic) {
        super();
        this.topic = topic;
    }

    private static Producer<String, String> createProducer() {
        // 通过Properties类设置Producer的属性
        Properties properties = new Properties();
//        测试环境 kafka 配置
        properties.put("bootstrap.servers", "ip2:9092,ip:9092,ip3:9092");
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return new KafkaProducer<String, String>(properties);
    }

    @Override
    public void run() {
        Producer<String, String> producer = createProducer();
        Random random = new Random();
        Random random2 = new Random();

        while (true) {
            int nums = random.nextInt(10);
            int nums2 = random.nextInt(50);
//            double nums2 = random2.nextDouble();

            String time = new Date().getTime() / 1000 + 5 + "";
            String type = "pv";
            try {
                if (nums2 % 2 == 0) {
                    type = "pv";
                } else {
                    type = "uv";

                }
//                String info = "{\"user\":" + nums + ",\"item\":" + nums * 10 + ",\"category\":" + nums2 + ",\"pv\":" + nums2 * 5 + ",\"ts\":\"" + time + "\"}";
                String info = nums + "=" + nums2;

                System.out.println("message : " + info);
                producer.send(new ProducerRecord<String, String>(this.topic, info));
                
            } catch (Exception e) {
                e.printStackTrace();
            }
            System.out.println("=========数据已经写入==========");
            
            try {
                sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    
    public static void main(String[] args) {
        new KafkaProducerUtil("test01").run();
    }
    
    public static void sendMessage(String topic, String message) {
        Producer<String, String> producer = createProducer();
        producer.send(new ProducerRecord<String, String>(topic, message));
    }
    
}
2.2.3 自定义基础函数

这里自定义了filter和map两个算子函数,测试逻辑按照数据结构变化:

自定义FilterFunction函数算子:阈值小于40的过滤掉

package org.example.funtion;

import org.apache.flink.api.common.functions.FilterFunction;

/**
 * FilterFunction重构
 *
 * @author i7杨
 * @date 2024/01/12 13:02:29
 */

public class InfoFilterFunction implements FilterFunction<String> {

    private double threshold;

    public InfoFilterFunction(double threshold) {
        this.threshold = threshold;
    }

    @Override
    public boolean filter(String value) throws Exception {

        if (value.split("=").length == 2)
            // 阈值过滤
            return Double.valueOf(value.split("=")[1]) > threshold;
        else return false;
    }
}

自定义MapFunction函数:后缀为2的,添加上特殊信息

package org.example.funtion;

import org.apache.flink.api.common.functions.MapFunction;

public class ActionMapFunction implements MapFunction<String, String> {

    @Override
    public String map(String value) throws Exception {
        System.out.println("value:" + value);
        if (value.endsWith("2"))
            return value.concat(":Special processing information");
        else return value;
    }
}

2.2.4 flink任务代码

任务逻辑:使用kafka工具产生数据,然后监控kafka的topic,讲几个函数串起来,输出结果;

package org.example.service;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.example.funtion.ActionMapFunction;
import org.example.funtion.InfoFilterFunction;

import java.util.*;

public class FlinkTestDemo {
    public static void main(String[] args) throws Exception {

        // 设置执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Kafka 配置
        Properties kafkaProps = new Properties();
        kafkaProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "ip1:9092,ip2:9092,ip3:9092");
        kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "flink-consumer-group");
        kafkaProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        kafkaProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        kafkaProps.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        // 创建 Kafka 消费者
        FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(
                "test01",// Kafka 主题名称
                new SimpleStringSchema(),
                kafkaProps);

        // 从 Kafka 中读取数据流
        DataStream<String> kafkaStream = env.addSource(kafkaConsumer);
        env.disableOperatorChaining();

        kafkaStream
                .filter(new InfoFilterFunction(40))
                .map(new ActionMapFunction())
                .print("阈值大于40以上的message=");

        // 执行任务
        env.execute("This is a testing task");
    }


}

运行结果:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1380016.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

uniapp中uview组件库丰富的CountTo 数字滚动使用方法

目录 #平台差异说明 #基本使用 #设置滚动相关参数 #是否显示小数位 #千分位分隔符 #滚动执行的时机 #API #Props #Methods #Event 该组件一般用于需要滚动数字到某一个值的场景&#xff0c;目标要求是一个递增的值。 注意 如果给组件的父元素设置text-align: cente…

【随笔】遗传算法优化的BP神经网络(随笔,不是很详细)

文章目录 一、算法思想1.1 BP神经网络1.2 遗传算法1.3 遗传算法优化的BP神经网络 二、代码解读2.1 数据预处理2.2 GABP2.3 部分函数说明 一、算法思想 1.1 BP神经网络 BP神经网络&#xff08;Backpropagation Neural Network&#xff0c;反向传播神经网络&#xff09;是一种监…

第一个 OpenGL 程序:旋转的立方体(VS2022 / MFC)

文章目录 OpenGL API开发环境在 MFC 中使用 OpenGL初始化 OpenGL绘制图形重置视口大小 创建 MFC 对话框项目添加 OpenGL 头文件和库文件初始化 OpenGL画一个正方形OpenGL 坐标系改变默认颜色 重置视口大小绘制立方体使用箭头按键旋转立方体深度测试添加纹理应用纹理换一个纹理 …

R语言【paleobioDB】——pbdb_map():根据化石记录绘制地图

Package paleobioDB version 0.7.0 paleobioDB 包在2020年已经停止更新&#xff0c;该包依赖PBDB v1 API。 可以选择在Index of /src/contrib/Archive/paleobioDB (r-project.org)下载安装包后&#xff0c;执行本地安装。 Usage pbdb_map (data, col.int"white" ,p…

AI-图片转换中国风动漫人物

&#x1f3e1; 个人主页&#xff1a;IT贫道-CSDN博客 &#x1f6a9; 私聊博主&#xff1a;私聊博主加WX好友&#xff0c;获取更多资料哦~ &#x1f514; 博主个人B栈地址&#xff1a;豹哥教你学编程的个人空间-豹哥教你学编程个人主页-哔哩哔哩视频 目录 1. AI卡通秀原理 2. …

序章 搭建环境篇—准备战士的剑和盾

第一步&#xff1a;安装node.js Node.js 内置了npm&#xff0c;只要安装了node.js&#xff0c;就可以直接使用 npm&#xff0c;官网地址&#xff1a; Download | Node.js 在这里不建议安装最新版本的node.js&#xff0c;可以选跟我一样的版本&#xff0c;node版本v16.13.2 链…

Maven多模块项目打包:Unable to find main class

目录 一、错误来源 二、原始pom文件 common模块 pojo模块 server模块 父工程 三、解决方法 四、修改pom文件 common模块 pojo模块 server模块&#xff08;不改动&#xff09; 父工程 一、错误来源 使用Maven对项目进行多模块开发&#xff0c;在项目打包时出现错误…

Grind75第8天 | 278.第一个错误的版本、33.搜索旋转排序数组、981.基于时间的键值存储

278.第一个错误的版本 题目链接&#xff1a;https://leetcode.com/problems/first-bad-version 解法&#xff1a; 二分查找。 如果一个版本为错误版本&#xff08;isBadVersion为True&#xff09;&#xff0c;那么第一个错误版本在该版本左侧&#xff08;包括该版本&#x…

nova组件讲解和glance对接swift

1、openstack架构 &#xff08;1&#xff09;openstack是一种SOA架构&#xff08;微服务就是从这种架构中剥离出来的&#xff09; &#xff08;2&#xff09;这种SOA架构&#xff0c;就是把每个服务独立成一个组件&#xff0c;每个组件通过定义好的api接口进行互通 &#xff…

RK3568平台 温度传感器芯片SD5075

一.SD5075芯片简介 SD5075 是一款高准确度温度传感器芯片内含高精度测温 ADC&#xff0c;在-40C ~100C 范围内典型误差小于0.5C&#xff0c;在-55C~125C 范围内典型误差小于士1.0C。通过两线 IC/SMBus接口可以很方便与其他设备建立通信。设置 A2~A0 的地址线&#xff0c;可支持…

如何使用“通义听悟”提高工作和学习效率

如何使用通义听悟提高工作和学习效率 通义听悟是一款利用人工智能技术&#xff0c;自动为音频和视频内容提供转写、翻译、总结、检索等功能的在线工具。它可以在会议、学习、访谈、培训等场景下&#xff0c;帮助您记录、阅读、整理、复习音视频信息&#xff0c;成为您的工作和…

深入浅出线程原理

Linux 中的线程本质 线程接口由 Native POSIX Thread Library 提供&#xff0c;即&#xff1a;NPTL 库函数 线程被称为轻量级进程 (Light Weight Process) 每一个线程在内核中都对应一个调度实体&#xff0c;拥有独立的结构体 (task_struct) 内核设计&#xff1a;一个进程对…

python + selenium 初步实现数据驱动

如果在进行自动化测试的时候将测试数据写在代码中&#xff0c;若测试数据有变&#xff0c;不利于数据的修改和维护。但可以尝试通过将测试数据放到excel文档中来实现测试数据的管理。 示例&#xff1a;本次涉及的项目使用的12306 selenium 重构------三层架构 excel文件数据如…

【RV1126 学习】SDK/ U-Boot/kernel/rootfs 编译学习

文章目录 RV1126芯片介绍rv1126 模块代码目录相关说明 SDK 包下的脚本使用build.sh 脚本使用envsetup.sh 脚本使用mkfirmware.sh 脚本使用rkflash.sh 脚本使用 U-Boot 编译和配置uboot 的配置修改编译操作 kernel 的修改编译rootfs 编译和配置buildroot 配置busybox 配置 RV112…

如何从电脑找回/恢复误删除的照片

按 Shift Delete 以后会后悔吗&#xff1f;想要恢复已删除的照片吗&#xff1f;好吧&#xff0c;如果是这样的话&#xff0c;那么您来对地方了。在本文中&#xff0c;我们将讨论如何从 PC 中检索已删除的文件。 自从摄影的概念被曝光以来&#xff0c;人们就对它着迷。早期的照…

SQL:一行中存在任一指标就显示出来

当想要统计的两个指标不在一张表中时&#xff0c;需要做关联。但很多情况下&#xff0c;也没有办法保证其中一张表的维度是全的&#xff0c;用left join或right join可能会导致数据丢失。所以借助full join处理。 如&#xff0c;将下面的数据处理成表格中的效果&#xff08;维…

IntersectionObserver

IntersectionObserver 这个API主要实现图片懒加载、加载更多等等。 该API作用是观察两个元素之间有没有交叉&#xff0c;有没有重叠 现在要做的是当图片跟视口有交叉的情况下&#xff0c;把data-src的图片路径替换给src属性 //第一个参数是 回调&#xff0c;第二个参数的 配置…

论文翻译 | 【深入挖掘Java技术】「底层原理专题」深入分析一下并发编程之父Doug Lea的纽约州立大学的ForkJoin框架的本质和原理

深入分析一下并发编程之父Doug Lea的纽约州立大学的ForkJoin框架的本质和原理这里写目录标题 前提介绍摘要引言设计性能优秀任务粒度合理Cilk框架和基础fork/join的可移植性FJTask框架设计思路线程映射关系拆分子任务排队及调度设置调度管理 标准示例 未完待续 前提介绍 Doug …

【linux驱动开发】在linux内核中注册一个杂项设备与字符设备以及内核传参的详细教程

文章目录 注册杂项设备驱动模块传参注册字符设备 开发环境&#xff1a; windows ubuntu18.04 迅为rk3568开发板 注册杂项设备 相较于字符设备&#xff0c;杂项设备有以下两个优点: 节省主设备号:杂项设备的主设备号固定为 10&#xff0c;在系统中注册多个 misc 设备驱动时&…

c++学习笔记-STL案例-机房预约系统2-创建身份类

前言 衔接上一篇“c学习笔记-STL案例-机房预约系统1-准备工作”&#xff0c;本文主要包括&#xff1a;创建身份类&#xff0c;建立了整个系统的框架&#xff0c;Identity基类&#xff0c;派生类&#xff1a;Sudent、Teacher、Manager&#xff0c;基类无实现源文件&#xff0c;…