大数据系列之:Flink Doris Connector,实时同步数据到Doris数据库

news2024/12/23 16:40:35

大数据系列之:Flink Doris Connector,实时同步数据到Doris数据库

  • 一、版本兼容性
  • 二、使用
  • 三、Flink SQL
  • 四、DataStream
  • 五、Lookup Join
  • 六、配置
    • 通用配置项
    • 接收器配置项
    • 查找Join配置项
  • 七、Doris 和 Flink 列类型映射
  • 八、使用Flink CDC访问Doris的示例
  • 九、使用FlinkSQL通过CDC访问并实现部分列更新的示例
  • 十、使用FlinkCDC访问多个表或整个数据库(支持MySQL、Oracle、PostgreSQL、SQLServer)
  • 十一、使用FlinkCDC更新Key列
  • 十二、使用Flink根据指定的列删除数据
  • 十三、最佳实践应用场景
  • 十四、常见问题解答

可以通过Flink操作(读取、插入、修改、删除)支持存储在Doris中的数据。本文介绍了如何通过Datastream和Flink操作Doris。

注意:

  • 修改和删除仅支持唯一键模型。
  • 当前的删除是为了支持Flink CDC访问数据以实现自动删除。如果要删除其他数据访问方法,您需要自行实现。

一、版本兼容性

在这里插入图片描述

二、使用

Maven

添加 flink-doris-connector

<!-- flink-doris-connector -->
<dependency>
   <groupId>org.apache.doris</groupId>
   <artifactId>flink-doris-connector-1.16</artifactId>
   <version>1.6.0</version>
</dependency>
  • 请根据不同的Flink版本替换相应的Connector和Flink依赖版本。
  • 也可以从这里下载相关版本的jar包。

flink-doris-connector下载地址:

  • https://repo.maven.apache.org/maven2/org/apache/doris/

编译

  • 编译时直接运行sh build.sh即可。
  • 编译成功后,会在dist目录下生成目标jar包,如:flink-doris-connector-1.5.0-SNAPSHOT.jar。将此文件复制到 Flink 的类路径中以使用 Flink-Doris-Connector。例如,Flink 运行在 Local 模式,则将此文件放在 lib/ 文件夹中。 Flink运行在Yarn集群模式下,将此文件放入预部署包中。

三、Flink SQL

read

-- doris source
CREATE TABLE flink_doris_source (
     name STRING,
     age INT,
     price DECIMAL(5,2),
     sale DOUBLE
     )
     WITH (
       'connector' = 'doris',
       'fenodes' = 'FE_IP:HTTP_PORT',
       'table.identifier' = 'database.table',
       'username' = 'root',
       'password' = 'password'
);

write

--enable checkpoint
SET 'execution.checkpointing.interval' = '10s';

-- doris sink
CREATE TABLE flink_doris_sink (
     name STRING,
     age INT,
     price DECIMAL(5,2),
     sale DOUBLE
     )
     WITH (
       'connector' = 'doris',
       'fenodes' = 'FE_IP:HTTP_PORT',
       'table.identifier' = 'db.table',
       'username' = 'root',
       'password' = 'password',
       'sink.label-prefix' = 'doris_label'
);

-- submit insert job
INSERT INTO flink_doris_sink select name,age,price,sale from flink_doris_source

四、DataStream

read

DorisOptions.Builder builder = DorisOptions.builder()
         .setFenodes("FE_IP:HTTP_PORT")
         .setTableIdentifier("db.table")
         .setUsername("root")
         .setPassword("password");

DorisSource<List<?>> dorisSource = DorisSource.<List<?>>builder()
         .setDorisOptions(builder.build())
         .setDorisReadOptions(DorisReadOptions.builder().build())
         .setDeserializer(new SimpleListDeserializationSchema())
         .build();

env.fromSource(dorisSource, WatermarkStrategy.noWatermarks(), "doris source").print();

write

DorisSink通过StreamLoad向Doris写入数据,DataStream写入时支持不同的序列化方式

字符串数据流(SimpleStringSerializer)

// enable checkpoint
env.enableCheckpointing(10000);
// using batch mode for bounded data
env.setRuntimeMode(RuntimeExecutionMode.BATCH);

DorisSink.Builder<String> builder = DorisSink.builder();
DorisOptions.Builder dorisBuilder = DorisOptions.builder();
dorisBuilder.setFenodes("FE_IP:HTTP_PORT")
         .setTableIdentifier("db.table")
         .setUsername("root")
         .setPassword("password");

Properties properties = new Properties();
// When the upstream is writing json, the configuration needs to be enabled.
//properties.setProperty("format", "json");
//properties.setProperty("read_json_by_line", "true");
DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder();
executionBuilder.setLabelPrefix("label-doris") //streamload label prefix
                 .setDeletable(false)
                 .setStreamLoadProp(properties); ;

builder.setDorisReadOptions(DorisReadOptions.builder().build())
         .setDorisExecutionOptions(executionBuilder.build())
         .setSerializer(new SimpleStringSerializer()) //serialize according to string
         .setDorisOptions(dorisBuilder.build());

