【并发编程】聊聊阻塞队列那些事(一)
共 11419字,需浏览 23分钟
·
2022-07-13 00:27
在线程池的手动创建一文中 ,我们介绍了通过 ThreadPoolExecutor创建线程池,其中有一个参数是BlockingQueue< Runnable> workQueue,这个参数的数据类型就是我们今天要介绍的阻塞队列, 其作用是当线程池的核心线程都在执行任务时,此时再有任务提交时用来存放任务。
阻塞队列简介
队列
在介绍阻塞队列之前,我们先来简单介绍下队列这种数据结构。
队列是一种先进先出的线性表,它只允许在表的前端(front)进行删除操作,而在表的后端(rear)进行插入操作,进行插入操作的端称为队尾,进行删除操作的端称为队头
阻塞队列
阻塞队列(BlockingQueue)是一种特殊的队列,其特殊之处在于:在队列为空时,获取元素的线程会等待队列变为非空。当队列满时,存储元素的线程会等待队列可用。Java中,阻塞队列它是一个继承了 Queue 接口的接口,也可以证明阻塞队列就是一种特殊队列。
public interface BlockingQueue<E> extends Queue<E>{
...
}
阻塞队列常用于生产者和消费者的场景,我们先简单了解下生产者-消费者模型:两个线程操作一个队列,一个线程往队列中插入数据(生产线程生产数据);一个线程往队列中取出数据(消费线程消费数据)。
生产者的生产速度和消费者的消费之间的速度可能不匹配,就可以通过阻塞队列让速度快的暂时阻塞, 如生产者每秒生产两个数据,而消费者每秒消费一个数据,当队列已满时,生产者就会阻塞(挂起),等待消费者消费后,再进行唤醒。
阻塞队列特点
阻塞队列线程安全
阻塞队列是线程安全的,我们在程序中使用阻塞队列不需要自己去考虑更多的线程安全问题。降低了我们开发的难度和工作量。
比如在生产者-消费者模式使用阻塞队列的时候,因为阻塞队列是线程安全的,所以生产者和消费者即便 是多线程环境,自己不需要去考虑更多的线程安全问题,也不会发生线程安全问题。如下图, 左侧有两个生产者线程,它们会将生产出来的结果放到中间的阻塞队列中,而右侧的两个消费者只需直接从阻塞队列中取出结果进行处理即可。
阻塞队列种类
下图展示了Queue 最主要的实现类,可以看出阻塞队列主要 有6种实现类,分别是 ArrayBlockingQueue、LinkedBlockingQueue、SynchronousQueue、DelayQueue、PriorityBlockingQueue 和 LinkedTransferQueue,将在下一篇文章再介绍它们各自的特点 。
阻塞队列的容量
阻塞队列分为有界和无界两种。LinkedBlockingQueue 的上限是 Integer.MAX_VALUE,是非常大的一个数,可以近似认为是无限容量的 队列,FixedThreadPool类型线程池的实现就是用一个LinkedBlockingQueue来存放任务的。
有界阻塞队列典型的代表是ArrayBlockingQueue,使用这种阻塞队列 ,一旦队列容量满了,就无法再往队列里放数据了。
阻塞队列常用方法
阻塞队列中有三组和添加、删除相关的方法,这三组方法比较相似,我们需要对这些方法进行梳理。这三组方法是:
• 1.抛出异常:add、remove、element
• 2.返回结果但不抛出异常:offer、poll、peek
• 3.阻塞:put、take
add、remove、element
• 1.add:往队列里添加一个元素,如果队列满了,就会抛出异常来提示队列已满;
• 2.remove:删除元素,如果我们删除的队列是空的,remove 方法就会抛出异常;
• 3.element:返回队列的头部节点,但是并不删除,如果队列为空,element就会抛出异常。
三个方法示例代码如下:
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
//add方法示例
public class AddExample {
public static void main(String[] args) {
BlockingQueue<String> blockingQueue =
new ArrayBlockingQueue<String>(2);
blockingQueue.add("1");
blockingQueue.add("2");
System.out.println("blockingQueue size:" + blockingQueue.size());
blockingQueue.add("3");
}
}
******************【运行结果】******************
blockingQueue size:2
Exception in thread "main" java.lang.IllegalStateException: Queue full
at java.base/java.util.AbstractQueue.add(AbstractQueue.java:98)
at java.base/java.util.concurrent.ArrayBlockingQueue.add(ArrayBlockingQueue.java:326)
at thread.blockingqueue.AddExample.main(AddExample.java:13)
在这段程序中,我们创建了一个容量为 2 的 BlockingQueue,通过add方法放入元素, 前2个能正常放入 ,但是在添加第3个元素的时候就抛出了IllegalStateException:Queue full异常。
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
//remove方法示例
public class RemoveExample {
public static void main(String[] args) {
BlockingQueue<String> blockingQueue =
new ArrayBlockingQueue<String>(1);
blockingQueue.add("1");
System.out.println("remove element:" + blockingQueue.remove());
blockingQueue.remove();
}
}
******************【运行结果】******************
remove element:1
Exception in thread "main" java.util.NoSuchElementException
at java.base/java.util.AbstractQueue.remove(AbstractQueue.java:117)
at thread.blockingqueue.RemoveExample.main(RemoveExample.java:14)
这段程序中,我们创建了一个容量为1的BlockingQueue并往中间放入一个元素,然后通过remove方法执行删除操作,删除第一个元素正常,此时BlockingQueue为空,再执行删除操作,抛出NoSuchElementException异常。
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
//element示例
public class ElementExample {
public static void main(String[] args) {
BlockingQueue<String> blockingQueue =
new ArrayBlockingQueue<String>(1);
blockingQueue.element();
}
}
******************【运行结果】******************
Exception in thread "main" java.util.NoSuchElementException
at java.base/java.util.AbstractQueue.element(AbstractQueue.java:136)
at thread.blockingqueue.ElementExample.main(ElementExample.java:11)
这段程序中,我们新建了一个容量为 1 的 ArrayBlockingQueue,但是并没有往里面添加元素,也就是说ArrayBlockingQueue为空,我们调用 element 方法,也得到NoSuchElementException异常。
offer、poll、peek
这一组方法当发现队列满了无法添加,或者队列为空无法删除的时候,是通过返回值来提示我们,而不是像第一组方法那样抛出异常。
• 1.offer:用来插入一个元素,用返回值来提示插入是否成功。添加成功返回 true,队列已经满,调用 offer 方法返回false。
• 2.poll:移除并返回队列的头节点,如果当队列里面是空的返回 null 作为提示。
• 3.peek:返回队列的头元素但并不删除。如果队列里面是空的,它便会返回 null 作为提示。
三个方法示例代码如下:
//offer示例
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class OfferExample {
public static void main(String[] args) {
BlockingQueue<String> blockingQueue =
new ArrayBlockingQueue<String>(2);
boolean res1 = blockingQueue.offer("1");
boolean res2 = blockingQueue.offer("2");
boolean res3 = blockingQueue.offer("3");
System.out.println("res1:" + res1);
System.out.println("res2:" + res2);
System.out.println("res3:" + res3);
System.out.println("blockingQueue size:" + blockingQueue.size());
}
}
******************【运行结果】******************
res1:true
res2:true
res3:false
blockingQueue size:2
在这段程序中,我们创建了一个容量为 2 的 BlockingQueue,通过offer方法放入元素,前面两次添加成功了,返回true,但是第三次添加的时候,已经超过了队列的最大容量,所以会返回 false。
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
//poll示例
public class PollExample {
public static void main(String[] args) {
BlockingQueue<String> blockingQueue =
new ArrayBlockingQueue<String>(1);
blockingQueue.offer("1");
//不允许放入null,否则会抛出NullPointerException异常
//blockingQueue.offer(null);
System.out.println("poll element:" + blockingQueue.poll());
System.out.println("poll element:" + blockingQueue.poll());
}
}
******************【运行结果】******************
poll element:1
poll element:null
这段程序中,我们创建了一个容量为1的BlockingQueue并往中间放入一个元素,然后执行poll方法,正常执行将头结点返回并删除,此时BlockingQueue为空,再执行poll操作,返回null。注意,在使用这组方法时,队列中不允许插入null值,否则会抛出NullPointerException异常。
//peek示例
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class PeekExample {
public static void main(String[] args) {
BlockingQueue<String> blockingQueue =
new ArrayBlockingQueue<String>(1);
System.out.println(blockingQueue.peek());
}
}
******************【运行结果】******************null
我们新建了一个容量为 1 的 ArrayBlockingQueue,但是并没有往里面添加元素,然后直接调用 peek,返回结果 null。
put、take
• 1.put:插入元素。队列没满的时候是正常的插入,如果队列已满,插入的线程陷入阻塞状态,直到队列里有了空闲空间,此时队列就会让之前的线程解除阻塞状态,并把刚才那个元素添加进去。
• 2.take:获取并移除队列的头结点。队列里有数据的时候会正常取出数据并删除;队列里无数据,则阻塞,直到队列里有数据;一旦队列里有数据了,就会立刻解除阻塞状态,并且取到数据。
两个个方法示例代码如下:
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
//put示例
public class PutExample {
public static void main(String[] args) throws InterruptedException {
BlockingQueue<String> blockingQueue =
new ArrayBlockingQueue<String>(2);
blockingQueue.put("1");
blockingQueue.put("2");
System.out.println("【0】blockingQueue size:" + blockingQueue.size());
blockingQueue.put("3");
System.out.println("【1】blockingQueue size:" + blockingQueue.size());
}
}
******************【运行结果】******************
【0】blockingQueue size:2
//阻塞中
在这段程序中,我们创建了一个容量为 2 的 BlockingQueue,通过put方法添加完前2个元素,【0】处打印队列的元素个数是2,说明前面两个元素正常放入到队列了。当执行第三次put操作,则由于队列已满使得线程阻塞,后面的【1】处打印得不到执行 。
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
//take示例
public class TakeExample {
public static void main(String[] args) throws InterruptedException {
BlockingQueue<String> blockingQueue =
new ArrayBlockingQueue<String>(1);
blockingQueue.take();
System.out.println("测试能否执行到这一句");
}
}
******************【运行结果】******************//阻塞中
我们新建了一个容量为 1 的 ArrayBlockingQueue,但是并没有往里面添加元素,然后直接调用take,线程阻塞,打印语句得不到执行。
本文源码地址:
https://github.com/qinlizhong1/javaStudy/tree/master/javaExample/src/main/java/thread/blockingqueue
本文示例代码环境:
操作系统:macOs 12.1
JDK版本:12.0.1
maven版本: 3.8.4
— 完 —
欢迎关注↓↓↓ 如有帮助,辛苦点赞和在看