Linux Workqueue
1. Introduction
Workqueue is one of the most important mechanisms in the kernel. Kernel drivers in particular rarely spawn a dedicated thread for a small task (a work item); instead they throw it onto a workqueue. The main job of the workqueue subsystem is to run the kernel's many small tasks in process context.
The design therefore pursues two goals: concurrency, so that multiple work items do not block one another; and economy, so that work items share resources (threads, scheduling, memory) as far as possible instead of wasting them.
To achieve this, the workqueue implementation has gone through several revisions. The current implementation is called CMWQ (Concurrency Managed Workqueue); it uses a smarter algorithm to deliver both concurrency and economy.
The legacy workqueue implementation:
1. It supported single-threaded and multi-threaded workqueues.
2. A multi-threaded workqueue bound one thread to each CPU: every CPU had its own queue served by exactly one worker thread, i.e. workqueues and worker threads were one-to-one. Because there was only that one worker thread, if it slept while executing a work item, no other work item on that CPU could run.
3. In a multi-threaded workqueue, a work item queued on a given CPU with queue_work() always executed on that CPU.
4. Deadlock was possible: a work item already queued on one CPU could not be moved to another CPU. Suppose work1 depends on work2 and both are queued, in order, onto the same per-CPU worker thread; work1 then waits forever for work2 to finish, and the system deadlocks.
CMWQ was introduced into the kernel to solve these problems. It reworks the workqueue implementation so that work scheduling is far more flexible, while keeping the interface essentially unchanged for compatibility with the old workqueue.
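Because the interface stayed compatible, the driver-side pattern is still the familiar one below. This is only a minimal sketch: my_work, my_work_fn and my_bottom_half are made-up names, and error handling/teardown (cancel_work_sync()) is omitted.
#include <linux/printk.h>
#include <linux/workqueue.h>

/* hypothetical work item and handler, for illustration only */
static void my_work_fn(struct work_struct *work)
{
	pr_info("my_work: running in process context in a kworker\n");
}
static DECLARE_WORK(my_work, my_work_fn);

static void my_bottom_half(void)
{
	/* defer the small task to a kworker instead of spawning a thread */
	schedule_work(&my_work);	/* queue onto the shared system_wq */
}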
2. Core concepts
Imagine a company with four factories (four CPUs). Each factory runs two assembly lines: a high-efficiency line staffed by senior workers and an ordinary line staffed by junior workers. Each assembly line then corresponds to a pool_workqueue, the crew of a line is a worker_pool, each individual worker is a worker, and each product on the line is a work item. The four high-efficiency lines together belong to one workqueue_struct, and the four ordinary lines belong to another workqueue_struct.
- work: a unit of work to be executed.
- workqueue: a collection of work items; a workqueue and its works are a one-to-many relationship.
- worker: a worker thread. In the code, each worker corresponds to a worker_thread() kernel thread.
- worker_pool: a collection of workers; a worker_pool and its workers are a one-to-many relationship.
- pwq (pool_workqueue): the middleman that links a workqueue to a worker_pool. A workqueue and its pwqs are one-to-many; a pwq and its worker_pool are one-to-one.
(Figure: relationship between workqueue, pwq, worker_pool and worker — original image not available.)
Each thread that executes work is a worker, and a group of workers forms a worker_pool. The essence of CMWQ lies in how the workers of a worker_pool are dynamically added and removed; see manage_workers().
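To make these relationships concrete, here is a hypothetical debug helper (not kernel code; it would have to live inside kernel/workqueue.c, since the structures it touches are private to that file, and it assumes the caller holds wq->mutex and the pool attach mutex so the lists are stable):
static void dump_wq_topology(struct workqueue_struct *wq)
{
	struct pool_workqueue *pwq;
	struct worker *worker;

	list_for_each_entry(pwq, &wq->pwqs, pwqs_node) {	/* wq -> pwq: 1:N */
		struct worker_pool *pool = pwq->pool;		/* pwq -> pool: 1:1 */

		pr_info("wq %s: pwq %p -> pool %d (cpu %d)\n",
			wq->name, pwq, pool->id, pool->cpu);
		list_for_each_entry(worker, &pool->workers, node)	/* pool -> worker: 1:N */
			pr_info("  worker %s\n", worker->task->comm);
	}
}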
3. worker & worker_pool
struct worker {
/* on idle list while idle, on busy hash table while busy */
union {
struct list_head entry; /* L: while idle */
struct hlist_node hentry; /* L: while busy */
};
struct work_struct *current_work; /* L: work being processed */
work_func_t current_func; /* L: current_work's fn */
struct pool_workqueue *current_pwq; /* L: current_work's pwq */
struct list_head scheduled; /* L: scheduled works */
/* 64 bytes boundary on 64bit, 32 on 32bit */
struct task_struct *task; /* I: worker task */
struct worker_pool *pool; /* A: the associated pool */
/* L: for rescuers */
struct list_head node; /* A: anchored at pool->workers */
/* A: runs through worker->node */
unsigned long last_active; /* L: last active timestamp */
unsigned int flags; /* X: flags */
int id; /* I: worker id */
int sleeping; /* None */
/*
* Opaque string set with work_set_desc(). Printed out with task
* dump for debugging - WARN, BUG, panic or sysrq.
*/
char desc[WORKER_DESC_LEN];
/* used only by rescuers to point to the target workqueue */
struct workqueue_struct *rescue_wq; /* I: the workqueue to rescue */
/* used by the scheduler to determine a worker's last known identity */
work_func_t last_func;
};
3.1 CMWQ divides worker_pools into two types:
- normal worker_pool, used by ordinary (per-CPU) workqueues;
- unbound worker_pool, used by workqueues created with WQ_UNBOUND.
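A driver chooses between the two at creation time through the flags argument of alloc_workqueue(); a sketch with made-up names:
#include <linux/workqueue.h>

static struct workqueue_struct *my_wq, *my_unbound_wq;

static int __init my_wq_init(void)
{
	/* per-CPU workqueue, served by the normal worker_pools;
	 * add WQ_HIGHPRI to use the high-priority (nice < 0) pool instead */
	my_wq = alloc_workqueue("mydrv", 0, 0);
	/* not bound to a CPU: served by an unbound worker_pool */
	my_unbound_wq = alloc_workqueue("mydrv_unbound", WQ_UNBOUND, 0);
	if (!my_wq || !my_unbound_wq)
		return -ENOMEM;
	return 0;
}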
struct worker_pool {
spinlock_t lock; /* the pool lock */
int cpu; /* I: the associated cpu */
int node; /* I: the associated node ID */
int id; /* I: pool ID */
unsigned int flags; /* X: flags */
unsigned long watchdog_ts; /* L: watchdog timestamp */
struct list_head worklist; /* L: list of pending works */
int nr_workers; /* L: total number of workers */
int nr_idle; /* L: currently idle workers */
struct list_head idle_list; /* X: list of idle workers */
struct timer_list idle_timer; /* L: worker idle timeout */
struct timer_list mayday_timer; /* L: SOS timer for workers */
/* a workers is either on busy_hash or idle_list, or the manager */
DECLARE_HASHTABLE(busy_hash, BUSY_WORKER_HASH_ORDER);
/* L: hash of busy workers */
struct worker *manager; /* L: purely informational */
struct list_head workers; /* A: attached workers */
struct completion *detach_completion; /* all workers detached */
struct ida worker_ida; /* worker IDs for task name */
struct workqueue_attrs *attrs; /* I: worker attributes */
struct hlist_node hash_node; /* PL: unbound_pool_hash node */
int refcnt; /* PL: refcnt for unbound pools */
/*
* The current concurrency level. As it's likely to be accessed
* from other CPUs during try_to_wake_up(), put it in a separate
* cacheline.
*/
atomic_t nr_running ____cacheline_aligned_in_smp;
/*
* Destruction of pool is RCU protected to allow dereferences
* from get_work_pool().
*/
struct rcu_head rcu;
};
- A worker that is currently processing a work item hangs on the busy-workers hash table, keyed by the address of the work being processed; otherwise it sits on the idle list.
- When there is unprocessed work, the kernel starts additional worker threads; when too many worker threads have been created, some of them are destroyed again.
Use "ps" to see which worker threads exist on a system. They are named "kworker/n:x", where n is the number of the CPU the worker runs on and x is its index within the worker pool; an "H" suffix marks a worker of the high-priority worker pool.
root@(mt7981):~# ps | grep kworker
5 root 0 IW [kworker/0:0-eve]
6 root 0 IW< [kworker/0:0H-kb]
7 root 0 IW [kworker/u4:0-ev]
12 root 0 IW [kworker/0:1-eve]
17 root 0 IW [kworker/1:0-rcu]
18 root 0 IW< [kworker/1:0H-kb]
20 root 0 IW [kworker/u4:1-ev]
217 root 0 IW [kworker/1:1-mm_]
635 root 0 IW< [kworker/1:1H]
636 root 0 IW< [kworker/0:1H]
838 root 0 IW [kworker/1:2-rcu]
892 root 0 IW [kworker/0:2-rcu]
1370 root 0 IW [kworker/u4:2-ev]
1373 root 1072 S grep kworker
root@(mt7981):~#
3.2 normal worker_pool
By default, work items are processed in a normal worker_pool. The system creates two normal worker_pools per CPU: one at normal priority (nice=0) and one at high priority (nice=HIGHPRI_NICE_LEVEL); the workers created from them run with the corresponding nice value.
Each worker corresponds to a worker_thread() kernel thread, a worker_pool contains one or more workers, and the number of workers in a pool grows and shrinks dynamically with the pool's work load.
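In kernel/workqueue.c these per-CPU pools come from a static per-CPU array, roughly as below (paraphrased from memory, not a verbatim quote of the source):
/* two standard pools per possible CPU:
 * index 0 -> nice 0, index 1 -> HIGHPRI_NICE_LEVEL (negative nice) */
static DEFINE_PER_CPU_SHARED_ALIGNED(struct worker_pool [NR_STD_WORKER_POOLS],
				     cpu_worker_pools);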
3.3 Workqueue creation process
workqueue_init_early
alloc_workqueue
alloc_and_link_pwqs
init_pwq
link_pwq
/* First half of workqueue subsystem initialization: called as soon as memory
 * allocation, cpumasks and idr are up. Its purpose is to let early boot code
 * create workqueues and queue/cancel work items.
 */
int __init workqueue_init_early(void)
{
int std_nice[NR_STD_WORKER_POOLS] = { 0, HIGHPRI_NICE_LEVEL };
int hk_flags = HK_FLAG_DOMAIN | HK_FLAG_WQ;
int i, cpu;
WARN_ON(__alignof__(struct pool_workqueue) < __alignof__(long long));
BUG_ON(!alloc_cpumask_var(&wq_unbound_cpumask, GFP_KERNEL));
cpumask_copy(wq_unbound_cpumask, housekeeping_cpumask(hk_flags));
pwq_cache = KMEM_CACHE(pool_workqueue, SLAB_PANIC);
/* create the per-CPU worker_pools */
for_each_possible_cpu(cpu) {
struct worker_pool *pool;
......
}
/* create default unbound and ordered wq attrs */
for (i = 0; i < NR_STD_WORKER_POOLS; i++) {
unbound_std_wq_attrs[i] = attrs;
ordered_wq_attrs[i] = attrs;
}
system_wq = alloc_workqueue("events", 0, 0);
system_highpri_wq = alloc_workqueue("events_highpri", WQ_HIGHPRI, 0);
system_unbound_wq = alloc_workqueue("events_unbound", WQ_UNBOUND,
WQ_UNBOUND_MAX_ACTIVE);
......
return 0;
}
/* Initialize the wq, create a pwq of the matching priority for every CPU and
 * link each pwq to its already-created worker_pool.
 * If max_active is passed as 0, it defaults to WQ_DFL_ACTIVE (256). */
__printf(1, 4)
struct workqueue_struct *alloc_workqueue(const char *fmt,
unsigned int flags,
int max_active, ...)
{
size_t tbl_size = 0;
va_list args;
struct workqueue_struct *wq;
struct pool_workqueue *pwq;
/*
* Unbound && max_active == 1 used to imply ordered, which is no
* longer the case on NUMA machines due to per-node pools. While
* alloc_ordered_workqueue() is the right way to create an ordered
* workqueue, keep the previous behavior to avoid subtle breakages
* on NUMA.
*/
if ((flags & WQ_UNBOUND) && max_active == 1)
flags |= __WQ_ORDERED;
/* see the comment above the definition of WQ_POWER_EFFICIENT */
if ((flags & WQ_POWER_EFFICIENT) && wq_power_efficient)
flags |= WQ_UNBOUND;
/* allocate wq and format name */
if (flags & WQ_UNBOUND)
tbl_size = nr_node_ids * sizeof(wq->numa_pwq_tbl[0]);
wq = kzalloc(sizeof(*wq) + tbl_size, GFP_KERNEL);
if (!wq)
return NULL;
if (flags & WQ_UNBOUND) {
wq->unbound_attrs = alloc_workqueue_attrs();
if (!wq->unbound_attrs)
goto err_free_wq;
}
va_start(args, max_active);
vsnprintf(wq->name, sizeof(wq->name), fmt, args);
va_end(args);
max_active = max_active ?: WQ_DFL_ACTIVE;
max_active = wq_clamp_max_active(max_active, flags, wq->name);
/* init wq */
wq->flags = flags;
wq->saved_max_active = max_active;
mutex_init(&wq->mutex);
atomic_set(&wq->nr_pwqs_to_flush, 0);
INIT_LIST_HEAD(&wq->pwqs);
INIT_LIST_HEAD(&wq->flusher_queue);
INIT_LIST_HEAD(&wq->flusher_overflow);
INIT_LIST_HEAD(&wq->maydays);
wq_init_lockdep(wq);
INIT_LIST_HEAD(&wq->list);
/* create and initialize the pwqs */
if (alloc_and_link_pwqs(wq) < 0)
goto err_unreg_lockdep;
if (wq_online && init_rescuer(wq) < 0)
goto err_destroy;
if ((wq->flags & WQ_SYSFS) && workqueue_sysfs_register(wq))
goto err_destroy;
/*
* wq_pool_mutex protects global freeze state and workqueues list.
* Grab it, adjust max_active and add the new @wq to workqueues
* list.
*/
mutex_lock(&wq_pool_mutex);
mutex_lock(&wq->mutex);
for_each_pwq(pwq, wq)
pwq_adjust_max_active(pwq);
mutex_unlock(&wq->mutex);
/* add the newly created wq to the global workqueues list */
list_add_tail_rcu(&wq->list, &workqueues);
mutex_unlock(&wq_pool_mutex);
return wq;
...
}
static int alloc_and_link_pwqs(struct workqueue_struct *wq)
{
bool highpri = wq->flags & WQ_HIGHPRI;
int cpu, ret;
if (!(wq->flags & WQ_UNBOUND)) {
/* allocate a pwq for every CPU */
wq->cpu_pwqs = alloc_percpu(struct pool_workqueue);
if (!wq->cpu_pwqs)
return -ENOMEM;
for_each_possible_cpu(cpu) {
struct pool_workqueue *pwq =
per_cpu_ptr(wq->cpu_pwqs, cpu);
struct worker_pool *cpu_pools =
per_cpu(cpu_worker_pools, cpu);
/* For each CPU, take the pwq and the worker_pool matching wq->flags and
initialize pwq->wq and pwq->pool. This links the pwq to the worker_pool and to the wq. */
init_pwq(pwq, wq, &cpu_pools[highpri]);
mutex_lock(&wq->mutex);
/* Add the pwq to wq->pwqs (which holds all pwqs of this wq).
This links the wq to the pwq. */
link_pwq(pwq);
mutex_unlock(&wq->mutex);
}
return 0;
}
...
}
/* Second half of workqueue subsystem initialization, called from:
start_kernel
arch_call_rest_init
rest_init
kernel_init
kernel_init_freeable
workqueue_init
*/
int __init workqueue_init(void)
{
struct workqueue_struct *wq;
struct worker_pool *pool;
int cpu, bkt;
/*
* It'd be simpler to initialize NUMA in workqueue_init_early() but
* CPU to node mapping may not be available that early on some
* archs such as power and arm64. As per-cpu pools created
* previously could be missing node hint and unbound pools NUMA
* affinity, fix them up.
* (NUMA = Non-Uniform Memory Access)
* Also, while iterating workqueues, create rescuers if requested.
*/
wq_numa_init();
mutex_lock(&wq_pool_mutex);
for_each_possible_cpu(cpu) {
for_each_cpu_worker_pool(pool, cpu) {
pool->node = cpu_to_node(cpu);
}
}
list_for_each_entry(wq, &workqueues, list) {
wq_update_unbound_numa(wq, smp_processor_id(), true);
WARN(init_rescuer(wq),
"workqueue: failed to create early rescuer for %s",
wq->name);
}
mutex_unlock(&wq_pool_mutex);
/* create the initial workers */
for_each_online_cpu(cpu) {
for_each_cpu_worker_pool(pool, cpu) {
pool->flags &= ~POOL_DISASSOCIATED;
BUG_ON(!create_worker(pool));
}
}
hash_for_each(unbound_pool_hash, bkt, pool, hash_node)
BUG_ON(!create_worker(pool));
wq_online = true;
wq_watchdog_init();
return 0;
}
static struct worker *create_worker(struct worker_pool *pool)
{
struct worker *worker = NULL;
int id = -1;
char id_buf[16];
/* ID is needed to determine kthread name */
id = ida_simple_get(&pool->worker_ida, 0, 0, GFP_KERNEL);
if (id < 0)
goto fail;
// allocate the worker and initialize its members
worker = alloc_worker(pool->node);
if (!worker)
goto fail;
worker->pool = pool;
worker->id = id;
if (pool->cpu >= 0)
/* (2.1) Build the thread name of a normal worker_pool worker, e.g.:
6 root 0 IW< [kworker/0:0H-kb]
12 root 0 IW [kworker/0:1-rcu]
18 root 0 IW< [kworker/1:0H-kb]
636 root 0 IW< [kworker/0:1H]
637 root 0 IW< [kworker/1:1H]
887 root 0 IW [kworker/1:2-pm]
1413 root 0 IW [kworker/1:0-mm_]
1482 root 0 IW [kworker/0:2-eve]
*/
snprintf(id_buf, sizeof(id_buf), "%d:%d%s", pool->cpu, id,
pool->attrs->nice < 0 ? "H" : "");
else
/* (2.2) Build the thread name of an unbound worker_pool worker, e.g.:
1487 root 0 IW [kworker/u4:0-ev]
1488 root 0 IW [kworker/u4:2-ev]
1489 root 0 IW [kworker/u4:1-ev]
*/
snprintf(id_buf, sizeof(id_buf), "u%d:%d", pool->id, id);
// (2.3) create the worker's kernel thread, running worker_thread()
worker->task = kthread_create_on_node(worker_thread, worker, pool->node,
"kworker/%s", id_buf);
if (IS_ERR(worker->task))
goto fail;
// (2.4) set the nice value of the kernel thread
set_user_nice(worker->task, pool->attrs->nice);
/* prevent userland from meddling with cpumask of workqueue workers */
worker->task->flags |= PF_NO_SETAFFINITY;
// (2.5) attach the worker to the worker_pool
worker_attach_to_pool(worker, pool);
// (2.6) put the worker into idle state initially;
// after wake_up_process() the worker leaves idle on its own
/* start the newly created worker */
spin_lock_irq(&pool->lock);
worker->pool->nr_workers++;
worker_enter_idle(worker);
wake_up_process(worker->task);
spin_unlock_irq(&pool->lock);
return worker;
fail:
if (id >= 0)
ida_simple_remove(&pool->worker_ida, id);
kfree(worker);
return NULL;
}
static void worker_attach_to_pool(struct worker *worker,
struct worker_pool *pool)
{
mutex_lock(&pool->attach_mutex);
// (2.5.1) bind the worker thread to the pool's CPUs
set_cpus_allowed_ptr(worker->task, pool->attrs->cpumask);
if (pool->flags & POOL_DISASSOCIATED)
worker->flags |= WORKER_UNBOUND;
if (worker->rescue_wq)
set_cpus_allowed_ptr(worker->task, pool->attrs->cpumask);
// (2.5.2) add the worker to the worker_pool's workers list
list_add_tail(&worker->node, &pool->workers);
worker->pool = pool;
mutex_unlock(&pool->attach_mutex);
}
3.4 unbound worker_pool
Most work items are executed by a normal worker_pool (for example, work pushed into the system workqueue system_wq via schedule_work() or schedule_work_on()); in the end they run in the workers of a normal worker_pool. Those workers are bound to a particular CPU, and once a work item starts executing it stays on that CPU and never migrates.
An unbound worker_pool, by contrast, has workers that may be scheduled across several CPUs. They are still bound, but the unit of binding is a node rather than a CPU. A node is a NUMA (Non-Uniform Memory Access) concept: a NUMA system may have several nodes, each containing one or more CPUs.
The kernel threads (worker_thread()) of an unbound worker_pool are named like this:
shell@PRO5:/ $ ps | grep "kworker"
1487 root 0 IW [kworker/u4:0-ev] # worker 0 of unbound pool 4
1488 root 0 IW [kworker/u4:2-ev] # worker 2 of unbound pool 4
1489 root 0 IW [kworker/u4:1-ev] # worker 1 of unbound pool 4
Unbound worker_pools come in two flavors:
- unbound_std_wq: one worker_pool per node, so multiple nodes mean multiple worker_pools;
(Figure: unbound worker_pool of type unbound_std_wq — image not available.)
- ordered_wq: all nodes share a single default worker_pool.
(Figure: unbound worker_pool of type ordered_wq — image not available.)
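From a driver's point of view the two flavors are requested roughly as follows (a sketch with made-up names; alloc_ordered_workqueue() is a convenience wrapper that adds WQ_UNBOUND plus the ordering flag):
#include <linux/workqueue.h>

static struct workqueue_struct *scan_wq, *log_wq;

static int __init my_wq_init(void)
{
	/* unbound_std_wq: one worker_pool per node, works may run concurrently */
	scan_wq = alloc_workqueue("mydrv_scan", WQ_UNBOUND, 0);
	/* ordered_wq: a single pwq, so at most one work item executes at a time */
	log_wq = alloc_ordered_workqueue("mydrv_log", 0);
	if (!scan_wq || !log_wq)
		return -ENOMEM;
	return 0;
}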
Here is a detailed walk through the code that creates unbound worker_pools:
- kernel/workqueue.c: workqueue_init_early() -> unbound_std_wq_attrs / ordered_wq_attrs
int __init workqueue_init_early(void)
{
/* create default unbound and ordered wq attrs */
for (i = 0; i < NR_STD_WORKER_POOLS; i++) {
struct workqueue_attrs *attrs;
BUG_ON(!(attrs = alloc_workqueue_attrs()));
attrs->nice = std_nice[i];
unbound_std_wq_attrs[i] = attrs;
/*
* An ordered wq should have only one pwq as ordering is
* guaranteed by max_active which is enforced by pwqs.
* Turn off NUMA so that dfl_pwq is used for all nodes.
*/
BUG_ON(!(attrs = alloc_workqueue_attrs()));
attrs->nice = std_nice[i];
attrs->no_numa = true;
ordered_wq_attrs[i] = attrs;
}
}
- kernel/workqueue.c: alloc_workqueue() -> alloc_and_link_pwqs() -> apply_workqueue_attrs() -> alloc_unbound_pwq() / numa_pwq_tbl_install()
struct workqueue_struct *alloc_workqueue(const char *fmt,
unsigned int flags,
int max_active, ...)
{
// (3) allocate the pool_workqueues for this workqueue;
// a pool_workqueue links the workqueue to a worker_pool
if (alloc_and_link_pwqs(wq) < 0)
goto err_unreg_lockdep;
/*
* wq_pool_mutex protects global freeze state and workqueues list.
* Grab it, adjust max_active and add the new @wq to workqueues
* list.
*/
mutex_lock(&wq_pool_mutex);
mutex_lock(&wq->mutex);
for_each_pwq(pwq, wq)
pwq_adjust_max_active(pwq);
mutex_unlock(&wq->mutex);
// (6) add the new workqueue to the global workqueues list
list_add_tail_rcu(&wq->list, &workqueues);
mutex_unlock(&wq_pool_mutex);
...
}
static int alloc_and_link_pwqs(struct workqueue_struct *wq)
{
bool highpri = wq->flags & WQ_HIGHPRI;
int cpu, ret;
...
// (3.2) unbound ordered_wq workqueue:
// link workqueue and worker_pool through a pool_workqueue
if (wq->flags & __WQ_ORDERED) {
ret = apply_workqueue_attrs(wq, ordered_wq_attrs[highpri]);
/* there should only be single pwq for ordering guarantee */
WARN(!ret && (wq->pwqs.next != &wq->dfl_pwq->pwqs_node ||
wq->pwqs.prev != &wq->dfl_pwq->pwqs_node),
"ordering guarantee broken for workqueue %s\n", wq->name);
} else {
// (3.3) unbound unbound_std_wq workqueue:
// link workqueue and worker_pool through a pool_workqueue
ret = apply_workqueue_attrs(wq, unbound_std_wq_attrs[highpri]);
}
put_online_cpus();
return ret;
}
apply_workqueue_attrs
apply_workqueue_attrs_locked
apply_wqattrs_prepare
static struct apply_wqattrs_ctx *
apply_wqattrs_prepare(struct workqueue_struct *wq,
const struct workqueue_attrs *attrs)
{
struct apply_wqattrs_ctx *ctx;
struct workqueue_attrs *new_attrs, *tmp_attrs;
int node;
lockdep_assert_held(&wq_pool_mutex);
ctx = kzalloc(struct_size(ctx, pwq_tbl, nr_node_ids), GFP_KERNEL);
new_attrs = alloc_workqueue_attrs();
tmp_attrs = alloc_workqueue_attrs();
if (!ctx || !new_attrs || !tmp_attrs)
goto out_free;
/*
* Calculate the attrs of the default pwq.
* If the user configured cpumask doesn't overlap with the
* wq_unbound_cpumask, we fallback to the wq_unbound_cpumask.
*/
copy_workqueue_attrs(new_attrs, attrs);
cpumask_and(new_attrs->cpumask, new_attrs->cpumask, wq_unbound_cpumask);
if (unlikely(cpumask_empty(new_attrs->cpumask)))
cpumask_copy(new_attrs->cpumask, wq_unbound_cpumask);
/*
* We may create multiple pwqs with differing cpumasks. Make a
* copy of @new_attrs which will be modified and used to obtain
* pools.
*/
copy_workqueue_attrs(tmp_attrs, new_attrs);
/*
* If something goes wrong during CPU up/down, we'll fall back to
* the default pwq covering whole @attrs->cpumask. Always create
* it even if we don't use it immediately.
*/
ctx->dfl_pwq = alloc_unbound_pwq(wq, new_attrs);
if (!ctx->dfl_pwq)
goto out_free;
for_each_node(node) {
if (wq_calc_node_cpumask(new_attrs, node, -1, tmp_attrs->cpumask)) {
ctx->pwq_tbl[node] = alloc_unbound_pwq(wq, tmp_attrs);
if (!ctx->pwq_tbl[node])
goto out_free;
} else {
ctx->dfl_pwq->refcnt++;
ctx->pwq_tbl[node] = ctx->dfl_pwq;
}
}
/* save the user configured attrs and sanitize it. */
copy_workqueue_attrs(new_attrs, attrs);
cpumask_and(new_attrs->cpumask, new_attrs->cpumask, cpu_possible_mask);
ctx->attrs = new_attrs;
ctx->wq = wq;
free_workqueue_attrs(tmp_attrs);
return ctx;
out_free:
free_workqueue_attrs(tmp_attrs);
free_workqueue_attrs(new_attrs);
apply_wqattrs_cleanup(ctx);
return NULL;
}
int apply_workqueue_attrs(struct workqueue_struct *wq,
const struct workqueue_attrs *attrs)
{
// (3.2.1) From the unbound ordered_wq_attrs/unbound_std_wq_attrs, create the
// matching pool_workqueue and worker_pool. Unlike the per-CPU pools, these
// worker_pools are not pre-created; they and their worker threads are created
// on demand. The resulting pool_workqueue is stored in pwq_tbl[node].
/*
* If something goes wrong during CPU up/down, we'll fall back to
* the default pwq covering whole @attrs->cpumask. Always create
* it even if we don't use it immediately.
*/
dfl_pwq = alloc_unbound_pwq(wq, new_attrs);
if (!dfl_pwq)
goto enomem_pwq;
for_each_node(node) {
if (wq_calc_node_cpumask(attrs, node, -1, tmp_attrs->cpumask)) {
pwq_tbl[node] = alloc_unbound_pwq(wq, tmp_attrs);
if (!pwq_tbl[node])
goto enomem_pwq;
} else {
dfl_pwq->refcnt++;
pwq_tbl[node] = dfl_pwq;
}
}
/* save the previous pwq and install the new one */
// (3.2.2) install the temporary pwq_tbl[node] into wq->numa_pwq_tbl[node]
for_each_node(node)
pwq_tbl[node] = numa_pwq_tbl_install(wq, node, pwq_tbl[node]);
}
static struct pool_workqueue *alloc_unbound_pwq(struct workqueue_struct *wq,
const struct workqueue_attrs *attrs)
{
struct worker_pool *pool;
struct pool_workqueue *pwq;
lockdep_assert_held(&wq_pool_mutex);
// (3.2.1.1) if an unbound_pool matching attrs already exists, reuse it;
// otherwise create a new unbound_pool from attrs
pool = get_unbound_pool(attrs);
if (!pool)
return NULL;
pwq = kmem_cache_alloc_node(pwq_cache, GFP_KERNEL, pool->node);
if (!pwq) {
put_unbound_pool(pool);
return NULL;
}
init_pwq(pwq, wq, pool);
return pwq;
}
3.5 worker
Each worker corresponds to a worker_thread() kernel thread, and a worker_pool owns one or more workers. All workers of a pool pull work items from the same list, worker_pool->worklist.
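This is exactly where CMWQ beats the legacy design: when the running worker of a pool blocks, wq_worker_sleeping() (see 3.5.2) wakes an idle worker, which picks the next work item off the same worklist. A hypothetical illustration (slow_fn, fast_fn and demo are made up; msleep() comes from <linux/delay.h>):
#include <linux/delay.h>
#include <linux/workqueue.h>

static void slow_fn(struct work_struct *w)
{
	msleep(1000);		/* this worker blocks; pool->nr_running drops to 0 */
}
static void fast_fn(struct work_struct *w)
{
	pr_info("fast work ran while slow work was still sleeping\n");
}
static DECLARE_WORK(slow_work, slow_fn);
static DECLARE_WORK(fast_work, fast_fn);

static void demo(void)
{
	queue_work_on(0, system_wq, &slow_work);
	queue_work_on(0, system_wq, &fast_work);
	/* with CMWQ, fast_fn runs almost immediately in another kworker/0:x;
	 * with the legacy per-CPU workqueue it had to wait for slow_fn */
}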
3.5.1 How a worker processes work
Work is processed mainly in worker_thread() -> process_one_work(); let's look at how the code does it.
- kernel/workqueue.c: worker_thread() -> process_one_work()
static int worker_thread(void *__worker)
{
struct worker *worker = __worker;
struct worker_pool *pool = worker->pool;
/* tell the scheduler that this is a workqueue worker */
worker->task->flags |= PF_WQ_WORKER;
woke_up:
spin_lock_irq(&pool->lock);
// (1) should this worker die?
/* am I supposed to die? */
if (unlikely(worker->flags & WORKER_DIE)) {
spin_unlock_irq(&pool->lock);
WARN_ON_ONCE(!list_empty(&worker->entry));
worker->task->flags &= ~PF_WQ_WORKER;
set_task_comm(worker->task, "kworker/dying");
ida_simple_remove(&pool->worker_ida, worker->id);
worker_detach_from_pool(worker, pool);
kfree(worker);
return 0;
}
// (2) leave the idle state
// (the worker was idle until it was woken up)
worker_leave_idle(worker);
recheck:
// (3) if this worker is still needed, keep going; otherwise go idle
// need_more_worker() condition: (pool->worklist is not empty) && (pool->nr_running == 0)
// i.e. there is pending work on the worklist and no worker is currently running
/* no more worker necessary? */
if (!need_more_worker(pool))
goto sleep;
// (4) if (pool->nr_idle == 0), create more workers first:
// the idle list holds no spare worker, so create some in reserve
/* do we need to manage? */
if (unlikely(!may_start_working(pool)) && manage_workers(worker))
goto recheck;
/*
* ->scheduled list can only be filled while a worker is
* preparing to process a work or actually processing it.
* Make sure nobody diddled with it while I was sleeping.
*/
WARN_ON_ONCE(!list_empty(&worker->scheduled));
/*
* Finish PREP stage. We're guaranteed to have at least one idle
* worker or that someone else has already assumed the manager
* role. This is where @worker starts participating in concurrency
* management if applicable and concurrency management is restored
* after being rebound. See rebind_workers() for details.
*/
worker_clr_flags(worker, WORKER_PREP | WORKER_REBOUND);
do {
// (5) take one work item off pool->worklist and process it
struct work_struct *work =
list_first_entry(&pool->worklist,
struct work_struct, entry);
if (likely(!(*work_data_bits(work) & WORK_STRUCT_LINKED))) {
/* optimization path, not strictly necessary */
// (6) execute a normal work item
process_one_work(worker, work);
if (unlikely(!list_empty(&worker->scheduled)))
process_scheduled_works(worker);
} else {
// (7) execute work that was deliberately scheduled onto this particular worker.
// Ordinary work sits on the pool's shared list, pool->worklist;
// only special work is pushed onto a specific worker's worker->scheduled list,
// namely: 1. barrier works inserted by flush_work();
// 2. works moved over from another worker on collision
move_linked_works(work, &worker->scheduled, NULL);
process_scheduled_works(worker);
}
// (8) keep_working() condition:
// pool->worklist is not empty && (pool->nr_running <= 1)
} while (keep_working(pool));
worker_set_flags(worker, WORKER_PREP);
sleep:
// (9) the worker goes idle
/*
* pool->lock is held and there's no work to process and no need to
* manage, sleep. Workers are woken up only while holding
* pool->lock or from local cpu, so setting the current state
* before releasing pool->lock is enough to prevent losing any
* event.
*/
worker_enter_idle(worker);
__set_current_state(TASK_INTERRUPTIBLE);
spin_unlock_irq(&pool->lock);
schedule();
goto woke_up;
}
static void process_one_work(struct worker *worker, struct work_struct *work)
__releases(&pool->lock)
__acquires(&pool->lock)
{
struct pool_workqueue *pwq = get_work_pwq(work);
struct worker_pool *pool = worker->pool;
bool cpu_intensive = pwq->wq->flags & WQ_CPU_INTENSIVE;
int work_color;
struct worker *collision;
#ifdef CONFIG_LOCKDEP
/*
* It is permissible to free the struct work_struct from
* inside the function that is called from it, this we need to
* take into account for lockdep too. To avoid bogus "held
* lock freed" warnings as well as problems when looking into
* work->lockdep_map, make a copy and use that here.
*/
struct lockdep_map lockdep_map;
lockdep_copy_map(&lockdep_map, &work->lockdep_map);
#endif
/* ensure we're on the correct CPU */
WARN_ON_ONCE(!(pool->flags & POOL_DISASSOCIATED) &&
raw_smp_processor_id() != pool->cpu);
// (8.1) if the work is already being executed by another worker of this worker_pool,
// defer it by moving it onto that worker's scheduled list
/*
* A single work shouldn't be executed concurrently by
* multiple workers on a single cpu. Check whether anyone is
* already processing the work. If so, defer the work to the
* currently executing one.
*/
collision = find_worker_executing_work(pool, work);
if (unlikely(collision)) {
move_linked_works(work, &collision->scheduled, NULL);
return;
}
// (8.2) add the worker to the busy hash table pool->busy_hash
/* claim and dequeue */
debug_work_deactivate(work);
hash_add(pool->busy_hash, &worker->hentry, (unsigned long)work);
worker->current_work = work;
worker->current_func = work->func;
worker->current_pwq = pwq;
work_color = get_work_color(work);
list_del_init(&work->entry);
// (8.3) if the work's wq is CPU intensive (WQ_CPU_INTENSIVE), take this worker out of
// the worker_pool's concurrency management; it now runs like an independent thread
/*
* CPU intensive works don't participate in concurrency management.
* They're the scheduler's responsibility. This takes @worker out
* of concurrency management and the next code block will chain
* execution of the pending work items.
*/
if (unlikely(cpu_intensive))
worker_set_flags(worker, WORKER_CPU_INTENSIVE);
// (8.4) for UNBOUND or CPU_INTENSIVE work, check whether an idle worker needs waking;
// normal per-cpu work never takes this path
/*
* Wake up another worker if necessary. The condition is always
* false for normal per-cpu workers since nr_running would always
* be >= 1 at this point. This is used to chain execution of the
* pending work items for WORKER_NOT_RUNNING workers such as the
* UNBOUND and CPU_INTENSIVE ones.
*/
if (need_more_worker(pool))
wake_up_worker(pool);
/*
* Record the last pool and clear PENDING which should be the last
* update to @work. Also, do this inside @pool->lock so that
* PENDING and queued state changes happen together while IRQ is
* disabled.
*/
set_work_pool_and_clear_pending(work, pool->id);
spin_unlock_irq(&pool->lock);
lock_map_acquire_read(&pwq->wq->lockdep_map);
lock_map_acquire(&lockdep_map);
trace_workqueue_execute_start(work);
// (8.5) call the work function
worker->current_func(work);
/*
* While we must be careful to not use "work" after this, the trace
* point will only record its address.
*/
trace_workqueue_execute_end(work);
lock_map_release(&lockdep_map);
lock_map_release(&pwq->wq->lockdep_map);
if (unlikely(in_atomic() || lockdep_depth(current) > 0)) {
pr_err("BUG: workqueue leaked lock or atomic: %s/0x%08x/%d\n"
" last function: %pf\n",
current->comm, preempt_count(), task_pid_nr(current),
worker->current_func);
debug_show_held_locks(current);
dump_stack();
}
/*
* The following prevents a kworker from hogging CPU on !PREEMPT
* kernels, where a requeueing work item waiting for something to
* happen could deadlock with stop_machine as such work item could
* indefinitely requeue itself while all other CPUs are trapped in
* stop_machine. At the same time, report a quiescent RCU state so
* the same condition doesn't freeze RCU.
*/
cond_resched_rcu_qs();
spin_lock_irq(&pool->lock);
/* clear cpu intensive status */
if (unlikely(cpu_intensive))
worker_clr_flags(worker, WORKER_CPU_INTENSIVE);
/* we're done with it, release */
hash_del(&worker->hentry);
worker->current_work = NULL;
worker->current_func = NULL;
worker->current_pwq = NULL;
worker->desc_valid = false;
pwq_dec_nr_in_flight(pwq, work_color);
}
3.5.2 Dynamic worker management in a worker_pool
How a worker_pool dynamically grows and shrinks its set of workers is the core algorithm of CMWQ. The idea is as follows:
- A worker in a worker_pool is in one of three states: idle, running, or suspended (blocked);
- If the worker_pool has work to process, keep at least one running worker to handle it;
- If the running worker blocks (suspends) while processing a work item, wake an idle worker so the remaining work keeps being executed;
- If there is work to execute and more than one worker is running, the surplus running workers drop back to idle;
- If there is no work to execute, all workers go idle;
- If too many workers have been created, destroy_worker() removes idle workers that have not run for 300 s (IDLE_WORKER_TIMEOUT).
For the details, refer to the worker_thread() -> process_one_work() analysis in the previous section.
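The conditions above correspond to a handful of small helpers in kernel/workqueue.c. The following is a paraphrase based on the comments in worker_thread() above, not a verbatim copy of the kernel source:
/* wake another worker only if there is pending work and nobody is running */
static bool need_more_worker(struct worker_pool *pool)
{
	return !list_empty(&pool->worklist) &&
	       !atomic_read(&pool->nr_running);
}

/* keep this worker going while work is pending and at most one is running */
static bool keep_working(struct worker_pool *pool)
{
	return !list_empty(&pool->worklist) &&
	       atomic_read(&pool->nr_running) <= 1;
}

/* may_start_working(): pool->nr_idle != 0, i.e. a spare idle worker exists;
 * idle workers that stay idle for IDLE_WORKER_TIMEOUT (300 s) are destroyed */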
To track workers entering and leaving the running state, and use that to adjust the number of workers dynamically, the workqueue code hooks into the scheduler:
- Tracking a worker going from suspended to running: ttwu_activate() -> wq_worker_waking_up()
void wq_worker_waking_up(struct task_struct *task, int cpu)
{
struct worker *worker = kthread_data(task);
if (!(worker->flags & WORKER_NOT_RUNNING)) {
WARN_ON_ONCE(worker->pool->cpu != cpu);
// increment the count of running workers in the worker_pool
atomic_inc(&worker->pool->nr_running);
}
}
- Tracking a worker going from running to suspended: __schedule() -> wq_worker_sleeping()
struct task_struct *wq_worker_sleeping(struct task_struct *task, int cpu)
{
struct worker *worker = kthread_data(task), *to_wakeup = NULL;
struct worker_pool *pool;
/*
* Rescuers, which may not have all the fields set up like normal
* workers, also reach here, let's not access anything before
* checking NOT_RUNNING.
*/
if (worker->flags & WORKER_NOT_RUNNING)
return NULL;
pool = worker->pool;
/* this can only happen on the local cpu */
if (WARN_ON_ONCE(cpu != raw_smp_processor_id() || pool->cpu != cpu))
return NULL;
/*
* The counterpart of the following dec_and_test, implied mb,
* worklist not empty test sequence is in insert_work().
* Please read comment there.
*
* NOT_RUNNING is clear. This means that we're bound to and
* running on the local cpu w/ rq lock held and preemption
* disabled, which in turn means that none else could be
* manipulating idle_list, so dereferencing idle_list without pool
* lock is safe.
*/
// decrement the count of running workers in the worker_pool;
// if the worklist still has pending work, wake the first idle worker to handle it
if (atomic_dec_and_test(&pool->nr_running) &&
!list_empty(&pool->worklist))
to_wakeup = first_idle_worker(pool);
return to_wakeup ? to_wakeup->task : NULL;
}
The worker_pool's scheduling policy is therefore: if there is work to process, keep exactly one worker in the running state, no more and no fewer.
But there is a catch: a CPU-intensive work item never suspends, yet it occupies the CPU for a long time and makes the work items behind it wait far too long.
To solve this, CMWQ provides WQ_CPU_INTENSIVE. If a wq declares itself CPU intensive, the worker executing its work is taken out of concurrency management, as if it had suspended; CMWQ then wakes or creates another worker, so the following work items still get executed.
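A workqueue opts in simply by passing the flag when it is allocated (a sketch; the name is made up):
#include <linux/workqueue.h>

static struct workqueue_struct *bench_wq;

static int __init my_wq_init(void)
{
	/* handlers burn CPU for long stretches without sleeping, so ask CMWQ
	 * not to count their worker as "running" for concurrency management */
	bench_wq = alloc_workqueue("mydrv_bench", WQ_CPU_INTENSIVE, 0);
	return bench_wq ? 0 : -ENOMEM;
}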
- kernel/workqueue.c: worker_thread() -> process_one_work()
static void process_one_work(struct worker *worker, struct work_struct *work)
__releases(&pool->lock)
__acquires(&pool->lock)
{
bool cpu_intensive = pwq->wq->flags & WQ_CPU_INTENSIVE;
// (1) set the WORKER_CPU_INTENSIVE flag on the current worker;
// nr_running is decremented by 1,
// so as far as the worker_pool is concerned this worker has effectively suspended
/*
* CPU intensive works don't participate in concurrency management.
* They're the scheduler's responsibility. This takes @worker out
* of concurrency management and the next code block will chain
* execution of the pending work items.
*/
if (unlikely(cpu_intensive))
worker_set_flags(worker, WORKER_CPU_INTENSIVE);
// (2) following on from (1), check whether a new worker must be woken to handle work
/*
* Wake up another worker if necessary. The condition is always
* false for normal per-cpu workers since nr_running would always
* be >= 1 at this point. This is used to chain execution of the
* pending work items for WORKER_NOT_RUNNING workers such as the
* UNBOUND and CPU_INTENSIVE ones.
*/
if (need_more_worker(pool))
wake_up_worker(pool);
// (3) execute the work
worker->current_func(work);
// (4) when done, clear the worker's WORKER_CPU_INTENSIVE flag;
// the worker re-enters the running state
/* clear cpu intensive status */
if (unlikely(cpu_intensive))
worker_clr_flags(worker, WORKER_CPU_INTENSIVE);
}
WORKER_NOT_RUNNING = WORKER_PREP | WORKER_CPU_INTENSIVE |
WORKER_UNBOUND | WORKER_REBOUND,
static inline void worker_set_flags(struct worker *worker, unsigned int flags)
{
struct worker_pool *pool = worker->pool;
WARN_ON_ONCE(worker->task != current);
/* If transitioning into NOT_RUNNING, adjust nr_running. */
if ((flags & WORKER_NOT_RUNNING) &&
!(worker->flags & WORKER_NOT_RUNNING)) {
atomic_dec(&pool->nr_running);
}
worker->flags |= flags;
}
static inline void worker_clr_flags(struct worker *worker, unsigned int flags)
{
struct worker_pool *pool = worker->pool;
unsigned int oflags = worker->flags;
WARN_ON_ONCE(worker->task != current);
worker->flags &= ~flags;
/*
* If transitioning out of NOT_RUNNING, increment nr_running. Note
* that the nested NOT_RUNNING is not a noop. NOT_RUNNING is mask
* of multiple flags, not a single flag.
*/
if ((flags & WORKER_NOT_RUNNING) && (oflags & WORKER_NOT_RUNNING))
if (!(worker->flags & WORKER_NOT_RUNNING))
atomic_inc(&pool->nr_running);
}
3.6 CPU hotplug handling
As the previous sections showed, the system creates CPU-bound normal worker_pools and CPU-unbound unbound worker_pools, and each worker_pool in turn creates workers dynamically.
So how are worker_pools and workers handled when a CPU is hot-plugged? Let's look at the code:
- kernel/workqueue.c: workqueue_cpu_up_callback() / workqueue_cpu_down_callback()
static int __init init_workqueues(void)
{
cpu_notifier(workqueue_cpu_up_callback, CPU_PRI_WORKQUEUE_UP);
hotcpu_notifier(workqueue_cpu_down_callback, CPU_PRI_WORKQUEUE_DOWN);
}
static int workqueue_cpu_down_callback(struct notifier_block *nfb,
unsigned long action,
void *hcpu)
{
int cpu = (unsigned long)hcpu;
struct work_struct unbind_work;
struct workqueue_struct *wq;
switch (action & ~CPU_TASKS_FROZEN) {
case CPU_DOWN_PREPARE:
/* unbinding per-cpu workers should happen on the local CPU */
INIT_WORK_ONSTACK(&unbind_work, wq_unbind_fn);
// (1) CPU_DOWN_PREPARE:
// unbind the workers of the normal worker_pools bound to this CPU;
// once the CPU goes down these workers may migrate to other CPUs
queue_work_on(cpu, system_highpri_wq, &unbind_work);
// (2) update unbound workqueues for the CPU change
/* update NUMA affinity of unbound workqueues */
mutex_lock(&wq_pool_mutex);
list_for_each_entry(wq, &workqueues, list)
wq_update_unbound_numa(wq, cpu, false);
mutex_unlock(&wq_pool_mutex);
/* wait for per-cpu unbinding to finish */
flush_work(&unbind_work);
destroy_work_on_stack(&unbind_work);
break;
}
return NOTIFY_OK;
}
static int workqueue_cpu_up_callback(struct notifier_block *nfb,
unsigned long action, void *hcpu)
{
int cpu = (unsigned long)hcpu;
struct worker_pool *pool;
struct workqueue_struct *wq;
int pi;
switch (action & ~CPU_TASKS_FROZEN) {
case CPU_UP_PREPARE:
for_each_cpu_worker_pool(pool, cpu) {
if (pool->nr_workers)
continue;
if (!create_worker(pool))
return NOTIFY_BAD;
}
break;
case CPU_DOWN_FAILED:
case CPU_ONLINE:
mutex_lock(&wq_pool_mutex);
// (3) CPU coming (back) up: CPU_DOWN_FAILED / CPU_ONLINE
for_each_pool(pool, pi) {
mutex_lock(&pool->attach_mutex);
// if the normal worker_pool bound to this CPU has workers that were unbound (WORKER_UNBOUND),
// re-attach them to the worker_pool:
// put them back to work and bind them to this CPU again
if (pool->cpu == cpu)
rebind_workers(pool);
else if (pool->cpu < 0)
restore_unbound_workers_cpumask(pool, cpu);
mutex_unlock(&pool->attach_mutex);
}
/* update NUMA affinity of unbound workqueues */
list_for_each_entry(wq, &workqueues, list)
wq_update_unbound_numa(wq, cpu, true);
mutex_unlock(&wq_pool_mutex);
break;
}
return NOTIFY_OK;
}