Debezium + Kafka-connect 实现Postgres实时同步Hologres

news2025/3/20 23:54:14

基于 Debezium + Kafka 的方案实现 PostgreSQL 到 Hologres 的实时数据同步,是一种高可靠性、高扩展性的解决方案。以下是详细的实现步骤:

1. 方案架构

Debezium:捕获 PostgreSQL 的变更数据(CDC),并将变更事件发送到 Kafka。

Kafka:作为消息队列,缓冲和分发变更数据。

Kafka 消费者:消费 Kafka 中的变更数据,并将其写入 Hologres。

Hologres:作为目标数据库,存储同步的数据。

2. 环境准备

2.1 组件版本

PostgreSQL:9.6 及以上(支持逻辑复制)。

Debezium:1.9 及以上。

Kafka:2.8 及以上。

Hologres:兼容 PostgreSQL 协议,支持 JDBC 连接。

2.2 配置 PostgreSQL

启用逻辑复制:

ALTER SYSTEM SET wal_level = logical;

创建复制槽:

# 创建复制槽(replication slot):
SELECT pg_create_logical_replication_slot('debezium_slot', 'pgoutput');
# 查看复制插槽
SELECT * FROM pg_replication_slots;

创建发布:

# 创建发布(配置连接器可自动创建)
CREATE PUBLICATION debezium_pub FOR ALL TABLES;
CREATE PUBLICATION debezium_pub FOR TABLE my_table1, my_table2;

# 删除发布
DROP PUBLICATION IF EXISTS debezium_pub;

确保 PostgreSQL 用户具有复制权限:

# 最好使用superuser
CREATE ROLE debezium_user WITH REPLICATION LOGIN PASSWORD 'your_password';

3. 配置 Debezium

3.1 下载 Debezium

链接下载 Debezium 的 PostgreSQL 连接器:
下载地址:https://repo1.maven.org/maven2/io/debezium/debezium-connector-postgres/

wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-postgres/2.2.0.Final/debezium-connector-postgres-2.2.0.Final-plugin.tar.gz

3.2 添加插件到 Kafka Connect

手动复制插件 tar.gz 文件
将下载的 tar.gz 文件解压复制到 Kafka Connect 的插件目录中。 默认情况下,
插件目录为: /usr/share/java/kafka-connect-plugins/

# 创建插件目录(如果不存在)
sudo mkdir -p /usr/share/java/kafka-connect/

# 复制 JAR 文件 
tar -zxvf /etc/kafka-connect/debezium-connector-postgres-2.2.0.Final-plugin.tar.gz -C /usr/share/java/kafka-connect/

使用 Kubernetes 挂载插件
如果你在 Kubernetes 环境中运行 Kafka Connect, 可以在 Deployment 文件中挂载插件目录:

volumes:
	- emptyDir: {}
	  name: plugins-volume
	- emptyDir: {}
	  name: config-volume

​ volumeMounts:
	- name: plugins-volume
	  mountPath: /usr/share/java/kafka-connect/
	- name: config-volume
	  mountPath: /etc/kafka-connect/

Kafka-connect对应的Service、StatefulSets对象的yaml

apiVersion: v1
kind: Service
metadata:
  name: prod-kafka-connect
  namespace: prod-core-jobs-shuguan
spec:
  ports:
    - port: 8083
      targetPort: 8083
      protocol: TCP
      name: http
  selector:
    app: prod-kafka-connect
  type: ClusterIP

---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: prod-kafka-connect
  namespace: prod-core-jobs-shuguan
spec:
  serviceName: "prod-kafka-connect"
  replicas: 1
  selector:
    matchLabels:
      app: prod-kafka-connect
  template:
    metadata:
      labels:
        app: prod-kafka-connect
    spec:
      volumes:
        - emptyDir: {}
          name: plugins-volume
        - emptyDir: {}
          name: config-volume
      initContainers:
        - name: extract-plugins
          image: pkg.geely.com/docker/busybox:1.31   # 初始化容器,作下载拷贝Debezium插件使用
          command: 
            - sh
            - -c
            - |
              wget https://pkg.geely.com/artifactory/IMD-pypi-dev-hz/resource/uploadTemp/debezium-connector-postgres-2.2.0.Final-plugin.tar.gz -O /etc/kafka-connect/debezium-connector-postgres-2.2.0.Final-plugin.tar.gz
              tar -zxvf /etc/kafka-connect/debezium-connector-postgres-2.2.0.Final-plugin.tar.gz -C /usr/share/java/kafka-connect/
          volumeMounts:
            - name: plugins-volume
              mountPath: /usr/share/java/kafka-connect/
            - name: config-volume
              mountPath: /etc/kafka-connect/
      containers:
        - name: prod-kafka-connect
          image: pkg.geely.com/docker/confluentinc/cp-kafka-connect:7.2.0
          ports:
            - containerPort: 8083
              name: http
          env:
            - name: CONNECT_BOOTSTRAP_SERVERS
              value: "prod-kafka:9092"  # 替换为你的 Kafka 集群地址
            - name: CONNECT_GROUP_ID
              value: "connect-cluster"
            - name: CONNECT_KEY_CONVERTER
              value: "org.apache.kafka.connect.storage.StringConverter"
            - name: CONNECT_VALUE_CONVERTER
              value: "org.apache.kafka.connect.storage.StringConverter"
            - name: CONNECT_CONFIG_STORAGE_TOPIC
              value: "connect-configs"
            - name: CONNECT_OFFSET_STORAGE_TOPIC
              value: "connect-offsets"
            - name: CONNECT_STATUS_STORAGE_TOPIC
              value: "connect-statuses"
            - name: CONNECT_REST_ADVERTISED_HOST_NAME
              valueFrom:
                fieldRef:
                  fieldPath: status.podIP
            - name: CONNECT_REST_ADVERTISED_PORT
              value: "8083"
          volumeMounts:
            - name: plugins-volume
              mountPath: /usr/share/java/kafka-connect/
            - name: config-volume
              mountPath: /etc/kafka-connect/
          readinessProbe:
            httpGet:
              path: /connectors
              port: http
            initialDelaySeconds: 30
            periodSeconds: 10
          livenessProbe:
            httpGet:
              path: /connectors
              port: http
            initialDelaySeconds: 300
            periodSeconds: 10

3.3 配置 Debezium 连接器

在 Kafka Connect 中添加 Debezium 连接器,分为两部分,一个为postgres-source,从pg监控数据变化发送消息到kafka,另一个为hologres-sink,从kafka消费数据,写到hologres数据库,如下

# 添加source-connector:
curl --location --request POST 'http://kafka-connect-address:8083/connectors' \
--header 'Content-Type: application/json' \
--data-raw '{
  "name": "postgres-source",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": true,
    "value.converter.schemas.enable": true,
    "tasks.max": "3",
    "database.hostname": "prod-postgresql",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "123456",
    "database.dbname": "db_dmp",
    "database.server.name": "prod-shuguan-dmp",
    "schema.history.internal.kafka.bootstrap.servers": "prod-kafka:9092",
    "schema.history.internal.kafka.topic": "pgsql-sync-hologres",
    "table.include.list": "public.data_info,public.data_source",
    "primary.key.fields": "id",
    "slot.name": "debezium_slot",
    "plugin.name": "pgoutput",
    "publication.name": "debezium_pub",
    "publication.autocreate.mode": "filtered",
    "topic.prefix": "datatrans"
  }
}'  

# 返回结果: {
    "name": "postgres-source",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter.schemas.enable": "true",
        "value.converter.schemas.enable": "true",
        "tasks.max": "3",
        "database.hostname": "prod-postgresql",
        "database.port": "5432",
        "database.user": "postgres",
        "database.password": "123456",
        "database.dbname": "db_dmp",
        "database.server.name": "prod-shuguan-dmp",
        "schema.history.internal.kafka.bootstrap.servers": "prod-kafka:9092",
        "schema.history.internal.kafka.topic": "pgsql-sync-hologres",
        "table.include.list": "public.data_info,public.data_source",
        "primary.key.fields": "id",
        "slot.name": "debezium_slot",
        "plugin.name": "pgoutput",
        "publication.name": "debezium_pub",
        "publication.autocreate.mode": "filtered",
        "topic.prefix": "datatrans",
        "name": "postgres-source"
    },
    "tasks": [],
    "type": "source"
} 

