导读
本文为您介绍JUC中ReentrantLock的各种API和使用案例,阅读完本文,您将了解到:
如何使用tryLock避免顺序死锁问题
如何通过tryLock来重试获取锁
如何使用可中断锁,以及可中断锁的实现原理;
ReentrantLock和synchronized的区别,以及如何选择?
ReentrantLock是J.U.C包下提供的独占锁锁,根据其名称可知,该锁是可重入
的。主要提供以下功能更:
等待可中断 :该类提供了lockInterruptibly()
方法实现了等待可中断机制,这种机制使得如果当前线程正在等待锁,但其他线程请求该锁,则当前线程将被中断并立即返回而无需获取锁;
公平锁 :在并发获取同一个锁的场景,线程必须按照申请获取锁的顺序进行获取。一般是在ReentrantLock构造函数中传递true实例化一个公平锁;
Lock lock = new ReentrantLock(true);
支持多条件的锁 :synchronized关键字配合Object.wiat()和Object.notify()或者Object.notifyAll()方法,可以实现线程之间的同步。类似的,Lock中也提供了这样的机制。是通过Condition类提供的方法实现的。
如果线程重入了一个锁,那么ReentrantLock的锁计数器会加1,每个解锁请求,锁计数器减1。当锁计数器为0的时候,表示资源被解锁了。
synchronized通过操作Mark Word实现同步,锁标识存储于Mark Word中;ReentrantLock通过AQS(抽象同步队列)实现同步,锁标识存储于AQS的state属性中。
1、关键方法
lock()
: 调用该方法会使锁计数器加1,如果共享资源最初是空闲的,则将锁定并授予线程;
unlock()
: 调用该方法使锁计数器减1,当计数达到0的时候,将释放资源;
tryLock()
: 如果资源没有被任何其他线程占用,那么该方法返回true,并且锁计数器加1。如果资源不是空闲的,则该方法返回false。这个时候线程不会阻塞,而是直接退出返回结果;
lockInterruptible()
: 该方法使得资源空闲时允许该线程在获取资源时被其他线程中断。也就是说:如果当前线程正在等待锁,但其他线程请求该锁,则当前线程将被中断并立即返回,不会继续等待获取锁;
getHoldCount()
: 获取资源上持有的锁的计数器;
isHeldByCurrentThread
: 如果资源锁有当前线程持有,则此方法返回true。
请始终在finally块中调用unlock语句,以确保即使在方法body中引发了异常,也可以释放锁。
比较难衡量的是究竟要休眠多久,这得看另一个已经进入临界区的线程究竟需要执行多久。如果频繁的休眠,会导致频繁切换用户态和内核态,比较占用资源。
2、使用案例
2.1、通过tryLock避免锁顺序死锁问题
对于具有锁顺序的场景,我们可以通过使用tryLock避免死锁问题。下面是一个具有锁顺序的例子:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 package com.itzhai.concurrency;import java.util.Random;import java.util.concurrent.TimeUnit;import java.util.concurrent.locks.ReentrantLock;public class RetryTest { public static void main (String[] args) { BusinessObject ob1 = new BusinessObject (); ob1.setBoName("业务实体1" ); ob1.setLock(new ReentrantLock ()); BusinessObject ob2 = new BusinessObject (); ob2.setBoName("业务实体2" ); ob2.setLock(new ReentrantLock ()); Thread thread1 = new Thread () { public void run () { try { doService(ob1, ob2, 10 , TimeUnit.SECONDS); } catch (InterruptedException e) { e.printStackTrace(); } } }; thread1.setName("线程1" ); thread1.start(); Thread thread2 = new Thread () { public void run () { try { doService(ob2, ob1, 10 , TimeUnit.SECONDS); } catch (InterruptedException e) { e.printStackTrace(); } } }; thread2.setName("线程2" ); thread2.start(); } public static boolean doService (BusinessObject bo1, BusinessObject bo2, long timeout, TimeUnit unit) throws InterruptedException { boolean isSuccess = false ; String threadName = Thread.currentThread().getName(); long stopTime = System.nanoTime() + unit.toNanos(timeout); while (true ) { System.out.println (threadName + " 请求获取 " + bo1.getBoName()); if (!bo1.getLock().tryLock()) { System.out.println (threadName + " 锁定失败 " + bo1.getBoName()); if (isTimeout(threadName, stopTime)) return false ; continue ; } try { System.out.println (threadName + " 锁定成功 " + bo1.getBoName()); Thread.sleep(new Random ().nextInt(100 )); System.out.println (threadName + " 请求获取 " + bo2.getBoName()); if (!bo2.getLock().tryLock()) { System.out.println (threadName + " 锁定失败 " + bo2.getBoName()); if (isTimeout(threadName, stopTime)) return false ; continue ; } try { System.out.println(threadName + " 锁定成功 " + bo2.getBoName()); System.out.println("+++" + threadName + " 执行完成" ); isSuccess = true ; return true ; } finally { if (bo2.getLock().isHeldByCurrentThread()) { bo2.getLock().unlock(); System.out.println(threadName + " 释放锁 " + bo2.getBoName()); } } } catch (Exception e) { System.out.println(e); } finally { if (bo1.getLock().isHeldByCurrentThread()) { bo1.getLock().unlock(); System.out.println(threadName + " 释放锁 " + bo1.getBoName()); } if (!isSuccess) { System.out.println("休眠一下再试..." ); Thread.sleep(new Random ().nextInt(100 )); } } } } private static boolean isTimeout (String threadName, long stopTime) throws InterruptedException { if (stopTime < System.nanoTime()) { System.out.println(threadName + "重试超时..." ); return true ; } return false ; } } class BusinessObject { private ReentrantLock lock; private String boName; public ReentrantLock getLock () { return lock; } public void setLock (ReentrantLock lock) { this .lock = lock; } public String getBoName () { return boName; } public void setBoName (String boName) { this .boName = boName; } }
可以通过使用循环重试或者定时重试让tryLock失败的时候,重新循环的去重试。很重要的一点是:您需要设置一个超时时间,避免一直在重试 。
可以看到,这个逻辑还是稍微有点复杂的,一定要在释放锁之后,休眠片刻,以便让其他线程有机会重试获取当前线程刚刚释放的锁。
2.2、使用带有时间限制的tryLock重试获取锁
如果一个资源锁,多个线程都会尝试获取,如果每个线程占有锁的时间都不长,那么通过这种方式还是比较好的,可以避免线程阻塞导致的用户态内核态切换。这也就是自旋锁的特点。当然,如果每个线程都占用锁很长时间,那么这种方式就不太可取了。
如果使用的是synchronized关键字,其实在轻量级锁阶段就会有自旋等待的操作,并且synchronized实现了自适应的自旋等待,优化的还是比较好的。不同的点是:tryLock在自旋超时后就放弃了,而synchronized则会进一步升级为重量级锁,继续等待线程调度去尝试获取锁,这一点上是比较消耗性能的。
如果你的业务运行获取搜失败,或者执行失败,那么通过tryLock还是不错的。像前端发起的请求,一次执行失败了,那可以让前端重新发起,这种场景就比较适合tryLock了。
下面是使用的方法:
1 2 3 4 5 6 7 8 9 10 if (lock2.tryLock(3 , TimeUnit.SECONDS)) { try { System.out.println(Thread.currentThread().getName() + "获取到了lock2" ); } finally { lock2.unlock(); System.out.println(Thread.currentThread().getName() + "释放了lock2" ); } } else { System.out.println(Thread.currentThread().getName() + "获取lock2失败" ); }
2.3、可中断锁
如果在一个线程中使用lock.lockInterruptibly()
,只有该线程执行了interrupt()方法之后,lockInterruptibly
才起作用。
可以查看源码:
1 2 3 4 5 6 7 public final void acquireInterruptibly (int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException (); if (!tryAcquire(arg)) doAcquireInterruptibly(arg); }
也就是说,会先去查看中断状态位,如果中断状态为为true,那么就不会尝试获取锁了,直接跑异常。这个中断状态为正是interrupt()方法设置的。
tryLock()底层代码也会判断中断状态,决定是否要继续尝试获取锁:
1 2 3 4 5 6 7 public final boolean tryAcquireNanos (int arg, long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException (); return tryAcquire(arg) || doAcquireNanos(arg, nanosTimeout); }
也就是说lock.lockInterruptibly
相当于不限超时时间的tryLock
。
要了解中断锁的使用,就必须知道Thread.interrupt()意味着什么。
Thread.interrupt()
该方法将会设置线程的中断状态位。
判断线程是否被中断,可以使用Thread.currentThread().isInterrupted()方法。我们可以在程序中检查这个状态位,来做一些逻辑处理。
判断是否被中断,如果中断则清楚中断位:Thread.currentThread().interrupted()。
如果一个线程处于阻塞状态:如调用了thread.sleep,thread.join,thread.wait,condition.await,以及可中断通道上的I/O操作方法后,**这些阻塞方法会定时去判断中断状态位,如果中断状态位为true,则会在调用处抛出InterruptedException异常,并且在抛出异常后立即清除中断位。**这些方法声明处都会抛出InterruptedException,表示这些方法是可中断的,会对interrupt调用做出响应,异常都是由可中断方法自己抛出,并不是由interrupt方法直接引起的。
synchronized在获取锁的过程中不能被中断,因为锁定的位置根本无法抛出异常。
下面一个例子是演示通过lock.lockInterruptibly
获取锁:
1号获取到锁的线程进行sleep阻塞;
后续2,3,4,5号线程在等待锁的过程中,被interrupt打断了,导致抛出InterruptedException异常;
6号线程在1号线程释放锁之后,成功获取到了所;
但是6号线程在休眠过程中很快又被打断然后释放锁了;
最终7号线程获取到锁,接着又被打断,释放锁。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 package com.itzhai.concurrency;import java.util.concurrent.locks.ReentrantLock;public class AcquireLockRunnable implements Runnable { public static void main (String[] args) throws InterruptedException { ReentrantLock lock = new ReentrantLock (); Thread firstThread = new Thread (new AcquireLockRunnable (lock, 1 ), "线程(1)" ); firstThread.start(); Thread.sleep(1000 ); Thread[] others = new Thread [6 ]; for (int i = 0 ; i < 6 ; i++) { others[i] = new Thread (new AcquireLockRunnable (lock, i + 2 ), "线程(" + (i + 2 ) + ")" ); others[i].start(); } print("开始给所有线程发送中断..." ); for (int i = 0 ; i < 6 ; i++) { Thread.sleep(500 * i / 2 ); print("Interrupt " + others[i].getName()); others[i].interrupt(); } } private int id; private ReentrantLock lock; private AcquireLockRunnable (ReentrantLock lock, int id) { this .lock = lock; this .id = id; } public void run () { print("开启线程 " + id + " 尝试获取锁..." ); try { lock.lockInterruptibly(); } catch (InterruptedException e) { print("获取锁失败, 原因: " + e); return ; } print("获取到锁(" + id + ")" ); try { try { if (id == 1 ) { Thread.sleep(3000 ); } else { Thread.sleep(2500 ); } } catch (InterruptedException e) { print("线程 " + id + " 休眠过程中被打断" ); } } finally { lock.unlock(); print("释放锁(" + id + ")" ); } } static void print (String p) { System.out.println(Thread.currentThread().getName() + ": " + p); } }
输出结构:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 线程(1): 开启线程 1 尝试获取锁... 线程(1): 获取到锁(1) 线程(2): 开启线程 2 尝试获取锁... 线程(3): 开启线程 3 尝试获取锁... 线程(4): 开启线程 4 尝试获取锁... 线程(5): 开启线程 5 尝试获取锁... 线程(6): 开启线程 6 尝试获取锁... main: 开始给所有线程发送中断... 线程(7): 开启线程 7 尝试获取锁... main: Interrupt 线程(2) 线程(2): 获取锁失败, 原因: java.lang.InterruptedException main: Interrupt 线程(3) 线程(3): 获取锁失败, 原因: java.lang.InterruptedException main: Interrupt 线程(4) 线程(4): 获取锁失败, 原因: java.lang.InterruptedException main: Interrupt 线程(5) 线程(5): 获取锁失败, 原因: java.lang.InterruptedException 线程(1): 释放锁(1) 线程(6): 获取到锁(6) main: Interrupt 线程(6) 线程(6): 线程 6 休眠过程中被打断 线程(6): 释放锁(6) 线程(7): 获取到锁(7) main: Interrupt 线程(7) 线程(7): 线程 7 休眠过程中被打断 线程(7): 释放锁(7) Process finished with exit code 0
2.4、条件变量
我们知道,synchronized关键字配合Object.wiat()和Object.notify()或者Object.notifyAll()方法,可以实现线程之间的同步。
类似的,Lock中也提供了这样的机制。是通过Condition类提供的方法实现的:
Condition.await()
方法相当于Object.wait();
Condition.await(long time, TimeUnit unit)
方法相当于Object.wait(long timeout);
Condition.signal()
方法相当于Object.notify()方法;
Condition.signalAll()
方法相当于Object.notifyAll()方法;
对比:
与synchronized代码块的wait方式相比,Lock中的condition.wait更加灵活;
condition.wait()可以将不同条件的线程放入不同的等待队列 ,而synchronized中所有线程会放入同一个等待队列 ,在大量线程唤醒的时候会造成资源浪费 ;
Lock对象同步只需要依赖Lock对象,而synchronized必须配合一个特定的对象使用。
下面是一个消费者生产者的例子,其中有一个共同的锁ReentrantLock
,具有两个条件:
队列未满的条件,当符合条件的时候,通知生产者可以生产消息了;
队列非空的条件,当符合条件的时候,通知消费者可以消费消息了。
以下是完整的代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 package com.itzhai.concurrency;import java.util.LinkedList;import java.util.Queue;import java.util.Random;import java.util.concurrent.locks.Condition;import java.util.concurrent.locks.Lock;import java.util.concurrent.locks.ReentrantLock;public class ProducerConsumerSolutionUsingLock { public static void main (String[] args) { ProducerConsumerImpl sharedObject = new ProducerConsumerImpl (); Producer p = new Producer (sharedObject); Consumer c = new Consumer (sharedObject); c.start(); p.start(); while (Thread.activeCount() > 2 ) { Thread.yield (); } System.out.println("总共消费消息条数: " + sharedObject.getConsumeCount()); } } class ProducerConsumerImpl { private static final int CAPACITY = 10 ; private final Queue queue = new LinkedList <>(); private final Random theRandom = new Random (); private int consumeCount = 0 ; private final Lock lock = new ReentrantLock (); private final Condition queueNotFull = lock.newCondition(); private final Condition queueNotEmpty = lock.newCondition(); public void produce () throws InterruptedException { lock.lock(); try { while (queue.size() == CAPACITY) { System.out.println(Thread.currentThread().getName() + ", 生产者准备生产消息, 但是队列满了, 等待..." ); queueNotFull.await(); } int number = theRandom.nextInt(); boolean isAdded = queue.offer(number); if (isAdded) { System.out.printf("%s 添加 %d 到队列 %n" , Thread .currentThread().getName(), number); System.out.println(Thread.currentThread().getName() + ", 通知消费者队列中已经有消息了" ); queueNotEmpty.signalAll(); } } finally { lock.unlock(); } } public void consume () throws InterruptedException { lock.lock(); try { while (queue.size() == 0 ) { System.out.println(Thread.currentThread().getName() + ", 消费者准备消费消息, 但是队列空了, 等待..." ); queueNotEmpty.await(); } Object value = queue.poll(); if (value != null ) { consumeCount ++; System.out.printf("%s 从消息队列消费 %d %n" , Thread .currentThread().getName(), value); System.out.println(Thread.currentThread().getName() + ", 通知生产者现在消息队列有空间了, 可以来生产消息了" ); queueNotFull.signalAll(); } } finally { lock.unlock(); } } public int getConsumeCount () { return consumeCount; } } class Producer extends Thread { ProducerConsumerImpl pc; public Producer (ProducerConsumerImpl sharedObject) { super ("PRODUCER" ); this .pc = sharedObject; } @Override public void run () { try { for (int i = 0 ; i < 100 ; i++) { pc.produce(); } } catch (InterruptedException e) { e.printStackTrace(); } } } class Consumer extends Thread { ProducerConsumerImpl pc; public Consumer (ProducerConsumerImpl sharedObject) { super ("CONSUMER" ); this .pc = sharedObject; } @Override public void run () { try { for (int i = 0 ; i < 100 ; i++) { pc.consume(); } } catch (InterruptedException e) { e.printStackTrace(); } } }
3、ReentrantLock与synchronized的异同
3.1、synchronized
优点
JDK进行了优化,会有锁升级的过程,在没有竞争场景下通过偏向锁或者轻量级锁提高性能。未来JVM会更多支持原生锁的优化;
Java语法内存,使用简单,无需手动获取锁和解锁。通过底层指令实现的管程自动达到同步的目的;
缺点
3.2、ReentrantLock
优点
支持公平锁:
支持多条件的锁:ReentrantLock能够将wait/notify/notifyAll对象化;
能够获取到锁定状态:
实现了可中断的锁;
支持带超时的获取锁尝试,避免了死锁;
缺点
需要手动锁定解锁,使用比较繁琐,容易出错;
随着JDK对synchronized锁的不断优化,ReentrantLock性能未必会比synchronized锁高;
3.3、如何选择
这两个锁如何选择呢?
我们可以看到,ReentrantLock更加灵活,提供给了包括定时的锁等待
,可中断的锁等待
,公平性
,以及实现非块结构的加锁
。
但是ReentrantLock的危险性比synchronized要高,内置锁会随着JDK版本不断做优化。
仅当内置锁不能满足需求的时候,才可以考虑使用ReentrantLock。
References
Thread的中断机制(interrupt)
Java ReentrantLock Interruption Example
Java Lock and Condition Example using Producer Consumer Solution