精品亚洲aⅴ在线观看-精品亚洲av高清一区二区三区-精品亚洲av无码喷奶水糖心al-精品亚洲av无码一区-精品亚洲av无码一区二-精品亚洲av无码一区二区

相關欄目
新聞資訊 >>
合作媒體 >>
展會知識 >>
當前位置:首頁 >

怎么可以錯過(aq s l)aqs原理解析,


AQS全稱為AbstractQueuedSynchronizer,可以叫做隊列同步器。

為線程的同步和等待等操作提供一個基礎模板類。盡可能多的實現可重入鎖,讀寫鎖同步器所有需要的功能。隊列同步器內部實現了線程的同步隊列,獨占或是共享的獲取方式等,使其只需要少量的代碼便可以實現目標功能。

一般來說,AQS的子類應以其他類的內部類的形式存在,然后使用代理模式調用子類和AQS本身的方法實現線程的同步。

也就是說,使用ReentrantLock舉例,外界調用ReentrantLock,ReentrantLock內部定義Sync,Sync是AQS的子類,在ReentrantLock的內部實現中調用Sync的方法,最后完成最終的功能,當然ReentrantLock內部稍復雜,又加入和公平鎖和非公平鎖。

AQS內部有一個核心狀態為state。所有通過AQS實現功能的類都是通過修改state的狀態來操作線程的同步狀態。比如在ReentrantLock中,一個鎖中只有一個state狀態,當state為0時,代表所有線程沒有獲取鎖,當state為1時,代表有線程獲取到了鎖。通過是否能把state從0設置成1,當然,設置的方式是使用CAS設置,代表一個線程是否獲取鎖成功。

AQS提供了操作state的方法

int getState() void setState(int newState) boolean compareAndSetState(int expect, int update)

AQS內部維護一個線程的隊列。隊列由內部的節點組成。

隊列的節點為Node,節點分為SHARED和EXCLUSIVE分別時共享模式的節點和獨占模式的節點。

節點的等待狀態為waitStatus

CANCELLED(1):取消狀態,當線程不再希望獲取鎖時,設置為取消狀態SIGNAL(-1):當前節點的后繼者處于等待狀態,當前節點的線程如果釋放或取消了同步狀態,通知后繼節點CONDITION(-2):等待隊列的等待狀態,當調用signal()時,進入同步隊列PROPAGATE(-3):共享模式,同步狀態的獲取的可傳播狀態0:初始狀態

同樣需要使用CAS的方式進行設置。

下面通過ReentrantLock和ReentrantReadWriteLock來解析AQS的獨占模式和共享模式。

獨占模式

ReentrantLock和synchronized功能類似,使用AQS的獨占模式,只有一個線程可以獲取鎖。

AQS為獨占模式提供了如下方法

void acquire(int arg) boolean release(int arg)

由實現類實現

boolean tryAcquire(int acquires) boolean tryRelease(int releases)

ReentrantLock的最基本的使用方式如下

class X { private final ReentrantLock lock = new ReentrantLock(); public void m() { lock.lock(); try { doSomething(); } finally { lock.unlock() } } }

當創建ReentrantLock時默認使用非公平鎖,效率高于公平鎖,暫不討論公平鎖。

獲取鎖

當執行lock()時,進行一次簡短的獲取鎖操作

final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); }

其中compareAndSetState(0, 1)如果返回true就代表著之前state是0,也就是當前無線程獲取鎖,同時當前線程獲取鎖成功了,將獨占線程設置為當前線程。

如果是false就代表當前有線程占用,當前占用的線程有2個可能

當前線程在占用,因為是可重入鎖,之后同樣會獲取鎖其他線程在占用,在其他線程占用期間,當前線程需要等待

進入acquire(1)

public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }

acquire(1)包含整個獲取鎖,如果獲取不到就等待的操作,依次執行

tryAcquire(arg) addWaiter(Node.EXCLUSIVE), arg) acquireQueued(final Node node, int arg)

在tryAcquire(arg)中是嘗試獲取鎖,是由ReentrantLock提供的,邏輯比較簡單

當前無線程占有鎖時,即state為0時,獲取鎖當前有線程占有鎖,但當前占有鎖的線程是當前線程時,因為ReentrantLock是可重入鎖,獲取鎖,并把state+1

