Flink高手之路4-Flink流批一体

news2024/12/28 13:54:22

文章目录

  • Flink高手之路4-Flink流批一体API开发
    • 一、流批一体相关的概念
      • 1.数据的时效性
      • 2.流处理和批处理
        • 1)批处理
        • 2)流处理
        • 3)两者对比
      • 3.流批一体API
      • 4.流批一体的编程模型
    • 二、Data Source
      • 1.预定义的Source
        • 1)基于集合的Sources
          • (1)API
          • (2)演示
        • 2)基于文件的Source
          • (1)API
          • (2)演示
        • 3)基于socket的source
          • (1)模拟socket通信,安装nc工具
          • (2)启动nc发送数据
          • (3)使用telnet来接收数据,测试socket是否工作正常
          • (4)编写Flink代码,作为Socker通信的接收端,接收nc发送的数据进行处理
          • (5)案例-利用基于Socket的source实时统计单词数量
      • 2.自定义的Source
        • 1)随机生成数据
          • (1)api
          • (2)需求
          • (3)lombok插件
          • (4) ptg插件(可选)
          • (5)编程实现
        • 2)MySQL
          • (1)从mysql中抽取数据有多种方式
          • (2)需求
          • (3)启动mysql
          • (4)准备数据
          • (5)pom文件中添加msyql依赖
          • (6)代码实现
    • 三、Transformations
      • 1. 官网API列表
      • 2. 整体分类
        • 1)对单条纪录的操作
        • 2)对多条记录的操作
        • 3)对多个流进行操作并转换为单个流(合并union)
        • 4)把一个流拆分成多个流(拆分)
      • 3.基本操作
        • 1)map
        • 2)flatMap
        • 3)keyBy
        • 4)filter
        • 5)sum
        • 6)reduce
      • 4.合并
        • 1)union和connect
        • 2)需求
        • 3)代码实现
        • 4)执行,查看结果
      • 5. 拆分
        • 1) select和Side Outputs
        • 2) 需求:
        • 3)代码实现
        • 4)执行,查看结果
      • 6.分区
        • 1)rebalance重平衡分区
        • 2)代码实现
        • 3)输出结果
        • 4)其他分区
    • 四、Data Sink
      • 1.预定义Sink
        • 1)基于控制台和文件的Sink
      • 2.自定义Sink
        • 1)MySQL
        • 2)主程序
        • 3)实体类
        • 4)自定义数据下沉工具类
        • 5)输出结果
    • 五、Connector
      • 1. JDBC

Flink高手之路4-Flink流批一体API开发

一、流批一体相关的概念

1.数据的时效性

在日常工作中,我们一般处理数据时先把数据存放在表中,然后进行加工和分析,这就产生数据的时效性问题。

如果以年、月、天为单位的级别处理数据,进行统计分析、个性化推荐等,这样的数据一般称为批数据。

如果以小时、分钟、秒这样的单位进行处理,这样的数据一般称之为流数据,比如对网站的实时监控、日志的处理等。在这样的场景下,如果还要收集数据存储到数据库中再进行处理,就不能满足高时效性的需求。

2.流处理和批处理

1)批处理

Batch Analytics,统一收集数据->存储数据->进行批处理,比如MapReduce、Hive、FlinkDataSet等,生成离线报表。

2)流处理

Streaming Analytics,对数据流进行实时处理,比如Storm、FlinkStream、Spark Streaming等,应用场景如实时大屏、实时报表。

3)两者对比

  • 时效性不同:流处理实时低延迟,批处理非实时高延迟
  • 数据的特征不同:流处理的数据一般是动态、没有边界,而批处理的数据一般静态、有边界
  • 应用场景不同:流处理应用在实时场景,而批处理实时性要求不高
  • 运行方式不同:流处理持续进行,批处理一次性完成

3.流批一体API

Flink的DataStream API既支持批处理,又支持流处理,可以把批处理当成是流处理的一种特例。

流批一体的优点:

  • 可复用:作业可以在流、批两者模式之间自由切换,不用重写代码
  • 维护简单:统一的api

4.流批一体的编程模型

数据源(Data Source)->数据处理(Transformations)->数据输出(Data Sink)。

二、Data Source

1.预定义的Source

1)基于集合的Sources

(1)API

env.fromElements(可变参数)
env.fromCollection(各种集合)
env.generateSequence(开始,结束)
env.fromSequence(开始,结束

(2)演示

fromElements

image-20230330143717716

collection

image-20230330143832518

Sequence

image-20230330144053171

image-20230330144318064

2)基于文件的Source

(1)API

env.readTextFile()

(2)演示

读取本地文件

image-20230330144743130

读取本地文件夹

image-20230330144802618

image-20230330145044848

读取集群的文件

image-20230330145327553

读取集群的文件夹

image-20230330145532606

读取gz格式的压缩文件
准备好gz格式的压缩文件

image-20230330145741875

image-20230330145821479

image-20230330150155637

读取集群上gz格式的压缩文件

