【API篇】六、Flink输出算子Sink

news2024/12/30 3:01:35

文章目录

  • 1、输出到外部系统
  • 2、输出到文件
  • 3、输出到KafKa
  • 4、输出到MySQL(JDBC)
  • 5、自定义Sink输出

Flink做为数据处理引擎,要把最终处理好的数据写入外部存储,为外部系统或应用提供支持。与输入算子Source相对应的,输出算子为Sink。

在这里插入图片描述
前面一直在用的print就是一种Sink,用来将数据流写到控制台打印

在这里插入图片描述

1、输出到外部系统

Flink程序中所有对外的输出操作,利用Sink算子完成

Flink1.12以前,Sink算子的创建是通过调用DataStream的.addSink()方法

stream.addSink(new SinkFunction());
//重写SinkFunction接口的invoke方法,用来将指定的值写入到外部系统中
//invoke方法在每条数据记录到来时都会调用。

Flink1.12开始,Sink算子的创建是通过调用DataStream的.sinkTo()方法

stream.sinkTo()

Flink官网为我们提供了一部分的框架的Sink连接器:

Flink官方为我们提供了一部分的框架的Sink连接器

source/sink即可读可写,能做为数据源连接,也能做为下游去输出。

2、输出到文件

先引入Flink流式文件系统的连接器FileSink的依赖:

<dependency>
	<groupId>org.apache.flink</groupId>
	<artifactId>flink-connector-files</artifactId>
	<version>${flink.version}</version>
</dependency>

FileSink支持行编码(Row-encoded)和批量编码(Bulk-encoded)格式。这两种不同的方式都有各自的构建器(builder):

  • 行编码: FileSink.forRowFormat(basePath,rowEncoder)
  • 批量编码: FileSink.forBulkFormat(basePath,bulkWriterFactory)

下面演示实现读往d盘下的tmp目录写数据(tmp目录不用提前创建,不存在会自动创建):

public class SinkFile {
    public static void main(String[] args) throws Exception {
    
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 每个目录中,都有 并行度个数的 文件是正在写入状态
        env.setParallelism(1);

        // 必须开启checkpoint,否则文件一直都是 .inprogress状态,即正在写入
        env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);

		//生成器模拟一个数据源
        DataGeneratorSource<String> dataGeneratorSource = new DataGeneratorSource<>(
                new GeneratorFunction<Long, String>() {
                    @Override
                    public String map(Long value) throws Exception {
                        return "Number:" + value;
                    }
                },
                Long.MAX_VALUE,
                RateLimiterStrategy.perSecond(1000), //每秒生成1000条数据
                Types.STRING
        );

        DataStreamSource<String> dataGen = env.fromSource(dataGeneratorSource, WatermarkStrategy.noWatermarks(), "data-generator");

        // 输出到文件系统
        FileSink<String> fieSink = FileSink
                // 输出行式存储的文件,指定路径、指定编码
                .<String>forRowFormat(new Path("f:/tmp"), new SimpleStringEncoder<>("UTF-8"))
                // 输出文件的一些配置: 文件名的前缀、后缀,new也行,这里展示build方式创建配置对象
                .withOutputFileConfig(
                        OutputFileConfig.builder()
                                .withPartPrefix("code9527")
                                .withPartSuffix(".log")
                                .build()
                )
                // 按照目录分桶:如下,就是每个小时一个目录。ZoneId.systemDefault()即系统默认时区,也可是ZoneId类中的其他时区
                .withBucketAssigner(new DateTimeBucketAssigner<>("yyyy-MM-dd HH", ZoneId.systemDefault()))
                // 文件滚动策略:  1分钟 或 1m
                .withRollingPolicy(
                        DefaultRollingPolicy.builder()
                                .withRolloverInterval(Duration.ofMinutes(1))
                                .withMaxPartSize(new MemorySize(1024*1024))
                                .build()
                )
                .build();


        dataGen.sinkTo(fieSink);

        env.execute();
    }
}

运行,看下效果:inprocess,此时文件正在写入数据,不可读。一个这个inprocess文件,因为上面并行度设置的1

在这里插入图片描述