//mock string source
List<Tuple2<String, Integer>> data = new ArrayList<>();
data.add(new Tuple2<>("doris",1));
DataStreamSource<Tuple2<String, Integer>> source = env. fromCollection(data);

source.map((MapFunction<Tuple2<String, Integer>, String>) t -> t.f0 + "\t" + t.f1)
       .sinkTo(builder.build());

//mock json string source
//env.fromElements("{\"name\":\"zhangsan\",\"age\":1}").sinkTo(builder.build());

RowData数据流(RowDataSerializer)

// enable checkpoint
env.enableCheckpointing(10000);
// using batch mode for bounded data
env.setRuntimeMode(RuntimeExecutionMode.BATCH);

//doris sink option
DorisSink.Builder<RowData> builder = DorisSink.builder();
DorisOptions.Builder dorisBuilder = DorisOptions.builder();
dorisBuilder.setFenodes("FE_IP:HTTP_PORT")
         .setTableIdentifier("db.table")
         .setUsername("root")
         .setPassword("password");

// json format to streamload
Properties properties = new Properties();
properties.setProperty("format", "json");
properties.setProperty("read_json_by_line", "true");
DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder();
executionBuilder.setLabelPrefix("label-doris") //streamload label prefix
                 .setDeletable(false)
                 .setStreamLoadProp(properties); //streamload params

//flink rowdata's schema
String[] fields = {"city", "longitude", "latitude", "destroy_date"};
DataType[] types = {DataTypes.VARCHAR(256), DataTypes.DOUBLE(), DataTypes.DOUBLE(), DataTypes.DATE()};

builder.setDorisReadOptions(DorisReadOptions.builder().build())
         .setDorisExecutionOptions(executionBuilder.build())
         .setSerializer(RowDataSerializer.builder() //serialize according to rowdata
                            .setFieldNames(fields)
                            .setType("json") //json format
                            .setFieldType(types).build())
         .setDorisOptions(dorisBuilder.build());

//mock rowdata source
DataStream<RowData> source = env. fromElements("")
     .map(new MapFunction<String, RowData>() {
         @Override
         public RowData map(String value) throws Exception {
             GenericRowData genericRowData = new GenericRowData(4);
             genericRowData.setField(0, StringData.fromString("beijing"));
             genericRowData.setField(1, 116.405419);
             genericRowData.setField(2, 39.916927);
             genericRowData.setField(3, LocalDate.now().toEpochDay());
             return genericRowData;
         }
     });

source. sinkTo(builder. build());

SchemaChange数据流(JsonDebeziumSchemaSerializer)

// enable checkpoint
env.enableCheckpointing(10000);

Properties props = new Properties();
props. setProperty("format", "json");
props.setProperty("read_json_by_line", "true");
DorisOptions dorisOptions = DorisOptions. builder()
         .setFenodes("127.0.0.1:8030")
         .setTableIdentifier("test.t1")
         .setUsername("root")
         .setPassword("").build();

DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder();
executionBuilder.setLabelPrefix("label-prefix")
         .setStreamLoadProp(props).setDeletable(true);

DorisSink.Builder<String> builder = DorisSink.builder();
builder.setDorisReadOptions(DorisReadOptions.builder().build())
         .setDorisExecutionOptions(executionBuilder.build())
         .setDorisOptions(dorisOptions)
         .setSerializer(JsonDebeziumSchemaSerializer.builder().setDorisOptions(dorisOptions).build());

env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
         .sinkTo(builder.build());

五、Lookup Join

CREATE TABLE fact_table (
  `id` BIGINT,
  `name` STRING,
  `city` STRING,
  `process_time` as proctime()
) WITH (
  'connector' = 'kafka',
  ...
);

create table dim_city(
  `city` STRING,
  `level` INT ,
  `province` STRING,
  `country` STRING
) WITH (
  'connector' = 'doris',
  'fenodes' = '127.0.0.1:8030',
  'jdbc-url' = 'jdbc:mysql://127.0.0.1:9030',
  'table.identifier' = 'dim.dim_city',
  'username' = 'root',
  'password' = ''
);

SELECT a.id, a.name, a.city, c.province, c.country,c.level 
FROM fact_table a
LEFT JOIN dim_city FOR SYSTEM_TIME AS OF a.process_time AS c
ON a.city = c.city
  • 这个命令是用于创建两个表和一个查询语句。第一个表是名为"fact_table"的表,它有四个列,分别为"id"、“name”、“city"和"process_time”。其中,"id"列是BIGINT类型,"name"和"city"列是STRING类型,"process_time"列是一个基于当前系统时间的计算列,它使用"proctime()"函数实现。
  • 第二个表是名为"dim_city"的表,它有四个列,分别为"city"、“level”、“province"和"country”。其中,“city”、“province"和"country"列是STRING类型,“level"列是INT类型。该表使用"Doris"作为存储引擎,连接器为"connector”,并且需要指定连接器的其他参数,如"fenodes”、“jdbc-url”、“table.identifier”、"username"和"password"等。
  • 最后一个命令是一个查询语句,它使用"LEFT JOIN"将"fact_table"和"dim_city"两个表进行连接,并使用"FOR SYSTEM_TIME AS OF"来指定连接时的时间戳,这里使用了"process_time"列的值。查询结果包括"id"、“name”、“city”、“province”、"country"和"level"这些列。

