原創 why技術
前幾天,有個朋友在微信上找我。他問:why哥,在嗎?
我說:發生腎么事了?
他啪的一下就提了一個問題啊,很快。
我大意了,隨意瞅了一眼,這題不是很簡單嗎?
結果沒想到里面還隱藏著一篇文章。
故事,得從這個問題說起:
上面的圖中的線程池配置是這樣的:
ExecutorService executorService = new ThreadPoolExecutor(40, 80, 1, TimeUnit.MINUTES, new LinkedBlockingQueue<>(100), new DefaultThreadFactory("test"), new ThreadPoolExecutor.DiscardPolicy());
上面這個線程池里面的參數、執行流程啥的我就不再解釋了。
畢竟我曾經在《一人血書,想讓why哥講一下這道面試題。》這篇文章里面發過毒誓的,再說就是小王吧了:
上面的這個問題其實就是一個非常簡單的八股文問題:
非核心線程在什么時候被回收?
如果經過 keepAliveTime 時間后,超過核心線程數的線程還沒有接受到新的任務,就會被回收。
標準答案,完全沒毛病。
那么我現在帶入一個簡單的場景,為了簡單直觀,我們把線程池相關的參數調整一下:
ExecutorService executorService = new ThreadPoolExecutor(2, 3, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<>(2), new DefaultThreadFactory("test"), new ThreadPoolExecutor.DiscardPolicy());
那么問題來了:
這個線程最多能容納的任務是不是 5 個?
假設任務需要執行 1 秒鐘,那么我直接循環里面提交 5 個任務到線程池,肯定是在 1 秒鐘之內提交完成,那么當前線程池的活躍線程是不是就是 3 個?
如果接下來的 30 秒,沒有任務提交過來。那么 30 秒之后,當前線程池的活躍線程是不是就是 2 個?
上面這三個問題的答案都是肯定的,如果你搞不明白為什么,那么我建議你先趕緊去補充一下線程池相關的知識點,下面的內容你強行看下去肯定是一臉懵逼的。
接下來的問題是這樣的:
如果當前線程池的活躍線程是 3 個(2 個核心線程+ 1 個非核心線程),但是它們各自的任務都執行完成了,都處于 waiting 狀態。然后我每隔 3 秒往線程池里面扔一個耗時 1 秒的任務。那么 30 秒之后,活躍線程數是多少?
先說答案:還是 3 個。
從我個人正常的思維,是這樣的:核心線程是空閑的,每隔 3 秒扔一個耗時 1 秒的任務過來,所以僅需要一個核心線程就完全處理的過來。
那么,30 秒內,超過核心線程的那一個線程一直處于等待狀態,所以 30 秒之后,就被回收了。 但是上面僅僅是我的主觀認為,而實際情況呢?
30 秒之后,超過核心線程
的線程并不會被回收,活躍線程還是 3 個。 到這里,如果你知道是 3 個,且知道為什么是 3 個,即了解為什么非核心線程并沒有被回收,那么接下里的內容應該就是你已經掌握的了。
可以不看,拉到最后,點個贊,去忙自己的事情吧。
如果你不知道,可以接著看,了解一下為什么是 3 個。
雖然我相信沒有面試官會問這樣的問題,但是對于你去理解線程池,是有幫助的。
先上 Demo
基于我前面說的這個場景,碼出代碼如下:
public class ThreadTest { @Test public void test() throws InterruptedException { ThreadPoolExecutor executorService = new ThreadPoolExecutor(2, 3, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<>(2), new DefaultThreadFactory("test"), new ThreadPoolExecutor.DiscardPolicy()); //每隔兩秒打印線程池的信息 ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1); scheduledExecutorService.scheduleAtFixedRate(() -> { System.out.println("=====================================thread-pool-info:" + new Date() + "====================================="); System.out.println("CorePoolSize:" + executorService.getCorePoolSize()); System.out.println("PoolSize:" + executorService.getPoolSize()); System.out.println("ActiveCount:" + executorService.getActiveCount()); System.out.println("KeepAliveTime:" + executorService.getKeepAliveTime(TimeUnit.SECONDS)); System.out.println("QueueSize:" + executorService.getQueue().size()); }, 0, 2, TimeUnit.SECONDS); try { //同時提交5個任務,模擬達到最大線程數 for (int i = 0; i < 5; i++) { executorService.execute(new Task()); } } catch (Exception e) { e.printStackTrace(); } //休眠10秒,打印日志,觀察線程池狀態 Thread.sleep(10000); //每隔3秒提交一個任務 while (true) { Thread.sleep(3000); executorService.submit(new Task()); } } static class Task implements Runnable { @Override public void run(){ try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread() + "-執行任務"); } }}
這份代碼也是提問的哥們給我的,我做了微調,你直接粘出去就能跑起來。
show me code,no bb。這才是相互探討的正確姿勢。
這個程序的運行結果是這樣的:
一共五個任務,線程池的運行情況是什么樣的呢?
先看標號為 ① 的地方:
三個線程都在執行任務,然后 2 號線程和 1 號線程率先完成了任務,接著把隊列里面的兩個任務拿出來執行(標號為 ② 的地方)。
按照程序,接下來,每隔 3 秒就有一個耗時 1 秒的任務過來。而此時線程池里面的三個活躍線程都是空閑狀態。
那么問題就來了:
該選擇哪個線程來執行這個任務呢?是隨機選一個嗎?
雖然接下來的程序還沒有執行,但是基于前面的截圖,我現在就可以告訴你,接下來的任務,線程執行順序為:
Thread[test-1-3,5,main]-執行任務
Thread[test-1-2,5,main]-執行任務
Thread[test-1-1,5,main]-執行任務
Thread[test-1-3,5,main]-執行任務
Thread[test-1-2,5,main]-執行任務
Thread[test-1-1,5,main]-執行任務
......
即雖然線程都是空閑的,但是當任務來的時候不是隨機調用的,而是輪詢。
由于是輪詢,每三秒執行一次,所以非核心線程的空閑時間最多也就是 9 秒,不會超過 30 秒,所以一直不會被回收。
基于這個 Demo,我們就從表象上回答了,為什么活躍線程數一直為 3。
為什么是輪詢?
我們通過 Demo 驗證了上面場景中,線程執行順序為輪詢。
那么為什么呢?
這只是通過日志得出的表象呀,內部原理呢?對應的代碼呢?
這一小節帶大家看一下到底是怎么回事。
首先我看到這個表象的時候我就猜測:這三個線程肯定是在某個地方被某個隊列存起來了,基于此,才能實現輪詢調用。
所以,我一直在找這個隊列,一直沒有找到對應的代碼,我還有點著急了。想著不會是在操作系統層面控制的吧?
后來我冷靜下來,覺得不太可能。于是電光火石之間,我想到了,要不先 Dump 一下線程,看看它們都在干啥:
Dump 之后,這玩意我眼熟啊,AQS 的等待隊列啊。
根據堆棧信息,我們可以定位到這里的源碼:
java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#awaitNanos
看到這里的時候,我才一下恍然大悟了起來。
害,是自己想的太多了。
說穿了,這其實就是個生產者-消費者的問題啊。
三個線程就是三個消費者,現在沒有任務需要處理,它們就等著生產者生產任務,然后通知它們準備消費。
由于本文只是帶著你去找答案在源碼的什么地方,不對源碼進行解讀。
所以我默認你是對 AQS 是有一定的了解的。
可以看到 addConditionWaiter 方法其實就是在操作我們要找的那個隊列。學名叫做等待隊列。
Debug 一下,看看隊列里面的情況:
巧了嘛,這不是。順序剛好是:
Thread[test-1-3,5,main]
Thread[test-1-2,5,main]
Thread[test-1-1,5,main]
消費者這邊我們大概摸清楚了,接著去看看生產者。
java.util.concurrent.ThreadPoolExecutor#execute
線程池是在這里把任務放到隊列里面去的。
而這個方法里面的源碼是這樣的:
其中signalNotEmpty() 最終會走到 doSignal 方法,而該方法里面會調用 transferForSignal 方法。
這個方法里面會調用 LockSupport.unpark(node.thred) 方法,喚醒線程:
而喚醒的順序,就是等待隊列里面的順序:
所以,現在你知道當一個任務來了之后,這個任務該由線程池里面的哪個線程執行,這個不是隨機的,也不是隨便來的。
是講究一個順序的。
什么順序呢?
Condition 里面的等待隊列里面的順序。
什么,你不太懂 Condition?
那還不趕緊去學?等著我給你講呢?
本來我是想寫一下的,后來發現《Java并發編程的藝術》一書中的 5.6.2 小節已經寫的挺清楚了,圖文并茂。這部分內容其實也是面試的時候的高頻考點,所以自己去看看就好了。
先欠著,欠著。
非核心線程怎么回收?
還是上面的例子,假設非核心線程就空閑了超過 30 秒,那么它是怎么被回收的呢?
這個也是一個比較熱門的面試題。
這題沒有什么高深的地方,答案就藏在源碼的這個地方:
java.util.concurrent.ThreadPoolExecutor#getTask
當 timed 參數為 true 的時候,會執行 workQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS) 方法。
而 timed 什么時候為 true 呢?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
allowCoreThreadTimeOut 默認為 false。
所以,就是看 wc > corePoolSize 條件,wc 是活躍線程數。此時活躍線程數為 3 ,大于核心線程數 2。
因此 timed 為 true。
也就是說,當前 workQueue 為空的時候,現在三個線程都阻塞 workQueue.poll 方法中。
而當指定時間后,workQueue 還是為空,則返回為 null。
于是在 1077 行把 timeOut 修改為 true。
進入一下次循環,返回 null。
最終會執行到這個方法:
java.util.concurrent.ThreadPoolExecutor#processWorkerExit
而這個方法里面會執行 remove 的操作。
于是線程就被回收了。
所以當超過指定時間后,線程會被回收。
那么被回收的這個線程是核心線程還是非核心線程呢?
不知道。
因為在線程池里面,核心線程和非核心線程僅僅是一個概念而已,其實拿著一個線程,我們并不能知道它是核心線程還是非核心線程。
這個地方就是一個證明,因為當工作線程多余核心線程數之后,所有的線程都在 poll,也就是說所有的線程都有可能被回收:
另外一個強有力的證明就是 addWorker 這里:
core 參數僅僅是控制取 corePoolSize 還是 maximumPoolSize。
所以,這個問題你說怎么回答:
JDK 區分的方式就是不區分。
那么我們可以知道嗎?
可以,比如通過觀察日志,前面的案例中,我就知道這兩個是核心線程,因為它們最先創建:
Thread[test-1-1,5,main]-執行任務
Thread[test-1-2,5,main]-執行任務
在程序里面怎么知道呢?
目前是不知道的,但是這個需求,加錢就可以實現。
自己擴展一下線程池嘛,給線程池里面的線程打個標還不是一件很簡單的事情嗎?
只是你想想,你區分這玩意干啥,有沒有可落地的需求?
畢竟,脫離需求談實現。都是耍流氓。
最后說一句
才疏學淺,難免會有紕漏,如果你發現了錯誤的地方,可以在后臺提出來,我對其加以修改。
特別聲明:以上內容(如有圖片或視頻亦包括在內)為自媒體平臺“網易號”用戶上傳并發布,本平臺僅提供信息存儲服務。
Notice: The content above (including the pictures and videos if any) is uploaded and posted by a user of NetEase Hao, which is a social media platform and only provides information storage services.