flink日志实时采集写入Kafka/ElasticSearch

news2024/11/25 7:01:39

目录

  • 背景
  • 注意点
  • 自定义Appender
  • log4j配置文件
  • 启动脚本
  • 实现效果

背景

由于公司想要基于flink的日志做实时预警功能,故需要实时接入,并刷入es进行分析。

注意点

日志接入必须异步,不能影响服务性能

kafka集群宕机,依旧能够提交flink任务且运行任务

kafka集群挂起恢复,可以依旧续写实时运行日志

自定义Appender

在类上加上@Plugin注解,标记为自定义appender

@Plugin(name = "KafkaAppender", category = "Core", elementType = "appender", printObject = true)
public final class KafkaAppender extends AbstractAppender {}

在类加上@PluginFactory和@PluginAttribute来配合log4j.properties来传递参数

@PluginFactory
    public static KafkaAppender createAppender(
            /** 发送到的Topic */
            @PluginAttribute("topic") String topic,
            /** Kafka地址 */
            @PluginAttribute("kafkaBroker") String kafkaBroker,
            /** 设置的数据格式Layout */
            @PluginElement("Layout") Layout<? extends Serializable> layout,
            @PluginAttribute("name") String name,
            @PluginAttribute("append") boolean append,
            /** 日志等级 */
            @PluginAttribute("level") String level,
            /** 设置打印包含的包名,前缀匹配,逗号分隔多个 */
            @PluginAttribute("includes") String includes,
            /** 设置打印不包含的包名,前缀匹配,同时存在会被排除,逗号分隔多个 */
            @PluginAttribute("excludes") String excludes) {
        return new KafkaAppender(name, topic, kafkaBroker, null, layout, append, level, includes, excludes);
    }

在append中对每一条日志进行处理

    @Override
    public void append(LogEvent event) {
        if (event.getLevel().isMoreSpecificThan(this.level)) {
            if (filterPackageName(event)) {
                return;
            }
            try {
                if (producer != null) {
                    CompletableFuture.runAsync(() -> {
                        producer.send(new ProducerRecord<String, String>(topic, getLayout().toSerializable(event).toString()));
                    }, executorService);
                }
            } catch (Exception e) {
                LOGGER.error("Unable to write to kafka for appender [{}].", this.getName(), e);
                LOGGER.error("Unable to write to kafka in appender: " + e.getMessage(), e);
            } finally {
            }
        }
    }

源码地址https://gitee.com/czshh0628/realtime-log-appender

log4j配置文件

日志接入kafka

在flink的conf目录的log4j.properties里添加如下配置

# 自定义的Kafka配置
rootLogger.appenderRef.kafka.ref=KafkaAppender
appender.kafka.type=KafkaAppender
appender.kafka.name=KafkaAppender
# 日志发送到的Topic
appender.kafka.topic=cdc
# Kafka Broker
appender.kafka.kafkaBroker=xxx:9092,xxx:9092
# kerberos认证
http://appender.kafka.keyTab=xxx
http://appender.kafka.principal=xxx
# 发送到Kafka日志等级
appender.kafka.level=info
# 过滤指定包名的文件
appender.kafka.includes=com.*,org.apache.hadoop.yarn.client.*,org.*
## kafka的输出的日志pattern
appender.kafka.layout.type=PatternLayout
appender.kafka.layout.pattern={"logFile":"${sys:log.file}","taskId":"${sys:taskId}","taskVersion":"${sys:taskVersion}","logTime":"%d{yyyy-MM-dd HH:mm:ss,SSS}","logMsg":"%-5p %-60c %x - %m","logThrow":"%throwable"}

日志接入elasticsearch

# 自定义的es的配置
rootLogger.appenderRef.es.ref=EsAppender
appender.es.type=EsAppender
appender.es.name=EsAppender
appender.es.hostname=bigdata
appender.es.port=9200
appender.es.index=flink_logs
appender.es.fetchSize=100
appender.es.fetchTime=5000
appender.es.level=info
appender.es.includes=com.*,org.apache.hadoop.yarn.client.*,org.*
appender.es.layout.type=PatternLayout
appender.es.layout.pattern={"logFile":"${sys:log.file}","taskId":"${sys:taskId}","taskVersion":"${sys:taskVersion}","logTime":"%d{yyyy-MM-dd'T'HH:mm:ss,SSS'Z'}","logMsg":"%-5p %-60c %x - %m","logThrow":"%throwable"}

