Linux Workqueue

news2025/1/16 2:01:23

Linux Workqueue

1、前言

Workqueue 是内核里面很重要的一个机制,特别是内核驱动,一般的小型任务 (work) 都不会自己起一个线程来处理,而是扔到 Workqueue 中处理。Workqueue 的主要工作就是用进程上下文来处理内核中大量的小任务。

所以 Workqueue 的主要设计思想:一个是并行,多个 work 不要相互阻塞;另外一个是节省资源,多个 work 尽量共享资源 ( 进程、调度、内存 ),不要造成系统过多的资源浪费。

为了实现的设计思想,workqueue 的设计实现也更新了很多版本。最新的 workqueue 实现叫做 CMWQ(Concurrency Managed Workqueue),也就是用更加智能的算法来实现“并行和节省”。

老版本workqueue:

1.支持单线程和多线程workqueue
2.多线程每个CPU绑定一个线程。每个CPU对应一个workqueue,且每个CPU只有一个worker线程处理这个workqueue,即workqueue和worker线程是一一对应的。因为仅有一个worker线程,假如worker线程在执行某work时休眠,其他work也无法执行。
3.多线程workqueue中,在某个CPU上进行queue_work的work,一定会在该CPU上执行
4.可能出现的死锁:已经加入到一个CPU上的work是不能转移到另一个CPU上的,那么假设有两个work 1和work 2,work1 依赖于work2,但是work1和work2被顺序加入到了同一个CPU worker线程上,那么work1将永远无法等到work2执行结束,这就形成了死锁问题。

CMWQ就是为了解决以上问题而引入内核的,它对workqueue的实现进行了优化,使得work的调度更加的灵活,为了兼容旧版本workqueue,接口基本保持了不变。

2、相关概念

假设一个集团有4个工厂(4个CPU),每个工厂都分了两条流水线,一条是由高级工构成的高效流水线,一条是由初级工构成的普通流水线。那么每个流水线就对应一个pool_workqueue,每条流水线中的一组工人就是worker_pool,每个工人就是worker,流水线中的每个产品就是work。这4个工厂的高效流水线为一组叫做workqueue_struct,普通流水线则同属于另一个workqueue_struct。

  • work:工作。
  • workqueue:工作的集合。workqueue 和 work 是一对多的关系。
  • worker:工人。在代码中每个 worker 都对应一个work_thread() 内核线程。
  • worker_pool:工人的集合。worker_pool 和 worker 是一对多的关系。
  • pwq(pool_workqueue):中间人 / 中介,负责建立起 workqueue 和 worker_pool 之间的关系。workqueue 和 pwq 是一对多的关系,pwq 和 worker_pool 是一对一的关系。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-OHtQMjZU-1675062108557)(D:\01 学习资料\workqueue关系图.png)]

每个执行 work 的线程叫做 worker,一组 worker 的集合叫做 worker_pool。CMWQ 的精髓就在 worker_pool 里面 worker 的动态增减管理上 manage_workers()

3. worker & worker_pool

struct worker {
	/* on idle list while idle, on busy hash table while busy */
	union {
		struct list_head	entry;	/* L: while idle */
		struct hlist_node	hentry;	/* L: while busy */
	};

	struct work_struct	*current_work;	/* L: work being processed */
	work_func_t		current_func;	/* L: current_work's fn */
	struct pool_workqueue	*current_pwq; /* L: current_work's pwq */
	struct list_head	scheduled;	/* L: scheduled works */

	/* 64 bytes boundary on 64bit, 32 on 32bit */

	struct task_struct	*task;		/* I: worker task */
	struct worker_pool	*pool;		/* A: the associated pool */
						/* L: for rescuers */
	struct list_head	node;		/* A: anchored at pool->workers */
						/* A: runs through worker->node */

	unsigned long		last_active;	/* L: last active timestamp */
	unsigned int		flags;		/* X: flags */
	int			id;		/* I: worker id */
	int			sleeping;	/* None */

	/*
	 * Opaque string set with work_set_desc().  Printed out with task
	 * dump for debugging - WARN, BUG, panic or sysrq.
	 */
	char			desc[WORKER_DESC_LEN];

	/* used only by rescuers to point to the target workqueue */
	struct workqueue_struct	*rescue_wq;	/* I: the workqueue to rescue */

	/* used by the scheduler to determine a worker's last known identity */
	work_func_t		last_func;
};

img

3.1 CMWQ 对 worker_pool 分成两类:

  • normal worker_pool,给通用的 workqueue 使用;
  • unbound worker_pool,给 WQ_UNBOUND 类型的的 workqueue 使用;
struct worker_pool {
	spinlock_t		lock;		/* the pool lock */
	int			cpu;		/* I: the associated cpu */
	int			node;		/* I: the associated node ID */
	int			id;		/* I: pool ID */
	unsigned int		flags;		/* X: flags */

	unsigned long		watchdog_ts;	/* L: watchdog timestamp */

	struct list_head	worklist;	/* L: list of pending works */

	int			nr_workers;	/* L: total number of workers */
	int			nr_idle;	/* L: currently idle workers */

	struct list_head	idle_list;	/* X: list of idle workers */
	struct timer_list	idle_timer;	/* L: worker idle timeout */
	struct timer_list	mayday_timer;	/* L: SOS timer for workers */

	/* a workers is either on busy_hash or idle_list, or the manager */
	DECLARE_HASHTABLE(busy_hash, BUSY_WORKER_HASH_ORDER);
						/* L: hash of busy workers */

	struct worker		*manager;	/* L: purely informational */
	struct list_head	workers;	/* A: attached workers */
	struct completion	*detach_completion; /* all workers detached */

	struct ida		worker_ida;	/* worker IDs for task name */

	struct workqueue_attrs	*attrs;		/* I: worker attributes */
	struct hlist_node	hash_node;	/* PL: unbound_pool_hash node */
	int			refcnt;		/* PL: refcnt for unbound pools */

	/*
	 * The current concurrency level.  As it's likely to be accessed
	 * from other CPUs during try_to_wake_up(), put it in a separate
	 * cacheline.
	 */
	atomic_t		nr_running ____cacheline_aligned_in_smp;

	/*
	 * Destruction of pool is RCU protected to allow dereferences
	 * from get_work_pool().
	 */
	struct rcu_head		rcu;
} 
  • 若一个worker正在处理work将挂载在busy workers对应的hash表上,key就是正在被处理的work的内存地址。否则就挂在idle链表上。
  • 有未处理的work则内核会启动新的worker线程,当创建的worker线程过多就会销毁一部分worker线程。

用"ps"命令来看下系统中都有哪些worker线程,worker线程被命名成了"kworker/n:x"的格式,其中n是worker线程所在的CPU的编号,x是其在worker pool中的编号,如果带了"H"后缀,说明这是高优先级的worker pool。

