谈谈Java的线程池

mark

本文主要讲述了使用线程池的好处,Executors创建的五种线程池特点,简单介绍了Fork/Join框架。围绕Executor框架展开,阐述了线程池的工作流程,探讨了ThreadPoolExecutor的全部构造参数和意义,以及阿里巴巴不推荐使用Executors创建线程池的原因,另外,介绍了我们应该怎么样合理的创建线程池,对于CPU密集型和IO密集型以及混合型的创建方式。探讨了新任务提交后的执行流程,另外简单画了一下线程池生命周期图。

Java中的线程池是运用场景最多的并发框架,几乎所有需要异步或者并发执行任务的程序都可以使用线程池。开发 中使用线程池的三个优点如下: 1、降低资源消耗:通过重复利用已创建的线程降低线程创建和销毁带来的消耗。 2、提高响应速度:当任务到达时,任务可以不需要等待线程创建就能立即执行。 3、提高线程的可管理性:使用线程池可以统一进行线程分配、调度和监控。

Executors创建的五种线程池

利用Executors创建不同的线程池满足不同场景的需求

1、newFixedThreadPool(int nThreads): 指定工作线程数量的线程池, 如果线程池中正在执行的任务达到设置的线程最大数(无可用线程),则新得任务会放到阻塞队列里等待,有可用线程时按顺序执行。

2、newCachedThreadPool():处理大量短时间工作任务的线程池

  • 试图缓存线程并重用,当无缓存线程可用时,就会创建新的工作线程;
  • 如果线程闲置的时间超过阈值,则会被终止并移出缓存;
  • 系统长时间闲置的时候,不会消耗什么资源

3、newSingleThreadExecutor():创建唯一的工作线程来执行任务,如果线程异常结束,会有另一个线程取代它

4、newSingleThreadScheduledExecutor()与newScheduledThreadPool(int corePoolSize):定时或者周期性的工作调度,两者的区别在于单一工作线程还是多个线程

5、newWorkStealingPool():内部会构建ForkJoinPool,利用working-stealing算法,并行地处理任务,不保证处理顺序

Fork/Join框架

把大任务分割成若干个小任务并行执行,最终汇总每个小任务结果后得到大任务结果的框架。newWorkStealingPool()就是实现了Fork/Join框架的线程池,什么时working-stealing算法呢?简单来说就是某个线程从其他队列里窃取任务来执行。

mark

Executor框架

在 Java 5 之后,并发编程引入了一堆新的启动、调度和管理线程的API。Executor 框架便是 Java 5 中引入的,其内部使用了线程池机制,它在 java.util.cocurrent 包下,通过该框架来控制线程的启动、执行和关闭,可以简化并发编程的操作。

mark

J.U.C的三个Executor接口

  • Executor:运行新任务的简单接口,将任务提交和任务执行细节解耦
  • ExecutorService:具备管理执行器和任务生命周期的方法,提交任务机制更完善
  • ScheduledExecutorService:支持Future和定期执行任务

ThreadPoolExecutor

ThreadPoolExecutor顾名思义,是一个线程池管理工具类,该类主要提供了任务管理,线程的调度和相关的hook方法来控制线程池的状态。

mark

如上图,有新的任务提交的时候,如果线程池已经处于shutdown状态,此时新任务会被拒绝,用户可以通过实现RejectExecutionHandler来自定义处理逻辑。如果没有被拒绝,那么新任务将被放置在WorkQueue中,WorkQueue就是工作队列,里面存储了将要执行的任务,任务将被添加到线程池的工作线程中去执行。WorkQueue的数据结构各不相同,但是作用都是暂时存储用户提交的任务:

mark

然后由线程池对象去执行调度这些任务,我们可以再看看Worker:

mark

这里的线程由线程工厂所创建。

ThreadPoolExecutor构造函数

下面我们可以看看ThreadPoolExecutor的构造函数

 1public ThreadPoolExecutor(int corePoolSize,
 2                          int maximumPoolSize,
 3                          long keepAliveTime,
 4                          TimeUnit unit,
 5                          BlockingQueue<Runnable> workQueue,
 6                          ThreadFactory threadFactory,
 7                          RejectedExecutionHandler handler) {
 8    if (corePoolSize < 0 ||
 9        maximumPoolSize <= 0 ||
10        maximumPoolSize < corePoolSize ||
11        keepAliveTime < 0)
12        throw new IllegalArgumentException();
13    if (workQueue == null || threadFactory == null || handler == null)
14        throw new NullPointerException();
15    this.acc = System.getSecurityManager() == null ?
16        null :
17    AccessController.getContext();
18    this.corePoolSize = corePoolSize;
19    this.maximumPoolSize = maximumPoolSize;
20    this.workQueue = workQueue;
21    this.keepAliveTime = unit.toNanos(keepAliveTime);
22    this.threadFactory = threadFactory;
23    this.handler = handler;
24}

corePoolSize:核心线程数,默认情况下核心线程会一直存活,即使处于闲置状态也不会受存keepAliveTime限制,除非将allowCoreThreadTimeOut设置为true。

maximumPoolSize:线程池所能容纳的最大线程数。超过这个数的线程将被阻塞。当任务队列为没有设置大小的LinkedBlockingDeque时,这个值无效。

keepAliveTime:非核心线程的闲置超时时间,超过这个时间就会被回收。

unit:指定keepAliveTime的单位,如TimeUnit.SECONDS。当将allowCoreThreadTimeOut设置为true时对corePoolSize生效。

