高并发编程知识体系

猿天地

共 19874字,需浏览 40分钟

 ·

2020-09-03 08:36

1.问题

  • 1、什么是线程的交互方式?

  • 2、如何区分线程的同步/异步,阻塞/非阻塞?

  • 3、什么是线程安全,如何做到线程安全?

  • 4、如何区分并发模型?

  • 5、何谓响应式编程?

  • 6、操作系统如何调度多线程?

2.关键词

同步,异步,阻塞,非阻塞,并行,并发,临界区,竞争条件,指令重排,锁,amdahl,gustafson

3.全文概要

上一篇我们介绍分布式系统的知识体系,由于单机的性能上限原因我们才不得不发展分布式技术。那么话说回来,如果单机的性能没能最大限度的榨取出来,就盲目的就建设分布式系统,那就有点本末倒置了。而且上一篇我们给的忠告是如果有可能的话,不要用分布式,意思是说如果单机性能满足的话,就不要折腾复杂的分布式架构。如果说分布式架构是宏观上的性能扩展,那么高并发则是微观上的性能调优,这也是上一篇性能部分拆出来的大专题。本文将从线程的基础理论谈起,逐步探究线程的内存模型,线程的交互,线程工具和并发模型的发展。扫除关于并发编程的诸多模糊概念,从新构建并发编程的层次结构。

4.基础理论

4.1基本概念

开始学习并发编程前,我们需要熟悉一些理论概念。既然我们要研究的是并发编程,那首先应该对并发这个概念有所理解才是,而说到并发我们肯定要要讨论一些并行。

  • 并发:一个处理器同时处理多个任务

  • 并行:多个处理器或者是多核的处理器同时处理多个不同的任务


然后我们需要再了解一下同步和异步的区别:

  • 同步:执行某个操作开始后就一直等着按部就班的直到操作结束

  • 异步:执行某个操作后立即离开,后面有响应的话再来通知执行者

接着我们再了解一个重要的概念:

  • 临界区:公共资源或者共享数据

由于共享数据的出现,必然会导致竞争,所以我们需要再了解一下:

  • 阻塞:某个操作需要的共享资源被占用了,只能等待,称为阻塞


  • 非阻塞:某个操作需要的共享资源被占用了,不等待立即返回,并携带错误信息回去,期待重试

如果两个操作都在等待某个共享资源而且都互不退让就会造成死锁:

  • 死锁:参考著名的哲学家吃饭问题

  • 饥饿:饥饿的哲学家等不齐筷子吃饭

  • 活锁:相互谦让而导致阻塞无法进入下一步操作,跟死锁相反,死锁是相互竞争而导致的阻塞

4.2并发级别

理想情况下我们希望所有线程都一起并行飞起来。但是CPU数量有限,线程源源不断,总得有个先来后到,不同场景需要的并发需求也不一样,比如秒杀系统我们需要很高的并发程度,但是对于一些下载服务,我们需要的是更快的响应,并发反而是其次的。所以我们也定义了并发的级别,来应对不同的需求场景。

  • 阻塞:阻塞是指一个线程进入临界区后,其它线程就必须在临界区外等待,待进去的线程执行完任务离开临界区后,其它线程才能再进去。

  • 无饥饿:线程排队先来后到,不管优先级大小,先来先执行,就不会产生饥饿等待资源,也即公平锁;相反非公平锁则是根据优先级来执行,有可能排在前面的低优先级线程被后面的高优先级线程插队,就形成饥饿

  • 无障碍:共享资源不加锁,每个线程都可以自有读写,单监测到被其他线程修改过则回滚操作,重试直到单独操作成功;风险就是如果多个线程发现彼此修改了,所有线程都需要回滚,就会导致死循环的回滚中,造成死锁

  • 无锁:无锁是无障碍的加强版,无锁级别保证至少有一个线程在有限操作步骤内成功退出,不管是否修改成功,这样保证了多个线程回滚不至于导致死循环

  • 无等待:无等待是无锁的升级版,并发编程的最高境界,无锁只保证有线程能成功退出,但存在低级别的线程一直处于饥饿状态,无等待则要求所有线程必须在有限步骤内完成退出,让低级别的线程有机会执行,从而保证所有线程都能运行,提高并发度。

4.3量化模型

首先,多线程不意味着并发,但并发肯定是多线程或者多进程。我们知道多线程存在的优势是能够更好的利用资源,有更快的请求响应。但是我们也深知一旦进入多线程,附带而来的是更高的编码复杂度,线程设计不当反而会带来更高的切换成本和资源开销。但是总体上我们肯定知道利大于弊,这不是废话吗,不然谁还愿意去搞多线程并发程序,但是如何衡量多线程带来的效率提升呢,我们需要借助两个定律来衡量。

  • Amdahl

    S=1/(1-a+a/n)
    其中,a为并行计算部分所占比例,n为并行处理结点个数。这样,当1-a=0时,(即没有串行,只有并行)最大加速比s=n;当a=0时(即只有串行,没有并行),最小加速比s=1;当n→∞时,极限加速比s→ 1/(1-a),这也就是加速比的上限。

  • Gustafson

    系统优化某部件所获得的系统性能的改善程度,取决于该部件被使用的频率,或所占总执行时间的比例。

两面列举了这两个定律来衡量系统改善后提升效率的量化指标,具体的应用我们在下文的线程调优会再详细介绍。

5.内存模型

宏观上分布式系统需要解决的首要问题是数据一致性,同样,微观上并发编程要解决的首要问题也是数据一致性。貌似我们搞了这么多年的斗争都是在公关一致性这个世界性难题。既然并发编程要从微观开始,那么我们肯定要对CPU和内存的工作机理有所了解,尤其是数据在CPU和内存直接的传输机制。

5.1整体原则

