0%

虚假唤醒与Condition控制线程通信

mark

本文通过生产者消费者模型主要讲述了什么是虚假唤醒,以及处理处理虚假唤醒。另外还使用了Condition 来控制线程间的通信,Condition接口描述了可能会与锁有关联的条件变量,这些变量在用法上与使用Object.wait 访问的隐式监视器类似,但提供了更强大的功能。需要特别指出的是,单个Lock 可能与多个Condition 对象关联。为了避免兼容性问题,Condition 方法的名称与对应的Object 版本中的不同。 在Condition 对象中,与wait、notify 和notifyAll 方法对应的分别是await、signal 和signalAll。 Condition允许发生虚假唤醒,这通常作为对基础平台语义的让步。不过Condition还是应该总是在一个循环中被等待,避免虚假唤醒的发生。

生产者消费者示例

首先引入下面这段生产者和消费者的程序,店员类作为生产产品和消费产品的中介,其中的数据product为共享数据,产品最多只能囤积5个,当产品达到5个还在生产时,就会提示库存已满,类似地,如果产品只有0个了还在消费,会提示库存缺货。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
//店员类
class Clerk {
private int product = 0;

// 进货
public synchronized void get() {
if (product >= 5) {
System.out.println("库存已满");
} else {
System.out.println(Thread.currentThread().getName() + ":" + ++product);
}
}

// 售货
public synchronized void sale() {
if (product <= 0) {
System.out.println("库存缺货");
} else {
System.out.println(Thread.currentThread().getName() + ":" + --product);
}
}
}

// 生产者类
class Productor implements Runnable {

private Clerk clerk;

public Productor(Clerk clerk) {
this.clerk = clerk;
}

@Override
public void run() {
for (int i = 0; i < 10; i++) {
clerk.get();
}

}
}

//消费者类
class Consumer implements Runnable {

private Clerk clerk;

public Consumer(Clerk clerk) {
this.clerk = clerk;
}

@Override
public void run() {
for (int i = 0; i < 10; i++) {
clerk.sale();
}
}
}

public class ProductorAndConsumer {

public static void main(String[] args) {
Clerk clerk = new Clerk();

Productor productor = new Productor(clerk);
Consumer consumer = new Consumer(clerk);

new Thread(productor,"Productor A").start();
new Thread(consumer,"Consumer B").start();
}
}

mark

这是一种不好的情况,因为当产品已满时,还在不停地生产,当缺货时,还在不停地消费。为此,我们引入等待唤醒机制:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
//店员类
class Clerk {
private int product = 0;

// 进货
public synchronized void get() {
if (product >= 5) {
System.out.println("库存已满");

try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
} else {
System.out.println(Thread.currentThread().getName() + ":" + ++product);
this.notifyAll();
}
}

// 售货
public synchronized void sale() {
if (product <= 0) {
System.out.println("库存缺货");
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
} else {
System.out.println(Thread.currentThread().getName() + ":" + --product);
this.notifyAll();
}
}
}

// 生产者类
class Productor implements Runnable {

private Clerk clerk;

public Productor(Clerk clerk) {
this.clerk = clerk;
}

@Override
public void run() {
for (int i = 0; i < 10; i++) {
clerk.get();
}
}
}

//消费者类
class Consumer implements Runnable {

private Clerk clerk;

public Consumer(Clerk clerk) {
this.clerk = clerk;
}

@Override
public void run() {
for (int i = 0; i < 10; i++) {
clerk.sale();
}

}
}

public class ProductorAndConsumer {

public static void main(String[] args) {
Clerk clerk = new Clerk();

Productor productor = new Productor(clerk);
Consumer consumer = new Consumer(clerk);

new Thread(productor,"Productor A").start();
new Thread(consumer,"Consumer B").start();
}
}

再运行程序,就不会再出现上述的情况:

mark

现在,我们将产品的囤积上限设定为1(这种情况在现实中也是有可能出现的):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
//店员类
class Clerk {
private int product = 0;

// 将产品的囤积上限设定为1
public synchronized void get() {
if (product >= 1) {
System.out.println("库存已满");

try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
} else {
System.out.println(Thread.currentThread().getName() + ":" + ++product);
this.notifyAll();
}
}
}

