在JAVA的世界里,如果想并行的執(zhí)行一些任務(wù),可以使用ThreadPoolExecutor。
大部分情況下直接使用ThreadPoolExecutor就可以滿足要求了,但是在某些場(chǎng)景下,比如瞬時(shí)大流量的,為了提高響應(yīng)和吞吐量,最好還是擴(kuò)展一下ThreadPoolExecutor。
全宇宙的JAVA IT人士應(yīng)該都知道ThreadPoolExecutor的執(zhí)行流程:
core線程還能應(yīng)付的,則不斷的創(chuàng)建新的線程;
core線程無(wú)法應(yīng)付,則將任務(wù)扔到隊(duì)列里面;
隊(duì)列滿了(意味著插入任務(wù)失敗),則開(kāi)始創(chuàng)建MAX線程,線程數(shù)達(dá)到MAX后,隊(duì)列還一直是滿的,則拋出RejectedExecutionException.
這個(gè)執(zhí)行流程有個(gè)小問(wèn)題,就是當(dāng)core線程無(wú)法應(yīng)付請(qǐng)求的時(shí)候,會(huì)立刻將任務(wù)添加到隊(duì)列中,如果隊(duì)列非常長(zhǎng),而任務(wù)又非常多,那么將會(huì)有頻繁的任務(wù)入隊(duì)列和任務(wù)出隊(duì)列的操作。
根據(jù)實(shí)際的壓測(cè)發(fā)現(xiàn),這種操作也是有一定消耗的。其實(shí)JAVA提供的SynchronousQueue隊(duì)列是一個(gè)零長(zhǎng)度的隊(duì)列,任務(wù)都是直接由生產(chǎn)者遞交給消費(fèi)者,中間沒(méi)有入隊(duì)列的過(guò)程,可見(jiàn)JAVA API的設(shè)計(jì)者也是有考慮過(guò)入隊(duì)列這種操作的開(kāi)銷。
另外,任務(wù)一多,立刻扔到隊(duì)列里,而MAX線程又不干活,如果隊(duì)列里面太多任務(wù)了,只有可憐的core線程在忙,也是會(huì)影響性能的。
當(dāng)core線程無(wú)法應(yīng)付請(qǐng)求的時(shí)候,能不能延后入隊(duì)列這個(gè)操作呢? 讓MAX線程盡快啟動(dòng)起來(lái),幫忙處理任務(wù)。
也即是說(shuō),當(dāng)core線程無(wú)法應(yīng)付請(qǐng)求的時(shí)候,如果當(dāng)前線程池中的線程數(shù)量還小于MAX線程數(shù)的時(shí)候,繼續(xù)創(chuàng)建新的線程處理任務(wù),一直到線程數(shù)量到達(dá)MAX后,才將任務(wù)插入到隊(duì)列里
我們通過(guò)覆蓋隊(duì)列的offer方法來(lái)實(shí)現(xiàn)這個(gè)目標(biāo)。
@Override
public boolean offer(Runnable o) {
int currentPoolThreadSize = executor.getPoolSize();
//如果線程池里的線程數(shù)量已經(jīng)到達(dá)最大,將任務(wù)添加到隊(duì)列中
if (currentPoolThreadSize == executor.getMaximumPoolSize()) {
return super.offer(o);
}
//說(shuō)明有空閑的線程,這個(gè)時(shí)候無(wú)需創(chuàng)建core線程之外的線程,而是把任務(wù)直接丟到隊(duì)列里即可
if (executor.getSubmittedTaskCount() < currentPoolThreadSize) {
return super.offer(o);
}
//如果線程池里的線程數(shù)量還沒(méi)有到達(dá)最大,直接創(chuàng)建線程,而不是把任務(wù)丟到隊(duì)列里面
if (currentPoolThreadSize < executor.getMaximumPoolSize()) {
return false;
}
return super.offer(o);
}
注意其中的
if (executor.getSubmittedTaskCount() < currentPoolThreadSize) {
return super.offer(o);
}
是表示core線程仍然能處理的來(lái),同時(shí)又有空閑線程的情況,將任務(wù)插入到隊(duì)列中。 如何判斷線程池中有空閑線程呢? 可以使用一個(gè)計(jì)數(shù)器來(lái)實(shí)現(xiàn),每當(dāng)execute方法被執(zhí)行的時(shí)候,計(jì)算器加1,當(dāng)afterExecute被執(zhí)行后,計(jì)數(shù)器減1.
@Override
public void execute(Runnable command) {
submittedTaskCount.incrementAndGet();
//代碼未完整,待補(bǔ)充。。。。。
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
submittedTaskCount.decrementAndGet();
}
這樣,當(dāng)
executor.getSubmittedTaskCount() < currentPoolThreadSize
的時(shí)候,說(shuō)明有空閑線程。