本文將主要結(jié)合源碼對(duì) JDK 中的阻塞隊(duì)列進(jìn)行分析,并比較其各自的特點(diǎn);
在順昌等地區(qū),都構(gòu)建了全面的區(qū)域性戰(zhàn)略布局,加強(qiáng)發(fā)展的系統(tǒng)性、市場(chǎng)前瞻性、產(chǎn)品創(chuàng)新能力,以專注、極致的服務(wù)理念,為客戶提供成都網(wǎng)站建設(shè)、成都做網(wǎng)站 網(wǎng)站設(shè)計(jì)制作定制網(wǎng)站,公司網(wǎng)站建設(shè),企業(yè)網(wǎng)站建設(shè),品牌網(wǎng)站制作,成都全網(wǎng)營(yíng)銷推廣,外貿(mào)營(yíng)銷網(wǎng)站建設(shè),順昌網(wǎng)站建設(shè)費(fèi)用合理。
說到阻塞隊(duì)列想到的第一個(gè)應(yīng)用場(chǎng)景可能就是生產(chǎn)者消費(fèi)者模式了,如圖所示;
根據(jù)上圖所示,明顯在入隊(duì)和出隊(duì)的時(shí)候,會(huì)發(fā)生競(jìng)爭(zhēng);所以一種很自然的想法就是使用鎖,而在 JDK 中也的確是通過鎖來實(shí)現(xiàn)的;所以?BlockingQueue
?的源碼其實(shí)可以當(dāng)成鎖的應(yīng)用示例來查看;同時(shí) JDK 也為我們提供了多種不同功能的隊(duì)列:
ArrayBlockingQueue?:基于數(shù)組的有界隊(duì)列;
LinkedBlockingQueue?:基于鏈表的×××隊(duì)列(可以設(shè)置容量);
PriorityBlockingQueue?:基于二叉堆的×××優(yōu)先級(jí)隊(duì)列;
DelayQueue?:基于 PriorityBlockingQueue 的×××延遲隊(duì)列;
SynchronousQueue?:無容量的阻塞隊(duì)列(Executors.newCachedThreadPool() 中使用的隊(duì)列);
LinkedTransferQueue?:基于鏈表的×××隊(duì)列;
接下來我們就對(duì)最常用的?ArrayBlockingQueue
?和?LinkedBlockingQueue
?進(jìn)行分析;
public?class?ArrayBlockingQueue<E>?extends?AbstractQueue<E>?implements?BlockingQueue<E>,?java.io.Serializable?{????final?Object[]?items;???????????????//?容器數(shù)組 ????int?takeIndex;??????????????????????//?出隊(duì)索引 ????int?putIndex;???????????????????????//?入隊(duì)索引 ????int?count;??????????????????????????//?排隊(duì)個(gè)數(shù) ????final?ReentrantLock?lock;???????????//?全局鎖 ????private?final?Condition?notEmpty;???//?出隊(duì)條件隊(duì)列 ????private?final?Condition?notFull;????//?入隊(duì)條件隊(duì)列 ????... }
ArrayBlockingQueue
?的結(jié)構(gòu)如圖所示:
如圖所示,
ArrayBlockingQueue
?的數(shù)組其實(shí)是一個(gè)邏輯上的環(huán)狀結(jié)構(gòu),在添加、取出數(shù)據(jù)的時(shí)候,并沒有像?ArrayList
?一樣發(fā)生數(shù)組元素的移動(dòng)(當(dāng)然除了?removeAt(final int removeIndex)
);
并且由?takeIndex
?和?putIndex
?指示讀寫位置;
在讀寫的時(shí)候還有兩個(gè)讀寫條件隊(duì)列;
下面我們就讀寫操作,對(duì)源碼簡(jiǎn)單分析:
public?void?put(E?e)?throws?InterruptedException?{ ??checkNotNull(e);??final?ReentrantLock?lock?=?this.lock; ??lock.lockInterruptibly();??try?{????while?(count?==?items.length)??//?當(dāng)隊(duì)列已滿的時(shí)候放入?putCondition?條件隊(duì)列 ??????notFull.await();??? ????enqueue(e);??//?入隊(duì) ??}?finally?{ ????lock.unlock(); ??} }
private?void?enqueue(E?x)?{??//?assert?lock.getHoldCount()?==?1; ??//?assert?items[putIndex]?==?null; ??final?Object[]?items?=?this.items; ??items[putIndex]?=?x;??//?插入隊(duì)列 ??if?(++putIndex?==?items.length)?putIndex?=?0;??//?指針走一圈的時(shí)候復(fù)位 ??count++; ??notEmpty.signal();??//?喚醒?takeCondition?條件隊(duì)列中等待的線程}
public?E?take()?throws?InterruptedException?{??final?ReentrantLock?lock?=?this.lock; ??lock.lockInterruptibly();??try?{????while?(count?==?0)??//?當(dāng)隊(duì)列為空的時(shí)候,放入?takeCondition?條件 ??????notEmpty.await();?? ????return?dequeue();???//?出隊(duì) ??}?finally?{ ????lock.unlock(); ??} }
private?E?dequeue()?{??//?assert?lock.getHoldCount()?==?1; ??//?assert?items[takeIndex]?!=?null; ??final?Object[]?items?=?this.items;??@SuppressWarnings("unchecked") ??E?x?=?(E)?items[takeIndex];??//?取出元素 ??items[takeIndex]?=?null;??if?(++takeIndex?==?items.length) ????takeIndex?=?0; ??count--;??if?(itrs?!=?null) ????itrs.elementDequeued(); ??notFull.signal();??//?取出元素后,隊(duì)列空出一位,所以喚醒?putCondition?中的線程 ??return?x; }
public?class?LinkedBlockingQueue<E>?extends?AbstractQueue<E>?implements?BlockingQueue<E>,?java.io.Serializable?{?? ??private?final?int?capacity;?//?默認(rèn)?Integer.MAX_VALUE ??private?final?AtomicInteger?count?=?new?AtomicInteger();?//?容量 ??transient?Node<E>?head;??????????//?頭結(jié)點(diǎn)?head.item?==?null ??private?transient?Node<E>?last;??//?尾節(jié)點(diǎn)?last.next?==?null ??private?final?ReentrantLock?takeLock?=?new?ReentrantLock();??//?出隊(duì)鎖 ??private?final?Condition?notEmpty?=?takeLock.newCondition();??//?出隊(duì)條件 ??private?final?ReentrantLock?putLock?=?new?ReentrantLock();???//?入隊(duì)鎖 ??private?final?Condition?notFull?=?putLock.newCondition();????//?入隊(duì)條件 ?? ??static?class?Node<E>?{ ????E?item; ????Node<E>?next; ????Node(E?x)?{?item?=?x;?} ??} }
LinkedBlockingQueue
?的結(jié)構(gòu)如圖所示:
如圖所示,
LinkedBlockingQueue
?其實(shí)就是一個(gè)簡(jiǎn)單的單向鏈表,其中頭部元素的數(shù)據(jù)為空,尾部元素的 next 為空;
因?yàn)樽x寫都有競(jìng)爭(zhēng),所以在頭部和尾部分別有一把鎖;同時(shí)還有對(duì)應(yīng)的兩個(gè)條件隊(duì)列;
下面我們就讀寫操作,對(duì)源碼簡(jiǎn)單分析:
public?boolean?offer(E?e)?{??if?(e?==?null)?throw?new?NullPointerException();??final?AtomicInteger?count?=?this.count;??if?(count.get()?==?capacity)?return?false;??//?如果隊(duì)列已滿,直接返回失敗 ??int?c?=?-1; ??Node<E>?node?=?new?Node<E>(e);??????????????//?將數(shù)據(jù)封裝為節(jié)點(diǎn) ??final?ReentrantLock?putLock?=?this.putLock; ??putLock.lock();??try?{????if?(count.get()?<?capacity)?{ ??????enqueue(node);??????????????????????????//?入隊(duì) ??????c?=?count.getAndIncrement();??????if?(c?+?1?<?capacity)???????????????????//?如果隊(duì)列未滿,則繼續(xù)喚醒?putCondition?條件隊(duì)列 ????????notFull.signal(); ????} ??}?finally?{ ????putLock.unlock(); ??}??if?(c?==?0)???????????//?如果添加之前的容量為0,說明在出隊(duì)的時(shí)候有競(jìng)爭(zhēng),則喚醒?takeCondition ????signalNotEmpty();???//?因?yàn)槭莾砂焰i,所以在喚醒?takeCondition的時(shí)候,還需要獲取?takeLock ??return?c?>=?0; }
private?void?enqueue(Node<E>?node)?{??//?assert?putLock.isHeldByCurrentThread(); ??//?assert?last.next?==?null; ??last?=?last.next?=?node;??//?連接節(jié)點(diǎn),并設(shè)置尾節(jié)點(diǎn)}
public?E?take()?throws?InterruptedException?{ ??E?x;??int?c?=?-1;??final?AtomicInteger?count?=?this.count;??final?ReentrantLock?takeLock?=?this.takeLock; ??takeLock.lockInterruptibly();??try?{????while?(count.get()?==?0)?{???//?如果隊(duì)列為空,則加入?takeCondition?條件隊(duì)列 ??????notEmpty.await(); ????} ????x?=?dequeue();???????????????//?出隊(duì) ????c?=?count.getAndDecrement();????if?(c?>?1) ??????notEmpty.signal();?????????//?如果隊(duì)列還有剩余,則繼續(xù)喚醒?takeCondition?條件隊(duì)列 ??}?finally?{ ????takeLock.unlock(); ??}??if?(c?==?capacity)?????????????//?如果取之前隊(duì)列是滿的,說明入隊(duì)的時(shí)候有競(jìng)爭(zhēng),則喚醒?putCondition ????signalNotFull();?????????????//?同樣注意是兩把鎖 ??return?x; }
private?E?dequeue()?{??//?assert?takeLock.isHeldByCurrentThread(); ??//?assert?head.item?==?null; ??Node<E>?h?=?head; ??Node<E>?first?=?h.next; ??h.next?=?h;?//?help?GC???//?將next引用指向自己,則該節(jié)點(diǎn)不可達(dá),在下一次GC的時(shí)候回收 ??head?=?first; ??E?x?=?first.item; ??first.item?=?null;??return?x; }
根據(jù)以上的講解,我們可以逐步分析出一些不同,以及在不同場(chǎng)景隊(duì)列的選擇:
結(jié)構(gòu)不同
ABQ:基于數(shù)組,有界,一把鎖;
LBQ:基于鏈表,×××,兩把鎖;
內(nèi)存分配
ABQ:隊(duì)列空間預(yù)先初始化,受堆空間影響小,穩(wěn)定性高;
LBQ:隊(duì)列空間動(dòng)態(tài)變化,受對(duì)空間影響大,穩(wěn)定性差;
入隊(duì)、出隊(duì)效率
ABQ:數(shù)據(jù)直接賦值,移除;隊(duì)列空間重復(fù)使用,效率高;
LBQ:數(shù)據(jù)需要包裝為節(jié)點(diǎn);需開辟新空間,效率低;
競(jìng)爭(zhēng)方面
ABQ:出入隊(duì)共用一把鎖,相互影響;競(jìng)爭(zhēng)嚴(yán)重時(shí)效率低;
LBQ:出入隊(duì)分用兩把鎖,互不影響;競(jìng)爭(zhēng)嚴(yán)重時(shí)效率影響??;
所以在這里并不能簡(jiǎn)單的給出詳細(xì)的數(shù)據(jù),證明哪個(gè)隊(duì)列更適合什么場(chǎng)景,最好是結(jié)合實(shí)際使用場(chǎng)景分析。
網(wǎng)站名稱:JDK源碼分析(11)之BlockingQueue相關(guān)
網(wǎng)頁(yè)鏈接:http://vcdvsql.cn/article6/pdhgig.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供網(wǎng)站排名、Google、虛擬主機(jī)、網(wǎng)站內(nèi)鏈、小程序開發(fā)、軟件開發(fā)
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來源: 創(chuàng)新互聯(lián)