搭建 canal 监控mysql数据到Elasticsearch(总结)

news2024/9/21 0:33:41

搭建 canal 监控mysql数据到 elasticsearch 中

需求:

要将 MySQL 数据库 info 中的 notice 和 result 表的增、删、改操作同步到 Elasticsearch 的 notice 和 result 索引,您需要正确配置 MySQL、Canal 、Canal Adapter 、 Elasticsearch 和 kibana
系统rocky9.2
IP192.168.174.136
服务/版本
mysql:8.0.26
Canal:1.1.8
canal.adapter:1.1.8
Elasticsearch:8.15.0
kibana:8.15.0

一、搭建mysql

博客地址

结合上面的博客地址的安装方式,使用下面的配置文件即可
mysql配置文件,我使用的是下面这个
vim /etc/my.cnf
[mysql]
default-character-set=utf8
socket=/var/lib/mysql/mysql.sock

[mysqld]
log-bin=mysql-bin
binlog-format=ROW
expire-logs-days=7  # 可选,设置 Binlog 日志保留天数
# binlog-do-db=info  # 仅记录 info 数据库的 Binlog

bind-address = 0.0.0.0
#skip-name-resolve
# (注意这里我端口修改为了3066)
port = 3066 
socket=/var/lib/mysql/mysql.sock
basedir=/usr/local/mysql
datadir=/data/mysql
max_connections=5000
character-set-server=UTF8MB4
default-storage-engine=MyISAM
lower_case_table_names=1
sql-mode=STRICT_TRANS_TABLES,ERROR_FOR_DIVISION_BY_ZERO,NO_ENGINE_SUBSTITUTION
sql_mode=STRICT_TRANS_TABLES,ERROR_FOR_DIVISION_BY_ZERO,NO_ENGINE_SUBSTITUTION

server-id=1
innodb-file-per-table=1
wait_timeout=2880
interactive_timeout=2880
innodb_buffer_pool_size = 10G
innodb_log_file_size= 1G
key_buffer_size = 1536M
max_allowed_packet = 1024M
table_open_cache = 500
tmp_table_size = 2048M
max_heap_table_size = 2048M
sort_buffer_size = 16M
read_buffer_size = 16M
read_rnd_buffer_size = 32M
myisam_sort_buffer_size = 128M
thread_cache_size = 8

#relay_log=mall-mysql-relay-bin
#relay_log_index=relay-log.index

[mysqld_safe]
log-error=/var/log/mysql/mysql.log
安装完成之后登录mysql
创建一个 canal 用户

CREATE USER 'canal'@'%' IDENTIFIED BY '123456';
GRANT REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;

允许root远程访问
CREATE USER 'root'@'%' IDENTIFIED WITH mysql_native_password BY '123456';
GRANT ALL ON *.* TO 'root'@'%';
FLUSH PRIVILEGES;

二、安装Elasticsearch

操作系统优化
1 设置打开的文件句柄数和线程数
vim /etc/security/limits.conf

# 添加
# soft:软限制;hard:硬限制
# nproc:单个用户可打开的进程最大数
# nofile:单个进程打开文件最大数
# as:地址空间限制(unlimited:无限)
# fsize:最大文件大小
# memlock:最大锁定内存地址空间
*               soft    nproc           65536
*               hard    nproc           65536
*               soft    nofile          65536
*               hard    nofile          65536
*               -       as              unlimited
*               -       fsize           unlimited
*               -       memlock         unlimited

2 关闭 swap 交换空间
swapoff -a && sed -i '/swap/s/^.*$/#&/' /etc/fstab

3 设置虚拟内存大小和 TCP 超时重传次数
vim /etc/sysctl.conf

# 添加
vm.max_map_count=262144
net.ipv4.tcp_retries2=5
net.core.somaxconn = 1024
vm.overcommit_memory = 1
# 默认情况下 TCP keepalive 时间为 60 秒,超时重传 15 次。

sysctl -p
开始安装 Elasticsearch

下载&解压 Elasticsearch

wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-8.15.0-linux-x86_64.tar.gz
tar -xf elasticsearch-8.15.0-linux-x86_64.tar.gz -C /usr/local/

useradd -u 9200 esuser
mkdir -p /data/elasticsearch/{data,logs,temp}
chown -R esuser:esuser  /data/elasticsearch/ /usr/local/elasticsearch-8.15.0/
cd /usr/local/elasticsearch-8.15.0/

