阻塞队列BlockingQueue扩展了Queue、Collection接口,对元素的插入和提取使用了“阻塞”处理,我们知道Collection下的实现类一般都采用了长度自行管理方式(也就是变长),比如这样的代码是可以正常运行的:
public static void main(Stringargs){
//定义初始长度为5
List<String>list=new ArrayList<String>(5);
//加入10个元素
for(int i=0;i<10;i++){
list.add(/"/");
}
}
上面的代码定义了列表的初始长度为5,在实际使用时,当加入的元素超过初始容量时,ArrayList会自行扩容,确保能够正常加入元素。那BlockingQueue也是集合,也实现了Collection接口,它的容量是否会自行管理呢?我们来看代码:
public static void main(Stringargs)throws Exception{
//定义初始长度为5
BlockingQueue<String>bq=new ArrayBlockingQueue<String>(5);
//加入10个元素
for(int i=0;i<10;i++){
bq.add(/"/");
}
}
BlockingQueue能够自行扩容吗?答案是不能,运行结果如下:
Exception in thread/"main/"java.lang.IllegalStateException:Queue full
at java.util.AbstractQueue.add(AbstractQueue.java:71)
at java.util.concurrent.ArrayBlockingQueue.add(ArrayBlockingQueue.java:209)
at Client.main(Client.java:12)
没错,报队列已满异常,这是阻塞队列和非阻塞队列一个重要区别:阻塞队列的容量是固定的,非阻塞队列则是变长的。阻塞队列可以在声明时指定队列的容量,若指定的容量,则元素的数量不可超过该容量,若不指定,队列的容量为Integer的最大值。
阻塞队列和非阻塞队列有此区别的原因是阻塞队列是为了容纳(或排序)多线程任务而存在的,其服务的对象是多线程应用,而非阻塞队列容纳的则是普通的数据元素。我们来看一下ArrayBlockingQueue类最常用的add方法。
public class ArrayBlockingQueue<E>extends AbstractQueue<E>
implements BlockingQueue<E>,java.io.Serializable{
//容纳元素的数组
private final Eitems;
//元素数量计数器
private int count;
public boolean add(E e){
//调用offer方法尝试写入
if(offer(e))
return true;
else
//写入失败,队列已满
throw new IllegalStateException(/"Queue full/");
}
public boolean offer(E e){
final ReentrantLock lock=this.lock;
//申请锁,只允许同时有一个线程操作
lock.lock();
try{
//元素计数器的计数与数组长度相同,表示队列已满
if(count==items.length)
return false;
else{//队列未满,插入元素
insert(e);
return true;
}
}finally{
//释放锁
lock.unlock();
}
}
}
上面在加入元素时,如果判断出当前队列已满,则返回false,表示插入失败,之后再包装成队列满异常。此处需要注意offer方法,如果我们直接调用offer方法插入元素,在超出容量的情况下,它除了返回false外,不会提供任何其他信息,如果我们的代码不做插入判断,那就会造成数据的“默默”丢失,这就是它与非阻塞队列的不同之处。
阻塞队列的这种机制对异步计算是非常有帮助的,例如我们定义深度为100的阻塞队列容纳100个任务,多个线程从该队列中获取任务并处理,当所有的线程都在繁忙,并且队列中任务数量已经为100时,也预示着系统运算压力非常巨大,而且处理结果的时间也会比较长,于是在第101个任务期望加入时,队列拒绝加入,而且返回异常,由系统自行处理,避免了异步运算的不可知性。但是如果应用期望无论等待多长时间都要运行该任务,不希望返回异常,那该怎么处理呢?
此时就需要用BlockingQueue接口定义的put方法了,它的作用也是把元素加入到队列中,但它与add、offer方法不同,它会等待队列空出元素,再让自己加入进去,通俗地讲,put方法提供的是一种“无赖”式的插入,无论等待多长时间都要把该元素插入到队列中,它的实现代码如下:
public void put(E e)throws InterruptedException{
//容纳元素的数组
final Eitems=this.items;
final ReentrantLock lock=this.lock;
//可中断锁
lock.lockInterruptibly();
try{
try{
//队列满,等待其他线程移除元素
while(count==items.length)
notFull.await();
}catch(InterruptedException ie){
//被中断了,唤醒其他线程
notFull.signal();
throw ie;
}
//插入元素
insert(e);
}finally{
//释放锁
lock.unlock();
}
}
put方法的目的就是确保元素肯定会加入到队列中,问题是此种等待是一个循环,会不停地消耗系统资源,当等待加入的元素数量较多时势必会对系统性能产生影响,那该如何解决呢?JDK已经想到了这个问题,它提供了带有超时时间的offer方法,其实现方法与put比较类似,只是使用Condition的awaitNanos方法来判断当前线程已经等待了多少纳秒,超时则返回false。
与插入元素相对应,取出元素也有不同的实现,例如remove、poll、take等方法,对于此类方法的理解要建立在阻塞队列的长度固定的基础上,然后根据是否阻塞、阻塞是否超时等实际情况选用不同的插入和提取方法。
注意 阻塞队列的长度是固定的。