kotlin coroutine源码解析之Dispatchers协程调度器

news2025/1/23 3:55:52

目录

  • Dispatchers协程调度器
    • Dispatchers.Default
    • Dispatchers.IO
    • Dispatchers.Main
    • Dispatchers.Unconfined
  • 协程调度器的实现CoroutineScheduler
  • 总结

Dispatchers协程调度器

CoroutineDispatcher,具有用于调度任务的底层执行器。ExecutorCoroutineDispatcher的实例应由调度程序的所有者关闭。
此类通常用作基于协程的API和异步API之间的桥梁,异步API需要Executor的实例。

根据各种调度器的继承关系,梳理如下继承结构:
在这里插入图片描述

CoroutineDispatcher基类将由所有协程调度器实现扩展,kotlin官方实现了以下四种调度器:

Dispatchers.Default -如果上下文中未指定调度器或任何其他ContinuationInterceptor,则所有标准构建器都使用默认值。它使用共享后台线程的公共池。对于消耗CPU资源的计算密集型协程来说,这是一个合适的选择。

Dispatchers.IO -使用按需创建线程的共享池,用于卸载IO密集型阻塞操作(如文件I/O和阻塞套接字I/O)。

Dispatchers.Unconfined -在当前调用帧中启动协程执行,直到第一次暂停,然后协程生成器函数返回。协程稍后将在相应的挂起函数使用的任何线程中恢复,而不将其限制在任何特定的线程或池中。无约束调度器通常不应在代码中使用。

HandlerContext -在主线程中调度任务,android中主线程也就是ui线程,使用该调度器谨慎ANR异常,不应该使用该调度器调度阻塞或者耗时任务。

可以使用newSingleThreadContext和newFixedThreadPoolContext创建专用线程池。
可以使用asCoroutineDispatcher扩展函数将任意执行器转换为调度器。

Dispatchers.Default

这个调度器的类型是DefaultScheduler,一般是做cpu密集计算型任务,内部包含的成员变量IO,也就是对应的Dispatchers.IO调度器。主要实现在ExecutorCoroutineDispatcher()中,代码如下:

internal object DefaultScheduler : ExperimentalCoroutineDispatcher() {
    val IO: CoroutineDispatcher = LimitingDispatcher(
        this,
        systemProp(IO_PARALLELISM_PROPERTY_NAME, 64.coerceAtLeast(AVAILABLE_PROCESSORS)),
        "Dispatchers.IO",
        TASK_PROBABLY_BLOCKING
    )
	//省略。。。
}

public open class ExperimentalCoroutineDispatcher(
    private val corePoolSize: Int,
    private val maxPoolSize: Int,
    private val idleWorkerKeepAliveNs: Long,
    private val schedulerName: String = "CoroutineScheduler"
) : ExecutorCoroutineDispatcher() {
    public constructor(//省略。。。)

    override val executor: Executor
        get() = coroutineScheduler

    // This is variable for test purposes, so that we can reinitialize from clean state
    private var coroutineScheduler = createScheduler()

    override fun dispatch(context: CoroutineContext, block: Runnable): Unit =
        try {
            coroutineScheduler.dispatch(block)
        } catch (e: RejectedExecutionException) {
            DefaultExecutor.dispatch(context, block)
        }

    override fun dispatchYield(context: CoroutineContext, block: Runnable): Unit =
        try {
            coroutineScheduler.dispatch(block, tailDispatch = true)
        } catch (e: RejectedExecutionException) {
            DefaultExecutor.dispatchYield(context, block)
        }
    }
    //省略。。。
}

Default调度器其实没做什么特别的操作,只是用coroutineScheduler代理实现了协程的调度。

Dispatchers.IO

这个是LimitingDispatcher类型的,是DefaultScheduler类型的成员变量,而LimitingDispatcher类型又是继承自ExecutorCoroutineDispatcher的,LimitingDispatcher在它基础上做了有调度个数限制的排队机制,IO这个名字代表的IO操作,IO操作又是阻塞线程的操作,线程不能及时释放,所以加入了队列机制,防止IO线程爆炸式增长。如下:

