深入淺出ReentrantReadWriteLock源碼解析

讀寫鎖實現邏輯相對比較復雜,但是卻是一個經常使用到的功能,希望將我對ReentrantReadWriteLock的源碼的理解記錄下來,可以對大家有幫助

前提條件

在理解ReentrantReadWriteLock時需要具備一些基本的知識

理解AQS的實現原理

之前有寫過一篇《深入淺出AQS源碼解析》關于AQS的文章,對AQS原理不了解的同學可以先看一下

理解ReentrantLock的實現原理

ReentrantLock的實現原理可以參考《深入淺出ReentrantLock源碼解析》

什么是讀鎖和寫鎖

對于資源的訪問就兩種形式:要么是讀操作,要么是寫操作。讀寫鎖是將被鎖保護的臨界資源的讀操作和寫操作分開,允許同時有多個線程同時對臨界資源進行讀操作,任意時刻只允許一個線程對資源進行寫操作。簡單的說,對與讀操作采用的是共享鎖,對于寫操作采用的是排他鎖

讀寫狀態的設計

ReentrantReadWriteLock是用state字段來表示讀寫鎖重復獲取資源的次數,高16位用來標記讀鎖的同步狀態,低16位用來標記寫鎖的同步狀態

// 劃分的邊界線,用16位來劃分
static final int SHARED_SHIFT   = 16;
// 讀鎖的基本單位,也就是讀鎖加1或者減1的基本單位(1左移16位后的值)
static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
// 讀寫鎖的最大值(在計算讀鎖的時候需要先右移16位)
static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
// 寫鎖的掩碼,state值與掩碼做與運算后得到寫鎖的真實值
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

// 獲取資源被讀鎖占用的次數
static int sharedCount(int c){
      return c >>> SHARED_SHIFT;
}

// 獲取資源被寫鎖占用的次數
static int exclusiveCount(int c){
      return c & EXCLUSIVE_MASK;
}

在統計讀鎖被每個線程持有的次數時,ReentrantReadWriteLock采用的是HoldCounter來實現的,具體如下:

// 持有讀鎖的線程重入的次數
static final class HoldCounter {
    // 重入的次數
    int count = 0;
    // 持有讀鎖線程的線程id
    final long tid = getThreadId(Thread.currentThread());
}

/**
 * 采用ThreadLocal機制,做到線程之間的隔離
 */
static final class ThreadLocalHoldCounter
    extends ThreadLocal<HoldCounter> {
    public HoldCounter initialValue() {
        return new HoldCounter();
    }
}

/**
 * 線程持有可重入讀鎖的次數
 */
private transient ThreadLocalHoldCounter readHolds;

/**
 * 緩存最后一個成功獲取讀鎖的線程的重入次數,有兩方面的好處:
 * 1、避免了通過訪問ThreadLocal來獲取讀鎖的信息,這個優化的前提是
 *    假設多數情況下,一個獲取讀鎖的線程,使用完以后就會釋放讀鎖,
 *    也就是說最后獲取讀鎖的線程和最先釋放讀鎖的線程大多數情況下是同一個線程
 * 2、因為ThreadLocal中的key是一個弱引用類型,當有一個變量持有HoldCounter對象時,
 *    ThreadLocalHolderCounter中最后一個獲取鎖的線程信息不會被GC回收掉
 */
private transient HoldCounter cachedHoldCounter;

/**
 * 第一個獲取讀鎖的線程,有兩方面的考慮:
 * 1、記錄將共享數量從0變成1的線程
 * 2、對于無競爭的讀鎖來說進行線程重入次數數據的追蹤的成本是比較低的
 */
private transient Thread firstReader = null;

/**
 * 第一個獲取讀鎖線程的重入次數,可以參考firstReader的解析
 */
private transient int firstReaderHoldCount;

ReentrantReadWriteLock 源碼解析

ReentrantReadWriteLock 一共有5個內部類,具體如下:

  • Sync:公平鎖和非公平鎖的抽象類
  • NonfairSync:非公平鎖的具體實現
  • FairSync:公平鎖的具體實現
  • ReadLock:讀鎖的具體實現
  • WriteLock:寫鎖的具體實現

我們從讀鎖ReadLock和寫鎖WriteLock的源碼開始分析,然后順著這個思路將整個ReentrantReadWriteLock中所有的核心源碼(所有的包括內部類)進行分析。

ReadLock類源碼解析

public static class ReadLock implements Lock, java.io.Serializable {
    
    private final Sync sync;

    /**
     * 通過ReentrantReadWriteLock中的公平鎖或非公平鎖來初始化sync變量
     */
    protected ReadLock(ReentrantReadWriteLock lock) {
          sync = lock.sync;
    }

