使用Logstash将数据从MySQL同步至Elasticsearch(有坑)

news2025/1/21 0:57:20

文章目录

  • 一、准备工作
    • 1、安装elasticSearch+kibana
    • 2、安装MySQL
    • 3、安装Logstash
  • 二、全量同步
    • 1、准备MySQL数据与表
    • 2、上传mysql-connector-java.jar
    • 3、启动Logstash
    • 4、修改logstash.conf文件
    • 5、修改full_jdbc.sql文件
    • 6、打开Kibana创建索引和映射
    • 7、重启logstash进行全量同步
    • 8、踩坑
      • (1)报错
  • 三、增量同步
    • 1、修改增量配置
    • 2、新建increment_jdbc.sql文件
    • 3、重启容器
    • 4、测试
    • 5、同步原理

一、准备工作

1、安装elasticSearch+kibana

我们此处用的es和kibana版本是7.4.0版本的。
docker安装elasticSearch+kibana

2、安装MySQL

docker安装mysql-简单无坑

3、安装Logstash

logstash就是一个具备实时数据传输能力的管道,负责将数据信息从管道的输入端传输到管道的输出端;与此同时这根管道还可以根据自己的需求在inuput --output中间加上滤网,Logstash内置了几十种插件,可以满足各种应用场景。

logstash官方插件 logstash-input-jdbc集成在logstash(5.X之后)中,通过配置文件实现mysql与elasticsearch数据同步。
能实现mysql数据全量和增量的数据同步,且能实现定时同步。

在这里插入图片描述

# 拉取logstach
docker pull logstash:8.5.2

二、全量同步

全量同步是指全部将数据同步到es,通常是刚建立es,第一次同步时使用。

1、准备MySQL数据与表

CREATE TABLE `product` (
  `id` int NOT NULL COMMENT 'id',
  `name` varchar(255) DEFAULT NULL,
  `price` decimal(10,2) DEFAULT NULL,
  `create_at` varchar(255) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3;
INSERT INTO `shop`.`product`(`id`, `name`, `price`, `create_at`) VALUES (1, '小米手机', 33.00, '1');
INSERT INTO `shop`.`product`(`id`, `name`, `price`, `create_at`) VALUES (2, '长虹手机', 2222.00, '2');
INSERT INTO `shop`.`product`(`id`, `name`, `price`, `create_at`) VALUES (3, '华为电脑', 3333.00, '3');
INSERT INTO `shop`.`product`(`id`, `name`, `price`, `create_at`) VALUES (4, '小米电脑', 333.30, '4');

2、上传mysql-connector-java.jar

把mysql-connector-java-8.0.21.jar上传到logstach服务器,

3、启动Logstash

# 编辑logstash.yml
vi /usr/local/logstash/config/logstash.yml

# 内容,需要修改es地址
http.host: "0.0.0.0"
xpack.monitoring.elasticsearch.hosts: [ "http://172.17.0.3:9200" ]
# 自定义网络(可以解决网络不一致的问题)
#docker network create --subnet=172.188.0.0/16  czbkNetwork
# 启动 logstash
docker run --name logstash -v /usr/local/logstash/config/logstash.yml:/usr/share/logstash/config/logstash.yml -v /usr/local/logstash/config/conf.d/:/usr/share/logstash/pipeline/   -v /root/mysql-connector-java-8.0.21.jar:/usr/share/logstash/logstash-core/lib/jars/mysql-connector-java-5.1.48.jar -d  d7102f8c625d

# 查看日志
docker logs -f --tail=200 c1d20ebf76c3

4、修改logstash.conf文件

cd /usr/local/logstash/config/conf.d
vi logstash.conf

stdin从标准输入读取事件。

默认情况下,每一行读取为一个事件

input { 
  stdin {}
 #使用jdbc插件
 jdbc {
    # mysql数据库驱动
    #jdbc_driver_library => "/usr/share/logstash/logstash-core/lib/jars/mysql-connector-java-5.1.48.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    # mysql数据库链接,数据库名
    jdbc_connection_string => "jdbc:mysql://172.17.0.2:3306/shop?allowMultiQueries=true&useUnicode=true&characterEncoding=utf8&autoReconnect=true&failOverReadOnly=false&useSSL=false&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true"
    # mysql数据库用户名,密码
    jdbc_user => "root"
    jdbc_password => "root"
 
    # 分页
    jdbc_paging_enabled => "true"
    # 分页大小
    jdbc_page_size => "50000"
    # sql语句执行文件,也可直接使用 statement => 'select * from t'
    statement_filepath => "/usr/share/logstash/pipeline/sql/full_jdbc.sql"
    #statement => " select *  from product  where id  <=100 "
  }
 }

# 过滤部分(不是必须项)
filter {
    json {
        source => "message"
        remove_field => ["message"]
    }
}

# 输出部分
output {
    elasticsearch {
        # elasticsearch索引名
        index => "product"
        # elasticsearch的ip和端口号
        hosts => ["172.17.0.3:9200"]
        # 同步mysql中数据id作为elasticsearch中文档id
        document_id => "%{id}"
    }
    stdout {
        codec => json_lines
    }
}

5、修改full_jdbc.sql文件

mkdir /usr/local/logstash/config/conf.d/sql
cd /usr/local/logstash/config/conf.d/sql
vi full_jdbc.sql

full_jdbc.sql内容如下

SELECT
	id,
	TRIM( REPLACE ( name, ' ', '' ) ) AS productname,
	price
FROM product

6、打开Kibana创建索引和映射

注意!mysql—>logstash—>es
如果创建的映射是有大写的时候,es会自动转成小写
而且查看映射数据结构的时候会出现两个相同的字段(productname和productName)
这样就导致我们自己定义的映射无法使用,而有数据的是es自动生成的那个小写

PUT product
{
  "settings": {
    "number_of_shards": 1,
    "number_of_replicas": 1
  },
  "mappings": {
    "properties": {
      "productname": {
        "type": "text"
    
      },
      "price": {
        "type": "double"
      } 
    }
  }
}

如果对映射没有硬性要求,可以忽略当前步骤,会自动创建索引。

# 当前在es中是没有数据的
GET product/_search

7、重启logstash进行全量同步

# 重启
docker restart c1d20ebf76c3
# 查看日志
docker logs -f --tail=200 c1d20ebf76c3

发现mysql中的数据已经同步至logstash中了。

8、踩坑

(1)报错

Error response from daemon: Cannot restart container 3849f947e115: driver failed programming external connectivity on endpoint logstash (60f5d9678218dc8d19bc8858fb1a195f4ebee294cff23d499a28612019a0ff78): (iptables failed: iptables --wait -t nat -A DOCKER -p tcp -d 0/0 --dport 4567 -j DNAT --to-destination 172.188.0.77:4567 ! -i br-413b460a0fc8: iptables: No chain/target/match by that name.

原因为:在启动firewalld之后,iptables被激活,
此时没有docker chain,重启docker后被加入到iptable里面

解决方案:
systemctl restart docker

三、增量同步

1、修改增量配置

修改上面的logstash.conf文件

input { 
  stdin {}
	 #使用jdbc插件
	   jdbc {
    # mysql数据库驱动
    #jdbc_driver_library => "/usr/share/logstash/logstash-core/lib/jars/mysql-connector-java-5.1.48.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    # mysql数据库链接,数据库名
    jdbc_connection_string => "jdbc:mysql://172.188.0.15:3306/shop?characterEncoding=UTF-8&useSSL=false"
    # mysql数据库用户名,密码
    jdbc_user => "root"
    jdbc_password => "root"
    # 设置监听间隔  各字段含义(分、时、天、月、年),全部为*默认含义为每分钟更新一次
    # /2* * * *表示每隔2分钟执行一次,依次类推
    schedule => "* * * * *"
    # 分页
    jdbc_paging_enabled => "true"
    # 分页大小
    jdbc_page_size => "50000"
    # sql语句执行文件,也可直接使用 statement => 'select * from t'
    statement_filepath => "/usr/share/logstash/pipeline/sql/increment_jdbc.sql"
    #上一个sql_last_value值的存放文件路径, 必须要在文件中指定字段的初始值
    #last_run_metadata_path => "./config/station_parameter.txt"
     #设置时区,此处更新sql_last_value查询的时区,sql_last_value还是默认UTC
    jdbc_default_timezone => "Asia/Shanghai"
    #使用其它字段追踪,而不是用时间
    #use_column_value => true
    #追踪的字段
    #tracking_column => id
    tracking_column_type => "timestamp"

  }
 }

# 过滤部分(不是必须项)
filter {
    json {
        source => "message"
        remove_field => ["message"]
    }
}

# 输出部分
output {
 
    elasticsearch {
        # elasticsearch索引名
        index => "product"
        # elasticsearch的ip和端口号
         hosts => ["172.188.0.88:9200"]
        # 同步mysql中数据id作为elasticsearch中文档id
        document_id => "%{id}"
    }
    stdout {
        codec => json_lines
    }
}
 

2、新建increment_jdbc.sql文件

/usr/local/logstash/config/conf.d/sql目录下新建increment_jdbc.sql文件

cd /usr/local/logstash/config/conf.d/sql
vi increment_jdbc.sql

increment_jdbc.sql内容如下:

此处sql尽量保持与全量一致Select后的

SELECT
	id,
	TRIM( REPLACE ( product_name, ' ', '' ) ) AS productname,
	price
	
FROM product where update_time > :sql_last_value

3、重启容器

# 启动
docker restart 容器id

4、测试

数据库插入一条数据之后,会自动同步至es

5、同步原理

#进入容器
docker exec -it 4f95a47f12de /bin/bash
#查看记录点
cat /usr/share/logstash/.logstash_jdbc_last_run

last_run_metadata_path=>“/usr/share/logstash/.logstash_jdbc_last_run”

在容器里面的/usr/share/logstash/路径下的隐藏文件.logstash_jdbc_last_run中记录了全量同步的UTC时间

每次同步完成记录该时间(重要)
在这里插入图片描述
注意
logstash_jdbc_last_run默认是没有的,执行增量后创建
文件也是可以删除的
容器重启会自动创建

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/883938.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

linux 系统中vi 编辑器和库的制作和使用

目录 1 vim 1.1 vim简单介绍 1.2 vim的三种模式 1.3 vim基本操作 1.3.1命令模式下的操作 1.3.2 切换到文本输入模式 1.3.3 末行模式下的操作 2 gcc编译器 2.1 gcc的工作流程 2.2 gcc常用参数 3 静态库和共享&#xff08;动态&#xff09;库 3.1库的介绍 3.2静态…

Dockerfile自定义镜像

文章目录 Dockerfile自定义镜像镜像结构Dockerfile语法构建java项目 小结 Dockerfile自定义镜像 常见的镜像在DockerHub就能找到&#xff0c;但是我们自己写的项目就必须自己构建镜像了。 而要自定义镜像&#xff0c;就必须先了解镜像的结构才行。 镜像结构 镜像是将应用程序及…

PyTorch基础(16)-- torch.gather()方法

一、前言 在实现DQN的过程中&#xff0c;torch.gather()这个方法引起了我的注意&#xff0c;原因有二&#xff1a;1&#xff09;这个函数在我硕士期间很少遇见&#xff0c;用到的次数更是少之又少&#xff1b;2&#xff09;torch.gather()这个方法是如何使用的呢&#xff0c;以…

[HZNUCTF 2023 preliminary] 2023杭师大校赛(初赛) web方向题解wp 全

ezflask 先看题目&#xff0c;应该是模板注入&#xff08;SSTI&#xff09;&#xff0c;输入{{7*‘7’}}直接报错误。 发现模板是反序输出的&#xff0c;怪不得不能直接输入{{}}。 输入}}‘7’*7{{返回777777&#xff0c;是jinja2 //直接手打&#xff0c;无所谓我是怨种 ?nam…

Mysql中使用存储过程插入decimal和时间数据递增的模拟数据

场景 Mysql插入数据从指定选项中随机选择、插入时间从指定范围随机生成、Navicat使用存储过程模拟插入测试数据&#xff1a; Mysql插入数据从指定选项中随机选择、插入时间从指定范围随机生成、Navicat使用存储过程模拟插入测试数据_mysql循环插入随机数据_霸道流氓气质的博客…

【第三阶段】kotlin语言中的先决条件函数

用于函数内部判断异常&#xff0c;节省开发 1.checkNotNull&#xff08;&#xff09;如果传入为null则抛出异常 fun main() {var name:String?nullcheckNotNull(name) }执行结果 2.requireNotNull ()如果传入为null则抛出异常 fun main() {var name:String?nullrequireNot…

基于PSO-KELM的时间序列数据预测(含对比实验)

前段时间有粉丝私信想让我出一期对时间序列预测的文章&#xff0c;所以今天它来了。 时间序列数据&#xff0c;如股指价格&#xff0c;具有波动性、非线性和突变的特点&#xff0c;对于这类数据的预测往往需要可靠强健的预测模型&#xff0c;而传统的机器学习算法如SVM、BP等…

详解RFC 793文档-3

3.4 建立连接 三次握手用来建立连接,这个过程通常由一个TCP发起,并由另一个TCP响应。如果两个TCP同时启动该过程,该过程也可以工作。这说明客户端和服务器可以同时发起连接请求,且能够连接成功。当同时尝试连接时,每个TCP在发送自己的SYN后接收到一个不携带任何ACK确认的…

分布式 - 服务器Nginx:一小时入门系列之代理缓冲与缓存

官方文档&#xff1a;https://nginx.org/en/docs/http/ngx_http_proxy_module.html 1. 代理缓冲 proxy_buffer 代理缓冲用于临时存储从后端服务器返回的响应数据。通过使用代理缓冲&#xff0c;Nginx可以在接收完整的响应后再将其发送给客户端&#xff0c;从而提高性能和效率…

从零实现kv存储V2.0

在V1.0版本&#xff0c;我们实现了基于array的kv存储引擎。本文继续完善&#xff0c;增加rbtree、hash、skiptable引擎。 实际上&#xff0c;在框架确定的基础上&#xff0c;其他的引擎只需要添加接口即可。 一、架构设计 二、具体实现 2.1 引擎层 //---------------------…

第14集丨Vue2 基础 —— 生命周期

目录 一、引子1.1 实现一1.2 一个死循环的写法1.3 mounted实现 二、生命周期2.1 概念2.2 常用的生命周期钩子2.3 关于销毁Vue实例注意点2.4 vm的一生(vm的生命周期)2.5 生命周期图示 每个 Vue 实例在被创建时都要经过一系列的初始化过程——例如&#xff0c;需要设置数据监听、…

轻量级自动化测试框架WebZ

一、什么是WebZ WebZ是我用Python写的“关键字驱动”的自动化测试框架&#xff0c;基于WebDriver。 设计该框架的初衷是&#xff1a;用自动化测试让测试人员从一些简单却重复的测试中解放出来。之所以用“关键字驱动”模式是因为我觉得这样能让测试人员&#xff08;测试执行人员…

企业权限管理(十三)-用户关联角色操作

用户关联角色操作 从前台发送请求 <a href"${pageContext.request.contextPath}/user/findUserByIdAndAllRole.do?id${user.id}" class"btn bg-olive btn-xs">添加角色</a>查询用户以及用户可以添加的角色 usercontroller //查询用户以及用…

k8s 自身原理之 Service

好不容易&#xff0c;终于来到 k8s 自身的原理之 关于 Service 的一部分了 前面我们用 2 个简图展示了 pod 之间和 pod 与 node 之间是如何通信息的&#xff0c;且通信的数据包是不会经过 NAT 网络地址转换的 那么 Service 又是如何实现呢&#xff1f; Service 我们知道是用…

网神 SecGate 3600 防火墙任意文件上传漏洞复现

0x01 产品简介 网神SecGate3600下一代极速防火墙&#xff08;NSG系列&#xff09;是基于完全自主研发、经受市场检验的成熟稳定网神第三代SecOS操作系统 并且在专业防火墙、VPN、IPS的多年产品经验积累基础上精心研发的高性能下一代防火墙 专门为运营商、政府、军队、教育、大型…

图扑数字孪生智慧乡村综合管控平台

数字乡村是伴随网络化、信息化和数字化在农业农村经济社会发展中的应用&#xff0c;既是乡村振兴的战略方向&#xff0c;也是建设数字中国的重要内容。为了进一步提升乡村治理智能化、专业化水平&#xff0c;解决建设顶层缺失、数据孤岛等问题&#xff0c;数字孪生技术被广泛应…

工控机防病毒

2月3日&#xff0c;作为全球最大的半导体制造设备和服务供应商&#xff0c;美国应用材料公司&#xff08;Applied Materials&#xff09;表示&#xff0c;有一家上游供应商遭到勒索软件攻击&#xff0c;由此产生的关联影响预计将给下季度造成2.5亿美元&#xff08;约合人民币17…

使用MAT分析OOM问题

OOM和内存泄漏在我们的工作中&#xff0c;算是相对比较容易出现的问题&#xff0c;一旦出现了这个问题&#xff0c;我们就需要对堆进行分析。 一般情况下&#xff0c;我们生产应用都会设置这样的JVM参数&#xff0c;以便在出现OOM时&#xff0c;可以dump出堆内存文件&#xff…

JavaScript进阶 第二天

深入对象内置构造函数 一. 深入对象 创建对象三种方式构造函数实例成员&静态成员 1.1 创建对象三种方式 ① 利用对象字面量创建对象 const o {name: 哈哈 } ② 利用new Object 创建对象 const o new Object({ name: 哈哈 }) ③ 构造函数创建对象 1.2 构造函数 …

致远OA M1Server RCE漏洞复现

0x01 产品简介 致远M1移动协同软件&#xff0c;结合移动应用特色的信息终端&#xff0c;帮您高效管理&#xff0c;掌控全局&#xff1b;基于移动互联技术的产品&#xff0c;实现全天候在线&#xff0c;随时随地了解企业信息&#xff1b;触控式操作&#xff0c;舒适的滑动体验&a…