编辑
2020-04-03
编程语言
00
请注意,本文编写于 1269 天前,最后修改于 105 天前,其中某些信息可能已经过时。

目录

生产者消费者示例
Condition控制线程间通信
Condition实现线程交替执行

mark

本文通过生产者消费者模型主要讲述了什么是虚假唤醒,以及处理处理虚假唤醒。另外还使用了Condition 来控制线程间的通信,Condition接口描述了可能会与锁有关联的条件变量,这些变量在用法上与使用Object.wait 访问的隐式监视器类似,但提供了更强大的功能。需要特别指出的是,单个Lock 可能与多个Condition 对象关联。为了避免兼容性问题,Condition 方法的名称与对应的Object 版本中的不同。 在Condition 对象中,与wait、notify 和notifyAll 方法对应的分别是await、signal 和signalAll。 Condition允许发生虚假唤醒,这通常作为对基础平台语义的让步。不过Condition还是应该总是在一个循环中被等待,避免虚假唤醒的发生。

生产者消费者示例

首先引入下面这段生产者和消费者的程序,店员类作为生产产品和消费产品的中介,其中的数据product为共享数据,产品最多只能囤积5个,当产品达到5个还在生产时,就会提示库存已满,类似地,如果产品只有0个了还在消费,会提示库存缺货。

java
//店员类 class Clerk { private int product = 0; // 进货 public synchronized void get() { if (product >= 5) { System.out.println("库存已满"); } else { System.out.println(Thread.currentThread().getName() + ":" + ++product); } } // 售货 public synchronized void sale() { if (product <= 0) { System.out.println("库存缺货"); } else { System.out.println(Thread.currentThread().getName() + ":" + --product); } } } // 生产者类 class Productor implements Runnable { private Clerk clerk; public Productor(Clerk clerk) { this.clerk = clerk; } @Override public void run() { for (int i = 0; i < 10; i++) { clerk.get(); } } } //消费者类 class Consumer implements Runnable { private Clerk clerk; public Consumer(Clerk clerk) { this.clerk = clerk; } @Override public void run() { for (int i = 0; i < 10; i++) { clerk.sale(); } } } public class ProductorAndConsumer { public static void main(String[] args) { Clerk clerk = new Clerk(); Productor productor = new Productor(clerk); Consumer consumer = new Consumer(clerk); new Thread(productor,"Productor A").start(); new Thread(consumer,"Consumer B").start(); } }

mark

这是一种不好的情况,因为当产品已满时,还在不停地生产,当缺货时,还在不停地消费。为此,我们引入等待唤醒机制:

java
//店员类 class Clerk { private int product = 0; // 进货 public synchronized void get() { if (product >= 5) { System.out.println("库存已满"); try { this.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } else { System.out.println(Thread.currentThread().getName() + ":" + ++product); this.notifyAll(); } } // 售货 public synchronized void sale() { if (product <= 0) { System.out.println("库存缺货"); try { this.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } else { System.out.println(Thread.currentThread().getName() + ":" + --product); this.notifyAll(); } } } // 生产者类 class Productor implements Runnable { private Clerk clerk; public Productor(Clerk clerk) { this.clerk = clerk; } @Override public void run() { for (int i = 0; i < 10; i++) { clerk.get(); } } } //消费者类 class Consumer implements Runnable { private Clerk clerk; public Consumer(Clerk clerk) { this.clerk = clerk; } @Override public void run() { for (int i = 0; i < 10; i++) { clerk.sale(); } } } public class ProductorAndConsumer { public static void main(String[] args) { Clerk clerk = new Clerk(); Productor productor = new Productor(clerk); Consumer consumer = new Consumer(clerk); new Thread(productor,"Productor A").start(); new Thread(consumer,"Consumer B").start(); } }

再运行程序,就不会再出现上述的情况:

mark

现在,我们将产品的囤积上限设定为1(这种情况在现实中也是有可能出现的):

java
//店员类 class Clerk { private int product = 0; // 将产品的囤积上限设定为1 public synchronized void get() { if (product >= 1) { System.out.println("库存已满"); try { this.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } else { System.out.println(Thread.currentThread().getName() + ":" + ++product); this.notifyAll(); } } }

