Elasticsearch:使用 Logstash 构建从 Kafka 到 Elasticsearch 的管道 - Nodejs

news2025/1/11 23:00:10

在我之前的文章 “Elastic:使用 Kafka 部署 Elastic Stack”,我构建了从 Beats => Kafka => Logstash => Elasticsearch 的管道。在今天的文章中,我将描述从 Nodejs => Kafka => Logstash => Elasticsearch 这样的一个数据流。在之前的文章 “Elastic:Data pipeline:使用 Kafka => Logstash => Elasticsearch” 中,我也展示了使用 Python 的方法。我的配置如下:

在上面的架构中,有几个重要的组件:

  • Kafka Server:这就是数据首先发布的地方。
  • Producer:扮演将数据发布到 Kafka topic 的角色。 在现实世界中,你可以具有任何可以为 kafka 主题生成数据的实体。 在我们的示例中,我们将生成伪造的用户注册数据。
  • Elasticsearch:这将充当将用户注册数据存储到其自身的数据库,并提供搜索及分析。
  • Logstash:Logstash 将扮演中间人的角色,在这里我们将从 Kafka topic 中读取数据,然后将其插入到 Elasticsearch 中。
  • Kibana:Kibana 将扮演图形用户界面的角色,它将以可读或图形格式显示数据。

为了演示的方便,你可以在地址下载演示文件 GitHub - liu-xiao-guo/data-pipeline8。我的文件目录是这样的:

$ pwd
/Users/liuxg/data/data-pipeline8
$ tree -L 3
.
├── README.md
├── docker-elk
│   ├── docker-compose.yml
│   └── logstash_pipeline
│       └── kafka-elastic.conf
├── docker-kafka
│   └── kafka-docker-compose.yml
└── kafka_producer.js
$ pwd
/Users/liuxg/data/data-pipeline8/docker-elk
$ ls -al
total 16
drwxr-xr-x  5 liuxg  staff   160 May 14  2021 .
drwxr-xr-x  8 liuxg  staff   256 Mar  5 07:36 ..
-rw-r--r--  1 liuxg  staff    29 May  7  2021 .env
-rw-r--r--  1 liuxg  staff  1064 May 13  2021 docker-compose.yml
drwxr-xr-x  3 liuxg  staff    96 May 13  2021 logstash_pipeline
$ vi .env
$ cat .env
ELASTIC_STACK_VERSION=8.6.2

上面的其它文件将在我下面的章节中介绍。如果你自己想通过手动的方式部署 Kafka 请参阅我的另外一篇文章 “使用 Kafka 部署 Elastic Stack”。

安装

Kafka,Zookeeper 及 Kafka Manager

我将使用 docker-compose 来进行安装。一旦安装好,我们可以看到:

  • Kafka 在 PORT 9092 侦听
  • Zookeeper 在 PORT 2181 侦听
  • Kafka Manager 侦听 PORT 9000 侦听

kafka-docker-compose.yml

version: "3"
services:
  zookeeper:
    image: zookeeper
    restart: always
    container_name: zookeeper
    hostname: zookeeper
    ports:
      - 2181:2181
    environment:
      ZOO_MY_ID: 1
  kafka:
    image: wurstmeister/kafka
    container_name: kafka
    ports:
    - 9092:9092
    environment:
      KAFKA_ADVERTISED_HOST_NAME: 192.168.0.3 
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
  kafka_manager:
    image: hlebalbau/kafka-manager:stable
    container_name: kakfa-manager
    restart: always
    ports:
      - "9000:9000"
    environment:
      ZK_HOSTS: "zookeeper:2181"
      APPLICATION_SECRET: "random-secret"
    command: -Dpidfile.path=/dev/null

我们可以使用如下的命令来进行启动(在 Docker 运行的前提下):

docker-compose -f kafka-docker-compose.yml up

 一旦运行起来后,我们可以使用如下的命令来进行查看:

docker ps
$ docker ps
CONTAINER ID   IMAGE                            COMMAND                  CREATED              STATUS              PORTS                                                  NAMES
a4acc0730467   zookeeper                        "/docker-entrypoint.…"   About a minute ago   Up About a minute   2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp, 8080/tcp   zookeeper
02ec8e8a1e30   hlebalbau/kafka-manager:stable   "/kafka-manager/bin/…"   About a minute ago   Up About a minute   0.0.0.0:9000->9000/tcp                                 kakfa-manager
a85c32c0c08e   wurstmeister/kafka               "start-kafka.sh"         About a minute ago   Up About a minute   0.0.0.0:9092->9092/tcp                                 kafka

我们发现 Kafka Manager 运行于 9000 端口。我们打开本地电脑的 9000 端口:

在上面它显示了一个默认的 topic,虽然不是我们想要的。

 

这样,我们就把 Kafka 上的 kafka_logstash topic 创建好了。

我们可以登录 kafka 容器来验证我们已经创建的 topic。我们使用如下的命令来找到 kafka 容器的名称:

docker ps -s
$ docker ps -s
CONTAINER ID   IMAGE                            COMMAND                  CREATED         STATUS         PORTS                                                  NAMES           SIZE
de7453250529   hlebalbau/kafka-manager:stable   "/kafka-manager/bin/…"   9 minutes ago   Up 9 minutes   0.0.0.0:9000->9000/tcp                                 kakfa-manager   117kB (virtual 427MB)
65eba68350f1   zookeeper                        "/docker-entrypoint.…"   9 minutes ago   Up 9 minutes   2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp, 8080/tcp   zookeeper       33kB (virtual 288MB)
3394868b23e9   wurstmeister/kafka               "start-kafka.sh"         9 minutes ago   Up 9 minutes   0.0.0.0:9092->9092/tcp                                 kafka           210kB (virtual 457MB)

上面显示 kafka 的容器名称为 wurstmeister/kafka。我们使用如下的命令来进行登录:

docker exec -it wurstmeister/kafka  /bin/bash

然后我们在容器里 打入如下的命令:

$ docker exec -it kafka  /bin/bash
root@3394868b23e9:/# kafka-topics.sh --list -zookeeper zookeeper:2181
__consumer_offsets
kafka_logstash

上面的命令显示已经存在的被创建的 kafka_logstash topic。我们可以使用如下的命令来向这个被创建的 topic 来发送数据:

kafka-console-consumer.sh --bootstrap-server 192.168.0.3:9092 --topic kafka_logstash --from-beginning
root@3394868b23e9:/# kafka-console-consumer.sh --bootstrap-server 192.168.0.3:9092 --topic kafka_logstash --from-beginning

Elastic Stack 安装

 我们接下来安装 Elastic Stack。同样地,我使用 docker-compose 来部署 Elasticsearch, Logstash 及 Kibana。你们可以参考我之前的文章 “Logstash:在 Docker 中部署 Logstash”。为了能够把数据传入到 Elasticsearch 中,我们需要在 Logstash 中配置一个叫做 kafka-elastic.conf 的配置文件:

kafka-elastic.conf

input {
    kafka {
       bootstrap_servers => "192.168.0.3:9092"
       topics => ["kafka_logstash"]
    }
}

output {
   elasticsearch {
      hosts => ["elasticsearch:9200"]
      index => "kafka_logstash"
      workers => 1
    }
}

请注意:在上面的 192.168.0.3 为我自己电脑的本地 IP 地址。为了说明问题的方便,我们没有对来自 kafka 里的 registered_user 这个 topic 做任何的数据处理,而直接发送到 Elasticsearch 中。

我们的 docker-compose.yml 配置文件如下:

docker-compose.yml

