阻塞队列
阻塞队列的定义
阻塞队列是实现了BlockingQueue接口的子类,BlockingQueue接口是继承自Queue
- BlockingQueue
public interface BlockingQueue<E> extends Queue<E> {
// 阻塞队列的添加操作,如果队列已满,则抛出异常
boolean add(E e);
// 阻塞队列的添加操作,如果队列已满,则返回false
boolean offer(E e);
// 阻塞队列的添加操作,如果队列已满,则等待至队列有空间
void put(E e) throws InterruptedException;
// 阻塞队列的添加操作,如果队列已满,则等待至队列有空间,或者超时
boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;
// ---------------------------------------------------------------------------------------------
// 阻塞队列的弹出操作,如果deque为空,则等待
E take() throws InterruptedException;
// 阻塞队列的弹出操作,等待指定时间后超时,可响应中断
E poll(long timeout, TimeUnit unit) throws InterruptedException;
// ---------------------------------------------------------------------------------------------
// 返回阻塞队列的大小
int remainingCapacity();
// 移除阻塞队列中的元素
boolean remove(Object o);
// 判断阻塞队列是否包含指定元素
boolean contains(Object o);
// 将阻塞队列中的元素拷贝到指定的数组中
int drainTo(Collection<? super E> c);
// 将阻塞队列中的元素拷贝到指定的数组中,并返回拷贝的元素个数
int drainTo(Collection<? super E> c, int maxElements);
}
以上是阻塞队列BlockingQueue接口定义的方法,下面详细的分析一下,这几个主要的方法
- take()的功能是移除头节点,当队列为空时,则阻塞
- put()的功能是向对位添加元素,当队列满载时,则阻塞
BlocakingQueue的实现类有:
- ArrayBlockingQueue
- LinkedBlockingQueue
- SynchronousQueue
- DelayQueue
- PriorityBlockingQueue
- LinkedTransferQueue
阻塞队列常用的方法
在阻塞队列中主要有8种方法,按照分类分为3大类:
- 抛出异常 : add、remove、element
- 返回结果但是不抛出异常 : offer、poll、peek
- 阻塞 : put、take
- 分类
动作\失败处理方式 | 抛出异常 | 返回结果但是不抛出异常 | 阻塞 |
---|---|---|---|
添加元素 | add | offer | put |
移除元素 | remove | poll | take |
返回头节点 | element | peek | 无 |
-
实例操作分析
下面使用ArrayBlockingQueue实例来进行演示 -
ArrayBlockingQueueDemo
常见的阻塞队列
- ArrayBlockingQueue
ArrayBlockingQueue是一个容量固定的有界阻塞队列,构造方法如下:
public ArrayBlockingQueue(int capacity, boolean fair) {
//省略代码...
}
fair表示队列是公平的还是不公平的
- LinkBlockingQueue
LinkedBlockingQueue是一个默认构造器创建的是无界阻塞队列也可以指定队列容量,构造方法如下:
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}
-
SynchronousQueue
SynchronousQueue是一个内部容量为0的阻塞队列,导致每次取数据都要阻塞等待 -
PriorityBlockingQueue
PriorityBlockingQueue是一个支持优先级排序的阻塞队列,是一个无界队列。构造方法
public PriorityBlockingQueue() {
this(DEFAULT_INITIAL_CAPACITY, null);
}
public PriorityBlockingQueue(int initialCapacity) {
this(initialCapacity, null);
}
- DelayQueue
DelayQueue是一个延迟阻塞队列,构造函数为:
public DelayQueue() {}
public DelayQueue(Collection<? extends E> c) {
this.addAll(c);
}
- 阻塞队列并发安全的原理是什么?
使用两个lock来保证的并发安全,但是在某些情况下不能完全保证并发是安全的
ArrayBlockingQueue
/** Main lock guarding all access */
final ReentrantLock lock;
/** Condition for waiting takes */
private final Condition notEmpty;
/** Condition for waiting puts */
private final Condition notFull;
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}
通过共用同一个lock来锁住队列后,用首尾两个Condition来分别使线程暂定和唤醒线程;
ConcurrentLinkedQueue
public boolean offer(E e) {
checkNotNull(e);
final Node<E> newNode = new Node<E>(e);
for (Node<E> t = tail, p = t;;) {
Node<E> q = p.next;
if (q == null) {
// p is last node
if (p.casNext(null, newNode)) {
// Successful CAS is the linearization point
// for e to become an element of this queue,
// and for newNode to become "live".
if (p != t) // hop two nodes at a time
casTail(t, newNode); // Failure is OK.
return true;
}
// Lost CAS race to another thread; re-read next
}
else if (p == q)
// We have fallen off list. If tail is unchanged, it
// will also be off-list, in which case we need to
// jump to head, from which all live nodes are always
// reachable. Else the new tail is a better bet.
p = (t != (t = tail)) ? t : head;
else
// Check for tail updates after two hops.
p = (p != t && t != (t = tail)) ? t : q;
}
}
可以从方法中看出offer方法就是不停的重试加上CAS机制保证了并发的安全性,这是乐观锁的一种思想。适用用线程竞争不激烈的场景。
如何选择阻塞队列
如何选择阻塞队列可以中线程池来进行分析
- LinkedBlockingQueue
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
可以看出固定线程数的线程池采用的是无限长度的队列作为缓存池
- SynchronousQueue
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
最大线程数的线程池采用的是无容量的队列做为缓存池
DelayedWorkQueue
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
具有周期性执行的线程池采用的是延迟队列,从而实现周期性执行的功能
总结
因此选择队列也应该从业务的属性出发,在这几个方面进行取舍长度、特点、性能,从而选择最适合业务场景的队列。