数据集成的强大联盟:Elasticsearch、Kibana、Logstash、MySQL

news2024/10/5 14:04:22

通常,很多关系数据项目都使用 MySQL。 它对于标准的 CRUD 操作是有益的,但有时我们需要做额外的过程。 当我们搜索某些内容时,我们会消耗资源或合并多个表。 有时,即使不是,可能仍然需要复杂的 SQL 查询。 也许这不是正确的方法,但这是我们改变技术堆栈的不同方法。 对于这个堆栈,我们首先描述 Logstash。

更多阅读:“Elastic:开发者上手指南” 中的 “数据库数据同步” 章节。

我们什么时候使用 Logstash?

Logstash 用于必须从源接收数据、处理数据然后发送到另一个目的地的场景。 作为起点,Logstash 连接到 MySQL 读取数据,然后对其进行处理,最后将其发送到 Elasticsearch。 如下图所示

 

Logstash 的操作分为三个步骤:

  1. 输入 - input
  2. 过滤器 - filter
  3. 输出 - output

在添加此代码之前,我创建了一个名为 search.conf 的 Logstash 配置文件。

input{
        jdbc {
            jdbc_driver_library => "/home/mysql-connector-java-8.0.22.jar"
            jdbc_driver_class => "com.mysql.jdbc.Driver"
            jdbc_connection_string => "jdbc:mysql://${MYSQL_HOST}:3306/product?zeroDateTimeBehavior=convertToNull"
            jdbc_user => 'root'
            jdbc_password => ''
            statement => "Select product.*, DATE_FORMAT(updatedAt, '%Y-%m-%d %T') as lastTransaction from product where updatedAt > :sql_last_value"
            tracking_column => "lastTransaction"
            tracking_column_type => "timestamp"
            use_column_value => true
            lowercase_column_names => false
            clean_run => true
            schedule => "*/15 * * * * *"
        }
}

此输入运行 MySQL 查询并包含一个存储最新更新日期的跟踪列。 感谢此列,我可以有效地仅检索最后更新的行,而不是获取所有行。 现在我有了数据,我可以继续处理了。

filter {
    ruby {
        code => "
            if event.get('productStatusFK')
                productStatusFK = event.get('productStatusFK').to_i
                if productStatusFK == 0
                    event.set('productStatusFK', 'passive')
                elsif productStatusFK == 1
                    event.set('productStatusFK', 'active')
                end
            end
        "
    }
    mutate {
        remove_field => ["@version", "@timestamp"]
    }
}

下一步是将数据发送到 Elasticsearch。 我们如何执行输出操作?

output { 
    elasticsearch {
        hosts => [ "http://${ELASTIC_HOST}:9200" ]
        document_id => '%{productPK}'
        index => "product"
        doc_as_upsert => true
        action => "update"
        codec => "json"
        manage_template => true
        template_overwrite => true
    }
}

我完成了配置文件。 之后,我为 docker-compose 准备 app.yml。 它包含 4 个服务:MySQL,LogstashElasticsearchKibana。 这三个服务位于同一网络上。 由于这个网络,他们能够使用他们的服务名称相互通信。 同样对于 logstash,我创建了一个管道并在文件中对其进行了描述。 该文件使用 search.conf 文件。 所有文件已添加到 Github,下面提供了链接。 详细信息在下面的代码中。

app.yml

version: "3.7"