修改配置文件

vim config/elasticsearch.yml
cluster.name: es-dev #集群名称
node.name: es #节点名称

#######----------这个配置只需要在kibana节点安装 ,且需要使用堆栈监测功能---------#######
node.roles: [master,data,ingest, ml, transform, remote_cluster_client]
#######----------如果使用堆栈功能,需要把安全认证关闭------------------#######

path.data: /data/elasticsearch/data # 数据存储位置
path.logs: /data/elasticsearch/logs #日志存储位置
network.host: 0.0.0.0 #允许连接IP
# 允许跨域
http.port: 9200 # 网页访问端口
transport.profiles.default.port: 9300
http.cors.enabled: true

http.cors.allow-origin: "*"
http.cors.allow-headers: "*"
#http.cors.allow-methods: "GET"
cluster.initial_master_nodes: ["es"]
action.destructive_requires_name: false
discovery.seed_hosts: ["192.168.174.136:9300"] # 集群成员

#如果关闭安全认证需要使用以下方法
xpack.security.enabled: false
xpack.security.transport.ssl.enabled: false

#关闭geoip
ingest.geoip.downloader.enabled: false
xpack.monitoring.collection.enabled: true

vim config/jvm.options

将注释删除,改为当前自己需要的内存大小,作者当前内存是16G,所以改为4G大小 1/4即可
-Xms4g
-Xmx4g

下载中文分词器

./bin/elasticsearch-plugin install https://get.infini.cloud/elasticsearch/analysis-ik/8.15.0
中途输入 y
下载完成会在 plugins 目录生成analysis-ik 目录
chown -R  esuser.esuser /usr/local/elasticsearch-8.15.0/

启动 elasticsearch

runuser -l esuser -c "/usr/local/elasticsearch-8.15.0/bin/elasticsearch -d"

# 查看日志
tail -f /data/elasticsearch/logs/es-dev.log

三 、安装kibana

wget https://artifacts.elastic.co/downloads/kibana/kibana-8.15.0-linux-x86_64.tar.gz
cd /usr/local/kibana-8.15.0/

修改配置
vim config/kibana.yml

server.port: 5601

server.host: "0.0.0.0"
##填本机IP或者 0.0.0.0 都可以,最好写本机IP

server.name: "devops-kibana"
## name 名称可以随便指定

### es集群配置
elasticsearch.hosts: ["http://192.168.174.136:9200"]

pid.file: /usr/local/kibana-8.15.0/kibana.pid

elasticsearch.requestTimeout: 99999

i18n.locale: "zh-CN"  
#---------------------#####-------------------------
#如果高版本需要配置如下两个参数   用户名密码为 elasticsearch 安全证书用户密码 
#如果没有生成证书认证,可不加如下两个参数
elasticsearch.username: "kibana"
elasticsearch.password: "devops@123"

启动kibana

useradd -u 5601 kibana
chown -R kibana:kibana /usr/local/kibana-8.15.0/
nohup /usr/local/kibana-8.15.0/bin/kibana --allow-root > /var/log/kibana.log &

#查看日志
tail -f /var/log/kibana.log
启动完成之后准备创建mysql测试数据
登录到mysql中,
# 创建要监听的数据库
create database info;

# 创建一个名为 result 的表,并包含 id、user_id、score、created_at 和 updated_at 字段。
CREATE TABLE result (
id INT AUTO_INCREMENT PRIMARY KEY,
user_id INT NOT NULL,
score DECIMAL(5, 2) NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
);

# 插入5 条数据
INSERT INTO result (user_id, score, created_at, updated_at) VALUES
(1, 95.75, '2024-08-01 10:00:00', '2024-08-01 10:00:00'),
(2, 88.50, '2024-08-01 10:05:00', '2024-08-01 10:05:00'),
(3, 76.20, '2024-08-01 10:10:00', '2024-08-01 10:10:00'),
(4, 82.00, '2024-08-01 10:15:00', '2024-08-01 10:15:00'),
(5, 91.30, '2024-08-01 10:20:00', '2024-08-01 10:20:00'),

创建一个名为 notice 的表,并包含id, title, content, created_at, updated_at 字段。

CREATE TABLE notice (
id INT AUTO_INCREMENT PRIMARY KEY,
title VARCHAR(255) NOT NULL,
content TEXT,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
);

id 列是自增的主键,title 是一个不允许为空的字符串,content 是可选的文本,created_at 和 updated_at 列处理时间戳

