大数据-112 Flink DataStreamAPI 程序输入源 DataSource 基于文件、集合、Kafka连接器

news2024/9/21 0:46:52

点一下关注吧!!!非常感谢!!持续更新!!!

目前已经更新到了:

  • Hadoop(已更完)
  • HDFS(已更完)
  • MapReduce(已更完)
  • Hive(已更完)
  • Flume(已更完)
  • Sqoop(已更完)
  • Zookeeper(已更完)
  • HBase(已更完)
  • Redis (已更完)
  • Kafka(已更完)
  • Spark(已更完)
  • Flink(正在更新!)

章节内容

上节完成了如下的内容:

  • Flink YARN 模式
  • YARN模式下申请资源
  • YARN模式下提交任务

在这里插入图片描述

DataStream API

主要分为3块:
● DataSource:程序的数据源输入,可以通过StreamExecutionEnvironment.addSource为程序添加数据源
● Transformation:具体的操作,它对一个或者多个输入源进行计算处理,比如Map、FlatMap、Filter操作等
● Sink:程序的输出,它可以把Transformation处理之后的数据输出到指定的存储介质中

Flink针对DataStream提供了大量已经实现的DataSource(数据源接口)。
下面来进行分析。

基于文件

readTextFile(path):读取本地文件,文件遵循TextInputFormat逐行读取规则并返回
如果你是本地IDEA要读取HDFS,那你需要额外的依赖:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-hadoop-compatibility_2.11</artifactId>
  <version>1.11.1</version>
</dependency>
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-common</artifactId>
  <version>2.9.2</version>
</dependency>
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-hdfs</artifactId>
  <version>2.9.2</version>
</dependency>
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-client</artifactId>
  <version>2.9.2</version>
</dependency>

基于Socket

socketTextStream:从Socket中读取数据,元素可以通过一个分割符号分开。

基于集合

fromCollection:通过Java的Collection集合创建一个数据流,集合中的所有元素必须是相同类型的。
如果满足一下条件,Flink将数据类型识别为POJO类型(并允许“按名称”字段引用)

  • 该类是共有且独立的(没有非静态内部类)
  • 该类有共有的无参构造方法
  • 类(及父类)中所有的不被static、transient修饰的属性要么是公有的且不被final修饰,要么是包含公有的Getter和Setter方法,这些方法遵循JavaBean的命名规范。

编写代码

编写的代码如下:

package icu.wzk;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment;

import java.util.ArrayList;
import java.util.List;

public class StreamFromCollection {

    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        List<People> peopleList = new ArrayList<>();
        peopleList.add(new People("wzk", 18));
        peopleList.add(new People("icu", 15));
        peopleList.add(new People("wzkicu", 10));

        DataStreamSource<People> data = env.getJavaEnv().fromCollection(peopleList);
        SingleOutputStreamOperator<People> filtered = data.filter(new FilterFunction<People>() {
            @Override
            public boolean filter(People value) throws Exception {
                return value.getAge() > 15;
            }
        });
        filtered.print();
        env.execute("StreamFromCollection");
    }

    public static class People {

        private String name;
        private Integer age;

        public People() {

        }

        public People(String name, Integer age) {
            this.name = name;
            this.age = age;
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        public Integer getAge() {
            return age;
        }

        public void setAge(Integer age) {
            this.age = age;
        }
    }

}

运行结果

运行结果如下图所示:
在这里插入图片描述

toString

我们可以通过重写 People 的 toString() 方法,来打印内容:

@Override
public String toString() {
    return "name: " + this.name + ", age: " + this.age;
}

重新运行

重新运行可以看到:
在这里插入图片描述

自定义输入

可以使用 StreamExecutionEnvironment.addSource()将一个数据源添加到程序中。
Flink提供了许多预先实现的源函数,但是也可以编写自己的自定义源,方法是非并行源:implements SourceFunction,或者为并行源 implements ParallelSourceFuction接口,或者 extends RichParallelSourceFunction
Flink也提供了一些内置的 Connector(连接器),如下表列了几个主要的:
在这里插入图片描述

Kafka连接器

添加依赖

我们需要继续添加依赖:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka_2.11</artifactId>
  <version>1.11.1</version>
</dependency>

编写代码

package icu.wzk;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;

import java.util.Properties;