探究内存模型之前我们要抛出三个概念:

  • 原子性

    在32位的系统中,对于4个字节32位的Integer的操作对应的JVM指令集映射到汇编指令为一个原子操作,所以对Integer类型的数据操作是原子性,但是Long类型为8个字节64位,32位系统要分为两条指令来操作,所以不是原子操作。

    对于32位操作系统来说,单次次操作能处理的最长长度为32bit,而long类型8字节64bit,所以对long的读写都要两条指令才能完成(即每次读写64bit中的32bit)

  • 可见性

    线程修改变量对其他线程即时可见

  • 有序性

    串行指令顺序唯一,并行线程直接指令可能出现不一致,也即是指令被重排了

    而指令重排也是有一定原则(摘自《深入理解Java虚拟机第12章》):

    • 程序次序规则:一个线程内,按照代码顺序,书写在前面的操作先行发生于书写在后面的操作;

    • 锁定规则:一个unLock操作先行发生于后面对同一个锁额lock操作;

    • volatile变量规则:对一个变量的写操作先行发生于后面对这个变量的读操作;

    • 传递规则:如果操作A先行发生于操作B,而操作B又先行发生于操作C,则可以得出操作A先行发生于操作C;

    • 线程启动规则:Thread对象的start()方法先行发生于此线程的每个一个动作;

    • 线程中断规则:对线程interrupt()方法的调用先行发生于被中断线程的代码检测到中断事件的发生;

    • 线程终结规则:线程中所有的操作都先行发生于线程的终止检测,我们可以通过Thread.join()方法结束、Thread.isAlive()的返回值手段检测到线程已经终止执行;

    • 对象终结规则:一个对象的初始化完成先行发生于他的finalize()方法的开始;

5.2逻辑内存

我们谈的逻辑内存也即是JVM的内存格局。JVM将操作系统提供的物理内存和CPU缓存在逻辑分为堆,栈,方法区,和程序计数器。在《从宏观微观角度浅析JVM虚拟机》 一文我们详细介绍了JVM的内存模型分布,并发编程我们主要关注的是堆栈的分配,因为线程都是寄生在栈里面的内存段,把栈里面的方法逻辑读取到CPU进行运算。

5.3物理内存

而实际的物理内存包含了主存和CPU的各级缓存还有寄存器,而为了计算效率,CPU往往回就近从缓存里面读取数据。在并发的情况下就会造成多个线程之间对共享数据的错误使用。

5.4内存映射

由于可能发生对象的变量同时出现在主存和CPU缓存中,就可能导致了如下问题:

  • 线程修改的变量对外可见

  • 读写共享变量时出现竞争资源

由于线程内的变量对栈外是不可见的,但是成员变量等共享资源是竞争条件,所有线程可见,就会出现如下当一个线程从主存拿了一个变量1修改后变成2存放在CPU缓存,还没来得及同步回主存时,另外一个线程又直接从主存读取变量为1,这样就出现了脏读。

现在我们弄清楚了线程同步过程数据不一致的原因,接下来要解决的目标就是如何避免这种情况的发生,经过大量的探索和实践,我们从概念上不断的革新比如并发模型的流水线化和无状态函数式化,而且也提供了大量的实用工具。接下来我们从无到有,先了解最简单的单个线程的一些特点,弄清楚一个线程有多少能耐后,才能深刻认识多个线程一起打交道会出现什么幺蛾子。

6.线程单元

6.1状态

我们知道应用启动体现的就是静态指令加载进内存,进而进入CPU运算,操作系统在内存开辟了一段栈内存用来存放指令和变量值,从而形成了进程。而其实我们的JVM也就是一个进程而且,而线程是进程的最小单位,也就是说进程是由很多个线程组成的。而由于进程的上下文关联的变量,引用,计数器等现场数据占用了打段的内存空间,所以频繁切换进程需要整理一大段内存空间来保存未执行完的进程现场,等下次轮到CPU时间片再恢复现场进行运算。这样既耗费时间又浪费空间,所以我们才要研究多线程。毕竟由于线程干的活毕竟少,工作现场数据毕竟少,所以切换起来比较快而且暂用少量空间。而线程切换直接也需要遵守一定的法则,不然到时候把工作现场破坏了就无法恢复工作了。

线程状态

我们先来研究线程的生命周期,看看Thread类里面对线程状态的定义就知道

public enum State {    /**
    * Thread state for a thread which has not yet started.
    */

   NEW,    /**
    * Thread state for a runnable thread.  A thread in the runnable
    * state is executing in the Java virtual machine but it may
    * be waiting for other resources from the operating system
    * such as processor.
    */

   RUNNABLE,    /**
    * Thread state for a thread blocked waiting for a monitor lock.
    * A thread in the blocked state is waiting for a monitor lock
    * to enter a synchronized block/method or
    * reenter a synchronized block/method after calling
    * {@link Object#wait() Object.wait}.
    */

   BLOCKED,    /**
    * Thread state for a waiting thread.
    * A thread is in the waiting state due to calling one of the
    * following methods:
    *

        *  
  • {@link Object#wait() Object.wait} with no timeout

  •     *  
  • {@link #join() Thread.join} with no timeout

  •     *  
  • {@link LockSupport#park() LockSupport.park}

  •     *

    *
    *

A thread in the waiting state is waiting for another thread to
    * perform a particular action.
    *
    * For example, a thread that has called Object.wait()
    * on an object is waiting for another thread to call
    * Object.notify() or Object.notifyAll() on
    * that object. A thread that has called Thread.join()
    * is waiting for a specified thread to terminate.
    */
   WAITING,    /**
    * Thread state for a waiting thread with a specified waiting time.
    * A thread is in the timed waiting state due to calling one of
    * the following methods with a specified positive waiting time:
    *


        *  
  • {@link #sleep Thread.sleep}

  •     *  
  • {@link Object#wait(long) Object.wait} with timeout

  •     *  
  • {@link #join(long) Thread.join} with timeout

  •     *  
  • {@link LockSupport#parkNanos LockSupport.parkNanos}

  •     *  
  • {@link LockSupport#parkUntil LockSupport.parkUntil}

  •     *

    */