    /**
     * 阻塞的方式獲取鎖,因為讀鎖是共享鎖,所以調用acquireShared方法
     */
    public void lock() {
          sync.acquireShared(1);
    }

    /**
     * 可中斷且阻塞的方式獲取鎖
     */
    public void lockInterruptibly() throws InterruptedException {
          sync.acquireSharedInterruptibly(1);
    }

    /**
     * 超時嘗試獲取鎖,非阻塞的方式
     */
    public boolean tryLock(long timeout, TimeUnit unit)
          throws InterruptedException {
          return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }
    /**
     * 嘗試獲取寫鎖,非阻塞的方式
     */
    public boolean tryLock() {
          return sync.tryReadLock();
    }

    /**
     * 釋放鎖
     */
    public void unlock() {
          sync.releaseShared(1);
    }
}

接下來,我們重點看一下在公平鎖和非公平鎖下Sync.acquireSharedSync.releaseSharedSync.tryLock這3個方法的實現(acquireSharedInterruptiblytryAcquireSharedNanos是AQS中的方法,這里就不在討論了,具體可以參考《深入淺出AQS源碼解析》),其中Sync.acquireShared中核心調用的方法是Sync.tryAcquireSharedSync. releaseShared中核心調用的方法是Sync.tryReleaseSharedSync.tryLock中核心調用的方法是Sync.tryReadLock,所以我們重點分析Sync.tryAcquireShared方法、Sync.tryReleaseShared方法和sync.tryReadLock方法

Sync.tryAcquireShared方法

protected final int tryAcquireShared(int unused) {
    /**
     * 以共享鎖的方式嘗試獲取讀鎖,步驟如下:
     * 1、如果資源已經被寫鎖獲取了,直接返回失敗
     * 2、如果讀鎖不需要等待(公平鎖和非公平鎖的具體實現有區別)、
     *    并且讀鎖未超過上限、同時設置讀鎖的state值成功,則返回成功
     * 3、如果步驟2失敗了,需要進入fullTryAcquireShared函數再次嘗試獲取讀鎖
     */
    Thread current = Thread.currentThread();
    int c = getState();
    /**
     * 資源已經被寫鎖獨占,直接返回false
     */
    if (exclusiveCount(c) != 0 &&
          getExclusiveOwnerThread() != current)
          return -1;
    int r = sharedCount(c);
    /**
     * 1、讀鎖不需要等待
     * 2、讀鎖未超過上限
     * 3、設置讀鎖的state值成功
     * 則返回成功
     */
    if (!readerShouldBlock() &&
          r < MAX_COUNT &&
          compareAndSetState(c, c + SHARED_UNIT)) {
          if (r == 0) {
            // 記錄第一個獲取讀鎖的線程信息
            firstReader = current;
            firstReaderHoldCount = 1;
          } else if (firstReader == current) {
            // 第一個獲取讀鎖的線程再次獲取鎖(重入)
            firstReaderHoldCount++;
          } else {
            // 修改獲取鎖的線程的重入的次數
            HoldCounter rh = cachedHoldCounter;
            if (rh == null ||
                rh.tid != LockSupport.getThreadId(current))
                cachedHoldCounter = rh = readHolds.get();
            else if (rh.count == 0)
                readHolds.set(rh);
            rh.count++;
          }
          return 1;
    }
    /**
     * 如果CAS失敗再次獲取讀鎖
     */
    return fullTryAcquireShared(current);
}

接下來看一下fullTryAcquireShared方法:

final int fullTryAcquireShared(Thread current) {
    /**
     * 調用該方法的線程都是希望獲取讀鎖的線程,有3種情況:
     * 1、在嘗試通過CAS操作修改state時由于有多個競爭讀鎖的線程導致CAS操作失敗
     * 2、需要排隊等待獲取讀鎖的線程(公平鎖)
     * 3、超過讀鎖限制的最大申請次數的線程
     */
    HoldCounter rh = null;
    for (;;) { // 無限循環獲取鎖
        int c = getState();
        // 已經被寫線程獲取鎖了,直接返回
        if (exclusiveCount(c) != 0) {
            if (getExclusiveOwnerThread() != current)
                return -1;
        // 需要被block的讀線程(公平鎖)
        } else if (readerShouldBlock()) {
            // 如果時當前線程
            if (firstReader == current) {
                // assert firstReaderHoldCount > 0;
            } else {
                // 清理當前線程中重入次數為0的數據
                if (rh == null) {
                    rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current)) {
                        rh = readHolds.get();
                        if (rh.count == 0)
                            readHolds.remove();
                    }
                }
                // 當前線程獲取鎖失敗
                if (rh.count == 0)
                    return -1;
            }
        }
        // 判斷是否超過讀鎖的最大值
        if (sharedCount(c) == MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        // 修改讀鎖的state值
        if (compareAndSetState(c, c + SHARED_UNIT)) {
            // 最新獲取到讀鎖的線程設置相關的信息
            if (sharedCount(c) == 0) {
                firstReader = current;
                firstReaderHoldCount = 1;
            } else if (firstReader == current) {
                firstReaderHoldCount++; // 當前線程重復獲取鎖(重入)
            } else {
                // 在readHolds中記錄獲取鎖的線程的信息
                if (rh == null)
                    rh = cachedHoldCounter;
                if (rh == null || rh.tid != getThreadId(current))
                    rh = readHolds.get();
                else if (rh.count == 0)
                    readHolds.set(rh);
                rh.count++;
                cachedHoldCounter = rh; // cache for release
            }
            return 1;
        }
    }
}

