16-线程间通信

线程间的通信是使多个线程成为整体的必用方案之一,需要着重掌握的技术点如下:

  • 使用 wait/notify 实现线程间的通信。
  • 生产者/消费者模式的实现。
  • 利用管道进行线程间通信
  • 使用阻塞队列 BlockingQueue

等待/通知 机制

举个例子来说明 等待/通知 机制,比如厨师和服务员之间的交互要在 “菜品传递台” 上,会有如下几个问题:

  • 厨师做完一道菜的时间不确定,所以厨师将菜品放到 “菜品传递台” 上的时间也不确定。
  • 服务员取到菜的时间却决于厨师,所以服务员就有 “等待(wait)” 的状态。
  • 服务员如何能取到菜呢?这又取决于厨师,厨师将菜放到 “菜品传递台” 上,其实就相当于一种 “通知(notify)”,这时服务员才可以拿到菜并交给顾客。
  • 这整个过程中出现了 “等待/通知” 机制。

方法 wait() 是 Object 类的方法,其作用是使当前执行代码的线程进行等待。该方法用来将当前线程置入 “预执行队列” 中,并且在 wait() 所在的代码行处停止执行,直到接到通知或被中断为止。

在调用 wait() 之前,线程必须先获得该对象的对象锁,即只能在同步方法或同步块中调用 wait() 方法。在执行 wait() 方法后,当前线程释放锁。 在从 wait() 返回前,线程与其他线程竞争重新获得锁。如果调用 wait() 时没有持有适当的锁,会引发 IllegalMonitorStateException

当线程处于 wait() 状态时,调用线程对象的 interrupt() 方法会引发 InterruptedException 异常。

带参数的 wait(long) 方法的功能是等待某一时间内是否有线程对锁进行唤醒,如果超过这个时间则自动唤醒。

方法 notify() 也要在同步方法或在同步块中调用,调用前必须先获得对象的锁,否则也会抛出 IllegalMonitorStateException。该方法用来通知那些可能等待该对象的对象锁的其他线程。如果有多个线程等待,则由线程规划器随机挑选出其中一个处于 wait() 状态的线程,对其发出通知 notify,被重新唤醒的线程会试图重新获取对象的锁。

在执行 notify() 方法后,当前线程不会马上释放对象锁,处于 wait() 状态的线程也不能马上获取该对象锁,要等到执行 notify() 方法的线程将程序执行完,也就是退出 synchronized 代码块后,当前线程才会释放锁,而处于 wait() 状态的线程才可以获取该对象锁。

当第一个获得该对象锁的 wait() 线程运行完毕以后,它会释放该对象锁,此时如果没有再次使用 notify() 语句,则其他处于 wait() 状态的线程由于没有得到该对象的通知,还会继续阻塞在 wait 状态。

每个锁对象都有两个队列,一个是 就绪队列 ,一个是 阻塞队列。就绪队列存储了将要获得锁的线程,阻塞队列存储了被阻塞的线程。一个线程被唤醒后,才会进入就绪队列,等待 CPU 的调度;反之,一个线程被 wait 后,就会进入阻塞队列进行等待,等待下一次被唤醒。

生产中/消费者 模式实现

等待/通知 的经典案例就是 “生产者/消费者” 模式。但此模式在使用上有几种变形,不过原理都是一样的。

在多个生产者,多个消费者的模式中,会出现一种情况:在代码中确实已经通过 wait/notify 进行了通信,但不保证 notify 唤醒的是异类,也许是同类,比如 “生产者” 唤醒了 “生产者”,或 “消费者” 唤醒了 “消费者”。如果按这样的情况运行的比率积少成多,就会导致所有的线程都不能继续运行下去,大家都在等待,都处于 WAITING 状态。

假死出现的主要原因是有可能连续唤醒同类,因此把 notify() 改写成 notifyAll() 方法即可。

使用阻塞队列 BlonkingQueue 控制线程通信

BlockingQueue接口虽然也是 Queue 的子接口,但它的主要用途并不是作为容器,而是作为线程同步的工具。BlockingQueue 具有一个特征:当生产者线程试图向 BlockingQueue 中放入元素时,如果该队列已满,则该线程被阻塞;当消费者线程试图从 BlockingQueue 中取出元素时,如果该队列已空,则该线程被阻塞。

两个线程交替向 BlockingQueue 中放入元素、取出元素,即可很好地控制线程通信。BlockingQueue 提供了如下两个支持阻塞的方法:

  • put(E e):尝试把元素 E 放入 BlockingQueue 中,如果该队列已满,则阻塞该线程。
  • take():尝试从 BlockingQueue 的头部取出元素,如果该队列的元素已空,则阻塞该线程。

BlockingQueue 包含5个实现类:

  • ArrayBlockingQueue:基于数组实现。
  • LinkedBlockingQueue:基于链表实现。
  • PriorityBlockingQueue:它并不是标准的阻塞队列。与 PriorityQueue 类似,该队列调用 remove()、poll()、take() 等方法取出元素时,并不是取出队列中存在时间最长的元素,而是队列中最小的元素。元素大小的判断基于 Comparable 接口实现自然排序,也可使用 Comparator 进行定制排序。
  • SynchronousQueue:同步队列,对该队列的存、取操作必须交替进行。
  • DelayQueue:底层基于 PriorityBlockingQueue 实现,不过 DelayQueue 要求集合元素都实现 Delay 接口。

通过管道进行线程间通信

管道流 pipeStream 是一种特殊的流,用于在线程间直接传送数据。一个线程发送数据到输出管道,另一个线程从输入管道中读取数据。通过管道,实现不同线程间的通信,而无需借助类似于临时文件之类的东西。

Java 提供了 4 个类来使线程间可以进行通信:

  • PipedInputStream 和 PipedOutputStream
  • PipedReader 和 PipedWriter