services:
  elasticsearch:
    image: elasticsearch:${STACK_VERSION}
    volumes:
      - type: volume
        source: es_data
        target: /usr/share/elasticsearch/data
    ports:
      - "9200:9200"
      - "9300:9300"
    environment:
      - discovery.type=single-node
      - ES_JAVA_OPTS=-Xms750m -Xmx750m
      - xpack.security.enabled=false
    networks:
      - elastic

  kibana:
    image: kibana:${STACK_VERSION}
    container_name: kibana
    ports:
      - target: 5601
        published: 5601
    depends_on:
      - elasticsearch
    networks:
      - elastic  

  logstash:
    image: logstash:${STACK_VERSION}
    depends_on:
      - elasticsearch
    volumes:
      - ./product/pipeline/:/usr/share/logstash/pipeline/
      - ./product/config/pipeline.yml:/usr/share/logstash/config/pipeline.yml
      - ./mysqlConnector/mysql-connector-java-8.0.22.jar:/home/mysql-connector-java-8.0.22.jar
    environment:
      - ELASTIC_HOST=elasticsearch
      - MYSQL_HOST=mysql
    networks:
      - elastic

  mysql:
    image: mysql:8
    restart: always
    command: --default-authentication-plugin=mysql_native_password
    environment:
      MYSQL_DATABASE: product
      MYSQL_TCP_PORT: 3306
      MYSQL_ALLOW_EMPTY_PASSWORD: "true"
    volumes:
      - ./database/product.sql:/docker-entrypoint-initdb.d/product.sql:ro
    ports:
      - "3306:3306"
    networks:
      - elastic

volumes:
  es_data:
    driver: local

networks:
  elastic:
    name: elastic
    driver: bridge

我们需要在 .env 文件中指定我们需要的 Elastic Stack 版本:

$ pwd
/Users/liuxg/data/MySQL2ElasticFlow
$ ls -al
total 24
drwxr-xr-x    9 liuxg  staff   288 Jul  9 11:00 .
drwxr-xr-x  184 liuxg  staff  5888 Jul  9 10:54 ..
-rw-r--r--    1 liuxg  staff    20 Jul  9 11:00 .env
drwxr-xr-x   12 liuxg  staff   384 Jul  9 10:54 .git
-rw-r--r--    1 liuxg  staff  3384 Jul  9 10:54 README.md
-rw-r--r--    1 liuxg  staff  1513 Jul  9 11:01 app.yml
drwxr-xr-x    3 liuxg  staff    96 Jul  9 10:54 database
drwxr-xr-x    3 liuxg  staff    96 Jul  9 10:54 mysqlConnector
drwxr-xr-x    4 liuxg  staff   128 Jul  9 10:54 product
$ cat .env
STACK_VERSION=8.8.2

让我们测试一下这个结构。 使用 docker-compose 构建并运行应用程序。如果你已经有 mysqld 正在运行,你可以使用如下的命令来停止它的运行:

mysqladmin -u root shutdown -p
docker-compose -f app.yml up

我们可以使用如下的命令来查看运行的容器:

docker ps
$ docker ps
CONTAINER ID   IMAGE                 COMMAND                  CREATED          STATUS         PORTS                                            NAMES
662fd611a7a5   mysql:8               "docker-entrypoint.s…"   2 minutes ago    Up 2 minutes   0.0.0.0:3306->3306/tcp, 33060/tcp                mysql
3c3754292b27   logstash:8.8.1        "/usr/local/bin/dock…"   38 minutes ago   Up 2 minutes   5044/tcp, 9600/tcp                               logstash
5b6b423363b5   kibana:8.8.1          "/bin/tini -- /usr/l…"   38 minutes ago   Up 2 minutes   0.0.0.0:5601->5601/tcp                           kibana
b6bd075c5189   elasticsearch:8.8.1   "/bin/tini -- /usr/l…"   38 minutes ago   Up 2 minutes   0.0.0.0:9200->9200/tcp, 0.0.0.0:9300->9300/tcp   elasticsearch

我们可以看到有四个正在运行的容器。我们可以通过如下的命令来查看 Logstash 的日志:

docker logs -f logstash

现在是时候在 Kibana 中进行查看了:

GET _cat/indices
yellow open product yUIb3AWPQKqvm6wnbSsoNQ 1 1 4 0 15.6kb 15.6kb

我们可以通过如下的方式来查看里面的文档:

GET product/_search

我们可以看到有四个文档:

很显然,它是我们之前在 database/product.sql 中写进去的四个文档:

 

INSERT INTO `product` (`productPK`, `productName`, `productCode`, `productStatusFK`, `active`, `updatedAt`) VALUES
(1, 'logitech', 'logitech', 1, 1, '2023-07-05 01:00:00'),
(2, 'asus', 'asus', 1, 0, '2023-07-04 01:00:00'),
(3, 'apple', 'apple', 1, 0, '2023-07-03 01:00:00'),
(4, 'hewlett packard', 'hewlett packard', 1, 1, '2023-07-02 01:00:00');