# 要插入 5 条数据到 notice 表中
('Title 1', 'Content for notice 1', NOW(), NOW()),
('Title 2', 'Content for notice 2', NOW(), NOW()),
('Title 3', 'Content for notice 3', NOW(), NOW()),
('Title 4', 'Content for notice 4', NOW(), NOW()),
('Title 5', 'Content for notice 5', NOW(), NOW()),
创建索引
登录到 kibana中 – 点击 开发工具,复制下面的命令,

在这里插入图片描述

创建 notice索引,复制到控制台,点击运行
PUT /notice
{
  "settings": {
    "number_of_shards": 1,
    "number_of_replicas": 1
  },
  "mappings": {
    "properties": {
      "id": {
        "type": "keyword"  // 使用 keyword 类型,适合精确匹配
      },
      "title": {
        "type": "text",    // 使用 text 类型,支持全文搜索
        "analyzer": "standard" // 使用标准分析器
      },
      "content": {
        "type": "text",    // 使用 text 类型,支持全文搜索
        "analyzer": "standard" // 使用标准分析器
      },
      "created_at": {
        "type": "date",    // 使用 date 类型,处理日期和时间
        "format": "yyyy-MM-dd'T'HH:mm:ss" // 日期格式
      },
      "updated_at": {
        "type": "date",    // 使用 date 类型,处理日期和时间
        "format": "yyyy-MM-dd'T'HH:mm:ss" // 日期格式
      }
    }
  }
}
创建 result 索引,复制到控制台,点击运行
PUT /result
{
  "settings": {
    "number_of_shards": 1,
    "number_of_replicas": 1
  },
  "mappings": {
    "properties": {
      "user_id": {
        "type": "integer"
      },
      "score": {
        "type": "float"
      },
      "created_at": {
        "type": "date",
        "format": "yyyy-MM-dd HH:mm:ss"
      },
      "updated_at": {
        "type": "date",
        "format": "yyyy-MM-dd HH:mm:ss"
      }
    }
  }
}

注释:
Shards 和 Replicas:number_of_shards 定义了分片的数量,number_of_replicas 定义了每个主分片的副本数。根据你的数据量和需求进行调整。
Mappings:在创建索引时定义的数据类型对数据的索引和查询性能有很大影响,因此请确保使用合适的数据类型。

查看索引
创建索引后,你可以查看索引的详细信息以确认设置和映射是否正确:

curl -X GET "localhost:9200/result"	
curl -X GET "localhost:9200/notice"	
向索引中添加文档
创建索引后,你可以向 notice 索引中添加文档。以下是一个示例:
POST /notice/_doc/1
{
  "id": "1",
  "title": "Sample Notice Title",
  "content": "This is the content of the notice.",
  "created_at": "2024-08-29T08:00:00",
  "updated_at": "2024-08-29T08:00:00"
}

查询索引中的数据
要查询 notice 索引中的数据,可以使用如下的查询请求:
GET /notice/_search
{
  "query": {
    "match_all": {}
  }
}

四、下载安装 Canal

wget https://github.com/alibaba/canal/releases/download/canal-1.1.8-alpha-2/canal.adapter-1.1.8-SNAPSHOT.tar.gz
wget https://github.com/alibaba/canal/releases/download/canal-1.1.8-alpha-2/canal.deployer-1.1.8-SNAPSHOT.tar.gz
# 下面这两个安装包可有可无
wget https://github.com/alibaba/canal/releases/download/canal-1.1.8-alpha-2/canal.admin-1.1.8-SNAPSHOT.tar.gz
wget https://github.com/alibaba/canal/releases/download/canal-1.1.8-alpha-2/canal.example-1.1.8-SNAPSHOT.tar.gz

解压canal 与 canal-adapter

mkdir /usr/local/canal.adapter
mkdir /usr/local/canal
tar -xf canal.adapter-1.1.8-SNAPSHOT.tar.gz -C /usr/local/canal.adapter
tar -xf canal.deployer-1.1.8-SNAPSHOT.tar.gz -C /usr/local/canal

修改配置

cd /usr/local/canal/ 
vim conf/example/instance.properties
#################################################
## mysql serverId , v1.0.26+ will autoGen
# 这个值随意修改,但是不能和 mysql 主库的server-id值一样
canal.instance.mysql.slaveId=1001  


# enable gtid use true/false
canal.instance.gtidon=false

