Android代码重构系列-02-使用Kotlin协程实现一个支持任务编排的轻量级启动器

news2024/11/27 3:43:19

前言

虽然本文的主题是启动器,但是笔者不打算去写怎么做启动优化,以及怎么实现一个完美的启动器。关于开源的第三方Android启动器已经有很多优秀的轮子了,比如阿里巴巴的alpha,参考 alpha 并改进其部分细节的Anchors,Start数比较高的android-startup,以及Android官方自己的app-startup等等。

本文的了灵感来源于我爱田Hebe应用程序启动优化新思路 - Kotlin协程,并参考了以上开源库做了一些改造,以便用起来更丝滑。本文主要分享笔者基于这个思路进行改造的几个点,强烈建议先阅读灵感来源的这篇文章,再继续往下阅读。

整体代码

先看下改造后的代码有一个初步的了解:

XTask

任务类,主要是改了线程调度,增加优先级,增加Kotlin DSL相关方法,以及继承Observable。

/*
 * 任务
 */
data class XTask(
    val name: String,//任务名称,不可以重复
    val desc: String = "",//任务描述
    val coroutineDispatcher: CoroutineDispatcher = Dispatchers.Default,//线程调度
    val priority: Int = 0,   // 运行的优先级,值小的先执行,
    var dependentTasks: Set<String> = setOf(), // 依赖的任务集合(不可重复,所以用Set)
    val run: suspend () -> Boolean = { false }, //具体的执行任务的函数,为什么放在构造函数,主要是方便全局查看任务。返回是否拦截
) : Observable() {
    // 入度,用于有向无环图的拓扑排序
    var inDegree = dependentTasks.size

    /**
     * 通知通知观察者
     */
    fun notify(arg: Any) {
        setChanged()
        notifyObservers(arg)
    }

    /**
     * 构造任务集
     * @param tasks Array<out XTask>
     * @return MutableSet<String>
     */
    fun on(vararg tasks: XTask): MutableSet<XTask> {
        val dependentTasks = mutableSetOf<XTask>()
        if (tasks.isNotEmpty()) {
            for (task in tasks) {
                dependentTasks.add(task)
            }
        }
        return dependentTasks
    }

    /**
     * 设置当前任务依赖的任务集
     * @param tasks Set<XTask>
     * @return XTask
     */
    infix fun depends(tasks: Set<XTask>): XTask {
        if (tasks.isNotEmpty()) {
            val dependentTasks = mutableSetOf<String>()
            for (task in tasks) {
                dependentTasks.add(task.name)
            }
            this.dependentTasks = dependentTasks
            this.inDegree = this.dependentTasks.size
        }
        return this
    }

    /**
     * 设置当前任务依赖的任务集
     * @param block [@kotlin.ExtensionFunctionType] Function1<XTask, MutableSet<String>>
     * @return String 返回任务名称
     */
    fun depends(block: XTask.() -> MutableSet<XTask>): XTask {
        val tasks = block()
        if (tasks.isNotEmpty()) {
            val dependentTasks = mutableSetOf<String>()
            for (task in tasks) {
                dependentTasks.add(task.name)
            }
            this.dependentTasks = dependentTasks
            this.inDegree = this.dependentTasks.size
        }
        return this
    }
}

fun <T> on(vararg elements: T): Set<T> = if (elements.isNotEmpty()) elements.toSet() else emptySet()

XTaskProject

参考Anchors新增Project的定义,这样可以按项目的思维更灵活的定制我们的任务链。

/*
 * 任务项目,将一系列任务的集合当做一个任务工程来处理,这样每个功能模块都可以有自己的任务项目
 */
class XTaskProject(private val name: String) : IXTaskProject, XTaskLogger {

    /**
     * 为什么用map,利用Key-Value,判断同名的Task是否存在
     */
    private val taskMap = mutableMapOf<String, XTask>()

    /**
     * 为什么用List,因为需要对Task进行排序,可以说是空间换时间
     */
    private val taskList: MutableList<XTask> = mutableListOf()

    override fun addTask(task: XTask): IXTaskProject {
        //任务名称不能重复
        if (!taskMap.contains(task.name)) {
            logger.debug { "add Task $task" }
            taskMap[task.name] = task
        }
        return this
    }

    override fun getTasks(): List<XTask> {
        // 根据优先级排序,值小的先执行
        return taskList.apply {
            clear()
            addAll(ArrayList(taskMap.values))
            sortBy { it.priority }
        }
    }

    override fun release() {
        logger.debug { "release" }
        taskMap.clear()
        taskList.clear()
    }
}

interface IXTaskProject {

    fun addTask(task: XTask): IXTaskProject

    fun getTasks(): List<XTask>

    fun release()

}

XTaskStarter

任务启动器,主要是支持生成PlantUML Activity示意图。

package com.lonbon.lonbonxtask.core

import com.lonbon.lonbonxtask.core.logger.XTaskLogger
import com.lonbon.lonbonxtask.logger.PlantUMLCreator
import kotlinx.coroutines.*
import java.util.*

/*
 * *****************************************************************************
 * <p>
 * Copyright (C),2007-2016, LonBon Technologies Co. Ltd. All Rights Reserved.
 * <p>
 * *****************************************************************************
 * 任务启动器
 */
object XTaskStarter : XTaskLogger {

