如何借助Kafka持久化存储K8S事件数据?

news2024/12/24 2:39:23

大家应该对 Kubernetes Events 并不陌生,特别是当你使用 kubectl describe 命令或 Event API 资源来了解集群中的故障时。
 

$ kubectl get events

15m         Warning   FailedCreate                                                                                                      replicaset/ml-pipeline-visualizationserver-865c7865bc    

Error creating: pods "ml-pipeline-visualizationserver-865c7865bc-" is forbidden: error looking up service account default/default-editor: serviceaccount "default-editor" not found

 

尽管这些信息十分有用,但它只是临时的,保留时间最长为30天。如果出于审计或是故障诊断等目的,你可能想要把这些信息保留得更久,比如保存在像 Kafka 这样更持久、高效的存储中。然后你可以借助其他工具(如 Argo Events)或自己的应用程序订阅 Kafka 主题来对某些事件做出响应。
 

构建K8s事件处理链路

我们将构建一整套 Kubernetes 事件处理链路,其主要构成为:

  • Eventrouter,开源的 Kubernetes event 处理器,它可以将所有集群事件整合汇总到某个 Kafka 主题中。
  • Strimzi Operator,在 Kubernetes 中轻松管理 Kafka broker。
  • 自定义 Go 二进制文件以将事件分发到相应的 Kafka 主题中。
     

为什么要把事件分发到不同的主题中?比方说,在集群的每个命名空间中存在与特定客户相关的 Kubernetes 资产,那么在使用这些资产之前你当然希望将相关事件隔离开。
 

本示例中所有的配置、源代码和详细设置指示都已经放在以下代码仓库中:
https://github.com/esys/kube-events-kafka
 

 

创建 Kafka broker 和主题

我选择使用 Strimzi(strimzi.io/) 将 Kafka 部署到 Kubernetes 中。简而言之,它是用于创建和更新 Kafka broker 和主题的。你可以在官方文档中找到如何安装该 Operator 的详细说明:
https://strimzi.io/docs/operators/latest/overview.html
 

首先,创建一个新的 Kafka 集群:

apiVersion: kafka.strimzi.io/v1beta1
kind: Kafka
metadata:
  name: kube-events
spec:
  entityOperator:
    topicOperator: {}
    userOperator: {}
  kafka:
    config:
      default.replication.factor: 3
      log.message.format.version: "2.6"
      offsets.topic.replication.factor: 3
      transaction.state.log.min.isr: 2
      transaction.state.log.replication.factor: 3
    listeners:
    - name: plain
      port: 9092
      tls: false
      type: internal
    - name: tls
      port: 9093
      tls: true
      type: internal
    replicas: 3
    storage:
      type: jbod
      volumes:
      - deleteClaim: false
        id: 0
        size: 10Gi
        type: persistent-claim
    version: 2.6.0
  zookeeper:
    replicas: 3
    storage:
      deleteClaim: false
      size: 10Gi
      type: persistent-claim

 

然后创建 Kafka 主题来接收我们的事件:

apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaTopic
metadata:
  name: cluster-events
spec:
  config:
    retention.ms: 7200000
    segment.bytes: 1073741824
  partitions: 1
  replicas: 1

 

设置 EventRouter

在本教程中使用 kubectl apply 命令即可,我们需要编辑 router 的配置,以指明我们的 Kafka 端点和要使用的主题:

apiVersion: v1
data:
  config.json: |-
    {
      "sink": "kafka",
      "kafkaBrokers": "kube-events-kafka-bootstrap.kube-events.svc.cluster.local:9092",
      "kafkaTopic": "cluster-events"
    }
kind: ConfigMap
metadata:
  name: eventrouter-cm

 

验证设置是否正常工作

我们的 cluster-events Kafka 的主题现在应该收到所有的事件。最简单的方法是在主题上运行一个 consumer 来检验是否如此。为了方便期间,我们使用我们的一个 Kafka broker pods,它已经有了所有必要的工具,你可以看到事件流:

