生产者消费者模型

生产者和消费者问题是线程模型中的经典问题:生产者和消费者在同一时间段内共用同一个存储空间,生产者往存储空间中添加产品,消费者从存储空间中取走产品,当存储空间为空时,消费者阻塞,当存储空间满时,生产者阻塞。在多线程的世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题于是引入了生产者和消费者模式。

mark

线程状态与锁的回顾

在看生产者与消费者模型之前先回顾一下线程的状态转换与synchronized的几种程度的锁:

mark

偏向锁:获取偏向锁锁时,测试一下对象头的Mark Word里是否存储了当前线程ID,测试成功,则线程获得了锁;测试失败,则需要再测试一下Mark Word中偏向锁的标识是否设置成1(表示当前是偏向锁):如果没有设置,则使用CAS竞争锁;如果设置了,则尝试使用CAS将对象头的偏向锁指向当前线程。等到竞争出现才释放锁,也就意味着一旦出现其他线程来竞争锁,偏向锁才会撤销(参考《二、synchronized底层实现与优化》中的图示),恢复到无锁或者标记该对象不适合作为偏向锁!

轻量级锁:在栈中开辟空间复制Java对象头,尝试CAS修改对象头的MarkWord为指向自己在栈中开辟空间的指针,如果成功,则获得锁,如果失败,表示其他线程竞争锁,当前线程便尝试使用自旋来获取锁。 释放轻量级锁会使用原子的CAS操作将Displaced Mark Word替换回到对象头,如果成功,则表示没有竞争发生。如果失败,表示当前锁存在竞争,锁就会膨胀成重量级锁。

重量级锁:重量级锁是JVM中最为基础的锁实现。在这种状态下,JVM虚拟机会阻塞加锁失败的线程,并且在目标锁被释放的时候,唤醒这些线程。 Java线程的阻塞以及唤醒,都是依靠操作系统来完成的。举例来说,对于符合posix接口的操作系统(如macOS和绝大部分的Linux),上述操作通过pthread的互斥锁(mutex)来实现的。此外,这些操作将涉及系统调用,需要从操作系统的用户态切换至内核态,其开销非常之大。

总结:Java虚拟机中synchronized关键字的实现,按照代价由高到低可以分为重量级锁、轻量锁和偏向锁三种。

生产者消费者模型

生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。

这个阻塞队列就是用来给生产者和消费者解耦的。纵观大多数设计模式,都会找一个第三者出来进行解耦,如工厂模式的第三者是工厂类,模板模式的第三者是模板类。 mark

wait()与notify()方法

wait()作用:使得当前线程立刻停止运行,处于等待状态(wait),并将当前线程置入锁对象的等待队列中,直到被通知(notify())或被中断为止,wait()调用后立即释放对象锁

wait()使用条件:只能在同步方法或同步代码块中使用,必须是内建锁。

痴汉方法1.0版本 —— 死等,直到被唤醒或中断:

1public final void wait() throws InterruptedException

痴汉2.0版本,超时等待,若规定时间内没被唤醒则线程退出,单位毫秒:

1public final void wait(long timeout) throws InterruptedException

痴汉3.0版本,在上个方法的基础上增加了纳秒控制

1public final void wait(long timeout, int nanos) throws InterruptedException

notify()(痴汉等待的姑娘) 语意:唤醒处于等待状态的线程 使用条件:notify()也必须在同步方法或同步代码块中调用,用来唤醒等待该对象的其他线程,如果有多个线程等待,随机挑选一个被唤醒,具体挑选谁要看版本实现:The choice is arbitrary and occurs at the discretion of the implementation.(就像抛绣球一样,所以wait()和notify()等待和唤醒的对象肯定是同一个对象)

notify()方法调用后,当前对象不会立马释放对象锁,要等到当前线程执行完毕后再释放锁。

 1package com.xpu.demo_08;
 2class MyThread implements Runnable{
 3    private boolean flag;
 4    //锁对象
 5    private Object object;
 6
 7    public MyThread(boolean flag, Object object) {
 8        this.flag = flag;
 9        this.object = object;
10    }
11
12    public void waitMethod(){
13        synchronized (object){
14            System.out.println(Thread.currentThread().getName()
15                    +"wait start...");
16            try {
17                object.wait();
18            } catch (InterruptedException e) {
19                e.printStackTrace();
20            }
21            System.out.println(Thread.currentThread().getName()
22                    +"wait end...");
23        }
24    }
25
26    public void notifyMethod(){
27        synchronized (object){
28            System.out.println(Thread.currentThread().getName()
29                    +"notify start...");
30            //object.notify();
31            object.notifyAll();
32            System.out.println(Thread.currentThread().getName()
33                    +"notify end...");
34        }
35    }
36    @Override
37    public void run() {
38        if(flag){
39            waitMethod();
40        }else {
41            notifyMethod();
42        }
43    }
44}
45public class Demo {
46    public static void main(String[] args) {
47        Object object = new Object();
48        MyThread myThread1 = new MyThread(true, object);
49        MyThread myThread2 = new MyThread(false, object);
50        Thread notifyThread = new Thread(myThread2, "notifyThread");
51
52        for (int i = 0; i < 10; i++) {
53            Thread waitThread = new Thread(myThread1, "waitThread"+i);
54            waitThread.start();
55        }
56        //waitThread.start();
57        try {
58            Thread.sleep(1000);
59        } catch (InterruptedException e) {
60            e.printStackTrace();
61        }
62        notifyThread.start();
63    }
64}

