Elasticsearch:从 Kafka 到 Elasticsearch 的实时用户配置文件数据管道

news2024/11/17 7:44:20

如今,网络服务、数字媒体、传感器日志数据等众多来源产生了大量数据,只有一小部分数据得到妥善管理或利用来创造价值。读取大量数据、处理数据并根据这些数据采取行动比以往任何时候都更具挑战性。

在这篇文章中,我试图展示:

  • 在 Python 中生成模拟用户配置文件数据
  • 通过 Kafka Producer 将模za拟数据发送到 Kafka 主题
  • 使用 Logstash 读取数据并上传到 Elasticsearch
  • 使用 Kibana 可视化流数据

在我之前的文章 “Elastic:使用 Kafka 部署 Elastic Stack”,我实现了如下的一个数据 pipeline:

 在今天的文章中,我将实现如下的一个数据 pipeline:

在今天的展示中,我将使用最新的 Elastic Stack 8.6.1 来进行展示。我将使用如下的配置:

如上所示,我使用两台机器:macOS 用于安装 Elastic Stack,而另外一台 Ubuntu 机器将被用于安装 Kafka 及 Logstash。我将在 Ubuntu OS 机器上使用 Python 向 Kafka 写入数据。

安装

Elasticsearch 及 Kibana

我将使用 docker compose 的方法来安装 Elasticsearch 及 Kibana。我们可以参考文章 “Elasticsearch:使用 Docker compose 来一键部署 Elastic Stack 8.x” 来进行部署。当然,我们也可以参阅如下的文章来进行部署:

  • 如何在 Linux,MacOS 及 Windows 上进行安装 Elasticsearch
  • Kibana:如何在 Linux,MacOS 及 Windows 上安装 Elastic 栈中的 Kibana

在默认的情况下,Elasticsearch 的访问是带有 HTTPS 的安全访问。

我们可以在电脑的 terminal 中打入如下的命令来检查:

curl -k -u elastic:password https://192.168.0.3:9200

 上述命令是在 Ubuntu OS 的机器上运行。它表明,我们可以在 Ubuntu OS 的机器上成功地访问 Elasticsearch。

安装 Kafka

我们安装涉及设置 Apache Kafka(我们的消息代理)。Kafka 使用 ZooKeeper 来维护配置信息和同步,因此在设置 Kafka 之前,我们需要先安装 ZooKeeper:

sudo apt-get install zookeeperd

接下来,让我们下载并解压缩 Kafka:

wget https://apache.mivzakim.net/kafka/2.4.0/kafka_2.13-2.4.0.tgz
tar -xzvf kafka_2.13-2.4.0.tgz
sudo cp -r kafka_2.13-2.4.0 /opt/kafka

现在,我们准备运行 Kafka,我们将使用以下脚本进行操作:

sudo /opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties

你应该开始在控制台中看到一些 INFO 消息:

Kafka 的配置如下

  • Kafka 正在监听 9092 端口
  • Zookeeper 正在监听 2181 端口
  • Kafka Manager 正在监听 9000 端口

我们接下来打开另外一个控制台中,并为 registered_user 创建一个主题:

/opt/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic registered_user

我们创建了一个叫做 registered_user 的 topic。上面的命令将返回如下的结果:

$ /opt/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic registered_user
WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.
Created topic registered_user.

我们现在已经完全为开始管道做好了准备。

有关 kafka 的安装,我们也可以使用 docker-compose 来进行安装。具体安装步骤请参考 “Elastic:Data pipeline:使用 Kafka => Logstash => Elasticsearch”。

Logstash

我们接下来安装 Logstash。我们到 Elastic 的官方网站来下载时候我们平台的安装包:

 wget https://artifacts.elastic.co/downloads/logstash/logstash-8.6.1-amd64.deb?_gl=1*1b5e8ui*_ga*MTEyNjEzOTY5Ni4xNjQ3MDY1ODMx*_ga_Q7TEQDPTH5*MTY3NDk3MjkzNS4zODAuMS4xNjc0OTcyOTQ4LjAuMC4w
