虚假唤醒与Condition控制线程通信
本文通过生产者消费者模型主要讲述了什么是虚假唤醒,以及处理处理虚假唤醒。另外还使用了Condition 来控制线程间的通信,Condition接口描述了可能会与锁有关联的条件变量,这些变量在用法上与使用Object.wait 访问的隐式监视器类似,但提供了更强大的功能。需要特别指出的是,单个Lock 可能与多个Condition 对象关联。为了避免兼容性问题,Condition 方法的名称与对应的Object 版本中的不同。 在Condition 对象中,与wait、notify 和notifyAll 方法对应的分别是await、signal 和signalAll。 Condition允许发生虚假唤醒,这通常作为对基础平台语义的让步。不过Condition还是应该总是在一个循环中被等待,避免虚假唤醒的发生。
生产者消费者示例
首先引入下面这段生产者和消费者的程序,店员类作为生产产品和消费产品的中介,其中的数据product为共享数据,产品最多只能囤积5个,当产品达到5个还在生产时,就会提示库存已满,类似地,如果产品只有0个了还在消费,会提示库存缺货。
//店员类
class Clerk {
private int product = 0;
// 进货
public synchronized void get() {
if (product >= 5) {
System.out.println("库存已满");
} else {
System.out.println(Thread.currentThread().getName() + ":" + ++product);
}
}
// 售货
public synchronized void sale() {
if (product <= 0) {
System.out.println("库存缺货");
} else {
System.out.println(Thread.currentThread().getName() + ":" + --product);
}
}
}
// 生产者类
class Productor implements Runnable {
private Clerk clerk;
public Productor(Clerk clerk) {
this.clerk = clerk;
}
@Override
public void run() {
for (int i = 0; i < 10; i++) {
clerk.get();
}
}
}
//消费者类
class Consumer implements Runnable {
private Clerk clerk;
public Consumer(Clerk clerk) {
this.clerk = clerk;
}
@Override
public void run() {
for (int i = 0; i < 10; i++) {
clerk.sale();
}
}
}
public class ProductorAndConsumer {
public static void main(String[] args) {
Clerk clerk = new Clerk();
Productor productor = new Productor(clerk);
Consumer consumer = new Consumer(clerk);
new Thread(productor,"Productor A").start();
new Thread(consumer,"Consumer B").start();
}
}
这是一种不好的情况,因为当产品已满时,还在不停地生产,当缺货时,还在不停地消费。为此,我们引入等待唤醒机制:
//店员类
class Clerk {
private int product = 0;
// 进货
public synchronized void get() {
if (product >= 5) {
System.out.println("库存已满");
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
} else {
System.out.println(Thread.currentThread().getName() + ":" + ++product);
this.notifyAll();
}
}
// 售货
public synchronized void sale() {
if (product <= 0) {
System.out.println("库存缺货");
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
} else {
System.out.println(Thread.currentThread().getName() + ":" + --product);
this.notifyAll();
}
}
}
// 生产者类
class Productor implements Runnable {
private Clerk clerk;
public Productor(Clerk clerk) {
this.clerk = clerk;
}
@Override
public void run() {
for (int i = 0; i < 10; i++) {
clerk.get();
}
}
}
//消费者类
class Consumer implements Runnable {
private Clerk clerk;
public Consumer(Clerk clerk) {
this.clerk = clerk;
}
@Override
public void run() {
for (int i = 0; i < 10; i++) {
clerk.sale();
}
}
}
public class ProductorAndConsumer {
public static void main(String[] args) {
Clerk clerk = new Clerk();
Productor productor = new Productor(clerk);
Consumer consumer = new Consumer(clerk);
new Thread(productor,"Productor A").start();
new Thread(consumer,"Consumer B").start();
}
}
再运行程序,就不会再出现上述的情况:
现在,我们将产品的囤积上限设定为1(这种情况在现实中也是有可能出现的):
//店员类
class Clerk {
private int product = 0;
// 将产品的囤积上限设定为1
public synchronized void get() {
if (product >= 1) {
System.out.println("库存已满");
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
} else {
System.out.println(Thread.currentThread().getName() + ":" + ++product);
this.notifyAll();
}
}
}
程序的输出貌似没有问题,但请注意图中箭头所指的地方,这表示程序没有结束,还一直在执行。这是因为,当循坏到最后一轮时,由于产品已满引发了wait()操作,然后生产者线程等待,随后消费者消费了一份产品,并唤醒等待的生产者线程,此时,被唤醒的生产者线程由于循环结束,直接结束了线程的执行,但是另一边,消费者线程没有结束,而且由于将产品消费完后再次进入了等待,但是生产者线程此时已经结束了,不能再唤醒消费者线程,所以便进入了死循环。
解决这种问题的方法时去掉Clerk类中get方法和sale方法的else,并将原来else中的代码直接提出,这样,就算线程结束,也会先再次唤醒等待的线程:
目前是没问题的,但是如果现在有两个(多个)消费者线程和生产者线程,并且我们在生产者类的run方法中添加一个sleep()方法的执行,情况会如何呢?
//店员类
class Clerk {
private int product = 0;
public synchronized void get() {
if (product >= 1) {
System.out.println("库存已满");
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println(Thread.currentThread().getName() + ":" + ++product);
this.notifyAll();
}
public synchronized void sale() {
if (product <= 0) {
System.out.println("库存缺货");
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println(Thread.currentThread().getName() + ":" + --product);
this.notifyAll();
}
}
// 生产者类
class Productor implements Runnable {
private Clerk clerk;
public Productor(Clerk clerk) {
this.clerk = clerk;
}
@Override
public void run() {
for (int i = 0; i < 10; i++) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
clerk.get();
}
}
}
// 消费者类
class Consumer implements Runnable {
private Clerk clerk;
public Consumer(Clerk clerk) {
this.clerk = clerk;
}
@Override
public void run() {
for (int i = 0; i < 10; i++) {
clerk.sale();
}
}
}
public class ProductorAndConsumer {
public static void main(String[] args) {
Clerk clerk = new Clerk();
Productor productor = new Productor(clerk);
Consumer consumer = new Consumer(clerk);
new Thread(productor, "Productor A").start();
new Thread(consumer, "Consumer B").start();
new Thread(productor, "Productor C").start();
new Thread(consumer, "Consumer D").start();
}
}
但是结果明显是不对的, 错误的原因在于,当一个消费者线程遇到产品为0时,等待,并释放锁标志,然后另外一个消费者线程获取到该锁标志,由于产品仍然为0,也等待,并释放锁标志。这时候,生产者线程获取到锁,在生产一个产品后,执行notifyAll()唤醒所有线程,这时候,一个消费者线程消费一个产品使得产品为0,另外一个消费者线程再消费一个产品使得产品变为了负数,这种现象称为虚假唤醒。
在Object.wait()方法的javadoc中叙述了该如何解决这种问题:
意思即将get和sale方法中的if都改为while,这样,每次被唤醒后,都会再次判断产品数是否>=0:
//店员类
class Clerk {
private int product = 0;
public synchronized void get() {
while (product >= 1) {
System.out.println("库存已满");
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println(Thread.currentThread().getName() + ":" + ++product);
this.notifyAll();
}
public synchronized void sale() {
while (product <= 0) {
System.out.println("库存缺货");
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println(Thread.currentThread().getName() + ":" + --product);
this.notifyAll();
}
}
// 生产者类
class Productor implements Runnable {
private Clerk clerk;
public Productor(Clerk clerk) {
this.clerk = clerk;
}
@Override
public void run() {
for (int i = 0; i < 10; i++) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
clerk.get();
}
}
}
// 消费者类
class Consumer implements Runnable {
private Clerk clerk;
public Consumer(Clerk clerk) {
this.clerk = clerk;
}
@Override
public void run() {
for (int i = 0; i < 10; i++) {
clerk.sale();
}
}
}
public class ProductorAndConsumer {
public static void main(String[] args) {
Clerk clerk = new Clerk();
Productor productor = new Productor(clerk);
Consumer consumer = new Consumer(clerk);
new Thread(productor, "Productor A").start();
new Thread(consumer, "Consumer B").start();
new Thread(productor, "Productor C").start();
new Thread(consumer, "Consumer D").start();
}
}
Condition控制线程间通信
由于我们之前用的是synchronized做的同步,接下来使用Condition控制线程通信试试:
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
//店员类
class Clerk {
private int product = 0;
private Lock reentrantLock = new ReentrantLock();
Condition condition = reentrantLock.newCondition();
public void get() {
reentrantLock.lock();
try {
while (product >= 1) {
System.out.println("库存已满");
try {
//this.wait();
condition.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println(Thread.currentThread().getName() + ":" + ++product);
//this.notifyAll();
condition.signalAll();
}finally {
reentrantLock.unlock();
}
}
public void sale() {
reentrantLock.lock();
try {
while (product <= 0) {
System.out.println("库存缺货");
try {
//this.wait();
condition.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println(Thread.currentThread().getName() + ":" + --product);
//this.notifyAll();
condition.signalAll();
}finally {
reentrantLock.unlock();
}
}
}
// 生产者类
class Productor implements Runnable {
private Clerk clerk;
public Productor(Clerk clerk) {
this.clerk = clerk;
}
@Override
public void run() {
for (int i = 0; i < 10; i++) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
clerk.get();
}
}
}
// 消费者类
class Consumer implements Runnable {
private Clerk clerk;
public Consumer(Clerk clerk) {
this.clerk = clerk;
}
@Override
public void run() {
for (int i = 0; i < 10; i++) {
clerk.sale();
}
}
}
public class ProductorAndConsumerForLock {
public static void main(String[] args) {
Clerk clerk = new Clerk();
Productor productor = new Productor(clerk);
Consumer consumer = new Consumer(clerk);
new Thread(productor, "Productor A").start();
new Thread(consumer, "Consumer B").start();
new Thread(productor, "Productor C").start();
new Thread(consumer, "Consumer D").start();
}
}
效果依旧,即在使用Condition的时候,之前的wait使用了Condition对象的await()方法,而notify和notifyAll使用Condition对象的signal()和signalAll()来搞定。
其中,Lock 替代了 synchronized 方法和语句的使用,Condition替代了 Object 监视器方法的使用。
Condition实现线程交替执行
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
//线程交替执行
public class ThreadAlternate {
public static void main(String[] args) {
ThreadAlternateRun alternateRun = new ThreadAlternateRun();
new Thread(()->{
for (int i = 0; i < 20; i++) {
alternateRun.loopA(i);
}
}, "A").start();
new Thread(()->{
for (int i = 0; i < 20; i++) {
alternateRun.loopB(i);
}
}, "B").start();
new Thread(()->{
for (int i = 0; i < 20; i++) {
alternateRun.loopC(i);
System.out.println("---------------------------------");
}
}, "C").start();
}
}
class ThreadAlternateRun{
private int position = 1;
Lock lock = new ReentrantLock();
Condition conditionA = lock.newCondition();
Condition conditionB = lock.newCondition();
Condition conditionC = lock.newCondition();
public void loopA(int round){
lock.lock();
try{
//判断
if(position != 1){
try {
conditionA.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//打印
for (int i = 1; i <=3 ; i++) {
System.out.println(Thread.currentThread().getName() + "\t" + i + "\t" + round);
}
//唤醒
position = 2;
conditionB.signal();
}finally {
lock.unlock();
}
}
public void loopB(int round){
lock.lock();
try{
//判断
if(position != 2){
try {
conditionB.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//打印
for (int i = 1; i <=3 ; i++) {
System.out.println(Thread.currentThread().getName() + "\t" + i + "\t" + round);
}
//唤醒
position = 3;
conditionC.signal();
}finally {
lock.unlock();
}
}
public void loopC(int round){
lock.lock();
try{
//判断
if(position != 3){
try {
conditionC.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//打印
for (int i = 1; i <=3 ; i++) {
System.out.println(Thread.currentThread().getName() + "\t" + i + "\t" + round);
}
//唤醒
position = 1;
conditionA.signal();
}finally {
lock.unlock();
}
}
}