# position info
# mysql 库的 IP地址和端口(注意这里我端口修改为了3066)
canal.instance.master.address=127.0.0.1:3066
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=

# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=

# table meta tsdb info
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal

#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=

# username/password
# mysql 账户与密码
canal.instance.dbUsername=root
canal.instance.dbPassword=123456
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==

# table regex
#canal.instance.filter.regex=.*\\..*
# 监听 info 库的notice表与result表
canal.instance.filter.regex=info\\.notice,info\\.result
# table black regex
# 不监听mysql库中以slave_开头的表
canal.instance.filter.black.regex=mysql\\.slave_.*
# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch

# mq config
# 这个默认即可
canal.mq.topic=example
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,topic2:mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.enableDynamicQueuePartition=false
#canal.mq.partitionsNum=3
#canal.mq.dynamicTopicPartitionNum=test.*:4,mycanal:6
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#
# multi stream for polardbx
canal.instance.multi.stream.on=false
#################################################
vim  conf/canal.properties
因为我们要将监控到的数据传到ES,所以修改地方比较少
canal.serverMode = tcp
canal.instance.tsdb.enable = true
# 集群的配置只要把H2改为mysql,因为要进行元数据管理。
#canal.instance.tsdb.dir = ${canal.file.data.dir:../conf}/${canal.instance.destination:}
#canal.instance.tsdb.url = jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;
canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3066/canal_manager
canal.instance.tsdb.dbUsername = root
canal.instance.tsdb.dbPassword = 123456

#canal.instance.tsdb.spring.xml = classpath:spring/tsdb/h2-tsdb.xml
canal.instance.tsdb.spring.xml = classpath:spring/tsdb/mysql-tsdb.xml

配置文件中注释的中文备注,记得观察。

启动 canal

./bin/startup.sh 

# 查看日志
tail -f ./logs/canal/canal.log 
输出如下及正常。

在这里插入图片描述

tail -f logs/example/example.log (无报错及正常)

如果报错如下:
[main] WARN o.s.context.support.ClassPathXmlApplicationContext - Exception encountered during context initialization - cancelling refresh attempt: org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'tableMetaTSDB' defined in class path resource [spring/tsdb/h2-tsdb.xml]: Cannot resolve reference to bean 'metaHistoryDAO' while setting bean property 'metaHistoryDAO'; nested exception is org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'metaHistoryDAO' defined in class path resource [spring/tsdb/h2-tsdb.xml]: Cannot resolve reference to bean 'sqlSessionFactory' while setting bean property 'sqlSessionFactory'; nested exception is org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'sqlSessionFactory' defined in class path resource [spring/tsdb/h2-tsdb.xml]: Cannot resolve reference to bean 'dataSource' while setting bean property 'dataSource'; nested exception is org.springframework.beans.factory.CannotLoadBeanClassException: Cannot find class [com.alibaba.druid.pool.DruidDataSource] for bean with name 'dataSource' defined in class path resource [spring/tsdb/h2-tsdb.xml]; nested exception is java.lang.ClassNotFoundException: com.alibaba.druid.pool.DruidDataSource
处理
将druid的jar包放在lib目录就可以了。druid-1.2.22.jar测试通过
下载地址 :https://repo1.maven.org/maven2/com/alibaba/druid/1.2.22/

五、安装 Canal.adapter

cd /usr/local/canal.adapter
vim application.yml
将多余的删除,取其精华即可
server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null
canal.conf:
  mode: tcp #tcp kafka rocketMQ rabbitMQ
  flatMessage: true
  zookeeperHosts:
  syncBatchSize: 1000
  retries: 0
  timeout:
  accessKey:
  secretKey:
  consumerProperties:
    # canal tcp consumer
    # canal.tcp.server.host需要修改
    canal.tcp.server.host: 192.168.174.136:11111
    canal.tcp.zookeeper.hosts:
    canal.tcp.batch.size: 500
    canal.tcp.username:
    canal.tcp.password:
  srcDataSources:
    defaultDS:
      # url,username,password需要修改(注意,这里端口我修改过) info是库名
      url: jdbc:mysql://192.168.174.136:3066/info?useUnicode=true
      username: root
      password: 123456
  canalAdapters:
  - instance: example # canal instance Name or mq topic name
    groups:
    - groupId: gl 
      outerAdapters:
      - name: logger
      # name需要修改
      # 这个表示我们使用的是哪个适配器,es8 表示使用的是 es8 适配器,其他的可以参考解压后的 conf 下面的目录名称
      - name: es8
        # hosts需要修改(注意,要加上http://)
        hosts: http://127.0.0.1:9200 # 127.0.0.1:9200 for rest mode
        properties:
          mode: rest
          # security.auth: test:123456 #  only used for rest mode
          # cluster.name需要修改 ES集群名称
          cluster.name: es-dev

