Java线程池源码解析及高质量代码案例
点击上方蓝色字体,选择“标星公众号”
优质文章,第一时间送达
作者 | Star先生
来源 | urlify.cn/EvIrIr
引言
Java线程池架构原理及源码解析
构建参数源码
/**
 * Creates a pool that uses {@link Executors#defaultThreadFactory()} and the
 * supplied rejection handler.
 *
 * <p>All arguments are forwarded unchanged to the seven-argument constructor.
 *
 * @param corePoolSize    forwarded to the full constructor
 * @param maximumPoolSize forwarded to the full constructor
 * @param keepAliveTime   forwarded to the full constructor
 * @param unit            time unit of {@code keepAliveTime}
 * @param workQueue       queue handed to the full constructor
 * @param handler         handler handed to the full constructor
 */
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          RejectedExecutionHandler handler) {
    // Only the thread factory is filled in here; everything else passes through.
    this(corePoolSize,
         maximumPoolSize,
         keepAliveTime,
         unit,
         workQueue,
         Executors.defaultThreadFactory(),
         handler);
}
参数解释
源码详细解析
execute源码
/**
 * Hands a task to the pool. In order of preference the task is given to a
 * newly started core thread, placed on the work queue, given to a newly
 * started thread below the maximum size, or rejected.
 *
 * @param command the task to run
 * @throws NullPointerException if {@code command} is null
 */
public void execute(Runnable command)
{
if (command == null)
throw new NullPointerException();
// Go to the queuing path when the pool is already at or above corePoolSize,
// or when starting a core thread for this task did not succeed.
if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command))
{
if (runState == RUNNING && workQueue.offer(command))
{
// The task is queued; re-check the state because it may have changed
// (pool no longer RUNNING, or no threads left) since the checks above.
if (runState != RUNNING || poolSize == 0)
ensureQueuedTaskHandled(command);
}
// Pool not RUNNING or the queue refused the task: try a thread below
// maximumPoolSize, and reject if that also fails.
else if (!addIfUnderMaximumPoolSize(command))
reject(command); // is shutdown or saturated
}
}
图一:ThreadPoolExecutor运行状态图
addIfUnderCorePoolSize源码
/**
 * Creates and starts a thread for {@code firstTask}, but only while the pool
 * is RUNNING and holds fewer than {@code corePoolSize} threads.
 *
 * @param firstTask the task the new thread should run first
 * @return {@code true} if a thread was created and started, {@code false} otherwise
 */
private boolean addIfUnderCorePoolSize(Runnable firstTask) {
    final ReentrantLock lock = this.mainLock;
    Thread created = null;
    lock.lock();
    try {
        // The size/state check and the thread creation happen under mainLock.
        if (poolSize < corePoolSize && runState == RUNNING) {
            created = addThread(firstTask);
        }
    } finally {
        lock.unlock();
    }
    // The thread is started only after the lock has been released.
    if (created != null) {
        created.start();
        return true;
    }
    return false;
}
addThread源码
private Thread addThread(Runnable firstTask)
{
Worker w = new Worker(firstTask);
Thread t = threadFactory.newThread(w);
< span style = "color:#ff0000;" > < / span >
if (t != null)
{
w.thread = t;
workers.add(w);
int nt = ++poolSize;
if (nt > largestPoolSize)
largestPoolSize = nt;
}
return t;
}
ThreadFactory接口默认实现DefaultThreadFactory
/**
 * Builds a thread for {@code r} in this factory's group, named with the
 * factory prefix followed by the next sequence number, and normalises it to a
 * non-daemon thread at {@link Thread#NORM_PRIORITY}.
 *
 * @param r the runnable the new thread will execute
 * @return the configured, not yet started thread
 */
public Thread newThread(Runnable r) {
    final String threadName = namePrefix + threadNumber.getAndIncrement();
    final Thread created = new Thread(group, r, threadName, 0);
    // A new thread inherits daemon status and priority from its creator; reset both.
    if (created.isDaemon()) {
        created.setDaemon(false);
    }
    if (created.getPriority() != Thread.NORM_PRIORITY) {
        created.setPriority(Thread.NORM_PRIORITY);
    }
    return created;
}
Worker的run方法
/**
 * Main loop of a worker: runs the initial task (if any) and then keeps
 * fetching tasks with getTask() until it returns null. workerDone(this) is
 * always called on the way out, including when a task throws.
 */
