<sub id="gqw76"><listing id="gqw76"></listing></sub>
      <sub id="gqw76"><listing id="gqw76"></listing></sub>

    1. <form id="gqw76"><legend id="gqw76"></legend></form>
    2. 深入淺出AQS源碼解析

      最近一直在研究AQS的源碼,希望可以更深刻的理解AQS的實現原理。雖然網上有很多關于AQS的源碼分析,但是看完以后感覺還是一知半解。于是,我將自己的整個理解過程記錄下來了,希望對大家有所幫助。

      基本原理

      AQS是Java中鎖的基礎,主要由兩個隊列組成。一個隊列是同步隊列,另一個是條件隊列

      同步隊列的原理

      • 同步隊列的隊列頭部是head,隊列尾部是tail節點,head節點是一個空節點,同步隊列是一個雙向鏈表,通過nextprev連接所有節點
      • 所有的線程在競爭鎖的時候都會創建一個Node節點,線程與節點綁定在一起,(如果是同步鎖和排他鎖不同之處是通過nextWaiter來區分的)并且添加到同步隊列的尾部
      • head的第一個節點獲取鎖,其余節點都需要等待被喚醒
      • 同步隊列中的節點會存在取消和null的情況(如:線程超時中斷、線程更新節點的中間態),被取消和null的節點不能被喚醒,將會被視為無效節點
      • 一個線程只能被有效的前驅節點(取消和null的節點除外)喚醒
      • 持有鎖的線程只能是有一個,其他有效節點對應的線程都會被掛起

      條件隊列的原理

      • 一個同步隊列可以對應多個條件隊列
      • 條件隊列是一個單向鏈表,通過nextWaiter來連接起來,條件隊列的頭節點是firstWaiter,尾節點是lastWaiter
      • 某個條件隊列中滿足條件的節點(被signalsignalAll方法喚醒的節點)才會被轉移到同步隊列
      • 條件隊列中的被轉移到同步隊列的節點是從頭節點開始,條件隊列中被阻塞的線程會添加到隊列的尾部

      同步隊列的實現

      首先,了解以下同步隊列中隊列的節點Node的數據結構

      static final class Node {
              /** 共享鎖的標識 */
              static final Node SHARED = new Node();
              /** 排他鎖的標識 */
              static final Node EXCLUSIVE = null;
      
              /** 線程取消 */
              static final int CANCELLED =  1;
              /** 持有鎖的線程的后繼線程被掛起 */
              static final int SIGNAL    = -1;
              /** 條件隊列標識 */
              static final int CONDITION = -2;
              /**
               * 共享鎖情況下,通知所有其他節點
               */
              static final int PROPAGATE = -3;
      
              /**
               * waitStatus的取值如下:
               *   SIGNAL(-1): 當前節點的后繼節點應該被掛起
               *   CANCELLED(1): 當前節點被取消
               *   CONDITION(-2): 當前節點在條件隊列
               *   PROPAGATE(-3): 釋放共享鎖時需要通知所有節點
               *   0: 初始值
               *
               */
              volatile int waitStatus;
      
              /**
               * 前驅節點
               */
              volatile Node prev;
      
              /**
               * 后繼節點
               */
              volatile Node next;
      
              /**
               * 節點對應的線程
               */
              volatile Thread thread;
      
              /**
               * 在共享鎖的情況下,該節點的值為SHARED
               * 在排他鎖的情況下,該節點的值為EXCLUSIVE
               * 在條件隊列的情況下,鏈接的是下一個等待條件的線程
               */
              Node nextWaiter;
      }
      

      其次,我們來看一下同步隊列的鏈表結構
      同步隊列鏈表

      接著,我們根據同步隊列的原理來分析以下acquirerelease需要做哪些事情:

      實現acquire功能需要做的事情

      1. 創建一個Node節點node(該節點可能是排他鎖,也可以能是共享鎖)
      2. node添加到同步隊列尾部,如果同步隊列為空(初始情況下),需要先創建一個空的頭節點,然后再添加到隊列的尾部
      3. 如果node的前驅節點是head,說明node是第一個節點,能夠獲取鎖,需要將head修改成node,釋放前驅節點的資源
      4. 如果node的前驅節點不是head,說明獲取鎖失敗,需要檢測是否需要將node綁定的線程掛起,分以下幾種情況:
        • 如果nodewaitStatus已經被設置為SIGNAL 表示需要被掛起
        • 如果nodewaitStatus設置為CANCEL表示該節點已經被取消,需要被去掉,并修改 nodeprev,直到鏈接上一個有效的節點為止
        • 否則將nodewaitStatus設置為SIGNAL,表示即將要被掛起
      5. 如果需要將node綁定的線程掛起,則讓出CPU,直到當前驅節點來喚起node才會開始繼續從步驟3開始執行

      與acquire功能相關的代碼

      • acquire方法:獲取排他鎖
      public final void acquire(int arg) {
              if (!tryAcquire(arg) &&
                  acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                  selfInterrupt();
          }
      
      1. tryAcquire(arg):對外提供的一個擴展方法,常用的鎖都要實現這個方法,具體實現與鎖相關

      2. addWaiter(Node.EXCLUSIVE): 創建一個排他鎖節點,并將該節點添加到同步隊列尾部,代碼如下:

      private Node addWaiter(Node mode) {
              // 創建一個node,EXCLUSIVE類型
              Node node = new Node(mode);
      
              for (;;) {
                  // 獲取尾節點
                  Node oldTail = tail;
                  if (oldTail != null) {
                      // 設置即將成為尾節點的前驅
                      node.setPrevRelaxed(oldTail);
                      // CAS操作設置尾節點
                      if (compareAndSetTail(oldTail, node)) {
                          // 將新尾節點的前驅節點與新的尾節點關聯起來
                          oldTail.next = node;
                          // 返回添加的節點
                          // 這個節點現在不一定是尾節點,因為如果有多個線程調用這個方法時,
                          // 可能還有節點添加在這個節點后面
                          return node;
                      }
                  } else {
                      // 如果隊列為空,初始化頭節點
                      initializeSyncQueue();
                  }
              }
          }
      
      1. acquireQueued同步隊列中的節點獲取排他鎖
      final boolean acquireQueued(final Node node, int arg) {
              try {
                  // 線程是否中斷
                  boolean interrupted = false;
                  for (;;) {
                      // 獲取前驅節點
                      final Node p = node.predecessor();
                      // 如果前驅節點是頭節點,獲取鎖
                      if (p == head && tryAcquire(arg)) {
                          // 修改頭節點
                          setHead(node);
                          // 釋放頭節點的資源
                          p.next = null; // help GC
                          // 返回線程中斷的狀態
                          // 這也是該方法唯一的返回值
                          // 沒有獲取鎖的線程會一直執行該方法直到獲取鎖以后再返回
                          return interrupted;
                      }
                      // 獲取鎖失敗后是否需要將線程掛起
                      if (shouldParkAfterFailedAcquire(p, node) &&
                          parkAndCheckInterrupt()) // 線程掛起并返回是否被中斷
                          interrupted = true;
                  }
              } catch (Throwable t) {
                  // 取消該節點
                  cancelAcquire(node);
                  throw t;
              }
          }
      
      1. shouldParkAfterFailedAcquire:檢測線程獲取鎖失敗以后是否需要被掛起
      private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
              // 前驅節點的狀態
              int ws = pred.waitStatus;
              if (ws == Node.SIGNAL)
                  /*
                   * 狀態已經設置成SIGNAL,可以直接掛起該節點
                   */
                  return true;
              // 節點被取消
              if (ws > 0) {
                  /*
                   * 找到pred第一個有效的前驅節點
                   */
                  do {
                      node.prev = pred = pred.prev;
                  } while (pred.waitStatus > 0);
                  // pred可能是一個新的節點,需要將pred的next重寫設置為node
                  pred.next = node;
              } else {
                  /*
                   * CAS操作將pred節點的狀態設置為SIGNAL
                   */
                  pred.compareAndSetWaitStatus(ws, Node.SIGNAL);
              }
              // 只有當pred節點的waitStatus已經是SIGNAL狀態時,才可以安全的掛起線程
              // 否則需要不能被掛起
              return false;
          }
      
      1. parkAndCheckInterrupt:將當前線程掛起,并檢測當前線程是否中斷
      private final boolean parkAndCheckInterrupt() {
              // 線程掛起
              LockSupport.park(this);
              // 檢測線程是否中斷
              return Thread.interrupted();
          }
      
      1. cancelAcquire:取消節點
       private void cancelAcquire(Node node) {
              // 如果節點為空,什么都不做
              if (node == null)
                  return;
              // 釋放線程
              node.thread = null;
      
              // 從后往前過濾掉所有的被取消的節點
              Node pred = node.prev;
              while (pred.waitStatus > 0)
                  node.prev = pred = pred.prev;
      
              // 有效前驅節點的nex節點
              Node predNext = pred.next;
      
              // 將node設置為CANCELLED
              node.waitStatus = Node.CANCELLED;
      
              // 如果是尾節點,設置新的尾節點
              if (node == tail && compareAndSetTail(node, pred)) {
                  // 將新的尾節點的后續設置為null
                  pred.compareAndSetNext(predNext, null);
              } else {
                  // If successor needs signal, try to set pred's next-link
                  // so it will get one. Otherwise wake it up to propagate.
                  int ws;
                  // 如果前驅節點的線程不為null并且waitStatus為SIGNAL
                  if (pred != head &&
                      ((ws = pred.waitStatus) == Node.SIGNAL ||
                       (ws <= 0 && pred.compareAndSetWaitStatus(ws, Node.SIGNAL))) &&
                      pred.thread != null) {
                      Node next = node.next;
                      // 將node設置成pred的后繼節點
                      if (next != null && next.waitStatus <= 0)
                          pred.compareAndSetNext(predNext, next);
                  } else {
                      // 喚起node節點的后繼節點
                      // 因為node節點已經釋放鎖了
                      unparkSuccessor(node);
                  }
      
                  node.next = node; // help GC
              }
          }
      
      1. unparkSuccessor:喚醒后繼節點
      private void unparkSuccessor(Node node) {
              /*
               * 獲取node節點的waitStatus
               */
              int ws = node.waitStatus;
             // 用CSA操作將waitStatus設置成初始狀態
             // 不管設置是否成功,都無所謂,因為該節點即將被銷毀
              if (ws < 0)
                  node.compareAndSetWaitStatus(ws, 0);
              /*
               * 獲取node的后繼節點
               */
              Node s = node.next;
              // 如果后繼節點為null或者被取消,
              // 通過從同步隊列的尾節點開始一直往前找到一個有效的后繼節點
              if (s == null || s.waitStatus > 0) {
                  s = null;
                  for (Node p = tail; p != node && p != null; p = p.prev)
                      if (p.waitStatus <= 0)
                          s = p;
              }
              // 如果后繼節點不為空
              if (s != null)
                  LockSupport.unpark(s.thread);// 喚醒后繼節點的線程
          }
      

      acquire方法類似的還有acquireInterruptiblytryAcquireNanosacquireSharedacquireSharedInterruptiblytryAcquireSharedNanos,我們都一一分析以下

      • acquireInterruptibly方法:獲取可中斷的排他鎖
      public final void acquireInterruptibly(int arg)
                  throws InterruptedException {
              if (Thread.interrupted()) // 如果線程中斷,直接返回
                  throw new InterruptedException();
              if (!tryAcquire(arg))
                  doAcquireInterruptibly(arg); // 中斷式的獲取鎖
          }
      
      1. doAcquireInterruptibly:可中斷式的獲取鎖
      private void doAcquireInterruptibly(int arg)
              throws InterruptedException {
             // 創建一個排他節點加入同步隊列
              final Node node = addWaiter(Node.EXCLUSIVE);
              try {
                  for (;;) {
                      // 獲取前驅節點
                      final Node p = node.predecessor();
                      // 如果前驅節點是頭節點,說明已經獲取的鎖
                      if (p == head && tryAcquire(arg)) {
                          // 修改頭節點
                          setHead(node);
                          p.next = null; // help GC
                          return;
                      }
                      // 如果沒有獲取鎖,檢測是否需要掛起
                      if (shouldParkAfterFailedAcquire(p, node) &&
                          parkAndCheckInterrupt())
                          throw new InterruptedException(); // 如果發現線程已經被中斷,需要拋出異常
                  }
              } catch (Throwable t) {
                  // 發生異常取消節點
                  cancelAcquire(node);
                  throw t;
              }
          }
      
      • tryAcquireNanos方法:超時中斷獲取排他鎖
      public final boolean tryAcquireNanos(int arg, long nanosTimeout)
                  throws InterruptedException {
              if (Thread.interrupted())
                  throw new InterruptedException(); // 線程中斷直接返回
              return tryAcquire(arg) ||
                  doAcquireNanos(arg, nanosTimeout); // 超時獲取排他鎖
          }
      
      1. doAcquireNanos:超時獲取排他鎖
      private boolean doAcquireNanos(int arg, long nanosTimeout)
                  throws InterruptedException {
              // 如果超時直接返回
              if (nanosTimeout <= 0L)
                  return false;
              // 獲取超時時長
              final long deadline = System.nanoTime() + nanosTimeout;
              // 添加一個排他節點到同步隊列尾部
              final Node node = addWaiter(Node.EXCLUSIVE);
              try {
                  for (;;) {
                       // 獲取前驅節點
                      final Node p = node.predecessor();
                      // 已經獲取鎖
                      if (p == head && tryAcquire(arg)) {
                          setHead(node);
                          p.next = null; // help GC
                          return true;
                      }
                      nanosTimeout = deadline - System.nanoTime();
                      // 如果超時了就取消
                      if (nanosTimeout <= 0L) {
                          cancelAcquire(node);
                          return false;
                      }
                      // 檢測節點是否需要被掛起
                      if (shouldParkAfterFailedAcquire(p, node) &&
                          nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD)
                          // 如果需要掛起,且超時時長大于SPIN_FOR_TIMEOUT_THRESHOLD
                          // 線程掛起nanosTimeout時間
                          LockSupport.parkNanos(this, nanosTimeout); 
                      if (Thread.interrupted())
                          throw new InterruptedException();
                  }
              } catch (Throwable t) {
                  // 發生異常取消節點
                  cancelAcquire(node);
                  throw t;
              }
          }
      
      • acquireShared方法:獲取共享鎖
      public final void acquireShared(int arg) {
              // 對外提供的一個擴展方法,常用的鎖都要實現這個方法,
              // 該方法的實現與鎖的用途有關
              if (tryAcquireShared(arg) < 0) 
                  doAcquireShared(arg); // 獲取共享鎖
          }
      
      1. doAcquireShared:獲取共享鎖
       private void doAcquireShared(int arg) {
              // 添加一個共享節點到同步隊列尾部
              final Node node = addWaiter(Node.SHARED);
              try {
                  boolean interrupted = false;
                  for (;;) {
                      // 獲取前驅節點
                      final Node p = node.predecessor();
                      if (p == head) {
                          // 返回結果大于等于0表示獲取共享鎖
                          int r = tryAcquireShared(arg);
                          if (r >= 0) {
                              // 設置頭節點并廣播通知其他獲取共享鎖的節點
                              setHeadAndPropagate(node, r);
                              p.next = null; // help GC
                              // 如果線程被中斷,將該線程中斷
                              // 共享鎖會被多個線程獲取,如果需要中斷
                              // 所有獲取共享鎖的線程都要被中斷
                              if (interrupted)
                                  selfInterrupt();
                              return;
                          }
                      }
                      // 檢測是否需要掛起
                      if (shouldParkAfterFailedAcquire(p, node) &&
                          parkAndCheckInterrupt()) // 掛起并中斷
                          interrupted = true;
                  }
              } catch (Throwable t) {
                  // 發生異常取消節點
                  cancelAcquire(node);
                  throw t;
              }
          }
      
      1. setHeadAndPropagate:設置頭節點并廣播其他節點來獲取鎖
       private void setHeadAndPropagate(Node node, int propagate) {
              Node h = head; // 記錄舊的頭節點
              setHead(node);// 設置新的頭節點
              /*
               * 如果頭節點為null或者是不是取消狀態,嘗試喚醒后繼節點
               */
              if (propagate > 0 || h == null || h.waitStatus < 0 ||
                  (h = head) == null || h.waitStatus < 0) {
                  Node s = node.next;
                  // node節點的next是SHARED,即共享鎖
                  if (s == null || s.isShared())
                      // 喚起獲取共享鎖的線程
                      doReleaseShared();
              }
          }
      
      1. doReleaseShared:喚醒等待共享鎖的節點
       private void doReleaseShared() {
              /*
               * 喚醒時是從頭節點開始先喚醒第一個共享節點,
               * 第一個共享節點被喚醒后會在doAcquireShared方法里繼續執行(之前就是在這個方法里被掛起的)
               * 第一個共享節點如果獲取鎖會調用setHeadAndPropagate方法修改頭節點,然后再調用doReleaseShared方法
               * 喚醒第二個共享節點,以此類推,最后把所有的共享節點都喚醒
               */
              for (;;) {
                  Node h = head;
                  if (h != null && h != tail) {
                      // 獲取頭節點的狀態
                      int ws = h.waitStatus;
                      // 如果頭節點是SIGNAL,需要將狀態設置為0,表示已經即將被喚醒
                      if (ws == Node.SIGNAL) {
                          if (!h.compareAndSetWaitStatus(Node.SIGNAL, 0))
                              continue;            // 如果失敗了說明有其他線程在修改頭節點,需要繼續重試
                          unparkSuccessor(h); // 喚醒頭節點的后繼節點
                      }
                      else if (ws == 0 &&
                               !h.compareAndSetWaitStatus(0, Node.PROPAGATE))
                          continue;                // 將頭節點狀態從0設置成PROPAGATE,如果失敗了繼續,因為也有其他獲取共享鎖的線程在更改頭節點
                  }
                  // 如果頭節點未改變(因為沒有后繼節點需要等待共享鎖),跳出循環
                  if (h == head)
                      break;
              }
          }
      
      1. selfInterrupt:中斷當前線程
      static void selfInterrupt() {
          Thread.currentThread().interrupt();
      }
      
      • acquireSharedInterruptibly方法:可中斷的獲取共享鎖
      public final void acquireSharedInterruptibly(int arg)
                  throws InterruptedException {
              if (Thread.interrupted())
                  throw new InterruptedException(); // 如果線程被中斷拋出異常
              if (tryAcquireShared(arg) < 0)
                  doAcquireSharedInterruptibly(arg); // 可中斷的方式獲取共享鎖
          }
      
      1. doAcquireSharedInterruptibly:可中斷的方式后去共享鎖
       private void doAcquireSharedInterruptibly(int arg)
              throws InterruptedException {
              // 添加共享鎖節點到同步隊列尾部
              final Node node = addWaiter(Node.SHARED);
              try {
                  for (;;) {
                      // 獲取前驅節點
                      final Node p = node.predecessor();
                      if (p == head) {
                          int r = tryAcquireShared(arg);
                          if (r >= 0) {
                              // 獲取共享鎖以后修改頭節點,通知其他等待共享鎖的節點
                              setHeadAndPropagate(node, r);
                              p.next = null; // help GC
                              return;
                          }
                      }
                      // 線程獲取共享鎖失敗后需要掛起,并且發現線程被中斷,所以拋出異常
                      if (shouldParkAfterFailedAcquire(p, node) &&
                          parkAndCheckInterrupt())
                          throw new InterruptedException();
                  }
              } catch (Throwable t) {
                  // 發生異常取消節點
                  cancelAcquire(node);
                  throw t;
              }
          }
      
      • tryAcquireSharedNanos方法:超時中斷獲取共享鎖
      public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
                  throws InterruptedException {
              if (Thread.interrupted()) // 線程如果中斷了,直接拋出異常
                  throw new InterruptedException();
              return tryAcquireShared(arg) >= 0 ||
                  doAcquireSharedNanos(arg, nanosTimeout); // 超時獲取共享鎖
          }
      
      1. doAcquireSharedNanos:超時的方式獲取中斷鎖
      private boolean doAcquireSharedNanos(int arg, long nanosTimeout)
                  throws InterruptedException {
              // 超時直接返回
              if (nanosTimeout <= 0L)
                  return false;
              final long deadline = System.nanoTime() + nanosTimeout;
              // 添加共享節點到同步隊列尾部
              final Node node = addWaiter(Node.SHARED);
              try {
                  for (;;) {
                      // 獲取前驅節點
                      final Node p = node.predecessor();
                      if (p == head) {
                          int r = tryAcquireShared(arg);
                          if (r >= 0) {
                              // 獲取鎖,修改頭節點,通知所有其他等待共享鎖的節點
                              setHeadAndPropagate(node, r);
                              p.next = null; // help GC
                              return true;
                          }
                      }
                      nanosTimeout = deadline - System.nanoTime();
                      if (nanosTimeout <= 0L) {
                          // 超時取消節點
                          cancelAcquire(node);
                          return false;
                      }
                      if (shouldParkAfterFailedAcquire(p, node) &&
                          nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD)
                          // 如果需要掛起,且超時時長大于SPIN_FOR_TIMEOUT_THRESHOLD
                          // 線程掛起nanosTimeout時間
                          LockSupport.parkNanos(this, nanosTimeout);
                      if (Thread.interrupted())
                          throw new InterruptedException(); // 中斷了拋出異常
                  }
              } catch (Throwable t) {
                  // 發生異常取消節點
                  cancelAcquire(node);
                  throw t;
              }
          }
      

      實現release功能需要做的事情

      1. 釋放當前獲取鎖的線程持有的資源
      2. 喚醒有效的一個后繼節點

      與release功能相關的代碼

      • release方法:釋放排他鎖
          public final boolean release(int arg) {
              if (tryRelease(arg)) {
                  Node h = head;
                  // 頭節點不能是一個中間態
                  if (h != null && h.waitStatus != 0)
                      // 喚醒后繼節點
                      unparkSuccessor(h);
                  return true;
              }
              return false;
          }
      
      • release方法:釋放共享鎖
      public final boolean releaseShared(int arg) {
              if (tryReleaseShared(arg)) {
                  // 釋放共享鎖,從頭節點開始一個一個的釋放
                  // 如果存在多個共享節點在同步隊列時,doReleaseShared方式其實是遞歸調用
                  doReleaseShared();
                  return true;
              }
              return false;
          }
      

      至此,將所有獲取鎖和釋放鎖的方法相關的源碼全部分析完

      條件隊列的實現

      我們來看一下條件隊列的鏈表結構
      條件隊列的鏈表結構

      實現await功能需要做的事情

      1. 創建一個CONDITION類型的節點,將該節點添加到條件隊列
      2. 釋放已經獲取的鎖(因為只有當前線程先獲取了鎖才可能再調用Condition.await()方法)
      3. 如果無法獲取鎖,線程掛起

      與await功能相關的代碼

      • await方法:等待條件
      public final void await() throws InterruptedException {
                  if (Thread.interrupted())
                      throw new InterruptedException(); // 如果線程中斷,直接拋出異常
                  // 創建一個CONDITION類型的節點,將該節點添加到條件隊列尾部
                  Node node = addConditionWaiter();
                  // 釋放鎖
                  // 在調用await方法之前都會調用lock方法,這個時候已經獲取鎖了
                  // 有時候鎖還是可重入的,所以需要將所有的資源都釋放掉
                  int savedState = fullyRelease(node);
                  int interruptMode = 0;
                  // 如果節點不再同步隊列,全部都要掛起
                  while (!isOnSyncQueue(node)) {
                      LockSupport.park(this);
                      // 如果在等待期間發生過中斷(不管是調用signal之前還是之后),直接退出
                      if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                          break;
                  }
                  // 讓線程嘗試去獲取鎖,如果無法獲取鎖就掛起
                  if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                      interruptMode = REINTERRUPT;
                  // 清除所有在條件隊列中是取消狀態的節點
                  if (node.nextWaiter != null) // clean up if cancelled
                      unlinkCancelledWaiters();
                  // 發生中斷,上報中斷情況
                  if (interruptMode != 0)
                      reportInterruptAfterWait(interruptMode);
              }
      
      1. addConditionWaiter:在條件隊列中添加一個節點
       private Node addConditionWaiter() {
                  Node t = lastWaiter;
                  // 清除條件隊列中無效的節點
                  if (t != null && t.waitStatus != Node.CONDITION) {
                      unlinkCancelledWaiters();
                      t = lastWaiter;
                  }
                  // 創建一個節點
                  Node node = new Node(Node.CONDITION);
                  // 添加到條件隊列尾部
                  if (t == null)
                      firstWaiter = node;
                  else
                      t.nextWaiter = node;
                  lastWaiter = node;
                  return node;
              }
      
      1. unlinkCancelledWaiters:清除在條件隊列中被取消的節點
      private void unlinkCancelledWaiters() {
                  Node t = firstWaiter;
                  Node trail = null;
                  // 遍歷條件隊列將所有不是CONDITION狀態的節點全部清除掉
                  // 這些節點都是取消狀態的節點
                  while (t != null) {
                      Node next = t.nextWaiter;
                      if (t.waitStatus != Node.CONDITION) {
                          t.nextWaiter = null;
                          if (trail == null)
                              firstWaiter = next;
                          else
                              trail.nextWaiter = next;
                          if (next == null)
                              lastWaiter = trail;
                      }
                      else
                          trail = t;
                      t = next;
                  }
              }
      
      1. fullyRelease:釋放線程持有的所有鎖資源
      final int fullyRelease(Node node) {
              try {
                  int savedState = getState();
                  // 釋放所有的資源
                  // 如果是可重入鎖,savedState就是重入的次數
                  if (release(savedState))
                      return savedState;
                  throw new IllegalMonitorStateException();
              } catch (Throwable t) {
                  // 發生異常就取消該節點
                  node.waitStatus = Node.CANCELLED;
                  throw t;
              }
          }
      
      1. isOnSyncQueue:判斷節點是否在同步隊列
      final boolean isOnSyncQueue(Node node) {
              // waitStatus是CONDITION或者node沒有前驅節點,說明node不在同步隊列
              if (node.waitStatus == Node.CONDITION || node.prev == null)
                  return false;
              if (node.next != null) // 有后繼節點一定在同步隊列
                  return true;
              /*
               * 在同步隊列中查找node,看是否在同步隊列中
               */
              return findNodeFromTail(node);
          }
      
      1. findNodeFromTail:在同步隊列中查找節點
      private boolean findNodeFromTail(Node node) {
              // 從尾節點開始查找
              for (Node p = tail;;) {
                  if (p == node) // 找到了
                      return true;
                  if (p == null) // 找到頭了還沒找到
                      return false;
                  p = p.prev;
              }
          }
      
      1. checkInterruptWhileWaiting:檢測中斷的情況
      private int checkInterruptWhileWaiting(Node node) {
                  // 沒有發生中斷返回0
                  // 調用signal之前發生中斷返回THROW_IE
                  // 調用signal之后發生中斷返回REINTERRUPT
                  return Thread.interrupted() ?
                      (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
                      0;
              }
      
      1. transferAfterCancelledWait:清除在條件隊列中被取消的節點
      // 只有線程處于中斷狀態,才會調用此方法
      // 如果需要的話,將這個已經取消等待的節點轉移到阻塞隊列
      // 返回 true,如果此線程在 signal 之前被取消,否則返回false
      final boolean transferAfterCancelledWait(Node node) {
        
              // 用 CAS 將節點狀態設置為 0 
              // 如果這步 CAS 成功,說明是 signal 方法之前發生的中斷,
             // 因為如果 signal 先發生的話,signal 中會將 waitStatus 設置為 0
              if (node.compareAndSetWaitStatus(Node.CONDITION, 0)) {
                  enq(node); // 將節點放入阻塞隊列
                  return true;
              }
              // 到這里是因為 CAS 失敗,肯定是因為 signal 方法已經將 waitStatus 設置為了 0
              // signal 方法會將節點轉移到阻塞隊列,但是可能還沒完成,這邊自旋等待其完成
              // 當然,這種事情還是比較少的吧:signal 調用之后,沒完成轉移之前,發生了中斷
              while (!isOnSyncQueue(node))
                  Thread.yield();
              return false;
          }
      
      1. enq:把節點添加到同步隊列
      private Node enq(Node node) {
              // 無限循環,將節點添加到同步隊列尾部
              for (;;) {
                  Node oldTail = tail;
                  if (oldTail != null) {
                      node.setPrevRelaxed(oldTail);
                      if (compareAndSetTail(oldTail, node)) {
                          oldTail.next = node;
                          return oldTail;
                      }
                  } else {
                      // 如果同步隊列為空,初始化
                      initializeSyncQueue();
                  }
              }
          }
      
      1. reportInterruptAfterWait:中斷處理
      private void reportInterruptAfterWait(int interruptMode)
                  throws InterruptedException {
                  // 如果是THROW_IE狀態,拋異常
                  if (interruptMode == THROW_IE)
                      throw new InterruptedException();
                  else if (interruptMode == REINTERRUPT) // 再次中斷,因為中斷狀態被使用過一次
                      selfInterrupt();
              }
      

      awaitNanosawaitUntilawait(long time, TimeUnit unit)這幾個方法的整體邏輯是一樣的,就不再分析了

      實現signal功能需要做的事情

      1. 將條件隊列中的節點加入同步隊列
      2. 喚醒線程

      與signal功能相關的代碼

      • signal方法:喚醒等待條件的節點
       public final void signal() {
                  if (!isHeldExclusively())
                      throw new IllegalMonitorStateException();
                  // 獲取條件隊列中的第一個節點
                  Node first = firstWaiter;
                  if (first != null)
                      // 喚醒等待條件的節點
                      doSignal(first); 
              }
      
      1. doSignal:喚醒等待條件的節點
      private void doSignal(Node first) {
                  do {
                      // 去掉無效的節點
                      if ( (firstWaiter = first.nextWaiter) == null)
                          lastWaiter = null;
                      first.nextWaiter = null;
                  } while (!transferForSignal(first) &&  // 將節點轉移到同步隊列
                           (first = firstWaiter) != null);
              }
      
      1. transferForSignal:將節點轉移到同步隊列
      final boolean transferForSignal(Node node) {
              /*
               * 取消的節點不需要轉移
               */
              if (!node.compareAndSetWaitStatus(Node.CONDITION, 0))
                  return false;
      
              /*
               * 將節點加入同步隊列尾部
               */
              Node p = enq(node);
              int ws = p.waitStatus;
              // ws > 0 說明 node 在阻塞隊列中的前驅節點取消了等待鎖,直接喚醒 node 對應的線程
              // 如果 ws <= 0, 那么 compareAndSetWaitStatus 將會被調用
              // 節點入隊后,需要把前驅節點的狀態設為SIGNAL
              if (ws > 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL))
                  // 如果前驅節點取消或者 CAS 失敗,會進到這里喚醒線程
                  LockSupport.unpark(node.thread);
              return true;
          }
      
      • signalAlll方法:喚醒所有等待條件的節點
      public final void signalAll() {
                  // 如果是當前線程
                  if (!isHeldExclusively())
                      throw new IllegalMonitorStateException();
                  Node first = firstWaiter;
                  if (first != null)
                      // 喚醒所有等待條件的節點
                      doSignalAll(first);
              }
      
      1. doSignalAll:喚醒所有等待條件的節點
      // 將所有的節點都轉移到同步隊列
      private void doSignalAll(Node first) {
                  lastWaiter = firstWaiter = null;
                  do {
                      Node next = first.nextWaiter;
                      first.nextWaiter = null;
                      transferForSignal(first);
                      first = next;
                  } while (first != null);
              }
      

      現在將與AQS相關的核心代碼都整理了一遍,里面如果有描述不清晰或者不準確的地方希望大家可以幫忙指出!

      posted @ 2020-07-13 01:05  PinXiong  閱讀(413)  評論(0編輯  收藏
      最新chease0ldman老人|无码亚洲人妻下载|大香蕉在线看好吊妞视频这里有精品www|亚洲色情综合网

        <sub id="gqw76"><listing id="gqw76"></listing></sub>
        <sub id="gqw76"><listing id="gqw76"></listing></sub>

      1. <form id="gqw76"><legend id="gqw76"></legend></form>