0%

ReentrantLock介绍与使用

导读

本文为您介绍JUC中ReentrantLock的各种API和使用案例,阅读完本文,您将了解到:

  1. 如何使用tryLock避免顺序死锁问题
  2. 如何通过tryLock来重试获取锁
  3. 如何使用可中断锁,以及可中断锁的实现原理;
  4. ReentrantLock和synchronized的区别,以及如何选择?

ReentrantLock是J.U.C包下提供的独占锁锁,根据其名称可知,该锁是可重入的。主要提供以下功能更:

  • 等待可中断:该类提供了lockInterruptibly()方法实现了等待可中断机制,这种机制使得如果当前线程正在等待锁,但其他线程请求该锁,则当前线程将被中断并立即返回而无需获取锁;
  • 公平锁:在并发获取同一个锁的场景,线程必须按照申请获取锁的顺序进行获取。一般是在ReentrantLock构造函数中传递true实例化一个公平锁;
    • Lock lock = new ReentrantLock(true);
  • 支持多条件的锁:synchronized关键字配合Object.wiat()和Object.notify()或者Object.notifyAll()方法,可以实现线程之间的同步。类似的,Lock中也提供了这样的机制。是通过Condition类提供的方法实现的。

如果线程重入了一个锁,那么ReentrantLock的锁计数器会加1,每个解锁请求,锁计数器减1。当锁计数器为0的时候,表示资源被解锁了。

synchronized通过操作Mark Word实现同步,锁标识存储于Mark Word中;ReentrantLock通过AQS(抽象同步队列)实现同步,锁标识存储于AQS的state属性中。

1、关键方法

  • lock(): 调用该方法会使锁计数器加1,如果共享资源最初是空闲的,则将锁定并授予线程;
  • unlock(): 调用该方法使锁计数器减1,当计数达到0的时候,将释放资源;
  • tryLock(): 如果资源没有被任何其他线程占用,那么该方法返回true,并且锁计数器加1。如果资源不是空闲的,则该方法返回false。这个时候线程不会阻塞,而是直接退出返回结果;
  • lockInterruptible(): 该方法使得资源空闲时允许该线程在获取资源时被其他线程中断。也就是说:如果当前线程正在等待锁,但其他线程请求该锁,则当前线程将被中断并立即返回,不会继续等待获取锁;
  • getHoldCount(): 获取资源上持有的锁的计数器;
  • isHeldByCurrentThread: 如果资源锁有当前线程持有,则此方法返回true。

请始终在finally块中调用unlock语句,以确保即使在方法body中引发了异常,也可以释放锁。

比较难衡量的是究竟要休眠多久,这得看另一个已经进入临界区的线程究竟需要执行多久。如果频繁的休眠,会导致频繁切换用户态和内核态,比较占用资源。

2、使用案例

2.1、通过tryLock避免锁顺序死锁问题

对于具有锁顺序的场景,我们可以通过使用tryLock避免死锁问题。下面是一个具有锁顺序的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
package com.itzhai.concurrency;

import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

