Flink学习之旅:(三)Flink源算子(数据源)

news2025/1/15 6:57:12

1.Flink数据源

        Flink可以从各种数据源获取数据,然后构建DataStream 进行处理转换。source就是整个数据处理程序的输入端。

数据集合
数据文件
Socket数据
kafka数据
自定义Source

2.案例

2.1.从集合中获取数据

        创建 FlinkSource_List 类,再创建个 Student 类(姓名、年龄、性别三个属性就行,反正测试用)

package com.qiyu;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.ArrayList;

/**
 * @author MR.Liu
 * @version 1.0
 * @data 2023-10-18 16:13
 */
public class FlinkSource_List {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        ArrayList<Student> clicks = new ArrayList<>();
        clicks.add(new Student("Mary",25,1));
        clicks.add(new Student("Bob",26,2));
        DataStream<Student> stream = env.fromCollection(clicks);
        stream.print();
        env.execute();
    }
}

运行结果:

Student{name='Mary', age=25, sex=1}
Student{name='Bob', age=26, sex=2}

2.2.从文件中读取数据

文件数据:

spark
hello world kafka spark
hadoop spark

package com.qiyu;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @author MR.Liu
 * @version 1.0
 * @data 2023-10-18 16:31
 */
public class FlinkSource_File {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStream<String> stream = env.readTextFile("input/words.txt");
        stream.print();
        env.execute();
    }
}

运行结果:(没做任何处理)

spark
hello world kafka spark
hadoop spark

2.3.从Socket中读取数据

package com.qiyu;

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @author MR.Liu
 * @version 1.0
 * @data 2023-10-18 17:41
 */
public class FlinkSource_Socket {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        // 2. 读取文本流
        DataStreamSource<String> lineDSS = env.socketTextStream("192.168.220.130",
                7777);
        lineDSS.print();
        env.execute();
    }
}

运行结果:

服务器上执行:

 nc -lk 7777

疯狂输出

控制台打印结果 

6> hello
7> world

2.4.从Kafka中读取数据

pom.xml 添加Kafka连接依赖

      <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
      </dependency>
package com.qiyu;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Properties;

/**
 * @author MR.Liu
 * @version 1.0
 * @data 2023-10-19 10:01
 */
public class FlinkSource_Kafka {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "hadoop102:9092");
        properties.setProperty("group.id", "consumer-group");
        properties.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("auto.offset.reset", "latest");

        DataStreamSource<String> stream = env.addSource(
                new FlinkKafkaConsumer<String>("clicks", new SimpleStringSchema(), properties
        ));
        stream.print("Kafka");
        env.execute();
    }
}

启动 zk 和kafka

创建topic

bin/kafka-topics.sh --create --bootstrap-server hadoop102:9092 --replication-factor 1 --partitions 1 --topic clicks

生产者、消费者命令

bin/kafka-console-producer.sh --bootstrap-server hadoop102:9092  --topic clicks
bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092  --topic clicks --from-beginning

启动生产者命令后疯狂输入 

运行java类,运行结果:和生产者输入的是一样的

Kafka> flinks
Kafka> hadoop
Kafka> hello
Kafka> nihaop

2.5.从自定义Source中读取数据

        大多数情况下,前面几个数据源已经满足需求了。但是遇到特殊情况我们需要自定义的数据源。实现方式如下:

        1.编辑自定义源Source

package com.qiyu;

import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.Calendar;
import java.util.Random;

/**
 * @author MR.Liu
 * @version 1.0
 * @data 2023-10-19 10:37
 */

/***
 * 主要实现2个方法 run() 和 cancel()
 */
public class FlinkSource_Custom implements SourceFunction<Student> {


    // 声明一个布尔变量,作为控制数据生成的标识位
    private Boolean running = true;

    @Override
    public void run(SourceContext<Student> sourceContext) throws Exception {
        Random random = new Random(); // 在指定的数据集中随机选取数据
        String[] name = {"Mary", "Alice", "Bob", "Cary"};
        int[] sex = {1,2};
        int age = 0;
        while (running) {
            sourceContext.collect(new Student(
                    name[random.nextInt(name.length)],
                    sex[random.nextInt(sex.length)],
                    random.nextInt(100)
            ));
            // 隔 1 秒生成一个点击事件,方便观测
            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}

        2.编写主程序

package com.qiyu;

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @author MR.Liu
 * @version 1.0
 * @data 2023-10-19 10:46
 */
public class FlinkSource_Custom2 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
//有了自定义的 source function,调用 addSource 方法
        DataStreamSource<Student> stream = env.addSource(new FlinkSource_Custom());
        stream.print("SourceCustom");
        env.execute();
    }
}

 运行主程序,运行结果:

SourceCustom> Student{name='Mary', age=1, sex=46}
SourceCustom> Student{name='Cary', age=2, sex=52}
SourceCustom> Student{name='Bob', age=1, sex=14}
SourceCustom> Student{name='Alice', age=1, sex=84}
SourceCustom> Student{name='Alice', age=2, sex=82}
SourceCustom> Student{name='Cary', age=1, sex=28}

.............

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1110824.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

5256C 5G终端综合测试仪

01 5256C 5G终端综合测试仪 产品综述&#xff1a; 5256C 5G终端综合测试仪主要用于5G终端、基带芯片的研发、生产、校准、检测、认证和教学等领域。该仪表具备5G信号发送功能、5G信号功率特性、解调特性和频谱特性分析功能&#xff0c;支持5G终端的产线高速校准及终端发射机…

Dev-C++ 软件安装教程

Dev-C 软件安装包https://download.csdn.net/download/W_Fe5/88446511&#xff08;软件包下载后&#xff0c;右键解压&#xff09; 一、打开文件夹&#xff0c;双击“Dev-C” 二、软件安装&#xff0c;点击“OK” 三、点击“I Agree” 四、点击“Next” 五、更改安装目录&…

虚拟机ubantu系统突然重启失去网络

1.进入 root用户 cd /var/lib/NetworkManager然后查看网络服务状态 如果网络状态和我一样不可用 ,就先停止网络服务 service ModemManager stop#删除状态rm networker.stateservice ModemManager start

基于Pytorch的CNN手写数字识别

作为深度学习小白&#xff0c;我想把自己学习的过程记录下来&#xff0c;作为实践部分&#xff0c;我会写一个通用框架&#xff0c;并会不断完善这个框架&#xff0c;作为自己的入门学习。因此略过环境搭建和基础知识的步骤&#xff0c;直接从代码实战开始。 一.下载数据集并加…

【遮天】最新预告,叶凡一怒报仇,导演再删减人物,还暴露一个严重问题

Hello,小伙伴们&#xff0c;我是小郑继续为大家深度解析遮天国漫资讯。 《遮天》动漫第30集预告已出&#xff0c;叶凡被挟持进入荒古禁地&#xff01;这一集看下来&#xff0c;导演又删减人物了&#xff0c;还暴露一个问题。 在预告中&#xff0c;叶凡已经被姬家和姜家的人带往…

【C++ 学习 ㉙】- 详解 C++11 的 constexpr 和 decltype 关键字

目录 一、constexpr 关键字 1.1 - constexpr 修饰普通变量 1.2 - constexpr 修饰函数 1.3 - constexpr 修饰类的构造函数 1.4 - constexpr 和 const 的区别 二、decltype 关键字 2.1 - 推导规则 2.2 - 实际应用 一、constexpr 关键字 constexpr 是 C11 新引入的关键字…

Spring Boot学习笔记(1)

Spring Boot学习笔记&#xff08;1&#xff09; 1.环境1.win2.mac3. IDEA 2.知识点1.Record类2.Switch开关表达式3. var和sealed4.springboot5.启用lombok 学习资料&#xff1a; 官网&#xff0c; 手册&#xff0c; 视频。 1.环境 1.win 1.下载vscode 2.安装jdk&#xff0…

求助C语言大佬:C语言的main函数参数问题

最近在敲代码的过程中&#xff0c;突发奇想&#xff0c;产生了一个疑问&#xff1a; 为什么main函数可以任由我们定义&#xff1a;可以接收一个参数、两个参数、三个参数都接接收&#xff0c;或者可以不接收&#xff1f;这是如何实现的 int main(){retrun 0; } int main (int…

移动app安全检测报告有什么作用?

移动app安全测试是一项至关重要的任务&#xff0c;它能够帮助确保移动应用程序在使用过程中不会受到各种安全威胁的侵害。在如今移动应用程序日益普及的时代&#xff0c;移动app安全测试尤为重要。移动app安全检测报告是基于专业的安全测试团队进行的全面分析后生成的&#xff…