   TIMED_WAITING,    /**
    * Thread state for a terminated thread.
    * The thread has completed execution.
    */

   TERMINATED;
}

生命周期

线程的状态:NEW,RUNNABLE,BLOCKED,WAITING,TIMED_WAITING,TERMINATED。注释也解释得很清楚各个状态的作用,而各个状态的转换也有一定的规则需要遵循的。

6.2动作

介绍完线程的状态和生命周期,接下来我了解的线程具备哪些常用的操作。首先线程也是一个普通的对象Thread,所有的线程都是Thread或者其子类的对象。那么这个内存对象被创建出来后就会放在JVM的堆内存空间,当我们执行start()方法的时候,对象的方法体在栈空间分配好对应的栈帧来往执行引擎输送指令(也即是方法体翻译成JVM的指令集)。

线程操作

  • 新建线程:new  Thread(),新建一个线程对象,内存为线程在栈上分配好内存空间

  • 启动线程:start(),告诉系统系统准备就绪,只要资源允许随时可以执行我栈里面的指令了

  • 执行线程:run(),分配了CPU等计算资源,正在执行栈里面的指令集

  • 停止线程(过时):stop(),把CPU和内存资源回收,线程消亡,由于太过粗暴,已经被标记为过时

  • 线程中断:

    • interrupt(),中断是对线程打上了中断标签,可供run()里面的方法体接收中断信号,至于线程要不要中断,全靠业务逻辑设计,而不是简单粗暴的把线程直接停掉

    • isInterrupt(),主要是run()方法体来判断当前线程是否被置为中断

    • interrupted(),静态方法,也是用户判断线程是否被置为中断状态,同时判断完将线程中断状态复位

  • 线程休眠:sleep(),静态方法,线程休眠指定时间段,此间让出CPU资源给其他线程,但是线程依然持有对象锁,其他线程无法进入同步块,休眠完成后也未必立刻执行,需要等到资源允许才能执行

  • 线程等待(对象方法):wait(),是Object的方法,也即是对象的内置方法,在同步块中线程执行到该方法时,也即让出了该对象的锁,所以无法继续执行

  • 线程通知(对象方法):notify(),notifyAll(),此时该对象持有一个或者多个线程的wait,调用notify()随机的让一个线程恢复对象的锁,调用notifyAll()则让所有线程恢复对象锁

  • 线程挂起(过时):suspend(),线程挂起并没有释放资源,而是只能等到resume()才能继续执行

  • 线程恢复(过时):resume(),由于指令重排可能导致resume()先于suspend()执行,导致线程永远挂起,所以该方法被标为过时

  • 线程加入:join(),在一个线程调用另外一个线程的join()方法表明当前线程阻塞知道被调用线程执行结束再进行,也即是被调用线程织入进来

  • 线程让步:yield(),暂停当前线程进而执行别的线程,当前线程等待下一轮资源允许再进行,防止该线程一直霸占资源,而其他线程饿死

  • 线程等待:park(),基于线程对象的操作,较对象锁更为精准

  • 线程恢复:unpark(Thread thread),对应park()解锁,为不可重入锁

线程分组

为了管理线程,于是有了线程组的概念,业务上把类似的线程放在一个ThreadGroup里面统一管理。线程组表示一组线程,此外,线程组还可以包括其他线程组。线程组形成一个树,其中除了初始线程组以外的每个线程组都有一个父线程。线程被允许访问它自己的线程组信息,但不能访问线程组的父线程组或任何其他线程组的信息。

守护线程

通常情况下,线程运行到最后一条指令后则完成生命周期,结束线程,然后系统回收资源。或者单遇到异常或者return提前返回,但是如果我们想让线程常驻内存的话,比如一些监控类线程,需要24小时值班的,于是我们又创造了守护线程的概念。

setDaemon()传入true则会把线程一直保持在内存里面,除非JVM宕机否则不会退出。

线程优先级

线程优先级其实只是对线程打的一个标志,但并不意味这高优先级的一定比低优先级的先执行,具体还要看操作系统的资源调度情况。通常线程优先级为5,边界为[1,10]。

 /**
 * The minimum priority that a thread can have.
 */

public final static int MIN_PRIORITY = 1;/**
 * The default priority that is assigned to a thread.
 */

public final static int NORM_PRIORITY = 5; /**
 * The maximum priority that a thread can have.
 */

public final static int MAX_PRIORITY = 10;

本节介绍了线程单元的转态切换和常用的一些操作方法。如果只是单线程的话,其他都没必要研究这些,重头戏在于多线程直接的竞争配合操作,下一节则重点介绍多个线程的交互需要关注哪些问题。

7.线程交互

其实上一节介绍的线程状态切换和线程操作都是为线程交互做准备的。不然如果只是单线程完全没必要搞什么通知,恢复,让步之类的操作了。

7.1交互方式

线程交互也就是线程直接的通信,最直接的办法就是线程直接直接通信传值,而间接方式则是通过共享变量来达到彼此的交互。

  • 等待:释放对象锁,允许其他线程进入同步块

  • 通知:重新获取对象锁,继续执行

  • 中断:状态交互,通知其他线程进入中断

  • 织入:合并线程,多个线程合并为一个

7.2线程安全

我们最关注的还是通过共享变量来达到交互的方式。线程如果都各自干活互不搭理的话自然相安无事,但多数情况下线程直接需要打交道,而且需要分享共享资源,那么这个时候最核心的就是线程安全了。

什么是线程安全?

当多个线程访问同一个对象时,如果不用考虑这些线程在运行时环境下的调度和交替运行,也不需要进行额外的同步,或者在调用方进行任何其他的协调操作,调用这个对象的行为都可以获取正确的结果,那这个对象是线程安全的。(摘自《深入Java虚拟机》)

