内核RCU的一次实践——实战中加深了理解

news2025/1/12 23:30:22

遍历内核链表是个常规操作,遍历链表过程可能会向链表增加新成员或者从链表剔除老成员,因此遍历链表时一般需要spin  lock加锁保护。如果向链表增加新成员或者从链表剔除老成员不经常出现,大部分只是遍历查询链表中成员,此时链表遍历还用spin  lock加锁保护,在高并发场景就会造成很大的性能损耗,而rcu可以比较好的解决这个问题。本文主要介绍一次内核开发实践中,用rcu降低了遍历内核链表带来的性能损坏,还总结了rcu使用过程遇到的问题、思考、改进。相信看过本文的同学会加深对rcu的理解。

rcu有时感觉挺简单的,但是用起来又感觉很复杂,尤其是宽限期的判定。先把看过的几篇讲解rcu原理不错的文章贴下:

RCU机制

谢宝友:深入理解RCU之三:概念

谢宝友: 深入理解RCU之五:玩具式实现

Linux RCU机制详解 (透彻)

Linux内核同步 - RCU synchronize原理分析

Linux 内核:RCU机制与使用

本文主要是在内核block层开发时,遇到了一个需要频繁的遍历链表、增加链表成员、删除链表成员的场景。为了避免spin lock加锁带来的性能损失,用rcu正好合适。但是对rcu宽限期的判断一直有很多疑惑,从而尝试用原子变量来判断宽限期,发现应该可以用起来,本文主要记录一下这个开发测试过程。

1:对rcu的简单理解

前文列举的几篇文章已经把rcu讲解挺深入的,这里就说下个人对rcu的一些简单理解。看一段经常出现的rcu演示代码

  1. struct foo *gbl_foo;
  2. void foo_read(void)
  3. {
  4.     rcu_read_lock();
  5.     foo *fp = gbl_foo;
  6.     if ( fp != NULL )
  7.             dosomething(fp->a,fp->b,fp->c);
  8.     rcu_read_unlock();
  9. }
  10.  
  11. void foo_update( foo* new_fp )
  12. {
  13.     spin_lock(&foo_mutex);
  14.     foo *old_fp = gbl_foo;
  15.     gbl_foo = new_fp;
  16.     spin_unlock(&foo_mutex);
  17.     synchronize_rcu();
  18.     kfee(old_fp);
  19. }

foo_read()函数是rcu读者执行的,使用gbl_foo这个全局变量。foo_update()是rcu写者执行的,它里边会用new_fp更新gbl_foo,并kfee释放掉老的gbl_fo。foo_read()中的rcu_read_lock()和rcu_read_unlock()用来标记一个rcu读过程的开始和结束,这个读过程就是rcu宽限期。假设此时进程a正执行foo_read(),处于rcu宽限期,而此时进程b正执行foo_update(),用new_fp更新gbl_foo,但是不能立即执行kfee释放老的gbl_foo,因为老的gbl_foo可能正在被进程a使用!于是进程b就先调用synchronize_rcu()函数阻塞,直到所有执行foo_read()的进程都退出rcu宽限期,进程b才会退出synchronize_rcu()函数,然后执行kfee()安全的释放老的gbl_foo。

