线程池的一个BUG,被我发现了
点击上方“码农突围”,马上关注
这里是码农充电第一站,回复“666”,获取一份专属大礼包 真爱,请设置“星标”或点个“在看
java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@a5acd19 rejected from java.util.concurrent.ThreadPoolExecutor@30890a38[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
public class ThreadPoolTest {
public static void main(String[] args) {
final ThreadPoolTest threadPoolTest = new ThreadPoolTest();
for (int i = 0; i < 8; i++) {
new Thread(new Runnable() {
@Override
public void run() {
while (true) {
Future<String> future = threadPoolTest.submit();
try {
String s = future.get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
} catch (Error e) {
e.printStackTrace();
}
}
}
}).start();
}
//子线程不停gc,模拟偶发的gc
new Thread(new Runnable() {
@Override
public void run() {
while (true) {
System.gc();
}
}
}).start();
}
/**
* 异步执行任务
* @return
*/
public Future<String> submit() {
//关键点,通过Executors.newSingleThreadExecutor创建一个单线程的线程池
ExecutorService executorService = Executors.newSingleThreadExecutor();
FutureTask<String> futureTask = new FutureTask(new Callable() {
@Override
public Object call() throws Exception {
Thread.sleep(50);
return System.currentTimeMillis() + "";
}
});
executorService.execute(futureTask);
return futureTask;
}
}
分析&疑问
Executors.newSingleThreadExecotor
的源码实现:
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
FinalizableDelegatedExecutorService
,这个包装类重写了 finalize
函数,也就是说这个类会在被GC回收之前,先执行线程池的shutdown方法。submit
函数的栈帧未执行完出栈之前, executorService
应该是可达的才对。finalize
也可能会被执行A reachable object is any object that can be accessed in any potential continuing computation from any live thread. Optimizing transformations of a program can be designed that reduce the number of objects that are reachable to be less than those which would naively be considered reachable. For example, a Java compiler or code generator may choose to set a variable or parameter that will no longer be used to null to cause the storage for such an object to be potentially reclaimable sooner.
class A {
@Override protected void finalize() {
System.out.println(this + " was finalized!");
}
public static void main(String[] args) throws InterruptedException {
A a = new A();
System.out.println("Created " + a);
for (int i = 0; i < 1_000_000_000; i++) {
if (i % 1_000_00 == 0)
System.gc();
}
System.out.println("done.");
}
}
//打印结果
Created A@1be6f5c3
A@1be6f5c3 was finalized!//finalize方法输出
done.
...
System.out.println(a);
//打印结果
Created A@1be6f5c3
done.
A@1be6f5c3
A a = new A();
System.out.println("Created " + a);
a = null;//手动置null
for (int i = 0; i < 1_000_000_000; i++) {
if (i % 1_000_00 == 0)
System.gc();
}
System.out.println("done.");
System.out.println(a);
//打印结果
Created A@1be6f5c3
A@1be6f5c3 was finalized!
done.
null
executorService.execute(futureTask)
,为什么也会提前finalize呢?
//入口函数
public class FinalizedTest {
public static void main(String[] args) {
final FinalizedTest finalizedTest = new FinalizedTest();
for (int i = 0; i < 8; i++) {
new Thread(new Runnable() {
@Override
public void run() {
while (true) {
TFutureTask future = finalizedTest.submit();
}
}
}).start();
}
new Thread(new Runnable() {
@Override
public void run() {
while (true) {
System.gc();
}
}
}).start();
}
public TFutureTask submit(){
TExecutorService TExecutorService = Executors.create();
TExecutorService.execute();
return null;
}
}
//Executors.java,模拟juc的Executors
public class Executors {
/**
* 模拟Executors.createSingleExecutor
* @return
*/
public static TExecutorService create(){
return new FinalizableDelegatedTExecutorService(new TThreadPoolExecutor());
}
static class FinalizableDelegatedTExecutorService extends DelegatedTExecutorService {
FinalizableDelegatedTExecutorService(TExecutorService executor) {
super(executor);
}
/**
* 析构函数中执行shutdown,修改线程池状态
* @throws Throwable
*/
@Override
protected void finalize() throws Throwable {
super.shutdown();
}
}
static class DelegatedTExecutorService extends TExecutorService {
protected TExecutorService e;
public DelegatedTExecutorService(TExecutorService executor) {
this.e = executor;
}
@Override
public void execute() {
e.execute();
}
@Override
public void shutdown() {
e.shutdown();
}
}
}
//TThreadPoolExecutor.java,模拟juc的ThreadPoolExecutor
public class TThreadPoolExecutor extends TExecutorService {
/**
* 线程池状态,false:未关闭,true已关闭
*/
private AtomicBoolean ctl = new AtomicBoolean();
@Override
public void execute() {
//启动一个新线程,模拟ThreadPoolExecutor.execute
new Thread(new Runnable() {
@Override
public void run() {
}
}).start();
//模拟ThreadPoolExecutor,启动新建线程后,循环检查线程池状态,验证是否会在finalize中shutdown
//如果线程池被提前shutdown,则抛出异常
for (int i = 0; i < 1_000_000; i++) {
if(ctl.get()){
throw new RuntimeException("reject!!!["+ctl.get()+"]");
}
}
}
@Override
public void shutdown() {
ctl.compareAndSet(false,true);
}
}
Exception in thread "Thread-1" java.lang.RuntimeException: reject!!![true]
Thread.sleep
测试一下:
//TThreadPoolExecutor.java,修改后的execute方法
public void execute() {
try {
//显式的sleep 1 ns,主动切换线程
TimeUnit.NANOSECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
//模拟ThreadPoolExecutor,启动新建线程后,循环检查线程池状态,验证是否会在finalize中shutdown
//如果线程池被提前shutdown,则抛出异常
for (int i = 0; i < 1_000_000; i++) {
if(ctl.get()){
throw new RuntimeException("reject!!!["+ctl.get()+"]");
}
}
}
Exception in thread "Thread-3" java.lang.RuntimeException: reject!!![true]
总结
Executors.newSingleThreadExecutor
的实现里通过finalize来自动关闭线程池的做法是有Bug的,在经过优化后可能会导致线程池的提前shutdown,从而导致异常。
JUC Executors.FinalizableDelegatedExecutorService
public void execute(Runnable command) {
try {
e.execute(command);
} finally { reachabilityFence(this); }
}
-End-
最近有一些小伙伴,让我帮忙找一些 面试题 资料,于是我翻遍了收藏的 5T 资料后,汇总整理出来,可以说是程序员面试必备!所有资料都整理到网盘了,欢迎下载!
点击👆卡片,关注后回复【面试题
】即可获取