Flink-Source的使用

news2024/11/24 13:05:57

Data Sources 是什么呢?就字面意思其实就可以知道:数据来源

Flink 做为一款流式计算框架,它可用来做批处理,也可以用来做流处理,这个 Data Sources 就是数据的来源地。

flink在/流处理中常见的source主要有两大类。

预定义Source

基于本地集合的source(Collection-based-source)

基于文件的source(File-based-source)

基于网络套接字(socketTextStream)

自定义Source

预定义Source演示 

Collection [测试]--本地集合Source

在flink最常见的创建DataStream方式有四种:

l 使用env.fromElements(),这种方式也支持Tuple,自定义对象等复合形式。

注意:类型要一致,不一致可以用Object接收,但是使用会报错,比如:env.fromElements("haha", 1);

源码注释中有写:

|使用env.fromCollection(),这种方式支持多种Collection的具体类型,如List,Set,Queue

l 使用env.generateSequence()方法创建基于Sequence的DataStream --已经废弃了

l 使用env.fromSequence()方法创建基于开始和结束的DataStream

一般用于学习测试时编造数据时使用

1.env.fromElements(可变参数);

2.env.fromColletion(各种集合);

3.env.fromSequence(开始,结束);

package com.bigdata.source;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class _01YuDingYiSource {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 各种获取数据的Source
        DataStreamSource<String> dataStreamSource = env.fromElements("hello world txt", "hello nihao kongniqiwa");
        dataStreamSource.print();
        // 演示一个错误的
        //DataStreamSource<Object> dataStreamSource2 = env.fromElements("hello", 1,3.0f);
        //dataStreamSource2.print();
        DataStreamSource<Tuple2<String, Integer>> elements = env.fromElements(
                Tuple2.of("张三", 18),
                Tuple2.of("lisi", 18),
                Tuple2.of("wangwu", 18)
        );
        elements.print();

        // 有一个方法,可以直接将数组变为集合  复习一下数组和集合以及一些非常常见的API
        String[] arr = {"hello","world"};
        System.out.println(arr.length);
        System.out.println(Arrays.toString(arr));
        List<String> list = Arrays.asList(arr);
        System.out.println(list);

        env.fromElements(
                Arrays.asList(arr),
                Arrays.asList(arr),
                Arrays.asList(arr)
        ).print();




        // 第二种加载数据的方式
        // Collection 的子接口只有 Set 和 List
        ArrayList<String> list1 = new ArrayList<>();
        list1.add("python");
        list1.add("scala");
        list1.add("java");
        DataStreamSource<String> ds1 = env.fromCollection(list1);
        DataStreamSource<String> ds2 = env.fromCollection(Arrays.asList(arr));

        // 第三种
        DataStreamSource<Long> ds3 = env.fromSequence(1, 100);
        ds3.print();


        // execute 下面的代码不运行,所以,这句话要放在最后。
        env.execute("获取预定义的Source");
    }
}

本地文件的案例:

package com.bigdata.source;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class _02YuDingYiSource {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 获取并行度
        System.out.println(env.getParallelism());

        // 讲第二种Source File类型的
        // 给了一个相对路径,说路径不对,老闫非要写,我咋办?
        // 相对路径,转绝对路径
        File file = new File("datas/wc.txt");
        File file2 = new File("./");
        System.out.println(file.getAbsoluteFile());
        System.out.println(file2.getAbsoluteFile());
        DataStreamSource<String> ds1 = env.readTextFile("datas/wc.txt");
        ds1.print();
        // 还可以获取hdfs路径上的数据
        DataStreamSource<String> ds2 = env.readTextFile("hdfs://bigdata01:9820/home/a.txt");
        ds2.print();



        // execute 下面的代码不运行,所以,这句话要放在最后。
        env.execute("获取预定义的Source");
    }
}

Socket [测试] 