# 添加sink-connector:
curl --location --request PUT 'http://kafka-connect-address:8083/connectors/hologres-sink/config' \
--header 'Content-Type: application/json' \
--data-raw '{
    "name": "hologres-sink",
    "config": {
        "connector.class": "io.debezium.connector.jdbc.JdbcSinkConnector",
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter.schemas.enable": true,
        "value.converter.schemas.enable": true,
        "schemas.enable": false,
        "tasks.max": "1",
        "connection.url": "jdbc:postgresql://holo-cn-vpc-st.hologres.ops.auto.gee-cloud.com:80/db_dmp?reWriteBatchedInserts=true",
        "connection.username": "admin",
        "connection.password": "123456",
        "insert.mode": "upsert",
        "topics": "datatrans.public.data_info,datatrans.public.data_source",
        "schema.history.internal.kafka.bootstrap.servers": "prod-kafka:9092",
        "schema.history.internal.kafka.topic": "pgsql-sync-hologres",
        "delete.enabled": "true",
        "primary.key.fields": "id",
        "primary.key.mode": "record_key",
        "schema.evolution": "basic",
        "database.time_zone": "UTC",
        "auto.create": "true",
        "auto.evolve": "true"
    }
}'

# 返回结果:
{
    "name": "hologres-sink",
    "config": {
        "connector.class": "io.debezium.connector.jdbc.JdbcSinkConnector",
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter.schemas.enable": "true",
        "value.converter.schemas.enable": "true",
        "schemas.enable": "false",
        "tasks.max": "1",
        "connection.url": "jdbc:postgresql://holo-cn-vpc-st.hologres.ops.auto.gee-cloud.com:80/db_dmp?reWriteBatchedInserts=true",
        "connection.username": "admin",
        "connection.password": "123456",
        "insert.mode": "upsert",
        "topics": "datatrans.public.data_info,datatrans.public.data_source",
        "schema.history.internal.kafka.bootstrap.servers": "prod-kafka:9092",
        "schema.history.internal.kafka.topic": "pgsql-sync-hologres",
        "delete.enabled": "true",
        "primary.key.fields": "id",
        "primary.key.mode": "record_key",
        "schema.evolution": "basic",
        "database.time_zone": "UTC",
        "auto.create": "false",
        "auto.evolve": "false",
        "name": "hologres-sink"
    },
    "tasks": [
        {
            "connector": "hologres-sink",
            "task": 0
        }
    ],
    "type": "sink"
}

删除连接器

curl --location --request DELETE 'http://kafka-connect-address:8083/connectors/postgres-source' 

查看连接器状态:

curl --location --request GET 'http://kafka-connect-address:8083/connectors/postgres-source/status' 

#返回结果:
{
    "name": "postgres-source",
    "connector": {
        "state": "RUNNING",
        "worker_id": "10.244.0.116:8083"
    },
    "tasks": [
        {
            "id": 0,
            "state": "RUNNING",
            "worker_id": "10.244.0.116:8083"
        }
    ],
    "type": "source"
}

查看连接器配置

curl --location --request GET 'http://kafka-connect-address:8083/connectors/postgres-source/config' \

#返回结果:
{
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "publication.autocreate.mode": "filtered",
    "database.user": "postgres",
    "database.dbname": "db_dmp",
    "slot.name": "debezium_slot",
    "tasks.max": "3",
    "publication.name": "debezium_pub",
    "database.server.name": "prod-shuguan-dmp",
    "schema.history.internal.kafka.bootstrap.servers": "prod-kafka:9092",
    "database.port": "5432",
    "plugin.name": "pgoutput",
    "topic.prefix": "datatrans",
    "schema.history.internal.kafka.topic": "pgsql-sync-hologres",
    "database.hostname": "prod-postgresql",
    "database.password": "123456",
    "name": "postgres-source",
    "table.include.list": "public.data_info,public.data_source"
}

查看连接器列表

curl --location --request GET 'http://kafka-connect-address:8083/connectors' 


#返回结果:
[
    "postgres-source",
    "hologres-sink"
]

其他相关接口

kafka-connect支持接口
GET /connectors #返回活动连接器的列表
POST /connectors #创建一个新的连接器; 请求主体应该是包含字符串name字段和config带有连接器配置参数的对象字段的JSON对象
GET /connectors/{name} #获取有关特定连接器的信息
GET /connectors/{name}/config #获取特定连接器的配置参数
PUT /connectors/{name}/config #更新特定连接器的配置参数
GET /connectors/{name}/status #获取连接器的当前状态,包括连接器是否正在运行,失败,已暂停等,分配给哪个工作者,失败时的错误信息以及所有任务的状态
GET /connectors/{name}/tasks #获取当前为连接器运行的任务列表
GET /connectors/{name}/tasks/{taskid}/status #获取任务的当前状态,包括如果正在运行,失败,暂停等,分配给哪个工作人员,如果失败,则返回错误信息
PUT /connectors/{name}/pause #暂停连接器及其任务,停止消息处理,直到连接器恢复
PUT /connectors/{name}/resume #恢复暂停的连接器(或者,如果连接器未暂停,则不执行任何操作)
POST /connectors/{name}/restart #重新启动连接器(通常是因为失败)
POST /connectors/{name}/tasks/{taskId}/restart #重启个别任务(通常是因为失败)
DELETE /connectors/{name} #删除连接器,停止所有任务并删除其配置

#Kafka Connect还提供了用于获取有关连接器插件信息的REST API:
GET /connector-plugins #返回安装在Kafka Connect集群中的连接器插件列表。请注意,API仅检查处理请求的worker的连接器,这意味着您可能会看到不一致的结果,尤其是在滚动升级期间,如果添加新的连接器jar
PUT /connector-plugins/{connector-type}/config/validate # 根据配置定义验证提供的配置值。此API执行每个配置验证,在验证期间返回建议值和错误消息。

4. 测试和验证

4.1 数据新增

在 PostgreSQL 中插入数据:

INSERT INTO public.data_info(
	platform, data_type, data_name, dir_name, bag_exists, clip_exists, create_time, latest_update_time, deleted)
	VALUES('11v-xl', 'camera', 'aroundBack', 'cam_around_back', 1, 1, now(), now(), 0);

查看kafka-topic消息

I have no name!@prod-kafka-0:/$ kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic datatrans.public.data_info
{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"platform"},{"type":"string","optional":false,"field":"data_type"},{"type":"string","optional":false,"field":"data_name"},{"type":"string","optional":true,"field":"dir_name"},{"type":"int16","optional":true,"field":"bag_exists"},{"type":"int16","optional":true,"field":"clip_exists"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"create_time"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"latest_update_time"},{"type":"int16","optional":true,"default":0,"field":"deleted"}],"optional":true,"name":"datatrans5.public.data_info.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"platform"},{"type":"string","optional":false,"field":"data_type"},{"type":"string","optional":false,"field":"data_name"},{"type":"string","optional":true,"field":"dir_name"},{"type":"int16","optional":true,"field":"bag_exists"},{"type":"int16","optional":true,"field":"clip_exists"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"create_time"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"latest_update_time"},{"type":"int16","optional":true,"default":0,"field":"deleted"}],"optional":true,"name":"datatrans5.public.data_info.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"int64","optional":true,"field":"ts_us"},{"type":"int64","optional":true,"field":"ts_ns"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"int64","optional":true,"field":"ts_us"},{"type":"int64","optional":true,"field":"ts_ns"}],"optional":false,"name":"datatrans5.public.data_info.Envelope","version":2},"payload":{"before":null,"after":{"id":18,"platform":"11v-xl","data_type":"camera","data_name":"aroundBack","dir_name":"cam_around_back","bag_exists":1,"clip_exists":1,"create_time":1741311929781870,"latest_update_time":1741311929781870,"deleted":0},"source":{"version":"2.7.4.Final","connector":"postgresql","name":"datatrans5","ts_ms":1741311929783,"snapshot":"false","db":"db_dmp","sequence":"[\"12057353312\",\"12057353472\"]","ts_us":1741311929783308,"ts_ns":1741311929783308000,"schema":"public","table":"data_info","txId":3433486,"lsn":12057353472,"xmin":null},"transaction":null,"op":"c","ts_ms":1741311929873,"ts_us":1741311929873166,"ts_ns":1741311929873166000}}  

检查 Hologres 中的数据是否同步

SELECT * FROM public.data_info ORDER BY id DESC;

4.2 数据更新
在 PostgreSQL 中更新数据:

UPDATE public.data_info SET platform = '6v-xl', data_name = 'aroundBack_xl' WHERE platform = '11v-xl';

查看Kafka-topic消息:


I have no name!@prod-kafka-0:/$ kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic datatrans.public.data_info
{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"platform"},{"type":"string","optional":false,"field":"data_type"},{"type":"string","optional":false,"field":"data_name"},{"type":"string","optional":true,"field":"dir_name"},{"type":"int16","optional":true,"field":"bag_exists"},{"type":"int16","optional":true,"field":"clip_exists"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"create_time"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"latest_update_time"},{"type":"int16","optional":true,"default":0,"field":"deleted"}],"optional":true,"name":"datatrans5.public.data_info.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"platform"},{"type":"string","optional":false,"field":"data_type"},{"type":"string","optional":false,"field":"data_name"},{"type":"string","optional":true,"field":"dir_name"},{"type":"int16","optional":true,"field":"bag_exists"},{"type":"int16","optional":true,"field":"clip_exists"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"create_time"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"latest_update_time"},{"type":"int16","optional":true,"default":0,"field":"deleted"}],"optional":true,"name":"datatrans5.public.data_info.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"int64","optional":true,"field":"ts_us"},{"type":"int64","optional":true,"field":"ts_ns"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"int64","optional":true,"field":"ts_us"},{"type":"int64","optional":true,"field":"ts_ns"}],"optional":false,"name":"datatrans5.public.data_info.Envelope","version":2},"payload":{"before":null,"after":{"id":18,"platform":"6v-xl","data_type":"camera","data_name":"aroundBack_xl","dir_name":"cam_around_back","bag_exists":1,"clip_exists":1,"create_time":1741311929781870,"latest_update_time":1741311929781870,"deleted":0},"source":{"version":"2.7.4.Final","connector":"postgresql","name":"datatrans5","ts_ms":1741312464134,"snapshot":"false","db":"db_dmp","sequence":"[\"12065166032\",\"12065166088\"]","ts_us":1741312464134145,"ts_ns":1741312464134145000,"schema":"public","table":"data_info","txId":3434153,"lsn":12065166088,"xmin":null},"transaction":null,"op":"u","ts_ms":1741312464229,"ts_us":1741312464229054,"ts_ns":1741312464229054000}}

检查Hologres数据:

SELECT * FROM public.data_info ORDER BY id DESC;

4.3 数据删除
在 PostgreSQL 中更新数据:

DELETE FROM public.data_info WHERE platform = '6v-xl';

查看Kafka-topic消息:

I have no name!@prod-kafka-0:/$ kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic datatrans.public.data_info
{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"platform"},{"type":"string","optional":false,"field":"data_type"},{"type":"string","optional":false,"field":"data_name"},{"type":"string","optional":true,"field":"dir_name"},{"type":"int16","optional":true,"field":"bag_exists"},{"type":"int16","optional":true,"field":"clip_exists"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"create_time"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"latest_update_time"},{"type":"int16","optional":true,"default":0,"field":"deleted"}],"optional":true,"name":"datatrans5.public.data_info.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"platform"},{"type":"string","optional":false,"field":"data_type"},{"type":"string","optional":false,"field":"data_name"},{"type":"string","optional":true,"field":"dir_name"},{"type":"int16","optional":true,"field":"bag_exists"},{"type":"int16","optional":true,"field":"clip_exists"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"create_time"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"latest_update_time"},{"type":"int16","optional":true,"default":0,"field":"deleted"}],"optional":true,"name":"datatrans5.public.data_info.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"int64","optional":true,"field":"ts_us"},{"type":"int64","optional":true,"field":"ts_ns"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"int64","optional":true,"field":"ts_us"},{"type":"int64","optional":true,"field":"ts_ns"}],"optional":false,"name":"datatrans5.public.data_info.Envelope","version":2},"payload":{"before":{"id":18,"platform":"","data_type":"","data_name":"","dir_name":null,"bag_exists":null,"clip_exists":null,"create_time":null,"latest_update_time":null,"deleted":0},"after":null,"source":{"version":"2.7.4.Final","connector":"postgresql","name":"datatrans5","ts_ms":1741312717303,"snapshot":"false","db":"db_dmp","sequence":"[\"12068950904\",\"12068950960\"]","ts_us":1741312717303137,"ts_ns":1741312717303137000,"schema":"public","table":"data_info","txId":3434449,"lsn":12068950960,"xmin":null},"transaction":null,"op":"d","ts_ms":1741312717748,"ts_us":1741312717748938,"ts_ns":1741312717748938000}}
null

检查Hologres数据:

SELECT * FROM public.data_info ORDER BY id DESC;

4.3 批量插入
在 PostgreSQL 中插入数据:

INSERT INTO public.data_info(
	platform, data_type, data_name, dir_name, bag_exists, clip_exists, create_time, latest_update_time, deleted)
	VALUES ('11v-xl', 'camera0', 'aroundBack', 'cam_around_back', 1, 1, now(), now(), 0),
	('11v-xl', 'camera1', 'aroundBack', 'cam_around_back', 1, 1, now(), now(), 0), 
	('11v-xl', 'camera2', 'aroundBack', 'cam_around_back', 1, 1, now(), now(), 0), 
	('11v-xl', 'camera3', 'aroundBack', 'cam_around_back', 1, 1, now(), now(), 0), 
	('11v-xl', 'camera5', 'aroundBack', 'cam_around_back', 1, 1, now(), now(), 0);

查看Kafka-topic消息:

I have no name!@prod-kafka-0:/$ kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic datatrans.public.data_info
{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"platform"},{"type":"string","optional":false,"field":"data_type"},{"type":"string","optional":false,"field":"data_name"},{"type":"string","optional":true,"field":"dir_name"},{"type":"int16","optional":true,"field":"bag_exists"},{"type":"int16","optional":true,"field":"clip_exists"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"create_time"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"latest_update_time"},{"type":"int16","optional":true,"default":0,"field":"deleted"}],"optional":true,"name":"datatrans5.public.data_info.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"platform"},{"type":"string","optional":false,"field":"data_type"},{"type":"string","optional":false,"field":"data_name"},{"type":"string","optional":true,"field":"dir_name"},{"type":"int16","optional":true,"field":"bag_exists"},{"type":"int16","optional":true,"field":"clip_exists"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"create_time"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"latest_update_time"},{"type":"int16","optional":true,"default":0,"field":"deleted"}],"optional":true,"name":"datatrans5.public.data_info.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"int64","optional":true,"field":"ts_us"},{"type":"int64","optional":true,"field":"ts_ns"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"int64","optional":true,"field":"ts_us"},{"type":"int64","optional":true,"field":"ts_ns"}],"optional":false,"name":"datatrans5.public.data_info.Envelope","version":2},"payload":{"before":null,"after":{"id":26,"platform":"11v-xl","data_type":"camera0","data_name":"aroundBack","dir_name":"cam_around_back","bag_exists":1,"clip_exists":1,"create_time":1741313539693111,"latest_update_time":1741313539693111,"deleted":0},"source":{"version":"2.7.4.Final","connector":"postgresql","name":"datatrans5","ts_ms":1741313539694,"snapshot":"false","db":"db_dmp","sequence":"[\"12072782920\",\"12072782920\"]","ts_us":1741313539694079,"ts_ns":1741313539694079000,"schema":"public","table":"data_info","txId":3435484,"lsn":12072782920,"xmin":null},"transaction":null,"op":"c","ts_ms":1741313540003,"ts_us":1741313540003647,"ts_ns":1741313540003647000}}
{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"platform"},{"type":"string","optional":false,"field":"data_type"},{"type":"string","optional":false,"field":"data_name"},{"type":"string","optional":true,"field":"dir_name"},{"type":"int16","optional":true,"field":"bag_exists"},{"type":"int16","optional":true,"field":"clip_exists"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"create_time"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"latest_update_time"},{"type":"int16","optional":true,"default":0,"field":"deleted"}],"optional":true,"name":"datatrans5.public.data_info.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"platform"},{"type":"string","optional":false,"field":"data_type"},{"type":"string","optional":false,"field":"data_name"},{"type":"string","optional":true,"field":"dir_name"},{"type":"int16","optional":true,"field":"bag_exists"},{"type":"int16","optional":true,"field":"clip_exists"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"create_time"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"latest_update_time"},{"type":"int16","optional":true,"default":0,"field":"deleted"}],"optional":true,"name":"datatrans5.public.data_info.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"int64","optional":true,"field":"ts_us"},{"type":"int64","optional":true,"field":"ts_ns"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"int64","optional":true,"field":"ts_us"},{"type":"int64","optional":true,"field":"ts_ns"}],"optional":false,"name":"datatrans5.public.data_info.Envelope","version":2},"payload":{"before":null,"after":{"id":27,"platform":"11v-xl","data_type":"camera1","data_name":"aroundBack","dir_name":"cam_around_back","bag_exists":1,"clip_exists":1,"create_time":1741313539693111,"latest_update_time":1741313539693111,"deleted":0},"source":{"version":"2.7.4.Final","connector":"postgresql","name":"datatrans5","ts_ms":1741313539694,"snapshot":"false","db":"db_dmp","sequence":"[\"12072782920\",\"12072783208\"]","ts_us":1741313539694079,"ts_ns":1741313539694079000,"schema":"public","table":"data_info","txId":3435484,"lsn":12072783208,"xmin":null},"transaction":null,"op":"c","ts_ms":1741313540005,"ts_us":1741313540005062,"ts_ns":1741313540005062000}}
{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"platform"},{"type":"string","optional":false,"field":"data_type"},{"type":"string","optional":false,"field":"data_name"},{"type":"string","optional":true,"field":"dir_name"},{"type":"int16","optional":true,"field":"bag_exists"},{"type":"int16","optional":true,"field":"clip_exists"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"create_time"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"latest_update_time"},{"type":"int16","optional":true,"default":0,"field":"deleted"}],"optional":true,"name":"datatrans5.public.data_info.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"platform"},{"type":"string","optional":false,"field":"data_type"},{"type":"string","optional":false,"field":"data_name"},{"type":"string","optional":true,"field":"dir_name"},{"type":"int16","optional":true,"field":"bag_exists"},{"type":"int16","optional":true,"field":"clip_exists"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"create_time"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"latest_update_time"},{"type":"int16","optional":true,"default":0,"field":"deleted"}],"optional":true,"name":"datatrans5.public.data_info.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"int64","optional":true,"field":"ts_us"},{"type":"int64","optional":true,"field":"ts_ns"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"int64","optional":true,"field":"ts_us"},{"type":"int64","optional":true,"field":"ts_ns"}],"optional":false,"name":"datatrans5.public.data_info.Envelope","version":2},"payload":{"before":null,"after":{"id":29,"platform":"11v-xl","data_type":"camera3","data_name":"aroundBack","dir_name":"cam_around_back","bag_exists":1,"clip_exists":1,"create_time":1741313539693111,"latest_update_time":1741313539693111,"deleted":0},"source":{"version":"2.7.4.Final","connector":"postgresql","name":"datatrans5","ts_ms":1741313539694,"snapshot":"false","db":"db_dmp","sequence":"[\"12072782920\",\"12072783784\"]","ts_us":1741313539694079,"ts_ns":1741313539694079000,"schema":"public","table":"data_info","txId":3435484,"lsn":12072783784,"xmin":null},"transaction":null,"op":"c","ts_ms":1741313540006,"ts_us":1741313540006726,"ts_ns":1741313540006726000}}
{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"platform"},{"type":"string","optional":false,"field":"data_type"},{"type":"string","optional":false,"field":"data_name"},{"type":"string","optional":true,"field":"dir_name"},{"type":"int16","optional":true,"field":"bag_exists"},{"type":"int16","optional":true,"field":"clip_exists"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"create_time"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"latest_update_time"},{"type":"int16","optional":true,"default":0,"field":"deleted"}],"optional":true,"name":"datatrans5.public.data_info.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"platform"},{"type":"string","optional":false,"field":"data_type"},{"type":"string","optional":false,"field":"data_name"},{"type":"string","optional":true,"field":"dir_name"},{"type":"int16","optional":true,"field":"bag_exists"},{"type":"int16","optional":true,"field":"clip_exists"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"create_time"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"latest_update_time"},{"type":"int16","optional":true,"default":0,"field":"deleted"}],"optional":true,"name":"datatrans5.public.data_info.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"int64","optional":true,"field":"ts_us"},{"type":"int64","optional":true,"field":"ts_ns"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"int64","optional":true,"field":"ts_us"},{"type":"int64","optional":true,"field":"ts_ns"}],"optional":false,"name":"datatrans5.public.data_info.Envelope","version":2},"payload":{"before":null,"after":{"id":28,"platform":"11v-xl","data_type":"camera2","data_name":"aroundBack","dir_name":"cam_around_back","bag_exists":1,"clip_exists":1,"create_time":1741313539693111,"latest_update_time":1741313539693111,"deleted":0},"source":{"version":"2.7.4.Final","connector":"postgresql","name":"datatrans5","ts_ms":1741313539694,"snapshot":"false","db":"db_dmp","sequence":"[\"12072782920\",\"12072783496\"]","ts_us":1741313539694079,"ts_ns":1741313539694079000,"schema":"public","table":"data_info","txId":3435484,"lsn":12072783496,"xmin":null},"transaction":null,"op":"c","ts_ms":1741313540005,"ts_us":1741313540005913,"ts_ns":1741313540005913000}}
{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"platform"},{"type":"string","optional":false,"field":"data_type"},{"type":"string","optional":false,"field":"data_name"},{"type":"string","optional":true,"field":"dir_name"},{"type":"int16","optional":true,"field":"bag_exists"},{"type":"int16","optional":true,"field":"clip_exists"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"create_time"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"latest_update_time"},{"type":"int16","optional":true,"default":0,"field":"deleted"}],"optional":true,"name":"datatrans5.public.data_info.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"platform"},{"type":"string","optional":false,"field":"data_type"},{"type":"string","optional":false,"field":"data_name"},{"type":"string","optional":true,"field":"dir_name"},{"type":"int16","optional":true,"field":"bag_exists"},{"type":"int16","optional":true,"field":"clip_exists"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"create_time"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"latest_update_time"},{"type":"int16","optional":true,"default":0,"field":"deleted"}],"optional":true,"name":"datatrans5.public.data_info.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"int64","optional":true,"field":"ts_us"},{"type":"int64","optional":true,"field":"ts_ns"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"int64","optional":true,"field":"ts_us"},{"type":"int64","optional":true,"field":"ts_ns"}],"optional":false,"name":"datatrans5.public.data_info.Envelope","version":2},"payload":{"before":null,"after":{"id":30,"platform":"11v-xl","data_type":"camera5","data_name":"aroundBack","dir_name":"cam_around_back","bag_exists":1,"clip_exists":1,"create_time":1741313539693111,"latest_update_time":1741313539693111,"deleted":0},"source":{"version":"2.7.4.Final","connector":"postgresql","name":"datatrans5","ts_ms":1741313539694,"snapshot":"false","db":"db_dmp","sequence":"[\"12072782920\",\"12072784072\"]","ts_us":1741313539694079,"ts_ns":1741313539694079000,"schema":"public","table":"data_info","txId":3435484,"lsn":12072784072,"xmin":null},"transaction":null,"op":"c","ts_ms":1741313540007,"ts_us":1741313540007475,"ts_ns":1741313540007475000}}

检查Hologres数据:

SELECT * FROM public.data_info ORDER BY id DESC;

4.3 批量更新
在 PostgreSQL 中更新数据:

UPDATE public.data_info SET  platform = '6v-xl', data_name = 'aroundBack_xl' WHERE platform = '11v-xl';

查看Kafka-topic消息:

I have no name!@prod-kafka-0:/$ kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic datatrans.public.data_info
{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"platform"},{"type":"string","optional":false,"field":"data_type"},{"type":"string","optional":false,"field":"data_name"},{"type":"string","optional":true,"field":"dir_name"},{"type":"int16","optional":true,"field":"bag_exists"},{"type":"int16","optional":true,"field":"clip_exists"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"create_time"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"latest_update_time"},{"type":"int16","optional":true,"default":0,"field":"deleted"}],"optional":true,"name":"datatrans5.public.data_info.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"platform"},{"type":"string","optional":false,"field":"data_type"},{"type":"string","optional":false,"field":"data_name"},{"type":"string","optional":true,"field":"dir_name"},{"type":"int16","optional":true,"field":"bag_exists"},{"type":"int16","optional":true,"field":"clip_exists"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"create_time"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"latest_update_time"},{"type":"int16","optional":true,"default":0,"field":"deleted"}],"optional":true,"name":"datatrans5.public.data_info.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"int64","optional":true,"field":"ts_us"},{"type":"int64","optional":true,"field":"ts_ns"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"int64","optional":true,"field":"ts_us"},{"type":"int64","optional":true,"field":"ts_ns"}],"optional":false,"name":"datatrans5.public.data_info.Envelope","version":2},"payload":{"before":null,"after":{"id":26,"platform":"6v-xl","data_type":"camera0","data_name":"aroundBack_xl","dir_name":"cam_around_back","bag_exists":1,"clip_exists":1,"create_time":1741313539693111,"latest_update_time":1741313539693111,"deleted":0},"source":{"version":"2.7.4.Final","connector":"postgresql","name":"datatrans5","ts_ms":1741313734647,"snapshot":"false","db":"db_dmp","sequence":"[\"12073766384\",\"12073766440\"]","ts_us":1741313734647557,"ts_ns":1741313734647557000,"schema":"public","table":"data_info","txId":3435710,"lsn":12073766440,"xmin":null},"transaction":null,"op":"u","ts_ms":1741313734826,"ts_us":1741313734826063,"ts_ns":1741313734826063000}}
{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"platform"},{"type":"string","optional":false,"field":"data_type"},{"type":"string","optional":false,"field":"data_name"},{"type":"string","optional":true,"field":"dir_name"},{"type":"int16","optional":true,"field":"bag_exists"},{"type":"int16","optional":true,"field":"clip_exists"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"create_time"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"latest_update_time"},{"type":"int16","optional":true,"default":0,"field":"deleted"}],"optional":true,"name":"datatrans5.public.data_info.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"platform"},{"type":"string","optional":false,"field":"data_type"},{"type":"string","optional":false,"field":"data_name"},{"type":"string","optional":true,"field":"dir_name"},{"type":"int16","optional":true,"field":"bag_exists"},{"type":"int16","optional":true,"field":"clip_exists"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"create_time"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"latest_update_time"},{"type":"int16","optional":true,"default":0,"field":"deleted"}],"optional":true,"name":"datatrans5.public.data_info.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"int64","optional":true,"field":"ts_us"},{"type":"int64","optional":true,"field":"ts_ns"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"int64","optional":true,"field":"ts_us"},{"type":"int64","optional":true,"field":"ts_ns"}],"optional":false,"name":"datatrans5.public.data_info.Envelope","version":2},"payload":{"before":null,"after":{"id":27,"platform":"6v-xl","data_type":"camera1","data_name":"aroundBack_xl","dir_name":"cam_around_back","bag_exists":1,"clip_exists":1,"create_time":1741313539693111,"latest_update_time":1741313539693111,"deleted":0},"source":{"version":"2.7.4.Final","connector":"postgresql","name":"datatrans5","ts_ms":1741313734647,"snapshot":"false","db":"db_dmp","sequence":"[\"12073766384\",\"12073775280\"]","ts_us":1741313734647557,"ts_ns":1741313734647557000,"schema":"public","table":"data_info","txId":3435710,"lsn":12073775280,"xmin":null},"transaction":null,"op":"u","ts_ms":1741313734827,"ts_us":1741313734827700,"ts_ns":1741313734827700000}}
{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"platform"},{"type":"string","optional":false,"field":"data_type"},{"type":"string","optional":false,"field":"data_name"},{"type":"string","optional":true,"field":"dir_name"},{"type":"int16","optional":true,"field":"bag_exists"},{"type":"int16","optional":true,"field":"clip_exists"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"create_time"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"latest_update_time"},{"type":"int16","optional":true,"default":0,"field":"deleted"}],"optional":true,"name":"datatrans5.public.data_info.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"platform"},{"type":"string","optional":false,"field":"data_type"},{"type":"string","optional":false,"field":"data_name"},{"type":"string","optional":true,"field":"dir_name"},{"type":"int16","optional":true,"field":"bag_exists"},{"type":"int16","optional":true,"field":"clip_exists"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"create_time"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"latest_update_time"},{"type":"int16","optional":true,"default":0,"field":"deleted"}],"optional":true,"name":"datatrans5.public.data_info.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"int64","optional":true,"field":"ts_us"},{"type":"int64","optional":true,"field":"ts_ns"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"int64","optional":true,"field":"ts_us"},{"type":"int64","optional":true,"field":"ts_ns"}],"optional":false,"name":"datatrans5.public.data_info.Envelope","version":2},"payload":{"before":null,"after":{"id":29,"platform":"6v-xl","data_type":"camera3","data_name":"aroundBack_xl","dir_name":"cam_around_back","bag_exists":1,"clip_exists":1,"create_time":1741313539693111,"latest_update_time":1741313539693111,"deleted":0},"source":{"version":"2.7.4.Final","connector":"postgresql","name":"datatrans5","ts_ms":1741313734647,"snapshot":"false","db":"db_dmp","sequence":"[\"12073766384\",\"12073775872\"]","ts_us":1741313734647557,"ts_ns":1741313734647557000,"schema":"public","table":"data_info","txId":3435710,"lsn":12073775872,"xmin":null},"transaction":null,"op":"u","ts_ms":1741313734829,"ts_us":1741313734829449,"ts_ns":1741313734829449000}}
{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"platform"},{"type":"string","optional":false,"field":"data_type"},{"type":"string","optional":false,"field":"data_name"},{"type":"string","optional":true,"field":"dir_name"},{"type":"int16","optional":true,"field":"bag_exists"},{"type":"int16","optional":true,"field":"clip_exists"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"create_time"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"latest_update_time"},{"type":"int16","optional":true,"default":0,"field":"deleted"}],"optional":true,"name":"datatrans5.public.data_info.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"platform"},{"type":"string","optional":false,"field":"data_type"},{"type":"string","optional":false,"field":"data_name"},{"type":"string","optional":true,"field":"dir_name"},{"type":"int16","optional":true,"field":"bag_exists"},{"type":"int16","optional":true,"field":"clip_exists"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"create_time"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"latest_update_time"},{"type":"int16","optional":true,"default":0,"field":"deleted"}],"optional":true,"name":"datatrans5.public.data_info.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"int64","optional":true,"field":"ts_us"},{"type":"int64","optional":true,"field":"ts_ns"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"int64","optional":true,"field":"ts_us"},{"type":"int64","optional":true,"field":"ts_ns"}],"optional":false,"name":"datatrans5.public.data_info.Envelope","version":2},"payload":{"before":null,"after":{"id":28,"platform":"6v-xl","data_type":"camera2","data_name":"aroundBack_xl","dir_name":"cam_around_back","bag_exists":1,"clip_exists":1,"create_time":1741313539693111,"latest_update_time":1741313539693111,"deleted":0},"source":{"version":"2.7.4.Final","connector":"postgresql","name":"datatrans5","ts_ms":1741313734647,"snapshot":"false","db":"db_dmp","sequence":"[\"12073766384\",\"12073775576\"]","ts_us":1741313734647557,"ts_ns":1741313734647557000,"schema":"public","table":"data_info","txId":3435710,"lsn":12073775576,"xmin":null},"transaction":null,"op":"u","ts_ms":1741313734828,"ts_us":1741313734828522,"ts_ns":1741313734828522000}}
{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"platform"},{"type":"string","optional":false,"field":"data_type"},{"type":"string","optional":false,"field":"data_name"},{"type":"string","optional":true,"field":"dir_name"},{"type":"int16","optional":true,"field":"bag_exists"},{"type":"int16","optional":true,"field":"clip_exists"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"create_time"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"latest_update_time"},{"type":"int16","optional":true,"default":0,"field":"deleted"}],"optional":true,"name":"datatrans5.public.data_info.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"platform"},{"type":"string","optional":false,"field":"data_type"},{"type":"string","optional":false,"field":"data_name"},{"type":"string","optional":true,"field":"dir_name"},{"type":"int16","optional":true,"field":"bag_exists"},{"type":"int16","optional":true,"field":"clip_exists"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"create_time"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"latest_update_time"},{"type":"int16","optional":true,"default":0,"field":"deleted"}],"optional":true,"name":"datatrans5.public.data_info.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"int64","optional":true,"field":"ts_us"},{"type":"int64","optional":true,"field":"ts_ns"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"int64","optional":true,"field":"ts_us"},{"type":"int64","optional":true,"field":"ts_ns"}],"optional":false,"name":"datatrans5.public.data_info.Envelope","version":2},"payload":{"before":null,"after":{"id":30,"platform":"6v-xl","data_type":"camera5","data_name":"aroundBack_xl","dir_name":"cam_around_back","bag_exists":1,"clip_exists":1,"create_time":1741313539693111,"latest_update_time":1741313539693111,"deleted":0},"source":{"version":"2.7.4.Final","connector":"postgresql","name":"datatrans5","ts_ms":1741313734647,"snapshot":"false","db":"db_dmp","sequence":"[\"12073766384\",\"12073776168\"]","ts_us":1741313734647557,"ts_ns":1741313734647557000,"schema":"public","table":"data_info","txId":3435710,"lsn":12073776168,"xmin":null},"transaction":null,"op":"u","ts_ms":1741313734830,"ts_us":1741313734830236,"ts_ns":1741313734830236000}}

检查Hologres数据:

SELECT * FROM public.data_info ORDER BY id DESC;

4.3 批量删除
在 PostgreSQL 中删除数据:

DELETE FROM public.data_info WHERE platform = '6v-xl';

查看Kafka-topic消息:

I have no name!@prod-kafka-0:/$ kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic datatrans.public.data_info
{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"platform"},{"type":"string","optional":false,"field":"data_type"},{"type":"string","optional":false,"field":"data_name"},{"type":"string","optional":true,"field":"dir_name"},{"type":"int16","optional":true,"field":"bag_exists"},{"type":"int16","optional":true,"field":"clip_exists"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"create_time"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"latest_update_time"},{"type":"int16","optional":true,"default":0,"field":"deleted"}],"optional":true,"name":"datatrans5.public.data_info.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"platform"},{"type":"string","optional":false,"field":"data_type"},{"type":"string","optional":false,"field":"data_name"},{"type":"string","optional":true,"field":"dir_name"},{"type":"int16","optional":true,"field":"bag_exists"},{"type":"int16","optional":true,"field":"clip_exists"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"create_time"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"latest_update_time"},{"type":"int16","optional":true,"default":0,"field":"deleted"}],"optional":true,"name":"datatrans5.public.data_info.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"int64","optional":true,"field":"ts_us"},{"type":"int64","optional":true,"field":"ts_ns"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"int64","optional":true,"field":"ts_us"},{"type":"int64","optional":true,"field":"ts_ns"}],"optional":false,"name":"datatrans5.public.data_info.Envelope","version":2},"payload":{"before":{"id":26,"platform":"","data_type":"","data_name":"","dir_name":null,"bag_exists":null,"clip_exists":null,"create_time":null,"latest_update_time":null,"deleted":0},"after":null,"source":{"version":"2.7.4.Final","connector":"postgresql","name":"datatrans5","ts_ms":1741313871896,"snapshot":"false","db":"db_dmp","sequence":"[\"12074168712\",\"12074168768\"]","ts_us":1741313871896885,"ts_ns":1741313871896885000,"schema":"public","table":"data_info","txId":3435860,"lsn":12074168768,"xmin":null},"transaction":null,"op":"d","ts_ms":1741313871946,"ts_us":1741313871946325,"ts_ns":1741313871946325000}}
null
{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"platform"},{"type":"string","optional":false,"field":"data_type"},{"type":"string","optional":false,"field":"data_name"},{"type":"string","optional":true,"field":"dir_name"},{"type":"int16","optional":true,"field":"bag_exists"},{"type":"int16","optional":true,"field":"clip_exists"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"create_time"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"latest_update_time"},{"type":"int16","optional":true,"default":0,"field":"deleted"}],"optional":true,"name":"datatrans5.public.data_info.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"platform"},{"type":"string","optional":false,"field":"data_type"},{"type":"string","optional":false,"field":"data_name"},{"type":"string","optional":true,"field":"dir_name"},{"type":"int16","optional":true,"field":"bag_exists"},{"type":"int16","optional":true,"field":"clip_exists"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"create_time"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"latest_update_time"},{"type":"int16","optional":true,"default":0,"field":"deleted"}],"optional":true,"name":"datatrans5.public.data_info.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"int64","optional":true,"field":"ts_us"},{"type":"int64","optional":true,"field":"ts_ns"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"int64","optional":true,"field":"ts_us"},{"type":"int64","optional":true,"field":"ts_ns"}],"optional":false,"name":"datatrans5.public.data_info.Envelope","version":2},"payload":{"before":{"id":27,"platform":"","data_type":"","data_name":"","dir_name":null,"bag_exists":null,"clip_exists":null,"create_time":null,"latest_update_time":null,"deleted":0},"after":null,"source":{"version":"2.7.4.Final","connector":"postgresql","name":"datatrans5","ts_ms":1741313871896,"snapshot":"false","db":"db_dmp","sequence":"[\"12074168712\",\"12074168840\"]","ts_us":1741313871896885,"ts_ns":1741313871896885000,"schema":"public","table":"data_info","txId":3435860,"lsn":12074168840,"xmin":null},"transaction":null,"op":"d","ts_ms":1741313871947,"ts_us":1741313871947402,"ts_ns":1741313871947402000}}
null
{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"platform"},{"type":"string","optional":false,"field":"data_type"},{"type":"string","optional":false,"field":"data_name"},{"type":"string","optional":true,"field":"dir_name"},{"type":"int16","optional":true,"field":"bag_exists"},{"type":"int16","optional":true,"field":"clip_exists"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"create_time"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"latest_update_time"},{"type":"int16","optional":true,"default":0,"field":"deleted"}],"optional":true,"name":"datatrans5.public.data_info.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"platform"},{"type":"string","optional":false,"field":"data_type"},{"type":"string","optional":false,"field":"data_name"},{"type":"string","optional":true,"field":"dir_name"},{"type":"int16","optional":true,"field":"bag_exists"},{"type":"int16","optional":true,"field":"clip_exists"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"create_time"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"latest_update_time"},{"type":"int16","optional":true,"default":0,"field":"deleted"}],"optional":true,"name":"datatrans5.public.data_info.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"int64","optional":true,"field":"ts_us"},{"type":"int64","optional":true,"field":"ts_ns"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"int64","optional":true,"field":"ts_us"},{"type":"int64","optional":true,"field":"ts_ns"}],"optional":false,"name":"datatrans5.public.data_info.Envelope","version":2},"payload":{"before":{"id":29,"platform":"","data_type":"","data_name":"","dir_name":null,"bag_exists":null,"clip_exists":null,"create_time":null,"latest_update_time":null,"deleted":0},"after":null,"source":{"version":"2.7.4.Final","connector":"postgresql","name":"datatrans5","ts_ms":1741313871896,"snapshot":"false","db":"db_dmp","sequence":"[\"12074168712\",\"12074168984\"]","ts_us":1741313871896885,"ts_ns":1741313871896885000,"schema":"public","table":"data_info","txId":3435860,"lsn":12074168984,"xmin":null},"transaction":null,"op":"d","ts_ms":1741313871948,"ts_us":1741313871948261,"ts_ns":1741313871948261000}}
null
{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"platform"},{"type":"string","optional":false,"field":"data_type"},{"type":"string","optional":false,"field":"data_name"},{"type":"string","optional":true,"field":"dir_name"},{"type":"int16","optional":true,"field":"bag_exists"},{"type":"int16","optional":true,"field":"clip_exists"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"create_time"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"latest_update_time"},{"type":"int16","optional":true,"default":0,"field":"deleted"}],"optional":true,"name":"datatrans5.public.data_info.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"platform"},{"type":"string","optional":false,"field":"data_type"},{"type":"string","optional":false,"field":"data_name"},{"type":"string","optional":true,"field":"dir_name"},{"type":"int16","optional":true,"field":"bag_exists"},{"type":"int16","optional":true,"field":"clip_exists"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"create_time"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"latest_update_time"},{"type":"int16","optional":true,"default":0,"field":"deleted"}],"optional":true,"name":"datatrans5.public.data_info.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"int64","optional":true,"field":"ts_us"},{"type":"int64","optional":true,"field":"ts_ns"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"int64","optional":true,"field":"ts_us"},{"type":"int64","optional":true,"field":"ts_ns"}],"optional":false,"name":"datatrans5.public.data_info.Envelope","version":2},"payload":{"before":{"id":28,"platform":"","data_type":"","data_name":"","dir_name":null,"bag_exists":null,"clip_exists":null,"create_time":null,"latest_update_time":null,"deleted":0},"after":null,"source":{"version":"2.7.4.Final","connector":"postgresql","name":"datatrans5","ts_ms":1741313871896,"snapshot":"false","db":"db_dmp","sequence":"[\"12074168712\",\"12074168912\"]","ts_us":1741313871896885,"ts_ns":1741313871896885000,"schema":"public","table":"data_info","txId":3435860,"lsn":12074168912,"xmin":null},"transaction":null,"op":"d","ts_ms":1741313871947,"ts_us":1741313871947868,"ts_ns":1741313871947868000}}
null
{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"platform"},{"type":"string","optional":false,"field":"data_type"},{"type":"string","optional":false,"field":"data_name"},{"type":"string","optional":true,"field":"dir_name"},{"type":"int16","optional":true,"field":"bag_exists"},{"type":"int16","optional":true,"field":"clip_exists"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"create_time"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"latest_update_time"},{"type":"int16","optional":true,"default":0,"field":"deleted"}],"optional":true,"name":"datatrans5.public.data_info.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"platform"},{"type":"string","optional":false,"field":"data_type"},{"type":"string","optional":false,"field":"data_name"},{"type":"string","optional":true,"field":"dir_name"},{"type":"int16","optional":true,"field":"bag_exists"},{"type":"int16","optional":true,"field":"clip_exists"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"create_time"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"latest_update_time"},{"type":"int16","optional":true,"default":0,"field":"deleted"}],"optional":true,"name":"datatrans5.public.data_info.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"int64","optional":true,"field":"ts_us"},{"type":"int64","optional":true,"field":"ts_ns"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"int64","optional":true,"field":"ts_us"},{"type":"int64","optional":true,"field":"ts_ns"}],"optional":false,"name":"datatrans5.public.data_info.Envelope","version":2},"payload":{"before":{"id":30,"platform":"","data_type":"","data_name":"","dir_name":null,"bag_exists":null,"clip_exists":null,"create_time":null,"latest_update_time":null,"deleted":0},"after":null,"source":{"version":"2.7.4.Final","connector":"postgresql","name":"datatrans5","ts_ms":1741313871896,"snapshot":"false","db":"db_dmp","sequence":"[\"12074168712\",\"12074169056\"]","ts_us":1741313871896885,"ts_ns":1741313871896885000,"schema":"public","table":"data_info","txId":3435860,"lsn":12074169056,"xmin":null},"transaction":null,"op":"d","ts_ms":1741313871948,"ts_us":1741313871948688,"ts_ns":1741313871948688000}}
null