version: "3.9"
services:
  elasticsearch:
    image: elasticsearch:${ELASTIC_STACK_VERSION}
    container_name: elasticsearch
    environment:
      - discovery.type=single-node
      - ES_JAVA_OPTS=-Xms1g -Xmx1g
      - xpack.security.enabled=false
    volumes:
      - type: volume
        source: es_data
        target: /usr/share/elasticsearch/data
    ports:
      - target: 9200
        published: 9200
    networks:
      - elastic

  kibana:
    image: kibana:${ELASTIC_STACK_VERSION}
    container_name: kibana
    ports:
      - target: 5601
        published: 5601
    depends_on:
      - elasticsearch
    networks:
      - elastic   

  logstash:
    image: logstash:${ELASTIC_STACK_VERSION}
    container_name: logstash
    ports:
      - 5200:5200
    volumes: 
      - type: bind
        source: ./logstash_pipeline/
        target: /usr/share/logstash/pipeline
        read_only: true
    networks:
      - elastic           

volumes:
  es_data:
    driver: local

networks:
  elastic:
    name: elastic
    driver: bridge

为方便起见,在我的安装中,我没有配置安全。如果你需要为 Elasticsearch 设置安全的话,请参考我之前的文章 “Elasticsearch:使用 Docker compose 来一键部署 Elastic Stack 8.x”。

我们使用如下的命令来启动 Elastic Stack。在 docker-compose.yml 所在的目录中打入如下的命令:

$ pwd
/Users/liuxg/data/data-pipeline8/docker-elk
$ ls
docker-compose.yml logstash_pipeline
$ docker-compose up

 等所有的 Elastic Stack 运行起来后,我们再次通过如下的命令来进行查看:

docker ps
$ docker ps
CONTAINER ID   IMAGE                            COMMAND                  CREATED              STATUS              PORTS                                                  NAMES
3db5e4e6e23e   kibana:8.6.2                     "/bin/tini -- /usr/l…"   About a minute ago   Up About a minute   0.0.0.0:5601->5601/tcp                                 kibana
210b673dd89a   logstash:8.6.2                   "/usr/local/bin/dock…"   About a minute ago   Up About a minute   5044/tcp, 9600/tcp, 0.0.0.0:5200->5200/tcp             logstash
05c434edd823   elasticsearch:8.6.2              "/bin/tini -- /usr/l…"   About a minute ago   Up About a minute   0.0.0.0:9200->9200/tcp, 9300/tcp                       elasticsearch
de7453250529   hlebalbau/kafka-manager:stable   "/kafka-manager/bin/…"   51 minutes ago       Up 51 minutes       0.0.0.0:9000->9000/tcp                                 kakfa-manager
65eba68350f1   zookeeper                        "/docker-entrypoint.…"   51 minutes ago       Up 51 minutes       2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp, 8080/tcp   zookeeper
3394868b23e9   wurstmeister/kafka               "start-kafka.sh"         51 minutes ago       Up 51 minutes       0.0.0.0:9092->9092/tcp                                 kafka

我们可以看到 Elasticsearch 运用于 9000 端口,Kibana 运行于 5601 端口,而 Logstash 运行 5000 端口。 我们可以访问 Kibana 的端口地址 5601: 

 

运行 Nodejs 应用导入模拟数据

我们接下来建立一个 Nodejs 的应用来模拟一些数据。首先,我们需要安装如下的包:

npm install kafkajs uuid randomstring random-mobile

我们在根目录下打入如下的命令:

npm init -y
$ npm init -y
Wrote to /Users/liuxg/data/data-pipeline8/package.json:

{
  "dependencies": {
    "kafkajs": "^2.2.4"
  },
  "name": "data-pipeline8",
  "description": "This is a sample code showing how to realize the following data pipeline:",
  "version": "1.0.0",
  "main": "kafka_producer.js",
  "devDependencies": {},
  "scripts": {
    "test": "echo \"Error: no test specified\" && exit 1"
  },
  "repository": {
    "type": "git",
    "url": "git+https://github.com/liu-xiao-guo/data-pipeline8.git"
  },
  "keywords": [],
  "author": "",
  "license": "ISC",
  "bugs": {
    "url": "https://github.com/liu-xiao-guo/data-pipeline8/issues"
  },
  "homepage": "https://github.com/liu-xiao-guo/data-pipeline8#readme"
}