继续,一般遍历内核链表用的是list_for_each_entry,rcu场景用的是list_for_each_entry_rcu,二者有什么区别呢?看下源码:

  1. #define list_for_each_entry_rcu(pos, head, member, cond...)     \
  2.     for (__list_check_rcu(dummy, ## cond, 0),           \
  3.          pos = list_entry_rcu((head)->next, typeof(*pos), member);  \
  4.         &pos->member != (head);                 \
  5.         pos = list_entry_rcu(pos->member.next, typeof(*pos), member))
  6. #define list_entry_rcu(ptr, type, member) \
  7. container_of(READ_ONCE(ptr), type, member)
  8. #define READ_ONCE(x) __READ_ONCE(x, 1)
  9. #define __READ_ONCE(x, check)                       \
  10. ({                                  \
  11.     union { typeof(x) __val; char __c[1]; } __u;            \
  12.     if (check)                          \
  13.         __read_once_size(&(x), __u.__c, sizeof(x));     \
  14.     else                                \
  15.         __read_once_size_nocheck(&(x), __u.__c, sizeof(x)); \
  16.     smp_read_barrier_depends(); /* Enforce dependency ordering from x */ \
  17.     __u.__val;                          \
  18. })

主要就是获取链表成员改成了list_entry_rcu,它有什么魔力呢?就是在访问链表成员最后,加入了内存屏障,这样避免了cpu cache的干扰,保证得到最新的链表数据,而不是老的。

继续,一般删除链表成员是用list_del,rcu场景是list_del_rcu,看下源码:

  1. static inline void list_del_rcu(struct list_head *entry)
  2. {
  3.     __list_del_entry(entry);
  4.     entry->prev = LIST_POISON2;
  5. }

list_del_rcu相对普通的list_del,只是多了一个entry->prev = LIST_POISON2操作,这样可以保证要删除的链表成员的prev指针无效。举个例子,如果进程1正在list_del_rcu删除链表成员D,它前边和后边的链表成员是prev和 next。此时进程2正在访问链表成员D,则进程2下一次得到的链表成员们要么是D,要么是next,不会有第3中情况,cpu硬件机制保证。如下示意图简单演示list_del_rcu删除链表成员:

 我觉得重点是,删除链表成员D,不是一下子全删掉,删除过程D还指向next,只是prev不再指向D了。这样如果有进程正在访问成员D,还能保证该进程通过D找到下一个成员next。而后续的进程再遍历该链表,通过prev找到的下一个链表成员是next。

继续,一般向链表插入新成员是执行list_add,rcu的版本是list_add_rcu,看下源码:

  1. static inline void list_add_rcu(struct list_head *new, struct list_head *head)
  2. {
  3.     __list_add_rcu(new, head, head->next);
  4. }
  5. static inline void __list_add_rcu(struct list_head *new,
  6.         struct list_head *prev, struct list_head *next)
  7. {
  8.     if (!__list_add_valid(new, prev, next))
  9.         return;
  10.     new->next = next;
  11.     new->prev = prev;
  12.     rcu_assign_pointer(list_next_rcu(prev), new);
  13.     next->prev = new;
  14. }
  15. #define rcu_assign_pointer(p, v)                          \
  16. ({                                        \
  17.     uintptr_t _r_a_p__v = (uintptr_t)(v);                     \
  18.     rcu_check_sparse(p, __rcu);                   \
  19.                                           \
  20.     if (__builtin_constant_p(v) && (_r_a_p__v) == (uintptr_t)NULL)        \
  21.         WRITE_ONCE((p), (typeof(p))(_r_a_p__v));              \
  22.     else                                      \
  23.         smp_store_release(&p, RCU_INITIALIZER((typeof(p))_r_a_p__v)); \
  24.     _r_a_p__v;                                \
  25. })

这个设计很巧妙,先执行new->next = next和new->prev = prev,令新的链表成员new指向它前后和后边的链表成员prev和next。然后执行rcu_assign_pointer(list_next_rcu(prev), new)令前边的链表成员prev指向new,这个赋值有内存屏障加成,不仅避免了编译器指令乱排序,还避免了cache同步不及时带了的问题。简单说,这个赋值后,前边的链表成员prev立即指向new,并且所有cpu都是这样,避免了多核cpu因cache同步延迟而无法及时得到最新的链表数据。最后执行next->prev = new令后边的链表成员指向new。

这样做的效果是:在把new插入链表prev和next链表成员时,如果有进程正在访问prev链表成员,则该进程得到的下一个链表成员,要么是new,要么是next。不会有第3中情况!要么得到的是老数据,要么得到的是新数据,不会有第3中情况,cpu的硬件机制保证了这种情况。

为了便于理解,如下演示list_add_rcu增加链表成员的示意图:

 我觉得重点是,在把new添加到链表时,如果有其他进程正访问prev,则它得到的下一个链表成员要么是next,要么是new,不会出现第3种情况。

2:rcu的实战应用

实际场景是:我想统计进程IO派发过程的延迟,分别在IO请求插入IO队列、IO请求派发、IO传输完成记录当前时间点,然后计算IO请求在IO队列的时间(ID 耗时)、IO请求派发后在磁盘驱动的时间(DC耗时),最后每隔1s打印一次进程当前周期派发的IO请求数、最大ID和DC耗时、平均ID和DC耗时,依次评估block层调度器对进程IO传输的影响。

原理并不复杂,这里把相关数据结构和源码贴下

  1. struct process_io_control{
  2.         int enable;    
  3.         spinlock_t process_lock_list;
  4.         struct list_head process_io_control_head;
  5.         struct list_head process_io_control_head_del;//暂时存放deleteprocess_io_info
  6.         struct task_struct *kernel_thread;
  7.         struct kmem_cache *process_rq_stat_cachep;
  8.         struct kmem_cache *process_io_info_cachep;
  9.         atomic_t read_lock_count;
  10. };
  11. struct process_io_info{
  12.         int pid;           
  13.         char comm[COMM_LEN];
  14.         //进程有多少个IO请求在传输
  15.         atomic_t rq_count;
  16.         ..........
  17.         u32 max_id_time;//IO请求在IO队列最长的停留时间(进程的)
  18.         u32 max_dc_time;//IO请求在磁盘驱动层的最长耗时(进程的)
  19.         u32 all_id_time;//进程传输的每个IO请求在队列停留时间之和
  20.         u32 all_dc_time;//进程传输的IO请求在磁盘驱动层的耗时之和
  21.         //周期内进程传输完成的IO请求数,每个周期开始时清0,然后进程每传输完成一个IO则加1
  22.         int complete_rq_count;
  23.         struct list_head process_io_info_list;
  24.         struct list_head process_io_info_del;
  25.         int has_deleted;
  26.         spinlock_t io_data_lock;
  27. }
  28. struct process_rq_stat{
  29.         struct request *rq;
  30.         //IO请求插入IO队列的时间
  31.         u64 rq_inset_time;
  32.         //IO请求派发的时间
  33.         u64 rq_issue_time;
  34.         u32 id_time;
  35.         u32 dc_time;
  36.         struct process_io_info *p_process_io_info;
  37. };

进程每次派发IO请求都分配一个struct process_rq_stat结构,里边主要记录IO请求插入IO队列的时间、IO请求派发的时间、ID和DC耗时。每个IO传输的进程都会首先分配一个struct process_io_info结构,主要记录该进程的信息、进程派发的IO的最大ID、最大DC耗时、进程派发IO请求数等等。struct process_io_control是总的控制结构,每个struct process_io_info结构都会添加到struct process_io_control的process_io_control_head链表。

这些数据结构是怎么跟块层联系上呢?如下:

  1. struct gendisk {
  2.     ..........
  3.     struct process_io_control process_io;
  4.     ..........
  5. }
  6. struct request {           
  7.     ..........
  8.     struct process_rq_stat *p_process_rq_stat;
  9.     ..........
  10. }

struct process_io_control 结构内嵌在代表块设备的struct gendisk结构里,struct process_rq_stat *p_process_rq_stat指针内嵌在代表IO请求的struct request结构中,p_process_rq_stat就指向了为每个IO请求分配的struct process_rq_stat结构

下边看下在IO请求插入IO队列、IO请求派发、IO请求传输完成 必须执行的blk_mq_sched_request_inserted、blk_mq_start_request、blk_account_io_done函数里添加的代码:

  1. void blk_mq_sched_request_inserted(struct request *rq)
  2. {
  3.     if(rq->rq_disk && rq->rq_disk->process_io.enable){
  4.         struct process_rq_stat *p_process_rq_stat_tmp = NULL;
  5.         struct process_io_info *p_process_io_info_tmp = NULL;
  6.         int find = 0;
  7.                
  8.         //类似 rcu_read_lock()开始宽限期Grace Period
  9.         atomic_inc(&(rq->rq_disk->process_io.read_lock_count));
  10.         list_for_each_entry_rcu(p_process_io_info_tmp, &(rq->rq_disk->process_io.process_io_control_head), process_io_info_list){
  11.             if(p_process_io_info_tmp->pid == current->pid){
  12.                   spin_lock_irq(&(rq->rq_disk->process_io.process_lock_list));
  13.                   //has_deleted1说明该process_rq_info已经被删除了,那就跳出循环重新分配
  14.                   if(p_process_io_info_tmp->has_deleted){
  15.                       spin_unlock_irq(&(rq->rq_disk->process_io.process_lock_list));
  16.                       break;
  17.                   }
  18.                   else
  19.                   {   //进程在传输的IO请求数加1
  20.                       atomic_inc(&(p_process_io_info_tmp->rq_count));
  21.                   }
  22.                   spin_unlock_irq(&(rq->rq_disk->process_io.process_lock_list));
  23.                   find = 1;
  24.                   break;
  25.             }
  26.         }
  27.         //类似 rcu_read_unlock()结束宽限期Grace Period
  28.         atomic_dec(&(rq->rq_disk->process_io.read_lock_count));
  29.         ...................
  30.         //没有找到与进程匹配的process_io_info则针对进程分配新的process_io_info
  31.         if(0 == find){
  32.             p_process_io_info_tmp = kmem_cache_alloc(rq->rq_disk->process_io.process_io_info_cachep,GFP_ATOMIC);
  33.             memset(p_process_io_info_tmp,0,sizeof(struct process_io_info));
  34.            
  35.             ///进程在传输的IO请求数加1
  36.             atomic_inc(&(p_process_io_info_tmp->rq_count));
  37.             //process_io_control_head链表插入process_io_info需要加锁,因为同时在print_process_io_info()函数会从 process_io_control_head链表删除process_io_info,同时多个rcu writer,需要加锁
  38.             spin_lock_irq(&(rq->rq_disk->process_io.process_lock_list));
  39.             list_add_rcu(&p_process_io_info_tmp->process_io_info_list,&(rq->rq_disk->process_io.process_io_control_head));
  40.             spin_unlock_irq(&(rq->rq_disk->process_io.process_lock_list));
  41.         }
  42.         //针对本次的IO请求分配process_rq_stat结构
  43.         p_process_rq_stat_tmp = kmem_cache_alloc(rq->rq_disk->process_io.process_rq_stat_cachep,GFP_ATOMIC);
  44.         memset(p_process_rq_stat_tmp,0,sizeof(struct process_rq_stat));
  45.  
  46.         p_process_io_info_tmp->pid = current->pid;
  47.         strncpy(p_process_io_info_tmp->comm,current->comm,COMM_LEN-1);
  48.         //建立process_rq_stat process_io_info的关系
  49.         p_process_rq_stat_tmp->p_process_io_info = p_process_io_info_tmp;
  50.         //process_rq_stat 记录 IO请求插入IO队列的时间
  51.         p_process_rq_stat_tmp->rq_inset_time = ktime_to_us(ktime_get());
  52.         //IO请求的 rq->p_process_rq_stat 指向新分配的process_rq_stat结构,这就是为该IO请求分配的process_rq_stat结构
  53.         rq->p_process_rq_stat = p_process_rq_stat_tmp;
  54.         return;
  55.     }
  56. }

blk_mq_sched_request_inserted函数中,先list_for_each_entry_rcu遍历process_io_control_head链表,查看是否有该进程绑定的process_io_info结构,没有的话就要为进程分配一个process_io_info结构,并把process_io_info添加到process_io_control_head链表。process_io_control_head链表保存了每个进行IO传输的进程绑定的process_io_info结构。然后p_process_io_info_tmp->pid = current->pid 和 strncpy(p_process_io_info_tmp->comm,current->comm,COMM_LEN-1)在process_io_info结构记录当前进程的名字和PID,这样就建立了进程和process_io_info的绑定关系。

接着,为当前进程传输的IO请求分配一个process_rq_stat结构,其成员rq_inset_time记录IO请求插入IO队列的时间。接着执行p_process_rq_stat_tmp->p_process_io_info = p_process_io_info_tmp 和 rq->p_process_rq_stat = p_process_rq_stat_tmp,建立代表IO请求的rq、为IO请求分配的process_rq_stat结构、代表进程的process_io_info结构彼此之间的联系。

IO请求派发blk_mq_start_request函数中添加的源码如下:

  1. void blk_mq_start_request(struct request *rq)
  2. {
  3.     ...................
  4.     if(rq->rq_disk && rq->rq_disk->process_io.enable && rq->p_process_rq_stat){
  5.         struct process_rq_stat *p_process_rq_stat_tmp = rq->p_process_rq_stat;
  6.         struct process_io_info *p_process_io_info_tmp = rq->p_process_rq_stat->p_process_io_info;
  7.         ................
  8.         //记录IO请求派发的时间
  9.         p_process_rq_stat_tmp->rq_issue_time = ktime_to_us(ktime_get());
  10.         //计算IO请求在IO队列的停留时间
  11.         p_process_rq_stat_tmp->id_time = p_process_rq_stat_tmp->rq_issue_time - p_process_rq_stat_tmp->rq_inset_time;
  12.     }
  13. }

主要记录IO请求派发时间rq_issue_time,并计算IO请求在IO队列的耗时id_time。IO请求传输执行的函数blk_account_io_done中添加的源码如下:

  1. void blk_account_io_done(struct request *req, u64 now)
  2. {
  3.     .................
  4.     if(req->rq_disk && req->rq_disk->process_io.enable && req->p_process_rq_stat){
  5.         struct process_rq_stat *p_process_rq_stat_tmp = req->p_process_rq_stat;
  6.         struct process_io_info *p_process_io_info_tmp = req->p_process_rq_stat->p_process_io_info;
  7.         //计算IO请求的dc时间
  8.         p_process_rq_stat_tmp->dc_time = ktime_to_us(ktime_get()) - p_process_rq_stat_tmp->rq_issue_time;
  9.        
  10.         spin_lock_irq(&(p_process_io_info_tmp->io_data_lock));
  11.         if(p_process_rq_stat_tmp->id_time > p_process_io_info_tmp->max_id_time){
  12.             //记录最大的id time
  13.             p_process_io_info_tmp->max_id_time = p_process_rq_stat_tmp->id_time;
  14.         }
  15.         if(p_process_rq_stat_tmp->dc_time > p_process_io_info_tmp->max_dc_time){
  16.             //记录最大的dc time
  17.             p_process_io_info_tmp->max_dc_time = p_process_rq_stat_tmp->dc_time;
  18.         }
  19.         //累加进程的IO请求在队列的时间,需加锁保护,因为同时可能执行print_process_io_info函数清0
  20.         p_process_io_info_tmp->all_id_time += p_process_rq_stat_tmp->id_time;
  21.         //累加进程的IO请求在磁盘驱动的时间,需加锁保护,因为同时可能执行print_process_io_info函数清0
  22.         p_process_io_info_tmp->all_dc_time += p_process_rq_stat_tmp->dc_time;
  23.         //进程传输完成的IO请求数加1
  24.         p_process_io_info_tmp->complete_rq_count ++;
  25.         spin_unlock_irq(&(p_process_io_info_tmp->io_data_lock));
  26.        
  27.         //进程在传输的IO请求数减1
  28.         atomic_dec(&(p_process_io_info_tmp->rq_count));
  29.         ..............
  30.     }
  31. }

blk_account_io_done函数主要是计算IO请求在磁盘驱动层的耗时dc_time,并计算进程IO传输过程最大的id_time和dc_time耗时。在哪里使用这些采集到的IO信息呢?在print_process_io_info()函数,每1s执行一次,如下:

  1. void print_process_io_info(struct process_io_control *p_process_io_tmp)
  2. {
  3.     struct process_io_info *p_process_io_info_tmp = NULL;
  4.     struct process_io_info *p_process_io_info_tmp_copy = NULL;
  5.    
  6.     //类似 rcu_read_lock()开始宽限期Grace Period
  7.     atomic_inc(&(p_process_io_tmp->read_lock_count));  
  8.     list_for_each_entry_rcu(p_process_io_info_tmp, &(p_process_io_tmp->process_io_control_head), process_io_info_list){
  9.         //如果上一个周期内process_io_info绑定的进程有IO传输
  10.         if(p_process_io_info_tmp->complete_rq_count != 0)
  11.         {
  12.             spin_lock_irq(&(p_process_io_info_tmp->io_data_lock));
  13.             //获取该进程在上一个周期传输的IO请求中,最大ID耗时 DC耗时
  14.             max_id_time = p_process_io_info_tmp->max_id_time;
  15.             max_dc_time = p_process_io_info_tmp->max_dc_time;
  16.             //计算该进程在上一个周期传输的IO请求中平均ID耗时 平均DC耗时
  17.             avg_id_time = p_process_io_info_tmp->all_id_time/p_process_io_info_tmp->complete_rq_count;
  18.             avg_dc_time = p_process_io_info_tmp->all_dc_time/p_process_io_info_tmp->complete_rq_count;
  19.            ...................
  20.             //对该进程传输完成的IO请求数清0
  21.             p_process_io_info_tmp->complete_rq_count = 0;
  22.             spin_unlock_irq(&(p_process_io_info_tmp->io_data_lock));
  23.         }
  24.         //如果上一个周期内process_io_info绑定的进程没有传输IO
  25.         else if(p_process_io_info_tmp->complete_rq_count == 0)
  26.         {
  27.             spin_lock_irq(&(p_process_io_tmp->process_lock_list));
  28.             //再次确保process_io_info绑定的进程当前也没有IO请求在传输
  29.             if(atomic_read(&(p_process_io_info_tmp->rq_count)) == 0){
  30.                 //设置process_io_infodelete标记,并把process_io_infoprocess_io_control_head链表剔除
  31.                 p_process_io_info_tmp->has_deleted = 1;
  32.                 list_del_rcu(&p_process_io_info_tmp->process_io_info_list);
  33.             }else{
  34.                 spin_unlock_irq(&(p_process_io_tmp->process_lock_list));
  35.                 continue;
  36.             }
  37.             spin_unlock_irq(&(p_process_io_tmp->process_lock_list));
  38.             //process_io_infoprocess_io_control_head链表剔除后,再临时添加到process_io_control_head_del链表,下边在合适实际再释放process_io_info结构
  39.             list_add(&p_process_io_info_tmp->process_io_info_del,&(p_process_io_tmp->process_io_control_head_del));
  40.         }
  41.     }
  42.     //类似 rcu_read_unlock()结束宽限期Grace Period
  43.     atomic_dec(&(p_process_io_tmp->read_lock_count));
  44.    
  45.     while(i ++ < 2)//加锁while循环是一旦 p_process_io_tmp->read_lock_count 不是0,先等等
  46.     {
  47.         // read_lock_count 0,说明process_io_control_head_del上的process_io_info在从process_io_control_head链表剔除后,遍历 process_io_control_head链表的进程都退出了rcu宽限期,可以放心释放process_io_info结构了
  48.         if(atomic_read(&(p_process_io_tmp->read_lock_count)) == 0){
  49.             list_for_each_entry_safe(p_process_io_info_tmp,p_process_io_info_tmp_copy,&(p_process_io_tmp->process_io_control_head_del), process_io_info_del){
  50.                 list_del(&p_process_io_info_tmp->process_io_info_del);
  51.                 //真正释放process_io_info结构                kmem_cache_free(p_process_io_tmp->process_io_info_cachep,p_process_io_info_tmp);
  52.             }
  53.             break;
  54.         }
  55.         msleep(5);
  56.     }
  57. }
  58. //每个1s执行一次 print_process_io_info()
  59. static int process_rq_stat_thread(void *arg)
  60. {
  61.     struct process_io_control *p_process_io_tmp = (struct process_io_control *)arg;
  62.     while (!kthread_should_stop()) {
  63.         print_process_io_info(p_process_io_tmp);
  64.         msleep(1000);
  65.     }  
  66.     p_process_io_tmp->kernel_thread = NULL;
  67.     return 0;
  68. }

print_process_io_info函数主要是list_for_each_entry_rcu里遍历process_io_control_head链表,取出每个IO传输进程绑定的process_io_info结构,这个结构记录了该进程IO传输的统计信息,如进程派发的IO请求数complete_rq_count、IO请求在         IO队列最大耗时max_id_time和平均耗时avg_id_time、IO请求的在磁盘驱动层最大耗时max_dc_time 和平均耗时avg_dc_time。list_for_each_entry_rcu循环正是取出这些进程的IO统计信息做处理,源码未贴出,实际是printk只是把这些信息打印出来。

print_process_io_info函数主要是list_for_each_entry_rcu遍历process_io_control_head链表上每个进程的process_io_info结构过程,另一个关键点是:如果进程没有派发IO请求(即else if(p_process_io_info_tmp->complete_rq_count == 0)成立),说明该进程绑定的process_io_info是空闲的,为避免多个空闲process_io_info带来的空间和性能损耗,则执行list_del_rcu(&p_process_io_info_tmp->process_io_info_list)把该进程绑定的process_io_info结构从process_io_control_head链表剔除。但是不能立即执行kmem_cache_free释放这个process_io_info结构,为什么不能立即kmem_cache_free释放这个process_io_info结构呢?因为这个process_io_info结构可能正在被其他进程使用!比如其他进程很大可能正在blk_mq_sched_request_inserted()函数list_for_each_entry_rcu遍历process_io_control_head链表,而正使用这个process_io_info结构。本质就是rcu宽限期还没到!我们这里只是临时把该process_io_info结构再临时添加到process_io_control_head_del链表,等到合适时机再kmem_cache_free。

问题来了,该怎么判断rcu宽限期已经结束了呢?原始判断rcu宽限期的方法要用到rcu_read_lock()、rcu_read_unlock()、synchronize_rcu(),但是由于对rcu宽限期的理解还不深刻,对使用rcu_read_lock()、rcu_read_unlock()判断rcu宽限期的的开始和结束,总觉得是个玄学。于是就想,为什么不自己实现一套rcu宽限期的判断方法呢

3:rcu宽限期判断的一个改进

想来想去可以用原子变量来判断rcu宽限期的开始和结束,请再看下前文列出的blk_mq_sched_request_inserted和print_process_io_info函数源码,这两个函数都会list_for_each_entry_rcu遍历process_io_control_head链表上的process_io_info结构,blk_mq_sched_request_inserted函数会向process_io_control_head链表增加新的process_io_info结构,print_process_io_info函数会从process_io_control_head链表剔除空闲的process_io_info结构。

这两个函数在list_for_each_entry_rcu遍历process_io_control_head链表上的process_io_info结构那个循环前后,都添加了atomic_inc(&(rq->rq_disk->process_io.read_lock_count)) 和 atomic_dec(&(rq->rq_disk->process_io.read_lock_count)) 这两行代码。没错,二者就跟rcu_read_lock()和rcu_read_unlock()作用一样,用来标记rcu宽限期的开始和结束。在进入rcu宽限期时,令read_lock_count这个原子变量加1。在rcu宽限期结束时,令read_lock_count这个原子变量减1。,read_lock_count原子变量初值是0,如果read_lock_count原子变量是0,说明此时处于rcu宽限期外,可以安全的释放process_io_control_head链表上的process_io_info结构。我们把关键源码再列下:

  1. void blk_mq_sched_request_inserted(struct request *rq)
  2. {
  3.     if(rq->rq_disk && rq->rq_disk->process_io.enable){
  4.         struct process_rq_stat *p_process_rq_stat_tmp = NULL;
  5.         struct process_io_info *p_process_io_info_tmp = NULL;
  6.         int find = 0;
  7.                
  8.         //类似 rcu_read_lock()开始宽限期Grace Period
  9.         atomic_inc(&(rq->rq_disk->process_io.read_lock_count));
  10.         list_for_each_entry_rcu(p_process_io_info_tmp, &(rq->rq_disk->process_io.process_io_control_head), process_io_info_list){
  11.             if(p_process_io_info_tmp->pid == current->pid){
  12.                   spin_lock_irq(&(rq->rq_disk->process_io.process_lock_list));
  13.                   //has_deleted1说明该process_rq_stat已经被删除了,那就跳出循环重新分配
  14.                   if(p_process_io_info_tmp->has_deleted){
  15.                       spin_unlock_irq(&(rq->rq_disk->process_io.process_lock_list));
  16.                       break;
  17.                   }
  18.                   else
  19.                   {   //进程在传输的IO请求数加1
  20.                       atomic_inc(&(p_process_io_info_tmp->rq_count));
  21.                   }
  22.                   spin_unlock_irq(&(rq->rq_disk->process_io.process_lock_list));
  23.                   find = 1;
  24.                   break;
  25.             }
  26.         }
  27.         //类似 rcu_read_unlock()结束宽限期Grace Period
  28.         atomic_dec(&(rq->rq_disk->process_io.read_lock_count));
  29.         ...................
  30.         //没有找到与进程匹配的process_io_info则针对进程分配新的process_io_info
  31.         if(0 == find){
  32.             p_process_io_info_tmp = kmem_cache_alloc(rq->rq_disk->process_io.process_io_info_cachep,GFP_ATOMIC);
  33.             memset(p_process_io_info_tmp,0,sizeof(struct process_io_info));
  34.            
  35.             ///进程在传输的IO请求数加1
  36.             atomic_inc(&(p_process_io_info_tmp->rq_count));
  37.             //process_io_control_head链表插入process_io_info需要加锁,因为同时在print_process_io_info()函数会从 process_io_control_head链表删除process_io_info,同时多个writer,需要加锁
  38.             spin_lock_irq(&(rq->rq_disk->process_io.process_lock_list));
  39.             list_add_rcu(&p_process_io_info_tmp->process_io_info_list,&(rq->rq_disk->process_io.process_io_control_head));
  40.             spin_unlock_irq(&(rq->rq_disk->process_io.process_lock_list));
  41.         }
  42.         ...................
  43.         return;
  44. }
  45. void print_process_io_info(struct process_io_control *p_process_io_tmp)
  46. {
  47.     struct process_io_info *p_process_io_info_tmp = NULL;
  48.     struct process_io_info *p_process_io_info_tmp_copy = NULL;
  49.    
  50.     //类似 rcu_read_lock()开始宽限期Grace Period
  51.     atomic_inc(&(p_process_io_tmp->read_lock_count));  
  52.     list_for_each_entry_rcu(p_process_io_info_tmp, &(p_process_io_tmp->process_io_control_head), process_io_info_list){
  53.         ..................
  54.         //如果上一个周期内process_io_info绑定的进程没有IO请求传输
  55.         else if(p_process_io_info_tmp->complete_rq_count == 0)
  56.         {
  57.             spin_lock_irq(&(p_process_io_tmp->process_lock_list));
  58.             //确保process_io_info绑定的进程目前也没有IO请求在传输
  59.             if(atomic_read(&(p_process_io_info_tmp->rq_count)) == 0){
  60.                 //设置process_io_infodelete标记,并把process_io_infoprocess_io_control_head链表剔除
  61.                 p_process_io_info_tmp->has_deleted = 1;
  62.                 list_del_rcu(&p_process_io_info_tmp->process_io_info_list);
  63.             }else{
  64.                 spin_unlock_irq(&(p_process_io_tmp->process_lock_list));
  65.                 continue;
  66.             }
  67.             spin_unlock_irq(&(p_process_io_tmp->process_lock_list));
  68.             //process_io_infoprocess_io_control_head链表剔除后,再临时添加到process_io_control_head_del链表,下边再合适实际再释放process_io_info结构
  69.             list_add(&p_process_io_info_tmp->process_io_info_del,&(p_process_io_tmp->process_io_control_head_del));
  70.         }
  71.     }
  72.     //类似 rcu_read_unlock()结束宽限期Grace Period
  73.     atomic_dec(&(p_process_io_tmp->read_lock_count));
  74.    
  75.     while(i ++ < 2)//加锁while循环是一旦 p_process_io_tmp->read_lock_count 不是0,先等等
  76.     {
  77.         // read_lock_count 0,说明process_io_control_head_del上的process_io_info在从process_io_control_head链表剔除后,遍历 process_io_control_head链表的进程都退出了宽限期,可以放心process_io_info结构释放了
  78.         if(atomic_read(&(p_process_io_tmp->read_lock_count)) == 0){
  79.             list_for_each_entry_safe(p_process_io_info_tmp,p_process_io_info_tmp_copy,&(p_process_io_tmp->process_io_control_head_del), process_io_info_del){
  80.                 list_del(&p_process_io_info_tmp->process_io_info_del);
  81.                 //真正释放process_io_info结构
  82. kmem_cache_free(p_process_io_tmp->process_io_info_cachep,p_process_io_info_tmp);
  83.             }
  84.             break;
  85.         }
  86.         msleep(5);
  87.     }
  88. }

print_process_io_info()函数中,list_for_each_entry_rcu遍历process_io_control_head链表上的process_io_info时,如果该process_io_info对应的进程没有近期没有IO传输,则list_del_rcu(&p_process_io_info_tmp->process_io_info_list)把该process_io_info从process_io_control_head链表中剔除,但是不会立即kmem_cache_free释放该结构,而只是把它暂时再移动到process_io_control_head_del链表。最后,在while(i ++ < 2){…..}中,里边主要判断如果read_lock_count原子变量是0,说明此时rcu宽限期已经结束,就安全的从process_io_control_head_del链表链表中取出已经从process_io_control_head链表上剔除的process_io_info结构, 然后kmem_cache_free释放它。

简单总结一下:在print_process_io_info和blk_mq_sched_request_inserted函数,都会list_for_each_entry_rcu遍历process_io_control_head链表。但是遍历前都要执行atomic_inc(&(rq->rq_disk->process_io.read_lock_count))令read_lock_count原子变量加1,在遍历链表结束后执行atomic_dec(&(rq->rq_disk->process_io.read_lock_count))令read_lock_count原子变量减1。简单说,read_lock_count原子变量初值是0,如果read_lock_count大于0说明有进程正在print_process_io_info或 blk_mq_sched_request_inserted函数中,list_for_each_entry_rcu遍历process_io_control_head链表。此时正处于rcu宽限期,只能从process_io_control_head链表上list_del_rcu剔除链表成员process_io_info,但不能kmem_cache_free释放它。而等read_lock_count原子变量是0,说明此时已经退出了rcu宽限期,就可以确定没有进程在使用刚从链表剔除的process_io_info结构,可安全的kmem_cache_free释放了。

ok,另外需要注意的一点是,rcu允许多个读者,但是如果有多个写者就需要加锁防护。可以看下blk_mq_sched_request_inserted和print_process_io_info函数里有spin lock加锁的地方。首先看下两个函数里list_for_each_entry_rcu遍历process_io_control_head链表的循环里,都有spin_lock_irq(&(rq->rq_disk->process_io.process_lock_list))加锁。这个加锁是为了防止print_process_io_info函数里从链表中删除掉的process_io_info再被blk_mq_sched_request_inserted函数用到。

具体过程解释如下:进程绑定的process_io_info结构的成员rq_count表示该进程正在派发的IO请求数,在IO请求插入IO队列执行blk_mq_sched_request_inserted时加1,在IO请求传输完成时减1。存在这样一种极端情况,进程A执行print_process_io_info函数的list_for_each_entry_rcu遍历process_io_control_head链表,遍历到某个process_io_info的rq_count是0,于是就执行list_del_rcu(&p_process_io_info_tmp->process_io_info_list)把这个process_io_info从链表剔除掉。

而同一个时间点,该process_io_info绑定的进程B正执行blk_mq_sched_request_inserted函数里的list_for_each_entry_rcu,遍历到process_io_control_head链表上的这个process_io_info结构。注意,即便此时进程A在从process_io_control_head链表剔除该process_io_info,但是在剔除前一瞬间,进程B已经在process_io_control_head链表遍历到这个process_io_info了,这种情况完全是有可能的!于是,进程B继续执行blk_mq_sched_request_inserted函数里的if(p_process_io_info_tmp->pid == current->pid),则该if成立,则find = 1。这表示已经为该进程分配了process_io_info结构,就不会分配新的process_io_info结构了。这样就会出问题,进程A在print_process_io_info函数最后回kmem_cache_free释放这个process_io_info结构,而进程B还在使用这个process_io_info结构,十有八九触发内核crash。

出现这个问题的根本原因是,没有一种机制表示process_io_info已经被从链表中剔除了,于是就有了print_process_io_info和blk_mq_sched_request_inserted函数list_for_each_entry_rcu循环里,spin_lock_irq(&(rq->rq_disk->process_io.process_lock_list))加锁,并且为process_io_info结构增加has_deleted成员。

有了这个机制就完美了,还是前文同样的场景。进程A和进程B只有一个进程能抢占process_lock_list锁。如果进程A在print_process_io_info函数的list_for_each_entry_rcu循环首先抢占process_lock_list锁,则执行p_process_io_info_tmp->has_deleted = 1并list_del_rcu(&p_process_io_info_tmp->process_io_info_list)从链表剔除该process_io_info。同一个时间点进程B在blk_mq_sched_request_inserted函数在list_for_each_entry_rcu循环也遍历到了这个process_io_info结构,但是获取process_lock_list锁失败。等进程B得到 process_lock_list锁,此时process_io_info的has_deleted是1,说明这个process_io_info已经被从链表剔除了,则只能重新为进程分配一个新的process_io_info结构了。

如果同一个时间点,是进程B首先在blk_mq_sched_request_inserted函数的list_for_each_entry_rcu循环,首先抢占了process_lock_list锁,则atomic_inc(&(p_process_io_info_tmp->rq_count))令rq_count加1。而同一个时间点,进程A在print_process_io_info函数,在得到process_lock_list锁后,因为rq_count是1则if(atomic_read(&(p_process_io_info_tmp->rq_count)) == 0)不成立,那就不会再把这个process_io_info结构从process_io_control_head链表剔除了。

rcu机制也不能完美保障可以从链表放心大胆剔除链表成员,具体情况需要具体分析!最后一点还需要提下,blk_mq_sched_request_inserted函数里if(0 == find)分支,在把新分配的process_io_info执行list_add_rcu(&p_process_io_info_tmp->process_io_info_list,&(rq->rq_disk->process_io.process_io_control_head))添加到process_io_control_head链表时,也spin_lock_irq(&(rq->rq_disk->process_io.process_lock_list))加了锁。这是因为此时print_process_io_info函数list_for_each_entry_rcu循环里,可能正在从process_io_control_head链表剔除process_io_info结构,这两处都是rcu写者,而多个rcu写者同时操作链表时,是需要加锁的。为什么?

举个例子:blk_mq_sched_request_inserted函数里正执行list_add_rcu(&p_process_io_info_tmp->process_io_info_list,&(rq->rq_disk->process_io.process_io_control_head))向process_io_control_head链表链表头增加一个新的成员,而同时print_process_io_info函数list_for_each_entry_rcu循环里正在process_io_control_head链表删除链表头后的第一个成员。二者是不能同时进行的,一个删除链表第一个成员,一个在向链表第一个成员前添加一个新的成员,没有锁保护,肯定会出错。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/131375.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Spring Cloud:eureka注册中心

在传统的单体应用中&#xff0c;所有的业务都集中在一个服务器中&#xff0c;当浏览器发起请求时&#xff0c;通过前端请求调用后端接口&#xff0c;后端接口调用相应的业务并在前端进行响应&#xff0c;整个的调用就是从请求到响应的一条龙服务。所以不存在服务之间的中转&…

jetson nano GPIO引脚控制舵机

文章目录一.舵机介绍二.舵机工作原理180度舵机360度舵机三.利用jetson nano GPIO控制舵机1.jetson nano与舵机接2.c编写程序输出脉冲(Qt做界面)一.舵机介绍 舵机&#xff0c;是指在自动驾驶仪中操纵飞机舵面&#xff08;操纵面&#xff09;转动的一种执行部件。分有&#xff1a…

代码随想录算法训练营第十三天(栈与队列)| 239. 滑动窗口最大值,347.前 K 个高频元素

代码随想录算法训练营第十三天&#xff08;栈与队列&#xff09;| 239. 滑动窗口最大值&#xff0c;347.前 K 个高频元素 239. 滑动窗口最大值 之前讲的都是栈的应用&#xff0c;这次该是队列的应用了。 本题算比较有难度的&#xff0c;需要自己去构造单调队列&#xff0c;建…

std::map使用方式以及注意事项(关于相同key的问题)

std::map的使用在C开发中也是经常会用到的一些东西&#xff0c;这里进行一些简单的使用记录&#xff0c;包括如何插入、删除以及修改等。 1、std::map插入&#xff1a; map的插入使用的是insert的方式&#xff0c;一个map包含了key与value两个值。首先需要对两个值进行赋值&a…

Spring Security认证授权练手小项目 腾讯视频VIP权限管理功能

腾讯视频VIP权限管理1、项目功能视频演示2、需求与设计1、需求2、功能概要3、接口设计3、项目源码结构4、项目源码下载5、项目部署1、部署架构2、数据库环境准备3、redis环境准备4、Spring Boot服务准备5、nginx负载均衡准备6、nginx静态资源服务器准备6、项目介绍1、技术架构2…

人工智能-集成学习

1、 集成学习算法介绍 1.1 什么是集成学习 集成学习通过建立几个模型来解决单一预测问题。工作原理&#xff1a;生成多个分类器/模型&#xff0c;各自独立地学习和做出预测。这些预测再结合成组合预测&#xff0c;因此由于任何一个单分类的预测。 1.2 机器学习的两个核心任…

优化RPC网络通信

文章目录什么是RPC通信RPCRPC框架SOARPC通信得重要性具体优化措施1.扩展其他RPC框架.2.选择合适的通信协议3.使用单一长连接4.优化Socket通信.5.高性能的序列化协议6.量身定做报文格式什么是RPC通信 RPC RPC&#xff08;Remote Process Call&#xff09;&#xff0c;即远程服…

算法训练 —— 链表(2)

目录 1. LeetCode24. 两两交换链表中的结点 2. LeetCode19. 删除链表的倒数第N个节点 3. LeetCode160.相交链表 4. LeetCode141.环形链表 5. LeetCode142.环形链表II 6. LeetCode138.复制带随机指针的链表 1. LeetCode24. 两两交换链表中的结点 两两交换链表中的结点 …

机器学习时间序列特征处理与构造,这篇我建议你收藏

数据和特征决定了机器学习的上限&#xff0c;而模型和算法只是逼近这个上限而已。由此可见&#xff0c;特征工程在机器学习中占有相当重要的地位。在实际应用当中&#xff0c;可以说特征工程是机器学习成功的关键。 那特征工程是什么&#xff1f; 特征工程是利用数据领域的相关…

vue3 antd项目实战——Form表单使用【v-model双向绑定数据,form表单嵌套input输入框、Radio单选框】

vue3 ant design vue项目实战——单选框&#xff08;Radio&#xff09;的使用以及Form表单的双向绑定知识调用&#xff08;form表单的源代码附在文章最后&#xff09;场景复现实现需求form表单整体架构的搭建input输入框文本域的嵌套单选组合Radio的嵌套button按钮组合的嵌套fo…

小米手机不为人知的秘密—后台静默安装任何应用

导读你是否拥有一台小米&#xff0c;HTC&#xff0c;三星或者是一加的 Android 手机呢&#xff1f;如果回答是肯定的&#xff0c;那么你应该意识到&#xff0c;几乎所有的智能手机厂商提供的定制 ROM&#xff0c;如 CyanogenMod、Paranoid Android、 MIUI 或者一些其它的 ROM 都…

再谈指针(12)

目录 1、字符指针 2、指针数组 3、数组指针 1、定义 2、&数组名VS数组名 3、数组指针的使用 1、二维数组的数组名 4、数组参数、指针参数 1、一维数组传参 2、二维数组传参 3、一级指针传参 4、二级指针传参 5、函数指针 6、函数指针数组 7、指向函数指针数…

SpringCloud之Sleuth全链路日志跟踪

文章目录1 Sleuth链路跟踪1.1 分布式系统面临的问题1.2 Sleuth是什么1.3 Zipkin是什么1.4 链路监控相关术语1.5 实战练习1.5.1 pom.xml1.5.2 添加yml配置1.5.3 添加控制器1.5.4 测试访问1.6 Zipkin1.6.1 下载与启动1.6.2 搭建链路监控步骤1.6.2.1 搭建8990提供者1.6.2.2 搭建89…

08 `.o`中的汇编信息 hopper disassembler 调试 HelloWorld

前言 上周[2020.05.23]想要 直接使用 fastdebug 版本的 jdk 来进行调试, 可惜失败了 原来是 缺少 可执行文件关联的, object file, 里面记录了 关联的源码的一些信息 看来还是 免不了, 需要 手动 编译 open jdk, 哎 本文主要是两个东西 : 1. 查看 object file 中的汇编信…

CSS权威指南(一)CSS概述

文章目录1.元素2.引入样式表3.样式表4.媒体查询5.特性查询1.元素 &#xff08;1&#xff09;置换元素和非置换元素 置换元素&#xff0c;指用来置换元素内容的部分不由文档内容直接表示。比如img标签。非置换元素&#xff0c;元素的内容是由用户代理在元素自身生成的框中显示…

这样的C盘或许还有?救救C盘......

C盘红了&#xff01;&#xff01;&#xff01; 大部分软件默认缓存在C盘&#xff08;有的甚至只能安装到C盘&#xff09; C盘太满电脑运行会很卡顿 对于这种情况&#xff0c;为了节约C盘空间&#xff0c;我们可以将这些被迫存在C盘的文件挪到其他盘 但是有的应用无法更改默…

C++ 显示图片

编译环境为codeblocks 20.03&#xff0c;编译器为mingw64非自带的版本&#xff08;版本号多少忘记了&#xff09; 头文件 #include <graphics.h>//图形库 #include <conio.h>//_getch() 显示图片代码 int main() {initgraph(640,360,EX_SHOWCONSOLE);//初始化绘…

我亲身经历的2022年软件质量工作——测试工作的经验总结及一些建议

2022年对于大部分人来说都是辛苦的一年。对于整个社会&#xff0c;疫情反反复复&#xff0c;折磨的每一个人都心力交瘁。 经济下滑&#xff0c;失业率上升似乎听到的都是不好的消息。对于整个互联网行业也频频传出大厂裁员的消息。 而质量团队在大厂的裁员计划里也是首当其冲。…

4)Django模型,表单,视图,路由

目录 一 Django模型 Django ORM 数据库配置 Django 如何使用 mysql 数据库 实例 定义模型 创建 APP 数据库操作 添加数据 获取数据 更新数据 删除数据 二 Django 表单 HTTP 请求 GET 方法 POST 方法 Request 对象 QueryDict对象 三 Django视图 视图层 请求…

Vault的程序侧接入方式-AppRole

前言&#xff1a; 程序侧的接入对于Vault来说也是一种Accessor的接入&#xff0c;而AppRole绝对不是Vault首推的程序侧接入方式&#xff0c;但它是最方便的接入方式。 AppRole的本质是由Vault为程序单独引入一套由Vault托管的鉴权方式&#xff0c;对于安全平台来说没引入一套…