workQueue:线程池中的任务队列,常用的有三种队列

  • SynchronousQueue:是一种无缓冲的等待队列,在某次添加元素后必须等待其他线程取走后才能继续添加;
  • LinkedBlockingDeque:是一个无界缓存的等待队列,不指定容量则为Integer最大值,锁是分离的;
  • ArrayBlockingQueue:是一个有界缓存的等待队列,必须指定大小,锁是没有分离的;

threadFactory:线程工厂,提供创建新线程的功能,通过线程工厂可以对线程的一些属性进行定制。

RejectedExecutionHandler:当线程池中的资源已经全部使用,添加新线程被拒绝时,会调用RejectedExecutionHandler的rejectedExecution方法,线程池有以下四种拒绝策略。

  • AbortPolicy:当任务添加到线程池中被拒绝时,它将抛出RejectedExecutionException 异常。
  • CallerRunsPolicy:当任务添加到线程池中被拒绝时,会在线程池当前正在运行的Thread线程池中处理被拒绝的任务。
  • DiscardOldestPolicy:当任务添加到线程池中被拒绝时,线程池会放弃等待队列中最旧的未处理任务,然后将被拒绝的任务添加到等待队列中。
  • DiscardPolicy:当任务添加到线程池中被拒绝时,线程池将丢弃被拒绝的任务。

为什么不推荐Executors创建线程池

1、对于SingleThreadExecutor,SingleThreadExecutor是单线程线程池,只有一个核心线程:

1public static ExecutorService newSingleThreadExecutor() {
2    return new FinalizableDelegatedExecutorService
3        (new ThreadPoolExecutor(1, 1,
4                                0L, TimeUnit.MILLISECONDS,
5                                new LinkedBlockingQueue<Runnable>()));
6}

所以当一个任务提交时,首先会创建一个核心线程来执行任务,如果超过核心线程的数量,将会放入队列中,因为LinkedBlockingQueue是长度为Integer.MAX_VALUE的队列,可以认为是无界队列,因此往队列中可以插入无限多的任务,在资源有限的时候容易引起OOM异常,同时因为无界队列,maximumPoolSize和keepAliveTime参数将无效,压根就不会创建非核心线程。

mark

2、对于FixedThreadPool,FixedThreadPool是固定核心线程的线程池,固定核心线程数由用户传入。

1public static ExecutorService newFixedThreadPool(int nThreads) {
2    return new ThreadPoolExecutor(nThreads, nThreads,
3                                  0L, TimeUnit.MILLISECONDS,
4                                  new LinkedBlockingQueue<Runnable>());
5}

它和SingleThreadExecutor类似,唯一的区别就是核心线程数不同,并且由于使用的是LinkedBlockingQueue,在资源有限的时候容易引起OOM异常。

3、对于CachedThreadPool,CachedThreadPool是一个根据需要创建新线程的线程池。

1public static ExecutorService newCachedThreadPool() {
2    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
3                                  60L, TimeUnit.SECONDS,
4                                  new SynchronousQueue<Runnable>());
5}

当一个任务提交时,corePoolSize为0不创建核心线程,SynchronousQueue是一个不存储元素的队列,可以理解为队里永远是满的,其中每个插入操作必须等待另一个线程进行相应的删除操作,反之亦然。因此最终会创建非核心线程来执行任务。

对于非核心线程空闲60s时将被回收,第二个参数是线程池所能容纳的最大线程数,因为Integer.MAX_VALUE非常大,可以认为是可以无限创建线程的,在资源有限的情况下容易引起OOM异常。

综上所述,不推荐使用Executors创建线程池,自己把控线程池参数比较好!

线程池大小如何选定

既然不推荐Executors创建线程池,那么我们应该如何选择创建线程池的参数呢?

CPU密集型:线程数 = 按照核数或者核数 + 1 , CPU数量可以根据Runtime.availableProcessors方法获取。

I/O密集型:线程数 = CPU核数 * (1 + 平均等待时间/平均工作时间)。

混合型: 将任务分为CPU密集型和IO密集型,然后分别使用不同的线程池去处理,从而使每个线程池可以根据各自的工作负载来调整。

创建线程池的阻塞队列推荐使用有界队列,有界队列有助于避免资源耗尽的情况发生。

拒绝策略可以默认,也可以根据需要自定义策略。

新任务提交execute执行后的判断

1、如果运行的线程少于corePoolSize,则创建新线程来处理任务,即使线程池中的其他线程是空闲的;

2、如果线程池中的线程数量大于等于corePoolSize且小于maximumPoolSize,则只有当workQueue满时才创建新的线程去处理任务;

3、如果设置的corePoolSize和maximumPoolSize相同,则创建的线程池的大小是固定的,这时如果有新任务提交,若workQueue未满,则将请求放入workQueue中,等待有空闲的线程去从workQueue中取任务并处理;

4、如果运行的线程数量大于等于maximumPoolSize,这时如果workQueue已经满了,则通过handler所指定的策略来处理任务;

mark

线程池的5种状态

  • RUNNING:能接受新提交的任务, 并且也能处理阻塞队列中的任务
  • SHUTDOWN:不再接受新提交的任务,但可以处理存量任务
  • STOP:不再接受新提交的任务,也不处理存量任务
  • TIDYING:所有的任务都已终止
  • TERMINATED:terminated()方法执行完后进入该状态

mark