openresty(nginx)+lua+kafka实现日志搜集系统

news2025/1/24 5:34:45

        今天我们来实现一下快捷的nginx日志搜集系统,主讲的是nginx服务里面的openresty的日志搜集。采用的手段是采用lua做中间桥梁。

一、安装openresty

        具体安装步骤在这里《centos7 二进制安装openresty》

二、安装kafka

        1、安装Java及配置Java

                具体安装步骤在这里《采用ELK搭建日志平台,Java安装》

        2、安装ZooKeeper安装与配置

                a、下载ZooKeeper

wget https://archive.apache.org/dist/zookeeper/zookeeper-3.4.12/zookeeper-3.4.12.tar.gz

                b、解压到/usr/local/

mkdir -p /usr/local/zookeeper3.4
tar -zxf zookeeper-3.4.12.tar.gz -C /usr/local/zookeeper3.4 --strip-components=1 #--strip-components选项表示从目录级别上去除指定的前缀,以实现更加控制解压的效果

                c、配置环境变量

vi /etc/profile
export ZOOKEEPER_HOME=/usr/local/zookeeper3.4/
export PATH=$ZOOKEEPER_HOME/bin:$PATH
source /etc/profile

                d、修改zookeeper3.4的配置文件

cd /usr/local/zookeeper3.4/conf/
cp zoo_sample.cfg zoo.cfg
vi zoo.cfg
tickTime=2000 #ZooKeeper服务器心跳时间,单位为ms
initLimit=10 #允许follower连接并同步到leader的初始化连接时间,以tickTime的倍数来表示
syncLimit=5 #leader与follower心跳检测最大容忍时间,响应超过syncLimit*tickTime,leader认为follower"死掉",从服务器列表中删除follower
dataDir=/data/zookeeper/data #数据目录
dataLogDir=/data/zookeeper/data/log #日志目录
clientPort=2181 #ZooKeeper对外服务端口

                e、创建目录zookeeper3.4及存放服务器编号

mkdir -p /data/zookeeper/data/log
cd /data/zookeeper/data
touch myid
echo 0 > myid

                f、设置开机启动

vi /lib/systemd/system/zookeeper.service
[Unit]
Description=Zookeeper service
After=network.target
[Service]
Type=forking
Environment=ZOOKEEPER_HOME=/usr/local/zookeeper3.4/
Environment=JAVA_HOME=/usr/local/jdk/1.8.0_391
User=root
Group=root
ExecStart=/usr/local/zookeeper3.4/bin/zkServer.sh start
ExecStop=/usr/local/zookeeper3.4/bin/zkServer.sh stop
[Install]
WantedBy=multi-user.target

systemctl enable zookeeper.service #加入开机启动
systemctl start zookeeper.service #启动
systemctl stop zookeeper.service #停止
systemctl status zookeeper.service #状态

                g、开放端口

firewall-cmd --zone=public --add-port=2181/tcp --permanent
firewall-cmd --reload
firewall-cmd --list-ports

        3、安装kafka及配置

       

                 a、下载kafka

wget https://archive.apache.org/dist/kafka/2.8.1/kafka_2.12-2.8.1.tgz

                b、解压到/usr/local/

mkdir -p /usr/local/kafka2.12
tar -zxf  kafka_2.12-2.8.1.tgz -C /usr/local/kafka2.12 --strip-components=1 #--strip-components选项表示从目录级别上去除指定的前缀,以实现更加控制解压的效果

                c、修改配置文件

cd /usr/local/kafka2.12/config/
vi server.properties
broker.id=0
log.dir=/data/kafka/logs #配置zookeeper管理kafka的路径
zookeeper.connect=localhost:2181
listeners=PLAINTEXT://:9092 #配置kafka的监听端口
advertised.listeners=PLAINTEXT://192.168.0.190:9092 #把kafka的地址端口注册给zookeeper,如果是远程访问要改成外网IP

                d、创建kafka日志目录

mkdir -p /data/kafka/logs

                e、配置kafka快捷路径