liuxg@liuxgu:~/logstash$ wget https://artifacts.elastic.co/downloads/logstash/logstash-8.6.1-amd64.deb?_gl=1*1b5e8ui*_ga*MTEyNjEzOTY5Ni4xNjQ3MDY1ODMx*_ga_Q7TEQDPTH5*MTY3NDk3MjkzNS4zODAuMS4xNjc0OTcyOTQ4LjAuMC4w
--2023-01-29 14:20:31--  https://artifacts.elastic.co/downloads/logstash/logstash-8.6.1-amd64.deb?_gl=1*1b5e8ui*_ga*MTEyNjEzOTY5Ni4xNjQ3MDY1ODMx*_ga_Q7TEQDPTH5*MTY3NDk3MjkzNS4zODAuMS4xNjc0OTcyOTQ4LjAuMC4w
Resolving artifacts.elastic.co (artifacts.elastic.co)... 34.120.127.130, 2600:1901:0:1d7::
Connecting to artifacts.elastic.co (artifacts.elastic.co)|34.120.127.130|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 341638094 (326M) [binary/octet-stream]
Saving to: ‘logstash-8.6.1-amd64.deb?_gl=1*1b5e8ui*_ga*MTEyNjEzOTY5Ni4xNjQ3MDY1ODMx*_ga_Q7TEQDPTH5*MTY3NDk3MjkzNS4zODAuMS4xNjc0OTcyOTQ4LjAuMC4w’

logstash-8.6.1-amd64.de 100%[==============================>] 325.81M  10.7MB/s    in 31s     

2023-01-29 14:21:03 (10.6 MB/s) - ‘logstash-8.6.1-amd64.deb?_gl=1*1b5e8ui*_ga*MTEyNjEzOTY5Ni4xNjQ3MDY1ODMx*_ga_Q7TEQDPTH5*MTY3NDk3MjkzNS4zODAuMS4xNjc0OTcyOTQ4LjAuMC4w’ saved [341638094/341638094]

liuxg@liuxgu:~/logstash$ mv 'logstash-8.6.1-amd64.deb?_gl=1*1b5e8ui*_ga*MTEyNjEzOTY5Ni4xNjQ3MDY1ODMx*_ga_Q7TEQDPTH5*MTY3NDk3MjkzNS4zODAuMS4xNjc0OTcyOTQ4LjAuMC4w' logstash-8.6.1-amd64.deb

我们使用如下的命令来安装 Logstash:

sudo dpkg -i logstash-8.6.1-amd64.deb 
liuxg@liuxgu:~/logstash$ sudo dpkg -i logstash-8.6.1-amd64.deb 
[sudo] password for liuxg: 
(Reading database ... 386953 files and directories currently installed.)
Preparing to unpack logstash-8.6.1-amd64.deb ...
Unpacking logstash (1:8.6.1-1) over (1:8.4.2-1) ...
Setting up logstash (1:8.6.1-1) ...
Installing new version of config file /etc/logstash/jvm.options ...

为了能够配置 Logstash 能够正确地访问 Elasticsearch,我们可以参考我之前的文章 “Logstash:如何连接到带有 HTTPS 访问的集群”。我们需要按照文章里的要求创建一个叫做 truststore.p12 的文件。由于我们是以 docker 的形式启动 Elasticsearch 及 Kibana 的,我们在 macOS 的机器上使用如下的命令来拷贝证书。我们先查看容器:

$ docker ps
CONTAINER ID   IMAGE                                                 COMMAND                  CREATED       STATUS                 PORTS                              NAMES
a2374f620b78   docker.elastic.co/kibana/kibana:8.6.1                 "/bin/tini -- /usr/l…"   7 hours ago   Up 7 hours (healthy)   0.0.0.0:5601->5601/tcp             elastic8-kibana-1
e2d6443b8edb   docker.elastic.co/elasticsearch/elasticsearch:8.6.1   "/bin/tini -- /usr/l…"   7 hours ago   Up 7 hours (healthy)   9200/tcp, 9300/tcp                 elastic8-es03-1
a29bbeb4bdf2   docker.elastic.co/elasticsearch/elasticsearch:8.6.1   "/bin/tini -- /usr/l…"   7 hours ago   Up 7 hours (healthy)   9200/tcp, 9300/tcp                 elastic8-es02-1
81de3d45943c   docker.elastic.co/elasticsearch/elasticsearch:8.6.1   "/bin/tini -- /usr/l…"   7 hours ago   Up 7 hours (healthy)   0.0.0.0:9200->9200/tcp, 9300/tcp   elastic8-es01-1