如何保证线程安全?

我们最早接触线程安全可能是JDK提供的一些号称线程安全的容器,比如Vetor较ArrayList是线程安全,HashTable较HashMap是线程安全?其实线程安全类并不代表也不等同线程安全的程序,而线程不安全的类同样可以完成线程安全的程序。我们关注的也就是写出线程安全的程序,那么如何写出线程安全的代码呢?下面列举了线程安全的主要设计技术:

无状态

这个有点函数式编程的味道,下文并发模式会介绍到,总之就是线程只有入参和局部变量,如果变量是引用的话,确保变量的创建和调用生命周期都发生在线程栈内,就可以确保线程安全。

无共享状态

完全要求线程无状态比较难实现,必要的状态是无法避免的,那么我们就必须维护不同线程之间的不同状态,这可是个麻烦事。幸好我们有ThreadLocal这个神器,该对象跟当前线程绑定,而且只对当前线程可见,完美解决了无共享状态的问题。

不可变状态

最后实在没办法避免状态共享,在线程之间共享状态,最怕的就是无法确保能维护好正确的读写顺序,而且多线程确实也无法正确维护好这个共享变量。那么我们索性粗暴点,把共享的状态定位不可变,比如价格final修饰一下,这样就达到安全状态共享。

消息传递

一个线程通常也不是所有步骤都需要共享状态,而是部分环节才需要的,那么我们把共享状态的代码拆开,无共享状态的那部分自然不用关心,而共享状态的小段代码,则通过加入消息组件来传递状态。这个设计到并发模式的流水线编程模式,下文并发模式会重点介绍。

线程安全容器

JUC里面提供大量的并发容器,涉及到线程交互的时候,使用安全容器可以避免大部分的错误,而且大大降低了代码的复杂度。

  • 通过synchronized给方法加上内置锁来实现线程安全的类如Vector,HashTable,StringBuffer

  • AtomicXXX如AtomicInteger

  • ConcurrentXXX如ConcurrentHashMap

  • BlockingQueue/BlockingDeque

  • CopyOnWriteArrayList/CopyOnWriteArraySet

  • ThreadPoolExecutor

synchronized同步

该关键字确保代码块同一时间只被一个线程执行,在这个前提下再设计符合线程安全的逻辑

其作用域为

  • 对象:对象加锁,进入同步代码块之前获取对象锁

  • 实例方法:对象加锁,执行实例方法前获取对象实例锁

  • 类方法:类加锁,执行类方法前获取类锁

volatile约束

volatile确保每次操作都能强制同步CPU缓存和主存直接的变量。而且在编译期间能阻止指令重排。读写并发情况下volatile也不能确保线程安全,上文解析内存模型的时候有提到过。

这节我们论述了编写线程安全程序的指导思想,其中我们提到了JDK提供的JUC工具包,下一节将重点介绍并发编程常用的趁手工具。

8.线程工具

前文我们介绍了内存理论和线程的一些特征,大家都知道并发编程容易出错,而且出了错还不好调试排查,幸好JDK里面集成了大量实用的API工具,我们能熟悉这些工具,写起并发程序来也事半功倍。

工具篇其实就是对锁的不断变种,适应更多的开发场景,提高性能,提供更方便的工具,从最粗暴的同步修饰符,到灵活的可重入锁,到宽松的条件,接着到允许多个线程访问的信号量,最后到读写分离锁。

8.1同步控制

由于大多数的并发场景都是需要访问到共享资源的,为了保证线程安全,我们不得已采用锁的技术来做同步控制,这节我们介绍的是适用不同场景各种锁技术。

ReentrantLock

可重入互斥锁具有与使用synchronized的隐式监视器锁具有相同的行为和语义,但具有更好扩展功能。
ReentrantLock由最后成功锁定的线程拥有,而且还未解锁。当锁未被其他线程占有时,线程调用lock()将返回并且成功获取锁。如果当前线程已拥有锁,则该方法将立即返回。这可以使用方法isHeldByCurrentThread()和getHoldCount()来检查。

构造函数接受可选的fairness参数。当设置为true时,在竞争条件下,锁定有利于赋予等待时间最长线程的访问权限。否则,锁将不保证特定的访问顺序。在多线程访问的情况,使用公平锁比默认设置,有着更低的吞吐量,但是获得锁的时间比较小而且可以避免等待锁导致的饥饿。但是,锁的公平性并不能保证线程调度的公平性。因此,使用公平锁的许多线程中的一个可以连续多次获得它,而其他活动线程没有进展并且当前没有持有锁。不定时的tryLock()方法不遵循公平性设置。即使其他线程正在等待,如果锁可用,它也会成功。

  • 任意指定锁的起始位置

  • 中断响应

  • 锁申请等待限时tryLock()

  • 公平锁

Condition

Condition从拥有监控方法(wait,notify,notifyAll)的Object对象中抽离出来成为独特的对象,高效的让每个对象拥有更多的等待线程。和锁对比起来,如果说用Lock代替synchronized,那么Condition就是用来代替Object本身的监控方法。

Condition实例跟Object本身的监控相似,同样提供wait()方法让调用的线程暂时挂起让出资源,知道其他线程通知该对象转态变化,才可能继续执行。Condition实例来源于Lock实例,通过Lock调用newCondition()即可。Condition较Object原生监控方法,可以保证通知顺序。

Semaphore

锁和同步块同时只能允许单个线程访问共享资源,这个明显有些单调,部分场景其实可以允许多个线程访问,这个时候信号量实例就派上用场了。信号量逻辑上维持了一组许可证, 线程调用acquire()阻塞直到许可证可用后才能执行。执行release()意味着释放许可证,实际上信号量并没有真正的许可证,只是采用了计数功能来实现这个功能。

ReadWriteLock

顾名思义读写锁将读写分离,细化了锁的粒度,照顾到性能的优化。

CountDownLatch

