DataStream API(输出算子)

news2024/11/23 11:48:15

源算子

源算子

转换算子

转换算子

输出算子

1.连接到外部系统

        连接外部系统是计算机科学和信息技术领域中常见的一个任务,通常涉及到与外部数据源或服务进行交互。具体的方法和工具会根据不同的应用场景和需求而有所不同。以下是一些常见的连接外部系统的方法:

  1. 应用程序接口(API):许多外部系统都提供API,以便其他系统可以与其进行交互。通过API,可以使用各种编程语言和工具来获取或交换数据。常见的API包括REST API和SOAP API。
  2. 数据集成工具:这些工具可以帮助将数据从各种来源(包括数据库、文件、API等)集成到一个中央位置。一些流行的数据集成工具包括Talend、Apache NiFi和 Informatica PowerCenter。
  3. 连接器或适配器:这些是专用的软件组件,用于连接到特定的外部系统。例如,一些CRM系统提供连接器,使其他应用程序可以与CRM系统中的数据进行交互。
  4. 编程语言和框架:许多编程语言和框架可用于连接到外部系统。例如,Python是一种流行的语言,可用于通过API或Web scraping技术连接到外部系统。
  5. 第三方服务:许多第三方服务提供连接外部系统的功能,例如OAuth、SAML和OpenID Connect等身份验证协议。这些服务可以与许多不同的系统进行交互,并提供安全的身份验证和授权机制。

        连接外部系统时需要考虑许多因素,例如安全性、性能、可靠性和兼容性。因此,选择适当的工具和技术,并仔细规划和测试连接过程是非常重要的。

2.输出到文件

要将数据流输出到文件,您可以使用 Flink 的 File Sink。以下是一个示例代码片段

import org.apache.flink.api.common.io.OutputFormat;  
import org.apache.flink.streaming.api.datastream.DataStream;  
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;  
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;  
import org.apache.flink.util.IOUtils;  
  
import java.io.BufferedWriter;  
import java.io.FileWriter;  
import java.io.IOException;  
  
// 创建 StreamExecutionEnvironment  
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();  
  
// 创建数据流  
DataStream<MyData> dataStream = ... // 创建数据流  
  
// 创建自定义的输出格式,用于将数据写入文件  
class MyOutputFormat implements OutputFormat<MyData> {  
    private String filePrefix = "output";  
    private int count = 0;  
    private String suffix = ".txt";  
    private String fieldDelimiter = "\t";  
    private BufferedWriter writer;  
  
    @Override  
    public void open(Configuration parameters) throws IOException {  
        filePrefix = parameters.getString("filePrefix", filePrefix);  
        suffix = parameters.getString("suffix", suffix);  
        fieldDelimiter = parameters.getString("fieldDelimiter", fieldDelimiter);  
        count = 0;  
        writer = new BufferedWriter(new FileWriter(filePrefix + count + suffix));  
    }  
  
    @Override  
    public void writeRecord(MyData record) throws IOException {  
        writer.write(record.getField1() + fieldDelimiter + record.getField2());  
        writer.newLine();  
    }  
  
    @Override  
    public void close() throws IOException {  
        writer.close();  
    }  
}  
  
// 创建 File Sink 并将其添加到数据流中  
dataStream.addSink(new RichSinkFunction<MyData>() {  
    private MyOutputFormat outputFormat;  
  
    @Override  
    public void open(Configuration parameters) throws Exception {  
        outputFormat = new MyOutputFormat();  
    }  
  
    @Override  
    public void invoke(MyData value, Context context) throws Exception {  
        outputFormat.writeRecord(value);  
    }  
});  
  
// 执行 Flink 作业并等待结果  
env.execute("Flink File Sink Example");

        在上面的示例中,我们首先创建了一个 StreamExecutionEnvironment 和一个数据流。接下来,我们定义了一个自定义的 OutputFormat 类,用于将数据写入文件。在 MyOutputFormat 类中,我们实现了 open() 方法来打开文件并设置文件名、字段分隔符等参数,writeRecord() 方法来写入数据到文件,以及 close() 方法来关闭文件。然后,我们创建了一个 RichSinkFunction,并使用 MyOutputFormat 作为序列化器。最后,我们将 Sink 添加到数据流中并执行 Flink 作业。请注意,在打开 OutputFormat 时,我们传递了作业的参数,以便在打开文件时使用。 

3.输出到 Kafka

