Java线程池源码解析及高质量代码案例
点击上方蓝色字体,选择“标星公众号”
优质文章,第一时间送达
作者 | Star先生
来源 | urlify.cn/EvIrIr
引言
Java线程池架构原理及源码解析
构建参数源码
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          RejectedExecutionHandler handler)
{
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), handler);
}
参数解释
源码详细解析
excute源码
public void execute(Runnable command)
{
    if (command == null)
        throw new NullPointerException();
    if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command))
    {
        if (runState == RUNNING && workQueue.offer(command))
        {
            if (runState != RUNNING || poolSize == 0)
                ensureQueuedTaskHandled(command);
        }
        else if (!addIfUnderMaximumPoolSize(command))
            reject(command); // is shutdown or saturated
    }
}
图一:ThreadPoolExecutor运行状态图
addIfUnderCorePoolSize源码
private boolean addIfUnderCorePoolSize(Runnable firstTask)
{
    Thread t = null;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try
    {
        if (poolSize < corePoolSize && runState == RUNNING)
            t = addThread(firstTask);
    }
    finally
    {
        mainLock.unlock();
    }
    if (t == null)
        return false;
    t.start();
    return true;
}
addThread源码
private Thread addThread(Runnable firstTask)
{
    Worker w = new Worker(firstTask);
    Thread t = threadFactory.newThread(w);
    < span style = "color:#ff0000;" > < / span >
                   if (t != null)
    {
        w.thread = t;
        workers.add(w);
        int nt = ++poolSize;
        if (nt > largestPoolSize)
            largestPoolSize = nt;
    }
    return t;
}
ThreadFactory接口默认实现DefaultThreadFactory
public Thread newThread(Runnable r)
{
    Thread t = new Thread(group, r,
                          namePrefix + threadNumber.getAndIncrement(),
                          0);
    if (t.isDaemon())
        t.setDaemon(false);
    if (t.getPriority() != Thread.NORM_PRIORITY)
        t.setPriority(Thread.NORM_PRIORITY);
    return t;
}
Worker的run方法
public void run()
{
    try
    {
        Runnable task = firstTask;
        firstTask = null;
        while (task != null || (task = getTask()) != null)
        {
            runTask(task);
            task = null;
        }
    }
    finally
    {
        workerDone(this);
    }
}
getTask源码
Runnable getTask()
{
    for (;;)
    {
        try
        {
            int state = runState;
            if (state > SHUTDOWN)
                return null;
            Runnable r;
            if (state == SHUTDOWN)  // Help drain queue
                r = workQueue.poll();
            else if (poolSize > corePoolSize || allowCoreThreadTimeOut)
                r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
            else
                r = workQueue.take();
            if (r != null)
                return r;
            if (workerCanExit())
            {
                if (runState >= SHUTDOWN) // Wake up others
                    interruptIdleWorkers();
                return null;
            }
            // Else retry
        }
        catch (InterruptedException ie)
        {
            // On interruption, re-check runState
        }
    }
}
execute方法部分实现
if (runState == RUNNING && workQueue.offer(command))
{
    if (runState != RUNNING || poolSize == 0)
        ensureQueuedTaskHandled(command);
}
else if (!addIfUnderMaximumPoolSize(command))
    reject(command); // is shutdown or saturated
如果当前线程池数量大于corePoolSize或addIfUnderCorePoolSize方法执行失败,则执行后续操作;如果线程池处于运行状态 并且workQueue中成功加入任务,再次判断如果线程池的状态不为运行状态或当前线程池数为0,则调用 ensureQueuedTaskHandled方法
ensureQueuedTaskHandled源码
private void ensureQueuedTaskHandled(Runnable command)
{
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    boolean reject = false;
    Thread t = null;
    try
    {
        int state = runState;
        if (state != RUNNING && workQueue.remove(command))
            reject = true;
        else if (state < STOP &&
                 poolSize < Math.max(corePoolSize, 1) &&
                 !workQueue.isEmpty())
            t = addThread(null);
    }
    finally
    {
        mainLock.unlock();
    }
    if (reject)
        reject(command);
    else if (t != null)
        t.start();
}
reject源码
void reject(Runnable command)
{
    handler.rejectedExecution(command, this);
}
再次回到execute方法
if (runState == RUNNING && workQueue.offer(command))
{
    if (runState != RUNNING || poolSize == 0)
        ensureQueuedTaskHandled(command);
}
else if (!addIfUnderMaximumPoolSize(command))
    reject(command); // is shutdown or saturated
