虚假唤醒与Condition控制线程通信
本文通过生产者消费者模型主要讲述了什么是虚假唤醒,以及处理处理虚假唤醒。另外还使用了Condition 来控制线程间的通信,Condition接口描述了可能会与锁有关联的条件变量,这些变量在用法上与使用Object.wait 访问的隐式监视器类似,但提供了更强大的功能。需要特别指出的是,单个Lock 可能与多个Condition 对象关联。为了避免兼容性问题,Condition 方法的名称与对应的Object 版本中的不同。 在Condition 对象中,与wait、notify 和notifyAll 方法对应的分别是await、signal 和signalAll。 Condition允许发生虚假唤醒,这通常作为对基础平台语义的让步。不过Condition还是应该总是在一个循环中被等待,避免虚假唤醒的发生。
生产者消费者示例
首先引入下面这段生产者和消费者的程序,店员类作为生产产品和消费产品的中介,其中的数据product为共享数据,产品最多只能囤积5个,当产品达到5个还在生产时,就会提示库存已满,类似地,如果产品只有0个了还在消费,会提示库存缺货。
1//店员类
2class Clerk {
3 private int product = 0;
4
5 // 进货
6 public synchronized void get() {
7 if (product >= 5) {
8 System.out.println("库存已满");
9 } else {
10 System.out.println(Thread.currentThread().getName() + ":" + ++product);
11 }
12 }
13
14 // 售货
15 public synchronized void sale() {
16 if (product <= 0) {
17 System.out.println("库存缺货");
18 } else {
19 System.out.println(Thread.currentThread().getName() + ":" + --product);
20 }
21 }
22}
23
24// 生产者类
25class Productor implements Runnable {
26
27 private Clerk clerk;
28
29 public Productor(Clerk clerk) {
30 this.clerk = clerk;
31 }
32
33 @Override
34 public void run() {
35 for (int i = 0; i < 10; i++) {
36 clerk.get();
37 }
38
39 }
40}
41
42//消费者类
43class Consumer implements Runnable {
44
45 private Clerk clerk;
46
47 public Consumer(Clerk clerk) {
48 this.clerk = clerk;
49 }
50
51 @Override
52 public void run() {
53 for (int i = 0; i < 10; i++) {
54 clerk.sale();
55 }
56 }
57}
58
59public class ProductorAndConsumer {
60
61 public static void main(String[] args) {
62 Clerk clerk = new Clerk();
63
64 Productor productor = new Productor(clerk);
65 Consumer consumer = new Consumer(clerk);
66
67 new Thread(productor,"Productor A").start();
68 new Thread(consumer,"Consumer B").start();
69 }
70}
这是一种不好的情况,因为当产品已满时,还在不停地生产,当缺货时,还在不停地消费。为此,我们引入等待唤醒机制:
1//店员类
2class Clerk {
3 private int product = 0;
4
5 // 进货
6 public synchronized void get() {
7 if (product >= 5) {
8 System.out.println("库存已满");
9
10 try {
11 this.wait();
12 } catch (InterruptedException e) {
13 e.printStackTrace();
14 }
15 } else {
16 System.out.println(Thread.currentThread().getName() + ":" + ++product);
17 this.notifyAll();
18 }
19 }
20
21 // 售货
22 public synchronized void sale() {
23 if (product <= 0) {
24 System.out.println("库存缺货");
25 try {
26 this.wait();
27 } catch (InterruptedException e) {
28 e.printStackTrace();
29 }
30 } else {
31 System.out.println(Thread.currentThread().getName() + ":" + --product);
32 this.notifyAll();
33 }
34 }
35}
36
37// 生产者类
38class Productor implements Runnable {
39
40 private Clerk clerk;
41
42 public Productor(Clerk clerk) {
43 this.clerk = clerk;
44 }
45
46 @Override
47 public void run() {
48 for (int i = 0; i < 10; i++) {
49 clerk.get();
50 }
51 }
52}
53
54//消费者类
55class Consumer implements Runnable {
56
57 private Clerk clerk;
58
59 public Consumer(Clerk clerk) {
60 this.clerk = clerk;
61 }
62
63 @Override
64 public void run() {
65 for (int i = 0; i < 10; i++) {
66 clerk.sale();
67 }
68
69 }
70}
71
72public class ProductorAndConsumer {
73
74 public static void main(String[] args) {
75 Clerk clerk = new Clerk();
76
77 Productor productor = new Productor(clerk);
78 Consumer consumer = new Consumer(clerk);
79
80 new Thread(productor,"Productor A").start();
81 new Thread(consumer,"Consumer B").start();
82 }
83}
再运行程序,就不会再出现上述的情况:
现在,我们将产品的囤积上限设定为1(这种情况在现实中也是有可能出现的):
1//店员类
2class Clerk {
3 private int product = 0;
4
5 // 将产品的囤积上限设定为1
6 public synchronized void get() {
7 if (product >= 1) {
8 System.out.println("库存已满");
9
10 try {
11 this.wait();
12 } catch (InterruptedException e) {
13 e.printStackTrace();
14 }
15 } else {
16 System.out.println(Thread.currentThread().getName() + ":" + ++product);
17 this.notifyAll();
18 }
19 }
20}
程序的输出貌似没有问题,但请注意图中箭头所指的地方,这表示程序没有结束,还一直在执行。这是因为,当循坏到最后一轮时,由于产品已满引发了wait()操作,然后生产者线程等待,随后消费者消费了一份产品,并唤醒等待的生产者线程,此时,被唤醒的生产者线程由于循环结束,直接结束了线程的执行,但是另一边,消费者线程没有结束,而且由于将产品消费完后再次进入了等待,但是生产者线程此时已经结束了,不能再唤醒消费者线程,所以便进入了死循环。
解决这种问题的方法时去掉Clerk类中get方法和sale方法的else,并将原来else中的代码直接提出,这样,就算线程结束,也会先再次唤醒等待的线程:
目前是没问题的,但是如果现在有两个(多个)消费者线程和生产者线程,并且我们在生产者类的run方法中添加一个sleep()方法的执行,情况会如何呢?
1//店员类
2class Clerk {
3 private int product = 0;
4
5 public synchronized void get() {
6 if (product >= 1) {
7 System.out.println("库存已满");
8
9 try {
10 this.wait();
11 } catch (InterruptedException e) {
12 e.printStackTrace();
13 }
14 }
15 System.out.println(Thread.currentThread().getName() + ":" + ++product);
16 this.notifyAll();
17 }
18
19 public synchronized void sale() {
20 if (product <= 0) {
21 System.out.println("库存缺货");
22 try {
23 this.wait();
24 } catch (InterruptedException e) {
25 e.printStackTrace();
26 }
27 }
28 System.out.println(Thread.currentThread().getName() + ":" + --product);
29 this.notifyAll();
30 }
31}
32
33// 生产者类
34class Productor implements Runnable {
35
36 private Clerk clerk;
37
38 public Productor(Clerk clerk) {
39 this.clerk = clerk;
40 }
41
42 @Override
43 public void run() {
44 for (int i = 0; i < 10; i++) {
45 try {
46 Thread.sleep(100);
47 } catch (InterruptedException e) {
48 // TODO Auto-generated catch block
49 e.printStackTrace();
50 }
51 clerk.get();
52 }
53 }
54}
55
56// 消费者类
57class Consumer implements Runnable {
58
59 private Clerk clerk;
60
61 public Consumer(Clerk clerk) {
62 this.clerk = clerk;
63 }
64
65 @Override
66 public void run() {
67 for (int i = 0; i < 10; i++) {
68 clerk.sale();
69 }
70 }
71}
72
73public class ProductorAndConsumer {
74
75 public static void main(String[] args) {
76 Clerk clerk = new Clerk();
77
78 Productor productor = new Productor(clerk);
79 Consumer consumer = new Consumer(clerk);
80
81 new Thread(productor, "Productor A").start();
82 new Thread(consumer, "Consumer B").start();
83 new Thread(productor, "Productor C").start();
84 new Thread(consumer, "Consumer D").start();
85 }
86}
但是结果明显是不对的, 错误的原因在于,当一个消费者线程遇到产品为0时,等待,并释放锁标志,然后另外一个消费者线程获取到该锁标志,由于产品仍然为0,也等待,并释放锁标志。这时候,生产者线程获取到锁,在生产一个产品后,执行notifyAll()唤醒所有线程,这时候,一个消费者线程消费一个产品使得产品为0,另外一个消费者线程再消费一个产品使得产品变为了负数,这种现象称为虚假唤醒。
在Object.wait()方法的javadoc中叙述了该如何解决这种问题:
意思即将get和sale方法中的if都改为while,这样,每次被唤醒后,都会再次判断产品数是否>=0:
1//店员类
2class Clerk {
3 private int product = 0;
4
5 public synchronized void get() {
6 while (product >= 1) {
7 System.out.println("库存已满");
8
9 try {
10 this.wait();
11 } catch (InterruptedException e) {
12 e.printStackTrace();
13 }
14 }
15 System.out.println(Thread.currentThread().getName() + ":" + ++product);
16 this.notifyAll();
17 }
18
19 public synchronized void sale() {
20 while (product <= 0) {
21 System.out.println("库存缺货");
22 try {
23 this.wait();
24 } catch (InterruptedException e) {
25 e.printStackTrace();
26 }
27 }
28 System.out.println(Thread.currentThread().getName() + ":" + --product);
29 this.notifyAll();
30 }
31}
32
33// 生产者类
34class Productor implements Runnable {
35
36 private Clerk clerk;
37
38 public Productor(Clerk clerk) {
39 this.clerk = clerk;
40 }
41
42 @Override
43 public void run() {
44 for (int i = 0; i < 10; i++) {
45 try {
46 Thread.sleep(100);
47 } catch (InterruptedException e) {
48 e.printStackTrace();
49 }
50 clerk.get();
51 }
52 }
53}
54
55// 消费者类
56class Consumer implements Runnable {
57
58 private Clerk clerk;
59
60 public Consumer(Clerk clerk) {
61 this.clerk = clerk;
62 }
63
64 @Override
65 public void run() {
66 for (int i = 0; i < 10; i++) {
67 clerk.sale();
68 }
69 }
70}
71
72public class ProductorAndConsumer {
73
74 public static void main(String[] args) {
75 Clerk clerk = new Clerk();
76
77 Productor productor = new Productor(clerk);
78 Consumer consumer = new Consumer(clerk);
79
80 new Thread(productor, "Productor A").start();
81 new Thread(consumer, "Consumer B").start();
82 new Thread(productor, "Productor C").start();
83 new Thread(consumer, "Consumer D").start();
84 }
85}
Condition控制线程间通信
由于我们之前用的是synchronized做的同步,接下来使用Condition控制线程通信试试:
1import java.util.concurrent.locks.Condition;
2import java.util.concurrent.locks.Lock;
3import java.util.concurrent.locks.ReentrantLock;
4
5//店员类
6class Clerk {
7 private int product = 0;
8 private Lock reentrantLock = new ReentrantLock();
9 Condition condition = reentrantLock.newCondition();
10
11 public void get() {
12 reentrantLock.lock();
13 try {
14 while (product >= 1) {
15 System.out.println("库存已满");
16 try {
17 //this.wait();
18 condition.await();
19 } catch (InterruptedException e) {
20 e.printStackTrace();
21 }
22 }
23 System.out.println(Thread.currentThread().getName() + ":" + ++product);
24 //this.notifyAll();
25 condition.signalAll();
26 }finally {
27 reentrantLock.unlock();
28 }
29 }
30
31 public void sale() {
32 reentrantLock.lock();
33 try {
34 while (product <= 0) {
35 System.out.println("库存缺货");
36 try {
37 //this.wait();
38 condition.await();
39 } catch (InterruptedException e) {
40 e.printStackTrace();
41 }
42 }
43 System.out.println(Thread.currentThread().getName() + ":" + --product);
44 //this.notifyAll();
45 condition.signalAll();
46 }finally {
47 reentrantLock.unlock();
48 }
49 }
50}
51
52// 生产者类
53class Productor implements Runnable {
54
55 private Clerk clerk;
56
57 public Productor(Clerk clerk) {
58 this.clerk = clerk;
59 }
60
61 @Override
62 public void run() {
63 for (int i = 0; i < 10; i++) {
64 try {
65 Thread.sleep(100);
66 } catch (InterruptedException e) {
67 e.printStackTrace();
68 }
69 clerk.get();
70 }
71 }
72}
73
74// 消费者类
75class Consumer implements Runnable {
76
77 private Clerk clerk;
78
79 public Consumer(Clerk clerk) {
80 this.clerk = clerk;
81 }
82
83 @Override
84 public void run() {
85 for (int i = 0; i < 10; i++) {
86 clerk.sale();
87 }
88 }
89}
90
91public class ProductorAndConsumerForLock {
92
93 public static void main(String[] args) {
94 Clerk clerk = new Clerk();
95
96 Productor productor = new Productor(clerk);
97 Consumer consumer = new Consumer(clerk);
98
99 new Thread(productor, "Productor A").start();
100 new Thread(consumer, "Consumer B").start();
101 new Thread(productor, "Productor C").start();
102 new Thread(consumer, "Consumer D").start();
103 }
104}
效果依旧,即在使用Condition的时候,之前的wait使用了Condition对象的await()方法,而notify和notifyAll使用Condition对象的signal()和signalAll()来搞定。
其中,Lock 替代了 synchronized 方法和语句的使用,Condition替代了 Object 监视器方法的使用。
Condition实现线程交替执行
1import java.util.concurrent.locks.Condition;
2import java.util.concurrent.locks.Lock;
3import java.util.concurrent.locks.ReentrantLock;
4
5//线程交替执行
6public class ThreadAlternate {
7 public static void main(String[] args) {
8 ThreadAlternateRun alternateRun = new ThreadAlternateRun();
9
10 new Thread(()->{
11 for (int i = 0; i < 20; i++) {
12 alternateRun.loopA(i);
13 }
14 }, "A").start();
15 new Thread(()->{
16 for (int i = 0; i < 20; i++) {
17 alternateRun.loopB(i);
18 }
19 }, "B").start();
20 new Thread(()->{
21 for (int i = 0; i < 20; i++) {
22 alternateRun.loopC(i);
23 System.out.println("---------------------------------");
24 }
25 }, "C").start();
26 }
27}
28
29class ThreadAlternateRun{
30 private int position = 1;
31 Lock lock = new ReentrantLock();
32 Condition conditionA = lock.newCondition();
33 Condition conditionB = lock.newCondition();
34 Condition conditionC = lock.newCondition();
35
36 public void loopA(int round){
37 lock.lock();
38 try{
39 //判断
40 if(position != 1){
41 try {
42 conditionA.await();
43 } catch (InterruptedException e) {
44 e.printStackTrace();
45 }
46 }
47
48 //打印
49 for (int i = 1; i <=3 ; i++) {
50 System.out.println(Thread.currentThread().getName() + "\t" + i + "\t" + round);
51 }
52
53 //唤醒
54 position = 2;
55 conditionB.signal();
56 }finally {
57 lock.unlock();
58 }
59 }
60
61 public void loopB(int round){
62 lock.lock();
63 try{
64 //判断
65 if(position != 2){
66 try {
67 conditionB.await();
68 } catch (InterruptedException e) {
69 e.printStackTrace();
70 }
71 }
72
73 //打印
74 for (int i = 1; i <=3 ; i++) {
75 System.out.println(Thread.currentThread().getName() + "\t" + i + "\t" + round);
76 }
77
78 //唤醒
79 position = 3;
80 conditionC.signal();
81 }finally {
82 lock.unlock();
83 }
84 }
85
86 public void loopC(int round){
87 lock.lock();
88 try{
89 //判断
90 if(position != 3){
91 try {
92 conditionC.await();
93 } catch (InterruptedException e) {
94 e.printStackTrace();
95 }
96 }
97
98 //打印
99 for (int i = 1; i <=3 ; i++) {
100 System.out.println(Thread.currentThread().getName() + "\t" + i + "\t" + round);
101 }
102
103 //唤醒
104 position = 1;
105 conditionA.signal();
106 }finally {
107 lock.unlock();
108 }
109 }
110}