Sync.tryReleaseShared方法

tryReleaseShared方法的實現邏輯比較簡單,我們直接看代碼中的注釋

protected final boolean tryReleaseShared(int unused) {
    Thread current = Thread.currentThread();
    /**
     * 如果當前線程是第一個獲取讀鎖的線程,有兩種情況:
     * 1、如果持有鎖的次數為1,直接釋放成功
     * 2、如果持有鎖的次數大于1,說明有重入的情況,需要次數減1
     */
    if (firstReader == current) {
        // assert firstReaderHoldCount > 0;
        if (firstReaderHoldCount == 1)
            firstReader = null;
        else
            firstReaderHoldCount--;
    } else {
      /**
       * 如果當前線程不是第一個獲取讀鎖的線程
       * 需要更新線程持有鎖的重入次數
       * 如果次數小于等于0說明有異常,因為只有當前線程才會出現持有鎖的重入次數等于0或者1
       */
        HoldCounter rh = cachedHoldCounter;
        if (rh == null || rh.tid != getThreadId(current))
            rh = readHolds.get();
        int count = rh.count;
        if (count <= 1) {
            readHolds.remove();
            if (count <= 0)
                throw unmatchedUnlockException();
        }
        --rh.count;
    }
    // 修改state的值
    for (;;) {
        int c = getState();
        int nextc = c - SHARED_UNIT;
        if (compareAndSetState(c, nextc))
            // 如果是最后一個釋放讀鎖的線程nextc為0,否則不是
            return nextc == 0;
    }
}

sync.tryReadLock方法

tryReadLock的代碼比較簡單,就直接在將解析過程在注釋中描述

final boolean tryReadLock() {
    Thread current = Thread.currentThread();
    for (;;) { // 無限循環獲取讀鎖
        int c = getState();
        // 當前線程不是讀線程,直接返回失敗
        if (exclusiveCount(c) != 0 &&
            getExclusiveOwnerThread() != current)
            return false;
        int r = sharedCount(c);
        // 讀鎖的總重入次數是否超過最大次數限制
        if (r == MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        /**
         * 通過CAS操作設置state的值,如果成功表示嘗試獲取讀鎖成功,需要做以下幾件事情:
         * 1、如果是第一獲取讀鎖要記錄第一個獲取讀鎖的線程信息
         * 2、如果是當前獲取鎖的線程和第一次獲取鎖的線程相同,需要更新第一獲取線程的重入次數
         * 3、更新獲取讀鎖線程相關的信息
         */
        if (compareAndSetState(c, c + SHARED_UNIT)) {
            if (r == 0) {
                firstReader = current;
                firstReaderHoldCount = 1;
            } else if (firstReader == current) {
                firstReaderHoldCount++;
            } else {
                HoldCounter rh = cachedHoldCounter;
                if (rh == null || rh.tid != getThreadId(current))
                    cachedHoldCounter = rh = readHolds.get();
                else if (rh.count == 0)
                    readHolds.set(rh);
                rh.count++;
            }
            return true;
        }
    }
}

WriteLock類源碼解析

public static class WriteLock implements Lock, java.io.Serializable {
    private final Sync sync;

    /**
     * 通過ReentrantReadWriteLock中的公平鎖或非公平鎖來初始化sync變量
     */
    protected WriteLock(ReentrantReadWriteLock lock) {
        sync = lock.sync;
    }

    /**
     * 阻塞的方式獲取寫鎖
     */
    public void lock() {
        sync.acquire(1);
    }

    /**
     * 中斷的方式獲取寫鎖
     */
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }

    /**
     * 嘗試獲取寫鎖
     */
    public boolean tryLock( ) {
        return sync.tryWriteLock();
    }

    /**
     * 超時嘗試獲取寫鎖
     */
    public boolean tryLock(long timeout, TimeUnit unit)
            throws InterruptedException {
        return sync.tryAcquireNanos(1, unit.toNanos(timeout));
    }

    /**
     * 釋放寫鎖
     */
    public void unlock() {
        sync.release(1);
    }
}

