大数据-226 离线数仓 - Flume 优化配置 自定义拦截器 拦截原理 拦截器实现 Java

news2025/1/18 16:53:45

点一下关注吧!!!非常感谢!!持续更新!!!

Java篇开始了!

目前开始更新 MyBatis,一起深入浅出!

目前已经更新到了:

  • Hadoop(已更完)
  • HDFS(已更完)
  • MapReduce(已更完)
  • Hive(已更完)
  • Flume(已更完)
  • Sqoop(已更完)
  • Zookeeper(已更完)
  • HBase(已更完)
  • Redis (已更完)
  • Kafka(已更完)
  • Spark(已更完)
  • Flink(已更完)
  • ClickHouse(已更完)
  • Kudu(已更完)
  • Druid(已更完)
  • Kylin(已更完)
  • Elasticsearch(已更完)
  • DataX(已更完)
  • Tez(已更完)
  • 数据挖掘(已更完)
  • Prometheus(已更完)
  • Grafana(已更完)
  • 离线数仓(正在更新…)

章节内容

上节我们完成了如下的内容:

  • 需求分析 指标口径
  • 日志数据采集 taildir source HDFS Sink Agent Flume
  • 优化配置

在这里插入图片描述

Flume的优化配置

Flume 是一种分布式、可靠且高效的数据收集、聚合和传输系统,广泛应用于大数据生态系统中。为了提升 Flume 的性能和稳定性,优化配置至关重要。

使用如下的指令,启动Agent进行测试:

flume-ng agent --conf-file /opt/wzk/flume-conf/flume-log2hdfs1.conf -name a1 -Dflum
e.roog.logger=INFO,console

启动后的截图如下所示:
在这里插入图片描述

查看刚才的Flume窗口:
在这里插入图片描述

查看HDFS的内容:
在这里插入图片描述

批量处理

  • 参数:batchSize
  • 作用:控制 Flume 在批量传输时每次传输的事件数量。
    配置建议:
  • Source 到 Channel:根据 Source 的吞吐量和 Channel 的吞吐能力调整,推荐值为 100-1000。
  • Channel 到 Sink:根据 Sink 的处理能力和目标系统的写入性能调整,推荐值为 500-5000。

压缩传输

  • 参数:compressionType
  • 作用:对事件进行压缩后传输,减少网络带宽消耗。
  • 支持的压缩类型:gzip、snappy、lz4 等。
  • 配置建议:根据目标系统是否支持解压缩功能选择合适的压缩类型。

Source 优化

Taildir Source

  • 参数:batchSize 和 fileHeader
  • batchSize:设置单次从文件中读取的事件数量。
  • fileHeader:是否在事件头部添加文件名,推荐开启以便于后续处理。

Kafka Source

  • 参数:kafka.consumer.timeout.ms 和 fetch.message.max.bytes
  • kafka.consumer.timeout.ms:设置 Kafka 消费者读取数据的超时时间,通常为 100-500ms。
  • fetch.message.max.bytes:设置每次读取的最大消息大小,默认值通常为 1MB,可以根据业务场景适当调整。

Channel 优化

Memory Channel

  • 参数:capacity 和 transactionCapacity
  • capacity:Channel 中允许的最大事件数。
  • transactionCapacity:单次事务中允许的最大事件数。

File Channel

  • 参数:checkpointDir 和 dataDirs
  • checkpointDir:存储 Channel 状态的目录。
  • dataDirs:存储事件数据的目录,建议设置多个磁盘路径以提升 IO 性能。
  • 配置建议:确保磁盘 IO 性能足够,避免瓶颈。

Flume报错解决

向 logs 目录中存放入日志文件,此时如果出现OOM的日志,是因为缺省情况下FlumeJVM的最大分配20M,这个值太小,需要调整。
我这里直接放入:

vim /opt/wzk/logs/start/test.log

2020-07-30 14:18:47.339 [main] INFO com.ecommerce.AppStart - {"app_active":{"name":"app_active","json":{"entry":"1","action":"1","error_code":"0"},"time":1596111888529},"attr":{"area":"泰安","uid":"2F10092A9","app_v":"1.1.13","event_type":"common","device_id":"1FB872-9A1009","os_type":"4.7.3","channel":"DK","language":"chinese","brand":"iphone-9"}}

