阻塞队列

  |  

摘要: 本文是关于 Java 中的阻塞队列的要点总结。

【对算法,数学,计算机感兴趣的同学,欢迎关注我哈,阅读更多原创文章】
我的网站:潮汐朝夕的生活实验室
我的公众号:算法题刷刷
我的知乎:潮汐朝夕
我的github:FennelDumplings
我的leetcode:FennelDumplings


在文章 Java核心技术1-线程并发 中,我们介绍了线程并发的基础概念,以及使用线程执行任务的流程和代码模板。

在文章 Java核心技术1-线程同步 中,我们介绍了线程同步相关的概念,涉及到的内容比较多,包括如何使用锁、条件对象、synchronized 这三种方式解决竞争条件的问题,如何使用线程局部变量、锁测试与超时、可重入锁和读写锁这些机制,以及同步阻塞,监视器,Volatile 域,原子性,死锁的概念。

这两部分内容,基本上形成了 Java 并发程序设计基础的底层结构。但是实际编程中,应该尽可能原理底层结构,而是使用专业人士实现好的较高层次的结构,方便性和安全性都要更好。

用队列将线程问题形式化

许多线程问题,可以通过使用一个或多个队列以优雅且安全的方式将其形式化。生产者线程向队列插入元素,消费者线程则取出它们。使用队列,可以安全地从一个线程向另一个线程传递数据。

还是以银行转账程序为例,转账线程将转账指令对象插入一个队列中,而不是直接访问银行对象;另一个线程从队列中取出指令执行转账,只有该线程可以访问该银行对象的内部,因此不需要同步。线程安全的队列类的实现者需要考虑锁和条件,但是银行转账程序的实现者不需要考虑这个问题了。

阻塞的来源

当试图向队列添加元素而队列已满,或是想从队列移出元素而队列为空的时候,阻塞队列(blocking queue)导致线程阻塞。

工作者线程可以周期性地将中间结果存储在阻塞队列中。其他的工作者线程移出中间结果并进一步加以修改。队列会自动地平衡负载。如果第一个线程集运行得比第二个慢,第二个线程集在等待结果时会阻塞。

阻塞队列方法

阻塞队列的方法如下:

方法 正常动作 特殊情况的动作
add 添加一个元素 如果队列满,抛出 IllegalStateException 异常
element 返回队列的头元素 如果队列空,抛出 NoSuchElementException 异常
remove 移出并返回头元素 如果队列空,则抛出 NoSuchElementException
offer 添加一个元素并返回 true 如果队列满,返回 false
peek 返回队列的头元素 如果队列空,返回 null
poll 移出并返回头元素 如果队列空,则返回 null
put 添加一个元素 如果队列满,则阻塞
take 移出并返回头元素 如果队列空,则阻塞

阻塞队列方法分为以下三类,取决于当队列满或空时它们的响应方式。

  1. add, element, remove
  2. offer, peek, poll
  3. put, take
  • 如果将队列当作线程管理工具来使用,将要用到 put 和 take 方法。
  • 当试图向满的队列中添加或从空的队列中移出元素时,add、remove 和 element 操作抛出异常。
  • 在多线程程序中,队列会在任何时候空或满,因此要用 offer、poll 和 peek 方法代替 add、remove 和 element。
  • poll 和 peek 方法返回空来指示失败。因此,向这些队列中插入 null 值是非法的。
  • offer 和 poll 有带超时版本
    1
    2
    3
    4
    // 超时返回 false
    boolean success = q.offer(x, 100, TimeUnit.MILLISECONDS);
    // 超时返回 null
    Object head = q.poll(100, TimeUnit.MILLISECONDS);

阻塞队列的几个变种

java.util.concurrent 包中有阻塞队列的几个变种。

  • LinkedBlockingQueue: 容量无上限,但也可以指定最大容量。
  • LinkedBlockingDeque: LinkedBlockingQueue 的双端版本。
  • ArrayBlockingQueue: 需要指定容量,并指定是否需要公平性(等待了最长时间的线程优先处理)。
  • PriorityBlockingQueue: 没有容量上限,但若队列空,则取元素操作会阻塞。
  • DelayQueue: 包含的是实现 Delayed 接口的对象。getDelay 方法返回对象的残留延迟。负值表示延迟已经结束。元素只有在延迟用完的情况下才能从 DelayQueue 移除。还必须实现 compareTo 方法。DelayQueue 使用该方法对元素进行排序。
    1
    2
    3
    interface Delayed extends Comparable<Delayed> {
    long getDelay(TimeUnit unit);
    }
  • TransferQueue 接口: 允许生产者线程等待,直到消费者准备就绪可以接收一个元素。生产者调用 q.transfer(item); 会阻塞,直到另一个线程将元素 item 删除。LinkedTransferQueue 类实现了这个接口。

例子: 文件搜索

使用阻塞队列控制一组线程,在一个目录及其子目录下搜索所有文件,打印出包含指定关键字的行。

生产者线程枚举所有子目录下的所有文件并把它们放到一个阻塞队列中。

大量的搜索线程,从队列中取出一个文件,打开,打印所有包含关键字的行,然后取出下一个文件。

为了发出完成信号,生产者线程放置一个虚拟对象到队列中,当消费者线程取到这个虚拟对象时,将其放回并终止。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
import java.io.*;
import java.util.*;
import java.util.concurrent.*;

public class BlockingQueueTest {
private static final int FILE_QUEUE_SIZE = 10;
private static final int SEARCH_THREADS = 100;
private static final File DUMMY = new File("");
private static BlockingQueue<File> queue = new ArrayBlockingQueue<>(FILE_QUEUE_SIZE);

public static void main(String[] args) {
try (Scanner in = new Scanner(System.in)) {
System.out.print("Enter base directory (e.g. /opt/jdk1.8.0/src): ");
String directory = in.nextLine();
System.out.print("Enter keyword (e.g. volatile): ");
String keyword = in.nextLine();

Runnable enumerator = () -> {
try {
enumerate(new File(directory));
queue.put(DUMMY);
} catch (InterruptedException e) {
}
};

new Thread(enumerator).start();

for(int i = 0; i <= SEARCH_THREADS; i++) {
Runnable searcher = () -> {
try {
boolean done = false;
while(!done) {
File file = queue.take();
if(file == DUMMY) {
queue.put(file);
done = true;
} else {
search(file, keyword);
}
}
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
}
};
new Thread(searcher).start();
}
}
}

/**
* 在给定目录及其子目录下枚举所有文件
*/
public static void enumerate(File directory) throws InterruptedException {
File[] files = directory.listFiles();
for(File file: files) {
if(file.isDirectory()) {
enumerate(file);
} else {
queue.put(file);
}
}
}

/**
* 给定关键词搜索文件,打印所有匹配的行
*/
public static void search(File file, String keyword) throws IOException {
try (Scanner in = new Scanner(file, "UTF-8")) {
int lineNumber = 0;
while(in.hasNextLine()) {
lineNumber++;
String line = in.nextLine();
if(line.contains(keyword)) {
System.out.printf("%s:%d:%s%n", file.getPath(), lineNumber, line);
}
}
}
}
}

Share