博客积分上一万了

博客积分上一万了 继续努力&#xff0c;勇往直前。

JOSEF约瑟 JD3-40/23 JD3-70/23漏电继电器 AC220V\0.05-0.5A

JD3系列漏电继电器&#xff08;以下简称继电器&#xff09;适用于交流电压至1140V&#xff0c;频率为50Hz&#xff0c;该继电器与分励脱扣器或失压脱扣器的断路器、交流接触器、磁力启动器等组成漏电保护装置&#xff0c;作漏电和触电保护之用&#xff0c;可配备蜂鸣器、信号等…

短视频是“风口”还是“疯口”?

熟悉我的粉丝都知道&#xff0c;最近去追了下短视频的风口&#xff0c;折腾了几个视频出来。且不说视频效果如何&#xff0c;单单是制作视频的过程&#xff0c;就差点没要了童话的老命。看似短短的几分钟&#xff0c;真的应了那句话&#xff1a;台上一分钟&#xff0c;台下十年…

Ubuntu系统忘记Root用户密码-无法登录系统-更改Root密码-Ubuntu系统维护

一、背景 很多时候&#xff0c;我们总会设计复杂的密码&#xff0c;但是大多数时候&#xff0c;我们反而会先忘记我们的密码&#xff0c;导致密码不仅仅阻挡其他用户进入系统&#xff0c;同时也阻碍我们进入系统。 本文将介绍在忘记密码的情况下&#xff0c;如何进入系统并更改…

macOS Sonoma 桌面小工具活学活用!

macOS Sonoma 虽然不算是很大型的改版&#xff0c;但当中触目的新功能是「桌面小工具」&#xff08;Widget&#xff09;。如果我们的萤幕够大&#xff0c;将能够放更多不同的Widget&#xff0c;令用户无须开App 就能显示资讯&#xff0c;实在相当方便。 所有iPhone Widget 也能…

基于Springboot服装商品管理系统免费分享

基于Springboot服装商品管理系统 作者: 公众号(擎云毕业设计指南) 更多毕设项目请关注公众号&#xff0c;获取更多项目资源。如需部署请联系作者 注&#xff1a;禁止使用作者开源项目进行二次售卖&#xff0c;发现必究&#xff01;&#xff01;&#xff01; 运行环境&…

controller调用service层报错Invalid bound statement (not found)

报错信息&#xff1a; "Invalid bound statement (not found): com.gelei.system.service.TbUserFollowService.getMyUserFanList" 这个问题就很神奇&#xff0c;请看下图&#xff0c;我测试的时候就是这么个情况&#xff1b; 综上所述&#xff0c;解决方法如下&…

pragma once与ifndef的区别

概要 代码编译过程中&#xff0c;为了防止同一份代码被重复引用&#xff0c;通常有两种实现方式 方式一 #pragma once 方式二 #ifndef _TEST_H_ #define _TEST_H_ #endif // !TEST_H 通常情况下&#xff0c;使用上述两种方式中的任意一种都是可以的。最近工作中&#xff0c;代…

阿里云ECS服务器的搭建学习

云服务器ECS&#xff1a; 云服务器&#xff08;Elastic Compute Service&#xff0c;简称ECS&#xff09;是阿里云提供的性能卓越、稳定可靠、弹性扩展的IaaS&#xff08;Infrastructure as a Service&#xff09;级别云计算服务。云服务器ECS免去了您采购IT硬件的前期准备&a…

直线模组有哪些配件组成的?

直线模组又称线性模组或线性滑台&#xff0c;是自动化设备中重要的传动元件&#xff0c;主要由以下几部分组成&#xff1a; 1、直线导轨&#xff1a;直线导轨又称线性滑轨&#xff0c;是用于直线往复运动场合的重要零部件&#xff0c;它具有比直线轴承更高的额定负载&#xff0…

吉利高端品牌领克汽车携手体验家,重塑智能创新的汽车服务体验

浙江吉利控股集团&#xff08;以下简称“吉利集团”&#xff09;始建于1986年&#xff0c;1997年进入汽车行业&#xff0c;一直专注实业&#xff0c;专注技术创新和人才培养&#xff0c;坚定不移地推动企业转型升级和可持续发展。现资产总值超5100亿元&#xff0c;员工总数超过…