单线程事件处理器ControllerEventManager

news2025/1/19 18:42:50

0 前言

单线程事件处理器,Controller端定义的一个组件。该组件内置了一个专属线程,负责处理其他线程发送过来的Controller事件。还定义了一些管理方法,为专属线程输送待处理事件。

  • 0.11.0.0版本前,Controller组件源码复杂。集群元数据信息在程序中同时被多个线程访问,因此,源码里有大量Monitor锁、Lock锁或其他线程安全机制,导致晦涩难懂,改动困难
  • 0.11.0.0版本开始,社区重构Controller代码结构。将多线程并发访问改为单线程的事件队列方式。并非说Controller只有一个线程,而是指对局部状态的访问限制在一个专属线程,即让这个特定线程排他操作Controller元数据信息

这就无需担心多线程访问引发的各种线程安全问题,简化Controller端代码。

1 基本概念

Controller单线程事件队列处理模型及其基础组件。
Controller端多线程向事件队列写入不同种类事件,如zk端注册的Watcher线程、KafkaRequestHandler线程、Kafka定时任务线程等。

事件队列的另一端,只有一个名为ControllerEventThread的线程,负责“消费”或处理队列中的事件。

即单线程事件队列模型。

2 相关类

2.1 ControllerEventProcessor

Controller端的事件处理器接口:

API

process

接收一个Controller事件,并进行普通处理。实现Controller事件处理的主要方法

preempt

接收一个Controller事件,并抢占队列之前的Controller事件,进行优先处理。Kafka用其实现某些高优先级事件的抢占处理,目前在源码中也只有两类事件(ShutdownEventThread和Expire)需抢占式处理

KafkaController类是Controller组件的功能实现类,ControllerEventProcessor的唯一实现类。

2.2 ControllerEvent

Controller事件,即事件队列中被处理的对象,对应ControllerEvent接口

每个ControllerEvent都定义了一个状态。Controller处理具体事件时,会更新状态。
状态由ControllerState定义:

每类ControllerState都定义一个value值,表示Controller状态的序号,从0开始。
rateAndTimeMetricName方法用于构造Controller状态速率的监控指标名称。

如TopicChange是一类ControllerState:主题总数发生变化。

监控这类状态变更速率,rateAndTimeMetricName方法会定义名为TopicChangeRateAndTimeMs的指标。
并非所有ControllerState都有对应速率监控指标,如表示空闲状态的Idle无对应指标。

Controller总共定义了25类事件和17种状态:

监控到某些Controller状态变更速率异常时,可通过该表,快速确定可能造成瓶颈的Controller事件,并定位处理函数,辅助排查。

多个ControllerEvent可能属相同的ControllerState。

2.3 ControllerEventManager

事件处理器,创建和管理ControllerEventThread。位于ControllerEventManager.scala,该文件的组成:

  • ControllerEventManager Object
    保存一些字符串常量,如线程名称
  • ControllerEventProcessor
    事件处理器接口,目前只有KafkaController实现该接口
  • QueuedEvent
    事件队列上的事件对象
  • ControllerEventManager Class
    ControllerEventManager的伴生类,主要创建和管理事件处理线程和事件队列。该类定义了ControllerEventThread线程类。

ControllerEventManager对象仅定义了3个公共变量。

QueuedEvent

// 每个QueuedEvent定义了两个字段
// event: ControllerEvent类,表示Controller事件
// enqueueTimeMs:表示Controller事件被放入到事件队列的时间戳
class QueuedEvent(val event: ControllerEvent,
                  val enqueueTimeMs: Long) {
  // 标识事件是否开始被处理
  val processingStarted = new CountDownLatch(1)
  
  // 标识事件是否被处理过
  val spent = new AtomicBoolean(false)
  
  // 处理事件
  def process(processor: ControllerEventProcessor): Unit = {
    if (spent.getAndSet(true))
      return
    processingStarted.countDown()
    processor.process(event)
  }
  
  // 抢占式处理事件
  def preempt(processor: ControllerEventProcessor): Unit = {
    if (spent.getAndSet(true))
      return
    processor.preempt(event)
  }
  
  // 阻塞等待事件被处理完成
  def awaitProcessing(): Unit = {
    processingStarted.await()
  }
  
  override def toString: String = {
    s"QueuedEvent(event=$event, enqueueTimeMs=$enqueueTimeMs)"
  }
}

