logstash同步数据从kafka到es集群

news2025/1/8 11:54:18

背景:需求是这样的,原始文件是txt文件(每天300个文件),最终想要的结果是每天将txt中的数据加载到es中,开始的想法是通过logstash加载数据到es中,但是对logstash不太熟悉,不知道怎么讲程序弄成读取一个txt文件到es中以后,就将这个txt原始文件备份并且删除掉,然后就想到了通过一个中间件来做,Python读取成功一个txt文件,并且加载到kafka中以后,就将这个txt文件备份然后删除掉原始文件。

第一步:向kafka中添加数据,我用Python连接kafka集群,向其中加载数据

# -*- coding: utf-8 -*-

import json
import json
import msgpack
from loguru import logger
from kafka import KafkaProducer
from kafka.errors import KafkaError

def kfk_produce_1():
    """
        发送 json 格式数据
    :return:
    """
    producer = KafkaProducer(
        bootstrap_servers='192.168.85.109:9092',
        value_serializer=lambda v: json.dumps(v).encode('utf-8')
    )
    #logstash-topic-one
    #producer.send('python_test_topic', {'key': 'value'})
    producer.send('logstash-topic-one', {'name': 'value'})

kfk_produce_1()
执行完的结果,来界面工具上看,显示这样,说明数据已经加载进来了

在这里插入图片描述

第二步:配置logstash,将kafka中的数据加载到es集群中

编写的logstash.conf配置如下;
input{
      kafka{
        bootstrap_servers => "192.168.85.109:9092"
        client_id => "consumer_id"
        group_id => "consumer_group"
        auto_offset_reset => "latest"
        consumer_threads => 1
        decorate_events => true
        topics => ["logstash-topic-one","logstash-topic-two"]
      }
}
output {

  if [@metadata][kafka][topic] == "logstash-topic-one" {
        elasticsearch {
          hosts => "http://192.168.85.109:9200"
          index => "kafka-one-data"
          timeout => 300
        }
    }

  if [@metadata][kafka][topic] == "logstash-topic-two" {
        elasticsearch {
          hosts => "http://192.168.85.109:9200"
          index => "kafka-two-data"
          timeout => 300
        }
    }
  stdout {}
}

第三步:执行logstash,通过kibana查看数据是否在es集群中,展示如下,则说明配置是正确的

在这里插入图片描述

在这里插入图片描述

问题1:现在发现,name字段是在message下面,如果是多个字段的话,不方便查询,想着怎么讲字段从message中弄出来,修改的配置如下,增加一段这样的代码就OK了

type => "json"
        codec => json {
            charset => "UTF-8"
        }

完整的配置文件logstash.conf代码如下;

input{
      kafka{
        bootstrap_servers => "192.168.85.109:9092"
        client_id => "consumer_id"
        group_id => "consumer_group"
        auto_offset_reset => "latest"
        consumer_threads => 1
        decorate_events => true
        topics => ["logstash-topic-one","logstash-topic-two"]
        type => "json"
        codec => json {
            charset => "UTF-8"
        }
      }
}
output {

  if [@metadata][kafka][topic] == "logstash-topic-one" {
        elasticsearch {
          hosts => "http://192.168.85.109:9200"
          index => "kafka-one-data"
          timeout => 300
        }
    }

  if [@metadata][kafka][topic] == "logstash-topic-two" {
        elasticsearch {
          hosts => "http://192.168.85.109:9200"
          index => "kafka-two-data"
          timeout => 300
        }
    }
  stdout {}
}

然后我又造了一个多字段的场景如下;

在这里插入图片描述

我先去logstash中查看日志如下,字段已经分离出来了

{
          "name" => "value",
      "@version" => "1",
          "type" => "json",
    "@timestamp" => 2023-05-17T06:13:48.825Z
}
{
      "@version" => "1",
          "type" => "json",
    "@timestamp" => 2023-05-17T06:20:57.729Z,
          "name" => "令狐冲",
           "age" => "30",
        "height" => "180cm"
}

