生产者-消费者问题-BlockingQueue实现 | Word count: 1.5k | Reading time: 5min | Post View:
Java 工程实践里那些“小但重要”的选择 我整理这类内容主要是为了给团队 code review 找一份对照清单。Java 生态里很多“老写法”依然能跑,但已经不是最佳实践了,新人不知道就会继续传递。这篇文章覆盖的几个点是我反复在 PR 里指出的高频问题。
生产者-消费者问题-BlockingQueue实现
生产者-消费者(producer-consumer)问题,也称作有界缓冲区(bounded-buffer)问题,两个进程共享一个公共的固定大小的缓冲区。其中一个是生产者,用于将消息放入缓冲区;另外一个是消费者,用于从缓冲区中取出消息。问题出现在当缓冲区已经满了,而此时生产者还想向其中放入一个新的数据项的情形,其解决方法是让生产者此时进行休眠,等待消费者从缓冲区中取走了一个或者多个数据后再去唤醒它。同样地,当缓冲区已经空了,而消费者还想去取消息,此时也可以让消费者进行休眠,等待生产者放入一个或者多个数据时再唤醒它。
BlockingQueue
阻塞队列,顾名思义,首先它是一个队列,而一个队列在数据结构中所起的作用大致如下图所示: 从上图我们可以很清楚看到,通过一个共享的队列,可以使得数据由队列的一端输入,从另外一端输出; 常用的队列主要有以下两种:(当然通过不同的实现方式,还可以延伸出很多不同类型的队列,DelayQueue就是其中的一种) 先进先出(FIFO):先插入的队列的元素也最先出队列,类似于排队的功能。从某种程度上来说这种队列也体现了一种公平性。 后进先出(LIFO):后插入队列的元素最先出队列,这种队列优先处理最近发生的事件。
多线程环境中,通过队列可以很容易实现数据共享,比如经典的“生产者”和“消费者”模型中,通过队列可以很便利地实现两者之间的数据共享。假设我们有若干生产者线程,另外又有若干个消费者线程。如果生产者线程需要把准备好的数据共享给消费者线程,利用队列的方式来传递数据,就可以很方便地解决他们之间的数据共享问题。但如果生产者和消费者在某个时间段内,万一发生数据处理速度不匹配的情况呢?理想情况下,如果生产者产出数据的速度大于消费者消费的速度,并且当生产出来的数据累积到一定程度的时候,那么生产者必须暂停等待一下(阻塞生产者线程),以便等待消费者线程把累积的数据处理完毕,反之亦然。然而,在concurrent包发布以前,在多线程环境下,我们每个程序员都必须去自己控制这些细节,尤其还要兼顾效率和线程安全,而这会给我们的程序带来不小的复杂度。好在此时,强大的concurrent包横空出世了,而他也给我们带来了强大的BlockingQueue。(在多线程领域:所谓阻塞,在某些情况下会挂起线程(即阻塞),一旦条件满足,被挂起的线程又会自动被唤醒)
使用方法
放入数据: offer(anObject):表示如果可能的话,将anObject加到BlockingQueue里,即如果BlockingQueue可以容纳, 则返回true,否则返回false.(本方法不阻塞当前执行方法的线程) offer(E o, long timeout, TimeUnit unit),可以设定等待的时间,如果在指定的时间内,还不能往队列中 加入BlockingQueue,则返回失败。 put(anObject):把anObject加到BlockingQueue里,如果BlockQueue没有空间,则调用此方法的线程被阻断 直到BlockingQueue里面有空间再继续. 获取数据: poll(time):取走BlockingQueue里排在首位的对象,若不能立即取出,则可以等time参数规定的时间, 取不到时返回null; poll(long timeout, TimeUnit unit):从BlockingQueue取出一个队首的对象,如果在指定时间内, 队列一旦有数据可取,则立即返回队列中的数据。否则知道时间超时还没有数据可取,返回失败。 take():取走BlockingQueue里排在首位的对象,若BlockingQueue为空,阻断进入等待状态直到 BlockingQueue有新的数据被加入; drainTo():一次性从BlockingQueue获取所有可用的数据对象(还可以指定获取数据的个数), 通过该方法,可以提升获取数据效率;不需要多次分批加锁或释放锁。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 import java.util.concurrent.BlockingDeque;import java.util.concurrent.BlockingQueue;public class Producer extends Thread { private BlockingQueue<String> bq; public Producer (BlockingQueue<String> bq) { this .bq = bq; } @Override public void run () { try { String string =Thread.currentThread().getName() + "'s product." ; System.out.println(Thread.currentThread().getName() + ": I have made a product:" ); bq.put(string); }catch (InterruptedException e){ e.printStackTrace(); } } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 import java.util.concurrent.BlockingDeque;import java.util.concurrent.BlockingQueue;public class Consumer extends Thread { private BlockingQueue<String> bq; public Consumer (BlockingQueue<String> bq) { this .bq = bq; } @Override public void run () { try { String string = bq.take(); System.out.println("准备消费:" + string); } catch (Exception ex) { ex.printStackTrace(); } } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 import java.util.concurrent.BlockingDeque;import java.util.concurrent.LinkedBlockingDeque;public class BlockingQueueTest { public static void main (String[] args) { BlockingDeque<String> bq = new LinkedBlockingDeque<>(2 ); Consumer consumer = new Consumer(bq); Producer producer = new Producer(bq); for (int i=0 ;i < 5 ;i++){ new Thread(producer,"producer" + (i+1 )).start(); new Thread(consumer,"consumer" + (i+1 )).start(); } } }
1 2 3 4 5 6 7 8 9 10 11 //输出 producer1: I have made a product: producer2: I have made a product: 准备消费:producer1's product. 准备消费:producer2's product. producer3: I have made a product: producer4: I have made a product: 准备消费:producer3's product. 准备消费:producer4's product. producer5: I have made a product: 准备消费:producer5's product.
常见踩坑 code review 中反复指出的问题:用 == 比较字符串;SimpleDateFormat 作为静态字段在多线程下使用;HashMap 在多线程下使用导致死循环(JDK7)或数据丢失;Stream 上重复使用导致 IllegalStateException;时区处理混乱导致跨时区业务出错。这些都不是新问题,但每个新人都会再踩一遍。