root@(mt7981):~# ps | grep kworker
    5 root         0 IW   [kworker/0:0-eve]
    6 root         0 IW<  [kworker/0:0H-kb]
    7 root         0 IW   [kworker/u4:0-ev]
   12 root         0 IW   [kworker/0:1-eve]
   17 root         0 IW   [kworker/1:0-rcu]
   18 root         0 IW<  [kworker/1:0H-kb]
   20 root         0 IW   [kworker/u4:1-ev]
  217 root         0 IW   [kworker/1:1-mm_]
  635 root         0 IW<  [kworker/1:1H]
  636 root         0 IW<  [kworker/0:1H]
  838 root         0 IW   [kworker/1:2-rcu]
  892 root         0 IW   [kworker/0:2-rcu]
 1370 root         0 IW   [kworker/u4:2-ev]
 1373 root      1072 S    grep kworker
root@(mt7981):~#

3.1 normal worker_pool

默认 work 是在 normal worker_pool 中处理的。系统的规划是每个 CPU 创建两个 normal worker_pool:一个 normal 优先级 (nice=0)、一个高优先级 (nice=HIGHPRI_NICE_LEVEL),对应创建出来的 worker 的进程 nice 不一样。

每个 worker 对应一个 worker_thread() 内核线程,一个 worker_pool 包含一个或者多个 worker,worker_pool 中 worker 的数量是根据 worker_pool 中 work 的负载来动态增减的。

3.2 workqueue机制创建过程

workqueue_init_early
    alloc_workqueue
    	alloc_and_link_pwqs
    		init_pwq
    		link_pwq
/* workqueue子系统初始化的上半部分:一旦内存分配完成、cpumasks和idr up之后就会调用 
* 主要是为了允许早期的启动代码创建workqueue并且queue/cancle work。
*/
int __init workqueue_init_early(void)
{
	int std_nice[NR_STD_WORKER_POOLS] = { 0, HIGHPRI_NICE_LEVEL };
	int hk_flags = HK_FLAG_DOMAIN | HK_FLAG_WQ;
	int i, cpu;

	WARN_ON(__alignof__(struct pool_workqueue) < __alignof__(long long));

	BUG_ON(!alloc_cpumask_var(&wq_unbound_cpumask, GFP_KERNEL));
	cpumask_copy(wq_unbound_cpumask, housekeeping_cpumask(hk_flags));

	pwq_cache = KMEM_CACHE(pool_workqueue, SLAB_PANIC);

	/* 创建worker_pool */
	for_each_possible_cpu(cpu) {
		struct worker_pool *pool;
         ......
	}

	/* create default unbound and ordered wq attrs */
	for (i = 0; i < NR_STD_WORKER_POOLS; i++) {
		unbound_std_wq_attrs[i] = attrs;
		ordered_wq_attrs[i] = attrs;
	}

	system_wq = alloc_workqueue("events", 0, 0);
	system_highpri_wq = alloc_workqueue("events_highpri", WQ_HIGHPRI, 0);
	system_unbound_wq = alloc_workqueue("events_unbound", WQ_UNBOUND,
					    WQ_UNBOUND_MAX_ACTIVE);
    ......
	return 0;
}

/* 初始化wq,为每个cpu创建对应级别的pwq并关联pwq。
*关联已经创建好的worker_pool
*max_active传值为0,则为WQ_DFL_ACTIVE即256 */
__printf(1, 4)
struct workqueue_struct *alloc_workqueue(const char *fmt,
					 unsigned int flags,
					 int max_active, ...)
{
	size_t tbl_size = 0;
	va_list args;
	struct workqueue_struct *wq;
	struct pool_workqueue *pwq;

	/*
	 * Unbound && max_active == 1 used to imply ordered, which is no
	 * longer the case on NUMA machines due to per-node pools.  While
	 * alloc_ordered_workqueue() is the right way to create an ordered
	 * workqueue, keep the previous behavior to avoid subtle breakages
	 * on NUMA.
	 */
	if ((flags & WQ_UNBOUND) && max_active == 1)
		flags |= __WQ_ORDERED;

	/* see the comment above the definition of WQ_POWER_EFFICIENT */
	if ((flags & WQ_POWER_EFFICIENT) && wq_power_efficient)
		flags |= WQ_UNBOUND;

	/* allocate wq and format name */
	if (flags & WQ_UNBOUND)
		tbl_size = nr_node_ids * sizeof(wq->numa_pwq_tbl[0]);

	wq = kzalloc(sizeof(*wq) + tbl_size, GFP_KERNEL);
	if (!wq)
		return NULL;

	if (flags & WQ_UNBOUND) {
		wq->unbound_attrs = alloc_workqueue_attrs();
		if (!wq->unbound_attrs)
			goto err_free_wq;
	}

	va_start(args, max_active);
	vsnprintf(wq->name, sizeof(wq->name), fmt, args);
	va_end(args);

	max_active = max_active ?: WQ_DFL_ACTIVE;
	max_active = wq_clamp_max_active(max_active, flags, wq->name);

	/* init wq */
	wq->flags = flags;
	wq->saved_max_active = max_active;
	mutex_init(&wq->mutex);
	atomic_set(&wq->nr_pwqs_to_flush, 0);
	INIT_LIST_HEAD(&wq->pwqs);
	INIT_LIST_HEAD(&wq->flusher_queue);
	INIT_LIST_HEAD(&wq->flusher_overflow);
	INIT_LIST_HEAD(&wq->maydays);

	wq_init_lockdep(wq);
	INIT_LIST_HEAD(&wq->list);

    /* 创建并初始化pwq */
	if (alloc_and_link_pwqs(wq) < 0)
		goto err_unreg_lockdep;

	if (wq_online && init_rescuer(wq) < 0)
		goto err_destroy;

	if ((wq->flags & WQ_SYSFS) && workqueue_sysfs_register(wq))
		goto err_destroy;

	/*
	 * wq_pool_mutex protects global freeze state and workqueues list.
	 * Grab it, adjust max_active and add the new @wq to workqueues
	 * list.
	 */
	mutex_lock(&wq_pool_mutex);

	mutex_lock(&wq->mutex);
	for_each_pwq(pwq, wq)
		pwq_adjust_max_active(pwq);
	mutex_unlock(&wq->mutex);

    /* 新创建的wq加入到workqueues链表 */ 
	list_add_tail_rcu(&wq->list, &workqueues);

	mutex_unlock(&wq_pool_mutex);

	return wq;
...
}

static int alloc_and_link_pwqs(struct workqueue_struct *wq)
{
	bool highpri = wq->flags & WQ_HIGHPRI;
	int cpu, ret;

	if (!(wq->flags & WQ_UNBOUND)) {
        /* 为每个cpu申请pwq */
		wq->cpu_pwqs = alloc_percpu(struct pool_workqueue);
		if (!wq->cpu_pwqs)
			return -ENOMEM;
		for_each_possible_cpu(cpu) {
			struct pool_workqueue *pwq =
				per_cpu_ptr(wq->cpu_pwqs, cpu);
			struct worker_pool *cpu_pools =
				per_cpu(cpu_worker_pools, cpu);
			/* 取出每个cpu中wq->flags类型的pwq和worker_pool,初始化pwq的wq和pool成员。
			这一步建立了pwq到worker_pool、pwq到wq的关联 */
			init_pwq(pwq, wq, &cpu_pools[highpri]);

			mutex_lock(&wq->mutex);
            /* 将pwq加入到wq的pwqs链表中(wq->pwqs保存该wq的所有pwq)。
            这一步建立了wq到pwq的关联 */
			link_pwq(pwq);
			mutex_unlock(&wq->mutex);
		}
		return 0;
	}
    ...
}

