2021-02-09 分類: 網(wǎng)站建設(shè)
在java.util.concurrent包中,有兩個(gè)很特殊的工具類,Condition和ReentrantLock,使用過(guò)的人都知道,ReentrantLock(重入鎖)是jdk的concurrent包提供的一種獨(dú)占鎖的實(shí)現(xiàn)。它繼承自Dong Lea的 AbstractQueuedSynchronizer(同步器),確切的說(shuō)是ReentrantLock的一個(gè)內(nèi)部類繼承了AbstractQueuedSynchronizer,ReentrantLock只不過(guò)是代理了該類的一些方法,可能有人會(huì)問(wèn)為什么要使用內(nèi)部類在包裝一層? 我想是安全的關(guān)系,因?yàn)锳bstractQueuedSynchronizer中有很多方法,還實(shí)現(xiàn)了共享鎖,Condition(稍候再細(xì)說(shuō))等功能,如果直接使ReentrantLock繼承它,則很容易出現(xiàn)AbstractQueuedSynchronizer中的API被無(wú)用的情況。
言歸正傳,今天,我們討論下Condition工具類的實(shí)現(xiàn)。
ReentrantLock和Condition的使用方式通常是這樣的:
public static void main(String[] args) { final ReentrantLock reentrantLock = new ReentrantLock(); final Condition condition = reentrantLock.newCondition(); Thread thread = new Thread((Runnable) () -> { try { reentrantLock.lock(); System.out.println("我要等一個(gè)新信號(hào)" + this); condition.wait(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("拿到一個(gè)信號(hào)!!" + this); reentrantLock.unlock(); }, "waitThread1"); thread.start(); Thread thread1 = new Thread((Runnable) () -> { reentrantLock.lock(); System.out.println("我拿到鎖了"); try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } condition.signalAll(); System.out.println("我發(fā)了一個(gè)信號(hào)!!"); reentrantLock.unlock(); }, "signalThread"); thread1.start(); }
運(yùn)行后,結(jié)果如下:
我要等一個(gè)新信號(hào)lock.ReentrantLockTest$1@a62fc3我拿到鎖了我發(fā)了一個(gè)信號(hào)!!拿到一個(gè)信號(hào)!!
可以看到,
Condition的執(zhí)行方式,是當(dāng)在線程1中調(diào)用await方法后,線程1將釋放鎖,并且將自己沉睡,等待喚醒,
線程2獲取到鎖后,開(kāi)始做事,完畢后,調(diào)用Condition的signal方法,喚醒線程1,線程1恢復(fù)執(zhí)行。
以上說(shuō)明Condition是一個(gè)多線程間協(xié)調(diào)通信的工具類,使得某個(gè),或者某些線程一起等待某個(gè)條件(Condition),只有當(dāng)該條件具備( signal 或者 signalAll方法被帶調(diào)用)時(shí) ,這些等待線程才會(huì)被喚醒,從而重新?tīng)?zhēng)奪鎖。
那,它是怎么實(shí)現(xiàn)的呢?
首先還是要明白,reentrantLock.newCondition() 返回的是Condition的一個(gè)實(shí)現(xiàn),該類在AbstractQueuedSynchronizer中被實(shí)現(xiàn),叫做newCondition()
public Condition newCondition() { return sync.newCondition(); }
它可以訪問(wèn)AbstractQueuedSynchronizer中的方法和其余內(nèi)部類(AbstractQueuedSynchronizer是個(gè)抽象類,至于他怎么能訪問(wèn),這里有個(gè)很奇妙的點(diǎn),后面我專門用demo說(shuō)明 )
現(xiàn)在,我們一起來(lái)看下Condition類的實(shí)現(xiàn),還是從上面的demo入手,
為了方便書(shū)寫(xiě),我將AbstractQueuedSynchronizer縮寫(xiě)為AQS
當(dāng)await被調(diào)用時(shí),代碼如下:
public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); // 將當(dāng)前線程包裝下后, // 添加到Condition自己維護(hù)的一個(gè)鏈表中。 int savedState = fullyRelease(node);// 釋放當(dāng)前線程占有的鎖,從demo中看到, // 調(diào)用await前,當(dāng)前線程是占有鎖的 int interruptMode = 0; while (!isOnSyncQueue(node)) {// 釋放完畢后,遍歷AQS的隊(duì)列,看當(dāng)前節(jié)點(diǎn)是否在隊(duì)列中, // 不在 說(shuō)明它還沒(méi)有競(jìng)爭(zhēng)鎖的資格,所以繼續(xù)將自己沉睡。 // 直到它被加入到隊(duì)列中,聰明的你可能猜到了, // 沒(méi)有錯(cuò),在singal的時(shí)候加入不就可以了? LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } // 被喚醒后,重新開(kāi)始正式競(jìng)爭(zhēng)鎖,同樣,如果競(jìng)爭(zhēng)不到還是會(huì)將自己沉睡,等待喚醒重新開(kāi)始競(jìng)爭(zhēng)。 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }
回到上面的demo,鎖被釋放后,線程1開(kāi)始沉睡,這個(gè)時(shí)候線程因?yàn)榫€程1沉睡時(shí),會(huì)喚醒AQS隊(duì)列中的頭結(jié)點(diǎn),所所以線程2會(huì)開(kāi)始競(jìng)爭(zhēng)鎖,并獲取到,等待3秒后,線程2會(huì)調(diào)用signal方法,"發(fā)出"signal信號(hào),signal方法如下:
public final void signal() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; // firstWaiter為condition自己維護(hù)的一個(gè)鏈表的頭結(jié)點(diǎn), // 取出第一個(gè)節(jié)點(diǎn)后開(kāi)始喚醒操作 if (first != null) doSignal(first); }
說(shuō)明下,其實(shí)Condition內(nèi)部維護(hù)了等待隊(duì)列的頭結(jié)點(diǎn)和尾節(jié)點(diǎn),該隊(duì)列的作用是存放等待signal信號(hào)的線程,該線程被封裝為Node節(jié)點(diǎn)后存放于此。
public class ConditionObject implements Condition, java.io.Serializable { private static final long serialVersionUID = 1173984872572414699L; /** First node of condition queue. */ private transient Node firstWaiter; /** Last node of condition queue. */ private transient Node lastWaiter;
關(guān)鍵的就在于此,我們知道AQS自己維護(hù)的隊(duì)列是當(dāng)前等待資源的隊(duì)列,AQS會(huì)在資源被釋放后,依次喚醒隊(duì)列中從前到后的所有節(jié)點(diǎn),使他們對(duì)應(yīng)的線程恢復(fù)執(zhí)行。直到隊(duì)列為空。
而Condition自己也維護(hù)了一個(gè)隊(duì)列,該隊(duì)列的作用是維護(hù)一個(gè)等待signal信號(hào)的隊(duì)列,兩個(gè)隊(duì)列的作用是不同,事實(shí)上,每個(gè)線程也僅僅會(huì)同時(shí)存在以上兩個(gè)隊(duì)列中的一個(gè),流程是這樣的:
可以看到,整個(gè)協(xié)作過(guò)程是靠結(jié)點(diǎn)在AQS的等待隊(duì)列和Condition的等待隊(duì)列中來(lái)回移動(dòng)實(shí)現(xiàn)的,Condition作為一個(gè)條件類,很好的自己維護(hù)了一個(gè)等待信號(hào)的隊(duì)列,并在適時(shí)的時(shí)候?qū)⒔Y(jié)點(diǎn)加入到AQS的等待隊(duì)列中來(lái)實(shí)現(xiàn)的喚醒操作。
看到這里,signal方法的代碼應(yīng)該不難理解了。
取出頭結(jié)點(diǎn),然后doSignal
public final void signal() { if (!isHeldExclusively()) { throw new IllegalMonitorStateException(); } Node first = firstWaiter; if (first != null) { doSignal(first); } } private void doSignal(Node first) { do { if ((firstWaiter = first.nextWaiter) == null) // 修改頭結(jié)點(diǎn),完成舊頭結(jié)點(diǎn)的移出工作 lastWaiter = null; first.nextWaiter = null; } while (!transferForSignal(first) && // 將老的頭結(jié)點(diǎn),加入到AQS的等待隊(duì)列中 (first = firstWaiter) != null); } final boolean transferForSignal(Node node) { /* * If cannot change waitStatus, the node has been cancelled. */ if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; /* * Splice onto queue and try to set waitStatus of predecessor to * indicate that thread is (probably) waiting. If cancelled or attempt * to set waitStatus fails, wake up to resync (in which case the * waitStatus can be transiently and harmlessly wrong). */ Node p = enq(node); int ws = p.waitStatus; // 如果該結(jié)點(diǎn)的狀態(tài)為cancel 或者修改waitStatus失敗,則直接喚醒。 if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true; }
可以看到,正常情況 ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)這個(gè)判斷是不會(huì)為true的,所以,不會(huì)在這個(gè)時(shí)候喚醒該線程。
只有到發(fā)送signal信號(hào)的線程調(diào)用reentrantLock.unlock()后因?yàn)樗呀?jīng)被加到AQS的等待隊(duì)列中,所以才會(huì)被喚醒。
標(biāo)題名稱:如何理解Java并發(fā)里的Condition
鏈接URL:http://vcdvsql.cn/news40/100090.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供網(wǎng)頁(yè)設(shè)計(jì)公司、網(wǎng)站策劃、自適應(yīng)網(wǎng)站、ChatGPT、虛擬主機(jī)、搜索引擎優(yōu)化
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來(lái)源: 創(chuàng)新互聯(lián)
猜你還喜歡下面的內(nèi)容