Synchronized和lock详解

      线程是最小的运算逻辑单元,合理的使用线程可以充分利用CPU资源,线程之间采用堆内存的方式进行交互,多个线程之间共享堆内存,但这会存在一个问题,如果不合理的使用线程,就会导致代码执行的安全性和准确性。那么Java是如何解决呢?

Synchronized

      Synchronized内部通过对象内部的一个监视器锁(Monitor)实现的,监视器锁依赖于底层操作系统的Mutex Lock来实现的,因此线程之间切换会涉及到从用户态到系统态的数据转移,状态之间的数据转移会消耗比较长的时间,因此Synchronized的同步机制在初期效率非常的低。在jdk1.6版本之后,Java对这些做了比较大的改进,添加了偏向锁、轻量级锁、重量锁等机制,减少了操作底层系统锁的概率,提升了Synchorized的性能。
      了解偏向锁、轻量级锁、重量级锁之前,需要了解下Java对象的基础知识,Java对象在堆内存中如何保存的。

偏向锁

      偏向锁是针对在应用程序块中只有一个线程执行的优化,获取偏向锁的步骤如下:

  1. 判断锁标志位是否为01和是否偏向锁标志为0,则表明其他线程未抢占过该资源。
  2. 使用CAS将当前线程ID设置到对象头中的线程ID中,如果成功,则表明该线程已抢占到该资源,继续执行后续代码,否则执行步骤4
  3. CAS设置线程ID失败,表名有其它线程正在占用该资源,此时会进行锁升级为轻量级锁。
          由上述操作步骤可以看出,偏向锁只使用了一次CAS操作,操作成功则会执行同步代码块,这样对于只有一个线程抢占资源的同步块非常的高效,如果多个线程抢占资源时则会进入到轻量级锁的判断。

轻量级锁

      轻量级锁是为了提高多个线程交替的抢占资源时的性能,获取轻量级锁的步骤如下:

  1. 当一个线程需要获取同步资源时,如果此时的锁状态为无锁的状态(锁的标志位01,是否偏向锁为0),虚拟机会在栈中创建一个名为锁记录的空间。
  2. 拷贝当前的锁对象头中的元数据到锁记录的空间中。
  3. 使用CAS操作尝试将对象的元数据的指针指向锁记录的的指针,并将锁记录中的owner指针指向对象,若更新成功则执行步骤4,,否则执行步骤5。
  4. 更新成功,该线程则拥有了对象的锁,此时将对象头中的锁标志设置为00。
  5. 更新操作失败,虚拟机会检查对象头中的指针是否指向当前线程的栈帧,如果指向则表明该线程已经持有了该对象的锁,否则说明有其它线程也在竞争该资源,此时升级为重量级锁。
          轻量级锁每一次获取和释放都使用了CAS操作,相比偏向锁只在第一次获取锁时使用一次CAS操作明显复杂很多,但轻量级锁能够自己释放锁,这个是偏向锁做不到的,这也是轻量级锁能够在多线程交替执行时能适用的因素。

重量锁

      重量级锁是操作底层系统的信号量,如果未抢占到资源则会阻塞,如果抢占到资源则会进行线程的切换,会引发数据从用户态到内核态的转移,从而影响性能。


锁(Lock)

      Lock是Java提供的另一种同步机制,其内部主要使用同步队列器(AQS)实现,之后会详细讲解AQS的内部实现,这里先不做分析。Lock自身需要显示的获取锁和释放锁,相比Synchronized显得比较麻烦,但也拥有Synchronized许多没有的特性。

Lock的优势

  1. 可尝试的获取锁,如果获取到锁则返回true,如果获取失败则返回false。
  2. 在规定时间内获取锁,如果在时间内获取到锁,则返回true,否则返回false。
  3. 在获取锁的等待过程中可响应中断,在获取到锁之后不会被interrupt()方法中断的。
  4. Lock可以支持更复杂的业务场景,比如说需要先获取到锁A,然后在获取锁B,释放时先释放锁A,在释放锁B,使用Lock很简单则很简单。
  5. Lock可以支持公平锁,非公平锁、读写锁等,并且自己可以实现自己需要的锁功能。