image-20230330150247742

image-20230330150323954

image-20230330150459534

3)基于socket的source

socket是指网络通信,需要有一个发送端(服务端)和一个接收端(客户端),类似于插头和插座(socket),socket通信需要指定发送端的地址和端口。在实际应用中用于和一些智能硬件对接。比如门禁的人脸识别系统,每个人脸机都有一个ip地址和端口,可以与之进行通信。

(1)模拟socket通信,安装nc工具

nc是netcat的简称,可以用它向某台主机的某个端口发送数据,模拟socket通信的发送端也就是服务端,作为source。

安装nc工具:

image-20230330150859185

(2)启动nc发送数据

image-20230330151153378

(3)使用telnet来接收数据,测试socket是否工作正常

windows主机安装telnet

image-20230330151539654

执行telnet命令

image-20230330152041809

socket通信

image-20230330152147903

linux主机安装telnet

image-20230330153431345

image-20230330153546538

此时,发送端(服务器端)发送的数据,所有的接收端(客户端)都可以收到,但是接收端(客户端)发送的数据,只有发送端(服务器端)能收到

image-20230330153711360

image-20230330153754742

(4)编写Flink代码,作为Socker通信的接收端,接收nc发送的数据进行处理

image-20230408103943992

image-20230408103900984

(5)案例-利用基于Socket的source实时统计单词数量

代码:

package cn.edu.hgu.bigdata20.flink;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * @author 王
 * @description 基于socket的实时统计
 * @date 2023/04/06
 */
public class FlinkSocketSourceDemo {
    public static void main(String[] args) throws Exception {
        // 1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//单例模式
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        // 2.source
        DataStream<String> ds = env.socketTextStream("hadoop001", 9999);
        // 3.处理数据:transformations,使用匿名函数类
        // 3.1 将每一行数据切分成一个个的单词组成一个集合
        DataStream<String> wordDS = ds.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String s, Collector<String> collector) throws Exception {
                // 参数s代表一行行的文本数据,将其切割为一个个的单词
                String[] words = s.split(" ");
                // 将切割的每一个单词收集起来成为一个集合
                for (String word : words) {
                    collector.collect(word);
                }
            }
        });
        // 3.2 使集合中的每一个单词记为1,组成一个二元组
        DataStream<Tuple2<String, Integer>> wordAndOnesDS = wordDS.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String s) throws Exception {
                // 此处的s就是传过来的一个个单词,他跟1组成一个二元组
                return Tuple2.of(s, 1);
            }
        });
        // 3.3 对新的集合按照key,也就是单词进行分组
        KeyedStream<Tuple2<String, Integer>, String> groupDS = wordAndOnesDS.keyBy(t -> t.f0);//lambda形式,fo表示二元组的第一个元素
        // 3.4 对数据进行聚合
        DataStream<Tuple2<String, Integer>> result = groupDS.sum(1);//此处的1表示二元组的第二个元素
        // 4.输出结果:sink
        result.print();
        // 5.触发执行
        // 对于DataSet如果有print,可以省略execute
        env.execute();
    }
}

执行,输入数据,查看查看结果

image-20230408104503429

2.自定义的Source

1)随机生成数据

(1)api
  • SourceFunction:非并行的随机数据源(并行度为1)
  • RichSourceFunction:丰富的非并行的随机数据源(并行度为1)
  • ParallelSourceFunction:并行的随机数据源(并行度可以大于等于1)
  • RichParallelSourceFunction:丰富的并行的随机数据源(并行度可以大于等于1)
(2)需求

每隔1秒随机生成一条订单信息(订单ID,用户ID,订单金额,时间戳)

要求:

  • 随机生成订单ID(UUID):UUID 是通用唯一识别码(Universally Unique Identifier)的缩写
  • 随机生成用户ID(0-2)
  • 随机生成订单金额(0-100)
  • 时间戳为当前系统时间
(3)lombok插件

Lombok项目是一个java库,它可以自动插入到编辑器和构建工具中,增强java的性能。不需要再写getter、setter或equals方法,只要有一个注解,你的类就有一个功能齐全的构建器、自动记录变量等等。

image-20230408104803718

使用lombok插件,新版的idea中已经自带,若使用旧版的idea,可以下载插件:

image-20230408105214207

使用时引入lombok的依赖

image-20230408105112778

(4) ptg插件(可选)

在我们安装了lombox的基础上,如果不想使用注解,我们可以使用ptg插件,在实体类上右击鼠标就会弹出小窗口,点击Ptg To JavaBean就可以自动生成get、set、有参构造、无参构造和toString。

image-20230408110111566

image-20230408105600749

(5)编程实现

订单实体类

package cn.edu.hgu.bigdata20.flink.order;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * 订单类
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
public class Order {
    //订单id
    String id;
    // 用户id
    Integer userId;
    // 订单金额
    Integer money;
    // 订单时间
    Long createTime;

}

在这里插入图片描述

主类

package cn.edu.hgu.bigdata20.flink.order;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