public void run()
{
try
{
Runnable task = firstTask;
// Drop the field reference; the local variable now holds the first task.
firstTask = null;
// getTask() is only consulted when there is no task already in hand.
while (task != null || (task = getTask()) != null)
{
runTask(task);
// Clear the local so the next iteration fetches from getTask().
task = null;
}
}
finally
{
workerDone(this);
}
}
getTask源码
/**
 * Fetches the next task for a worker, retrying until a task is obtained or
 * the worker is allowed to exit.
 *
 * @return the next task, or {@code null} when runState is above SHUTDOWN or
 *         workerCanExit() reports that this worker may terminate
 */
Runnable getTask()
{
for (;;)
{
try
{
int state = runState;
// Any state beyond SHUTDOWN: hand out no more tasks.
if (state > SHUTDOWN)
return null;
Runnable r;
// SHUTDOWN: non-blocking poll of whatever is still queued.
if (state == SHUTDOWN) // Help drain queue
r = workQueue.poll();
// Above core size, or core timeout enabled: wait at most keepAliveTime
// (interpreted as nanoseconds here).
else if (poolSize > corePoolSize || allowCoreThreadTimeOut)
r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
// Otherwise block until a task arrives.
else
r = workQueue.take();
if (r != null)
return r;
// No task obtained: exit if permitted, otherwise loop again.
if (workerCanExit())
{
if (runState >= SHUTDOWN) // Wake up others
interruptIdleWorkers();
return null;
}
// Else retry
}
catch (InterruptedException ie)
{
// On interruption, re-check runState
}
}
}
execute方法部分实现
// Excerpt of execute(): queue the task while the pool is RUNNING.
if (runState == RUNNING && workQueue.offer(command))
{
// Re-check after queuing; hand over to ensureQueuedTaskHandled when the pool
// is no longer RUNNING or has no threads.
if (runState != RUNNING || poolSize == 0)
ensureQueuedTaskHandled(command);
}
// Not RUNNING or offer() returned false: try a thread below the maximum size.
else if (!addIfUnderMaximumPoolSize(command))
reject(command); // is shutdown or saturated
如果当前线程池数量大于或等于corePoolSize,或addIfUnderCorePoolSize方法执行失败,则执行后续操作;如果线程池处于运行状态并且workQueue中成功加入任务,再次判断如果线程池的状态不为运行状态或当前线程池数为0,则调用ensureQueuedTaskHandled方法
ensureQueuedTaskHandled源码
/**
 * Follow-up check for a task that has just been queued. Under mainLock it
 * either removes the task again so it can be rejected, or creates an extra
 * thread so the non-empty queue is serviced.
 *
 * @param command the task that was just offered to the work queue
 */
private void ensureQueuedTaskHandled(Runnable command)
{
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
boolean reject = false;
Thread t = null;
try
{
int state = runState;
// Pool no longer RUNNING and the task could still be removed from the queue.
if (state != RUNNING && workQueue.remove(command))
reject = true;
// State below STOP, fewer than max(corePoolSize, 1) threads, and work is
// queued: create a thread with no initial task.
else if (state < STOP &&
poolSize < Math.max(corePoolSize, 1) &&
!workQueue.isEmpty())
t = addThread(null);
}
finally
{
mainLock.unlock();
}
// Rejection and thread start both happen after mainLock has been released.
if (reject)
reject(command);
else if (t != null)
t.start();
}
reject源码
/**
 * Passes a task that cannot be accepted to the configured rejection handler.
 *
 * @param command the task being rejected
 */
void reject(Runnable command) {
    // The handler receives both the task and this executor.
    handler.rejectedExecution(command, this);
}
再次回到execute方法
// Excerpt of execute(), repeated to introduce addIfUnderMaximumPoolSize.
if (runState == RUNNING && workQueue.offer(command))
{
if (runState != RUNNING || poolSize == 0)
ensureQueuedTaskHandled(command);
}
// Reached when the pool is not RUNNING or the queue did not accept the task.
else if (!addIfUnderMaximumPoolSize(command))
reject(command); // is shutdown or saturated
addIfUnderMaximumPoolSize源码
/**
 * Creates and starts a thread for {@code firstTask}, but only while the pool
 * is RUNNING and holds fewer than {@code maximumPoolSize} threads.
 *
 * @param firstTask the task the new thread should run first
 * @return {@code true} if a thread was created and started, {@code false} otherwise
 */