    /**
     * 开始执行project中的任务
     * runBlocking 会阻塞线程,如果是在主线程调用,则必须小心ANR
     * @param project IXTaskProject
     */
    @JvmStatic
    fun start(project: IXTaskProject) = runBlocking {
        //获取按优先级排序的列表
        val taskList: List<XTask> = project.getTasks()
        //缓存完成的任务
        val finishedTaskList: MutableList<XTask> = mutableListOf()
        // 找出所有入度为0的任务,加入到队列当中
        val queue: Queue<XTask> = LinkedList()
        taskList.filter { it.inDegree == 0 }.forEach(queue::add)
        //建立一个map, 通过name可以获取协程的Job
        val jobMap = mutableMapOf<String, Job>()
        //循环执行队列里的任务
        logger.info { "Start assigning tasks, current time ${System.currentTimeMillis()}" }
        var totalCostTime = 0L
        //添加第一个活动节点
        val plantUMLCreator = PlantUMLCreator()
        plantUMLCreator.reset()
        if (queue.size > 1) {
            val parallelTasks = mutableListOf<String>()
            for (task in queue) {
                parallelTasks.add(task.name)
            }
            plantUMLCreator.addParallelTask(parallelTasks)
        } else {
            queue.peek()?.let {
                plantUMLCreator.addSingleTask(it.name)
            }
        }
        while (queue.isNotEmpty()) {
            //获取当前需要执行的任务
            val currentTask = queue.poll()!!
            logger.info { "--- Current assigned task is $currentTask" }
            //在指定的协程调度器启动一个协程,并返回Job
            jobMap[currentTask.name] = launch(currentTask.coroutineDispatcher) {
                //遍历当前任务依赖的任务集合
                for (dependentTask in currentTask.dependentTasks) {
                    //使用给定的协程上下文调用指定的挂起块,挂起直到完成,并返回结果。
                    withContext(currentTask.coroutineDispatcher) { // 这句代码很重要,不然会有死锁,想一想为什么?
                        jobMap[dependentTask]!!.join() // 依赖的任务必须先执行完,因为这个是拓扑排序执行的,所以理论上jobMap[dep]不可能为空,当然有可能填错任务名称的
                    }
                }
                //依赖已经执行完成,执行自身的任务
                val startTime = System.currentTimeMillis()
                logger.info { ">>>>>> Task ${currentTask.name} start <<<<<<" }
                val intercept = currentTask.run()
                logger.info { "Task ${currentTask.name} notifyObservers $intercept" }
                val endTime = System.currentTimeMillis()
                val costTime = (endTime - startTime)
                totalCostTime += costTime
                logger.info { "<<<<<< Task ${currentTask.name} completed, cost $costTime ms >>>>>>" }
                //添加到任务完成集合
                finishedTaskList.add(currentTask)
                //通知观察者
                currentTask.notify(intercept)
                //更新节点耗时
                plantUMLCreator.getPlantUMLActivityNode(currentTask.name)?.let {
                    logger.info { "${currentTask.name} getPlantUMLActivityNode ${it.elements}" }
                    logger.info { "${currentTask.name} costTime $costTime" }
                    it.elements[currentTask.name]?.costTime = costTime
                }
                //如果拦截,则取消剩余未完成的job
                if (intercept) {
                    project.release()
                    queue.clear()
                    for ((taskName, job) in jobMap) {
                        if (job.isActive) {
                            job.cancel()
                            logger.info { "Task $taskName $job cancel" }
                        }
                    }
                    jobMap.clear()
                    logger.warn { "============ Task ${currentTask.name} intercept, clear queue and cancel all jobs " }
                }
                logger.info { "${currentTask.name} taskList size ${taskList.size}  finishedTaskList size ${finishedTaskList.size} " }
                if (taskList.size == finishedTaskList.size) {
                    logger.info { "All tasks completed cost $totalCostTime ms " }
                    //保存
                    logger.info { "All tasks completed cost $totalCostTime ms " }
                    plantUMLCreator.create()
                }
            }
            //当前任务已经开始分配,找到所有依赖当前任务的任务,将它们的入度减1,如果入度为0,则加入队列
            val toBeOfferTasks = mutableListOf<XTask>()
            //记录当前任务是否有被其它任务依赖
            var currentTaskHasDependentTasks = false
            for (task in taskList) {
                if (task.dependentTasks.contains(currentTask.name)) {
                    currentTaskHasDependentTasks = true
                    //入度减1
                    task.inDegree--
                    //入度为0,则进入队列
                    if (task.inDegree == 0) {
                        toBeOfferTasks.add(task)
                    }
                }
            }
            logger.info { "${currentTask.name} currentTaskHasDependentTasks $currentTaskHasDependentTasks toBeOfferTasks $toBeOfferTasks" }
            //修改任务end标记
            plantUMLCreator.getPlantUMLActivityNode(currentTask.name)?.apply {
                elements[currentTask.name]?.end = !currentTaskHasDependentTasks
            }
            //添加下一个活动节点
            if (toBeOfferTasks.isNotEmpty()) {
                if (toBeOfferTasks.size > 1) {
                    val parallelTasks = mutableListOf<String>()
                    for (task in toBeOfferTasks) {
                        parallelTasks.add(task.name)
                    }
                    logger.info { "${currentTask.name} addParallelTask $parallelTasks" }
                    plantUMLCreator.addParallelTask(parallelTasks)
                } else {
                    toBeOfferTasks[0].let {
                        logger.info { "${currentTask.name} addSingleTask ${it.name}" }
                        plantUMLCreator.addSingleTask(it.name)
                    }
                }
                //将入度为零的任务加入队列
                for (task in toBeOfferTasks) {
                    queue.offer(task)
                }
            }
        }
        // 这个地方需要判断一下,是否所有的任务都已经被安排执行了,如果还有任务没有被安排,说明任务存在循环依赖,抛出异常。
        if (taskList.isNotEmpty() && jobMap.isNotEmpty() && jobMap.size != taskList.size) {
            //循环结束后,如果有任务没有安排,则jobMap数量肯定是不等于taskList的数量
            throw Throwable("Exist Recycle Task!")
        }
        logger.info { "All task assignments completed" }
    }

}