Lock的方法模版

方法名 描述
lock() 获取锁,如果获取不到则一直阻塞
tryLock() 尝试获取锁,如果未获取到则返回false,否则返回true
tryLock(long time, TimeUnit unit) 在规定时间尝试获取锁,如果在时间内获取到锁则返回true,否则返回false
lockInterruptibly() 在获取锁的等待过程中可响应中断
unlock() 释放锁,获取锁必须有释放锁的动作,一般将释放锁放在finally代码块中
newCondition 创建一个新的等待条件,相当于在Synchronized中的await()方法。
      上述表格便是Lock提供的方法,我们在平时的开发中大部分也只是使用到上述的方法,我们是来学习的,所以抱着学习的态度,了解其实现方式更有必要。通过看代码ReentrantLock的实现方式,我们发现有一个非常重要的内部类Sync实现了AbstractQueuedSynchronizer,其锁的实现逻辑都在抽象类中。

AbstractQueuedSynchronizer(AQS)

      队列同步器(AQS)内部采用一个int类型静态的内部静态变量来控制可以是否获取到锁,并采用FIFO队列的数据结构控制当前需要获取资源的线程。其内部的核心属性如下:

核心属性

AQS内部属性
属性名 属性描述 备注
Node head 队列的头部 位于队列头部的线程是正在占用资源的线程
Node tail 队列尾部 新加入的阻塞线程添加到队列的尾部
int state 当前AQS的内部状态
AQS只使用到了,head是队列头部,tail是队列的尾部,state表示AQS的状态。head和tail的Node类型的,Node类型是AQS的一个内部实现类,用于封装请求资源线程的状态、线程信息、线程的模式。下表是Node的相关属性和相关方法。
Node内部属性
属性名 属性描述 备注
Node prev 该节点的前一个节点
Node next 该节点的下一个节点
int waitStatus 当前节点的状态 1:该节点处于取消的状态,比如该节点被中断; 0: 节点初始化的状态;-1: 该节点处于被唤醒的状态; -2: 该线程获得过资源,现处于等待状态; -3:
Thread thread 线程信息
Node nextWaiter 节点模式 若是独占模式,则该值为null;否则是共享模式, 该值为一个不可变的Node节点

核心方法

  • setState(int count): 设置当前AQS的内部的变量值;
  • getState(): 获取当前AQS内部变量的值;
  • compareAndSetState(int expect, int update): 使用CAS操作将新的count值替换掉原来的count值。

      除了提供上述方法之外还提供了许多模版方法,我们在实现自身的锁功能时,大部分只需要使用AQS提供的模版方法即可,模版方法列表如下:

模版方法

方法名 方法描述
acquire(int count) 线程以独占的模式获取资源,如果成功,则执行之后的代码块,否则一直等待
release(int count) 线程释放独占的资源,如果成功释放则返回true, 否则返回false
acquireShared(int count) 线程共享式的获取资源,如果成功则执行代码块,否则会一直等待有可用的资源
releaseShared(int count) 线程共享式的释放资源,如果释放成功返回的可获取的剩余资源
      模版方法常用于我们在实现自己定义的锁功能(实现Lock类中方法)会使用到,例如ReentrenLock中的lock()时,需要使用模版方法的acquire(int count)方法实现,之后介绍完需要的重写方法后会详细分析模版方法的代码实现。模版方法在实现用户自定义锁功能时,不需要用户重写上述方法,但是队列同步器却能实现客户个性化的锁机制,这依赖于AQS中需要重写的方法。需要重写的方法列表如下:

重写方法