适配器配置

在上面的启动器的配置中我们已经配置了 ES8 作为适配器,那具体要同步的是哪张表, 以及对应的 ES中是索引是哪个怎么配置呢?这些配置就放在适配器的配置里面, 每一个适配器的配置都是一个想要同步到 ES 的模板配置。

因为我使用的es8适配器,所以进到es8中
cd conf/es8
cp -v mytest_user.yml esMappingNotice.yml
rm -rf biz_order.yml  customer.yml   mytest_user.yml
vim esMappingNotice.yml
dataSourceKey: defaultDS
destination: example
groupId: gl
esMapping:
  _index: notice
  _id: _id
  _type: _doc
  upsert: true
  sql: "
SELECT
        c.id AS _id,
        c.title AS title,
        c.content AS content,
        DATE_FORMAT (c.created_at, '%Y-%m-%dT%H:%i:%s') AS created,
        DATE_FORMAT (c.updated_at, '%Y-%m-%dT%H:%i:%s') AS updated
FROM
        notice AS c
"
  commitBatch: 3000
注释:
dataSourceKey: defaultDS
destination: example
outerAdapterKey: es-key
groupId: g1
上面的几个配置,都需要跟启动器里面的配置保持一致。

esMapping:该配置是表示的是如何将 MySQL 的数据同步到 ES 中,配置比较复杂,其中
_index 表示 ES 的索引(需要提前创建);
_id 和 pk 二选一配置,表示使用查询出来的哪个字段作为唯一值;
upsert 表示对应主键的数据不存在的时候执行插入动作,存在的时候执行更新动作;
sql:表示要同步的数据,这个的 SQL 形式要求会比较严格
而且 _id 必须要加别名,我索性把所有都改个别名

commitBatch: 3000 设置了每次批量提交的记录数量为 3000。这意味着每当 canal.adapter 收集到 3000 条记录时,
会将这些记录批量提交到 Elasticsearch。确保这个批量大小适合你的数据量和 Elasticsearch 的处理能力,
以避免超时或性能问题。如果你遇到性能瓶颈,可以尝试调整这个参数值,增大或减小批量大小来优化性能。

vim esMappingResult.yml

dataSourceKey: defaultDS
destination: example
groupId: gl
esMapping:
  _index: result
  _id: _id
  _type: _doc
  upsert: true
  sql: "
SELECT
        c.id AS _id,
        c.user_id AS userid,
        c.score AS score,
        DATE_FORMAT (c.created_at, '%Y-%m-%dT%H:%i:%s') AS created,
        DATE_FORMAT (c.updated_at, '%Y-%m-%dT%H:%i:%s') AS updated
FROM
        result AS c
"
  commitBatch: 3000
时间类型的表结构想要存到es中必须自定义转换器或格式化程序,将 Timestamp 转换为 Elasticsearch 支持的日期格式
(例如 ISO 8601 格式)。  否则导入时报错
ERROR c.a.otter.canal.client.adapter.es8x.etl.ESEtlService - cannot write xcontent for unknown value of type class java.sql.Timestamp
java.lang.IllegalArgumentException: cannot write xcontent for unknown value of type class java.sql.Timestamp
因为作者这里有两张表,表结构大同小异
notice 表

在这里插入图片描述

result 表

在这里插入图片描述

然后启动canal.adapter

赋权:
cd /usr/local/canal.adapter/
chmod 777 -R conf/es8
./bin/startup.sh 

