Elasticsearch:使用 Node.js 将实时数据提取到 Elasticsearch 中(一)

news2025/1/9 17:01:06

Elasticsearch 是一个强大的 RESTful 搜索和分析引擎,能够处理越来越多的用例。 它将集中存储你的数据,以实现闪电般的快速搜索、微调相关性以及可轻松扩展的强大分析。 关于如何使用 Elastic Stack(又名 ELK 堆栈)将数据摄取到 Elasticsearch 的资源有很多。在今天的文章中,我将详细介绍如何使用 Node.js 从零开始来把地震的实时数据采集到 Elasticsearch 中。

如果你选择的编程语言是 JavaScript,并且你需要使用 RESTful API 方法从第三方应用程序获取数据,那么使用 Node.js 获取数据是一个不错的选择。 你还可以托管服务器,让它持续实时摄取数据。 该演示将向您展示如何设置一个 Node.js + Express.js 服务器,该服务器实时将数据提取到 Elasticsearch 中,然后可以对这些数据进行分析并以有意义的方式采取行动。

对于此演示,我们将使用 USGS 实时发布的公开可用的全球地震数据。

准备工作

Elasticsearch 及 Kibana

如果你还没有安装好自己的 Elasticsearch 及 Kibana 的话,那么请参考我之前的文章:

  • 如何在 Linux,MacOS 及 Windows 上进行安装 Elasticsearch
  • Kibana:如何在 Linux,MacOS 及 Windows上安装 Elastic 栈中的 Kibana

在今天的展示中,我将使用 Elastic Stack 8.x 来进行展示。在安装的时候,请参考相应的 Elastic Stack 8.x 的文章来进行安装。

Node.js

你需要安装好自己的 Node.js 来完成下面的练习。你可以参考 Node.js 链接来进行相应的安装。

实时数据

根据 USGS 网上所提供的信息,我们可以在地址  https://earthquake.usgs.gov/earthquakes/feed/v1.0/summary/all_hour.geojson 找到相应的地震信息数据。我们可以通过如下的命令来进行查看:

curl https://earthquake.usgs.gov/earthquakes/feed/v1.0/summary/all_hour.geojson | jq .

如上所示,它是一个以 JSON 格式给出来的数据信息。这个数据会实时发生变化,我们可以通过反复访问这个接口来得到所需要的地震信息。在这里,我们需要注意的是:

  • "time": 1672471359610,这是一个时间信息,可以作为我们的 timestamp 来对它进行分析。我们将最终把它存入到 @timestamp 里。
  • "id": "nc73827101",这是一个地震特有的 id,我们将以这个 id 成为数据的 id。

  • "geometry",这个是地震发生的地理位置。我们可以需要在 Elasticsearch 中为它定一下为 geo_point 数据类型。我们将把它变为:

虽然数据有很多,但是我们最终需要的数据格式是这样的:

{
  "mag": 1.13,
  "place": "11km ENE of Coachella, CA",
  "@timestamp": 2022-05-02T20:07:53.266Z,
  "url": "https://earthquake.usgs.gov/earthquakes/eventpage/ci40240408",
  "sig": 20,
  "type": "earthquake",
  "depth": 2.09,
  "coordinates": {
    "lat": 33.7276667,
    "lon": -116.0736667
  }
}

在接下来的步骤里,我来详细介绍如何达到我们最终的目的。

创建 Node.js 应用

创建最基本的 express 应用

我们将从 0 开始一步一步地创建 Node.js 应用。首先我们在自己的电脑中创建一个目录:

mkdir earthquake_app
$ pwd
/Users/liuxg/demos
$ mkdir earthquake_app
$ cd earthquake_app/

我们进入到该目录中,并打入如下的命令:

npm init -y
$ npm init -y
Wrote to /Users/liuxg/demos/earthquake_app/package.json:

{
  "name": "earthquake_app",
  "version": "1.0.0",
  "description": "",
  "main": "index.js",
  "scripts": {
    "test": "echo \"Error: no test specified\" && exit 1"
  },
  "keywords": [],
  "author": "",
  "license": "ISC"
}


$ ls
package.json

上述命令生成一个叫做 package.json 的文件。在以后安装的 packages,它也会自动添加到这个文件中。默认的设置显然不是我们想要的。我们需要对它做一些修改。

在接下来的代码中,我们将会使用如下的一些 packages:

  • @elastic/elasticsearch
  • axios
  • config
  • cors
  • express
  • log-timestamp
  • nodemon

我们可以通过如下的命令来进行安装:

npm i @elastic/elasticsearch axios config cors express log-timestamp nodemon
$ npm i @elastic/elasticsearch axios config cors express log-timestamp nodemon
npm notice Beginning October 4, 2021, all connections to the npm registry - including for package installation - must use TLS 1.2 or higher. You are currently using plaintext http to connect. Please visit the GitHub blog for more information: https://github.blog/2021-08-23-npm-registry-deprecating-tls-1-0-tls-1-1/
npm notice Beginning October 4, 2021, all connections to the npm registry - including for package installation - must use TLS 1.2 or higher. You are currently using plaintext http to connect. Please visit the GitHub blog for more information: https://github.blog/2021-08-23-npm-registry-deprecating-tls-1-0-tls-1-1/

added 118 packages in 17s

11 packages are looking for funding
  run `npm fund` for details

由于我之前已经安装过,所以我上面显示的信息和你的可能会有所不同。我们再次来查看 package.json 文件:

$ pwd
/Users/liuxg/demos/earthquake_app
$ ls
node_modules      package-lock.json package.json
$ cat package.json 
{
  "name": "earthquake_app",
  "version": "1.0.0",
  "description": "",
  "main": "index.js",
  "scripts": {
    "test": "echo \"Error: no test specified\" && exit 1"
  },
  "keywords": [],
  "author": "",
  "license": "ISC",
  "dependencies": {
    "@elastic/elasticsearch": "^8.5.0",
    "axios": "^1.2.2",
    "config": "^3.3.8",
    "cors": "^2.8.5",
    "express": "^4.18.2",
    "log-timestamp": "^0.3.0",
    "nodemon": "^2.0.20"
  }
}

很显然,我们最新安装的 packages 已经自动添加到 package.json 文件中了。

我们接下来创建一个叫做 server 的子目录,并在它里面创建一个叫做 server.js 的文件:

$ pwd
/Users/liuxg/demos/earthquake_app
$ mkdir server
$ touch server/server.js

在上面,我们创建了一个叫做 server.js 的文件。这个将来就是我们需要运行的 server 脚本。为了能够让我们的 package.json 文件的配置能让 npm 进行运行,我们需要对它进行修改。

$ pwd
/Users/liuxg/demos/earthquake_app
$ cat package.json 
{
  "name": "earthquake_app",
  "version": "1.0.0",
  "description": "",
  "main": "sever.js",
  "scripts": {
    "start": "nodemon server/server.js",
    "test": "echo \"Error: no test specified\" && exit 1"
  },
  "keywords": [],
  "author": "",
  "license": "ISC",
  "dependencies": {
    "@elastic/elasticsearch": "^8.5.0",
    "axios": "^1.2.2",
    "config": "^3.3.8",
    "cors": "^2.8.5",
    "express": "^4.18.2",
    "log-timestamp": "^0.3.0",
    "nodemon": "^2.0.20"
  }
}

很多人可能会奇怪,为啥使用 nodemon 来启动脚本。它的好处是当我们修改好 server.js 里的脚本,那么它会自动重新启动服务器的运行,而不需要我们每次都需要打入如下的命令:

npm start

接下为了验证我们的 express 应用是否能成功地运行,我们修改 server.js 为如下的代码:

server/server.js

onst express = require('express');

const app = express();

const port = 5001;

app.get('/', (req, res) => {
    res.send('Hello World!')
  })

app.listen(port, () => console.log(`Server listening at http://localhost:${port}`));

我们接下来使用如下的命令来进行启动:

npm start
$ pwd
/Users/liuxg/demos/earthquake_app
$ npm start

> earthquake_app@1.0.0 start
> nodemon server/server.js

[nodemon] 2.0.20
[nodemon] to restart at any time, enter `rs`
[nodemon] watching path(s): *.*
[nodemon] watching extensions: js,mjs,json
[nodemon] starting `node server/server.js`
Server listening at http://localhost:5001

我们可以看到服务器已经成功地运行起来了,并且它运行于 5001 端口上。我们可以通过浏览器来进行访问它的网址:

上面显示我们的服务器运行正常。

安全地连接 Node.js 服务器到 Elasticsearch

接下来,我们需要创建代码来安全地连接 Node.js 服务到我们本地部署的 Elasticsearch 中。我们可以参考之前的文章 “Elasticsearch:使用最新的 Nodejs client 8.x 来创建索引并搜索”。我们可以在项目的更目录下创建如下的两个子目录:

mkdir config
mkdir -p server/elasticsearch
$ pwd
/Users/liuxg/demos/earthquake_app
$ mkdir config
$ mkdir -p server/elasticsearch
$ ls -d */
config/       node_modules/ server/

在 config 子目录下,我们创建如下的一个叫做 default.json 的文件。这个是用来配置如何连接到 Elasticsearch 的:

config/default.json

{
  "elastic": {
    "elasticsearch_endpoint": "https://localhost:9200",
    "username": "elastic",
    "password": "-pK6Yth+mU8O-f+Q*F3i",
    "apiKey": "eVBKOFhJVUJUN1gwSDQyLU5halY6R1BVRjNOUmpRYUtkTmpXTUZHdWZVUQ==",
    "certificate": "/Users/liuxg/elastic/elasticsearch-8.5.3/config/certs/http_ca.crt",
    "caFingerprint": "E3D36275D9FA80CF96F74E6537FC74E7952511A75E01605EBCFB8FC9F08F598C"
  }
}

我们先不要着急来了解这些配置参数。有些我们可能并不一定要用到。这些设置针对我们每个人的 Elasticsearch 的安装的不同而不同。在上面的参数解释如下:

  • elasticsearch_endpoint:这个是 Elasticsearch 的访问地址
  • username:这个是访问 Elasticsearch 的用户名,你可以不选用超级用户 elastic,而且在生产环境中,也不是推荐的方法
  • password:这个是上面 username 账号的密码
  • apiKey:这个是访问 Elasticsearch 所需要的 apiKey。你可以参考  “Elasticsearch:使用最新的 Nodejs client 8.x 来创建索引并搜索” 来了解如何进行生产。在下面的代码中,我们也可以使用 code 来进行生成
  • certificate:这个是证书的位置。每个 Elasticsearch 集群都会有一个生成的证书位置。我们需要填入这个位置信息
  • caFingerprint:这个是证书的 fingerprint 信息。我们可以采用 fingerprint 来进行连接。在本演示中,我将不使用这种方式。更多信息,请参考 Connecting | Elasticsearch JavaScript Client [master] | Elastic

我们在 elasticsearch 目录下创建一个叫做 client.js 的文件:

server/elasticsearch/client.js

const { Client } = require('@elastic/elasticsearch');
const config = require('config');
const fs = require('fs')

const elasticConfig = config.get('elastic');

