通過前面三篇的分析,我們深入了解了AbstractQueuedSynchronizer的內部結構和一些設計理念,知道了AbstractQueuedSynchronizer內部維護了一個同步狀態和兩個排隊區,這兩個排隊區分別是同步隊列和條件隊列。我們還是拿公共廁所做比喻,同步隊列是主要的排隊區,如果公共廁所沒開放,所有想要進入廁所的人都得在這里排隊。而條件隊列主要是為條件等待設置的,我們想象一下如果一個人通過排隊終于成功獲取鎖進入了廁所,但在方便之前發現自己沒帶手紙,碰到這種情況雖然很無奈,但是它也必須接受這個事實,這時它只好乖乖的出去先準備好手紙(進入條件隊列等待),當然在出去之前還得把鎖給釋放了好讓其他人能夠進來,在準備好了手紙(條件滿足)之后它又得重新回到同步隊列中去排隊。當然進入房間的人并不都是因為沒帶手紙,可能還有其他一些原因必須中斷操作先去條件隊列中去排隊,所以條件隊列可以有多個,依不同的等待條件而設置不同的條件隊列。條件隊列是一條單向鏈表,Condition接口定義了條件隊列中的所有操作,AbstractQueuedSynchronizer內部的ConditionObject類實現了Condition接口,下面我們看看Condition接口都定義了哪些操作。
成都創新互聯專業為企業提供鶴山網站建設、鶴山做網站、鶴山網站設計、鶴山網站制作等企業網站建設、網頁設計與制作、鶴山企業網站模板建站服務,10余年鶴山做網站經驗,不只是建網站,更提供有價值的思路和整體網絡服務。
public interface Condition { //響應線程中斷的條件等待 void await() throws InterruptedException; //不響應線程中斷的條件等待 void awaitUninterruptibly(); //設置相對時間的條件等待(不進行自旋) long awaitNanos(long nanosTimeout) throws InterruptedException; //設置相對時間的條件等待(進行自旋) boolean await(long time, TimeUnit unit) throws InterruptedException; //設置絕對時間的條件等待 boolean awaitUntil(Date deadline) throws InterruptedException; //喚醒條件隊列中的頭結點 void signal(); //喚醒條件隊列的所有結點 void signalAll(); }
Condition接口雖然定義了這么多方法,但總共就分為兩類,以await開頭的是線程進入條件隊列等待的方法,以signal開頭的是將條件隊列中的線程“喚醒”的方法。這里要注意的是,調用signal方法可能喚醒線程也可能不會喚醒線程,什么時候會喚醒線程這得看情況,后面會講到,但是調用signal方法一定會將線程從條件隊列中移到同步隊列尾部。這里為了敘述方便,我們先暫時不糾結這么多,統一稱signal方法為喚醒條件隊列線程的操作。大家注意看一下,await方法分為5種,分別是響應線程中斷等待,不響應線程中斷等待,設置相對時間不自旋等待,設置相對時間自旋等待,設置絕對時間等待;signal方法只有2種,分別是只喚醒條件隊列頭結點和喚醒條件隊列所有結點的操作。同一類的方法基本上是相通的,由于篇幅所限,我們不可能也不需要將這些方法全部仔細的講到,只需要將一個代表方法搞懂了再看其他方法就能夠觸類旁通。所以在本文中我只會細講await方法和signal方法,其他方法不細講但會貼出源碼來以供大家參考。
1. 響應線程中斷的條件等待
//響應線程中斷的條件等待 public final void await() throws InterruptedException { //如果線程被中斷則拋出異常 if (Thread.interrupted()) { throw new InterruptedException(); } //將當前線程添加到條件隊列尾部 Node node = addConditionWaiter(); //在進入條件等待之前先完全釋放鎖 int savedState = fullyRelease(node); int interruptMode = 0; //線程一直在while循環里進行條件等待 while (!isOnSyncQueue(node)) { //進行條件等待的線程都在這里被掛起, 線程被喚醒的情況有以下幾種: //1.同步隊列的前繼結點已取消 //2.設置同步隊列的前繼結點的狀態為SIGNAL失敗 //3.前繼結點釋放鎖后喚醒當前結點 LockSupport.park(this); //當前線程醒來后立馬檢查是否被中斷, 如果是則代表結點取消條件等待, 此時需要將結點移出條件隊列 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) { break; } } //線程醒來后就會以獨占模式獲取鎖 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) { interruptMode = REINTERRUPT; } //這步操作主要為防止線程在signal之前中斷而導致沒與條件隊列斷絕聯系 if (node.nextWaiter != null) { unlinkCancelledWaiters(); } //根據中斷模式進行響應的中斷處理 if (interruptMode != 0) { reportInterruptAfterWait(interruptMode); } }
當線程調用await方法的時候,首先會將當前線程包裝成node結點放入條件隊列尾部。在addConditionWaiter方法中,如果發現條件隊列尾結點已取消就會調用unlinkCancelledWaiters方法將條件隊列所有的已取消結點清空。這步操作是插入結點的準備工作,那么確保了尾結點的狀態也是CONDITION之后,就會新建一個node結點將當前線程包裝起來然后放入條件隊列尾部。注意,這個過程只是將結點添加到同步隊列尾部而沒有掛起線程哦。
第二步:完全將鎖釋放
//完全釋放鎖 final int fullyRelease(Node node) { boolean failed = true; try { //獲取當前的同步狀態 int savedState = getState(); //使用當前的同步狀態去釋放鎖 if (release(savedState)) { failed = false; //如果釋放鎖成功就返回當前同步狀態 return savedState; } else { //如果釋放鎖失敗就拋出運行時異常 throw new IllegalMonitorStateException(); } } finally { //保證沒有成功釋放鎖就將該結點設置為取消狀態 if (failed) { node.waitStatus = Node.CANCELLED; } } }
將當前線程包裝成結點添加到條件隊列尾部后,緊接著就調用fullyRelease方法釋放鎖。注意,方法名為fullyRelease也就這步操作會完全的釋放鎖,因為鎖是可重入的,所以在進行條件等待前需要將鎖全部釋放了,不然的話別人就獲取不了鎖了。如果釋放鎖失敗的話就會拋出一個運行時異常,如果成功釋放了鎖的話就返回之前的同步狀態。
第三步:進行條件等待
//線程一直在while循環里進行條件等待 while (!isOnSyncQueue(node)) { //進行條件等待的線程都在這里被掛起, 線程被喚醒的情況有以下幾種: //1.同步隊列的前繼結點已取消 //2.設置同步隊列的前繼結點的狀態為SIGNAL失敗 //3.前繼結點釋放鎖后喚醒當前結點 LockSupport.park(this); //當前線程醒來后立馬檢查是否被中斷, 如果是則代表結點取消條件等待, 此時需要將結點移出條件隊列 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) { break; } } //檢查條件等待時的線程中斷情況 private int checkInterruptWhileWaiting(Node node) { //中斷請求在signal操作之前:THROW_IE //中斷請求在signal操作之后:REINTERRUPT //期間沒有收到任何中斷請求:0 return Thread.interrupted() ? (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 0; } //將取消條件等待的結點從條件隊列轉移到同步隊列中 final boolean transferAfterCancelledWait(Node node) { //如果這步CAS操作成功的話就表明中斷發生在signal方法之前 if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) { //狀態修改成功后就將該結點放入同步隊列尾部 enq(node); return true; } //到這里表明CAS操作失敗, 說明中斷發生在signal方法之后 while (!isOnSyncQueue(node)) { //如果sinal方法還沒有將結點轉移到同步隊列, 就通過自旋等待一下 Thread.yield(); } return false; }
在以上兩個操作完成了之后就會進入while循環,可以看到while循環里面首先調用LockSupport.park(this)將線程掛起了,所以線程就會一直在這里阻塞。在調用signal方法后僅僅只是將結點從條件隊列轉移到同步隊列中去,至于會不會喚醒線程需要看情況。如果轉移結點時發現同步隊列中的前繼結點已取消,或者是更新前繼結點的狀態為SIGNAL失敗,這兩種情況都會立即喚醒線程,否則的話在signal方法結束時就不會去喚醒已在同步隊列中的線程,而是等到它的前繼結點來喚醒。當然,線程阻塞在這里除了可以調用signal方法喚醒之外,線程還可以響應中斷,如果線程在這里收到中斷請求就會繼續往下執行??梢钥吹骄€程醒來后會馬上檢查是否是由于中斷喚醒的還是通過signal方法喚醒的,如果是因為中斷喚醒的同樣會將這個結點轉移到同步隊列中去,只不過是通過調用transferAfterCancelledWait方法來實現的。最后執行完這一步之后就會返回中斷情況并跳出while循環。
第四步:結點移出條件隊列后的操作
//線程醒來后就會以獨占模式獲取鎖 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) { interruptMode = REINTERRUPT; } //這步操作主要為防止線程在signal之前中斷而導致沒與條件隊列斷絕聯系 if (node.nextWaiter != null) { unlinkCancelledWaiters(); } //根據中斷模式進行響應的中斷處理 if (interruptMode != 0) { reportInterruptAfterWait(interruptMode); } //結束條件等待后根據中斷情況做出相應處理 private void reportInterruptAfterWait(int interruptMode) throws InterruptedException { //如果中斷模式是THROW_IE就拋出異常 if (interruptMode == THROW_IE) { throw new InterruptedException(); //如果中斷模式是REINTERRUPT就自己掛起 } else if (interruptMode == REINTERRUPT) { selfInterrupt(); } }
當線程終止了while循環也就是條件等待后,就會回到同步隊列中。不管是因為調用signal方法回去的還是因為線程中斷導致的,結點最終都會在同步隊列中。這時就會調用acquireQueued方法執行在同步隊列中獲取鎖的操作,這個方法我們在獨占模式這一篇已經詳細的講過。也就是說,結點從條件隊列出來后又是乖乖的走獨占模式下獲取鎖的那一套,等這個結點再次獲得鎖之后,就會調用reportInterruptAfterWait方法來根據這期間的中斷情況做出相應的響應。如果中斷發生在signal方法之前,interruptMode就為THROW_IE,再次獲得鎖后就拋出異常;如果中斷發生在signal方法之后,interruptMode就為REINTERRUPT,再次獲得鎖后就重新中斷。
2.不響應線程中斷的條件等待
//不響應線程中斷的條件等待 public final void awaitUninterruptibly() { //將當前線程添加到條件隊列尾部 Node node = addConditionWaiter(); //完全釋放鎖并返回當前同步狀態 int savedState = fullyRelease(node); boolean interrupted = false; //結點一直在while循環里進行條件等待 while (!isOnSyncQueue(node)) { //條件隊列中所有的線程都在這里被掛起 LockSupport.park(this); //線程醒來發現中斷并不會馬上去響應 if (Thread.interrupted()) { interrupted = true; } } if (acquireQueued(node, savedState) || interrupted) { //在這里響應所有中斷請求, 滿足以下兩個條件之一就會將自己掛起 //1.線程在條件等待時收到中斷請求 //2.線程在acquireQueued方法里收到中斷請求 selfInterrupt(); } }
3.設置相對時間的條件等待(不進行自旋)
//設置定時條件等待(相對時間), 不進行自旋等待 public final long awaitNanos(long nanosTimeout) throws InterruptedException { //如果線程被中斷則拋出異常 if (Thread.interrupted()) { throw new InterruptedException(); } //將當前線程添加到條件隊列尾部 Node node = addConditionWaiter(); //在進入條件等待之前先完全釋放鎖 int savedState = fullyRelease(node); long lastTime = System.nanoTime(); int interruptMode = 0; while (!isOnSyncQueue(node)) { //判斷超時時間是否用完了 if (nanosTimeout <= 0L) { //如果已超時就需要執行取消條件等待操作 transferAfterCancelledWait(node); break; } //將當前線程掛起一段時間, 線程在這期間可能被喚醒, 也可能自己醒來 LockSupport.parkNanos(this, nanosTimeout); //線程醒來后先檢查中斷信息 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) { break; } long now = System.nanoTime(); //超時時間每次減去條件等待的時間 nanosTimeout -= now - lastTime; lastTime = now; } //線程醒來后就會以獨占模式獲取鎖 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) { interruptMode = REINTERRUPT; } //由于transferAfterCancelledWait方法沒有把nextWaiter置空, 所有這里要再清理一遍 if (node.nextWaiter != null) { unlinkCancelledWaiters(); } //根據中斷模式進行響應的中斷處理 if (interruptMode != 0) { reportInterruptAfterWait(interruptMode); } //返回剩余時間 return nanosTimeout - (System.nanoTime() - lastTime); }
4.設置相對時間的條件等待(進行自旋)
//設置定時條件等待(相對時間), 進行自旋等待 public final boolean await(long time, TimeUnit unit) throws InterruptedException { if (unit == null) { throw new NullPointerException(); } //獲取超時時間的毫秒數 long nanosTimeout = unit.toNanos(time); //如果線程被中斷則拋出異常 if (Thread.interrupted()) { throw new InterruptedException(); } //將當前線程添加條件隊列尾部 Node node = addConditionWaiter(); //在進入條件等待之前先完全釋放鎖 int savedState = fullyRelease(node); //獲取當前時間的毫秒數 long lastTime = System.nanoTime(); boolean timedout = false; int interruptMode = 0; while (!isOnSyncQueue(node)) { //如果超時就需要執行取消條件等待操作 if (nanosTimeout <= 0L) { timedout = transferAfterCancelledWait(node); break; } //如果超時時間大于自旋時間, 就將線程掛起一段時間 if (nanosTimeout >= spinForTimeoutThreshold) { LockSupport.parkNanos(this, nanosTimeout); } //線程醒來后先檢查中斷信息 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) { break; } long now = System.nanoTime(); //超時時間每次減去條件等待的時間 nanosTimeout -= now - lastTime; lastTime = now; } //線程醒來后就會以獨占模式獲取鎖 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) { interruptMode = REINTERRUPT; } //由于transferAfterCancelledWait方法沒有把nextWaiter置空, 所有這里要再清理一遍 if (node.nextWaiter != null) { unlinkCancelledWaiters(); } //根據中斷模式進行響應的中斷處理 if (interruptMode != 0) { reportInterruptAfterWait(interruptMode); } //返回是否超時標志 return !timedout; }
5.設置絕對時間的條件等待
//設置定時條件等待(絕對時間) public final boolean awaitUntil(Date deadline) throws InterruptedException { if (deadline == null) { throw new NullPointerException(); } //獲取絕對時間的毫秒數 long abstime = deadline.getTime(); //如果線程被中斷則拋出異常 if (Thread.interrupted()) { throw new InterruptedException(); } //將當前線程添加到條件隊列尾部 Node node = addConditionWaiter(); //在進入條件等待之前先完全釋放鎖 int savedState = fullyRelease(node); boolean timedout = false; int interruptMode = 0; while (!isOnSyncQueue(node)) { //如果超時就需要執行取消條件等待操作 if (System.currentTimeMillis() > abstime) { timedout = transferAfterCancelledWait(node); break; } //將線程掛起一段時間, 期間線程可能被喚醒, 也可能到了點自己醒來 LockSupport.parkUntil(this, abstime); //線程醒來后先檢查中斷信息 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) { break; } } //線程醒來后就會以獨占模式獲取鎖 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) { interruptMode = REINTERRUPT; } //由于transferAfterCancelledWait方法沒有把nextWaiter置空, 所有這里要再清理一遍 if (node.nextWaiter != null) { unlinkCancelledWaiters(); } //根據中斷模式進行響應的中斷處理 if (interruptMode != 0) { reportInterruptAfterWait(interruptMode); } //返回是否超時標志 return !timedout; }
6.喚醒條件隊列中的頭結點
//喚醒條件隊列中的下一個結點 public final void signal() { //判斷當前線程是否持有鎖 if (!isHeldExclusively()) { throw new IllegalMonitorStateException(); } Node first = firstWaiter; //如果條件隊列中有排隊者 if (first != null) { //喚醒條件隊列中的頭結點 doSignal(first); } } //喚醒條件隊列中的頭結點 private void doSignal(Node first) { do { //1.將firstWaiter引用向后移動一位 if ( (firstWaiter = first.nextWaiter) == null) { lastWaiter = null; } //2.將頭結點的后繼結點引用置空 first.nextWaiter = null; //3.將頭結點轉移到同步隊列, 轉移完成后有可能喚醒線程 //4.如果transferForSignal操作失敗就去喚醒下一個結點 } while (!transferForSignal(first) && (first = firstWaiter) != null); } //將指定結點從條件隊列轉移到同步隊列中 final boolean transferForSignal(Node node) { //將等待狀態從CONDITION設置為0 if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) { //如果更新狀態的操作失敗就直接返回false //可能是transferAfterCancelledWait方法先將狀態改變了, 導致這步CAS操作失敗 return false; } //將該結點添加到同步隊列尾部 Node p = enq(node); int ws = p.waitStatus; if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) { //出現以下情況就會喚醒當前線程 //1.前繼結點是取消狀態 //2.更新前繼結點的狀態為SIGNAL操作失敗 LockSupport.unpark(node.thread); } return true; }
可以看到signal方法最終的核心就是去調用transferForSignal方法,在transferForSignal方法中首先會用CAS操作將結點的狀態從CONDITION設置為0,然后再調用enq方法將該結點添加到同步隊列尾部。我們再看到接下來的if判斷語句,這個判斷語句主要是用來判斷什么時候會去喚醒線程,出現這兩種情況就會立即喚醒線程,一種是當發現前繼結點的狀態是取消狀態時,還有一種是更新前繼結點的狀態失敗時。這兩種情況都會馬上去喚醒線程,否則的話就僅僅只是將結點從條件隊列中轉移到同步隊列中就完了,而不會立馬去喚醒結點中的線程。signalAll方法也大致類似,只不過它是去循環遍歷條件隊列中的所有結點,并將它們轉移到同步隊列,轉移結點的方法也還是調用transferForSignal方法。
7.喚醒條件隊列的所有結點
//喚醒條件隊列后面的全部結點 public final void signalAll() { //判斷當前線程是否持有鎖 if (!isHeldExclusively()) { throw new IllegalMonitorStateException(); } //獲取條件隊列頭結點 Node first = firstWaiter; if (first != null) { //喚醒條件隊列的所有結點 doSignalAll(first); } } //喚醒條件隊列的所有結點 private void doSignalAll(Node first) { //先把頭結點和尾結點的引用置空 lastWaiter = firstWaiter = null; do { //先獲取后繼結點的引用 Node next = first.nextWaiter; //把即將轉移的結點的后繼引用置空 first.nextWaiter = null; //將結點從條件隊列轉移到同步隊列 transferForSignal(first); //將引用指向下一個結點 first = next; } while (first != null); }
至此,我們整個的AbstractQueuedSynchronizer源碼分析就結束了,相信通過這四篇的分析,大家能更好的掌握并理解AQS。這個類確實很重要,因為它是其他很多同步類的基石,由于筆者水平和表達能力有限,如果哪些地方沒有表述清楚的,或者理解不到位的,還請廣大讀者們能夠及時指正,共同探討學習??稍谙路搅粞蚤喿x中所遇到的問題,如果有需要AQS注釋源碼的也可聯系筆者索取。
注:以上全部分析基于JDK1.7,不同版本間會有差異,讀者需要注意。
以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支持創新互聯。
分享題目:Java并發系列之AbstractQueuedSynchronizer源碼分析(條件隊列)
本文鏈接:http://vcdvsql.cn/article4/jhigie.html
成都網站建設公司_創新互聯,為您提供標簽優化、靜態網站、全網營銷推廣、微信小程序、關鍵詞優化、軟件開發
聲明:本網站發布的內容(圖片、視頻和文字)以用戶投稿、用戶轉載內容為主,如果涉及侵權請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網站立場,如需處理請聯系客服。電話:028-86922220;郵箱:631063699@qq.com。內容未經允許不得轉載,或轉載時需注明來源: 創新互聯