这个锁有点“关门放狗”的意思,尤其在我们压测的时候模拟实时并行请求,该实例将线程积累到指定数量后,调用countDown()方法让所有线程同时执行。

CyclicBarrier

CyclicBarrier是加强版的CountDownLatch,上面讲的是一次性“关门放狗”,而循环栅栏则是集齐了指定数量的线程,在资源都允许的情况下同时执行,然后下一批同样的操作,周而复始。

LockSupport

LockSupport是用来创建锁和其他同步类的基本线程阻塞原语。LockSupport中的park() 和 unpark() 的作用分别是阻塞线程和解除阻塞线程,而且park()和unpark()不会遇到“Thread.suspend 和 Thread.resume所可能引发的死锁”问题。因为park() 和 unpark()有许可的存在;调用 park() 的线程和另一个试图将其 unpark() 的线程之间的竞争将保持活性。

8.2线程池

线程池总览

线程多起来的话就需要管理,不然就会乱成一锅。我们知道线程在物理上对应的就是栈里面的一段内存,存放着局部变量的空间和待执行指令集。如果每次执行都要从头初始化这段内存,然后再交给CPU执行,效率就有点低了。假如我们知道该段栈内存会被经常用到,那我们就不要回收,创建完就让它在栈里面呆着,要用的时候取出来,用完换回去,是不是就省了初始化线程空间的时间,这样是我们搞出线程池的初衷。

其实线程池很简单,就是搞了个池子放了一堆线程。既然我们搞线程池是为了提高效率,那就要考虑线程池放多少个线程比较合适,太多了或者太少了有什么问题,怎么拒绝多余的请求,除了异常怎么处理。首先我们来看跟线程池有关的一张类图。

线程池归结起来就是这几个类的使用技巧了,重点关注ThreadPoolExecutor和Executors即可。

创建线程池

万变不离其宗,创建线程池的各种马甲方法最后都是调用到这方法里面,包含核心线程数,最大线程数,线程工厂,拒绝策略等参数。其中线程工厂则可以实现自定义创建线程的逻辑。

public interface ThreadFactory {    Thread newThread(Runnable r);
}

创建的核心构造方法ThreadPoolExecutor.java  1301

 /**
    * Creates a new {@code ThreadPoolExecutor} with the given initial
    * parameters.
    *
    * @param corePoolSize the number of threads to keep in the pool, even
    *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
    * @param maximumPoolSize the maximum number of threads to allow in the
    *        pool
    * @param keepAliveTime when the number of threads is greater than
    *        the core, this is the maximum time that excess idle threads
    *        will wait for new tasks before terminating.
    * @param unit the time unit for the {@code keepAliveTime} argument
    * @param workQueue the queue to use for holding tasks before they are
    *        executed.  This queue will hold only the {@code Runnable}
    *        tasks submitted by the {@code execute} method.
    * @param threadFactory the factory to use when the executor
    *        creates a new thread
    * @param handler the handler to use when execution is blocked
    *        because the thread bounds and queue capacities are reached
    * @throws IllegalArgumentException if one of the following holds:

    *         {@code corePoolSize < 0}

    *         {@code keepAliveTime < 0}

    *         {@code maximumPoolSize <= 0}

    *         {@code maximumPoolSize < corePoolSize}
    * @throws NullPointerException if {@code workQueue}
    *         or {@code threadFactory} or {@code handler} is null
    */

   public ThreadPoolExecutor(int corePoolSize,                              int maximumPoolSize,                              long keepAliveTime,
                             TimeUnit unit,
                             BlockingQueue workQueue,
                             ThreadFactory threadFactory,
                             RejectedExecutionHandler handler)

拒绝策略包含:

    /** 实际上并未真正丢弃任务,但是线程池性能会下降
    * A handler for rejected tasks that runs the rejected task
    * directly in the calling thread of the {@code execute} method,
    * unless the executor has been shut down, in which case the task
    * is discarded.
    */

   public static class CallerRunsPolicy implements RejectedExecutionHandler

   /** 粗暴停止抛异常
    * A handler for rejected tasks that throws a
    * {@code RejectedExecutionException}.
    */    public static class AbortPolicy implements RejectedExecutionHandler

   /** 悄无声息的丢弃拒绝的任务
    * A handler for rejected tasks that silently discards the
    * rejected task.
    */    public static class DiscardPolicy implements RejectedExecutionHandler


   /** 丢弃最老的请求
    * A handler for rejected tasks that discards the oldest unhandled
    * request and then retries {@code execute}, unless the executor
    * is shut down, in which case the task is discarded.
    */    public static class DiscardOldestPolicy implements RejectedExecutionHandler

包括Executors.java中的创建线程池的方法,具体实现也是通过ThreadPoolExecutor来创建的。

public static ExecutorService newCachedThreadPool() {
   return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                 60L, TimeUnit.SECONDS,
                                 new SynchronousQueue());
}

public static ExecutorService newFixedThreadPool(int nThreads) {
   return new ThreadPoolExecutor(nThreads, nThreads,
                                     0L, TimeUnit.MILLISECONDS,
                                     new LinkedBlockingQueue());
}

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
   return new ScheduledThreadPoolExecutor(corePoolSize);
}

public static ExecutorService newSingleThreadExecutor() {
   return new FinalizableDelegatedExecutorService
           (new ThreadPoolExecutor(1, 1,
                                   0L, TimeUnit.MILLISECONDS,
                                   new LinkedBlockingQueue()));
}

调用线程池

ThreadPoolExecutor.java 1342

