从 MySQL 到 DolphinDB,Debezium + Kafka 数据同步实战

news2025/1/11 14:09:32

Debezium 是一个开源的分布式平台,用于实时捕获和发布数据库更改事件。它可以将关系型数据库(如 MySQL、PostgreSQL、Oracle 等)的变更事件转化为可观察的流数据,以供其他应用程序实时消费和处理。本文中我们将采用 Debezium 与 Kafka 组合的方式来实现从 MySQL 到 DolphinDB 的数据同步。

Kafka + Debezium 的数据同步方案需要部署 4 个服务,如下所示

  • ZooKeeper:kafka 的依赖部署
  • Kafka:数据存储
  • Kafka-Connect:用于接入数据插件 source-connetor、sink-connector 的框架,可以提供高可用。也可以部署单实例版本。
  • Schema-Registry :提供实时同步的数据的元数据注册功能 ,支持数据序列化。

基于 Debezium 的数据架构图如下:

接下来,本文将逐一介绍这些服务的下载、安装,以及配置数据同步任务。

部署 Kafka 单实例实时数据存储

基于 Kafka 的整套架构是支持高可用集群的。不过,即使部署单实例的服务,也可以达成数据同步任务。

本文将以单实例存储来进行介绍数据同步。

部署准备

首先下载程序包,Zookeeper(开源)、Kafka(开源)、Confluent(社区版),可以自行到官网下载最新稳定版本。

并将下面 4 个软件包放到 /opt 目录下。(软件、配置、数据路径文件较多。注意:初次试用请尽量保持路径一致。)

  • jdk-17.0.7_linux-x64_bin.tar.gz
  • apache-zookeeper-3.7.1-bin.tar.gz
  • kafka_2.13-3.4.1.tgz(下载 scala 2.13 版本)
  • confluent-community-7.4.0.tar.gz

Confluent 下载会稍微麻烦点,需要选择 self-managed 然后录入信息,点击 start free 才能下载。注意下载 community 版本即可满足需要,我们只需要里头的 schema-registry 包。当然如果需要更好功能,也可以下载正式版,正式版包括了 Zookeeper、Kafka 以及管理、监控 Kafka 的更多功能。Confluent 是 Kafka 相关的商业公司。

以上 4 个程序包下载好之后,我们就可以开始部署了。

部署 Zookeeper

基础准备

第一步:创建部署用户

创建用户 kafka,授予 sudo 免密权限(需自行设置)。然后切换到 kafka 用户来进行操作(以下均为 kafka 用户操作)。

useradd kafka
su kafka

第二步:安装部署 java 环境

安装 java 到路径 /opt/java17,整套架构涉及的程序都是基于 java 虚拟机运行的。所以必须安装 java。

cd /opt
sudo mkdir -p /usr/local/java
sudo tar -xvf jdk-17.0.7_linux-x64_bin.tar.gz
sudo mv jdk-17.0.7 /usr/local/java/java17

设置 java 环境变量(kafka 用户下执行)。

vim ~/.bashrc
# 输入下面代码
JAVA_HOME=/usr/local/java/java17
PATH=$JAVA_HOME/bin:$PATH
export JAVA_HOME PATH

source ~/.bashrc
java --version

安装 Zookeeper

第一步:解压并安装 Zookeeper

3.7.1 版本的 Zookeeper 用户、组有默认值,这里我们需要调整一下。

cd /opt
sudo tar -xvf apache-zookeeper-3.7.1-bin.tar.gz
sudo mv apache-zookeeper-3.7.1-bin zookeeper
sudo chown -R root:root zookeeper
sudo chmod -R 755 zookeeper

第二步:准备 Zookeeper 的配置文件和存储文件

创建 zookeeper 的配置文件、数据文件、日志文件的存储路径。请尽量保持路径一致。篇末有打包的全部程序配置文件包。

sudo mkdir -p /KFDATA/zookeeper/etc
sudo mkdir -p /KFDATA/zookeeper/data
sudo mkdir -p /KFDATA/zookeeper/datalog
sudo mkdir -p /KFDATA/zookeeper/logs
sudo chown -R kafka:kafka /KFDATA
chmod -R 700 /KFDATA/zookeeper

准备 zookeeper 的配置文件 zoo.cfg。先从 zookeeper 安装路径下复制 log4j.properties 过来,然后进行修改。

说明:zookeeper 的不同版本 log4j.properties 配置内容会略有区别。如有不同,请按 log4j 的规则调整。

cd /KFDATA/zookeeper/etc
touch zoo.cfg
echo tickTime=2000 > zoo.cfg
echo initLimit=10 >>zoo.cfg
echo syncLimit=5 >>zoo.cfg
echo  dataDir=/KFDATA/zookeeper/data >>zoo.cfg
echo  dataLogDir=/KFDATA/zookeeper/datalog >>zoo.cfg
echo  clientPort=2181 >>zoo.cfg
sudo cp /opt/zookeeper/conf/log4j.properties ./
sudo chown kafka:kafka ./log4j.properties

修改 log4j.properties 中的 zookeeper.log.dir 参数

第三步:创建 Zookeeper 的启动文件

创建一个 zk.env ,配置 Zookeeper 启动所需环境变量,用于启动 service 文件调用。

cd /KFDATA/zookeeper/etc/
touch zk.env
echo JAVA_HOME=/usr/local/java/java17 > zk.env
echo PATH="/usr/local/java/java17/bin:/opt/zookeeper/bin:/usr/local/bin:/bin:/usr/bin:/usr/local/sbin:/usr/sbin" >> zk.env
echo ZOO_LOG_DIR=/KFDATA/zookeeper/logs >> zk.env
echo ZOO_LOG4J_OPTS=\"-Dlog4j.configuration=file:/KFDATA/zookeeper/etc/log4j.properties\" >> zk.env

如果对 Zookeeper 很熟练可以自行调用 Zookeeper 安装目录下的 bin 文件夹下的操作脚本来进行操作或测试。

使用 vim 命令编辑一个 service 文件。

sudo vim /usr/lib/systemd/system/zookeeper.service

录入以下启动命令信息,并保存。

[Unit]
Description=Apache Kafka - ZooKeeper
After=network.target

[Service]
Type=forking
User=kafka
Group=kafka
EnvironmentFile=/KFDATA/zookeeper/etc/zk.env
ExecStart=/opt/zookeeper/bin/zkServer.sh start /KFDATA/zookeeper/etc/zoo.cfg
ExecStop=/opt/zookeeper/bin/zkServer.sh stop /KFDATA/zookeeper/etc/zoo.cfg
TimeoutStopSec=180
Restart=no


[Install]
WantedBy=multi-user.target

重新加载 service 启动服务。

sudo systemctl daemon-reload

第四步:创建测试脚本

(1)创建连接 Zookeeper 测试文件 zkCon.sh