socketTextStream(String hostname, int port) 方法是一个非并行的Source,该方法需要传入两个参数,第一个是指定的IP地址或主机名,第二个是端口号,即从指定的Socket读取数据创建DataStream。该方法还有多个重载的方法,其中一个是socketTextStream(String hostname, int port, String delimiter, long maxRetry),这个重载的方法可以指定行分隔符和最大重新连接次数。这两个参数,默认行分隔符是”\n”,最大重新连接次数为0。

提示:

如果使用socketTextStream读取数据,在启动Flink程序之前,必须先启动一个Socket服务,为了方便,Mac或Linux用户可以在命令行终端输入nc -lk 8888启动一个Socket服务并在命令行中向该Socket服务发送数据。Windows用户可以在百度中搜索windows安装netcat命令。

使用nc 进行数据的发送

yum install -y nc   
nc -lk 8888   --向8888端口发送消息,这个命令先运行,如果先运行java程序,会报错!

如果是windows平台:nc -lp 8888 

代码演示:

//socketTextStream创建的DataStream,不论怎样,并行度永远是1
public class StreamSocketSource {

    public static void main(String[] args) throws Exception {

        //local模式默认的并行度是当前机器的逻辑核的数量
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        int parallelism0 = env.getParallelism();

        System.out.println("执行环境默认的并行度:" + parallelism0);

        DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);

        //获取DataStream的并行度
        int parallelism = lines.getParallelism();

        System.out.println("SocketSource的并行度:" + parallelism);

        SingleOutputStreamOperator<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String line, Collector<String> collector) throws Exception {
                String[] words = line.split(" ");
                for (String word : words) {
                    collector.collect(word);
                }
            }
        });

        int parallelism2 = words.getParallelism();

        System.out.println("调用完FlatMap后DataStream的并行度:" + parallelism2);

        words.print();

        env.execute();
    }
}

以下用于演示:统计socket中的 单词数量,体会流式计算的魅力!

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class SourceDemo02_Socket {
    public static void main(String[] args) throws Exception {
        //TODO 1.env-准备环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        //TODO 2.source-加载数据
        DataStream<String> socketDS = env.socketTextStream("bigdata01", 8889);

        //TODO 3.transformation-数据转换处理
        //3.1对每一行数据进行分割并压扁
        DataStream<String> wordsDS = socketDS.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String value, Collector<String> out) throws Exception {
                String[] words = value.split(" ");
                for (String word : words) {
                    out.collect(word);
                }
            }
        });
        //3.2每个单词记为<单词,1>
        DataStream<Tuple2<String, Integer>> wordAndOneDS = wordsDS.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String value) throws Exception {
                return Tuple2.of(value, 1);
            }
        });
        //3.3分组
        KeyedStream<Tuple2<String, Integer>, String> keyedDS = wordAndOneDS.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
            @Override
            public String getKey(Tuple2<String, Integer> value) throws Exception {
                return value.f0;
            }
        });

        //3.4聚合
        SingleOutputStreamOperator<Tuple2<String, Integer>> result = keyedDS.sum(1);

        //TODO 4.sink-数据输出
        result.print();

        //TODO 5.execute-执行
        env.execute();
    }
}

自定义数据源

SourceFunction:非并行数据源(并行度只能=1) --接口

RichSourceFunction:多功能非并行数据源(并行度只能=1) --类

ParallelSourceFunction:并行数据源(并行度能够>=1) --接口

RichParallelSourceFunction:多功能并行数据源(并行度能够>=1) --类 【建议使用的】

package com.bigdata.day02;


import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.Random;
import java.util.UUID;

/**
 * 需求: 每隔1秒随机生成一条订单信息(订单ID、用户ID、订单金额、时间戳)
 * 要求:
 * - 随机生成订单ID(UUID)
 * - 随机生成用户ID(0-2)
 * - 随机生成订单金额(0-100)
 * - 时间戳为当前系统时间
 */

@Data  // set get toString
@AllArgsConstructor
@NoArgsConstructor
class OrderInfo{
    private String orderId;
    private int uid;
    private int money;
    private long timeStamp;
}
// class MySource extends RichSourceFunction<OrderInfo> {
//class MySource extends RichParallelSourceFunction<OrderInfo> {
class MySource implements SourceFunction<OrderInfo> {
    boolean flag = true;

