并发编程

并发编程原理和应用
帅旋
关注
充电
IT宅站长,技术博主,共享单车手,全网id:arthinking。

图解几个好玩的并发辅助工具类

发布于 2020-03-27 | 更新于 2024-02-22

去年问我怎么学Java的那个五年级小学生又来向我问问题了,说Java中线程同步有没有好用的工具类。幸亏没有问我什么算法,瑟瑟发抖。这个我倒是还挺在行的,了解到他喜欢熊出没,于是我就用熊作为主角,给他分享了几个Java并发框架中的辅助工具类。(呼~松了一口气)

image-20200327222907794

看文本片文章,你将了解到:

  1. CountDownLatch的工作原理和使用场景;
  2. CyclicBarrier的工作原理和使用场景;
  3. Semaphore的工作原理和使用场景。

1、闭锁 CountDownLatch

一个同步工具类,允许一个或者多个线程一直等待,直到其他线程的操作都执行完成之后再继续往下执行。

**使用场景:**在一些应用场合中,需要等待某个条件达到要求后才能做后面的事情;同时当线程都完成后也会触发事件,以便进行后面的操作。 这个时候就可以使用CountDownLatch。CountDownLatch最重要的方法是countDown()await(),前者主要是计数减一,后者是等待计数到0,如果没有到达0,就继续阻塞等待。

方法详细介绍:CountDownLatch的介绍和使用

为了方便理解,这不,我又发挥了以下我的动画绘制功底,写了一个动图:

countdownlatch

如上图,左边三只小熊,可以当成三个线程,每一只撞到栏杆,计数器就减1,这相当于执行了countDown方法;

右边有两只暴走小熊在等待计数器变为0,可以当成两个线程,执行了await方法;

最终左边三只暴走小熊抵达了栏杆处,计数器变为0,唤醒了右边的暴走小熊,暴走小熊就开始动起来了。

1.1、执行原理

CountDownLatch是基于AQS共享模式的使用。

如下图,我们通过给CountDownLatch构造函数传入state的值。

countDown方法本质是释放共享锁,核心实现逻辑是:state>0 && state-1,如果state>0,则state减一,否则执行失败;

await方法本质是获取共享锁,核心实现是:getState()0,如果state0,则表示获取成功,否则线程阻塞进入等待队列;

image-20200317233628586

当state减到0的时候,会唤醒等待队列中的所有线程,尝试继续获取共享锁,这个时候正常是所有线程都能获取成功的。

1.2、使用案例

三个线程共同拉取一块数据,每个线程拉取数据块的一部分,等到所有线程的数据都拉取过来之后,另一个处理线程再开始这个数据块。

下载线程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
class Downloader implements Runnable{

private CountDownLatch latch;

private String downloaderName;

/**
* 构造函数
* @param downLatch 注意, 所有需要协作的线程需要使用同一个闭锁
* @param downloaderName
*/
public Downloader(CountDownLatch downLatch, String downloaderName){
this.latch = downLatch;
this.downloaderName = downloaderName;
}

public void run() {
this.download();

try {
TimeUnit.SECONDS.sleep(new Random().nextInt(10));
} catch (InterruptedException e) {
System.out.println("Thread interrupt status: " + Thread.currentThread().isInterrupted());
}
System.out.println(this.downloaderName + "下载完成...");
this.latch.countDown();
}

private void download(){
System.out.println(this.downloaderName + "正在下载文件...");
}

}

数据处理线程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
class DataProcessor implements Runnable {

private CountDownLatch latch;

/**
* 构造函数
* @param latch 注意, 所有需要协作的线程需要使用同一个闭锁
*/
public DataProcessor(CountDownLatch latch){
this.latch = latch;
}

public void run() {
System.out.println("等待下载完数据...");
try {
this.latch.await();
} catch (InterruptedException e) {
System.out.println("Thread interrupt status: " + Thread.currentThread().isInterrupted());
}
System.out.println("数据下载完成, 开始处理数据...");
}

}