// const client = new Client ( {
//   node: elasticConfig.elasticsearch_endpoint, 
//    auth: {
//      apiKey: elasticConfig.apiKey
//    }, 
//    tls: {
//     ca: fs.readFileSync(elasticConfig.certificate),
//     rejectUnauthorized: true
//    }     
// });

const client = new Client ( {
  node: elasticConfig.elasticsearch_endpoint,
  auth: { 
    username: elasticConfig.username,
    password: elasticConfig.password
   }, 
   tls: {
    ca: fs.readFileSync(elasticConfig.certificate),
    rejectUnauthorized: true
   }
});


client.ping()
  .then(response => console.log("You are connected to Elasticsearch!"))
  .catch(error => console.error("Elasticsearch is not connected."))

module.exports = client; 

在上面,我使用了两种方法来连接到 Elasticsearch。一种是通过 username/password 的方式来进行连接:

const client = new Client ( {
  node: elasticConfig.elasticsearch_endpoint,
  auth: { 
    username: elasticConfig.username,
    password: elasticConfig.password
   }, 
   tls: {
    ca: fs.readFileSync(elasticConfig.certificate),
    rejectUnauthorized: true
   }
});

而另外一种就是被注释掉的那个方法:

const client = new Client ( {
  node: elasticConfig.elasticsearch_endpoint, 
   auth: {
     apiKey: elasticConfig.apiKey
   }, 
   tls: {
    ca: fs.readFileSync(elasticConfig.certificate),
    rejectUnauthorized: true
   }     
});

这个也是被推荐的方法。在实际的使用中,我们更推荐使用 API key 来进行连接。

我们首先来使用 username/password 的方式来进行连接。我们需要修改我们的 server.js 来进行验证:

server/server.js

const express = require('express');
const client = require('./elasticsearch/client');

const app = express();

const port = 5001;

app.get('/', (req, res) => {
    res.send('Hello World!')
  })

app.listen(port, () => console.log(`Server listening at http://localhost:${port}`));

我们重新运行服务器。我们可以看到如下的输出:

$ pwd
/Users/liuxg/demos/earthquake_app
$ npm start

> earthquake_app@1.0.0 start
> nodemon server/server.js

[nodemon] 2.0.20
[nodemon] to restart at any time, enter `rs`
[nodemon] watching path(s): *.*
[nodemon] watching extensions: js,mjs,json
[nodemon] starting `node server/server.js`
Server listening at http://localhost:5001
You are connected to Elasticsearch!

上面的输出表明我们已经能够成功地连接到 Elasticsearch 了。

使用代码获取 API key

我们接下来可以通过代码来获得 API key,尽管我们可以通过其它的方法来获得。请详细阅读 “Elasticsearch:创建 API key 接口访问 Elasticsearch”。在这里,我们可以使用 Node.js 代码来动态地生成一个 API key。我们在 server 目录下创建如下的一个文件:

sever/create-api-key.js

const client = require('./elasticsearch/client');

async function generateApiKeys(opts) {
  const body = await client.security.createApiKey({
    body: {
      name: 'earthquake_app',
      role_descriptors: {
        earthquakes_example_writer: {
          cluster: ['monitor'],
          index: [
            {
              names: ['earthquakes'],
              privileges: ['create_index', 'write', 'read', 'manage'],
            },
          ],
        },
      },
    },
  });
  return Buffer.from(`${body.id}:${body.api_key}`).toString('base64');
}

generateApiKeys()
  .then(console.log)
  .catch((err) => {
    console.error(err);
    process.exit(1);
  });

我们使用如下的命令来运行这个 Node.js 的代码:

$ pwd
/Users/liuxg/demos/earthquake_app
$ ls server/create-api-key.js 
server/create-api-key.js
$ node server/create-api-key.js 
You are connected to Elasticsearch!
emZJSGFZVUJUN1gwSDQyLWRLaS06LVpHaXR1bm5RQnEybE4zOWoyd0g5Zw==

我们可以把上面命令生成的 API key 写入到之前的 default.json 文件中。这样我们也可以通过 API key 的方式来访问 Elasticsearch 了,如果我们需要的话。这样 client.js 实际上可以写成:

server/elasticsearch/client.js

const { Client } = require('@elastic/elasticsearch');
const config = require('config');
const fs = require('fs')

const elasticConfig = config.get('elastic');

const client = new Client ( {
  node: elasticConfig.elasticsearch_endpoint, 
   auth: {
     apiKey: elasticConfig.apiKey
   }, 
   tls: {
    ca: fs.readFileSync(elasticConfig.certificate),
    rejectUnauthorized: true
   }     
});

// const client = new Client ( {
//   node: elasticConfig.elasticsearch_endpoint,
//   auth: { 
//     username: elasticConfig.username,
//     password: elasticConfig.password
//    }, 
//    tls: {
//     ca: fs.readFileSync(elasticConfig.certificate),
//     rejectUnauthorized: true
//    }
// });

client.ping()
  .then(response => console.log("You are connected to Elasticsearch!"))
  .catch(error => console.error("Elasticsearch is not connected."))

module.exports = client; 

我们重新运行 server.js,我们可以看到如下的输出:

$ pwd
/Users/liuxg/demos/earthquake_app
$ npm start

> earthquake_app@1.0.0 start
> nodemon server/server.js

[nodemon] 2.0.20
[nodemon] to restart at any time, enter `rs`
[nodemon] watching path(s): *.*
[nodemon] watching extensions: js,mjs,json
[nodemon] starting `node server/server.js`
Server listening at http://localhost:5001
You are connected to Elasticsearch!