internal object DefaultScheduler : ExperimentalCoroutineDispatcher() {
    val IO: CoroutineDispatcher = LimitingDispatcher(
        this,
        systemProp(IO_PARALLELISM_PROPERTY_NAME, 64.coerceAtLeast(AVAILABLE_PROCESSORS)),
        "Dispatchers.IO",
        TASK_PROBABLY_BLOCKING
    )
    //省略。。。
}

private class LimitingDispatcher(
    private val dispatcher: ExperimentalCoroutineDispatcher,
    private val parallelism: Int,
    private val name: String?,
    override val taskMode: Int
) : ExecutorCoroutineDispatcher(), TaskContext, Executor {

    private val queue = ConcurrentLinkedQueue<Runnable>()
    private val inFlightTasks = atomic(0)

    private fun dispatch(block: Runnable, tailDispatch: Boolean) {
        var taskToSchedule = block
        while (true) {
            // Commit in-flight tasks slot
            val inFlight = inFlightTasks.incrementAndGet()

            // Fast path, if parallelism limit is not reached, dispatch task and return
            if (inFlight <= parallelism) {
                dispatcher.dispatchWithContext(taskToSchedule, this, tailDispatch)
                return
            }

            queue.add(taskToSchedule)
            if (inFlightTasks.decrementAndGet() >= parallelism) {
                return
            }

            taskToSchedule = queue.poll() ?: return
        }
    }
    override fun dispatchYield(context: CoroutineContext, block: Runnable) {
        dispatch(block, tailDispatch = true)
    }

}

构造函数 传入了parallelism参数 ,这个是并发数。

dispatchYield方法 实现是直接调用的dispatch方法。

dispatch方法:一个while循环,循环内,

  1. 给inFlightTasks变量加一(这个变量代表正在调度中的个数),如果inFlightTasks <= parallelism,代表当前调度任务数小于最大并发数,说明可以继续向调度器中调度任务,
  2. 否则将任务加入到队列中,接着尝试将inFlightTasks减一,如果大于并发数,那么直接结束;
  3. 如果小于并发数,说明刚刚已经有任务结束了,让出了并发数,这个时候可以再次尝试从队列中取出任务,从1开始。
    override fun afterTask() {
        var next = queue.poll()
        // If we have pending tasks in current blocking context, dispatch first
        if (next != null) {
            dispatcher.dispatchWithContext(next, this, true)
            return
        }
        inFlightTasks.decrementAndGet()

        next = queue.poll() ?: return
        dispatch(next, true)
    }

afterTask方法

这个方法是任务调度结束后的回调,这里面首先从队列中取出一个任务,
任务不为空,让调度器调度这个任务,结束;
为空,给调度任务数加一,然后尝试取出任务,为空返回,不为空,继续调用dispatch方法,整个流程就串起来了。
整个流程如下图所示:
在这里插入图片描述

综上:IO调度器侧重于调度任务数量的限制,防止IO操作阻塞线程,让线程数量爆炸式增长。

Dispatchers.Main

具体的实现类是HandlerContext,代码如下:

HandlerContext(Looper.getMainLooper().asHandler(async = true))

internal class HandlerContext private constructor(
    private val handler: Handler,
    private val name: String?,
    private val invokeImmediately: Boolean
) : HandlerDispatcher(), Delay {
	//省略。。。
}

主线程中调度任务,android中主线程也就是ui线程。实现原理是内部持有一个val handler : Handler = Looper.getMainLooper().asHandler(async = true),这个handler正是主线程的handler。

在调用dispatch调度方法的时候,是使用handler发送一个Runnable任务,

override fun dispatch(context: CoroutineContext, block: Runnable) {
    handler.post(block)
}

在delay的时候,如果当前的dispatcher正是HandlerContext,那么实现是handler发送一个延迟了timeMillis毫秒时长的Runnable。invokeOnCancellation的扩展方法是在协程被取消的时候,移除掉该runnable消息。