private boolean addIfUnderMaximumPoolSize(Runnable firstTask) {
    final ReentrantLock lock = this.mainLock;
    Thread created = null;
    lock.lock();
    try {
        // The size/state check and the thread creation happen under mainLock.
        if (poolSize < maximumPoolSize && runState == RUNNING) {
            created = addThread(firstTask);
        }
    } finally {
        lock.unlock();
    }
    // The thread is started only after the lock has been released.
    if (created != null) {
        created.start();
        return true;
    }
    return false;
}
workerDone源码
/**
 * Bookkeeping for a worker that is exiting: adds its completed-task count to
 * the pool total, unregisters it, and calls tryTerminate() when it was the
 * last thread.
 *
 * @param w the worker that has finished
 */
void workerDone(Worker w) {
    final ReentrantLock lock = this.mainLock;
    lock.lock();
    try {
        completedTaskCount += w.completedTasks;
        workers.remove(w);
        poolSize--;
        if (poolSize == 0) {
            tryTerminate();
        }
    } finally {
        lock.unlock();
    }
}
runTask(task)源码
/**
 * Runs a single task while holding this worker's runLock, calling the
 * beforeExecute/afterExecute hooks around it.
 *
 * @param task the task to run
 * @throws RuntimeException any RuntimeException caught inside the inner try is rethrown
 */
private void runTask(Runnable task)
{
final ReentrantLock runLock = this.runLock;
runLock.lock();
try
{
// Thread.interrupted() clears the interrupt flag when the state is below STOP.
// If the flag was set and the state has meanwhile reached STOP, the
// interrupt is re-asserted on the worker thread.
if (runState < STOP &&
Thread.interrupted() &&
runState >= STOP)
thread.interrupt();
boolean ran = false;
beforeExecute(thread, task);
try
{
task.run();
ran = true;
// Reached only when task.run() returned normally.
afterExecute(task, null);
++completedTasks;
}
catch (RuntimeException ex)
{
// ran is still false only if the exception came from task.run(); this
// avoids calling afterExecute again when afterExecute itself threw.
if (!ran)
afterExecute(task, ex);
throw ex;
}
}
finally
{
runLock.unlock();
}
}
添加任务处理流程
AbortPolicy()
/**
 * Rejection handler that refuses the task by throwing
 * {@link RejectedExecutionException}.
 */
public static class AbortPolicy implements RejectedExecutionHandler {

    /** Creates an {@code AbortPolicy}. */
    public AbortPolicy() { }

    /**
     * Always throws, naming both the task and the executor in the message.
     *
     * @param r the runnable task requested to be executed
     * @param e the executor attempting to execute this task
     * @throws RejectedExecutionException always
     */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        final String message =
                "Task " + r.toString() + " rejected from " + e.toString();
        throw new RejectedExecutionException(message);
    }
}
/*当线程池中的数量等于最大线程数时,直接抛出java.util.concurrent.RejectedExecutionException异常。*/
CallerRunsPolicy()
/**
 * Rejection handler that runs the rejected task on the thread that invoked
 * the handler, unless the executor has been shut down.
 */
public static class CallerRunsPolicy implements RejectedExecutionHandler {

    /** Creates a {@code CallerRunsPolicy}. */
    public CallerRunsPolicy() { }

    /**
     * Runs {@code r} directly; does nothing when {@code e} is shut down.
     *
     * @param r the runnable task requested to be executed
     * @param e the executor attempting to execute this task
     */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (e.isShutdown()) {
            return; // the task is discarded
        }
        r.run();
    }
}
DiscardOldestPolicy()
/**
 * Rejection handler that removes the head of the executor's queue and then
 * resubmits the rejected task, unless the executor has been shut down.
 */
public static class DiscardOldestPolicy implements RejectedExecutionHandler {

    /** Creates a {@code DiscardOldestPolicy} for the given executor. */
    public DiscardOldestPolicy() { }

    /**
     * Polls (and ignores) the next queued task, if one is immediately
     * available, then retries {@code r}; does nothing when {@code e} is shut
     * down.
     *
     * @param r the runnable task requested to be executed
     * @param e the executor attempting to execute this task
     */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (e.isShutdown()) {
            return; // the task is discarded
        }
        e.getQueue().poll();
        e.execute(r);
    }
}
DiscardPolicy()
/**
 * Rejection handler that silently drops the rejected task.
 */
public static class DiscardPolicy implements RejectedExecutionHandler {

    /** Creates a {@code DiscardPolicy}. */
    public DiscardPolicy() { }

    /**
     * Intentionally empty, so task {@code r} is simply discarded.
     *
     * @param r the runnable task requested to be executed
     * @param e the executor attempting to execute this task
     */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        // no-op by design
    }
}
违反Java高质量代码案例
异步运算使用Callable接口
/**
 * A task that produces a result and may throw a checked exception.
 *
 * @param <V> the result type of {@link #call()}
 */
