Executor框架

Apr 11, 2016


Executor框架简介

两级调度模型

HotSpot VM的线程模型中, Java线程被一对一映射为本地操作系统中的线程. Java线程启动时就会创建一个本地操作系统线程. 当Java线程终止时, 操作系统线程也会被回收.

两级调度模型分别如下:

  • 在上层, Java多线程程序通常把应用分解为多个任务, 然后使用用户级的调度器(Executor框架)将这些任务映射为固定数量的线程
  • 在下层, 操作系统内核将这些线程映射到硬件处理器上

java-concurrency-executor-model

结构与成员

Executor的结构主要由3部分组成如下:

  • 任务 : 任务的表示通过实现Runnalbe接口和Callable接口
  • 任务的执行 : 任务执行机制的核心接口Executor, 以及继承了ExecutorExecutorService接口. Executor框架有两个关键类实现了ExecutorService接口
    • ThreadPoolExecutor
    • ScheduledThreadPoolExecutor
  • 异步计算的结果 : 包括Future和实现Future接口的FutureTask

Executor框架的类与接口如下:

java-concurrency-executor-framework-package

Executor框架的使用示意图如下:

java-concurrency-executor-usage

Executor框架的成员包括:

  • ThreadPoolExecutor : 通常使用Executors工厂类来创建
    • SingleThreadPool : 适用于需要保证各个任务顺序执行, 并且在任意时间点, 不会有多个线程是活动的场景
    • FixedThreadPool : 适用于为了满足资源管理的需求, 而限制当前线程数量的应用场景. 适用于负载较重的服务器
    • CachedThreadPool : 适用于执行很多短期异步任务的小程序, 或者负载较轻的服务器
  • ScheduledThreadPoolExecutor : 通常使用Executors工厂类来创建
    • ScheduledThreadPool : 适用于需要多个后台线程周期执行任务, 同时为了满足资源管理需求而限制后台线程数量的场景
    • SingleScheduledThreadPool : 适用于需要单个后台线程周期执行任务, 同时需要保证顺序执行各个任务的场景
  • Future : 当我们把RnnableCallable的实现类submitThreadPoolExecutorScheduledThreadPoolExecutor时, 会返回一个FutureTask对象, 表示异步计算的结果
  • Runnalbe和Callable : Runnable表示不会返回结果的任务, Callable表示可以返回结果的任务. Executors可以把Runnable包装为Callable

ThreadPoolExecutor详解

Executors框架的核心类是ThreadPoolExecutor, 它是线程池的实现类, 下面是参数的主要说明:

参数 说明
corePoolSize 核心线程池的大小
maximumPoolSize 最大线程池的大小
BlockingQueue 保存任务的工作对列
RejectedExecutionHandler 当任务无法执行时的处理机制

FixedThreadPool详解

FixedThreadPool的创建代码如下:

public static ExecutorService newSingleThreadPool( int nThreads ) {
    return new ThreadPoolExecutor( nThreads, nThreads, 
                                    0L, TimeUnit.MILLISECONDS, 
                                    new LinkedBlockingQueue<Runnable>() );

FixedThreadPool的运行示意图如下:

java-concurrency-fixed-thread-pool-execution

LinkedBlockingQueue是基于链表实现的, 所以该任务队列是一个无界队列, 因此

  • 线程池中的线程数量不会超过corePoolSize
  • maximumPoolSize是一个无效参数
  • keepAliveTime是一个无效参数
  • 不会拒绝任务(也就是不会调用RejectedExecutionHandler.rejectedExecution)

SingleThreadPool详解

SingleThreadPool的创建代码如下:

public static ExecutorService newFixedThreadPool() {
    return new ThreadPoolExecutor( 1, 1, 
                                    0L, TimeUnit.MILLISECONDS, 
                                    new LinkedBlockingQueue<Runnable>() );

SingleThreadPoolFixedThreadPool基本上一样, 只是corePoolSize的大小为1, 使用的队列也是无界队列, 所以具有FixedThreadPool一样的性质

SingleThreadPool的运行示意图如下:

java-concurrency-single-thread-pool-execution

CachedThreadPool详解

CachedThreadPool的创建代码如下:

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor( 0, Integer.MAX_VALUE, 
                                    60L, TimeUnit.SECONDS, 
                                    new SynchronousQueue<Runnable>() );

CachedThreadPool的运行示意图如下:

java-concurrency-cached-thread-pool-execution

CachedThreadPoolcorePoolSize被设置为0, 也就是corePool为空; maximumPoolSize被设置为Integer.MAX_VALUE, 也就是maximumPool是无界的.这里把keepAliveTime设置为60s, 意味着空闲线程等待新任务的最长时间为60秒, 空闲线程超过60秒就会被终止.

由于maximumPool是无界的, 如果主线程提交任务的速度高于maximumPool中线程处理任务的速度, 那么就会不断创建新线程, 极端情况下, 主机的CPU和内存资源将会被耗尽.

ScheduledThreadPoolExecutor详解

ScheduledThreadPoolExecutor继承自ThreadPoolExecutor. 它主要用于在给定的延迟之后运行任务, 或者定期执行任务.

运行机制

ScheduledThreadPoolExecutor的运行示意图如下:

java-concurrency-scheduled-thread-pool-execution

实现

ScheduledThreadPoolExecutor的任务执行步骤如下:

java-concurrency-scheduled-thread-pool-execution-steps

步骤说明如下:

  1. 线程从DelayQueue中获取已到期的ScheduledFutureTask(DelayQueue.take()). 到期任务是指ScheduledFutureTasktime大于等于当前时间
  2. 执行ScheduledFutureTask
  3. 修改ScheduledFutureTasktime变量为下次将要执行的时间
  4. ScheduledFutureTask返回DelayQueue

DelayQueue.take()的执行步骤如下:

java-concurrency-scheduled-thread-pool-execution-delay-queue-take

  1. 获取Lock
  2. 获取周期任务

    2.1. 如果PriorityQueue为空, 当前线程到Condition中等待

    2.2. 如果PriorityQueue的头元素的time时间比当前时间大, 当前线程到Condition中等待到time时间

    2.3.1 获取PriorityQueue的头元素

    2.3.2 如果PriorityQueue不为空, 则唤醒Condition中等待的所有线程

  3. 释放Lock

DelayQueue.offer()的执行步骤如下:

java-concurrency-scheduled-thread-pool-execution-delay-queue-offer

  1. 获取Lock
  2. 添加任务

    2.1. 向PriorityQueue添加任务

    2.2. 如果添加到PriorityQueue的是头元素, 唤醒在Condition中等待的所有线程

  3. 释放Lock

FutureTask详解

简介

FutureTask代表异步计算的结果, 实现了Runnable接口和Future接口.

FutureTask有3种状态, 如下图所示:

java-concurrency-future-task-status

使用

FutureTask的使用方法有get()cancel(), 如下图所示:

java-concurrency-future-task-get-and-cancel

实现

FutureTask的实现基于AbstractQueuedSynchronizer(简称AQS). AQS是一个同步框架, 它提供通用机制来原子性管理同步状态, 阻塞, 唤醒线程, 以及维护被阻塞线程的队列.

基于AQS实现的同步器包括: ReentrantLock, Semaphore, ReentrantReadAndWriteLock, CoundDownLatch, FutureTask

每一个基于AQS实现的同步器都会包含两种类型的操作, 如下:

  • 至少一个acquire操作. 这个操作阻塞调用线程, 除非知道AQS的状态允许这个线程继续执行. FutureTaskacquire操作为get().
  • 至少一个release操作, 这个操作改变AQS的状态, 改变后的状态可允许一个或多个阻塞线程被解除阻塞. FutureTaskrelease操作为run()cancel(...).

下面是FutureTask的设计示意图:

java-concurrency-future-task-implementation

FutureTask的级联唤醒示意图:

java-concurrency-future-task-awake-chain


上一篇博客:Java中的线程池
下一篇博客:Add Two Numbers