import java.util.Random;
import java.util.UUID;

/**
 * 主处理类
 * SourceFunction:非并行的随机数据源(并行度为1)
 * RichSourceFunction:丰富的非并行的随机数据源(并行度为1)
 * ParallelSourceFunction:并行的随机数据源(并行度可以大于等于1)
 * RichParallelSourceFunction:丰富的并行的随机数据源(并行度可以大于等于1)
 *
 * @author 王
 * @date 2023/04/08
 */
public class RichParallelSourceFunctionDemo {
    public static void main(String[] args) throws Exception {
        // 1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        // 2.source
        DataStream<Order> ds = env.addSource(new RichParallelSourceFunction<Order>() {
            // 定义一个标识,是否需要生成数据
            private Boolean flag = true;

            @Override
            public void run(SourceContext<Order> sourceContext) throws Exception {
                //持续的生成模拟数据,知道flag为false,也就是执行了取消命令,不在生成数据
                Random random = new Random();
                while (flag) {
                    // 每隔1秒(1000毫秒)
                    Thread.sleep(1000);
                    // 生成订单id
                    String id = UUID.randomUUID().toString();
                    // 生成用户ID,0~2之间的随机数
                    int userID = random.nextInt(3);
                    // 生成金额
                    int money = random.nextInt(101);
                    // 生成时间戳
                    long createTime = System.currentTimeMillis();
                    // 通过Source的上下文返回创建的订单
                    sourceContext.collect(new Order(id, userID, money, createTime));

                }
            }

            @Override
            public void cancel() {
                // 取消任务/执行cancel命令时执行
                flag = false;
            }
        });
        // 3.transformations
        // 4.sink
        ds.print();
        // 5.execute
        env.execute();
    }
}

执行,查看结果

image-20230408115753713

2)MySQL

(1)从mysql中抽取数据有多种方式
  • sqoop:数据迁移工具可以把mysql的数据迁移到hdfs、hive

  • kette:ETL的可视化工具

  • spark:api方式,读取mysql的数据并进行处理

  • flink:api方式,读取mysql的数据并进行处理

(2)需求

从mysql中实时的价值数据,显示出来

(3)启动mysql

本地mysql

image-20230408120753030

(4)准备数据

① 新建数据库

image-20230408123501527

② 新建表

image-20230408123605228

③ 设置字段

image-20230408123800798

创建其他的字段

image-20230408124458521

也可以使用SQL命令执行:

DROP TABLE IF EXISTS `students`;
CREATE TABLE `students` (
  `Id` int unsigned NOT NULL AUTO_INCREMENT COMMENT '学生信息表id',
  `sname` varchar(50) DEFAULT NULL COMMENT '学生姓名',
  `sex` varchar(10) DEFAULT NULL COMMENT '性别',
  `age` tinyint DEFAULT NULL COMMENT '年龄',
  `phone` varchar(20) DEFAULT NULL COMMENT '手机号',
  `address` varchar(100) DEFAULT NULL COMMENT '住址',
  PRIMARY KEY (`Id`)
) ENGINE=MyISAM DEFAULT CHARSET=utf8mb3 COMMENT='学生信息表';

④ 添加纪录

image-20230408124938587

对应的sql语句:

INSERT INTO `students` VALUES (1,'李四','女',18,'13456764567','保定'),(2,'张三','男',20,'14567819000','石家庄'),(3,'王五','男',23,'14562347805','衡水');
(5)pom文件中添加msyql依赖

image-20230408125112083

(6)代码实现

学生实体类

package cn.edu.hgu.bigdata20.flink.mysql;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * 学生类实体
 * @author 王
 * @date 2023/04/08
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
public class Student {
    Integer id; //表id
    String sname; //学生姓名
    String sex; //学生性别
    Integer age; //年龄
    String phone; //手机号
    String address; //手机号
}

image-20230408133139536

主类

package cn.edu.hgu.bigdata20.flink.mysql;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

/**
 * 演示MySQL作为数据源,每隔5秒从MySQL中读取数据
 *
 * @author 王
 * @date 2023/04/08
 */