线程由运行态—>阻塞(wait)

  • 调用sleep(),立刻交出CPU,不释放锁
  • 线程调用阻塞式IO(BIO)方法
  • 线程获取锁失败进入阻塞状态
  • 线程调用wait()
  • 线程调用suspend(),将线程挂起(一般不常用,容易造成死锁)

每个锁对象都有两个队列,一个等待队列一个同步队列,同步队列存储获取锁失败的线程,等待队列存储调用wait()等待的线程,将线程唤醒实际上是将处于等待队列的线程移动到同步队列去竞争锁

mark 生产者与消费者一般需要第三者来解耦的,所以现在就模拟一个简单的商品的生产者与消费者,由生产者线程生产出一个商品之后将由消费者线程开始消费!首先创建一个商品类Goods,类中有商品库存以及生产&消费方法。

多生产多消费模型

  1class Goods{
  2    //商品名称
  3    private String goodsName;
  4
  5    //商品数量
  6    private int count;
  7
  8    //生产商品的方法
  9    public synchronized void setGoodsName(String goodsName){
 10        //当前还有商品,需要等待消费者消费
 11        while(count > 5){
 12            try {
 13                wait();
 14            } catch (InterruptedException e) {
 15                e.printStackTrace();
 16            }
 17        }
 18        try {
 19            Thread.sleep(200);
 20        } catch (InterruptedException e) {
 21            e.printStackTrace();
 22        }
 23        this.goodsName = goodsName;
 24        this.count++;
 25        System.out.println(Thread.currentThread().
 26                getName()+"生产"+toString());
 27
 28        //生产商品后唤醒消费者可以消费了
 29        notify();
 30    }
 31
 32    //消费商品的方法
 33    public synchronized void getGoods(){
 34        //还没有商品,消费者需要等待
 35        while(this.count == 0){
 36            try {
 37                wait();
 38            } catch (InterruptedException e) {
 39                e.printStackTrace();
 40            }
 41        }
 42        try {
 43            Thread.sleep(200);
 44        } catch (InterruptedException e) {
 45            e.printStackTrace();
 46        }
 47        this.count--;
 48        System.out.println(Thread.currentThread().
 49                getName()+"消费"+toString());
 50
 51        //消费完商品后唤醒生产者该继续生产了
 52        notifyAll();
 53    }
 54    @Override
 55    public String toString() {
 56        return "Goods{" +
 57                "goodsName='" + goodsName + '\'' +
 58                ", count=" + count +
 59                '}';
 60    }
 61}
 62
 63//生产者
 64class Producer implements Runnable{
 65    private Goods goods;
 66
 67    public Producer(Goods goods) {
 68        this.goods = goods;
 69    }
 70
 71    @Override
 72    public void run() {
 73        while(true)
 74            this.goods.setGoodsName("生产纪梵希限量套装");
 75    }
 76}
 77
 78//消费者
 79class Consumer implements Runnable{
 80    private Goods goods;
 81
 82    public Consumer(Goods goods) {
 83        this.goods = goods;
 84    }
 85
 86    @Override
 87    public void run() {
 88        while(true)
 89            goods.getGoods();
 90    }
 91}
 92
 93public class Demo {
 94    public static void main(String[] args) {
 95        Goods goods = new Goods();
 96
 97        Producer producer = new Producer(goods);
 98        Consumer consumer = new Consumer(goods);
 99
100        List<Thread> list = new ArrayList<>();
101        //创建5个生产者
102        for (int i = 0; i < 5; i++) {
103            Thread thread = new Thread(producer, "生产者"+i);
104            list.add(thread);
105        }
106
107        //创建10个消费者
108        for (int i = 0; i < 10; i++) {
109            Thread thread = new Thread(consumer, "消费者"+i);
110            list.add(thread);
111        }
112
113        for(Thread thread:list){
114            thread.start();
115        }
116    }
117}

需要注意的问题

一、sleep和wait的区别是什么?

1、sleep()是Thread类中定义的方法,到了一定的时间后该线程自动唤醒,不会释放对象锁。 2、wait()是Object类中定义的方法,要想唤醒必须使用notify()、notifyAll()方法才能唤醒,会释放对象锁。

二、什么时候使用notify()和notifyAll()?

如果是单生产者或者单消费者,那么使用notify();是没问题的,但是如果是多生产者或多消费者,肯定要使用notifyAll(),否则容易造成相互等待,出现死锁!