CountDownLatch、CyclicBarrier、Semaphore

`

CountDownLatch

CountDownLatch是一个辅助器,允许一个或多个线程一直等待,直到一组线程全部准备就绪后同步执行,并互相等待其他线程的操作执行完成。

public CountDownLatch(int count) {  
    if (count < 0) throw new IllegalArgumentException("count < 0");  
    this.sync = new Sync(count);  
}

构造方法要求需要传入一个count值,用来计数,表示有多少组线程会同时执行。

CountDownLatch有两个常用的方法:

public void await() throws InterruptedException {  
    sync.acquireSharedInterruptibly(1);  
}

public void countDown() {  
    sync.releaseShared(1);  
}

当一个线程调用await方法时,该线程会被阻塞。每当有线程调用一次countDown方法,计数器就会被减一,当count的值被减为0的时候,被阻塞的线程才会继续执行。

源码

private static final class Sync extends AbstractQueuedSynchronizer {  
    private static final long serialVersionUID = 4982264981922014374L;  
  
    Sync(int count) {  
        setState(count);  
    }  
  
    int getCount() {  
        return getState();  
    }  
  
    protected int tryAcquireShared(int acquires) {  
        return (getState() == 0) ? 1 : -1;  
    }  
  
    protected boolean tryReleaseShared(int releases) {  
        // Decrement count; signal when transition to zero  
        for (;;) {  
            int c = getState();  
            if (c == 0)  
                return false;  
            int nextc = c - 1;  
            if (compareAndSetState(c, nextc))  
                return nextc == 0;  
        }  
    }  
}

实践

public class CountDownTest {  
    static SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");  
    public static void main(String[] args) throws InterruptedException {  
        CountDownLatch latch = new CountDownLatch(2);  
        Worker w1 = new Worker("张三", 2000, latch);  
        Worker w2 = new Worker("李四", 3000, latch);  
        w1.start();  
        w2.start();  
  
        long startTime = System.currentTimeMillis();  
        latch.await();  
        System.out.println("bug全部解决,领导可以给客户交差了,任务总耗时:"+ (System.currentTimeMillis() - startTime));  
  
    }  
  
    static class Worker extends Thread{  
        String name;  
        int workTime;  
        CountDownLatch latch;  
  
        public Worker(String name, int workTime, CountDownLatch latch) {  
            this.name = name;  
            this.workTime = workTime;  
            this.latch = latch;  
        }  
  
        @Override  
        public void run() {  
            System.out.println(name+"开始修复bug,当前时间:"+sdf.format(new Date()));  
            doWork();  
            System.out.println(name+"结束修复bug,当前时间:"+sdf.format(new Date()));  
            latch.countDown();  
        }  
  
        private void doWork() {  
            try {  
                //模拟工作耗时  
                Thread.sleep(workTime);  
            } catch (InterruptedException e) {  
                e.printStackTrace();  
            }  
        }  
    }  
}

CyclicBarrier

CyclicBarrier循环栅栏,一组线程会互相等待,直到所有线程都到达一个同步点。这个就非常有意思了,就像一群人被困到了一个栅栏前面,只有等最后一个人到达之后,他们才可以合力把栅栏(屏障)突破。

构造方法

public CyclicBarrier(int parties) {  
    this(parties, null);  
}

public CyclicBarrier(int parties, Runnable barrierAction) {  
    if (parties <= 0) throw new IllegalArgumentException();  
    this.parties = parties;  
    this.count = parties;  
    this.barrierCommand = barrierAction;  
}

第一个构造的参数,指的是需要几个线程一起到达,才可以使所有线程取消等待。第二个构造,额外指定了一个参数,用于在所有线程达到屏障时,优先执行 barrierAction。

实践

class BarrierTest {  
    public static void main(String[] args) {  
        CyclicBarrier barrier = new CyclicBarrier(2,() -> {  
            try {  
                System.out.println("等待裁判吹口哨");  
                Thread.sleep(1000);  
                System.out.println("裁判吹口哨");  
            }catch (Exception e){  
            }  
  
        });  //①  
        Runner runner1 = new Runner(barrier, "张三");  
        Runner runner2 = new Runner(barrier, "李四");  
        Runner runner3 = new Runner(barrier, "王五");  
        Runner runner4 = new Runner(barrier, "孙六");  
  
        ExecutorService service = Executors.newFixedThreadPool(2);  
        service.execute(runner1);  
        service.execute(runner2);  
        service.execute(runner3);  
        service.execute(runner4);  
  
        service.shutdown();  
  
    }  
  
}  
  
  
class Runner implements Runnable{  
  
    private CyclicBarrier barrier;  
    private String name;  
  
    public Runner(CyclicBarrier barrier, String name) {  
        this.barrier = barrier;  
        this.name = name;  
    }  
  
    @Override  
    public void run() {  
        try {  
            //模拟准备耗时  
            Thread.sleep(new Random().nextInt(5000));  
            System.out.println(name + ":准备OK");  
            barrier.await();  
            System.out.println(name +": 开跑");  
        } catch (InterruptedException e) {  
            e.printStackTrace();  
        } catch (BrokenBarrierException e){  
            e.printStackTrace();  
        }  
    }  
}

输出结果

李四:准备OK
张三:准备OK
等待裁判吹口哨
裁判吹口哨
张三: 开跑
李四: 开跑
王五:准备OK
孙六:准备OK
等待裁判吹口哨
裁判吹口哨
孙六: 开跑
王五: 开跑

Semaphore

Semaphore 信号量,用来控制同一时间,资源可被访问的线程数量,一般可用于流量的控制。

打个比方,现在有一段公路交通比较拥堵,那怎么办呢。此时,就需要警察叔叔出面,限制车的流量。

比如,现在有 20 辆车要通过这个地段, 警察叔叔规定同一时间,最多只能通过 5 辆车,其他车辆只能等待。只有拿到许可的车辆可通过,等车辆通过之后,再归还许可,然后把它发给等待的车辆,获得许可的车辆再通行,依次类推。

class SemaphoreTest {  
    private static int count = 20;  
  
    public static void main(String[] args) {  
  
        ExecutorService executorService = Executors.newFixedThreadPool(count);  
  
        //指定最多只能有五个线程同时执行  
        Semaphore semaphore = new Semaphore(5);  
  
        Random random = new Random();  
        for (int i = 0; i < count; i++) {  
            final int no = i;  
            executorService.execute(new Runnable() {  
                @Override  
                public void run() {  
                    try {  
                        //获得许可  
                        semaphore.acquire();  
                        System.out.println(no +":号车可通行");  
                        //模拟车辆通行耗时  
                        Thread.sleep(random.nextInt(2000));  
                        //释放许可  
                        semaphore.release();  
                    } catch (InterruptedException e) {  
                        e.printStackTrace();  
                    }  
                }  
            });  
        }  
  
        executorService.shutdown();  
  
    }  
}

这样每次就仅允许5辆车同时通行,但是此时存在一个问题就是只有5个通行证,那这5个通行证需要发给谁呢?
所以此时就需要用到锁了,既然大家都想要拿到通行证,那么就所有人同时去抢,谁能拿到就谁先走。
Semaphore提供了两个构造方法,其中有一个boolean的参数,可以控制锁是否公平,具体原理基于AQS实现。

/**默认采用非公平锁**/
public Semaphore(int permits) {  
    sync = new NonfairSync(permits);  
}

public Semaphore(int permits, boolean fair) {  
    sync = fair ? new FairSync(permits) : new NonfairSync(permits);  
}

公平锁就是按照先来后到的顺序,谁先来的谁先走,非公平锁就是有一个后来的车,但是是个救护车拥有优先通过权,所以不需要排队可以直接通过。