public class StreamFromKafka {

    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 配置信息
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "h121.wzk.icu:9092");

        // Kafka
        FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(
                "flink_test",
                new SimpleStringSchema(),
                properties
        );
        DataStreamSource<String> data = env.getJavaEnv().addSource(consumer);

        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = data
                .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                        String[] words = value.split(" ");
                        for (String word: words) {
                            out.collect(new Tuple2<>(word, 1));
                        }
                    }
                });
        SingleOutputStreamOperator<Tuple2<String, Integer>> result = wordAndOne
                .keyBy(new KeySelector<Tuple2<String, Integer>, Object>() {
                    @Override
                    public Object getKey(Tuple2<String, Integer> value) throws Exception {
                        return value.f0;
                    }
                })
                .sum(1);
        result.print();
        env.execute("StreamFromKafka");
    }

}

启动Kafka

我们需要启动 Kafka 的服务来进行测试,之前章节我们已经配置和启动过Kafka了,这里就是直接启动了。

cd /opt/servers/kafka_2.12-2.7.2/bin
./kafka-server-start.sh ../config/server.properties

启动结果如下图所示:
在这里插入图片描述

创建主题

cd /opt/servers/kafka_2.12-2.7.2/bin/
./kafka-topics.sh --create --zookeeper h121.wzk.icu:2181 --replication-factor 1 --partition 1 --topic flink_test

生产消息

cd /opt/servers/kafka_2.12-2.7.2/bin/
./kafka-console-producer.sh --bootstrap-server h121.wzk.icu:9092 --topic flink_test
# 我们等Java程序启动后,产生几条消息

运行代码

观察控制台可以看到:

3> (hello,1)
5> (world,1)
3> (hello,2)
5> (world,2)
3> (hello,3)
3> (hello,4)
2> (hello!,1)
2> (hello!,2)
...

运行的截图如下所示:
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2091505.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

利用 Web 浏览器构建 Java Media Player

如果您需要在 Java 桌面应用程序中嵌入媒体播放器&#xff0c;有几种方法可供选择&#xff1a; 您可以使用 JavaFX Media API 来实现所有必需的媒体播放器功能。虽然稍显过时但仍然可用的 Java Media Framework 也可以作为一种解决方案。您可以集成像 VLCJ 这样的第三方 Java …

统计机器学习基础知识

一、统计机器学习定义 统计机器学习&#xff08;Statistical Machine Learning&#xff09;又称为统计学习&#xff08;Statistical Learning&#xff09;&#xff0c;是关于计算机基于数据构建概率统计模型并运用模型对数据进行预测与分析的一门学科&#xff0c;是概率论、统…

ET6框架(十)通讯消息编写

文章目录 一、消息在的定义&#xff1a;二、客户端消息的发送&#xff1a;三、服务器消息的处理&#xff1a;四、查看结果 一、消息在的定义&#xff1a; ET消息主要分为两类&#xff0c;一个种是普通消息&#xff0c;一种时通过Gate网关转发的消息叫Local消息 这里我们编写客…

【突发事件】Runway删库了,文章结尾有解决方法

最近&#xff0c;Runway 悄悄地从 Hugging Face 平台上删除了自己的代码库&#xff0c;其中包括备受瞩目的 Stable Diffusion v1.5 项目&#xff0c;这在科技界引起了轩然大波。 Runway 的行为不仅没有留下任何痕迹&#xff0c;也没有通知 Hugging Face 或任何社区成员。 更令人…

QEMU - user network

Documentation/Networking - QEMUQEMU/KVM中的网络虚拟化--Part2 User Networking | Xiaoye Zhengs blog (zxxyy.github.io)QEMU Network — ARM SoC Device Assignment Notes documentation (cwshu.github.io)slirp / libslirp GitLabGitHub - virtualsquare/libvdeslirp: li…

运用Premiere自学视频剪辑,这些岗位你能胜任!

随着短视频的兴起和火热&#xff0c;短视频后期制作越来越受到人们的重视&#xff0c;甚至衍生出很多岗位的高薪工作。如大家所了解的&#xff0c;Adobe premiere正是一款视频后期剪辑和制作工具&#xff0c;其功能强大&#xff0c;应用也十分广泛&#xff0c;是从事后期工作者…

【舞动生命,不缺营养!】亨廷顿舞蹈症患者的维生素秘籍✨

Hey小伙伴们&#xff5e;&#x1f44b; 在这个充满色彩的世界里&#xff0c;每个人都是独一无二的舞者&#xff0c;但对于患有亨廷顿舞蹈症的朋友来说&#xff0c;他们的舞蹈却多了几分挑战与不易。&#x1f4aa; 今天&#xff0c;就让我带你一起揭秘&#xff0c;那些能够助力亨…

机器学习/数据分析案例---糖尿病预测

&#x1f368; 本文为&#x1f517;365天深度学习训练营 中的学习记录博客&#x1f356; 原作者&#xff1a;K同学啊 前言 这是一篇数据分析/机器学习很好的入门案例&#xff0c;对糖尿病的影响进行预测和分析通过随机森林预测&#xff0c;平均准确率和召回率都不错不足&#x…