检查Hologres数据:

SELECT * FROM public.data_info ORDER BY id DESC;

4.4 事务操作
Pgsql先插入数据,再做更新,然后回滚更新:

BEGIN;

INSERT INTO public.data_info (
	platform, data_type, data_name, dir_name, bag_exists, clip_exists, create_time, latest_update_time, deleted)
	VALUES ('11v-xl', 'camera0', 'aroundBack', 'cam_around_back', 1, 1, now(), now(), 0),
	('11v-xl', 'camera1', 'aroundBack', 'cam_around_back', 1, 1, now(), now(), 0), 
	('11v-xl', 'camera2', 'aroundBack', 'cam_around_back', 1, 1, now(), now(), 0);

SAVEPOINT my_savepoint;

UPDATE public.data_info SET  platform = '6v-xl', data_name = 'aroundBack_xl' WHERE platform = '11v-xl';

ROLLBACK TO my_savepoint;

COMMIT;

查看Kafka-topic消息,只有一次插入操作:

I have no name!@prod-kafka-0:/$ kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic datatrans.public.data_info
{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"platform"},{"type":"string","optional":false,"field":"data_type"},{"type":"string","optional":false,"field":"data_name"},{"type":"string","optional":true,"field":"dir_name"},{"type":"int16","optional":true,"field":"bag_exists"},{"type":"int16","optional":true,"field":"clip_exists"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"create_time"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"latest_update_time"},{"type":"int16","optional":true,"default":0,"field":"deleted"}],"optional":true,"name":"datatrans5.public.data_info.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"platform"},{"type":"string","optional":false,"field":"data_type"},{"type":"string","optional":false,"field":"data_name"},{"type":"string","optional":true,"field":"dir_name"},{"type":"int16","optional":true,"field":"bag_exists"},{"type":"int16","optional":true,"field":"clip_exists"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"create_time"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"latest_update_time"},{"type":"int16","optional":true,"default":0,"field":"deleted"}],"optional":true,"name":"datatrans5.public.data_info.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"int64","optional":true,"field":"ts_us"},{"type":"int64","optional":true,"field":"ts_ns"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"int64","optional":true,"field":"ts_us"},{"type":"int64","optional":true,"field":"ts_ns"}],"optional":false,"name":"datatrans5.public.data_info.Envelope","version":2},"payload":{"before":null,"after":{"id":37,"platform":"11v-xl","data_type":"camera0","data_name":"aroundBack","dir_name":"cam_around_back","bag_exists":1,"clip_exists":1,"create_time":1741327869321334,"latest_update_time":1741327869321334,"deleted":0},"source":{"version":"2.7.4.Final","connector":"postgresql","name":"datatrans5","ts_ms":1741327869322,"snapshot":"false","db":"db_dmp","sequence":"[\"12141193520\",\"12141193624\"]","ts_us":1741327869322708,"ts_ns":1741327869322708000,"schema":"public","table":"data_info","txId":3453096,"lsn":12141193624,"xmin":null},"transaction":null,"op":"c","ts_ms":1741327869719,"ts_us":1741327869719110,"ts_ns":1741327869719110000}}
{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"platform"},{"type":"string","optional":false,"field":"data_type"},{"type":"string","optional":false,"field":"data_name"},{"type":"string","optional":true,"field":"dir_name"},{"type":"int16","optional":true,"field":"bag_exists"},{"type":"int16","optional":true,"field":"clip_exists"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"create_time"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"latest_update_time"},{"type":"int16","optional":true,"default":0,"field":"deleted"}],"optional":true,"name":"datatrans5.public.data_info.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"platform"},{"type":"string","optional":false,"field":"data_type"},{"type":"string","optional":false,"field":"data_name"},{"type":"string","optional":true,"field":"dir_name"},{"type":"int16","optional":true,"field":"bag_exists"},{"type":"int16","optional":true,"field":"clip_exists"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"create_time"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"latest_update_time"},{"type":"int16","optional":true,"default":0,"field":"deleted"}],"optional":true,"name":"datatrans5.public.data_info.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"int64","optional":true,"field":"ts_us"},{"type":"int64","optional":true,"field":"ts_ns"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"int64","optional":true,"field":"ts_us"},{"type":"int64","optional":true,"field":"ts_ns"}],"optional":false,"name":"datatrans5.public.data_info.Envelope","version":2},"payload":{"before":null,"after":{"id":38,"platform":"11v-xl","data_type":"camera1","data_name":"aroundBack","dir_name":"cam_around_back","bag_exists":1,"clip_exists":1,"create_time":1741327869321334,"latest_update_time":1741327869321334,"deleted":0},"source":{"version":"2.7.4.Final","connector":"postgresql","name":"datatrans5","ts_ms":1741327869322,"snapshot":"false","db":"db_dmp","sequence":"[\"12141193520\",\"12141204296\"]","ts_us":1741327869322708,"ts_ns":1741327869322708000,"schema":"public","table":"data_info","txId":3453096,"lsn":12141204296,"xmin":null},"transaction":null,"op":"c","ts_ms":1741327869720,"ts_us":1741327869720542,"ts_ns":1741327869720542000}}
{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"platform"},{"type":"string","optional":false,"field":"data_type"},{"type":"string","optional":false,"field":"data_name"},{"type":"string","optional":true,"field":"dir_name"},{"type":"int16","optional":true,"field":"bag_exists"},{"type":"int16","optional":true,"field":"clip_exists"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"create_time"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"latest_update_time"},{"type":"int16","optional":true,"default":0,"field":"deleted"}],"optional":true,"name":"datatrans5.public.data_info.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"platform"},{"type":"string","optional":false,"field":"data_type"},{"type":"string","optional":false,"field":"data_name"},{"type":"string","optional":true,"field":"dir_name"},{"type":"int16","optional":true,"field":"bag_exists"},{"type":"int16","optional":true,"field":"clip_exists"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"create_time"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"latest_update_time"},{"type":"int16","optional":true,"default":0,"field":"deleted"}],"optional":true,"name":"datatrans5.public.data_info.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"int64","optional":true,"field":"ts_us"},{"type":"int64","optional":true,"field":"ts_ns"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"int64","optional":true,"field":"ts_us"},{"type":"int64","optional":true,"field":"ts_ns"}],"optional":false,"name":"datatrans5.public.data_info.Envelope","version":2},"payload":{"before":null,"after":{"id":39,"platform":"11v-xl","data_type":"camera2","data_name":"aroundBack","dir_name":"cam_around_back","bag_exists":1,"clip_exists":1,"create_time":1741327869321334,"latest_update_time":1741327869321334,"deleted":0},"source":{"version":"2.7.4.Final","connector":"postgresql","name":"datatrans5","ts_ms":1741327869322,"snapshot":"false","db":"db_dmp","sequence":"[\"12141193520\",\"12141204584\"]","ts_us":1741327869322708,"ts_ns":1741327869322708000,"schema":"public","table":"data_info","txId":3453096,"lsn":12141204584,"xmin":null},"transaction":null,"op":"c","ts_ms":1741327869721,"ts_us":1741327869721271,"ts_ns":1741327869721271000}}

检查Hologres数据,保存UPDATE语句之前的数据:

SELECT * FROM public.data_info ORDER BY id DESC;

