【Java 并发】同步器

发布时间 2024-01-06 23:26:53作者: LARRY1024

同步器

JUC 包下的工具类除了 locks,还包含其他的工具类,如 Semaphore、CountDownLatch、CyclicBarrier、Exchanger、Phaser 等,这些类为线程直接的共用集结点模式(common rendezvous patterns)提供了预置功能。

功能 说明
Semaphore 允许线程集等待直到被运行运行为止 限制访问资源的线程总数。

Semaphore

Semaphore 是一个计数信号量,它的作用是限制可以访问某些资源(物理或逻辑的)的线程数目。Semaphore 的构造方法可以指定信号量的数目,也可以指定是否是公平的。

Semaphore 有两个主要的方法:acquire() 和 release()

  • acquire() 方法会尝试获取一个信号量,如果获取不到,就会阻塞当前线程,直到有线程释放信号量。

  • release() 方法会释放一个信号量,释放之后,会唤醒一个等待的线程。

Semaphore 还有一个 tryAcquire() 方法,它会尝试获取一个信号量,如果获取不到,就会返回 false,不会阻塞当前线程。

Semaphore 用来控制同时访问某个特定资源的操作数量,它并不保证线程安全,所以要保证线程安全,还需要加上同步锁。

原理

Semaphore 内部有一个继承了 AQS 的同步器 Sync,重写了tryAcquireShared方法。在这个方法里,会去尝试获取资源。

如果获取失败(想要的资源数量小于目前已有的资源数量),就会返回一个负数(代表尝试获取资源失败)。然后当前线程就会进入 AQS 的等待队列。

示例

【示例】

import java.util.concurrent.Semaphore;

public class ResourcePool {
    private final Semaphore semaphore;

    public ResourcePool(int limit) {
        this.semaphore = new Semaphore(limit);
    }

    public void useResource() {
        try {
            semaphore.acquire(); // 尝试获取一个信号量
            System.out.println(Thread.currentThread().getName() + ": get the resource, remained resource:  " +
                    semaphore.availablePermits() + ", wait thread: " + semaphore.getQueueLength());
            Thread.sleep(1000); // 模拟资源使用时间
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            semaphore.release();
            System.out.println( Thread.currentThread().getName() + ": release the resource");
        }
    }

    public static void main(String[] args) {
        ResourcePool pool = new ResourcePool(3); // 限制 3 个线程同时访问资源
        for (int i = 0; i < 10; i++) {
            new Thread(pool::useResource).start();
        }
    }
}

示例输出:

Thread-0: get the resource, remained resource:  1, wait thread: 0
Thread-2: get the resource, remained resource:  0, wait thread: 0
Thread-1: get the resource, remained resource:  1, wait thread: 0
Thread-1: release the resource
Thread-3: get the resource, remained resource:  2, wait thread: 6
Thread-0: release the resource
Thread-2: release the resource
Thread-5: get the resource, remained resource:  0, wait thread: 4
Thread-4: get the resource, remained resource:  1, wait thread: 5
Thread-4: release the resource
Thread-6: get the resource, remained resource:  2, wait thread: 3
Thread-7: get the resource, remained resource:  1, wait thread: 2
Thread-8: get the resource, remained resource:  0, wait thread: 1
Thread-3: release the resource
Thread-5: release the resource
Thread-8: release the resource
Thread-9: get the resource, remained resource:  2, wait thread: 0
Thread-7: release the resource
Thread-6: release the resource
Thread-9: release the resource

可以看到,在这次运行中,最开始是 0、2、1 这三个线程获得了资源,而其它线程进入了等待队列。然后当某个线程释放资源后,就会有等待队列中的线程获得资源。

Exchanger

Exchanger 类用于两个线程交换数据。它支持泛型,也就是说,我们可以在两个线程之间传送任何数据。

它提供一个同步点,在这个同步点,两个线程可以交换彼此的数据。这两个线程通过 exchange 方法交换数据,如果第一个线程先执行 exchange 方法,它会一直等待第二个线程也执行 exchange 方法,当两个线程都到达同步点时,这两个线程就可以交换数据,将本线程生产出来的数据传递给对方。