public class MySQLSourceDemo {
    public static void main(String[] args) throws Exception {
        // 1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        // 2.source
        DataStream<Student> ds = env.addSource(new RichParallelSourceFunction<Student>() {
            // 定义一个标识,是否需要生成数据
            private Boolean flag = true;
            // 定义连接对象
            private Connection conn = null;
            // 定义语句对象
            private Statement st = null;
            // 定义结果集对象
            private ResultSet rs = null;

            @Override
            public void open(Configuration parameters) throws Exception {
                // 1.加载MySQL驱动
                Class.forName("com.mysql.cj.jdbc.Driver");
                // 2.获取数据库连接
                conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/studb", "root", "****");
                // 3.通过连接获取语句对象
                st = conn.createStatement();
            }

            @Override
            public void close() throws Exception {
                // 6.关闭资源
                rs.close();
                st.close();
                conn.close();
            }

            @Override
            public void run(SourceContext<Student> sourceContext) throws Exception {
                while (flag) {
                    // 每隔1秒(2000毫秒)
                    Thread.sleep(2000);
                    // 4.执行语句对象得到结果集
                    String sql = "Select * from students";
                    rs = st.executeQuery(sql);
                    // 5.操作结果集
                    while (rs.next()) {
                        // id
                        Integer id = rs.getInt("id");
                        // 姓名
                        String sname = rs.getString("sname");
                        // 性别
                        String sex = rs.getString("sex");
                        // 年龄
                        Integer age = rs.getInt("age");
                        // 手机号
                        String phone = rs.getString("phone");
                        // 住址
                        String address = rs.getString("address");
                        // 通过Source的上下文返回信息
                        sourceContext.collect(new Student(id, sname, sex, age, phone, address));
                    }

                }
            }

            @Override
            public void cancel() {
                flag = false;
            }
        });
        // 3.transformations
        // 4.sink
        ds.print();
        // 5.execute
        env.execute();
    }
}

注意:

image-20230408142524851

执行,查看结果

image-20230408142556889

添加一条记录,再次查看结果

在这里插入图片描述

三、Transformations

1. 官网API列表

官网api网址:https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/datastream/operators/overview/

image-20230408144006966

2. 整体分类

image-20230408144732549

整体来说,流式数据上的操作可以分为四类。

1)对单条纪录的操作

比如筛除掉不符合要求的记录(Filter 操作),或者将每条记录都做一个转换(Map 操作)。

2)对多条记录的操作

比如说统计一个小时内的订单总成交量,就需要将一个小时内的所有订单记录的成交量加到一起。为了支持这种类型的操作,就得通过 Window 将需要的记录关联到一起进行处理。

3)对多个流进行操作并转换为单个流(合并union)

例如,多个流可以通过 Union、Join 或 Connect 等操作合到一起。这些操作合并的逻辑不同,但是它们最终都会产生了一个新的统一的流,从而可以进行一些跨流的操作。

4)把一个流拆分成多个流(拆分)

DataStream 还支持与合并对称的拆分操作,即把一个流按一定规则拆分为多个流(Split 操作),每个流是之前流的一个子集,这样我们就可以对不同的流作不同的处理。

3.基本操作

1)map

map:将函数作用在集合中的每一个元素上,并返回作用后的结果

image-20230408145338043

2)flatMap

flatMap:将集合中的每个元素变成一个或多个元素,并返回扁平化之后的结果

image-20230408145351723

3)keyBy

按照指定的key来对流中的数据进行分组,要注意的是,流处理中没有groupBy,而是keyBy

image-20230408145402868

4)filter

filter:按照指定的条件对集合中的元素进行过滤,过滤出返回true/符合条件的元素

image-20230408145313942

5)sum

sum:按照指定的字段对集合中的元素进行求和

6)reduce

reduce:对集合中的元素进行聚合

image-20230408145446533

4.合并

1)union和connect

union:

union算子可以合并多个同类型的数据流,并生成同类型的数据流,即可以将多个DataStream[T]合并为一个新的DataStream[T]。数据将按照先进先出(First In First Out)的模式合并,且不去重。

image-20230408145700739

connect:

connect提供了和union类似的功能,用来连接两个数据流,它与union的区别在于:

connect只能连接两个数据流,union可以连接多个数据流。

connect所连接的两个数据流的数据类型可以不一致,union所连接的两个数据流的数据类型必须一致。

两个DataStream经过connect之后被转化为ConnectedStreams,ConnectedStreams会对两个流的数据应用不同的处理方法,且双流之间可以共享状态。

image-20230408145733702

2)需求

  • 将两个String类型的流进行union
  • 将一个String类型和一个Long类型的流进行connect

3)代码实现

package cn.edu.hgu.bigdata20.flink.union;


import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;

import java.util.Arrays;

/**
 * 演示Flink的Union和connect的使用
 *
 * @author 王
 * @date 2023/04/08
 */
public class FlinkUnionAndConnectDemo {
    public static void main(String[] args) throws Exception {
        //1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        //2.Source
        DataStream<String> ds1 = env.fromElements("hadoop", "spark", "flink", "hive");
        String[] text = {"java", "flume", "azkaban", "sqoop"};
        DataStream<String> ds2 = env.fromCollection(Arrays.asList(text));
        DataStream<Long> ds3 = env.fromSequence(5, 10);

        //3.Transformation
        // union
        DataStream<String> unionResult = ds1.union(ds2);
        // connection
        ConnectedStreams<String, Long> connectResult = ds1.connect(ds3);
        //interface CoMapFunction<IN1, IN2, OUT>
        DataStream<String> result = connectResult.map(new CoMapFunction<String, Long, String>() {
            @Override
            public String map1(String s) throws Exception {
                return s;
            }

            @Override
            public String map2(Long aLong) throws Exception {
                return aLong.toString();
            }
        });

        //4.Sink
        // unionResult.print();
        result.print();

        //5.execute
        env.execute();
    }
}