override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
    val block = Runnable {
        with(continuation) { resumeUndispatched(Unit) }
    }
    handler.postDelayed(block, timeMillis.coerceAtMost(MAX_DELAY))
    continuation.invokeOnCancellation { handler.removeCallbacks(block) }
}

下面这个方法也比较常看到,就是协程在调度continuation的时候,会去判断是不是需要去调度,不需要的话,直接在当前线程执行,需要调度的,需要由dispatcher来重新调度任务,这样可能执行的线程会被切换,如果不是主线程的话,、就需要调度了, 如果是主线程的话立刻执行。

override fun isDispatchNeeded(context: CoroutineContext): Boolean {
    return !invokeImmediately || Looper.myLooper() != handler.looper
}

Dispatchers.Unconfined

具体的实现如下:

internal object Unconfined : CoroutineDispatcher() {
	//省略。。。
}

isDispatchNeeded直接返回false,代表不需要重新调度。

 override fun isDispatchNeeded(context: CoroutineContext): Boolean = false

dispatchYield没有被覆写,直接调用dispatch方法,用的还是CoroutineDispatcher的实现。
dispatch的报错信息显示,Unconfined调度器只能在存在YieldContext的时候调度,否则就会报异常。

//CoroutineDispatcher
public open fun dispatchYield(context: CoroutineContext, block: Runnable): Unit = dispatch(context, block)

//Unconfined
override fun dispatch(context: CoroutineContext, block: Runnable) {
    // It can only be called by the "yield" function. See also code of "yield" function.
    val yieldContext = context[YieldContext]
    if (yieldContext != null) {
        // report to "yield" that it is an unconfined dispatcher and don't call "block.run()"
        yieldContext.dispatcherWasUnconfined = true
        return
    }
    throw UnsupportedOperationException("Dispatchers.Unconfined.dispatch function can only be used by the yield function. " +
        "If you wrap Unconfined dispatcher in your code, make sure you properly delegate " +
        "isDispatchNeeded and dispatch calls.")
}

yied方法:是暂时让出工作线程,等待下一次线程调取恢复协程。
yield代码如下:

public suspend fun yield(): Unit = suspendCoroutineUninterceptedOrReturn sc@ { uCont ->
    val context = uCont.context
    context.checkCompletion()
    val cont = uCont.intercepted() as? DispatchedContinuation<Unit> ?: return@sc Unit
    if (cont.dispatcher.isDispatchNeeded(context)) {
        cont.dispatchYield(context, Unit)
    } else {
        val yieldContext = YieldContext()
        cont.dispatchYield(context + yieldContext, Unit)
        if (yieldContext.dispatcherWasUnconfined) {
            return@sc if (cont.yieldUndispatched()) COROUTINE_SUSPENDED else Unit
        }
    }
    COROUTINE_SUSPENDED
}
  1. 如果isDispatchNeeded == true,那么就需要重新将协程被调度器调度一次,线程有可能切换掉;
  2. 如果isDispatchNeeded == false,上下文集合需要添加val yieldContext = YieldContext()这个元素(在上面的Dispatchers.Unconfined
    的dispatche方法中,如果有YieldContext元素,将dispatcherWasUnconfined设置为true,代表yield操作什么都没有做,需要协程调度器用其他方法调度一次)。
    判断dispatcherWasUnconfined,true:说明Dispatchers.Unconfined什么都没有做,需要在调度一次,调用了yieldUndispatched方法,这个方法大概就是让协程直接恢复一次,或者线程调度一次恢复;
    false:说明正在被调度器调度,是个挂起点,返回COROUTINE_SUSPENDED值。

不太清楚Dispatchers.Unconfined这个调度器有啥用,有知道的留言下,学习学习。

协程调度器的实现CoroutineScheduler

调度过程正真的实现是CoroutineScheduler这个类,上面说的四种调度器是包装类,调度逻辑在CoroutineScheduler中,代码如下:

internal class CoroutineScheduler(
    @JvmField val corePoolSize: Int,
    @JvmField val maxPoolSize: Int,
    @JvmField val idleWorkerKeepAliveNs: Long = IDLE_WORKER_KEEP_ALIVE_NS,
    @JvmField val schedulerName: String = DEFAULT_SCHEDULER_NAME
) : Executor, Closeable {
	//省略。。。
}

构造函数入参 corePoolSize: Int定义核心线程数,maxPoolSize: Int定义最大线程数量


	fun dispatch(block: Runnable, taskContext: TaskContext = NonBlockingContext, tailDispatch: Boolean = false) {
        val task = createTask(block, taskContext)
        // try to submit the task to the local queue and act depending on the result
        val currentWorker = currentWorker()
        val notAdded = currentWorker.submitToLocalQueue(task, tailDispatch)
        if (notAdded != null) {
            if (!addToGlobalQueue(notAdded)) {
                // Global queue is closed in the last step of close/shutdown -- no more tasks should be accepted
                throw RejectedExecutionException("$schedulerName was terminated")
            }
        }
        val skipUnpark = tailDispatch && currentWorker != null
        // Checking 'task' instead of 'notAdded' is completely okay
        if (task.mode == TASK_NON_BLOCKING) {
            if (skipUnpark) return
            signalCpuWork()
        } else {
            // Increment blocking tasks anyway
            signalBlockingWork(skipUnpark = skipUnpark)
        }
	}

dispatch函数的实现:

  1. 创建task,block如果是Task类型的话,设置submissionTime变量,submissionTime变量用于延迟执行的时间判断,以及队列排序的时间顺序;设置taskContext,该变量是task执行的协程上下文。不是Task类型的话,会创建TaskImp类型的任务返回,关键是finally中的taskContext.afterTask(),就是task执行完成后需要回调afterTask通知协程上下文执行完毕了,上面的Dispatchers.IO里面的LimitingDispatcher调度器就是需要afterTask回调通知,才能将队列中下一个任务抛给CoroutineScheduler去执行。
   internal fun createTask(block: Runnable, taskContext: TaskContext): Task {
        val nanoTime = schedulerTimeSource.nanoTime()
        if (block is Task) {
            block.submissionTime = nanoTime
            block.taskContext = taskContext
            return block
        }
        return TaskImpl(block, nanoTime, taskContext)
    }
    
internal class TaskImpl(
    @JvmField val block: Runnable,
    submissionTime: Long,
    taskContext: TaskContext
) : Task(submissionTime, taskContext) {
    override fun run() {
        try {
            block.run()
        } finally {
            taskContext.afterTask()
        }
    }
}
  1. 获取当前的工作线程,如果当前是工作线程直接返回,不是的话返回空
    private fun currentWorker(): Worker? = (Thread.currentThread() as? Worker)?.takeIf { it.scheduler == this }
  1. 将任务提交到工作线程的本地队列中
    private fun Worker?.submitToLocalQueue(task: Task, tailDispatch: Boolean): Task? {
        if (this == null) return task
        if (state === WorkerState.TERMINATED) return task
        if (task.mode == TASK_NON_BLOCKING && state === WorkerState.BLOCKING) {
            return task
        }
        mayHaveLocalTasks = true
        return localQueue.add(task, fair = tailDispatch)
    }