mkdir -p /KFDATA/bin
cd /KFDATA/bin
touch zkCon.sh
echo export JAVA_HOME=/usr/local/java/java17 >zkCon.sh
echo export PATH="{$JAVE_HOME}/bin:/opt/zookeeper/bin:/usr/local/bin:/bin:/usr/bin:/usr/local/sbin:/usr/sbin" >>zkCon.sh
echo export ZOO_LOG_DIR=/KFDATA/zookeeper/logs >>zkCon.sh
echo export ZOO_LOG4J_OPTS=\"-Dlog4j.configuration=file:/KFDATA/zookeeper/etc/log4j.properties\" >>zkCon.sh
echo  '/opt/zookeeper/bin/zkCli.sh -server localhost:2181 -Dzookeeper.config.path=/KFDATA/zookeeper/zoo.cfg' >>zkCon.sh

对脚本授予执行权限。

chmod +x  zkCon.sh 

部署启动 Zookeeper

第一步:通过 systemctl 工具启动 Zookeeper 服务。

sudo systemctl start zookeeper.service

第二步:查看 Zookeeper 启动情况

可以通过 jps 命令 查看 java 进程, QuorumPeerMain 进程是 Zookeeper 的启动进程。

也可以通过 systemctl 命令查看,如图即是正常启动。

 sudo systemctl status zookeeper

第三步:通过客户端连接 Zookeeper ,并进行查看。

cd /KFDATA/bin/
./zkCon.sh
# 等待 zookeeper 命令行窗口
ls /
ls /zookeeper 

如果返回如下显示,表示 Zookeeper 启动成功,可以在 Zookeeper 中观察到自身的基础信息。

ctrl +c 可以退出 Zookeeper 客户端连接。

部署 Kafka

安装 Kafka

第一步:解压安装 Kafka 文件

执行以下命令,修改一下 Kafka 的安装文件名。

cd /opt
sudo tar -xvf kafka_2.13-3.4.1.tgz
sudo mv kafka_2.13-3.4.1 kafka

第二步:准备 Kafka 的配置文件和存储文件

创建 Kafka 的配置文件、数据文件、日志文件的存储路径。

mkdir -p /KFDATA/kafka/etc
mkdir -p /KFDATA/kafka/data
mkdir -p /KFDATA/kafka/logs

准备 Kafka 相关配置文件,创建启动配置文件,和日志配置文件。

cd /KFDATA/kafka/etc
touch kafka-server.properties
cp /opt/kafka/config/log4j.properties ./
cp /opt/kafka/config/tools-log4j.properties ./

修改 kafka-server.properties 文件中的配置,修改内容较多,文件如下,也可以自行录入。

############################# Server Basics #############################
broker.id=1
############################# Socket Server Settings #############################
listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://192.168.189.130:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
############################# Log Basics #############################
log.dirs=/KFDATA/kafka/data
num.partitions=1
num.recovery.threads.per.data.dir=1
############################# Internal Topic Settings  #############################
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
############################# Log Retention Policy #############################
log.retention.hours=-1
log.retention.bytes=21474836480 
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
auto.create.topics.enable=true
############################# Zookeeper #############################
zookeeper.connect=192.168.189.130:2181
# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=12000
############################# Group Coordinator Settings #############################
group.initial.rebalance.delay.ms=0
############################# message Settings #############################
message.max.byte=5242880

其中以下两项需要视具体环境修改,advertise.listeners 是对外监听端口。

advertised.listeners=PLAINTEXT://192.168.189.130:9092
zookeeper.connect=192.168.189.130:2181

第三步:准备 Kafka 的启动文件

创建 Kafka 启动的环境变量文件,这里配置了开启 JMX 监控端口,如果不需要,可以忽略后两项配置。

JMX 端口的作用是可以通过此端口连接,获取一些监控指标。

cd /KFDATA/kafka/etc
touch kf-server.env

echo PATH="/usr/local/java/java17/bin:/opt/zookeeper/bin:/opt/kafka:/usr/local/bin:/bin:/usr/bin:/usr/local/sbin:/usr/sbin" >>kf-server.env
echo LOG_DIR="/KFDATA/kafka/logs/" >>kf-server.env
echo KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:/KFDATA/kafka/etc/log4j.properties\" >>kf-server.env
echo KAFKA_JMX_OPTS=\"-Dcom.sun.management.jmxremote=true -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=192.168.189.130 -Djava.net.preferIPv4Stack=true\" >>kf-server.env
echo JMX_PORT=29999 >>kf-server.env

创建 Kafka 的 systemd service 文件,vim 打开一个文件。

sudo vim /usr/lib/systemd/system/kafka-server.service

录入以下内容,并保存。

[Unit]
Description=Apache Kafka - broker
After=network.target confluent-zookeeper.target

[Service]
Type=forking
User=kafka
Group=kafka
EnvironmentFile=/KFDATA/kafka/etc/kf-server.env
ExecStart=/opt/kafka/bin/kafka-server-start.sh -daemon /KFDATA/kafka/etc/kafka-server.properties
ExecStop=/KFDATA/kafka/bin/kafka-server-stop.sh
LimitNOFILE=1000000
TimeoutStopSec=180
Restart=no

[Install]
WantedBy=multi-user.target

重新加载 service 启动服务。

sudo systemctl daemon-reload

部署启动 Kafka

第一步:通过 systemctl 工具启动 Kafka 服务

执行下述命令启动 Kafka 服务:

sudo systemctl start kafka-server.service

第二步:查看 Kafka 启动情况

检查 Kafka 启动情况,可以连接 Zookeeper 客户端 。查看 zookeeper 中的数据。

cd /KFDATA/bin
./zkCon.sh
ls /

可以看到 Zookeeper 中已经多了一些 kafka 注册信息,如 brokers、cluseter、config、controller 等。

此时,可以测试创建一个 topic 进行测试:

cd  /opt/kafka/bin
./kafka-topics.sh --bootstrap-server 192.168.189.130:9092 --create --topic test110

执行下述代码,查看当前 Kafka 中 topic 列表:

./kafka-topics.sh --bootstrap-server 192.168.189.130:9092 --list

如果返回上述图片显示内容,说明 Kafka 已经启动成功。

部署 Schema-Registry

Schema-Registry 是用于注册传输数据的数据结构的。并记录数据结构改变的每一个版本。数据写入 Kafka 和从 Kafka 中读出都需要 schema-registry 中记录的数据结构来进行序列化和反序列化。通过使用 schema-registry 来注册数据结构。Kafka 中只需保存序列化后的数据即可。可以减少数据的空间占用。

安装 Schema-Registry

第一步:解压安装 Schema-Registry 文件

Schema-Registry 程序是 confluent 程序包中一部分。所以这里我们要安装 conluent,社区版本即可。解压缩 confluent-community-7.4.0.tar.gz,并修改文件名,设置隶属组。

cd /opt
sudo tar -xvf confluent-community-7.4.0.tar.gz
sudo mv confluent-7.4.0 confluent
sudo chown -R root:root confluent
sudo chmod -R 755 confluent

第二步:准备 Schema-Registry 的配置文件和存储文件

创建 schema-registry 的配置、日志文件存储路径。