mark

程序的输出貌似没有问题,但请注意图中箭头所指的地方,这表示程序没有结束,还一直在执行。这是因为,当循坏到最后一轮时,由于产品已满引发了wait()操作,然后生产者线程等待,随后消费者消费了一份产品,并唤醒等待的生产者线程,此时,被唤醒的生产者线程由于循环结束,直接结束了线程的执行,但是另一边,消费者线程没有结束,而且由于将产品消费完后再次进入了等待,但是生产者线程此时已经结束了,不能再唤醒消费者线程,所以便进入了死循环。

解决这种问题的方法时去掉Clerk类中get方法和sale方法的else,并将原来else中的代码直接提出,这样,就算线程结束,也会先再次唤醒等待的线程:

mark

目前是没问题的,但是如果现在有两个(多个)消费者线程和生产者线程,并且我们在生产者类的run方法中添加一个sleep()方法的执行,情况会如何呢?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
//店员类
class Clerk {
private int product = 0;

public synchronized void get() {
if (product >= 1) {
System.out.println("库存已满");

try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println(Thread.currentThread().getName() + ":" + ++product);
this.notifyAll();
}

public synchronized void sale() {
if (product <= 0) {
System.out.println("库存缺货");
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println(Thread.currentThread().getName() + ":" + --product);
this.notifyAll();
}
}

// 生产者类
class Productor implements Runnable {

private Clerk clerk;

public Productor(Clerk clerk) {
this.clerk = clerk;
}

@Override
public void run() {
for (int i = 0; i < 10; i++) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
clerk.get();
}
}
}

// 消费者类
class Consumer implements Runnable {

private Clerk clerk;

public Consumer(Clerk clerk) {
this.clerk = clerk;
}

@Override
public void run() {
for (int i = 0; i < 10; i++) {
clerk.sale();
}
}
}

public class ProductorAndConsumer {

public static void main(String[] args) {
Clerk clerk = new Clerk();

Productor productor = new Productor(clerk);
Consumer consumer = new Consumer(clerk);

new Thread(productor, "Productor A").start();
new Thread(consumer, "Consumer B").start();
new Thread(productor, "Productor C").start();
new Thread(consumer, "Consumer D").start();
}
}

但是结果明显是不对的, 错误的原因在于,当一个消费者线程遇到产品为0时,等待,并释放锁标志,然后另外一个消费者线程获取到该锁标志,由于产品仍然为0,也等待,并释放锁标志。这时候,生产者线程获取到锁,在生产一个产品后,执行notifyAll()唤醒所有线程,这时候,一个消费者线程消费一个产品使得产品为0,另外一个消费者线程再消费一个产品使得产品变为了负数,这种现象称为虚假唤醒。

mark

在Object.wait()方法的javadoc中叙述了该如何解决这种问题:

mark

意思即将get和sale方法中的if都改为while,这样,每次被唤醒后,都会再次判断产品数是否>=0:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
//店员类
class Clerk {
private int product = 0;

public synchronized void get() {
while (product >= 1) {
System.out.println("库存已满");

try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println(Thread.currentThread().getName() + ":" + ++product);
this.notifyAll();
}

public synchronized void sale() {
while (product <= 0) {
System.out.println("库存缺货");
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println(Thread.currentThread().getName() + ":" + --product);
this.notifyAll();
}
}

// 生产者类
class Productor implements Runnable {

private Clerk clerk;

public Productor(Clerk clerk) {
this.clerk = clerk;
}

@Override
public void run() {
for (int i = 0; i < 10; i++) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
clerk.get();
}
}
}

// 消费者类
class Consumer implements Runnable {

private Clerk clerk;

public Consumer(Clerk clerk) {
this.clerk = clerk;
}

@Override
public void run() {
for (int i = 0; i < 10; i++) {
clerk.sale();
}
}
}

public class ProductorAndConsumer {

public static void main(String[] args) {
Clerk clerk = new Clerk();

Productor productor = new Productor(clerk);
Consumer consumer = new Consumer(clerk);

new Thread(productor, "Productor A").start();
new Thread(consumer, "Consumer B").start();
new Thread(productor, "Productor C").start();
new Thread(consumer, "Consumer D").start();
}
}

mark

Condition控制线程间通信

由于我们之前用的是synchronized做的同步,接下来使用Condition控制线程通信试试:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

//店员类
class Clerk {
private int product = 0;
private Lock reentrantLock = new ReentrantLock();
Condition condition = reentrantLock.newCondition();

public void get() {
reentrantLock.lock();
try {
while (product >= 1) {
System.out.println("库存已满");
try {
//this.wait();
condition.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println(Thread.currentThread().getName() + ":" + ++product);
//this.notifyAll();
condition.signalAll();
}finally {
reentrantLock.unlock();
}
}

public void sale() {
reentrantLock.lock();
try {
while (product <= 0) {
System.out.println("库存缺货");
try {
//this.wait();
condition.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println(Thread.currentThread().getName() + ":" + --product);
//this.notifyAll();
condition.signalAll();
}finally {
reentrantLock.unlock();
}
}
}

// 生产者类
class Productor implements Runnable {

private Clerk clerk;

public Productor(Clerk clerk) {
this.clerk = clerk;
}

@Override
public void run() {
for (int i = 0; i < 10; i++) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
clerk.get();
}
}
}

// 消费者类
class Consumer implements Runnable {

private Clerk clerk;

public Consumer(Clerk clerk) {
this.clerk = clerk;
}

@Override
public void run() {
for (int i = 0; i < 10; i++) {
clerk.sale();
}
}
}

public class ProductorAndConsumerForLock {

public static void main(String[] args) {
Clerk clerk = new Clerk();

Productor productor = new Productor(clerk);
Consumer consumer = new Consumer(clerk);

new Thread(productor, "Productor A").start();
new Thread(consumer, "Consumer B").start();
new Thread(productor, "Productor C").start();
new Thread(consumer, "Consumer D").start();
}
}

效果依旧,即在使用Condition的时候,之前的wait使用了Condition对象的await()方法,而notify和notifyAll使用Condition对象的signal()和signalAll()来搞定。

mark

其中,Lock 替代了 synchronized 方法和语句的使用,Condition替代了 Object 监视器方法的使用。

Condition实现线程交替执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

//线程交替执行
public class ThreadAlternate {
public static void main(String[] args) {
ThreadAlternateRun alternateRun = new ThreadAlternateRun();

new Thread(()->{
for (int i = 0; i < 20; i++) {
alternateRun.loopA(i);
}
}, "A").start();
new Thread(()->{
for (int i = 0; i < 20; i++) {
alternateRun.loopB(i);
}
}, "B").start();
new Thread(()->{
for (int i = 0; i < 20; i++) {
alternateRun.loopC(i);
System.out.println("---------------------------------");
}
}, "C").start();
}
}

class ThreadAlternateRun{
private int position = 1;
Lock lock = new ReentrantLock();
Condition conditionA = lock.newCondition();
Condition conditionB = lock.newCondition();
Condition conditionC = lock.newCondition();

public void loopA(int round){
lock.lock();
try{
//判断
if(position != 1){
try {
conditionA.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}

//打印
for (int i = 1; i <=3 ; i++) {
System.out.println(Thread.currentThread().getName() + "\t" + i + "\t" + round);
}

//唤醒
position = 2;
conditionB.signal();
}finally {
lock.unlock();
}
}

public void loopB(int round){
lock.lock();
try{
//判断
if(position != 2){
try {
conditionB.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}

//打印
for (int i = 1; i <=3 ; i++) {
System.out.println(Thread.currentThread().getName() + "\t" + i + "\t" + round);
}

//唤醒
position = 3;
conditionC.signal();
}finally {
lock.unlock();
}
}

public void loopC(int round){
lock.lock();
try{
//判断
if(position != 3){
try {
conditionC.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}

//打印
for (int i = 1; i <=3 ; i++) {
System.out.println(Thread.currentThread().getName() + "\t" + i + "\t" + round);
}

//唤醒
position = 1;
conditionA.signal();
}finally {
lock.unlock();
}
}
}

mark

  • 本文作者: Tim
  • 本文链接: https://zouchanglin.cn/2774350939.html
  • 版权声明: 本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!