TheadPoolExecutor源码分析
TheadPoolExecutor源码分析
ThreadPoolExecutor是多线程中经常用到的类,其使用一个线程池执行提交的任务。
实现
没有特殊需求的情况下,通常都是用Executors类的静态方法如newCachedThreadPoll来初始化ThreadPoolExecutor实例:
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}
从Executors的方法实现中看出,BlockingQueue使用的SynchronousQueue,底层使用了栈的实现。值得注意的是,这个SynchronousQueue是没有容量限制的,Executors也将maximumPoolSize设为Integer.MAX_VALUE。
ThreadPoolExecutor的构造方法:
按照javadoc的解释:
- corePoolSize是池中闲置的最小线程数
- maximumPoolSize是池中允许的最大线程数
- keepAliveTime是线程数大于最小线程数时,过量闲置线程的最大存活时间
- unit是上面存活时间的单位
- workQueue是用来暂时保存运行前的任务
public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue)
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}
除去第一个做任务非空检查的if。
第二个if,检查当前使用的线程数是否超过corePoolSize。未超过,调用addWorker并指定第二个参数为true。addWorker会再次检查线程数是否超过corePoolSize,如果还未超过,则创建一个新的线程执行任务。
第三个if,当目前使用的线程数大于等于corePoolSize,将任务保存到workQueue中。保存成功,再次检查是否需要再创建一个线程。
最后一个else,调用addWorker并指定第二个参数为false。在创建线程前,检查当时线程数是否超过maximumPoolSize,为超过则创建一个新的线程。
private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;
        for (;;) {
            int wc = workerCountOf(c);
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }
  ...
}
问题
一般场景下,不能使用Integer.MAX_VALUE如此大的线程数。所以需要使用构造器自己进行实例化。
如指定corePoolSize=5、maximumPoolSize=20
、keepAliveTime=60L、unit=TimeUnit.SECONDS、workQueue=new SynchronousQueue
但是实际执行的时候,线程数一直是5。
回头看ThreadPoolExecutor的实现,如果想要达到我们想要的效果需要程序进入最后的那个else。那重点就在第三个if里的workQueue.offer(command)。
看BlockingQueue接口中该方法的描述:将元素插入到队列中,没有超过容量限制则插入并返回true。
而使用的SynchronousQueue底层实现使用的栈没有容量限制,这就是为什么线程池中的线程数一直是corePoolSize。