返回是空的,说明添加成功了,返回task说明没有添加成功。
如果线程是中断状态,那么直接返回task。
如果任务是非阻塞的也就是cpu密集型任务,而线程是阻塞的(正在执行任务中),那么不添加任务,直接返回task。
其他情况,添加任务到队列中,mayHaveLocalTasks标志位true,代表当前线程中有任务。

  1. 没有添加的话,需要添加到全局队列中,globalCpuQueue全局cpu密集型队列,globalBlockingQueue全局IO队列,根据任务类型添加到对应的队列中。如果全局队列都添加失败的话,直接抛出异常。
	 if (notAdded != null) {
	     if (!addToGlobalQueue(notAdded)) {
	         // Global queue is closed in the last step of close/shutdown -- no more tasks should be accepted
	         throw RejectedExecutionException("$schedulerName was terminated")
	     }
	 }
	 

    val globalCpuQueue = GlobalQueue()
    val globalBlockingQueue = GlobalQueue()

    private fun addToGlobalQueue(task: Task): Boolean {
        return if (task.isBlocking) {
            globalBlockingQueue.addLast(task)
        } else {
            globalCpuQueue.addLast(task)
        }
    }
  1. 根据是否是尾部添加和当前线程是否是空,决定是否跳过唤醒工作线程的步骤。
    val skipUnpark = tailDispatch && currentWorker != null
  1. 非阻塞任务:skipUnpark为true,跳过唤醒步骤,否则唤醒cpu密集型线程;阻塞任务:skipUnpark为true,跳过唤醒步骤,唤醒IO线程。
       // Checking 'task' instead of 'notAdded' is completely okay
       if (task.mode == TASK_NON_BLOCKING) {
           if (skipUnpark) return
           signalCpuWork()
       } else {
           // Increment blocking tasks anyway
           signalBlockingWork(skipUnpark = skipUnpark)
       }

看下唤醒步骤的具体实现,大概都是先tryUnpark,唤醒线程,如果没有唤醒成功,创建一个新的线程,再次尝试唤醒。

    private fun signalBlockingWork(skipUnpark: Boolean) {
        // Use state snapshot to avoid thread overprovision
        val stateSnapshot = incrementBlockingTasks()
        if (skipUnpark) return
        if (tryUnpark()) return
        if (tryCreateWorker(stateSnapshot)) return
        tryUnpark() // Try unpark again in case there was race between permit release and parking
    }

    internal fun signalCpuWork() {
        if (tryUnpark()) return
        if (tryCreateWorker()) return
        tryUnpark()
    }

看下工作线程的具体实现吧:
worker继承自Thread,实现了run方法,具体是由runWorker()方法实现的,每个工作线程都有一个本地队列用于存储任务,这样本地有任务就不用去全局队列中去抢资源了,减少锁竞争。

	internal inner class Worker private constructor() : Thread() {
		//省略。。。
		
		@JvmField
        val localQueue: WorkQueue = WorkQueue()
        
        @JvmField
        var mayHaveLocalTasks = false
        
		override fun run() = runWorker()
        
		//省略。。。
   }

runWorker() 的实现:

        private fun runWorker() {
            var rescanned = false
            while (!isTerminated && state != WorkerState.TERMINATED) {
                val task = findTask(mayHaveLocalTasks)
                // Task found. Execute and repeat
                if (task != null) {
                    rescanned = false
                    minDelayUntilStealableTaskNs = 0L
                    executeTask(task)
                    continue
                } else {
                    mayHaveLocalTasks = false
                }

                if (minDelayUntilStealableTaskNs != 0L) {
                    if (!rescanned) {
                        rescanned = true
                    } else {
                        rescanned = false
                        tryReleaseCpu(WorkerState.PARKING)
                        interrupted()
                        LockSupport.parkNanos(minDelayUntilStealableTaskNs)
                        minDelayUntilStealableTaskNs = 0L
                    }
                    continue
                }

                tryPark()
            }
            tryReleaseCpu(WorkerState.TERMINATED)
        }

工作线程是用while循环一直运行的,循环内:

  1. val task = findTask(mayHaveLocalTasks),前面这个变量mayHaveLocalTasks出现过,在添加task到本地队列的时候,会置为true,本地队列有任务,从本地获取,没有就从全局队列中获取,如果还是没有,从其他线程队列中偷取任务到自己队列中:
    fun findTask(scanLocalQueue: Boolean): Task? {
        if (tryAcquireCpuPermit()) return findAnyTask(scanLocalQueue)
        // If we can't acquire a CPU permit -- attempt to find blocking task
        val task = if (scanLocalQueue) {
            localQueue.poll() ?: globalBlockingQueue.removeFirstOrNull()
        } else {
            globalBlockingQueue.removeFirstOrNull()
        }
        return task ?: trySteal(blockingOnly = true)
    }