mkdir -p /KFDATA/schema-registry/etc
mkdir -p /KFDATA/schema-registry/logs

准备 schema-registry 的配置文件。

cd /KFDATA/schema-registry/etc
cp /opt/confluent/etc/schema-registry/schema-registry.properties ./
cp /opt/confluent/etc/schema-registry/log4j.properties ./

修改 schema-registry.properties 文件,修改连接的 Kafka Server 地址。

第三步:准备 Schema-Registry 的启动文件

创建 Schema-Registry 启动环境变量文件,用于 Schema-Registry 启动时使用。

touch schema-registry.env
echo PATH="/usr/local/java/java17/bin:/opt/confluent/bin:/usr/local/bin:/bin:/usr/bin:/usr/local/sbin:/usr/sbin" >schema-registry.env
echo LOG_DIR="/KFDATA/schema-registry/logs" >>schema-registry.env
echo LOG4J_DIR="/KFDATA/schema-registry/etc/log4j.properties" >>schema-registry.env
echo SCHEMA_REGISTRY_LOG4J_OPTS=\"-Dlog4j.configuration=file:/KFDATA/schema-registry/etc/log4j.properties\" >>schema-registry.env

创建 Schema-Registry 的 systemd service 启动文件。

sudo vim /usr/lib/systemd/system/schema-registry.service

录入以下内容并保存。

[Unit]
Description=RESTful Avro schema registry for Apache Kafka
After=network.target

[Service]
Type=forking
User=kafka
Group=kafka
EnvironmentFile=/KFDATA/schema-registry/etc/schema-registry.env
ExecStart=/opt/confluent/bin/schema-registry-start -daemon /KFDATA/schema-registry/etc/schema-registry.properties
TimeoutStopSec=180
Restart=no

[Install]
WantedBy=multi-user.target

重新加载 service 启动服务。

sudo systemctl daemon-reload

部署启动 Schema-Registry

第一步:通过 systemctl 工具启动 Schema-Registry 服务

执行以下命令

sudo systemctl start schema-registry

第二步:查看 Schema-Registry 启动情况

通过 systemctl 工具查看启动状态。

sudo systemctl status schema-registry

查看 Kafka 中的 topic

cd /opt/kafka/bin
./kafka-topics.sh --bootstrap-server 192.168.189.130:9092 --list

可以看到 kafka 中已经创建出了 schema-registry 需要使用的 topic

schema-registry 启动成功。

部署 Kafka-Connect

Kafka-Connect 是 Kafka 提供的 HA 框架,实现了 Kafka-Connect 接口的 connector(连接器),只需处理自己需要进行读取、写入数据任务。高可用部分由 kafka-connect 框架负责。

Kafka-Connect 可用通过 rest api 进行访问。

安装 Kafka-Connect

第一步:Kafka-Connect 安装

Kafka-Connect 由 Kafka 提供,启动程序在 Kafka 的安装路径下,已经存在。数据元数据注册由 schema-registry 处理。相应的序列化包在已安装 Confluent 路径下。故无需再安装程序包。

第二步:准备 Kafka-Connect 的配置文件和存储文件

创建 Kafka-Connect 的配置、日志文件存储路径

mkdir -p /KFDATA/kafka-connect/etc
mkdir -p /KFDATA/kafka-connect/logs

创建 Kafka-Connect 的配置文件

cd /KFDATA/kafka-connect/etc
vim kafka-connect.properties

录入以下内容并保存。 ip 地址部分,需要视当前环境修改。

bootstrap.servers=192.168.189.130:9092
group.id=connect-cluster

key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://192.168.189.130:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://192.168.189.130:8081
key.converter.schemas.enable=true
value.converter.schemas.enable=true

internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

config.storage.topic=connect-configs
offset.storage.topic=connect-offsets
status.storage.topic=connect-statuses
config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1

plugin.path=/opt/confluent/share/java/plugin
rest.host.name=192.168.189.130
rest.port=8083
rest.advertised.host.name=192.168.189.130
rest.advertised.port=8083

offset.flush.timeout.ms=50000
offset.flush.interval.ms=10000
send.buffer.bytes=13107200
consumer.max.poll.records=10000
consumer.partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor

创建 Kafka-Connect 的 log4j 配置文件。

cd /KFDATA/kafka-connect/etc
cp /opt/kafka/config/connect-log4j.properties ./log4j.properties

修改文件中的以下参数配置

vim ./log4j.properties
log4j.appender.connectAppender.File=${kafka.logs.dir}/connect.log

将其修改为

log4j.appender.connectAppender.File=/KFDATA/kafka-connect/logs/connect.log

第三步:准备 Kafka-Connect 的启动文件

创建 Kafka-Connect 启动环境变量文件。

cd /KFDATA/kafka-connect/etc
touch kafka-connect.env

