虚假唤醒与Condition控制线程通信

本文通过生产者消费者模型主要讲述了什么是虚假唤醒,以及处理处理虚假唤醒。另外还使用了Condition 来控制线程间的通信,Condition接口描述了可能会与锁有关联的条件变量,这些变量在用法上与使用Object.wait 访问的隐式监视器类似,但提供了更强大的功能。需要特别指出的是,单个Lock 可能与多个Condition 对象关联。为了避免兼容性问题,Condition 方法的名称与对应的Object 版本中的不同。 在Condition 对象中,与wait、notify 和notifyAll 方法对应的分别是await、signal 和signalAll。 Condition允许发生虚假唤醒,这通常作为对基础平台语义的让步。不过Condition还是应该总是在一个循环中被等待,避免虚假唤醒的发生。

生产者消费者示例

首先引入下面这段生产者和消费者的程序,店员类作为生产产品和消费产品的中介,其中的数据product为共享数据,产品最多只能囤积5个,当产品达到5个还在生产时,就会提示库存已满,类似地,如果产品只有0个了还在消费,会提示库存缺货。

//店员类
class Clerk {
    private int product = 0;

    // 进货
    public synchronized void get() {
        if (product >= 5) {
            System.out.println("库存已满");
        } else {
            System.out.println(Thread.currentThread().getName() + ":" + ++product);
        }
    }

    // 售货
    public synchronized void sale() {
        if (product <= 0) {
            System.out.println("库存缺货");
        } else {
            System.out.println(Thread.currentThread().getName() + ":" + --product);
        }
    }
}

// 生产者类
class Productor implements Runnable {

    private Clerk clerk;

    public Productor(Clerk clerk) {
        this.clerk = clerk;
    }

    @Override
    public void run() {
        for (int i = 0; i < 10; i++) {
            clerk.get();
        }

    }
}

//消费者类
class Consumer implements Runnable {

    private Clerk clerk;

    public Consumer(Clerk clerk) {
        this.clerk = clerk;
    }

    @Override
    public void run() {
        for (int i = 0; i < 10; i++) {
            clerk.sale();
        }
    }
}

public class ProductorAndConsumer {

    public static void main(String[] args) {
        Clerk clerk = new Clerk();

        Productor productor = new Productor(clerk);
        Consumer consumer = new Consumer(clerk);

        new Thread(productor,"Productor A").start();
        new Thread(consumer,"Consumer B").start();
    }
}

这是一种不好的情况,因为当产品已满时,还在不停地生产,当缺货时,还在不停地消费。为此,我们引入等待唤醒机制:

//店员类
class Clerk {
    private int product = 0;

    // 进货
    public synchronized void get() {
        if (product >= 5) {
            System.out.println("库存已满");

            try {
                this.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        } else {
            System.out.println(Thread.currentThread().getName() + ":" + ++product);
            this.notifyAll();
        }
    }

    // 售货
    public synchronized void sale() {
        if (product <= 0) {
            System.out.println("库存缺货");
            try {
                this.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        } else {
            System.out.println(Thread.currentThread().getName() + ":" + --product);
            this.notifyAll();
        }
    }
}

// 生产者类
class Productor implements Runnable {

    private Clerk clerk;

    public Productor(Clerk clerk) {
        this.clerk = clerk;
    }

    @Override
    public void run() {
        for (int i = 0; i < 10; i++) {
            clerk.get();
        }
    }
}

//消费者类
class Consumer implements Runnable {

    private Clerk clerk;

    public Consumer(Clerk clerk) {
        this.clerk = clerk;
    }

    @Override
    public void run() {
        for (int i = 0; i < 10; i++) {
            clerk.sale();
        }

    }
}

public class ProductorAndConsumer {

    public static void main(String[] args) {
        Clerk clerk = new Clerk();

        Productor productor = new Productor(clerk);
        Consumer consumer = new Consumer(clerk);

        new Thread(productor,"Productor A").start();
        new Thread(consumer,"Consumer B").start();
    }
}

再运行程序,就不会再出现上述的情况:

现在,我们将产品的囤积上限设定为1(这种情况在现实中也是有可能出现的):

//店员类
class Clerk {
    private int product = 0;

    // 将产品的囤积上限设定为1
    public synchronized void get() {
        if (product >= 1) {
            System.out.println("库存已满");

            try {
                this.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        } else {
            System.out.println(Thread.currentThread().getName() + ":" + ++product);
            this.notifyAll();
        }
    }
}

程序的输出貌似没有问题,但请注意图中箭头所指的地方,这表示程序没有结束,还一直在执行。这是因为,当循坏到最后一轮时,由于产品已满引发了wait()操作,然后生产者线程等待,随后消费者消费了一份产品,并唤醒等待的生产者线程,此时,被唤醒的生产者线程由于循环结束,直接结束了线程的执行,但是另一边,消费者线程没有结束,而且由于将产品消费完后再次进入了等待,但是生产者线程此时已经结束了,不能再唤醒消费者线程,所以便进入了死循环。

解决这种问题的方法时去掉Clerk类中get方法和sale方法的else,并将原来else中的代码直接提出,这样,就算线程结束,也会先再次唤醒等待的线程:

目前是没问题的,但是如果现在有两个(多个)消费者线程和生产者线程,并且我们在生产者类的run方法中添加一个sleep()方法的执行,情况会如何呢?

//店员类
class Clerk {
    private int product = 0;

    public synchronized void get() {
        if (product >= 1) {
            System.out.println("库存已满");

            try {
                this.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        System.out.println(Thread.currentThread().getName() + ":" + ++product);
        this.notifyAll();
    }

    public synchronized void sale() {
        if (product <= 0) {
            System.out.println("库存缺货");
            try {
                this.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        System.out.println(Thread.currentThread().getName() + ":" + --product);
        this.notifyAll();
    }
}

// 生产者类
class Productor implements Runnable {

    private Clerk clerk;

    public Productor(Clerk clerk) {
        this.clerk = clerk;
    }

    @Override
    public void run() {
        for (int i = 0; i < 10; i++) {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
            clerk.get();
        }
    }
}

// 消费者类
class Consumer implements Runnable {

    private Clerk clerk;

    public Consumer(Clerk clerk) {
        this.clerk = clerk;
    }

    @Override
    public void run() {
        for (int i = 0; i < 10; i++) {
            clerk.sale();
        }
    }
}

public class ProductorAndConsumer {

    public static void main(String[] args) {
        Clerk clerk = new Clerk();

        Productor productor = new Productor(clerk);
        Consumer consumer = new Consumer(clerk);

        new Thread(productor, "Productor A").start();
        new Thread(consumer, "Consumer B").start();
        new Thread(productor, "Productor C").start();
        new Thread(consumer, "Consumer D").start();
    }
}

但是结果明显是不对的, 错误的原因在于,当一个消费者线程遇到产品为0时,等待,并释放锁标志,然后另外一个消费者线程获取到该锁标志,由于产品仍然为0,也等待,并释放锁标志。这时候,生产者线程获取到锁,在生产一个产品后,执行notifyAll()唤醒所有线程,这时候,一个消费者线程消费一个产品使得产品为0,另外一个消费者线程再消费一个产品使得产品变为了负数,这种现象称为虚假唤醒。

在Object.wait()方法的javadoc中叙述了该如何解决这种问题:

意思即将get和sale方法中的if都改为while,这样,每次被唤醒后,都会再次判断产品数是否>=0:

//店员类
class Clerk {
    private int product = 0;

    public synchronized void get() {
        while (product >= 1) {
            System.out.println("库存已满");

            try {
                this.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        System.out.println(Thread.currentThread().getName() + ":" + ++product);
        this.notifyAll();
    }

    public synchronized void sale() {
        while (product <= 0) {
            System.out.println("库存缺货");
            try {
                this.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        System.out.println(Thread.currentThread().getName() + ":" + --product);
        this.notifyAll();
    }
}

// 生产者类
class Productor implements Runnable {

    private Clerk clerk;

    public Productor(Clerk clerk) {
        this.clerk = clerk;
    }

    @Override
    public void run() {
        for (int i = 0; i < 10; i++) {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            clerk.get();
        }
    }
}

// 消费者类
class Consumer implements Runnable {

    private Clerk clerk;

    public Consumer(Clerk clerk) {
        this.clerk = clerk;
    }

    @Override
    public void run() {
        for (int i = 0; i < 10; i++) {
            clerk.sale();
        }
    }
}

public class ProductorAndConsumer {

    public static void main(String[] args) {
        Clerk clerk = new Clerk();

        Productor productor = new Productor(clerk);
        Consumer consumer = new Consumer(clerk);

        new Thread(productor, "Productor A").start();
        new Thread(consumer, "Consumer B").start();
        new Thread(productor, "Productor C").start();
        new Thread(consumer, "Consumer D").start();
    }
}

Condition控制线程间通信

由于我们之前用的是synchronized做的同步,接下来使用Condition控制线程通信试试:

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

//店员类
class Clerk {
    private int product = 0;
    private Lock reentrantLock = new ReentrantLock();
    Condition condition = reentrantLock.newCondition();

    public void get() {
        reentrantLock.lock();
        try {
            while (product >= 1) {
                System.out.println("库存已满");
                try {
                    //this.wait();
                    condition.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            System.out.println(Thread.currentThread().getName() + ":" + ++product);
            //this.notifyAll();
            condition.signalAll();
        }finally {
            reentrantLock.unlock();
        }
    }

    public void sale() {
        reentrantLock.lock();
        try {
            while (product <= 0) {
                System.out.println("库存缺货");
                try {
                    //this.wait();
                    condition.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            System.out.println(Thread.currentThread().getName() + ":" + --product);
            //this.notifyAll();
            condition.signalAll();
        }finally {
            reentrantLock.unlock();
        }
    }
}

// 生产者类
class Productor implements Runnable {

    private Clerk clerk;

    public Productor(Clerk clerk) {
        this.clerk = clerk;
    }

    @Override
    public void run() {
        for (int i = 0; i < 10; i++) {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            clerk.get();
        }
    }
}

// 消费者类
class Consumer implements Runnable {

    private Clerk clerk;

    public Consumer(Clerk clerk) {
        this.clerk = clerk;
    }

    @Override
    public void run() {
        for (int i = 0; i < 10; i++) {
            clerk.sale();
        }
    }
}

public class ProductorAndConsumerForLock {

    public static void main(String[] args) {
        Clerk clerk = new Clerk();

        Productor productor = new Productor(clerk);
        Consumer consumer = new Consumer(clerk);

        new Thread(productor, "Productor A").start();
        new Thread(consumer, "Consumer B").start();
        new Thread(productor, "Productor C").start();
        new Thread(consumer, "Consumer D").start();
    }
}

效果依旧,即在使用Condition的时候,之前的wait使用了Condition对象的await()方法,而notify和notifyAll使用Condition对象的signal()和signalAll()来搞定。

其中,Lock 替代了 synchronized 方法和语句的使用,Condition替代了 Object 监视器方法的使用。

Condition实现线程交替执行

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

//线程交替执行
public class ThreadAlternate {
    public static void main(String[] args) {
        ThreadAlternateRun alternateRun = new ThreadAlternateRun();

        new Thread(()->{
            for (int i = 0; i < 20; i++) {
                alternateRun.loopA(i);
            }
        }, "A").start();
        new Thread(()->{
            for (int i = 0; i < 20; i++) {
                alternateRun.loopB(i);
            }
        }, "B").start();
        new Thread(()->{
            for (int i = 0; i < 20; i++) {
                alternateRun.loopC(i);
                System.out.println("---------------------------------");
            }
        }, "C").start();
    }
}

class ThreadAlternateRun{
    private int position = 1;
    Lock lock = new ReentrantLock();
    Condition conditionA = lock.newCondition();
    Condition conditionB = lock.newCondition();
    Condition conditionC = lock.newCondition();

    public void loopA(int round){
        lock.lock();
        try{
            //判断
            if(position != 1){
                try {
                    conditionA.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

            //打印
            for (int i = 1; i <=3 ; i++) {
                System.out.println(Thread.currentThread().getName() + "\t" + i + "\t" + round);
            }

            //唤醒
            position = 2;
            conditionB.signal();
        }finally {
            lock.unlock();
        }
    }

    public void loopB(int round){
        lock.lock();
        try{
            //判断
            if(position != 2){
                try {
                    conditionB.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

            //打印
            for (int i = 1; i <=3 ; i++) {
                System.out.println(Thread.currentThread().getName() + "\t" + i + "\t" + round);
            }

            //唤醒
            position = 3;
            conditionC.signal();
        }finally {
            lock.unlock();
        }
    }

    public void loopC(int round){
        lock.lock();
        try{
            //判断
            if(position != 3){
                try {
                    conditionC.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

            //打印
            for (int i = 1; i <=3 ; i++) {
                System.out.println(Thread.currentThread().getName() + "\t" + i + "\t" + round);
            }

            //唤醒
            position = 1;
            conditionA.signal();
        }finally {
            lock.unlock();
        }
    }
}