tail -f logs/adapter/adapter.log 
2024-08-30 15:06:03.275 [main] INFO  c.a.o.canal.adapter.launcher.loader.CanalAdapterLoader - Start adapter for canal-client mq topic: example-gl succeed
2024-08-30 15:06:03.275 [main] INFO  c.a.o.canal.adapter.launcher.loader.CanalAdapterService - ## the canal client adapters are running now ......
2024-08-30 15:06:03.275 [Thread-3] INFO  c.a.otter.canal.adapter.launcher.loader.AdapterProcessor - =============> Start to connect destination: example <=============
2024-08-30 15:06:03.285 [main] INFO  c.a.otter.canal.adapter.launcher.CanalAdapterApplication - Started CanalAdapterApplication in 3.543 seconds (JVM running for 4.264)
2024-08-30 15:06:03.368 [Thread-3] INFO  c.a.otter.canal.adapter.launcher.loader.AdapterProcessor - =============> Subscribe destination: example succeed <=============
2024-08-30 15:09:27.964 [http-nio-8081-exec-1] INFO  o.a.catalina.core.ContainerBase.[Tomcat].[localhost].[/] - Initializing Spring DispatcherServlet 'dispatcherServlet'
输出如上则为成功

在这里插入图片描述

然后测试

# 插入数据,查看一下Canal.adapter是否可以读到数据
登录到mysql中

INSERT INTO notice (id, title, content, created_at, updated_at) VALUES (22, 'New Notice', 'This is a new notice', NOW(), NOW());
INSERT INTO notice (id, title, content, created_at, updated_at) VALUES (23, 'New Notice', 'This is a new notice', NOW(), NOW());

tail -f logs/adapter/adapter.log 
输出如下:说明成功
2024-08-30 15:03:05.827 [pool-3-thread-1] INFO  c.a.o.canal.client.adapter.logger.LoggerAdapterExample - DML: {"data":[{"id":22,"title":"New Notice","content":"This is a new notice","created_at":1725001385000,"updated_at":1725001385000}],"database":"info","destination":"example","es":1725001385000,"groupId":"gl","isDdl":false,"old":null,"pkNames":["id"],"sql":"","table":"notice","ts":1725001385626,"type":"INSERT"}
2024-08-30 15:03:15.858 [pool-3-thread-1] INFO  c.a.o.canal.client.adapter.logger.LoggerAdapterExample - DML: {"data":[{"id":23,"title":"New Notice","content":"This is a new notice","created_at":1725001395000,"updated_at":1725001395000}],"database":"info","destination":"example","es":1725001395000,"groupId":"gl","isDdl":false,"old":null,"pkNames":["id"],"sql":"","table":"notice","ts":1725001395857,"type":"INSERT"}

在这里插入图片描述

然后全量导入一次数据
curl "localhost:8081/etl/es8/esMappingNotice.yml" -X POST
{"succeeded":true,"resultMessage":"导入ES 数据:23 条"}

esMappingNotice.yml 则为适配器文件的名称。

curl "localhost:8081/etl/es8/esMappingResult.yml" -X POST
{"succeeded":true,"resultMessage":"导入ES 数据:20 条"}
然后打开kibana 或者 elasticsearch-head-5.0.0
作者这里使用的是 elasticsearch-head-5.0.0

在这里插入图片描述

验证同步配置

在 MySQL 数据库中执行一些增、删、改操作,例如:
登录到mysql中,
INSERT INTO notice (id, title, content, created_at, updated_at) VALUES (1, 'New Notice', 'This is a new notice', NOW(), NOW());
UPDATE notice SET content = 'Updated content' WHERE id = 1;
DELETE FROM notice WHERE id = 1;

在这里插入图片描述

#### 查询_id 为 10 11 的sql 已经删除

在这里插入图片描述

补充:安装elasticsearch-head

# 安装依赖:
yum -y install nodejs  bzip2

# 下载 elasticsearch-head
# github地址:
https://github.com/mobz/elasticsearch-head
# 下载软件包
git clone https://gitee.com/cyberton/elasticsearch-head.git
或者
wget https://github.com/mobz/elasticsearch-head/archive/refs/tags/v5.0.0.tar.gz
tar -xf v5.0.0.tar.gz -C /usr/local/
cd /usr/local/elasticsearch-head-5.0.0/

vim Gruntfile.js
            connect: {
                    server: {
                    options: {
                       hostname: '*',  # 新增
                       port: 9100,
                       base: '.',
                       keepalive: true
                            }
                    }
            }
# 保存退出	

vim  _site/app.js +4388
# 将localhost 修改为 本机IP
this.base_uri = this.config.base_uri || this.prefs.get("app-base_uri") || "http://192.168.174.136:9200";
# 保存退出

# 安装依赖
npm install -g grunt-cli
# 如果下载失败,更换源
npm cache clean --force
npm config set registry https://mirrors.huaweicloud.com/repository/npm/