    @Override
    public void run(SourceContext ctx) throws Exception {
        // 源源不断的产生数据
        Random random = new Random();
        while(flag){
            OrderInfo orderInfo = new OrderInfo();
            orderInfo.setOrderId(UUID.randomUUID().toString());
            orderInfo.setUid(random.nextInt(3));
            orderInfo.setMoney(random.nextInt(101));
            orderInfo.setTimeStamp(System.currentTimeMillis());
            ctx.collect(orderInfo);
            Thread.sleep(1000);// 间隔1s
        }
    }

    // source 停止之前需要干点啥
    @Override
    public void cancel() {
        flag = false;
    }
}
public class CustomSource {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(2);
        // 将自定义的数据源放入到env中
        DataStreamSource dataStreamSource = env.addSource(new MySource())/*.setParallelism(1)*/;
        System.out.println(dataStreamSource.getParallelism());
        dataStreamSource.print();
        env.execute();
    }


}

 通过ParallelSourceFunction创建可并行Source

/**
 * 自定义多并行度Source
 */
public class CustomerSourceWithParallelDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> mySource = env.addSource(new MySource()).setParallelism(6);
        mySource.print();
        env.execute();
    }
    public static class MySource implements ParallelSourceFunction<String> {
        @Override
        public void run(SourceContext<String> ctx) throws Exception {
            ctx.collect(UUID.randomUUID().toString());
            /*
            如果不设置无限循环可以看出,设置了多少并行度就打印出多少条数据
             */
        }

        @Override
        public void cancel() {}
    }
}

如果代码换成ParallelSourceFunction,每次生成12个数据,假如是12核数的话。

总结:Rich富函数总结 ctrl + o

 Rich 类型的Source可以比非Rich的多出有:
    - open方法,实例化的时候会执行一次,多个并行度会执行多次的哦(因为是多个实例了)
    - close方法,销毁实例的时候会执行一次,多个并行度会执行多次的哦
    - getRuntimeContext 方法可以获得当前的Runtime对象(底层API)

Kafka Source --从kafka中读取数据

https://nightlies.apache.org/flink/flink-docs-release-1.13/zh/docs/connectors/datastream/kafka/

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka_2.11</artifactId>
  <version>${flink.version}</version>
</dependency>

创建一个topic1 这个主题:

cd /opt/installs/kafka3/

bin/kafka-topics.sh --bootstrap-server bigdata01:9092 --create --partitions 1 --replication-factor 3 --topic topic1

通过控制台向topic1发送消息:
bin/kafka-console-producer.sh  --bootstrap-server bigdata01:9092 --topic topic1
package com.bigdata.day02;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Properties;

public class KafkaSource {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "bigdata01:9092");
        properties.setProperty("group.id", "g1");
        FlinkKafkaConsumer<String> kafkaSource = new FlinkKafkaConsumer<String>("topic1",new SimpleStringSchema(),properties);
        DataStreamSource<String> dataStreamSource = env.addSource(kafkaSource);
        // 以下代码跟flink消费kakfa数据没关系,仅仅是将需求搞的复杂一点而已
        // 返回true 的数据就保留下来,返回false 直接丢弃
        dataStreamSource.filter(new FilterFunction<String>() {
            @Override
            public boolean filter(String word) throws Exception {
                // 查看单词中是否包含success 字样
                return word.contains("success");
            }
        }).print();

        env.execute();
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2246669.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

分公司如何纳税

分公司不进行纳税由总公司汇总纳税“子公司具有法人资格&#xff0c;依法独立承担民事责任;分公司不具有法人资格&#xff0c;其民事责任由公司承担。”企业设立分支机构&#xff0c;使其不具有法人资格&#xff0c;且不实行独立核算&#xff0c;则可由总公司汇总缴纳企业所得税…