PlantUMLCreator

PlantUMLCreator用于生成PlantUML Activity示意图。见:activity-diagram-beta

/*
 * 创建PlantUML 活动图(beta版)
 */
class PlantUMLCreator : XTaskLogger {

    companion object {
        private const val START_UML = "@startuml"
        private const val END_UML = "@enduml"
        private const val START = "start"
        private const val END = "end"
        private const val STOP = "stop"
        private const val FORK = "fork"
        private const val FORK_AGAIN = "fork again"
        private const val END_FORK = "end fork"
        private const val COLON = ":"
        private const val SEMI_COLON = ";"
        private const val LINE_BREAK = "\r\n"
    }

    /**
     * PlantUML内容
     */
    private var text: String = ""

    /**
     * 活动类,见https://plantuml.com/zh/activity-diagram-beta
     */
    private val activityDiagram = PlantUMLActivityDiagram()

    /**
     * 重置数据
     */
    fun reset() {
        activityDiagram.nodeDeque.clear()
    }

    private fun startUml() {
        text = START_UML + LINE_BREAK
    }

    private fun endUml() {
        text += END_UML + LINE_BREAK
    }

    private fun start() {
        text += START + LINE_BREAK
    }

    private fun stop() {
        text += STOP + LINE_BREAK
    }

    private fun end() {
        text += END + LINE_BREAK
    }

    private fun fork() {
        text += FORK + LINE_BREAK
    }

    private fun forkAgain() {
        text += FORK_AGAIN + LINE_BREAK
    }

    private fun endFork() {
        text += END_FORK + LINE_BREAK
    }

    private fun addActivityLabel(label: String) {
        text += COLON + label + SEMI_COLON + LINE_BREAK
    }

    /**
     * 根据任务名称获取指定的活动节点
     * @param taskName String
     * @return PlantUMLActivityNode?
     */
    fun getPlantUMLActivityNode(taskName: String): PlantUMLActivityNode? =
        activityDiagram.nodeMap[taskName]

    /**
     * 添加一个单线任务
     * @param taskName String
     */
    fun addSingleTask(taskName: String): PlantUMLActivityNode = PlantUMLActivityNode().apply {
        elements[taskName] = PlantUMLActivityNodeElement()
        activityDiagram.nodeDeque.add(this)
        activityDiagram.nodeMap[taskName] = this
        logger.info { "addSingleTask: $taskName $elements" }
    }

    /**
     * 添加并行任务
     * @param taskNameList List<String>
     */
    fun addParallelTask(taskNameList: List<String>): PlantUMLActivityNode =
        PlantUMLActivityNode().apply {
            logger.info { "addParallelTask: taskNameList $taskNameList" }
            for (taskName in taskNameList) {
                elements[taskName] = PlantUMLActivityNodeElement()
                activityDiagram.nodeMap[taskName] = this
            }
            activityDiagram.nodeDeque.add(this)
            logger.info { "addParallelTask: $taskNameList $elements" }
        }


    /**
     * 生成plantuml代码
     */
    fun create(): String {
        startUml()
        start()
        //logger.info { "create: node size is ${activityDiagram.nodeDeque.size}" }
        while (activityDiagram.nodeDeque.isNotEmpty()) {
            activityDiagram.nodeDeque.removeFirstOrNull()?.run {
                //logger.info { "create: node ${this.elements}" }
                if (elements.size == 1) {
                    for (ele in elements) {
                        logger.info { "create: element ${ele.key}" }
                        addActivityLabel("${ele.key} ${ele.value.costTime}ms")
                    }
                    return@run
                }
                var first = true
                for (ele in elements) {
                    if (first) {
                        first = false
                        fork()
                    } else {
                        forkAgain()
                    }
                    //logger.info { "create: element $ele" }
                    addActivityLabel("${ele.key} ${ele.value.costTime}ms")
                    if (ele.value.end) {
                        end()
                    }
                }
                endFork()
            }
        }
        stop()
        endUml()
        logger.info { "create: \n\n$text" }
        return text
    }

}


/**
 * 节点元素
 * @property end Boolean true没有被其它任务依赖
 * @property costTime Long 消耗时长(毫秒ms)
 * @constructor
 */
data class PlantUMLActivityNodeElement(var end: Boolean = false, var costTime: Long = 0L)

/**
 * PlantUML活动节点
 * @property elements MutableMap<PlantUMLActivityNodeElement, Boolean> Key->Value:节点元素->结束标志
 */
class PlantUMLActivityNode {
    val elements = mutableMapOf<String, PlantUMLActivityNodeElement>()
}

/**
 * PlantUML活动示意图类
 * @property nodeDeque Deque<PlantUMLActivityNode> 双端队列
 * @property nodeMap Deque<PlantUMLActivityNode> map方便获取Node节点
 */