public interface Callable<V> {
    /**
     * Computes a result.
     *
     * @return the computed result
     * @throws Exception if the result cannot be computed
     */
    V call() throws Exception;
}
/**
 * Callable that sleeps for ten seconds and then returns one tenth of the
 * amount it was constructed with (integer division).
 */
class TaxCalculator implements Callable<Integer> {

    /** Amount the result is derived from. */
    private final int seedMoney;

    /**
     * @param _seedMoney the amount to compute the result from
     */
    public TaxCalculator(int _seedMoney) {
        this.seedMoney = _seedMoney;
    }

    /**
     * @return {@code seedMoney / 10}
     * @throws Exception if the sleep is interrupted
     */
    @Override
    public Integer call() throws Exception {
        // Simulates a slow computation.
        TimeUnit.MILLISECONDS.sleep(10000);
        final int tax = seedMoney / 10;
        return tax;
    }
}
/**
 * Submits a TaxCalculator to a single-thread executor, prints "#" every
 * 200 ms until the future is done, then prints the result and shuts the
 * executor down.
 */
public static void main(String[] args) throws Exception {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    Future<Integer> pendingTax = executor.submit(new TaxCalculator(100));
    // Poll for completion instead of blocking in get().
    for (;;) {
        if (pendingTax.isDone()) {
            break;
        }
        TimeUnit.MILLISECONDS.sleep(200);
        System.out.println("#");
    }
    System.out.println("\n 计算完成,税金是:" + pendingTax.get() + "元");
    executor.shutdown();
}
优先选择线程池
/**
 * Starts a thread, waits until it reports TERMINATED, and then calls
 * start() on the same Thread object a second time.
 *
 * NOTE(review): Thread.start() may not be called twice on one thread, so the
 * second call is expected to throw IllegalThreadStateException; this looks
 * like the point of the example and is kept as is.
 */
public static void main(String[] args) throws Exception {
    Runnable body = new Runnable() {
        @Override
        public void run() {
            System.out.println("线程在运行");
        }
    };
    Thread worker = new Thread(body);
    worker.start();
    // Poll the thread state every 10 ms until the thread has finished.
    while (worker.getState() != Thread.State.TERMINATED) {
        TimeUnit.MILLISECONDS.sleep(10);
    }
    worker.start();
}
/**
 * Submits four tasks to a two-thread fixed pool; each task prints the name of
 * the thread that runs it. The pool is shut down after submission.
 */
public static void main(String[] args) {
    ExecutorService pool = Executors.newFixedThreadPool(2);
    int submitted = 0;
    while (submitted < 4) {
        // A fresh task object is created for every submission.
        Runnable printThreadName = new Runnable() {
            @Override
            public void run() {
                System.out.println(Thread.currentThread().getName());
            }
        };
        pool.submit(printThreadName);
        submitted++;
    }
    pool.shutdown();
}
线程死锁
/**
 * First half of the lock-ordering example: a1 holds this object's monitor,
 * pauses for a second, and then calls the synchronized method b2 on a B.
 */
static class A {

    /**
     * Logs entry, sleeps one second while holding this monitor, then calls
     * {@code b.b2()}.
     *
     * @param b the object whose synchronized b2() is invoked
     */
    public synchronized void a1(B b) {
        final String threadName = Thread.currentThread().getName();
        System.out.println(threadName + "进入A.a1()");
        try {
            Thread.sleep(1000);
        } catch (Exception e) {
            // Deliberately ignored, as in the original example.
        }
        System.out.println(threadName + "试图访问B.b2()");
        b.b2();
    }

    /** Synchronized on this instance; only prints a message. */
    public synchronized void a2() {
        System.out.println("进入 a.a2()");
    }
}
/**
 * Second half of the lock-ordering example: b1 holds this object's monitor,
 * pauses for a second, and then calls the synchronized method a2 on an A.
 */
static class B {

    /**
     * Logs entry, sleeps one second while holding this monitor, then calls
     * {@code a.a2()}.
     *
     * @param a the object whose synchronized a2() is invoked
     */
    public synchronized void b1(A a) {
        final String threadName = Thread.currentThread().getName();
        System.out.println(threadName + "进入B.b1()");
        try {
            Thread.sleep(1000);
        } catch (Exception e) {
            // Deliberately ignored, as in the original example.
        }
        System.out.println(threadName + "试图访问A.a2()");
        a.a2();
    }

