去年问我怎么学Java的那个五年级小学生又来向我问问题了,说Java中线程同步有没有好用的工具类。幸亏没有问我什么算法,瑟瑟发抖。这个我倒是还挺在行的,了解到他喜欢熊出没,于是我就用熊作为主角,给他分享了几个Java并发框架中的辅助工具类。(呼~松了一口气)
看文本片文章,你将了解到:
- CountDownLatch的工作原理和使用场景;
- CyclicBarrier的工作原理和使用场景;
- Semaphore的工作原理和使用场景。
1、闭锁 CountDownLatch
一个同步工具类,允许一个或者多个线程一直等待,直到其他线程的操作都执行完成之后再继续往下执行。
**使用场景:**在一些应用场合中,需要等待某个条件达到要求后才能做后面的事情;同时当线程都完成后也会触发事件,以便进行后面的操作。 这个时候就可以使用CountDownLatch。CountDownLatch最重要的方法是countDown()
和await()
,前者主要是计数减一,后者是等待计数到0,如果没有到达0,就继续阻塞等待。
方法详细介绍:CountDownLatch的介绍和使用
为了方便理解,这不,我又发挥了以下我的动画绘制功底,写了一个动图:
如上图,左边三只小熊,可以当成三个线程,每一只撞到栏杆,计数器就减1,这相当于执行了countDown方法;
右边有两只暴走小熊在等待计数器变为0,可以当成两个线程,执行了await方法;
最终左边三只暴走小熊抵达了栏杆处,计数器变为0,唤醒了右边的暴走小熊,暴走小熊就开始动起来了。
1.1、执行原理
CountDownLatch是基于AQS共享模式的使用。
如下图,我们通过给CountDownLatch构造函数传入state的值。
countDown方法本质是释放共享锁,核心实现逻辑是:state>0 && state-1,如果state>0,则state减一,否则执行失败;
await方法本质是获取共享锁,核心实现是:getState()0,如果state0,则表示获取成功,否则线程阻塞进入等待队列;
当state减到0的时候,会唤醒等待队列中的所有线程,尝试继续获取共享锁,这个时候正常是所有线程都能获取成功的。
1.2、使用案例
三个线程共同拉取一块数据,每个线程拉取数据块的一部分,等到所有线程的数据都拉取过来之后,另一个处理线程再开始这个数据块。
下载线程:
1 | class Downloader implements Runnable{ |
数据处理线程:
1 | class DataProcessor implements Runnable { |
运行代码:
1 | ExecutorService executor = Executors.newFixedThreadPool(4); |
执行结果:
1 | 等待下载完数据... |
1.3、其他说明
类似的,我们也可以使用Thread.join
方法实现控制线程执行顺序,但是没有那么灵活。如果我们把任务都丢到线程池里面多线程执行,那么就不能手动的在一个线程里面调用另一个线程的join方法了。
2、栅栏 CyclicBarrier
屏障,或成为栅栏,我们在之前 一文带你彻底理解同步和锁的本质(干货) 一文有讨论到过,主要是达到这种目的:所有线程都准备就绪,就着手下一阶段的工作,否则不能进入下一阶段。
我们还是让上面的小熊来演示一下。
上面5只小熊,准备跑到起跑线,跑到起跑线等待,相当于执行了await方法,等到所有小熊准备就绪之后,然后一起开跑。这就很好的揭示了内存屏障的作用了。
2.1、执行原理
CyclicBarrier是基于ReentrantLock的Condition来实现的。
如下图,栅栏中有两个关键属性:
- parties:栅栏计数器初始值
- count:栅栏计数器
其中CyclicBarrier的await()方法封装了对ReentrantLock条件锁的使用,主要处理流程:
- 获取ReentrantLock锁;
- count减1,如果此时count为0,那么唤醒等待队列中所有线程,并结束这一轮处理,重置屏障,否则进入下一步;
- 执行condition.await方法,把当前线程丢到条件队列;
- 当count减少到0的时候,执行condition.signalAll方法把条件队列中的所有线程节点都移动到等待队列;
- 最后唤醒同步队列中的线程节点,线程从condition.await阻塞处醒来继续执行:获取ReentrantLock锁,用当前线程节点替换旧的头节点,最终放ReentrantLock锁,继续让线程往下执行(每个线程依次获取、锁释放锁)。如下图:
await()
能够响应中断。除此之外,await还提供了带有超时的实现await(long timeout, TimeUnit unit)
,以及reset()
方法重新开启下一轮,具体大家可以看源码的实现。
2.2、使用案例
下面的案例模拟了赛跑,只有当所有运动员都在起跑线上准备好了,才允许他们开跑:
1 | public class CyclicBarrierTest { |
2.3、其他说明
CountDownLatch是若干个线程等待另外n个线程完成某件事之后才能执行;而CyclicBarrier是若干个线程互相等待,只有等到所有线程都执行了await只会,这若干个线程才可以继续往下执行。
3、信号量 Semaphore
信号量通过一组许可证来控制对共享资源的访问。
如果需要,可以用acquire()方法获取许可,如果许可为0,那么会进行阻塞,通过使用release()方法释放许可,把许可归还给Semaphore,归还之后,阻塞的线程优惠醒来尝试获取许可。
Semaphore提供给了若干个api对应不同的功能:
Semaphore(int permits)
:非公平模式创建;Semaphore(int permits, boolean fair)
:可以指定是否公平模式创建;acquire()
:尝试获取1个许可,如果没有许可则阻塞,可以被中断停止等待;acquire(int permits)
:跟上一个方法类型,尝试获取permits个许可;acquireUninterruptibly()
:尝试获取一个许可,不可中断;acquireUninterruptibly(int permits)
:尝试获取permits个许可,不可中断;tryAcquire()
:尝试获取一个许可,获取不到则直接返回失败;tryAcquire(int permits)
:尝试获取permits个许可,获取不到则直接返回失败;tryAcquire(int permits, long timeout, TimeUnit unit)
:尝试在timeout时间内获取permits个许可,超时则返回false,可被中断;tryAcquire(long timeout, TimeUnit unit)
:尝试在timeout时间内获取1个许可,超时则返回false,可被中断;release()
:释放一个许可;release(int permits)
:释放n个许可;
下面演示基于公平锁的Semaphore,获取锁使用acquireUninterruptibly()
:
这里设置的许可为2,可以发现,同一时刻最多只能有两个线程获得许可。
3.1、执行原理
Semaphore的执行原理相对来说比较简单。下面描述了可中断非公平的信号量实现原理,ASQ中的state值就相当于许可的数量:
- 执行acquire的时候,会尝试让state - acquires,如果发现许可足够,则进行cas更新,扣减许可,否则线程进入等待队列;
- 执行release的时候,state + releases,把许可加回去。
3.2、使用案例
下面演示了使用semaphore实现限流的机制,模拟20个客户端线程尝试执行业务逻辑,同一时刻最多只有5个线程能够并发的执行。
1 | // 线程池 |
注意,这里使用的是
tryAcquire
失败之后直接返回,线程不会进入AQS等待队列。