class PlantUMLActivityDiagram {
    val nodeDeque: ArrayDeque<PlantUMLActivityNode> = ArrayDeque()
    val nodeMap = mutableMapOf<String, PlantUMLActivityNode>()
}

线程调度

原本的线程调度是通过Task中的mainThread属性来判断是否需要在主线程中调用,这样有一个不好的地方就是不能指定任务的调度线程,也不方便做单元测试,改成可指定CoroutineDispatcher自由度会更高一些。

代码如下:

data class XTask(
    val name: String,//任务名称,不可以重复
    val desc: String = "",//任务描述
    val coroutineDispatcher: CoroutineDispatcher = Dispatchers.Default,//线程调度
    val priority: Int = 0,   // 运行的优先级,值小的先执行
    var dependentTasks: Set<String> = kotlin.collections.setOf(), // 依赖的任务集合(不可重复,所以用Set)
    val run: suspend () -> Boolean = { false }, //具体的执行任务的函数,为什么放在构造函数,主要是方便全局查看任务。返回是否拦截
) 

默认为Dispatchers.Default,在Android中如果需要在主线程运行,可以指定为Dispatchers.Main。

在本地单元测试中,封装 Android UI线程的Dispatchers.Main是无法使用的,因为我们执行的单元测试是在本地 JVM中,并不是在 Android 设备上。如果被测试代码使用了Dispatchers.Main,那么单元测试就会在运行过程中抛出异常。

注意:这种情况只会发生于本地单元测试。在可使用真实界面线程的 插桩测试 中,不应该替换 Main 调度程序。

如果需要在所有情况下都将 Dispatchers.Main调度程序替换为 TestDispatcher,则可以使用 Dispatchers.setMain 和 Dispatchers.resetMain 函数。

代码示例:

@Test
fun settingMainDispatcher() = runTest {
    val testDispatcher = UnconfinedTestDispatcher(testScheduler)
    Dispatchers.setMain(testDispatcher)
    try {
        val viewModel = HomeViewModel()
        viewModel.loadMessage() // Uses testDispatcher, runs its coroutine eagerly
        assertEquals("Greetings!", viewModel.message.value)
    } finally {
        Dispatchers.resetMain()
    }
}

更多关于协程的单元测试内容参考:在 Android 上测试 Kotlin 协程

一个完整的单元测试用例如下:

/**
 * Example local unit test, which will execute on the development machine (host).
 *
 * See [testing documentation](http://d.android.com/tools/testing).
 */
class ExampleUnitTest {

    @Test
    fun testXTask() = runTest(UnconfinedTestDispatcher()) {
        val project = XTaskProject("NormalProject")
        project.addTask(XTask(
            name = "TaskStart", coroutineDispatcher = Dispatchers.Default
        ) {
            delay(1000)
            false
        }).addTask(XTask(
            name = "TaskA",
            coroutineDispatcher = Dispatchers.Default,
            dependentTasks = setOf("TaskStart")
        ) {
            delay(1000)
            false
        }).addTask(XTask(
            name = "TaskB",
            coroutineDispatcher = Dispatchers.Default,
            dependentTasks = setOf("TaskStart")
        ) {
            delay(1000)
            false
        }).addTask(XTask(
            name = "TaskC",
            coroutineDispatcher = Dispatchers.Default,
            dependentTasks = setOf("TaskStart")
        ) {
            delay(1000)
            false
        }).addTask(XTask(
            name = "TaskD",
            coroutineDispatcher = Dispatchers.Default,
            dependentTasks = setOf("TaskA", "TaskC")
        ) {
            delay(1000)
            false
        }).addTask(XTask(
            name = "TaskE",
            coroutineDispatcher = Dispatchers.Default,
            dependentTasks = setOf("TaskD")
        ) {
            delay(1000)
            false
        }).addTask(XTask(
            name = "TaskF",
            coroutineDispatcher = Dispatchers.Default,
            dependentTasks = setOf("TaskD")
        ) {
            delay(1000)
            false
        }).addTask(XTask(
            name = "TaskG",
            coroutineDispatcher = Dispatchers.Default,
            dependentTasks = setOf("TaskD")
        ) {
            delay(1000)
            false
        }).addTask(XTask(
            name = "TaskH",
            coroutineDispatcher = Dispatchers.Default,
            dependentTasks = setOf("TaskF", "TaskG")
        ) {
            delay(1000)
            false
        }).addTask(XTask(
            name = "TaskI",
            coroutineDispatcher = Dispatchers.Default,
            dependentTasks = setOf("TaskH")
        ) {
            delay(1000)
            false
        }).addTask(XTask(
            name = "TaskJ",
            coroutineDispatcher = Dispatchers.Default,
            dependentTasks = setOf("TaskI")
        ) {
            delay(1000)
            false
        }).addTask(XTask(
            name = "TaskK",
            coroutineDispatcher = Dispatchers.Default,
        ) {
            delay(4000)
            false
        })
        XTaskManager.start(project)
    }

}

需要的注意是,当我们在本地单元测试中执行协程时,最好配合runTest和UnconfinedTestDispatcher来使用,否则无法立即执行协程。

runTest(UnconfinedTestDispatcher()) {
    //launch
}

任务优先级

有一些任务虽然是并行的,但是我们仍然希望这些任务有一个优先级,因此笔者加了个priority来对任务进行排序,priority值小的先执行。