    /** Synchronized on this instance; only prints a message. */
    public synchronized void b2() {
        System.out.println("进入 B.b2()");
    }
}
/**
 * Starts two threads: one enters a.a1(b), the other enters b.b1(a). Each
 * method holds its own monitor while calling a synchronized method on the
 * other object, so the two threads acquire the monitors in opposite order.
 */
public static void main(String[] args) {
    final A a = new A();
    final B b = new B();

    Thread first = new Thread(new Runnable() {
        @Override
        public void run() {
            a.a1(b);
        }
    }, "线程A");
    first.start();

    Thread second = new Thread(new Runnable() {
        @Override
        public void run() {
            b.b1(a);
        }
    }, "线程B");
    second.start();
}
/**
 * Variant of b2 that waits at most two seconds for the lock instead of
 * blocking indefinitely.
 *
 * <p>The lock is released only if it was actually acquired; calling unlock()
 * after a failed or interrupted tryLock would throw
 * IllegalMonitorStateException.
 *
 * NOTE(review): {@code Lock} is used here as the name of a lock object
 * declared elsewhere, as in the original snippet; confirm its declaration.
 */
public void b2()
{
    boolean acquired = false;
    try
    {
        acquired = Lock.tryLock(2, TimeUnit.SECONDS);
        if (acquired)
        {
            System.out.println("进入 B.b2()");
        }
    }
    catch (InterruptedException e)
    {
        // Preserve the interrupt for callers instead of swallowing it.
        Thread.currentThread().interrupt();
    }
    finally
    {
        if (acquired)
        {
            Lock.unlock();
        }
    }
}
忽略设置阻塞队列长度
/**
 * Adds ten elements to a queue whose capacity is five. add() throws
 * IllegalStateException("Queue full") once the capacity is exhausted.
 *
 * <p>The queue is held as an ArrayBlockingQueue: that class does not
 * implement BlockingDeque, so the former cast failed with ClassCastException
 * before any element was added.
 */
public static void main(String[] args) throws Exception {
    ArrayBlockingQueue<String> bq = new ArrayBlockingQueue<String>(5);
    for (int i = 0; i < 10; i++) {
        bq.add("");
    }
}
/**
 * Abridged excerpt of the array-backed blocking queue, showing how add()
 * and offer() behave when the queue is full. Members such as {@code lock}
 * and {@code insert} are not part of the excerpt.
 *
 * NOTE(review): the JDK class implements BlockingQueue, not BlockingDeque;
 * the declaration is left as quoted in the article.
 */
public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements
        BlockingDeque<E>, java.io.Serializable
{
    /** Backing array; its length is the queue capacity. */
    public final E[] items;
    /** Number of elements currently held. */
    private int count;

    /**
     * Inserts the element or fails loudly.
     *
     * @return {@code true} when the element was inserted
     * @throws IllegalStateException if the queue is full
     */
    public boolean add(E e)
    {
        if (offer(e))
            return true;
        else
            throw new IllegalStateException("Queue full");
    }

    /**
     * Inserts the element if there is room, under the queue lock.
     *
     * @return {@code true} when inserted, {@code false} when the queue is full
     */
    public boolean offer(E e)
    {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try
        {
            if (count == items.length)
                return false; // full: report failure instead of falling through
            else
            {
                insert(e);
                return true;
            }
        }
        finally
        {
            lock.unlock();
        }
    }
}
使用stop方法停止线程
/**
 * Runnable shared by several threads. Each run increments {@code a}, sleeps
 * 100 ms, decrements it again and prints the value, all inside one
 * synchronized block. main() then calls stop() on the first thread.
 *
 * NOTE(review): presumably this demonstrates that stop() can abort a thread
 * between a++ and a--, leaving the counter inconsistent; confirm against the
 * article text.
 */
class MutiThread implements Runnable {
// Shared counter; returns to its previous value if the block runs to completion.
int a = 0;
@Override
public void run() {
// TODO Auto-generated method stub
// The monitor is the string literal "", not this instance.
synchronized ("") {
a++;
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
a--;
String tn = Thread.currentThread().getName();
System.out.println(tn + ":a=" + a);
}
}
public static void main(String[] args) {
MutiThread t = new MutiThread();
Thread t1 = new Thread(t);
t1.start();
// Five more threads run the same MutiThread instance.
for (int i = 0; i < 5; i++) {
new Thread(t).start();
}
// Stops t1 right after the other threads have been started.
t1.stop();
}
}
/**
 * Thread that is stopped cooperatively: run() keeps looping until
 * {@link #terminate()} sets the volatile stop flag.
 */
class SafeStopThread extends Thread {

