博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
经典并发问题:生产者-消费者
阅读量:6203 次
发布时间:2019-06-21

本文共 7802 字,大约阅读时间需要 26 分钟。

前置知识

此处以Java描述该问题,需要Java并发的知识,这里附上一些不错的教程:

Youtube上的一个教程,非常实战,能快速找到感觉。

Oracle的并发官方教程,很全面,但不深入。

国人的Java并发系列教程,很全面,但有地方深入到源码,有的浅停在使用方法。

免费的翻译书,没有看,据说不错。

问题描述

生产者-消费者(Producer-Consumer Problem)以下简称为PC问题。其描述以下问题:

  • 多个生产者生产产品,多个消费者消费产品,两者间有一个大小固定的缓冲区;
  • 生产者、消费者不可同时访问缓冲区;
  • 生产者不可向满缓冲区放产品;
  • 消费者不可从空缓冲区取产品。

信号量解法

public class PCSemaphore {    private final static int BUFFER_SIZE = 10;    public static void main(String[] args) {        Semaphore mutex = new Semaphore(1);        Semaphore full = new Semaphore(0);        Semaphore empty = new Semaphore(BUFFER_SIZE);        Queue
buffer = new LinkedList<>(); Producer producer = new Producer(mutex, empty, full, buffer); Consumer consumer = new Consumer(mutex, empty, full, buffer); // 可以初始化多个生产者、消费者 new Thread(producer, "p1").start(); new Thread(producer, "p2").start(); new Thread(consumer, "c1").start(); new Thread(consumer, "c2").start(); new Thread(consumer, "c3").start(); }}class Producer implements Runnable { private Semaphore mutex, empty, full; private Queue
buffer; private Integer counter = 0; public Producer(Semaphore mutex, Semaphore empty, Semaphore full, Queue
buffer) { this.mutex = mutex; this.empty = empty; this.full = full; this.buffer = buffer; } @Override public void run() { while (true) { try { empty.acquire(); mutex.acquire(); } catch (InterruptedException e) { e.printStackTrace(); } buffer.offer(counter++); try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } full.release(); mutex.release(); } }}class Consumer implements Runnable { private Semaphore mutex, empty, full; private Queue
buffer; public Consumer(Semaphore mutex, Semaphore empty, Semaphore full, Queue
buffer) { this.mutex = mutex; this.empty = empty; this.full = full; this.buffer = buffer; } @Override public void run() { String threadName = Thread.currentThread().getName(); while (true) { try { full.acquire(); mutex.acquire(); } catch (InterruptedException e) { e.printStackTrace(); } Integer product = buffer.poll(); int left = buffer.size(); System.out.printf("%s consumed %d left %d%n", threadName, product, left); try { Thread.sleep(200); } catch (InterruptedException e) { e.printStackTrace(); } empty.release(); mutex.release(); } }}

教科书的解法,问题有三:

  1. 不可交换消费者中的full和mutex信号量的请求顺序,若我们交换他们则会有:

    image_1camh79sc162efh2a42ir2eq19.png-20.5kB
    同样地不可交换生产者中的empty和mutex信号量的请求顺序。
    即必须遵守先资源信号量,再互斥信号量的请求顺序。
    教科书中使用AND型信号量解决该问题,但这对程序员来说算是一个Dirty Solution,为什么呢,见问题2。

  2. 将进程控制部分和业务逻辑放在一起,这种代码看着混乱不堪。

  3. 效率低,Java中是不会用信号量来实现这东西,信号量在Java中常用来限制对一个共享资源的最大并发访问数。

注意,因为这里写的写的是一个小示例,所以我们没有考虑产品的生产、使用和入队、出队(放入、拿出缓存区的过程)。但实际中我们只需要将产品的入队和出队进行互斥即可,产品的生产和使用可并发执行,甚至在每个生产者、消费者内部可建立单独的缓冲区,暂存生产出来但还不能放到公共缓冲区的产品,直到可以放入公共缓冲区。

wait() & notify()