vi /etc/profile
export KAFKA_HOME=/usr/local/kafka2.12
export PATH=$KAFKA_HOME/bin:$PATH
source /etc/profile

                f、设置开机自动服务

vi /lib/systemd/system/kafka.service
[Unit]
Description=Apache Kafka server (broker)
After=network.target zookeeper.service
Requires=zookeeper.service
[Service]
Type=simple
User=root
Group=root
Environment=JAVA_HOME=/usr/local/jdk/1.8.0_391
Environment=KAFKA_HOME=/usr/local/kafka2.12
Environment=KAFKA_LOG_DIRS=/data/kafka/logs
ExecStart=/usr/local/kafka2.12/bin/kafka-server-start.sh /usr/local/kafka2.12/config/server.properties
ExecStop=/usr/local/kafka2.12/bin/kafka-server-stop.sh
Restart=on-failure
[Install]
WantedBy=multi-user.target

systemctl enable kafka.service #加入开机启动
systemctl start kafka.service #启动
systemctl stop kafka.service #停止
systemctl status kafka.service #状态

                g、开放防火墙

firewall-cmd --zone=public --add-port=9092/tcp --permanent
firewall-cmd --reload
firewall-cmd --list-ports

三、安装openresty与lua的依赖

        1、下载lua-resty-kafka

wget https://github.com/doujiang24/lua-resty-kafka/archive/master.zip

        2、安装unzip依赖

yum install unzip -y

        3、解压lua-resty-kafka-master.zip

unzip lua-resty-kafka-master.zip

        4、拷贝kafka插件到openresty目录下

cp -rf /data/file/lua-resty-kafka-master/lib/resty/kafka/ /usr/local/openresty/lualib/resty/kafka

        5、重新授权

chown -R www:www /usr/local/openresty

        6、nginx配置

server {
    listen        80 default_server;
    server_name _;
    error_page   401 /401.html;
    error_page   403 /403.html;
    error_page   404 /404.html;
    error_page      500 502 503 504 /50x.html;
    location / {
        access_by_lua_file /data/wwwroot/items-access.lua;#这个就是lua操作kafka的文件,成功可以走成功的路线,失败可以走失败的路线
        return 401;
    }
    location = /401.html {
        access_by_lua_file /data/wwwroot/items-access.lua;#这个就是lua操作kafka的文件,成功可以走成功的路线,失败可以走失败的路线
        root "/data/wwwroot/error";
    }
    location = /403.html {
        access_by_lua_file /data/wwwroot/items-access.lua;#这个就是lua操作kafka的文件,成功可以走成功的路线,失败可以走失败的路线
        root "/data/wwwroot/error";
    }
    location = /404.html {
        access_by_lua_file /data/wwwroot/items-access.lua;#这个就是lua操作kafka的文件,成功可以走成功的路线,失败可以走失败的路线
        root "/data/wwwroot/error";
    }
    location = /50x.html {
        access_by_lua_file /data/wwwroot/items-access.lua;#这个就是lua操作kafka的文件,成功可以走成功的路线,失败可以走失败的路线
        root "/data/wwwroot/error";
    }
}

        7、创建kafka主体

kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic log_topic

        8、编写lua脚本及上传到指定位置

local cjson = require "cjson"
local producer = require "resty.kafka.producer"
 
-- Kafka 配置
local broker_list = {
      { host = "192.168.223.19", port = 9092 }
}

--创建生产者
local pro = producer:new(broker_list,{ producer_type="async"})
--获取IP
local headers=ngx.req.get_headers()
local ip=headers["X-REAL-IP"] or headers["X_FORWARDED_FOR"] or ngx.var.remote_addr or "0.0.0.0"
local uri = ngx.var.uri

-- 定义消息内容
local log_data = {}
log_data["uri"] = uri;
log_data["request_uri"] = ngx.var.request_uri;
log_data["ip"] = ip;
log_data["host"] = ngx.var.host;
log_data["status"] = ngx.var.status;
log_data["bytes_sent"] = ngx.var.bytes_sent;
log_data["request_time"] = ngx.var.request_time;
log_data["http_referer"] = ngx.var.http_referer;
log_data["http_user_agent"] = ngx.var.http_user_agent;
log_data["timestamp"] = ngx.now() * 1000;
 