上述命令生成一个叫做 package.json 的文件。在以后安装的 packages,它也会自动添加到这个文件中。默认的设置显然不是我们想要的。我们需要对它做一些修改。

kafka_producer.js

// import { Kafka, logLevel } from "kafkajs";
const { Kafka } = require('kafkajs');
const logLevel = require("kafkajs");

// import { v4 as uuidv4 } from "uuid";
const { v4: uuidv4 } = require('uuid');

console.log(uuidv4());

const kafka = new Kafka({
  clientId: "random-producer",
  brokers: ["localhost:9092"],
  connectionTimeout: 3000,
});

var randomstring = require("randomstring");
var randomMobile = require("random-mobile");
const producer = kafka.producer({});
const topic = "kafka_logstash";

const produce = async () => {
  await producer.connect();
  let i = 0;

  setInterval(async () => {
    var event = {};
    try {
      event = {
        globalId: uuidv4(),
        event: "USER-CREATED",
        data: {
          id: uuidv4(),
          firstName: randomstring.generate(8),
          lastName: randomstring.generate(6),
          country: "China",
          email: randomstring.generate(10) + "@gmail.com",
          phoneNumber: randomMobile(),
          city: "Hyderabad",
          createdAt: new Date(),
        },
      };

      await producer.send({
        topic,
        acks: 1,
        messages: [
          {
            value: JSON.stringify(event),
          },
        ],
      });

      // if the message is written successfully, log it and increment `i`
      console.log("writes: ", event);
      i++;

    } catch (err) {
      console.error("could not write message " + err);
    }
  }, 5000);
};

produce().catch(console.log)

我们运行上面的 Nodejs 代码:

npm start

 我们接下来在 Kibana 中来查看索引 kafka_logstash:

GET kafka_logstash/_count
{
  "count": 103,
  "_shards": {
    "total": 1,
    "successful": 1,
    "skipped": 0,
    "failed": 0
  }
}

我们可以看到文档的数值在不断地增加。我们可以查看文档:

很显然我们收到了数据。从上面的结果中,我们可以看出来是一些非结构化的数据。我们可以针对 Logstash 的 pipeline 进行修改:

kafka-elastic.conf

input {
    kafka {
       bootstrap_servers => "192.168.0.3:9092"
       topics => ["kafka_logstash"]
    }
}

filter {
    json {
        source => "message"
    }

    mutate {
      add_field => {
        "id" => "%{[data][id]}"
      }
      add_field => {
        "firstName" => "%{[data][firstName]}"
      }
      add_field => {
        "lastName" => "%{[data][lastName]}"
      }
      add_field => {
        "city" => "%{[data][city]}"
      }
      add_field => {
        "country" => "%{[data][country]}"
      }
      add_field => {
        "email" => "%{[data][email]}"
      }
      add_field => {
        "phoneNumber" => "%{[data][phoneNumber]}"
      }
      add_field => {
        "createdAt" => "%{[data][createdAt]}"
      }
      remove_field => ["data", "@version", "@timestamp", "message", "event", "globalId"]
    }  
}

output {
   elasticsearch {
      hosts => ["elasticsearch:9200"]
      index => "kafka_logstash"
      workers => 1
    }
}

我们在 Kibana 中删除 kafka_logstash:

DELETE kafka_logstash

我们停止运行 Nodejs 应用。我们把运行 Elastic Stack 的 docker-compose 关掉,并再次重新启动它:

docker-compose down
docker-compose up

我们再次运行 Nodejs 应用:

 我们再次到 Kibana 中进行查看:

很显然,这次,我们看到结构化的输出文件。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/391161.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

modbus转profinet网关连接ABB变频器在博图程序案例

在博图里PLC无需编程利用兴达易控modbus转Profinet网关,将ABB变频器接入到西门子网络中.用到设备为西门子1200PLC,ABB变频器及兴达易控Modbus转profinet网关一个;兴达易控Modbus转profinet协议转换器(XD-MDPN100)一台 打开博图添加1200PLC&am…

