使用Logstash将MySQL中的数据同步至Elasticsearch

news2025/1/27 12:48:21

目录

1 使用docker安装ELK

1.1 安装Elasticsearch

1.2 安装Kibana

1.3 安装Logstash

2 数据同步

2.1 准备MySQL表和数据

2.2 运行Logstash

2.3 测试

3 Logstash报错(踩坑)记录

3.1 记录一

3.1.1 报错信息

3.1.2 报错原因

3.1.3 解决方案

3.2 记录二

3.2.1 报错信息

3.2.2 报错原因

3.3.3 解决方案


1 使用docker安装ELK

        ELK是指Elasticsearch、Logstash、Kibana。

1.1 安装Elasticsearch

# 拉取es镜像
docker pull elasticsearch:7.4.2

mkdir -p /root/docker/elasticsearch/config
mkdir -p /root/docker/elasticsearch/data

# 任何ip都能访问
echo "http.host: 0.0.0.0" >> /root/docker/elasticsearch/config/elasticsearch.yml

# 运行elasticsearch REST API端口9200 集群端口9300
docker run --name elasticsearch -p 9200:9200 -p 9300:9300 \
--restart=always \
--privileged=true \
-e "discovery.type=single-node" \
-e ES_JAVA_OPTS="-Xms64m -Xmx512m" \
-v /root/docker/elasticsearch/config/elasticsearch.yml:/usr/share/elasticsearch/config/elasticsearch.yml \
-v /root/docker/elasticsearch/data:/usr/share/elasticsearch/data \
-v /root/docker/elasticsearch/plugins:/usr/share/elasticsearch/plugins \
-d elasticsearch:7.4.2

# 保证权限 任何人任何组都可以读写操作执行,可以进入elasticsearch使用ll命令查看权限
chmod -R 777 /root/docker/elasticsearch/ 

 测试是否安装成功:

# 查看elasticsearch是否运行
docker ps -a

        在浏览器输入虚拟机的ip和elasticsearch的REST API端口http://172.1.11.10:9200/ ,如果出现以下内容,说明安装成功。

{
    "name": "7876d2859af8",
    "cluster_name": "elasticsearch",
    "cluster_uuid": "i46io2YkTY6pXr8IQ9qmXA",
    "version": {
        "number": "7.4.2",
        "build_flavor": "default",
        "build_type": "docker",
        "build_hash": "2f90bbf7b93631e52bafb59b3b049cb44ec25e96",
        "build_date": "2019-10-28T20:40:44.881551Z",
        "build_snapshot": false,
        "lucene_version": "8.2.0",
        "minimum_wire_compatibility_version": "6.8.0",
        "minimum_index_compatibility_version": "6.0.0-beta1"
    },
    "tagline": "You Know, for Search"
}

1.2 安装Kibana

# 拉取镜像,可视化检索数据
docker pull kibana:7.4.2

# 运行Kibana
docker run --name kibana --restart=always --privileged=true \
-e ELASTICSEARCH_HOSTS=http://172.xx.xx.xx:9200 \
-p 5601:5601 -d kibana:7.4.2

说明:

(1)-e ELASTICSEARCH_HOSTS=http://172.xx.xx.xx:9200 :Elasticsearch地址。

(2)-d:后端运行。

(3)--restart=always:开机启动。

(4)--name kibana :容器名称。

(6)privileged=true :权限。

1.3 安装Logstash

  • Logstash是具有实时流水线能力的开源的数据收集引擎。Logstash可以动态统一不同来源的数据,并将数据标准化到您选择的目标输出。它提供了大量插件,可帮助我们解析,丰富,转换和缓冲任何类型的数据。 
  • 管道(Logstash Pipeline)是Logstash中独立的运行单元,每个管道都包含两个必须的元素输入(input)和输出(output),和一个可选的元素过滤器(filter),事件处理管道负责协调它们的执行。 输入和输出支持编解码器,使您可以在数据进入或退出管道时对其进行编码或解码,而不必使用单独的过滤器。
  • Logstash官方插件 logstash-input-jdbc集成在Logstash(5.x之后)的版本,可以通过配置实现mysql和es全量与增量数据的定时同步。
# 拉取logstash
docker pull logstash:7.4.2

2 数据同步

2.1 准备MySQL表和数据

create table pms_spu_info
(
   id                   bigint not null auto_increment comment '商品id',
   spu_name             varchar(200) comment '商品名称',
   spu_description      varchar(1000) comment '商品描述',
   catalog_id           bigint comment '所属分类id',
   brand_id             bigint comment '品牌id',
   weight               decimal(18,4),
   publish_status       tinyint comment '上架状态[0 - 下架,1 - 上架]',
   create_time          datetime,
   update_time          datetime,
   primary key (id)
);

2.2 运行Logstash

# 运行logstash
docker run -d --name logstash logstash:7.4.2

