【flink】 flink入门教程demo 初识flink

news2024/11/15 4:53:50

文章目录

  • 通俗解释什么是flink及其应用场景
  • flink处理流程及核心API
  • flink代码快速入门
  • flink重要概念

什么是flink? 刚接触这个词的同学 可能会觉得比较难懂,网上搜教程 也是一套一套的官话, 如果大家熟悉stream流,那或许会比较好理解 就是流式处理。博主也是刚学习,简单做了个入门小结,后续学习 文章也会不断完善

通俗解释什么是flink及其应用场景

flink是一个流式处理框架,且高性能。说通俗点就是把数据转成流的形式进行处理,可以在多进程中执行,而且是分布式架构 支持集群部署

那么实际应用场景是怎么样的呢?还是通俗点举例,我们可以将文本文件中的内容,通过flink流式读取、统计等操作,这是最基础的操作;也可以监听服务器端口,不断从端口获取数据 并进行处理;还可以把消息队列中的消息进行读取; 此外,用于IOT场景也是没有问题的。比如某社交网站,要实时统计点赞排行榜,就可以通过flink进行处理。换句话说,有数据的地方,都可以用flink处理。

flink是基于内存的,所以高效;
与大多数组件一样,内存不安全,所以会有持久化的功能 checkPoint
flink本身就是为大数据服务的,所以避免宕机风险 能够支持集群部署

当然 杀鸡焉用牛刀 ,flink一般是在大数据量的情况下,才会使用的。

flink处理流程及核心API

在此之前,我们看看在flink出现之前的上一代架构:
在这里插入图片描述
批处理:有序 低速
流处理:无序 高速
lambda架构是有两套处理方式的,而flink的出现,可以实现批流处理。


flink的四层API

  • 流处理和批处理 都是基于DataStream和DataSet
  • 早期flink批处理都是基于DataSet API ,在1.12版本开始 统一使用 DataStream 就可实现批流处理
    在这里插入图片描述

flink代码快速入门

下面快速入门 在springboot环境中flink的应用 , 注意导包不要导错了。
我们的demo业务场景是 统计words.txt中 每个单词出现的次数。

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.operators.UnsortedGrouping;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.PostConstruct;

/**
 * DataSet API 批处理 (有序 低速)
 *
 */

/**
 * flink 分层api
 *
 *   SQL                          最高层语言
 *   table API                   声明式领域专用语言
 *   DataStream / DataSet API   核心Apis
 *   (流处理和批处理 基于这两者  早期flink批处理都是基于DataSet API  在1.12版本开始 统一使用 DataStream 就可实现批流处理)
 *   有状态流处理                 底层APIs
 */
@RestController
public class DataSetAPIBatchWordCount {

    @PostConstruct
    public void test() throws Exception {
        // 1. 创建一个执行环境
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // 2. 从文件中读取数据
        // 继承自Operator  Operator 继承自DataSet ,  DataSource基于DataSet
        DataSource<String> lineDataSource = env.readTextFile("input/words.txt");

        // 3. 逻辑处理: 将每行数据进行分词 转换成二元组类型
        FlatMapOperator<String, Tuple2<String, Long>> wordAndOneTuple = lineDataSource.flatMap(
                // 将每行打散 放到一个收集器里
                (String line, Collector<Tuple2<String, Long>> out) -> {
                    // 将一行文本进行分词
                    String[] words = line.split(" ");
                    // 将每个单词转换成二元组分组
                    for (String word : words) {
                        // 每来一个单词 计数1
                        out.collect(Tuple2.of(word, 1L));
                    }
                    // 因为有泛型擦除 所以需要指定回类型
                }).returns(Types.TUPLE(Types.STRING, Types.LONG));

        // 4. 按照word进行分组 groupBy可以传入索引位置 0表示索引 of(word 0)
        UnsortedGrouping<Tuple2<String, Long>> wordAndOneGroup = wordAndOneTuple.groupBy(0);

        // 5. 分组内 进行累加 1表示索引 of(word 索引0 , 1L 索引1);
        AggregateOperator<Tuple2<String, Long>> sum = wordAndOneGroup.sum(1);

        // 6. 打印输出
        sum.print();


    }


}


import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.PostConstruct;

/**
 * DataStream API 批处理
 * (启动jar包时 指定模式)
 */
@RestController
public class DataStreamAPIBatchWordCount {