我们可以看到有一个叫做 elastic8-es01-1 的容器。

$ pwd
/Users/liuxg/data/elastic8
$ ls
docker-compose.yml http_ca.crt        kibana.yml         write_to_kafka.py
$ docker cp 81de3d45943c:/usr/share/elasticsearch/config/certs/ca/ca.crt .
$ ls
ca.crt             docker-compose.yml http_ca.crt        kibana.yml         write_to_kafka.py
$ ls
ca.crt             docker-compose.yml kibana.yml       

运用 ca.crt 文件,我们使用如下的命令来创建一个叫做 truststore.p12 的文件。它的 storepass 是 password:

keytool -import -file ca.crt -keystore truststore.p12 -storepass password -noprompt -storetype pkcs12
$ keytool -import -file ca.crt -keystore truststore.p12 -storepass password -noprompt -storetype pkcs12
Certificate was added to keystore
$ ls
ca.crt             docker-compose.yml kibana.yml         truststore.p12

从上面,我们可以看出来它创建了 truststore.p12 这个文件。我们接下来把这个文件拷贝到 Ubuntu OS 机器下的 /etc/logstash/conf.d 目录中。

liuxg@liuxgu:/etc/logstash/conf.d$ ls
truststore.p12

我们接下来在地址 /etc/logstash/conf.d 创建一个叫做叫做 kafka_to_logstash.conf 的配置文件:

kafka_to_logstash.conf

input {
  kafka {
    bootstrap_servers => "192.168.0.4:9092"
    topics => ["registered_user"]
  }
}

filter {
  json {
    source => "message"
  }
}

output {
  elasticsearch {
      hosts => ["https://192.168.0.3:9200"]
      index => "registered_user"
      workers => 1
      user => "elastic"
      password => "password"
      ssl_certificate_verification => true
      truststore => "/etc/logstash/conf.d/truststore.p12"
      truststore_password => "password"
  }
}

在上面,请注意的是:

  • 我们使用 Elasticsearch 的超级用户 elastic 来连接 Elasticsearch。它的密码是 password。在实际的使用中,我们可以创建一个合适权限的用户来进行连接。

这样我们的 Logstash 的配置就完成了。

sudo service logstash start

我们可以通过如下的命令来检查 Logstash 是否已经成功地运行起来了。

service logstash status
liuxg@liuxgu:~$ service logstash status
● logstash.service - logstash
     Loaded: loaded (/lib/systemd/system/logstash.service; disabled; vendor preset: enabled)
     Active: active (running) since Sun 2023-01-29 15:25:57 CST; 7s ago
   Main PID: 60841 (java)
      Tasks: 33 (limit: 18977)
     Memory: 508.6M
     CGroup: /system.slice/logstash.service
             └─60841 /usr/share/logstash/jdk/bin/java -Xms1g -Xmx1g -Djava.awt.headless=true ->

1月 29 15:25:57 liuxgu systemd[1]: Started logstash.
1月 29 15:25:57 liuxgu logstash[60841]: Using bundled JDK: /usr/share/logstash/jdk

上面表明我们的 logstash 服务已经被成功地运行起来了。我们还可以通过如下的命令来查看 logstash 服务的日志:

journalctl -u logstash

向 Kafka topic 写入数据

我们使用如下的 Python 应用向我们的 Kafka topic “registered_user” 来写入数据:

write_to_kafka.py

from faker import Faker
from kafka import KafkaProducer
import json
fake = Faker()
import time

def get_registered_data():
    return {
        'first name': fake.first_name(),
        'last name': fake.last_name(),
        'age': fake.random_int(0, 60),
        'address': fake.address(),
        'register_year': fake.year(),
        'register_month': fake.month(),
        'register_day': fake.day_of_month(),
        'monthly_income': fake.random_int(28000, 100000)
    }