接下來,我們重點看一下在公平鎖和非公平鎖下Sync.tryAcquireSync.tryReleaseSync.tryWriteLock這幾個核心方法是如何實現寫鎖的功能

Sync.tryAcquire方法

Sync.tryAcquire方法的邏輯比較簡單,就直接在代碼中注釋,具體如下:

protected final boolean tryAcquire(int acquires) {
    Thread current = Thread.currentThread();
    // 讀寫鎖的次數
    int c = getState();
    // 寫鎖的次數
    int w = exclusiveCount(c);
    /*
     * 如果讀寫鎖的次數不為0,說明鎖可能有以下3中情況:
     * 1、全部是讀線程占用資源
     * 2. 全部是寫線程占用資源
     * 3. 讀寫線程都占用了資源(鎖降級:持有寫鎖的線程可以去持有讀鎖),但是讀寫線程都是同一個線程
     */
    if (c != 0) {
        // 寫線程不占用資源,第一個獲取鎖的線程也不是當前線程,直接獲取失敗
        if (w == 0 || current != getExclusiveOwnerThread())
            return false;
        // 檢查獲取寫鎖的線程是否超過了最大的重入次數
        if (w + exclusiveCount(acquires) > MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        // 修改state的狀態,之所以沒有用CAS操作來修改,是因為寫線程只有一個,是獨占的
        setState(c + acquires);
        return true;
    }
    /*
     * 寫線程是第一個競爭鎖資源的線程
     * 如果寫線程需要等待(公平鎖的情況),或者
     * 寫線程的state設置失敗,直接返回false
     */
    if (writerShouldBlock() ||
        !compareAndSetState(c, c + acquires))
        return false;
    // 設置當前線程為owner
    setExclusiveOwnerThread(current);
    return true;
}

Sync.tryRelease方法

Sync.tryRelease方便的代碼很簡單,直接看代碼中的注釋

protected final boolean tryRelease(int releases) {
    // 如果釋放鎖的線程不持有鎖,返回失敗
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    // 獲取寫鎖的重入的次數
    int nextc = getState() - releases;
    // 如果次數為0,需要釋放鎖的owner
    boolean free = exclusiveCount(nextc) == 0;
    if (free)
        setExclusiveOwnerThread(null); // 釋放鎖的owner
    setState(nextc);
    return free;
}

Sync.tryWriteLock方法

Sync.tryWriteLock這個方法也比較簡單,就直接上代碼了

final boolean tryWriteLock() {
    Thread current = Thread.currentThread();
    int c = getState();
    // 讀鎖或者寫鎖已經被線程持有
    if (c != 0) {
        int w = exclusiveCount(c);
        // 寫鎖第一次獲取鎖或者當前線程不是第一次獲取寫鎖的線程(也就是不是owner),直接失敗
        if (w == 0 || current != getExclusiveOwnerThread())
            return false;
        // 超出寫鎖的最大次數,直接失敗
        if (w == MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
    }
    // 競爭寫鎖的線程修改state,
    // 如果成功將自己設置成鎖的owner,
    // 如果失敗直接返回
    if (!compareAndSetState(c, c + 1))
        return false;
    setExclusiveOwnerThread(current); // 設置當前線程持有鎖
    return true;
}

總結

  • 讀鎖和寫鎖的占用(重入)次數都是共用state字段,高位記錄讀鎖,地位記錄寫鎖,所以讀鎖和寫鎖的最大占用次數為2^16
  • 讀鎖和寫鎖都是可重入的
  • 讀鎖是共享鎖,允許多個線程獲取
  • 寫鎖是排他鎖,只允許一個線程獲取
  • 一個線程獲取了讀鎖,在非公平鎖的情況下,其他等待獲取讀鎖的線程都可以嘗試獲取讀鎖,在公平鎖的情況下,按照AQS同步隊列的順利來獲取,如果隊列前面有一個等待寫鎖的線程在排隊,則后面所有等待獲取讀鎖的線程都將無法獲取讀鎖
  • 獲取讀鎖的線程,不能再去申請獲取寫鎖
  • 一個獲取了寫鎖的線程,在持有鎖的時候可以去申請獲取讀鎖,在釋放寫鎖以后,還會繼續持有讀鎖,這就是所謂的鎖降級
  • 讀鎖無法升級為寫鎖,原因是獲取讀鎖的線程可能是多個,而寫鎖是獨占的,不能多個線程持有,也就是說不支持鎖升級
posted @ 2020-07-17 17:28  PinXiong  閱讀(97)  評論(0編輯  收藏
最新chease0ldman老人