Photomator 3.3.22 (macOS Universal) - 照片编辑软件

Photomator 3.3.22 (macOS Universal) - 照片编辑软件 适用于 Mac、iPhone 和 iPad 的终极照片编辑器 请访问原文链接&#xff1a;https://sysin.org/blog/photomator/&#xff0c;查看最新版。原创作品&#xff0c;转载请保留出处。 作者主页&#xff1a;sysin.org Photoma…

美发店拓客营销预约到店连锁小程序拓展

传统印象里的10元美发店&#xff0c;在城市里已然升级为大店&#xff0c;服务多样化&#xff0c;价格也是几十元到几千元不等数个区间&#xff0c;除了单店外也有连锁品牌进行区域拓展&#xff0c;以量和品牌形象收获更多客户和自身的宣传等。 尤其是规模相对较大的门店&#…

AcWing 896. 最长上升子序列 II

学习视频↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓ 【E04 线性DP 最长上升子序列 二分优化】 O ( n l o g n ) O(nlogn) O(nlogn) #include<iostream> #include<algorithm> #define N 100010 using namespace std; int n; int a[N],q[N]; i…

【软件工程】软件工程

考点2 软件工程 一、定义 二、软件工程基本原理 三、软件工程方法学&#xff08;范型&#xff09; 题目 选择题

数字乡村振兴智慧农业整体规划建设方案

1. 项目建设需求 《数字乡村振兴智慧农业整体规划建设方案》旨在通过遥感、物联网等技术&#xff0c;实现土地资源监测、测土配方施肥、农产品销售分析、农资监管、物流配送监管、农业专家库、市场分析、产业链应用和金融服务。 2. 项目需求分析 项目需求覆盖生产、经营、监…

关于计算机网络原理问题

2017年12月07日星期四&#xff0c; 问题&#xff1a; 答案&#xff1a; 接下来&#xff0c;我们来分析和解答&#xff0c; 首先&#xff0c;你要知道&#xff0c;一个byte&#xff08;字节&#xff09;能表示两个十六进制数&#xff0c;那么四个字节就可以表示8个十六进制数…

Simulink代码生成:关系运算与逻辑运算

文章目录 1 引言2 模块使用实例2.1 关系运算2.2 关系运算 3 代码生成4 总结 1 引言 在Simulink中经常需要判断两个信号的大小关系、是否相等&#xff0c;或者判断布尔类型信号的与、或、非等。本文研究通过关系运算与逻辑运算模块实现上述需求。 2 模块使用实例 2.1 关系运算…

hello树先生——二叉搜索树

文章目录 一.搜索二叉树的性质二&#xff0c;功能函数接口1.二叉树的节点结构&#xff0c;分为左右指针和数据2.二叉树的插入函数3.删除接口4.中序遍历 三.测试项目 一.搜索二叉树的性质 若它的左子树不为空&#xff0c;则左子树上所有节点的值都小于根节点的值若它的右子树不…

uniapp scroll-view滚动触底加载 height高度自适应

背景&#xff1a; scroll-view组件是使用&#xff0c;官网说必须给一个高度height&#xff0c;否则无法滚动&#xff0c;所以刚开始设置了<scroll-view :style"height: 94vh" :scroll-y"true">设置了一个高度&#xff0c;想着vh应该挺合适的&#xf…

眼镜清洗机哪个品牌好?2024超声波清洗机推荐

眼镜作为日常生活不可或缺的配件&#xff0c;其卫生状况直接影响着我们的健康。日常简单的擦拭往往忽略了隐匿于镜片细微处的细菌群落&#xff0c;未彻底清洁的眼镜可能潜藏健康隐患。因此&#xff0c;深度清洁眼镜显得尤为关键&#xff0c;而超声波清洗机正是一种高效便捷的解…

探索《黑神话:悟空》背后的先进技术

黑神话&#xff1a;悟空》是一款备受期待的国产动作角色扮演游戏&#xff0c;凭借其令人惊叹的画面效果和极具深度的游戏玩法&#xff0c;吸引了全球玩家的目光。究竟是什么让这款游戏如此出色&#xff1f;让我们一起来探讨《黑神话&#xff1a;悟空》在开发过程中采用的几项尖…

java计算机毕设课设—固定资产管理系统(附源码、文章、相关截图、部署视频)

这是什么系统&#xff1f; java计算机毕设课设—固定资产管理系统(附源码、文章、相关截图、部署视频) 获取资料方式在最下方 本系统主要用于高校中的“资产”进行管理。具体地讲&#xff0c;固定资产管理系统就是通过资产的增加、删除、查看、借出、归还、维修等一系列手段…