addIfUnderMaximumPoolSize源码
private boolean addIfUnderMaximumPoolSize(Runnable firstTask)
{
    Thread t = null;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try
    {
        if (poolSize < maximumPoolSize && runState == RUNNING)
            t = addThread(firstTask);
    }
    finally
    {
        mainLock.unlock();
    }
    if (t == null)
        return false;
    t.start();
    return true;
}
workerDone源码
void workerDone(Worker w)
{
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try
    {
        completedTaskCount += w.completedTasks;
        workers.remove(w);
        if (--poolSize == 0)
            tryTerminate();
    }
    finally
    {
        mainLock.unlock();
    }
}
runTask(task)源码
private void runTask(Runnable task)
{
    final ReentrantLock runLock = this.runLock;
    runLock.lock();
    try
    {
        if (runState < STOP &&
                Thread.interrupted() &&
                runState >= STOP)
            thread.interrupt();
        boolean ran = false;
        beforeExecute(thread, task);
        try
        {
            task.run();
            ran = true;
            afterExecute(task, null);
            ++completedTasks;
        }
        catch (RuntimeException ex)
        {
            if (!ran)
                afterExecute(task, ex);
            throw ex;
        }
    }
    finally
    {
        runLock.unlock();
    }
}
添加任务处理流程
AbortPolicy()
public static class AbortPolicy implements RejectedExecutionHandler
{
    /**
     * Creates an {@code AbortPolicy}.
     */
    public AbortPolicy() { }
    /**
     * Always throws RejectedExecutionException.
     *
     * @param r the runnable task requested to be executed
     * @param e the executor attempting to execute this task
     * @throws RejectedExecutionException always.
     */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e)
    {
        throw new RejectedExecutionException("Task " + r.toString() +
                                             " rejected from " +
                                             e.toString());
    }
}
/*当线程池中的数量等于最大线程数时,直接抛出抛出java.util.concurrent.RejectedExecutionException异常。*/
CallerRunsPolicy()
public static class CallerRunsPolicy implements RejectedExecutionHandler
{
    /**
     * Creates a {@code CallerRunsPolicy}.
     */
    public CallerRunsPolicy() { }
    /**
     * Executes task r in the caller's thread, unless the executor
     * has been shut down, in which case the task is discarded.
     *
     * @param r the runnable task requested to be executed
     * @param e the executor attempting to execute this task
     */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e)
    {
        if (!e.isShutdown())
        {
            r.run();
        }
    }
}
DiscardOldestPolicy()
public static class DiscardOldestPolicy implements RejectedExecutionHandler
{
    /**
     * Creates a {@code DiscardOldestPolicy} for the given executor.
     */
    public DiscardOldestPolicy() { }
    /**
     * Obtains and ignores the next task that the executor
     * would otherwise execute, if one is immediately available,
     * and then retries execution of task r, unless the executor
     * is shut down, in which case task r is instead discarded.
     *
     * @param r the runnable task requested to be executed
     * @param e the executor attempting to execute this task
     */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e)
    {
        if (!e.isShutdown())
        {
            e.getQueue().poll();
            e.execute(r);
        }
    }
}
DiscardPolicy()
public static class DiscardPolicy implements RejectedExecutionHandler
{
    /**
     * Creates a {@code DiscardPolicy}.
     */
    public DiscardPolicy() { }
    /**
     * Does nothing, which has the effect of discarding task r.
     *
     * @param r the runnable task requested to be executed
     * @param e the executor attempting to execute this task
     */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e)
    {
    }
}
违反Java高质量代码案例
异步运算使用Callable接口
public interface Callable<V>{
    v call() throws Exception;
}
class TaxCalculator implements Callable<Integer>{
    private int seedMoney;
    public TaxCalculator(int _seedMoney){
        seedMoney=_seedMoney;
    }
    @Override
    public Integer call() throws Exception {
       TimeUnit.MILLISECONDS.sleep(10000);
        return seedMoney/10;
    }
}
public static void main(String[] args) throws Exception{
        ExecutorService es=Executors.newSingleThreadExecutor();
        Future<Integer> future=es.submit(new TaxCalculator(100));
        while(!future.isDone()){
            TimeUnit.MILLISECONDS.sleep(200);
            System.out.println("#");
        }
        System.out.println("\n 计算完成,税金是:"+future.get()+"元");
        es.shutdown();
    }