亚马逊搜索关键词怎么写?

在亚马逊这个全球领先的电子商务平台&#xff0c;如何让自己的产品被更多的消费者发现&#xff0c;是每一个卖家都需要深入思考的问题。而搜索关键词&#xff0c;作为连接卖家与买家的桥梁&#xff0c;其重要性不言而喻。那么&#xff0c;如何撰写有效的亚马逊搜索关键词呢&…

跨视角差异-依赖网络用于体积医学图像分割|文献速递-生成式模型与transformer在医学影像中的应用

Title 题目 Cross-view discrepancy-dependency network for volumetric medical imagesegmentation 跨视角差异-依赖网络用于体积医学图像分割 01 文献速递介绍 医学图像分割旨在从原始图像中分离出受试者的解剖结构&#xff08;例如器官和肿瘤&#xff09;&#xff0c;并…

基本功能实现

目录 1、环境搭建 2、按键控制灯&电机 LED 电机 垂直按键(机械按键) 3、串口调试功能 4、定时器延时和定时器中断 5、振动强弱调节 6、万年历 7、五方向按键 1、原理及分析 2、程序设计 1、环境搭建 需求: 搭建一个STM32F411CEU6工程 分析: C / C 宏定义栏…

C++11新特性探索:Lambda表达式与函数包装器的实用指南

文章目录 前言&#x1f349;一、Lambda表达式&#xff08;匿名函数&#xff09;&#x1f353;1.1 Lambda 表达式的基本语法&#x1f353;1.2 示例&#xff1a;基本 Lambda 表达式&#x1f353;1.3 捕获列表&#xff08;Capture&#xff09;&#x1f353;1.4 使用 Lambda 表达式…

msvcp110.dll丢失修复的多种科学方法分析,详细解析msvcp110.dll文件

遇到“msvcp110.dll丢失”的错误时&#xff0c;这表明你的系统缺少一个关键文件&#xff0c;但解决这一问题比较直接。本文将指导你通过几个简单的步骤迅速修复此错误&#xff0c;确保你的程序或游戏可以顺利运行。接下来的操作将非常简洁明了&#xff0c;易于理解和执行。 一.…

HDR视频技术之四:HDR 主要标准

HDR 是 UHD 技术中最重要维度之一&#xff0c;带来新的视觉呈现体验。 HDR 技术涉及到采集、加工、传输、呈现等视频流程上的多个环节&#xff0c;需要定义出互联互通的产业标准&#xff0c;以支持规模化应用和部署。本文整理当前 HDR 应用中的一些代表性的国际标准。 1 HDR 发…

Bug Fix 20241122:缺少lib文件错误

今天有朋友提醒才突然发现 gitee 上传的代码存在两个很严重&#xff0c;同时也很低级的错误。 因为gitee的默认设置不允许二进制文件的提交&#xff0c; 所以PH47框架下的库文件&#xff08;各逻辑层的库文件&#xff09;&#xff0c;以及Stm32Cube驱动的库文件都没上传到Gi…

c++源码阅读__smart_ptr__正文阅读

文章目录 简介源码解析1. 引用计数的实现方式2. deleter静态方法的赋值时间节点3.make_smart的实现方式 与 好处4. 几种构造函数4.1 空构造函数4.2 接收指针的构造函数4.3 接收指针和删除方法的构造函数 , 以及auto进行模板lambda的编写4.4 拷贝构造函数4.5 赋值运算符 5. rele…

【BUG】ES使用过程中问题解决汇总

安装elasticsearch内存不足问题 问题回顾 运行kibana服务的时候&#xff0c;无法本地访问 解决 首先排查端口问题&#xff0c;然后检查错误日志 无法运行kibana服务&#xff0c;是因为elasticsearch没有启动的原因 发现致命错误&#xff0c;确定是elasticsearch服务没有运行导…

C语言--分支循环编程题目