/** 同步执行线程,出现异常打印堆栈信息
* Executes the given task sometime in the future.  The task
* may execute in a new thread or in an existing pooled thread.
*
* If the task cannot be submitted for execution, either because this
* executor has been shutdown or because its capacity has been reached,
* the task is handled by the current {@code RejectedExecutionHandler}.
*
* @param command the task to execute
* @throws RejectedExecutionException at discretion of
*         {@code RejectedExecutionHandler}, if the task
*         cannot be accepted for execution
* @throws NullPointerException if {@code command} is null
*/
public void execute(Runnable command)/**
* 异步提交线程任务,出现异常无法同步追踪堆栈,本质上也是调用execute()方法
*/
public Future submit(Runnable task, T result) {    if (task == null) throw new NullPointerException();
   RunnableFuture ftask = newTaskFor(task, result);
   execute(ftask);    return ftask;
}

线程池优化

线程池已经是我们使用线程的一个优化成果了,而线程池本身的优化其实就是根据实际业务选择好不同类型的线程池,预估并发线程数量,控制好线程池预留线程数(最大线程数一般设为2N+1最好,N是CPU核数),这些涉及CPU数量,核数还有具体业务。

另外我们还注意到ForkJoinPool继承了AbstractExecutorService,这是在JDK7才加上去的,目的就是提高任务派生出来更多任务的执行效率,由上图的继承关系我们可以知道跟普通线程池最大的差异是执行的任务类型不同。

public void execute(ForkJoinTask task) {    if (task == null)        throw new NullPointerException();
   externalPush(task);
}public void execute(Runnable task) {        if (task == null)            throw new NullPointerException();
       ForkJoinTask job;        if (task instanceof ForkJoinTask) // avoid re-wrap
           job = (ForkJoinTask) task;        else
           job = new ForkJoinTask.RunnableExecuteAction(task);
       externalPush(job);
}

8.3并发容器

其实我们日常开发大多数并发场景直接用JDK 提供的线程安全数据结构足矣,下面列举了常用的列表,集合等容器,具体就不展开讲,相信大家都用得很熟悉了。

  • ConcurrentHashMap

  • CopyOnWriteArrayList

  • ConcurrentLinkedQueue

  • BlockingQueue

  • ConcurrentSkipListMap

  • Vector

  • HashTable

9.线程调优

9.1性能指标

回想一下,当我们在谈性能优化的时候,我们可能指的是数据库的读写次数,也可能指网站的响应时间。通常我们会用QPS,TPS,RT,并发数,吞吐量,更进一步的还会对比CPU负载来衡量一个系统的性能。

当然我们知道一个系统的吞吐量和响应时间跟外部网络,分布式架构等都存在强关联,性能优化也跟各级缓存设计,数据冗余等架构有很大关系,假设其他方面我们都已经完成了,聚焦到本文我们暂时关心的是单节点的性能优化。毕竟一屋不扫何以扫天下,整体系统的优化也有赖于各个节点的调优。从感官上来谈,当请求量很少的时候,我们可以很轻松的通过各种缓存优化来提高响应时间。但是随着用户激增,请求次数的增加,我们的服务也对应着需要并发模型来支撑。但是一个节点的并发量有个上限,当达到这个上限后,响应时间就会变长,所以我们需要探索并发到什么程度才是最优的,才能保证最高的并发数,同时响应时间又能保持在理想情况。由于我们暂时不关注节点以外的网络情况,那么下文我们特指的RT是指服务接收到请求后,完成计算,返回计算结果经历的时间。

单线程

单线程情况下,服务接收到请求后开始初始化,资源准备,计算,返回结果,时间主要花在CPU计算和CPU外的IO等待时间,多个请求来也只能排队一个一个来,那么RT计算如下

RT = T(cpu) + T(io)

QPS = 1000ms / RT

多线程

单线程情况很好计算,多线程情况就复杂了,我们目标是计算出最佳并发量,也就是线程数N

单核情况:N = [T(cpu) + T(io)] / T(cpu)

M核情况:N = [T(cpu) + T(io)] / T(cpu) * M

由于多核情况CPU未必能全部使用,存在一个资源利用百分比P

那么并发的最佳线程数 N = [T(cpu) + T(io)] / T(cpu) M P

吞吐量

我们知道单线程的QPS很容易算出来,那么多线程的QPS

QPS = 1000ms / RT N = 1000ms / T(cpu) + T(io) [T(cpu) + T(io)] / T(cpu) M P= 1000ms / T(cpu) M P

在机器核数固定情况下,也即是并发模式下最大的吞吐量跟服务的CPU处理时间和CPU利用率有关。CPU利用率不高,就是通常我们听到最多的抱怨,压测时候qps都打满了,但是cpu的load就是上不去。并发模型中多半个共享资源有关,而共享资源又跟锁息息相关,那么大部分时候我们想对节点服务做性能调优时就是对锁的优化,这个下一节会提到。

前面我们是假设机器核数固定的情况下做优化的,那假如我们把缓存,IO,锁都优化了,剩下的还有啥空间去突破呢?回想一下我们谈基础理论的时候提到的Amdahl定律,公式之前已经给出,该定律想表达的结论是随着核数或者处理器个数的增加,可以增加优化加速比,但是会达到上限,而且增加趋势愈发不明显。

9.2锁优化

说真的,我们并不喜欢锁的,只不过由于临界资源的存在不得已为之。如果业务上设计能避免出现临界资源,那就没有锁优化什么事了。但是,锁优化的一些原则还是要说一说的。

时间

既然我们并不喜欢锁,那么就按需索取,只在核心的同步块加锁,用完立马释放,减少锁定临界区的时间,这样就可以把资源竞争的风险降到最低。

粒度

进一步看,有时候我们核心同步块可以进一步分离,比如只读的情况下并不需要加锁,这时候就可以用读写锁各自的读写功能。

还有一种情况,有时候我们反而会小心翼翼的到处加锁来防止意外出现,可能出现三个同步块加了三个锁,这也造成CPU的过多停顿,根据业务其实可以把相关逻辑合并起来,也就是锁粗化。

锁的分离和粗化具体还得看业务如何操作。

尺度

除了锁暂用时间和粒度外,还有就是锁的尺度,还是根据业务来,能用共享锁定的情况就不要用独享锁。