echo PATH="/usr/local/java/java17/bin:/usr/local/bin:/bin:/usr/bin:/usr/local/sbin:/usr/sbin" >kafka-connect.env
echo LOG_DIR="/KFDATA/kafka-connect/logs/" >>kafka-connect.env
echo LOG4J_DIR="/KFDATA/kafka-connect/etc/log4j.properties" >>kafka-connect.env
echo KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:/KFDATA/kafka-connect/etc/log4j.properties\" >>kafka-connect.env
echo CLASSPATH=/opt/confluent/share/java/schema-registry/*:/opt/confluent/share/java/kafka-serde-tools/*:/opt/confluent/share/java/confluent-common/* >>kafka-connect.env
echo JMX_PORT=29998 >>kafka-connect.env

创建 Kafka-Connect 的 systemd service 文件

sudo vim /usr/lib/systemd/system/kafka-connect.service

录入以下内容,并保存。

[Unit]
Description=Apache Kafka Connect - distributed
After=network.target

[Service]
Type=simple
User=kafka
Group=kafka
EnvironmentFile=/KFDATA/kafka-connect/etc/kafka-connect.env
ExecStart=/opt/kafka/bin/connect-distributed.sh /KFDATA/kafka-connect/etc/kafka-connect.properties
TimeoutStopSec=180
Restart=no

[Install]
WantedBy=multi-user.target

重新加载 service 启动服务。

sudo systemctl daemon-reload

部署启动 Kafka-Connect

第一步:通过 systemctl 工具启动 Kafka-Connect 服务

执行以下命令

sudo systemctl start kafka-connect.service

第二步:查看 Kafka-Connect 启动情况

通过 jps 命令查看启动情况

jps -mlvV |grep connect

查看 Kafka 中的 topic 情况,Kafka-Connect 会在 Kafka 中创建 connect-configs 、connect-offsets、connect-statuses 三个 topic。

cd  /opt/kafka/bin
./kafka-topics.sh --bootstrap-server 192.168.189.130:9092 --list

使用 curl 命令访问 kafka-connect,可以看到当前我们还没有配置 connector 任务

 curl -H "Accept:application/json" 192.168.189.130:8083/connectors/

部署 MySQL 数据同步到 Kafka

MySQL 的数据同步包括初始全量同步和 CDC 实时增量同步。

全量同步:将所选表的全部数据以 Insert 的方式写入 kafka,建议此时不要对数据库进行操作。

CDC 实时增量同步:从全量同步时记录的事务顺序号,实时读取 MySQL 的 binlog 日志,写入增量数据到 Kafka。

安装 Debezium-MySQL 连接器插件

配置启动 Debezium-MySQL 连接器,需要以下两步:

  1. 下载、安装 Debezium-MySQL 插件,并将插件路径配置到 Kafka Connect 配置文件中。
  2. 重新启动 Kafka Connect 程序,以加载插件。

第一步:下载安装 Debezium-MySQL 插件

官方网站 Debezium ,选择最新稳定版本进行下载。

选择 MySQL Connector Plug-in

创建插件路径(部署 kafka,kafka-connnect 环境的 kafka 用户),在此路径下解压 Debezium 的 MySQL 插件包

sudo mkdir -p /opt/confluent/share/java/plugin
cd /opt/confluent/share/java/plugin
sudo tar -xvf debezium-connector-mysql-2.3.2.Final-plugin.tar.gz
rm ./debezium-connector-mysql-2.3.2.Final-plugin.tar.gz

第二步:配置 Kafka-Connect 加载插件

修改 Kafka Connect 的配置文件,添加插件路径配置

cd /KFDATA/kafka-connect/etc
vim kafka-connect.properties

添加或修改参数 plugin.path 如下

plugin.path=/opt/confluent/share/java/plugin

重新启动 Kafka Connect

sudo systemctl stop kafka-connect
sudo systemctl start kafka-connect

查看日志输出, 如下图所示,则插件加载成功。

cat /KFDATA/kafka-connect/logs/connect.log|grep mysql

配置 MySQL 数据库

做为 Source 数据库,我们基于 MySQL 的 binlog 来获取实时的增量数据,所以需要对 MySQL 数据库做一些设置。

第一步:创建数据同步用的 MySQL 用户

Debezium MySQL 连接器需要 MySQL 用户帐户。此 MySQL 用户必须对 Debezium MySQL 连接器捕获更改的所有数据库拥有适当的权限。

CREATE USER 'datasyn'@'%' IDENTIFIED BY '1234';

授予权限。

GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'datasyn';

刷新授权表。

FLUSH PRIVILEGES;

第二步:设置 MySQL 参数

进行 CDC 同步,需要对 MySQL 数据库进行一些设置。

参数说明
server-id1MySQL 集群中用于标识一个 MySQL 服务器实例。可以自行调整设置。
log-binmysql-bin设置启用二进制日志功能,并指定日志文件名及存储位置。可自行调整设置。
binlog_formatROW必须 binlog-format 设置为 ROW 或 row。 连接 MySQL 级联复制实例时,链路内每个实例环节都要设置。
binlog_row_imageFULL必须 binlog_row_image 设置为 FULL 或 full 。连接 MySQL 级联复制实例时,链路内每个级联实例环节都要设置。
gtid_modeON设置开启全局事务标识
enforce_gtid_consistencyON设置强制执行 GTID 一致性
expire_logs_days3设置 MySQL 日志保留时间,MySQL 的 CDC 数据同步需要有对应日志文件才能进行同步。推荐至少设置保留3天。
binlog_row_value_options““此变量不能设置为 PARTIAL_JSON

参数参考代码:

[mysqld]
server-id = 1

log_bin=mysql-bin
binlog_format=ROW
binlog_row_image=FULL
binlog_row_value_options=""

gtid_mode=ON
enforce_gtid_consistency=ON

expire_logs_days=3

配置 MySQL 数据同步连接任务

配置同步任务的及检查的很多命令都要带上 url 等参数。为了操作快捷,封装了一些加载配置文件的操作脚本,kafka-tools.tar 。下载当前包,解压缩到 /KFDATA 目录下 。后续的很多操作,检查 Kafka 的 topic,查看数据。配置同步任务等都会使用 kafka-tools 包中的脚本。请务必配置。包中的脚本都可以无参数运行,会输出 help。

cd /KFDATA
sudo tar -xvf kafka-tools.tar
sudo chown kafka:kafka kafka-tools
rm ./kafka-tools.tar

修改 kafka-tools/config/config.properties 配置参数。

按照本机的路径、IP 等对应修改 Kafka、Kafka_Connect 的启动 IP 地址,以及安装目录。

准备MySQL 数据库表

第一步:创建一个数据库

create database basicinfo;

第二步:创建两张表,并插入一些数据

创建表1 index_components,主键字段 4 个。

use basicinfo;
CREATE TABLE `index_components` (
  `trade_date` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
  `code` varchar(20) NOT NULL,
  `effDate` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
  `indexShortName` varchar(20) CHARACTER SET utf8mb3 COLLATE utf8mb3_general_ci DEFAULT NULL,
  `indexCode` varchar(20) NOT NULL,
  `secShortName` varchar(20) CHARACTER SET utf8mb3 COLLATE utf8mb3_general_ci DEFAULT NULL,
  `exchangeCD` varchar(4) CHARACTER SET utf8mb3 COLLATE utf8mb3_general_ci DEFAULT NULL,
  `weight` decimal(26,6) DEFAULT NULL,
  `timestamp` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  `flag` int NOT NULL DEFAULT '1',
  PRIMARY KEY `index_components_pkey` (`trade_date`,`code`,`indexCode`,`flag`)
)ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; 

插入 4 条 数据

insert into index_components (trade_date,code,effdate,indexShortName,indexCode,secShortName,exchangeCD,weight,timestamp,flag)
values('2006-11-30','000759','2018-06-30 03:48:05','中证500','000905','中百集团','XSHE',0.0044,'2018-06-30 05:43:05',1),
('2006-11-30','000759','2018-06-30 04:47:05','中证500','000906','中百集团','XSHE',0.0011,'2018-06-30 05:48:06',1),
('2006-11-30','600031','2018-06-30 05:48:05','上证180','000010','三一重工','XSHG',0.0043,'2018-06-30 05:48:05',1),
('2006-11-30','600031','2018-06-30 06:48:02','沪深300','000300','三一重工','XSHG',0.0029,'2018-06-30 05:48:05',1);

创建表2 stock_basic ,主键字段 2 个。

CREATE TABLE `stock_basic` (
  `id` bigint NOT NULL ,
  `ts_code` varchar(20) NOT NULL,
  `symbol` varchar(20) DEFAULT NULL,
  `name` varchar(20) DEFAULT NULL,
  `area` varchar(20) DEFAULT NULL,
  `industry` varchar(40) DEFAULT NULL,
  `list_date` date DEFAULT NULL,
  PRIMARY KEY (`id`,`ts_code`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

插入 3 条数据 ;

insert into stock_basic(id,ts_code,symbol,name,area,industry,list_date)
values (1,'000001.SZ','000001','平安银行','深圳','银行','1991-04-03'),
(2,'000002.SZ','000002','万科A','深圳','地产','1991-01-29'),
(3,'000004.SZ','000004','ST国华','深圳','软件服务','1991-01-14')

准备连接器配置文件,并启动连接任务

第一步:准备 MySQL 同步任务配置文件

创建连接 MySQL 的 source 连接器配置文件。

mkdir /KFDATA/datasyn-config
cd /KFDATA/datasyn-config
vim source-mysql.json

录入以下配置 ,hostname 和 kafka 启动地址需对应修改。

{
    "name": "basicinfo-connector",
    "config":{
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "tasks.max": "1",
        "topic.prefix":"mysqlserver",
        "database.hostname": "192.168.189.130",
        "database.port": "3306",
        "database.user": "datasyn",
        "database.password": "1234",
        "database.server.id": "2223314",
        "database.include.list": "basicinfo",
        "schema.history.internal.kafka.bootstrap.servers": "192.168.189.130:9092",
        "schema.history.internal.kafka.topic": "schema-changes.basicinfo",
        "heartbeat.interval.ms":"20000"
    }
}

参数说明:以上参数为必填参数。更多详细参数说明可以参看 Debezium connector for MySQL :: Debezium Documentation

参数名称默认值参数说明
connector.class连接器的 Java 类的名称。这里是 mysql 的连接器类名。
tasks.max1当前 connector 的最大并行任务数。mysql 的 source 连接器任务数只能是 1。
topic.prefix当前 connector 同步写入任务的命名空间。会被用于添加到同步表对应 topic 名称前等
database.hostnameMySQL 数据库服务器的 IP 地址或主机名。
database.port3306MySQL 数据库服务器的整数端口号。
database.userMySQL 数据库服务器连接用户。
database.passwordMySQL 数据库服务器连接用户密码。
database.server.id用来模拟 MySQL 隶属进程的进程号。同步程序会以此数字 ID 加入 MySQL 集群。
database.influde.list匹配的数据库名。可以多个,用逗号分割即可。
schema.history.internal.kafka.bootstrap.servers数据同步记录 MySQL 的表结构信息的 kafka 连接
schema.history.internal.kafka.topic数据同步记录 MySQL 表结构的 topic 名称
heartbeat.interval.ms0当接到 MySQL 更改事件时,保证触发记录 binlog 事务位置或者 gtid 的间隔事件。(如果此值为 0 时,接收到不属于数据同步表的改变事件时,不会记录事务位置,可能导致当前记录的同步事务号大幅度落后 MySQL 的最新事务号)。

第二步:启动 MySQL 的数据同步任务

通过 rest api 启动 MySQL 的 source 连接器

curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://192.168.189.130:8083/connectors/ -d @/KFDATA/datasyn-config/source-mysql.json

也可以通过我们提供 kafka-tools 中的脚本启动,操作能简单一些

cd /KFDATA/kafka-tools/bin
./rest.sh create @/KFDATA/datasyn-config/source-mysql.json

第三步:查看 MySQL 数据同步任务状态

查看同步任务列表。list 参数展示任务名列表,showall 参数会显示全部同步任务状态。

./rest.sh list
./rest.sh showall

通过下图可以看到,connector 和 task 的状态都是 RUNNING,当前同步任务状态正常。

说明:每个同步任务会有一个 connector,可以多个 task。

使用 kafka-tools 的脚本 kafka.sh 查看 kafka 中的 topic

cd /KFDATA/kafka-tools/bin
./kafka.sh tplist|grep mysqlserver

下图中的 topic [mysqlserver.basicinfo.index_components] 即为我们的表 basicinfo.index_components 数据在 kafka 中的存储

查看 topic [mysqlserver.basicinfo.index_components] 中的数据条数。

./kafka.sh get_offsets mysqlserver.basicinfo.index_components

kafka 中已经同步了 MySQL 表 basicinfo.index_components 的 4 条数据。

说明: 在同步 MySQL 的初始快照数据时,不能中断。否则必须清理全部已同步数据,重新开始。即初始快照数据不支持断点续传。

部署 Kafka 数据同步到 DolphinDB

安装 Kafka-DolphinDB 连接器

配置启动 Kafka-DolphinDB 连接器插件,需要以下两步:

  1. 安装 Kafka-DolphinDB 插件,并将插件路径配置到 Kafka Connect 配置文件中。
  2. 重新启动 Kafka Connect 程序,以加载插件。

第一步:下载 Kafka-DolphinDB 插件

  • jdbc-1.30.22.4-ddbsync.Beta1.jar:该 DolphinDB JDBC 包为数据同步做了一些专门修改,后续会同步到主分支上。
  • kafka-connect-jdbc-10.7.4-ddb1.01.Beta1.jar:基于 kafka-connect-jdbc-10.7.4 开发,添加了 DolphinDB 连接器。

创建插件路径(部署 Kafka,Kafka-Connnect 环境的 kafka 用户),在此路径下放置 Kafka-DolphinDB 插件包,上面两个包都要放到此目录下。

sudo mkdir -p /opt/confluent/share/java/plugin/kafka-connect-jdbc

第二步:配置 Kafka-Connect 加载插件

Kafka-DolphinDB 插件包的父路径与前文 Debezium-MySQL 连接器插件路径均为 /opt/confluent/share/java/plugin/,因此无需再次配置到 Kafka-Connect 的配置文件中。

如果路径不一致,可以在 kafka-connect.properties 中的 plugin.path 参数里配置,以逗号分隔。

查看 plugin.path 参数配置:

cat /KFDATA/kafka-connect/etc/kafka-connect.properties |grep plugin

重新启动 Kafka Connect:

sudo systemctl stop kafka-connect
sudo systemctl start kafka-connect

查看日志输出

cat /KFDATA/kafka-connect/logs/connect.log|grep JdbcSinkConnector

出现下图中所示信息时,说明插件加载成功。

DolphinDB 的数据同步准备

第一步:创建同步的库、表

要求:当前支持数据同步,需要依赖 TSDB 引擎的 keepDuplicates = LAST 数据来保证数据写入的幂等性,即发生数据重复时,两次及以上的相同增量数据写入,不影响数据的一致性。所以需要满足以下条件:

  1. DolphinDB 的表必须是 TSDB 引擎且设置 keepDuplicates = LAST。
  2. TSDB 引擎目前不支持单字段 sortColumn 设置 keepDuplicates = LAST,所以同步的 MySQL 目标表主键必 须是 2个及以上字段。
  3. sortColumn 最后的字段必须是时间或者数字。对应的 MySQL 目标表主键字段必须包含时间或数字。

分别创建之前 MySQL 中两张表的对应表:

  1. 创建 MySQL 表 basicinfo.index_components 的DolphinDB 对应分布式表 [dfs://index_data].[index_components]
def createIndexComDB(dbName){
	if(existsDatabase(dbName)){
	dropDatabase(dbName)
	}
	database(directory=dbName, partitionType=RANGE, partitionScheme= 1999.01M + (0..26)*12,engine="TSDB")
}
def createIndexCom(dbName,tbName){
	db=database(dbName)
             if(existsTable(dbName, tbName)){
                   db.dropTable(tbName)	
	}
	mtable=table(100:0, `trade_date`code`effDate`indexShortName`indexCode`secShortName`exchangeCD`weight`timestamp`flag, [TIMESTAMP,SYMBOL,TIMESTAMP,SYMBOL,SYMBOL,SYMBOL,SYMBOL,DOUBLE,TIMESTAMP,INT]);
	db.createPartitionedTable(table=mtable, tableName=tbName, partitionColumns=`trade_date,sortColumns=`code`indexCode`flag`trade_date,compressMethods={trade_date:"delta"},keepDuplicates=LAST)
}
createIndexComDB("dfs://index_data")
createIndexCom("dfs://index_data",`index_components)

2. 创建 MySQL 表 basicinfo.stock_basic 的 DolphinDB 对应分布式表 [dfs://wddb].[stock_basic]

def createStockBasicDB(dbName){
	if(existsDatabase(dbName)){
	dropDatabase(dbName)
	}
	db=database(directory=dbName, partitionType=HASH, partitionScheme=[LONG, 1],engine="TSDB")
}
def createStockBasic(dbName,tbName){
	db=database(dbName)
             if(existsTable(dbName, tbName)){
                   db.dropTable(tbName)	
	}
             mtable=table(100:5, `id`ts_code`symbol`name`area`industry`list_date, [LONG,SYMBOL,SYMBOL,SYMBOL,SYMBOL,SYMBOL,DATE]);
	 db.createPartitionedTable(table=mtable, tableName=tbName, partitionColumns=`id,sortColumns=`ts_code`id,keepDuplicates=LAST,sortKeyMappingFunction=[hashBucket{,100}])
}
createStockBasicDB("dfs://wddb")
createStockBasic("dfs://wddb", `stock_basic)

第二步:配置同步配置表

DolphinDB 做为数据的接收端,本身无需做数据库上的额外设置,按正常使用配置即可。但由于 DolphinDB 中的数据存储表通常以分布式表为主,且分布式表是按照分区规则放置在不同的库名下,不同库名下的表是支持重名的。所以需要提供对于 DolphinDB 中表的同步配置信息。

  1. 在 DolphinDB 中创建一张配置表。库、表名可在后续操作中调整,但是表中字段名要保持一致。
  • 数据库名:dfs://ddb_sync_config
  • 表名:sync_config
dbName = "dfs://ddb_sync_config"
if(existsDatabase(dbName)){
    dropDatabase(dbName)
}
db=database(dbName, HASH, [SYMBOL, 5])

if(existsTable(dbName, "sync_config"))
    db.dropTable("sync_config")
mtable=table(100:0, `connector_name`topic_name`target_db`target_tab, [SYMBOL,SYMBOL,SYMBOL,SYMBOL]);
db.createTable(table=mtable, tableName="sync_config")

2. 插入配置表信息,配置 MySQL 表 basicinfo.index_components 和 basicinfo.stock_basic 对应的 kafka 中 topic 名称对应的 DolphinDB 分布式表

sync_config=loadTable("dfs://ddb_sync_config","sync_config");
tmp_tab=table(100:0,`connector_name`topic_name`target_db`target_tab, [SYMBOL,SYMBOL,SYMBOL,SYMBOL]);
insert into tmp_tab (connector_name,topic_name,target_db,target_tab) values ("ddb-sink","mysqlserver.basicinfo.index_components","dfs://index_data","index_components");
insert into tmp_tab (connector_name,topic_name,target_db,target_tab) values ("ddb-sink","mysqlserver.basicinfo.stock_basic","dfs://wddb","stock_basic");
sync_config.append!(tmp_tab);

表中数据如下:

注意:对于同一个 connector_name,相同的 topic_name 只能配置一条数据。配置分布式库、表必须在 DolphinDB 书库中存在。

字段名类型字段作用
connector_nameSymbol配置的 DolphinDB sink 同步任务名
topic_nameSymbol要同步的 kafka topic 名称
target_dbSymbol对应的 DolphinDB 分布式库名
target_tabSymbol对应的 DolphinDB 分布式表名

配置 DolphinDB 的数据同步连接任务

准备连接器配置文件,并启动连接任务

创建 DolphinDB 数据同步任务配置文件

cd /KFDATA/datasyn-config
vim ddb-sink.json

配置如下:

{
    "name": "ddb-sink",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "tasks.max": "2",
        "topics": "mysqlserver.basicinfo.index_components,mysqlserver.basicinfo.stock_basic",
        "connection.url": "jdbc:dolphindb://192.168.189.130:8848?user=admin&password=123456",
        "transforms": "unwrap",
        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
        "transforms.unwrap.drop.tombstones": "false",
        "auto.evolve": "false",
        "insert.mode": "insert",
        "delete.enabled": "true",
        "batch.size":"10000",
        "pk.mode": "record_key",
        "ddbsync.config.table":"dfs://ddb_sync_config,sync_config"
    }
}

参数说明:以上参数项为同步 DolphinDB 所需参数。如果对 Confluent 的 JDBC Sink Connect 有经验,可适当调节。

参数名称默认值参数说明
name同步任务名称,不可重复。
connector.class连接器的 Java 类的名称。这里是 JdbcSink 的通用连接器类名。
tasks.max1当前 connector 的最大并行任务数。可以调节增大,会创建多 consumer 并行消费读取 Kafka 中数据。一般的数据同步场景设置到 10 基本可以满足同步速度上的需求。
topics配置要同步的 Kafka 中的 topic 名称,配置多个 topic 时用逗号分割。
connection.urlMySQL 数据库服务器的 IP 地址或主机名。
transforms声明数据转换操作。
transforms.unwrap.type声明数据转换器类别。请保持不变。
transforms.unwrap.drop.tombstonesfalse声明是否删除 Kafka 中的墓碑数据。
auto.evolvetrue当 DolphinDB 中缺少列时,是否自动增加列。当前不支持自动增加列,必须配置为 false。
insert.modeinsert数据插入模式。当前只支持 insert 模式。
pk.modenone主键模式。必须设置为 record_key。
delete.enabledfalse在主键模式为 record_key 情况下。对于 null 值 record 是否按照 delete 进行操作。
batch.size3000设置在数据量足够大时。以每批最大多少条来写入到目标数据库。注意:当该值大于 Connect worker 中设置的 consumer.max.pol.records 时,每次提交数量会受 consumer.max.pol.records 的值限制。
ddbsync.config.tabledfs://ddb_sync_config, sync_configKafka 中的 topic 对应 DolphinDB 表的配置表名称。可以自行定义库、表名称。但表中的字段要保持一致。表结构见“DolphinDB 的数据同步准备”。

通过 REST API 启动 source 连接器

curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://192.168.189.130:8083/connectors/ -d @ddb-sink.json

也可以通过我们提供 kafka-tools 中的脚本启动

cd /KFDATA/kafka-tools/bin
./rest.sh create @/KFDATA/datasyn-config/ddb-sink.json

查看同步任务列表。其中,”ddb-sink” 为 DolphinDB 数据同步程序。

./rest.sh list

查看 DolphinDB 的 sink 同步任务状态

./rest.sh status ddb-sink

通过下图可以看到,同步到 DolphinDB 的同步任务包含 1 个 connector 和 2 个 task 。两个 task 状态都是 RUNNING,即正常运行。这里配置了两个线程进行数据消费,并写入 DolphinDB。

查看 DolphinDB 中的数据

select * from loadTable('dfs://index_data', 'index_components');
select * from loadTable('dfs://wddb', 'stock_basic')

数据分别如下,两张表的初始数据均已经同步到了 DolphinDB 中。

实时数据同步验证

第一步:插入新数据

在 MySQL 中插入两条新数据。

insert into basicinfo.index_components (trade_date,code,effdate,indexShortName,indexCode,secShortName,exchangeCD,weight,timestamp,flag)
values
('2006-11-30','600051','2018-06-30 05:48:05','上证180','000010','三一重工','XXXB',0.0043,'2018-06-30 05:48:05',1),
('2006-11-30','600052','2018-06-30 06:48:02','沪深300','000300','三一重工','XSHG',0.0029,'2018-06-30 05:48:05',1)

在 DolphinDB 中进行查询,可以看到已经多了两条 code 值为 600051 和 600052 的。

select * from loadTable('dfs://index_data', 'index_components');

第二步:数据更新

在 MySQL 中更新一条数据,这里我们做一个涉及主键字段的更新。

update basicinfo.index_components set code='600061' where code ='600051'

在 DolphinDB 中进行查询,发现表中已经不存在 code 值为 600051 的数据,但可以看到一条 code 值为 600061 的数据。

select * from loadTable('dfs://index_data', 'index_components');

第三步:数据删除

从 MySQL 中删除一条数据。

delete from basicinfo.index_components where code='600061'

在 DolphinDB 中进行查询,可以看到 code 值为 600061 的数据已经不存在了。

运维操作

DolphinDB 同步须知

  1. DolphinDB 是一款支持海量数据的分布式时序数据库。针对不同的数据处理需求,在底层架构上天然上与通常的关系型数据库不同。所以需要有以下限制:
  • DolphinDB 的表没有主键设计,需要设置成 sortColumn 字段,并设置 keepDuplicates = LAST 来进行去重,确保数据唯一。
  • DolphinDB 表采用 TSDB 引擎,才可以设置 sortColumn
  • DolphinDB 中 TSDB 引擎的 sortColumn 中必须要有时间列或者数字列,对应的来源主键则必须包含同样类型字段。
  • DolphinDB 中 TSDB 引擎的 sortColumn 中必须要有至少两个字段,才能设置 keepDuplicates = LAST,所以对应的来源表主键必须是 2 个字段及以上。

2. DDL 语句相关:

  • 当前不支持 DDL 语句同步。
  • 当前不支持同时修改两边表后的数据传递。

部署检查

  1. 查看当前服务是否都在运行状态
sudo systemctl list-units |egrep 'zookeeper|kafka-server|schema-registry|kafka-connect'

也可以使用 Jps 等其他方法快速查看 Java 进程。

2. 运行以下命令查看当前的同步任务列表查询

查看当前有哪些同步任务:

./rest.sh list

3. 查看某个同步任务的状态

./rest.sh status ddb-sink

4. 暂停同步任务,该操作会停止当前整体 connector 同步任务

./rest.sh c_pause ddb-sink

5. 恢复同步任务

./rest.sh c_resume ddb-sink

对于曾经由于数据库报错一度暂停的同步任务,在错误消除后,只要 connector 运行正常,可以通过以下命令使其恢复同步:

./rest.sh t_restart ${connector_name} ${task_id}

6. 修改同步任务配置参数

./rest c_alter ${connector_name} @source_config.json

修改参数时,只需传递参数,不需要带有 connector name,格式示例如下:

数据同步情况检查

正常情况下,数据同步程序会保持稳定的数据同步。对于意外因素造成的数据未同步,可参考以下步骤逐一排查:

  1. 查看 MySQL 中binlog中记录的最新位置。

查看该值需要正确的配置 gtid_mode 等参数,按照前面的提供的 MySQL 参数配置既可。

SHOW MASTER STATUS;

查看 MySQL 中的 binlog 具体数据库更改。 可以通过 mysqlbinglog 命令查看 MySQL 的 binlog 中记录的数据库改变。

./mysqlbinlog --base64-output=decode-rows -v --skip-gtids /usr/local/mysql/data/binlog.000003|less

2. 查看 Kafka 中记录的 MySQL 同步的 binlog 位置。

结合前面查看的 MySQL 最新 binlog 位置,可以确定当前数据从 MySQL 到 Kafka 的同步进度。

./consume.sh --topic connect-offsets --from-beginning |grep basicinfo-connector

查看 Kafka 中数据, Kafka 中的数据是已序列化的二进制存储。需要使用 avro 调用 schema-registry 中的表结构信息及进行反序列化。这里我们提供了 tpconsumer.sh 脚本,可以提供反序列化后的 Kafka 中的真实数据,并匹配上该条数据对应的表结构。

./tpconsumer.sh --op=2 --topic=mysqlserver.basicinfo.index_components --offset=1 --max-messages=2

3. 查看当前 DolphinDB 同步任务列表。

下面命令可以查看当前 Kafka 中的消费组。

./kafka.sh cm_list

查看 DolphinDB 同步任务对应的 Kafka 消费组中的每一个 consumer 的消费进度,通过此命令可以查看同步程序中每一张的表同步进度。 Lag 为 0 则表示 Kafka 中 topic 当前没有未消费的数据,即 Kafka 中的数据与对应表的数据是一致的。

./kafka.sh cm_detail connect-ddb-sink|awk '{printf "%-20s %-40s %-9s %-14s %-15s %-10s %-30s\n", $1, $2, $3, $4, $5, $6,$7}'

附录

KFDATA.tar 压缩包包含:数据的同步数据文件夹、配置文件及 Kafka-tools 脚本。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1323047.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

单总线cpu设计(包含定长指令周期和变长指令周期和现代时序设计)

来都来了点个赞收藏一下再走呗~~~🌹🌹🌹🌹🌹 一、定长指令周期cpu设计 第1关:MIPS指令译码器设计 此实验就是只需要知道mips知道操作码op对应的值是什么就可以了,下面给出实验中用到的mips指令…

大型语言模型:RoBERTa — 一种稳健优化的 BERT 方法

slavahead 一、介绍 BERT模型的出现BERT模型带来了NLP的重大进展。 BERT 的架构源自 Transformer,它在各种下游任务上取得了最先进的结果:语言建模、下一句预测、问答、NER标记等。 尽管 BERT 性能出色,研究人员仍在继续尝试其配置&#xff0…

JNDI注入Log4jFastJson白盒审计不回显处理

目录 0x00 前言 0x01 Maven 仓库及配置 0x02 JNDI 注入简介 0x03 Java-第三方组件-Log4J&JNDI 0x04 Java-第三方组件-FastJson&反射 0x05 白盒审计 - FastJson 0x06 白盒审计 - Log4j 0x07 不回显的处理方法 0x00 前言 希望和各位大佬一起学习,如果…

RK3399平台开发系列讲解(内核入门篇)网络协议的分层

🚀返回专栏总目录 文章目录 一、应用层二、传输层三、网络层四、数据链路层(Data Link Layer)五、物理层沉淀、分享、成长,让自己和他人都能有所收获!😄 📢对于多数的应用和用户而言,使用互联网的一个基本要求就是数据可以无损地到达。用户通过应用进行网络通信࿰

Android的组件、布局学习

介绍 公司组织架构调整,项目组需要承接其他项目组的android项目,负责维护和开发新需求,故学习下基础语法和项目开发。 组件学习 Toolbarheader布局部分 就是app最顶部的部分 他的显示与否,是与F:\androidProject\android_lear…

LVGL 显示图片

LVGL 显示图片 LVGL显示图片1. 显示图片文件2. 显示C数组格式3. 显示RAM中的图像文件4. 图像符号显示5. 显示GIF动画LVGL显示图片代码分析 LVGL显示图片 lvgl 8.3版本默认支持PNG,BMP,JPG,SJPG和GIF动图等格式的图片显示; 需要在lv_conf.h配置文件里使能对应图片的…

【网络安全】—Shell编程入门(1)

文章目录 基础变量概念介绍特殊变量进阶数值计算实践条件测试比较条件判断语句流程控制语句循环语句应用 Shell 是 Unix/Linux 操作系统下的一种命令行解释器,它接收用户输入的命令然后调用相应的程序。我们可以通过 Shell 脚本来自动执行一系列的命令。接下来&…

基于python的leetcode算法介绍之递归

文章目录 零 算法介绍一 简单示例 辗转相除法Leetcode例题与思路[509. 斐波那契数](https://leetcode.cn/problems/fibonacci-number/)解题思路:题解: [206. 反转链表](https://leetcode.cn/problems/reverse-linked-list/)解题思路:题解&…

最新AI创作系统ChatGPT系统源码+DALL-E3文生图+AI绘画+GPT语音对话功能

一、前言 SparkAi创作系统是基于ChatGPT进行开发的Ai智能问答系统和Midjourney绘画系统,支持OpenAI-GPT全模型国内AI全模型。本期针对源码系统整体测试下来非常完美,可以说SparkAi是目前国内一款的ChatGPT对接OpenAI软件系统。那么如何搭建部署AI创作Ch…

计算机网络 网络层下 | IPv6 路由选择协议,P多播,虚拟专用网络VPN,MPLS多协议标签

文章目录 5 IPv65.1 组成5.2 IPv6地址5.3 从IPv4向IPv6过渡5.3.1 双协议栈5.3.2 隧道技术 6 因特网的路由选择协议6.1 内部网关协议RIP6.2 内部网关协议 OSPF基本特点 6.3 外部网关协议 BGP6.3.1 路由选择 6.4 路由器组成6.4.1 基本了解6.4.2 结构 7 IP多播7.1 硬件多播7.2 IP多…

vp与vs联合开发-通过FrameGrabber连接相机

添加控件 1.CogRecordDisplay 控件 用于显示图像 初始化相机对象方法 //启动窗体时 调用初始化相机方法 //封装相机关闭方法 //窗体关闭时 调用相机关闭方法 拍照 设置采图事件 // 保存图像 设置曝光按钮事件 1.可变参数

C# pictureBox显示一张图片,我想先释放这个图片以免占用无法修改,(旋转)改完再显示这张图片

效果 public static bool RotateFlip(MyDel Log, string fileName){try{string tempPath Path.GetTempFileName();using (Bitmap bmp new Bitmap(fileName)){float resolution 600; //x,y必须为这个数 误差小于-1bmp.RotateFlip(RotateFlipType.Rotate90FlipNone);bmp.Save(…

智能优化算法应用:基于冠状病毒群体免疫算法3D无线传感器网络(WSN)覆盖优化 - 附代码

智能优化算法应用:基于冠状病毒群体免疫算法3D无线传感器网络(WSN)覆盖优化 - 附代码 文章目录 智能优化算法应用:基于冠状病毒群体免疫算法3D无线传感器网络(WSN)覆盖优化 - 附代码1.无线传感网络节点模型2.覆盖数学模型及分析3.冠状病毒群体免疫算法4.…

飞天使-k8s-知识点1-kubernetes架构简述

文章目录 名词功能要点 k8s核心要素CNCF 云原生框架简介k8s组建介绍 名词 CI 持续集成, 自动化构建和测试:通过使用自动化构建工具和自动化测试套件,持续集成可以帮助开发人员自动构建和测试他们的代码。这样可以快速检测到潜在的问题,并及早…

C/C++编程中的算法实现技巧与案例分析

C/C编程语言因其高效、灵活和底层的特性,被广大开发者用于实现各种复杂算法。本文将通过10个具体的算法案例,详细探讨C/C在算法实现中的技巧和应用。 一、冒泡排序(Bubble Sort) 冒泡排序(Bubble Sort)是一…

单片机应用实例:LED显示电脑电子钟

本例介绍一种用LED制作的电脑电子钟(电脑万年历)。其制作完成装潢后的照片如下图: 上图中,年、月、日及时间选用的是1.2寸共阳数码管,星期选用的是2.3寸数码管,温度选用的是0.5寸数码管,也可根据…

人工智能中不可预测的潜在错误可能是灾难性的——数学解释

一、说明 有没有人研究评估AI的错误产生的后果有多么严重,是否存在AI分险评估机制?更高维度上,人工智能的未来是反乌托邦还是乌托邦?这个问题一直是争论的话题,各大阵营都支持。我相信我们无法准确预测这两种结果。这是…

画图之C4架构图idea和vscode环境搭建篇

VS Code 下C4-PlantUML安装 安装VS Code 直接官网下载安装即可,过程略去。 安装PlantUML插件 在VS Code的Extensions窗口中搜索PlantUML,安装PlantUML插件。 配置VS Code代码片段 安装完PlantUML之后,为了提高效率,我们最好安装PlantUML相关的代码片段。 打开VS Cod…

React心理健康测试网站系统源码

帮助需要的人更好地了解自己的心理健康状态和人格特征。本模板提供了一个最小的配置,使得React可以在Vite中启用HMR,并且包含了几个ESLint规则。只需要使用react antd-mobile即可轻松部署完成。 源码下载:https://download.csdn.net/downlo…

操作系统系列:Unix进程系统调用fork,wait,exec

操作系统系列:Unix进程系统调用 fork系统调用fork()运用的小练习 wait系统调用Zombiesexec 系列系统调用 开发者可以查看创建新进程的系统调用,这个模块会讨论与进程相关的Unix系统调用,下一个模块会讨论Win32 APIs相关的进程。 fork系统调用…