【Java并发编程】锁

Java并发/多线程编程系列blog(四)锁含Lock接口/队列同步器/重入锁/读写锁/LockSupport工具/Condition接口

Lock接口

锁是用来控制多个线程访问共享资源的方式,一般来说,一个锁能够防止多个线程同时访问共享资源(但是有些锁可以允许多个线程并发的访问共享资源,比如读写锁)。在Lock接口出现之前,Java程序是靠synchronized关键字实现锁功能的,而Java SE 5之后,并发包中新增了Lock接口(以及相关实现类)用来实现锁功能,它提供了与synchronized关键字类似的同步功能,只是在使用时需要显式地获取和释放锁。虽然它缺少了(通过synchronized块或者方法所提供的)隐式获取释放锁的便捷性,但是却拥有了锁获取与释放的可操作性、可中断的获取锁以 及超时获取锁等多种synchronized关键字所不具备的同步特性。使用synchronized关键字将会隐式地获取锁,但是它将锁的获取和释放固化了,也就是先 获取再释放。当然,这种方式简化了同步的管理,可是扩展性没有显示的锁获取和释放来的好。例如,针对一个场景,手把手进行锁获取和释放,先获得锁A,然后再获取锁B,当锁B获得后,释放锁A同时获取锁C,当锁C获得后,再释放B同时获取锁D,以此类推。这种场景下,synchronized关键字就不那么容易实现了,而使用Lock却容易许多。

实例代码:

1
2
3
4
5
6
7
Lock lock = new ReentrantLock();
lock.lock();
try{

}finally{
lock.unlock();
}

在finally块中释放锁,目的是保证在获取到锁之后,最终能够被释放。 不要将获取锁的过程写在try块中,因为如果在获取锁(自定义锁的实现)时发生了异常,异常抛出的同时,也会导致锁无故释放**。

Lock接口提供的synchronized关键字所不具备的主要特性如表所示:
CB0in.png

Lock是一个接口,它定义了锁获取和释放的基本操作,Lock的API如表所示:
CBCMh.png
Lock接口的实现基本都是通过聚合了一个同步器的子类来完成线程访问控制的。

队列同步器

队列同步器AbstractQueuedSynchronizer(以下简称同步器),是用来构建锁或者其他同步组件的基础框架它使用了一个int成员变量表示同步状态,通过内置的FIFO队列来完成资源获取线程的排队工作,并发包的作者(Doug Lea)期望它能够成为实现大部分同步需求的基础。

同步器的主要使用方式是继承,子类通过继承同步器并实现它的抽象方法来管理同步状态,在抽象方法的实现过程中免不了要对同步状态进行更改,这时就需要使用同步器提供的3 个方法(getState()、setState(int newState)和compareAndSetState(int expect,int update))来进行操 作,因为它们能够保证状态的改变是安全的。子类推荐被定义为自定义同步组件的静态内部类,同步器自身没有实现任何同步接口,它仅仅是定义了若干同步状态获取和释放的方法来供自定义同步组件使用,同步器既可以支持独占式地获取同步状态,也可以支持共享式地获取同步状态,这样就可以方便实现不同类型的同步组件(ReentrantLock、ReentrantReadWriteLock和CountDownLatch等)。

同步器是实现锁(也可以是任意同步组件)的关键,在锁的实现中聚合同步器,利用同步器实现锁的语义。可以这样理解二者之间的关系:

  • 锁是面向使用者的,它定义了使用者与锁交互的接口(比如可以允许两个线程并行访问),隐藏了实现细节;
  • 同步器面向的是锁的实现者,它简化了锁的实现方式,屏蔽了同步状态管理、线程的排队、等待与唤醒等底层操作。

队列同步器的接口与示例

同步器的设计是基于模板方法模式的,也就是说,使用者需要继承同步器并重写指定的方法,随后将同步器组合在自定义同步组件的实现中,并调用同步器提供的模板方法,而这些模板方法将会调用使用者重写的方法。重写同步器指定的方法时,需要使用同步器提供的如下3个方法来访问或修改同步状态

1 .getState():获取当前同步状态。
2 .setState(int newState):设置当前同步状态。
3 .compareAndSetState(int expect,int update):使用CAS设置当前状态,该方法能够保证状态 设置的原子性。

同步器可重写的方法与描述如表所示:
CBA2T.png
实现自定义同步组件时,将会调用同步器提供的模板方法,这些(部分)模板方法与描述如表所示:
CBRno.png
同步器提供的模板方法基本上分为3类:

  • 独占式获取与释放同步状态
  • 共享式获取与释放
  • 同步状态和查询同步队列中的等待线程情况

