ThreadPoolExcutor 原理探究

概論

線程池(英語:thread pool):一種線程使用模式。線程過多會帶來調度開銷,進而影響緩存局部性和整體性能。而線程池維護著多個線程,等待著監督管理者分配可并發執行的任務。這避免了在處理短時間任務時創建與銷毀線程的代價。線程池不僅能夠保證內核的充分利用,還能防止過分調度。可用線程數量應該取決于可用的并發處理器、處理器內核、內存、網絡 sockets 等的數量。 例如,線程數一般取 cpu 數量 +2 比較合適,線程數過多會導致額外的線程切換開銷。

Java 中的線程池是用 ThreadPoolExecutor 類來實現的. 本文就對該類的源碼來分析一下這個類內部對于線程的創建, 管理以及后臺任務的調度等方面的執行原理。

先看一下線程池的類圖:

線程池的類圖

上圖的目的主要是為了讓大家知道線程池相關類之間的關系,至少賺個眼熟,以后看到不會有害怕的感覺。


 

Executor 框架接口

Executor 框架是一個根據一組執行策略調用,調度,執行和控制的異步任務的框架,目的是提供一種將”任務提交”與”任務如何運行”分離開來的機制。

下面是 ThreadPoolExeCutor 類圖。Executors 其實是一個工具類,里面提供了好多靜態方法,這些方法根據用戶選擇返回不同的線程實例。

從上圖也可以看出來,ThreadPoolExeCutor 是線程池的核心。

J.U.C 中有三個 Executor 接口:

  • Executor:一個運行新任務的簡單接口;

  • ExecutorService:擴展了 Executor 接口。添加了一些用來管理執行器生命周期和任務生命周期的方法;

  • ScheduledExecutorService:擴展了 ExecutorService。支持 Future 和定期執行任務。

其實通過這些接口就可以看到一些設計思想,每個接口的名字和其任務是完全匹配的。不會因為 Executor 中只有一個方法,就將其放到其他接口中。這也是很重要的單一原則。


 

ThreadPoolExeCutor 分析

在去具體分析 ThreadPoolExeCutor 運行邏輯前,先看下面的流程圖:

該圖是 ThreadPoolExeCutor 整個運行過程的一個概括,整個源碼的核心邏輯總結起來就是:

  1. 創建線程:要知道如何去創建線程,控制線程數量,線程的存活與銷毀;

  2. 添加任務:任務添加后如何處理,是立刻執行,還是先保存;

  3. 執行任務:如何獲取任務,任務執行失敗后如何處理?

下面將進入源碼分析,來深入理解 ThreadPoolExeCutor 的設計思想。


 

構造函數

先來看構造函數:

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
     // 注意 workQueue, threadFactory, handler 是不可以為null 的,為空會直接拋出錯誤
if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
  1. corePoolSize 核心線程數表示核心線程池的大小。當提交一個任務時,如果當前核心線程池的線程個數沒有達到 corePoolSize,則會創建新的線程來執行所提交的任務,即使當前核心線程池有空閑的線程。如果當前核心線程池的線程個數已經達到了corePoolSize,則不再重新創建線程。如果調用了 prestartCoreThread() 或者 prestartAllCoreThreads(),線程池創建的時候所有的核心線程都會被創建并且啟動。若 corePoolSize == 0,則任務執行完之后,沒有任何請求進入時,銷毀線程池的線程。若 corePoolSize > 0,即使本地任務執行完畢,核心線程也不會被銷毀。corePoolSize 其實可以理解為可保留的空閑線程數。

  2. maximumPoolSize: 表示線程池能夠容納同時執行的最大線程數。如果當阻塞隊列已滿時,并且當前線程池線程個數沒有超過 maximumPoolSize 的話,就會創建新的線程來執行任務。注意 maximumPoolSize >= 1 必須大于等于 1。maximumPoolSize == corePoolSize ,即是固定大小線程池。實際上最大容量是由 CAPACITY 控制

  3. keepAliveTime: 線程空閑時間。當空閑時間達到 keepAliveTime值時,線程會被銷毀,直到只剩下 corePoolSize 個線程為止,避免浪費內存和句柄資源。默認情況,當線程池的線程數 > corePoolSize 時,keepAliveTime 才會起作用。但當 ThreadPoolExecutor 的 allowCoreThreadTimeOut 變量設置為 true 時,核心線程超時后會被回收。

  4. unit時間單位。為 keepAliveTime 指定時間單位。

  5. workQueue 緩存隊列。當請求的線程數 > corePoolSize 時,線程進入 BlockingQueue 阻塞隊列。可以使用 ArrayBlockingQueue, LinkedBlockingQueue, SynchronousQueue, PriorityBlockingQueue。

  6. threadFactory 創建線程的工程類。可以通過指定線程工廠為每個創建出來的線程設置更有意義的名字,如果出現并發問題,也方便查找問題原因。

  7. handler 執行拒絕策略的對象。當線程池的阻塞隊列已滿和指定的線程都已經開啟,說明當前線程池已經處于飽和狀態了,那么就需要采用一種策略來處理這種情況。采用的策略有這幾種:
    • AbortPolicy: 直接拒絕所提交的任務,并拋出 RejectedExecutionException 異常;

    • CallerRunsPolicy:只用調用者所在的線程來執行任務;

    • DiscardPolicy:不處理直接丟棄掉任務;

    • DiscardOldestPolicy:丟棄掉阻塞隊列中存放時間最久的任務,執行當前任務