死锁

这个不用说都知道,死锁防不胜防,我们前面也介绍很多现成的工具,比如可重入锁,还有线程本地变量等方式,都可以一定程度避免死锁。

9.3JVM锁机制

我们在代码层面把锁的应用都按照安全法则做到最好了,那接下来要做的就是下钻到JVM级别的锁优化。具体实现原理我们暂不展开,后续有机会再搞个专题写写JVM锁实现。

自旋锁(Spin Lock)

自旋锁的原理非常简单。如果持有锁的线程可以在短时间内释放锁资源,那么等待竞争锁的那些线程不需要在内核状态和用户状态之间进行切换。它只需要等待,并且锁可以在释放锁之后立即获得锁。这可以避免消耗用户线程和内核切换。

但是,自旋锁让CPU空等着什么也不干也是一种浪费。如果自旋锁的对象一直无法获得临界资源,则线程也无法在没有执行实际计算的情况下一致进行CPU空转,因此需要设置自旋锁的最大等待时间。如果持有锁的线程在旋转等待的最大时间没有释放锁,则自旋锁线程将停止旋转进入阻塞状态。

JDK1.6开启自旋锁  -XX:+UseSpinning,1.7之后控制器收回到JVM自主控制

偏向锁(Biased Lock)

偏向锁偏向于第一个访问锁的线程,如果在运行过程中,同步锁只有一个线程访问,不存在多线程争用的情况,则线程是不需要触发同步的,这种情况下,就会给线程加一个偏向锁。如果在运行过程中,遇到了其他线程抢占锁,则持有偏向锁的线程会被挂起,JVM会消除它身上的偏向锁,将锁恢复到标准的轻量级锁。

JDK1.6开启自旋锁  -XX:+UseBiasedLocking,1.7之后控制器收回到JVM自主控制

轻量级锁(Lightweight Lock)

轻量级锁是由偏向锁升级来的,偏向锁运行在一个线程进入同步块的情况下,当第二个线程加入锁竞争的时候,偏向锁就会升级为轻量级锁。

重量级锁(Heavyweight Lock)

如果锁检测到与另一个线程的争用,则锁定会膨胀至重量级锁。也就是我们常规用的同步修饰产生的同步作用。

9.4无锁

最后其实我想说的是,虽然锁很符合我们人类的逻辑思维,设计起来也相对简单,但是摆脱不了临界区的限制。那么我们不妨换个思路,进入无锁的时间,也就是我们可能会增加业务复杂度的情况下,来消除锁的存在。

CAS策略

著名的CAS(Compare And Swap),是多线程中用于实现同步的原子指令。它将内存位置的内容与给定值进行比较,并且只有它们相同时,才将该内存位置的内容修改为新的给定值。这是作为单个原子操作完成的。原子性保证了新值是根据最新信息计算出来的; 如果在此期间该值已被另一个线程更新,则写入将失败。操作的结果必须表明它是否进行了替换; 这可以通过简单的Boolean来响应,或通过返回从内存位置读取的值(而不是写入它的值)来完成。

也就是一个原子操作包含了要操作的数据和给定认为正确的值进行对比,一致的话就继续,不一致则会重试。这样就在没有锁的情况下完成并发操作。

我们知道原子类 AtomicInteger内部实现的原理就是采用了CAS策略来完成的。

AtomicInteger.java  132

/**
* Atomically sets the value to the given updated value
* if the current value {@code ==} the expected value.
*
* @param expect the expected value
* @param update the new value
* @return {@code true} if successful. False return indicates that
* the actual value was not equal to the expected value.
*/
public final boolean compareAndSet(int expect, int update) {    return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}

类似的还有AtomicReference.java  115

/**
* Atomically sets the value to the given updated value
* if the current value {@code ==} the expected value.
* @param expect the expected value
* @param update the new value
* @return {@code true} if successful. False return indicates that
* the actual value was not equal to the expected value.
*/
public final boolean compareAndSet(V expect, V update) {    return unsafe.compareAndSwapObject(this, valueOffset, expect, update);
}

有兴趣的同学可以再了解一下Unsafe的实现,进一步可以了解Distuptor无锁框架。

10.并发模型

前面我们大费周章的从并发的基础概念到多线程的使用方法和优化技巧。但都是战术层面的,本节我们试着从战略的高度来扩展一下并发编程的世界。可能大多数情况下我们谈并发都会想到多线程,但是本节我们要打破这种思维,在完全不用搞多线程那一套的情况下实现并发。

首先我们用”多线程模式“来回顾前文所讲的所有关于Thread衍生出来的定义,开发和优化的技术。

多线程模式

单位线程完成完整的任务,也即是一条龙服务线程。

  • 优势:

    • 映射现实单一任务,便于理解和编码

  • 劣势:


    • 有状态多线程共享资源,导致资源竞争,死锁问题,线程等待阻塞,失去并发意义

    • 有状态多线程非阻塞算法,有利减少竞争,提升性能,但难以实现

    • 多线程执行顺序无法预知

流水线模型

介绍完传统多线程工作模式后,我们来学习另外一种并发模式,传统的多线程工作模式,理解起来很直观,接下来我们要介绍的另外一种并发模式看起来就不那么直观了。

流水线模型,特点是无状态线程,无状态也意味着无需竞争共享资源,无需等待,也就是非阻塞模型。流水线模型顾名思义就是流水线上有多个环节,每个环节完成自己的工作后就交给下一个环节,无需等待上游,周而复始的完成自己岗位上的一亩三分地就行。各个环节之间交付无需等待,完成即可交付。

而工厂的流水线也不止一条,所以有多条流水线同时工作。

不同岗位的生产效率是不一样的,所以不同流水线之间也可以发生协同。

我们说流水线模型也称为响应式模型或者事件驱动模型,其实就是流水线上上游岗位完成生产就通知下游岗位,所以完成了一个事件的通知,每完成一次就通知一下,就是响应式的意思。

