0301taildir-source报错-flume-大数据

news2024/12/25 0:30:09

1 基础环境简介

linux系统:centos,前置安装:jdk、hadoop、zookeeper、kafka,版本如下

软件版本描述
centos7linux系统发行版
jdk1.8java开发工具集
hadoop2.10.0大数据生态基础组件
zookeeper3.5.7分布式应用程序协调服务
kafka3.0分布式mq组件
flume1.9.0分布式采集传输组件

2 报错

  • 场景1:动态监控目录多个日志变化,通过flume采集传输到kafka

  • 报错日志

    org.apache.flume.FlumeException: Error creating positionFile parent directories
            at org.apache.flume.source.taildir.TaildirSource.configure(TaildirSource.java:170)
            at org.apache.flume.conf.Configurables.configure(Configurables.java:41)
            at org.apache.flume.node.AbstractConfigurationProvider.loadSources(AbstractConfigurationProvider.java:325)
            at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:105)
            at org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:145)
            at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
            at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
            at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
            at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
            at java.lang.Thread.run(Thread.java:750)
    Caused by: java.nio.file.FileAlreadyExistsException: /export/server/flume
            at sun.nio.fs.UnixException.translateToIOException(UnixException.java:88)
            at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
            at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
            at sun.nio.fs.UnixFileSystemProvider.createDirectory(UnixFileSystemProvider.java:384)
            at java.nio.file.Files.createDirectory(Files.java:674)
            at java.nio.file.Files.createAndCheckIsDirectory(Files.java:781)
            at java.nio.file.Files.createDirectories(Files.java:727)
            at org.apache.flume.source.taildir.TaildirSource.configure(TaildirSource.java:168)
            ... 11 more
    
    
  • conf文件如下

    #定义组件
    a1.sources = r1
    a1.channels = c1
    
    #配置source
    a1.sources.r1.type = TAILDIR
    a1.sources.r1.filegroups = f1
    a1.sources.r1.filegroups.f1 = /opt/module/applog/log/app.*
    a1.sources.r1.positionFile = /export/server/flume/taildir_position.json
    
    #配置channel
    a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
    a1.channels.c1.kafka.bootstrap.servers = node1:9092,node2:9092
    a1.channels.c1.kafka.topic = topic_log01
    a1.channels.c1.parseAsFlumeEvent = false
    
    #组装 
    a1.sources.r1.channels = c1
    
  • 原因就是在创建positionFile的时候父目录已存在

  • 场景2:我们生成的日志文件app.log 每经过一天会按照日期重命名文件,然后生成新的app.log,此时flume会重新采集所有的日志信息,导致信息重复采集2次。

  • Taildir 说明: Taildir Source 维护了一个 json 格式的 position File,其会定期的往 position File中更新每个文件读取到的最新的位置,因此能够实现断点续传。Position File 的格式如下:

    {"inode":2496272,"pos":12,"file":"/opt/module/flume/files/file1.t
    
    xt"}
    
    {"inode":2496275,"pos":12,"file":"/opt/module/flume/files2/log.t
    
    xt"}
    
  • 而flume会同时判断Inode和file来确定是否同一文件

    注:Linux 中储存文件元数据的区域就叫做 inode,每个 inode 都有一个号码,操作系统

    用 inode 号码来识别不同的文件,Unix/Linux 系统内部不使用文件名,而使用 inode 号码来

    识别文件。

3 解决

场景1解决方案有两种:

  1. 既然是创建父目录已存在,我们可以吧positionFile位置重新配置。

  2. 修改源代码,我们通过源代码找下处理逻辑,下载1.9.0版本的flume源代码,官网地址:https://archive.apache.org/dist/flume/,找到TailSource 170行

     @Override
      public synchronized void configure(Context context) {
        String fileGroups = context.getString(FILE_GROUPS);
        Preconditions.checkState(fileGroups != null, "Missing param: " + FILE_GROUPS);
    
        filePaths = selectByKeys(context.getSubProperties(FILE_GROUPS_PREFIX),
                                 fileGroups.split("\\s+"));
        Preconditions.checkState(!filePaths.isEmpty(),
            "Mapping for tailing files is empty or invalid: '" + FILE_GROUPS_PREFIX + "'");
    
        String homePath = System.getProperty("user.home").replace('\\', '/');
        positionFilePath = context.getString(POSITION_FILE, homePath + DEFAULT_POSITION_FILE);
        Path positionFile = Paths.get(positionFilePath);
        try {
          // 此处创建父目录,如果存在报错
          Files.createDirectories(positionFile.getParent());
        } catch (IOException e) {
          throw new FlumeException("Error creating positionFile parent directories", e);
        }
        headerTable = getTable(context, HEADERS_PREFIX);
        batchSize = context.getInteger(BATCH_SIZE, DEFAULT_BATCH_SIZE);
        skipToEnd = context.getBoolean(SKIP_TO_END, DEFAULT_SKIP_TO_END);
        byteOffsetHeader = context.getBoolean(BYTE_OFFSET_HEADER, DEFAULT_BYTE_OFFSET_HEADER);
        idleTimeout = context.getInteger(IDLE_TIMEOUT, DEFAULT_IDLE_TIMEOUT);
        writePosInterval = context.getInteger(WRITE_POS_INTERVAL, DEFAULT_WRITE_POS_INTERVAL);
        cachePatternMatching = context.getBoolean(CACHE_PATTERN_MATCHING,
            DEFAULT_CACHE_PATTERN_MATCHING);
    
        backoffSleepIncrement = context.getLong(PollableSourceConstants.BACKOFF_SLEEP_INCREMENT,
            PollableSourceConstants.DEFAULT_BACKOFF_SLEEP_INCREMENT);
        maxBackOffSleepInterval = context.getLong(PollableSourceConstants.MAX_BACKOFF_SLEEP,
            PollableSourceConstants.DEFAULT_MAX_BACKOFF_SLEEP);
        fileHeader = context.getBoolean(FILENAME_HEADER,
                DEFAULT_FILE_HEADER);
        fileHeaderKey = context.getString(FILENAME_HEADER_KEY,
                DEFAULT_FILENAME_HEADER_KEY);
        maxBatchCount = context.getLong(MAX_BATCH_COUNT, DEFAULT_MAX_BATCH_COUNT);
        if (maxBatchCount <= 0) {
          maxBatchCount = DEFAULT_MAX_BATCH_COUNT;
          logger.warn("Invalid maxBatchCount specified, initializing source "
              + "default maxBatchCount of {}", maxBatchCount);
        }
    
        if (sourceCounter == null) {
          sourceCounter = new SourceCounter(getName());
        }
      }
    

    在这里插入图片描述

可以在创建父目录之前检测是否已存在,如果已存在,直接跳过创建即可,修改try代码块中内容如下

boolean exists = Files.exists(positionFile.getParent());
      if (!exists)
        Files.createDirectories(positionFile.getParent());

maven打包替换flume/lib/下 flume-taildir-source-1.9.0.jar 如图所示:在这里插入图片描述

重新运行,正常启动,如下图日志所示:在这里插入图片描述

kafka中新接收的数据如下图所示:在这里插入图片描述

场景2解决方案 把TailFile如下代码

  public boolean updatePos(String path, long inode, long pos) throws IOException {
    if (this.inode == inode && this.path.equals(path)) {
      setPos(pos);
      updateFilePos(pos);
      logger.info("Updated position, file: " + path + ", inode: " + inode + ", pos: " + pos);
      return true;
    }
    return false;
  }
  
  // 修改为
    public boolean updatePos(String path, long inode, long pos) throws IOException {
    if (this.inode == inode) {
      setPos(pos);
      updateFilePos(pos);
      logger.info("Updated position, file: " + path + ", inode: " + inode + ", pos: " + pos);
      return true;
    }
    return false;
  }

即不校验file只校验inode,具体这里不再去验证,有兴趣自己验证下

结语

如果小伙伴什么问题或者指教,欢迎交流。

❓QQ:806797785

参考链接:

[1]flume教学视频[CP/OL].2020-04-16.

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1532311.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

成都正信晟锦:亲戚借了钱不认账怎么办是现金

在人际交往中&#xff0c;金钱往来往往是敏感而复杂的议题&#xff0c;尤其是与亲戚间的借贷。若遭遇亲戚借了钱却不认账的尴尬局面&#xff0c;我们该如何妥善处理呢? 沟通始终是解决问题的第一步。尝试与该亲戚进行坦诚的对话&#xff0c;了解不认账的原因。可能是对方遇到了…

基于springboot的车辆充电桩管理平台

技术&#xff1a;springbootmysqlvue 一、背景 科学技术日新月异的如今&#xff0c;计算机在生活各个领域都占有重要的作用&#xff0c;尤其在信息管理方面&#xff0c;在这样的大背景下&#xff0c;学习计算机知识不仅仅是为了掌握一种技能&#xff0c;更重要的是能够让它真正…

【深度学习】手动实现RNN循环神经网络

&#x1f33b;个人主页&#xff1a;相洋同学 &#x1f947;学习在于行动、总结和坚持&#xff0c;共勉&#xff01; 目录 01 回顾 02 RNN神经网络原理 03 RNN神经网络实现 04 RNN神经网络实验 RNN的特别结构使得RNN具备了短期记忆能力&#xff0c;使其能够学习部分语义信息…

php 对接IronSource海外广告平台收益接口Reporting API

今天对接的是IronSource广告reporting api接口&#xff0c;拉取广告收益回来自己做统计。记录分享给大家 首先是文档地址,进入到IronSource后台就能看到文档地址以及参数&#xff1a; 文档地址&#xff1a;https://developers.is.com/ironsource-mobile/air/reporting/ 在这里插…

用户中心项目(前后端环境搭建)

文章目录 1.企业做项目流程2.需求分析1.登录/注册2.用户管理&#xff08;仅管理员可见&#xff09;3.用户校验 3.技术选型4.计划5.初始化项目1.前端初始化1.ant design pro 官网2.ant design pro 初始化1.本地创建一个文件夹存放项目2.cmd进入该目录3.根据官网进行初始化项目4.…

《C语言深度剖析》---------关键字(1)

1.双击实质--->加载内存 windows系统里面&#xff0c;双击的本质就是运行程序&#xff0c;把程序加载到内存里面&#xff1b; 任何程序运行的时候都必须加载到内存里面&#xff1b; 程序没有运行之前在硬盘里面&#xff0c;为什么程序运行之前必须加载到内存里面呢&#…

免费的chatgpt网站(包含最新版4.0)

相信每个人在生活工作学习中都逃不过用chatgpt来解决一些问题&#xff0c;下面我长话短说&#xff0c;为大家简单介绍几款免费且好用的chatgpt网站 1、YesChat 网址&#xff1a;YesChat-ChatGPT4V Dalle3 Claude 3 All in One Free 第一个就给大家介绍一个狠角色&#xff0c;最…

万界星空科技铜杆加工行业生产管理MES系统

传统的铜管加工方法有&#xff1a; &#xff08;1&#xff09;铜管挤压加工技术&#xff08;2&#xff09;铜管上引连铸法&#xff08;3&#xff09;铜管(有缝)焊接生产技术&#xff08;4&#xff09;铸轧法生产精密铜管铸轧法 生产精密铜管是一种全新的生产工艺&#xff0c;…

154.乐理基础-和弦固定标记法(三)九音、十一音、十三音

如果到这五线谱还没记住还不认识的话去看102.五线谱-高音谱号与103.五线谱-低音谱号这两个里&#xff0c;这里面有五线谱对应的音名&#xff0c;对比着看 内容参考于&#xff1a;三分钟音乐社 上一个内容&#xff1a;153.和弦的织体 上一个内容里练习的答案&#xff1a; 接下…

开放签开源电子签章白皮书-简版

开放签开源电子签章白皮书-简版 一、摘要&#xff1a; 开放签电子签章团队源自于电子合同SaaS公司&#xff0c;立志于通过开源、开放的模式&#xff0c;结合团队十多年的行业经验&#xff0c;将电子签章产品更简单、更低门槛的推广到各行各业中。让电子签章应用更简单&#x…

31.HarmonyOS App(JAVA)鸿蒙系统app Service服务的用法

鸿蒙系统app Service服务的用法 后台任务调度和管控 HarmonyOS将应用的资源使用生命周期划分为前台、后台和挂起三个阶段。前台运行不受资源调度的约束&#xff0c;后台会根据应用业务的具体任务情况进行资源使用管理&#xff0c;在挂起状态时&#xff0c;会对应用的资源使用进…

2024年熔化焊接与热切割证模拟考试题库及熔化焊接与热切割理论考试试题

题库来源&#xff1a;安全生产模拟考试一点通公众号小程序 2024年熔化焊接与热切割证模拟考试题库及熔化焊接与热切割理论考试试题是由安全生产模拟考试一点通提供&#xff0c;熔化焊接与热切割证模拟考试题库是根据熔化焊接与热切割最新版教材&#xff0c;熔化焊接与热切割大…

n皇后问题(典dfs )注意对角线状态判断

思路&#xff1a;用的dfs思想&#xff0c;第一种是全排列思路&#xff0c;和数字排列同样的步骤。要注意对对角线的判断。下面画了个图简单示意一下&#xff0c;但是 u 和 i 的位置变了&#xff0c;在代码里呈现不一样。明天再改吧。先睡了。 代码&#xff1a; #include<io…

IDE(集成开发环境)插件是安全开发的便捷方式之一

开发人员每天都使用插件&#xff0c;插件的功能在于简化开发流程&#xff0c;例如自动检测所有特殊字符&#xff08;如“;”、“:”&#xff09;或语法合规性。创建插件的目的本身就是为了让开发人员能够在编写代码时检测漏洞&#xff0c;并在无需离开 IDE 环境的情况下立即修复…

STM32CubeMX学习笔记25---FreeRTOS信号量

一、信号量简介 信号量用于同步&#xff0c;任务间或者任务和中断间同步 互斥量用户互锁&#xff0c;用于保护同时只能有一个任务访问的资源&#xff0c;为资源上一把锁。 二值信号量&#xff1a;同步。如果存在两个线程&#xff0c;为线程1和线程2&#xff0c;如果线程1发送了…

大载重无人机基础技术,研发一款50KG负重六旋翼无人机技术及成本分析

六旋翼无人机是一种多旋翼无人机&#xff0c;具有六个旋翼&#xff0c;通常呈“X”形布局。它采用电动串列式结构&#xff0c;具有垂直起降、悬停、前飞、后飞、侧飞、俯仰、翻滚等多种飞行动作的能力。六旋翼无人机通常被用于航拍、农业植保、环境监测、地形测绘等领域。 六旋…

PolarDN MISC(简单)大礼包 :详细思路过程

0和255 题目给了俩个文件&#xff0c;一个.txt,一个.py .txt文件中包含0和255 一个字节有八位&#xff0c;每一位只能储存1或0&#xff0c;计算机只懂二进制&#xff0c;所以就是2的八次方&#xff0c;又计算机规定从0开始计数&#xff0c;所以是0至255 考虑用编码转换工具将其…

Android: Gradle 命令

一、查看整个项目依赖传递关系 x.x.x (*) 该依赖已经有了&#xff0c;将不再重复依赖。x.x.x -> x.x.x 该依赖的版本被箭头所指的版本代替。x.x.x -> x.x.x(*) 该依赖的版本被箭头所指的版本代替&#xff0c;并且该依赖已经有了&#xff0c;不再重复依赖。 1. gradlew ap…

redis常用五大数据类型

目录 Key 字符串String 常用命令 列表List 常用命令 集合Set 常用命令 Hash哈希 键值对集合 有序集合Zset Redis新数据类型 Key set key value...添加keykeys *查看当前库中所有的keyexist key该key是否存在type keykey的类型del key删除keyunlink key根据value选择非阻塞…

基于springboot+vue的火锅店管理系统

博主主页&#xff1a;猫头鹰源码 博主简介&#xff1a;Java领域优质创作者、CSDN博客专家、阿里云专家博主、公司架构师、全网粉丝5万、专注Java技术领域和毕业设计项目实战&#xff0c;欢迎高校老师\讲师\同行交流合作 ​主要内容&#xff1a;毕业设计(Javaweb项目|小程序|Pyt…