/**
* Created by arthinking on 1/3/2020.
*/
public class RetryTest {

public static void main(String[] args) {

BusinessObject ob1 = new BusinessObject();
ob1.setBoName("业务实体1");
ob1.setLock(new ReentrantLock());

BusinessObject ob2 = new BusinessObject();
ob2.setBoName("业务实体2");
ob2.setLock(new ReentrantLock());

Thread thread1 = new Thread() {
public void run (){
try {
doService(ob1, ob2, 10, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
thread1.setName("线程1");
thread1.start();

Thread thread2 = new Thread() {
public void run (){
try {
doService(ob2, ob1, 10, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
thread2.setName("线程2");
thread2.start();
}

public static boolean doService(BusinessObject bo1,
BusinessObject bo2,
long timeout,
TimeUnit unit) throws InterruptedException {
boolean isSuccess = false;
String threadName = Thread.currentThread().getName();
long stopTime = System.nanoTime() + unit.toNanos(timeout);
while (true) {
// 获取第一个锁
System.out.println (threadName + " 请求获取 " + bo1.getBoName());
if (!bo1.getLock().tryLock()) {
System.out.println (threadName + " 锁定失败 " + bo1.getBoName());
if (isTimeout(threadName, stopTime)) return false;
continue;
}

try {
System.out.println (threadName + " 锁定成功 " + bo1.getBoName());
Thread.sleep(new Random().nextInt(100));

// 获取第二个锁
System.out.println (threadName + " 请求获取 " + bo2.getBoName());
if (!bo2.getLock().tryLock()) {
System.out.println (threadName + " 锁定失败 " + bo2.getBoName());
if (isTimeout(threadName, stopTime)) return false;
continue;
}
try {
System.out.println(threadName + " 锁定成功 " + bo2.getBoName());
System.out.println("+++" + threadName + " 执行完成");
isSuccess = true;
return true;
} finally {
if (bo2.getLock().isHeldByCurrentThread()) {
bo2.getLock().unlock();
System.out.println(threadName + " 释放锁 " + bo2.getBoName());
}
}
} catch (Exception e) {
System.out.println(e);
} finally {
if (bo1.getLock().isHeldByCurrentThread()) {
bo1.getLock().unlock();
System.out.println(threadName + " 释放锁 " + bo1.getBoName());
}
if (!isSuccess) {
// 如果当前线程业务没有执行完成,则在释放锁之后,尝试休眠一下,以便让其他线程有处理机会
System.out.println("休眠一下再试...");
Thread.sleep(new Random().nextInt(100));
}
}
}
}

private static boolean isTimeout(String threadName, long stopTime) throws InterruptedException {
if (stopTime < System.nanoTime()) {
System.out.println(threadName + "重试超时...");
return true;
}
return false;
}

}

class BusinessObject {

private ReentrantLock lock;

private String boName;

public ReentrantLock getLock() {
return lock;
}

public void setLock(ReentrantLock lock) {
this.lock = lock;
}

public String getBoName() {
return boName;
}

public void setBoName(String boName) {
this.boName = boName;
}
}

可以通过使用循环重试或者定时重试让tryLock失败的时候,重新循环的去重试。很重要的一点是:您需要设置一个超时时间,避免一直在重试

可以看到,这个逻辑还是稍微有点复杂的,一定要在释放锁之后,休眠片刻,以便让其他线程有机会重试获取当前线程刚刚释放的锁。

2.2、使用带有时间限制的tryLock重试获取锁

如果一个资源锁,多个线程都会尝试获取,如果每个线程占有锁的时间都不长,那么通过这种方式还是比较好的,可以避免线程阻塞导致的用户态内核态切换。这也就是自旋锁的特点。当然,如果每个线程都占用锁很长时间,那么这种方式就不太可取了。

如果使用的是synchronized关键字,其实在轻量级锁阶段就会有自旋等待的操作,并且synchronized实现了自适应的自旋等待,优化的还是比较好的。不同的点是:tryLock在自旋超时后就放弃了,而synchronized则会进一步升级为重量级锁,继续等待线程调度去尝试获取锁,这一点上是比较消耗性能的。

如果你的业务运行获取搜失败,或者执行失败,那么通过tryLock还是不错的。像前端发起的请求,一次执行失败了,那可以让前端重新发起,这种场景就比较适合tryLock了。

下面是使用的方法:

1
2
3
4
5
6
7
8
9
10
if (lock2.tryLock(3, TimeUnit.SECONDS)) {
try {
System.out.println(Thread.currentThread().getName() + "获取到了lock2");
} finally {
lock2.unlock();
System.out.println(Thread.currentThread().getName() + "释放了lock2");
}
} else {
System.out.println(Thread.currentThread().getName() + "获取lock2失败");
}

2.3、可中断锁

如果在一个线程中使用lock.lockInterruptibly(),只有该线程执行了interrupt()方法之后,lockInterruptibly才起作用。

可以查看源码:

1
2
3
4
5
6
7
public final void acquireInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}

也就是说,会先去查看中断状态位,如果中断状态为为true,那么就不会尝试获取锁了,直接跑异常。这个中断状态为正是interrupt()方法设置的。

tryLock()底层代码也会判断中断状态,决定是否要继续尝试获取锁:

1
2
3
4
5
6
7
8
> public final boolean tryAcquireNanos(int arg, long nanosTimeout)
> throws InterruptedException {
> if (Thread.interrupted())
> throw new InterruptedException();
> return tryAcquire(arg) ||
> doAcquireNanos(arg, nanosTimeout);
> }
>

也就是说lock.lockInterruptibly相当于不限超时时间的tryLock

要了解中断锁的使用,就必须知道Thread.interrupt()意味着什么。

Thread.interrupt()

该方法将会设置线程的中断状态位。

判断线程是否被中断,可以使用Thread.currentThread().isInterrupted()方法。我们可以在程序中检查这个状态位,来做一些逻辑处理。

判断是否被中断,如果中断则清楚中断位:Thread.currentThread().interrupted()。

如果一个线程处于阻塞状态:如调用了thread.sleep,thread.join,thread.wait,condition.await,以及可中断通道上的I/O操作方法后,**这些阻塞方法会定时去判断中断状态位,如果中断状态位为true,则会在调用处抛出InterruptedException异常,并且在抛出异常后立即清除中断位。**这些方法声明处都会抛出InterruptedException,表示这些方法是可中断的,会对interrupt调用做出响应,异常都是由可中断方法自己抛出,并不是由interrupt方法直接引起的。

synchronized在获取锁的过程中不能被中断,因为锁定的位置根本无法抛出异常。

下面一个例子是演示通过lock.lockInterruptibly获取锁:

  • 1号获取到锁的线程进行sleep阻塞;
  • 后续2,3,4,5号线程在等待锁的过程中,被interrupt打断了,导致抛出InterruptedException异常;
  • 6号线程在1号线程释放锁之后,成功获取到了所;
  • 但是6号线程在休眠过程中很快又被打断然后释放锁了;
  • 最终7号线程获取到锁,接着又被打断,释放锁。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
package com.itzhai.concurrency;

import java.util.concurrent.locks.ReentrantLock;

/**
* Created by arthinking on 1/3/2020.
*/
public class AcquireLockRunnable implements Runnable{

public static void main(String[] args) throws InterruptedException {

ReentrantLock lock = new ReentrantLock();

// 1号线程启动并获取锁,然后休眠2000毫秒
Thread firstThread = new Thread(new AcquireLockRunnable(lock, 1), "线程(1)");
firstThread.start();
Thread.sleep(1000);

// 依次启动其他线程
Thread[] others = new Thread[6];
for (int i = 0; i < 6; i++) {
others[i] = new Thread(new AcquireLockRunnable(lock, i + 2), "线程(" + (i + 2) + ")");
others[i].start();
}
print("开始给所有线程发送中断...");
for (int i = 0; i < 6; i++) {
Thread.sleep(500 * i / 2);
print("Interrupt " + others[i].getName());
others[i].interrupt();
}
}


private int id;
private ReentrantLock lock;

private AcquireLockRunnable(ReentrantLock lock, int id) {
this.lock = lock;
this.id = id;
}

public void run() {
print("开启线程 " + id + " 尝试获取锁...");
try {
lock.lockInterruptibly();
} catch (InterruptedException e) {
print("获取锁失败, 原因: " + e);
return;
}
print("获取到锁(" + id + ")");
try {
try {
if (id == 1) {
Thread.sleep(3000);
} else {
Thread.sleep(2500);
}
} catch (InterruptedException e) {
print("线程 " + id + " 休眠过程中被打断");
}
} finally {
lock.unlock();
print("释放锁(" + id + ")");
}
}

static void print(String p) {
System.out.println(Thread.currentThread().getName() + ": " + p);
}
}

输出结构:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
线程(1): 开启线程 1 尝试获取锁...
线程(1): 获取到锁(1)
线程(2): 开启线程 2 尝试获取锁...
线程(3): 开启线程 3 尝试获取锁...
线程(4): 开启线程 4 尝试获取锁...
线程(5): 开启线程 5 尝试获取锁...
线程(6): 开启线程 6 尝试获取锁...
main: 开始给所有线程发送中断...
线程(7): 开启线程 7 尝试获取锁...
main: Interrupt 线程(2)
线程(2): 获取锁失败, 原因: java.lang.InterruptedException
main: Interrupt 线程(3)
线程(3): 获取锁失败, 原因: java.lang.InterruptedException
main: Interrupt 线程(4)
线程(4): 获取锁失败, 原因: java.lang.InterruptedException
main: Interrupt 线程(5)
线程(5): 获取锁失败, 原因: java.lang.InterruptedException
线程(1): 释放锁(1)
线程(6): 获取到锁(6)
main: Interrupt 线程(6)
线程(6): 线程 6 休眠过程中被打断
线程(6): 释放锁(6)
线程(7): 获取到锁(7)
main: Interrupt 线程(7)
线程(7): 线程 7 休眠过程中被打断
线程(7): 释放锁(7)

Process finished with exit code 0

2.4、条件变量

我们知道,synchronized关键字配合Object.wiat()和Object.notify()或者Object.notifyAll()方法,可以实现线程之间的同步。

类似的,Lock中也提供了这样的机制。是通过Condition类提供的方法实现的:

  • Condition.await()方法相当于Object.wait();
  • Condition.await(long time, TimeUnit unit)方法相当于Object.wait(long timeout);
  • Condition.signal()方法相当于Object.notify()方法;
  • Condition.signalAll()方法相当于Object.notifyAll()方法;

对比:

与synchronized代码块的wait方式相比,Lock中的condition.wait更加灵活;

condition.wait()可以将不同条件的线程放入不同的等待队列,而synchronized中所有线程会放入同一个等待队列,在大量线程唤醒的时候会造成资源浪费

Lock对象同步只需要依赖Lock对象,而synchronized必须配合一个特定的对象使用。

下面是一个消费者生产者的例子,其中有一个共同的锁ReentrantLock,具有两个条件:

  • 队列未满的条件,当符合条件的时候,通知生产者可以生产消息了;
  • 队列非空的条件,当符合条件的时候,通知消费者可以消费消息了。

以下是完整的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
package com.itzhai.concurrency;

import java.util.LinkedList;
import java.util.Queue;
import java.util.Random;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
* Created by arthinking on 1/3/2020.
*/
public class ProducerConsumerSolutionUsingLock {

public static void main(String[] args) {

// 生产者线程和消费者线程需要操作的对象
ProducerConsumerImpl sharedObject = new ProducerConsumerImpl();

// 创建生产者线程和消费者线程
Producer p = new Producer(sharedObject);
Consumer c = new Consumer(sharedObject);

// 启动生产者线程和消费者线程
c.start();
p.start();

while (Thread.activeCount() > 2) {
Thread.yield();
}

System.out.println("总共消费消息条数: " + sharedObject.getConsumeCount());
}
}


class ProducerConsumerImpl {
// 队列大小
private static final int CAPACITY = 10;
private final Queue queue = new LinkedList<>();
private final Random theRandom = new Random();

private int consumeCount = 0;

// 锁和条件变量
private final Lock lock = new ReentrantLock();
// 队列未满的条件
private final Condition queueNotFull = lock.newCondition();
// 队列非空的条件
private final Condition queueNotEmpty = lock.newCondition();

/**
* 生产者生产消息
* @throws InterruptedException
*/
public void produce() throws InterruptedException {
lock.lock();
try {
while (queue.size() == CAPACITY) {
System.out.println(Thread.currentThread().getName()
+ ", 生产者准备生产消息, 但是队列满了, 等待...");
queueNotFull.await();
}

int number = theRandom.nextInt();
boolean isAdded = queue.offer(number);
if (isAdded) {
System.out.printf("%s 添加 %d 到队列 %n", Thread
.currentThread().getName(), number);

// 通知消费者队列中已经有消息了
System.out.println(Thread.currentThread().getName()
+ ", 通知消费者队列中已经有消息了");
queueNotEmpty.signalAll();
}
} finally {
lock.unlock();
}
}

/**
* 消费者消费消息
* @throws InterruptedException
*/
public void consume() throws InterruptedException {
lock.lock();
try {
while (queue.size() == 0) {
System.out.println(Thread.currentThread().getName()
+ ", 消费者准备消费消息, 但是队列空了, 等待...");
queueNotEmpty.await();
}

Object value = queue.poll();
if (value != null) {
consumeCount ++;
System.out.printf("%s 从消息队列消费 %d %n", Thread
.currentThread().getName(), value);

// signal producer thread that, buffer may be empty now
System.out.println(Thread.currentThread().getName()
+ ", 通知生产者现在消息队列有空间了, 可以来生产消息了");
queueNotFull.signalAll();
}

} finally {
lock.unlock();
}
}

public int getConsumeCount() {
return consumeCount;
}

}

class Producer extends Thread {
ProducerConsumerImpl pc;

public Producer(ProducerConsumerImpl sharedObject) {
super("PRODUCER");
this.pc = sharedObject;
}

@Override
public void run() {
try {
for (int i = 0; i < 100; i++) {
pc.produce();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

class Consumer extends Thread {
ProducerConsumerImpl pc;

public Consumer(ProducerConsumerImpl sharedObject) {
super("CONSUMER");
this.pc = sharedObject;
}

@Override
public void run() {
try {
for (int i = 0; i < 100; i++) {
pc.consume();
}
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}

3、ReentrantLock与synchronized的异同

3.1、synchronized

优点

  • JDK进行了优化,会有锁升级的过程,在没有竞争场景下通过偏向锁或者轻量级锁提高性能。未来JVM会更多支持原生锁的优化;
  • Java语法内存,使用简单,无需手动获取锁和解锁。通过底层指令实现的管程自动达到同步的目的;

缺点

  • 可控性比较低,不够灵活,一个锁只能绑定一个条件。锁对象的wait()和notify()或notifyAll()方法可以实现一个隐含的条件,如果要关联多个条件,就不得额外添加锁了;

  • 非公平锁,在锁被释放的时候,任何等待的锁都有机会获得锁,有的线程会长时间得不到执行,但非公平锁优势是吞吐量会大一点;

  • 程序无法获取到是否已经获取到了锁。

3.2、ReentrantLock

优点

  • 支持公平锁:
  • 支持多条件的锁:ReentrantLock能够将wait/notify/notifyAll对象化;
  • 能够获取到锁定状态:
  • 实现了可中断的锁;
  • 支持带超时的获取锁尝试,避免了死锁;

缺点

  • 需要手动锁定解锁,使用比较繁琐,容易出错;
  • 随着JDK对synchronized锁的不断优化,ReentrantLock性能未必会比synchronized锁高;

3.3、如何选择

这两个锁如何选择呢?

我们可以看到,ReentrantLock更加灵活,提供给了包括定时的锁等待可中断的锁等待公平性,以及实现非块结构的加锁

但是ReentrantLock的危险性比synchronized要高,内置锁会随着JDK版本不断做优化。

仅当内置锁不能满足需求的时候,才可以考虑使用ReentrantLock。

References

Thread的中断机制(interrupt)

Java ReentrantLock Interruption Example

Java Lock and Condition Example using Producer Consumer Solution

欢迎关注我的其它发布渠道