虚假唤醒与Condition控制线程通信

mark

本文通过生产者消费者模型主要讲述了什么是虚假唤醒,以及处理处理虚假唤醒。另外还使用了Condition 来控制线程间的通信,Condition接口描述了可能会与锁有关联的条件变量,这些变量在用法上与使用Object.wait 访问的隐式监视器类似,但提供了更强大的功能。需要特别指出的是,单个Lock 可能与多个Condition 对象关联。为了避免兼容性问题,Condition 方法的名称与对应的Object 版本中的不同。 在Condition 对象中,与wait、notify 和notifyAll 方法对应的分别是await、signal 和signalAll。 Condition允许发生虚假唤醒,这通常作为对基础平台语义的让步。不过Condition还是应该总是在一个循环中被等待,避免虚假唤醒的发生。

生产者消费者示例

首先引入下面这段生产者和消费者的程序,店员类作为生产产品和消费产品的中介,其中的数据product为共享数据,产品最多只能囤积5个,当产品达到5个还在生产时,就会提示库存已满,类似地,如果产品只有0个了还在消费,会提示库存缺货。

 1//店员类
 2class Clerk {
 3    private int product = 0;
 4
 5    // 进货
 6    public synchronized void get() {
 7        if (product >= 5) {
 8            System.out.println("库存已满");
 9        } else {
10            System.out.println(Thread.currentThread().getName() + ":" + ++product);
11        }
12    }
13
14    // 售货
15    public synchronized void sale() {
16        if (product <= 0) {
17            System.out.println("库存缺货");
18        } else {
19            System.out.println(Thread.currentThread().getName() + ":" + --product);
20        }
21    }
22}
23
24// 生产者类
25class Productor implements Runnable {
26
27    private Clerk clerk;
28
29    public Productor(Clerk clerk) {
30        this.clerk = clerk;
31    }
32
33    @Override
34    public void run() {
35        for (int i = 0; i < 10; i++) {
36            clerk.get();
37        }
38
39    }
40}
41
42//消费者类
43class Consumer implements Runnable {
44
45    private Clerk clerk;
46
47    public Consumer(Clerk clerk) {
48        this.clerk = clerk;
49    }
50
51    @Override
52    public void run() {
53        for (int i = 0; i < 10; i++) {
54            clerk.sale();
55        }
56    }
57}
58
59public class ProductorAndConsumer {
60
61    public static void main(String[] args) {
62        Clerk clerk = new Clerk();
63
64        Productor productor = new Productor(clerk);
65        Consumer consumer = new Consumer(clerk);
66
67        new Thread(productor,"Productor A").start();
68        new Thread(consumer,"Consumer B").start();
69    }
70}

mark

这是一种不好的情况,因为当产品已满时,还在不停地生产,当缺货时,还在不停地消费。为此,我们引入等待唤醒机制:

 1//店员类
 2class Clerk {
 3    private int product = 0;
 4
 5    // 进货
 6    public synchronized void get() {
 7        if (product >= 5) {
 8            System.out.println("库存已满");
 9
10            try {
11                this.wait();
12            } catch (InterruptedException e) {
13                e.printStackTrace();
14            }
15        } else {
16            System.out.println(Thread.currentThread().getName() + ":" + ++product);
17            this.notifyAll();
18        }
19    }
20
21    // 售货
22    public synchronized void sale() {
23        if (product <= 0) {
24            System.out.println("库存缺货");
25            try {
26                this.wait();
27            } catch (InterruptedException e) {
28                e.printStackTrace();
29            }
30        } else {
31            System.out.println(Thread.currentThread().getName() + ":" + --product);
32            this.notifyAll();
33        }
34    }
35}
36
37// 生产者类
38class Productor implements Runnable {
39
40    private Clerk clerk;
41
42    public Productor(Clerk clerk) {
43        this.clerk = clerk;
44    }
45
46    @Override
47    public void run() {
48        for (int i = 0; i < 10; i++) {
49            clerk.get();
50        }
51    }
52}
53
54//消费者类
55class Consumer implements Runnable {
56
57    private Clerk clerk;
58
59    public Consumer(Clerk clerk) {
60        this.clerk = clerk;
61    }
62
63    @Override
64    public void run() {
65        for (int i = 0; i < 10; i++) {
66            clerk.sale();
67        }
68
69    }
70}
71
72public class ProductorAndConsumer {
73
74    public static void main(String[] args) {
75        Clerk clerk = new Clerk();
76
77        Productor productor = new Productor(clerk);
78        Consumer consumer = new Consumer(clerk);
79
80        new Thread(productor,"Productor A").start();
81        new Thread(consumer,"Consumer B").start();
82    }
83}