    @PostConstruct
    public void test() throws Exception {

        // 1. 创建流式的执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2. 读取文件 (有界流)
        DataStreamSource<String> lineDataStreamSource = env.readTextFile("input/words.txt");

        // 3. 转换计算
        SingleOutputStreamOperator<Tuple2<String, Long>> wordAndOneTuple = lineDataStreamSource.flatMap((String line, Collector<Tuple2<String, Long>> out) -> {
            String[] words = line.split(" ");
            for (String word : words) {
                out.collect(Tuple2.of(word, 1L));
            }
        }).returns(Types.TUPLE(Types.STRING, Types.LONG));

        // 4. 分组操作  wordAndOneTuple.keyBy(0) 根据0索引位置分组
        KeyedStream<Tuple2<String, Long>, String> wordAndOneKeyedStream = wordAndOneTuple.keyBy(item -> item.f0);

        // 5. 求和
        SingleOutputStreamOperator<Tuple2<String, Long>> sum = wordAndOneKeyedStream.sum(1);

        // 6. 打印
        sum.print();

        // 7. 启动执行 上面步骤只是定义了流的执行流程
        env.execute();

        // 数字表示子任务编号 (默认是cpu的核心数 同一个词会出现在同一个子任务上进行叠加)
//        3> (java,1)
//        9> (test,1)
//        5> (hello,1)
//        3> (java,2)
//        5> (hello,2)
//        9> (test,2)
//        9> (world,1)
//        9> (test,3)


    }
}

文本文件位于根目录的input目录下

在这里插入图片描述

test
hello test
world
hello java
java
test

运行:启动application中的main方法即可


flink重要概念

JobManger
TaskManger

JobManger是调度中心,将客户端的数据收集成任务,分发给TaskManger执行,
TaskManger是真正执行任务的地方。
JobManger可以理解为master, TaskManger可以理解为worker (slaver)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/382533.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

异步循环

业务 &#xff1a; 批量处理照片 &#xff0c; 批量拆建 &#xff0c; 裁剪一张照片需要异步执行等待 &#xff0c; 并且是批量 所以需要用到异步循环 裁剪图片异步代码 &#xff1a; 异步循环 循环可以是 普通 for 、 for of 、 for in 不能使用forEach ,这里推荐 for…

笔记-什么是神经网络机器学习深度学习

深度学习&#xff1a;一种实现机器学习的技术所谓深度学习&#xff0c;简单来说是机器学习的一个子集&#xff0c;用于建立、模拟人脑进行数据处理和分析学习的神经网络&#xff0c;因此也可以被称作是深度神经网络。其基本特点是模仿大脑的神经元之间传递和处理信息的模式。深…

如何使用固态继电器实现更高可靠性的隔离和更小的解决方案尺寸

自晶体管发明之前&#xff0c;继电器就已被用作开关。从低压信号安全控制高压系统的能力&#xff0c;如隔离电阻监控&#xff0c;对于许多汽车系统的开发是必要的。虽然机电继电器和接触器的技术多年来有所改进&#xff0c;但设计人员要实现其终身可靠性和快速开关速度以及低噪…

计算机图形学(1):VS配置openGL和画一个简单正方形

简单记录一下这门课的学习过程 1.下载并安装VS 直接看这片文章即可 http://t.csdn.cn/auPGf 2.下载OpenGL相关库 已经打包好了 需要的可以直接下载&#xff1a; 链接&#xff1a;https://pan.baidu.com/s/1Q7XTD4jkRhRBfTW9wYgzGg?pwd1111 提取码&#xff1a;1111 3.打开…

触摸屏与模拟量测试终端之间无线MODBUS通信

本方案是昆仑通态触摸屏与4台DTD433FC无线模拟量信号测试终端进行无线 MODBUS 通信的实现方法。本方案中昆仑通态触摸屏作为主站显示各从站的模拟量信号&#xff0c;传感器、DCS、PLC、智能仪表等4个设备作为Modbus从站输出模拟量信号。方案中采用无线模拟量信号测控终端DTD433…

UVM实战--带有寄存器的加法器

一.整体的设计结构图 这里将DUT换成加法器&#xff0c;可以理解为之前UVM加法器加上寄存器&#xff0c;这里总线的功能不做修改&#xff0c;目的看代码的移植那些部分需要修改。 二.各个组件代码详解 2.1 DUT module dut( input clk, input rst_n, input…

【深度学习】softmax和交叉熵的配合求导

在分类问题中&#xff0c;尤其是在神经网络中&#xff0c;交叉熵函数非常常见。因为经常涉及到分类问题&#xff0c;需要计算各类别的概率&#xff0c;所以交叉熵损失函数与sigmoid函数或者softmax函数成对出现。 1.softmax softmax用于多分类过程中&#xff0…

这么简单的 CSS 动效,快来瞧瞧

前言 这几天逛网站浏览网页的时候&#xff0c;看到一个不错的CSS效果&#xff0c;便想来实现一下。整个效果实现起来比较简单&#xff0c;但是并不缺少交互感&#xff0c;因此来分享一下这个CSS效果。 效果展示 HTML 搭建 HTML部分一如既往地简单&#xff0c;认清楚它的布局…

密码传输和存储,如何保证数据安全?