流水线模型总体的思想就是纵向切分任务,把任务里面耗时过久的环节单独隔离出来,避免完成一个任务需要耗费等待的时间。在实现上又分为Actors和Channels模型

  • Actors


    该模型跟我们讲述的流水线模型基本一致,可以理解为响应式模型

  • Channels

由于各个环节直接不直接交互,所以上下游之间并不知道对方是谁,好比不同环节直接用的是几条公共的传送带来接收物品,各自只需要把完成后的半成品扔到传送带,即使后面流水线优化了,去掉中间的环节,对于个体岗位来说也是无感知的,它只是周而复始的从传送带拿物品来加工。


流水线的优缺点:

优势:

  • 无共享状态:无需考虑资源抢占,死锁等问题

  • 独享内存:worker可以持有内存,合并多次操作到内存后再持久化,提升效率

  • 贴合底层:单线程模式贴合硬件运行流程,便于代码维护

  • 任务顺序可预知

劣势:

  • 不够直观:一个任务被拆分为流水线上多个环节,代码层面难以直观理解业务逻辑

由于流水线模式跟人类的顺序执行思维不一样,比较费解,那么有没有办法让我们编码的时候像写传统的多线程代码一样,而运行起来又是流水线模式呢?答案是肯定的,比如基于Java的Akka/Reator/Vert.x/Play/Qbit框架,或者golang就是为流水线模式而生的并发语言,还有nodeJS等等。

流水线模型的开发实践可以参考流水线模型实践。

其实流水线模型背后用的也还是多线程来实现,只不过对于传统多线程模式下我们需要小心翼翼来处理跟踪资源共享问题,而流水线模式把以前一个线程做的事情拆成多个,每一个环节再用一条线程来完成,避免共享,线程直接通过管道传输消息。

这一块展开也是一个专题,主要设计NIO,Netty和Akka的编程实践,先占坑后面补上。

函数式模型

函数式并行模型类似流水线模型,单一的函数是无状态的,所以避免了资源竞争的复杂度,同时每个函数类似流水线里面的单一环境,彼此直接通过函数调用传递参数副本,函数之外的数据不会被修改。函数式模式跟流水线模式相辅相成逐渐成为更为主流的并发架构。具体的思想和编程实践也是个大专题,篇幅限制本文就先不展开,拟在下个专题中详细介绍《函数式编程演化》。

11.总结

由于CPU和I/O天然存在的矛盾,传统顺序的同步工作模式导致任务阻塞,CPU空等着没有执行,浪费资源。多线程为突破了同步工作模式的情况下浪费CPU资源,即使单核情况下也能将时间片拆分成单位给更多的线程来轮询享用。多线程在不同享状态的情况下非常高效,不管协同式还是抢占式都能在单位时间内执行更多的任务,从而更好的榨取CPU资源。

但是多数情况下线程之间是需要通信的,这一核心场景导致了一系列的问题,也就是线程安全。内存被共享的单位由于被不同线程轮番读取写入操作,这种操作带来的后果往往是写代码的人类没想到的,也就是并发带来的脏数据等问题。解决了资源使用效率问题,又带来了新的安全问题,如何解决?悲观方式就是对于存在共享内存的场景,无论如何只同意同一时刻一个线程操作,也就是同步操作方法或者代码段或者显示加锁。或者volatile来使共享的主存跟每条线程的工作内存同步(每次读都从主存刷新,每次写完都刷到主存)

要保证线程安全:

  • 1、不要使用多线程,

  • 2、多线程各干各的不要共享内存,

  • 3、共享的内存空间是不可变的(常量,final),

  • 4、实在要变每次变完要同步到主存volatile(依赖当前值的逻辑除外),

  • 5、原子变量,

  • 6、根据具体业务,避免脏数据(这块就是多线程最容易犯错的地方)

线程安全后,要考虑的就是效率问题,如果不解决效率问题,那还干嘛要多线程。。。
如果所有线程都很自觉,快速执行完就跑路,那就是我们的理想情况了。但是,部分线程又臭又长(I/O阻塞),不能让一直赖在CPU不走,就把他上下文(线程号,变量,执行到哪等数值的快照)保存到内存,然后让它滚蛋下一个线程来。但是切换太快的话也不合适,毕竟每次保存线程的作案现场也要花不少时间的,单位时间执行线程数要控制在一个适当的个数。创建线程也是一项很吃力的工作,一个线程就是在栈内存里面开辟一段内存空间,根据字节码分配临时变量空间,不同操作系统通常不一样。不能频繁的创建销毁线程。那就搞个线程池出来,用的时候拿出来,用完扔回去,简单省事。但是线程池的创建也有门道,不能无限创建不然就失去意义了。操作系统有一定上限,线程池太多线程内存爆了,系统奔溃,所以需要一个机制。容纳1024个线程,多了排队再多了扔掉。回到线程切换,由于创建线程耗费资源,切换也花费,有时候切换线程的时间甚至比让线程待在cpu无所事事更长,那就给加个自旋锁,就是让它自己再cpu打滚啥事不干,一会儿轮到它里面就能干活。

既然多线程同步又得加锁耗资源,不同步又有共享安全问题。那能不能把这些锁,共享,同步,要注意的问题封装起来。搞出一个异步的工作机制,不用管底层的同步问题,只管业务问题。传统是工匠干活一根筋干完,事件驱动是流水线,把一件事拆分成多个环节,每个环节有唯一标识,各个环节批量生产,在流水线对接。这样在CPU单独干,不共享,不阻塞,干完自己的通知管工,高效封装了内部线程的运行规则,把业务关系暴露给管理者。

本文主要将的数基于JAVA的传统多线程并发模型,下面例牌给出知识体系图。


后台回复 学习资料 领取学习视频


如有收获,点个在看,诚挚感谢

浏览 15
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报