trySteal方法,循环workers队列,遍历线程本地队列,去偷取任务,偷到的话返回任务,没偷到的话,返回null:

private fun trySteal(blockingOnly: Boolean): Task? {
			//省略。。。
            var currentIndex = nextInt(created)
            var minDelay = Long.MAX_VALUE
            repeat(created) {
            	//省略。。。
                val worker = workers[currentIndex]
                if (worker !== null && worker !== this) {
                    val stealResult = if (blockingOnly) {
                        localQueue.tryStealBlockingFrom(victim = worker.localQueue)
                    } else {
                        localQueue.tryStealFrom(victim = worker.localQueue)
                    }
                    if (stealResult == TASK_STOLEN) {
                        return localQueue.poll()
                    } else if (stealResult > 0) {
                        minDelay = min(minDelay, stealResult)
                    }
                }
            }
            minDelayUntilStealableTaskNs = if (minDelay != Long.MAX_VALUE) minDelay else 0
            return null
        }

在偷不到任务的时候会设置一个变量,stealResult等于-2,最后minDelayUntilStealableTaskNs 等于0;

internal const val TASK_STOLEN = -1L
internal const val NOTHING_TO_STEAL = -2L

在偷取任务的时候,如果上个任务时间和这次时间间隔太短的话,返回下次执行的间隔时间差,minDelayUntilStealableTaskNs设置为这个时间值,大于0。

  1. 找到task了,直接执行任务executeTask(task) ,执行完成,continue循环,从1开始;
  2. 没找到任务,设置mayHaveLocalTasks = false
  3. 如果minDelayUntilStealableTaskNs不等于0,就是上面的间隔时间太短的条件触发,那么让线程释放锁(防止线程执行任务太过密集,等待下次循环再去调度任务),continue循环,从1开始;
  4. 上面条件不成立,调用tryPark(),这个是和unPark相反的操作,让线程闲置,放入到线程队列中:
    private fun tryPark() {
        if (!inStack()) {
            parkedWorkersStackPush(this)
            return
        }
        assert { localQueue.size == 0 }
        workerCtl.value = PARKED // Update value once
        while (inStack()) { // Prevent spurious wakeups
            if (isTerminated || state == WorkerState.TERMINATED) break
            tryReleaseCpu(WorkerState.PARKING)
            interrupted() // Cleanup interruptions
            park()
        }
    }

首先判断是否在队列中,不在的话,放入线程队列中;在队列中,将状态设置为PARKED,不断循环将释放线程的cpu占用锁,尝试放到队列中,park函数中有可能销毁工作线程,看线程是否到达死亡时间点。

worker工作流程如下图所示:
在这里插入图片描述

总结

1. Dispatchers的四种调度器是饿汉式单例对象,所以一个进程只存在一个实例对象。
2. Dispatchers的四种调度器中,IO和default是共用的一个线程池,它的实现是CoroutineScheduler。
3. CoroutineScheduler线程池,有一个保存线程的队列,有两种全局任务队列:一个是IO阻塞型队列,一个是cpu密集型任务队列;Worker线程拥有一个本地任务队列。
4. Worker线程会根据任务类型,去对应的全局队列或者从本地队列找任务,找不到会从其他worker队列中偷任务,然后执行;worker会根据自己的状态回到线程队列或者销毁自己。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/22261.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

计算机系统基础实验——数据的机器级表示(条件表达式 x?y:z)

题目描述&#xff1a; /* *conditional- 条件表达式 x?y:z *例子&#xff1a;conditional (2,4,5)4, *合法运算符号&#xff1a;&#xff01;~&^|<<>> */ int conditional (int x,int y, int z) { /**************/ return/******/; }首先来看什么是三目运算&…