运行代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
ExecutorService executor = Executors.newFixedThreadPool(4);

CountDownLatch latch = new CountDownLatch(3);

Downloader d1 = new Downloader(latch, "下载线程1");
Downloader d2 = new Downloader(latch, "下载线程2");
Downloader d3 = new Downloader(latch, "下载线程3");

DataProcessor processor = new DataProcessor(latch);
executor.execute(d1);
executor.execute(d2);
executor.execute(d3);
executor.execute(processor);
executor.shutdown();

执行结果:

1
2
3
4
5
6
7
8
等待下载完数据...
下载线程1正在下载文件...
下载线程2正在下载文件...
下载线程3正在下载文件...
下载线程3下载完成...
下载线程2下载完成...
下载线程1下载完成...
数据下载完成, 开始处理数据...

1.3、其他说明

类似的,我们也可以使用Thread.join方法实现控制线程执行顺序,但是没有那么灵活。如果我们把任务都丢到线程池里面多线程执行,那么就不能手动的在一个线程里面调用另一个线程的join方法了。

2、栅栏 CyclicBarrier

屏障,或成为栅栏,我们在之前 一文带你彻底理解同步和锁的本质(干货) 一文有讨论到过,主要是达到这种目的:所有线程都准备就绪,就着手下一阶段的工作,否则不能进入下一阶段。

我们还是让上面的小熊来演示一下。

cyclicbarrier

上面5只小熊,准备跑到起跑线,跑到起跑线等待,相当于执行了await方法,等到所有小熊准备就绪之后,然后一起开跑。这就很好的揭示了内存屏障的作用了。

2.1、执行原理

CyclicBarrier是基于ReentrantLock的Condition来实现的。

如下图,栅栏中有两个关键属性:

  • parties:栅栏计数器初始值
  • count:栅栏计数器

其中CyclicBarrier的await()方法封装了对ReentrantLock条件锁的使用,主要处理流程:

  • 获取ReentrantLock锁;
  • count减1,如果此时count为0,那么唤醒等待队列中所有线程,并结束这一轮处理,重置屏障,否则进入下一步;
  • 执行condition.await方法,把当前线程丢到条件队列;
  • 当count减少到0的时候,执行condition.signalAll方法把条件队列中的所有线程节点都移动到等待队列;
  • 最后唤醒同步队列中的线程节点,线程从condition.await阻塞处醒来继续执行:获取ReentrantLock锁,用当前线程节点替换旧的头节点,最终放ReentrantLock锁,继续让线程往下执行(每个线程依次获取、锁释放锁)。如下图:

image-20200319235023688

await()能够响应中断。除此之外,await还提供了带有超时的实现await(long timeout, TimeUnit unit),以及reset()方法重新开启下一轮,具体大家可以看源码的实现。

2.2、使用案例

下面的案例模拟了赛跑,只有当所有运动员都在起跑线上准备好了,才允许他们开跑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
public class CyclicBarrierTest {

public static void main(String[] args) {
CyclicBarrier barrier = new CyclicBarrier(3);
ExecutorService executor = Executors.newFixedThreadPool(3);
executor.submit(new Runner(barrier, "1号选手"));
executor.submit(new Runner(barrier, "2号选手"));
executor.submit(new Runner(barrier, "3号选手"));
barrier.reset();
executor.submit(new Runner(barrier, "4号选手"));
executor.submit(new Runner(barrier, "5号选手"));
executor.submit(new Runner(barrier, "6号选手"));
executor.shutdown();
}
}