mark

程序的输出貌似没有问题,但请注意图中箭头所指的地方,这表示程序没有结束,还一直在执行。这是因为,当循坏到最后一轮时,由于产品已满引发了wait()操作,然后生产者线程等待,随后消费者消费了一份产品,并唤醒等待的生产者线程,此时,被唤醒的生产者线程由于循环结束,直接结束了线程的执行,但是另一边,消费者线程没有结束,而且由于将产品消费完后再次进入了等待,但是生产者线程此时已经结束了,不能再唤醒消费者线程,所以便进入了死循环。

解决这种问题的方法时去掉Clerk类中get方法和sale方法的else,并将原来else中的代码直接提出,这样,就算线程结束,也会先再次唤醒等待的线程:

mark

目前是没问题的,但是如果现在有两个(多个)消费者线程和生产者线程,并且我们在生产者类的run方法中添加一个sleep()方法的执行,情况会如何呢?

java
//店员类 class Clerk { private int product = 0; public synchronized void get() { if (product >= 1) { System.out.println("库存已满"); try { this.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println(Thread.currentThread().getName() + ":" + ++product); this.notifyAll(); } public synchronized void sale() { if (product <= 0) { System.out.println("库存缺货"); try { this.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println(Thread.currentThread().getName() + ":" + --product); this.notifyAll(); } } // 生产者类 class Productor implements Runnable { private Clerk clerk; public Productor(Clerk clerk) { this.clerk = clerk; } @Override public void run() { for (int i = 0; i < 10; i++) { try { Thread.sleep(100); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } clerk.get(); } } } // 消费者类 class Consumer implements Runnable { private Clerk clerk; public Consumer(Clerk clerk) { this.clerk = clerk; } @Override public void run() { for (int i = 0; i < 10; i++) { clerk.sale(); } } } public class ProductorAndConsumer { public static void main(String[] args) { Clerk clerk = new Clerk(); Productor productor = new Productor(clerk); Consumer consumer = new Consumer(clerk); new Thread(productor, "Productor A").start(); new Thread(consumer, "Consumer B").start(); new Thread(productor, "Productor C").start(); new Thread(consumer, "Consumer D").start(); } }

但是结果明显是不对的, 错误的原因在于,当一个消费者线程遇到产品为0时,等待,并释放锁标志,然后另外一个消费者线程获取到该锁标志,由于产品仍然为0,也等待,并释放锁标志。这时候,生产者线程获取到锁,在生产一个产品后,执行notifyAll()唤醒所有线程,这时候,一个消费者线程消费一个产品使得产品为0,另外一个消费者线程再消费一个产品使得产品变为了负数,这种现象称为虚假唤醒。

mark

在Object.wait()方法的javadoc中叙述了该如何解决这种问题:

mark

意思即将get和sale方法中的if都改为while,这样,每次被唤醒后,都会再次判断产品数是否>=0:

java
//店员类 class Clerk { private int product = 0; public synchronized void get() { while (product >= 1) { System.out.println("库存已满"); try { this.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println(Thread.currentThread().getName() + ":" + ++product); this.notifyAll(); } public synchronized void sale() { while (product <= 0) { System.out.println("库存缺货"); try { this.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println(Thread.currentThread().getName() + ":" + --product); this.notifyAll(); } } // 生产者类 class Productor implements Runnable { private Clerk clerk; public Productor(Clerk clerk) { this.clerk = clerk; } @Override public void run() { for (int i = 0; i < 10; i++) { try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } clerk.get(); } } } // 消费者类 class Consumer implements Runnable { private Clerk clerk; public Consumer(Clerk clerk) { this.clerk = clerk; } @Override public void run() { for (int i = 0; i < 10; i++) { clerk.sale(); } } } public class ProductorAndConsumer { public static void main(String[] args) { Clerk clerk = new Clerk(); Productor productor = new Productor(clerk); Consumer consumer = new Consumer(clerk); new Thread(productor, "Productor A").start(); new Thread(consumer, "Consumer B").start(); new Thread(productor, "Productor C").start(); new Thread(consumer, "Consumer D").start(); } }