要将数据流输出到 Kafka,您可以使用 Flink 的 Kafka Producer Sink。以下是一个示例代码片段

 

import org.apache.flink.streaming.api.datastream.DataStream;  
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;  
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;  
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;  
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;  
  
import java.util.Properties;  
  
// 创建 StreamExecutionEnvironment  
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();  
  
// 创建数据流  
DataStream<MyData> dataStream = ... // 创建数据流  
  
// 配置 Kafka 连接选项  
Properties properties = new Properties();  
properties.setProperty("bootstrap.servers", "localhost:9092");  
properties.setProperty("group.id", "test");  
  
// 创建 Kafka Producer Sink 并将其添加到数据流中  
dataStream.addSink(new FlinkKafkaProducer010<MyData>("topic", new MyDataSerializationSchema(), properties));  
  
// 执行 Flink 作业  
env.execute("Flink Kafka Sink Example");

         在上面的示例中,我们首先创建了一个 StreamExecutionEnvironment 和一个数据流。接下来,我们配置了 Kafka 连接选项,包括 Kafka Broker 的地址和消费者组的 ID。然后,我们使用 FlinkKafkaProducer010 类创建一个 Kafka Producer Sink,并设置要写入的 Kafka 主题、序列化器和 Kafka 连接属性。在本例中,我们自定义了一个 MyDataSerializationSchema 类来实现 KafkaSerializationSchema 接口,用于将 MyData 对象序列化为字节数组。最后,我们将 Sink 添加到数据流中并执行 Flink 作业。

4.输出到 Redis

要将数据流输出到 Redis,您可以使用 Flink 的 Redis Sink。以下是一个示例代码片段

import org.apache.flink.streaming.api.datastream.DataStream;  
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;  
import org.apache.flink.streaming.connectors.redis.RedisSink;  
import org.apache.flink.streaming.connectors.redis.common.config.RedisConfig;  
import org.apache.flink.streaming.connectors.redis.common.config.RedisConfigBuilder;  
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;  
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;  
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;  
import org.apache.flink.streaming.connectors.redis.common.mapper.StringRedisMapper;  
  
// 创建 StreamExecutionEnvironment  
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();  
  
// 创建数据流  
DataStream<MyData> dataStream = ... // 创建数据流  
  
// 配置 Redis 连接选项  
RedisConfig redisConfig = new RedisConfigBuilder()  
    .setHost("localhost")  
    .setPort(6379)  
    build();  
  
// 创建 Redis Sink 并将其添加到数据流中  
dataStream.addSink(RedisSink.<String, MyData>builder()  
    // 定义 Redis Mapper,将 MyData 对象转换为 Redis 命令和参数  
    .setMapper(new StringRedisMapper<MyData>() {  
        @Override  
        public String getCommandDescription() {  
            return RedisCommandDescription.<MyData>builder()  
                // 设置 Redis 命令为 "SET"  
                .setCommand(RedisCommand.<MyData>set())  
                // 设置键名为 "key"  
                .setKey("key")  
                // 设置值为 MyData 对象的 col1 字段  
                .setValue(myData -> "value:" + myData.col1)  
                build();  
        }  
    })  
    // 配置 Redis 连接选项  
    .setRedisConfig(redisConfig)  
    build());  
  
// 执行 Flink 作业  
env.execute("Flink Redis Sink Example");

        在上面的示例中,我们首先创建了一个 StreamExecutionEnvironment,然后创建了一个数据流。接下来,我们配置了 Redis 连接选项,包括主机和端口。然后,我们使用 RedisSink 的 builder() 方法创建一个 Redis Sink,并设置 Redis Mapper。在 Redis Mapper 中,我们将 MyData 对象转换为 Redis 命令和参数。在本例中,我们使用了 StringRedisMapper,将 MyData 对象的 col1 字段作为值。最后,我们配置 Redis 连接选项并将 Sink 添加到数据流中。最后,我们执行 Flink 作业并等待结果。 

5.输出到 Elasticsearch

要将数据流输出到 Elasticsearch,您可以使用 Flink 的 Elasticsearch Sink。以下是一个示例代码片段

import org.apache.flink.streaming.api.datastream.DataStream;  
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;  
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;  
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSink;  
import org.apache.flink.streaming.connectors.elasticsearch.common.config.ESClusterConfig;  
import org.apache.flink.streaming.connectors.elasticsearch.common.config.ESClusterConfigBuilder;  
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSinkFunctionImpl;  
  