Kubernetes云原生实战02 磁盘分区挂载实战

大家好&#xff0c;我是飘渺。 今天咱们继续更新Kubernetes云原生实战系列&#xff0c;如何基于上篇文章中提到的部署架构进行磁盘分区、格式化、挂载目录。 看到这里估计很多人要直接就关掉了&#xff1a;磁盘分区格式化不是运维的事吗&#xff0c;跟我开发有什么关系&#x…

图书管理系统(Java实现)[附完整代码]

作者&#xff1a;爱塔居的博客_CSDN博客-JavaSE领域博主 专栏&#xff1a;JavaSE 作者专栏&#xff1a;大三学生&#xff0c;希望跟大家一起进步&#xff01; 文章目录 目录 文章目录 一、图书管理系统菜单 二、实现基本框架 三、实现业务 3.1 打印所有图书 3.2 退出系统 3.3 查…

化合物纯度、溶剂溶解度检测

产品检测方法一般有核磁共振氢谱 (HNMR)&#xff0c;液质联用 (LCMS)&#xff0c;高效液相色谱 (HPLC)。我们一般通过核磁共振确定结构式 (产品是否正确) 和大概纯度 (是否含杂质及杂质大概比例)&#xff0c;通过 LCMS 或 HPLC 测定确定产品具体纯度 (产品需要有紫外吸收)。■ …

连锁超市如何部署远程监控系统

大型超市又称综合超市&#xff0c;一般是采取自选销售方式&#xff0c;以销售大众化实用品为主&#xff0c;并将超市和折扣店的经营优势结合为一体的&#xff0c;品种齐全&#xff0c;满足顾客一次性购齐的零售业态。根据商品结构&#xff0c;可以分为以经营食品为主的大型超市…

神了,用 Python 预测世界杯决赛,发现准确率还挺高

那么四年一度的世界杯即将要在卡塔尔开幕了&#xff0c;对于不少热爱足球运动的球迷来说&#xff0c;这可是十分难得的盛宴&#xff0c;而对于最后大力神杯的归属&#xff0c;相信很多人都满怀着期待&#xff0c;每个人心中都有不同的答案。 今天我就通过Python数据分析以及机…

低/无代码开发系统集成能力有多强?一文告诉你

Gartner预计&#xff0c;到2025年&#xff0c;公司将会有70&#xff05;的新应用软件使用到低/无代码技术。Statista的报告表明&#xff0c;在2027年的时候&#xff0c;在低/无代码技术上的花费将会达到650亿。 面对庞大的数字经济&#xff0c;许多公司都在加快数字化转型的步伐…

【Linux进程间通信】共享内存

共享内存API简单案例&#xff1a;一个进程往共享内存中写一次数据然后在另一块共享内存读一次数据&#xff0c;然后另一个进程在一个共享内存读一次数据在另一块共享内存写一次数据&#xff08;同时验证了它是半双工的&#xff09;使用信号量进行同步原理&#xff1a;多个进程映…

C++socket网络编程实战http服务器(支持php)(上)

TOC 第一章 Socket快速入门篇 1、TCP/IP模型 用Wireshark抓包工具来看一下上图TCP/IP模型这种4层协议里面究竟有什么内容。 在windows和Linux系统之间配置共享 首先保证我们的putty已经连接上了linux服务器&#xff0c;然后我们要安装samba这么一个目录共享工具&#xff1a…

Spark 离线开发框架设计与实现

一、背景 随着 Spark 以及其社区的不断发展&#xff0c;Spark 本身技术也在不断成熟&#xff0c;Spark 在技术架构和性能上的优势越来越明显&#xff0c;目前大多数公司在大数据处理中都倾向使用 Spark。Spark 支持多种语言的开发&#xff0c;如 Scala、Java、Sql、Python 等。…

亚马逊、OZON、速卖通等跨境电商平台卖家怎样快速提高产品权重?