总结:重点还是FileSink对象的创建

  • 输出行/批文件存储的文件,可指定文件路径、文件编码、文件前后缀

  • 按目录分桶,传参的接口实现类对象自选,demo中是按照时间给文件夹命名

  • 特别注意文件滚动策略,是达到指定时间或者文件到达指定大小,是或的关系

  • FileSink对象创建完后,直接流对象调用sinkTo即可完成写入到文件的动作

3、输出到KafKa

添加KafKa连接器的依赖:

<dependency>
	<groupId>org.apache.flink</groupId>
	<artifactId>flink-connector-kafka</artifactId>
	<version>${flink.version}</version>
</dependency>

以下用socket模拟无界流,来演示数据输出到KafKa:

public class SinkKafka {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 如果是精准一次,必须开启checkpoint,否则无法写入Kafka
        env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);

        SingleOutputStreamOperator<String> sensorDS = env
                .socketTextStream("node1", 9527);

        KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
                // 指定 kafka 的地址和端口
                .setBootstrapServers("hadoop102:9092,hadoop103:9092,hadoop104:9092")
                // 指定序列化器:指定Topic名称、具体的序列化
                .setRecordSerializer(
                        KafkaRecordSerializationSchema.<String>builder()
                                .setTopic("topic1")
                                .setValueSerializationSchema(new SimpleStringSchema())
                                .build()
                )
                // 写到kafka的一致性级别: 精准一次、至少一次
                .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                // 如果是精准一次,必须设置 事务的前缀
                .setTransactionalIdPrefix("test-")
                // 如果是精准一次,必须设置 事务超时时间: 大于checkpoint间隔,小于 max 15分钟
                .setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 10*60*1000+"")
                .build();


        sensorDS.sinkTo(kafkaSink);


        env.execute();
    }
}

关于 Kafka Sink,如果要使用精准一次写入Kafka,需要满足以下条件,缺一不可

  • 开启checkpoint(后续介绍)
  • 设置事务前缀
  • 设置事务超时时间: checkpoint间隔 < 事务超时时间 < max的15分钟

如果要指定写入kafka的key,可以自定义序列化器:

  • 实现 一个接口,重写 序列化 方法
  • 指定key,转成 字节数组
  • 指定value,转成 字节数组
  • 返回一个 ProducerRecord对象,把key、value放进去
public class SinkKafkaWithKey {
    public static void main(String[] args) throws Exception {
    
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);
        env.setRestartStrategy(RestartStrategies.noRestart());

        SingleOutputStreamOperator<String> sensorDS = env
                .socketTextStream("node1", 9527);
        /**
         *指定写入kafka的key,可以自定义序列化器:
         */
        KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
                .setBootstrapServers("hadoop102:9092,hadoop103:9092,hadoop104:9092")
                .setRecordSerializer(
                        new KafkaRecordSerializationSchema<String>() {

                            @Nullable
                            @Override
                            public ProducerRecord<byte[], byte[]> serialize(String element, KafkaSinkContext context, Long timestamp) {
                                String[] datas = element.split(",");  //输入的测试数据格式为a,b,c,所以这里先分割一下
                                byte[] key = datas[0].getBytes(StandardCharsets.UTF_8);
                                byte[] value = element.getBytes(StandardCharsets.UTF_8);
                                return new ProducerRecord<>("topic1", key, value);
                            }
                        }
                )
                .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                .setTransactionalIdPrefix("test-")
                .setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 10 * 60 * 1000 + "")
                .build();


        sensorDS.sinkTo(kafkaSink);


        env.execute();
    }
}

4、输出到MySQL(JDBC)

添加MySQL驱动依赖:

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.27</version>
</dependency>

在这里插入图片描述

再引入flink-jdbc连接器依赖:

<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-jdbc -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc</artifactId>
    <version>3.1.1-1.17</version>
</dependency>



PS:

教学视频中提到了另一种情况,这里记录下。即:官方还未提供flink-connector-jdbc的某高版本的正式依赖,如1.17.0(当前时间已有),暂时从apache snapshot仓库下,因此引入依赖前,先在pom文件中指定仓库路径

<repositories>
    <repository>
        <id>apache-snapshots</id>  <!--这个id后面setting.xml里有用-->
        <name>apache-snapshots</name>
		<url>https://repository.apache.org/content/repositories/snapshots/</url>
    </repository>