启动脚本

bin/flink run -m yarn-cluster -p 1 -yjm 1024 -ytm 1024 -ys 1 -yD env.java.opts="-DtaskId=1000 -DtaskVersion=1.0" -c czs.study.flinkcdc.mysql.MySqlCdcStream test.jar

实现效果

image-20230425153320114

image-20230425153344392

image-20230425154607978

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/460138.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

我们公司的面试,有点不一样!

我们公司的面试&#xff0c;有点不一样&#xff01; 朋友们周末愉快&#xff0c;我是鱼皮。因为我很屑&#xff0c;所以大家也可以叫我屑老板。 自从我发了自己创业的文章和视频后&#xff0c;收到了很多小伙伴们的祝福&#xff0c;真心非常感谢&#xff01; 不得不说&#…

如何写出CPU友好的代码,百倍提升性能?

作者&#xff1a;王再军 不管是什么样的数据&#xff0c;投其所好&#xff0c;才能够优化代码性能。本文将用一个实际用例为大家分享如何通过用心组织的代码来提升性能。 一、出现性能差别的代码 CPU友好的代码与我们平时的那些CRUD操作可能没什么关系。但是用心组织的代码其实…

开源模型ModelScope的初探使用

泛AI开发者的一站式模型服务产品平台 阿里继续沿用它的平台思维&#xff0c;搞了这个ModelScope训练模型平台&#xff0c;一边开源一部分模型&#xff0c;一边在阿里云上卖自己的付费版&#xff0c;套路依旧没变&#xff0c;不过对AI相关模型感兴趣的同学&#xff0c;想做业务…

202303最新各大厂大数据核心面试题

1、 字节、阿里、拼多多、中移杭研、海亮等:Hive做过哪些实际优化?必须结合实际项目来谈,结合我实际离线数仓里做的优化? 本人回答: 1.小文件的优化(解决方法是combineHiveinput、merge、jvm重用等) 2.数据倾斜的优化:

Flutter 小技巧之横竖列表的自适应大小布局支持

今天这个主题看着是不是有点抽象&#xff1f;又是列表嵌套&#xff1f;之前不是分享过《 ListView 和 PageView 的各种花式嵌套》了么&#xff1f;那这次的自适应大小布局支持有什么不同&#xff1f; 算是某些奇特的场景下才会需要。 首先我们看下面这段代码&#xff0c;基本逻…

android studio EditText用法

1.自定义文本框 选中状态&#xff1a; <?xml version"1.0" encoding"utf-8"?> <shape xmlns:android"http://schemas.android.com/apk/res/android"><!--指定形状内部颜色--><solid android:color"#ffffff"&g…

机器学习在生态、环境经济学中的实践技术应用及论文写作

近年来&#xff0c;人工智能领域已经取得突破性进展&#xff0c;对经济社会各个领域都产生了重大影响&#xff0c;结合了统计学、数据科学和计算机科学的机器学习是人工智能的主流方向之一&#xff0c;目前也在飞快的融入计量经济学研究。表面上机器学习通常使用大数据&#xf…

点了下链接信息就泄露了,ta们是怎么做到的?

随着互联网的普及以及一系列可供上网设备的快速发展&#xff0c;截止2022年12月&#xff0c;中国网民规模达10.37亿&#xff0c;较之2021年12月增长3549万&#xff0c;互联网普及率达75.6%&#xff1b;在这么庞大的数据背后又有多少用户的个人信息被泄露呢? 一、信息泄露常见场…

2023 年最全面的 DevOps 工具列表,你用过几个?

在软件开发领域&#xff0c;DevOps已经成为越来越重要的概念。它强调了开发、测试、运维等各个环节之间的协作和自动化&#xff0c;以提高软件交付的速度和质量。随着时间的推移&#xff0c;DevOps所涉及的工具也不断更新和演进。本文将介绍一个预计在 2023 年最全面的 DevOps …

elementui中使用响应式布局实现五个盒子一行的适配