// 创建 StreamExecutionEnvironment  
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();  
  
// 创建数据流  
DataStream<MyData> dataStream = ... // 创建数据流  
  
// 配置 Elasticsearch 连接选项  
ESClusterConfig esClusterConfig = new ESClusterConfigBuilder()  
    .setClusterName("my-cluster")  
    .setHost("localhost")  
    .setPort(9200)  
    .build();  
  
// 创建 Elasticsearch Sink 并将其添加到数据流中  
dataStream.addSink(ElasticsearchSink.sink(esClusterConfig, new ElasticsearchSinkFunctionImpl<MyData>() {  
    @Override  
    public void sink(MyData value, OutputCollector<Void> out) {  
        out.collect(new MapRecord<>(Map.<String, Object>of("field1", value.col1, "field2", value.col2)));  
    }  
}));  
  
// 执行 Flink 作业  
env.execute("Flink Elasticsearch Sink Example");

         在上面的示例中,我们首先创建了一个 StreamExecutionEnvironment,然后创建了一个数据流。接下来,我们配置了 Elasticsearch 连接选项,包括集群名称、主机和端口。然后,我们使用 ElasticsearchSink 将数据流写入 Elasticsearch。在 ElasticsearchSink 的构造函数中,我们传递了 Elasticsearch 连接选项和 ElasticsearchSinkFunction。在 ElasticsearchSinkFunction 中,我们将 MyData 对象转换为 MapRecord,其中包含要写入 Elasticsearch 的字段和值。最后,我们执行 Flink 作业并等待结果。

6.输出到 MySQLJDBC

要将数据流输出到 MySQL 数据库,您需要使用 JDBC(Java Database Connectivity)连接器。以下是一个示例代码片段

import org.apache.flink.api.common.functions.RuntimeContext;  
import org.apache.flink.streaming.api.datastream.DataStream;  
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;  
import org.apache.flink.streaming.connectors.jdbc.JdbcConnectionOptions;  
import org.apache.flink.streaming.connectors.jdbc.JdbcSink;  
import org.apache.flink.streaming.connectors.jdbc.JdbcStatementBuilder;  
import org.apache.flink.streaming.connectors.jdbc.JdbcStatementBuilderImpl;  
import org.apache.flink.streaming.connectors.jdbc.JdbcType;  
  
// 创建 StreamExecutionEnvironment  
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();  
  
// 创建数据流  
DataStream<MyData> dataStream = ... // 创建数据流  
  
// 配置 JDBC 连接选项  
JdbcConnectionOptions jdbcOptions = new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()  
    .withUrl("jdbc:mysql://localhost:3306/mydatabase")  
    .withDriverName("com.mysql.jdbc.Driver")  
    .withUsername("username")  
    .withPassword("password")  
    .build();  
  
// 创建 JDBC Sink 并将其添加到数据流中  
dataStream.addSink(JdbcSink.sink(  
    "INSERT INTO mytable (col1, col2) VALUES (?, ?)", // SQL 语句模板,使用占位符表示参数  
    (JdbcStatementBuilder<MyData>) (ps, value) -> { // 参数转换函数,将 MyData 对象转换为 JDBC 参数  
        ps.setString(1, value.col1);  
        ps.setInt(2, value.col2);  
    },  
    JdbcType.STRING, // JDBC 参数类型,用于类型转换和占位符替换  
    jdbcOptions));  
  
// 执行 Flink 作业  
env.execute("Flink JDBC Sink Example");

        在上面的示例中,我们首先创建了一个 StreamExecutionEnvironment,然后创建了一个数据流。接下来,我们配置了 JDBC 连接选项,包括 JDBC URL、驱动程序名称、用户名和密码。然后,我们使用 JdbcSink 将数据流写入 MySQL 数据库。在 JdbcSink 的构造函数中,我们提供了 SQL 语句模板和参数转换函数。在参数转换函数中,我们将 MyData 对象转换为 JDBC 参数。最后,我们执行 Flink 作业并等待结果。

