【Kubernetes 企业项目实战】04、基于 K8s 构建 EFK+logstash+kafka 日志平台(下)

news2024/12/26 14:13:10

目录

一、安装收集日志组件 Fluentd

二、kibana 可视化展示查询 k8s 容器日志

三、测试 efk 收集生产环境业务 pod 日志

四、基于 EFK+logstash+kafka 构建高吞吐量的日志平台

4.1 部署 fluentd

4.2 接入 kafka

4.3 配置 logstash

4.4 启动 logstash


本篇文章所用到的资料文件下载地址:https://download.csdn.net/download/weixin_46560589/87392272

一、安装收集日志组件 Fluentd

        我们使用 daemonset 控制器部署 fluentd 组件,这样可以保证集群中的每个节点都可以运行同样 fluentd 的 pod 副本,这样就可以收集 k8s 集群中每个节点的日志。在 k8s 集群中,容器应用程序的输入输出日志会重定向到 node 节点里的 json 文件中,fluentd 可以 tail 和过滤以及把日志转换成指定的格式发送到 elasticsearch 集群中。除了容器日志,fluentd 也可以采集 kubelet、kube-proxy、docker 的日志。

# 离线镜像压缩包 fluentd.tar.gz 上传到各个节点上,手动解压:
[root@k8s-master1 ~]# docker load -i fluentd.tar.gz 
[root@k8s-node1 ~]# docker load -i fluentd.tar.gz 
[root@k8s-node2 ~]# docker load -i fluentd.tar.gz 

[root@k8s-master1 efk]# vim fluentd.yaml 
apiVersion: v1
kind: ServiceAccount
metadata:
  name: fluentd
  namespace: kube-logging
  labels:
    app: fluentd
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: fluentd
  labels:
    app: fluentd
rules:
- apiGroups:
  - ""
  resources:
  - pods
  - namespaces
  verbs:
  - get
  - list
  - watch
---
kind: ClusterRoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: fluentd
roleRef:
  kind: ClusterRole
  name: fluentd
  apiGroup: rbac.authorization.k8s.io
subjects:
- kind: ServiceAccount
  name: fluentd
  namespace: kube-logging
---
apiVersion: apps/v1
kind: DaemonSet
metadata:
  name: fluentd
  namespace: kube-logging
  labels:
    app: fluentd
spec:
  selector:
    matchLabels:
      app: fluentd
  template:
    metadata:
      labels:
        app: fluentd
    spec:
      serviceAccount: fluentd
      serviceAccountName: fluentd
      tolerations:
      - key: node-role.kubernetes.io/master
        effect: NoSchedule
      containers:
      - name: fluentd
        image: fluent/fluentd-kubernetes-daemonset:v1.4.2-debian-elasticsearch-1.1
        imagePullPolicy: IfNotPresent
        env:
          - name:  FLUENT_ELASTICSEARCH_HOST
            value: "elasticsearch.kube-logging.svc.cluster.local"
          - name:  FLUENT_ELASTICSEARCH_PORT
            value: "9200"
          - name: FLUENT_ELASTICSEARCH_SCHEME
            value: "http"
          - name: FLUENTD_SYSTEMD_CONF
            value: disable
        resources:
          limits:
            memory: 512Mi
          requests:
            cpu: 100m
            memory: 200Mi
        volumeMounts:
        - name: varlog
          mountPath: /var/log
        - name: varlibdockercontainers
          mountPath: /var/lib/docker/containers
          readOnly: true
      terminationGracePeriodSeconds: 30
      volumes:
      - name: varlog
        hostPath:
          path: /var/log
      - name: varlibdockercontainers
        hostPath:
          path: /var/lib/docker/containers

[root@k8s-master1 efk]# kubectl apply -f fluentd.yaml 

[root@k8s-master1 efk]# kubectl get pods -n kube-logging | grep fluentd
fluentd-6vmdc             1/1     Running   0             24s
fluentd-mtgxg             1/1     Running   0             24s
fluentd-nzv4n             1/1     Running   0             24s

二、kibana 可视化展示查询 k8s 容器日志

Fluentd 启动成功后,我们可以前往 Kibana 的 Dashboard 页面中,点击 Try our sample data:

点击左侧的 Discover 

可以看到如下配置页面 

        在这里可以配置我们需要的 Elasticsearch 索引,前面 Fluentd 配置文件中我们采集的日志使用的是 logstash 格式,这里只需要在文本框中输入logstash-* 即可匹配到 Elasticsearch 集群中的所有日志数据。点击 Next step:

选择 @timestamp,创建索引 

点击左侧的 discover,可看到如下 

三、测试 efk 收集生产环境业务 pod 日志

[root@k8s-master1 efk]# vim pod.yaml 
apiVersion: v1
kind: Pod
metadata:
  name: counter
spec:
  containers:
  - name: count
    image: busybox
    imagePullPolicy: IfNotPresent
    args: [/bin/sh, -c,'i=0; while true; do echo "$i: $(date)"; i=$((i+1)); sleep 1; done']

[root@k8s-master1 efk]# kubectl apply -f pod.yaml 

[root@k8s-master1 efk]# kubectl get pods 
NAME                               READY   STATUS    RESTARTS      AGE
counter                            1/1     Running   0             25s
nfs-provisioner-6988f7c774-f478v   1/1     Running   2 (56m ago)   138m

Kibana 查询语言 KQL 官方地址:Kibana Query Language | Kibana Guide [7.2] | Elastic

        登录到 kibana 的控制面板,在 discover 处的搜索栏中输入 kubernetes.pod_name:counter这将过滤名为 counter 的 Pod 的日志数据 ,如下所示: 

        通过前面的实验,我们已经在 k8s 集群成功部署了 elasticsearch、fluentd、kibana,这里使用的 efk 系统包括 3 个 Elasticsearch Pod,一个 Kibana Pod 和一组作为 DaemonSet 部署的Fluentd Pod。

四、基于 EFK+logstash+kafka 构建高吞吐量的日志平台

fluentd --> kafka --> logstash --> elasticsearch --> kibana

适用于数据量大的场景。此处只提供思路方法,有待验证。

4.1 部署 fluentd

[root@k8s-master1 efk]# vim fluentd-configmap.yaml
kind: ConfigMap
apiVersion: v1
metadata:
  name: fluentd-config
  namespace: logging
  labels:
    addonmanager.kubernetes.io/mode: Reconcile