public class PCWaitNotify {    public final static int BUFFER_SIZE = 10;    public static void main(String[] args) {        Object mutex = new Object();        AtomicInteger counter = new AtomicInteger(0);        Queue
buffer = new LinkedList<>(); Producer producer = new Producer(buffer, counter, mutex); Consumer consumer = new Consumer(buffer, mutex); new Thread(producer).start(); new Thread(producer).start(); new Thread(consumer).start(); }}class Producer implements Runnable { private Random rand = new Random(); private Queue
buffer; private AtomicInteger counter; // 支持原子操作的基本类型包装类 private Object mutex; public Producer(Queue
buffer, AtomicInteger counter, Object mutex) { this.buffer = buffer; this.counter = counter; this.mutex = mutex; } @Override public void run() { while (true) { synchronized (mutex) { while (buffer.size() == PCWaitNotify.BUFFER_SIZE) { try { mutex.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } buffer.offer(counter.incrementAndGet()); mutex.notify(); } try { Thread.sleep(rand.nextInt(800)); } catch (InterruptedException e) { e.printStackTrace(); } } }}class Consumer implements Runnable { private Random rand = new Random(); private Queue
buffer; private Object mutex; public Consumer(Queue
buffer, Object mutex) { this.buffer = buffer; this.mutex = mutex; } @Override public void run() { while (true) { synchronized (mutex) { while (buffer.size() == 0) { try { mutex.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println("consumed " + buffer.poll() + " left " + buffer.size()); mutex.notify(); } try { Thread.sleep(rand.nextInt(500)); } catch (InterruptedException e) { e.printStackTrace(); } } }}

以Java的底层并发API,wait()和notify()实现,效率虽高,但进程控制部分和业务逻辑同样混在一起,没有完全解决问题。

BlockingQueue

public class PCBlockingQueue {    private final static int BUFFER_SIZE = 10;    public static void main(String[] args) {        BlockingQueue
buffer = new LinkedBlockingQueue<>(BUFFER_SIZE); AtomicInteger counter = new AtomicInteger(0); Producer producer = new Producer(buffer, counter); Consumer consumer = new Consumer(buffer); new Thread(producer).start(); new Thread(producer).start(); new Thread(consumer).start(); }}class Producer implements Runnable { private Random rand = new Random(); private AtomicInteger counter; private BlockingQueue
buffer; public Producer(BlockingQueue
buffer, AtomicInteger counter) { this.buffer = buffer; this.counter = counter; } @Override public void run() { while (true) { try { Thread.sleep(rand.nextInt(800)); Integer product = counter.incrementAndGet(); buffer.put(product); } catch (InterruptedException e) { e.printStackTrace(); } } }}class Consumer implements Runnable { private Random rand = new Random(); private BlockingQueue
buffer; public Consumer(BlockingQueue
buffer) { this.buffer = buffer; } @Override public void run() { while (true) { try { Thread.sleep(rand.nextInt(600)); Integer product = buffer.take(); // 队列空时,会阻塞,直到有新元素,并将新元素返回。 System.out.println("consumed " + product + " left " + buffer.size()); } catch (InterruptedException e) { e.printStackTrace(); } } }}

你觉得像Java这样的企业级语言会不考虑到代码的功能分离问题吗(SRP原则)?Java中早就提供了一堆线程安全的数据结构,这里用了BlockingQueue,其他线程安全类参考java.util.concurrent包。

转载于:https://www.cnblogs.com/sequix/p/8776716.html

你可能感兴趣的文章
C# (类型、对象、线程栈和托管堆)在运行时的相互关系
查看>>
文件上传速度查询方法
查看>>
Linux基线合规检查中各文件的作用及配置脚本
查看>>
Nexus3.x.x上传第三方jar
查看>>
零元学Expression Blend 4 - Chapter 25 以Text相关功能就能简单做出具有设计感的登入画面...
查看>>
spring boot: spring Aware的目的是为了让Bean获得Spring容器的服务
查看>>
新购阿里云服务器ECS创建之后无法ssh连接的问题处理
查看>>
峰识别 峰面积计算 peak detection peak area 源代码 下载
查看>>
Django REST framework
查看>>
正则表达式基础恶补
查看>>
总结一下php5.2.16与apache2.0的C++扩展开发整个过程
查看>>
Follow That Page - web monitor: we send you an email when your favorite page has changed.
查看>>
C#类、接口、虚方法和抽象方法-抽象类与接口的区别与联系
查看>>
(转)C#中 DirectoryEntry组件应用实例
查看>>
ubuntu 12.04解决Broadcom STA无线网卡驱动安装失败解决
查看>>
解决SVN提交代码时的错误:“Could not execute PROPPATCH”
查看>>
C#操作XmlDocument对象 报缺少根节点 一一道来
查看>>
出了本练内功的书:《完美软件开发:方法与逻辑》
查看>>
object c 快速构建对象
查看>>
C链表反转(时间复杂度O(n))
查看>>