kubectl -n kube-events exec kube-events-kafka-0 -- bin/kafka-console-consumer.sh \
  --bootstrap-server kube-events-kafka-bootstrap:9092 \
  --topic kube-events \
  --from-beginning
{"verb":"ADDED","event":{...}}
{"verb":"ADDED","event":{...}}
...

 

编写 Golang 消费者

现在我们想将我们的 Kubernetes 事件依据其所在的命名空间分发到多个主题中。我们将编写一个 Golang 消费者和生产者来实现这一逻辑:

  • 消费者部分在 cluster-events 主题上监听传入的集群事件
  • 生产者部分写入与事件的命名空间相匹配的 Kafka 主题中
     

如果为Kafka配置了适当的选项(默认情况),就不需要特地创建新的主题,因为 Kafka 会默认为你创建主题。这是 Kafka 客户端 API 的一个非常酷的功能。

p, err := kafka.NewProducer(cfg.Endpoint)
if err != nil {
        sugar.Fatal("cannot create producer")
}
defer p.Close()

c, err := kafka.NewConsumer(cfg.Endpoint, cfg.Topic)
if err != nil {
        sugar.Fatal("cannot create consumer")
}
defer c.Close()

run := true
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
go func() {
        sig := <-sigs
        sugar.Infof("signal %s received, terminating", sig)
        run = false
}()

var wg sync.WaitGroup
go func() {
        wg.Add(1)
        for run {
                data, err := c.Read()
                if err != nil {
                        sugar.Errorf("read event error: %v", err)
                        time.Sleep(5 * time.Second)
                        continue
                }
                if data == nil {
                        continue
                }
                msg, err := event.CreateDestinationMessage(data)
                if err != nil {
                        sugar.Errorf("cannot create destination event: %v", err)
                }
                p.Write(msg.Topic, msg.Message)
        }
        sugar.Info("worker thread done")
        wg.Done()
}()

wg.Wait()

 

完整代码在此处:
https://github.com/esys/kube-events-kafka/blob/master/events-fanout/cmd/main.go
 

当然还有更高性能的选择,这取决于预计的事件量和扇出(fanout)逻辑的复杂性。对于一个更强大的实现,使用 Spark Structured Streaming 的消费者将是一个很好的选择。
 

部署消费者

构建并将二进制文件推送到 Docker 镜像之后,我们将它封装为 Kubernetes deployment:

apiVersion: apps/v1
kind: Deployment
metadata:
  labels:
    app: events-fanout
  name: events-fanout
spec:
  replicas: 1
  selector:
    matchLabels:
      app: events-fanout
  template:
    metadata:
      labels:
        app: events-fanout
    spec:
      containers:
        - image: emmsys/events-fanout:latest
          name: events-fanout
          command: [ "./events-fanout"]
          args:
            - -logLevel=info
          env:
            - name: ENDPOINT
              value: kube-events-kafka-bootstrap:9092
            - name: TOPIC
              value: cluster-events

 

检查目标主题是否创建

现在,新的主题已经创建完成:

kubectl -n kube-events get kafkatopics.kafka.strimzi.io -o name

kafkatopic.kafka.strimzi.io/cluster-events
kafkatopic.kafka.strimzi.io/kube-system
kafkatopic.kafka.strimzi.io/default
kafkatopic.kafka.strimzi.io/kafka
kafkatopic.kafka.strimzi.io/kube-events

 

你会发现你的事件根据其命名空间整齐地存储在这些主题中。
 

总结

访问 Kubernetes 历史事件日志可以使你对 Kubernetes 系统的状态有了更好的了解,但这单靠 kubectl 比较难做到。更重要的是,它可以通过对事件做出反应来实现集群或应用运维自动化,并以此来构建可靠、反应灵敏的软件。
 

原文链接:
https://hackernoon.com/monitor-your-kubernetes-cluster-events-with-eventrouter-golang-and-kafka-wh2a35l0

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/553690.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

c语言笔试题整理

