导读
这篇文章我们来Java中的读写锁。阅读完本篇文章,你将了解到:
1、读写锁的使用场景和优缺点
2、读写锁的实现原理
3、如何使用读写锁
1、Java中的读写锁
有这样一种场景:
如果对一个共享资源的写操作没有读操作那么频繁,这个时候可以允许多个线程同时读取共享资源;
但是如果有一个线程想去写这些共享资源,那么其他线程此刻就不应该对这些资源进行读和写操作了。
Java中的ReentrantReadWriteLock
正是为这种场景提供的锁。该类里面包括了读锁和写锁。
1.1、可获取读锁的情况
没有其他线程正在持有写锁;
尝试获取读锁的线程同时持有写锁。
1.2、可获取写锁的情况
没有其他线程正在持有读锁;
没有其他线程正在持有写锁。
1.3、读写锁特点
**允许并发读:**只要没有线程正在更新数据,那么多个线程就可以同时读取数据;
**只能独占写:**只要有一个线程正在写数据,那么就会导致其他线程的读或者写均被阻塞;但写的线程可以获取读锁,并通过释放写锁,让锁降级为读锁;(不能由读锁升级为写锁)
只要有一个线程正在读数据,那么其他线程的写入就会阻塞,直到读锁被释放;
公平性:支持非公平锁和公平锁,非公平锁吞吐量较高;
可重入:无论是读锁还是写锁都是支持可重入的。
读写锁可以增加更新不频繁而读取频繁的共享数据结构的吞吐量。
2、实现原理
ReentrantReadWriteLock是可重入读写锁的实现。我们先来看看涉及到的类:
我们可以看到,ReentrantReadWriteLock中也具有非公平锁NonfairSync 和公平锁FairSync 的实现。同时ReentrantReadWriteLock组合了两把锁:写锁WriteLock 和读锁ReadLock 。
我们来看看具体的构造函数:
1 2 3 4 5 public ReentrantReadWriteLock (boolean fair) { sync = fair ? new FairSync () : new NonfairSync (); readerLock = new ReadLock (this ); writerLock = new WriteLock (this ); }
可以发现,通过参数fair控制是创建非公平锁还是公平锁。同时ReentrantReadWriteLock持有了写锁和读锁。
而本质上,读锁和写锁都是通过持有ReentrantReadWriteLock.sync来进行加锁和释放锁的,用的是同一个AQS,Sync类提供类对ReentrantReadWriteLock的支持 :
1 2 3 protected ReadLock (ReentrantReadWriteLock lock) { sync = lock.sync; }
1 2 3 protected WriteLock (ReentrantReadWriteLock lock) { sync = lock.sync; }
基于对AQS原理的理解,我们知道sync是读写锁实现的关键,而aqs中核心是state字段和双端等待队列。下面我们来看看具体的实现。
2.1、看代码之前您必须了解的内容
在查看ReentrantReadWriteLock之前,您需要了解以下内容:
2.1.1、Sync.HoldCounter类
读锁计数器类,为每个获取读锁的线程进行计数。Sync类中有一个cachedHoldCounter字段,该字段主要是缓存上一个线程的读锁计数器,节省ThreadLocal查找次数。
1 2 3 4 5 6 7 static final class HoldCounter { int count = 0 ; final long tid = getThreadId(Thread.currentThread()); }
2.1.2、Sync.ThreadLocalHoldCounter类
当前线程持有的可重入读锁的数量,当数量下降到0的时候进行删除。
1 2 3 4 5 6 static final class ThreadLocalHoldCounter extends ThreadLocal <HoldCounter> { public HoldCounter initialValue () { return new HoldCounter (); } }
2.1.3、读写锁中AQS的state状态设计
AQS中的state为了能够同时记录读锁和写锁的状态,把32位变量分为了两部分:
如上图,高16位存储读状态,读锁是共享锁,这里记录持有读锁的线程数;低16位是写状态,写锁是排他锁,这里0表示没有线程持有,大于0表示持有线程对锁的重入次数。
2.1.4、关于读写锁的数据结构
虽然读写锁看起来有两把锁,但是底层用的都是同一个state,同一个等待队列。只不过是通过ReadLock和WriteLock分别提供了读锁和写锁的API,底层还是用同一个AQS。如下图:
由于读写锁是互斥的 ,所以线程1获取写锁,线程2获取读锁,并发执行的时候,一定有一个会失败;
如果是已经获取了读锁的线程尝试获取写锁,则会获取成功;
公平模式下 ,先进入等待队列的线程先被处理;非公平模式下 ,如果尝试获取写锁的线程节点在头节点后面,尝试获取读锁的线程要让步,进入等待队列;
线程节点获取到读锁之后,会判断下一个节点是否处于共享模式,如果是则会一直传播并唤醒 后续共享模式节点;
如果有其他线程获取了写锁,那么获取写锁就会被阻塞。
公平和非公平是针对等待队列中的线程节点的处理来说的:
公平模式一般都是从队列头开始处理 ,并且如果等待队列还有待处理节点,新的线程全部都入等待队列 ;
非公平模式 一般不管等待队列里面有没有待处理节点,都会先尝试竞争获取锁 ;特殊情况:如果等待队列中有写锁线程,那么新来的读锁线程必须排队让写锁线程先进行处理。
其实关于读写锁的原理就差不多是这么多了。
以下是详细的代码分析,可能会比较枯燥,为了避免让大家一头陷入源码中,于是在上面先把源码做的事情都给讲出来了。建议感兴趣的同学打开电脑跟踪源码一起来阅读。
1.1、ReadLock实现原理
1.1.1、lock
查看ReadLock的lock相关方法,调用的是AQS的acquireShared方法,该方法会以共享模式获取锁:
1 2 3 4 5 6 public final void acquireShared (int arg) { if (tryAcquireShared(arg) < 0 ) doAcquireShared(arg); }
下面看看关键获取锁的tryAcquireShared方法,该方法主要处理逻辑:
因为读写是互斥的,如果另一个线程持有写锁,则失败;
否则,此线程具备锁定write状态的条件,因此判断是否应该进入阻塞。 如果不是,请尝试CAS获取读锁许可并更新读锁计数。 请注意,该步骤不检查重入,这将推迟到最后fullTryAcquireShared方法;
如果第2步失败,或者由于线程不符合锁定条件或者CAS失败或读锁计数饱和,将会使用fullTryAcquireShared进一步重试。
下面是详细的说明:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 protected final int tryAcquireShared (int unused) { Thread current = Thread.currentThread(); int c = getState(); if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return -1 ; int r = sharedCount(c); if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) { if (r == 0 ) { firstReader = current; firstReaderHoldCount = 1 ; } else if (firstReader == current) { firstReaderHoldCount++; } else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) cachedHoldCounter = rh = readHolds.get(); else if (rh.count == 0 ) readHolds.set(rh); rh.count++; } return 1 ; } return fullTryAcquireShared(current); }
让我们接着看fullTryAcquireShared方法,这个方法可知,只有其他线程持有写锁,或者使用的是公平锁并且头节点后面还有其他等待的线程,或者头节点后面的节点不是共享模式,或者读锁计数器达到了上限,则阻塞,否则一直会循环尝试获取锁:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 final int fullTryAcquireShared (Thread current) { HoldCounter rh = null ; for (;;) { int c = getState(); if (exclusiveCount(c) != 0 ) { if (getExclusiveOwnerThread() != current) return -1 ; } else if (readerShouldBlock()) { if (firstReader == current) { } else { if (rh == null ) { rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) { rh = readHolds.get(); if (rh.count == 0 ) readHolds.remove(); } } if (rh.count == 0 ) return -1 ; } } if (sharedCount(c) == MAX_COUNT) throw new Error ("Maximum lock count exceeded" ); if (compareAndSetState(c, c + SHARED_UNIT)) { if (sharedCount(c) == 0 ) { firstReader = current; firstReaderHoldCount = 1 ; } else if (firstReader == current) { firstReaderHoldCount++; } else { if (rh == null ) rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); else if (rh.count == 0 ) readHolds.set(rh); rh.count++; cachedHoldCounter = rh; } return 1 ; } } }
最后我们来看看doAcquireShared方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 private void doAcquireShared (int arg) { final Node node = addWaiter(Node.SHARED); boolean failed = true ; try { boolean interrupted = false ; for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0 ) { setHeadAndPropagate(node, r); p.next = null ; if (interrupted) selfInterrupt(); failed = false ; return ; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true ; } } finally { if (failed) cancelAcquire(node); } }
1.1.2、unlock
接下来我们看看释放锁的代码。
1 2 3 public void unlock () { sync.releaseShared(1 ); }
AbstractQueuedSynchronizer.releaseShared()
1 2 3 4 5 6 7 public final boolean releaseShared (int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true ; } return false ; }
主要处理方法是tryReleaseShared,该方法主要是清理ThreadLocal中的锁计数器,然后CAS修改读锁个数减1:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 protected final boolean tryReleaseShared (int unused) { Thread current = Thread.currentThread(); if (firstReader == current) { if (firstReaderHoldCount == 1 ) firstReader = null ; else firstReaderHoldCount--; } else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); int count = rh.count; if (count <= 1 ) { readHolds.remove(); if (count <= 0 ) throw unmatchedUnlockException(); } --rh.count; } for (;;) { int c = getState(); int nextc = c - SHARED_UNIT; if (compareAndSetState(c, nextc)) return nextc == 0 ; } }
1.1、WriteLock实现原理
1.1.1、lock
查看WriteLock的lock锁相关方法,调用的是sync.acquire方法,该方法直接继承了ASQ的acquire()方法的实现:
1 2 3 public void lock () { sync.acquire(1 ); }
与ReentrantLock的实现区别在具体的tryAcquire()方法的实现,我们来看看ReentrantReadWriteLock.Sync中该方法的实现,主要做了以下事情:
如果读锁数量>0,或者写锁数量>0,并且不是重入的,那么直接失败了;
如果锁数量为0,那么该线程有资格获取到写锁,进而尝试获取。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 protected final boolean tryAcquire (int acquires) { Thread current = Thread.currentThread(); int c = getState(); int w = exclusiveCount(c); if (c != 0 ) { if (w == 0 || current != getExclusiveOwnerThread()) return false ; if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error ("Maximum lock count exceeded" ); setState(c + acquires); return true ; } if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) return false ; setExclusiveOwnerThread(current); return true ; }
1.1.2、unlock
查看WriteLock的unlock相关方法,调用的是sync.release方法,该方法直接继承了AQS的release实现:
1 2 3 public void unlock () { sync.release(1 ); }
以下是release方法:
1 2 3 4 5 6 7 8 9 10 11 public final boolean release (int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0 ) unparkSuccessor(h); return true ; } return false ; }
释放锁的逻辑主要在tryRelease方法,下面是详细代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 protected final boolean tryRelease (int releases) { if (!isHeldExclusively()) throw new IllegalMonitorStateException (); int nextc = getState() - releases; boolean free = exclusiveCount(nextc) == 0 ; if (free) setExclusiveOwnerThread(null ); setState(nextc); return free; }
3、读写锁使用例子
下面是读写锁的使用例子,该例子实现了一个支持并发访问的ArrayList。
因为读写锁是互斥的,保证了不会因为写导致读取出现的不一致。
代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 public class ReentrantReadWriteLockTest { static final int READER_SIZE = 10 ; static final int WRITER_SIZE = 2 ; public static void main (String[] args) { Integer[] initialElements = {33 , 28 , 86 , 99 }; ReadWriteList<Integer> sharedList = new ReadWriteList <>(initialElements); for (int i = 0 ; i < WRITER_SIZE; i++) { new Writer (sharedList).start(); } for (int i = 0 ; i < READER_SIZE; i++) { new Reader (sharedList).start(); } } } class Reader extends Thread { private ReadWriteList<Integer> sharedList; public Reader (ReadWriteList<Integer> sharedList) { this .sharedList = sharedList; } public void run () { Random random = new Random (); int index = random.nextInt(sharedList.size()); Integer number = sharedList.get(index); System.out.println(getName() + " -> get: " + number); try { Thread.sleep(100 ); } catch (InterruptedException ie ) { ie.printStackTrace(); } } } class Writer extends Thread { private ReadWriteList<Integer> sharedList; public Writer (ReadWriteList<Integer> sharedList) { this .sharedList = sharedList; } public void run () { Random random = new Random (); int number = random.nextInt(100 ); sharedList.add(number); try { Thread.sleep(100 ); System.out.println(getName() + " -> put: " + number); } catch (InterruptedException ie ) { ie.printStackTrace(); } } } class ReadWriteList <E> { private List<E> list = new ArrayList <>(); private ReadWriteLock rwLock = new ReentrantReadWriteLock (); public ReadWriteList (E... initialElements) { list.addAll(Arrays.asList(initialElements)); } public void add (E element) { Lock writeLock = rwLock.writeLock(); writeLock.lock(); try { list.add(element); } finally { writeLock.unlock(); } } public E get (int index) { Lock readLock = rwLock.readLock(); readLock.lock(); try { return list.get(index); } finally { readLock.unlock(); } } public int size () { Lock readLock = rwLock.readLock(); readLock.lock(); try { return list.size(); } finally { readLock.unlock(); } } }
References
Java ReadWriteLock and ReentrantReadWriteLock Example
ReentrantReadWriteLock读写锁详解