/* workqueue子系统初始化的下半部分:
start_kernel
	arch_call_rest_init
		rest_init
			kernel_init
				kernel_init_freeable
					workqueue_init
*/
int __init workqueue_init(void)
{
	struct workqueue_struct *wq;
	struct worker_pool *pool;
	int cpu, bkt;

	/*
	 * It'd be simpler to initialize NUMA in workqueue_init_early() but
	 * CPU to node mapping may not be available that early on some
	 * archs such as power and arm64.  As per-cpu pools created
	 * previously could be missing node hint and unbound pools NUMA
	 * affinity, fix them up.
	 *非一致性内存访问
	 * Also, while iterating workqueues, create rescuers if requested.
	 */
	wq_numa_init();

	mutex_lock(&wq_pool_mutex);

	for_each_possible_cpu(cpu) {
		for_each_cpu_worker_pool(pool, cpu) {
			pool->node = cpu_to_node(cpu);
		}
	}

	list_for_each_entry(wq, &workqueues, list) {
		wq_update_unbound_numa(wq, smp_processor_id(), true);
		WARN(init_rescuer(wq),
		     "workqueue: failed to create early rescuer for %s",
		     wq->name);
	}

	mutex_unlock(&wq_pool_mutex);

	/* create the initial workers */
	for_each_online_cpu(cpu) {
		for_each_cpu_worker_pool(pool, cpu) {
			pool->flags &= ~POOL_DISASSOCIATED;
			BUG_ON(!create_worker(pool));
		}
	}

	hash_for_each(unbound_pool_hash, bkt, pool, hash_node)
		BUG_ON(!create_worker(pool));

	wq_online = true;
	wq_watchdog_init();

	return 0;
}

static struct worker *create_worker(struct worker_pool *pool)
{
    struct worker *worker = NULL;
    int id = -1;
    char id_buf[16];

    /* ID is needed to determine kthread name */
    id = ida_simple_get(&pool->worker_ida, 0, 0, GFP_KERNEL);
    if (id < 0)
        goto fail;
	// 为worker申请内存,初始化worker成员
    worker = alloc_worker(pool->node);
    if (!worker)
        goto fail;

    worker->pool = pool;
    worker->id = id;

    if (pool->cpu >= 0)
        /* (2.1) 给 normal worker_pool 的 worker 构造进程名:
        6 root         0 IW<  [kworker/0:0H-kb]
       12 root         0 IW   [kworker/0:1-rcu]
       18 root         0 IW<  [kworker/1:0H-kb]
      636 root         0 IW<  [kworker/0:1H]
      637 root         0 IW<  [kworker/1:1H]
      887 root         0 IW   [kworker/1:2-pm]
     1413 root         0 IW   [kworker/1:0-mm_]
     1482 root         0 IW   [kworker/0:2-eve]
        */
        snprintf(id_buf, sizeof(id_buf), "%d:%d%s", pool->cpu, id,
             pool->attrs->nice < 0  ? "H" : "");
    else
        /* (2.2) 给 unbound worker_pool 的 worker 构造进程名
        1487 root         0 IW   [kworker/u4:0-ev]
         1488 root         0 IW   [kworker/u4:2-ev]
         1489 root         0 IW   [kworker/u4:1-ev]
        */
        snprintf(id_buf, sizeof(id_buf), "u%d:%d", pool->id, id);

    // (2.3) 创建 worker 对应的内核进程worker_thread
    worker->task = kthread_create_on_node(worker_thread, worker, pool->node,
                          "kworker/%s", id_buf);
    if (IS_ERR(worker->task))
        goto fail;

    // (2.4) 设置内核进程对应的优先级 nice
    set_user_nice(worker->task, pool->attrs->nice);

    /* prevent userland from meddling with cpumask of workqueue workers */
    worker->task->flags |= PF_NO_SETAFFINITY;

    // (2.5) 将 worker 和 worker_pool 绑定
    worker_attach_to_pool(worker, pool);

    // (2.6) 将 worker 初始状态设置成 idle,
    // wake_up_process 以后,worker 自动 leave idle 状态
    /* start the newly created worker */
    spin_lock_irq(&pool->lock);
    worker->pool->nr_workers++;
    worker_enter_idle(worker);
    wake_up_process(worker->task);
    spin_unlock_irq(&pool->lock);

    return worker;

fail:
    if (id >= 0)
        ida_simple_remove(&pool->worker_ida, id);
    kfree(worker);
    return NULL;
}

static void worker_attach_to_pool(struct worker *worker,
                   struct worker_pool *pool)
{
    mutex_lock(&pool->attach_mutex);

    // (2.5.1) 将 worker 线程和 cpu 绑定
    set_cpus_allowed_ptr(worker->task, pool->attrs->cpumask);
    if (pool->flags & POOL_DISASSOCIATED)
        worker->flags |= WORKER_UNBOUND;
	if (worker->rescue_wq)
		set_cpus_allowed_ptr(worker->task, pool->attrs->cpumask);

    // (2.5.2) 将 worker 加入 worker_pool 链表
 	list_add_tail(&worker->node, &pool->workers);
	worker->pool = pool;

	mutex_unlock(&wq_pool_attach_mutex);
}

1.1.2 unbound worker_pool

大部分的 work 都是通过 normal worker_pool 来执行的 ( 例如通过 schedule_work()schedule_work_on() 压入到系统 workqueue(system_wq) 中的 work),最后都是通过 normal worker_pool 中的 worker 来执行的。这些 worker 是和某个 CPU 绑定的,work 一旦被 worker 开始执行,都是一直运行到某个 CPU 上的不会切换 CPU。

unbound worker_pool 相对应的意思,就是 worker 可以在多个 CPU 上调度的。但是他其实也是绑定的,只不过它绑定的单位不是 CPU 而是 node。所谓的 node 是对 NUMA(Non Uniform Memory Access Architecture) 系统来说的,NUMA 可能存在多个 node,每个 node 可能包含一个或者多个 CPU。

unbound worker_pool 对应内核线程 (worker_thread()) 类似名字是 unbound worker_pool:

shell@PRO5:/ $ ps | grep "kworker"
 1487 root         0 IW   [kworker/u4:0-ev] #unbound pool 4 的第0个worker 进程
 1488 root         0 IW   [kworker/u4:2-ev] #unbound pool 4 的第2个worker 进程
 1489 root         0 IW   [kworker/u4:1-ev] #unbound pool 4 的第1个worker 进程

unbound worker_pool 也分成两类:

  • unbound_std_wq。每个 node 对应一个 worker_pool,多个 node 就对应多个 worker_pool;

Linux Workqueue_内核线程_04unbound_std_wq

  • ordered_wq。所有 node 对应一个 default worker_pool;

Linux Workqueue_sed_06unbound worker_pool: ordered_wq

以下是 unbound worker_pool 详细的创建过程代码分析:

  • kernel/workqueue.c:
  • workqueue_init_early() -> unbound_std_wq_attrs/ordered_wq_attrs
