Apache Flink 入门

news2024/9/24 9:27:55

零、概述

Apache Flink 是一个高性能的开源分布式流处理框架,专注于实时数据流的处理。

它设计用于处理无界和有界数据流,在内存级速度下提供高效的有状态计算。

Flink 凭借其独特的Checkpoint机制和Exactly-Once语义,确保数据处理的准确性和一致性,同时支持高吞吐量和低延迟。

通过灵活的窗口操作和丰富的状态管理功能,Flink 能够应对复杂的实时数据处理需求,是大数据处理领域的重要技术之一。

其强大的DataStream API和Table API为开发者提供了高效、简洁的数据处理手段。

一、添加依赖 pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.xch</groupId>
    <artifactId>java-flink</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <encoding>UTF-8</encoding>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <java.version>1.8</java.version>
        <flink.version>1.12.2</flink.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>

</project>

二、map() filter() flatMap()方法示例

2.1 map()方法示例

简单处理,和java8的stream的map()类似,不过只能进行简单的处理,返回:数组元素自身的和

public static List<Integer> mapDemo(DataSource<Integer> dataSteam) throws Exception {
    return dataSteam.map(x -> x + x).collect();
}

2.2 filter()方法示例

过滤方法,返回偶数,

public static List<Integer> filterDemo(DataSource<Integer> dataSteam) throws Exception {
    return dataSteam.filter(x -> x % 2 == 0).collect();
}

2.3 flatMap()方法示例

flatMap方法可以处理复杂、定制化的逻辑,返回元素的类型也可以是复杂的;

  • 第一个简单处理的示例
public static List<Object> flatMapDemo(DataSource<Integer> dataSteam) throws Exception {
    return dataSteam.flatMap(new FlatMapFunction<Integer, Object>() {
        @Override
        public void flatMap(Integer integer, Collector<Object> collector) throws Exception {
            collector.collect(integer);
            collector.collect(integer * integer);
        }
    }).collect();
}
  • 第二个复杂的示例
public static List<Map<Integer, Object>> flatMapDemo1(DataSource<Integer> dataSteam) throws Exception {
    return dataSteam.flatMap(new FlatMapFunction<Integer, Map<Integer, Object>>() {
        @Override
        public void flatMap(Integer integer, Collector<Map<Integer, Object>> collector) throws Exception{
            Map<Integer, Object> hashMap = new HashMap<>();
            hashMap.put(integer, integer * integer);
            collector.collect(hashMap);
        }
    }).collect();
}

2.4 示例演示

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.util.Collector;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class FlinkDemo {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSource<Integer> dataSteam = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        System.out.println("mapDemo:" + mapDemo(dataSteam));
        System.out.println("filterDemo:" + filterDemo(dataSteam));
        System.out.println("flatMapDemo:" + flatMapDemo(dataSteam));
        System.out.println("flatMapDemo1:" + flatMapDemo1(dataSteam));
    }
}

输出内容:

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1932172.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

如何追查一个packet在linux 系统哪里丢失

要想追一个包在系统哪里丢失了&#xff0c; 就要了解 一个应用层的包在送出时 要经历那些 检查点 和被丢掉的点。 1. 在传输层&#xff0c;如果是 tcp 包 会有contrack 的 buf 的限制 可能会导致 packets 的丢失。 > 检查办法&#xff1a;查看dmesg日志有报错&#xff1a;k…

大厂面试-基本功

大厂面试第4季 服务可用性多少个9是什么意思遍历集合add或remove操作bughashcode冲突案例BigdecimalList去重复IDEA Debugger测试框架ThreaLocal父子线程数据同步 InheritableThreadLocal完美解决线程数据同步方案 TransmittableThreadLocal 服务可用性多少个9是什么意思 遍历集…

线程池笔记

笔记梳理 前言.PHONYC标准库头文件C/C通用或C特有头文件mkdirc_str()snprintfvsnprintfumaskopen函数可变参数列表va_startva_endfunctionalstatic_castpthread_cond_init_threads.emplace_backstd::bindstd::placeholdersThreadPool(const ThreadPool<T> &tp) dele…

抢着发布地表最强开源模型,Meta凭什么勇攀AI高峰?

【科技明说 &#xff5c; 科技热点关注】 据外媒可靠消息&#xff0c;扎克伯格的Meta 公司将在7月23日发布开源大模型Llama 3-405B&#xff0c;这是基于现有80亿和700亿参数两个版本之外推出的4050亿参数版本&#xff0c;号称当前地球表面最强大的开源大模型。 Llama 3-405B是…

快速排序及归并排序的实现与排序的稳定性

目录 快速排序 一. 快速排序递归的实现方法 1. 左右指针法 步骤思路 为什么要让end先走&#xff1f; 2. 挖坑法 步骤思路 3. 前后指针法 步骤思路 二. 快速排序的时间和空间复杂度 1. 时间复杂度 2. 空间复杂度 三. 快速排序的优化方法 1. 三数取中优化 2. 小区…

【C++】拷贝构造函数及析构函数

&#x1f4e2;博客主页&#xff1a;https://blog.csdn.net/2301_779549673 &#x1f4e2;欢迎点赞 &#x1f44d; 收藏 ⭐留言 &#x1f4dd; 如有错误敬请指正&#xff01; &#x1f4e2;本文由 JohnKi 原创&#xff0c;首发于 CSDN&#x1f649; &#x1f4e2;未来很长&#…

超详细信息收集篇