去kibana中去查询,显示如下,测试成功喽,😄

在这里插入图片描述
在这里插入图片描述

问题2:在查询结果中发现,有些字段是没有用的,看看怎么去掉?

在配置文件中增加一个过滤器就可以解决了

filter { mutate {
                 remove_field => ["@version","@timestamp","type"] # 删除字段
                 }
 }

然后再去kibana中去查看,就发现这会儿的字段格式非常好看了,😄

在这里插入图片描述

文档后续再继续完善,有好的建议或者问题可以留言交流,😄

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/536619.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

数据仓库是什么?什么是列式存储?

事务和分析 在早期的业务数据处理过程中,一次典型的数据库写入通常与一笔 商业交易(commercial transaction) 相对应:卖个货、向供应商下订单、支付员工工资等等。但随着数据库开始应用到那些不涉及到钱的领域,术语 交…

Liunx 套接字编程(2)TCP接口通信程序

1.TCP通信程序的编写 面向连接、可靠传输、提供字节流传输服务 客户端向服务器发送一个连接建立的请求流程,上图中服务端第三步详细流程 2.TCP接口 socket--创建套接字 int socket(int domain, int type, int protocol); bind---绑定 intbind(int sockfd, struct s…

自动化测试工具 —— selenium介绍及基本使用方法

Selenium是一个开源、免费、简单、灵活,对Web浏览器支持良好的自动化测试工具,在UI自动化、爬虫等场景下是十分实用的,能够熟练掌握并使用Selenium工具可以大大的提高效率。 Selenium简介 Selenium支持多平台、多浏览器、多语言去实现自动化…

声音合成——Foley Sound——DECASE项目——多模态智能感知与应用——项目复现

文章目录 概述项目复现配置环境下载并配置文件运行代码第一阶段,训练提取DTFR特征的模型资料搜集 train_vqvae.py 第二阶段,使用训练好的模型提取声音的DTFR特征torch.cuda.OutOfMemoryError: CUDA out of memory. 第三阶段,基于特征训练合成…

【软件测试】支付模块测试攻略,这些测试方法和注意事项你掌握了么?

对于大部分人而言,支付模块或许是日常生活中最为关注和使用的功能之一,因此,对于支付模块的质量控制也显得尤为重要。 但考虑到支付涉及到金钱流转等敏感信息,一旦出现问题可能带来非常严重后果。因此,在支付模块测试…

FastAPI 的路由介绍及使用

上一篇文章中,我介绍了 FastAPI 框架的安装和 HelloWorld 项目搭建方式。本文将介绍如何使用 Router 路由处理 FastAPI 中的请求。 什么是路由 路由 Router 就像是一个流水线上的线长,协调生产,下达命令给不同的组长进行分工,然…

Springboot——事物管理

文章目录 事务管理一、 Spring事务管理1.1 事务回顾1.2 案例: 解散部门(未开启事务)1.3 事务管理注解Transactional1.4 事务管理日志开关1.5 rollbackFor 异常回滚属性1.6 propagation 事务传播行为1.7 解散部门并记录操作日志1.7.1 创建数据…

Java 8 腰斩!Java 17 暴涨 430%!!(文末福利)

New Relic 最新发布了一份 “2023 年 Java 生态系统状况报告”,旨在提供有关当今 Java 生态系统状态的背景和见解。该报告基于从数百万个提供性能数据的应用程序中收集的数据,对生产中使用最多的版本、最受欢迎的 JDK 供应商、容器的兴起等多方面进行了调…

AIGC+实时云渲染:开启3D内容生态的黄金时代

AIGC技术革命下,我们的3D内容生态将会迎来怎样的变化格局? 实时云渲染 / Cloud XR技术将在AIGC大潮中扮演什么样的角色? 作为云基础设施厂商,我们有哪些机会可以抓住? 这些问题已在XR产业、3D内容行业以及软件行业内…

人工智能基础部分18-条件随机场CRF模型的应用

大家好,我是微学AI,今天给大家介绍一下人工智能基础部分18-条件随机场CRF模型的应用,本文将详细介绍条件随机场(CRF)模型,包括其原理、应用场景及实际代码实现。我将通过一个生活中的简单数据样例来演示如何输入数据、运行模型以及…

prometheus监控redis集群并显示到granfana面板

prometheus监控redis集群 监控redis1,在redis机器上安装redis_exporter2,配置prometheus配置文件3,设置redis maxmemory4,导入redis监控模板736 监控redis 1,在redis机器上安装redis_exporter 下载,安装r…

什么是边缘服务器?边缘计算的未来如何?

边缘服务器是指驻留在网络逻辑边缘上的任何类型的服务器,通常位于专用网络和互联网之间。边缘服务器的计算能力正在迅速塑造现代工业格局,动态应用程序和现代业务基础设施的出现使得数据的快速处理和共享成为必然。公司现在正在用边缘服务器技术取代传统…

什么是Pinia?以及它的使用方式?以及和Vuex的区别是什么?

文章目录 概要什么是Pinia?Pinia和Vuex的区别?Pinia相比于Vuex的优势?为什么要使用Pinia?Pinia的简单使用安装(仅限于Vue3)使用 概要 提示:下边是Pinia的简单介绍(详解请点击查看官方…

深度解析接口自动化框架封装项目:封装层级,关联调用,极限改进

目录 前言: 一、接口封装与封装层级 二、接口关联和数据准备 三、接口封装极限改进 四、代码示例 五、总结 前言: 接口自动化是软件测试领域中的一个重要环节,它可以自动化执行接口测试用例,快速发现和定位接口问题&#xf…

JAVA的基本数据类型及扩大缩小转换

JAVA的8种基本类型 分为四大类:整形、浮点型、字符型、布尔型 数据类型类别大小(位)范围byte整型8-128 到 127short整型16-32768 到 32767int整型32-2147483648 到 2147483647long整型64-9223372036854775808 到 9223372036854775807float浮点型32约3.40282347e38…

房屋装修选择自装,如何寻找砌墙工人,比价并施工(砌墙阶段)

环境: 地点:杭州 装修类型:自装 面积:建面135平方 进度:砌墙阶段 问题描述: 房屋装修选择自装,如何寻找砌墙工人,比价并施工 解决方案: 一、了解砌墙相关知识 砌…

docker 安装 prometheus,grafana,node-exporter 监控工具

:https://download.csdn.net/download/qq_42208305/87792827 加载离线镜像 : 监控端安装: docker load -i prometheus.tar docker load -i grafana.tar 被监控端安装: docker load -i node-expo…

类加载器与模块化系统

1 类加载器 “类加载器”是实现应用程序自己决定如何去获取所需的类这个动作的代码。 1.1 类与类加载器 比较两个类是否“相等”,只有在这两个类是由同一个类加载器加载的前提下才有意义。否则,即使这两个类来源同一个Class文件,被同一个J…

智慧型档案馆十防一体化安全管控平台所需要的主要产品

档案八防十防常用的十款设备 序号 名称 1 温湿度传感器 2 空气质量云测仪 3 恒湿净化一体机 4 健康防护一体机 5 综合智能触摸一体化区域控制器 6 空调红外学习控制模块 7 漏水检测控制器及感应线 8 数字烟雾传感器 9 红外防盗传感器 10 系统软件平台 附…

无脑006——mmrotate框架下复现RTMDet-R

必须用mmrotate才有hrsc的程序 1 安装环境: Linux RTX 3090 nvcc --version cuda 11.3 pytorch 1.11.0 conda install pytorch1.11.0 torchvision0.12.0 torchaudio0.11.0 cudatoolkit11.3 -c pytorch测试pytorch是否安装成功: >>> torch.…