屬性定義

看完構造函數之后,再來看下該類里面的變量,有助于進一步理解整個代碼運行邏輯,下面是一些比較重要的變量:

// 用來標記線程池狀態(高3位),線程個數(低29位)
// 默認是 RUNNING 狀態,線程個數為0
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

// 線程個數掩碼位數,整型最大位數-3,可以適用于不同平臺
private static final int COUNT_BITS = Integer.SIZE - 3;

//線程最大個數(低29位)00011111111111111111111111111111
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

//(高3位):11100000000000000000000000000000
private static final int RUNNING    = -1 << COUNT_BITS;

//(高3位):00000000000000000000000000000000
private static final int SHUTDOWN   =  0 << COUNT_BITS;

//(高3位):00100000000000000000000000000000
private static final int STOP       =  1 << COUNT_BITS;

//(高3位):01000000000000000000000000000000
private static final int TIDYING    =  2 << COUNT_BITS;

//(高3位):01100000000000000000000000000000
private static final int TERMINATED =  3 << COUNT_BITS;

// 獲取高三位 運行狀態
private static int runStateOf(int c)     { return c & ~CAPACITY; }

//獲取低29位 線程個數
private static int workerCountOf(int c)  { return c & CAPACITY; }

//計算ctl新值,線程狀態 與 線程個數
private static int ctlOf(int rs, int wc) { return rs | wc; }

這里需要對一些操作做些解釋。 

  • Integer.SIZE:對于不同平臺,其位數不一樣,目前常見的是 32 位;

  • (1 << COUNT_BITS) - 1:首先是將 1 左移 COUNT_BITS 位,也就是第 COUNT_BITS + 1 位是1,其余都是 0;-1 操作則是將后面前面的 COUNT_BITS 位都變成 1。

  • -1 << COUNT_BITS:-1 的原碼是 10000000 00000000 00000000 00000001 ,反碼是 111111111 11111111 11111111 11111110 ,補碼 +1,然后左移 29 位是 11100000 00000000 00000000 00000000;這里轉為十進制是負數。

  • ~CAPACITY取反,最高三位是1;

總結:這里巧妙利用 bit 操作來將線程數量和運行狀態聯系在一起,減少了變量的存在和內存的占用。其中五種狀態的十進制排序:RUNNING < SHUTDOWN < STOP < TIDYING < TERMINATED


 

線程池狀態

線程池狀態含義:

  • RUNNING:接受新任務并且處理阻塞隊列里的任務;

  • SHUTDOWN:拒絕新任務但是處理阻塞隊列里的任務;

  • STOP:拒絕新任務并且拋棄阻塞隊列里的任務同時會中斷正在處理的任務;

  • TIDYING:所有任務都執行完(包含阻塞隊列里面任務)當前線程池活動線程為 0,將要調用 terminated 方法

  • TERMINATED:終止狀態。terminated 方法調用完成以后的狀態;

線程池狀態轉換:

  • RUNNING -> SHUTDOWN:顯式調用 shutdown() 方法,或者隱式調用了 finalize(),它里面調用了shutdown()方法。

  • RUNNING or SHUTDOWN)-> STOP:顯式 shutdownNow() 方法;

  • SHUTDOWN -> TIDYING:當線程池和任務隊列都為空的時候;

  • STOP -> TIDYING:當線程池為空的時候;

  • TIDYING -> TERMINATED:當 terminated() hook 方法執行完成時候;


 原碼,反碼,補碼知識小劇場:

1. 原碼:原碼就是符號位加上真值的絕對值, 即用第一位表示符號,其余位表示值. 比如如果是 8 位二進制:

[+1] = 0000 0001

[-1] = 1000 0001

負數原碼第一位是符號位. 

 

2. 反碼:反碼的表示方法是,正數的反碼是其本身,負數的反碼是在其原碼的基礎上, 符號位不變,其余各個位取反.

[+1] = [0000 0001] = [0000 0001]

[-1] = [1000 0001] = [1111 1110]

 

3. 補碼:補碼的表示方法是,正數的補碼就是其本身,負數的補碼是在其原碼的基礎上, 符號位不變, 其余各位取反, 最后 +1. (即在反碼的基礎上 +1)

[+1] = [0000 0001] = [0000 0001] = [0000 0001]

[-1] = [1000 0001] = [1111 1110] = [1111 1111]

4. 總結
在知道一個數原碼的情況下:
正數:反碼,補碼 就是本身自己
負數:反碼是高位符號位不變,其余位取反。補碼:反碼+1

 

 5. 左移:當數值左、右移時,先將數值轉化為其補碼形式,移完后,再轉換成對應的原碼

     左移:高位丟棄,低位補零

     [+1]  = [00000001]

     [0000 0001] << 1 = [0000 0010] = [0000 0010] = [+2]

     [-1]  = [1000 0001] = [1111 1111]

     [1111 1111] << 1 = [1111 1110] = [1000 0010] = [-2]

其中,再次提醒,負數的補碼是反碼+1;負數的反碼是補碼-1;

 

 6. 右移:高位保持不變,低位丟棄

     [+127] = [0111 1111] = [0111 1111]

     [0111 1111]補 >> 1 = [0011 1111] = [0011 1111] = [+63]

     [-127] = [1111 1111] = [1000 0001]

     [1000 0001] >> 1 = [1100 0000] = [1100 0000]原 = [-64]


execute 方法分析

通過 ThreadPoolExecutor 創建線程池后,提交任務后執行過程是怎樣的,下面來通過源碼來看一看。execute 方法源碼如下:

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();

    // 返回包含線程數及線程池狀態(頭3位)
    int c = ctl.get();
    
    // 如果工作線程數小于核心線程數,則創建線程任務執行
    if (workerCountOf(c) < corePoolSize) {
        
        if (addWorker(command, true))
            return;
            
        // 如果創建失敗,防止外部已經在線程池中加入新任務,重新獲取
        c = ctl.get();
    }
    
    // 只有線程池處于 RUNNING 狀態,且 入隊列成功
    if (isRunning(c) && workQueue.offer(command)) {
      // 后面的操作屬于double-check
        int recheck = ctl.get();
        
        // 如果線程池不是 RUNNING 狀態,則將剛加入隊列的任務移除
        if (! isRunning(recheck) && remove(command))
            reject(command);
            
        // 如果之前的線程已被消費完,新建一個線程
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // 核心池和隊列都滿了,嘗試創建一個新線程
    else if (!addWorker(command, false))
        // 如果 addWorker 返回是 false,即創建失敗,則喚醒拒絕策略
        reject(command);
} 
execute 方法執行邏輯有這樣幾種情況:
  1. 如果當前運行的線程少于 corePoolSize,則會創建新的線程來執行新的任務;

  2. 如果運行的線程個數等于或者大于 corePoolSize,則會將提交的任務存放到阻塞隊列 workQueue 中;

  3. 如果當前 workQueue 隊列已滿的話,則會創建新的線程來執行任務;

  4. 如果線程個數已經超過了 maximumPoolSize,則會使用飽和策略 RejectedExecutionHandler 來進行處理。

這里要注意一下 addWorker(null, false) 也就是創建一個線程,但并沒有傳入任務,因為任務已經被添加到 workQueue 中了,所以 worker 在執行的時候,會直接從 workQueue 中獲取任務。所以,在 workerCountOf(recheck) == 0 時執行 addWorker(null, false) 也是為了保證線程池在 RUNNING 狀態下必須要有一個線程來執行任務。

需要注意的是,線程池的設計思想就是使用了核心線程池 corePoolSize,阻塞隊列 workQueue 和線程池 maximumPoolSize,這樣的緩存策略來處理任務,實際上這樣的設計思想在需要框架中都會使用。

需要注意線程和任務之間的區別,任務是保存在 workQueue 中的,線程是從線程池里面取的,由 CAPACITY 控制容量。


addWorker 方法分析

addWorker 方法的主要工作是在線程池中創建一個新的線程并執行,firstTask 參數用于指定新增的線程執行的第一個任務,core 參數為 true 表示在新增線程時會判斷當前活動線程數是否少于 corePoolSize,false 表示新增線程前需要判斷當前活動線程數是否少于 maximumPoolSize,代碼如下:

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        // 獲取運行狀態
        int rs = runStateOf(c);
        
        /*
         * 這個if判斷
         * 如果rs >= SHUTDOWN,則表示此時不再接收新任務;
         * 接著判斷以下3個條件,只要有1個不滿足,則返回false:
         * 1. rs == SHUTDOWN,這時表示關閉狀態,不再接受新提交的任務,但卻可以繼續處理阻塞隊列中已保存的任務
         * 2. firsTask為空
         * 3. 阻塞隊列不為空
         * 
         * 首先考慮rs == SHUTDOWN的情況
         * 這種情況下不會接受新提交的任務,所以在firstTask不為空的時候會返回false;
         * 然后,如果firstTask為空,并且workQueue也為空,則返回false,
         * 因為隊列中已經沒有任務了,不需要再添加線程了
         */
        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;
        for (;;) {
            // 獲取線程數
            int wc = workerCountOf(c);
            // 如果wc超過CAPACITY,也就是ctl的低29位的最大值(二進制是29個1),返回false;
            // 這里的core是addWorker方法的第二個參數,如果為true表示根據corePoolSize來比較,
            // 如果為false則根據maximumPoolSize來比較。
            // 
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // 嘗試增加workerCount,如果成功,則跳出第一個for循環
            if (compareAndIncrementWorkerCount(c))
                break retry;
            // 如果增加workerCount失敗,則重新獲取ctl的值
            c = ctl.get();  // Re-read ctl
            // 如果當前的運行狀態不等于rs,說明狀態已被改變,返回第一個for循環繼續執行
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        // 根據firstTask來創建Worker對象
        w = new Worker(firstTask);
        // 每一個Worker對象都會創建一個線程
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());
                // rs < SHUTDOWN表示是RUNNING狀態;
                // 如果rs是RUNNING狀態或者rs是SHUTDOWN狀態并且firstTask為null,向線程池中添加線程。
                // 因為在SHUTDOWN時不會在添加新的任務,但還是會執行workQueue中的任務
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    // workers是一個HashSet
                    workers.add(w);
                    int s = workers.size();
                    // largestPoolSize記錄著線程池中出現過的最大線程數量
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                // 啟動線程
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

這里需要注意有以下幾點:

  1. 在獲取鎖后重新檢查線程池的狀態,這是因為其他線程可可能在本方法獲取鎖前改變了線程池的狀態,比如調用了shutdown方法。添加成功則啟動任務執行。

  2.  t.start()會調用 Worker 類中的 run 方法,Worker 本身實現了 Runnable 接口。原因在創建線程得時候,將 Worker 實例傳入了 t 當中,可參見 Worker 類的構造函數。

  3. wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) 每次調用 addWorker 來添加線程會先判斷當前線程數是否超過了CAPACITY,然后再去判斷是否超 corePoolSize 或 maximumPoolSize,說明線程數實際上是由 CAPACITY 來控制的。


內部類 Worker 分析

上面分析過程中,提到了一個 Worker 類,對于某些對源碼不是很熟悉得同學可能有點不清楚,下面就來看看 Worker 的源碼:

 private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        /**
         * This class will never be serialized, but we provide a
         * serialVersionUID to suppress a javac warning.
         */
        private static final long serialVersionUID = 6138294804551838833L;

        /** Thread this worker is running in.  Null if factory fails. */
        final Thread thread;
        /** Initial task to run.  Possibly null. */
        Runnable firstTask;
        /** Per-thread task counter */
        volatile long completedTasks;

        /**
         * Creates with given first task and thread from ThreadFactory.
         * @param firstTask the first task (null if none)
         */
        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
       // 注意此處傳入的是this
this.thread = getThreadFactory().newThread(this); } /** Delegates main run loop to outer runWorker. */
     // 這里其實會調用外部的 runWorker 方法來執行自己。
public void run() { runWorker(this); } // Lock methods // // The value 0 represents the unlocked state. // The value 1 represents the locked state. protected boolean isHeldExclusively() { return getState() != 0; } protected boolean tryAcquire(int unused) {
       // 如果已經設置過1了,這時候在設置1就會返回false,也就是不可重入
if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } protected boolean tryRelease(int unused) { setExclusiveOwnerThread(null); setState(0); return true; } public void lock() { acquire(1); } public boolean tryLock() { return tryAcquire(1); } public void unlock() { release(1); } public boolean isLocked() { return isHeldExclusively(); }      // 提供安全中斷線程得方法 void interruptIfStarted() { Thread t;
       // 一開始 setstate(-1) 避免了還沒開始運行就被中斷可能
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } } }

首先看到的是 Worker 繼承了(AbstractQueuedSynchronizer) AQS,并實現了 Runnable 接口,說明 Worker 本身也是線程。然后看其構造函數可以發現,內部有兩個屬性變量分別是 Runnable 和 Thread 實例,該類其實就是對傳進來得屬性做了一個封裝,并加入了獲取鎖的邏輯(繼承了 AQS )。具體可參考文章:透過 ReentrantLock 分析 AQS 的實現原理

Worker 繼承了 AQS,使用 AQS 來實現獨占鎖的功能。為什么不使用 ReentrantLock 來實現呢?可以看到 tryAcquire 方法,它是不允許重入的,而 ReentrantLock 是允許重入的:

  1. lock 方法一旦獲取了獨占鎖,表示當前線程正在執行任務中;

  2. 如果正在執行任務,則不應該中斷線程;

  3. 如果該線程現在不是獨占鎖的狀態,也就是空閑的狀態,說明它沒有在處理任務,這時可以對該線程進行中斷;

  4. 線程池在執行 shutdown 方法或 tryTerminate 方法時會調用 interruptIdleWorkers 方法來中斷空閑的線程,interruptIdleWorkers 方法會使用 tryLock 方法來判斷線程池中的線程是否是空閑狀態;

  5. 之所以設置為不可重入,是因為我們不希望任務在調用像 setCorePoolSize 這樣的線程池控制方法時重新獲取鎖。如果使用 ReentrantLock,它是可重入的,這樣如果在任務中調用了如 setCorePoolSize 這類線程池控制的方法,會中斷正在運行的線程,因為 size 小了,需要中斷一些線程 。

所以,Worker 繼承自 AQS,用于判斷線程是否空閑以及是否可以被中斷。

此外,在構造方法中執行了 setState(-1);,把 state 變量設置為 -1,為什么這么做呢?是因為 AQS 中默認的 state 是 0,如果剛創建了一個 Worker 對象,還沒有執行任務時,這時就不應該被中斷,看一下 tryAquire 方法: 

protected boolean tryAcquire(int unused) {
    if (compareAndSetState(0, 1)) {
        setExclusiveOwnerThread(Thread.currentThread());
        return true;
    }
    return false;
}

正因為如此,在 runWorker 方法中會先調用 Worker 對象的 unlock 方法將 state 設置為 0。tryAcquire 方法是根據 state 是否是 0 來判斷的,所以,setState(-1);將 state 設置為 -1 是為了禁止在執行任務前對線程進行中斷。


 runWorker 方法分析

前面提到了內部類 Worker 的 run 方法調用了外部類 runWorker,下面來看下 runWork 的具體邏輯。

final void runWorker(Worker w) {
       Thread wt = Thread.currentThread();
       Runnable task = w.firstTask;
       w.firstTask = null;
       w.unlock(); // status 設置為0,允許中斷,也可以避免再次加鎖失敗
       boolean completedAbruptly = true;
       try {
           while (task != null || (task = getTask()) != null) {
               // 要派發task的時候,需要上鎖
               w.lock();
               // 如果線程池當前狀態至少是stop,則設置中斷標志;
               // 如果線程池當前狀態是RUNNININ,則重置中斷標志,重置后需要重新
               //檢查下線程池狀態,因為當重置中斷標志時候,可能調用了線程池的shutdown方法
               //改變了線程池狀態。
               if ((runStateAtLeast(ctl.get(), STOP) ||
                    (Thread.interrupted() &&
                     runStateAtLeast(ctl.get(), STOP))) &&
                   !wt.isInterrupted())
                   wt.interrupt();

               try {
                   //任務執行前干一些事情
                   beforeExecute(wt, task);
                   Throwable thrown = null;
                   try {
                       task.run();//執行任務
                   } catch (RuntimeException x) {
                       thrown = x; throw x;
                   } catch (Error x) {
                       thrown = x; throw x;
                   } catch (Throwable x) {
                       thrown = x; throw new Error(x);
                   } finally {
                       //任務執行完畢后干一些事情
                       afterExecute(task, thrown);
                   }
               } finally {
                   task = null;
                   //統計當前worker完成了多少個任務
                   w.completedTasks++;
                   w.unlock();
               }
           }
           completedAbruptly = false;
       } finally {

           //執行清了工作
           processWorkerExit(w, completedAbruptly);
       }
   }

總結一下 runWorker 方法的執行過程:

  1. while 循環不斷地通過 getTask() 方法從阻塞隊列中取任務;

  2. 如果線程池正在停止,那么要保證當前線程是中斷狀態,否則要保證當前線程不是中斷狀態;

  3. 調用 task.run()執行任務;

  4. 如果 task 為 null 則跳出循環,執行 processWorkerExit 方法;

  5. runWorker 方法執行完畢,也代表著 Worker 中的 run 方法執行完畢,銷毀線程。

這里的 beforeExecute 方法和 afterExecute 方法在 ThreadPoolExecutor 類中是空的,留給子類來實現。

completedAbruptly 變量來表示在執行任務過程中是否出現了異常,在 processWorkerExit 方法中會對該變量的值進行判斷。


 

getTask 方法分析

getTask 方法是從阻塞隊列里面獲取任務,具體代碼邏輯如下:

private Runnable getTask() {
    // timeOut變量的值表示上次從阻塞隊列中取任務時是否超時
    boolean timedOut = false; // Did the last poll() time out?
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        // Check if queue empty only if necessary.
        /*
         * 如果線程池狀態rs >= SHUTDOWN,也就是非RUNNING狀態,再進行以下判斷:
         * 1. rs >= STOP,線程池是否正在stop;
         * 2. 阻塞隊列是否為空。
         * 如果以上條件滿足,則將workerCount減1并返回null。
         * 因為如果當前線程池狀態的值是SHUTDOWN或以上時,不允許再向阻塞隊列中添加任務。
         */
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }
        int wc = workerCountOf(c);
        // Are workers subject to culling?
        // timed變量用于判斷是否需要進行超時控制。
        // allowCoreThreadTimeOut默認是false,也就是核心線程不允許進行超時;
        // wc > corePoolSize,表示當前線程池中的線程數量大于核心線程數量;
        // 對于超過核心線程數量的這些線程,需要進行超時控制
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        
        /*
         * wc > maximumPoolSize的情況是因為可能在此方法執行階段同時執行了setMaximumPoolSize方法;
         * timed && timedOut 如果為true,表示當前操作需要進行超時控制,并且上次從阻塞隊列中獲取任務發生了超時
         * 接下來判斷,如果有效線程數量大于1,或者阻塞隊列是空的,那么嘗試將workerCount減1;
         * 如果減1失敗,則返回重試。
         * 如果wc == 1時,也就說明當前線程是線程池中唯一的一個線程了。
         */
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }
        try {
            /*
             * 根據timed來判斷,如果為true,則通過阻塞隊列的poll方法進行超時控制,如果在keepAliveTime時間內沒有獲取到任務,則返回null;
             * 否則通過take方法,如果這時隊列為空,則take方法會阻塞直到隊列不為空。
             * 
             */
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            // 如果 r == null,說明已經超時,timedOut設置為true
            timedOut = true;
        } catch (InterruptedException retry) {
            // 如果獲取任務時當前線程發生了中斷,則設置timedOut為false并返回循環重試
            timedOut = false;
        }
    }
}

其實到這里后,你會發現在 ThreadPoolExcute 內部有幾個重要的檢驗:

  • 判斷當前的運行狀態,根據運行狀態來做處理,如果當前都停止運行了,那很多操作也就沒必要了;

  • 判斷當前線程池的數量,然后將該數據和 corePoolSize 以及 maximumPoolSize 進行比較,然后再去決定下一步該做啥;

首先是第一個 if 判斷,當運行狀態處于非 RUNNING 狀態,此外 rs >= STOP(線程池是否正在 stop)或阻塞隊列是否為空。則將 workerCount 減 1 并返回 null。為什么要減 1 呢,因為此處其實是去獲取一個 task,但是發現處于停止狀態了,也就是沒必要再去獲取運行任務了,那這個線程就沒有存在的意義了。后續也會在 processWorkerExit 將該線程移除。

第二個 if 條件目的是控制線程池的有效線程數量。由上文中的分析可以知道,在執行 execute 方法時,如果當前線程池的線程數量超過了 corePoolSize 且小于 maximumPoolSize,并且 workQueue 已滿時,則可以增加工作線程,但這時如果超時沒有獲取到任務,也就是 timedOut 為 true 的情況,說明 workQueue 已經為空了,也就說明了當前線程池中不需要那么多線程來執行任務了,可以把多于 corePoolSize 數量的線程銷毀掉,保持線程數量在 corePoolSize 即可。

什么時候會銷毀?當然是 runWorker 方法執行完之后,也就是 Worker 中的 run 方法執行完,由 JVM 自動回收。

getTask 方法返回 null 時,在 runWorker 方法中會跳出 while 循環,然后會執行 processWorkerExit 方法。


 

processWorkerExit 方法

下面在看 processWorkerExit 方法的具體邏輯:

private void processWorkerExit(Worker w, boolean completedAbruptly) {
    // 如果completedAbruptly值為true,則說明線程執行時出現了異常,需要將workerCount減1;
    // 如果線程執行時沒有出現異常,說明在getTask()方法中已經已經對workerCount進行了減1操作,這里就不必再減了。  
    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
        decrementWorkerCount();
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        //統計完成的任務數
        completedTaskCount += w.completedTasks;
        // 從workers中移除,也就表示著從線程池中移除了一個工作線程
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }
    // 根據線程池狀態進行判斷是否結束線程池
    tryTerminate();
    int c = ctl.get();
    /*
     * 當線程池是RUNNING或SHUTDOWN狀態時,如果worker是異常結束,那么會直接addWorker;
     * 如果allowCoreThreadTimeOut=true,并且等待隊列有任務,至少保留一個worker;
     * 如果allowCoreThreadTimeOut=false,workerCount不少于corePoolSize。
     */
    if (runStateLessThan(c, STOP)) {
        if (!completedAbruptly) {
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        addWorker(null, false);
    }
}

至此,processWorkerExit 執行完之后,工作線程被銷毀,以上就是整個工作線程的生命周期。但是這有兩點需要注意:

  1. 大家想想什么時候才會調用這個方法,任務干完了才會調用。那么沒事做了,就需要看下是否有必要結束線程池,這時候就會調用 tryTerminate。

  2. 如果此時線程處于 STOP 狀態以下,那么就會判斷核心線程數是否達到了規定的數量,沒有的話,就會繼續創建一個線程。


tryTerminate方法

tryTerminate 方法根據線程池狀態進行判斷是否結束線程池,代碼如下:

final void tryTerminate() {
    for (;;) {
        int c = ctl.get();
        /*
         * 當前線程池的狀態為以下幾種情況時,直接返回:
         * 1. RUNNING,因為還在運行中,不能停止;
         * 2. TIDYING或TERMINATED,因為線程池中已經沒有正在運行的線程了;
         * 3. SHUTDOWN并且等待隊列非空,這時要執行完workQueue中的task;
         */
        if (isRunning(c) ||
            runStateAtLeast(c, TIDYING) ||
            (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
            return;
        // 如果線程數量不為0,則中斷一個空閑的工作線程,并返回
        if (workerCountOf(c) != 0) { // Eligible to terminate
            interruptIdleWorkers(ONLY_ONE);
            return;
        }
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // 這里嘗試設置狀態為TIDYING,如果設置成功,則調用terminated方法
            if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                try {
                    // terminated方法默認什么都不做,留給子類實現
                    terminated();
                } finally {
                    // 設置狀態為TERMINATED
                    ctl.set(ctlOf(TERMINATED, 0));
                    termination.signalAll();
                }
                return;
            }
        } finally {
            mainLock.unlock();
        }
        // else retry on failed CAS
    }
}

interruptIdleWorkers(boolean onlyOne) 如果 ONLY_ONE = true 那么就的最多讓一個空閑線程發生中斷,ONLY_ONE = false 時是所有空閑線程都會發生中斷。那線程什么時候會處于空閑狀態呢?

一是線程數量很多,任務都完成了;二是線程在 getTask 方法中執行 workQueue.take() 時,如果不執行中斷會一直阻塞。

所以每次在工作線程結束時調用 tryTerminate 方法來嘗試中斷一個空閑工作線程,避免在隊列為空時取任務一直阻塞的情況。


 

shutdown方法

shutdown 方法要將線程池切換到 SHUTDOWN 狀態,并調用 interruptIdleWorkers 方法請求中斷所有空閑的 worker,最后調用 tryTerminate 嘗試結束線程池。

public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // 安全策略判斷
        checkShutdownAccess();
        // 切換狀態為SHUTDOWN
        advanceRunState(SHUTDOWN);
        // 中斷空閑線程
        interruptIdleWorkers();
        onShutdown(); // hook for ScheduledThreadPoolExecutor
    } finally {
        mainLock.unlock();
    }
    // 嘗試結束線程池
    tryTerminate();
}

這里思考一個問題:在 runWorker 方法中,執行任務時對 Worker 對象 w 進行了 lock 操作,為什么要在執行任務的時候對每個工作線程都加鎖呢?

