Flink--6、输出算子(连接到外部系统、文件、kafka、MySQL、自定义Sink)

news2024/12/26 9:26:31

在这里插入图片描述
                       星光下的赶路人star的个人主页

                      世间真正温煦的春色,都熨帖着大地,潜伏在深谷

文章目录

  • 1、输出算子(Sink)
    • 1.1 连接到外部系统
    • 1.2 输出到文件
    • 1.3 输出到Kafka
    • 1.4 输出到MySQL(JDBC)
    • 1.4 自定义Sink输出

1、输出算子(Sink)

Flink作为数据处理框架,最终还是要把计算处理的结果写入外部储存,为外部应用提供支持。
在这里插入图片描述

1.1 连接到外部系统

Flink的DataStream API专门提供了向外部提供写入数据的方法:addSink。与addSource类似,addSink方法对应着一个“Sink”算子,主要就是用来实现与外部系统连接、并将数据提交写入的。Flink程序中所有对外的输出操作,一般都是利用Sink算子完成的。
Flink1.12以前,Sink算子的创建是通过调用DataStream的.addSink()方法实现的。

stream.addSink(new SinkFunction());

addSink方法同样需要传入一个参数,实现的是SinkFunction接口。在这个接口中只需要重写一个方法invoke(),用来将指定的值写入到外部系统中。这个方法在每条数据记录到来时都会调用。

Flink1.12开始,同样重构了Sink架构,

stream.sinkTo()

当然,Sink多数情况下同样并不需要我们自己实现。之前我们一直在使用的print方法其实就是一种Sink,它表示将数据流写入标准控制台打印输出。Flink官方为我们提供了一部分的框架的Sink连接器。如下图所示,列出了Flink官方目前支持的第三方系统连接器:

在这里插入图片描述
我们可以看到,像Kafka之类流式系统,Flink提供了完美对接,source/sink两端都能连接,可读可写;而对于Elasticsearch、JDBC等数据存储系统,则只提供了输出写入的sink连接器。

除Flink官方之外,Apache Bahir框架,也实现了一些其他第三方系统与Flink的连接器。
在这里插入图片描述
除此以外,就需要用户自定义实现sink连接器了。

1.2 输出到文件

Flink专门提供了一个流式文件系统的连接器:FileSink,为批处理和流处理提供了一个统一的Sink,它可以将分区文件写入Flink支持的文件系统。
FileSink支持行编码(Row-encoded)和批量编码(Bulk-encoded)格式。这两种不同的方式都有各自的构建器(builder),可以直接调用FileSink的静态方法:

  • 行编码: FileSink.forRowFormat(basePath,rowEncoder)。
  • 批量编码: FileSink.forBulkFormat(basePath,bulkWriterFactory)。
public class SinkFile {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 每个目录中,都有 并行度个数的 文件在写入
        env.setParallelism(2);

        // 必须开启checkpoint,否则一直都是 .inprogress
        env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);


        DataGeneratorSource<String> dataGeneratorSource = new DataGeneratorSource<>(
                new GeneratorFunction<Long, String>() {
                    @Override
                    public String map(Long value) throws Exception {
                        return "Number:" + value;
                    }
                },
                Long.MAX_VALUE,
                RateLimiterStrategy.perSecond(1000),
                Types.STRING
        );

        DataStreamSource<String> dataGen = env.fromSource(dataGeneratorSource, WatermarkStrategy.noWatermarks(), "data-generator");

        // 输出到文件系统
        FileSink<String> fieSink = FileSink
                // 输出行式存储的文件,指定路径、指定编码
                .<String>forRowFormat(new Path("f:/tmp"), new SimpleStringEncoder<>("UTF-8"))
                // 输出文件的一些配置: 文件名的前缀、后缀
                .withOutputFileConfig(
                        OutputFileConfig.builder()
                                .withPartPrefix("atguigu-")
                                .withPartSuffix(".log")
                                .build()
                )
                // 按照目录分桶:如下,就是每个小时一个目录
                .withBucketAssigner(new DateTimeBucketAssigner<>("yyyy-MM-dd HH", ZoneId.systemDefault()))
                // 文件滚动策略:  1分钟 或 1m
                .withRollingPolicy(
                        DefaultRollingPolicy.builder()
                                .withRolloverInterval(Duration.ofMinutes(1))
                                .withMaxPartSize(new MemorySize(1024*1024))
                                .build()
                )
                .build();


        dataGen.sinkTo(fieSink);

        env.execute();
    }
}

1.3 输出到Kafka

(1)添加Kafka 连接器依赖
由于我们已经测试过从Kafka数据源读取数据,连接器相关依赖已经引入,这里就不重复介绍了。
(2)启动Kafka集群
(3)编写输出到Kafka的示例代码

输出无key的record:

public class SinkKafka {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 如果是精准一次,必须开启checkpoint(后续章节介绍)
        env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);


        SingleOutputStreamOperator<String> sensorDS = env
                .socketTextStream("hadoop102", 7777);

        /**
         * Kafka Sink:
         * TODO 注意:如果要使用 精准一次 写入Kafka,需要满足以下条件,缺一不可
         * 1、开启checkpoint(后续介绍)
         * 2、设置事务前缀
         * 3、设置事务超时时间:   checkpoint间隔 <  事务超时时间  < max的15分钟
         */
        KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
                // 指定 kafka 的地址和端口
                .setBootstrapServers("hadoop102:9092,hadoop103:9092,hadoop104:9092")
                // 指定序列化器:指定Topic名称、具体的序列化
                .setRecordSerializer(
                        KafkaRecordSerializationSchema.<String>builder()
                                .setTopic("ws")
                                .setValueSerializationSchema(new SimpleStringSchema())
                                .build()
                )
                // 写到kafka的一致性级别: 精准一次、至少一次
                .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                // 如果是精准一次,必须设置 事务的前缀
                .setTransactionalIdPrefix("atguigu-")
                // 如果是精准一次,必须设置 事务超时时间: 大于checkpoint间隔,小于 max 15分钟
                .setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 10*60*1000+"")
                .build();


        sensorDS.sinkTo(kafkaSink);


        env.execute();
    }
}

自定义序列化器,实现带key的record:

public class SinkKafkaWithKey {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);
        env.setRestartStrategy(RestartStrategies.noRestart());


        SingleOutputStreamOperator<String> sensorDS = env
                .socketTextStream("hadoop102", 7777);


        /**
         * 如果要指定写入kafka的key,可以自定义序列化器:
         * 1、实现 一个接口,重写 序列化 方法
         * 2、指定key,转成 字节数组
         * 3、指定value,转成 字节数组
         * 4、返回一个 ProducerRecord对象,把key、value放进去
         */
        KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
                .setBootstrapServers("hadoop102:9092,hadoop103:9092,hadoop104:9092")
                .setRecordSerializer(
                        new KafkaRecordSerializationSchema<String>() {

                            @Nullable
                            @Override
                            public ProducerRecord<byte[], byte[]> serialize(String element, KafkaSinkContext context, Long timestamp) {
                                String[] datas = element.split(",");
                                byte[] key = datas[0].getBytes(StandardCharsets.UTF_8);
                                byte[] value = element.getBytes(StandardCharsets.UTF_8);
                                return new ProducerRecord<>("ws", key, value);
                            }
                        }
                )
                .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                .setTransactionalIdPrefix("atguigu-")
                .setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 10 * 60 * 1000 + "")
                .build();


        sensorDS.sinkTo(kafkaSink);


        env.execute();
    }
}

运行代码,在Linux主机启动一个消费者,查看是否收到数据

bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic ws

1.4 输出到MySQL(JDBC)

(1)添加依赖

<!--mysql驱动 -->
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.27</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc</artifactId>
    <version>3.1.0-1.17</version>
</dependency>

(2)启动MySQL,在test库下建表

CREATE TABLE `ws` (
  `id` varchar(100) NOT NULL,
  `ts` bigint(20) DEFAULT NULL,
  `vc` int(11) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8

(3)输出到MySQL的示例代码

public class SinkMySQL {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);


        SingleOutputStreamOperator<WaterSensor> sensorDS = env
                .socketTextStream("hadoop102", 7777)
                .map(new WaterSensorMapFunction());


        /**
         * TODO 写入mysql
         * 1、只能用老的sink写法: addsink
         * 2、JDBCSink的4个参数:
         *    第一个参数: 执行的sql,一般就是 insert into
         *    第二个参数: 预编译sql, 对占位符填充值
         *    第三个参数: 执行选项 ---》 攒批、重试
         *    第四个参数: 连接选项 ---》 url、用户名、密码
         */
        SinkFunction<WaterSensor> jdbcSink = JdbcSink.sink(
                "insert into ws values(?,?,?)",
                new JdbcStatementBuilder<WaterSensor>() {
                    @Override
                    public void accept(PreparedStatement preparedStatement, WaterSensor waterSensor) throws SQLException {
                        //每收到一条WaterSensor,如何去填充占位符
                        preparedStatement.setString(1, waterSensor.getId());
                        preparedStatement.setLong(2, waterSensor.getTs());
                        preparedStatement.setInt(3, waterSensor.getVc());
                    }
                },
                JdbcExecutionOptions.builder()
                        .withMaxRetries(3) // 重试次数
                        .withBatchSize(100) // 批次的大小:条数
                        .withBatchIntervalMs(3000) // 批次的时间
                        .build(),
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl("jdbc:mysql://hadoop102:3306/test?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8")
                        .withUsername("root")
                        .withPassword("000000")
                        .withConnectionCheckTimeoutSeconds(60) // 重试的超时时间
                        .build()
        );


        sensorDS.addSink(jdbcSink);


        env.execute();
    }
}

(4)运行代码,用客户端连接MySQL,查看是否成功写入数据。

1.4 自定义Sink输出

如果我们想将数据存储到我们自己的存储设备中,而Flink并没有提供可以直接使用的连接器,就只能自定义Sink进行输出了。与Source类似,Flink为我们提供了通用的SinkFunction接口和对应的RichSinkDunction抽象类,只要实现它,通过简单地调用DataStream的.addSink()方法就可以自定义写入任何外部存储。

stream.addSink(new MySinkFunction<String>());

在实现SinkFunction的时候,需要重写的一个关键方法invoke(),在这个方法中我们就可以实现将流里的数据发送出去的逻辑。
这种方式比较通用,对于任何外部存储系统都有效;不过自定义Sink想要实现状态一致性并不容易,所以一般只在没有其它选择时使用。实际项目中用到的外部连接器Flink官方基本都已实现,而且在不断地扩充,因此自定义的场景并不常见。
在这里插入图片描述
                      您的支持是我创作的无限动力

在这里插入图片描述
                      希望我能为您的未来尽绵薄之力

在这里插入图片描述
                      如有错误,谢谢指正;若有收获,谢谢赞美

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1044282.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Unity插件Xcharts3.x版本使用笔记

Unity插件Xcharts3.x版本使用笔记 官方下载链接&#xff1a;https://xcharts-team.github.io/导入Unity基本使用方式&#xff08;折线图动态添加数据&#xff09;如果想要更多的表现效果可以看官方自带的脚本&#xff0c;这里包括了官方展示案例的部分效果&#xff0c;不过没有…

Windows10操作系统部署AD

windows 10 安装配置AD 一、启用AD 1.打开控制面板—>程序—>启用或关闭windows功能 2.勾选Active Directory Lightweight Directory Services&#xff08;Active Directory 轻型目录服务&#xff09; 注&#xff1a;不同版本中英文显示有区别&#xff0c;认准AD字样就…

Learn Prompt- Midjourney案例:动漫设计

使用 Midjourney 生成动漫有两种方法&#xff1a;使用Niji模式或使用标准的 Midjourney 模型。Niji V5 是 Midjourney 的动漫专用模型。它建立在标准 Midjourney 模型的全新架构之上&#xff0c;更擅长生成命名的动漫角色。Niji V4于2023年12月发布&#xff0c;Niji V5于2023年…

uniapp中使用axios打包到小程序时报 TypeError: adapter is not a function

出现这个错误的原因是因为小程序支持的是它自己原生封装的request接口&#xff0c;它底层用的http的。 若需要使用axios的话&#xff0c;我们需要使用一个axios适配器来兼容小程序。 下面用到axios-miniprogram-adapter适配器来解决 gitHub地址&#xff1a;GitHub - bigmeow…

智能中充满了符号、逻辑、力的想象和诱惑

把智能仅仅视为物化的理性特征符号的观点忽略了智能的社会性和交互性。智能不仅仅是个体内部的智能表现&#xff0c;还包括人与人之间的互动和人机环境系统的影响。智能是人类与其环境相互作用的结果&#xff0c;受到社会文化、教育、经验等因素的影响&#xff0c;具有社会意义…

域名备案流程(个人备案,腾讯云 / 阿里云)

文章目录 1.网站备案的目的2.备案准备的材料2.1 网站域名2.2 云资源或备案授权码2.3 电子材料 3.首次个人备案准备的材料3.1 主体相关3.2 域名相关3.3 网站相关3.4 网站服务相关3.5 变更相关 4.个人备案流程4.1 登录系统4.2 填写备案信息&#x1f340; 填写备案省份&#x1f34…

sql防止连表查询后出现空行数据

sql防止连表查询后出现空行数据 防止连表查询后出现空行数据 1.在where后加&#xff1a;and t2.pk_id is not null 或者2.在返回值list上处理 List TaskItem intelligentCloudMapper.getTaskItem(params.getPkId()); TaskItem.removeAll(Collections.singleton(null)); <se…

IO流————

一、字符流 前面我们学习了字节流,使用字节流可以读取文件中的字节数据。但是如果文件中有中文,使用字节流来读取,就有可能读到半个汉字的情况,这样会导致乱码。虽然使用读取全部字节的方法不会出现乱码,但是如果文件过大又不太合适。 所以Java专门为我们提供了另外一种…

Opengl之颜色

现实世界中有无数种颜色&#xff0c;每一个物体都有它们自己的颜色。我们需要使用&#xff08;有限的&#xff09;数值来模拟真实世界中&#xff08;无限&#xff09;的颜色&#xff0c;所以并不是所有现实世界中的颜色都可以用数值来表示的。然而我们仍能通过数值来表现出非常…

js实现动态数字滚动,插件jquery.counterup.min.js的使用方式

推荐一个常用的数字滚动动画插件&#xff0c;jquery.counterup.js 该插件可以控制动画的延迟时间和动画过渡时间。但它依赖于Waypoints.js插件来监听滚动事件。 从而实现页面滚动到数字可视窗口&#xff0c;实现让数字从零到指定数值的滚动。 使用方式 首先引入jQuery.js 引入…

解决Excel无法打开文件“xxx.xlsx“,因为文件格式或文件扩展名无效。请确定文件未损坏,并且文件扩展名与文件的格式!匹配的问题

文章目录 1. 复现错误2. 分析错误3. 解决错误 1. 复现错误 今天在开发过程中&#xff0c;测试指给我一个bug&#xff0c;如下图所示&#xff1a; 于是&#xff0c;我拿到这个文件标准模板.xlsx&#xff0c;尝试使用WPS打开看看&#xff0c;如下图所示&#xff1a; 如上图所示&a…

九日集训 Leetcode 371.两整数之和

给你两个整数 a 和 b &#xff0c;不使用 运算符 和 - &#xff0c;计算并返回两整数之和。 示例 1&#xff1a; 输入&#xff1a;a 1, b 2 输出&#xff1a;3示例 2&#xff1a; 输入&#xff1a;a 2, b 3 输出&#xff1a;5提示&#xff1a; -1000 < a, b < 10…

树莓派上使用kettle将文本文档导入mariadb

目录 1 连接MariaDB的前置条件 2 test.txt 3 在mariadb中创建数据库和数据表 4 在kettle中的操作 4.1 新建任务 4.2 连接数据库 4.3 文本文件输入 4.4 表输出 4.5 运行 4.6 结果 1 连接MariaDB的前置条件 首先你的mariaDB要有密码&#xff0c;我当前的用…

H3CNE

H3CNE 计算机网络概述 计算机网络定义 一组自治计算机互联的集合 计算机网络基本功能 资源共享 综合信息服务 分布式处理与负载均衡 计算机网络的类型 局域网 LAN 由用户自行建设&#xff0c;使用私有地址组建的内部网络 城域网 MAN 由运营商或大规模企业建设&am…

快速排序与冒泡排序以及代码

快速排序 快速排序&#xff08;Quicksort&#xff09;是一种常用的排序算法&#xff0c;它基于分治的思想。 时间复杂度&#xff1a;O&#xff08;nlogn&#xff09; 空间复杂度&#xff1a;O&#xff08;logn&#xff09; 快速排序的基本思想如下&#xff1a; 选择一个元素…

ffmpeg+flv视频推拉流实现(demo版)

前言 工作需要&#xff0c;记录一下前后端推拉流方案&#xff0c;基于HTTP-FLV协议&#xff0c;使用node flv.js ffmpeg进行前后端交互。 此方案为demo版&#xff0c;目的是打通前后端链路&#xff0c;项目应用正在研究中。 步骤 1.安装ffmpeg 后端推流需要借助ffmpeg流媒…

centos7 添加网卡设置动态ip,修改网卡为任意名称

centos7 添加网卡并设置动态ip&#xff0c;重命名为任意名称 本文记录如何在centos环境上增加两个网卡&#xff0c;并设置为动态获取ip&#xff0c;以及修改网卡名称为任意名称 1、centos7添加两个网卡动态获取ip 1.1 vmvare上添加网络适配器 1、关闭虚拟机 2、 添加网络适…

Linux:修改mvn命令使用的maven路径

要在 Linux 上更改 Maven 的版本&#xff0c;需要调整 PATH 环境变量以指向所需版本的 Maven 安装目录。 打开终端或命令行界面。 使用文本编辑器打开 /etc/profile 文件&#xff1a; vi /etc/profile在文件的末尾添加以下行&#xff0c;将 PATH 环境变量指向新的 Maven 安装目…

torch.sum()——dim参数

dim指在dim的这个维度上&#xff0c;对tesnor 进行求和&#xff0c;如果keepdim&#xff08;保持维度&#xff09;False&#xff0c;返回结果会删去dim所指的这个维度。以下面的例子分析dim的参数~ torch.tensor([[[1, 2, 3], [4, 5, 6]], [[7, 8, 9], [10, 11, 12]]]) print(…

【高级数据结构C++】树的重心——教父POJ 3107(链式前向星的使用)

》》》算法竞赛 /*** file * author jUicE_g2R(qq:3406291309)————彬(bin-必应)* 一个某双流一大学通信与信息专业大二在读 * * brief 一直在竞赛算法学习的路上* * copyright 2023.9* COPYRIGHT 原创技术笔记&#xff1a;转载…