TheadPoolExecutor源码分析
TheadPoolExecutor源码分析
ThreadPoolExecutor是多线程中经常用到的类,其使用一个线程池执行提交的任务。
实现
没有特殊需求的情况下,通常都是用Executors类的静态方法如newCachedThreadPoll来初始化ThreadPoolExecutor实例:
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
从Executors的方法实现中看出,BlockingQueue使用的SynchronousQueue,底层使用了栈的实现。值得注意的是,这个SynchronousQueue是没有容量限制的,Executors也将maximumPoolSize设为Integer.MAX_VALUE。
ThreadPoolExecutor的构造方法:
按照javadoc的解释:
- corePoolSize是池中闲置的最小线程数
- maximumPoolSize是池中允许的最大线程数
- keepAliveTime是线程数大于最小线程数时,过量闲置线程的最大存活时间
- unit是上面存活时间的单位
- workQueue是用来暂时保存运行前的任务
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue)
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
除去第一个做任务非空检查的if。
第二个if,检查当前使用的线程数是否超过corePoolSize。未超过,调用addWorker并指定第二个参数为true。addWorker会再次检查线程数是否超过corePoolSize,如果还未超过,则创建一个新的线程执行任务。
第三个if,当目前使用的线程数大于等于corePoolSize,将任务保存到workQueue中。保存成功,再次检查是否需要再创建一个线程。
最后一个else,调用addWorker并指定第二个参数为false。在创建线程前,检查当时线程数是否超过maximumPoolSize,为超过则创建一个新的线程。
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
...
}
问题
一般场景下,不能使用Integer.MAX_VALUE如此大的线程数。所以需要使用构造器自己进行实例化。
如指定corePoolSize=5、maximumPoolSize=20
、keepAliveTime=60L、unit=TimeUnit.SECONDS、workQueue=new SynchronousQueue
但是实际执行的时候,线程数一直是5。
回头看ThreadPoolExecutor的实现,如果想要达到我们想要的效果需要程序进入最后的那个else。那重点就在第三个if里的workQueue.offer(command)。
看BlockingQueue接口中该方法的描述:将元素插入到队列中,没有超过容量限制则插入并返回true。
而使用的SynchronousQueue底层实现使用的栈没有容量限制,这就是为什么线程池中的线程数一直是corePoolSize。