override fun getTasks(): List<XTask> {
    // 根据优先级排序,值小的先执行
    return taskList.apply {
        clear()
        addAll(ArrayList(taskMap.values))
        sortBy { it.priority }
    }
}

注意,实际上优先级是对入度一样的任务才有效。并不是指所有任务的执行先后的优先级,也不是线程的优先级。

任务拦截

需求是多样化的,实际我们可能有这样的场景,比如网络不通或者不需要进行数据同步时,之后的任务链就不用执行了,因此需要增加拦截机制,提前结束后续的任务链。

因为所有的任务都是通过协程执行并返回job的对象,因此,想要进行拦截,就可以通过执行job.cancel来完成。当然,我们还需一个触发条件,笔者是通过任务返回值来判断是否需要进行拦截的。

代码如下:

val run: suspend () -> Boolean = { false }, //具体的执行任务的函数,为什么放在构造函数,主要是方便全局查看任务。返回是否拦截

val intercept = currentTask.run()
...省略若干代码....
//如果拦截,则取消剩余未完成的job
if (intercept) {
    project.release()
    queue.clear()
    for ((taskName, job) in jobMap) {
        if (job.isActive) {
            job.cancel()
            logger.info { "Task $taskName $job cancel" }
        }
    }
    jobMap.clear()
    logger.warn { "============ Task ${currentTask.name} intercept, clear queue and cancel all jobs " }
}

但是,如果协程里面又运行了协程怎么办?套娃呢。嗯,这个确实不好处理。因此,笔者强烈不建议这样写代码,我们应该通过拆分代码,定义新的任务和依赖关系来实现我们的需求。

可能还会有这样的需求,比如TaskB依赖TaskA,但是TaskB是否执行依赖于TaskA的结果,其它的Task则不受干扰。

面对这样的需求,可能就需要任务之间进行数据传递,就像RxJava那样,TaskA把结果给到TaskB,TaskB发现不满足条件就不执行对应的代码。或者简单点,Task增加一个是否可运行的标记位,当TaskA执行完后根据需要去设置TaskB的标记位,TaskB判断标记位不满足则不执行对应的代码。

总之如果要按上面的需求做一个精细化的控制,还是有蛮多工作要做的,这样会让整个逻辑变得更加复杂,调试起来会难上加难。实际上,笔者认为更简单更合理的方式,应该是将需求拆分,分成不同的Project来处理。如果Project之间有依赖关系,比如有ProjectA和ProjectB,ProjectB必须在ProjectA的TaskA之后才能开始,那么我们就可以给Task设计一个任务完成通知或者说增加一个锚点,这样ProjectB就可以在收到TaskA任务完成通知或到达锚点后开始执行。

任务完成通知

任务完成通知可以使用观察者模式,但如果需求简单,其实一个任务回调就基本满足需求了。

data class XTask(
    val name: String,//任务名称,不可以重复
    val desc: String = "",//任务描述
    val coroutineDispatcher: CoroutineDispatcher = Dispatchers.Default,//线程调度
    val priority: Int = 0,   // 运行的优先级,值小的先执行,
    var dependentTasks: Set<String> = setOf(), // 依赖的任务集合(不可重复,所以用Set)
    val run: suspend () -> Boolean = { false }, //具体的执行任务的函数,为什么放在构造函数,主要是方便全局查看任务。返回是否拦截
) : Observable() {
    // 入度,用于有向无环图的拓扑排序
    var inDegree = dependentTasks.size

    /**
     * 通知通知观察者
     */
    fun notify(arg: Any) {
        setChanged()
        notifyObservers(arg)
    }
}
//添加观察者
val taskA = XTask(
    name = "TaskA",
    coroutineDispatcher = Dispatchers.Default,
) {
    delay(1000)
    false
}.apply {
    addObserver { _, arg ->
        println("$name update intercept $arg")
    }
}

//依赖已经执行完成,执行自身的任务
val intercept = currentTask.run()
...省略若干代码
//通知观察者
currentTask.notify(intercept)

使用Kotlin DSL进行任务编排

原本的写法,任务的创建和任务之间依赖关系的定义是捆绑在一起的,这样有个不好的地方就是,任务之间的依赖关系不太直观,而且如果要改依赖关系也不太方便。实际上,我们还可以提供新的任务编排方式,将任务的创建和任务之间的依赖关系分开。

首先创建任务并添加进Project:

val diagramProject = XTaskProject("DiagramProject")
val taskStart = XTask(
    name = "TaskStart", coroutineDispatcher = Dispatchers.Default
) {
    delay(1000)
    false
}
val taskA = XTask(
    name = "TaskA",
    coroutineDispatcher = Dispatchers.Default,
) {
    delay(1000)
    false
}
val taskB = XTask(
    name = "TaskB",
    coroutineDispatcher = Dispatchers.Default,
) {
    delay(1000)
    false
}
val taskC = XTask(
    name = "TaskC",
    coroutineDispatcher = Dispatchers.Default,
) {
    delay(1000)
    false
}
val taskEnd = XTask(
    name = "TaskEnd", coroutineDispatcher = Dispatchers.Default
) {
    delay(1000)
    false
}
//添加进Project
diagramProject.apply {
    addTask(taskStart)
    addTask(taskA)
    addTask(taskB)
    addTask(taskC)
    addTask(taskEnd)
}

使用Kotlin DSL任务编排:

/**
 * <- 表示左边的任务依赖于右边的任务,即做的任务必须等右边的任务执行完毕之后才能执行
 * taskEnd <- taskB <- taskA <-taskStar
 * taskEnd <- taskC <- taskA
 */
taskEnd.depends {
    on(
        taskB.depends {
            on(
                taskA.depends {
                    on(taskStart)
                }
            )
        },
        taskC.depends {
            on(taskA)
        },
    )
}

使用Kotlin中缀函数美化一下代码:


taskEnd depends on(
    taskB depends on(
        taskA depends on(
            taskStart
        )
    ),
    taskC depends on(taskA)
)

奇奇怪怪的代码,不过倒是可以看得清楚任务之间的依赖关系。

执行下任务并看下结果:

任务的执行的先后顺序以及耗时一目了然,改成TaskEnd不依赖TaskC:

可以看到TaskC没有链接到TaskEnd,说明TaskEnd不依赖TaskC。

任务执行结果可视化

可能有人会好奇上面的图是怎么生成的,手动用画图软件画的?当然不是!利用plantuml可以帮助我们通过代码来画图。笔者使用的是plantuml的活动图(新语法)来表示任务执行的耗时和任务之间的依赖关系,当然,如果大家有更好的建议,非常欢迎一起交流。

活动图(新语法)的语法就不多说了,不了解的小伙伴点上面的链接先学习下。

如下图所示,通过一段代码可以生成对应的图。

这样一来,我们只需要关心怎么生成这段表示图的代码就可以了,不过前提是,你得了解它的语法吧。

代码如下:


/*
 * 创建PlantUML 活动图(beta版)
 */
class XTaskPlantUMLCreator : XTaskLogger {

    companion object {
        private const val START_UML = "@startuml"
        private const val END_UML = "@enduml"
        private const val START = "start"
        private const val END = "end"
        private const val STOP = "stop"
        private const val FORK = "fork"
        private const val FORK_AGAIN = "fork again"
        private const val END_FORK = "end fork"
        private const val COLON = ":"
        private const val SEMI_COLON = ";"
        private const val LINE_BREAK = "\r\n"
    }

    /**
     * PlantUML内容
     */
    private var text: String = ""

    /**
     * 活动类,见https://plantuml.com/zh/activity-diagram-beta
     */
    private val activityDiagram = PlantUMLActivityDiagram()

    /**
     * 重置数据
     */
    fun reset() {
        activityDiagram.nodeDeque.clear()
    }

    private fun startUml() {
        text = START_UML + LINE_BREAK
    }

    private fun endUml() {
        text += END_UML + LINE_BREAK
    }

    private fun start() {
        text += START + LINE_BREAK
    }

    private fun stop() {
        text += STOP + LINE_BREAK
    }

    private fun end() {
        text += END + LINE_BREAK
    }

    private fun fork() {
        text += FORK + LINE_BREAK
    }

    private fun forkAgain() {
        text += FORK_AGAIN + LINE_BREAK
    }

    private fun endFork() {
        text += END_FORK + LINE_BREAK
    }

    private fun addActivityLabel(label: String) {
        text += COLON + label + SEMI_COLON + LINE_BREAK
    }

    /**
     * 根据任务名称获取指定的活动节点
     * @param taskName String
     * @return PlantUMLActivityNode?
     */
    fun getPlantUMLActivityNode(taskName: String): PlantUMLActivityNode? =
        activityDiagram.nodeMap[taskName]

    /**
     * 添加一个单线任务
     * @param taskName String
     */
    fun addSingleTask(taskName: String): PlantUMLActivityNode = PlantUMLActivityNode().apply {
        elements[taskName] = PlantUMLActivityNodeElement()
        activityDiagram.nodeDeque.add(this)
        activityDiagram.nodeMap[taskName] = this
        logger.info { "addSingleTask: $taskName $elements" }
    }

    /**
     * 添加并行任务
     * @param taskNameList List<String>
     */
    fun addParallelTask(taskNameList: List<String>): PlantUMLActivityNode =
        PlantUMLActivityNode().apply {
            logger.info { "addParallelTask: taskNameList $taskNameList" }
            for (taskName in taskNameList) {
                elements[taskName] = PlantUMLActivityNodeElement()
                activityDiagram.nodeMap[taskName] = this
            }
            activityDiagram.nodeDeque.add(this)
            logger.info { "addParallelTask: $taskNameList $elements" }
        }


    /**
     * 生成plantuml代码
     */
    fun create(): String {
        startUml()
        start()
        //logger.info { "create: node size is ${activityDiagram.nodeDeque.size}" }
        while (activityDiagram.nodeDeque.isNotEmpty()) {
            activityDiagram.nodeDeque.removeFirstOrNull()?.run {
                //logger.info { "create: node ${this.elements}" }
                if (elements.size == 1) {
                    for (ele in elements) {
                        logger.info { "create: element ${ele.key}" }
                        addActivityLabel("${ele.key} ${ele.value.costTime}ms")
                    }
                    return@run
                }
                var first = true
                for (ele in elements) {
                    if (first) {
                        first = false
                        fork()
                    } else {
                        forkAgain()
                    }
                    //logger.info { "create: element $ele" }
                    addActivityLabel("${ele.key} ${ele.value.costTime}ms")
                    if (ele.value.end) {
                        end()
                    }
                }
                endFork()
            }
        }
        stop()
        endUml()
        logger.info { "create: \n\n$text" }
        return text
    }

}


