Java基础-0x07:学习阻塞队列

JUC

阻塞队列

队列与阻塞队列

  • 阻塞队列,顾名思义 首先它是一个队列,而一个阻塞队列在数据结构中起到的作用大致如图所示好:

    s

  • 当阻塞队列是时,从队列中获取元素的操作会被阻塞。(消费者)

  • 当阻塞队列是时,向队列中添加元素的操作会被阻塞。(生产者)

在多线程领域,所谓阻塞,在某些情况下会挂起线程(即阻塞),一旦条件满足,被挂起的线程又会被自动唤醒。

  • 为什么需要BlockingQueue?

    好处是不需要关心什么时候需要阻塞线程,什么时候需要唤醒线程,因为这写事BlockingQueue都做了

    concurrent包发布以前,多线程环境下,开发者都需要自己控制这些细节,还需要兼顾效率和线程安全。而这会给我们的程序带来更多的复杂度。

阻塞队列接口基础架构

屏幕快照 2020-05-29 15.45.21

阻塞队列接口也是集合类接口的子接口。

其实现类有一下几个:

  • ArrayBlockingQueue ==> 由数组结构组成的有届阻塞队列
  • LinkedBlockingQueue ==> 由链表结构组成的有届(但大小默认值为Integer.MAX_VALUE)阻塞队列
  • PriorityBlockingQueue ==> 支持优先级排序的无届阻塞队列
  • DelayQueue ==> 使用优先级队列实现的延迟无届阻塞队列
  • SynchronousQueue ==> 不存储元素的阻塞队列,也即单个元素的队列
  • LinkedTransferQueue ==> 由链表结构组成的无届阻塞队列
  • LinkedBlockingDeque ==> 由链表结构组成的双向阻塞队列

阻塞队列主要方法的概括:

Throws exception Special value Blocks Times out
Insert add(e) offer(e) put(e) offer(e, time, unit)
Remove remove() poll() take() poll(time, unit)
Examine element() peek() not applicable not applicable

BlockingQueue不接受空元素。实现类会在尝试添加、放置或提供null时抛出NullPointerException。空值用作标记值,以指示轮询操作失败。

BlockingQueue可能受容量限制。在任何给定时间,它可能具有剩余容量,超过该容量就不能放置其他元素而不会阻塞。没有任何内部容量约束的BlockingQueue始终报告Integer.MAX_VALUE的剩余容量。

BlockingQueue实现被设计为主要用于生产者-消费者队列,但另外还支持Collection接口。因此,例如,可以使用remove(x)从队列中删除任意元素。但是,这样的操作通常不能非常有效地执行,并且仅用于偶尔的使用,例如当取消排队的消息时。

BlockingQueue实现是线程安全的。所有排队方法都是使用内部锁或其他形式的并发控制来原子地实现其效果的。但是,除非在实现中另外指定,否则批量Collection操作addAll,containsAll,retainAll和removeAll不一定是原子执行的。因此,例如,仅在c中添加一些元素之后,addAll(c)可能会失败(引发异常)。

BlockingQueue本质上不支持任何类型的“close”或“shutdown”操作,以指示将不再添加任何项目。这些功能的需求和使用往往取决于实现。例如,一种常见的策略是,生产者插入特殊的流尾对象或有毒对象,当消费者采取这种方法时会对其进行相应的解释。

使用示例,基于典型的生产者-消费者方案。请注意,BlockingQueue可以安全地与多个生产者和多个消费者一起使用。

SynchronousQueue理论:

与其他的阻塞队列不同,SynchronousQueue是一个不存储元素的阻塞队列

每一个put操作必须等待一个take操作,反过来也是一样。

SynchronousQueueDemo:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class BlockingQueueDemo {
public static void main(String[] args) throws InterruptedException {
BlockingQueue<String> queue = new SynchronousQueue<>();
new Thread(()->{
try {
System.out.println(Thread.currentThread().getName()+"\t put 1");
queue.put("1");
System.out.println(Thread.currentThread().getName()+"\t put 2");
queue.put("2");
System.out.println(Thread.currentThread().getName()+"\t put 3");
queue.put("3");

} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();

new Thread(()->{
try {
TimeUnit.SECONDS.sleep(3);
System.out.println(queue.take());
TimeUnit.SECONDS.sleep(3);
System.out.println(queue.take());
TimeUnit.SECONDS.sleep(3);
System.out.println(queue.take());

} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
}

运行结果:

1
2
3
4
5
1
Thread-0 put 2
2
Thread-0 put 3
3

放一个,取一个,下一个才能放进来。