【并发编程】聊聊阻塞队列那些事(一)

码农有道公众号

共 11419字,需浏览 23分钟

 ·

2022-07-13 00:27

线程池的手动创建一文中 ,我们介绍了通过 ThreadPoolExecutor创建线程池,其中有一个参数是BlockingQueue< Runnable> workQueue,这个参数的数据类型就是我们今天要介绍的阻塞队列, 其作用是当线程池的核心线程都在执行任务时,此时再有任务提交时用来存放任务。

阻塞队列简介

队列

在介绍阻塞队列之前,我们先来简单介绍下队列这种数据结构。

队列是一种先进先出的线性表,它只允许在表的前端(front)进行删除操作,而在表的后端(rear)进行插入操作,进行插入操作的端称为队尾,进行删除操作的端称为队头

阻塞队列

阻塞队列(BlockingQueue)是一种特殊的队列,其特殊之处在于:在队列为空时,获取元素的线程会等待队列变为非空。当队列满时,存储元素的线程会等待队列可用。Java中,阻塞队列它是一个继承了 Queue 接口的接口,也可以证明阻塞队列就是一种特殊队列。

public interface BlockingQueue<E> extends Queue<E>{
...
}

阻塞队列常用于生产者和消费者的场景,我们先简单了解下生产者-消费者模型:两个线程操作一个队列,一个线程往队列中插入数据(生产线程生产数据);一个线程往队列中取出数据(消费线程消费数据)。

生产者的生产速度和消费者的消费之间的速度可能不匹配,就可以通过阻塞队列让速度快的暂时阻塞, 如生产者每秒生产两个数据,而消费者每秒消费一个数据,当队列已满时,生产者就会阻塞(挂起),等待消费者消费后,再进行唤醒。

阻塞队列特点

阻塞队列线程安全

阻塞队列是线程安全的,我们在程序中使用阻塞队列不需要自己去考虑更多的线程安全问题。降低了我们开发的难度和工作量

比如在生产者-消费者模式使用阻塞队列的时候,因为阻塞队列是线程安全的,所以生产者和消费者即便 是多线程环境,自己不需要去考虑更多的线程安全问题,也不会发生线程安全问题。如下图, 左侧有两个生产者线程,它们会将生产出来的结果放到中间的阻塞队列中,而右侧的两个消费者只需直接从阻塞队列中取出结果进行处理即可。

阻塞队列种类

下图展示了Queue 最主要的实现类,可以看出阻塞队列主要 有6种实现类,分别是 ArrayBlockingQueue、LinkedBlockingQueue、SynchronousQueue、DelayQueue、PriorityBlockingQueue 和 LinkedTransferQueue,将在下一篇文章再介绍它们各自的特点 。

阻塞队列的容量

阻塞队列分为有界无界两种。LinkedBlockingQueue 的上限是 Integer.MAX_VALUE,是非常大的一个数,可以近似认为是无限容量的 队列,FixedThreadPool类型线程池的实现就是用一个LinkedBlockingQueue来存放任务的。

有界阻塞队列典型的代表是ArrayBlockingQueue,使用这种阻塞队列 ,一旦队列容量满了,就无法再往队列里放数据了。

阻塞队列常用方法

阻塞队列中有三组和添加、删除相关的方法,这三组方法比较相似,我们需要对这些方法进行梳理。这三组方法是:

  • • 1.抛出异常:add、remove、element

  • • 2.返回结果但不抛出异常:offer、poll、peek

  • • 3.阻塞:put、take

add、remove、element

  • • 1.add:往队列里添加一个元素,如果队列满了,就会抛出异常来提示队列已满;

  • • 2.remove:删除元素,如果我们删除的队列是空的,remove 方法就会抛出异常;

  • • 3.element:返回队列的头部节点,但是并不删除,如果队列为空,element就会抛出异常。

三个方法示例代码如下:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

//add方法示例
public class AddExample {
    public static void main(String[] args) {
        BlockingQueue<String> blockingQueue = 
                new ArrayBlockingQueue<String>(2);
        
        blockingQueue.add("1");
        blockingQueue.add("2");
        System.out.println("blockingQueue  size:" + blockingQueue.size());

        blockingQueue.add("3");
    }
}

******************【运行结果】******************
blockingQueue  size:2
Exception in thread "main" java.lang.
IllegalStateException: Queue full
    at java.base/java.util.AbstractQueue.add(AbstractQueue.java:98)
    at java.base/java.util.concurrent.ArrayBlockingQueue.add(ArrayBlockingQueue.java:326)
    at thread.blockingqueue.AddExample.main(AddExample.java:13)

在这段程序中,我们创建了一个容量为 2 的 BlockingQueue,通过add方法放入元素, 前2个能正常放入 ,但是在添加第3个元素的时候就抛出了IllegalStateException:Queue full异常。

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

//remove方法示例
public class RemoveExample {
    public static void main(String[] args) {
        BlockingQueue<String> blockingQueue =
                new ArrayBlockingQueue<String>(1);
        blockingQueue.add("1");
        System.out.println("remove element:" + blockingQueue.remove());

        blockingQueue.remove();
    }
}

******************【运行结果】******************
remove element:1
Exception in thread "main" java.util.
NoSuchElementException
    at java.base/java.util.AbstractQueue.remove(AbstractQueue.java:117)
    at thread.blockingqueue.RemoveExample.main(RemoveExample.java:14)

这段程序中,我们创建了一个容量为1的BlockingQueue并往中间放入一个元素,然后通过remove方法执行删除操作,删除第一个元素正常,此时BlockingQueue为空,再执行删除操作,抛出NoSuchElementException异常。

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

//element示例
public class ElementExample {
    public static void main(String[] args) {
        BlockingQueue<String> blockingQueue =
                new ArrayBlockingQueue<String>(1);
        blockingQueue.element();
    }
}

******************【运行结果】******************
Exception in thread "main" java.util.
NoSuchElementException
    at java.base/java.util.AbstractQueue.element(AbstractQueue.java:136)
    at thread.blockingqueue.ElementExample.main(ElementExample.java:11)

这段程序中,我们新建了一个容量为 1 的 ArrayBlockingQueue,但是并没有往里面添加元素,也就是说ArrayBlockingQueue为空,我们调用 element 方法,也得到NoSuchElementException异常。

offer、poll、peek

这一组方法当发现队列满了无法添加,或者队列为空无法删除的时候,是通过返回值来提示我们,而不是像第一组方法那样抛出异常。

  • • 1.offer:用来插入一个元素,用返回值来提示插入是否成功。添加成功返回 true,队列已经满,调用 offer 方法返回false。

  • • 2.poll:移除并返回队列的头节点,如果当队列里面是空的返回 null 作为提示。

  • • 3.peek:返回队列的头元素但并不删除。如果队列里面是空的,它便会返回 null 作为提示。

三个方法示例代码如下:

//offer示例
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class OfferExample {
    public static void main(String[] args) {
        BlockingQueue<String> blockingQueue =
                new ArrayBlockingQueue<String>(2);

        boolean res1 = blockingQueue.offer("1");
        boolean res2 = blockingQueue.offer("2");
        boolean res3 = blockingQueue.offer("3");

        System.out.println("res1:" + res1);
        System.out.println("res2:" + res2);
        System.out.println("res3:" + res3);
        System.out.println("blockingQueue  size:" + blockingQueue.size());
    }
}

******************【运行结果】******************
res1:true
res2:true
res3:
false
blockingQueue  size:2

在这段程序中,我们创建了一个容量为 2 的 BlockingQueue,通过offer方法放入元素,前面两次添加成功了,返回true,但是第三次添加的时候,已经超过了队列的最大容量,所以会返回 false

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

//poll示例
public class PollExample {
    public static void main(String[] args) {
        BlockingQueue<String> blockingQueue =
                new ArrayBlockingQueue<String>(1);
        blockingQueue.offer("1");

        //不允许放入null,否则会抛出NullPointerException异常
        //blockingQueue.offer(null);
        System.out.println("poll element:" + blockingQueue.poll());
        System.out.println("poll element:" + blockingQueue.poll());
    }
}

******************【运行结果】******************
poll element:1
poll element:
null

这段程序中,我们创建了一个容量为1的BlockingQueue并往中间放入一个元素,然后执行poll方法,正常执行将头结点返回并删除,此时BlockingQueue为空,再执行poll操作,返回null注意,在使用这组方法时,队列中不允许插入null值,否则会抛出NullPointerException异常。

//peek示例
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class PeekExample {
    public static void main(String[] args) {
        BlockingQueue<String> blockingQueue =
                new ArrayBlockingQueue<String>(1);

        System.out.println(blockingQueue.peek());
    }
}

******************【运行结果】******************
null

我们新建了一个容量为 1 的 ArrayBlockingQueue,但是并没有往里面添加元素,然后直接调用 peek,返回结果 null。

put、take

  • • 1.put:插入元素。队列没满的时候是正常的插入,如果队列已满,插入的线程陷入阻塞状态,直到队列里有了空闲空间,此时队列就会让之前的线程解除阻塞状态,并把刚才那个元素添加进去。

  • • 2.take:获取并移除队列的头结点。队列里有数据的时候会正常取出数据并删除;队列里无数据,则阻塞,直到队列里有数据;一旦队列里有数据了,就会立刻解除阻塞状态,并且取到数据。

两个个方法示例代码如下:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

//put示例
public class PutExample {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> blockingQueue =
                new ArrayBlockingQueue<String>(2);

        blockingQueue.put("1");
        blockingQueue.put("2");
        System.out.println("【0】blockingQueue  size:" + blockingQueue.size());
        blockingQueue.put("3");

        System.out.println("【1】blockingQueue  size:" + blockingQueue.size());
    }
}

******************【运行结果】******************

0】blockingQueue  size:2

//阻塞中 

在这段程序中,我们创建了一个容量为 2 的 BlockingQueue,通过put方法添加完前2个元素,【0】处打印队列的元素个数是2,说明前面两个元素正常放入到队列了。当执行第三次put操作,则由于队列已满使得线程阻塞,后面的【1】处打印得不到执行 。

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

//take示例
public class TakeExample {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> blockingQueue =
                new ArrayBlockingQueue<String>(1);

        blockingQueue.take();
        System.out.println("测试能否执行到这一句");
    }
}

******************【运行结果】******************
//阻塞中 

我们新建了一个容量为 1 的 ArrayBlockingQueue,但是并没有往里面添加元素,然后直接调用take,线程阻塞,打印语句得不到执行。

本文源码地址:
https://github.com/qinlizhong1/javaStudy/tree/master/javaExample/src/main/java/thread/blockingqueue

本文示例代码环境:
操作系统:macOs 12.1
JDK版本:12.0.1
maven版本: 3.8.4

—  —

欢迎关注↓↓↓
如有帮助,辛苦点赞和在看
浏览 47
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报