六、配置

通用配置项

fenodes:

  • Doris FE http地址,支持多个地址,用逗号分隔

benodes:

  • Doris BE http地址,支持多个地址,以逗号分隔。

jdbc-url:

  • jdbc连接信息,如:jdbc:mysql://127.0.0.1:9030

table.identifier:

  • Doris表名,如:db.tbl

auto-redirect:

  • 默认值:true
  • 是否重定向 StreamLoad 请求。开启后StreamLoad会通过FE写入,不再显示BE信息。

doris.request.retries:

  • 默认值:3
  • 向 Doris 发送请求的重试次数

doris.request.connect.timeout.ms:

  • 默认值:30000
  • 向 Doris 发送请求的连接超时

doris.request.read.timeout.ms:

  • 默认值:30000
  • 读取向 Doris 发送请求的超时

源配置项

doris.request.query.timeout.s:

  • 默认值:3600
  • 查询Doris的超时时间,默认1小时,-1表示无超时限制

doris.request.tablet.size:

  • 默认值:Integer. MAX_VALUE
  • 一个Partition对应的Doris Tablet数量。该值设置得越小,生成的Partition就越多。这提高了 Flink 端的并行度,但同时也给 Doris 带来了更大的压力。

doris.batch.size:

  • 默认值:1024
  • 一次从BE读取数据的最大行数。增加该值会减少 Flink 和 Doris 之间建立的连接数量。从而减少网络延迟带来的额外时间开销。

doris.exec.mem.limit:

  • 默认值:2147483648
  • 单个查询的内存限制。默认为2GB,以字节为单位

doris.deserialize.arrow.async:

  • 默认值:FALSE
  • 是否支持flink-doris-connector迭代所需的Arrow格式异步转换为RowBatch

doris.deserialize.queue.size:

  • 默认值:64
  • Arrow格式的内部处理队列的异步转换,当doris.deserialize.arrow.async为true时有效

doris.read.field:

  • 读取Doris表的列名列表,以逗号分隔

doris.filter.query:

  • 过滤读取数据的表达式,这个表达式透明传递给Doris。 Doris使用这个表达式来完成源端的数据过滤。例如年龄=18。

接收器配置项

sink.label-prefix:

  • Stream加载导入使用的标签前缀。在2pc场景下,需要全局唯一性来保证Flink的EOS语义。

sink.properties.*:

  • 导入流负载参数。
    例如: ‘sink.properties.column_separator’ = ', ’ 定义列分隔符, ‘sink.properties.escape_delimiters’ = ‘true’ 特殊字符作为分隔符, ‘\x01’ 将转换为二进制 0x01
  • JSON格式导入
    ‘sink.properties.format’ = ‘json’ ‘sink.properties.按行读取 json’ = ‘true’
    详细参数请参考这里。

sink.enable-delete:

  • 默认值:TRUE
  • 是否启用删除。该选项需要Doris表开启批量删除功能(Doris 0.15+版本默认开启),且仅支持Unique模型。

sink.enable-2pc:

  • 默认值:TRUE
  • 是否启用两阶段提交(2pc),默认为true,以保证Exactly-Once语义。

sink.buffer-size:

  • 默认值:1MB
  • 写入数据缓存缓冲区的大小,以字节为单位。不建议修改,默认配置即可

sink.buffer-count:

  • 默认值:3
  • 写入数据缓冲区的数量。不建议修改,默认配置即可

sink.max-retries:

  • 默认值:3
  • Commit失败后最大重试次数,默认3

sink.use-cache:

  • 默认值:false
  • 发生异常时,是否使用内存缓存进行恢复。启用后,Checkpoint 期间的数据将保留在缓存中。

sink.enable.batch-mode:

  • 默认值:false
  • 是否使用批处理模式写入Doris。使能后,写入时序不依赖于Checkpoint。写入是通过sink.buffer-flush.max-rows/sink.buffer-flush.max-bytes/sink.buffer-flush.interval参数控制的。输入机会。
    同时开启后,Exactly-once语义将无法保证。 Uniq模型可以用来实现幂等性。

sink.flush.queue-size:

  • 默认值:2
  • 在批处理模式下,缓存的列大小。

sink.buffer-flush.max-rows:

  • 默认值:50000
  • 批处理模式下,单批写入的最大数据行数。

sink.buffer-flush.max-bytes:

  • 默认值:10MB
  • 在批处理模式下,单批写入的最大字节数。

sink.buffer-flush.interval:

  • 默认值:10s
  • 批处理模式下,异步刷新缓存的时间间隔

sink.ignore.update-before:

  • 默认值:true
  • 是否忽略update-before事件,默认忽略。

查找Join配置项

lookup.cache.max-rows

  • 默认值:-1
  • 查找缓存的最大行数,默认值为-1,不启用缓存

lookup.cache.ttl:

  • 默认值:10s
  • 查找缓存的最大时间,默认10s

lookup.max-retries:

  • 默认值:1
  • 查找查询失败后重试的次数

lookup.jdbc.async:

  • 默认值:false
  • 是否启用异步查找,默认为false

lookup.jdbc.read.batch.size:

  • 默认值:128
  • 异步查找下,每个查询的最大批量大小

lookup.jdbc.read.batch.queue-size:

  • 默认值:256
  • 异步查找时中间缓冲队列的大小

lookup.jdbc.read.thread-size:

  • 默认值:3
  • 每个任务中用于查找的jdbc线程数

七、Doris 和 Flink 列类型映射

Doris类型Flink类型
NULL_TYPENULL
BOOLEANBOOLEAN
TINYINTTINYINT
SMALLINTSMALLINT
INTINT
BIGINTBIGINT
FLOATFLOAT
DOUBLEDOUBLE
DATEDATE
DATETIMETIMESTAMP
DECIMALDECIMAL
CHARSTRING
LARGEINTSTRING
VARCHARSTRING
STRINGSTRING
DECIMALV2DECIMAL
ARRAYARRAY
MAPMAP
JSONSTRING
VARIANTSTRING
IPV4STRING
IPV6STRING

从connector-1.6.1版本开始,增加了对Variant、IPV6、IPV4三种数据类型读取的支持。读取 IPV6 和 Variant 需要 Doris 2.1.1 或更高版本。

八、使用Flink CDC访问Doris的示例

SET 'execution.checkpointing.interval' = '10s';
CREATE TABLE cdc_mysql_source (
  id int
  ,name VARCHAR
  ,PRIMARY KEY (id) NOT ENFORCED
) WITH (
 'connector' = 'mysql-cdc',
 'hostname' = '127.0.0.1',
 'port' = '3306',
 'username' = 'root',
 'password' = 'password',
 'database-name' = 'database',
 'table-name' = 'table'
);

-- Support synchronous insert/update/delete events
CREATE TABLE doris_sink (
id INT,
name STRING
) 
WITH (
  'connector' = 'doris',
  'fenodes' = '127.0.0.1:8030',
  'table.identifier' = 'database.table',
  'username' = 'root',
  'password' = '',
  'sink.properties.format' = 'json',
  'sink.properties.read_json_by_line' = 'true',
  'sink.enable-delete' = 'true', -- Synchronize delete events
  'sink.label-prefix' = 'doris_label'
);

insert into doris_sink select id,name from cdc_mysql_source;

九、使用FlinkSQL通过CDC访问并实现部分列更新的示例

-- enable checkpoint
SET 'execution.checkpointing.interval' = '10s';

CREATE TABLE cdc_mysql_source (
   id int
  ,name STRING
  ,bank STRING
  ,age int
  ,PRIMARY KEY (id) NOT ENFORCED
) WITH (
 'connector' = 'mysql-cdc',
 'hostname' = '127.0.0.1',
 'port' = '3306',
 'username' = 'root',
 'password' = 'password',
 'database-name' = 'database',
 'table-name' = 'table'
);

CREATE TABLE doris_sink (
    id INT,
    name STRING,
    bank STRING,
    age int
) 
WITH (
  'connector' = 'doris',
  'fenodes' = '127.0.0.1:8030',
  'table.identifier' = 'database.table',
  'username' = 'root',
  'password' = '',
  'sink.properties.format' = 'json',
  'sink.properties.read_json_by_line' = 'true',
  'sink.properties.columns' = 'id,name,bank,age',
  'sink.properties.partial_columns' = 'true' --Enable partial column updates
);


insert into doris_sink select id,name,bank,age from cdc_mysql_source;

十、使用FlinkCDC访问多个表或整个数据库(支持MySQL、Oracle、PostgreSQL、SQLServer)

MySQL同步示例

<FLINK_HOME>bin/flink run \
     -Dexecution.checkpointing.interval=10s\
     -Dparallelism.default=1\
     -c org.apache.doris.flink.tools.cdc.CdcTools\
     lib/flink-doris-connector-1.16-1.5.0-SNAPSHOT.jar \
     mysql-sync-database\
     --database test_db \
     --mysql-conf hostname=127.0.0.1 \
     --mysql-conf port=3306 \
     --mysql-conf username=root \
     --mysql-conf password=123456 \
     --mysql-conf database-name=mysql_db \
     --including-tables "tbl1|test.*" \
     --sink-conf fenodes=127.0.0.1:8030 \
     --sink-conf username=root \
     --sink-conf password=123456 \
     --sink-conf jdbc-url=jdbc:mysql://127.0.0.1:9030 \
     --sink-conf sink.label-prefix=label \
     --table-conf replication_num=1

Oracle同步示例

<FLINK_HOME>bin/flink run \
      -Dexecution.checkpointing.interval=10s \
      -Dparallelism.default=1 \
      -c org.apache.doris.flink.tools.cdc.CdcTools \
      ./lib/flink-doris-connector-1.16-1.5.0-SNAPSHOT.jar\
      oracle-sync-database \
      --database test_db \
      --oracle-conf hostname=127.0.0.1 \
      --oracle-conf port=1521 \
      --oracle-conf username=admin \
      --oracle-conf password="password" \
      --oracle-conf database-name=XE \
      --oracle-conf schema-name=ADMIN \
      --including-tables "tbl1|tbl2" \
      --sink-conf fenodes=127.0.0.1:8030 \
      --sink-conf username=root \
      --sink-conf password=\
      --sink-conf jdbc-url=jdbc:mysql://127.0.0.1:9030 \
      --sink-conf sink.label-prefix=label \
      --table-conf replication_num=1

PostgreSQL 同步示例

<FLINK_HOME>/bin/flink run \
     -Dexecution.checkpointing.interval=10s \
     -Dparallelism.default=1\
     -c org.apache.doris.flink.tools.cdc.CdcTools \
     ./lib/flink-doris-connector-1.16-1.5.0-SNAPSHOT.jar \
     postgres-sync-database \
     --database db1\
     --postgres-conf hostname=127.0.0.1 \
     --postgres-conf port=5432 \
     --postgres-conf username=postgres \
     --postgres-conf password="123456" \
     --postgres-conf database-name=postgres \
     --postgres-conf schema-name=public \
     --postgres-conf slot.name=test \
     --postgres-conf decoding.plugin.name=pgoutput \
     --including-tables "tbl1|tbl2" \
     --sink-conf fenodes=127.0.0.1:8030 \
     --sink-conf username=root \
     --sink-conf password=\
     --sink-conf jdbc-url=jdbc:mysql://127.0.0.1:9030 \
     --sink-conf sink.label-prefix=label \
     --table-conf replication_num=1

SQLServer同步示例

<FLINK_HOME>/bin/flink run \
     -Dexecution.checkpointing.interval=10s \
     -Dparallelism.default=1 \
     -c org.apache.doris.flink.tools.cdc.CdcTools \
     ./lib/flink-doris-connector-1.16-1.5.0-SNAPSHOT.jar \
     sqlserver-sync-database \
     --database db1\
     --sqlserver-conf hostname=127.0.0.1 \
     --sqlserver-conf port=1433 \
     --sqlserver-conf username=sa \
     --sqlserver-conf password="123456" \
     --sqlserver-conf database-name=CDC_DB \
     --sqlserver-conf schema-name=dbo \
     --including-tables "tbl1|tbl2" \
     --sink-conf fenodes=127.0.0.1:8030 \
     --sink-conf username=root \
     --sink-conf password=\
     --sink-conf jdbc-url=jdbc:mysql://127.0.0.1:9030 \
     --sink-conf sink.label-prefix=label \
     --table-conf replication_num=1

十一、使用FlinkCDC更新Key列

一般来说,在业务数据库中,数字被用作表的主键,例如学生表中使用数字(id)作为主键,但随着业务的发展,与数据对应的数字可能会发生变化。在这种情况下,使用FlinkCDC + Doris Connector进行数据同步可以自动更新Doris主键列中的数据。

原理

Flink CDC的底层采集工具是Debezium。Debezium内部使用op字段来识别相应的操作:op字段的值为c、u、d和r,分别对应创建、更新、删除和读取。对于主键列的更新,FlinkCDC将向下游发送DELETE和INSERT事件,在数据同步到Doris后,会自动更新主键列的数据。

示例

Flink程序可以参考上述CDC同步示例。任务成功提交后,在MySQL端执行更新主键列语句(update student set id = ‘1002’ where id = ‘1001’),以修改Doris中的数据。

十二、使用Flink根据指定的列删除数据

通常,Kafka中的消息使用特定的字段来标记操作类型,例如{“op_type”:“delete”,data:{…}}。对于这种类型的数据,希望删除op_type=delete的数据。

默认情况下,DorisSink将根据RowKind来区分事件类型。通常,在cdc的情况下,可以直接获取事件类型,并将隐藏列__DORIS_DELETE_SIGN__赋值以实现删除的目的,而Kafka需要基于业务逻辑进行判断,显示传递给隐藏列的值。

-- Such as upstream data: {"op_type":"delete",data:{"id":1,"name":"zhangsan"}}
CREATE TABLE KAFKA_SOURCE(
  data STRING,
  op_type STRING
) WITH (
  'connector' = 'kafka',
  ...
);

CREATE TABLE DORIS_SINK(
  id INT,
  name STRING,
  __DORIS_DELETE_SIGN__ INT
) WITH (
  'connector' = 'doris',
  'fenodes' = '127.0.0.1:8030',
  'table.identifier' = 'db.table',
  'username' = 'root',
  'password' = '',
  'sink.enable-delete' = 'false',        -- false means not to get the event type from RowKind
  'sink.properties.columns' = 'id, name, __DORIS_DELETE_SIGN__'  -- Display the import column of the specified streamload
);