我们可以对数据进行如下的搜索:

GET product/_search?filter_path=**.hits
{
  "query": {
    "bool": {
      "must": [
        {
          "multi_match": {
            "query": "asu",
            "fields": [
              "productName",
              "productCode"
            ],
            "fuzziness": "auto"
          }
        }
      ]
    }
  }
}

我们可以得到如下的结果:

{
  "hits": {
    "hits": [
      {
        "_index": "product",
        "_id": "2",
        "_score": 0.87417156,
        "_source": {
          "productPK": 2,
          "active": false,
          "updatedAt": "2023-07-04T01:00:00.000Z",
          "productStatusFK": "active",
          "productCode": "asus",
          "productName": "asus",
          "lastTransaction": "2023-07-04 01:00:00"
        }
      }
    ]
  }
}

Hooray! 我们已经完成了从 MySQL 通过 Logstash 把文档写入到 Elasticsearch!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/734110.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

如何查看OpenAI的AccessToken?

如何查看OpenAI的AccessToken? 记录一下如何查看 OpenAI的 AccessToken 文章目录 如何查看OpenAI的AccessToken?前提具体操作总结 前提 💧首先,在获取AccessToken前,你需要达成 以下两个条件: 拥有一个可用的OpenA…

ARM CORETEX M0简介

ARM CORETEX M0简介 1. M0处理器简单框图 处理器内核:逻辑控制与运算 内部总线系统:单总线将处理器与外部的存储器和外部连接,进行数据交互(冯诺依曼架构,数据与指令一起) NVIC:嵌套向量中断控…

【Matlab】智能优化算法_亨利气体溶解度优化算法HGSO

【Matlab】智能优化算法_亨利气体溶解度优化算法HGSO 1.背景介绍2.数学模型2.1 亨利定律2.2 HGSO 3.文件结构4.伪代码5.详细代码及注释5.1 Create_Groups.m5.2 Evaluate.m5.3 fun_checkpoisions.m5.4 fun_getDefaultOptions.m5.5 HGSO.m5.6 main.m5.7 sumsqu.m5.8 update_posit…

机器学习总览

机器学习 1.什么是机器学习? 机器学习是使计算机像人类一样学习与行动的科学,并通过观察与现实世界交互的形式向计算机提供数据和信息,从而随着时间的推移以自主的方式改善其学习。 通过经验提高某些任务性能的计算机程序。 人工智能>机器…

FreeRTOS ~(六)信号量 ~ (1/3)信号量解决同步缺陷

前情提要 FreeRTOS ~(四)同步互斥与通信 ~ (1/3)同步的缺陷 FreeRTOS ~(五)队列的常规使用 ~ (1/5)队列解决同步缺陷 举例子说明:利用信号量解决前述的"同步的缺陷&…

最具价值开源项目收藏--持续更新

轻量级开源笔记应用(memos) 该项目基于 Go React.js SQLite 技术栈开发,兼具高性能与可定制性,适用于日常生活办公中的各类笔记管理场景。 开源地址:https://github.com/usememos/memos 跨平台无缝传输文件&#…

揭秘python函数:编程艺术的核心力量(3)

文章目录 前言递归lambda表达式lambda 的参数形式无参数位置参数关键字参数缺省参数可变参数1.包裹位置传递2.包裹关键字传递 带判断条件的lambda表达式列表数据按照字典key的值进行排序 高阶函数的使用内置高阶函数1.map()2.reduce()3.filter() 前言 前面我们已经学习了 pyth…

7-测试模型(2个)

目录 1.软件测试V模型 2.软件测试W模型(双V模型) 1.软件测试V模型 V模型最早是由Paul Rook在20世纪80年代后期提出的,目的是改进软件开发的效率和效果。是瀑布模型的变种。 明确地标注了测试过程中存在的…

前端工程打包部署

打包 直接执行第二个脚本build即可 打包后的文件将会放在dist目录下 部署 NGINX:是一款轻量级的Web服务器/反向代理服务器及电子邮件(IMAP/POP3)代理服务器。其特点是占用内存少,并发能力强,在各大型互联网公司都有非…

