线程协作


线程协作

线程之间相互协作,可以通过以下几种方式来进行实现:

  1. 信号量 - semaphore
  2. 栅栏 - CyclicBarrier
  3. 条件 - Condittion
  4. 屏障 - CountDownLatch

信号量

线程之间信号量(semaphore)的作用是于标记当前资源能否被线程访问的信号,线程访问资源时尝试用acquire,访问完成使用release归还资源

import java.util.concurrent.Semaphore;

public class SemaphoreExample {
    private static final Semaphore semaphore = new Semaphore(3); // 允许3个线程同时访问

    public static void main(String[] args) {
        for (int i = 0; i < 10; i++) {
            final int threadNumber = i;
            new Thread(() -> {
                try {
                    System.out.println("Thread " + threadNumber + " is trying to acquire a permit.");
                    semaphore.acquire(); // 获取许可
                    System.out.println("Thread " + threadNumber + " has acquired a permit.");
                    // 模拟访问共享资源
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    System.out.println("Thread " + threadNumber + " is releasing the permit.");
                    semaphore.release(); // 释放许可
                }
            }).start();
        }
    }
}

countDownLatch

countDownLatch的作用是多线程之间的倒计时器,用于多线程之间进行相互协作。有两种用法:

  1. 主线程等待其他线程运行完毕后执行
  2. 其他线程等待主线程运行完毕后执行
  • 演示代码
public static void main(String[] args) throws InterruptedException {
      private static final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(2, 2, 0, TimeUnit.SECONDS, new LinkedBlockingDeque<>(), new ThreadFactoryBuilder().setNameFormat("线程_%s").build());

    CountDownLatch count = new CountDownLatch(2);
    for (int i = 0; i < 2; i++) {
        threadPool.submit(() -> {
            String msg = String.format("%s开始执行...", Thread.currentThread().getName());
            System.out.println(msg);
            count.countDown();
        });
    }
    count.await();
    System.out.println(Thread.currentThread().getName() + "执行完毕...");
}

GZRSF1.png

最后达到的效果是,主线程需要等待其他线程执行完毕后才能继续执行;

CyclicBarriar

CyclicBarriar是一组线程到达某个状态后才开始执行,与CountDownLatch不同。CountDownLatch是达到count=0以后开始执行,是用于事件驱动。CyclicBarriar是线程驱动,例如N个线程到达wait的状态统一开始执行。

  • CyclicBarriar演示代码
public class CyclicBarriarDemo {

    private static final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, Integer.MAX_VALUE, 0, TimeUnit.SECONDS, new SynchronousQueue<>(), new ThreadFactoryBuilder().setNameFormat("线程_%s").build());

    public static void main(String[] args) {
        CyclicBarrier cyclicBarrier = new CyclicBarrier(3);
        for (int i = 0; i < 6; i++) {
            int finalI1 = i;
            threadPool.submit(() -> {
                String msg1 = String.format("[%s]-小老鼠[%d]开始出洞", Thread.currentThread().getName(), finalI1);
                System.out.println(msg1);
                Integer sleepTime = ThreadLocalRandom.current().nextInt(10);
                try {
                    TimeUnit.SECONDS.sleep(sleepTime);
                    String msg2 = String.format("[%s]-小老鼠[%d]到达指定地点", Thread.currentThread().getName(), finalI1);
                    System.out.println(msg2);
                    cyclicBarrier.await();
                    String msg3 = String.format("[%s]-小老鼠[%d]合力获取到奶酪(🧀)", Thread.currentThread().getName(), finalI1);
                    System.out.println(msg3);
                } catch (InterruptedException | BrokenBarrierException e) {
                    e.printStackTrace();
                }
            });
        }
    }
}
  • 执行结果
    GZ57WR.png

  • 有启始动作的方法

CyclicBarrier cyclicBarrier = new CyclicBarrier(3,()->{
    String msg1 = String.format("[%s]-发现奶酪(🧀)", Thread.currentThread().getName());
    System.out.println(msg1);
});
  • 执行结果

GZIufs.png

CyclicBarrier在使用上与countDownLatch很相同,都能实现等待某一条件然后在开始执行的功能,区别在与CountDownLatch是让单个线程等待倒计数完成后开始执行,而CyclicBarrier是让一组线程满足等待条件后开始执行,并且CountDownLatch不能重复使用,CyclicBarrier可以多次使用。
因此CoundDownLatch是被称为计数器,而CyclicBarrier被称为屏障;

Condittion

Condittion能让不同线程之间相互协作,等待满足条件后继续执行。

public class ConditionDemo {
    private ReentrantLock lock = new ReentrantLock();
    private Condition condition = lock.newCondition();

    public static void main(String[] args) throws InterruptedException {
        ConditionDemo conditionDemo = new ConditionDemo();

        new Thread(() ->{
            try {
                conditionDemo.method1();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();

        conditionDemo.method0();
    }

    public void method0() throws InterruptedException {
        lock.lock();
        try {
            System.out.println(Thread.currentThread().getName() + ":条件未准备完毕...");
            condition.await();
            System.out.println(Thread.currentThread().getName() + ":开始执行...");
        }
        finally {
            lock.unlock();
        }
    }

    public void method1() throws InterruptedException {
        lock.lock();
        try {
            System.out.println(Thread.currentThread().getName() + ":条件准备完毕...");
            condition.signal();
        }finally {
            lock.unlock();
        }
    }
}

Condition是lock锁提供的一种可供线程之间进行交互的一种方法。由于signal底层会使用到当前线程和lock锁的持有线程进行对比,相等才会进行唤醒其他线程。

总结

  • semaphore 信号量的作用是只有获得信号量的线程才能执行,其余线程只能等待持有信号量的线程释放后才有机会执行。

  • countDownLatch是等待一组线程执行完毕后等待线程才可以执行,是事件触发

  • CyclicBarriar是一组线程到达**wait()**状态以后才可以开始执行,触发动作是设定的一组线程都到达某个临界状态,并且可被复用,


  TOC