INSERT INTO DORIS_SINK
SELECT json_value(data,'$.id') as id,
json_value(data,'$.name') as name, 
if(op_type='delete',1,0) as __DORIS_DELETE_SIGN__ 
from KAFKA_SOURCE;
  • 这段代码是一个示例,演示了如何使用Flink从Kafka源表读取数据,并将其写入Doris目标表。具体来说,如果源表中的数据op_type字段的值为"delete",则希望在Doris目标表中删除相应的数据。
  • 首先,在Kafka源表的定义中,我们有一个data字段用于存储源数据的JSON字符串,以及一个op_type字段用于标识操作类型。
  • 然后,在Doris目标表的定义中,我们有一个id字段和一个name字段来存储数据的具体内容,还有一个名为__DORIS_DELETE_SIGN__的隐藏列,用于标识是否要进行删除操作。
  • 在INSERT INTO语句中,我们将从Kafka源表中选择data字段的id和name,并使用json_value函数提取相应的值。同时,我们使用if函数将op_type字段的值与"delete"进行比较,如果相等则将__DORIS_DELETE_SIGN__赋值为1,否则赋值为0。
  • 最后,将处理后的数据插入到Doris目标表中。
  • 总之,这段代码的作用是根据源表中的op_type字段值,将对应的数据删除或写入到Doris目标表中。

十三、最佳实践应用场景

  • 使用Flink Doris Connector最适合的场景是实时/批量将源数据同步到Doris(Mysql、Oracle、PostgreSQL)中,然后使用Flink对Doris和其他数据源中的数据进行联合分析。您还可以使用Flink Doris Connector。

其他注意事项:

  • Flink Doris Connector主要依赖于Checkpoint进行流式写入,因此Checkpoint之间的时间间隔就是数据的可见延迟时间。
  • 为了确保Flink的Exactly Once语义,Flink Doris Connector默认启用两阶段提交,Doris在1.1版本之后默认启用两阶段提交。1.0可以通过修改BE参数来启用。

十四、常见问题解答

Doris Source读取数据后,为什么流会结束?

  • 目前Doris Source是有界流,不支持CDC读取。

Flink能否读取Doris并执行条件下推?

  • 通过配置doris.filter.query参数可以实现。

如何写入位图类型?

CREATE TABLE bitmap_sink (
dt int,
page string,
user_id int
)
WITH (
   'connector' = 'doris',
   'fenodes' = '127.0.0.1:8030',
   'table.identifier' = 'test.bitmap_test',
   'username' = 'root',
   'password' = '',
   'sink.label-prefix' = 'doris_label',
   'sink.properties.columns' = 'dt,page,user_id,user_id=to_bitmap(user_id)'
)

errCode = 2, detailMessage = Label [label_0_1] has already been used, relate to txn [19650]

  • 在Exactly-Once场景中,Flink Job必须从最新的Checkpoint/Savepoint重新启动,否则会报告上述错误。当不需要Exactly-Once时,可以通过关闭2PC提交(sink.enable-2pc=false)或更改不同的sink.label-prefix来解决。

errCode = 2, detailMessage = transaction [19650] not found

  • 发生在Commit阶段,Checkpoint中记录的事务ID在FE端已过期,在此时再次提交时会出现上述错误。此时无法从Checkpoint启动,可以通过修改fe.conf中的streaming_label_keep_max_second配置来延长过期时间,默认为12小时。

errCode = 2, detailMessage = current running txns on db 10006 is 100, larger than limit 100

  • 这是因为同一库的并发导入超过了100,可以通过调整fe.conf的max_running_txn_num_per_db参数来解决。详细信息,请参考max_running_txn_num_per_db。
  • 同时,如果一个任务频繁修改标签并重新启动,也可能导致此错误发生。在2pc场景(重复/聚合模型)中,每个任务的标签需要唯一,在从Checkpoint重新启动时,Flink任务将主动中止之前已经成功预提交但未提交的事务。频繁修改标签并重新启动将导致大量已成功预提交的事务无法中止,占用事务。在Unique模型下,也可以关闭2pc,实现幂等写入。

当Flink向Uniq模型写入一批数据时,如何确保数据的顺序?

  • 您可以添加序列列的配置来确保顺序。

Flink任务没有报错,但数据无法同步?

  • 在Connector1.1.0之前,数据是批量写入的,并且写入是由数据驱动的。需要确定上游是否有数据写入。在1.1.0之后,它依赖于Checkpoint,并且必须启用Checkpoint才能进行写入。

tablet writer write failed, tablet_id=190958, txn_id=3505530, err=-235

  • 通常发生在Connector1.1.0之前,这是因为写入频率过快,导致版本过多。可以通过设置sink.batch.size和sink.batch.interval参数来减少Streamload的频率。

源表和Doris表应如何对应?

  • 在使用Flink Connector导入数据时,需要注意两个方面。第一,源表的列和类型应与Flink SQL中的列和类型对应;第二,Flink SQL中的列和类型必须与Doris表的列和类型匹配。

TApplicationException: get_next failed: out of sequence response: expected 4 but got 3

  • 这是由于 Thrift 中的并发错误造成的。建议您尽可能使用最新的连接器和兼容的 Flink 版本。

