AQS实现原理分析
AQS
队列同步器(AbstractQueuedSynchronizer)简称AQS,是J.U.C同步构件的基础,包括ReentrantLock、ReentrantReadWriteLock、CountDownLatch、Semaphor都是基于AQS实现的。
理解AQS是理解这些同步工具的基础,基于AQS提供的同步语义可以定制各种功能的同步工具。
AQS原理
-
用一个int类型的状态变量(volatile)state记录同步状态,默认值是0
用一个双向链表实现的队列对线程进行排队和调度 -
A线程使用compareAndSet(state,0,1)原子设置state的值,设置成功说明state当前无其他线程争用,A线程取锁的使用权。
-
设置不成功,说明B线程对state的值进行了设置,并且没有复位(state!=0),B线程持有锁的使用权(B线程还没有释放锁)。A线程会构造成一个Node节点加入队列尾部并挂起。
-
当B线程执行完同步操作后,对state进行复位(state==0),即释放锁,然后从队列头开始寻找,发现正在沉睡的A线程,将其唤醒。
AQS同步队列
static final class Node {/**共享模式 */static final Node SHARED = new Node();/**独占模式 */static final Node EXCLUSIVE = null;/**取消状态,由于在同步队列中等待的线程等待超时或被中断,* 需要从同步队列中取消等待。 */static final int CANCELLED = 1;/**通知状态,当前节点的后继节点包含的线程需要运行(unpark)* 当前节点的线程如果释放了同步状态或者被取消,将通知后续节点。*/static final int SIGNAL = -1;/**条件阻塞状态,节点线程等待在Condition上,* 当其他线程对Condition调用了signal()方法后,* 该节点将会从等待队列中转移到同步队列中,* 加入到对同步状态的获取中。 */static final int CONDITION = -2;/**传播状态,表示当前场景下后续的acquireShared能够得以执行。*/static final int PROPAGATE = -3;/**节点的的状态 * 初始状态为0 表示当前节点在sync队列中,等待着获取状态。*/volatile int waitStatus;/** 前驱节点 */volatile Node prev;/** 后继节点 */volatile Node next;/**节点对应的线程,等待获取同步状态的线程。 */volatile Thread thread;/**下一等待节点*/Node nextWaiter;/**是否共享模式 */final boolean isShared() {return nextWaiter == SHARED;}/**获取前驱节点 */final Node predecessor() throws NullPointerException {}Node() {}Node(Thread thread, Node mode) {}Node(Thread thread, int waitStatus) {}
}
AQS基于一个FIFO双向队列实现,被设计给那些依赖一个代表状态的原子int值的同步器使用。我们都知道,既然叫同步器,那个肯定有个代表同步状态(临界资源)的东西,在AQS中即为一个叫state的int值,该值通过CAS进行原子修改。
在AQS中存在一个FIFO队列,队列中的节点表示被阻塞的线程,队列节点元素有4种类型, 每种类型表示线程被阻塞的原因,这四种类型分别是:
CANCELLED: 表示该线程是因为超时或者中断原因而被放到队列中CONDITION: 表示该线程是因为某个条件不满足而被放到队列中,需要等待一个条件,直到条件成立后才会出队SIGNAL: 表示该线程需要被唤醒PROPAGATE: 表示在共享模式下,当前节点执行释放release操作后,当前结点需要传播通知给后面所有节点
由于一个共享资源同一时间只能由一条线程持有,也可以被多个线程持有,因此AQS中存在两种模式,如下:
-
1、独占模式
独占模式表示共享状态值state每次能由一条线程持有,其他线程如果需要获取,则需要阻塞,如JUC中的
ReentrantLock -
2、共享模式
共享模式表示共享状态值state每次可以由多个线程持有,如JUC中的
CountDownLatch
AQS中的共享状态值
之前提到,AQS是基于一个共享的int类型的state值来实现同步器同步的,其声明如下:
/*** 同步状态值*/
private volatile int state;/*** 获取同步状态值*/
protected final int getState() {return state;
}/*** 修改同步状态值*/
protected final void setState(int newState) {state = newState;
}
由源码我们可以看出,AQS声明了一个int类型的state值,为了达到多线程同步的功能,必然对该值的修改必须多线程可见,因此,state采用volatile修饰,而且getState()和setState()方法采用final进行修饰,目的是限制AQS的子类只能调用这两个方法对state的值进行设置和获取,而不能对其进行重写自定义设置/获取逻辑。
AQS中提供对state值修改的方法不仅仅只有setState()和getState(),还有诸如采用CAS机制进行设置的compareAndSetState()方法,同样,该方法也是采用final修饰的,不允许子类重写,只能调用。
AQS中的tryXXX方法
一般基于AQS实现的同步器,如ReentrantLock,CountDownLatch等,对于state的获取操作,子类只需重写其tryAcquire()和tryAcquireShared()方法即可,这两个方法分别对应独占模式和共享模式下对state的获取操作;而对于释放操作,子类只需重写tryRelease()和tryReleaseShared()方法即可。
至于如何维护队列的出队、入队操作,子类不用管,AQS已经帮你做好了。
AQS 设计妙处
CAS自旋锁
当我们执行一个有确定结果的操作,同时又需要并发正确执行,通常可以采用自旋锁实现。在AQS中,自旋锁采用 死循环 + CAS 实现。针对AQS中的enq()进行讲解:
private Node enq(final Node node) {// 死循环 + CAS ,解决入队并发问题/*** 假设有三个线程同时都需要入队操作,那么使用死循环和CAS可保证并发安全,同一时间只有一个节点安全入队,入队失败的线程则循环重试* * 1、如果不要死循环可以吗?只用CAS.* 不可以,因为如果其他线程修改了tail的值,导致1处代码返回false,那么方法enq方法将退出,导致该入队的节点却没能入队* * 2、如果只用死循环,不需要CAS可以吗?* 不可以,首先不需要使用CAS,那就没必要再使用死循环了,再者,如果不使用CAS,那么当执行1处代码时,将会改变队列的结构*/for (;;) {// 获取尾部节点Node t = tail;// 如果还没有初始化,那么就初始化if (t == null) { // Must initializeif (compareAndSetHead(new Node()))// 刚开始肯定是头指针和尾指针相等tail = head;} else {// 当前结点的前驱节点等于尾部节点node.prev = t;// 如果当前尾结点仍然是t,那么执行入队并返回true,否则返回false,然后重试if (compareAndSetTail(t, node)) { // 1t.next = node;return t;}}}
}
首先入队操作要求的最终结果必须是一个节点插入到队列中去,只能成功,不能失败!然而这个入队的操作是需要并发执行的,有可能同时有很多的线程需要执行入队操作,因此我们需要采取相关的线程同步机制。自旋锁采取乐观策略,即使用了CAS中的compareAndSet()操作,如果某次执行返回fasle,那么当前操作必须重试,因此,采用for死循环直到成功为止,成功,则break跳出for循环或者直接return操作退出方法。
模板方法
在AQS中,模板方法设计模式体现在其acquire()、release()方法上,我们先来看下源码:
public final void acquire(int arg) {// 首先尝试获取共享状态,如果获取成功,则tryAcquire()返回trueif (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();}
其中调用tryAcquire()方法的默认实现是抛出一个异常,也就是说tryAcquire()方法留给子类去实现,acquire()方法定义了一个模板,一套处理逻辑,相关具体执行方法留给子类去实现。
自定义AQS并发同步器
下边以JDK文档的一个实例进行介绍:
class Mutex implements Lock, java.io.Serializable {// 自定义同步器private static class Sync extends AbstractQueuedSynchronizer {// 判断是否锁定状态protected boolean isHeldExclusively() {return getState() == 1;}// 尝试获取资源,立即返回。成功则返回true,否则false。public boolean tryAcquire(int acquires) {assert acquires == 1; // 这里限定只能为1个量if (compareAndSetState(0, 1)) {//state为0才设置为1,不可重入!setExclusiveOwnerThread(Thread.currentThread());//设置为当前线程独占资源return true;}return false;}// 尝试释放资源,立即返回。成功则为true,否则false。protected boolean tryRelease(int releases) {assert releases == 1; // 限定为1个量if (getState() == 0)//既然来释放,那肯定就是已占有状态了。只是为了保险,多层判断!throw new IllegalMonitorStateException();setExclusiveOwnerThread(null);setState(0);//释放资源,放弃占有状态return true;}}// 真正同步类的实现都依赖继承于AQS的自定义同步器!private final Sync sync = new Sync();//lock<-->acquire。两者语义一样:获取资源,即便等待,直到成功才返回。public void lock() {sync.acquire(1);}//tryLock<-->tryAcquire。两者语义一样:尝试获取资源,要求立即返回。成功则为true,失败则为false。public boolean tryLock() {return sync.tryAcquire(1);}//unlock<-->release。两者语文一样:释放资源。public void unlock() {sync.release(1);}//锁是否占有状态public boolean isLocked() {return sync.isHeldExclusively();}
}
实现自己的同步类一般都会自定义同步器(sync),并且将该类定义为内部类,供自己使用;而同步类自己(Mutex)则实现某个接口,对外服务。当然,接口的实现要直接依赖sync,它们在语义上也存在某种对应关系!!而sync只用实现资源state的获取-释放方式tryAcquire-tryRelelase,至于线程的排队、等待、唤醒等,上层的AQS都已经实现好了,我们不用关心。
除了Mutex,ReentrantLock/CountDownLatch/Semphore这些同步类的实现方式都差不多,不同的地方就在获取-释放资源的方式tryAcquire-tryRelelase。掌握了这点,AQS的核心便被攻破了!
本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!