data:
  system.conf: |-
    <system>
      root_dir /tmp/fluentd-buffers/
    </system>
  containers.input.conf: |-
    <source>
      @id fluentd-containers.log
      @type tail
      path /var/log/containers/*.log
      pos_file /var/log/es-containers.log.pos
      time_format %Y-%m-%dT%H:%M:%S.%NZ
      localtime
      tag raw.kubernetes.*
      format json
      read_from_head true
    </source>
    # Detect exceptions in the log output and forward them as one log entry.
    <match raw.kubernetes.**>
      @id raw.kubernetes
      @type detect_exceptions
      remove_tag_prefix raw
      message log
      stream stream
      multiline_flush_interval 5
      max_bytes 500000
      max_lines 1000
    </match>
  system.input.conf: |-
    # Logs from systemd-journal for interesting services.
    <source>
      @id journald-docker
      @type systemd
      filters [{ "_SYSTEMD_UNIT": "docker.service" }]
      <storage>
        @type local
        persistent true
      </storage>
      read_from_head true
      tag docker
    </source>
    <source>
      @id journald-kubelet
      @type systemd
      filters [{ "_SYSTEMD_UNIT": "kubelet.service" }]
      <storage>
        @type local
        persistent true
      </storage>
      read_from_head true
      tag kubelet
    </source>
  forward.input.conf: |-
    # Takes the messages sent over TCP
    <source>
      @type forward
    </source>
  output.conf: |-
    # Enriches records with Kubernetes metadata
    <filter kubernetes.**>
      @type kubernetes_metadata
    </filter>
    <match **>
      @id elasticsearch
      @type elasticsearch
      @log_level info
      include_tag_key true
      host 192.168.78.143            # es 主机 ip
      port 9200
      logstash_format true
      request_timeout 30s
      <buffer>
        @type file
        path /var/log/fluentd-buffers/kubernetes.system.buffer
        flush_mode interval
        retry_type exponential_backoff
        flush_thread_count 2
        flush_interval 5s
        retry_forever
        retry_max_interval 30
        chunk_limit_size 2M
        queue_limit_length 8
        overflow_action block
      </buffer>
    </match>

[root@k8s-master1 efk]# kubectl apply -f fluentd-configmap.yaml 

[root@k8s-master1 efk]# vim fluentd-daemonset.yaml
apiVersion: v1
kind: ServiceAccount
metadata:
  name: fluentd-es
  namespace: logging
  labels:
    k8s-app: fluentd-es
    kubernetes.io/cluster-service: "true"
    addonmanager.kubernetes.io/mode: Reconcile
---
kind: ClusterRole
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: fluentd-es
  labels:
    k8s-app: fluentd-es
    kubernetes.io/cluster-service: "true"
    addonmanager.kubernetes.io/mode: Reconcile
rules:
- apiGroups:
  - ""
  resources:
  - "namespaces"
  - "pods"
  verbs:
  - "get"
  - "watch"
  - "list"
---
kind: ClusterRoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: fluentd-es
  labels:
    k8s-app: fluentd-es
    kubernetes.io/cluster-service: "true"
    addonmanager.kubernetes.io/mode: Reconcile
subjects:
- kind: ServiceAccount
  name: fluentd-es
  namespace: logging
  apiGroup: ""
roleRef:
  kind: ClusterRole
  name: fluentd-es
  apiGroup: ""
---
apiVersion: apps/v1
kind: DaemonSet
metadata:
  name: fluentd-es
  namespace: logging
  labels:
    k8s-app: fluentd-es
    version: v2.0.4
    kubernetes.io/cluster-service: "true"
    addonmanager.kubernetes.io/mode: Reconcile
spec:
  selector:
    matchLabels:
      k8s-app: fluentd-es
      version: v2.0.4
  template:
    metadata:
      labels:
        k8s-app: fluentd-es
        kubernetes.io/cluster-service: "true"
        version: v2.0.4
      # This annotation ensures that fluentd does not get evicted if the node
      # supports critical pod annotation based priority scheme.
      # Note that this does not guarantee admission on the nodes (#40573).
      annotations:
        scheduler.alpha.kubernetes.io/critical-pod: ''
    spec:
      serviceAccountName: fluentd-es
      containers:
      - name: fluentd-es
        image: cnych/fluentd-elasticsearch:v2.0.4
        env:
        - name: FLUENTD_ARGS
          value: --no-supervisor -q
        resources:
          limits:
            memory: 500Mi
          requests:
            cpu: 100m
            memory: 200Mi
        volumeMounts:
        - name: varlog
          mountPath: /var/log
        - name: varlibdockercontainers
          mountPath: /var/lib/docker/containers
          readOnly: true
        - name: config-volume
          mountPath: /etc/fluent/config.d
      nodeSelector:
        beta.kubernetes.io/fluentd-ds-ready: "true"
      tolerations:
      - key: node-role.kubernetes.io/master
        operator: Exists
        effect: NoSchedule
      terminationGracePeriodSeconds: 30
      volumes:
      - name: varlog
        hostPath:
          path: /var/log
      - name: varlibdockercontainers
        hostPath:
          path: /var/lib/docker/containers
      - name: config-volume
        configMap:
          name: fluentd-config

# 创建节点标签
[root@k8s-master1 efk]# kubectl label nodes k8s-master1 beta.kubernetes.io/fluentd-ds-ready=true
[root@k8s-master1 efk]# kubectl label nodes k8s-node1 beta.kubernetes.io/fluentd-ds-ready=true
[root@k8s-master1 efk]# kubectl label nodes k8s-node2 beta.kubernetes.io/fluentd-ds-ready=true

[root@k8s-master1 efk]# kubectl apply -f fluentd-daemonset.yaml 

4.2 接入 kafka

[root@k8s-master1 efk]# vim kafka-config.yaml
kind: ConfigMap
apiVersion: v1
metadata:
  name: fluentd-config
  namespace: logging
  labels:
    addonmanager.kubernetes.io/mode: Reconcile
data:
  system.conf: |-
    <system>
      root_dir /tmp/fluentd-buffers/
    </system>
  containers.input.conf: |-
    <source>
      @id fluentd-containers.log
      @type tail
      path /var/log/containers/*.log
      pos_file /var/log/es-containers.log.pos
      time_format %Y-%m-%dT%H:%M:%S.%NZ
      localtime
      tag raw.kubernetes.*
      format json
      read_from_head true
    </source>
    # Detect exceptions in the log output and forward them as one log entry.
    <match raw.kubernetes.**>
      @id raw.kubernetes
      @type detect_exceptions
      remove_tag_prefix raw
      message log
      stream stream
      multiline_flush_interval 5
      max_bytes 500000
      max_lines 1000
    </match>
  system.input.conf: |-
    # Logs from systemd-journal for interesting services.
    <source>
      @id journald-docker
      @type systemd
      filters [{ "_SYSTEMD_UNIT": "docker.service" }]
      <storage>
        @type local
        persistent true
      </storage>
      read_from_head true
      tag docker
    </source>
    <source>
      @id journald-kubelet
      @type systemd
      filters [{ "_SYSTEMD_UNIT": "kubelet.service" }]
      <storage>
        @type local
        persistent true
      </storage>
      read_from_head true
      tag kubelet
    </source>
  forward.input.conf: |-
    # Takes the messages sent over TCP
    <source>
      @type forward
    </source>
  output.conf: |-
    # Enriches records with Kubernetes metadata
    <filter kubernetes.**>
      @type kubernetes_metadata
    </filter>
    <match **>
      @id kafka
      @type kafka2
      @log_level info
      include_tag_key true
      # list of seed brokers
      brokers kafka ip:9092
      use_event_time true
      # buffer settings
      <buffer>
        @type file
        path /var/log/fluentd-buffers/kubernetes.system.buffer
        flush_mode interval
        retry_type exponential_backoff
        flush_thread_count 2
        flush_interval 5s
        retry_forever
        retry_max_interval 30
        chunk_limit_size 2M
        queue_limit_length 8
        overflow_action block
      </buffer>
      # data type settings
      <format>
        @type json
      </format>
      # topic settings
      topic_key topic
      default_topic messages
      # producer settings
      required_acks -1
      compression_codec gzip
    </match>

[root@k8s-master1 efk]# kubectl apply -f kafka-config.yaml 

# 重启 fluentd 
[root@k8s-master1 efk]# kubectl delete -f fluentd-daemonset.yaml 
[root@k8s-master1 efk]# kubectl apply -f fluentd-daemonset.yaml 

4.3 配置 logstash

配置 logstash 消费 messages 日志写入 elasticsearch

[root@k8s-master1 efk]# vim config/kafkaInput_fluentd.conf
input {
    kafka {
        bootstrap_servers => ["kafka ip:9092"]
        client_id => "fluentd"
        group_id => "fluentd"
        consumer_threads => 1
        auto_offset_reset => "latest"
        topics => ["messages"]
    }
}
 
filter {
        json{
                source => "message"
        }
        
       ruby {
       code => "event.set('timestamp', event.get('@timestamp').time.localtime + 8*60*60)"
       }
      ruby {
        code => "event.set('@timestamp',event.get('timestamp'))"
       }
      ruby {
        code => "event.set('find_time',event.get('@timestamp').time.localtime - 8*60*60)"
       }
     mutate {
    remove_field => ["timestamp"]
    remove_field => ["message"]
    }
     
} 
output {
          elasticsearch{
               hosts => ["es ip地址: 9200"]
               index => "kubernetes_%{+YYYY_MM_dd}"

          }
#    stdout {
#           codec => rubydebug
#           }
}

4.4 启动 logstash

[root@k8s-master1 efk]# nohup ./bin/logstash -f config/kafkaInput_fluentd.conf --config.reload.automatic --path.data=/opt/logstash/data_fluentd 2>&1 > fluentd.log &

上一篇文章:【Kubernetes 企业项目实战】04、基于 K8s 构建 EFK+logstash+kafka 日志平台(中)_Stars.Sky的博客-CSDN博客

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/172966.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

对象的比较

Java中基本类型间的元素比较&#xff0c;可以直接通过">"、"<"、""等符号判断大小&#xff0c;也可使用compareTo比较大小或者equals判断是否相等&#xff0c;作为引用类型的String类不可以使用">"、"<"比较大小…

2023最新 - 谷歌学术文献Bibtex批量获取脚本

首先&#xff0c;自行解决网络访问问题&#xff0c;保证能访问到谷歌学术&#xff0c;否则下面可免看 第一步&#xff1a;安装 selenium python 安装 selenium pip install selenium 第二步&#xff1a;安装 Chrome 浏览器 http://chorm.com.cn/ 第三步&#xff1a;根据 …

Linux应用基础——控制服务与守护进程

一、systemd基本介绍 1.作用 systemd守护进程管理Linux的启动&#xff0c;一般包括服务启动和服务管理&#xff0c;它可以在系统引导时以及运行中的系统上激活系统资源、服务器守护进程和其他进程 2.守护进程 守护进程是执行各种任务的后台等待或运行的进程&#xff0c;一般…

day18|235. 二叉搜索树的最近公共祖先、701.二叉搜索树中的插入操作、450.删除二叉搜索树中的节点

235. 二叉搜索树的最近公共祖先 给定一个二叉搜索树, 找到该树中两个指定节点的最近公共祖先。 最近公共祖先的定义为&#xff1a;“对于有根树 T 的两个结点 p、q&#xff0c;最近公共祖先表示为一个结点 x&#xff0c;满足 x 是 p、q 的祖先且 x 的深度尽可能大&#xff08…

【容器技术——Docker基本使用】

文章目录docker1 概述1.1 是什么1.2 相关资源2 使用2.1 镜像2.1.1 拉取镜像2.2.2 列出镜像2.2.3 删除镜像2.2 容器2.2.1 运行容器2.2.2 查看容器2.2.3 启动和关闭容器2.2.4 删除容器2.3 制作镜像2.4 Docker 仓库2.4.1 注册登录2.4.2 推送镜像2.5 dockerfile2.5.1 构建镜像2.5.2…

关于CIS移植的一些基本概念

1. 摄像头sensor 的原理 定时脉冲生成器会生成clock&#xff0c;用于访问image sensor 阵列中的行&#xff0c;预充电&#xff0c;并且按顺序采样像素阵列中的所有行。在一个行的预充电和采样的时间段里&#xff0c;像素的电荷量会随着曝光时间而逐渐减少。这就是快门结构中的曝…

Python采集周边烤肉店数据,康康哪一家最好吃?

人生苦短&#xff0c;我用Python 这不是天气开始突然大范围降温了吗&#xff1f; 降温就要吃烤肉啊 滋辣滋辣的声音特别好听&#xff5e; 放假吃烤肉真的特别快乐~ 天冷了&#xff0c;逛街…… 天冷了&#xff0c;吃烤肉…… 天冷了&#xff0c;喝奶茶…… 有温度的冬天&a…

p83出现的问题(空指针异常)原因及解决方案

我的GitHub&#xff1a;Powerveil GitHub我的Gitee&#xff1a;Powercs12 (powercs12) - Gitee.com皮卡丘每天学Java相关知识&#xff1a;Mybatis Plus、Spring Security本质原因&#xff1a;我们自己将Article字段的update字段做了自动填充导致自动填充时无法获取当前用户(程…

pytorch Conv2d令人迷惑的几个参数

本篇仅用来记录nn.Conv2d()中容易令人不解的几个参数 1. nn.Conv2d() 的输出维度的计算 相信大家都看过官网给出的计算方式: 直接套公式即可, 但需要注意的是, 最终计算的结果要向下取整. 2. dilation 官方解释为: dilation (int or tuple, optional) – Spacing between k…

贪心 455. 分发饼干

455. 分发饼干 难度简单636 假设你是一位很棒的家长&#xff0c;想要给你的孩子们一些小饼干。但是&#xff0c;每个孩子最多只能给一块饼干。 对每个孩子 i&#xff0c;都有一个胃口值 g[i]&#xff0c;这是能让孩子们满足胃口的饼干的最小尺寸&#xff1b;并且每块饼干 j&…

为什么STM32设置Flash地址0x08000000而不是0x00000000?STM32的启动过程

STM32F103ZE芯片存储空间的地址映射关系图。 在MDK编译程序设置ROM和RAM地址时候发现&#xff1a;    IROM1为片上程序存储器&#xff0c;即片上集成的Flash存储器&#xff0c;对该处理器Flash大小为512KB&#xff0c;即0x80000 地址区间为0x8000000~0x0807FFFF  IRAM1为片…

day10文件知识点

模态对话框ajax后台ModelForm校验 目录切换&#xff1a;展示当前文件夹和文件 删除文件夹&#xff1a;嵌套的子文件和子文件夹全部删除 js上传文件到cos&#xff08;wiki用python上cos上传文件&#xff09; 进度条的操作&#xff08;bootstrap&#xff09;实现 删除文件&#x…

【Linux详解】——进程地址空间

&#x1f4d6; 前言&#xff1a;本节将以新的视角看的地址空间的特点&#xff0c;与以前对指针的认识做区分。 目录&#x1f552; 1. C/C 地址空间回顾&#x1f552; 2. 进程地址空间&#x1f558; 2.1 感性理解概念&#x1f558; 2.2 如何“画饼”&#x1f558; 2.3 区域划分&…

SQL面试题

有3个表S(学生表)&#xff0c;C&#xff08;课程表&#xff09;&#xff0c;SC&#xff08;学生选课表&#xff09; S&#xff08;SNO&#xff0c;SNAME&#xff09;代表&#xff08;学号&#xff0c;姓名&#xff09; C&#xff08;CNO&#xff0c;CNAME&#xff0c;CTEACHER&…

SAP FICO 定义成本组件结构

成本组件结构定义 我们在使用CK11N核算物料标准成本时候可以看到有项目明细&#xff0c;也可以看到有成本构成&#xff0c;那么问题来了&#xff0c;怎么将项目明细分类到各个成本构成上面呢&#xff1f; 【后台配置路径】&#xff1a; SPRO→控制→产品成本控制→产品成本计划…

【云原生】k8s之包管理器Helm

内容预知 前言 1.Helm的相关知识 1.1 Helm的简介与了解 1.2 Helm的三个重要概念 1.3 Helm2与Helm3的的区别 &#xff08;1&#xff09;helm2的部署方式与使用 &#xff08;2&#xff09;Helm3的部署与使用 2.Helm在k8s集群中的部署 &#xff08;1&#xff09;将Helm安…

【docker概念和实践 3】 注册阿里云账号、应用阿里云数据源

一、说明 阿里云是什么&#xff1f;是出租、出售运算资源的平台。几乎囊括各个领域的运算、存储、服务器、云端资源。阿里云的明星产品四大件是&#xff1a;1、即云服务器ECS、2、云数据库RDS、3、负载均衡SLB 4对象存储OSS。 其它多种服务&#xff1a;云小站_专享特惠_云产品推…

OpenSceneGraph图形状态管理内幕

在这个教程中&#xff0c;我们将了解 Open Scene Graph 如何表示 OpenGL 图形状态&#xff0c;并探索 Open Scene Graph 优化渲染以最小化状态更改次数的一些方法。 推荐&#xff1a;用 3D场景编辑器快速搭建数字孪生场景。 1、OpenGL状态机 Open Scene Graph 最重要的优化之一…

Android的linux内核解耦

1、boot内容查看Boot Image Header&#xff0c;version 2版本包含内容最多&#xff0c;包括了内核、设备树、根目录、recovery设备树&#xff0c;cmdline。boot拆包与内容解析参考1、Android bootimg kernel&#xff08;boot.img&#xff09;2、linux的ramdisk解耦2.1、ramdisk…

Python学习笔记——文件操作

输入和输出Python两种输出值的方式: 表达式语句和 print() 函数。第三种方式是使用文件对象的 write() 方法&#xff0c;标准输出文件可以用 sys.stdout 引用。如果你希望输出的形式更加多样&#xff0c;可以使用 str.format() 函数来格式化输出值。如果你希望将输出的值转成字…