优先选择线程池
public static void main(String[] args) throws Exception{
    Thread t=new Thread(new Runnable() {
        @Override
        public void run() {
            System.out.println("线程在运行");
        }
    });
    t.start();
    while(!t.getState().equals(Thread.State.TERMINATED)){
        TimeUnit.MILLISECONDS.sleep(10);
    }
    t.start();
}
public static void main(String[] args) {
        ExecutorService es = Executors.newFixedThreadPool(2);
        for (int i = 0; i < 4; i++) {
            es.submit(new Runnable() {
                @Override
                public void run() {
                    System.out.println(Thread.currentThread().getName());
                }
            });
        }
        es.shutdown();
    }
线程死锁
static class A {
        public synchronized void a1(B b) {
            String name = Thread.currentThread().getName();
            System.out.println(name + "进入A.a1()");
            try {
                Thread.sleep(1000);
            } catch (Exception e) {
                // TODO: handle exception
            }
            System.out.println(name + "试图访问B.b2()");
            b.b2();
        }
        public synchronized void a2() {
            System.out.println("进入 a.a2()");
        }
    }
    static class B {
        public synchronized void b1(A a) {
            String name = Thread.currentThread().getName();
            System.out.println(name + "进入B.b1()");
            try {
                Thread.sleep(1000);
            } catch (Exception e) {
                // TODO: handle exception
            }
            System.out.println(name + "试图访问A.a2()");
            a.a2();
        }
        public synchronized void b2() {
            System.out.println("进入 B.b2()");
        }
    }
    public static void main(String[] args) {
        final A a = new A();
        final B b = new B();
        new Thread(new Runnable() {
            @Override
            public void run() {
                a.a1(b);
            }
        }, "线程A").start();
        ;
        new Thread(new Runnable() {
            @Override
            public void run() {
                b.b1(a);
            }
        }, "线程B").start();
        ;
    }
public  void b2()
{
    try
    {
        if(Lock.trylock(2, TimeUnit.SECONDS))
        {
            System.out.println("进入 B.b2()");
        }
    }
    catch (InterruptedException e)
    {
        // TODO: handle exception
    }
    finally
    {
        Lock.unlock();
    }
}
忽略设置阻塞队列长度
public static void main(String[] args) throws Exception {
        BlockingDeque<String> bq = (BlockingDeque<String>) new ArrayBlockingQueue<String>(
                5);
        for (int i = 0; i < 10; i++) {
            bq.add("");
        }
    }
