<sub id="gqw76"><listing id="gqw76"></listing></sub>
      <sub id="gqw76"><listing id="gqw76"></listing></sub>

    1. <form id="gqw76"><legend id="gqw76"></legend></form>
    2. DelayQueue延遲隊列原理剖析

      DelayQueue延遲隊列原理剖析

      介紹

      DelayQueue隊列是一個延遲隊列,DelayQueue中存放的元素必須實現Delayed接口的元素,實現接口后相當于是每個元素都有個過期時間,當隊列進行take獲取元素時,先要判斷元素有沒有過期,只有過期的元素才能出隊操作,沒有過期的隊列需要等待剩余過期時間才能進行出隊操作。

      源碼分析

      DelayQueue隊列內部使用了PriorityQueue優先隊列來進行存放數據,它采用的是二叉堆進行的優先隊列,使用ReentrantLock鎖來控制線程同步,由于內部元素是采用的PriorityQueue來進行存放數據,所以Delayed接口實現了Comparable接口,用于比較來控制優先級,如下代碼所示:

       1public interface Delayed extends Comparable<Delayed{
      2
      3    /**
      4     * Returns the remaining delay associated with this object, in the
      5     * given time unit.
      6     *
      7     * @param unit the time unit
      8     * @return the remaining delay; zero or negative values indicate
      9     * that the delay has already elapsed
      10     */

      11    long getDelay(TimeUnit unit);
      12}

      DelayQueue的成員變量如下所示:

       1// 鎖。
      2private final transient ReentrantLock lock = new ReentrantLock();
      3// 優先隊列。
      4private final PriorityQueue<E> q = new PriorityQueue<E>();
      5
      6/**
      7 * Leader-Follower的變種。
      8 * Thread designated to wait for the element at the head of
      9 * the queue.  This variant of the Leader-Follower pattern
      10 * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
      11 * minimize unnecessary timed waiting.  When a thread becomes
      12 * the leader, it waits only for the next delay to elapse, but
      13 * other threads await indefinitely.  The leader thread must
      14 * signal some other thread before returning from take() or
      15 * poll(...), unless some other thread becomes leader in the
      16 * interim.  Whenever the head of the queue is replaced with
      17 * an element with an earlier expiration time, the leader
      18 * field is invalidated by being reset to null, and some
      19 * waiting thread, but not necessarily the current leader, is
      20 * signalled.  So waiting threads must be prepared to acquire
      21 * and lose leadership while waiting.
      22 */

      23private Thread leader = null;
      24
      25/**
      26 * Condition signalled when a newer element becomes available
      27 * at the head of the queue or a new thread may need to
      28 * become leader.
      29 */

      30// 條件,代表如果有數據則通知Follower線程,喚醒線程處理隊列內容。
      31private final Condition available = lock.newCondition();

      Leader-Follower模式的變種,用于最小化不必要的定時等待,當一個線程被選擇為Leader時,它會等待延遲過去執行代碼邏輯,而其他線程則需要無限期等待,在從take或poll返回之前,每當隊列的頭部被替換為具有更早到期時間的元素時,leader字段將通過重置為空而無效,Leader線程必須向其中一個Follower線程發出信號,被喚醒的 follwer 線程被設置為新的Leader 線程。

      offer操作

       1public boolean offer(E e) {
      2    // 獲取到鎖
      3    final ReentrantLock lock = this.lock;
      4    lock.lock();
      5    try {
      6        // 將元素存儲到PriorityQueue優先隊列中
      7        q.offer(e);
      8        // 如果第一個元素是當前元素,說明之前隊列中為空,則先將Leader設置為空,通知等待線程可以爭搶Leader了。
      9        if (q.peek() == e) {
      10            leader = null;
      11            available.signal();
      12        }
      13        // 返回成功
      14        return true;
      15    } finally {
      16        lock.unlock();
      17    }
      18}

      offer操作前先進行獲取鎖的操作,也就是同一時間內只能有一個線程可以入隊操作。

      1. 獲取到ReentrantLock鎖對象。
      2. 將元素添加到PriorityQueue優先隊列中
      3. 如果隊列中最早過期的元素是自己,則說明隊列原先是空的,所以將Leader進行重置,通知Follower線程可以成為Leader線程。
      4. 最后進行解鎖操作。

      put操作

      put操作其實就是調用的offer操作來進行添加數據的,以下是源碼信息:

      1public void put(E e) {
      2    offer(e);
      3}

      take操作

       1public E take() throws InterruptedException {
      2    final ReentrantLock lock = this.lock;
      3    // 獲取可中斷的鎖。
      4    lock.lockInterruptibly();
      5    try {
      6        // 循環獲取數據。
      7        for (;;) {
      8            // 獲取最早過期的元素,但是不彈出對象。
      9            E first = q.peek();
      10            // 如果最早過期的元素為空,說明隊列為空,則線程直接進入無限期等待,并且讓出鎖。
      11            if (first == null)
      12                // 當前線程無限期等待,直到被喚醒,并且讓出鎖對象。
      13                available.await();
      14            else {
      15                // 獲取最早過期的元素剩余過期時間。
      16                long delay = first.getDelay(NANOSECONDS);
      17                // 如果剩余過期時間小于0,則說明已經過期,反之還沒有過期。
      18                if (delay <= 0)
      19                    // 如果已經過期直接獲取最早過期的元素,并返回。
      20                    return q.poll();
      21                // 如果剩余過期日期大于0,則會進入到這里。
      22                // 將剛才獲取的最早過期的元素設置為空。
      23                first = null// don't retain ref while waiting
      24                // 如果有線程爭搶的Leader線程,則進行無限期等待。
      25                if (leader != null)
      26                    // 無限期等待并讓出鎖。
      27                    available.await();
      28                else {
      29                    // 獲取當前線程。
      30                    Thread thisThread = Thread.currentThread();
      31                    // 設置當前線程變為Leader線程。
      32                    leader = thisThread;
      33                    try {
      34                        // 等待剩余等待時間。
      35                        available.awaitNanos(delay);
      36                    } finally {
      37                        // 將Leader設置為null。
      38                        if (leader == thisThread)
      39                            leader = null;
      40                    }
      41                }
      42            }
      43        }
      44    } finally {
      45        // 如果隊列不為空,并且沒有Leader則通知等待線程可以成為Leader。
      46        if (leader == null && q.peek() != null)
      47            // 通知等待線程。
      48            available.signal();
      49        lock.unlock();
      50    }
      51}
      1. 當獲取元素時,先獲取到鎖對象。
      2. 獲取最早過期的元素,但是并不從隊列中彈出元素。
      3. 最早過期元素是否為空,如果為空則直接讓當前線程無限期等待狀態,并且讓出當前鎖對象。
      4. 如果最早過期的元素不為空
      • 獲取最早過期元素的剩余過期時間,如果已經過期則直接返回當前元素
      • 如果沒有過期,也就是說剩余時間還存在,則先獲取Leader對象,如果Leader已經有線程在處理,則當前線程進行無限期等待,如果Leader為空,則首先將Leader設置為當前線程,并且讓當前線程等待剩余時間。
      • 最后將Leader線程設置為空
      1. 如果Leader已經為空,并且隊列有內容則喚醒一個等待的隊列。

      poll操作

      獲取最早過期的元素,如果隊列頭沒有過期的元素則直接返回null,反之返回過期的元素。

       1public E poll() {
      2    final ReentrantLock lock = this.lock;
      3    lock.lock();
      4    try {
      5        E first = q.peek();
      6        // 如果隊列為空或者隊列最早過期的元素沒有過期,則返回null。
      7        if (first == null || first.getDelay(NANOSECONDS) > 0)
      8            return null;
      9        else
      10            // 出隊列操作。
      11            return q.poll();
      12    } finally {
      13        lock.unlock();
      14    }
      15}

      小結

      1. DelayQueue是一個無界的并發延遲阻塞隊列,隊列中的元素必須實現Delayed接口,相應了需要實現Comparable接口實現比較的方法
      2. Leader-Follower模式的變種,用于最小化不必要的定時等待,當一個線程被選擇為Leader時,它會等待延遲過去執行代碼邏輯,而其他線程則需要無限期等待,在從take或poll返回之前,每當隊列的頭部被替換為具有更早到期時間的元素時,leader字段將通過重置為空而無效,Leader線程必須向其中一個Follower線程發出信號,被喚醒的 follwer 線程被設置為新的Leader 線程。

      喜歡的同學點贊關注下微信公眾號,推送優質文章。

      本博客文章皆為原創作品,轉載請注明出處?。?!謝謝
      posted @ 2021-06-07 22:44  BattleHeart  閱讀(306)  評論(0編輯  收藏  舉報

      最新chease0ldman老人|无码亚洲人妻下载|大香蕉在线看好吊妞视频这里有精品www|亚洲色情综合网

        <sub id="gqw76"><listing id="gqw76"></listing></sub>
        <sub id="gqw76"><listing id="gqw76"></listing></sub>

      1. <form id="gqw76"><legend id="gqw76"></legend></form>