暴力停止ExecutorService的线程

暴力停止ExecutorService的线程

停止,stop,这里说的是真的停止。如何优雅的结束,这里就不提了。

这里要用Thread.stop()。众所周知,stop()方法在JDK中是废弃的。

该方法天生是不安全的。使用thread.stop()停止一个线程,导致释放(解锁)所有该线程已经锁定的监视器(因沿堆栈向上传播的未检查异常ThreadDeath而解锁)。如果之前受这些监视器保护的任何对象处于不一致状态,则不一致状态的对象(受损对象)将对其他线程可见,这可能导致任意的行为。

有时候我们会有这种需求,不需要考虑线程执行到哪一步。一般这种情况是外部执行stop,比如执行业务的线程因为各种原因假死或者耗时较长,由于设计问题又无法响应优雅的停止指令。

现在大家在项目中都很少直接使用线程,而是通过concurrent包中的类来实现多线程,例如ExecutorService的各种实现类。

一个简单的停止线程的例子:

public class ExecutorServiceTest {

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        final AtomicReference<Thread> t = new AtomicReference<>();

        Future<?> firstFuture = executor.submit(new Runnable() {
            public void run() {
                Thread currentThread = Thread.currentThread();
                t.set(currentThread);
                while (true) {
                }
            }
        });
        try {
            firstFuture.get(500, TimeUnit.MILLISECONDS);
        } catch (Exception e) {
            while (t.get().isAlive()) {
                t.get().stop();
                TimeUnit.MILLISECONDS.sleep(50);
            }
        }

        executor.submit(new Runnable() {
            @Override
            public void run() {
                System.out.println("submit again");
            }
        });
        executor.shutdown();
    }
}

如果你运行了上面的代码就会发现程序假死了,通过stack dump看是发生了死锁:

"pool-1-thread-2" #11 prio=5 os_prio=31 tid=0x00007fa91006e800 nid=0x5903 waiting on condition [0x00007000060f8000]
   java.lang.Thread.State: WAITING (parking)
	at sun.misc.Unsafe.park(Native Method)
	- parking to wait for  <0x000000076ab76ea0> (a java.util.concurrent.locks.ReentrantLock$NonfairSync)
	at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:870)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
	at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
	at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)

死锁发生在第二次submit后,在LinkedBlockingQueue.take()时,LinkedBlockingQueue在ThreadPoolExecutor中用来暂存task的。真正执行任务线程的时候再从队列中取出。我们都知道LinkedBlockingQueue是线程的安全的,其高并发和线程安全是通过一个ReentrantLock代替内置锁来实现的(减小了锁的粒度)。submit第二个task时,再次执行take会再次获取锁。但是由于stop直接杀死了线程,没有释放当次执行take方法时获取ReentrantLock锁,导致了死锁。

stop直接停止了线程,抛出了ThreadDeathThreadDeath是Error,不是Exception。

public class ThreadDeath extends Error {
    private static final long serialVersionUID = -4417128565033088268L;
}

这种情况下,原有的ExecutorService实例就不能再使用了,因为我们无法通过程序来释放未释放的锁(由虚拟机的GC来解决)。如此,便需要重建ExecutorService实例。

对上面的代码做了修改:

public class ExecutorServiceTest {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        final AtomicReference<ExecutorService> es = new AtomicReference<>();
        es.set(executor);
        final AtomicReference<Thread> t = new AtomicReference<>();

        Future<?> future = es.get().submit(new Runnable() {
            public void run() {
                Thread currentThread = Thread.currentThread();
                t.set(currentThread);
                currentThread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
                    public void uncaughtException(Thread t, Throwable e) {
                        if (e instanceof ThreadDeath || e instanceof IllegalMonitorStateException) {
                            e.printStackTrace();
                            es.get().shutdownNow();
                            es.set(Executors.newSingleThreadExecutor());
                        }
                    }
                });
                while (true) {
                }
            }
        });
        try {
            future.get(500, TimeUnit.MILLISECONDS);
        } catch (Exception e) {
            while (t.get().isAlive()) {
                t.get().stop();
                TimeUnit.MILLISECONDS.sleep(50);
            }
        }

        es.get().submit(new Runnable() {
            @Override
            public void run() {
                System.out.println("submit again");
            }
        });
        es.get().shutdown();
    }

}

注:这个例子只考虑了ExecutorService实例在单线程环境中的使用,如果需要在多线程环境中需要考虑重建实例时的排他性。

修改后的核心是UncaughtExceptionHandler:

当线程由于未捕获的异常突然终止而调用处理程序的接口。 当线程由于未捕获的异常即将终止时,Java虚拟机将使用Thread.getUncaughtExceptionHandler()向线程查询其UncaughtExceptionHandler,并将调用处理程序的uncaughtException方法,将线程和异常作为参数传递。 如果一个线程没有显示它的UncaughtExceptionHandler,那么它的ThreadGroup对象充当它的UncaughtExceptionHandler。 如果ThreadGroup对象没有处理异常的特殊要求,它可以将调用转发到默认的未捕获的异常处理程序。

(转载本站文章请注明作者和出处乱世浮生,请勿用于任何商业用途)

comments powered by Disqus