4)执行,查看结果

image-20230408155043480

5. 拆分

1) select和Side Outputs

Select就是获取分流后对应的数据

Side Outputs:可以使用process方法对流中数据进行处理,并针对不同的处理结果将数据收集到不同的OutputTag中

2) 需求:

对流中的数据按照奇数和偶数进行分流,并获取分流后的数据

3)代码实现

package cn.edu.hgu.bigdata20.flink.select;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

/**
 * 演示Flink的Select和Side的使用
 *
 * @author 王
 * @date 2023/04/08
 */
public class FlinkSelectAndSideDemo {
    public static void main(String[] args) throws Exception {
        //1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        //2.Source
        DataStreamSource<Long> ds = env.fromSequence(1, 10);

        //3.Transformation

        // 定义偶数标签
        OutputTag<Long> tag_even = new OutputTag<>("偶数", TypeInformation.of(Long.class));
        // 定义奇数标签
        OutputTag<Long> tag_odd = new OutputTag<>("奇数", TypeInformation.of(Long.class));
        //对流中的数据进行处理
        SingleOutputStreamOperator<Long> tagResult = ds.process(new ProcessFunction<Long, Long>() {
            @Override
            public void processElement(Long aLong, ProcessFunction<Long, Long>.Context context, Collector<Long> collector) throws Exception {
                // 判断奇偶性
                if (aLong % 2 == 0) {
                    context.output(tag_even, aLong);
                } else {
                    context.output(tag_odd, aLong);
                }
            }
        });

        // 偶数流
        DataStream<Long> evenResult = tagResult.getSideOutput(tag_even);
        // 偶数流
        DataStream<Long> oddResult = tagResult.getSideOutput(tag_odd);
        //4.Sink
        evenResult.print("偶数");
        oddResult.print("奇数");

        //5.execute
        env.execute();
    }
}

4)执行,查看结果

image-20230408155128069

6.分区

1)rebalance重平衡分区

类似与spark中的repartition,但是功能更强大,可以直接解决数据倾斜问题。

Flink也有数据倾斜的时候,比如当前有数据量大概10亿条数据需要处理,在处理过程中可能会发生如图所示的状况,出现了数据倾斜,其他3台机器执行完毕也要等待机器1执行完毕后才算整体将任务完成;

image-20230420143557883

所以在实际的工作中,出现这种情况比较好的解决方案就是rebalance(内部使用round robin方法将数据均匀打散)

在这里插入图片描述

2)代码实现

package cn.edu.hgu.bigdata20.flink;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @author 王
 * @description flink的Rebalance重平衡示例
 * @date 2023/04/20
 */
public class TransformationRebalanceDemo {
    public static void main(String[] args) throws Exception {
        // 1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        // 2.source
        DataStream<Long> ds = env.fromSequence(0, 100);

        // 3.Transformation
        // 3.1 过滤操作,有可能出现数据倾斜
        DataStream<Long> filterDS = ds.filter(new FilterFunction<Long>() {
            @Override
            public boolean filter(Long num) throws Exception {
                return num > 10;
            }
        });

        // 3.2 map操作,将数据转为(分区编号/子任务编号, 数据) 二元组
        // Rich表示多功能的,比MapFunction要多一些API可以供我们使用
        DataStream<Tuple2<Integer, Integer>> result1 = filterDS.map(new RichMapFunction<Long, Tuple2<Integer, Integer>>() {
            @Override
            public Tuple2<Integer, Integer> map(Long value) throws Exception {
                //获取分区编号/子任务编号
                int id = getRuntimeContext().getIndexOfThisSubtask();
                return Tuple2.of(id, 1);
            }
        }).keyBy(t -> t.f0).sum(1);
        // 3.3 rebalance操作,对数据进行重平衡,然后再进行转换
        DataStream<Tuple2<Integer, Integer>> result2 = filterDS.rebalance().map(new RichMapFunction<Long, Tuple2<Integer, Integer>>() {
            @Override
            public Tuple2<Integer, Integer> map(Long value) throws Exception {
                //获取分区编号/子任务编号
                int id = getRuntimeContext().getIndexOfThisSubtask();
                return Tuple2.of(id, 1);
            }
        }).keyBy(t -> t.f0).sum(1);

        //4.sink
        //result1.print();//有可能出现数据倾斜
        result2.print();//在输出前进行了rebalance重分区平衡,解决了数据倾斜

        //5.execute
        env.execute();
    }
}

3)输出结果

image-20230420145408979

4)其他分区

image-20230420145502737

说明:

recale分区。基于上下游Operator的并行度,将记录以循环的方式输出到下游Operator的每个实例。

举例:

上游并行度是2,下游是4,则上游一个并行度以循环的方式将记录输出到下游的两个并行度上;上游另一个并行度以循环的方式将记录输出到下游另两个并行度上。若上游并行度是4,下游并行度是2,则上游两个并行度将记录输出到下游一个并行度上;上游另两个并行度将记录输出到下游另一个并行度上。