很显然,我们的 API key 方式是成功的。使用 API key 的好处是我们不必要暴露用户的密码在代码中,而且,我们甚至可以为这个 API key 来设置有效时间及权限。可以授予最小所需要的权限,以确保安全。

设置 RESTful API 调用以从源检索数据

现在我们的服务器正在运行并且 Elasticsearch 已连接,我们需要测试对 USGS 的 API 调用以接收初始数据。 在项目的根目录下,创建一个名为 routes 的文件夹和一个名为 api 的子文件夹。 在 api 文件夹中,创建一个名为 data.js 的文件并添加以下代码:

$ pwd
/Users/liuxg/demos/earthquake_app
$ mkdir -p server/routes/api

我在 routes/api 目录下创建一个如下的 data.js 文件:

server/routes/api/data.js

require('log-timestamp');
const express = require('express');
const router = express.Router();
const axios = require('axios')
const client = require('../../elasticsearch/client');
const URL = `https://earthquake.usgs.gov/earthquakes/feed/v1.0/summary/all_hour.geojson`;

router.get('/earthquakes', async function (req, res) {
    console.log('Loading Application...');
    
    //======= Check that Elasticsearch is up and running =======\\
    pingElasticsearch = async () => {
        await client.ping(
            function(error,res) {
                if (error) {
                    console.error('elasticsearch cluster is down!');
                } else {
                    console.log('Elasticsearch Ready');
                }
            }
        );
    }

    // ====== Get Data From USGS and then index into Elasticsearch
    indexAllDocs = async () => {
        try {
            console.log('Getting Data From Host')

            const EARTHQUAKES = await axios.get(`${URL}`,{
                headers: {
                    'Content-Type': [
                        'application/json',  
                        'charset=utf-8' 
                    ]
                }
            });

            console.log('Data Received!')

            results = EARTHQUAKES.data.features

            console.log('Indexing Data...')

            console.log(results)
            res.json(results)

            if (EARTHQUAKES.data.length) {
                indexAllDocs();
            } else {
                console.log('All Data Has Been Indexed!');
            };
        } catch (err) {
            console.log(err)
        };

        console.log('Preparing For The Next Data Check...');
    }

    console.log("Ping the Elasticsearch server");
    pingElasticsearch()

    console.log("Get data from USGS");
    indexAllDocs()
});
 
module.exports = router;

上面的代码使用 npm 包 Axios 对 USGS 地震 API 进行异步 API 调用。 收到数据后,它将显示为 JSON。 你还可以看到我们在页面顶部导入了一个名为 log-timestamp 的依赖项。 这将允许我们将时间戳添加到每个 console.log。

我们接下来修改 server.js 如下:

server/server.js

const express = require('express');
const client = require('./elasticsearch/client');

const app = express();

const port = 5001;

//Define Routes
const data = require('./routes/api/data')
app.use('/api/data', data);

app.get('/', (req, res) => {
    res.send('Hello World!')
  })

app.listen(port, () => console.log(`Server listening at http://localhost:${port}`));

重新运行我们的 server.js。我们通过 Postman 或者其它的工具来对我们的 REST 接口进行访问:

localhost:5000/api/data/earthquakes

从上面的输出中,我们可以看出来设计的 REST 接口工作是正常的。它含有一些收集来的数据。在所收集来的数据中,有一些数据是我们并不需要的。我们最终需要的数据是这样的:

{
  "mag": 1.13,
  "place": "11km ENE of Coachella, CA",
  "time": 2022-05-02T20:07:53.266Z,
  "url": "https://earthquake.usgs.gov/earthquakes/eventpage/ci40240408",
  "sig": 20,
  "type": "earthquake",
  "depth": 2.09,
  "coordinates": {
    "lat": 33.7276667,
    "lon": -116.0736667
  }
}

也就是说我们可以删除一下不需要的字段,并且我们需要转换一些字段,比如把 time 字段转换为我们想要的 @timestamp 字段。另外在写入 Elasticsearch 时,我们需要预先针对 coodinates 字段进行定义。它是一个 geo_point 类型的字段。

定义 mapping 及 pipeline

如上所示,我们需要的字段如上。我们可以如下的一个 earthquakes 索引。我们在 Kibana 的 console 中打入如下的命令:

PUT earthquakes
{
  "mappings": {
    "properties": {
      "@timestamp": {
        "type": "date"
      },
      "coordinates": {
        "type": "geo_point"
      },
      "depth": {
        "type": "float"
      },
      "mag": {
        "type": "float"
      },
      "place": {
        "type": "text",
        "fields": {
          "keyword": {
            "type": "keyword"
          }
        }
      },
      "sig": {
        "type": "short"
      },
      "type": {
        "type": "keyword"
      },
      "url": {
        "enabled": false
      }
    }
  }
}

在上面,我们针对索引的字段类型做如下的说明:

  • @timestamp:这是一个 date 字段类型的字段。我们希望的格式是 2022-05-02T20:07:53.266Z 而不是以 EPOC 形式显示的值,比如 1672471359610。这个字段有 time 转换而来
  • coordinates:这个是一个 geo_point 的字段。是地震发生的地理位置
  • place:这是一个 multi-field 字段。我们希望对这个字段进行统计,也可以针对它进行搜索
  • sig:这字段我们使用 short 类型,而不是 long。这样可以省去存储空间
  • type:这是一个 keyword 类型的字段。它只可以做数据分析统计之用
  • url:这个字段,我们既不想对它进行搜索,也不想对它进行统计,所有设置 enabled 为 false。这样可以省去分词的时间,从而提高摄入数据的速度

为此,我们可以针对上面的 data.js 做更进一步的修改:

server/routes/api/data.js