自定义同步组件将使用同步器提供的模板方法来实现自己的同步语义。

实例代码展示独占锁:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
class Mutex implements Lock { 
// 静态内部类,自定义同步器
private static class Sync extends AbstractQueuedSynchronizer { // 是否处于占用状态
protected boolean isHeldExclusively() {
return getState() == 1;
}
// 当状态为0的时候获取锁
public boolean tryAcquire(int acquires) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread()); return true;
}
return false;

}
// 释放锁,将状态设置为0
protected boolean tryRelease(int releases) {
if (getState() == 0) throw new IllegalMonitorStateException(); setExclusiveOwnerThread(null);
setState(0);
return true;

}
// 返回一个Condition,每个condition都包含了一个condition队列
Condition newCondition() {
return new ConditionObject();

}

}
// 仅需要将操作代理到Sync上即可
private final Sync sync = new Sync();
public void lock() { sync.acquire(1); }
public boolean tryLock() { return sync.tryAcquire(1); }
public void unlock() { sync.release(1); }
public Condition newCondition() { return sync.newCondition(); }
public boolean isLocked() { return sync.isHeldExclusively(); } public boolean hasQueuedThreads() { return sync.hasQueuedThreads(); }
public void lockInterruptibly() throws InterruptedException { sync.acquireInterruptibly(1); }
public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireNanos(1, unit.toNanos(timeout)); }

}

上述示例中,独占锁Mutex是一个自定义同步组件,它在同一时刻只允许一个线程占有锁。Mutex中定义了一个静态内部类,该内部类继承了同步器并实现了独占式获取和释放同步状态。在tryAcquire(int acquires)方法中,如果经过CAS设置成功(同步状态设置为1),则代表获 取了同步状态,而在tryRelease(int releases)方法中只是将同步状态重置为0。用户使用Mutex时并不会直接和内部同步器的实现打交道,而是调用Mutex提供的方法,在Mutex的实现中,以获取锁的lock()方法为例,只需要在方法实现中调用同步器的模板方法acquire(int args)即可,当前线程调用该方法获取同步状态失败后会被加入到同步队列中等待,这样就大大降低了实现一个可靠自定义同步组件的门槛。

重入锁

重入锁ReentrantLock,顾名思义,就是支持重进入的锁,它表示该锁能够支持一个线程对 资源的重复加锁。除此之外,该锁的还支持获取锁时的公平和非公平性选择。这里提到一个锁获取的公平性问题,如果在绝对时间上,先对锁进行获取的请求一定先被满足,那么这个锁是公平的,反之,是不公平的。公平的获取锁,也就是等待时间最长的线程最优先获取锁,也可以说锁获取是顺序的。ReentrantLock提供了一个构造函数,能够控制锁是否是公平的。事实上,公平的锁机制往往没有非公平的效率高,但是,并不是任何场景都是以TPS作为唯一的指标,公平锁能够减少“饥饿”发生的概率,等待越久的请求越是能够得到优先满足。

实现重进入

重进入是指任意线程在获取到锁之后能够再次获取该锁而不会被锁所阻塞,该特性的实现需要解决以下两个问题:

  • 线程再次获取锁。锁需要去识别获取锁的线程是否为当前占据锁的线程,如果是,则再次成功获取。
  • 锁的最终释放。线程重复n次获取了锁,随后在第n次释放该锁后,其他线程能够获取到该锁。

锁的最终释放要求锁对于获取进行计数自增,计数表示当前锁被重复获取的次数,而锁被释放时,计数自减,当计数等于0时表示锁已经成功释放。 ReentrantLock是通过组合自定义同步器来实现锁的获取与释放,以非公平性(默认的)实现为例

获取同步状态的实例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
final boolean nonfairTryAcquire(int acquires) { 
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
} else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}

该方法增加了再次获取同步状态的处理逻辑:通过判断当前线程是否为获取锁的线程来决定获取操作是否成功,如果是获取锁的线程再次请求,则将同步状态值进行增加并返回true,表示获取同步状态成功。

成功获取锁的线程再次获取锁,只是增加了同步状态值,这也就要求ReentrantLock在释放同步状态时减少同步状态值,该方法的代码如下:

1
2
3
4
5
6
7
8
9
10
11
protected final boolean tryRelease(int releases) { 
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}