int __init workqueue_init_early(void)
{
	/* create default unbound and ordered wq attrs */
	for (i = 0; i < NR_STD_WORKER_POOLS; i++) {
		struct workqueue_attrs *attrs;

		BUG_ON(!(attrs = alloc_workqueue_attrs()));
		attrs->nice = std_nice[i];
		unbound_std_wq_attrs[i] = attrs;

		/*
		 * An ordered wq should have only one pwq as ordering is
		 * guaranteed by max_active which is enforced by pwqs.
		 * Turn off NUMA so that dfl_pwq is used for all nodes.
		 */
		BUG_ON(!(attrs = alloc_workqueue_attrs()));
		attrs->nice = std_nice[i];
		attrs->no_numa = true;
		ordered_wq_attrs[i] = attrs;
	}
}
  • kernel/workqueue.c:
  • alloc_workqueue() ->alloc_and_link_pwqs() ->apply_workqueue_attrs() ->alloc_unbound_pwq()/numa_pwq_tbl_install()`
struct workqueue_struct *alloc_workqueue(const char *fmt,
					 unsigned int flags,
					 int max_active, ...)
{
 // (3) 给 workqueue 分配对应的 pool_workqueue
    // pool_workqueue 将 workqueue 和 worker_pool 链接起来
	if (alloc_and_link_pwqs(wq) < 0)
		goto err_unreg_lockdep;

	/*
	 * wq_pool_mutex protects global freeze state and workqueues list.
	 * Grab it, adjust max_active and add the new @wq to workqueues
	 * list.
	 */
	mutex_lock(&wq_pool_mutex);

	mutex_lock(&wq->mutex);
	for_each_pwq(pwq, wq)
		pwq_adjust_max_active(pwq);
	mutex_unlock(&wq->mutex);
// (6) 将新的 workqueue 加入到全局链表 workqueues 中
	list_add_tail_rcu(&wq->list, &workqueues);

	mutex_unlock(&wq_pool_mutex);
...
}

static int alloc_and_link_pwqs(struct workqueue_struct *wq)
{
    bool highpri = wq->flags & WQ_HIGHPRI;
    int cpu, ret;
...
     // (3.2) unbound ordered_wq workqueue
    // pool_workqueue 链接 workqueue 和 worker_pool 的过程
	if (wq->flags & __WQ_ORDERED) {
		ret = apply_workqueue_attrs(wq, ordered_wq_attrs[highpri]);
		/* there should only be single pwq for ordering guarantee */
		WARN(!ret && (wq->pwqs.next != &wq->dfl_pwq->pwqs_node ||
			      wq->pwqs.prev != &wq->dfl_pwq->pwqs_node),
		     "ordering guarantee broken for workqueue %s\n", wq->name);
	} else {
        // (3.3) unbound unbound_std_wq workqueue
    // pool_workqueue 链接 workqueue 和 worker_pool 的过程
		ret = apply_workqueue_attrs(wq, unbound_std_wq_attrs[highpri]);
	}
	put_online_cpus();

	return ret;
}
apply_workqueue_attrs
	apply_workqueue_attrs_locked
    	apply_wqattrs_prepare
static struct apply_wqattrs_ctx *
apply_wqattrs_prepare(struct workqueue_struct *wq,
		      const struct workqueue_attrs *attrs)
{
	struct apply_wqattrs_ctx *ctx;
	struct workqueue_attrs *new_attrs, *tmp_attrs;
	int node;

	lockdep_assert_held(&wq_pool_mutex);

	ctx = kzalloc(struct_size(ctx, pwq_tbl, nr_node_ids), GFP_KERNEL);

	new_attrs = alloc_workqueue_attrs();
	tmp_attrs = alloc_workqueue_attrs();
	if (!ctx || !new_attrs || !tmp_attrs)
		goto out_free;

	/*
	 * Calculate the attrs of the default pwq.
	 * If the user configured cpumask doesn't overlap with the
	 * wq_unbound_cpumask, we fallback to the wq_unbound_cpumask.
	 */
	copy_workqueue_attrs(new_attrs, attrs);
	cpumask_and(new_attrs->cpumask, new_attrs->cpumask, wq_unbound_cpumask);
	if (unlikely(cpumask_empty(new_attrs->cpumask)))
		cpumask_copy(new_attrs->cpumask, wq_unbound_cpumask);

	/*
	 * We may create multiple pwqs with differing cpumasks.  Make a
	 * copy of @new_attrs which will be modified and used to obtain
	 * pools.
	 */
	copy_workqueue_attrs(tmp_attrs, new_attrs);

	/*
	 * If something goes wrong during CPU up/down, we'll fall back to
	 * the default pwq covering whole @attrs->cpumask.  Always create
	 * it even if we don't use it immediately.
	 */
	ctx->dfl_pwq = alloc_unbound_pwq(wq, new_attrs);
	if (!ctx->dfl_pwq)
		goto out_free;

	for_each_node(node) {
		if (wq_calc_node_cpumask(new_attrs, node, -1, tmp_attrs->cpumask)) {
			ctx->pwq_tbl[node] = alloc_unbound_pwq(wq, tmp_attrs);
			if (!ctx->pwq_tbl[node])
				goto out_free;
		} else {
			ctx->dfl_pwq->refcnt++;
			ctx->pwq_tbl[node] = ctx->dfl_pwq;
		}
	}

	/* save the user configured attrs and sanitize it. */
	copy_workqueue_attrs(new_attrs, attrs);
	cpumask_and(new_attrs->cpumask, new_attrs->cpumask, cpu_possible_mask);
	ctx->attrs = new_attrs;

	ctx->wq = wq;
	free_workqueue_attrs(tmp_attrs);
	return ctx;

out_free:
	free_workqueue_attrs(tmp_attrs);
	free_workqueue_attrs(new_attrs);
	apply_wqattrs_cleanup(ctx);
	return NULL;
}
int apply_workqueue_attrs(struct workqueue_struct *wq,
              const struct workqueue_attrs *attrs)
{

    // (3.2.1) 根据的 ubound 的 ordered_wq_attrs/unbound_std_wq_attrs
    // 创建对应的 pool_workqueue 和 worker_pool
    // 其中 worker_pool 不是默认创建好的,是需要动态创建的,对应的 worker 内核进程也要重新创建
    // 创建好的 pool_workqueue 赋值给 pwq_tbl[node]
    /*
     * If something goes wrong during CPU up/down, we'll fall back to
     * the default pwq covering whole @attrs->cpumask.  Always create
     * it even if we don't use it immediately.
     */
    dfl_pwq = alloc_unbound_pwq(wq, new_attrs);
    if (!dfl_pwq)
        goto enomem_pwq;

    for_each_node(node) {
        if (wq_calc_node_cpumask(attrs, node, -1, tmp_attrs->cpumask)) {
            pwq_tbl[node] = alloc_unbound_pwq(wq, tmp_attrs);
            if (!pwq_tbl[node])
                goto enomem_pwq;
        } else {
            dfl_pwq->refcnt++;
            pwq_tbl[node] = dfl_pwq;
        }
    }

    /* save the previous pwq and install the new one */
    // (3.2.2) 将临时 pwq_tbl[node] 赋值给 wq->numa_pwq_tbl[node]
    for_each_node(node)
        pwq_tbl[node] = numa_pwq_tbl_install(wq, node, pwq_tbl[node]);

}

static struct pool_workqueue *alloc_unbound_pwq(struct workqueue_struct *wq,
                    const struct workqueue_attrs *attrs)
{
    struct worker_pool *pool;
    struct pool_workqueue *pwq;

    lockdep_assert_held(&wq_pool_mutex);

    // (3.2.1.1) 如果对应 attrs 已经创建多对应的 unbound_pool,则使用已有的 unbound_pool
    // 否则根据 attrs 创建新的 unbound_pool
    pool = get_unbound_pool(attrs);
    if (!pool)
        return NULL;

    pwq = kmem_cache_alloc_node(pwq_cache, GFP_KERNEL, pool->node);
    if (!pwq) {
        put_unbound_pool(pool);
        return NULL;
    }

    init_pwq(pwq, wq, pool);
    return pwq;
}

3.2 worker

每个 worker 对应一个 worker_thread() 内核线程,一个 worker_pool 对应一个或者多个 worker。多个 worker 从同一个链表中 worker_pool->worklist 获取 work 进行处理。

1.2.1 worker 处理 work

处理 work 的过程主要在 worker_thread() -> process_one_work() 中处理,我们具体看看代码的实现过程。

  • kernel/workqueue.c:
  • worker_thread() ->process_one_work()
static int worker_thread(void *__worker)
{
    struct worker *worker = __worker;
    struct worker_pool *pool = worker->pool;

    /* tell the scheduler that this is a workqueue worker */
    worker->task->flags |= PF_WQ_WORKER;
woke_up:
    spin_lock_irq(&pool->lock);

    // (1) 是否 die
    /* am I supposed to die? */
    if (unlikely(worker->flags & WORKER_DIE)) {
        spin_unlock_irq(&pool->lock);
        WARN_ON_ONCE(!list_empty(&worker->entry));
        worker->task->flags &= ~PF_WQ_WORKER;

        set_task_comm(worker->task, "kworker/dying");
        ida_simple_remove(&pool->worker_ida, worker->id);
        worker_detach_from_pool(worker, pool);
        kfree(worker);
        return 0;
    }

    // (2) 脱离 idle 状态
    // 被唤醒之前 worker 都是 idle 状态
    worker_leave_idle(worker);
recheck:

    // (3) 如果需要本 worker 继续执行则继续,否则进入 idle 状态
    // need more worker 的条件: (pool->worklist != 0) && (pool->nr_running == 0)
    // worklist 上有 work 需要执行,并且现在没有处于 running 的 work
    /* no more worker necessary? */
    if (!need_more_worker(pool))
        goto sleep;

    // (4) 如果 (pool->nr_idle == 0),则启动创建更多的 worker
    // 说明 idle 队列中已经没有备用 worker 了,先创建 一些 worker 备用
    /* do we need to manage? */
    if (unlikely(!may_start_working(pool)) && manage_workers(worker))
        goto recheck;

    /*
     * ->scheduled list can only be filled while a worker is
     * preparing to process a work or actually processing it.
     * Make sure nobody diddled with it while I was sleeping.
     */
    WARN_ON_ONCE(!list_empty(&worker->scheduled));

    /*
     * Finish PREP stage.  We're guaranteed to have at least one idle
     * worker or that someone else has already assumed the manager
     * role.  This is where @worker starts participating in concurrency
     * management if applicable and concurrency management is restored
     * after being rebound.  See rebind_workers() for details.
     */
    worker_clr_flags(worker, WORKER_PREP | WORKER_REBOUND);

    do {
        // (5) 如果 pool->worklist 不为空,从其中取出一个 work 进行处理
        struct work_struct *work =
            list_first_entry(&pool->worklist,
                     struct work_struct, entry);

        if (likely(!(*work_data_bits(work) & WORK_STRUCT_LINKED))) {
            /* optimization path, not strictly necessary */
            // (6) 执行正常的 work
            process_one_work(worker, work);
            if (unlikely(!list_empty(&worker->scheduled)))
                process_scheduled_works(worker);
        } else {
            // (7) 执行系统特意 scheduled 给某个 worker 的 work
            // 普通的 work 是放在池子的公共 list 中的 pool->worklist
            // 只有一些特殊的 work 被特意派送给某个 worker 的 worker->scheduled
            // 包括:1、执行 flush_work 时插入的 barrier work;
            // 2、collision 时从其他 worker 推送到本 worker 的 work
            move_linked_works(work, &worker->scheduled, NULL);
            process_scheduled_works(worker);
        }
    // (8) worker keep_working 的条件:
    // pool->worklist 不为空 && (pool->nr_running <= 1)
    } while (keep_working(pool));

    worker_set_flags(worker, WORKER_PREP);supposed
sleep:
    // (9) worker 进入 idle 状态
    /*
     * pool->lock is held and there's no work to process and no need to
     * manage, sleep.  Workers are woken up only while holding
     * pool->lock or from local cpu, so setting the current state
     * before releasing pool->lock is enough to prevent losing any
     * event.
     */
    worker_enter_idle(worker);
    __set_current_state(TASK_INTERRUPTIBLE);
    spin_unlock_irq(&pool->lock);
    schedule();
    goto woke_up;
}

static void process_one_work(struct worker *worker, struct work_struct *work)
__releases(&pool->lock)
__acquires(&pool->lock)
{
    struct pool_workqueue *pwq = get_work_pwq(work);
    struct worker_pool *pool = worker->pool;
    bool cpu_intensive = pwq->wq->flags & WQ_CPU_INTENSIVE;
    int work_color;
    struct worker *collision;
#ifdef CONFIG_LOCKDEP
    /*
     * It is permissible to free the struct work_struct from
     * inside the function that is called from it, this we need to
     * take into account for lockdep too.  To avoid bogus "held
     * lock freed" warnings as well as problems when looking into
     * work->lockdep_map, make a copy and use that here.
     */
    struct lockdep_map lockdep_map;

    lockdep_copy_map(&lockdep_map, &work->lockdep_map);
#endif
    /* ensure we're on the correct CPU */
    WARN_ON_ONCE(!(pool->flags & POOL_DISASSOCIATED) &&
             raw_smp_processor_id() != pool->cpu);

    // (8.1) 如果 work 已经在 worker_pool 的其他 worker 上执行,
    // 将 work 放入对应 worker 的 scheduled 队列中延后执行
    /*
     * A single work shouldn't be executed concurrently by
     * multiple workers on a single cpu.  Check whether anyone is
     * already processing the work.  If so, defer the work to the
     * currently executing one.
     */
    collision = find_worker_executing_work(pool, work);
    if (unlikely(collision)) {
        move_linked_works(work, &collision->scheduled, NULL);
        return;
    }

    // (8.2) 将 worker 加入 busy 队列 pool->busy_hash
    /* claim and dequeue */
    debug_work_deactivate(work);
    hash_add(pool->busy_hash, &worker->hentry, (unsigned long)work);
    worker->current_work = work;
    worker->current_func = work->func;
    worker->current_pwq = pwq;
    work_color = get_work_color(work);

    list_del_init(&work->entry);

    // (8.3) 如果 work 所在的 wq 是 cpu 密集型的 WQ_CPU_INTENSIVE
    // 则当前 work 的执行脱离 worker_pool 的动态调度,成为一个独立的线程
    /*
     * CPU intensive works don't participate in concurrency management.
     * They're the scheduler's responsibility.  This takes @worker out
     * of concurrency management and the next code block will chain
     * execution of the pending work items.
     */
    if (unlikely(cpu_intensive))
        worker_set_flags(worker, WORKER_CPU_INTENSIVE);

    // (8.4) 在 UNBOUND 或者 CPU_INTENSIVE work 中判断是否需要唤醒 idle worker
    // 普通 work 不会执行这个操作
    /*
     * Wake up another worker if necessary.  The condition is always
     * false for normal per-cpu workers since nr_running would always
     * be >= 1 at this point.  This is used to chain execution of the
     * pending work items for WORKER_NOT_RUNNING workers such as the
     * UNBOUND and CPU_INTENSIVE ones.
     */
    if (need_more_worker(pool))
        wake_up_worker(pool);

    /*
     * Record the last pool and clear PENDING which should be the last
     * update to @work.  Also, do this inside @pool->lock so that
     * PENDING and queued state changes happen together while IRQ is
     * disabled.
     */
    set_work_pool_and_clear_pending(work, pool->id);

    spin_unlock_irq(&pool->lock);

    lock_map_acquire_read(&pwq->wq->lockdep_map);
    lock_map_acquire(&lockdep_map);
    trace_workqueue_execute_start(work);
    // (8.5) 执行 work 函数
    worker->current_func(work);
    /*
     * While we must be careful to not use "work" after this, the trace
     * point will only record its address.
     */
    trace_workqueue_execute_end(work);
    lock_map_release(&lockdep_map);
    lock_map_release(&pwq->wq->lockdep_map);

    if (unlikely(in_atomic() || lockdep_depth(current) > 0)) {
        pr_err("BUG: workqueue leaked lock or atomic: %s/0x%08x/%d\n"
               "     last function: %pf\n",
               current->comm, preempt_count(), task_pid_nr(current),
               worker->current_func);
        debug_show_held_locks(current);
        dump_stack();
    }

    /*
     * The following prevents a kworker from hogging CPU on !PREEMPT
     * kernels, where a requeueing work item waiting for something to
     * happen could deadlock with stop_machine as such work item could
     * indefinitely requeue itself while all other CPUs are trapped in
     * stop_machine. At the same time, report a quiescent RCU state so
     * the same condition doesn't freeze RCU.
     */
    cond_resched_rcu_qs();

    spin_lock_irq(&pool->lock);

    /* clear cpu intensive status */
    if (unlikely(cpu_intensive))
        worker_clr_flags(worker, WORKER_CPU_INTENSIVE);

    /* we're done with it, release */
    hash_del(&worker->hentry);
    worker->current_work = NULL;
    worker->current_func = NULL;
    worker->current_pwq = NULL;
    worker->desc_valid = false;
    pwq_dec_nr_in_flight(pwq, work_color);
}

1.2.2 worker_pool 动态管理 worker

worker_pool 怎么来动态增减 worker,这部分的算法是 CMWQ 的核心。其思想如下:

  • worker_pool 中的 worker 有 3 种状态:idle、running、suspend;
  • 如果 worker_pool 中有 work 需要处理,保持至少一个 running worker 来处理;
  • running worker 在处理 work 的过程中进入了阻塞 suspend 状态,为了保持其他 work 的执行,需要唤醒新的 idle worker 来处理 work;
  • 如果有 work 需要执行且 running worker 大于 1 个,会让多余的 running worker 进入 idle 状态;
  • 如果没有 work 需要执行,会让所有 worker 进入 idle 状态;
  • 如果创建的 worker 过多,destroy_worker 在 300s(IDLE_WORKER_TIMEOUT) 时间内没有再次运行的 idle worker。

Linux Workqueue_代码分析_08

详细代码可以参考上节 worker_thread() -> process_one_work() 的分析。

为了追踪 worker 的 running 和 suspend 状态,用来动态调整 worker 的数量。wq 使用在进程调度中加钩子函数的技巧:

  • 追踪 worker 从 suspend 进入 running 状态:ttwu_activate() ->wq_worker_waking_up()
void wq_worker_waking_up(struct task_struct *task, int cpu)
{
    struct worker *worker = kthread_data(task);

    if (!(worker->flags & WORKER_NOT_RUNNING)) {
        WARN_ON_ONCE(worker->pool->cpu != cpu);
        // 增加 worker_pool 中 running 的 worker 数量
        atomic_inc(&worker->pool->nr_running);
    }
}
  • 追踪 worker 从 running 进入 suspend 状态:__schedule() ->wq_worker_sleeping()
struct task_struct *wq_worker_sleeping(struct task_struct *task, int cpu)
{
    struct worker *worker = kthread_data(task), *to_wakeup = NULL;
    struct worker_pool *pool;

    /*
     * Rescuers, which may not have all the fields set up like normal
     * workers, also reach here, let's not access anything before
     * checking NOT_RUNNING.
     */
    if (worker->flags & WORKER_NOT_RUNNING)
        return NULL;

    pool = worker->pool;

    /* this can only happen on the local cpu */
    if (WARN_ON_ONCE(cpu != raw_smp_processor_id() || pool->cpu != cpu))
        return NULL;

    /*
     * The counterpart of the following dec_and_test, implied mb,
     * worklist not empty test sequence is in insert_work().
     * Please read comment there.
     *
     * NOT_RUNNING is clear.  This means that we're bound to and
     * running on the local cpu w/ rq lock held and preemption
     * disabled, which in turn means that none else could be
     * manipulating idle_list, so dereferencing idle_list without pool
     * lock is safe.
     */
    // 减少 worker_pool 中 running 的 worker 数量
    // 如果 worklist 还有 work 需要处理,唤醒第一个 idle worker 进行处理
    if (atomic_dec_and_test(&pool->nr_running) &&
        !list_empty(&pool->worklist))
        to_wakeup = first_idle_worker(pool);
    return to_wakeup ? to_wakeup->task : NULL;
}

这里 worker_pool 的调度思想是:如果有 work 需要处理,保持一个 running 状态的 worker 处理,不多也不少。

但是这里有一个问题如果 work 是 CPU 密集型的,它虽然也没有进入 suspend 状态,但是会长时间的占用 CPU,让后续的 work 阻塞太长时间。

为了解决这个问题,CMWQ 设计了 WQ_CPU_INTENSIVE,如果一个 wq 声明自己是 CPU_INTENSIVE,则让当前 worker 脱离动态调度,像是进入了 suspend 状态,那么 CMWQ 会创建新的 worker,后续的 work 会得到执行。

  • kernel/workqueue.c:
  • worker_thread() ->process_one_work()
static void process_one_work(struct worker *worker, struct work_struct *work)
__releases(&pool->lock)
__acquires(&pool->lock)
{

    bool cpu_intensive = pwq->wq->flags & WQ_CPU_INTENSIVE;

    // (1) 设置当前 worker 的 WORKER_CPU_INTENSIVE 标志
    // nr_running 会被减 1
    // 对 worker_pool 来说,当前 worker 相当于进入了 suspend 状态
    /*
     * CPU intensive works don't participate in concurrency management.
     * They're the scheduler's responsibility.  This takes @worker out
     * of concurrency management and the next code block will chain
     * execution of the pending work items.
     */
    if (unlikely(cpu_intensive))
        worker_set_flags(worker, WORKER_CPU_INTENSIVE);

    // (2) 接上一步,判断是否需要唤醒新的 worker 来处理 work
    /*
     * Wake up another worker if necessary.  The condition is always
     * false for normal per-cpu workers since nr_running would always
     * be >= 1 at this point.  This is used to chain execution of the
     * pending work items for WORKER_NOT_RUNNING workers such as the
     * UNBOUND and CPU_INTENSIVE ones.
     */
    if (need_more_worker(pool))
        wake_up_worker(pool);

    // (3) 执行 work
    worker->current_func(work);

    // (4) 执行完,清理当前 worker 的 WORKER_CPU_INTENSIVE 标志
    // 当前 worker 重新进入 running 状态
    /* clear cpu intensive status */
    if (unlikely(cpu_intensive))
        worker_clr_flags(worker, WORKER_CPU_INTENSIVE);

}

    WORKER_NOT_RUNNING    = WORKER_PREP | WORKER_CPU_INTENSIVE |
                  WORKER_UNBOUND | WORKER_REBOUND,

static inline void worker_set_flags(struct worker *worker, unsigned int flags)
{
    struct worker_pool *pool = worker->pool;

    WARN_ON_ONCE(worker->task != current);

    /* If transitioning into NOT_RUNNING, adjust nr_running. */
    if ((flags & WORKER_NOT_RUNNING) &&
        !(worker->flags & WORKER_NOT_RUNNING)) {
        atomic_dec(&pool->nr_running);
    }

    worker->flags |= flags;
}

static inline void worker_clr_flags(struct worker *worker, unsigned int flags)
{
    struct worker_pool *pool = worker->pool;
    unsigned int oflags = worker->flags;

    WARN_ON_ONCE(worker->task != current);

    worker->flags &= ~flags;

    /*
     * If transitioning out of NOT_RUNNING, increment nr_running.  Note
     * that the nested NOT_RUNNING is not a noop.  NOT_RUNNING is mask
     * of multiple flags, not a single flag.
     */
    if ((flags & WORKER_NOT_RUNNING) && (oflags & WORKER_NOT_RUNNING))
        if (!(worker->flags & WORKER_NOT_RUNNING))
            atomic_inc(&pool->nr_running);
}

3.3 CPU hotplug 处理

从上几节可以看到,系统会创建和 CPU 绑定的 normal worker_pool 和不绑定 CPU 的 unbound worker_pool,worker_pool 又会动态的创建 worker。

那么在 CPU hotplug 的时候,会怎么样动态的处理 worker_pool 和 worker 呢?来看具体的代码分析:

  • kernel/workqueue.c:
  • workqueue_cpu_up_callback()/workqueue_cpu_down_callback()
static int __init init_workqueues(void)
{

    cpu_notifier(workqueue_cpu_up_callback, CPU_PRI_WORKQUEUE_UP);
    hotcpu_notifier(workqueue_cpu_down_callback, CPU_PRI_WORKQUEUE_DOWN);

}
| →
static int workqueue_cpu_down_callback(struct notifier_block *nfb,
                         unsigned long action,
                         void *hcpu)
{
    int cpu = (unsigned long)hcpu;
    struct work_struct unbind_work;
    struct workqueue_struct *wq;

    switch (action & ~CPU_TASKS_FROZEN) {
    case CPU_DOWN_PREPARE:
        /* unbinding per-cpu workers should happen on the local CPU */
        INIT_WORK_ONSTACK(&unbind_work, wq_unbind_fn);
        // (1) cpu down_prepare
        // 把和当前 cpu 绑定的 normal worker_pool 上的 worker 停工
        // 随着当前 cpu 被 down 掉,这些 worker 会迁移到其他 cpu 上
        queue_work_on(cpu, system_highpri_wq, &unbind_work);

        // (2) unbound wq 对 cpu 变化的更新
        /* update NUMA affinity of unbound workqueues */
        mutex_lock(&wq_pool_mutex);
        list_for_each_entry(wq, &workqueues, list)
            wq_update_unbound_numa(wq, cpu, false);
        mutex_unlock(&wq_pool_mutex);

        /* wait for per-cpu unbinding to finish */
        flush_work(&unbind_work);
        destroy_work_on_stack(&unbind_work);
        break;
    }
    return NOTIFY_OK;
}

static int workqueue_cpu_up_callback(struct notifier_block *nfb,
        unsigned long action, void *hcpu)
{
    int CPU = (unsigned long)hcpu;
    struct worker_pool *pool;
    struct workqueue_struct *wq;
    int pi;
    switch (action & ~CPU_TASKS_FROZEN) {
    case CPU_UP_PREPARE:
        for_each_cpu_worker_pool(pool, CPU) {
            if (pool->nr_workers)
                continue;
            if (!create_worker(pool))
                return NOTIFY_BAD;
        }
        break;
    case CPU_DOWN_FAILED:
    case CPU_ONLINE:
        mutex_lock(&wq_pool_mutex);
        // (3) CPU up
        for_each_pool(pool, pi) {
            mutex_lock(&pool->attach_mutex);
            // 如果和当前 CPU 绑定的 normal worker_pool 上,有 WORKER_UNBOUND 停工的 worker
            // 重新绑定 worker 到 worker_pool
            // 让这些 worker 开工,并绑定到当前 CPU
            if (pool->CPU == CPU)
                rebind_workers(pool);
            else if (pool->CPU < 0)
                restore_unbound_workers_cpumask(pool, CPU);
            mutex_unlock(&pool->attach_mutex);
        }

        /* update NUMA affinity of unbound workqueues */
        list_for_each_entry(wq, &workqueues, list)
            wq_update_unbound_numa(wq, CPU, true);
        mutex_unlock(&wq_pool_mutex);
        break;
    }
    return NOTIFY_OK;
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/187679.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

基于php的旅游管理系统

摘要随着计算机技术&#xff0c;网络技术的迅猛发展&#xff0c;Internet 的不断普及&#xff0c;网络在各个领域里发挥了越来越重要的作用。特别是随着近年人民生活水平不断提高&#xff0c;在线旅游给人们的旅游业带来了更大的发展机遇。在经济快速发展的带动下&#xff0c;旅…

【Linux】tar命令打包 | 查看压缩文件 | 打包时忽略文件

tar命令打包 | 查看压缩文件 | 打包时忽略文件 等操作 1.起因 今天下午写阿狸bot的代码的时候&#xff0c;写错了aiofiles的保存操作 # 正确写法 async def write_file_aio(path:str, value):async with aiofiles.open(path, w, encodingutf-8) as f:await f.write(json.dump…

MyBatis持久层框架详细解读:核心配置文件

文章目录1. 前言2. 多环境配置3. 类型别名4. 对象工厂5. 总结1. 前言 前面我们在使用 MyBatis 开发时&#xff0c;编写核心配置文件替换 JDBC 中的连接信息&#xff0c;解决了 JDBC 硬编码的问题。其实&#xff0c;MyBatis 核心配置文件中还可以配置很多的内容。 MyBatis 的配…

mongodb分片

分片是MongoDB的扩展方式,通过分片能够增加更多的机器来用对不断增加的负载和数据,还不影响应用.1.分片简介分片是指将数据拆分,将其分散存在不同机器上的过程.有时也叫分区.将数据分散在不同的机器上,不需要功能强大的大型计算机就可以存储更多的数据,处理更大的负载.使用几乎…

屏幕录制下载推荐(可以无水印录制视频)

您有没有遇到过这种情况&#xff0c;在使用录屏工具录制电脑屏幕时&#xff0c;录制出来的视频是带有明显水印的。那有没有可以无水印录制的屏幕录制推荐呢&#xff1f;当然有。最近小编发现了一款可以无水印&#xff08;自定义图文水印&#xff09;录制的视频&#xff0c;快来…

Pycharm误触ignore的解决方法--有图

步骤1&#xff1a;进入pycharm编辑器之后&#xff0c;找到菜单栏中的file选项&#xff0c;点击之后会有一个下拉列表&#xff0c;直接选择settings&#xff0c;进入到设置的窗口。步骤2&#xff1a;在设置界面的左侧&#xff0c;找到Inspections选项&#xff0c;点击之后&#…

JavaScript 练手小技巧:拖拽事件、把图片拖拽入页面

HTML5 新增了拖拽事件 drag&#xff0c;利用它可以实现把外部文件拖拽入页面中&#xff0c;可以实现文件的读取&#xff0c;上传等等功能。 拖拽&#xff0c;又叫拖拉、拖动&#xff0c;英文为 drag。 拖拽事件是 HTML5 新增的事件操作。 拖拽指的是&#xff0c;用户在某个对…

【Rust】4. Rust 基础

4. Rust 基础 4.1 变量和可变性 4.1.1 常量 const xxx: type ...&#xff1a;常量使用 const 来定义&#xff0c;且必须注明值的类型常量在声明它的作用域之中&#xff0c;常量在整个程序生命周期中都有效 4.1.2 隐藏&#xff08;Shadowing&#xff09; 隐藏&#xff08;Sh…

基于卡尔曼滤波器的PID控制-1

采用M语言对算例进行仿真&#xff01;&#xff01;设置控制对象传递函数&#xff1a;取采样时间为1ms&#xff0c;采用Z变换将对象离散化&#xff0c;并描述为离散状态方程的形式&#xff1a;x(k 1) Ax(k) B(u(k)wk))y(k) Cx(k)带有测量噪声的被控对象输出为&#xff1a;yv(k)C…

Ubuntu18.04下安装OpenCV4.2.0与Opencv_contrib(图文详细报错总结)

Ubuntu18.04下安装OpenCV4.2.0与Opencv_contrib&#xff08;图文详细&#xff09;前期准备—环境依赖Cmake&#xff08;编译器&#xff09;依赖环境Python环境streamer环境图像处理依赖安装OpenCV编译OpenCV配置cmake编译参数make编译配置OpenCV动态库验证OpenCV环境# python环…

724. 寻找数组的中心下标——你行吗???

兄弟们&#xff0c;今早遇到了一个题&#xff0c;案例看起来很简单&#xff0c;于是就尝试起来&#xff0c;求知己&#x1f62d;题目描述724. 寻找数组的中心下标难度简单511收藏分享切换为英文接收动态反馈给你一个整数数组 nums &#xff0c;请计算数组的 中心下标 。数组 中…

一文了解 Java 中 so 文件的加载原理

前言 无论是 Android 开发者还是 Java 工程师应该都有使用过 JNI 开发&#xff0c;但对于 JVM 如何加载 so、Android 系统如何加载 so&#xff0c;可能鲜有时间了解。 本文通过代码、流程解释&#xff0c;带大家快速了解其加载原理&#xff0c;扫清困惑。 1. System#load() …

3.1.2 访问控制符及修饰符

文章目录1.访问控制符2.静态字段/方法/代码块2.1 静态字段2.2 静态常量2.3 静态方法2.4 特点2.5 static入门案例2.6 静态的调用关系2.7 静态代码块2.7.1 格式2.7.2 特性&#xff1a;2.7.3 执行顺序2.7.4 案例练习3.final的概念3.1 特点3.2 final入门案例1.访问控制符 在JAVA中…

【HBase——陌陌海量存储案例】8. 基于Phoenix消息数据查询(下)

索引示例二&#xff1a;创建本地索引 需求 在程序中&#xff0c;我们可能会根据订单ID、订单状态、支付金额、支付方式、用户ID来查询订单。所以&#xff0c;我们需要在这些列上来查询订单。 针对这种场景&#xff0c;我们可以使用本地索引来提高查询效率。 创建本地索引 cre…

超全小程序开发的学习 知识点

第一章&#xff1a;邂逅小程序开发 01_小程序开发和各个平台小程序的介绍 小程序加载的时候是双线程模型.wxml文件和wxss文件是一个线程&#xff0c;js和json文件是一个线程。 [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-mEP3PUoo-1675132790458…

七步让您的MySQL服务器更安全

本文将以最常见的数据库管理系统——MySQL为例&#xff0c;向您介绍如何通过7步骤来安全加固数据库服务器。 不知您是否发现一种现象&#xff0c;那些初学渗透测试的人员往往过于关注应用的安全性&#xff0c;而对数据库的安全性不太重视。他们殊不知&#xff0c;没有数据库的…

上海亚商投顾:兔年首日开门红 北向资金净流入超186亿

上海亚商投顾前言&#xff1a;无惧大盘涨跌&#xff0c;解密龙虎榜资金&#xff0c;跟踪一线游资和机构资金动向&#xff0c;识别短期热点和强势个股。市场情绪两市早盘受外围影响大幅高开&#xff0c;随后指数高开低走&#xff0c;板块及个股相对活跃&#xff0c;汽车产业链&a…

视频图像分析处理流程(完整版)

来源&#xff1a;投稿 作者&#xff1a;LSC 编辑&#xff1a;学姐 一、视频分析处理的完整流程 (1)视频编解码的入门知识 尽管压缩工具五花八门&#xff0c;但是他们的目的都只有一个&#xff1a;都是为了减小文件的占用空间。 除去我们常见的.zip&#xff0c;.7z&#xff0…

MyBatis框架如何实现数据查询?有几种方法?

在实际开发中&#xff0c;查询操作通常都会涉及到单条数据的精确查询&#xff0c;以及多条数据的模糊查询。那么使用MyBatis框架是如何进行这两种查询的呢&#xff1f;接下来&#xff0c;本小节将讲解下如何使用MyBatis框架根据客户编号查询客户信息&#xff0c;以及根据客户名…

【前沿技术】在安全且可靠的区块链基础设施中运行业务条线应用

发表时间&#xff1a;2022年4月27日 信息来源&#xff1a;coingeek.com 了解特定企业的业务需求将使你能够构建出一个可扩容的业务条线应用&#xff0c;它将按照你想要的方式进行运作&#xff0c;并在不可篡改的BSV区块链中保存相关记录。 大多数企业都有一个业务条线&#xf…