const express = require('express');
const router = express.Router();
const axios = require('axios')
const client = require('../../elasticsearch/client');
const URL = `https://earthquake.usgs.gov/earthquakes/feed/v1.0/summary/all_hour.geojson`;

 //======= Check that Elasticsearch is up and running =======\\
function pingElasticsearch() {
    console.log("ping .....")
    client.ping({
        requestTimeout: 30000,
      }, function(error,res) {
        if (error) {
            console.error('elasticsearch cluster is down!');
        } else {
            console.log('Elasticsearch Ready');
        }
    });
};

// ====== Get Data From USGS and then index into Elasticsearch
indexAllDocs = async () => {
    try {
        const EARTHQUAKES = await axios.get(`${URL}`,{
            headers: {
                'Content-Type': [
                    'application/json',  
                    'charset=utf-8' 
                ]
            }
        });

        console.log('Getting Data From Host')

        results = EARTHQUAKES.data.features    
    
        results.map(
            async (results) => (
              (earthquakeObject = {
                place: results.properties.place, //
                time: results.properties.time,   //
                url: results.properties.url,     //
                sig: results.properties.sig,     //
                mag: results.properties.mag,     //
                type: results.properties.type,   //
                longitude: results.geometry.coordinates[0], //
                latitude: results.geometry.coordinates[1],  //
                depth: results.geometry.coordinates[2],  //
              }),
              await client.index({
                index: 'earthquakes',
                id: results.id,
                body: earthquakeObject
              })
            )
        );

        if (EARTHQUAKES.data.length) {
            indexAllDocs();
        } else {
            console.log('All Data Has Been Indexed!');
        };
    } catch (err) {
        console.log(err)
    };

    console.log('Preparing For The Next Data Check...');
}


//================== Official API Call ==================\\
router.get('/earthquakes', function (req, res) {
    res.send('Running Application...');
    console.log('Loading Application...')
    
    indexAllDocs(res);

});
 
module.exports = router;

在上面,我们添加了把文档写入 Elasticsearch 的代码部分。我们使用地震数据的 id 作为 Elasticsearch 文档的 id。等服务器运行起来后,我们需要在 terminal 中打入如下的命令:

curl -XGET http://localhost:5001/api/data/earthquakes

我们可以在 Kibana 中通过如下的命令来查看文档:

GET earthquakes/_search?filter_path=**.hits

我们可以看到如下的结果:

{
  "hits": {
    "hits": [
      {
        "_index": "earthquakes",
        "_id": "nc73827281",
        "_score": 1,
        "_source": {
          "place": "10km S of Laytonville, CA",
          "time": 1672505649740,
          "url": "https://earthquake.usgs.gov/earthquakes/eventpage/nc73827281",
          "sig": 63,
          "mag": 2.02,
          "type": "earthquake",
          "longitude": -123.4981689,
          "latitude": 39.5991669,
          "depth": 4.59
        }
      },
   ...

很显然,这个文档的 source 和我们之前的想要的格式还是不太一样。为了能够使的 time 转换为 @timestamp,我们可以在 Node.js 的代码中进行相应的转换。我们也可以采用 ingest pipeline 来实现相应的操作。我们定义如下的 ingest pipeine。

POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "description": "This is for data transform for earthquake data",
    "processors": [
      {
        "date": {
          "field": "time",
          "formats": [
            "UNIX_MS"
            ]
        }
      }
      ]
  },
  "docs": [
    {
      "_source": {
        "place": "16km N of Borrego Springs, CA",
        "time": 1672507053210,
        "url": "https://earthquake.usgs.gov/earthquakes/eventpage/ci40152271",
        "sig": 10,
        "mag": 0.81,
        "type": "earthquake",
        "longitude": -116.368,
        "latitude": 33.4013333,
        "depth": 2.91
      }
    }
  ]
}

在上面的命令中,我们使用 date processor 来把 time 转换为所需要的格式,并在默认的情况下把 target 设置为 @timestamp。上面命令运行的结果为:

{
  "docs": [
    {
      "doc": {
        "_index": "_index",
        "_id": "_id",
        "_version": "-3",
        "_source": {
          "sig": 10,
          "mag": 0.81,
          "depth": 2.91,
          "@timestamp": "2022-12-31T17:17:33.210Z",
          "latitude": 33.4013333,
          "place": "16km N of Borrego Springs, CA",
          "time": 1672507053210,
          "type": "earthquake",
          "url": "https://earthquake.usgs.gov/earthquakes/eventpage/ci40152271",
          "longitude": -116.368
        },
        "_ingest": {
          "timestamp": "2023-01-01T00:31:03.544821Z"
        }
      }
    }
  ]
}

从上面的输出中,我们可以看出来 @timestamp 字段已经生成。它的值由 time 字段转换而来。我们还发现 latitude 及 longitude 并不是按照我们需要的格式来显示的。我们需要把它转化为另外一个像如下的对象:

我们可以通过 rename processor 来操作:

POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "description": "This is for data transform for earthquake data",
    "processors": [
      {
        "date": {
          "field": "time",
          "formats": [
            "UNIX_MS"
            ]
        }
      },
      {
        "rename": {
          "field": "latitude",
          "target_field": "coordinates.lat"
        }
      },
      {
        "rename": {
          "field": "longitude",
          "target_field": "coordinates.lon"
        }
      }
    ]
  },
  "docs": [
    {
      "_source": {
        "place": "16km N of Borrego Springs, CA",
        "time": 1672507053210,
        "url": "https://earthquake.usgs.gov/earthquakes/eventpage/ci40152271",
        "sig": 10,
        "mag": 0.81,
        "type": "earthquake",
        "longitude": -116.368,
        "latitude": 33.4013333,
        "depth": 2.91
      }
    }
  ]
}