5. 总结

  • 通过 Debezium + Kafka 的方案,可以实现 PostgreSQL 到 Hologres 的高可靠、高性能的实时 数据同步。该方案适合数据量较大、实时性要求较高的场景,同时具备良好的扩展性和灵活性。

  • 数据同步写入,通过kafka-topic匹配表名,但是数据发送kafka-topic,名称必须加前缀,导致出 库和入库的表名不一致,入库的表统一多了前缀(java开发可通过mybatis拦截器,或者mybatis-plus配置解决该问题)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2318651.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

集成学习之随机森林

目录 一、集成学习的含义 二、集成学习的代表 三、集成学习的应用 1、分类问题集成。(基学习器是分类模型) 2、回归问题集成。(基学习器是回归模型) 3、特征选取集成。 四、Bagging之随机森林 1、随机森林是有多个决策树&a…

在线JSON格式校验工具站

在线JSON校验格式化工具(Be JSON)在线,JSON,JSON 校验,格式化,xml转json 工具,在线工具,json视图,可视化,程序,服务器,域名注册,正则表达式,测试,在线json格式化工具,json 格式化,json格式化工具,json字符串格式化,json 在线查看器,json在线,json 在线验…

SAP的WPS导出找不到路径怎么办;上载报错怎么办

一.打开注册编辑器 二.输入以下地址 计算机\HKEY_CLASSES_ROOT\ExcelWorksheet\Protocol\StdFileEditing\Server 去除掉EXE后面的命令即可 二:WPS上载文件没反应怎么办 如何切换整合模式或多组件模式-WPS学堂 根据官方操作把整合模式改成多组件模式

Moonlight-16B-A3B: 变革性的高效大语言模型,凭借Muon优化器打破训练效率极限

近日,由Moonshot AI团队推出的Moonlight-16B-A3B模型,再次在AI领域引发了广泛关注。这款全新的Mixture-of-Experts (MoE)架构的大型语言模型,凭借其创新的训练优化技术,特别是Muon优化器的使用,成功突破了训练效率的极…

rust学习笔记17-异常处理

今天聊聊rust中异常错误处理 1. 基础类型&#xff1a;Result 和 Option&#xff0c;之前判断空指针就用到过 Option<T> 用途&#xff1a;表示值可能存在&#xff08;Some(T)&#xff09;或不存在&#xff08;None&#xff09;&#xff0c;适用于无需错误信息的场景。 f…

PyTorch系列教程:使用预训练语言模型增强文本分类

文本分类仍是自然语言处理&#xff08;NLP&#xff09;领域的一项基础任务&#xff0c;其目标是将文本数据归入预先设定的类别之中。预训练语言模型的出现极大地提升了这一领域的性能。本文将探讨如何利用 PyTorch 来利用这些模型&#xff0c;展示它们如何能增强文本分类任务。…

LabVIEW 线性拟合

该 LabVIEW 程序实现了 线性拟合&#xff08;Linear Fit&#xff09;&#xff0c;用于计算给定一组数据点的斜率&#xff08;Slope&#xff09;和截距&#xff08;Intercept&#xff09;&#xff0c;并将结果可视化于 XY Graph 中。本案例适用于数据拟合、实验数据分析、传感器…

nacos安装,服务注册,服务发现,远程调用3个方法

安装 点版本下载页面 服务注册 每个微服务都配置nacos的地址&#xff0c;都要知道 服务发现 2个是知道了解 远程调用基本实现 远程调用方法2&#xff0c;负载均衡API测试 远程调用方法3&#xff0c;注解 负载均衡的远程调用&#xff0c; 总结 面试题

Mac:JMeter 下载+安装+环境配置(图文详细讲解)

&#x1f4cc; 下载JMeter 下载地址&#xff1a;https://jmeter.apache.org/download_jmeter.cgi &#x1f4cc; 无需安装 Apache官网下载 JMeter 压缩包&#xff0c;无需安装&#xff0c;下载解压后放到自己指定目录下即可。 按我自己的习惯&#xff0c;我会在用户 jane 目…

Python IP解析器 ip2region使用

说明&#xff1a;最近需要在python项目内使用IP定位所在城市的需求&#xff0c;没有采用向外部ISP服务商API请求获取信息的方案&#xff0c;则翻了翻&#xff0c;在搞Java时很多的方案&#xff0c;在Python端反而可选择范围很小。 # 示例查询 ips ["106.38.188.214"…

labview与西门子1500plc进行S7通讯(仿真效果)

环境&#xff1a; 1.博图V16 2.S7-PLCSIM Advanced V3.0 3.labview2020 4.HslCommunication的dll文件 运行效果图 通过使用HslCommunication的库文件来对西门子plc进行通讯 labview代码 代码打包 通过网盘分享的文件&#xff1a;labview进行s7通讯测试.rar 链接: https:/…

Oracle 公布 Java 的五大新功能

Java 增强提案包括语言增强和性能优化&#xff0c;从 JDK 25 中的稳定值 API 开始。 随着JDK&#xff08;Java 开发工具包&#xff09;24刚刚全面上市&#xff0c;Oracle 提前透露了不久的将来即将推出的 Java 功能&#xff0c;包括增强原始装箱到空限制值类类型。 3 月 18 日…

台式机电脑组装---电脑机箱与主板接线

台式机电脑组装—电脑机箱与主板接线 1、机箱连接主板的跳线一般主要有USB 2.0、USB 3.0、前置音频接口(HD_AUDIO)以及POWER SW、RESET SW、POWER LED、HDD LED四个主板跳线&#xff0c;这些跳线分别的含义如下。 RESET SW&#xff1a;机箱重启按键&#xff1b;注&#xff1a…

动作捕捉手套如何让虚拟现实人机交互 “触手可及”?

在虚拟与现实逐渐交融的当下&#xff0c;动作捕捉技术正以前所未有的速度革新着多个领域。 动作捕捉技术&#xff0c;简称“动捕”&#xff0c;已经从早期的影视特效制作&#xff0c;逐步拓展到游戏开发、虚拟现实、机器人控制等多个领域。 而mHandPrO数据手套作为这一领域的…

笔记本电脑关不了机是怎么回事 这有解决方法

在快节奏的现代生活中&#xff0c;笔记本电脑已成为我们工作、学习和娱乐的得力助手。在使用电脑的过程中&#xff0c;笔记本电脑突然关不了机了&#xff0c;怎么回事&#xff1f;下面驱动人生就来讲一讲笔记本电脑不能正常关机的解决方法&#xff0c;有需要的可以来看看。 一、…

给管理商场消防安全搭建消防安全培训小程序全过程

一、需求沟通 “我是管理商场消防安全的嘛&#xff0c;做这个的作用呢&#xff0c;1是商场的所有商户员工可以看平面或者视频随时自学&#xff0c; 2是我们定期培训必修课程、考试&#xff0c;这个需要留存他们的手签字的签到表确认我们讲给他们听了&#xff08;免责很重要&am…

Flutter:页面滚动,导航栏背景颜色过渡动画

记录&#xff1a;导航默认透明&#xff0c;页面发生滚动后&#xff0c;导航背景色由0-1&#xff0c;过渡到白色背景。 view import package:ducafe_ui_core/ducafe_ui_core.dart; import package:flutter/material.dart; import package:get/get.dart; import package:redo…

VSCode + CMake

参考文献&#xff1a; 如何用 GCC, CMake 和 Make 编译C/C代码Windows 上的 Linux 子系统&#xff1a;WSLWSL&#xff1a;桌面 UI 远程连接 RDP 配置 VScode 文章目录 CMake 配置VSCode 配置launch.jsontask.jsonc_cpp_properties.json CMake 配置 编写如下的 CmakeLists.t…

Docker进阶篇1:什么是Docker数据卷?为什么需要Docker数据卷?Docker数据卷3种类型介绍

大家好我是木木&#xff0c;在当今快速发展的云计算与云原生时代&#xff0c;容器化技术蓬勃兴起&#xff0c;Docker 作为实现容器化的主流工具之一&#xff0c;为开发者和运维人员带来了极大的便捷 。下面我们一起开始进阶第1篇&#xff1a;什么是Docker数据卷&#xff1f;为什…

(2025|ICLR|华南理工,任务对齐,缓解灾难性遗忘,底层模型冻结和训练早停)语言模型持续学习中的虚假遗忘

Spurious Forgetting in Continual Learning of Language Models 目录 1. 引言 2. 动机&#xff1a;关于虚假遗忘的初步实验 3. 深入探讨虚假遗忘 3.1 受控实验设置 3.2 从性能角度分析 3.3 从损失景观角度分析 3.4 从模型权重角度分析 3.5 从特征角度分析 3.6 结论 …