TheadPoolExecutor源码分析

TheadPoolExecutor源码分析

ThreadPoolExecutor是多线程中经常用到的类,其使用一个线程池执行提交的任务。

实现

没有特殊需求的情况下,通常都是用Executors类的静态方法如newCachedThreadPoll来初始化ThreadPoolExecutor实例:

1
2
3
4
5
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

从Executors的方法实现中看出,BlockingQueue使用的SynchronousQueue,底层使用了栈的实现。值得注意的是,这个SynchronousQueue是没有容量限制的,Executors也将maximumPoolSize设为Integer.MAX_VALUE。

ThreadPoolExecutor的构造方法:

按照javadoc的解释:

除去第一个做任务非空检查的if。

第二个if,检查当前使用的线程数是否超过corePoolSize。未超过,调用addWorker并指定第二个参数为true。addWorker会再次检查线程数是否超过corePoolSize,如果还未超过,则创建一个新的线程执行任务。

第三个if,当目前使用的线程数大于等于corePoolSize,将任务保存到workQueue中。保存成功,再次检查是否需要再创建一个线程。

最后一个else,调用addWorker并指定第二个参数为false。在创建线程前,检查当时线程数是否超过maximumPoolSize,为超过则创建一个新的线程。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;
        for (;;) {
            int wc = workerCountOf(c);
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }
  ...
}

问题

一般场景下,不能使用Integer.MAX_VALUE如此大的线程数。所以需要使用构造器自己进行实例化。

如指定corePoolSize=5、maximumPoolSize=20

、keepAliveTime=60L、unit=TimeUnit.SECONDS、workQueue=new SynchronousQueue()。

但是实际执行的时候,线程数一直是5。

回头看ThreadPoolExecutor的实现,如果想要达到我们想要的效果需要程序进入最后的那个else。那重点就在第三个if里的workQueue.offer(command)。

看BlockingQueue接口中该方法的描述:将元素插入到队列中,没有超过容量限制则插入并返回true。

而使用的SynchronousQueue底层实现使用的栈没有容量限制,这就是为什么线程池中的线程数一直是corePoolSize。

Comments

comments powered by Disqus