公平性与否是针对获取锁而言的,如果一个锁是公平的,那么锁的获取顺序就应该符合请求的绝对时间顺序,也就是FIFO。公平锁代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
protected final boolean tryAcquire(int acquires) { 
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
} else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}

该方法与nonfairTryAcquire(int acquires)比较,唯一不同的位置为判断条件多了 hasQueuedPredecessors()方法,即加入了同步队列中当前节点是否有前驱节点的判断,如果该方法返回true,则表示有线程比当前线程更早地请求获取锁,因此需要等待前驱线程获取并释放锁之后才能继续获取锁。

公平和非公平锁在获取锁时的区别的代码实例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public class FairAndUnfairTest { 
private static Lock fairLock = new ReentrantLock2(true); private static Lock unfairLock = new ReentrantLock2(false); public void fair() {
testLock(fairLock);

}
public void unfair() {
testLock(unfairLock);

}
private void testLock(Lock lock) {
// 启动5个Job(略)

}
private static class Job extends Thread {
private Lock lock;
public Job(Lock lock) {
this.lock = lock;
}
public void run() {
// 连续2次打印当前的Thread和等待队列中的Thread(略)
}
}
private static class ReentrantLock2 extends ReentrantLock {
public ReentrantLock2(boolean fair) {
super(fair);
}
public Collection<Thread> getQueuedThreads() {
List<Thread> arrayList = new ArrayList<Thread>(super. getQueuedThreads());
Collections.reverse(arrayList);
return arrayList;
}

}
}

测试结果:
CRQQ6.png
在测试中公平性锁与非公平性锁相比,总耗时是其94.3倍,总切换次数是其133倍。可以 看出,公平性锁保证了锁的获取按照FIFO原则,而代价是进行大量的线程切换。非公平性锁虽然可能造成线程“饥饿”,但极少的线程切换,保证了其更大的吞吐量。

读写锁

之前提到锁(如Mutex和ReentrantLock)基本都是排他锁,这些锁在同一时刻只允许一个线程进行访问,而读写锁在同一时刻可以允许多个读线程访问,但是在写线程访问时,所有的读线程和其他写线程均被阻塞。读写锁维护了一对锁,一个读锁和一个写锁,通过分离读锁和写锁,使得并发性相比一般的排他锁有了很大提升。

一般情况下,读写锁的性能都会比排它锁好,因为大多数场景读是多于写的。在读多于写的情况下读写锁能够提供比排它锁更好的并发性和吞吐量。Java并发包提供读写锁的实现是ReentrantReadWriteLock,它提供的特性如表所示:
CRAzO.png

读写锁的接口与示例

ReadWriteLock仅定义了获取读锁和写锁的两个方法,即readLock()方法和writeLock()方 法,而其实现——ReentrantReadWriteLock,除了接口方法之外,还提供了一些便于外界监控其内部工作状态的方法,这些方法以及描述如表所示。
CRBOe.png
读写锁的实例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
package Second;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class Cache {

static Map<String, Object> map = new HashMap<String , Object>();
static ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
static Lock r = lock.readLock();
static Lock w = lock.writeLock();
public static final Object get(String key) {
r.lock();
try {
return map.get(key);
}finally {
r.unlock();
}
}
//设置key所对应的value并返回旧的value
public static final Object set(String key, Object obj) {
w.lock();
try {
return map.put(key, obj);
}finally {
w.unlock();
}
}
//清空所有内容
public static final void clear() {
w.lock();
try {
map.clear();
}finally {
w.unlock();
}
}
}

上述示例中,Cache组合一个非线程安全的HashMap作为缓存的实现,同时使用读写锁的读锁和写锁来保证Cache是线程安全的。Cache使用读写锁提升读操作的并发性,也保证每次写操作对所有的读写操作的可见性,同时简化了编程方式。

LockSupport工具

当需要阻塞或唤醒一个线程的时候,都会使用LockSupport工具类来完成相应工作。LockSupport定义了一组的公共静态方法,这些方法提供了最基本的线程阻塞和唤醒功能,而LockSupport也成为构建同步组件的基础工具。

LockSupport定义了一组以park开头的方法用来阻塞当前线程,以及unpark(Thread thread) 方法来唤醒一个被阻塞的线程。Park有停车的意思,假设线程为车辆,那么park方法代表着停车,而unpark方法则是指车辆启动离开,这些方法以及描述如表所示。
CRduN.png