亚马逊跨境电商是世界顶级的电子商务平台之一。基本上&#xff0c;当80%的客户购买产品时&#xff0c;亚马逊跨境电子商务将成为首选的在线购物平台。亚马逊是一个拥有自己独特优化算法的服务平台&#xff0c;对服务平台上数亿产品进行有序排序。当客户进行产品检索时&#xff…

【附源码】计算机毕业设计JAVA学生宿舍信息管理系统

【附源码】计算机毕业设计JAVA学生宿舍信息管理系统 目运行 环境项配置&#xff1a; Jdk1.8 Tomcat8.5 Mysql HBuilderX&#xff08;Webstorm也行&#xff09; Eclispe&#xff08;IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持&#xff09;。 项目技术&#xff1a; JAVA…

经典排序算法JAVA实现

1、选择排序 首先在未排序数列中找到最小元素&#xff0c;然后将其与数列的首部元素进行交换&#xff0c;然后&#xff0c;在剩余未排序元素中继续找出最小元素&#xff0c;将其与已排序数列的末尾位置元素交换。以此类推&#xff0c;直至所有元素均排序完毕.复杂度为n2&#…

《Java并发编程之美》读书笔记——第一部分(并发编程基础知识)

文章目录第一章 并发编程线程基础1.什么是线程2.线程的创建与运行3.线程的通知与等待wait()wait(long timeout)wait(long timeout, int nanos)notify()与notifyAll()虚假唤醒4.等待线程执行终止的join方法5.让线程睡眠的sleep方法6.让CPU交出执行权的yield方法7.线程中断8.理解…

[附源码]java毕业设计物理中考复习在线考试系统

项目运行 环境配置&#xff1a; Jdk1.8 Tomcat7.0 Mysql HBuilderX&#xff08;Webstorm也行&#xff09; Eclispe&#xff08;IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持&#xff09;。 项目技术&#xff1a; SSM mybatis Maven Vue 等等组成&#xff0c;B/S模式 M…

消息队列卡夫卡+EFLFK集群部署

pache公司的软件包官方下载地址&#xff1a;archive.apache.org/dist/ 注&#xff1a;kafka从3.0版本之后&#xff0c;不再依赖zookeeper。 一 Zookeeper概述 官方下载地址&#xff1a;archive.apache.org/dist/zookee… 1.Zookeeper定义 Zookeeper是一个开源的分布式的&a…

国内网络编译,Ambari 2.7.6 全部模块源码编译笔记

本次编译 ambari 2.7.6 没有使用科学上网的工具&#xff0c;使用的普通网络&#xff0c;可以编译成功&#xff0c;过程比 ambari 2.7.5 编译时要顺畅。 该版本相对 2.7.5 版本以来&#xff0c;共有 26 个 contributors 提交了 114 个 commits 以及修改了 557 个文件。详情见&a…

ovirt-engine通过UI Plugin自定义页面

官方API&#xff1a;点击打开 1 新增一个菜单项 1.1 创建引导html 首先你的这个页面是作为一个功能插件存在的&#xff0c;所以先给他起个名字&#xff0c;我这里的页面主要是用作用户创建&#xff0c;所以我的这个插件的名字就叫user。 接着就创建这个插件的 引导html &…

多级式多传感器信息融合中的状态估计(Matlab代码实现)

&#x1f352;&#x1f352;&#x1f352;欢迎关注&#x1f308;&#x1f308;&#x1f308; &#x1f4dd;个人主页&#xff1a;我爱Matlab &#x1f44d;点赞➕评论➕收藏 养成习惯&#xff08;一键三连&#xff09;&#x1f33b;&#x1f33b;&#x1f33b; &#x1f34c;希…

新能源车提车、上牌流程

漫长等待四个多月&#xff0c;终于2022年10月27日&#xff0c;笔者圆梦&#xff0c;喜提人生第一辆车。从选车、提车、上牌全程一人&#xff0c;用文记录下经历&#xff0c;以供参考。 一、提车流程 1.1 提车时间 若分到车&#xff0c;4S店销售会提前联系确定时间。 提示&…