def json_serializer(data):
    return json.dumps(data).encode('utf-8')

producer = KafkaProducer(bootstrap_servers=['192.168.0.4:9092'],
                         value_serializer=json_serializer)

if __name__ == '__main__':
    while True:
        registered_data = get_registered_data()
        print(registered_data)
        producer.send('registered_user', registered_data)
        time.sleep(3)

为了运行上面的应用,我们必须安装如下的两个包:

pip3 install Faker
pip3 install kafka-python

我们在 Ubuntu OS 机器上运行上面的代码:

python write_to_kafka.py 

 我们回到 Kibana 的界面来进行查看:

GET _cat/indices

上面的命令显示:

我们可以对这个文件进行搜索:

GET registered_user/_search

 我们可以看到如下的结果:

从上面,我们可以看出来我们的数据已经被结构化。

我们可以针对这个索引进行可视化。你可以阅读我博客里的相应文章以了解更多。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/184441.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Discord教程:Discord账号注册、Discord多账号登录和管理

Discord最初是为游戏玩家在群聊和交流而创建的。但自疫情爆发以来,许多企业、公司和初创公司发现,居家办公时使用Discord进行日常沟通非常便捷。 Discord不再是仅限于游戏玩家,平台建立了不同于其他任何社交空间的新空间,封闭又开…

9、矩阵的简单运算

目录 一、矩阵的加减运算 二、矩阵的乘方运算 1.数与矩阵的乘法 2.矩阵与矩阵的乘法 三、矩阵的除法 四、矩阵的幂运算 五、矩阵元素的查找 六、矩阵元素的排序 七、矩阵元素的求和 八、矩阵元素的求积 九、矩阵元素的差分 一、矩阵的加减运算 进行矩阵加法、减法运…

前端学习第四站——CSS全面学习基础篇

目录 一、基础认知 1.1 CSS的介绍 1.2 语法规则 1.3 CSS初体验 1.4 CSS初识-小结 2.1 CSS引入方式 二、基础选择器 1.1 选择器的作用 1.2 标签选择器 1.3. 类选择器 1.4 id选择器 补充:类和id的区别 1.5 通配符选择器 三、字体和文本样式 1. 字体样式 …

python数据可视化开发(2):pandas读取Excel的数据格式处理(数据读取、指定列数据、DataFrame转json、数学运算、透视表运算输出)

系列文章目录 python开发低代码数据可视化大屏:pandas.read_excel读取表格python实现直接读取excle数据实现的百度地图标注python数据可视化开发(1):Matplotlib库基础知识 文章目录系列文章目录前言实践目标一、读取Excel数据read_excel参数说明读取全部…

Launcher应用列表内搜索框显示异常

问题描述 应用列表界面搜索框显示异常。本地试验后发现以下规律。 1、删除几个底边栏图标 2、旋转屏幕 3、进入应用列表,观察上方搜索框显示 问题分析 此问题是launcher内部界面显示问题,比较初级。找到规律后,发现应用列表内搜索框和底边…

【Hadoop】MapReduce数据倾斜问题解决方案