7.自定义 Sink 输出

        Flink 是一个流处理框架,它提供了强大的功能来处理实时数据流。与 Source 类似,Flink 也提供了 SinkFunction 接口和 RichSinkFunction 抽象类,用于将数据写入外部存储。通过实现这些接口,开发人员可以自定义写入任何外部存储的方式,如数据库、文件系统、消息队列等。

        通过简单地调用 DataStream 的 addSink() 方法,并将自定义的 SinkFunction 传递给该方法,开发人员可以轻松地将数据写入外部存储。这样,开发人员可以更加灵活地处理数据流,并将处理后的结果存储在所需的位置。

        Flink 的 SinkFunction 接口定义了将数据写入外部存储的基本操作。开发人员需要实现该接口,并覆盖其中的一些方法,如 open()、invoke() 和 close()。RichSinkFunction 是 SinkFunction 的一个扩展,它提供了更多的功能和灵活性,如处理异常和延迟触发等。

        Flink 的 SinkFunction 和 RichSinkFunction 抽象类为开发人员提供了灵活的数据写入方式,使得开发人员可以轻松地将数据流写入到任何外部存储中。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1406166.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

BioTech - 量子化学与分子力场

欢迎关注我的CSDN&#xff1a;https://spike.blog.csdn.net/ 本文地址&#xff1a;https://spike.blog.csdn.net/article/details/135787607 量子化学是应用量子力学的规律和方法来研究化学问题的一门学科&#xff0c;主要关注分子的结构、性质和反应过程。 量子化学的理论方法…

Midjourney基础 | 使用流程 注册,基础文生图,图的放大微调,保存

文章目录 1 使用流程2 生成自己的第一张图3 图的放大&#xff0c;微调3.1 放大3.2 微调变化 4 图的保存 Midjourney是依托于Discord的&#xff0c;但我也是通过Midjourney才了解的Discord 维基百科说~~Discord是一款专为社群设计的免费网络实时通话&#xff0c;主要针对游戏玩家…

Dify学习笔记-手册(三)

1、应用构建及提示词 在 Dify 中&#xff0c;一个“应用”是指基于 GPT 等大型语言模型构建的实际场景应用。通过创建应用&#xff0c;您可以将智能 AI 技术应用于特定的需求。它既包含了开发 AI 应用的工程范式&#xff0c;也包含了具体的交付物。 简而言之&#xff0c;一个应…

【设计模式】美团三面:你连装饰器都举不出例子?

什么是装饰器模式&#xff1f; 装饰器模式&#xff0c;这个设计模式其实和它的名字一样&#xff0c;非常容易理解。 想象一下&#xff0c;每天出门的时候&#xff0c;我们都会思考今天穿什么。睡**衣、睡裤加拖鞋&#xff0c;还是西装、领带加皮鞋&#xff1f;又或者说是&…

获取b站目录

参考链接&#xff1a; JS获取B站视频选集目录 Week6 - 知乎 代码 var x document.getElementsByClassName("clickitem"); var i; for (i 0; i < x.length; i) {var page_num x[i].getElementsByClassName("page-num")[0].innerText;var part x[i…

huggingface学习|云服务器部署Grounded-Segment-Anything:bug总会一个一个一个一个又一个的解决的

文章目录 一、环境部署&#xff08;一&#xff09;模型下载&#xff08;二&#xff09;环境配置&#xff08;三&#xff09;库的安装 二、运行&#xff08;一&#xff09; 运行grounding_dino_demo.py文件&#xff08;二&#xff09;运行grounded_sam_demo.py文件&#xff08;三…

[反转链表] [合并两个有序链表][分割链表]

这里写目录标题 反转链表合并两个有序链表分割链表 反转链表 1、题目&#xff1a; 2.思路  思路1&#xff1a;建立一个newHead,取一个节点进行头插。具体做法如下&#xff01; 建立一个newHead(新头)&#xff0c;由于一个节点里面存的是下一个节点的地址&#xff0c;如果取…

华夏基金“冰火两重天”:产品增量不增值,靠什么赢得用户?

近日&#xff0c;华夏基金发布关于华夏野村日经225交易型开放式指数证券投资基金&#xff08;QDII&#xff09;&#xff08;下称“华夏野村日经ETF”&#xff09;二级市场交易价格溢价风险提示及临时停牌公告。 公告内容显示&#xff0c;华夏野村日经ETF二级市场交易价格明显高…

DFT计算杂谈调查问卷

为更好了解公众号受众对于DFT计算的了解情况以及目标需求&#xff0c;目的以更好更准确并实用地推送给公众号受众所需要的文章&#xff0c;所以本次推送发布调查问卷并收集填写者相关信息。 调查问卷调查内容仅与公众号运营和DFT计算相关&#xff0c;所收集信息仅用作公众号受众…

如何选择和配置适合医院病历管理系统的MySQL版本?