    /** Set to true to ask the loop to end; volatile so the change is visible to run(). */
    private volatile boolean stop = false;

    /**
     * Loops while no stop has been requested. The original condition was
     * inverted ({@code while(stop)}), so the thread ended immediately.
     */
    @Override
    public void run()
    {
        while (!stop)
        {
            // per-iteration work of the thread goes here
        }
    }

    /** Requests that the thread leave its loop. */
    public void terminate() {
        stop = true;
    }
}
覆写start方法
/**
 * Example of an overridden start() that calls run() directly. Because
 * super.start() is never invoked, no new thread is created and run()
 * executes on the caller's thread.
 *
 * <p>Thread is a class, so it is extended rather than implemented, and the
 * unbalanced braces of the original snippet are repaired.
 */
class MutiThread extends Thread
{
    /** Runs the thread body synchronously instead of starting a thread. */
    @Override
    public void start()
    {
        // Invoke the thread body directly.
        run();
    }

    /** Thread body; empty in this example. */
    @Override
    public void run()
    {
    }

    public static void main(String[] args)
    {
        MutiThread t = new MutiThread();
        t.start();
    }
}
/**
 * Example of an overridden start() that still delegates to super.start(),
 * so a real thread is created; extra work can be placed before and after
 * the delegation.
 *
 * <p>Thread is a class, so it is extended rather than implemented, and the
 * unbalanced braces of the original snippet are repaired.
 */
class MutiThread extends Thread
{
    @Override
    public void start()
    {
        /* Business logic before the thread starts. */
        super.start();
        /* Business logic after the thread has been started. */
    }

    /** Thread body; empty in this example. */
    @Override
    public void run()
    {
    }
}
使用过多线程优先级
/**
 * Runnable that performs a fixed amount of floating-point work and then
 * prints the priority of the thread that ran it. main() launches twenty of
 * them with priorities cycling through 1..10.
 */
class MutiThread implements Runnable {

    /**
     * Runs this object on a new thread with the given priority.
     *
     * @param _priority priority passed to {@link Thread#setPriority(int)}
     */
    public void start(int _priority) {
        Thread worker = new Thread(this);
        worker.setPriority(_priority);
        worker.start();
    }

    @Override
    public void run() {
        // Busy work; the results are discarded.
        int step = 0;
        while (step < 10000) {
            Math.hypot(Math.pow(924526789, step), Math.cos(step));
            step++;
        }
        final int priority = Thread.currentThread().getPriority();
        System.out.println("Priority:" + priority);
    }

    public static void main(String[] args) {
        for (int n = 0; n < 20; n++) {
            final int priority = n % 10 + 1;
            new MutiThread().start(priority);
        }
    }
}
/**
 * The minimum priority that a thread can have.
 */
public final static int MIN_PRIORITY = 1;
/**
* The default priority that is assigned to a thread.
*/
public final static int NORM_PRIORITY = 5;
/**
* The maximum priority that a thread can have.
*/
public final static int MAX_PRIORITY = 10;
/**
* Returns a reference to the currently executing thread object.
*
* @return the currently executing thread.
*/
}
Lock与synchronized
/**
 * Base task: sleeps two seconds, then prints the current thread's name
 * together with the seconds field of the current time.
 */
class Task {

    /** Simulated work followed by a one-line report on standard output. */
    public void dosomething() {
        try {
            Thread.sleep(2000);
        } catch (Exception e) {
            // Deliberately ignored, as in the original example.
        }
        StringBuilder report = new StringBuilder();
        report.append("线程名:").append(Thread.currentThread().getName());
        // Calendar.SECOND is the field constant 13.
        report.append(",线程时间:")
              .append(Calendar.getInstance().get(Calendar.SECOND))
              .append("s");
        System.out.println(report);
    }
}
//显式锁任务
/**
 * Task guarded by an explicit ReentrantLock. The lock is an instance field,
 * so every TaskWithLock object has its own lock.
 */
class TaskWithLock extends Task implements Runnable {

    /** Per-instance lock. */
    private final Lock lock = new ReentrantLock();

    /**
     * Runs dosomething() while holding the lock.
     *
     * <p>lock() is called before the try block so that unlock() in the
     * finally clause only runs once the lock is actually held.
     */
    @Override
    public void run() {
        lock.lock();
        try {
            dosomething();
        } finally {
            lock.unlock();
        }
    }
}
//内部锁任务
/**
 * Task guarded by an intrinsic lock. The monitor is the string constant
 * "A", which is the same object for every instance of this class.
 */
class TaskWithSync extends Task implements Runnable {