1 域名信息收集 1.1 域名是什么 域名&#xff08;英语&#xff1a;Domain Name&#xff09;&#xff0c;又称网域&#xff0c;是由一串用点分隔的名字组成的 Internet 上某一台 计算机 或计算机组的名称&#xff0c;用于在数据传输时对计算机的定位标识&#xff08;有时也指地…

学习分布式事务遇到的小bug

一、介绍Seata 在处理分布式事务时我用到是Seata&#xff0c;Seata的事务管理中有三个重要的角色&#xff1a; TC (Transaction Coordinator) - 事务协调者&#xff1a;维护全局和分支事务的状态&#xff0c;协调全局事务提交或回滚。 TM (Transaction Manager) - 事务管理器…

DockerHub无法拉取镜像怎么办

快速构建企业级AIGC项目 LangChat是Java生态下企业级AIGC项目解决方案&#xff0c;在RBAC权限体系的基础上&#xff0c;集成AIGC大模型功能&#xff0c;帮助企业快速定制知识库、企业机器人。 网站文档&#xff1a;Index – LangChat 后台地址&#xff1a;LangChain Chat 前台…

3.5、matlab打开显示保存点云文件(.ply/.pcd)以及经典点云模型数据

1、点云数据简介 点云数据是三维空间中由大量二维点坐标组成的数据集合。每个点代表空间中的一个坐标点&#xff0c;可以包含有关该点的颜色、法向量、强度值等额外信息。点云数据可以通过激光扫描、结构光扫描、摄像机捕捉等方式获取&#xff0c;广泛应用于计算机视觉、机器人…

Redis之List列表

目录 一.列表讲解 二.列表命令 三.内部编码 四.应用场景 Redis的学习专栏&#xff1a;http://t.csdnimg.cn/a8cvV 一.列表讲解 列表类型是用来存储多个有序的字符串&#xff0c;如下所示&#xff0c;a、b、c、d、e五个元素从左到右组成了一个有序的列表&#xff0c;列表中的…

【连续四届EI检索|稳定ACM出版、EI检索|线上线下结合】2024年第五届医学人工智能国际学术会议(ISAIMS 2024,8月13-17)

第五届医学人工智能国际学术会议&#xff08;ISAIMS2024&#xff09;将于2024年8月13-17日于荷兰阿姆斯特丹自由大学召开&#xff0c;国内分会场将于2024年10月25-27日于中国武汉召开。 会议自2020年至今已经成功举办四届&#xff0c;吸引了来自海内外相关领域学者600余名。本届…

贪心算法(2024/7/16)

1合并区间 以数组 intervals 表示若干个区间的集合&#xff0c;其中单个区间为 intervals[i] [starti, endi] 。请你合并所有重叠的区间&#xff0c;并返回 一个不重叠的区间数组&#xff0c;该数组需恰好覆盖输入中的所有区间 。 示例 1&#xff1a; 输入&#xff1a;inter…

MongoDB教程(七):mongoDB分片

&#x1f49d;&#x1f49d;&#x1f49d;首先&#xff0c;欢迎各位来到我的博客&#xff0c;很高兴能够在这里和您见面&#xff01;希望您在这里不仅可以有所收获&#xff0c;同时也能感受到一份轻松欢乐的氛围&#xff0c;祝你生活愉快&#xff01; 文章目录 引言MongoDB 分…

Mysql数据库的概念及sql语法和规范+数据库的用户管理

一、数据库的概念 1.数据库&#xff1a;组织、管理、存储数据的仓库 2.数据库的管理系统&#xff08;DBMS&#xff09;&#xff1a;实现对数据有效组织&#xff0c;管理和存储的系统软件。 3.关系型数据库和非关系性数据库&#xff1a; 关系型数据库&#xff1a;mysql or…

神经网络概述

目录 1. 前馈神经网络(Feedforward Neural Networks, FNNs) 2. 卷积神经网络(Convolutional Neural Networks, CNNs) 3. 循环神经网络(Recurrent Neural Networks, RNNs) 4. 长短期记忆网络(Long Short-Term Memory, LSTM) 5. 门控循环单元(Gated Recurrent Unit…

MSPM0G3507——时钟主频拉到80MHZ

先点开使用时钟树 在配置时钟界面这样配置

【送书活动十期】从零开始node.js制作CLI工具

这篇博客的由来是源于工作中一个java项目的配置项是加密后的私钥&#xff0c;私钥是由其他项目中调用web3生成随机账号得到的&#xff0c;而加密方法只是简单在java项目中执行代码得到。这便导致两步操作有点割裂&#xff0c;需要有一个脚本来完成生成私钥和加密私钥&#xff0…

[Tensor学习]你不得不知道的知识点-切点-反转

wait...突然发现了一个错误&#xff1a; 在tensor里面只有size相同才允许相加&#xff0c; 如果想要相连接&#xff1a; PS: 如果tensor是多维的&#xff0c;比如说 a Tensor([1,2,3], [4,5,6]) 那么有 a[ : , :] a a[0,0] 1 a[ 第一维 &#xff0c;第二维] ...…

乘积量化pq:将高维向量压缩 97%

向量相似性搜索在处理大规模数据集时&#xff0c;往往面临着内存消耗的挑战。例如&#xff0c;即使是一个包含100万个密集向量的小数据集&#xff0c;其索引也可能需要数GB的内存。随着数据集规模的增长&#xff0c;尤其是高维数据&#xff0c;内存使用量会迅速增加&#xff0c…