/**
 * 节点元素
 * @property end Boolean true没有被其它任务依赖
 * @property costTime Long 消耗时长(毫秒ms)
 * @constructor
 */
data class PlantUMLActivityNodeElement(var end: Boolean = false, var costTime: Long = 0L)

/**
 * PlantUML活动节点
 * @property elements MutableMap<PlantUMLActivityNodeElement, Boolean> Key->Value:节点元素->结束标志
 */
class PlantUMLActivityNode {
    val elements = mutableMapOf<String, PlantUMLActivityNodeElement>()
}

/**
 * PlantUML活动示意图类
 * @property nodeDeque Deque<PlantUMLActivityNode> 双端队列
 * @property nodeMap Deque<PlantUMLActivityNode> map方便获取Node节点
 */
class PlantUMLActivityDiagram {
    val nodeDeque: ArrayDeque<PlantUMLActivityNode> = ArrayDeque()
    val nodeMap = mutableMapOf<String, PlantUMLActivityNode>()
}

也没什么技巧,直接拼接就完事了。有了这段代码,我们就可以在任务执行的过程中,根据依赖关系,执行耗时,调用相关的方法来填充需要的信息。

需要注意的,最终我们只生成了需要的代码,如果要转成图片,需要通过plantuml网站或者下载它们提供的jar包来操作。参考:plantuml入门指南。

小结

经过一番改造,一个使用Kotlin协程实现支持任务编排的轻量级启动器就完成了。虽然代码量小,但是正因为足够简单,所以容易上手方便调试,应对一般的需求足够了。正所谓,麻雀虽小五脏俱全。

正如前言所说,这个启动器的目的不是用来做Android应用的启动优化,因为启动优化实际上会涉及到很多知识点,需要涉及到的细节也更多。比如多进程,跨模块,线程收敛,线程监控等等。

当然,这个启动器还有一些细节需要处理,比如怎样防止依赖一个不存在的任务,怎么避免死锁,怎样处理异常等。这些问题留给大家去思考了。

写在最后,首先非常感谢您耐心阅读完整篇文章,坚持写原创且基于实战的文章不是件容易的事,如果本文刚好对您有点帮助,欢迎您给文章点赞评论,您的鼓励是笔者坚持不懈的动力。写博客不仅仅是巩固学习的一个好方式,更是一个观点碰撞查漏补缺的绝佳机会,若文章有不对之处非常欢迎指正,再次感谢。

参考资料

我爱田Hebe应用程序启动优化新思路 - Kotlin协程

czy1121的init

程序员徐公的Android 启动优化(一) - 有向无环图

网易云音乐技术团队的心遇 Android 启动优化实践:将启动时间降低 50%

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/402789.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Mybatis框架源码笔记(七)之Mybatis中类型转换模块(TypeHandler)解析