-- 发送消息
local ok, err = pro:send("log_topic", nil, cjson.encode(log_data))
if not ok then  
    ngx.log(ngx.ERR, "kafka send err:", err)  
    return ngx.exit(401)
end

        9、创建一个消费者来进行接收消息,也是验证结果

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic log_topic --from-beginning

        10、访问系统就能有如下结果

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2281232.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

PortSwigger靶场练习---网页 LLM 攻击:间接提示注入

网页 LLM 攻击: Indirect prompt injection 间接提示注入 PortSwigger靶场地址: Dashboard | Web Security Academy - PortSwigger 题目: 官方提示: 发现攻击面 点击实时聊天以访问实验室的聊天功能。 询问LLM它有权访问哪些 AP…

【2024.12】西电英语听说雨课堂期末考试答案

前言 这次的英语听说1和2雨课堂期末是同一张卷子。 26-40为填空题,后面有些话太长了 37. when they are physicial active 38. people could need two times as much water as others do 39. why people have the idea that good health requires eight…

人声检测原理VAD

在机器人的研究中,机器人与人语音交互是一个重要的功能,在语音交互中,人声检测至关重要。不论是在手机中,还是在esp32芯片上,都需要一种简单快捷的方式来检测本地语音,滤掉杂音和噪音。 机器人启动后会一直…

2_高并发内存池_各层级的框架设计及ThreadCache(线程缓存)申请内存设计

一、高并发内存池框架设计 高并发池框架设计,特别是针对内存池的设计,需要充分考虑多线程环境下: 性能问题锁竞争问题内存碎片问题 高并发内存池的整体框架设计旨在提高内存的申请和释放效率,减少锁竞争和内存碎片。 高并发内存…

后端开发基础——JavaWeb(Servlet)

