TheadPoolExecutor源码分析

TheadPoolExecutor源码分析

ThreadPoolExecutor是多线程中经常用到的类,其使用一个线程池执行提交的任务。

实现

没有特殊需求的情况下,通常都是用Executors类的静态方法如newCachedThreadPoll来初始化ThreadPoolExecutor实例:

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

从Executors的方法实现中看出,BlockingQueue使用的SynchronousQueue,底层使用了栈的实现。值得注意的是,这个SynchronousQueue是没有容量限制的,Executors也将maximumPoolSize设为Integer.MAX_VALUE。

ThreadPoolExecutor的构造方法:

按照javadoc的解释:

  • corePoolSize是池中闲置的最小线程数
  • maximumPoolSize是池中允许的最大线程数
  • keepAliveTime是线程数大于最小线程数时,过量闲置线程的最大存活时间
  • unit是上面存活时间的单位
  • workQueue是用来暂时保存运行前的任务
public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue)
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}

除去第一个做任务非空检查的if。

第二个if,检查当前使用的线程数是否超过corePoolSize。未超过,调用addWorker并指定第二个参数为true。addWorker会再次检查线程数是否超过corePoolSize,如果还未超过,则创建一个新的线程执行任务。

第三个if,当目前使用的线程数大于等于corePoolSize,将任务保存到workQueue中。保存成功,再次检查是否需要再创建一个线程。

最后一个else,调用addWorker并指定第二个参数为false。在创建线程前,检查当时线程数是否超过maximumPoolSize,为超过则创建一个新的线程。

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;
        for (;;) {
            int wc = workerCountOf(c);
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }
  ...
}

问题

一般场景下,不能使用Integer.MAX_VALUE如此大的线程数。所以需要使用构造器自己进行实例化。

如指定corePoolSize=5、maximumPoolSize=20

、keepAliveTime=60L、unit=TimeUnit.SECONDS、workQueue=new SynchronousQueue()。

但是实际执行的时候,线程数一直是5。

回头看ThreadPoolExecutor的实现,如果想要达到我们想要的效果需要程序进入最后的那个else。那重点就在第三个if里的workQueue.offer(command)。

看BlockingQueue接口中该方法的描述:将元素插入到队列中,没有超过容量限制则插入并返回true。

而使用的SynchronousQueue底层实现使用的栈没有容量限制,这就是为什么线程池中的线程数一直是corePoolSize。

comments powered by Disqus