解决方案:
在 $FLUME_HOME/conf/flume-env.sh 中增加以下内容:

export JAVA_OPTS="-Xms4000m -Xmx4000m -
Dcom.sun.management.jmxremote"
# 要想使配置文件生效,还要在命令行中指定配置文件目录
flume-ng agent --conf flume-1.9/conf --conf-file flume-log2hdfs1.conf -name a1 -Dflume.root.logger=INFO,console

flume-ng agent --conf-file flume-log2hdfs1.conf -name a1 -Dflume.root.logger=INFO,console

Flume内存参数设置及优化:

  • 根据日志数据量大小,JVM堆一般要设置为4G或者更高
  • -Xms -Xmx最好设置一致,减少内存抖动带来的性能影响

自定义拦截器

前面FlumeAgent的配置使用了本地时间,可能导致数据存放的路径不正确。要解决上面的问题就需要使用自定义拦截器。
Agent用于测试自定义拦截器,source => logger sink
flumetest1.conf

# a1是agent的名称。source、channel、sink的名称分别为:r1 c1 k1
a1.sources = r1
a1.channels = c1
a1.sinks = k1
# source
a1.sources.r1.type = netcat
a1.sources.r1.bind = h122.wzk.icu
a1.sources.r1.port = 9999
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = icu.wzk.CustomerInterceptor$Builder
# channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100
# sink
a1.sinks.k1.type = logger
# source、channel、sink之间的关系
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

自定义拦截器原理

自定义拦截器的原理:

  • 自定义拦截器要集成 Flume 的 Interceptor
  • Event 分为 header 和 body (接收的字符串)
  • 获取 header 和 body
  • 从 body 中 获取 time,并将时间戳转换为字符串 yyyy-MM-dd
  • 将转换后的字符串放置到header中

自定义拦截器实现

自定义拦截器的实现:

  • 获取event的header
  • 获取event的body
  • 解析body获取json串
  • 解析json串获取时间戳
  • 将时间戳转换为字符串 yyyy-MM-dd
  • 将转换后的字符串放置header中
  • 返回event

导入依赖

<dependencies>
  <dependency>
    <groupId>org.apache.flume</groupId>
    <artifactId>flume-ng-core</artifactId>
    <version>1.9.0</version>
    <scope>provided</scope>
  </dependency>
  <dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.1.23</version>
  </dependency>
</dependencies>

编写代码

package icu.wzk;


import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class CustomerInterceptor implements Interceptor {

    @Override
    public void initialize() {

    }

    @Override
    public Event intercept(Event event) {
        // 这里是逐条处理
        String eventBody = new String(event.getBody(), StandardCharsets.UTF_8);
        // 获取Event的Header
        Map<String, String> headerMap = event.getHeaders();
        // 解析Body获取JSON字符串
        String[] bodyArr = eventBody.split("\\s+");
        try {
            String jsonStr = bodyArr[6];
            // 解析JSON字符串获取时间戳
            JSONObject jsonObject = JSON.parseObject(jsonStr);
            String timestampStr = jsonObject.getJSONObject("app_active").getString("time");
            // 将时间戳转换字符串 yyyy-MM-dd
            // 将字符串转换为Long
            long timestampLong = Long.parseLong(timestampStr);
            DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd");
            Instant instant = Instant.ofEpochMilli(timestampLong);
            LocalDateTime localDateTime = LocalDateTime.ofInstant(instant, ZoneId.systemDefault());
            String date = formatter.format(localDateTime);
            // 将转换后的字符串放置header中
            headerMap.put("logtime", date);
            event.setHeaders(headerMap);
        } catch (Exception e) {
            headerMap.put("logtime", "Unknown");
            event.setHeaders(headerMap);
        }
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> list) {
        List<Event> lstEvent = new ArrayList<>();
        for (Event event : list) {
            Event outEvent = intercept(event);
            if (outEvent != null) {
                lstEvent.add(outEvent);
            }
        }
        return lstEvent;
    }

    @Override
    public void close() {

    }

    public static class Builder implements Interceptor.Builder {
        @Override
        public Interceptor build() {
            return new CustomerInterceptor();
        }
        @Override
        public void configure(Context context) {
        }
    }

}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2243608.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