再运行程序,就不会再出现上述的情况:

mark

现在,我们将产品的囤积上限设定为1(这种情况在现实中也是有可能出现的):

 1//店员类
 2class Clerk {
 3    private int product = 0;
 4
 5    // 将产品的囤积上限设定为1
 6    public synchronized void get() {
 7        if (product >= 1) {
 8            System.out.println("库存已满");
 9
10            try {
11                this.wait();
12            } catch (InterruptedException e) {
13                e.printStackTrace();
14            }
15        } else {
16            System.out.println(Thread.currentThread().getName() + ":" + ++product);
17            this.notifyAll();
18        }
19    }
20}

mark

程序的输出貌似没有问题,但请注意图中箭头所指的地方,这表示程序没有结束,还一直在执行。这是因为,当循坏到最后一轮时,由于产品已满引发了wait()操作,然后生产者线程等待,随后消费者消费了一份产品,并唤醒等待的生产者线程,此时,被唤醒的生产者线程由于循环结束,直接结束了线程的执行,但是另一边,消费者线程没有结束,而且由于将产品消费完后再次进入了等待,但是生产者线程此时已经结束了,不能再唤醒消费者线程,所以便进入了死循环。

解决这种问题的方法时去掉Clerk类中get方法和sale方法的else,并将原来else中的代码直接提出,这样,就算线程结束,也会先再次唤醒等待的线程:

mark

目前是没问题的,但是如果现在有两个(多个)消费者线程和生产者线程,并且我们在生产者类的run方法中添加一个sleep()方法的执行,情况会如何呢?

 1//店员类
 2class Clerk {
 3    private int product = 0;
 4
 5    public synchronized void get() {
 6        if (product >= 1) {
 7            System.out.println("库存已满");
 8
 9            try {
10                this.wait();
11            } catch (InterruptedException e) {
12                e.printStackTrace();
13            }
14        }
15        System.out.println(Thread.currentThread().getName() + ":" + ++product);
16        this.notifyAll();
17    }
18
19    public synchronized void sale() {
20        if (product <= 0) {
21            System.out.println("库存缺货");
22            try {
23                this.wait();
24            } catch (InterruptedException e) {
25                e.printStackTrace();
26            }
27        }
28        System.out.println(Thread.currentThread().getName() + ":" + --product);
29        this.notifyAll();
30    }
31}
32
33// 生产者类
34class Productor implements Runnable {
35
36    private Clerk clerk;
37
38    public Productor(Clerk clerk) {
39        this.clerk = clerk;
40    }
41
42    @Override
43    public void run() {
44        for (int i = 0; i < 10; i++) {
45            try {
46                Thread.sleep(100);
47            } catch (InterruptedException e) {
48                // TODO Auto-generated catch block
49                e.printStackTrace();
50            }
51            clerk.get();
52        }
53    }
54}
55
56// 消费者类
57class Consumer implements Runnable {
58
59    private Clerk clerk;
60
61    public Consumer(Clerk clerk) {
62        this.clerk = clerk;
63    }
64
65    @Override
66    public void run() {
67        for (int i = 0; i < 10; i++) {
68            clerk.sale();
69        }
70    }
71}
72
73public class ProductorAndConsumer {
74
75    public static void main(String[] args) {
76        Clerk clerk = new Clerk();
77
78        Productor productor = new Productor(clerk);
79        Consumer consumer = new Consumer(clerk);
80
81        new Thread(productor, "Productor A").start();
82        new Thread(consumer, "Consumer B").start();
83        new Thread(productor, "Productor C").start();
84        new Thread(consumer, "Consumer D").start();
85    }
86}

但是结果明显是不对的, 错误的原因在于,当一个消费者线程遇到产品为0时,等待,并释放锁标志,然后另外一个消费者线程获取到该锁标志,由于产品仍然为0,也等待,并释放锁标志。这时候,生产者线程获取到锁,在生产一个产品后,执行notifyAll()唤醒所有线程,这时候,一个消费者线程消费一个产品使得产品为0,另外一个消费者线程再消费一个产品使得产品变为了负数,这种现象称为虚假唤醒。