在上面的命令中,我们通过 rename processor 来重新命名 longitude 及 latitude 两个字段。运行上面的代码,我们可以看到如下的结果:

{
  "docs": [
    {
      "doc": {
        "_index": "_index",
        "_id": "_id",
        "_version": "-3",
        "_source": {
          "sig": 10,
          "mag": 0.81,
          "depth": 2.91,
          "@timestamp": "2022-12-31T17:17:33.210Z",
          "coordinates": {
            "lon": -116.368,
            "lat": 33.4013333
          },
          "place": "16km N of Borrego Springs, CA",
          "time": 1672507053210,
          "type": "earthquake",
          "url": "https://earthquake.usgs.gov/earthquakes/eventpage/ci40152271"
        },
        "_ingest": {
          "timestamp": "2023-01-01T00:38:42.729604Z"
        }
      }
    }
  ]
}

很显然,我们看到了一个新的 coordinates 的字段。它是一个 object。我们发现有一个多余的字段叫做 time。这个并不是我们所需要的。我们可以通过 remove processor 来删除这个字段。

POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "description": "This is for data transform for earthquake data",
    "processors": [
      {
        "date": {
          "field": "time",
          "formats": [
            "UNIX_MS"
            ]
        }
      },
      {
        "rename": {
          "field": "latitude",
          "target_field": "coordinates.lat"
        }
      },
      {
        "rename": {
          "field": "longitude",
          "target_field": "coordinates.lon"
        }
      },
      {
        "remove": {
          "field": "time"
        }
      }
    ]
  },
  "docs": [
    {
      "_source": {
        "place": "16km N of Borrego Springs, CA",
        "time": 1672507053210,
        "url": "https://earthquake.usgs.gov/earthquakes/eventpage/ci40152271",
        "sig": 10,
        "mag": 0.81,
        "type": "earthquake",
        "longitude": -116.368,
        "latitude": 33.4013333,
        "depth": 2.91
      }
    }
  ]
}

我们运行上面的命令。我们再次查看输出的结果:

{
  "docs": [
    {
      "doc": {
        "_index": "_index",
        "_id": "_id",
        "_version": "-3",
        "_source": {
          "sig": 10,
          "mag": 0.81,
          "depth": 2.91,
          "@timestamp": "2022-12-31T17:17:33.210Z",
          "coordinates": {
            "lon": -116.368,
            "lat": 33.4013333
          },
          "place": "16km N of Borrego Springs, CA",
          "type": "earthquake",
          "url": "https://earthquake.usgs.gov/earthquakes/eventpage/ci40152271"
        },
        "_ingest": {
          "timestamp": "2023-01-01T00:44:46.919265Z"
        }
      }
    }
  ]
}

很显然这个时候,我们的 time 字段不见了。

在上面,我们通过 _simulate 的端点测试好了我们的 ingest pipeline。接下来,是我们使用命令来创建这个 pipeline 的时候了。我们使用如下的命令来创建这个 pipeline:

PUT _ingest/pipeline/earthquake_data_pipeline
{
  "description": "This is for data transform for earthquake data",
  "processors": [
    {
      "date": {
        "field": "time",
        "formats": [
          "UNIX_MS"
        ]
      }
    },
    {
      "rename": {
        "field": "latitude",
        "target_field": "coordinates.lat"
      }
    },
    {
      "rename": {
        "field": "longitude",
        "target_field": "coordinates.lon"
      }
    },
    {
      "remove": {
        "field": "time"
      }
    }
  ]
}

运行上面的命令。这样我们就创建了一个叫做 earthquake_data_pipeline 的 ingest pipeline。

接下来,我们需要删除之前所创建的索引,因为它包含我们不需要的一些字段:

DELETE earthquakes

我们再次运行之前创建索引 earthquakes 的命令:

PUT earthquakes
    {
      "mappings": {
        "properties": {
          "@timestamp": {
            "type": "date"
          },
          "coordinates": {
            "type": "geo_point"
          },
          "depth": {
            "type": "float"
          },
          "mag": {
            "type": "float"
          },
          "place": {
            "type": "text",
            "fields": {
              "keyword": {
                "type": "keyword"
              }
            }
          },
          "sig": {
            "type": "short"
          },
          "type": {
            "type": "keyword"
          },
          "url": {
            "enabled": false
          }
        }
      }
    }

我们接下来需要修改 data.js 文件来使用这个 ingest pipeline:

server/routes/api/data.js

const express = require('express');
const router = express.Router();
const axios = require('axios')
const client = require('../../elasticsearch/client');
const URL = `https://earthquake.usgs.gov/earthquakes/feed/v1.0/summary/all_hour.geojson`;

 //======= Check that Elasticsearch is up and running =======\\
function pingElasticsearch() {
    console.log("ping .....")
    client.ping({
        requestTimeout: 30000,
      }, function(error,res) {
        if (error) {
            console.error('elasticsearch cluster is down!');
        } else {
            console.log('Elasticsearch Ready');
        }
    });
};