Servlet 关于系统架构 系统架构包括什么形式? C/S架构 B/S架构 C/S架构? Client / Server(客户端 / 服务器) C/S架构的软件或者说系统有哪些呢? QQ(先去腾讯官网下载一个QQ软件,几十MB&…

c++ 与 Matlab 程序的数据比对

文章目录 背景环境数据保存数据加载 背景 ***避免数据精度误差&#xff0c;快速对比变量 *** 环境 c下载 https://github.com/BlueBrain/HighFive 以及hdf5库 在vs 中配置库 数据保存 #include <highfive/highfive.hpp> using namespace HighFive;std::string fil…

Leecode刷题C语言之收集所有金币可获得的最大积分

执行结果:通过 执行用时和内存消耗如下&#xff1a; int dfs(int node, int parent, int f, int* coins, int k, int **children, int *childCount, int **memo) {if (memo[node][f] ! -1) {return memo[node][f];}int res0 (coins[node] >> f) - k;int res1 coins[no…

mybatis(57/134)

今天没什么想法&#xff0c;搭了个转账平台&#xff0c;加深了点之前javaweb的mvc架构的印象&#xff0c;还有异常的抛出处理等

ONNX 简介

ONNX &#xff08;Open Neural Network Exchange&#xff09;是一套表示深度神经网络模型的开放格式&#xff0c;由微软和 Facebook 于 2017 推出&#xff0c;然后迅速得到了各大厂商和框架的支持。目前&#xff0c;在数家机构的共同维护下&#xff0c;ONNX 已经对接了多种深度…

Linux的中断上半部和中断下半部的概念,并利用任务队列(Tasklet)实现中断下半部的处理

中断上半部和中断下半部的介绍 在Linux内核中&#xff0c;中断处理机制被设计成“中断上半部&#xff08;Top Half&#xff09;”和“中断下半部&#xff08;Bottom Half&#xff09;”两个部分&#xff0c;这种设计主要目的是提高系统的中断响应效率&#xff0c;同时减少中断…

数学规划问题2 .有代码(非线性规划模型,最大最小化模型,多目标规划模型)

非线性规划模型 FIrst:转化为标准型 在matlab中求非线性规划的函数 练习题: 典型例题: 最大最小化模型 核心思想&#xff1a; matlab的模型求解 经典例题: 多目标规划模型 基本概念 求解思路: 模型构建步骤 经典例题: 非线性规划模型 非线性规划&#xff08;Nonl…

linux 下tensorrt的yolov8的前向推理(c++ 版本)的实现

一、环境搭建 cuda 11.4 ubuntu 20.04 opencv-4.5.2 1.1 配置tensorrt 根据本机的硬件配置及cuda的版本&#xff0c;选择TensorRT-8.6.1.6的版本&#xff0c;下载网址为: TensorRT SDK | NVIDIA Developer 根据官网的说明&#xff0c;下载对应的压缩包即可。解压后&…

VUE elTree 无子级 隐藏展开图标

这4个并没有下级节点&#xff0c;即它并不是叶子节点&#xff0c;就不需求展示前面的三角展开图标! 查阅官方文档如下描述&#xff0c;支持bool和函数回调处理&#xff0c;这里咱们选择更灵活的函数回调实现。 给el-tree结构配置一下props&#xff0c;注意&#xff01; :pr…

windows git bash 使用zsh 并集成 oh my zsh

参考了 这篇文章 进行配置&#xff0c;记录了自己的踩坑过程&#xff0c;并增加了 zsh-autosuggestions 插件的集成。 主要步骤&#xff1a; 1. git bash 这个就不说了&#xff0c;自己去网上下&#xff0c;windows 使用git时候 命令行基本都有它。 主要也是用它不方便&…

Glary Utilities Pro 多语便携版系统优化工具 v6.21.0.25

Glary Utilities是一款功能强大的系统优化工具软件&#xff0c;旨在帮助用户清理计算机垃圾文件、修复系统错误、优化系统性能等。 软件功能 清理和修复&#xff1a;可以清理系统垃圾文件、无效注册表项、无效快捷方式等&#xff0c;修复系统错误和蓝屏问题。 优化和加速&…

【Python使用】嘿马python基础入门全体系教程第12篇:__init__()方法,说明:【附代码文档】

本教程的知识点为&#xff1a;计算机组成 计算机是由什么组成的&#xff1f; 1. 硬件系统&#xff1a; 2. 软件系统&#xff1a; 目标 运算符的分类 1. 算数运算符 2. 赋值运算符 3. 复合赋值运算符 判断语句和循环语句 if嵌套 1. if嵌套的格式 2. if嵌套的应用 if嵌套执行流程…

从入门到精通:RabbitMQ的深度探索与实战应用

目录 一、RabbitMQ 初相识 二、基础概念速览 &#xff08;一&#xff09;消息队列是什么 &#xff08;二&#xff09;RabbitMQ 核心组件 三、RabbitMQ 基本使用 &#xff08;一&#xff09;安装与环境搭建 &#xff08;二&#xff09;简单示例 &#xff08;三&#xff09;…

【Block总结】WTConv,小波变换(Wavelet Transform)来扩展卷积神经网络(CNN)的感受野

论文解读&#xff1a;Wavelet Convolutions for Large Receptive Fields 论文信息 标题: Wavelet Convolutions for Large Receptive Fields作者: Shahaf E. Finder, Roy Amoyal, Eran Treister, Oren Freifeld提交日期: 2024年7月8日arXiv链接: Wavelet Convolutions for La…

Couchbase UI: Indexes

在Couchbase中&#xff0c;索引的这些指标可以帮助你评估索引的性能和状态。下面是每个指标的详细解释&#xff0c;以及如何判断索引的有效性&#xff1a; 1. Index Name&#xff08;索引名称&#xff09; 描述&#xff1a;每个索引都有一个唯一的名称。这个名称通常会包括表…

(3)STM32 USB设备开发-USB存储设备

例程&#xff1a;STM32USBdevice: 基于STM32的USB设备例子程序 - Gitee.com 本篇为使用芯片内部flash作为USB存储设备的例程&#xff0c;没有知识&#xff0c;全是实操&#xff0c;按照步骤就能获得一个STM32的U盘。本例子是在野火F103MINI开发板上验证的&#xff0c;如果代码…