第一道题目&#xff1a; #include <stdio.h>int main() {//分析&#xff1a;//1.连续读取int a 0;int b 0;int c 0;while (scanf("%d %d %d\n", &a, &b, &c) ! EOF){//2.对三角形的判断//a b c 等边三角形 其中两个相等 等腰三角形 其余情…

Linux——用户级缓存区及模拟实现fopen、fweite、fclose

linux基础io重定向-CSDN博客 文章目录 目录 文章目录 什么是缓冲区 为什么要有缓冲区 二、编写自己的fopen、fwrite、fclose 1.引入函数 2、引入FILE 3.模拟封装 1、fopen 2、fwrite 3、fclose 4、fflush 总结 前言 用快递站讲述缓冲区 收件区&#xff08;类比输…

git(Linux)

1.git 三板斧 基本准备工作&#xff1a; 把远端仓库拉拉取到本地了 .git --> 本地仓库 git在提交的时候&#xff0c;只会提交变化的部分 就可以在当前目录下新增代码了 test.c 并没有被仓库管理起来 怎么添加&#xff1f; 1.1 git add test.c 也不算完全添加到仓库里面&…

GESP2023年9月认证C++四级( 第三部分编程题(1-2))

编程题1&#xff08;string&#xff09;参考程序&#xff1a; #include <iostream> using namespace std; long long hex10(string num,int b) {//int i;long long res0;for(i0;i<num.size();i) if(num[i]>0&&num[i]<9)resres*bnum[i]-0;else //如果nu…

Ultiverse 和web3新玩法?AI和GameFi的结合是怎样

Gamef 和 AI 是我们这个周期十分看好两大赛道之一&#xff0c;(Gamef 拥有极强的破圈效应&#xff0c;引领 Web2 用户进军 Web3 最佳利器。AI是这个周期最热门赛道&#xff0c;无论 Web2的 OpenAl&#xff0c;还是 Web3&#xff0c;都成为话题热议焦点。那么结合 GamefiA1双叙事…

如何在 UniApp 中实现 iOS 版本更新检测

随着移动应用的不断发展&#xff0c;保持应用程序的更新是必不可少的&#xff0c;这样用户才能获得更好的体验。本文将帮助你在 UniApp 中实现 iOS 版的版本更新检测和提示&#xff0c;适合刚入行的小白。我们将分步骤进行说明&#xff0c;每一步所需的代码及其解释都会一一列出…

解决 npm xxx was blocked, reason: xx bad guy, steal env and delete files

问题复现 今天一位朋友说&#xff0c;vue2的老项目安装不老依赖&#xff0c;报错内容如下&#xff1a; npm install 451 Unavailable For Legal Reasons - GET https://registry.npmmirror.com/vab-count - [UNAVAILABLE_FOR_LEGAL_REASONS] vab-count was blocked, reas…

【AI系统】GPU 架构回顾(从2018年-2024年)

Turing 架构 2018 年 Turing 图灵架构发布&#xff0c;采用 TSMC 12 nm 工艺&#xff0c;总共 18.6 亿个晶体管。在 PC 游戏、专业图形应用程序和深度学习推理方面&#xff0c;效率和性能都取得了重大进步。相比上一代 Volta 架构主要更新了 Tensor Core&#xff08;专门为执行…

每天五分钟机器学习:支持向量机数学基础之超平面分离定理

本文重点 超平面分离定理(Separating Hyperplane Theorem)是数学和机器学习领域中的一个重要概念,特别是在凸集理论和最优化理论中有着广泛的应用。该定理表明,在特定的条件下,两个不相交的凸集总可以用一个超平面进行分离。 定义与表述 超平面分离定理(Separating Hy…

docker镜像源配置、换源、dockerhub国内镜像最新可用加速源(仓库)

一、临时拉取方式 在docker pull后先拼接镜像源域名&#xff0c;后面拼接拉取的镜像名 $ docker pull dockerpull.org/continuumio/miniconda3 二、永久配置方式 vim修改/etc/docker/daemon.json&#xff0c;并重启docker服务。 # 创建目录 sudo mkdir -p /etc/docker# 写…