stm32启动过程解析startup启动文件

1.STM32的启动过程模式 1.1 根据boot引脚决定三种启动模式 复位后&#xff0c;在 SYSCLK 的第四个上升沿锁存 BOOT 引脚的值。BOOT0 为专用引脚&#xff0c;而 BOOT1 则与 GPIO 引脚共用。一旦完成对 BOOT1 的采样&#xff0c;相应 GPIO 引脚即进入空闲状态&#xff0c;可用于…

如何在项目中用elementui实现分页器功能

1.在结构部分复制官网代码&#xff1a; <template> 标签: 这是 Vue 模板的根标签&#xff0c;包含所有的 HTML 元素和 Vue 组件。 <div> 标签: 这是一个普通的 HTML 元素&#xff0c;包裹了 el-pagination 组件。它没有特别的意义&#xff0c;只是为了确保 el-pagi…

15-大模型 RAG 经验篇

一、LLMs 已经具备了较强能力了&#xff0c;存在哪些不足点? 在 LLM 已经具备了较强能力的基础上&#xff0c;仍然存在以下问题&#xff1a; 幻觉问题&#xff1a;LLM 文本生成的底层原理是基于概率的 token by token 的形式&#xff0c;因此会不可避免地产生"一本正经…

数据结构-二叉树及其遍历

🚀欢迎来到我的【数据结构】专栏🚀 🙋我是小蜗,一名在职牛马。🐒我的博客主页​​​​​​ ➡️ ➡️ 小蜗向前冲的主页🙏🙏欢迎大家的关注,你们的关注是我创作的最大动力🙏🙏🌍前言 本篇文章咱们聊聊数据结构中的树,准确的说因该是只说一说二叉树以及相…

Iview DatePicker 仅允许选择当前月份及以后的月份

iview DatePicker之前月份禁用且下月可用 html代码 <DatePicker type"month" :options"options4" :value"dialogForm.estimatedStartTimeWithCreate" on-change"monthTime($event, loadDateStart)" placeholder"请选择时间&q…

r-and-r——提高长文本质量保证任务的准确性重新提示和上下文搜索的新方法可减轻大规模语言模型中的迷失在中间现象

概述 随着大规模语言模型的兴起&#xff0c;自然语言处理领域取得了重大发展。这些创新的模型允许用户通过输入简单的 "提示 "文本来执行各种任务。然而&#xff0c;众所周知&#xff0c;在问题解答&#xff08;QA&#xff09;任务中&#xff0c;用户在处理长文本时…

【GPTs】Ai-Ming:AI命理助手,个人运势与未来发展剖析

博客主页&#xff1a; [小ᶻZ࿆] 本文专栏: AIGC | GPTs应用实例 文章目录 &#x1f4af;GPTs指令&#x1f4af;前言&#x1f4af;Ai-Ming主要功能适用场景优点缺点 &#x1f4af;小结 &#x1f4af;GPTs指令 中文翻译&#xff1a; defcomplete_sexagenary&#xff08;年&a…

ubuntu24.04网卡配置

vim /etc/netplan/01-netcfg.yaml /24表示子网掩码的长度。这种表示法称为CIDR&#xff08;无类别域间路由&#xff09;记法。CIDR记法将IP地址和它们的子网掩码合并为一个单一的值&#xff0c;其中斜杠/后面的数字表示子网掩码中连续的1的位数。 对于/24&#xff1a; 24表示…

【linux】如何扩展磁盘容量(VMware虚拟机)-转载

如何扩展磁盘容量(VMware虚拟机) 一、前置准备工作 扩展虚拟机磁盘前&#xff0c;需要先把虚拟机关机才能进行扩展磁盘操作 1.选择虚拟机设置&#xff0c;如下图所示 2.输入你想扩展的磁盘容量&#xff0c;以本次实操为例&#xff0c;我这里输入的30G&#xff08;具体按照实…

python 数据类型----可变数据类型

一、list列表类型&#xff1a; 一种有序集合&#xff0c;里面有多个数据用逗号隔开&#xff0c;可以对数据进行追加、插入、删除和替换&#xff1b;使用[]标识&#xff0c;可以包含任意数据类型 登录后复制 # 字符串类型列表 names[bill,may,jack]#整数型列表 numbers [1,2,34…