R中高效安装包,以ComplexHeatmap包为例

包安装问题解决方案 1. Biocmanager安装 [2. 手动安装](正在更新……) 目录 包安装问题解决方案前言1. install.packages()的介绍1.1 install.packages()的工作原理1.2 install.packages()安装失败的原因1.3 解决方案 2. BiocManage安装ComplexHeatmap总…

kubernetes的概念以及部署

简介: kubernetes,是一个全新的基于容器技术的分布式架构领先方案,是谷歌严格保密十几年的秘密武器----Borg系统的一个开源版本,于2014年9月发布第一个版本,2015年7月发布第一个正式版本。 kubernetes的本质是…

PDF怎么免费分割成多个文件?这几个方法非常好用!

记灵在线工具是一种常用的电子工具,尤其在工作和学习中,我们经常需要使用记灵工具进行文档处理和整理。其中,记灵工具的分割功能是非常有用的,因为它可以将一个大的文件分割成多个较小的文件,从而便于我们对文件进行分…

Ubuntu22.04如何安装steam游戏平台

linux终端安装 安装命令1 打开终端,输入以下命令安装: sudo snap install steam 安装命令2 1.在这种安装模式下,我们使用的是指令安装,这时我们需要打开终端(cmd/控制台)随后输入此指令:“su…

口语理解任务源码详解系列(三)利用BiRNN-attention模型实现

利用RNN-attention模型实现 写在前面 在前文介绍了项目的数据集构建:传送门,以及利用seq2seq-attention模型实现意图分类与槽位填充任务:传送门 本文利用BiRNN-attention实现:实现细节请参考论文:《Attention-Based Re…

一键搞定发布自己Jar到Maven中央仓库

做java 开发那当然离不开jar包管理, 不知何时一直想想封装一个自己的jar包 然后发布到maven中央仓库给别人使用。 hhh 我感觉自己写一个jar包工具然后,被很多人使用是一件很牛,很快乐事情。 终于有了这个机会,和时间。SpringBoot stater出来了…

Steam搬砖项目介绍

Steam搬砖项目:轻松赚取稳定收入的副业选择 对于许多数字游戏玩家来说,Steam平台并不陌生。今天,我将向您介绍一个稳定的副业选择——Steam搬砖项目。通通过简单的操作,您可以轻松获得几十上百元的利润。 介绍 Steam搬运砖项目…

css 伪元素和浮动

展示为行内元素 inline-block <style>div {/* 浏览器解析行内块或行内标签的时候, 如果标签换行书写会产生一个空格的距离, 展示为行内元素 */display: inline-block;width: 100px;height: 100px;}.one {background-color: pink;}.two {background-color: skyblue;}</…

基于OpenCV 实现车牌号码识别--附免费源码

在本教程中,您将学习如何使用 OpenCV 和 EasyOCR 包自动执行车牌/车牌识别 (LPR/NPR)。 EasyOCR是一个开源 Python 包,用于执行光学字符识别 - OCR(从图像中提取文本)。 该软件包非常易于使用,在撰写本文时,它支持 80 多种语言,包括中文、阿拉伯语、法语、英语、西里尔…

多元回归预测 | Matlab主成分分析PCA降维,BP神经网络回归预测。PCA-BP回归预测模型

文章目录 效果一览文章概述部分源码参考资料效果一览 文章概述 多元回归预测 | Matlab主成分分析PCA降维,BP神经网络回归预测。PCA-BP回归预测模型 评价指标包括:MAE、RMSE和R2等,代码质量极高,方便学习和替换数据。要求2018版本及以上。 部分源码 %% 清空环境变量 warnin…

【脚本语言】Shell Script - 日期的获取、设置和延时操作

目录 基础概念 基础语法 打印当前日期 打印纪元时 将日期转换为纪元时 打印要求格式的日期 设置日期和时间 基础概念 Bash可以帮助我们以不同的格式打印日期、设置日期&#xff0c;又或根据日期或时间进行操作等。 在类Unix系统中&#xff0c;日期被存储为一个整数&#xff0…