public  class ArrayBlockingQueue<E> extends AbstractQueue<E> implements
    BlockingDeque<E>, java.io.Serializable
{
    public final E[] items;
    private int count;
    public boolean add(E e)
    {
        if (offer(e))
            return true;
        else
            throw new IllegalStateException("Queue full");
    }
    public boolean offer(E e)
    {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try
        {
            if (count == items.length)
                ;
            else
            {
                insert(e);
                return true;
            }
        }
        finally
        {
            lock.unlock();
        }
    }
}
使用stop方法停止线程
class MutiThread implements Runnable {
    int a = 0;
    @Override
    public void run() {
        // TODO Auto-generated method stub
        synchronized ("") {
            a++;
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            a--;
            String tn = Thread.currentThread().getName();
            System.out.println(tn + ":a=" + a);
        }
    }
    public static void main(String[] args) {
        MutiThread t = new MutiThread();
        Thread t1 = new Thread(t);
        t1.start();
        for (int i = 0; i < 5; i++) {
            new Thread(t).start();
        }
        t1.stop();
    }
}
class SafeStopThread extends Thread{
    private volatile boolean stop=false;
    @Override
    public void run()
    {//判断线程体是否运行
        while(stop)
        {}
    }
    //线程终止
    public void terminate(){
        stop=true;
    }
}
覆写start方法
class MutiThread implements Thread
{
    @Override
    public void start()
    {
        //调用线程体
        run();
    }
}
@Override
public void run()
{
}
}
public static void main(String[] args)
{
    MutiThread t = new MutiThread();
    t.start();
}
}
class MutiThread implements Thread
{
    @Override
    public void start()
    {
        /*线程启动前的业务处理*/
        super.start();
        /*线程启动后的业务处理*/
    }
}
@Override
public void run()
{
}
}
使用过多线程优先级
class MutiThread implements Runnable {
    public void start(int _priority) {
        Thread t = new Thread(this);
        t.setPriority(_priority);
        t.start();
    }
    @Override
    public void run() {
        for (int i = 0; i < 10000; i++) {
            Math.hypot(Math.pow(924526789, i), Math.cos(i));
        }
        System.out.println("Priority:"+Thread.currentThread().getPriority());
    }
    public static void main(String[] args) {
        for(int i=0;i<20;i++)
        {
            new MutiThread().start(i%10+1);
        }
    }
}
  public final static int MIN_PRIORITY = 1;
/**
  * The default priority that is assigned to a thread.
  */
public final static int NORM_PRIORITY = 5;
/**
 * The maximum priority that a thread can have.
 */
public final static int MAX_PRIORITY = 10;
/**
 * Returns a reference to the currently executing thread object.
 *
 * @return  the currently executing thread.
 */
}
Lock与synchronized
class  Task
{
    public void dosomething(){
        try {
            Thread.sleep(2000);
        } catch (Exception e) {
            // TODO: handle exception
        }
        StringBuffer sb=new StringBuffer();
        sb.append("线程名:"+Thread.currentThread().getName());
        sb.append(",线程时间:"+Calendar.getInstance().get(13)+"s");
        System.out.println(sb);
    }
}
//显示锁任务
class TaskWithLock extends Task implements Runnable{
private final Lock lock=new ReentrantLock();
    @Override
    public void run() {
        try {
            lock.lock();
            dosomething();
        } finally
        {
            lock.unlock();
        }
    }};
    //內部锁任务
    class TaskWithSync extends Task implements Runnable{
        @Override
        public void run() {
                synchronized ("A") {
                    dosomething();
                }
        }};
public static void main(String[] args) {
        //多个线程共享锁
        final Lock lock=new ReentrantLock();
        ……
    }
线程池异常处理
public interface Runnable {   
    public abstract void run();   
}
java.lang.Thread.setDefaultUncaughtExceptionHandler(UncaughtExceptionHandler)  
public interface UncaughtExceptionHandler {   
    void uncaughtException(Thread t, Throwable e);   
}
/**   
 * Waits if necessary for the computation to complete, and then   
 * retrieves its result.   
 *   
 * @return the computed result   
 * @throws CancellationException if the computation was cancelled   
 * @throws ExecutionException if the computation threw an exception   
 * @throws InterruptedException if the current thread was interrupted while waiting   
 */   
V get() throws InterruptedException, ExecutionException;
protected void afterExecute(Runnable r, Throwable t) { } 
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(11, 100, 1, TimeUnit.MINUTES, //   
        new ArrayBlockingQueue<Runnable>(10000),//   
        new DefaultThreadFactory()) {   
    protected void afterExecute(Runnable r, Throwable t) {   
        super.afterExecute(r, t);   
        printException(r, t);   
    }   
};   
private static void printException(Runnable r, Throwable t) {   
    if (t == null && r instanceof Future<?>) {   
        try {   
            Future<?> future = (Future<?>) r;   
            if (future.isDone())   
                future.get();   
        } catch (CancellationException ce) {   
            t = ce;   
        } catch (ExecutionException ee) {   
            t = ee.getCause();   
        } catch (InterruptedException ie) {   
            Thread.currentThread().interrupt(); // ignore/reset   
        }   
    }   
    if (t != null)   
        log.error(t.getMessage(), t);   
}
使用SimpleThread类
//TestThreadPool.java
import java.io.*;
public class TestThreadPool
{
    public static void main(String[] args)
    {
        try
        {
            BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
            String s;
            ThreadPoolManager manager = new ThreadPoolManager(10);
            while((s = br.readLine()) != null)
            {
                manager.process(s);
            }
        }
        catch(IOException e) {}
    }
}
import java.util.*;
  
  class ThreadPoolManager
     