DorisRuntimeException: Fail to abort transaction 26153 with url http://192.168.0.1:8040/api/table_name/_stream_load_2pc

  • 您可以在TaskManager中搜索日志中止事务响应,并根据HTTP返回码判断是客户端问题还是服务器问题。

org.apache.flink.table.api.SqlParserException when using doris.filter.query: SQL parsing failed. “xx” encountered at row x, column xx

  • 这个问题主要是由于条件中的varchar/string类型,需要进行引号转义。正确的写法是xxx = ‘‘xxx’’。这样,Flink SQL解析器会将连续的两个单引号解释为一个单引号字符,而不是字符串的结束,并将拼接的字符串作为属性的值。例如:t1 ‍>= ‘2024-01-01’ 可以写为 ‘doris.filter.query’ = ‘t1 ‍>=’‘2024-01-01’'。

Failed to connect to backend: http://host:webserver_port, and BE is still alive

  • 这个问题可能是由于配置了be的IP地址,而该地址无法被外部的Flink集群访问。这主要是因为在连接fe时,be的地址是通过fe进行解析的。例如,如果将be地址添加为’127.0.0.1’,那么Flink集群通过fe获取到的be地址将是’127.0.0.1:webserver_port’,并且Flink将连接到该地址。当出现这种问题时,可以通过将be的实际对应的外部IP地址添加到"with"属性中来解决:‘benodes’=“be_ip:webserver_port,be_ip:webserver_port…”。对于整个数据库的同步,可以使用以下属性:–sink-conf benodes=be_ip:webserver,be_ip:webserver…。

当使用Flink-connector将MySQL数据同步到Doris时,时间戳之间存在几小时的时间差。

  • Flink Connector默认使用UTC+8时区从MySQL同步整个数据库。如果您的数据位于不同的时区,您可以使用以下配置进行调整,例如:–mysql-conf debezium.date.format.timestamp.zone=“UTC+3”。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2036556.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Unity协程WaitForSeconds在编辑器和WebGL表现不同问题的解决方法参考

最近做的一个效果让下面为了让下面这种图片生成一个翻页效果&#xff08;使用ShaderGraph中的FlipBook节点&#xff09;&#xff0c;我通过携程来实现连续翻页。 先是定义一个Coroutine coroutine null&#xff1b; 然后在一定情况下执行coroutine StartCoroutine(KeepPrevie…

Spring入门讲解

这里写目录标题 Spring基础概念关键重点主要特性主要优势Spring与Java EE的对比Spring生态系统概述总结 Spring 基础概念 Spring是一个开源的轻量级Java开发框架&#xff0c;它提供了全面的基础设施支持&#xff0c;简化了企业级应用的开发和部署。Spring的核心理念是依赖注入…

基于华为atlas下的yolov5+BoT-SORT/ByteTrack煤矿箕斗状态识别大探索

写在前面&#xff1a; 本项目的代码原型基于yolov5yolov8。其中检测模型使用的yolov5&#xff0c;跟踪模型使用的yolov8。 这里说明以下&#xff0c;为什么不整体都选择yolov8呢&#xff0c;v8无疑是比v5优秀的&#xff0c;但是atlas这块经过不断尝试没有过去&#xff0c;所以…

前端进行分页Vue3+Setup写法

当后端不方便提供数据分页查询接口时&#xff0c;就需要前端来自己分割进行分页操作 在有可能的情况下还是建议用分页查询接口&#xff0c;减少网络数据传输 首先el-table绑定数组 分页组件&#xff0c;变量自己定义防止报错 <el-paginationlayout"->, total, siz…

Springboot实现doc,docx,xls,xlsx,ppt,pptx,pdf,txt,zip,rar,图片,视频,音频在线预览功能,你学“废”了吗?

最近工作中&#xff0c;客户需要生成包含动态内容的word/pdf报告&#xff0c;并且需要在线预览。 刚开始使用后台直接生成word文档&#xff0c;返回文件流给前端&#xff0c;浏览器预览会发生格式错乱问题&#xff0c;特别是文档中的图片有些还不显示。 想到最简单的办法就是…

在原生未启用kdump的BCLinux 8系列服务器上启用kdump及报错处理

本文记录了在原生未启用kdump的BCLinux 8系列操作系统的服务器上手动启用kdump服务及报错处理的过程。 一、问题描述 BCLinux 8系列操作系统&#xff0c;系统初始化安装时未启用kdump服务&#xff0c;手动启动时报以下“No memory reserved for crash kernel”或“ConditionK…

数学建模——评价决策类算法(层次分析法、Topsis)

一、层次分析法 概念原理 通过相互比较确定各准则对于目标的权重, 及各方案对于每一准则的权重&#xff0c;这些权重在人的思维过程中通常是定性的, 而在层次分析法中则要给出得到权重的定量方法. 将方案层对准则层的权重及准则层对目标层的权重进行综合, 最终确定方案层对目标…

解读RPA自动化流程机器人