1、JDBC的基本操作回顾 这里使用伪代码概括一下流程: 对应数据库版本的驱动包自行下载加载驱动类 (Class.forName("com.mysql.cj.jdbc.Driver"))创建Connection连接: conn DriverManager.getConnection("jdbc:mysql://数据库IP:port/数据库名称?useUnico…

最新消息:2023年软考高项教材改版!

最新通知&#xff1a;从2023年上半年软考开始信息系统项目管理师考试将依据新版考试大纲进行。 给备考高项的朋友的一些建议&#xff1a; 备考资源&#xff1a; 【腾讯文档】软考各科资料分享 https://docs.qq.com/doc/DTVN1SWtFZHdicUNp 复习方法&#xff1a; 选择题 选择题…

ChatGPT,乌合之众的疯狂

最近ChatGPT有多火爆就不用我说了。公司里&#xff0c;从CEO到技术人员&#xff0c;乃至于门口的保安、食堂的大婶&#xff0c;没有一个不会聊两句ChatGPT的。连我20年未见的小学同学、三线城市警官&#xff0c;都问我这东西能不能给领导写汇报材料。 用不了多久&#xff0c;家…

颠覆推特VS改造推特:什么是去中心化社交的正确姿势?

去年&#xff0c;“钢铁侠”伊隆马斯克收购了全球最大的社交媒体之一——推特。推特成立于2006年&#xff0c;是一个“公民广场”&#xff0c;允许大家公开发表观点和内容。用户可以关注自己喜欢的账号&#xff0c;也可以点赞转发评论他人的推文&#xff0c;中国的微博便是照搬…

【halcon】轮廓拟合相关算子

涉及函数 edges_sub_pix 寻找边缘 edges_sub_pix (Image, Edges, canny, 1, 10, 20) 后面三个参数&#xff0c;越小&#xff0c;找到的细节越多。这个是对应录波器为canny时。 canny滤波器用的最多。 segment_contours_xld 将连续的轮廓进行分段&#xff0c;按圆弧或者执…

JUC(七)

1.线程安全集合类 1>.线程安全集合类可以分为三大类: ①.遗留的(/旧的)线程安全集合,如:Hashtable,Vector; ②.使用Collections装饰的线程安全集合,如: Collections.synchronizedCollectionCollections.synchronizedListCollections.synchronizedMapCollections.synchroniz…

window通过wsl启动appsmith源码

window通过wsl启动appsmith前端后端前言appsmith前端本地启动WSL安装下载ubuntu升级wsl到wsl2ubuntu安装环境环境要求Ubuntu环境配置node下载解压运行[源码](https://www.appsmith.com/)本地访问后端appsmith后台本地启动启动mongo、rediswsl ubuntu中启动后台试试流程总结最后…

缓存双写一致性之更新策略探讨

问题由来 数据redis和MySQL都要有一份&#xff0c;如何保证两边的一致性。 如果redis中有数据&#xff1a;需要和数据库中的值相同如果redis中没有数据&#xff1a;数据库中的值是最新值&#xff0c;且准备会写redis 缓存操作分类 自读缓存读写缓存&#xff1a; &#xff0…

关于vuex的使用

1.首先安装vuex npm install vuex --save 这时如果直接安装vuex&#xff0c;不指定版本的话&#xff0c;就会直接安装最新的vuex的版本。所以会出现报错。 报错就安装这个 npm install --save vuex3 2.创建文件夹&#xff0c; 有的时候安装好会自动创建vuex的文件夹 &#xf…

Python解题 - CSDN周赛第35期 - 不算题解的题解

本期四道题还是全考过&#xff0c;题解在网上也都搜得到。。。只好继续水一份不算题解的题解。 第一题&#xff1a;交换后的or 给定两组长度为n的二进制串&#xff0c;请问有多少种方法在第一个串中交换两个不同位置上的数字&#xff0c;使得这两个二进制串“或”的结果发生改…

案例01-修改数据redis没有同步更新

目录 一&#xff1a;背景介绍 二&#xff1a;思路&方案 三&#xff1a;过程 1.修改数据没有删除缓存 2.修改数据删除了缓存 四&#xff1a;总结 五&#xff1a;升华 一&#xff1a;背景介绍 redis中存储了关于一个课程下多个班级的信息。但是难免会在一个课程下添加新…

pandas 数据预处理+数据概览 处理技巧整理(持续更新版)

这篇文章主要是整理下使用pandas的一些技巧&#xff0c;因为经常不用它&#xff0c;这些指令忘得真的很快。前段时间在数模美赛中已经栽过跟头了&#xff0c;不希望以后遇到相关问题的时候还去网上查&#xff08;主要是太杂了&#xff09;。可能读者跟我有一样的问题&#xff0…

程序员养发神器:拒绝加班熬夜,告别秃头!

身为一个程序员&#xff0c;每天的工作就是写代码和吹牛逼&#xff0c;但是代码写多了&#xff0c;都没有多少让自己吹的时间了。摸鱼时间少是我们太菜了吗&#xff1f;可不要小瞧自己&#xff0c;可能是你没掌握方法。 我自己本身就是一个十分疯狂的工具收集者&#xff0c;收…

实在智能RPA数字员工竞技“宝罗杯”机器人创新总决赛,斩获佳绩!

近日&#xff0c;由中国钢铁工业协会和中国自动化学会指导&#xff0c;中国宝武钢铁集团有限公司主办、宝信软件承办的机器人行业领域的“宝罗杯”机器人创新大赛总决赛在中国宝武上海总部圆满收官。 此次大赛旨在充分凝聚社会智力&#xff0c;聚焦工业机器人的应用场景&#x…

es深度分页原因概念及处理方法

概述 当使用es分页查询的时候&#xff0c;如果查询的数据太靠后了&#xff0c;就会产生深度分页问题。 假设es有3个节点&#xff0c;node1,node2,node3 查询 limti 50000,50 假设请求的是node1,此时会在每个节点上抓出 50050条数据&#xff0c;然后在node1汇总排序&#xff0…

【设计模式】装饰器模式

装饰器模式 以生活中的场景来举例&#xff0c;一个蛋糕胚&#xff0c;给它涂上奶油就变成了奶油蛋糕&#xff0c;再加上巧克力和草莓&#xff0c;它就变成了巧克力草莓蛋糕。 像这样在不改变原有对象的基础之上&#xff0c;将功能附加到原始对象上的设计模式就称为装饰模式(D…

如何查看磁盘空间并挂载磁盘

df -h内容参数含义Filesystem文件系统Size分区大小1k-blocks单位是1KB(使用df查看)Used已用容量Avail还可用的容量Use%已用百分比Mounted on挂载点du -h查看某目录下占用空间最多的文件或目录。取前10个。需要先进入该目录下。du -cks * | sort -rn | head -n 10参数含义-s对每…

腾讯游戏,“迷失”自己

【潮汐商业评论/原创】“那个号我忘记密码了&#xff0c;你等我换个新号跟你玩”。这是Lynn《王者荣耀》双排队友常说的话。因为未成年&#xff0c;账号只有周末能玩&#xff0c;而且只有两小时。所以Lynn的这位网友&#xff0c;经常用家长的手机号注册游戏账号&#xff0c;但是…

Yarn调度器和调度算法

目录 1 先进先出调度器&#xff08;FIFO&#xff09; 2 容量调度器&#xff08;Capacity Scheduler&#xff09; 3 公平调度器&#xff08;Fair Scheduler&#xff09; 缺额&#xff1a; 公平调度器队列资源分配方式 公平调度器资源分配算法 Hadoop作业调度器主要有三种&…

分库分表原理

一、数据库瓶颈 会导致数据库的活跃连接数增加&#xff0c;进而逼近甚至达到数据库可承载活跃连接数的阈值。在业务Service来看就是&#xff0c;可用数据库连接少甚至无连接可用。接下来就可以想象了吧&#xff08;并发量、吞吐量、崩溃&#xff09;。 IO瓶颈-分库和垂直分表…