如果tryAcquire(arg)能夠成功獲取鎖,返回true,if條件為false,執行完成

當執行失敗時,也就是獲取不到鎖時,說明有其他線程目前正在占用鎖,將當前線程包裝成節點放入同步隊列

private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); //快速入隊 Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; }

先嘗試快速入隊,如果入隊成功直接返回,如果失敗(存在競態)就使用cas反復入隊直到成功為止

入隊完成之后再判斷一次當前是否有可能獲得鎖,也就是前一個節點是head的話,前一個線程有可能已經釋放了,再獲取一次,如果獲取成功,設置當前節點為頭節點,整個獲取過程完成。

final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }

獲取失敗的話先將之前的節點等待狀態設置為SIGNAL,如果之前的節點取消了就向前一直找

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) return true; if (ws > 0) { do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }

直到前一個節點不是取消狀態,將其之前的節點等待狀態設置為SIGNAL,因為再外面是無限循環的,設置SIGNAL成功后,之后就返回true了。

然后一直等待直到被喚醒

private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); }

上面就是獲取鎖并等待的過程,總結起來就是:

當lock()執行的時候:

先快速獲取鎖,當前沒有線程執行的時候直接獲取鎖嘗試獲取鎖,當沒有線程執行或是當前線程占用鎖,可以直接獲取鎖將當前線程包裝為node放入同步隊列,設置為尾節點前一個節點如果為頭節點,再次嘗試獲取一次鎖將前一個有效節點設置為SIGNAL然后阻塞直到被喚醒

釋放鎖

當ReentrantLock進行釋放鎖操作時,調用的是AQS的release(1)操作

public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }

再tryRelease(arg)中會將鎖釋放一次,如果當前state是1,且當前線程是正在占用的線程,釋放鎖成功,返回true,否則因為是可重入鎖,釋放一次可能還在占用,應一直釋放直到state為0為止

private void unparkSuccessor(Node node) { int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); }

然后優先找下一個節點,如果取消了就從尾節點開始找,找到最前面一個可用的節點

將其取消阻塞狀態。

阻塞在acquireQueued的地方在喚醒之后開始繼續執行,當前節點已經是最前面的一個可用(未取消)節點了,經過不斷的for循環以及在shouldParkAfterFailedAcquire中不斷向前尋找可用節點,因此這個被喚醒的節點一定可以使其之前的節點為head。然后獲取鎖成功。

但是此時節點會與新加入的節點競爭,也就是不公平鎖的由來。

在公平鎖中,在tryAcquire時會判斷之前是否有等待的節點hasQueuedPredecessors(),如果有就不會再去獲取鎖了,因此能保證順序執行。

總結

我們可以看到,實現上述的功能,ReentrantLock只要實現的tryAcquire和tryRelease即可實現一個獨占鎖的獲取和釋放的功能。

共享模式

ReentrantReadWriteLock是Java中讀寫鎖的實現,寫寫互斥,讀寫互斥,讀讀共享。讀寫鎖在內部分為讀鎖和寫鎖,因為我們要探索共享模式,因此更關注讀鎖。

AQS為共享模式提供了如下方法

void doAcquireShared(int arg) boolean doReleaseShared(int arg)

由實現類實現

int tryAcquireShared(int unused) boolean tryReleaseShared(int unused)

ReentrantReadWriteLock的讀鎖的最基本的使用方式如下

class X { private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock(); public void m() { rwl.readLock().lock(); try { read(); } finally { rwl.readLock().unlock(); } } }

獲取鎖

讀鎖加鎖,先嘗試獲取共享鎖,如果獲取不到,在進行其他操作

public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg); }

在tryAcquireShared中如果當前有寫鎖,返回-1,即未獲取共享鎖,需要執行下一步doAcquireShared。

反之就是可以獲取共享鎖。

設置共享鎖需要修改state的數量,表示獲取共享鎖的線程的數量,當共享鎖的獲取存在競爭時,即compareAndSetState(c, c + SHARED_UNIT))可能設置失敗,此時進入fullTryAcquireShared(current)進行獲取共享鎖的完整版操作。

protected final int tryAcquireShared(int unused) { Thread current = Thread.currentThread(); int c = getState(); if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return -1; int r = sharedCount(c); if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) { //設置firstReader,計算數量,略 return 1; } return fullTryAcquireShared(current); }

也就是說共享鎖獲取時:

如果當前沒有獨占鎖在占用,AQS根據其實現類的tryAcquireShared來實現讓一個共享鎖直接獲取到鎖(可以直接執行)當有獨占鎖在占用是,讓共享鎖去等待直到獨占鎖解鎖為止,也就是doAcquireShared(arg)的邏輯
private void doAcquireShared(int arg) { final Node node = addWaiter(Node.SHARED); boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC if (interrupted) selfInterrupt(); failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }

doAcquireShared(arg)處了將線程封裝成節點入隊外還表達了3個思想:

什么時候該執行什么時候該傳播什么時候該等待(阻塞)

其中入隊、執行和等待的邏輯基本和獨占鎖一樣,

入隊:都是加入等待隊列的末尾,成為tail節點;執行:判斷當前節點的前一個節點是不是頭節點,如果是的話嘗試獲取鎖,如果獲取到了就執行;等待:獲取不到或前一個節點不是頭節點就代表該線程需要暫時等待,直到被叫醒為止。設置前一個節點為SIGNAL狀態,然后進入等待。

其中不同的就是共享鎖的傳播邏輯:

想象一下,當前有一個寫鎖正在占用,有多個讀鎖在等待,當寫鎖釋放時,第二個線程也就是想要獲取讀鎖的線程就可以獲取鎖了。獲取到之后當前正在用的鎖就是讀鎖了,那后面的讀鎖呢,因為讀鎖是共享的,后面的讀鎖應該也能夠依次獲取讀鎖,也就是讀鎖的傳播機制。

private void setHeadAndPropagate(Node node, int propagate) { Node h = head; setHead(node); if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; if (s == null || s.isShared()) doReleaseShared(); } }

將當前的節點設置為頭節點,判斷如果是共享鎖,執行doReleaseShared(),喚醒當前節點

private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; } if (h == head) break; } }

當前節點喚醒之后doAcquireShared(int arg)會繼續執行,因為之前的節點被設置為頭節點,如果后續是獲取共享鎖的節點會繼續執行setHeadAndPropagate,一直傳播下去直到遇到獲取獨占鎖的節點。

共享鎖的獲取總結如下:

嘗試獲取共享鎖,如果當前是共享鎖或無鎖,設置共享鎖的state,獲取鎖如果當前是寫鎖,進入等待流程入隊,加入等待隊列的末尾,成為tail節點判斷當前節點的前一個節點是不是頭節點,如果是的話嘗試獲取鎖,如果獲取到了就執行獲取不到或前一個節點不是頭節點就代表該線程需要暫時等待,直到被叫醒為止。設置前一個節點為SIGNAL狀態,然后進入等待如果可以獲取到鎖,設置頭節點并進入共享鎖節點傳播流程

釋放鎖

共享鎖使用完畢需要釋放鎖,分為tryReleaseShared(arg)和doReleaseShared()2個階段

public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }

在tryReleaseShared(arg),基本就是tryAcquireShared(int unused)的反向操作

將設置的HoldCounter減少,firstReader設置null,state減少,將tryAcquireShared(int unused)添加的狀態全部反向還原回去

當共享鎖全部釋放完畢,返回true,否則返回false

然后執行doReleaseShared(),剛才已經提及,doReleaseShared()將喚醒下一個可用的節點,獨占節點將會執行,共享節點執行并傳播。

總結

AQS共享模式和獨占模式的實現上最大的差別就在于共享模式獲取鎖后的傳播。

其他的區別主要還是表現在實現類實現的區別上。通過ReentrantLock和ReentrantReadWriteLock可以了解AQS的獨占模式和共享模式,但是要注意將AQS和鎖的實現剝離開,弄明白哪些邏輯是AQS實現的,哪些邏輯是鎖實現的,同時也思考怎么使用AQS實現其他的特定的線程同步問題。

關注我的Github項目,開啟Java進階之路,歡迎star

注明:本文章來源于互聯網,如侵權請聯系客服刪除!