每个QueuedEvent对象实例都裹挟了一个ControllerEvent。
在QueuedEvent中,用CountDownLatch来做各种条件控制,比如用于侦测线程是否成功启动、成功关闭等。
QueuedEvent使用它的唯一目的,是确保Expire事件在建立ZooKeeper会话前被处理。
若不是在该场景,则代码就用spent来标识该事件是否已被处理:

  • 若已被处理
    再次调用process方法时就会直接返回

2.4 ControllerEventThread

专属的事件处理线程,唯一作用:处理不同种类的ControllEvent。
ControllerEventManager类内部定义的线程类。

消费QueuedEvent的ControllerEventThread类:

继承自ShutdownableThread:


该类会循环执行doWork,具体实现则由子类完成。

作为Controller唯一的事件处理线程,需关注该线程运行状态。必须要知道该线程在JVM上的名字,后续就能对其监控。
线程名由ControllerEventManager Object中ControllerEventThreadName变量定义:

ControllerEventThread#doWork

override def doWork(): Unit = {
  // 从事件队列获取待处理的Controller事件(QueuedEvent对象实例),否则等待
  val dequeued = queue.take()
  dequeued.event match {
    // 若是关闭线程事件,啥都不做。关闭线程由外部来执行
    case ShutdownEventThread =>
    case controllerEvent =>
      _state = controllerEvent.state
      // 更新对应事件在队列中保存的时间
      eventQueueTimeHist.update(time.milliseconds() - dequeued.enqueueTimeMs)
      try {
        def process(): Unit = dequeued.process(processor)
        // 处理事件,同时计算处理速率
        rateAndTimeMetrics.get(state) match {
          case Some(timer) => timer.time { process() }
          case None => process()
        }
      } catch {
        case e: Throwable => error(s"Uncaught error processing event $controllerEvent", e)
      }
      _state = ControllerState.Idle
  }
}

首先是调用LinkedBlockingQueue#take,去。注意,这里用的是take方法,这说明,如果事件队列中没有QueuedEvent,那么,ControllerEventThread线程将一直处于阻塞状态,直到事件队列上插入了新的待处理事件。

一旦拿到QueuedEvent事件后,线程会判断是否是ShutdownEventThread事件。当ControllerEventManager关闭时,会显式地向事件队列中塞入ShutdownEventThread,表明要关闭ControllerEventThread线程。如果是该事件,那么ControllerEventThread什么都不用做,毕竟要关闭这个线程了。相反地,如果是其他的事件,就调用QueuedEvent的process方法执行对应的处理逻辑,同时计算事件被处理的速率。

该process方法底层调用ControllerEventProcessor#process:

def process(processor: ControllerEventProcessor): Unit = {
  // 若已经被处理过,直接返回
  if (spent.getAndSet(true))
    return
  processingStarted.countDown()
  // 调用ControllerEventProcessor的process方法处理事件
  processor.process(event)
}

方法首先判断该事件是否已被处理:

  • 是,直接返回
  • 不是,调用ControllerEventProcessor#process处理事件

每个ControllerEventProcessor#process都封装在KafkaController.scala文件。就是KafkaController类实现ControllerEventProcessor#process,部分代码:

override def process(event: ControllerEvent): Unit = {
    try {
      // 依次匹配ControllerEvent事件
      event match {
        case event: MockEvent =>
          event.process()
        case ShutdownEventThread =>
          error("Received a ShutdownEventThread event. This type of event is supposed to be handle by ControllerEventThread")
        case AutoPreferredReplicaLeaderElection =>
          processAutoPreferredReplicaLeaderElection()
        ......
      }
    } catch {
      // 如果Controller换成了别的Broker
      case e: ControllerMovedException =>
        info(s"Controller moved to another broker when processing $event.", e)
        // 执行Controller卸任逻辑
        maybeResign()
      case e: Throwable =>
        error(s"Error processing event $event", e)
    } finally {
      updateMetrics()
    }
}

这个process方法接收一个ControllerEvent实例,接着会判断它是哪类Controller事件,并调用相应的处理方法:

  • AutoPreferredReplicaLeaderElection,调processAutoPreferredReplicaLeaderElection
  • 其他类型事件,调用process

put方法和clearAndPut方法也很重要。ControllerEventThread是读取队列事件,这两个方法就是向队列生产元素:

  • put把指定ControllerEvent插入事件队列
  • clearAndPut先执行高优先级的抢占式事件,之后清空队列所有事件,最后再插入指定事件

下面这两段源码分别对应这两个方法:

// put方法
def put(event: ControllerEvent): QueuedEvent = inLock(putLock) {
  // 构建QueuedEvent实例
  val queuedEvent = new QueuedEvent(event, time.milliseconds())
  // 插入到事件队列
  queue.put(queuedEvent)
  // 返回新建QueuedEvent实例
  queuedEvent
}
// clearAndPut方法
def clearAndPut(event: ControllerEvent): QueuedEvent = inLock(putLock) {
  // 优先处理抢占式事件
  queue.forEach(_.preempt(processor))
  // 清空事件队列
  queue.clear()
  // 调用上面的put方法将给定事件插入到事件队列
  put(event)
}

中的put方法使用putLock对代码进行保护,我觉得这个putLock是不需要的,因为LinkedBlockingQueue数据结构本身就已线程安全。put方法只会与全局共享变量queue打交道,因此,它们的线程安全性完全可委托LinkedBlockingQueue实现。LinkedBlockingQueue内部已维护一个putLock和一个takeLock,专门保护读写操作。

当然,我同意在clearAndPut中使用锁,毕竟要保证,访问抢占式事件和清空操作构成一个原子操作。

3 总结

Controller端的单线程事件队列实现方式,即ControllerEventManager通过构建ControllerEvent、ControllerState和对应的ControllerEventThread线程,并且结合专属事件队列,共同实现事件处理。

ControllerEvent:定义Controller能够处理的各类事件名称,目前总共定义了25类事件。
ControllerState:定义Controller状态。ControllerEvent的上一级分类,因此,ControllerEvent和ControllerState是多对一。
ControllerEventManager:Controller定义的事件管理器,专门定义和维护专属线程以及对应的事件队列。
ControllerEventThread:事件管理器创建的事件处理线程。该线程排他性地读取事件队列并处理队列中的所有事件。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/175494.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【Ajax】XMLHttpRequest和Level2

一、XMLHttpRequest什么是XMLHttpRequestXMLHttpRequest(简称 xhr)是浏览器提供的 Javascript 对象,通过它,可以请求服务器上的数据资源。之前所学的 jQuery 中的 Ajax 函数,就是基于 xhr 对象封装出来的。二、了解xhr…

java面试

java面试目录概述需求:设计思路实现思路分析1.代码:参考资料和推荐阅读Survive by day and develop by night. talk for import biz , show your perfect code,full busy,skip hardness,make a better result,wait for change,challenge Surv…

【数据结构】保姆级队列各接口功能实现

目录 🍊前言🍊: 🥝一、队列概述🥝: 1.队列的概念: 2.队列的结构: 🍉二、队列的各接口功能实现🍉: 1.初始化队列: 2.入队&#…

k8s之挂载NFS到POD中

写在前面 在k8s之挂载本地磁盘到POD中 一文中我们看了如何将POD中的数据写到本地磁盘中,这种方式要求POD只能在指定的Node上,一旦POD更换Node,数据依然会丢失,所以本文看下如何通过将数据写到NFS中来解决这个问题。下面我们就开始…

sklearn数据降维之字典学习

文章目录字典学习简介构造函数实战Step1 制作实验数据Step2 小批字典学习Step 3 参数调整字典学习简介 如果把降维理解成压缩的话,那么字典学习的本质是编码,其目的是找到少量的原子,用以描述或构建原始样本。举个一维的例子,以a…

程序员护眼指南

前言 前言:脱发和近视是当代年轻人的两大痛点,今天来聊聊如何护眼。 文章目录前言一、护眼的核心二、调节睫状肌的方法1. 眨眼2. 望远3. 睡觉4. 促进血液循环5. 吃补剂6. 好的屏幕一、护眼的核心 护眼的核心就是保护睫状肌。 睫状肌是眼内的一种平滑肌…

一起自学SLAM算法:7.7 典型SLAM算法

连载文章,长期更新,欢迎关注: 针对式(7-38)所述的在线SLAM系统,以扩展卡尔曼滤波(EKF)为代表的滤波方法,是求解该状态估计问题最典型的方法,在7.4节中已经详细…

GY-US42超声波传感器模块介绍

GY-US42超声波传感器模块简介GY-US42 是一款低成本高品质测距传感器模块。工作电压 3-5v,功耗小,体积小,安装方便。其工作原理是,探头发射超声波,照射到被测物体后,探头接收返回声波,利用时间差…

学人工智能电脑主机八大件配置选择指南

