阻塞队列


阻塞队列

阻塞队列的定义

阻塞队列是实现了BlockingQueue接口的子类,BlockingQueue接口是继承自Queue

  • BlockingQueue
public interface BlockingQueue<E> extends Queue<E> {
    
    // 阻塞队列的添加操作,如果队列已满,则抛出异常
    boolean add(E e);
    
    // 阻塞队列的添加操作,如果队列已满,则返回false
    boolean offer(E e);

    // 阻塞队列的添加操作,如果队列已满,则等待至队列有空间
    void put(E e) throws InterruptedException;

    // 阻塞队列的添加操作,如果队列已满,则等待至队列有空间,或者超时
    boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;

    // ---------------------------------------------------------------------------------------------

    // 阻塞队列的弹出操作,如果deque为空,则等待
    E take() throws InterruptedException;

    // 阻塞队列的弹出操作,等待指定时间后超时,可响应中断
    E poll(long timeout, TimeUnit unit) throws InterruptedException;

    // ---------------------------------------------------------------------------------------------

    // 返回阻塞队列的大小    
    int remainingCapacity();

    // 移除阻塞队列中的元素    
    boolean remove(Object o);

    // 判断阻塞队列是否包含指定元素
    boolean contains(Object o);

    // 将阻塞队列中的元素拷贝到指定的数组中
    int drainTo(Collection<? super E> c);

    // 将阻塞队列中的元素拷贝到指定的数组中,并返回拷贝的元素个数
    int drainTo(Collection<? super E> c, int maxElements);
}

以上是阻塞队列BlockingQueue接口定义的方法,下面详细的分析一下,这几个主要的方法

  1. take()的功能是移除头节点,当队列为空时,则阻塞
  2. put()的功能是向对位添加元素,当队列满载时,则阻塞

BlocakingQueue的实现类有:

  1. ArrayBlockingQueue
  2. LinkedBlockingQueue
  3. SynchronousQueue
  4. DelayQueue
  5. PriorityBlockingQueue
  6. LinkedTransferQueue

阻塞队列实现

阻塞队列常用的方法

在阻塞队列中主要有8种方法,按照分类分为3大类:

  1. 抛出异常 : add、remove、element
  2. 返回结果但是不抛出异常 : offer、poll、peek
  3. 阻塞 : put、take
  • 分类
动作\失败处理方式 抛出异常 返回结果但是不抛出异常 阻塞
添加元素 add offer put
移除元素 remove poll take
返回头节点 element peek
  • 实例操作分析
    下面使用ArrayBlockingQueue实例来进行演示

  • ArrayBlockingQueueDemo

常见的阻塞队列

  • ArrayBlockingQueue
    ArrayBlockingQueue是一个容量固定的有界阻塞队列,构造方法如下:
public ArrayBlockingQueue(int capacity, boolean fair) {
 //省略代码...
}

fair表示队列是公平的还是不公平的

  • LinkBlockingQueue
    LinkedBlockingQueue是一个默认构造器创建的是无界阻塞队列也可以指定队列容量,构造方法如下:
public LinkedBlockingQueue() {
    this(Integer.MAX_VALUE);
}

public LinkedBlockingQueue(int capacity) {
    if (capacity <= 0) throw new IllegalArgumentException();
    this.capacity = capacity;
    last = head = new Node<E>(null);
}
  • SynchronousQueue
    SynchronousQueue是一个内部容量为0的阻塞队列,导致每次取数据都要阻塞等待

  • PriorityBlockingQueue
    PriorityBlockingQueue是一个支持优先级排序的阻塞队列,是一个无界队列。构造方法

public PriorityBlockingQueue() {
    this(DEFAULT_INITIAL_CAPACITY, null);
}

    public PriorityBlockingQueue(int initialCapacity) {
    this(initialCapacity, null);
}
  • DelayQueue

DelayQueue是一个延迟阻塞队列,构造函数为:

public DelayQueue() {}

public DelayQueue(Collection<? extends E> c) {
    this.addAll(c);
}
  • 阻塞队列并发安全的原理是什么?
    使用两个lock来保证的并发安全,但是在某些情况下不能完全保证并发是安全的

ArrayBlockingQueue

/** Main lock guarding all access */
final ReentrantLock lock;
/** Condition for waiting takes */
private final Condition notEmpty;
/** Condition for waiting puts */
private final Condition notFull;

public void put(E e) throws InterruptedException {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == items.length)
            notFull.await();
        enqueue(e);
    } finally {
        lock.unlock();
    }
}

通过共用同一个lock来锁住队列后,用首尾两个Condition来分别使线程暂定和唤醒线程;

ConcurrentLinkedQueue

public boolean offer(E e) {
        checkNotNull(e);
        final Node<E> newNode = new Node<E>(e);

        for (Node<E> t = tail, p = t;;) {
            Node<E> q = p.next;
            if (q == null) {
                // p is last node
                if (p.casNext(null, newNode)) {
                    // Successful CAS is the linearization point
                    // for e to become an element of this queue,
                    // and for newNode to become "live".
                    if (p != t) // hop two nodes at a time
                        casTail(t, newNode);  // Failure is OK.
                    return true;
                }
                // Lost CAS race to another thread; re-read next
            }
            else if (p == q)
                // We have fallen off list.  If tail is unchanged, it
                // will also be off-list, in which case we need to
                // jump to head, from which all live nodes are always
                // reachable.  Else the new tail is a better bet.
                p = (t != (t = tail)) ? t : head;
            else
                // Check for tail updates after two hops.
                p = (p != t && t != (t = tail)) ? t : q;
        }
    }

可以从方法中看出offer方法就是不停的重试加上CAS机制保证了并发的安全性,这是乐观锁的一种思想。适用用线程竞争不激烈的场景。

如何选择阻塞队列

如何选择阻塞队列可以中线程池来进行分析

  1. LinkedBlockingQueue
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
}

public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
    (new ThreadPoolExecutor(1, 1,
                            0L, TimeUnit.MILLISECONDS,
                            new LinkedBlockingQueue<Runnable>()));
}

可以看出固定线程数的线程池采用的是无限长度的队列作为缓存池

  1. SynchronousQueue
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

最大线程数的线程池采用的是无容量的队列做为缓存池

DelayedWorkQueue

public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue());
}

具有周期性执行的线程池采用的是延迟队列,从而实现周期性执行的功能

总结

因此选择队列也应该从业务的属性出发,在这几个方面进行取舍长度特点性能,从而选择最适合业务场景的队列。


  TOC