mkdir -p /root/docker/logstash/config
mkdir -p /root/docker/logstash/data
mkdir -p /root/docker/logstash/pipeline
mkdir -p /root/docker/logstash/jars

# 上传mysql驱动mysql-connector-java-5.1.47.jar到/root/docker/logstash/jars

#拷贝已启动的容器中的文件到宿主机,用于重启挂载
docker cp logstash2:/usr/share/logstash/config /root/docker/logstash/
docker cp logstash2:/usr/share/logstash/data /root/docker/logstash/
docker cp logstash2:/usr/share/logstash/pipeline /root/docker/logstash/

# 保证权限 任何人任何组都可以读写操作执行
chmod -R 777 /root/docker/logstash

# 删除logstash容器
docker rm -f logstash

# 配置连接es
cd /root/docker/logstash/config
vi logstash.yml
  • logstash.yml
http.host: "0.0.0.0"
xpack.monitoring.elasticsearch.hosts: [ "http://172.xx.xx.6:9200" ]
  • 创建mysql.conf,编写mysql数据同步至es相关配置
# 创建mysql.conf
cd /root/docker/logstash2/pipeline/
vi mysql.conf

        1)mysql.conf内容如下:

input {
  jdbc {
    type => "jdbc"
    # 数据库连接地址
    jdbc_connection_string => "jdbc:mysql://172.xx.xx.xx:9906/gulimall_pms?useUnicode=true&characterEncoding=UTF-8&useSSL=false"
    # 数据库连接账号和密码
    jdbc_user => "root"
    jdbc_password => "root"
    # MySQL驱动架包
    jdbc_driver_library => "/usr/share/logstash/mysql/mysql-connector-java-8.0.17.jar"
    # MySQL驱动
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    # 数据库重连尝试次数
    connection_retry_attempts => "3"
    # 判断数据库连接是否可用,默认是false不开启
    jdbc_validate_connection => "true"
    # 数据库连接可用校验超时时间,默认3600秒
    jdbc_validation_timeout => "3600"
    # 开启分页查询,默认false不开启
    jdbc_paging_enabled => "true"
    # 单次分页查询条数(默认100000,若字段较多且更新频率较高,建议调低此值)
    jdbc_page_size => "500"
    # 查询数据sql,如果sql较复杂,建议配通过statement_filepath配置sql文件的存放路径
    statement => "SELECT id,spu_name spuName,spu_description spuDescription,catalog_id catalogId,brand_id brandId,weight,publish_status publishStatus,DATE_FORMAT(create_time,'%Y-%m-%d %H:%i:%s') createTime,DATE_FORMAT(update_time,'%Y-%m-%d %H:%i:%s') updateTime FROM pms_spu_info WHERE update_time > :sql_last_value"
    # 是否将字段名转换为小写,默认true(如果有数据序列化、反序列化需求,建议改为false)
    lowercase_column_names => false
    # 是否记录上次执行结果,true表示会将上次执行结果的tracking_column字段的值保存到last_run_metadata_path指定的文件中
    record_last_run => true
    # 需要记录查询结果某字段的值时,此字段为true,否则默认tracking_column为timestamp的值
    use_column_value => true
    # 需要记录的字段,用于增量同步,需是数据库字段
    tracking_column => "updateTime"
    # 轨迹字段类型Value can be any of: numeric,timestamp,Default value is "numeric"
    tracking_column_type => timestamp
    # record_last_run上次数据存放位置
    last_run_metadata_path => "/usr/share/logstash/config/logstash_metadata"
    # 是否清除last_run_metadata_path的记录,需要增量同步时此字段必须为false
    clean_run => false
    # 同步频率(分 时 天 月 年),默认每分钟同步一次
    schedule => "* * * * *"
     
  }
}

output {
  elasticsearch {
    # host => "192.168.1.1"
		# port => "9200"
		# 配置ES集群地址
    hosts => ["172.xx.xx.xx:9200"]
    # 索引名字,必须小写
    index => "spu"
    # 文档id,数据唯一索引(建议使用表的主键)
    document_id => "%{id}"
  }
  stdout {
    codec => json_lines
  }
}

        2)查询sql如下:

SELECT 
id,spu_name spuName,spu_description spuDescription,catalog_id catalogId,
brand_id brandId,weight,publish_status publishStatus,
DATE_FORMAT(create_time,'%Y-%m-%d %H:%i:%s') createTime,
DATE_FORMAT(update_time,'%Y-%m-%d %H:%i:%s') updateTime 
FROM pms_spu_info WHERE update_time > :sql_last_value

日期通过DATE_FORMAT(date,"输出格式")进行格式化,数据库与es日期格式保持一致。

  •  重新运行logstash容器
docker run  --name logstash --restart=always -d -p 5044:5044 -p 9600:9600   \
--privileged=true \
-v /root/docker/logstash/config:/usr/share/logstash/config   \
-v /root/docker/logstash/jars/mysql-connector-java-5.1.47.jar:/usr/share/logstash/logstash-core/lib/jars/mysql-connector-java-5.1.47.jar \
-v /root/docker/logstash/pipeline:/usr/share/logstash/pipeline \
logstash:7.4.2 -f /usr/share/logstash/pipeline/mysql.conf

说明:

(1)-f 是一个非常有用的选项,可以使用户使用指定的文件来指定一些Docker镜像的构建和配置信息。

(2)-f 也可以用于强制删除容器。

2.3 测试

  • mysql表中数据,如下

  • 通过Kibana进行查询,如下:

3 Logstash报错(踩坑)记录

3.1 记录一

3.1.1 报错信息

LogStash::PluginLoadingError Unable to find driver class via URLClassLoader in given driver jars : com.mysql.jdbc.Driver and com.mysql.jdbc.Driver

3.1.2 报错原因

        Logstashd的logstash-input-jdbc插件在调用数据库驱动jar包时,默认会去logstash/logstash-core/lib/jars/目录下去找。

3.1.3 解决方案

        将数据库驱动(例如:mysql-connector-java-5.1.47.jar)放到/usr/share/logstash/logstash-core/lib/jars/下面。

3.2 记录二

3.2.1 报错信息

javax.net.ssl.SSLException: closing inbound before receiving peer's close _notify

3.2.2 报错原因

        安装的是mysql8.x的版本,远程连接发现需要做ssl身份验证,本机连接不需要,取消掉其ssl身份验证需要调整配置。        

3.3.3 解决方案

        数据库连接地址上添加useSSL=false,如下:

"jdbc:mysql://172.xx.xx.xx:9906/gulimall_pms?useUnicode=true&characterEncoding=UTF-8&useSSL=false"

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1426632.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

深入了解C++:底层编译原理

进程的虚拟空间划分 任何编程语言,都会产生两样东西,指令和数据。 .exe程序运行的时候会从磁盘被加载到内存中,但是不能直接加载到物理内存中。Linux会给当前进程分配一块空间,比如x86 32位linux环境下会给进程分配2^32(4G)大小…

《数字电子电路》 课程设计:十字路口红绿灯自动控制系统(上)(multisim仿真及PCB实现)

(一)前言 本系列文章就笔者在大二下学期进行《数字电子线路》课程设计的题目:十字路口红绿灯自动控制系统 进行详细的讲解,希望对读者有所帮助。 (二)目录 一、主要指标及要求 二、电路工作原理 1、工作原…

「数据结构」3.ArrayList

🎇个人主页:Ice_Sugar_7 🎇所属专栏:Java数据结构 🎇**欢迎点赞收藏加关注哦!* ArrayList 🍉ArrayList的构造🍉add方法🍌扩容机制🍌重要结论 🍉其…

大数据交易蓝图,推动数据价值实现

数据最为新的生产要素,必定会推动各行各业的革新和转型。 数据流通,是数据价值实现的必然之路。 大数据交易中心,提供数据产品挂牌出售和合法合规交易的场所和平台。 数据厂商,通过技术手段收集数据,实现数据标准化…

idea配置tomcat

推荐链接:IntelliJ IDEA中配置Tomcat(超详细)_idea怎么配置tomcat服务器-CSDN博客 1,官员下载链接:Apache Tomcat - Welcome! 附本人下载的 tomcat9 的百度网盘链接 链接:https://pan.baidu.com/s/1DpyBGnG4mUGTm5Z…

【Django开发】0到1开发美多商城项目第3篇:用户注册业务实现(附代码,已分享)

本系列文章md笔记(已分享)主要讨论django商城项目相关知识。项目利用Django框架开发一套前后端不分离的商城项目(4.0版本)含代码和文档。功能包括前后端不分离,方便SEO。采用Django Jinja2模板引擎 Vue.js实现前后端…

[Java面试]JavaSE知识回顾

🎄欢迎来到边境矢梦的csdn博文🎄 🎄本文主要梳理Java面试中JavaSE中会涉及到的知识点 🎄 🌈我是边境矢梦,一个正在为秋招和算法竞赛做准备的学生🌈 🎆喜欢的朋友可以关注一下&#x…

STM32CubeIDE 使用标准库来编写程序

这些天我想找一个软件来实现软件的替代。就找到了st 的生态。可是现在st 生态都在极力的推荐HAL 库,但是习惯了标准库的朋友们,还不是很习惯。 先上总结一下,为了好记忆: 一、 在编译栏做如下设置 1、头文件设置 2、源文件设置 二、指定具体的预定义宏 1、USE_STDPERIPH_D…

实习日志10

1.用户信息 1.1.在用户管理中编辑用户信息 1.2.绑定公司id 1.3.显示在页面 2.修改识别逻辑 2.1.分析 先识别,再判断,清空键把识别结果清空 2.2.写码 修改了发票识别逻辑,略... 3.接高拍仪 3.1.js引入报错 分析: 遇到的错误…

【日常总结】宝塔中 Gitlab服务器 forbidden

一、场景 二、问题 三、原因 四、解决方案 五、实战 Stage 1:打开 /etc/gitlab/gitlab.rb,并编辑 Stage 2:重启gitlab服务 Stage 3:测试(打开girlab网页) 六、后续 一、场景 公司更换新电脑 服务…

智能家居的网关新形态:Aqara 方舟智慧中枢 M3 体验

如果说在刚刚结束的 2023 年有哪些备受期待的智能家居产品,Aqara 方舟智慧中枢 M3 一定榜上有名,我的多位朋友也曾在装修过程中多次向我询问是否有这款产品的相关资讯;谁能想到自从在 2022 年 11 月首次亮相之后,这款产品一直等了…

1个 THM 和多台 BSP 的通讯(以邦纳 BSP 系列 PLC 为例)

一.架构和接线如下图所示 二、建立连接 选择 PLC 的驱动,多台连接请勾选“次连接” “次连接总数”就是要连接的 PLC 台数。 设置触摸屏通讯参数;同时确保每台 PLC 的通讯参数与该设定相同(但站号不能相同)。 三、…

DRV8313和L298N都是电机驱动,一个是驱动三相FOC无刷直流电机的,一个是驱动有刷电机,使stm32控制无刷电机简单入门知识

DRV8313和L298N都是电机驱动器,但它们之间存在一些关键的区别: DRV83131: 由德州仪器(TI)制造。 具有集成的场效应晶体管(FET)。 最大电压为65V。 峰值电流为3A。 适用于三相电机驱动。 L298N…

基于SpringBoot+Vue学科竞赛管理系统

文章目录 基于SpringBootVue学科竞赛管理系统1系统概述1.3系统设计思想 2相关技术2.1 MYSQL数据库2.2 B/S结构2.3 Spring Boot框架简介2.4 Vue简介 3系统分析3.1可行性分析3.1.1技术可行性3.1.2经济可行性3.1.3操作可行性 3.2系统性能分析3.2.1 系统安全性3.2.2 数据完整性 3.4…

【鸿蒙】大模型对话应用(三):跨Ability跳转页面

Demo介绍 本demo对接阿里云和百度的大模型API,实现一个简单的对话应用。 DecEco Studio版本:DevEco Studio 3.1.1 Release HarmonyOS SDK版本:API9 关键点:ArkTS、ArkUI、UIAbility、网络http请求、列表布局、层叠布局 页面跳…

Oracle 面试题 | 06.精选Oracle高频面试题

🤍 前端开发工程师、技术日更博主、已过CET6 🍨 阿珊和她的猫_CSDN博客专家、23年度博客之星前端领域TOP1 🕠 牛客高级专题作者、打造专栏《前端面试必备》 、《2024面试高频手撕题》 🍚 蓝桥云课签约作者、上架课程《Vue.js 和 E…

C languange DGEQRF 示例,link liblapack.a

1.示例源码 #include <stdio.h>int min(int m, int n){ return m<n? m:n;}void print_matrix(double* A, int m, int n, int lda) {for (int i 0; i < m; i){for (int j 0; j < n; j){//printf("%7.4f ", A[i j*lda]);printf("%7.4f, &quo…

结构体--共用体--枚举 之难点——链表 奋力学习嵌入式的第十六天

结构体 注意&#xff1a; 1.结构体类型 可以定义在 函数里里面 但是此时作用域就被限定在该函数中 2.结构体定义形式 //形式一 限定一类型 后定义变量 struct stu { ... }; struct stu s; //形式二 定义类型的同时 定义变量 struct stu { ... }s1,s2,*s3,s4[10]; struc…

骨传导耳机是什么?使用骨传导耳机可以保护听力吗?

骨传导耳机是一种特殊的蓝牙耳机&#xff0c;通过人体骨骼来传递声音&#xff0c;可以绕过耳道和耳膜直接传达音频到听者的内耳&#xff0c;开放双耳的佩戴方式可以在享受音乐或通话的同时保持对周围环境的感知&#xff0c;这种设计在户外活动或运动等场景下的使用尤为实用&…

ENG-2,可用于监测细胞内钠离子的动态变化

Replacement of Asante NaTrium Green-2 AM钠离子指示探针&#xff0c;ENG-2&#xff0c;可用于监测细胞内钠离子的动态变化 您好&#xff0c;欢迎来到新研之家 文章关键词&#xff1a;Replacement of Asante NaTrium Green-2 AM钠离子指示探针&#xff0c;ENG-2 一、基本信…