mark

Condition控制线程间通信

由于我们之前用的是synchronized做的同步,接下来使用Condition控制线程通信试试:

java
import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; //店员类 class Clerk { private int product = 0; private Lock reentrantLock = new ReentrantLock(); Condition condition = reentrantLock.newCondition(); public void get() { reentrantLock.lock(); try { while (product >= 1) { System.out.println("库存已满"); try { //this.wait(); condition.await(); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println(Thread.currentThread().getName() + ":" + ++product); //this.notifyAll(); condition.signalAll(); }finally { reentrantLock.unlock(); } } public void sale() { reentrantLock.lock(); try { while (product <= 0) { System.out.println("库存缺货"); try { //this.wait(); condition.await(); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println(Thread.currentThread().getName() + ":" + --product); //this.notifyAll(); condition.signalAll(); }finally { reentrantLock.unlock(); } } } // 生产者类 class Productor implements Runnable { private Clerk clerk; public Productor(Clerk clerk) { this.clerk = clerk; } @Override public void run() { for (int i = 0; i < 10; i++) { try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } clerk.get(); } } } // 消费者类 class Consumer implements Runnable { private Clerk clerk; public Consumer(Clerk clerk) { this.clerk = clerk; } @Override public void run() { for (int i = 0; i < 10; i++) { clerk.sale(); } } } public class ProductorAndConsumerForLock { public static void main(String[] args) { Clerk clerk = new Clerk(); Productor productor = new Productor(clerk); Consumer consumer = new Consumer(clerk); new Thread(productor, "Productor A").start(); new Thread(consumer, "Consumer B").start(); new Thread(productor, "Productor C").start(); new Thread(consumer, "Consumer D").start(); } }

效果依旧,即在使用Condition的时候,之前的wait使用了Condition对象的await()方法,而notify和notifyAll使用Condition对象的signal()和signalAll()来搞定。

mark

其中,Lock 替代了 synchronized 方法和语句的使用,Condition替代了 Object 监视器方法的使用。

Condition实现线程交替执行

java
import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; //线程交替执行 public class ThreadAlternate { public static void main(String[] args) { ThreadAlternateRun alternateRun = new ThreadAlternateRun(); new Thread(()->{ for (int i = 0; i < 20; i++) { alternateRun.loopA(i); } }, "A").start(); new Thread(()->{ for (int i = 0; i < 20; i++) { alternateRun.loopB(i); } }, "B").start(); new Thread(()->{ for (int i = 0; i < 20; i++) { alternateRun.loopC(i); System.out.println("---------------------------------"); } }, "C").start(); } } class ThreadAlternateRun{ private int position = 1; Lock lock = new ReentrantLock(); Condition conditionA = lock.newCondition(); Condition conditionB = lock.newCondition(); Condition conditionC = lock.newCondition(); public void loopA(int round){ lock.lock(); try{ //判断 if(position != 1){ try { conditionA.await(); } catch (InterruptedException e) { e.printStackTrace(); } } //打印 for (int i = 1; i <=3 ; i++) { System.out.println(Thread.currentThread().getName() + "\t" + i + "\t" + round); } //唤醒 position = 2; conditionB.signal(); }finally { lock.unlock(); } } public void loopB(int round){ lock.lock(); try{ //判断 if(position != 2){ try { conditionB.await(); } catch (InterruptedException e) { e.printStackTrace(); } } //打印 for (int i = 1; i <=3 ; i++) { System.out.println(Thread.currentThread().getName() + "\t" + i + "\t" + round); } //唤醒 position = 3; conditionC.signal(); }finally { lock.unlock(); } } public void loopC(int round){ lock.lock(); try{ //判断 if(position != 3){ try { conditionC.await(); } catch (InterruptedException e) { e.printStackTrace(); } } //打印 for (int i = 1; i <=3 ; i++) { System.out.println(Thread.currentThread().getName() + "\t" + i + "\t" + round); } //唤醒 position = 1; conditionA.signal(); }finally { lock.unlock(); } } }

mark

本文作者:Tim

本文链接:

版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!