{
     
       private int maxThread;
       public Vector vector;
       public void setMaxThread(int threadCount)
      
    {
           maxThread = threadCount;
          
    }
      
       public ThreadPoolManager(int threadCount)
      
    {
           setMaxThread(threadCount);
          System.out.println("Starting thread pool...");
           vector = new Vector();
           for(int i = 1; i <= 10; i++)
               {
               SimpleThread thread = new SimpleThread(i);
               vector.addElement(thread);
               thread.start();
              
        }
          
    }
      
       public void process(String argument)
      
    {
           int i;
           for(i = 0; i < vector.size(); i++)
              {
              SimpleThread currentThread = (SimpleThread)vector.elementAt(i);
               if(!currentThread.isRunning())
                   {
                  System.out.println("Thread " + (i + 1) + " is processing:" +
                  argument);
                 currentThread.setArgument(argument);
                   currentThread.setRunning(true);
                   return;
                 
            }
              
        }
          if(i == vector.size())
               {
               System.out.println("pool is full, try in another time.");
              
        }
          
    }
      
}//end of class ThreadPoolManager
class SimpleThread extends Thread
     
{
       private boolean runningFlag;
       private String argument;
       public boolean isRunning()
      
    {
           return runningFlag;
          
    }
      public synchronized void setRunning(boolean flag)
      
    {
           runningFlag = flag;
           if(flag)
               this.notify();
          
    }
      
       public String getArgument()
      
    {
           return this.argument;
          
    }
       public void setArgument(String string)
      
    {
           argument = string;
          
    }
      
       public SimpleThread(int threadNumber)
      
    {
           runningFlag = false;
           System.out.println("thread " + threadNumber + "started.");
          
    }
      
       public synchronized void run()
      
    {
           try{
               while(true)
                   {
                   if(!runningFlag)
                       {
                       this.wait();
                      
                }
                   else
                       {
                       System.out.println("processing " + getArgument() + "... done.");
                       sleep(5000);
                       System.out.println("Thread is sleeping...");
                       setRunning(false);
                      
                }
                  
            }
              
        }
        catch(InterruptedException e)
        {
               System.out.println("Interrupt");
              
        }
          
    }//end of run()
      
}//end of class SimpleThread
线程使用不当导致内存溢出
class IndexCallable implements Callable
{
    private List<?> t;
    @override
    public object call()
    {
        ……
    }
}
JAVA_OPTS="-server -Xms800m -Xmx800m -XX:PermSize=64M -XX:MaxNewSize=256m -XX:MaxPermSize=128m -Djava.awt.headless=true "
工作队列
public class WorkQueue
{
    private final int nThreads;
    private final PoolWorker[] threads;
    private final LinkedList queue;
    public WorkQueue(int nThreads)
    {
        this.nThreads = nThreads;
        queue = new LinkedList();
        threads = new PoolWorker[nThreads];
        for (int i = 0; i
                threads[i] = new PoolWorker();
                threads[i].start();
    }
}
        public void execute(Runnable r)
{
    synchronized(queue)
    {
        queue.addLast(r);
        queue.notify();
    }
}
private class PoolWorker extends Thread
{
    public void run()
    {
        Runnable r;
        while (true)
        {
            synchronized(queue)
            {
                while (queue.isEmpty())
                {
                    try
                    {
                        queue.wait();
                    }
                    catch (InterruptedException ignored)
                    {
                    }
                }
                r = (Runnable) queue.removeFirst();
            }
            // If we don't catch RuntimeException,
            // the pool could leak threads
            try
            {
                r.run();
            }
            catch (RuntimeException e)
            {
                // You might want to log something here
            }
        }
    }
}
}


评论