方法名 方法描述
tryAcquire(int count) 独享式的获取资源,如果成功获取则返回true,否则返回false
tryRelease(int count) 独享式的释放资源,如果成功释放则返回true,否则返回false
tryAcquireShared(int count) 共享式的获取资源, 如果返回成功则返回剩余的资源数
tryReleaseShared(int count) 共享式的释放资源, 如果释放成功则返回true否则返回false
isHeldExclusively() 当前线程是否是独占资源,即该线程是否是独占式的模式
      上述代码便是我们需要在实现自定义锁功能时需要重写AQS的方法,但并发不需要每个方法都要重写,如果我们需要独占式的获得锁和释放锁,那么只需要重写tryAcquire(int count)和tryReleased(int count)方法,如果需要共享式的获取锁资源,则重写其中的tryAcquireShared(int count)和tryReleaseShared(int count)方法,根据自己实现的锁功能来重写上述的方法。
      了解了锁和AQS的基础概念和使用方法,我们本着学习的态度,当然要了解其内部实现的原理。那么,我们就以常用的ReentrenLock作为学习入口,了解其内部的工作机制,如果有机会我们再写一个我们自定义的锁功能。

ReentrenLock

      ReentrenLock是一个独占式的可重入锁。独占式即只允许有一个线程获得资源执行后续的代码块,其他的线程必须等待该线程结束释放锁后才能获取到资源。可重入指的是拥有相同线程ID的线程在该抢占线程占用资源时, 不会阻塞。