下面仔細分析一下:

  • 在 getTask 方法中,如果這時線程池的狀態是 SHUTDOWN 并且 workQueue 為空,那么就應該返回 null 來結束這個工作線程,而使線程池進入 SHUTDOWN 狀態需要調用shutdown 方法;

  • shutdown 方法會調用 interruptIdleWorkers 來中斷空閑的線程,interruptIdleWorkers 持有 mainLock,會遍歷 workers 來逐個判斷工作線程是否空閑。但 getTask 方法中沒有mainLock;

  • 在 getTask 中,如果判斷當前線程池狀態是 RUNNING,并且阻塞隊列為空,那么會調用 workQueue.take() 進行阻塞;

  • 如果在判斷當前線程池狀態是 RUNNING 后,這時調用了 shutdown 方法把狀態改為了 SHUTDOWN,這時如果不進行中斷,那么當前的工作線程在調用了 workQueue.take() 后會一直阻塞而不會被銷毀,因為在 SHUTDOWN 狀態下不允許再有新的任務添加到 workQueue 中,這樣一來線程池永遠都關閉不了了;

  • 由上可知,shutdown 方法與 getTask 方法(從隊列中獲取任務時)存在競態條件;

  • 解決這一問題就需要用到線程的中斷,也就是為什么要用 interruptIdleWorkers 方法。在調用 workQueue.take() 時,如果發現當前線程在執行之前或者執行期間是中斷狀態,則會拋出 InterruptedException,解除阻塞的狀態;

  • 但是要中斷工作線程,還要判斷工作線程是否是空閑的,如果工作線程正在處理任務,就不應該發生中斷;

  • 所以 Worker 繼承自 AQS,在工作線程處理任務時會進行 lock,interruptIdleWorkers 在進行中斷時會使用 tryLock 來判斷該工作線程是否正在處理任務,如果 tryLock 返回 true,說明該工作線程當前未執行任務,這時才可以被中斷。

下面就來分析一下 interruptIdleWorkers 方法。

interruptIdleWorkers方法

private void interruptIdleWorkers() {
    interruptIdleWorkers(false);
}
private void interruptIdleWorkers(boolean onlyOne) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers) {
            Thread t = w.thread;
            if (!t.isInterrupted() && w.tryLock()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                } finally {
                    w.unlock();
                }
            }
            if (onlyOne)
                break;
        }
    } finally {
        mainLock.unlock();
    }
}

interruptIdleWorkers 遍歷 workers 中所有的工作線程,若線程沒有被中斷 tryLock 成功,就中斷該線程。

為什么需要持有 mainLock ?因為 workers 是 HashSet 類型的,不能保證線程安全。


 

shutdownNow方法

public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        advanceRunState(STOP);
        // 中斷所有工作線程,無論是否空閑
        interruptWorkers();
        // 取出隊列中沒有被執行的任務
        tasks = drainQueue();
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
    return tasks;
}

shutdownNow 方法與 shutdown 方法類似,不同的地方在于:

  1. 設置狀態為 STOP;

  2. 中斷所有工作線程,無論是否是空閑的;

  3. 取出阻塞隊列中沒有被執行的任務并返回。

shutdownNow 方法執行完之后調用 tryTerminate 方法,該方法在上文已經分析過了,目的就是使線程池的狀態設置為 TERMINATED。


 

線程池的監控

通過線程池提供的參數進行監控。線程池里有一些屬性在監控線程池的時候可以使用

  • getTaskCount:線程池已經執行的和未執行的任務總數;

  • getCompletedTaskCount:線程池已完成的任務數量,該值小于等于 taskCount;

  • getLargestPoolSize:線程池曾經創建過的最大線程數量。通過這個數據可以知道線程池是否滿過,也就是達到了maximumPoolSize;

  • getPoolSize:線程池當前的線程數量;

  • getActiveCount:當前線程池中正在執行任務的線程數量。

通過這些方法,可以對線程池進行監控,在 ThreadPoolExecutor 類中提供了幾個空方法,如 beforeExecute 方法,afterExecute 方法和 terminated 方法,可以擴展這些方法在執行前或執行后增加一些新的操作,例如統計線程池的執行任務的時間等,可以繼承自 ThreadPoolExecutor 來進行擴展。

到此,關于 ThreadPoolExecutor 的內容就講完了。

  

 參考文獻

Java中線程池ThreadPoolExecutor原理探究

【Java】 之ThreadPoolExcutor源碼淺析

線程池ThreadPoolExecutor實現原理

深入理解Java線程池:ThreadPoolExecutor

 

posted @ 2020-04-07 23:49  huansky  閱讀(...)  評論(...編輯  收藏
最新chease0ldman老人