121.(leaflet篇)leaflet结合echarts4迁徙图

听老人家说:多看美女会长寿 地图之家总目录(订阅之前建议先查看该博客) 文章末尾处提供保证可运行完整代码包,运行如有问题,可“私信”博主。 效果如下所示: 下面献上完整代码,代码重要位置会做相应解释 <!DOCTYPE html> <html>

【数据挖掘与商务智能决策】第二章 特征工程与数据预处理

数据预处理 非数值类型数据处理 Get_dummies哑变量处理 1. 简单示例&#xff1a;“男”和“女”的数值转换 import pandas as pd df pd.DataFrame({客户编号: [1, 2, 3], 性别: [男, 女, 男]}) df客户编号性别01男12女23男 df pd.get_dummies(df, columns[性别]) df客户…

DetectGPT:使用概率曲率的零样本机器生成文本检测

DetectGPT的目的是确定一段文本是否由特定的llm生成&#xff0c;例如GPT-3。为了对段落 x 进行分类&#xff0c;DetectGPT 首先使用通用的预训练模型&#xff08;例如 T5&#xff09;对段落 ~xi 生成较小的扰动。然后DetectGPT将原始样本x的对数概率与每个扰动样本~xi进行比较。…

浏览器主页被hao123劫持的解决方案

大家好,我是爱编程的喵喵。双985硕士毕业,现担任全栈工程师一职,热衷于将数据思维应用到工作与生活中。从事机器学习以及相关的前后端开发工作。曾在阿里云、科大讯飞、CCF等比赛获得多次Top名次。喜欢通过博客创作的方式对所学的知识进行总结与归纳,不仅形成深入且独到的理…

Redis哨兵(Sentinel)模式

前言 上一期实现了Redis的主从复制架构&#xff0c;由于主从模式在主节点宕机故障时整个Redis服务都不能再执行写操作&#xff0c;而无法保证Redis在整个系统中的高可用。 Redis提供了Sentinel哨兵机制来解决以上问题&#xff0c;当哨兵服务监测到master下线或宕机&#xff0…

汽车标定知识整理(二):CCP报文基本命令介绍

目录 一、基本命令 CRO命令报文的基本命令表&#xff1a; 二、基本命令与可选命令帧格式介绍 1、CONNECT——建立连接&#xff08;0x01&#xff09; 2、GET_CPP_VERSION——获取CCP版本&#xff08;0x1B&#xff09; 3、SET_MTA——设置内存传输地址&#xff08;0x02&#…

FPGA_边沿监测理解

一、简易频率计设计中为什么一定要获取下降沿?gate_a:实际闸门信号gate_a_stand:将实际闸门信号打一拍之后的信号gate_a_fall_s:下降沿标志信号cnt_clk_stand: Y值&#xff0c;即在实际闸门信号下&#xff0c;标准时钟信号的周期个数cnt_clk_stand_reg:保存Y值的寄存器核心问题…

展示企业情况的BI数据可视化大屏怎么做?

做综合展示企业情况的BI数据可视化大屏&#xff0c;就意味着要综合展示多个子公司或者部门的数据情况。首先要解决的就是多系统数据的整合、数据孤岛的束缚&#xff0c;其次才是数据分析模型构建、BI数据可视化大屏报表的制作。 1、整合多系统数据&#xff0c;消除数据孤岛现象…

flink大数据处理流式计算详解

flink大数据处理 文章目录flink大数据处理二、WebUI可视化界面&#xff08;测试用&#xff09;三、Flink部署3.1 JobManager3.2 TaskManager3.3 并行度的调整配置3.4 区分 TaskSolt和parallelism并行度配置四、Source Operator(资源算子)五、Sink Operator(输出算子)六、Flink滑…

系统检测维护工具Wsycheck使用(18)