    /** Compile-time constant, so it is the same interned object as the literal "A". */
    private static final String MONITOR = "A";

    /** Runs dosomething() while holding the shared monitor. */
    @Override
    public void run() {
        synchronized (MONITOR) {
            dosomething();
        }
    }
}
/**
 * Entry point of the Lock-versus-synchronized comparison. Only the creation
 * of the lock is shown; the rest of the body is elided in the article.
 */
public static void main(String[] args) {
// Lock shared by multiple threads.
final Lock lock=new ReentrantLock();
……
}
线程池异常处理
/**
 * A unit of work with no arguments, no result and no checked exceptions.
 */
public interface Runnable {
    /** Performs the work. */
    void run();
}
java.lang.Thread.setDefaultUncaughtExceptionHandler(UncaughtExceptionHandler)
/**
 * Callback that receives a thread together with the throwable that was
 * reported for it.
 */
public interface UncaughtExceptionHandler {
    /**
     * Handles the reported throwable.
     *
     * @param t the thread concerned
     * @param e the throwable reported for that thread
     */
    public abstract void uncaughtException(Thread t, Throwable e);
}
/**
 * Waits if necessary for the computation to complete, and then
 * retrieves its result.
 *
 * @return the computed result
 * @throws CancellationException if the computation was cancelled
 * @throws ExecutionException if the computation threw an exception
 * @throws InterruptedException if the current thread was interrupted
 *         while waiting
 */
V get() throws InterruptedException, ExecutionException;
/**
 * Hook with an empty default body, intended to be overridden.
 *
 * @param r the task that was run
 * @param t the throwable associated with the run, or {@code null}
 */
protected void afterExecute(Runnable r, Throwable t) { }
// Pool with 11 core threads, at most 100 threads, a one-minute keep-alive and a
// bounded queue of 10000 tasks. The anonymous subclass reports task failures
// through printException after every execution.
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
        11,
        100,
        1,
        TimeUnit.MINUTES,
        new ArrayBlockingQueue<Runnable>(10000),
        new DefaultThreadFactory()) {
    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        printException(r, t);
    }
};
/**
 * Logs the failure of a finished task, if there was one.
 *
 * <p>When no throwable is supplied and the task is a {@link Future}, the
 * outcome is read from the future: cancellation and the cause of an
 * ExecutionException count as the failure.
 *
 * @param r the task that was run
 * @param t the throwable handed to afterExecute, or {@code null}
 */
private static void printException(Runnable r, Throwable t) {
    Throwable failure = t;
    if (failure == null && r instanceof Future<?>) {
        Future<?> future = (Future<?>) r;
        try {
            // Only a completed future is queried, so get() is not meant to block.
            if (future.isDone()) {
                future.get();
            }
        } catch (CancellationException ce) {
            failure = ce;
        } catch (ExecutionException ee) {
            failure = ee.getCause();
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt(); // ignore/reset
        }
    }
    if (failure == null) {
        return;
    }
    log.error(failure.getMessage(), failure);
}
使用SimpleThread类
//TestThreadPool.java
import java.io.*;
/**
 * Console driver: every line read from standard input is handed to a
 * ThreadPoolManager created with ten threads.
 */
public class TestThreadPool
{
    public static void main(String[] args)
    {
        BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
        ThreadPoolManager manager = new ThreadPoolManager(10);
        try
        {
            String s;
            // readLine() returns null at end of input.
            while ((s = br.readLine()) != null)
            {
                manager.process(s);
            }
        }
        catch (IOException e)
        {
            // Report the failure instead of silently ending the loop.
            System.err.println("Failed to read from standard input: " + e);
        }
    }
}
import java.util.*;
/**
 * Very small hand-rolled pool: creates a fixed set of SimpleThread workers
 * and hands each incoming argument to the first worker that is not running.
 */
class ThreadPoolManager
{
    /** Number of worker threads created by the constructor. */
    private int maxThread;
    /** The workers, in creation order. */
    public Vector vector;

    public void setMaxThread(int threadCount)
    {
        maxThread = threadCount;
    }

    /**
     * Creates and starts {@code threadCount} workers. The original loop was
     * hard-coded to 10 and ignored the requested size.
     *
     * @param threadCount number of worker threads to create
     */
    public ThreadPoolManager(int threadCount)
    {
        // Assigned directly rather than through the overridable setter.
        maxThread = threadCount;
        System.out.println("Starting thread pool...");
        vector = new Vector();
        for (int i = 1; i <= maxThread; i++)
        {
            SimpleThread thread = new SimpleThread(i);
            vector.addElement(thread);
            thread.start();
        }
    }