本文从一个输入密码登录场景说起&#xff0c;详细介绍了密码传输过程的改进和思路&#xff0c;最后展现出一个相对安全的传输和存储方案。点击上方“后端开发技术”&#xff0c;选择“设为星标” &#xff0c;优质资源及时送达场景在互联网项目中&#xff0c;我们经常会遇到以下…

研报精选230302

目录 【个股230302华西证券_比亚迪】系列点评五十四&#xff1a;迪“王”需求向上 出口“海”阔天空【个股230302华西证券_华利集团】下游去库存背景下承压&#xff0c;毛利率保持稳健【个股230302开源证券_恒顺醋业】公司信息更新报告&#xff1a;四季度业绩承压&#xff0c;期…

运维级医院PACS系统全套源代码

PACS系统源码 运维级医院PACS系统源码&#xff0c;有演示&#xff0c;带使用手册和操作说明书 开发环境&#xff1a;VC MSSQL 文末获取方式&#xff01; PACS系统可实现检查预约、病人信息登记、计算机阅片、电子报告书写、胶片打印、数据备份等一系列满足影像科室日常工作…

每日统计部门人员考勤打卡情况并汇总通知

在值班时&#xff0c;HR需要及时了解到部分人员的打卡情况。这个时候&#xff0c;可以通过腾讯云HiFlow来实现自动通知考勤打卡情况。实现步骤&#xff1a;Step1&#xff1a;我们进入腾讯云HiFlow官网&#xff0c;进入控制台。我们在触发应用选择【定时启动-每天】触发。这里我…

VSCode下载与安装使用教程【超详细讲解】

目录 一、VSCode介绍 二、官方下载地址 三、VSCode安装 1、点击我同意此协议&#xff0c;点击下一步&#xff1b; 2、点击浏览&#xff0c;选择安装路径&#xff0c;点击下一步&#xff1b; 3、添加到开始菜单&#xff0c;点击下一步&#xff1b; 4、根据需要勾选&#…

开创高质量发展新局面,优炫数据库助推数字中国建设

最新印发《数字中国建设整体布局规划》&#xff0c;建设数字中国是数字时代推进中国式现代化的重要引擎&#xff0c;是构筑国家竞争新优势的有力支撑。 数字中国建设按照“2522”的整体框架进行布局&#xff0c;即夯实数字基础设施和数据资源体系“两大基础”&#xff0c;推进…

Java流Stream实战-常用api案例解析

本文介绍java 8 Stream流的常用高频api&#xff0c;通过实战级别的案例进行演示。实现结合实际业务、开发需要来应用技术&#xff0c;不让技术讲解枯燥无味&#xff0c;带来技术落地成生产力的价值。1. 思考&#xff0c;stream 的多个操作&#xff0c;相当于几个for循环&#x…

resultMap 用法?工作中是怎么实现“多表联查”的?

目录 一、resultMap用法 1.1、使用场景 1.2、用法说明 1.2.1、模拟场景 1.2.2、使用 二、多表联查 2.1、分析 2.2、具体步骤 2.3、总结 一、resultMap用法 1.1、使用场景 字段名称和程序中的属性名不同的情况&#xff0c;可使⽤ resultMap 配置映射&#xff1b;⼀对⼀…

英语好不好,不影响做外贸

对于国际贸易而言&#xff0c;英语到底有多重要&#xff1f;还记得我刚去墨西哥的时候&#xff0c;怕语言不通&#xff0c;我还带了一本《西班牙语入门》的书籍&#xff0c;靠着那本书一边说一边学&#xff0c;刚开始的时候很痛苦的。无法想象一个国家大部分的人都不懂得讲英语…

【已解决】nvidia-smi不显示正在使用GPU的进程

目录1 问题背景2 问题探索3 问题解决4 告别Bug1 问题背景 环境&#xff1a; 远程服务器Ubuntu20.04CUDA 11.6 现象&#xff1a;在日志文件和终端均显示Python脚本已使用了GPU 但是nvidia-smi中的Processes进程无显示 2 问题探索 首先&#xff0c;可以看到 | 0 Tesla V…

Android Handler机制(三) Looper源码分析

一. 简介 我们接上一篇文章:Android Handler机制(二) Handler 实现原理 继续分析Looper Looper 的职责很单一&#xff0c;就是单纯的从 MessageQueue 中取出消息分发给消息对应 的宿主 Handler&#xff0c;因此它的代码不多(400行左右) . Looper 是线程独立的且每个线程只能存在…

MySQL运维知识

1 日志1.1 错误日志1.2 二进制日志查看二进制日志&#xff1a;mysqlbinlog ./binlog.000007purge master logs to binlog.000006reset mastershow variables like %binlog_expire_logs_seconds%默认二进制文件只存放30天&#xff0c;30天后会自动删除。1.3 查询日志1.4 慢查询日…