暴力停止ExecutorService的线程
停止,stop,这里说的是真的停止。如何优雅的结束,这里就不提了。
这里要用Thread.stop()
。众所周知,stop()方法在JDK中是废弃的。
该方法天生是不安全的。使用thread.stop()停止一个线程,导致释放(解锁)所有该线程已经锁定的监视器(因沿堆栈向上传播的未检查异常ThreadDeath而解锁)。如果之前受这些监视器保护的任何对象处于不一致状态,则不一致状态的对象(受损对象)将对其他线程可见,这可能导致任意的行为。
有时候我们会有这种需求,不需要考虑线程执行到哪一步。一般这种情况是外部执行stop,比如执行业务的线程因为各种原因假死或者耗时较长,由于设计问题又无法响应优雅的停止指令。
现在大家在项目中都很少直接使用线程,而是通过concurrent包中的类来实现多线程,例如ExecutorService的各种实现类。
一个简单的停止线程的例子:
public class ExecutorServiceTest {
public static void main(String[] args) throws InterruptedException {
ExecutorService executor = Executors.newSingleThreadExecutor();
final AtomicReference<Thread> t = new AtomicReference<>();
Future<?> firstFuture = executor.submit(new Runnable() {
public void run() {
Thread currentThread = Thread.currentThread();
t.set(currentThread);
while (true) {
}
}
});
try {
firstFuture.get(500, TimeUnit.MILLISECONDS);
} catch (Exception e) {
while (t.get().isAlive()) {
t.get().stop();
TimeUnit.MILLISECONDS.sleep(50);
}
}
executor.submit(new Runnable() {
@Override
public void run() {
System.out.println("submit again");
}
});
executor.shutdown();
}
}
如果你运行了上面的代码就会发现程序假死了,通过stack dump看是发生了死锁:
"pool-1-thread-2" #11 prio=5 os_prio=31 tid=0x00007fa91006e800 nid=0x5903 waiting on condition [0x00007000060f8000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x000000076ab76ea0> (a java.util.concurrent.locks.ReentrantLock$NonfairSync)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:870)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
死锁发生在第二次submit后,在LinkedBlockingQueue.take()时,LinkedBlockingQueue在ThreadPoolExecutor中用来暂存task的。真正执行任务线程的时候再从队列中取出。我们都知道LinkedBlockingQueue是线程的安全的,其高并发和线程安全是通过一个ReentrantLock代替内置锁来实现的(减小了锁的粒度)。submit第二个task时,再次执行take会再次获取锁。但是由于stop直接杀死了线程,没有释放当次执行take方法时获取ReentrantLock锁,导致了死锁。
stop直接停止了线程,抛出了ThreadDeath
。ThreadDeath
是Error,不是Exception。
public class ThreadDeath extends Error {
private static final long serialVersionUID = -4417128565033088268L;
}
这种情况下,原有的ExecutorService实例就不能再使用了,因为我们无法通过程序来释放未释放的锁(由虚拟机的GC来解决)。如此,便需要重建ExecutorService实例。
对上面的代码做了修改:
public class ExecutorServiceTest {
public static void main(String[] args) throws InterruptedException {
ExecutorService executor = Executors.newSingleThreadExecutor();
final AtomicReference<ExecutorService> es = new AtomicReference<>();
es.set(executor);
final AtomicReference<Thread> t = new AtomicReference<>();
Future<?> future = es.get().submit(new Runnable() {
public void run() {
Thread currentThread = Thread.currentThread();
t.set(currentThread);
currentThread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
public void uncaughtException(Thread t, Throwable e) {
if (e instanceof ThreadDeath || e instanceof IllegalMonitorStateException) {
e.printStackTrace();
es.get().shutdownNow();
es.set(Executors.newSingleThreadExecutor());
}
}
});
while (true) {
}
}
});
try {
future.get(500, TimeUnit.MILLISECONDS);
} catch (Exception e) {
while (t.get().isAlive()) {
t.get().stop();
TimeUnit.MILLISECONDS.sleep(50);
}
}
es.get().submit(new Runnable() {
@Override
public void run() {
System.out.println("submit again");
}
});
es.get().shutdown();
}
}
注:这个例子只考虑了ExecutorService实例在单线程环境中的使用,如果需要在多线程环境中需要考虑重建实例时的排他性。
修改后的核心是UncaughtExceptionHandler:
当线程由于未捕获的异常突然终止而调用处理程序的接口。 当线程由于未捕获的异常即将终止时,Java虚拟机将使用Thread.getUncaughtExceptionHandler()向线程查询其UncaughtExceptionHandler,并将调用处理程序的uncaughtException方法,将线程和异常作为参数传递。 如果一个线程没有显示它的UncaughtExceptionHandler,那么它的ThreadGroup对象充当它的UncaughtExceptionHandler。 如果ThreadGroup对象没有处理异常的特殊要求,它可以将调用转发到默认的未捕获的异常处理程序。