rcu参考资料:
https://airekans.github.io/c/2016/05/10/dive-into-liburcu
https://lwn.net/Articles/262464/
https://cloud.tencent.com/developer/article/1684477
https://www.cnblogs.com/LoyenWang/p/12681494.html
userspace rcu:
https://github.com/urcu/userspace-rcu
内核rcu实现不太一样,但是原理类似
rcu的原理:
rcu会维护一个全局的数据结构rcu_gp
struct rcu_gp {
unsigned long ctr;
int32_t futex;
} __attribute__((aligned(CAA_CACHE_LINE_SIZE)));
extern struct rcu_gp rcu_gp;
他是通过ctr成员变量来实现一写多读操作
假设一个写线程,4个读线程
读线程ctr可能的值:
读线程1:rcu_register_thread,会获取ctr的值,假设此时为10
读线程2:rcu_register_thread,会获取ctr的值,假设此时为11(可能发生在synchronize_rcu之后,获取到了新的值)
读线程3:rcu_register_thread,会获取ctr的值,假设此时为10
读线程4:rcu_register_thread,会获取ctr的值,假设此时为10
rcu_quiescent_state:
1、如果将全局g_ctr的值等于读线程的ctr值,直接唤醒写线程(写线程等待的读线程数减一)
2、不相等的话直接把全局g_ctr的值赋值给读线程的ctr
/*
* This is a helper function for _rcu_quiescent_state().
* The first cmm_smp_mb() ensures memory accesses in the prior read-side
* critical sections are not reordered with store to
* URCU_TLS(urcu_qsbr_reader).ctr, and ensures that mutexes held within an
* offline section that would happen to end with this
* urcu_qsbr_quiescent_state() call are not reordered with
* store to URCU_TLS(urcu_qsbr_reader).ctr.
*/
static inline void _urcu_qsbr_quiescent_state_update_and_wakeup(unsigned long gp_ctr)
{
cmm_smp_mb();
_CMM_STORE_SHARED(URCU_TLS(urcu_qsbr_reader).ctr, gp_ctr);
cmm_smp_mb(); /* write URCU_TLS(urcu_qsbr_reader).ctr before read futex */
urcu_qsbr_wake_up_gp();
cmm_smp_mb();
}
/*
* Inform RCU of a quiescent state.
*
* This function is less than 10 lines long. The intent is that this
* function meets the 10-line criterion for LGPL, allowing this function
* to be invoked directly from non-LGPL code.
*
* We skip the memory barriers and gp store if our local ctr already
* matches the global urcu_qsbr_gp.ctr value: this is OK because a prior
* _rcu_quiescent_state() or _rcu_thread_online() already updated it
* within our thread, so we have no quiescent state to report.
*/
static inline void _urcu_qsbr_quiescent_state(void)
{
unsigned long gp_ctr;
urcu_assert_debug(URCU_TLS(urcu_qsbr_reader).registered);
if ((gp_ctr = CMM_LOAD_SHARED(urcu_qsbr_gp.ctr)) == URCU_TLS(urcu_qsbr_reader).ctr)
return;
_urcu_qsbr_quiescent_state_update_and_wakeup(gp_ctr);
}
写线程1:synchronize_rcu会将ctr的值加1,ctr的值为11,并且会等待所有读线程的值都为11(即所有读线程都执行完一次rcu_quiescent_state)才会执行下一个步骤
所以可以理解为临界区间在rcu_register_thread开始,到rcu_quiescent_state结束,写线程可以继续执行
rcu锁example:
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <urcu-qsbr.h>
struct foo
{
int a;
int b;
int c;
};
static struct foo *g_foo;
void g_foo_init()
{
g_foo = (struct foo *)malloc(sizeof(struct foo));
g_foo->a = 1;
g_foo->b = 1;
g_foo->c = 1;
}
void g_foo_release()
{
free(g_foo);
g_foo = NULL;
}
void do_something(struct foo *l_foo)
{
int i = 0;
int j = 0;
long sum = 0;
for (i = 0; i < 10000; i++) {
for (j = 0; j < 100000; j++) {
// 加锁或者不加锁都ok,rcu_read_lock内部实现为NULL
// rcu_read_lock();
if (l_foo) {
sum = sum + l_foo->a + l_foo->b + l_foo->c;
}
// rcu_read_unlock();
}
}
printf("sum:%ld\n", sum);
}
void * reader_thread()
{
rcu_register_thread();
struct foo *l_foo = rcu_dereference(g_foo);
if (l_foo) {
do_something(l_foo);
}
rcu_quiescent_state();
rcu_unregister_thread();
}
void * writer_thread()
{
struct foo *old_foo = g_foo;
struct foo *new_foo = (struct foo *)malloc(sizeof(struct foo));
new_foo->a = 10;
new_foo->b = 11;
new_foo->c = 12;
rcu_xchg_pointer(&g_foo, new_foo);
synchronize_rcu();
if (old_foo) {
free(old_foo);
}
}
/**
* rcu,一写多读需要遵循的规则
*
* 读线程:
* 1、rcu_register_thread 将改线程加入rcu链表,synchronize_rcu判断的时候会用到
* 2、rcu_unregister_thread 将改线程从rcu链表中移除
* 3、rcu_quiescent_state 读线程主动通知写线程已经结束一批临界区
* 4、rcu_dereference 读线程获取被保护的共享指针需要用该API
* 5、rcu_read_lock/rcu_read_unlock 可选API,实际什么都没做
*
* 写线程:
* 1、rcu_xchg_pointer 写线程更新指针需要用到该API
* 2、synchronize_rcu 等待Grace Period结束(所有读线程都已经调用过rcu_quiescent_state)
*/
int main()
{
pthread_t reader1, reader2, reader3, reader4, writer;
g_foo_init();
pthread_create(&writer, NULL, writer_thread, NULL);
pthread_create(&reader1, NULL, reader_thread, NULL);
pthread_create(&reader2, NULL, reader_thread, NULL);
pthread_create(&reader3, NULL, reader_thread, NULL);
pthread_create(&reader4, NULL, reader_thread, NULL);
pthread_join(writer, NULL);
pthread_join(reader1, NULL);
pthread_join(reader2, NULL);
pthread_join(reader3, NULL);
pthread_join(reader4, NULL);
g_foo_release();
return 0;
}
运行结果:
从上图也可以看出来,rcu锁并不保证每次读到的数据到底是旧数据还是新数据,但是会保证旧数据和新数据都能正常访问到