1、请填写 bool , float, 指针变量 与 “零值” 比较的 if 语句。 提示&#xff1a;这里“零值”可以是0, 0.0 , FALSE 或者“空指针”。 例如int 变量n 与“零值”比较的if 语句为&#xff1a; if ( n 0 ) if ( n ! 0 ) 以此类推。 (1&#xff09;请写出bool flag 与“零值…

陪诊系统源码|陪诊系统开发|陪诊小程序源码

随着医疗技术的不断发展&#xff0c;陪诊已经成为了一个非常普遍的现象。随之而来的&#xff0c;就是一款名为“陪诊小程序”的应用产品的诞生。这款小程序通过互联网技术来提供陪诊服务&#xff0c;包含了很多实用功能。下面&#xff0c;我们就来详细介绍一下陪诊小程序的功能…

【历史上的今天】5 月 22 日:Windows 3.0 发布;虚幻引擎诞生;《吃豆人》问世

整理 | 王启隆 透过「历史上的今天」&#xff0c;从过去看未来&#xff0c;从现在亦可以改变未来。 今天是 2023 年 5 月 22 日&#xff0c;在 1994 年的今天&#xff0c;知名中文论坛曙光 BBS 站开通。1994 年 4 月 20 日&#xff0c;建立在中科院和北京大学、清华大学之间的…

Spring 官方建议的在 Spring Boot 应用中如何做单元测试

Spring Boot 提供了丰富的测试功能&#xff0c;主要由以下两个模块组成&#xff1a; ● spring-boot-test&#xff1a;提供测试核心功能。 ● spring-boot-test-autoconfigure&#xff1a;提供对测试的自动配置。 Spring Boot 提供了一个 spring-boot-starter-test一站式启动…

springboot+vue私人健身与教练预约管理系统(源码+文档)

风定落花生&#xff0c;歌声逐流水&#xff0c;大家好我是风歌&#xff0c;混迹在java圈的辛苦码农。今天要和大家聊的是一款基于springboot的私人健身与教练预约管理系统。项目源码以及部署相关请联系风歌&#xff0c;文末附上联系信息 。 &#x1f495;&#x1f495;作者&…

【Java】常见内存溢出OOM

文章目录 前言一、定义二、 为什么会OOM&#xff1f;三、JVM内存模型四、OOM常见类型4.1 java堆内存溢出4.2 方法区溢出4.3 JAVA虚拟机栈溢出 五、OOM分析--heapdump总结 前言 一、定义 内存溢出&#xff1a; out of memory&#xff0c;OOM&#xff0c;全称“Out Of Memory”&…

西门子S7-1200 PLC之间无线PROFINET通信

西门子S7-1200 PLC 使用Profinet通讯时&#xff0c;一个做Profinet IO控制器&#xff0c;一个做Profinet IO设备。一个Profinet IO控制器可以最多支持16个Profinet IO设备&#xff0c;Profinet通讯不使用通讯指令&#xff0c;只需要配置好数据传输地址&#xff0c;就能够实现数…

睿铂在广东,自然资源部经济管理科学研究所“多测合一”项目分享

引言 DG4 Pros倾斜摄影相机作为睿铂旗舰系列产品&#xff0c;它的硬件与软件配置都无愧于其顶级倾斜相机的称号。在它的帮助下&#xff0c;客户得以挑战很多以往受限于设备技术条件&#xff0c;实施起来非常困难的项目。 本次&#xff0c;自然资源部经济管理科学研究所&#…

App Store上线APP流程

现在App Store上已经有数百万款应用&#xff0c;因此对于App的规范要求也越来越高&#xff0c;对于新上线的APP需要满足这些规则并不是件容易的事。今天和大家分享这方面的知识&#xff0c;希望大家喜欢。北京木奇移动技术有限公司&#xff0c;专业的软件外包开发公司&#xff…

Mysql中存储引擎的区别及比较

MyISAM存储引擎 MyISAM基于ISAM存储引擎&#xff0c;并对其进行扩展。它是在Web、数据仓储和其他应用环境下最常使用的存储引擎之一。MyISAM拥有较高的插入、查询速度&#xff0c;但不支持事务。 MyISAM主要特性有&#xff1a; 1、大文件&#xff08;达到63位文件长度&#…