mark

在Object.wait()方法的javadoc中叙述了该如何解决这种问题:

mark

意思即将get和sale方法中的if都改为while,这样,每次被唤醒后,都会再次判断产品数是否>=0:

 1//店员类
 2class Clerk {
 3    private int product = 0;
 4
 5    public synchronized void get() {
 6        while (product >= 1) {
 7            System.out.println("库存已满");
 8
 9            try {
10                this.wait();
11            } catch (InterruptedException e) {
12                e.printStackTrace();
13            }
14        }
15        System.out.println(Thread.currentThread().getName() + ":" + ++product);
16        this.notifyAll();
17    }
18
19    public synchronized void sale() {
20        while (product <= 0) {
21            System.out.println("库存缺货");
22            try {
23                this.wait();
24            } catch (InterruptedException e) {
25                e.printStackTrace();
26            }
27        }
28        System.out.println(Thread.currentThread().getName() + ":" + --product);
29        this.notifyAll();
30    }
31}
32
33// 生产者类
34class Productor implements Runnable {
35
36    private Clerk clerk;
37
38    public Productor(Clerk clerk) {
39        this.clerk = clerk;
40    }
41
42    @Override
43    public void run() {
44        for (int i = 0; i < 10; i++) {
45            try {
46                Thread.sleep(100);
47            } catch (InterruptedException e) {
48                e.printStackTrace();
49            }
50            clerk.get();
51        }
52    }
53}
54
55// 消费者类
56class Consumer implements Runnable {
57
58    private Clerk clerk;
59
60    public Consumer(Clerk clerk) {
61        this.clerk = clerk;
62    }
63
64    @Override
65    public void run() {
66        for (int i = 0; i < 10; i++) {
67            clerk.sale();
68        }
69    }
70}
71
72public class ProductorAndConsumer {
73
74    public static void main(String[] args) {
75        Clerk clerk = new Clerk();
76
77        Productor productor = new Productor(clerk);
78        Consumer consumer = new Consumer(clerk);
79
80        new Thread(productor, "Productor A").start();
81        new Thread(consumer, "Consumer B").start();
82        new Thread(productor, "Productor C").start();
83        new Thread(consumer, "Consumer D").start();
84    }
85}

mark

Condition控制线程间通信

由于我们之前用的是synchronized做的同步,接下来使用Condition控制线程通信试试:

  1import java.util.concurrent.locks.Condition;
  2import java.util.concurrent.locks.Lock;
  3import java.util.concurrent.locks.ReentrantLock;
  4
  5//店员类
  6class Clerk {
  7    private int product = 0;
  8    private Lock reentrantLock = new ReentrantLock();
  9    Condition condition = reentrantLock.newCondition();
 10
 11    public void get() {
 12        reentrantLock.lock();
 13        try {
 14            while (product >= 1) {
 15                System.out.println("库存已满");
 16                try {
 17                    //this.wait();
 18                    condition.await();
 19                } catch (InterruptedException e) {
 20                    e.printStackTrace();
 21                }
 22            }
 23            System.out.println(Thread.currentThread().getName() + ":" + ++product);
 24            //this.notifyAll();
 25            condition.signalAll();
 26        }finally {
 27            reentrantLock.unlock();
 28        }
 29    }
 30
 31    public void sale() {
 32        reentrantLock.lock();
 33        try {
 34            while (product <= 0) {
 35                System.out.println("库存缺货");
 36                try {
 37                    //this.wait();
 38                    condition.await();
 39                } catch (InterruptedException e) {
 40                    e.printStackTrace();
 41                }
 42            }
 43            System.out.println(Thread.currentThread().getName() + ":" + --product);
 44            //this.notifyAll();
 45            condition.signalAll();
 46        }finally {
 47            reentrantLock.unlock();
 48        }
 49    }
 50}
 51
 52// 生产者类
 53class Productor implements Runnable {
 54
 55    private Clerk clerk;
 56
 57    public Productor(Clerk clerk) {
 58        this.clerk = clerk;
 59    }
 60
 61    @Override
 62    public void run() {
 63        for (int i = 0; i < 10; i++) {
 64            try {
 65                Thread.sleep(100);
 66            } catch (InterruptedException e) {
 67                e.printStackTrace();
 68            }
 69            clerk.get();
 70        }
 71    }
 72}
 73
 74// 消费者类
 75class Consumer implements Runnable {
 76
 77    private Clerk clerk;
 78
 79    public Consumer(Clerk clerk) {
 80        this.clerk = clerk;
 81    }
 82
 83    @Override
 84    public void run() {
 85        for (int i = 0; i < 10; i++) {
 86            clerk.sale();
 87        }
 88    }
 89}
 90
 91public class ProductorAndConsumerForLock {
 92
 93    public static void main(String[] args) {
 94        Clerk clerk = new Clerk();
 95
 96        Productor productor = new Productor(clerk);
 97        Consumer consumer = new Consumer(clerk);
 98
 99        new Thread(productor, "Productor A").start();
100        new Thread(consumer, "Consumer B").start();
101        new Thread(productor, "Productor C").start();
102        new Thread(consumer, "Consumer D").start();
103    }
104}