</repositories>

再引入flink-jdbc连接器依赖:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc</artifactId>
    <version>1.17-SNAPSHOT</version>
</dependency>

如果不生效,还需要修改本地maven的配置文件,mirrorOf中添加!apache-snapshots

<mirror>
	<id>aliyunmaven</id>
	<mirrorOf>*,!apache-snapshots</mirrorOf>   <!--即除了apache-snapshots,其余的都去阿里仓库下,!即排除,后面的名称是pom中定义的那个-->
	<name>阿里云公共仓库</name>
	<url>https://maven.aliyun.com/repository/public</url>
</mirror>


根据你的数据类型,建立对应结构的表,这里根据要接收的自定义对象WaterSensor建表test:

mysql>     
CREATE TABLE `ws` (
  `id` varchar(100) NOT NULL,
  `ts` bigint(20) DEFAULT NULL,
  `vc` int(11) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

输出到MySQL的Demo代码:

public class SinkMySQL {
    public static void main(String[] args) throws Exception {
    
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        env.setParallelism(1);

        SingleOutputStreamOperator<WaterSensor> sensorDS = env
                .socketTextStream("node01", 9527)
                .map(new WaterSensorMapFunction());  //输入的信息映射转为自定义的WaterSensor实体类对象

        SinkFunction<WaterSensor> jdbcSink = JdbcSink.sink(
                "insert into ws values(?,?,?)",
                new JdbcStatementBuilder<WaterSensor>() {
                    @Override
                    public void accept(PreparedStatement preparedStatement, WaterSensor waterSensor) throws SQLException {
                        //每收到一条WaterSensor,如何去填充占位符
                        preparedStatement.setString(1, waterSensor.getId());
                        preparedStatement.setLong(2, waterSensor.getTs());
                        preparedStatement.setInt(3, waterSensor.getVc());
                    }
                },
                JdbcExecutionOptions.builder()
                        .withMaxRetries(3) // 重试次数
                        .withBatchSize(100) // 批次的大小:条数
                        .withBatchIntervalMs(3000) // 批次的时间
                        .build(),
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl("jdbc:mysql://node01:3306/testDB?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8")
                        .withUsername("root")
                        .withPassword("admin123")
                        .withConnectionCheckTimeoutSeconds(60) // 重试的超时时间
                        .build()
        );
        sensorDS.addSink(jdbcSink);
        env.execute();
    }
}

总结: 写入mysql时注意只能用老的sink写法: addsink,此外JdbcSink的4个参数:

  • 第一个参数: 执行的sql,一般就是 insert into搭配占位符
  • 第二个参数: 预编译sql对象, 对占位符填充值
  • 第三个参数: 执行选项 ,比如批次大小、重试时间
  • 第四个参数: 数据库连接选项 , url、用户名、密码

运行,输入数据,查看MySQL:

在这里插入图片描述

5、自定义Sink输出

现有的Flink连接器不能满足需求时,需要自定义连接器进行输出。与Source类似,Flink提供了通用的SinkFunction接口和对应的RichSinkDunction抽象类,实现这个接口,就可通过DataStream的.addSink()方法自定义写入任何的外部存储。

public class MySinkFunction implements SinkFunction<String>{

	@Override
	public void invoke(String value, Context context) throws Exception{
		//输出逻辑
		//value即流中的数据,来一条数据,invoke方法就被调用一次(所以不要在这里创建连接对象)
		//如果你的外部存储必须先创建连接对象,那就用富函数的生命周期方法去创建连接对象
	}
}
stream.addSink(new MySinkFunction<String>());

来一条数据,invoke方法就被调用一次,如果你的外部存储必须先创建连接对象,那就用富函数的生命周期方法去创建连接对象:

public class MySinkFunction implements RichSinkFunction<String>{

	Connection connection = null;

	@Overrdie
	public void open(Configuration parameters) throws Exception{
		connection = new xxConnection(xx);
	}

	@Override
	public void close() throws Exception{
		super.close();
	}

	@Override
	public void invoke(String value, Context context) throws Exception{
		//输出逻辑
		connection.executeXXX(xxx);
		
	}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1125838.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

docker部署rabbitmq的坑

背景 今天用docker部署rabbitmq&#xff0c;启动都一起正常&#xff0c;但是当访问15672端口时&#xff0c;不能加载出页面。 排查 1.防火墙是否开启 ufw status2.ip是否能ping通 ping 192.168.x.x3.检查docker日志 docker psdocker logs -f 容器id4.进入容器&#xff0c…

Visual Studio Code (VS Code)安装教程

Visual Studio Code&#xff08;简称“VS Code”&#xff09;。 1.下载安装包 VS Code的官网&#xff1a; Visual Studio Code - Code Editing. Redefined 首先提及一下&#xff0c;vscode是不需要破解操作的&#xff1b; 第一步&#xff0c;看好版本&#xff0c;由于我的系…

性能测试连载-负载场景模型构建

业务需求 假设公司领导现在给你分配了一个性能测试需求如下&#xff1a; 1&#xff1a;公司有1000人在上班时间段会登录平台进行打卡操作&#xff0c;可能会登录打卡多次 2&#xff1a;业务高峰时间段在8:00-8:30&#xff0c;半小时 3&#xff1a;需要保证90%用户的响应时间在…

GB28181学习(十二)——报警事件通知和分发

要求 发生报警事件时&#xff0c;源设备将报警信息发送给SIP服务器&#xff1b;报警事件通知和分发使用MESSAGE方法&#xff1b;源设备包括&#xff1a; SIP设备网关SIP客户端联网系统综合接处警系统以及卡口系统 目标设备包括&#xff1a; 具有接警功能的SIP客户端联网系统综…

【斗破年番】官方终于回应,萧潇删减不属实,两线索佐证,彩鳞咖位不会降

【侵权联系删除】【文/郑尔巴金】 斗破苍穹年番动画虽然火爆&#xff0c;但是问题也很多&#xff0c;动不动就上演一出魔改&#xff0c;引发粉丝们的疯狂吐槽。先是萧炎与美杜莎女王的陨落心炎失身戏份遭删减&#xff0c;如今当萧炎回蛇人族&#xff0c;又魔改了美杜莎女王怀孕…

06、Python 序列 与 列表 与 元组 的关系和创建 和 简单使用

目录 序列元组与列表关系总结 创建元组与列表方式一创建元组注意点 创建元组与列表方式二简单使用通过索引访问元素子序列序列加法序列乘法in运算 了解Python序列 创建列表和元组 通过索引访问元素 子序列 序列运算 序列 所谓序列&#xff0c;指的是一种包含多项数据的数据结…

【面试经典150 | 链表】循环链表

文章目录 Tag题目来源题目解读解题思路方法一&#xff1a;哈希集合方法二&#xff1a;快慢指针方法三&#xff1a;计数 拓展其他语言python3 写在最后 Tag 【快慢指针】【哈希集合】【计数】【链表】 题目来源 141. 环形链表 题目解读 判断一个链表中是否存在环。 解题思路 …

vue2中,下拉框多选和全选的实现

vue2中&#xff0c;下拉框多选和全选的实现 代码布局在methods: 中添加功能函数较为完整的一个整体代码&#xff1a; 如图所示点击全选即可完成下拉框中全部子项的全部的选中&#xff0c;同时取消全选即可全部取消选择。 代码布局 <div class"chos-box2"><…

STM32入门F4

学习资料&#xff1a;杨桃电子&#xff0c;官网&#xff1a;洋桃电子 | 杜洋工作室 www.doyoung.net 嵌入式开发核心宗旨&#xff1a;以最适合的性能、功能、成本来完成最有性价比的产品开发。 1.为什么要学F407 STM32F103系列与STM32F407系列对照表&#xff1a; 2.F4系列命…

Ruo-Yi前后端分离版本相关笔记

1.前提条件和基础 Spring Boot Vue 环境要求&#xff1a;Jdk1.8以上版本、MySql数据库、Redis、Maven、Vue 2.使用若依 官网地址&#xff1a;RuoYi-Vue: &#x1f389; 基于SpringBoot&#xff0c;Spring Security&#xff0c;JWT&#xff0c;Vue & Element 的前后端分…

渗透测试tomcat错误信息泄露解决办法

解决方法&#xff1a; 1、使用tomcat8.5.16&#xff0c;会重定向非法url到登录url 2、配置server.xml&#xff0c;加上 <Valve className"org.apache.catalina.valves.ErrorReportValve" showReport"false" showServerInfo"false" />配置…

Unity中国、Cocos为OpenHarmony游戏生态插上腾飞的翅膀

2023年是OpenHarmony游戏生态百花齐放的一年&#xff01;为了扩展OpenHarmony游戏生态&#xff0c;OpenHarmony在基金会成立了游戏SIG小组&#xff0c;游戏SIG小组联合cocos&#xff0c;从cocos2dx入手一周内快速适配了cocos2.2.6的MVP版本&#xff0c;随后又分别适配了cocos2d…

扬尘在线监测 扬尘在线监测系统 扬尘监测设备

计讯物联扬尘在线监测系统&#xff0c;环保数采仪云管理平台&#xff0c;PM2.5、PM10、TSP、温度、湿度、噪声、大气压力、风力、风速、风向监测目标数据采集&#xff0c;5G/4G无线网络自动上报云端&#xff0c;相关部门实时远程在线监控&#xff0c;同时支持治污喷淋系统、视频…

msvcr100.dll丢失怎样修复,msvcr100.dll丢失怎么解决(最新方法分享)

我们经常会遇到一些错误提示&#xff0c;其中之一就是“msvcr100.dll丢失”。这个错误通常会导致某些程序无法正常运行。为了解决这个问题&#xff0c;我们需要采取一些措施来修复丢失的msvcr100.dll文件。本文将介绍五个有效的解决方法&#xff0c;帮助大家解决这一问题。 一…

OKLink携手CertiK在港举办Web3生态安全主题论坛

2023年10月23日&#xff0c;OKLink与CertiK共同发起的Web3生态安全主题论坛在香港铜锣湾拉开帷幕。本次论坛由OKLink和CertiK主办&#xff0c;香港投资推广署独家支持&#xff0c;聚焦如何构建安全可靠的Web3生态系统议题&#xff0c;同时深入剖析这一进程中所面临的潜在挑战。…

Qt中QFile、QByteArray QDataStream和QTextStream区别及示例

在Qt中&#xff0c;QFile、QByteArray、QDataStream和QTextStream是常用的文件和数据处理类。 主要功能和区别 QFile&#xff1a; QFile是用于读写文本和二进制文件以及资源的I/O设备。可以单独使用QFile&#xff0c;或者更方便地与QTextStream或QDataStream一起使用。 通常在…

开关电源检测的技术标准和安全标准是什么?纳米软件为您介绍

开关电源总体技术标准 1. 外观&#xff1a;元器件排列整齐、美观、结构合理&#xff0c;焊点均匀饱满、明亮、光滑、无尖刺&#xff0c;焊接牢固。PCB板铜条无脱落、外露、无毛刺、飞边、变形。 2. 输入电压&#xff1a;110VAC/DC或220VAC/DC或380VAC三相20%;或85~264VAC全范围…

Go学习第六章——系统函数与错误处理

Go系统函数与错误处理 1 字符串相关系统函数2 时间和日期相关的函数2.1 Now函数2.2 日期的格式化 3 内置函数4 错误处理4.1 基本使用4.2 自定义错误 1 字符串相关系统函数 下面给出20个常见的函数&#xff0c;不需要导包&#xff0c;直接可以用 func main() {// 1.统计字符串…

Synchronized同步锁

synchronized 一&#xff0c;介绍 Java中的synchronized关键字用于实现线程同步&#xff0c;可以修饰方法或代码块。 1. 修饰方法&#xff1a;当一个方法被synchronized修饰时&#xff0c;只有获得该方法的锁的线程才能执行该方法。其他线程需要等待锁的释放才能执行该方法。…

Java Netty - Buffer类

Buffer类 当应用程序进行数据传输的时候&#xff0c;往往需要使用缓冲区&#xff0c;常用的缓存区就是JDK NIO类库提供的 java.nio.Buffer&#xff1b; NIO的Buffer本质上是一个内存块&#xff0c;既可以写入数据&#xff0c;也可以从中读取数据&#xff1b; 其中&#xff0c;…