lock()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
final void lock() {
//调用模版方法acquire(int arg)
acquire(1);
}
//
public final void acquire(int arg) {
/**
* 调用重写的tryAcquire(int arg)方法,判断是否获取到资源
* 若未抢占到资源,则创建新的节点(独占模式)添加到尾部
* 自旋式的获取资源,若未获取到则设置状态并阻塞线程
* /
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

上边的代码便是独占锁的调用方法,acquire(int arg)是AQS提供的模版方法。tryAcquire(arg)是需要用户自定义的重写方法,用于判断是否获取到资源,如果未获取到资源则调用addWaiter(Node model)方法将线程信息进行封装加入到AQS队列的尾部, 然后通过acquireQueued(Node p)判断当前的线程信息是否位于队列头部所指向的节点,若不是队列头部所指向的节点则表示未抢占到资源,则设置该节点的状态并阻塞线程。tryAcquire(int arg)方法源码:

arg)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
protected final boolean tryAcquire(int acquires) {
//获取当前线程的信息
final Thread current = Thread.currentThread();
//获取AQS的的当前状态(即AQS内存的静态变量值)
int c = getState();
//如果c==0,表明没有线程获取到资源,进入下述的判断
if (c == 0) {
//判断当前的线程是否是队列的头部,如果是则CAS操作替换AQS的内部状态,操作成功之后则持有锁线程的变为当前线程并返回true
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
//如果持有锁的线程是当前线程,将内部变量添加请求的变量然后设置到AQS的内部静态变量
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}

上述方法是可重入锁公平的获取资源重写的方法,所谓的公平锁指的是先到先得的原则(即先请求获取锁的线程在有空闲资源的时候先获得锁),在上述的代码体现就是hasQueuedPredecessors()[判断当前线程是不是队列的头部next所指的线程]。若是需要非公平的获取锁,则去掉hasQueuedPredecessors()方法即可。上述方法如果返回false,表示未获取到资源,则需要调用addWaiter(Node.EXCLUSIVE)方法,源码如下:

mode)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
private Node addWaiter(Node mode) {
//创建新的节点信息
Node node = new Node(Thread.currentThread(), mode);
//获取当前AQS内部队列的队尾
Node pred = tail;
//如果队尾不为空,则表示有线程在使用或者在等待资源
if (pred != null) {
//将当前新增的节点的前一个节点指向当前AQS的队尾
node.prev = pred;
//使用原子性操作将当前节点设置到AQS的队尾
if (compareAndSetTail(pred, node)) {
//将之前尾节点的下一个节点指向当前节点
pred.next = node;
return node;
}
}
//对于空队列的或者在使用compareAndSetTail(pre, node)失败时,则调用enq继续处理该节点。
enq(node);
return node;
}

//自旋式添加节点到队列尾部
private Node enq(final Node node) {
for (;;) {
Node t = tail;
//如果为空,则设置一个新节点作为队列头部
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
//和addWaiter方法处理相似,设置该节点到队尾,然后将之前的队列尾部节点的下一个节点指向该节点。
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}

上述方法是将未获取到资源的线程包装成一个AQS内部的节点添加到队列尾部,添加到队列尾部成功之后,则会调用acquireQueued(Node p, int arg)方法设置节点的状态和是否阻塞线程。源码如下:

node, int arg):
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
//获取当前节点的前一个节点
final Node p = node.predecessor();
//如果当前节点的前一个节点是队列头部,则尝试获取资源,获取成功则将当前节点设置到队列头部。
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
//如果不满足上述条件,则进行请求失败阻塞流程的处理并且检查该线程是否被中断
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
//这种情况我理解的是针对于上述的操作如果出现了异常,则执行该方法
if (failed)
cancelAcquire(node);
}
}
pred, Node node)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
//获取新增节点的前一个节点的状态
int ws = pred.waitStatus;
//如果前节点的状态处于可唤醒,则返回true
if (ws == Node.SIGNAL)
return true;
//如果大于0,表示该线程已经取消了抢占资源,所以将新增节点的前一个节点的指针指向状态不为取消的节点。
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
//设置前节点的状态为可唤醒的状态
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}

//阻塞当前线程并且减产该线程是否被中断
private final boolean parkAndCheckInterrupt() {
//使用LockSupport.park(this)工具阻塞当前线程,该方法可以不在线程锁内执行
LockSupport.park(this);
return Thread.interrupted();
}

上述的方法便是加锁的完整代码,虽然看起来比较长并且感觉做了很多的东西,但是只要一步步去看源码,你会发现也很简单,我做了一个总结:

  1. 判断当前线程是否抢占到了资源,若没有抢占到,则执行操作2。
  2. 将当前的线程信息封装成一个AQS内部节点添加到队列的尾部,此时会出现三种情况:
    • AQS内部的队列已初始化,调用compareAndSetTail设置新增节点到队尾。
    • compareAndSetTail设置队尾失败,则调用enq方法继续在内部代码块中采用自选式操作将新增节点设置到队尾
    • AQS内部的队列还未初始化过,则新创建一个新的节点作为队列头部,然后设置之前的新增节点到队尾。
      如果上述操作任一执行成功,则执行步骤3。
  3. 对新加入到队列中的节点进行处理,包括判断在添加到队尾的期间是否有线程释放了资源,新增的节点则再次尝试获取资源,若获取到资源则返回false,否则执行步骤4。
  4. 判断新增节点node的前节点prev的状态,并执行如下操作:
    • 前节点prev的状态是可唤醒的,直接返回true,继续执行后续操作。
    • 前节点prev的状态是已取消,则将新增节点node的前节点指向状态不是取消的节点,并且返回false。重新进入判断是否可以尝试获取到资源。
    • 前节点的prev的状态是初始化状态或者等待状态,则将前节点prev的状态设置为可唤醒的状态。
      若上述操作的结果最终返回true,会继续执行步骤5。
  5. 判断线程是否被中断,并且阻塞当前线程,防止一直执行循环,从而造成CPU的消耗。
  6. 若上述代码出现了异常情况,则会执行finally代码,取消新增节点的,即将新增节点的状态设置为已取消。

unlock()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
//释放锁
public void unlock() {
sync.release(1);
}

//AQS释放资源
public final boolean release(int arg) {
//调用释放资源的重写方法,如果成功则唤醒后续等待的线程
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}

释放锁的代码只是在内部调用了AQS模版方法的release(int arg)方法,而release(int arg)是AQS内部实现,一般不需要我们进行重写,其内部代码的实现首先会调用重写的tryRelease(int arg), 判断是否释放资源成功,若成功则需要调用后续的unparkSuccessOr(h)唤醒后续等待的线程。
tryRelase(int arg)方法实现如下:

releases)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
protected final boolean tryRelease(int releases) {
//获取当前的状态减去请求释放的资源数
int c = getState() - releases;
//如果当前线程不是AQS内部正在使用资源的那个线程时,抛出异常
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
//如果AQS内部静态变量为0,表示没有线程正在占用该线程返回true,并且将当前AQS内部正在使用资源的那个线程设置为null.
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}

tryRelease(int releases)方法的内部实现比较简单,如果当前线程是AQS内部正在实现资源的线程,则将AQS内部的剩余资源数减去请求释放的资源数,如果剩余资源数为0,表示当前线程没有占用资源,则将正在使用资源的线程变量设置为null,并且将变量值设置到AQS内部,返回true。否则抛出异常或者返回false。 tryRelease(int releases)返回true时,表示当前线程不再占用该线程,需要判断AQS队列内部是否有后续节点,若有后续节点,则调用unparkSuccessor(h)唤醒候选线程。unparkSuccessor(h)的源码实现如下:

h)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}

判断AQS内部队列头部节点的状态是否为可唤醒(-1),若为可唤醒则设置头部节点状态为0。继续判断头部节点的下节点,如果后续节点为null或者状态为已取消,则从后向前判断队列上节点状态小于0且最节点头部节点的节点s,判断节点s是否为null,若不为null则唤醒后续线程。这段代码比较疑惑的是为什么要从后向前判断状态,不能从前向后判断第一个小于0的节点直接break掉吗?

上述代码是释放锁的代码,相对于获取锁的代码来说简单很多,也比较易懂,其中的操作实现如下:

  1. 首先调用AQS内部的的模版方法release(int arg), release方法会调用tryRelease(int release)修改资源变量,若成功则执行步骤2。
  2. 判断AQS内部队列是否有后续节点,若有后续节点则调用unparkSuccessor(h)方法唤醒后续节点,执行步骤3;若没有后续节点则直接返回true。
  3. 首先将队列头部节点状态设置为0,其次判断头部节点的后续节点的状态,找到状态为可唤醒的节点,若不为null,则调用LockSupport.unpark(Object o)方法唤醒后续线程.
  4. 返回false。

小结: 该小节便是ReentrenLock使用AQS实现的一个可重入的独占锁,使用的AQS相对简单。如果大家想要继续深入学习,可以看看CountDownLatch的代码实现,其内部也是采用了AQS实现,具体的实现就不做介绍了,相信只要好好的看了改节的都能够理解和分析CountDownLatch的源码。

ReentrenLock只是用到了acquire(int arg)和release(int arg)方法,并未使用到我们之前所述的acquireShared(int arg)、releaseShared(int arg), acquiredInterruplty(int arg)等方法,这些模版方法和acquired(int arg)和release(int arg)很相似,只是针对其中做了特殊的处理,就不多做解析。

总结

Synchornized和Lock是Java实现同步机制的两个重要途径,Synchornized隐式的获取锁和释放锁,不需要程序员做其它操作就可以实现同步机制,但是相应的对于某些复杂场景难以实现。Lock
需要显示的获取锁和释放锁,如果操作不当可能会造成死锁的现象,但也支持复杂的场景,例如读写锁可以针对写加排它锁,读加共享锁,对读多写少的情况非常的好。因此,我觉得在一般情况下,都需要掌握并且首先考虑使用Lock,而不是Synchronized,当然具体的场景具体实现。
最后,写一些自己的写这篇文章的一些后感吧。自己在写的时候总想写很多,但有时不知道该怎么去描述,最后写出来可能废话一大堆,没有什么实用的东西,但是自己还是希望自己能够写完、毕竟总是会慢慢进步的,如果不做就永远不会进步。相信自己以后能够更好地写更多的文章,加油吧!