RPA全称Robotic Process Automation&#xff0c;即机器人流程自动化&#xff0c;基于人工智能和自动化技术&#xff0c;能够将大量重复、规则明确的日常事务操作实现自动化处理&#xff0c;通常被形象地称为“数字员工”。本文金智维将深入探讨RPA的主要价值和应用领域&#xf…

除悟空CRM外,主流的6大CRM私有部署的厂商

支持私有化部署的CRM有&#xff1a;1.纷享销客&#xff1b; 2.悟空CRM&#xff1b; 3.销售易&#xff1b; 4.有赞CRM&#xff1b; 5.知客CRM&#xff1b; 6.八骏CRM&#xff1b; 7.白码CRM。 面对日益复杂的网络环境和严峻的数据保护法规&#xff0c;私有化部署的CRM系统成为了…

论文阅读笔记:Semi-DETR: Semi-Supervised Object Detection with Detection Transformers

论文阅读笔记&#xff1a;Semi-DETR: Semi-Supervised Object Detection with Detection Transformers 1 背景1.1 动机1.2 问题 2 创新点3 方法4 模块4.1 分阶段混合匹配4.2 跨视图查询一致性4.3 基于代价的伪标签挖掘4.4 总损失 效果5.1 和SOTA方法对比5.2 消融实验 论文&…

Flink开发过程中遇到的问题

1. 任务启动报错Trying to access closed classloader. Exception in thread "Thread-5" java.lang.IllegalStateException: Trying to access closed classloader. Please check if you store classloaders directly or indirectly in static fields. If the st…

基于PSO-BP+BP多特征分类预测对比(多输入单输出) Matlab代码

基于PSO-BPBP多特征分类预测对比(多输入单输出) Matlab代码 1、和市面上的不同&#xff0c;运行一个main一键出对比图&#xff0c;非常方便 2、可以根据需要定制其他算法优化模型对比 程序已经调试好&#xff0c;无需更改代码替换数据集即可运行&#xff01;&#xff01;&…

Python | Leetcode Python题解之第334题递增的三元子序列

题目&#xff1a; 题解&#xff1a; class Solution:def increasingTriplet(self, nums: List[int]) -> bool:n len(nums)if n < 3:return Falsefirst, second nums[0], float(inf)for i in range(1, n):num nums[i]if num > second:return Trueif num > first…

C++字体库开发之EM长度单位转换九

freetype 设置EM // if (m_face) // FT_Set_Pixel_Sizes(*m_face, 0, pixelSize); // 动态宽&#xff0c;固定高 px // error FT_Set_Char_Size(face, /* face 对象的句柄 */ // 0, /* 以 …

Unity Audio

这章练习将介绍在unity中创建 audio&#xff08;音频&#xff09;的工具&#xff0c;培养的技能将帮助创建引人入胜的音频音景。完成本次学习后&#xff0c;能够使用 Unity 中的所有主要音频组件&#xff0c;为各种不同体验创建音频效果。 音频处理工具&#xff1a; Audacity…

Mintegral出海系列:解锁全球应用商店新增长路径

在全球化竞争的浪潮中&#xff0c;面对打法各异的应用和游戏品类&#xff0c;以及全球数百个环境不同的国家和地区&#xff0c;开发者们正面临着前所未有的挑战。Mintegral「出海ing」系列专题内容&#xff0c;助力出海开发者选准赛道探索新的增长路径。 据近期数据显示&#x…

LLM微调(精讲)-以高考选择题生成模型为例(DataWhale AI夏令营)

前言 你好&#xff0c;我是GISer Liu&#x1f601;&#xff0c;一名热爱AI技术的GIS开发者&#xff0c;上一篇文章中&#xff0c;作者介绍了基于讯飞开放平台进行大模型微调的完整流程&#xff1b;而在本文中&#xff0c;作者将对大模型微调的数据准备部分进行深入&#xff1b;…

凤凰端子音频矩阵应用领域

凤凰端子音频矩阵&#xff0c;作为一种集成了凤凰端子接口的音频矩阵设备&#xff0c;具有广泛的应用领域。以下是其主要应用领域&#xff1a; 一、专业音响系统 会议系统&#xff1a;在会议室中&#xff0c;凤凰端子音频矩阵能够处理多个话筒和音频源的信号&#xff0c;实现…

Luminar Neo for Mac/Win:创新AI图像编辑软件的强大功能

Luminar Neo&#xff0c;这款由Skylum公司倾力打造的图像编辑软件&#xff0c;为Mac和Windows用户带来了前所未有的创作体验与编辑便利。作为一款融合了先进AI技术的图像处理工具&#xff0c;Luminar Neo以其独特的功能和高效的操作流程&#xff0c;成为了摄影师、设计师及摄影…

使用Sanic和SSE实现实时股票行情推送

&#x1f49d;&#x1f49d;&#x1f49d;欢迎莅临我的博客&#xff0c;很高兴能够在这里和您见面&#xff01;希望您在这里可以感受到一份轻松愉快的氛围&#xff0c;不仅可以获得有趣的内容和知识&#xff0c;也可以畅所欲言、分享您的想法和见解。 推荐&#xff1a;「storm…