一、使用elementui中的自定义标签 自定义标签之后&#xff0c;浏览器中的css样式会出现这个类名 <el-row :gutter"30" class"row-bg"><el-col:xs"8":sm"6":md"4":lg"{ span: 24-5 }"class"headerC…

开发框架Furion之Winform+SqlSugar

目录 1.开发环境 2.项目搭建 2.1 创建WinFrom主项目 2.2 创建子项目 2.3 实体类库基础类信息配置 2.3.1 Nuget包及项目引用 2.3.2 实体基类创建 2.4 仓储业务类库基础配置 2.4.1 Nuget包及项目引用 2.4.2 Dtos实体 2.4.3 仓储基类 2.5 service注册类库基础配置 2…

【图形数据库】Neo4j简介及应用场景

文章目录 1.什么是Neo4j?2.图形数据结构3.Neo4j应用场景3.1我们可以将图领域划分成以下两部分&#xff1a;3.2目前&#xff0c;业内已经有了相对比较成熟的基于图数据库的解决方案&#xff0c;大致可以分为以下几类。3.2.1金融行业应用3.2.2社交网络图谱3.2.3企业关系图谱 总结…

Linux进程通信:存储映射mmap

1. 存储映射是什么&#xff1f; 如上图&#xff0c;存储映射是将块设备的文件映射到进程的虚拟地址空间。之后&#xff0c;进程可以直接使用指针操作其地址空间中映射的文件&#xff0c;对这块映射区操作就相当于操作文件。 2. 存储映射函数mmap的简单使用 &#xff08;1&…

网络安全岗位面试题大全:解析各个分支岗位的面试题目,帮助你上岸大厂

网络安全是一个广泛的领域&#xff0c;涵盖了许多不同的岗位和分支。我整理了网络安全各个岗位分支的面试题目&#xff1a; 安全工程师/系统管理员 您如何确保网络系统的安全性和保密性&#xff1f;您采用了哪些技术和工具&#xff1f;请描述一下您在过去工作中遇到的最具挑战…

C++ -5- 内存管理

文章目录 C语言和C内存管理的区别示例1. C/C 中程序内存区域划分2. C中动态内存管理3.operator new 与 operator delete 函数4.new 和 delete 的实现原理5.定位new表达式 C语言和C内存管理的区别示例 //C语言&#xff1a; struct SListNode {int data;struct SListNode* next; …

什么是内存?什么是内存逃逸?怎么做内存逃逸分析

内存 平时我们在电脑上听歌&#xff0c;聊天&#xff0c;或者启动某个程序&#xff0c;那么这个启动过程&#xff0c;其实就是把程序从硬盘读入到内存中去。就像安卓手机&#xff0c;内存不够了很卡&#xff0c;杀掉几个软件&#xff0c;内存就升上来了。但也不是所有的程序都…

产品经理需要了解api接口的哪些东西

一、作为产品经理&#xff0c;需要了解API接口的以下方面&#xff1a; 功能&#xff1a;API接口的功能是指它提供的业务功能&#xff0c;包括数据查询、修改、增加、删除、计算等等&#xff0c;根据产品的需求确定需要调用哪些API接口。请求方式和传参&#xff1a;API接口的请…

致力提供一站式数据可视化解决方案,支持报表、图表、大屏

一、开源项目简介 Davinci是一个DVAAS&#xff08;Data Visualization as a Service&#xff09;平台解决方案。 Davinci面向业务人员/数据工程师/数据分析师/数据科学家&#xff0c;致力于提供一站式数据可视化解决方案。既可作为公有云/私有云独立使用&#xff0c;也可作为…

Linux进程通信:信号

1. 信号的概念 Linux进程间通信的方式之一。信号也称为“软件中断”。 信号特点&#xff1a; 简单&#xff1b;携带信息有限&#xff1b;满足特定条件才发送信号&#xff1b;可进行用户空间和内核空间进程的交互&#xff1b; 2. 信号的编号 kill -l // 查看信号编号 POS…

ModelArts的使用

完整流程第一个实例&#xff1a;AI初学者&#xff1a;使用订阅算法构建模型实现花卉识别_AI开发平台ModelArts_最佳实践_模型训练&#xff08;预置算法-新版训练&#xff09;_华为云 一、支持的模型 可以在gitee上下载标准网络模型&#xff1a; models: Models of MindSpore …