// ====== Get Data From USGS and then index into Elasticsearch
indexAllDocs = async () => {
    try {
        const EARTHQUAKES = await axios.get(`${URL}`,{
            headers: {
                'Content-Type': [
                    'application/json',  
                    'charset=utf-8' 
                ]
            }
        });

        console.log('Getting Data From Host')

        results = EARTHQUAKES.data.features    
    
        results.map(
            async (results) => (
              (earthquakeObject = {
                place: results.properties.place, 
                time: results.properties.time,   
                url: results.properties.url,     
                sig: results.properties.sig,     
                mag: results.properties.mag,     
                type: results.properties.type,   
                longitude: results.geometry.coordinates[0], 
                latitude: results.geometry.coordinates[1], 
                depth: results.geometry.coordinates[2], 
              }),
              await client.index({
                index: 'earthquakes',
                id: results.id,
                body: earthquakeObject,
                pipeline: 'earthquake_data_pipeline'
              })
            )
        );

        if (EARTHQUAKES.data.length) {
            indexAllDocs();
        } else {
            console.log('All Data Has Been Indexed!');
        };
    } catch (err) {
        console.log(err)
    };

    console.log('Preparing For The Next Data Check...');
}


//================== Official API Call ==================\\
router.get('/earthquakes', function (req, res) {
    res.send('Running Application...');
    console.log('Loading Application...')
    
    setInterval(() => { 
        pingElasticsearch()
        indexAllDocs(res);
    }, 120000);

});
 
module.exports = router;

在上面的代码中,我对一下的两处做了修改:

我们再次使用如下的命令来启动对数据的采集:

curl -XGET http://localhost:5001/api/data/earthquakes

稍等一点时间(超过2分钟),我们到 Kibana 中来查看数据:

GET earthquakes/_search

我们可以看到如下的数据:

{
  "took": 0,
  "timed_out": false,
  "_shards": {
    "total": 1,
    "successful": 1,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": {
      "value": 9,
      "relation": "eq"
    },
    "max_score": 1,
    "hits": [
      {
        "_index": "earthquakes",
        "_id": "us7000j1cr",
        "_score": 1,
        "_source": {
          "sig": 340,
          "mag": 4.7,
          "depth": 181.449,
          "@timestamp": "2023-01-01T06:39:45.239Z",
          "coordinates": {
            "lon": 70.8869,
            "lat": 36.5351
          },
          "place": "36 km S of Jurm, Afghanistan",
          "type": "earthquake",
          "url": "https://earthquake.usgs.gov/earthquakes/eventpage/us7000j1cr"
        }
      },
  ...

从上面,我们可以看出来有9个地震数据已经被写入。我们可以让应用运行一段时间,它可能会有更多的数据进来。比如:

{
  "took": 0,
  "timed_out": false,
  "_shards": {
    "total": 1,
    "successful": 1,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": {
      "value": 10,
      "relation": "eq"
    },
    "max_score": 1,
    "hits": [
      {
        "_index": "earthquakes",
        "_id": "nc73827436",
        "_score": 1,
        "_source": {
          "sig": 252,
          "mag": 4.04,
          "depth": 4.51,
          "@timestamp": "2023-01-01T06:49:08.930Z",
          "coordinates": {
            "lon": -121.220665,
            "lat": 36.5789986
          },
          "place": "9km NW of Pinnacles, CA",
          "type": "earthquake",
          "url": "https://earthquake.usgs.gov/earthquakes/eventpage/nc73827436"
        }
      },

我们可以看到10个数据。

从上面的数据中,我们可以看到最终的数据结构就是我们想要的数据结构。

在接下来的文章中,我将详细描述如何对这个数据进行可视化。我将使用 Kibana 来进行展示,也会使用 Web 来进行搜索。请参阅文章 “Elasticsearch:使用 Node.js 将实时数据提取到 Elasticsearch 中(二)”

为了方便大家的学习,我把源代码放在这里:https://github.com/liu-xiao-guo/earthquakes-app

参考:

【1】https://medium.com/@webdevmark16/ingesting-real-time-data-into-elasticsearch-with-node-js-a7aa9b5acf8c

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/135768.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【C++编程调试秘籍】| 总结归纳要点(上)

文章目录一、编译器是捕捉缺陷的最好场合1 如何使用编译器捕捉缺陷二、在运行时遇见错误该如何处理1 该输出哪些错误信息2 执行安全检查则会减低程序效率,该如何处理呢3 当运行时遇到错误时,该如何处理四、索引越界1 动态数组2 静态数组3 多维数组5 指针…

【回答问题】ChatGPT上线了!给我推荐20个比较流行的深度学习模型

目录给我推荐20个比较流行的nlp模型给我推荐20个比较流行的计算机视觉模型给我推荐20个比较流行的图像分类模型给我推荐20个比较流行的人脸识别模型给我推荐20个比较流行的实体识别模型给我推荐20个比较流行的语言识别模型给我推荐20个比较流行的激光雷达3D点云模型给我推荐20个…

计算机网络的一些常识

序 小白,啥也不会,所以要学习常识 快速总览 这个视频按照数据链路层——网络层——传输层——应用层的顺序,自下向上介绍的。虽然只有30分钟,但是挺全的,密度挺高的,而且小白友好。 计算机网络 Compute…

python学习|第一天

文章目录1.输出函数print2.浮点数输出3.数据类型转换4.运算符优先级5.列表对象list1.输出函数print #输出数字,直接输出 print(2023) print(2023.1)#输出字符串,要加单引号或者双引号(实际效果都是一样的),输出后自动换行 print(hello 2023)…

华为CE系列和S系列交换机堆叠配置及mad检测

CE系列交换机堆叠配置: 第一台交换机配置: system-view immediately #不用每次都输入 commit提交了 sysname sw1 stack stack member 1 priority 105 stack member 1 domain 10 quit inter stack-port 1/1 port member-group inter 10GE 1/0/1 dis st…

使用Helm部署Wikijs

使用 Helm 部署 Wiki.js 📚️ 参考文档: Wiki.js 官方文档 - 安装 - Kubernetes Wiki.js 使用 Helm 安装 Wiki.js 官方文档 - 安装 - 侧加载 官方教程 Kubernetes 开始使用 Helm Chart 在 Kubernetes 上安装 先决条件 Kubernetes 集群HelmPostgreSQL 数据库 ❗…

实习------数据库进阶

B树索引 什么是索引? MySQL官方对索引的定义为:索引就是用于实现数据的快速检索,由数据表中的一列或多列组合而成,索引实质上是一张描述索引列的列值与原表中记录行之间一 一对应关系的有序表。索引的实现通常使用B树及其变种B树…

2022年终回顾与总结:螃蟹走路-冲,撞

工作赚钱&#xff0c;养家糊口 << 2022年对地球上的人类来说&#xff0c;肯定是刻骨铭心的纪元。对于微小的个人而言&#xff0c;感受是真真切切的。固然全球疫情危害了劳苦大众&#xff0c;但家庭给我的触动却是直接和深刻的。 这一年的轨迹被6月的一刀切成两片。上半年…

深度学习——序列模型(笔记)

1.序列数据&#xff1a; ①现实生活中有很多数据是有时序结构&#xff0c;比如电影的评分随时间的变化而变化。 ②统计学中&#xff0c;超出已知观测范围进行预测是外推法&#xff0c;在现有的观测值之间进行估计是内插法 2.统计工具&#xff1a;处理序列数据选用统计工具和新…

第一章:Mybatis与微服务注册

目录 一、SpringBoot整合MybatisPlus 创建自动生成代码子模块 创建商品服务子模块 二、SpringBoot整合Freeamarker 三、SpringBoot整合微服务&gateway&nginx 整合微服务之商品服务zmall-product 创建并配置网关gateway服务 安装配置SwitchHosts 安装配置Windo…

安装包部署prometheus+Grafana+node_exporter

部署prometheus 在192.168.11.141服务器操作 下载prometheus安装包 wget https://github.com/prometheus/prometheus/releases/download/v2.32.1/prometheus-2.32.1.linux-amd64.tar.gz 下载prometheus安装包 tar xvf prometheus-2.32.1.linux-amd64.tar.gz -C /usr/local…

OSPF的工作原理与性能优化

OSPF的3张表 OSPF的工作过程分为3个大步骤&#xff0c;分别是形成邻居关系&#xff0c;形成邻接关系&#xff0c;计算路由 OSPF建立邻居&#xff0c;收集LSA&#xff0c;收集完成形成邻接 用收集到的LSA&#xff0c;作为原材料&#xff0c;计算路由 完成这3大步骤&#xff0c;…

分享106个PHP源码,总有一款适合您

源码下载链接&#xff1a;https://pan.baidu.com/s/1Dyc3Qj8JRHJr2sECdEqGrA?pwdlscj 提取码&#xff1a;lscj PHP源码 分享106个PHP源码&#xff0c;总有一款适合您 采集参数 page_count 1 # 每个栏目开始业务content"text/html; charsetgb2312"base_url &q…

MySQL事务隔离级别详解

一、什么是事务 事务&#xff08;Transaction&#xff09;是由一系列对数据库中的数据进行访问与更新的操作所组成的一个程序执行单元。 在同一个事务中所进行的操作&#xff0c;要么都成功&#xff0c;要么就都失败。理想中的事务必须满足四大特性&#xff0c;这就是大名鼎鼎…

8种专坑同事的 SQL 写法,性能降低100倍,不来看看?

今天给大家分享几个SQL常见的“坏毛病”及优化技巧。 SQL语句的执行顺序&#xff1a; 1、LIMIT 语句 分页查询是最常用的场景之一&#xff0c;但也通常也是最容易出问题的地方。比如对于下面简单的语句&#xff0c;一般 DBA 想到的办法是在 type、 name、 create_time 字段上…

第二章:Swagger2

目录 背景介绍 什么是Swagger2 常用注解 SpringBoot整合Swagger2 生产环境下屏蔽Swagger2 修改Swagger2配置类 修改application.yml 使用maven package打包测试 运行测试 背景介绍 在团队开发中&#xff0c;一个好的 API 文档不但可以减少大量的沟通成本&#xff0c;还…

Linux系统的进程管理

文章目录Linux系统的进程管理1.查看进程2.父进程3.终止进程4.进程树Linux系统的进程管理 在LINUX中&#xff0c;每个执行的程序都称为一个进程。每一个进程都分配一个ID号(pid,进程号) 每个进程都可能以两种方式存在的。前台与后台&#xff0c;所谓前台进程就是用户目前的屏幕…

Vulnhub 靶场 Earth

通关方案&#xff1a;https://www.cnblogs.com/Jing-X/archive/2022/04/03/16097695.html 思路流程&#xff1a; 1. 信息收集 nmap扫描发现开了22端口和两个web端口&#xff08;80和443&#xff09;。 注意这里信息收集到到位&#xff0c;获取的信息多一些。 使用nmap默认脚…

常见的降维技术比较:能否在不丢失信息的情况下降低数据维度

本文将比较各种降维技术在机器学习任务中对表格数据的有效性。我们将降维方法应用于数据集&#xff0c;并通过回归和分类分析评估其有效性。我们将降维方法应用于从与不同领域相关的 UCI 中获取的各种数据集。总共选择了 15 个数据集&#xff0c;其中 7 个将用于回归&#xff0…

电子招标采购系统源码—互联网+招标采购

​ ​ 智慧寻源 多策略、多场景寻源&#xff0c;多种看板让寻源过程全程可监控&#xff0c;根据不同采购场景&#xff0c;采取不同寻源策略&#xff0c; 实现采购寻源线上化管控&#xff1b;同时支持公域和私域寻源。 询价比价 全程线上询比价&#xff0c;信息公开透明&#x…