来源:深度之眼 作者:frank 编辑:学姐 本篇主要是帮助大家构建高性能、高性价比的AI开发的硬件平台。如何不把钱浪费到不必要的硬件上,并合理搭配硬件配置节省预算是本文想要去讨论的问题。如果预算充足,笔者建议购买一…

【JavaSE专栏1】Java的介绍、特点和历史

作者主页:Designer 小郑 作者简介:Java全栈软件工程师一枚,来自浙江宁波,负责开发管理公司OA项目,专注软件前后端开发(Vue、SpringBoot和微信小程序)、系统定制、远程技术指导。CSDN学院、蓝桥云…

Python ·保险理赔分析:数据分析

介绍 提示:这里可以添加系列文章的所有文章的目录,目录需要自己手动添加 在本笔记本中,我们将仔细研究保险索赔,并弄清一些有关血压、BMI、糖尿病、吸烟、年龄和性别等条件如何影响索赔价值的事实。 我们将使用散点图、饼图、直…

IDEA必装插件-Gyro

前言用SpringBootTest运行单测的时候,是不是每运行都需要重新启动Spring容器?大型应用启动一次会浪费大量的时间,导致效率比较低。Gyro插件可以解决你的问题。Gyro介绍它是一个IDEA插件,安装之后,用Gyro Debug运行你的…

一起自学SLAM算法:7.4 基于贝叶斯网络的状态估计

连载文章,长期更新,欢迎关注: 在7.2.4节中,讨论了表示机器人观测与运动之间依赖关系的概率图模型,主要是贝叶斯网络(实际应用在机器人中的是动态贝叶斯网络)和马尔可夫网络(实际应用…

fpga实操训练(lcd字符显示)

【 声明:版权所有,欢迎转载,请勿用于商业用途。 联系信箱:feixiaoxing @163.com】 用fpga编写屏幕显示,和c语言编写有很大的不同。用c语言开发,很大程度上是遵循soc ip提供的规范进行编写。而用fpga开发的话,则需要考虑不同信号的时序关系。但是,用fpga开发也有…

c++ 优先级队列priority_queue的使用

c priority_queue是对其他容器元素顺序的调整包装; 堆的原理 1.定义 priority_queue<Type, Container, Functional> q; 其中&#xff0c;Type是数据类型&#xff0c;Container是低层容器&#xff0c;如vector, stack, deque等. Functional是比较函数&#xff1b;默认可…

day25-类加载器反射

1.类加载器 1.1类加载器【理解】 作用 负责将.class文件&#xff08;存储的物理文件&#xff09;加载在到内存中 1.2类加载的过程【理解】 类加载时机 创建类的实例&#xff08;对象&#xff09;调用类的类方法访问类或者接口的类变量&#xff0c;或者为该类变量赋值使用反…

NodeJS 之 HTTP 模块(实现一个基本的 HTTP 服务器)

NodeJS 之 HTTP 模块&#xff08;实现一个基本的 HTTP 服务器&#xff09;参考描述HTTP 模块搭建 HTTP 服务器http.createServer()监听检测服务器端口是否被占用终端Error Code超时处理处理客户端的请求request 事件http.IncomingMessagehttp.ServerResponse中文乱码问题问题解…

Java EE之线程编(进阶版)

这些锁策略能适用于很多中语言&#xff0c;博主是学Java的&#xff0c;所以下面的代码会用Java去写&#xff0c;请大家见谅&#xff0c;但是处理的方法是大差不差的。 一、常见锁和锁策略&#xff1a; (一)、乐观锁和悲观锁 1、何为乐观锁和悲观锁呢&#xff1f; 答&#…

Linux服务器常见运维性能测试(3)CPU测试super_pi、sysbench

Linux服务器常见运维性能测试&#xff08;3&#xff09;CPU测试常见性能测试软件CPU测试&#xff1a;super_pi &#xff08;计算圆周率&#xff09;CPU测试&#xff1a;sysbench&#xff08;CPU功能测试部分&#xff09;下载安装sysbench综合测试功能执行CPU测试最近需要测试一…

Java面试题含答案,最新面试题(1)

Java 中 InvokeDynamic指令是干什么的&#xff1f; JVM字节码指令集一直比较稳定&#xff0c;一直到Java7中才增加了一个InvokeDynamic 指令&#xff0c;这是JAVA为了实现『动态类型语言』支持而做的一种改进&#xff1b;但是在Java7中并没有提供直接生成InvokeDynamic 指令的…