# 继续安装
npm install -g grunt-cli
npm install grunt-contrib-jasmine
npm install -g @angular/cli

npm install
有报错先不用处理 直接运行下面命令试试
npm run start  
nohup grunt server >/dev/null 2>&1 &   后台运行

访问:稍后测试:
http://192.168.174.136:9100

完工,撒花 🎉🎉🎉🎉🎉

如有问题,请留言评论。

参考文献:

https://blog.csdn.net/qq_30614345/article/details/134083545
https://blog.csdn.net/H_Sino/article/details/137765283

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2100734.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Django + websocket 连不上

看了网上的几个简单例子&#xff0c;一步一步做&#xff0c;但无一成功。都连不上websocket。 后来按一个视频教程的操作步骤来做&#xff0c;成功了。差别在于视频教程中加了 pip install daphne 和setting.py中 连不上的表现&#xff1a; 前端报错&#xff1a; WebSock…

如何打造在线音乐网站?java springboot架构,vue前端开发,音乐分享新体验

✍✍计算机编程指导师 ⭐⭐个人介绍&#xff1a;自己非常喜欢研究技术问题&#xff01;专业做Java、Python、微信小程序、安卓、大数据、爬虫、Golang、大屏等实战项目。 ⛽⛽实战项目&#xff1a;有源码或者技术上的问题欢迎在评论区一起讨论交流&#xff01; ⚡⚡ Java实战 |…

基于Prometheus 和K8S kubernetes 构建 搭建监控告警系统

目录 1、Prometheus介绍&#xff1f; 2、Prometheus特点&#xff1f; 2.1 样本 3、Prometheus组件介绍 4、Prometheus工作流程 4、Prometheus和zabbix对比分析 5、Prometheus的几种部署模式 5.1 基本高可用模式 5.2 基本高可用远程存储 5.3 基本HA 远程存储 联邦集群…

【QA】牙科光固化机类产品导光元件在进行检测时应关注哪些问题?

版权说明&#xff1a;本文来源【国家药品监督管理局】&#xff0c;如果您认为我们的文中描述与事实不符或有侵权行为&#xff0c;请及时联系我们。感谢您的关注。

怎么找TikTok代运营助力?灵感魔方怎么样?

在当今全球化的浪潮中&#xff0c;海外版抖音已然成为了品牌出海的重要阵地。然而&#xff0c;面对这个充满机遇与挑战的平台&#xff0c;如何找到专业的TikTok代运营团队来助力品牌成功出海呢&#xff1f;以下是一些关键的考量因素和方法。 首先&#xff0c;专业的TikTok代运…

ubuntu系统如何跑分

win系统下&#xff0c;跑分工具很多&#xff0c;ubuntu的话&#xff0c;估计很多人陌生。 这里介绍ubuntu跑分工具--geekbench 1&#xff0c;geekbench安装 下载地址i&#xff1a;https://www.geekbench.com/download/linux/ 点击进去就自动下载了。 没有下载到就继续点击…

MATLAB代码|中心差分卡尔曼滤波(CDKF)的滤波例程,无需下载,直接复制到MATLAB上面就能运行

文章目录 CDKF介绍代码运行结果各模块解析初始化系统模型设置CDKF循环绘图另有关于EKF和CDKF的对比程序:EKF+CDKF两个滤波的MATLAB程序,估计三轴位置,带中文注释—— https://blog.csdn.net/callmeup/article/details/136610153。 CDKF介绍 中心差分卡尔曼滤波(Central D…

C# 上位机开发指南:高效学习建议

目录 前言 上位机概念 基础知识 1、C#语言基础 2、.NET框架 3、桌面应用开发 4、设备通信 5、数据操作 6、多线程和并发 7、UI/UX设计 8、调试和测试 关键概念 事件驱动编程 异步编程 设计模式 通信协议学习 数据处理与存储 实时性与并发处理 技术选型 1、框…

CosyVoice 本地部署详细教程 手把手小白教程

CosyVoice 本地部署详细教程 注意事项 所有相关的软件、文件名称不要使用中文名称&#xff0c;也不要有中文路径&#xff0c;也不要有空格。 包括C盘用户名&#xff0c;不要有中文或空格。 模型部署前准备 nvidia显卡,建议显存6G以上AI框架CUDA、cuDNN安装 &#xff08;已安…

集成电路学习:什么是IP知识产权