STARTS:一种用于自动脑电/脑磁(E/MEG)源成像的自适应时空框架|文献速递-基于深度学习的病灶分割与数据超分辨率

Title 题目 STARTS: A Self-adapted Spatio-temporal Framework for Automatic E/MEG SourceImaging STARTS&#xff1a;一种用于自动脑电/脑磁(E/MEG)源成像的自适应时空框架 01 文献速递介绍 电生理源成像&#xff08;Electrophysiological Source Imaging&#xff0c;E…

海康威视和大华视频设备对接方案

目录 一、海康威视 【老版本】 【新版本】 二、大华 一、海康威视 【老版本】 URL规定&#xff1a; rtsp://username:password[ipaddress]/[videotype]/ch[number]/[streamtype] 注&#xff1a;VLC可以支持解析URL里的用户名密码&#xff0c;实际发给设备的RTSP请求不支…

20.UE5UI预构造,开始菜单,事件分发器

2-22 开始菜单、事件分发器、UI预构造_哔哩哔哩_bilibili 目录 1.UI预构造 2.开始菜单和开始关卡 2.1开始菜单 2.2开始关卡 2.3将开始菜单展示到开始关卡 3.事件分发器 1.UI预构造 如果我们直接再画布上设计我们的按钮&#xff0c;我们需要为每一个按钮进行编辑&#x…

手搓神经网络(MLP)解决MNIST手写数字识别问题 | 数学推导+代码实现 | 仅用numpy,tensor和torch基本计算 | 含正反向传播数学推导

手写数字识别&#xff08;神经网络入门&#xff09; 文章目录 手写数字识别&#xff08;神经网络入门&#xff09;实验概述实验过程数据准备模型实现线性变换层前向传播反向传播更新参数整体实现 激活函数层&#xff08;ReLU&#xff09;前向传播反向传播整体实现 Softmax层&am…

极速入门数模电路

一. 认识数模元器件 1.1 面包板 1.2 导线 一般使用红色导线表示正极&#xff0c;黑色导线表示负极。 1.3 纽扣电池 1.4 电池座 1.4 LED灯 1.5 数码管 1.6 有源蜂鸣器 1.7 扬声器 1.8 电容 电容接电池之后可以充电&#xff0c;充完电后电容接LED灯可以放电。 1.9 电阻 1.1…

Windows docker下载minio出现“Using default tag: latestError response from daemon”

Windows docker下载minio出现 Using default tag: latest Error response from daemon: Get "https://registry-1.docker.io/v2/": context deadline exceeded 此类情况&#xff0c;一般为镜像地址问题。 {"registry-mirrors": ["https://docker.re…

使用MaxKB搭建知识库问答系统并接入个人网站(halo)

首发地址&#xff08;欢迎大家访问&#xff09;&#xff1a;使用MaxKB搭建知识库问答系统并接入个人网站 前言 从OpenAI推出ChatGPT到现在&#xff0c;大模型已经渗透到各行各业&#xff0c;大模型也逐渐趋于平民化&#xff1b;从最开始对其理解、生成、强大的知识积累的惊叹&…

数据库练习:查询操作

1. 查询出部门编号为D2019060011的所有员工 2. 所有财务总监的姓名、编号和部门编号。 3. 找出奖金高于工资的员工。 4. 找出奖金高于工资40%的员工。 5 找出部门编号为D2019090011中所有财务总监&#xff0c;和部门编号为D2019060011中所有财务专员的详细资料。 6. 找出部门编…

css数据不固定情况下,循环加不同背景颜色

<template><div><p v-for"(item, index) in items" :key"index" :class"getBackgroundClass(index)">{{ item }}</p></div> </template><script> export default {data() {return {items: [学不会1, …

【Python绘图】两种绘制混淆矩阵的方式 (ConfusionMatrixDisplay(), imshow()) 以及两种好看的colorbar

在机器学习领域&#xff0c;混淆矩阵是一个评估分类模型性能的重要工具。它不仅展示了模型预测的准确性&#xff0c;还揭示了模型在不同类别上的表现。本文介绍两种在Python中绘制混淆矩阵的方法&#xff1a;ConfusionMatrixDisplay() 和 imshow()&#xff0c;以及两种好看的co…