实验目的 &#xff08;1&#xff09;学习Wsycheck的基本功能&#xff1b; &#xff08;2&#xff09;掌握Wsycheck的基本使用方法&#xff1b; 预备知识 windows操作系统的基本知识如&#xff1a;进程、网络、服务和文件等的了解。 Wsycheck是一款强大的系统检测维护工具,进程和…

js求解《初级算法》19.删除链表的倒数第N个结点

一、题目描述 给你一个链表&#xff0c;删除链表的倒数第 n 个结点&#xff0c;并且返回链表的头结点。 输入&#xff1a;head [1,2,3,4,5], n 2 输出&#xff1a;[1,2,3,5] 二、思路 用虚拟头结点&#xff0b;双指针的方法解决该题&#xff0c;我们知道题目要求我们返回的是…

SpringMVC源码:参数解析、方法调用与返回值处理

参考资料&#xff1a; 《SpringMVC源码解析系列》 《SpringMVC源码分析》 《Spring MVC源码》 写在开头&#xff1a;本文为个人学习笔记&#xff0c;内容比较随意&#xff0c;夹杂个人理解&#xff0c;如有错误&#xff0c;欢迎指正。 前文&#xff1a; 《SpringMVC源码&a…

并发包中的ConcurrentLinkedQueue和LinkedBlockingQueue有什么区别?

第20讲 | 并发包中的ConcurrentLinkedQueue和LinkedBlockingQueue有什么区别&#xff1f; 在上一讲中&#xff0c;我分析了 Java 并发包中的部分内容&#xff0c;今天我来介绍一下线程安全队列。Java 标准库提供了非常多的线程安全队列&#xff0c;很容易混淆。 今天我要问你的…

谷歌浏览器和火狐浏览器永久禁用缓存【一劳永逸的解决方式】

目录 前言 谷歌浏览器 方式一 方式二 火狐浏览器 前言 缓存对于开发人员来说异常的痛苦,很多莫名其妙的bug就是由缓存导致的,但当我们在网上查找禁用缓存的方式时,找到的方式大多数都是在开发者工具的面板中勾选禁用缓存的选项,但这种方式有个弊端就是需要一直打开这个…

软件测试工程师没有碰到算我输-2023最全的接口测试面试题及参考答案

接口测试最近几年被炒的火热了&#xff0c;越来越多的测试同行意识到接口测试的重要性。接口测试为什么会如此重要呢&#xff1f; 主要是平常的功能点点点&#xff0c;大家水平都一样&#xff0c;是个人都能点&#xff0c;面试时候如果问你平常在公司怎么测试的&#xff0c;你除…

【pytorch】使用mixup技术扩充数据集进行训练

目录1.mixup技术简介2.pytorch实现代码&#xff0c;以图片分类为例1.mixup技术简介 mixup是一种数据增强技术&#xff0c;它可以通过将多组不同数据集的样本进行线性组合&#xff0c;生成新的样本&#xff0c;从而扩充数据集。mixup的核心原理是将两个不同的图片按照一定的比例…

【嵌入式开发】iperf

iperf一级目录用法help文档iperf参数功能iperf测试实例测试网口上行速率测试网口下行速率perf 是一个网络性能测试工具。Iperf可以测试最大TCP和UDP带宽性能&#xff0c;具有多种参数和UDP特性&#xff0c;可以根据需要调整&#xff0c;可以报告带宽、延迟抖动和数据包丢失。一…

小程序项目在hbuilder里面给它打包成app

小程序项目临时有些登录需求&#xff0c;需要把&#xff08;小程序某些功能通过条件编译让它显示到app上&#xff09;小程序打包成app的话就必须需要一个打包的证书&#xff0c;证书的话就要去重新生成&#xff0c;苹果电脑可以去自动生成证书&#xff0c;平时是用windows进行开…

Java的四种引用强软弱虚及其使用场景

一.强引用 回收时机&#xff1a;在内存不足时也不会被回收。 使用方式&#xff1a;String str new String("str"); 使用场景&#xff1a;是平常用的最多的引用 二.软引用 回收时机&#xff1a; 在内存不足时会被回收。 使用方式&#xff1a;SoftRefere…