一、IP&#xff1a;知识产权 IP是Intellectual Property的缩写&#xff0c;即知识产权。知识产权是一种无形的财产权&#xff0c;也称智力成果权&#xff0c;它指的是通过智力创造性劳动所获得的成果&#xff0c;并且是由智力劳动者对成果依法享有的专有权利。这种权利包括人身…

神仙公司名单(南京)

神仙公司&#xff08;南京&#xff09; 继续&#xff0c;神仙公司系列。 上一期我们介绍了 深圳的神仙公司&#xff0c;结果留言区的重点全都指向了 HW 和 BYD &#x1f923;&#x1f923; 不愧是你们&#xff0c;但我们不是"那个"排行。 言归正传&#xff0c;这期给…

笔记:《利用Python进行数据分析》之透视表和交叉表

透视表和交叉表 透视表&#xff08;pivot table&#xff09;是各种电子表格程序和其他数据分析软件中一种常见的数据汇总工具。它根据一个或多个键对数据进行聚合&#xff0c;并根据行和列上的分组键将数据分配到各个矩形区域中。在Python和pandas中&#xff0c;可以通过本章所…

代码随想录算法训练营第十八天| 669. 修剪二叉搜索树、108. 将有序数组转换为二叉搜索树、538. 把二叉搜索树转换为累加树

今日内容 leetcode. 669 修剪二叉搜索树leetcode. 108 将有序数组转换为二叉搜索树leetcode. 538 把二叉搜索树转换为累加树 Leetcode. 669 修剪二叉搜索树 文章链接&#xff1a;代码随想录 (programmercarl.com) 题目链接&#xff1a;669. 修剪二叉搜索树 - 力扣&#xff08…

mathtype批量更改公式编号的格式

这是一个笔记 在使用的时候遇到的问题,经过研究发现mathtype生成的右编号公式可以批量更新 调整前 如图,如果调整前使用的是默认的编号格式,显示为 1.5 1.5 1.5: 这里面1和5的意义为: 1:节编号5:公式编号 如下:调整后 如果需要不按章节形式来分类,则可以通过以下…

大数据中心运营管理整体规划方案(32页PPT)

随着信息技术的飞速发展&#xff0c;大数据已成为推动产业升级、促进经济社会发展的重要力量。大数据中心作为数据存储、处理与分析的核心基础设施&#xff0c;其运营管理的规范化、高效化、智能化直接关系到数据价值的有效挖掘和企业的核心竞争力。大数据中心运营管理整体规划…

免费下载Win11 24H2专业版!附详细安装教程

今日&#xff0c;系统之家小编给大家带来2024年最新的Windows11 24H2专业版系统&#xff0c;更新后系统版本号将升至26100.1591。系统基于微软官方最新Windows 11 24H2专业版进行离线制作与优化&#xff0c;确保系统安全无毒&#xff0c;兼容性强&#xff0c;可完美支持新老机型…

构建可扩展的数据平台(数据平台设计中的数据网格趋势)

添加图片注释&#xff0c;不超过 140 字&#xff08;可选&#xff09; 欢迎来到雲闪世界。在本文中&#xff0c;我旨在深入研究各种类型的数据平台架构&#xff0c;更好地了解它们的演变、优势、劣势和实际应用。重点是数据网格架构、它在现代数据堆栈 (MDS) 中的作用以及当今…

Getting an error trying to import environment OpenAI Gym

题意&#xff1a;尝试导入 OpenAI Gym 环境时遇到错误 问题背景&#xff1a; I am trying to run an OpenAI Gym environment: 我正在尝试运行一个 OpenAI Gym 环境&#xff1a; env gym.make("ALE/Breakout-v5", render_mode"rgb_array")But I get th…

数据分析-MAC 编程环境安装教程

目标 在 Mac 电脑上&#xff0c;安装以下环境&#xff1a; 1. 安装 Anaconda&#xff1a;用于管理 Python 及其依赖库的分发&#xff0c;适用于数据科学和机器学习。 2. 安装 chromedriver&#xff1a;用于 Selenium 自动化测试&#xff0c;尤其是与 Google Chrome 浏览器的…

【爬虫软件】YouTube关键词搜索采集工具

我用Python开发的油管关键词搜索批量采集软件。 软件界面&#xff1a; 采集结果demo数据&#xff1a; 演示视频: https://www.bilibili.com/video/BV1uz421m7JA 完整讲解&#xff1a; https://www.bilibili.com/read/cv34407726