阻塞队列实现生产者-消费者模式

​ BlockingQueue接口主要用于解决生产者-消费者问题。 当从队列中获取或移除元素时,如果队列为空,需要等待,直到队列不空;同时如果向队列中添加元素时,如果队列无可用空间,也需要等待。

​ BlockingQueue的实现是线程安全的,如果你试图向队列中存入null将抛出异常。它的实现类包括ArrayBlockingQueue,LinkedBlockingQueue,PriorityBlockingQueue,DelayQueue :

  ArrayBlockingQueue:基于数组实现的一个阻塞队列,在创建ArrayBlockingQueue对象时必须制定容量大小。并且可以指定公平性与非公平性,默认情况下为非公平的,即不保证等待时间最长的队列最优先能够访问队列。

  LinkedBlockingQueue:基于链表实现的一个阻塞队列,在创建LinkedBlockingQueue对象时如果不指定容量大小,则默认大小为Integer.MAX_VALUE。

  PriorityBlockingQueue:以上2种队列都是先进先出队列,而PriorityBlockingQueue却不是,它会按照元素的优先级对元素进行排序,按照优先级顺序出队,每次出队的元素都是优先级最高的元素。注意,此阻塞队列为无界阻塞队列,即容量没有上限(通过源码就可以知道,它没有容器满的信号标志),前面2种都是有界队列。

  DelayQueue:基于PriorityQueue,一种延时阻塞队列,DelayQueue中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素。DelayQueue也是一个无界队列,因此往队列中插入数据的操作(生产者)永远不会被阻塞,而只有获取数据的操作(消费者)才会被阻塞。

几个重要方法:

· put(E e): 向队列中插入元素,如果队列已满,则需要等待

· E take(): 从队列头部获取并移除元素,如果队列为空则需要等待可用的元素

以下使用ArrayBlockingQueue来实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
public class Message {
private String msg;
public Message(String msg) {
this.msg = msg;
}
public String getMsg() {
return msg;
}
}
// 消费者
public class Consumer implements Runnable{
private BlockingQueue<Message> queue;
public Consumer(BlockingQueue q) {
this.queue = q;
}
@Override
public void run() {
try {
Message msg;
while (!(msg = queue.take()).getMsg().equals("exit")) {
Thread.sleep(100);
System.out.println("Consumed " + msg.getMsg());
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
// 生产者
public class Producer implements Runnable{
private BlockingQueue<Message> queue;
public Producer(BlockingQueue<Message> q) {
this.queue = q;
}
@Override
public void run() {
for (int i = 0; i < 100; i++) {
Message msg = new Message("" + i);
try {
Thread.sleep(i);
queue.put(msg);
System.out.println("produced " + msg.getMsg());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 添加退出
Message msg = new Message("exit");
try {
queue.put(msg);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public class ProducerConsumerService {
public static void main(String[] args) {
BlockingQueue<Message> queue = new ArrayBlockingQueue<>(10);
Producer producer = new Producer(queue);
Consumer consumer = new Consumer(queue);
new Thread(producer).start();
new Thread(consumer).start();
System.out.println("producer and consumer has been started");
}
}

参考文章:Java BlockingQueue Example

原创技术分享,您的支持将鼓励我继续创作