LockSupport增加了park(Object blocker)、parkNanos(Object blocker,long nanos) 和parkUntil(Object blocker,long deadline)3个方法,用于实现阻塞当前线程的功能,其中参数blocker是用来标识当前线程在等待的对象(以下称为阻塞对象),该对象主要用于问题排查和系统监控。 下面的示例中,将对比parkNanos(long nanos)方法和parkNanos(Object blocker,long nanos)方 法来展示阻塞对象blocker的用处,代码片段和线程dump(部分)如表5-11所示。 从表的线程dump结果可以看出,代码片段的内容都是阻塞当前线程10秒,但从线程 dump结果可以看出,有阻塞对象的parkNanos方法能够传递给开发人员更多的现场信息。这是 由于在Java 5之前,当线程阻塞(使用synchronized关键字)在一个对象上时,通过线程dump能够查看到该线程的阻塞对象,方便问题定位,而Java 5推出的Lock等并发工具时却遗漏了这一点,致使在线程dump时无法提供阻塞对象的信息。因此,在Java 6中,LockSupport新增了上述3个含有阻塞对象的park方法,用以替代原有的park方法。
CRg8j.png

Condition接口

任意一个Java对象,都拥有一组监视器方法(定义在java.lang.Object上),主要包括wait()、wait(long timeout)、notify()以及notifyAll()方法,这些方法与synchronized同步关键字配合,可以实现等待/通知模式。Condition接口也提供了类似Object的监视器方法,与Lock配合可以实现等待/通知模式,但是这两者在使用方式以及功能特性上还是有差别的。通过对比Object的监视器方法和Condition接口,可以更详细地了解Condition的特性,对比项与结果如表所示:
CRspk.png

Condition接口与实例

Condition的使用方式比较简单,需要注意在调用方法前获取锁,使用方式如代码所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
Lock lock = new ReentrantLock(); 
Condition condition = lock.newCondition();
public void conditionWait() throws InterruptedException {
lock.lock();
try {
condition.await();
} finally {
lock.unlock();
}
}
public void conditionSignal() throws InterruptedException {
lock.lock();
try {
condition.signal();
} finally {
lock.unlock();
}
}

如示例所示,一般都会将Condition对象作为成员变量。当调用await()方法后,当前线程会释放锁并在此等待,而其他线程调用Condition对象的signal()方法,通知当前线程后,当前线程才从await()方法返回,并且在返回前已经获取了锁。

Condition定义的(部分)方法以及描述如表所示:
CRJgn.png
获取一个Condition必须通过Lock的newCondition()方法。下面通过一个有界队列的示例来深入了解Condition的使用方式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public class BoundedQueue<T> {
private Object[] items; //添加的下标,删除的下标和数组当前数量
private int addIndex, removeIndex, count;
private Lock lock = new ReentrantLock();
private Condition notEmpty = lock.newCondition();
private Condition notFull = lock.newCondition();
public BoundedQueue(int size) {
items = new Object[size];
}
// 添加一个元素,如果数组满,则添加线程进入等待状态,直到有"空位"
public void add(T t) throws InterruptedException {
lock.lock();
try {
while (count == items.length)
notFull.await();
items[addIndex] = t;
if (++addIndex == items.length)
addIndex = 0;
++count;
notEmpty.signal();
} finally {
lock.unlock();
}
}
// 由头部删除一个元素,如果数组空,则删除线程进入等待状态,直到有新添加元素
public T remove() throws InterruptedException {
lock.lock();
try {
while (count == 0)
notEmpty.await();
Object x = items[removeIndex];
if (++removeIndex == items.length)
removeIndex = 0;
--count;
notFull.signal();
return (T) x;
} finally {
lock.unlock();
}
}

}

上述示例中,BoundedQueue通过add(T t)方法添加一个元素,通过remove()方法移出一个 元素。以添加方法为例。首先需要获得锁,目的是确保数组修改的可见性和排他性。当数组数量等于数组长度时,表示数组已满,则调用notFull.await(),当前线程随之释放锁并进入等待状态。如果数组数量不等于数组长度,表示数组未满,则添加元素到数组中,同时通知等待在notEmpty上的线程,数 组中已经有新元素可以获取。在添加和删除方法中使用while循环而非if判断,目的是防止过早或意外的通知,只有条件符合才能够退出循环。回想之前提到的等待/通知的经典范式,二者是非常类似的。

参考:
《Java并发编程的艺术》

如果觉得还不错的话,把它分享给朋友们吧(ง •̀_•́)ง