Case study: real-time import from MongoDB to ClickHouse (including parsing of complex nested JSON)


(1) Case overview

In this case study, data from a MongoDB database is imported into Kafka in real time with Flink CDC; a consumer of the Kafka data writes the dimension-table records to MySQL. The MySQL dimension data is then read and joined with the Kafka stream via a Flink SQL join, and the result is written to ClickHouse.

(Figure omitted.)

(2) Maven dependencies

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>mini-program</artifactId>
        <groupId>org.example</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>flink-mongo-realdata</artifactId>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
<!--        <flink.version>1.12.7</flink.version>-->
<!--        <flink.scala.version>2.11</flink.scala.version>-->
        <flink.version>1.13.3</flink.version>
        <flink.scala.version>2.12</flink.scala.version>
        <flink-cdc-version>2.1.0</flink-cdc-version>
    </properties>

    <dependencies>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-core</artifactId>
            <version>${flink.version}</version>
        </dependency>


        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${flink.scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${flink.scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>ru.yandex.clickhouse</groupId>
            <artifactId>clickhouse-jdbc</artifactId>
            <version>0.3.1-patch</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.commons/commons-dbcp2 -->
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-dbcp2</artifactId>
            <version>2.7.0</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.75</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.mongodb/mongo-java-driver -->

        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-mongodb-cdc</artifactId>
            <version>2.2.1</version>
           <!-- <exclusions>
               <exclusion>
                   <groupId>org.apache.kafka</groupId>
                   <artifactId>kafka-clients</artifactId>
               </exclusion>
            </exclusions>-->
        </dependency>
        
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_${flink.scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_${flink.scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>3.2.2</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink.version}</version>
<!--            <scope>provided</scope>-->
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-jdbc -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc_${flink.scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka -->

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${flink.scala.version}</artifactId>
            <version>${flink.version}</version>
     <!--       <exclusions>
                <exclusion>
                    <groupId>org.apache.kafka</groupId>
                    <artifactId>kafka-clients</artifactId>
                </exclusion>
            </exclusions>-->
        </dependency>

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.7.0</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid</artifactId>
            <version>1.1.16</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table</artifactId>
            <version>${flink.version}</version>
            <type>pom</type>
<!--            <scope>provided</scope>-->
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_${flink.scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${flink.scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-clickhouse</artifactId>
            <version>1.13.2-SNAPSHOT</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>


</project>

(3) Data preparation

JSON documents for the test01 collection


{
	"audio": "12345",
	"detail": "hello world01",
	"location": {
        "alt": 1209.2559999999994,
        "lat": 39.38871949999998,
        "long": 106.9585184
    },
	"ppx": {
        "x": -4833.720000125517,
        "y": 944.3920672436675,
        "z": 1209.2559999999994
    },
	"hight": 90.203,
	"userid": 8,
	"time": "2022/11/14 09:59:52",
	"send_message": [
		{
			"message": "how are you!",
			"time": "2022/11/14 09:58:50"
		},
		{
			"message": "what?",
			"time": "2022/11/14 09:59:35"
		}
	]
}


{
	"audio": "67890",
	"detail": "hello world02",
	"location": {
        "alt": 1209.2559999999994,
        "lat": 39.38871949999998,
        "long": 106.9585184
    },
	"ppx": {
        "x": -4833.720000125517,
        "y": 944.3920672436675,
        "z": 1209.2559999999994
    },
	"hight": 80.203,
	"userid": 7,
	"time": "2022/11/13 09:59:52",
	"send_message": [
		{
			"message": "how are you!",
			"time": "2022/11/14 09:58:50"
		},
		{
			"message": "what?",
			"time": "2022/11/14 09:59:35"
		}
	]
}

JSON documents for the user collection


{
	"id": 7,
	"name": "abel",
	register_time: "2021/04/12 15:08:31",
	"age": 36
}


{
	"id": 8,
	"name": "piter",
	register_time: "2020/05/09 06:59:52",
	"age": 45
}

Insert the documents into MongoDB with db.test01.insertOne(<document>) (and likewise db.user.insertOne(<document>) for the user collection).

(4) MongoDB to Kafka

Introduction to Flink MongoDB CDC

GitHub repository:

https://github.com/ambitfly/flink-cdc-connectors/blob/master/docs/content/connectors/mongodb-cdc.md

The MongoDB CDC connector works by posing as a replica in the MongoDB cluster and leveraging the cluster's high-availability mechanism, so that this replica can obtain the complete oplog (operation log) event stream from the primary node.

Prerequisites

MongoDB version
MongoDB version >= 3.6

Cluster deployment
Replica set or sharded cluster

Storage engine
WiredTiger storage engine

Replica set protocol version
Replica set protocol version 1 (pv1).
Starting with version 4.0, MongoDB supports only pv1. pv1 is the default for all new replica sets created with MongoDB 3.2 or later.

Required permissions
The MongoDB CDC connector requires the changeStream and read privileges.
You can use the following example for a simple authorization.
For more detailed authorization options, refer to MongoDB database user roles.

use admin;
db.createUser({
  user: "flinkuser",
  pwd: "flinkpw",
  roles: [
    { role: "read", db: "admin" }, //read role includes changeStream privilege 
    { role: "readAnyDatabase", db: "admin" } //for snapshot reading
  ]
});

Flink Mongo CDC DataStream Demo

Flink MongoDB CDC DataStream code

package com.ambitfly.mongo.demo;

import com.ververica.cdc.connectors.mongodb.MongoDBSource;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import com.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FlinkCDCDataStreamDemo {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        String host = "10.10.21.229:27017";
        String username = "flinkuser";
        String password = "flinkpw";
        String databaseList = "test";
        String collectionList = "test.test01,test.user";

        // Build a MongoDB CDC source; StringDebeziumDeserializationSchema emits the raw Debezium records as strings
        DebeziumSourceFunction<String> source = MongoDBSource.<String>builder()
                .hosts(host)
                .username(username)
                .password(password)
                .databaseList(databaseList.split(","))
                .collectionList(collectionList.split(","))
                .deserializer(new StringDebeziumDeserializationSchema())
                .build();

        DataStreamSource<String> stringDataStreamSource = env.addSource(source);

        stringDataStreamSource.print();

        env.execute();
    }
}

Run result

(Screenshot of the run output omitted.)

Flink MongoDB CDC SQL Demo

Preparing the Flink SQL statements

Source Table SQL

CREATE TABLE mongoCDCSourceTable (
    _id STRING,
	audio STRING,
	detail STRING,
	location ROW<
		alt DOUBLE,
		lat DOUBLE,
		long DOUBLE
		>,
	ppx ROW<
		x DOUBLE,
		y DOUBLE,
		z DOUBLE
		>,
	hight DOUBLE,
	userid INT,
	`time` STRING,
	send_message ARRAY<
		ROW<
			message STRING,
			`time` STRING
			>
		>,
	not_exist ROW<
		not_exist1 STRING,
		not_exist2 STRING
	>,
     PRIMARY KEY(_id) NOT ENFORCED
) WITH (
    'connector' = 'mongodb-cdc',
    'hosts' = '10.10.21.229:27017',
    'username' = 'flinkuser',
    'password' = 'flinkpw',
    'database' = 'test',
    'collection' = 'test01'
)

Sink Table SQL

CREATE TABLE printSinkTable(
    _id STRING,
	audio STRING,
	detail STRING,
	location_alt DOUBLE,
	location_lat DOUBLE,
	location_long DOUBLE,
	ppx_x DOUBLE,
	ppx_y DOUBLE,
	ppx_z DOUBLE,
	hight DOUBLE,
	userid INT,
	`time` STRING,
	send_message_messages ARRAY<STRING>,
	send_message_times ARRAY<STRING>,
	not_exist_not_exist1 STRING,
	not_exist_not_exist2 STRING
) WITH (
    'connector' = 'print'
)

Insert SQL

INSERT INTO printSinkTable
SELECT
	_id
	,audio
	,detail
	,location.alt
	,location.lat
	,location.long
	,ppx.x
	,ppx.y
	,ppx.z
	,hight
	,userid
	,`time`
	,collect_list(send_message_message)
	,collect_list(send_message_time)
	,not_exist.not_exist1
	,not_exist.not_exist2
FROM mongoCDCSourceTable,UNNEST(send_message) as t(send_message_message,send_message_time)
group by 
	_id
	,audio
	,detail
	,location.alt
	,location.lat
	,location.long
	,ppx.x
	,ppx.y
	,ppx.z
	,hight
	,userid
	,`time`
	,not_exist.not_exist1
	,not_exist.not_exist2

CollectList UDF

You need to define the collect_list aggregate function yourself:

package com.ambitfly.mongo.demo.function;

import org.apache.flink.table.functions.AggregateFunction;

import java.util.ArrayList;
import java.util.List;

/**
 * Aggregate function that collects the values of a column into a String array,
 * similar to Hive's collect_list.
 */
public class CollectList extends AggregateFunction<String[], List<String>> {

    public void retract(List<String> acc, String column) {
        acc.remove(column);
    }

    public void accumulate(List<String> acc, String column) {
        acc.add(column);
    }

    @Override
    public String[] getValue(List<String> acc) {
        return acc.toArray(new String[0]);
    }

    @Override
    public List<String> createAccumulator() {
        return new ArrayList<>();
    }

    public void resetAccumulator(List<String> acc) {
        acc.clear();
    }
}

Flink MongoDB CDC SQL code

package com.ambitfly.mongo.demo;

import com.ambitfly.mongo.demo.function.CollectList;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;

import java.util.concurrent.ExecutionException;

public class FlinkCDCSQLDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {

        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .inStreamingMode()
                //.inBatchMode()
                .build();


        TableEnvironment tableEnvironment = TableEnvironment.create(settings);

        tableEnvironment.executeSql(
                "CREATE TABLE mongoCDCSourceTable (\n" +
                "\t_id STRING,\n" +
                "\taudio STRING,\n" +
                "\tdetail STRING,\n" +
                "\tlocation ROW<\n" +
                "\t\talt DOUBLE,\n" +
                "\t\tlat DOUBLE,\n" +
                "\t\tlong DOUBLE\n" +
                "\t\t>,\n" +
                "\tppx ROW<\n" +
                "\t\tx DOUBLE,\n" +
                "\t\ty DOUBLE,\n" +
                "\t\tz DOUBLE\n" +
                "\t\t>,\n" +
                "\thight DOUBLE,\n" +
                "\tuserid INT,\n" +
                "\t`time` STRING,\n" +
                "\tsend_message ARRAY<\n" +
                "\t\tROW<\n" +
                "\t\t\tmessage STRING,\n" +
                "\t\t\t`time` STRING\n" +
                "\t\t\t>\n" +
                "\t\t>,\n" +
                "\tnot_exist ROW<\n" +
                "\t\tnot_exist1 STRING,\n" +
                "\t\tnot_exist2 STRING\n" +
                "\t>,\n" +
                "PRIMARY KEY(_id) NOT ENFORCED\n" +
                ") WITH (\n" +
                "    'connector' = 'mongodb-cdc',\n" +
                "    'hosts' = '10.10.21.229:27017',\n" +
                "    'username' = 'flinkuser',\n" +
                "    'password' = 'flinkpw',\n" +
                "    'database' = 'test',\n" +
                "    'collection' = 'test01'\n" +
                ")");

        tableEnvironment.executeSql(
                "CREATE TABLE printSinkTable(\n" +
                "\t_id STRING,\n" +
                "\taudio STRING,\n" +
                "\tdetail STRING,\n" +
                "\tlocation_alt DOUBLE,\n" +
                "\tlocation_lat DOUBLE,\n" +
                "\tlocation_long DOUBLE,\n" +
                "\tppx_x DOUBLE,\n" +
                "\tppx_y DOUBLE,\n" +
                "\tppx_z DOUBLE,\n" +
                "\thight DOUBLE,\n" +
                "\tuserid INT,\n" +
                "\t`time` STRING,\n" +
                "\tsend_message_messages ARRAY<STRING>,\n" +
                "\tsend_message_times ARRAY<STRING>,\n" +
                "\tnot_exist_not_exist1 STRING,\n" +
                "\tnot_exist_not_exist2 STRING\n" +
                ") WITH (\n" +
                "    'connector' = 'print'\n" +
                ")");

        // Register the custom aggregate function
        tableEnvironment.createTemporaryFunction("collect_list", CollectList.class);


        TableResult tableResult = tableEnvironment.executeSql(
                "INSERT INTO printSinkTable\n" +
                        "SELECT\n" +
                        "\t_id\n" +
                        "\t,audio\n" +
                        "\t,detail\n" +
                        "\t,location.alt\n" +
                        "\t,location.lat\n" +
                        "\t,location.long\n" +
                        "\t,ppx.x\n" +
                        "\t,ppx.y\n" +
                        "\t,ppx.z\n" +
                        "\t,hight\n" +
                        "\t,userid\n" +
                        "\t,`time`\n" +
                        "\t,collect_list(send_message_message)\n" +
                        "\t,collect_list(send_message_time)\n" +
                        "\t,not_exist.not_exist1\n" +
                        "\t,not_exist.not_exist2\n" +
                        "FROM mongoCDCSourceTable,UNNEST(send_message) as t(send_message_message,send_message_time)\n" +
                        "group by \n" +
                        "\t_id\n" +
                        "\t,audio\n" +
                        "\t,detail\n" +
                        "\t,location.alt\n" +
                        "\t,location.lat\n" +
                        "\t,location.long\n" +
                        "\t,ppx.x\n" +
                        "\t,ppx.y\n" +
                        "\t,ppx.z\n" +
                        "\t,hight\n" +
                        "\t,userid\n" +
                        "\t,`time`\n" +
                        "\t,not_exist.not_exist1\n" +
                        "\t,not_exist.not_exist2");

        tableResult.getJobClient().get().getJobExecutionResult().get();
    }
}

Run result

(Screenshot of the run output omitted.)

Custom deserializers for the MongoDB CDC DataStream source

Simple JSON deserializer

The output structure is shown below: fullDocument holds the actual document data, operationType is one of insert, update, or delete, and documentKey holds the primary key.

{
    "db": "xxx",
    "tableName": "xxx",
    "fullDocument":{},
    "documentKey":{},
    "operationType": ""
}

Code

package com.ambitfly.mongo;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;


public class MyMongoDeserializationSchema implements DebeziumDeserializationSchema<String> {
    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {
        JSONObject result = new JSONObject();

        // The topic is "<database>.<collection>"; extract the database and collection names
        String topic = sourceRecord.topic();
        String[] fields = topic.split("\\.");
        if (fields.length >= 2) {
            result.put("db", fields[0]);
            result.put("tableName", fields[1]);
        }

        Struct value = (Struct) sourceRecord.value();

        // Extract the fullDocument (the full document after the change)
        String fullDocument = value.getString("fullDocument");

        if(fullDocument!=null) {
            JSONObject fullDocumentJson = (JSONObject) JSONObject.toJSON(JSON.parse(fullDocument));
            result.put("fullDocument",fullDocumentJson);
        }



        // Extract the documentKey (the document's primary key)
        String documentKey = value.getString("documentKey");
        if (documentKey != null) {
            JSONObject documentKeyJson = (JSONObject) JSONObject.toJSON(JSON.parse(documentKey));
            result.put("documentKey",documentKeyJson);
        }


        // Extract the operation type
        String operationType = value.getString("operationType");
        if(operationType != null){
            result.put("operationType",operationType);
        }

        collector.collect(result.toJSONString());

    }

    @Override
    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }
}
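
For reference, here is a minimal sketch of how this deserializer could be plugged into the MongoDB CDC source from the DataStream demo above and forwarded to Kafka. The class name MongoToKafkaSketch, the topic mongo_test and the broker address kafka01:9092 are placeholders invented for illustration; the full Mongo-to-Kafka job is covered in the follow-up.

package com.ambitfly.mongo.demo;

import com.ambitfly.mongo.MyMongoDeserializationSchema;
import com.ververica.cdc.connectors.mongodb.MongoDBSource;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

import java.util.Properties;

public class MongoToKafkaSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Same MongoDB CDC source as the DataStream demo, but with the custom deserializer
        DebeziumSourceFunction<String> source = MongoDBSource.<String>builder()
                .hosts("10.10.21.229:27017")
                .username("flinkuser")
                .password("flinkpw")
                .databaseList("test")
                .collectionList("test.test01", "test.user")
                .deserializer(new MyMongoDeserializationSchema())
                .build();

        // Forward each JSON change record to a Kafka topic (broker address and topic are placeholders)
        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("bootstrap.servers", "kafka01:9092");

        env.addSource(source)
                .addSink(new FlinkKafkaProducer<>("mongo_test", new SimpleStringSchema(), kafkaProps));

        env.execute("mongo-to-kafka-sketch");
    }
}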

Debezium-format JSON deserializer

Debezium is a CDC (Changelog Data Capture) tool that can stream changes in real time from MySQL, PostgreSQL, Oracle, Microsoft SQL Server, and many other databases into Kafka. Debezium provides a unified format schema for changelogs and supports serializing messages with JSON and Apache Avro. Flink can interpret Debezium JSON and Avro messages as INSERT/UPDATE/DELETE messages in the Flink SQL system. Flink can also encode INSERT/UPDATE/DELETE messages from Flink SQL as Debezium JSON or Avro messages and send them to external systems such as Kafka. However, Flink currently cannot merge UPDATE_BEFORE and UPDATE_AFTER into a single update message, so it encodes UPDATE_BEFORE and UPDATE_AFTER as DELETE and INSERT Debezium messages.

The structure is as follows:

{
  "before": {
    "id": 111,
    "name": "scooter",
    "description": "Big 2-wheel scooter",
    "weight": 5.18
  },
  "after": {
    "id": 111,
    "name": "scooter",
    "description": "Big 2-wheel scooter",
    "weight": 5.15
  },
  "op": "d"  
}

Code

package com.ambitfly.mongo;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;


public class MyMongoDebeziumSchema implements DebeziumDeserializationSchema<String> {
    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {
        JSONObject result = new JSONObject();

        // The topic is "<database>.<collection>"; extract the database and collection names
        String topic = sourceRecord.topic();
        String[] fields = topic.split("\\.");
        if (fields.length >= 2) {
            result.put("db", fields[0]);
            result.put("tableName", fields[1]);
        }

        Struct value = (Struct) sourceRecord.value();

        // Extract the fullDocument (the full document after the change)
        String fullDocument = value.getString("fullDocument");

        if(fullDocument!=null) {
            JSONObject fullDocumentJson = (JSONObject) JSONObject.toJSON(JSON.parse(fullDocument));
            result.put("fullDocument",fullDocumentJson);
        }



        // Extract the documentKey and pull the _id out of it
        String documentKey = value.getString("documentKey");
        String _id = "";
        if (documentKey != null) {
            JSONObject documentKeyJson = (JSONObject) JSONObject.toJSON(JSON.parse(documentKey));
            _id = documentKeyJson.getJSONObject("_id").getString("$oid");
            result.put("documentKey",documentKeyJson);
        }

        result.put("_id",_id);

        // Extract the operation type and wrap the record in a Debezium-style envelope
        // (an update is emitted as a delete followed by an insert)
        String operationType = value.getString("operationType");
        if(operationType != null){
            result.put("operationType",operationType);
            JSONObject debeziumJson = new JSONObject();
            if("insert".equals(operationType)){
                debeziumJson.put("before",null);
                debeziumJson.put("after",result);
                debeziumJson.put("op","c");
                collector.collect(debeziumJson.toJSONString());
            }else if ("delete".equals(operationType)){
                debeziumJson.put("before",result);
                debeziumJson.put("after",null);
                debeziumJson.put("op","d");
                collector.collect(debeziumJson.toJSONString());
            }else if ("update".equals(operationType)){
                debeziumJson.put("before",result);
                debeziumJson.put("after",null);
                debeziumJson.put("op","d");
                collector.collect(debeziumJson.toJSONString());

                debeziumJson.put("before",null);
                debeziumJson.put("after",result);
                debeziumJson.put("op","c");
                collector.collect(debeziumJson.toJSONString());
            }
        }

    }

    @Override
    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }
}
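
To show why the Debezium envelope is useful, here is a hedged sketch of how Flink SQL could read the resulting Kafka topic with 'format' = 'debezium-json' and interpret the before/after/op fields as a changelog. The class name, the topic mongo_user, the broker address and the declared columns are assumptions for illustration only; the real Kafka-to-MySQL and Kafka-to-ClickHouse jobs are left for the follow-up sections listed below.

package com.ambitfly.mongo.demo;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class KafkaDebeziumReadSketch {
    public static void main(String[] args) {
        TableEnvironment tableEnvironment = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // The topic name, broker address and declared columns are placeholders; any field of the
        // "after"/"before" payload produced by MyMongoDebeziumSchema can be declared here.
        tableEnvironment.executeSql(
                "CREATE TABLE kafkaUserSourceTable (\n" +
                "    _id STRING,\n" +
                "    tableName STRING,\n" +
                "    fullDocument ROW<\n" +
                "        id INT,\n" +
                "        name STRING,\n" +
                "        register_time STRING,\n" +
                "        age INT\n" +
                "    >\n" +
                ") WITH (\n" +
                "    'connector' = 'kafka',\n" +
                "    'topic' = 'mongo_user',\n" +
                "    'properties.bootstrap.servers' = 'kafka01:9092',\n" +
                "    'properties.group.id' = 'flink-mongo-demo',\n" +
                "    'scan.startup.mode' = 'earliest-offset',\n" +
                "    'format' = 'debezium-json'\n" +
                ")");

        // The debezium-json format turns the before/after/op envelope into a changelog stream,
        // so this query reflects inserts, updates and deletes from MongoDB.
        tableEnvironment.executeSql(
                "SELECT _id, fullDocument.id, fullDocument.name, fullDocument.age\n" +
                "FROM kafkaUserSourceTable").print();
    }
}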

To be continued!

mongo->kafka

kafka->mysql

kafka->clickhouse