AI人工智能随机森林分类器的原理、优缺点、应用场景和实现方法

随机森林分类器&#xff08;Random Forest Classifier&#xff09;是一种常用的机器学习算法&#xff0c;它是基于决策树的一种集成学习方法。在人工智能&#xff08;Artificial Intelligence&#xff0c;简称AI&#xff09;领域中&#xff0c;随机森林分类器是一种高效的算法&…

艺术签名免费设计的方法分享,快来收下

在今天的数字时代&#xff0c;艺术签名已经成为一个流行的趋势。无论是在社交媒体上&#xff0c;还是在个人品牌推广中&#xff0c;艺术签名都是一个重要的元素。但是&#xff0c;对于很多人来说&#xff0c;设计一个独特的艺术签名可能需要付出昂贵的代价。然而&#xff0c;有…

flstudio21更新内容介绍FL水果2023旗舰版下载

昨天为大家展示了 FL STUDIO21 新增的插件&#xff0c;今天让我们看一看还有哪些新变化&#xff1f;FL Studio中文版惯称水果, 是一个完整的电音软件音乐制作环境或数字音频工作站。是现在流行的数字音频工作站之一,包括撰写,整理,记录,编辑,电音,混音和掌握专业品质的音乐。 0…

Open Ai 常见接口参数说明以及常见报错总结

&#x1f4cb; 个人简介 &#x1f496; 作者简介&#xff1a;大家好&#xff0c;我是阿牛&#xff0c;全栈领域优质创作者。&#x1f61c;&#x1f4dd; 个人主页&#xff1a;馆主阿牛&#x1f525;&#x1f389; 支持我&#xff1a;点赞&#x1f44d;收藏⭐️留言&#x1f4d…

【SPSS】生存-寿命表分析详细操作教程(附案例实战)

&#x1f935;‍♂️ 个人主页&#xff1a;艾派森的个人主页 ✍&#x1f3fb;作者简介&#xff1a;Python学习者 &#x1f40b; 希望大家多多支持&#xff0c;我们一起进步&#xff01;&#x1f604; 如果文章对你有帮助的话&#xff0c; 欢迎评论 &#x1f4ac;点赞&#x1f4…

Plc能否通过以太网网关实现无线数据交互?

西门子S7-1200 PLC 使用Profinet通讯时&#xff0c;一个做Profinet IO控制器&#xff0c;一个做Profinet IO设备。一个Profinet IO控制器可以最多支持16个Profinet IO设备&#xff0c;Profinet通讯不使用通讯指令&#xff0c;只需要配置好数据传输地址&#xff0c;就能够实现数…

Cesium 实战 - 模型亮度调整(解决模型非常暗的问题)

Cesium 实战 - 模型亮度调整&#xff08;解决模型非常暗的问题&#xff09; 环境版本试错过程解决问题在线示例 在某个项目中&#xff0c;遇到个问题&#xff0c;模型加载之后非常暗&#xff0c;经其他软件确认&#xff0c;模型本身正常&#xff0c;但是通过 Cesium 加载之后就…

Vue3项目技巧(更新中ing)

文章目录 axios封装http.jstestAPI.jsmain.js测试如果项目中需要多个baseURL 自动导入scss文件案例文件使用案例 引入aliyun图标库先看效果查看官网文档引入并使用 vueuse实现-吸附导航交互安装案例 多个组件共享的请求、数据、封装到pinia案例父组件中调用子组件中应用 axios封…

django admin后台管理系统中配置可上传多张图片功能

目录 一、默认的常规方式只能上传一张图片的示例 二、配置可上传多张图片 问题&#xff1a;在django自带的admin后台管理系统中常规的方式只能上传一张图片&#xff0c;无法上传添加多张图片&#xff0c;如下图。所以现在需要配置可上传多张图片的功能&#xff01; 一、默认的…