class Runner implements Runnable {

/**
* 一个同步辅助类,它允许一组线程互相等待,直到到达某个公共屏障点 (common barrier point)
*/
private CyclicBarrier barrier;

private String name;

public Runner(CyclicBarrier barrier, String name) {
super();
this.barrier = barrier;
this.name = name;
}

@Override
public void run() {
try {
Thread.sleep(1000 * (new Random()).nextInt(8));
System.out.println(name + " 准备好了...");
// barrier的await方法,在所有参与者都已经在此 barrier 上调用 await 方法之前,将一直等待。
barrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println(name + " 起跑!");
for (int i = 0; i < 10; i++) {
System.out.println(this.name + "正在跑步" + i);
}
}
}

2.3、其他说明

CountDownLatch是若干个线程等待另外n个线程完成某件事之后才能执行;而CyclicBarrier是若干个线程互相等待,只有等到所有线程都执行了await只会,这若干个线程才可以继续往下执行

3、信号量 Semaphore

信号量通过一组许可证来控制对共享资源的访问。

如果需要,可以用acquire()方法获取许可,如果许可为0,那么会进行阻塞,通过使用release()方法释放许可,把许可归还给Semaphore,归还之后,阻塞的线程优惠醒来尝试获取许可。

Semaphore提供给了若干个api对应不同的功能:

  • Semaphore(int permits):非公平模式创建;
  • Semaphore(int permits, boolean fair):可以指定是否公平模式创建;
  • acquire():尝试获取1个许可,如果没有许可则阻塞,可以被中断停止等待;
  • acquire(int permits):跟上一个方法类型,尝试获取permits个许可;
  • acquireUninterruptibly():尝试获取一个许可,不可中断;
  • acquireUninterruptibly(int permits):尝试获取permits个许可,不可中断;
  • tryAcquire():尝试获取一个许可,获取不到则直接返回失败;
  • tryAcquire(int permits):尝试获取permits个许可,获取不到则直接返回失败;
  • tryAcquire(int permits, long timeout, TimeUnit unit):尝试在timeout时间内获取permits个许可,超时则返回false,可被中断;
  • tryAcquire(long timeout, TimeUnit unit):尝试在timeout时间内获取1个许可,超时则返回false,可被中断;
  • release():释放一个许可;
  • release(int permits):释放n个许可;

下面演示基于公平锁的Semaphore,获取锁使用acquireUninterruptibly()

semaphore

这里设置的许可为2,可以发现,同一时刻最多只能有两个线程获得许可。

3.1、执行原理

Semaphore的执行原理相对来说比较简单。下面描述了可中断非公平的信号量实现原理,ASQ中的state值就相当于许可的数量:

  • 执行acquire的时候,会尝试让state - acquires,如果发现许可足够,则进行cas更新,扣减许可,否则线程进入等待队列;
  • 执行release的时候,state + releases,把许可加回去。

image-20200321181406717

3.2、使用案例

下面演示了使用semaphore实现限流的机制,模拟20个客户端线程尝试执行业务逻辑,同一时刻最多只有5个线程能够并发的执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// 线程池
ExecutorService exec = Executors.newCachedThreadPool();
// 只能5个线程同时访问
final Semaphore semp = new Semaphore(5);
// 模拟20个客户端访问
for (int index = 0; index < 20; index++) {
final int NO = index;
Runnable run = () -> {
try {
// 获取许可
if(semp.tryAcquire()) {
System.out.println("线程获得许可: " + NO);
Thread.sleep((long) (Math.random() * 10000));
// 访问完后,释放
semp.release();
} else {
System.out.println("达到并发上限,请求失败,请稍后再试");
}

} catch (InterruptedException e) {
System.out.println("执行异常");
}
};
exec.execute(run);
}
// 退出线程池
exec.shutdown();

注意,这里使用的是tryAcquire失败之后直接返回,线程不会进入AQS等待队列。

本文作者: 帅旋

本文链接: https://www.itzhai.com/columns/cpj/concurrent-helper-classes.html

版权声明: 版权归作者所有,未经许可不得转载,侵权必究!联系作者请加公众号。

×
IT宅

关注公众号及时获取网站内容更新。