四、Data Sink

1.预定义Sink

1)基于控制台和文件的Sink

  • print:直接输出到控制台
  • printToErr:直接输出到控制台,用红色
  • writeAsText(本地/HDFS):输出到本地或者hdfs上,说明:如果并行度为1输出为文件,如果并行度>1,输出为文件夹,如果不加并行度,则使用默认的并行度(cpu核数),输出也为文件夹
// 4.输出结果:sink
        // aggResult.print(); // 直接输出到控制台
        // aggResult.printToErr(); // 直接输出到控制台,用红色
        // 把结果储存到hdfs上
        // System.setProperty("HADOOP_USER_NAME", "root");
        // aggResult.writeAsText("hdfs://hadoop001:9000/output/wordcount")
        // 默认的并行度12(根据自己的核数),输出文件夹
        // aggResult.writeAsText("E:\\wordcount\\output\\count1", FileSystem.WriteMode.OVERWRITE);
        // 并行度>1,输出文件夹
        // aggResult.writeAsText("E:\\wordcount\\output\\count2", FileSystem.WriteMode.OVERWRITE).setParallelism(2);
        // 并行度为1,输出为文件
        aggResult.writeAsText("E:\\wordcount\\output\\count3", FileSystem.WriteMode.OVERWRITE).setParallelism(1);

print:

image-20230420151424588

printToErr

image-20230420151601406

默认的并行度12(根据自己的核数),输出文件夹:

image-20230420152149272

image-20230420152118359

并行度>1,输出文件夹:

image-20230420152425287

image-20230420152459543

并行度为1,输出为文件

image-20230420153056998

image-20230420153127345

2.自定义Sink

1)MySQL

可以把经过Flink处理的数据保存到MySQL中

2)主程序

package cn.edu.hgu.bigdata20.flink;

import cn.edu.hgu.bigdata20.flink.mysql.Student;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * description:使用Flink的流处理进行单词计数
 * author:王
 * date:2023/04/20
 */
public class SinkMySQLDemo {
    public static void main(String[] args) throws Exception {
        // 1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        // 2.Source
        // 2.1 添加自定义的source,连接mysql,设置并行度为2
        DataStream<Student> ds = env.fromElements(new Student(null, "kate", "女", 19, "14567217623", "邯郸"));
        // 3.Transformation
        // 4.Sink
        ds.addSink(new MySQLSink());
        //5.execute
        env.execute();
    }
}

3)实体类

package cn.edu.hgu.bigdata20.flink.mysql;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * description:学生类实体
 * author 王
 * date 2023/04/08
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
public class Student {
    Integer id; //表id
    String sname; //学生姓名
    String sex; //学生性别
    Integer age; //年龄
    String phone; //手机号
    String address; //手机号
}

4)自定义数据下沉工具类

package cn.edu.hgu.bigdata20.flink;

import cn.edu.hgu.bigdata20.flink.mysql.Student;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

/**
 * description:自定义数据下沉工具类
 * author 王
 * date 2023/04/20
 */
public class MySQLSink extends RichSinkFunction<Student> {
    private Connection conn = null;
    private PreparedStatement st = null;

    @Override
    public void open(Configuration parameters) throws Exception {
        // 从mysql中读取数据,创建Student对象,添加到上下文,作为source
        // 1.加载mysql驱动,开启连接
        // Class.forName("com.mysql.jdbc.Driver"); // mysql5
        Class.forName("com.mysql.cj.jdbc.Driver"); // mysql8
        // 2.获取连接Connection
        String url = "jdbc:mysql://localhost:3306/studb";
        String user = "root";
        String password = "123456";
        conn = DriverManager.getConnection(url, user, password);
        // 3.获取执行语句对象PreparedStatement(参数化的执行对象)
        String sql = "insert into `students` values (null, ?, ?, ?, ?, ?)";
        st = conn.prepareStatement(sql);
    }

    @Override
    public void invoke(Student value, Context context) throws Exception {
        //给st中的?设置具体值
        st.setString(1, value.getSname());
        st.setString(2, value.getSex());
        st.setInt(3, value.getAge());
        st.setString(4, value.getPhone());
        st.setString(5, value.getAddress());
        //执行sql
        st.executeUpdate();
    }

    @Override
    public void close() throws Exception {
        if (conn != null) conn.close();
        if (st != null) st.close();
    }
}

5)输出结果

原始数据:

image-20230420161802381

执行后数据:

image-20230420161838039

五、Connector

1. JDBC

