多核多线程已经成为当下一个时髦的话题,而无锁编程更是这个时髦话题中的热点话题。Linux内核可能是当今最大最复杂的并行程序之一,为我们分析多核多线程提供了绝佳的范例。内核设计者已经将最新的无锁编程技术带进了2。6系统内核中,本文以2。6。10版本为蓝本,带领您领略多核多线程编程的真谛,窥探无锁编程的奥秘,体味大师们的高雅设计!
非阻塞型同步(Non-blockingSynchronization)简介
如何正确有效的保护共享数据是编写并行程序必须面临的一个难题,通常的手段就是同步。同步可分为阻塞型同步(BlockingSynchronization)和非阻塞型同步(Non-blockingSynchronization)。
阻塞型同步是指当一个线程到达临界区时,因另外一个线程已经持有访问该共享数据的锁,从而不能获取锁资源而阻塞,直到另外一个线程释放锁。常见的同步原语有mutex、semaphore等。如果同步方案采用不当,就会造成死锁(deadlock),活锁(livelock)和优先级反转(priorityinversion),以及效率低下等现象。
为了降低风险程度和提高程序运行效率,业界提出了不采用锁的同步方案,依照这种设计思路设计的算法称为非阻塞型算法,其本质特征就是停止一个线程的执行不会阻碍系统中其他执行实体的运行。
当今比较流行的Non-blockingSynchronization实现方案有三种:
Wait-free
Wait-free是指任意线程的任何操作都可以在有限步之内结束,而不用关心其它线程的执行速度。Wait-free是基于per-thread的,可以认为是starvation-free的。非常遗憾的是实际情况并非如此,采用Wait-free的程序并不能保证starvation-free,同时内存消耗也随线程数量而线性增长。目前只有极少数的非阻塞算法实现了这一点。
Lock-free
Lock-Free是指能够确保执行它的所有线程中至少有一个能够继续往下执行。由于每个线程不是starvation-free的,即有些线程可能会被任意地延迟,然而在每一步都至少有一个线程能够往下执行,因此系统作为一个整体是在持续执行的,可以认为是system-wide的。所有Wait-free的算法都是Lock-Free的。
Obstruction-free
Obstruction-free是指在任何时间点,一个孤立运行线程的每一个操作可以在有限步之内结束。只要没有竞争,线程就可以持续运行。一旦共享数据被修改,Obstruction-free要求中止已经完成的部分操作,并进行回滚。所有Lock-Free的算法都是Obstruction-free的。
综上所述,不难得出Obstruction-free是Non-blockingsynchronization中性能最差的,而Wait-free性能是最好的,但实现难度也是最大的,因此Lock-free算法开始被重视,并广泛运用于当今正在运行的程序中,比如linux内核。
一般采用原子级的read-modify-write原语来实现Lock-Free算法,其中LL和SC是Lock-Free理论研究领域的理想原语,但实现这些原语需要CPU指令的支持,非常遗憾的是目前没有任何CPU直接实现了SC原语。根据此理论,业界在原子操作的基础上提出了著名的CAS(Compare-And-Swap)操作来实现Lock-Free算法,Intel实现了一条类似该操作的指令:cmpxchg8。
CAS原语负责将某处内存地址的值(1个字节)与一个期望值进行比较,如果相等,则将该内存地址处的值替换为新值,CAS操作伪码描述如下:
清单1。CAS伪码
BoolCAS(T*addr,Texpected,TnewValue)
{
if(*addr==expected)
{
*addr=newValue;
returntrue;
}
else
returnfalse;
}
在实际开发过程中,利用CAS进行同步,代码如下所示:
清单2。CAS实际操作
do{
备份旧数据;
基于旧数据构造新数据;
}while(!CAS(内存地址,备份的旧数据,新数据))
就是指当两者进行比较时,如果相等,则证明共享数据没有被修改,替换成新值,然后继续往下运行;如果不相等,说明共享数据已经被修改,放弃已经所做的操作,然后重新执行刚才的操作。容易看出CAS操作是基于共享数据不会被修改的假设,采用了类似于数据库的commit-retry的模式。当同步冲突出现的机会很少时,这种假设能带来较大的性能提升。
加锁的层级
根据复杂程度、加锁粒度及运行速度,可以得出如下图所示的锁层级:
图1。加锁层级
其中标注为红色字体的方案为Blockingsynchronization,黑色字体为Non-blockingsynchronization。Lock-based和Lockless-based两者之间的区别仅仅是加锁粒度的不同。图中最底层的方案就是大家经常使用的mutex和semaphore等方案,代码复杂度低,但运行效率也最低。
Linux内核中的无锁分析
Linux内核可能是当今最大最复杂的并行程序之一,它的并行主要来至于中断、内核抢占及SMP等。内核设计者们为了不断提高Linux内核的效率,从全局着眼,逐步废弃了大内核锁来降低锁的粒度;从细处下手,不断对局部代码进行优化,用无锁编程替代基于锁的方案,如seqlock及RCU等;不断减少锁冲突程度、降低等待时间,如Double-checkedlocking和原子锁等。
无论什么时候当临界区中的代码仅仅需要加锁一次,同时当其获取锁的时候必须是线程安全的,此时就可以利用Double-checkedLocking模式来减少锁竞争和加锁载荷。目前Double-checkedLocking已经广泛应用于单例(Singleton)模式中。内核设计者基于此思想,巧妙的将Double-checkedLocking方法运用于内核代码中。
当一个进程已经僵死,即进程处于TASK_ZOMBIE状态,如果父进程调用waitpid()系统调用时,父进程需要为子进程做一些清理性的工作,代码如下所示:
清单3。少锁操作
984staticintwait_task_zombie(task_t*p,intnoreap,
985structsiginfo__user*infop,
986int__user*stat_addr,structrusage__user*ru)
987{
……
1103if(p->real_parent!=p->parent){
1104write_lock_irq(&tasklist_lock);
1105/*Double-checkwithlockheld。*/
1106if(p->real_parent!=p->parent){
1107__ptrace_unlink(p);
1108//TODO:isthissafe?
1109p->exit_state=EXIT_ZOMBIE;
……
1120}
1121write_unlock_irq(&tasklist_lock);
1122}
……
1127}
如果将write_lock_irq放置于1103行之前,锁的范围过大,锁的负载也会加重,影响效率;如果将加锁的代码放到判断里面,且没有1106行的代码,程序会正确吗?在单核情况下是正确的,但在双核情况下问题就出现了。一个非主进程在一个CPU上运行,正准备调用exit退出,此时主进程在另外一个CPU上运行,在子进程调用release_task函数之前调用上述代码。子进程在exit_notify函数中,先持有读写锁tasklist_lock,调用forget_original_parent。主进程运行到1104处,由于此时子进程先持有该锁,所以父进程只好等待。在forget_original_parent函数中,如果该子进程还有子进程,则会调用reparent_thread(),将执行p->parent=p->real_parent;语句,导致两者相等,等非主进程释放读写锁tasklist_lock时,另外一个CPU上的主进程被唤醒,一旦开始执行,继续运行将会导致bug。
严格的说,Double-checkedlocking不属于无锁编程的范畴,但由原来的每次加锁访问到大多数情况下无须加锁,就是一个巨大的进步。同时从这里也可以看出一点端倪,内核开发者为了降低锁冲突率,减少等待时间,提高运行效率,一直在持续不断的进行改进。
原子操作可以保证指令以原子的方式执行——执行过程不被打断。内核提供了两组原子操作接口:一组针对于整数进行操作,另外一组针对于单独的位进行操作。内核中的原子操作通常是内联函数,一般是通过内嵌汇编指令来完成。对于一些简单的需求,例如全局统计、引用计数等等,可以归结为是对整数的原子计算。
1。Lock-free应用场景一——SpinLock
SpinLock是一种轻量级的同步方法,一种非阻塞锁。当lock操作被阻塞时,并不是把自己挂到一个等待队列,而是死循环CPU空转等待其他线程释放锁。Spinlock锁实现代码如下:
清单4。spinlock实现代码
staticinlinevoid__preempt_spin_lock(spinlock_t*lock)
{
……
do{
preempt_enable();
while(spin_is_locked(lock))
cpu_relax();
preempt_disable();
}while(!_raw_spin_trylock(lock));
}
staticinlineint_raw_spin_trylock(spinlock_t*lock)
{
charoldval;
__asm____volatile__(
"xchgb%b0,%1"
:"=q"(oldval),"=m"(lock->lock)
:"0"(0):"memory");
returnoldval>0;
}
汇编语言指令xchgb原子性的交换8位oldval(存0)和lock->lock的值,如果oldval为1(lock初始值为1),则获取锁成功,反之,则继续循环,接着relax休息一会儿,然后继续周而复始,直到成功。
对于应用程序来说,希望任何时候都能获取到锁,也就是期望lock->lock为1,那么用CAS原语来描述_raw_spin_trylock(lock)就是CAS(lock->lock,1,0);
如果同步操作总是能在数条指令内完成,那么使用SpinLock会比传统的mutexlock快一个数量级。SpinLock多用于多核系统中,适合于锁持有时间小于将一个线程阻塞和唤醒所需时间的场合。
pthread库已经提供了对spinlock的支持,所以用户态程序也能很方便的使用spinlock了,需要包含pthread。h。在某些场景下,pthread_spin_lock效率是pthread_mutex_lock效率的一倍多。美中不足的是,内核实现了读写spinlock锁,但pthread未能实现。
2。Lock-free应用场景二——Seqlock
手表最主要最常用的功能是读时间,而不是校正时间,一旦后者成了最常用的功能,消费者肯定不会买账。计算机的时钟也是这个功能,修改时间是小概率事件,而读时间是经常发生的行为。以下代码摘自2。4。34内核:
清单5。2。4。34seqlock实现代码
443voiddo_gettimeofday(structtimeval*tv)
444{
……
448read_lock_irqsave(&xtime_lock,flags);
……
455sec=xtime。tv_sec;
456usec+=xtime。tv_usec;
457read_unlock_irqrestore(&xtime_lock,flags);
……
466}
468voiddo_settimeofday(structtimeval*tv)
469{
470write_lock_irq(&xtime_lock);
……
490write_unlock_irq(&xtime_lock);
491}
不难发现获取时间和修改时间采用的是spinlock读写锁,读锁和写锁具有相同的优先级,只要读持有锁,写锁就必须等待,反之亦然。
Linux2。6内核中引入一种新型锁——顺序锁(seqlock),它与spinlock读写锁非常相似,只是它为写者赋予了较高的优先级。也就是说,即使读者正在读的时候也允许写者继续运行。当存在多个读者和少数写者共享一把锁时,seqlock便有了用武之地,因为seqlock对写者更有利,只要没有其他写者,写锁总能获取成功。根据lock-free和时钟功能的思想,内核开发者在2。6内核中,将上述读写锁修改成了顺序锁seqlock,代码如下:
清单6。2。6。10seqlock实现代码
staticinlineunsignedread_seqbegin(constseqlock_t*sl)
{
unsignedret=sl->sequence;
smp_rmb();
returnret;
}
staticinlineintread_seqretry(constseqlock_t*sl,unsignediv)
{
smp_rmb();
return(iv&1)|(sl->sequence^iv);
}
staticinlinevoidwrite_seqlock(seqlock_t*sl)
{
spin_lock(&sl->lock);
++sl->sequence;
smp_wmb();
}
voiddo_gettimeofday(structtimeval*tv)
{
unsignedlongseq;
unsignedlongusec,sec;
unsignedlongmax_ntp_tick;
……
do{
unsignedlonglost;
seq=read_seqbegin(&xtime_lock);
……
sec=xtime。tv_sec;
usec+=(xtime。tv_nsec/1000);
}while(read_seqretry(&xtime_lock,seq));
……
tv->tv_sec=sec;
tv->tv_usec=usec;
}
intdo_settimeofday(structtimespec*tv)
{
……
write_seqlock_irq(&xtime_lock);
……
write_sequnlock_irq(&xtime_lock);
clock_was_set();
return0;
}
Seqlock实现原理是依赖一个序列计数器,当写者写入数据时,会得到一把锁,并且将序列值加1。当读者读取数据之前和之后,该序列号都会被读取,如果读取的序列号值都相同,则表明写没有发生。反之,表明发生过写事件,则放弃已进行的操作,重新循环一次,直至成功。不难看出,do_gettimeofday函数里面的while循环和接下来的两行赋值操作就是CAS操作。
采用顺序锁seqlock好处就是写者永远不会等待,缺点就是有些时候读者不得不反复多次读相同的数据直到它获得有效的副本。当要保护的临界区很小,很简单,频繁读取而写入很少发生(WRRM---WriteRarelyReadMostly)且必须快速时,就可以使用seqlock。但seqlock不能保护包含有指针的数据结构,因为当写者修改数据结构时,读者可能会访问一个无效的指针。
3。Lock-free应用场景三——RCU
在2。6内核中,开发者还引入了一种新的无锁机制-RCU(Read-Copy-Update),允许多个读者和写者并发执行。RCU技术的核心是写操作分为写和更新两步,允许读操作在任何时候无阻碍的运行,换句话说,就是通过延迟写来提高同步性能。RCU主要应用于WRRM场景,但它对可保护的数据结构做了一些限定:RCU只保护被动态分配并通过指针引用的数据结构,同时读写控制路径不能有睡眠。以下数组动态增长代码摘自2。4。34内核:
清单7。2。4。34RCU实现代码
其中ipc_lock是读者,grow_ary是写者,不论是读或者写,都需要加spinlock对被保护的数据结构进行访问。改变数组大小是小概率事件,而读取是大概率事件,同时被保护的数据结构是指针,满足RCU运用场景。以下代码摘自2。6。10内核:
清单8。2。6。10RCU实现代码
#definercu_read_lock()preempt_disable()
#definercu_read_unlock()preempt_enable()
#definercu_assign_pointer(p,v)({
smp_wmb();
(p)=(v);
})
structkern_ipc_perm*ipc_lock(structipc_ids*ids,intid)
{
……
rcu_read_lock();
entries=rcu_dereference(ids->entries);
if(lid>=entries->size){
rcu_read_unlock();
returnNULL;
}
out=entries->p[lid];
if(out==NULL){
rcu_read_unlock();
returnNULL;
}
……
returnout;
}
staticintgrow_ary(structipc_ids*ids,intnewsize)
{
structipc_id_ary*new;
structipc_id_ary*old;
……
new=ipc_rcu_alloc(sizeof(structkern_ipc_perm*)*newsize+
sizeof(structipc_id_ary));
if(new==NULL)
returnsize;
new->size=newsize;
memcpy(new->p,ids->entries->p,sizeof(structkern_ipc_perm*)*size
+sizeof(structipc_id_ary));
for(i=size;inew->p[i]=NULL;
}
old=ids->entries;
/*
*Usercu_assign_pointer()tomakesurethememcpyedcontents
*ofthenewarrayarevisiblebeforethenewarraybecomesvisible。
*/
rcu_assign_pointer(ids->entries,new);
ipc_rcu_putref(old);
returnnewsize;
}
纵观整个流程,写者除内核屏障外,几乎没有一把锁。当写者需要更新数据结构时,首先复制该数据结构,申请new内存,然后对副本进行修改,调用memcpy将原数组的内容拷贝到new中,同时对扩大的那部分赋新值,修改完毕后,写者调用rcu_assign_pointer修改相关数据结构的指针,使之指向被修改后的新副本,整个写操作一气呵成,其中修改指针值的操作属于原子操作。在数据结构被写者修改后,需要调用内存屏障smp_wmb,让其他CPU知晓已更新的指针值,否则会导致SMP环境下的bug。当所有潜在的读者都执行完成后,调用call_rcu释放旧副本。同Spinlock一样,RCU同步技术主要适用于SMP环境。
环形缓冲区是生产者和消费者模型中常用的数据结构。生产者将数据放入数组的尾端,而消费者从数组的另一端移走数据,当达到数组的尾部时,生产者绕回到数组的头部。
如果只有一个生产者和一个消费者,那么就可以做到免锁访问环形缓冲区(RingBuffer)。写入索引只允许生产者访问并修改,只要写入者在更新索引之前将新的值保存到缓冲区中,则读者将始终看到一致的数据结构。同理,读取索引也只允许消费者访问并修改。
图2。环形缓冲区实现原理图
如图所示,当读者和写者指针相等时,表明缓冲区是空的,而只要写入指针在读取指针后面时,表明缓冲区已满。
清单9。2。6。10环形缓冲区实现代码
/*
*__kfifo_put-putssomedataintotheFIFO,nolockingversion
*Notethatwithonlyoneconcurrentreaderandoneconcurrent
*writer,youdon'tneedextralockingtousethesefunctions。
*/
unsignedint__kfifo_put(structkfifo*fifo,
unsignedchar*buffer,unsignedintlen)
{
unsignedintl;
len=min(len,fifo->size-fifo->in+fifo->out);
/*firstputthedatastartingfromfifo->intobufferend*/
l=min(len,fifo->size-(fifo->in&(fifo->size-1)));
memcpy(fifo->buffer+(fifo->in&(fifo->size-1)),buffer,l);
/*thenputtherest(ifany)atthebeginningofthebuffer*/
memcpy(fifo->buffer,buffer+l,len-l);
fifo->in+=len;
returnlen;
}
/*
*__kfifo_get-getssomedatafromtheFIFO,nolockingversion
*Notethatwithonlyoneconcurrentreaderandoneconcurrent
*writer,youdon'tneedextralockingtousethesefunctions。
*/
unsignedint__kfifo_get(structkfifo*fifo,
unsignedchar*buffer,unsignedintlen)
{
unsignedintl;
len=min(len,fifo->in-fifo->out);
/*firstgetthedatafromfifo->outuntiltheendofthebuffer*/
l=min(len,fifo->size-(fifo->out&(fifo->size-1)));
memcpy(buffer,fifo->buffer+(fifo->out&(fifo->size-1)),l);
/*thengettherest(ifany)fromthebeginningofthebuffer*/
memcpy(buffer+l,fifo->buffer,len-l);
fifo->out+=len;
returnlen;
}
以上代码摘自2。6。10内核,通过代码的注释(斜体部分)可以看出,当只有一个消费者和一个生产者时,可以不用添加任何额外的锁,就能达到对共享数据的访问。
总结
通过对比2。4和2。6内核代码,不得不佩服内核开发者的智慧,为了提高内核性能,一直不断的进行各种优化,并将业界最新的lock-free理念运用到内核中。
在实际开发过程中,进行无锁设计时,首先进行场景分析,因为每种无锁方案都有特定的应用场景,接着根据场景分析进行数据结构的初步设计,然后根据先前的分析结果进行并发模型建模,最后在调整数据结构的设计,以便达到最优。