✍✍计算机编程指导师 ⭐⭐个人介绍&#xff1a;自己非常喜欢研究技术问题&#xff01;专业做Java、Python、微信小程序、安卓、大数据、爬虫、Golang、大屏等实战项目。 ⛽⛽实战项目&#xff1a;有源码或者技术上的问题欢迎在评论区一起讨论交流&#xff01; ⚡⚡ Java实战 |…

7+细胞焦亡+ceRNA+实验验证,如何脱离套路求创新?

导语 今天给同学们分享一篇生信文章“Dissection of pyroptosis-related prognostic signature and CASP6-mediated regulation in pancreatic adenocarcinoma: new sights to clinical decision-making”&#xff0c;这篇文章发表在Apoptosis期刊上&#xff0c;影响因子为7.2。…

操作系统的灵魂--MMU详解

虚拟内存是现代操作系统中最伟大的发明之一。它为每个进程提供了一个一致的、私有的地址空间&#xff0c;让每个进程产生了一种自己在独享主存的错觉。 为了讲清楚MMU是如何一步一步完成地址翻译&#xff0c;取出数据的&#xff0c;本篇文章在前4节中讲解了虚拟内存中一些重要…

Aspx漏洞总结

第一部分&#xff0c;.NET项目当中的dll都可以进行反编译&#xff1a; 在java中有很多jar包&#xff0c;而在.NET框架中的bin中对应有很多DLL文件&#xff0c;bin下面都是可执行文件&#xff0c;这些文件都是很多代码封装的&#xff0c;想要查看源码&#xff0c;都需要通过反编…

vscode运行python,终端能正常运行,输出(Code Runner)不能正常运行

右键->Run Code报错&#xff1a; [Done] exited with code9009 in 0.111 seconds 我的解决方案&#xff1a;仔细检查自己选的python.exe&#xff08;解释器&#xff09;在path环境变量中是否存在或路径是否正确&#xff01;&#xff01;&#xff01; 我就是太自信了&#xf…

STL第三讲

第三讲 stl六大部件&#xff1a;算法是函数模板&#xff0c;其他的是类模板 算法形式&#xff1a;传入两个迭代器&#xff08;第三个参数可能有&#xff1a;一个比较的准则 算法需要的所有信息从迭代器获取 迭代器分类 基于红黑树的结构是双向迭代器&#xff1b; 基于hash的取…

idea——git提交到本地记录如何退回/删除

目录 一、git提交到本地记录如何退回/删除 一、git提交到本地记录如何退回/删除 git提交到本地记录&#xff0c;如下图【更新】记录&#xff0c;表示本次提交到git本地需要退回/删除的操作&#xff1a; 选中项目&#xff0c;右键点击【git】——>【Show History】——>…

苹果笔记本MacBook电脑怎么卸载软件?三种方法快速卸载软件

苹果笔记本MacBook电脑是一款非常流行的电脑&#xff0c;但是有时候我们可能需要卸载一些不需要的软件。下面是一些简单的步骤&#xff0c;可以帮助您在MacBook电脑上卸载软件。 苹果笔记本MacBook电脑怎么卸载软件&#xff1f;三种实用方法快速卸载软件&#xff01; 方法一&a…

【数据分享】2023年全球范围土壤数据集HWSD2.0(7个土壤深度/40多种土壤指标)

土壤数据&#xff0c;包括土壤类型、土壤酸碱度、土壤沙含量等指标&#xff0c;对于农业、林业、园艺、环境保护等领域都非常重要。 本次我们为大家带来的是2023年1月份发布的全球范围的土壤数据库。该数据的核心信息如下&#xff1a; ①该数据是基于联合国粮农组织&#xff0…

云HIS为连锁医院机构提供统一医院管理解决方案

云HIS重建统一的信息架构体系&#xff0c;重构管理服务流程&#xff0c;重造病人服务环境&#xff0c;向不同类型的医疗机构提供SaaS化HIS服务解决方案。 云HIS优势 1、云端数据优势 在传统的HIS模式里&#xff0c;数据存于医院本身的服务器机组&#xff0c;一旦发生故障&…

Python学习04—基本图形绘制

通过一个案例来初步认识Python的图形绘制 案例&#xff1a;绘制Python蟒蛇 #PythonDraw.py import turtle turtle.setup(650,350,200,200) turtle.penup() turtle.fd(-250) turtle.pendown() turtle.pensize(25) turtle.pencolor("purple") turtle.seth(-40) for i…