官网API:https://ci.apache.org/projects/flink/flink-docs-release-1.17/dev/connectors/jdbc.html

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env
        .fromElements(...)
        .addSink(JdbcSink.exactlyOnceSink(
                "insert into books (id, title, author, price, qty) values (?,?,?,?,?)",
                (ps, t) -> {
                    ps.setInt(1, t.id);
                    ps.setString(2, t.title);
                    ps.setString(3, t.author);
                    ps.setDouble(4, t.price);
                    ps.setInt(5, t.qty);
                },
                JdbcExecutionOptions.builder()
                    .withMaxRetries(0)
                    .build(),
                JdbcExactlyOnceOptions.defaults(),
                () -> {
                    // create a driver-specific XA DataSource
                    // The following example is for derby 
                    EmbeddedXADataSource ds = new EmbeddedXADataSource();
                    ds.setDatabaseName("my_db");
                    return ds;
                });
env.execute();

参考文章:

Lombok

JavaBean

【MySql】Navicat 连接数据库出现1251 - Client does not support authentication protocol … 问题的解决方法

Flink 实时去重方案

            "insert into books (id, title, author, price, qty) values (?,?,?,?,?)",
            (ps, t) -> {
                ps.setInt(1, t.id);
                ps.setString(2, t.title);
                ps.setString(3, t.author);
                ps.setDouble(4, t.price);
                ps.setInt(5, t.qty);
            },
            JdbcExecutionOptions.builder()
                .withMaxRetries(0)
                .build(),
            JdbcExactlyOnceOptions.defaults(),
            () -> {
                // create a driver-specific XA DataSource
                // The following example is for derby 
                EmbeddedXADataSource ds = new EmbeddedXADataSource();
                ds.setDatabaseName("my_db");
                return ds;
            });

env.execute();








参考文章:

[Lombok](https://baike.baidu.com/item/Lombok/23780246?fr=aladdin)

[JavaBean](http://t.csdn.cn/ix8FK)

[【MySql】Navicat 连接数据库出现1251 - Client does not support authentication protocol ...... 问题的解决方法](http://t.csdn.cn/Y2Gzz)

[Flink 实时去重方案](http://t.csdn.cn/Av7KH)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/441147.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

2023.4.19 + 4.20

文章目录 String类1&#xff1a;介绍&#xff1a;2&#xff1a;String类实现了很多的接口&#xff1a;3&#xff1b;String类常用构造器4&#xff1a;不同方式创建String类对象的区别&#xff08;1&#xff09;直接赋值的方式&#xff08;2&#xff09;常规new的方式&#xff0…

【筛质数】——朴素筛,埃式筛,欧拉筛

题目描述&#xff1a; 题目分析&#xff1a; 这道题可以用&#xff0c;朴素筛&#xff0c;埃氏筛&#xff0c;欧拉筛来写。 普通筛&#xff1a; 时间复杂度&#xff1a;O(n logn) 时间复杂度太高&#xff0c;会超时的&#xff01;&#xff01;&#xff08;9/10&#xff09; #…

Keil5----显示空白符和设置使用空白格表示Tab键

一、Keil5界面----显示空白符 首先打开Keil5-MDK界面&#xff0c;然后按照下面步骤操作。 步骤1&#xff1a;点击 Edit(编辑)&#xff0c;然后点击 Configuration(配置) 步骤2&#xff1a;勾选 View White Spaces(查看空白) 步骤3&#xff1a;显示设置后的结果 具体显示结果分…

Git添加SSH密钥本地仓库上传远程GitHub库

1、前言 现在想要从本地设备将本地仓库上传到GitHub上需要用到SSH密钥&#xff0c;接下来讲解大致的步骤&#xff0c;本文默认读者已经掌握基本的Git知识 2、详细步骤 2.1 创建密钥 在本地项目仓库根目录下&#xff0c;输入下面的命令&#xff1a; ssh-keygen -t rsa命令输…

深度学习 Day 31——YOLOv5-Backbone模块实现

深度学习 Day 31——YOLOv5-Backbone模块实现 文章目录 深度学习 Day 31——YOLOv5-Backbone模块实现一、前言二、我的环境三、什么是YOLOv5-Backbone模块&#xff1f;四、搭建包含Backbone模块的模型1、模型整体代码2、模型每一部分详解3、模型详情 五、模型训练六、最终结果1…

计算机|网页设计 |七大罪动漫主题|作品分享

文章目录 一、主题介绍二、截图展示三、源代码获取 一、主题介绍 计算机&#xff5c;网页设计 &#xff5c;七大罪动漫主题&#xff5c;作品分享 一个关于七大罪动漫主题的网页设计。共4页 图片文字都可修改&#xff01; 二、截图展示 三、源代码获取 本次的分享就到这里啦&…

双指针【算法推导、背模板】——最长连续不重复子序列

799. 最长连续不重复子序列 - AcWing题库 通常情况双指针就是需要将O(N^2^)&#xff0c;利用某些单调性质实现O(N) 通用代码模板 for(int i 0 , j 0; i < n ; i ){while(j < i && check(i , j ) ) j ;// 需要处理的逻辑 }check判断是否构成 算法推导 题目中…

LLM总结(持续更新中)

引言 当前LLM模型火出天际&#xff0c;但是做事还是需要脚踏实地。此文只是日常学习LLM&#xff0c;顺手整理所得。本篇博文更多侧重对话、问答类LLM上&#xff0c;其他方向&#xff08;代码生成&#xff09;这里暂不涉及&#xff0c;可以去看综述来了解。 之前LLM模型梳理 …

微服务---RabbitMQ与SpringAMQP基本使用

RabbitMQ 1.初识MQ 1.1.同步和异步通讯 微服务间通讯有同步和异步两种方式&#xff1a; 同步通讯&#xff1a;就像打电话&#xff0c;需要实时响应。 异步通讯&#xff1a;就像发邮件&#xff0c;不需要马上回复。 两种方式各有优劣&#xff0c;打电话可以立即得到响应&am…

OpenCV实例(四)手写数字识别

OpenCV实例&#xff08;四&#xff09;手写数字识别 1.基本原理2.实现步骤2.1数据准备2.2计算匹配值2.3获取最佳匹配值及对应模板2.4获取最佳匹配模板对应的数字2.5输出识别结果 3.代码实例 作者&#xff1a;Xiou 1.基本原理 使用模板匹配的方式实现手写数字识别&#xff0c;…

2023/4/20总结

项目 网上关于listview的资料太少了&#xff0c;在网上的那些资料里面&#xff0c;了解到以下这些。 如果希望listview后期能更改或者更新&#xff0c;那么需要使用到 ObservableList 它可以观察到&#xff0c;listview的改动。 需要特别注意一点的是&#xff1a;写俩者的…

如何发布自己的 npm 包?

一. 准备工作 1. 注册 npm 账号 还没有 npm 账号&#xff1f;去官网注册&#xff1a; https://www.npmjs.com/ 需要记住用户名、密码、邮箱 2. 查看本地 npm 镜像&#xff0c;如果不是默认的&#xff0c;需要改回来 npm config get registry重置镜像路径 npm config set r…

vulstack ATTCK(三)靶场

0x00环境搭建 两种形式 1.添加vmare2网卡&#xff0c;修改vmare2网卡的地址为192.168.93.0网段&#xff0c;注意不要在连接到主机适配器上打勾&#xff0c;这样会使本机也可以访问此电脑&#xff0c;5台机器都换成vmare2即可&#xff0c;第一台出网的centos在添加另一张nat网卡…

Docker容器---数据卷 数据容器

Docker容器---数据卷 数据容器 一、数据卷概述1、数据卷2、数据卷原理3、数据卷作用 二、数据卷容器1、数据卷容器作用2、创建数据卷容器 三、容器互联1、创建并运行源容器取名web12、创建并运行接收容器取名web2 一、数据卷概述 管理 Docker 容器中数据主要有两种方式&#x…

社科院与杜兰大学中外合作办学金融管理硕士项目——比起过往,前路更值得期待

当结束一天工作陷入沉思时&#xff0c;你有没有特别遗憾的事情呢&#xff0c;人生有太多的不确定性&#xff0c;比起过往&#xff0c;未知的人生更值得我们期待。与其懊恼没完成的遗憾&#xff0c;不如珍惜当下&#xff0c;努力创造未来。人生没有太晚的开始&#xff0c;在职读…

frp内网穿透——以连接到校园内网的服务器为例

有时候想摸鱼不去实验室&#xff0c;在宿舍就直接连接到实验室的GPU服务器。奈何服务器在校园网内部&#xff0c;外网无法直接直接访问。此时需要手动搭一个跳板机&#xff0c;来连接到内网的GPU服务器&#xff0c;这一过程怎么做到呢&#xff1f;我们可以使用frp内网穿透工具&…

Seata:连接数据与应用

作者&#xff1a;季敏&#xff08;清铭&#xff09;Seata 开源社区创始人&#xff0c;分布式事务团队负责人。 本文主要介绍分布式事务从内部到商业化和开源的演进历程&#xff0c;Seata 社区当前进展和未来规划。 Seata 是一款开源的分布式事务解决方案&#xff0c;旨在为现…

Java基础(十八):java比较器、系统相关类、数学相关类

Java基础系列文章 Java基础(一)&#xff1a;语言概述 Java基础(二)&#xff1a;原码、反码、补码及进制之间的运算 Java基础(三)&#xff1a;数据类型与进制 Java基础(四)&#xff1a;逻辑运算符和位运算符 Java基础(五)&#xff1a;流程控制语句 Java基础(六)&#xff1…

JavaScript黑科技:隐秘执行

JavaScript黑科技&#xff1a;隐秘执行 如果能使网页中的JavaScript代码隐密的加载、隐密的执行&#xff0c;那对于保护JavaScript代码来说是很有利的。 本文将探索、演示一种隐秘执行JavaScript代码的技术。 源码如下&#xff1a; <html> <script>window.onlo…

Prometheus+node_exporter+Grafana+夜莺 监控部署

一、安装Prometheus 1.1 部署并配置Prometheus #主机基础配置 [rootnode4~]# systemctl stop firewalld && systemctl disable firewalld [rootnode4~]# sed -i s/enforcing/disabled/g /etc/selinux/config && setenforce 0#上传prometheus安装包并解压 [r…