close

點擊關注公眾號,實用技術文章及時了解


文章目錄
CAS
什麼是 CAS?
CAS 的應用
CAS 的缺點
AQS
什麼是 AQS?
AQS 的應用
AQS 實現原理淺析
CAS什麼是 CAS?

CAS(Compare And Swap),即比較並交換,是解決多線程並行情況下使用鎖造成性能損耗的一種機制,CAS 操作包含三個操作數——內存位置V、預期原值A和新值B。如果內存位置的值與預期原值相匹配,那麼處理器會自動將該位置值更新為新值;否則,處理器不做任何操作。無論哪種情況,它都會在 CAS 指令之前返回該位置的值。

CAS 有效地說明了「我認為位置V應該包含值A,如果包含該值,則將B放到這個位置;否則,不要更改該位置,只告訴我這個位置現在的值即可。」在 Java 中,sun.misc.Unsafe類提供了硬件級別的原子操作來實現這個 CAS,java.util.concurrent包下的大量類都使用了這個Unsafe類的 CAS 操作。

CAS 的應用

java.util.concurrent.atomic包下的類大多是使用 CAS 操作來實現的,如AtomicInteger、AtomicBoolean和AtomicLong等。下面以AtomicInteger的部分實現來大致講解下這些原子類的實現。

publicclassAtomicIntegerextendsNumberimplementsjava.io.Serializable{privatestaticfinallongserialVersionUID=6214790243416807050L;//setuptouseUnsafe.compareAndSwapIntforupdatesprivatestaticfinalUnsafeunsafe=Unsafe.getUnsafe();privatevolatileintvalue;//初始int大小//省略了部分代碼...//帶參數構造函數,可設置初始int大小publicAtomicInteger(intinitialValue){value=initialValue;}//不帶參數構造函數,初始int大小為0publicAtomicInteger(){}//獲取當前值publicfinalintget(){returnvalue;}//設置值為newValuepublicfinalvoidset(intnewValue){value=newValue;}//返回舊值,並設置新值為newValuepublicfinalintgetAndSet(intnewValue){/***這裡使用for循環不斷通過CAS操作來設置新值*CAS實現和加鎖實現的關係有點類似樂觀鎖和悲觀鎖的關係**/for(;;){intcurrent=get();if(compareAndSet(current,newValue))returncurrent;}}//原子的設置新值為update,expect為期望的當前的值publicfinalbooleancompareAndSet(intexpect,intupdate){returnunsafe.compareAndSwapInt(this,valueOffset,expect,update);}//獲取當前值current,並設置新值為current+1publicfinalintgetAndIncrement(){for(;;){intcurrent=get();intnext=current+1;if(compareAndSet(current,next))returncurrent;}}//此處省略部分代碼,餘下的代碼大致實現原理都是類似的}

一般來說,在競爭不是特別激烈的時候,使用該包下的原子操作性能比使用synchronized關鍵字的方式高效的多。通過查看getAndSet()方法,可知如果資源競爭十分激烈的話,這個for循環可能換持續很久都不能成功跳出。

在這種情況下,我們可能需要考慮如何降低對資源的競爭。在較多的場景下,我們可能會使用到這些原子類操作。一個典型應用就是計數,在多線程的情況下需要考慮線程安全問題,示例代碼如下:

publicclassCounter{privateintcount;publicCounter(){}publicintgetCount(){returncount;}publicvoidincrease(){count++;}}

上面這個類在多線程環境下會有線程安全問題,要解決這個問題最簡單的方式可能就是加鎖,優化代碼如下:

publicclassCounter{privateintcount;publicCounter(){}publicsynchronizedintgetCount(){returncount;}publicsynchronizedvoidincrease(){count++;}}

這是悲觀鎖的實現,如果我們需要獲取這個資源,那麼我們就給它加鎖,其他線程都無法訪問該資源,直到我們操作完後釋放對該資源的鎖。我們知道,悲觀鎖的效率是不如樂觀鎖的,上面說了atomic包下的原子類的實現是樂觀鎖方式,因此其效率會比使用synchronized關鍵字更高一些,推薦使用這種方式,代碼如下:

publicclassCounter{privateAtomicIntegercount=newAtomicInteger();publicCounter(){}publicintgetCount(){returncount.get();}publicvoidincrease(){count.getAndIncrement();}}CAS 的缺點

CAS 雖然能夠很高效的實現原子操作,但是 CAS 仍然存在三大問題。

ABA 問題

因為 CAS 需要在操作值的時候檢查下值有沒有發生變化,如果沒有發生變化則更新,但是如果一個值原來是A,變成了B,又變成了A,那麼使用 CAS 進行檢查時會發現它的值沒有發生變化,但是實際上卻變化了。ABA 問題的解決思路就是使用版本號,在變量前面追加上版本號,每次變量更新的時候把版本號加一,那麼A-B-A就會變成1A-2B-3A。

從 Java 1.5 開始 JDK 的atomic包里提供了一個類AtomicStampedReference來解決 ABA 問題。這個類的compareAndSet方法作用是首先檢查當前引用是否等於預期引用,並且當前標誌是否等於預期標誌,如果全部相等,則以原子方式將該引用和該標誌的值設置為給定的更新值。

循環時間長開銷大

CAS 自旋如果長時間不成功,會給 CPU 帶來非常大的執行開銷。如果 JVM 能支持處理器提供的pause指令那麼效率會有一定的提升,pause指令有兩個作用,一是它可以延遲流水線執行指令,使 CPU 不會消耗過多的執行資源,延遲的時間取決於具體實現的版本,在一些處理器上延遲時間是零;二是它可以避免在退出循環的時候因內存順序衝突而引起 CPU 流水線被清空,從而提高 CPU 的執行效率。

只能保證一個共享變量的原子操作

當對一個共享變量執行操作時,我們可以使用循環 CAS 的方式來保證原子操作,但是對多個共享變量操作時,循環 CAS 就無法保證操作的原子性,這個時候就需要用鎖,或者有一個取巧的辦法,就是把多個共享變量合併成一個共享變量來操作。

比如有兩個共享變量i=2,j=a,合併一下ij=2a,然後用 CAS 來操作ij。從 Java 1.5 開始 JDK 提供了AtomicReference類來保證引用對象之間的原子性,我們可以把多個變量放在一個對象里來進行 CAS 操作。

AQS什麼是 AQS?

AQS(AbstractQueuedSynchronizer),即抽象隊列同步器,是 JDK 下提供的一套用於實現基於 FIFO 等待隊列的阻塞鎖和相關的同步器的一個同步框架。這個抽象類被設計為作為一些可用原子int值來表示狀態的同步器的基類。

如果我們看過類似CountDownLatch類的源碼實現,會發現其內部有一個繼承了AbstractQueuedSynchronizer的內部類Sync。可見CountDownLatch是基於 AQS 框架來實現的一個同步器,類似的同步器在 JUC 下還有不少,如Semaphore等。

AQS 的應用

如上所述,AQS 管理一個關於狀態信息的單一整數,該整數可以表現任何狀態。比如,Semaphore用它來表現剩餘的許可數,ReentrantLock用它來表現擁有它的線程已經請求了多少次鎖;FutureTask用它來表現任務的狀態等。

/*Tousethisclassasthebasisofasynchronizer,redefinethe*followingmethods,asapplicable,byinspectingand/ormodifying*thesynchronizationstateusing{@link#getState},{@link*#setState}and/or{@link#compareAndSetState}:**<ul>*<li>{@link#tryAcquire}*<li>{@link#tryRelease}*<li>{@link#tryAcquireShared}*<li>{@link#tryReleaseShared}*<li>{@link#isHeldExclusively}*</ul>*/

如 JDK 的文檔中所說,使用 AQS 來實現一個同步器需要覆蓋實現如下幾個方法,並且使用getState、setState和compareAndSetState這三個方法來操作狀態。

booleantryAcquire(intarg)booleantryRelease(intarg)inttryAcquireShared(intarg)booleantryReleaseShared(intarg)booleanisHeldExclusively()

以上方法不需要全部實現,根據獲取的鎖的種類可以選擇實現不同的方法,支持獨占(排他)獲取鎖的同步器應該實現tryAcquire、 tryRelease、isHeldExclusively;而支持共享獲取的同步器應該實現tryAcquireShared、tryReleaseShared、isHeldExclusively。

下面以CountDownLatch舉例說明基於 AQS 實現同步器,CountDownLatch用同步狀態持有當前計數,countDown方法調用 release從而導致計數器遞減;當計數器為 0 時,解除所有線程的等待;await調用acquire,如果計數器為 0,acquire會立即返回,否則阻塞。通常用於某任務需要等待其他任務都完成後才能繼續執行的情景。源碼如下:

publicclassCountDownLatch{/***基於AQS的內部Sync*使用AQS的state來表示計數count.*/privatestaticfinalclassSyncextendsAbstractQueuedSynchronizer{privatestaticfinallongserialVersionUID=4982264981922014374L;Sync(intcount){//使用AQS的getState()方法設置狀態setState(count);}intgetCount(){//使用AQS的getState()方法獲取狀態returngetState();}//覆蓋在共享模式下嘗試獲取鎖protectedinttryAcquireShared(intacquires){//這裡用狀態state是否為0來表示是否成功,為0的時候可以獲取到返回1,否則不可以返回-1return(getState()==0)?1:-1;}//覆蓋在共享模式下嘗試釋放鎖protectedbooleantryReleaseShared(intreleases){//在for循環中Decrementcount直至成功;//當狀態值即count為0的時候,返回false表示signalwhentransitiontozerofor(;;){intc=getState();if(c==0)returnfalse;intnextc=c-1;if(compareAndSetState(c,nextc))returnnextc==0;}}}privatefinalSyncsync;//使用給定計數值構造CountDownLatchpublicCountDownLatch(intcount){if(count<0)thrownewIllegalArgumentException("count<0");this.sync=newSync(count);}//讓當前線程阻塞直到計數count變為0,或者線程被中斷publicvoidawait()throwsInterruptedException{sync.acquireSharedInterruptibly(1);}//阻塞當前線程,除非count變為0或者等待了timeout的時間。當count變為0時,返回truepublicbooleanawait(longtimeout,TimeUnitunit)throwsInterruptedException{returnsync.tryAcquireSharedNanos(1,unit.toNanos(timeout));}//count遞減publicvoidcountDown(){sync.releaseShared(1);}//獲取當前count值publiclonggetCount(){returnsync.getCount();}publicStringtoString(){returnsuper.toString()+"[Count="+sync.getCount()+"]";}}AQS 實現原理淺析

AQS 的實現主要在於維護一個volatile int state(代表共享資源)和一個 FIFO 線程等待隊列(多線程爭用資源被阻塞時會進入此隊列,此隊列稱之為CLH隊列)。CLH 隊列中的每個節點是對線程的一個封裝,包含線程基本信息,狀態,等待的資源類型等。

CLH結構如下:

*+------+prev+-----++-----+*head||<----||<----||tail*+------++-----++-----+

下面簡單看下獲取資源的代碼:

publicfinalvoidacquire(intarg){//首先嘗試獲取,不成功的話則將其加入到等待隊列,再for循環獲取if(!tryAcquire(arg)&&acquireQueued(addWaiter(Node.EXCLUSIVE),arg))selfInterrupt();}//從clh中選一個線程獲取占用資源finalbooleanacquireQueued(finalNodenode,intarg){booleanfailed=true;try{booleaninterrupted=false;for(;;){//當節點的先驅是head的時候,就可以嘗試獲取占用資源了tryAcquirefinalNodep=node.predecessor();if(p==head&&tryAcquire(arg)){//如果獲取到資源,則將當前節點設置為頭節點headsetHead(node);p.next=null;//helpGCfailed=false;returninterrupted;}//如果獲取失敗的話,判斷是否可以休息,可以的話就進入waiting狀態,直到被unpark()if(shouldParkAfterFailedAcquire(p,node)&&parkAndCheckInterrupt())interrupted=true;}}finally{if(failed)cancelAcquire(node);}}privateNodeaddWaiter(Nodemode){//封裝當前線程和模式為新的節點,並將其加入到隊列中Nodenode=newNode(Thread.currentThread(),mode);//Trythefastpathofenq;backuptofullenqonfailureNodepred=tail;if(pred!=null){node.prev=pred;if(compareAndSetTail(pred,node)){pred.next=node;returnnode;}}enq(node);returnnode;}privateNodeenq(finalNodenode){for(;;){Nodet=tail;if(t==null){//tail為null,說明還沒初始化,此時需進行初始化工作if(compareAndSetHead(newNode()))tail=head;}else{//否則的話,將當前線程節點作為tail節點加入到CLH中去node.prev=t;if(compareAndSetTail(t,node)){t.next=node;returnt;}}}}

參考資料:

ava並發編程——CAS
ava並發之AQS詳解
AVA並發編程: CAS和AQS

感謝閱讀,希望對你有所幫助:)

來源:guobinhit.blog.csdn.net/article/details/106592138

推薦

Java面試題寶典

技術內卷群,一起來學習!!

PS:因為公眾號平台更改了推送規則,如果不想錯過內容,記得讀完點一下「在看」,加個「星標」,這樣每次新文章推送才會第一時間出現在你的訂閱列表里。點「在看」支持我們吧!

arrow
arrow
    全站熱搜
    創作者介紹
    創作者 鑽石舞台 的頭像
    鑽石舞台

    鑽石舞台

    鑽石舞台 發表在 痞客邦 留言(0) 人氣()