    /**
     * Gives {@code argument} to the first idle worker, or prints a message
     * when every worker is busy.
     *
     * @param argument the value the chosen worker should process
     */
    public void process(String argument)
    {
        for (int i = 0; i < vector.size(); i++)
        {
            SimpleThread currentThread = (SimpleThread) vector.elementAt(i);
            if (!currentThread.isRunning())
            {
                System.out.println("Thread " + (i + 1) + " is processing:" +
                        argument);
                currentThread.setArgument(argument);
                currentThread.setRunning(true);
                return;
            }
        }
        // Reached only when no idle worker was found.
        System.out.println("pool is full, try in another time.");
    }
}//end of class ThreadPoolManager
/**
 * Worker thread that waits until it is marked as running, processes its
 * current argument, and then marks itself idle again.
 */
class SimpleThread extends Thread
{
    /**
     * True while the worker has work to do. Volatile because isRunning() reads
     * it without holding the monitor, from a different thread than the one
     * that clears it.
     */
    private volatile boolean runningFlag;
    /** Value to process; set by the caller before setRunning(true). */
    private String argument;

    /** @return whether the worker is currently marked as running */
    public boolean isRunning()
    {
        return runningFlag;
    }

    /**
     * Updates the running flag and, when it becomes true, wakes the worker
     * waiting in run().
     *
     * @param flag the new state
     */
    public synchronized void setRunning(boolean flag)
    {
        runningFlag = flag;
        if (flag)
            this.notify();
    }

    public String getArgument()
    {
        return this.argument;
    }

    public void setArgument(String string)
    {
        argument = string;
    }

    /**
     * @param threadNumber number used only in the start-up message
     */
    public SimpleThread(int threadNumber)
    {
        runningFlag = false;
        System.out.println("thread " + threadNumber + "started.");
    }

    /**
     * Alternates forever between waiting for work and processing it; ends
     * when the thread is interrupted.
     */
    public synchronized void run()
    {
        try
        {
            while (true)
            {
                // Loop on the condition so a spurious wake-up goes back to waiting.
                while (!runningFlag)
                {
                    this.wait();
                }
                System.out.println("processing " + getArgument() + "... done.");
                sleep(5000);
                System.out.println("Thread is sleeping...");
                setRunning(false);
            }
        }
        catch (InterruptedException e)
        {
            System.out.println("Interrupt");
        }
    }//end of run()
}//end of class SimpleThread
线程使用不当导致内存溢出
class IndexCallable implements Callable
{
private List<?> t;
@override
public object call()
{
……
}
}
JAVA_OPTS="-server -Xms800m -Xmx800m -XX:PermSize=64M -XX:MaxNewSize=256m -XX:MaxPermSize=128m -Djava.awt.headless=true "
工作队列
/**
 * Minimal work queue: a fixed number of worker threads take Runnables from
 * a shared list, which also serves as the monitor for wait/notify.
 */
public class WorkQueue
{
    private final int nThreads;
    private final PoolWorker[] threads;
    /** Pending tasks; also the monitor guarding them. */
    private final LinkedList<Runnable> queue;

    /**
     * Creates and starts {@code nThreads} workers. The loop header was
     * truncated in the original text and is restored here.
     *
     * @param nThreads number of worker threads
     */
    public WorkQueue(int nThreads)
    {
        this.nThreads = nThreads;
        queue = new LinkedList<Runnable>();
        threads = new PoolWorker[nThreads];
        for (int i = 0; i < nThreads; i++)
        {
            threads[i] = new PoolWorker();
            threads[i].start();
        }
    }

    /**
     * Appends a task and wakes one waiting worker.
     *
     * @param r the task to run
     */
    public void execute(Runnable r)
    {
        synchronized (queue)
        {
            queue.addLast(r);
            queue.notify();
        }
    }

    /** Worker that repeatedly removes the first task and runs it. */
    private class PoolWorker extends Thread
    {
        public void run()
        {
            Runnable r;
            while (true)
            {
                synchronized (queue)
                {
                    // Re-check the condition after every wake-up.
                    while (queue.isEmpty())
                    {
                        try
                        {
                            queue.wait();
                        }
                        catch (InterruptedException ignored)
                        {
                            // Keep waiting; this worker never exits.
                        }
                    }
                    r = queue.removeFirst();
                }
                // If we don't catch RuntimeException,
                // the pool could leak threads
                try
                {
                    r.run();
                }
                catch (RuntimeException e)
                {
                    // You might want to log something here
                }
            }
        }
    }
}
评论