前置知识
此处以Java描述该问题,需要Java并发的知识,这里附上一些不错的教程:
Youtube上的一个教程,非常实战,能快速找到感觉。
Oracle的并发官方教程,很全面,但不深入。
国人的Java并发系列教程,很全面,但有地方深入到源码,有的浅停在使用方法。
免费的翻译书,没有看,据说不错。
问题描述
生产者-消费者(Producer-Consumer Problem)以下简称为PC问题。其描述以下问题:
- 多个生产者生产产品,多个消费者消费产品,两者间有一个大小固定的缓冲区;
- 生产者、消费者不可同时访问缓冲区;
- 生产者不可向满缓冲区放产品;
- 消费者不可从空缓冲区取产品。
信号量解法
public class PCSemaphore { private final static int BUFFER_SIZE = 10; public static void main(String[] args) { Semaphore mutex = new Semaphore(1); Semaphore full = new Semaphore(0); Semaphore empty = new Semaphore(BUFFER_SIZE); Queuebuffer = new LinkedList<>(); Producer producer = new Producer(mutex, empty, full, buffer); Consumer consumer = new Consumer(mutex, empty, full, buffer); // 可以初始化多个生产者、消费者 new Thread(producer, "p1").start(); new Thread(producer, "p2").start(); new Thread(consumer, "c1").start(); new Thread(consumer, "c2").start(); new Thread(consumer, "c3").start(); }}class Producer implements Runnable { private Semaphore mutex, empty, full; private Queue buffer; private Integer counter = 0; public Producer(Semaphore mutex, Semaphore empty, Semaphore full, Queue buffer) { this.mutex = mutex; this.empty = empty; this.full = full; this.buffer = buffer; } @Override public void run() { while (true) { try { empty.acquire(); mutex.acquire(); } catch (InterruptedException e) { e.printStackTrace(); } buffer.offer(counter++); try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } full.release(); mutex.release(); } }}class Consumer implements Runnable { private Semaphore mutex, empty, full; private Queue buffer; public Consumer(Semaphore mutex, Semaphore empty, Semaphore full, Queue buffer) { this.mutex = mutex; this.empty = empty; this.full = full; this.buffer = buffer; } @Override public void run() { String threadName = Thread.currentThread().getName(); while (true) { try { full.acquire(); mutex.acquire(); } catch (InterruptedException e) { e.printStackTrace(); } Integer product = buffer.poll(); int left = buffer.size(); System.out.printf("%s consumed %d left %d%n", threadName, product, left); try { Thread.sleep(200); } catch (InterruptedException e) { e.printStackTrace(); } empty.release(); mutex.release(); } }}
教科书的解法,问题有三:
不可交换消费者中的full和mutex信号量的请求顺序,若我们交换他们则会有:
同样地不可交换生产者中的empty和mutex信号量的请求顺序。 即必须遵守先资源信号量,再互斥信号量的请求顺序。 教科书中使用AND型信号量解决该问题,但这对程序员来说算是一个Dirty Solution,为什么呢,见问题2。将进程控制部分和业务逻辑放在一起,这种代码看着混乱不堪。
效率低,Java中是不会用信号量来实现这东西,信号量在Java中常用来限制对一个共享资源的最大并发访问数。
注意,因为这里写的写的是一个小示例,所以我们没有考虑产品的生产、使用和入队、出队(放入、拿出缓存区的过程)。但实际中我们只需要将产品的入队和出队进行互斥即可,产品的生产和使用可并发执行,甚至在每个生产者、消费者内部可建立单独的缓冲区,暂存生产出来但还不能放到公共缓冲区的产品,直到可以放入公共缓冲区。
wait() & notify()
public class PCWaitNotify { public final static int BUFFER_SIZE = 10; public static void main(String[] args) { Object mutex = new Object(); AtomicInteger counter = new AtomicInteger(0); Queuebuffer = new LinkedList<>(); Producer producer = new Producer(buffer, counter, mutex); Consumer consumer = new Consumer(buffer, mutex); new Thread(producer).start(); new Thread(producer).start(); new Thread(consumer).start(); }}class Producer implements Runnable { private Random rand = new Random(); private Queue buffer; private AtomicInteger counter; // 支持原子操作的基本类型包装类 private Object mutex; public Producer(Queue buffer, AtomicInteger counter, Object mutex) { this.buffer = buffer; this.counter = counter; this.mutex = mutex; } @Override public void run() { while (true) { synchronized (mutex) { while (buffer.size() == PCWaitNotify.BUFFER_SIZE) { try { mutex.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } buffer.offer(counter.incrementAndGet()); mutex.notify(); } try { Thread.sleep(rand.nextInt(800)); } catch (InterruptedException e) { e.printStackTrace(); } } }}class Consumer implements Runnable { private Random rand = new Random(); private Queue buffer; private Object mutex; public Consumer(Queue buffer, Object mutex) { this.buffer = buffer; this.mutex = mutex; } @Override public void run() { while (true) { synchronized (mutex) { while (buffer.size() == 0) { try { mutex.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println("consumed " + buffer.poll() + " left " + buffer.size()); mutex.notify(); } try { Thread.sleep(rand.nextInt(500)); } catch (InterruptedException e) { e.printStackTrace(); } } }}
以Java的底层并发API,wait()和notify()实现,效率虽高,但进程控制部分和业务逻辑同样混在一起,没有完全解决问题。
BlockingQueue
public class PCBlockingQueue { private final static int BUFFER_SIZE = 10; public static void main(String[] args) { BlockingQueuebuffer = new LinkedBlockingQueue<>(BUFFER_SIZE); AtomicInteger counter = new AtomicInteger(0); Producer producer = new Producer(buffer, counter); Consumer consumer = new Consumer(buffer); new Thread(producer).start(); new Thread(producer).start(); new Thread(consumer).start(); }}class Producer implements Runnable { private Random rand = new Random(); private AtomicInteger counter; private BlockingQueue buffer; public Producer(BlockingQueue buffer, AtomicInteger counter) { this.buffer = buffer; this.counter = counter; } @Override public void run() { while (true) { try { Thread.sleep(rand.nextInt(800)); Integer product = counter.incrementAndGet(); buffer.put(product); } catch (InterruptedException e) { e.printStackTrace(); } } }}class Consumer implements Runnable { private Random rand = new Random(); private BlockingQueue buffer; public Consumer(BlockingQueue buffer) { this.buffer = buffer; } @Override public void run() { while (true) { try { Thread.sleep(rand.nextInt(600)); Integer product = buffer.take(); // 队列空时,会阻塞,直到有新元素,并将新元素返回。 System.out.println("consumed " + product + " left " + buffer.size()); } catch (InterruptedException e) { e.printStackTrace(); } } }}
你觉得像Java这样的企业级语言会不考虑到代码的功能分离问题吗(SRP原则)?Java中早就提供了一堆线程安全的数据结构,这里用了BlockingQueue,其他线程安全类参考java.util.concurrent包。