效果依旧,即在使用Condition的时候,之前的wait使用了Condition对象的await()方法,而notify和notifyAll使用Condition对象的signal()和signalAll()来搞定。

mark

其中,Lock 替代了 synchronized 方法和语句的使用,Condition替代了 Object 监视器方法的使用。

Condition实现线程交替执行

  1import java.util.concurrent.locks.Condition;
  2import java.util.concurrent.locks.Lock;
  3import java.util.concurrent.locks.ReentrantLock;
  4
  5//线程交替执行
  6public class ThreadAlternate {
  7    public static void main(String[] args) {
  8        ThreadAlternateRun alternateRun = new ThreadAlternateRun();
  9
 10        new Thread(()->{
 11            for (int i = 0; i < 20; i++) {
 12                alternateRun.loopA(i);
 13            }
 14        }, "A").start();
 15        new Thread(()->{
 16            for (int i = 0; i < 20; i++) {
 17                alternateRun.loopB(i);
 18            }
 19        }, "B").start();
 20        new Thread(()->{
 21            for (int i = 0; i < 20; i++) {
 22                alternateRun.loopC(i);
 23                System.out.println("---------------------------------");
 24            }
 25        }, "C").start();
 26    }
 27}
 28
 29class ThreadAlternateRun{
 30    private int position = 1;
 31    Lock lock = new ReentrantLock();
 32    Condition conditionA = lock.newCondition();
 33    Condition conditionB = lock.newCondition();
 34    Condition conditionC = lock.newCondition();
 35
 36    public void loopA(int round){
 37        lock.lock();
 38        try{
 39            //判断
 40            if(position != 1){
 41                try {
 42                    conditionA.await();
 43                } catch (InterruptedException e) {
 44                    e.printStackTrace();
 45                }
 46            }
 47
 48            //打印
 49            for (int i = 1; i <=3 ; i++) {
 50                System.out.println(Thread.currentThread().getName() + "\t" + i + "\t" + round);
 51            }
 52
 53            //唤醒
 54            position = 2;
 55            conditionB.signal();
 56        }finally {
 57            lock.unlock();
 58        }
 59    }
 60
 61    public void loopB(int round){
 62        lock.lock();
 63        try{
 64            //判断
 65            if(position != 2){
 66                try {
 67                    conditionB.await();
 68                } catch (InterruptedException e) {
 69                    e.printStackTrace();
 70                }
 71            }
 72
 73            //打印
 74            for (int i = 1; i <=3 ; i++) {
 75                System.out.println(Thread.currentThread().getName() + "\t" + i + "\t" + round);
 76            }
 77
 78            //唤醒
 79            position = 3;
 80            conditionC.signal();
 81        }finally {
 82            lock.unlock();
 83        }
 84    }
 85
 86    public void loopC(int round){
 87        lock.lock();
 88        try{
 89            //判断
 90            if(position != 3){
 91                try {
 92                    conditionC.await();
 93                } catch (InterruptedException e) {
 94                    e.printStackTrace();
 95                }
 96            }
 97
 98            //打印
 99            for (int i = 1; i <=3 ; i++) {
100                System.out.println(Thread.currentThread().getName() + "\t" + i + "\t" + round);
101            }
102
103            //唤醒
104            position = 1;
105            conditionA.signal();
106        }finally {
107            lock.unlock();
108        }
109    }
110}

mark