默认情况下Map任务的数量与InputSplit数量保持一致,Map阶段的执行效率也与InputSplit数量相关,当遇到大量的小文件时我们采用SequenceFile合并成一个大文件,以此来提高运行效率(【Hadoop】MapReduce小文件问题解决方案&#xff08…

OJ万题详解––高考排名(C++详解)

题目 题目描述 高考成绩的排名规则是按总分由高到低排,总分相同的人排名应相同,例如有 5 个同学的考高成绩: 考号姓名成绩001c1567002ygh605003gl690004xtb605005wzs567按照成绩排序后,成绩如下: 排名考号姓名成绩1003…

C/C++ 相关低耦合代码的设计

在我们设计C/C 程序的时候,有时需要两个类或者两个模块相互“认识”,或者两个模块间函数互相调用,假设我们正在开发一个网上商店,代表的网店客户的类必须要知道相关的账户。UML图如下,这被称为环依赖,这两个…

【GIS前沿】什么是新型基础测绘、内容、产品体系、特征?

《测绘法》指出,基础测绘是建立和维护全国统一的测绘基准和测绘系统,进行航天航空影像获取,建立和更新维护基础地理信息数据库,提供测绘地理信息应用服务等。 文章目录一、什么是新型基础测绘?二、新型基础测绘的特征三…

6、场景法

为什么使用场景法 现在的系统基本上都是由事件来触发控制流程的。如:我们申请一个项目,需先提交审批单据,再由部门经理审批,审核通过后由总经理来最终审批,如果部门经理审核不通过,就直接退回。每个事件触…

1.Docker Desktop安装设置

1.下载最新版本Download Docker Desktop | Docker 2.进行安装 2.1进行4.x版本安装 2.2最新版本出现问题 出现 docker desktop stopped 过一会后 quit退出,下载3.x版本 2.3继续安装 Enable Hyper-V windows Features 启动Hyper-V windows 虚拟化功能 百度百科-验证…

【GD32F427开发板试用】一、环境搭建与freertos移植

本篇文章来自极术社区与兆易创新组织的GD32F427开发板评测活动,更多开发板试用活动请关注极术社区网站。作者:chenjie 【GD32F427开发板试用】一、环境搭建与freertos移植 【GD32F427开发板试用】二、USB库移植与双USB CDC-ACM功能开发 【GD32F427开发板…

java集合类(属于工具类)概述

Java集合类可用于存储数量不等的对象,并可以实现常用的数据结构,如栈、队列等。除此之外,Java集合还可用于保存具有映射关系的关联数组。 Java集合大致可分为Set、List、Queue和Map四种体系: 其中Set代表无序、不可重复的集合&…

限制系统性能瓶颈的因素、衡量系统性能的指标

文章目录限制系统性能瓶颈的因素cpu内存磁盘IO网络IO异常数据库锁竞争衡量系统性能的指标响应时间吞吐量计算机资源分配使用率负载承受能力有时候我们的程序性能不高,需要提升性能,这个时候可以从以下几个角度去考虑是什么限制了我们的性能瓶颈.限制系统性能瓶颈的因素 cpu 有…

spring-bean的生命周期-【源码解析】-上

一、spring的bean概念Spring最重要的功能就是帮助程序员创建对象(也就是IOC),而启动Spring就是为创建Bean对象做准备,所以我们先明白Spring到底是怎么去创建Bean的,也就是先弄明白Bean的生命周期。Bean的生命周期就是指…

RocketMq-dashboard:topic 5min trend 原理和源码分析(一)

本文阅读基础:使用或了解过rocketMq;想了解"topic 5min trend"背后的原理;想了解监控模式如何实现。RocketMq的dashboard,有运维页面,驾驶舱,集群页面,主题页面,消费者页面…

[羊城杯 2020]easyre 1题解

一步一个脚印地耐心攀登,就是别去看顶峰,而要专注于在爬的路。 ——黑泽明 目录 1.查壳 2.IDA静态分析main函数 3.研究三重加密 第一重加密 第二重加密 第三重加密 4.解密 1.查壳 64bit exe文件 2.IDA静态分析main函数 拖入IDA,找到…

芯片验证系列——Checker

在产生了有效的激励后,需要判断出不符合功能描述的行为。Checker就是用于查看DUT是否按照功能描述做出期望的行为,识别出所有的设计缺陷。 按照激励的生成方式和检查的功能点分布可以将验证划分为三种方式: 黑盒验证:验证环境不…

【Vue】前端工程化与 webpack

一、前端工程化前端开发1.1 小白眼中的前端开发会写 HTML CSS JavaScript 就会前端开发需要美化页面样式,就拽一个 bootstrap 过来需要操作 DOM 或发起 Ajax 请求,再拽一个 jQuery 过来需要快速实现网页布局效果,就拽一个 Layui 过来1.2 实…

redis事务详解

事务是逻辑上对数据的一组操作,这操作要么一次全部成功或者这操作全部失败,是不可分割的单位 四大特性 原子性,一致性,隔离性,持久性(ACID) redis的事务 redis是弱事务型数据库,并不具备ACID的全部特性 re…