Exchanger 可以用于遗传算法、校对工作和数据同步等场景。

【示例】

public class ExchangerDemo {

    public static void main(String[] args) {
        Exchanger<String> exchanger = new Exchanger<>();

        new Thread(() -> {
            try {
                String data1 = "data1";
                System.out.println(Thread.currentThread().getName() + " 正在把 " + data1 + " 交换出去");
                Thread.sleep(1000); // 模拟线程处理耗时
                String data2 = exchanger.exchange(data1);
                System.out.println(Thread.currentThread().getName() + " 交换到了 " + data2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "Thread 1").start();

        new Thread(() -> {
            try {
                String data1 = "data2";
                System.out.println(Thread.currentThread().getName() + " 正在把 " + data1 + " 交换出去");
                Thread.sleep(2000); // 模拟线程处理耗时
                String data2 = exchanger.exchange(data1);
                System.out.println(Thread.currentThread().getName() + " 交换到了 " + data2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "Thread 2").start();
    }
}

示例输出:

Thread 1 正在把 data1 交换出去
Thread 2 正在把 data2 交换出去
Thread 2 交换到了 data1
Thread 1 交换到了 data2

可以看到,当一个线程调用 exchange 方法后,会处于阻塞状态,只有当另一个线程也调用了 exchange 方法,它才会继续执行。

总结

因为 Exchanger 支持泛型,所以我们可以传输任何的数据,比如 IO 流或者 IO 缓存。根据 JDK 里面注释的说法,可以总结为一下特性:

  • 此类提供对外的操作是同步的;

  • 用于成对出现的线程之间交换数据;

  • 可以视作双向的同步队列;

  • 可应用于基因算法、流水线设计等场景。

CountDownLatch

CountDownLatch 是一个同步工具类,它允许一个或多个线程一直等待,直到其他线程的操作执行完后再执行。

CountDownLatch 有一个计数器,可以通过countDown()方法对计数器的数目进行减一操作,也可以通过await()方法来阻塞当前线程,直到计数器的值为 0。

CountDownLatch 一般用来控制线程等待,它可以让某个线程一直等待直到倒计时结束,再开始执行。

来看一个 CountDownLatch 的使用示例:

【示例】

public class InitializationDemo {

    public static void main(String[] args) throws InterruptedException {
        // 创建一个倒计数为 3 的 CountDownLatch
        CountDownLatch latch = new CountDownLatch(3);

        Thread service1 = new Thread(new Service("服务 1", 2000, latch));
        Thread service2 = new Thread(new Service("服务 2", 3000, latch));
        Thread service3 = new Thread(new Service("服务 3", 4000, latch));

        service1.start();
        service2.start();
        service3.start();

        // 等待所有服务初始化完成
        latch.await();
        System.out.println("所有服务都准备好了");
    }

    static class Service implements Runnable {
        private final String name;
        private final int timeToStart;
        private final CountDownLatch latch;

        public Service(String name, int timeToStart, CountDownLatch latch) {
            this.name = name;
            this.timeToStart = timeToStart;
            this.latch = latch;
        }

        @Override
        public void run() {
            try {
                Thread.sleep(timeToStart);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(name + " 准备好了");
            latch.countDown(); // 减少倒计数
        }
    }
}

示例输出:

服务 1 准备好了
服务 2 准备好了
服务 3 准备好了
所有服务都准备好了

CyclicBarrier

CyclicBarrier 是一个同步工具类,它允许一组线程互相等待,直到到达某个公共屏障点(common barrier point)。

原理

如果参与者(线程)在等待的过程中,Barrier 被破坏,就会抛出 BrokenBarrierException。可以用isBroken()方法检测 Barrier 是否被破坏。

  • 如果有线程已经处于等待状态,调用 reset 方法会导致已经在等待的线程出现 BrokenBarrierException 异常。并且由于出现了 BrokenBarrierException,将会导致始终无法等待。

  • 如果在等待的过程中,线程被中断,会抛出 InterruptedException 异常,并且这个异常会传播到其他所有的线程。

  • 如果在执行屏障操作过程中发生异常,则该异常将传播到当前线程中,其他线程会抛出 BrokenBarrierException,屏障被损坏。

  • 如果超出指定的等待时间,当前线程会抛出 TimeoutException 异常,其他线程会抛出 BrokenBarrierException 异常。

应用场景

  • CyclicBarrier 可以用于多线程计算数据,最后合并计算结果的应用场景。

    比如,我们用一个 Excel 保存了用户所有银行流水,每个 sheet 保存一个账户近一年的每笔银行流水,现在需要统计用户的日均银行流水,先用多线程处理每个 sheet 里的银行流水,都执行完之后,得到每个 sheet 的日均银行流水,最后,再用 barrierAction 用这些线程的计算结果,计算出整个 Excel 的日均银行流水。

  • CyclicBarrier 的计数器可以通过 reset()方法重置,所以它能处理循环使用的场景。

    比如,我们将一个大任务分成 10 个小任务,用 10 个线程分别执行这 10 个小任务,当 10 个小任务都执行完之后,再合并这 10 个小任务的结果,这个时候就可以用 CyclicBarrier 来实现。

示例

如果玩一个游戏有多个“关卡”,那么,使用 CountDownLatch 显然不太合适,因为需要为每个关卡都创建一个实例。因此,我们可以使用 CyclicBarrier 来实现每个关卡的数据加载等待功能。

import java.util.Random;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierDemo {
    static class PreTaskThread implements Runnable {
        private String task;
        private CyclicBarrier cyclicBarrier;

        public PreTaskThread(String task, CyclicBarrier cyclicBarrier) {
            this.task = task;
            this.cyclicBarrier = cyclicBarrier;
        }

        @Override
        public void run() {
            // 假设总共三个关卡
            for (int i = 1; i < 4; i++) {
                try {
                    Random random = new Random();
                    Thread.sleep(random.nextInt(1000));
                    System.out.println(String.format("关卡%d的任务%s完成", i, task));
                    cyclicBarrier.await();
                } catch (InterruptedException | BrokenBarrierException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    public static void main(String[] args) {
        CyclicBarrier cyclicBarrier = new CyclicBarrier(3, () -> {
            System.out.println("****** 本关卡所有前置任务完成,开始游戏 ******");
        });

        new Thread(new PreTaskThread("加载地图数据", cyclicBarrier)).start();
        new Thread(new PreTaskThread("加载人物模型", cyclicBarrier)).start();
        new Thread(new PreTaskThread("加载背景音乐", cyclicBarrier)).start();
    }
}

示例输出:

关卡1的任务加载人物模型完成
关卡1的任务加载背景音乐完成
关卡1的任务加载地图数据完成
****** 本关卡所有前置任务完成,开始游戏 ******
关卡2的任务加载地图数据完成
关卡2的任务加载人物模型完成
关卡2的任务加载背景音乐完成
****** 本关卡所有前置任务完成,开始游戏 ******
关卡3的任务加载背景音乐完成
关卡3的任务加载地图数据完成
关卡3的任务加载人物模型完成
****** 本关卡所有前置任务完成,开始游戏 ******

一旦调用 await 方法的线程数量等于构造方法中传入的任务总量(这里是 3),就代表达到屏障了。CyclicBarrier 允许我们在达到屏障的时候,可以执行一个任务,可以在构造方法传入一个 Runnable 类型的对象。

Phaser

Phaser 是 Java 7 中引入的一个并发同步工具,它提供了对动态数量的线程的同步能力,这与 CyclicBarrier 和 CountDownLatch 不同,因为它们都需要预先知道等待的线程数量。Phaser 是多阶段的,意味着它可以同步不同阶段的多个操作。

Phaser 可以理解为一个线程的计数器,它可以将这个计数器加一或减一,当这个计数器的值为 0 的时候,所有调用 await() 方法而在等待的线程就会继续执行。

Phaser 的计数器可以被动态地更新,也可以被动态地增加或减少。Phaser 还提供了一些方法来帮助我们更好地控制线程的到达。

名词解释:

  • party:Phaser 的上下文中,一个 party 可以是一个线程,也可以是一个任务。

    当我们在 Phaser 上注册一个 party 时,Phaser 会递增它的参与者数量。

  • arrive:对应一个 party 的状态,初始时是 unarrived。

    当调用 arriveAndAwaitAdvance() 或者 arriveAndDeregister() 进入 arrive 状态,可以通过 getUnarrivedParties() 获取当前未到达的数量。

  • register:注册一个新的 party 到 Phaser。

  • deRegister:减少一个 party。

  • phase:阶段,当所有注册的 party 都 arrive 之后,将会调用 Phaser 的 onAdvance() 方法来判断是否要进入下一阶段。

Phaser 的终止有两种途径,Phaser 维护的线程执行完毕或者 onAdvance() 返回 true。

示例

假设我们游戏有三个关卡,但只有第一个关卡有新手教程,需要加载新手教程模块,但后面的第二个关卡和第三个关卡都不需要。这里我们可以用 Phaser 来实现这个需求。

【示例】

import java.util.Random;
import java.util.concurrent.Phaser;

public class PhaserDemo {
    static class PreTaskThread implements Runnable {

        private String task;
        private Phaser phaser;

        public PreTaskThread(String task, Phaser phaser) {
            this.task = task;
            this.phaser = phaser;
        }

        @Override
        public void run() {
            for (int i = 1; i < 4; i++) {
                try {
                    // 第二个关卡起不加载NPC,跳过
                    if (i >= 2 && "加载新手教程".equals(task)) {
                        continue;
                    }
                    Random random = new Random();
                    Thread.sleep(random.nextInt(1000));
                    System.out.printf("关卡%d,需要加载%d个模块,当前模块【%s】%n", i, phaser.getRegisteredParties(), task);

                    // 从第二个关卡起,不加载NPC
                    if (i == 1 && "加载新手教程".equals(task)) {
                        System.out.println("下次关卡移除加载【新手教程】模块");
                        phaser.arriveAndDeregister(); // 移除一个模块
                    } else {
                        phaser.arriveAndAwaitAdvance();
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    public static void main(String[] args) {
        Phaser phaser = new Phaser(4) {
            @Override
            protected boolean onAdvance(int phase, int registeredParties) {
                System.out.println(String.format("第%d次关卡准备完成", phase + 1));
                return phase == 3 || registeredParties == 0;
            }
        };

        new Thread(new PreTaskThread("加载地图数据", phaser)).start();
        new Thread(new PreTaskThread("加载人物模型", phaser)).start();
        new Thread(new PreTaskThread("加载背景音乐", phaser)).start();
        new Thread(new PreTaskThread("加载新手教程", phaser)).start();
    }
}

示例输出:

关卡1,需要加载4个模块,当前模块【加载人物模型】
关卡1,需要加载4个模块,当前模块【加载新手教程】
下次关卡移除加载【新手教程】模块
关卡1,需要加载3个模块,当前模块【加载地图数据】
关卡1,需要加载3个模块,当前模块【加载背景音乐】
第1次关卡准备完成
关卡2,需要加载3个模块,当前模块【加载地图数据】
关卡2,需要加载3个模块,当前模块【加载人物模型】
关卡2,需要加载3个模块,当前模块【加载背景音乐】
第2次关卡准备完成
关卡3,需要加载3个模块,当前模块【加载人物模型】
关卡3,需要加载3个模块,当前模块【加载地图数据】
关卡3,需要加载3个模块,当前模块【加载背景音乐】
第3次关卡准备完成

这里要注意关卡 1 的输出,在“加载新手教程”线程中调用了 arriveAndDeregister() 减少一个 party 之后,后面的线程使用 getRegisteredParties() 得到的是已经被修改后的 parties 了。但是,当前这个阶段(phase),仍然是需要 4 个 parties 都 arrive 才触发屏障的。从下一个阶段开始,才需要 3 个 parties 都 arrive 就触发屏障。

Phaser 类用来控制某个阶段的线程数量很有用,但它并不在意这个阶段具体有哪些线程 arrive,只要达到它当前阶段的 parties 值,就触发屏障。

所以,这里的案例虽然制定了特定的线程(加载新手教程)来更直观地表述 Phaser 的功能,但其实 Phaser 是没有分辨具体是哪个线程的功能的,它在意的只是数量,这一点需要注意。


参考: