基础三连:如何避免重复创建线程?创建线程池的方式有哪些?各自优缺点?

分布式朝闻道

共 5189字,需浏览 11分钟

 · 2021-11-08

【前言:笔者本身是比较排斥所谓的大白话讲技术方式的。技术本身是严肃的,标准的,大白话方式一来废话很多,二来需要消化完再换种通俗方式讲出来,本身就是经过了信息的二次加工,对于技术本身的严肃来说,已经失真了。所以,大白话虽好,可不要贪杯哦。笔者今后基本不会考虑这种写作方式了,不仅累,而且写的我很难受。

建议读者朋友们,大白话方式的文章仅仅作为了解,娱乐方式,真正的学习还是要去看书,实践。实践,是检验真理的唯一标准。大白话,永远都只是别人的认知,即便吸收了,也是变味的。谨记。


我们在日常生活使用App的时候应该都收到过消息推送,这种消息推送的实现机制往往都依赖消息推送服务实现。事实上,互联网大厂都有自己的消息推送服务(又名Message Push Server),通过消息推送服务实现对App的消息推送, 忽略掉技术实现的细节,大概的过程就是图1中所画的这样子。


图1

如果让你来实现这样的一个消息推送服务,要求保证消息推送的实时性,你会怎么设计呢?

你可能会说,这有什么难的,用多线程技术就行了,为每个客户端都创建一个线程,然后启动线程,每来一条需要推送的消息就用创建好的线程发送出去不就行了么?如图2所示。

图2

就像上图这样,每当有一个客户端与服务端建立连接,就通过创建一个线程来执行消息推送服务。

看起来很美好,但是这样真的可行吗?答案可能会让你失望,这种方案是无法在生产环境实战的。原因就在于不断创建线程这个操作本身是不合理的。

首先,相信大家在学习完前面的内容,再结合平时的学习工作经验,都对线程或多或少建立了一定的认识,那就是:创建线程本身是一个比较重的操作。一方面是因为线程创建本身需要占用内存资源,这里的内存资源并非是JVM的内存,乍一听是不是觉得挺反常识的。

不慌,我们先看一段代码,在循环中一直创建线程,看看会出现什么情况,如图3所示。

图3

运行代码,结果如图4所示

图4

可以看到出现了java.lang.OutOfMemoryError,即内存已用尽。出现这个错误的本质原因就在于我们创建了太多的线程,线程对象本身以及线程的调用栈都是要占用内存的,而操作系统的内存是有上限的,这决定了我们能创建的线程数也是有限制的,而无限制的创建线程会使内存不断消耗最终超过内存上限从而报错。

事实上,能创建多少线程数是有一个计算公式的:可创建的线程数 =(进程的最大内存 – JVM分配的内存 – 操作系统保留的内存)/ 线程栈大小。如图所示,粉红色的部分就是可分配线程的内存大小,如果不显式设置-Xss或-XX:ThreadStackSize参数的时候,在Linux x64上ThreadStackSize的默认值就是1024K,也就是1MB大小,如图5所示。

图5

这里还要强调的一点是,在Java语言中,当我们每创建一个线程的时候,Java虚拟机就会在JVM内存中创建出一个Thread对象,与此同时创建一个操作系统的线程,最终在系统底层映射的是操作系统的本地线程(Native Thread),在windows系统中是1对1映射(即一个Java线程映射一个操作系统线程),在Linux系统是N对M映射(即多个Java线程映射多个操作系统线程,N与M不完全相等),这里就仅做了解,不详细展开了,感兴趣的兄弟可以去看一下操作系统的线程部分的知识。需要明确的是,这里说的映射关系是系统自动完成的,不需要用户主动感知。

我们主要要记住一点,那就是操作系统的线程使用的内存并不是JVM分配的内存,而是系统中剩余的内存,也就是公式中的(进程的最大内存– JVM分配的内存 – 操作系统保留的内存)。

这么一来,你越给JVM分配内存,那你能创建的线程就越少,也就越容易发生Exception in thread "main" java.lang.OutOfMemoryError:unable to create new native thread这样的异常。

 

回到我们在文章开头的那个案例,如果系统接入的移动设备非常多,达到数百万甚至上千万台,那么通过为每台设备创建一个线程的方式执行推送就是不现实的,Java进程会大量报错,代码执行效率很低。

那怎么办才能既不会大量创建线程导致java.lang.OutOfMemoryError,又能实现业务需求呢?

这时候,我们就需要用到线程池这个神器了。通过线程池可以避免重复创建线程,此时发送推送消息的方式如图6所示;

图6

是不是和上面基于线程创建的方式不一样了?我们通过创建了一个消息推送线程池,对线程进行了复用,让这些固定数量的线程去执行不断产生的消息推送任务。事实上,线程池就像一个水池,水池里有一个队列对待处理的任务进行存储,同时池子里还装了固定数量的工作线程去队列中获取任务进行执行。

原理如图7所示:

图7

就是说,应用程序不断的往线程池中提交任务,有的任务就被工作线程直接执行了,如果工作线程都是处于繁忙的执行状态,那么应用程序就先把任务提交到任务队列里缓存起来,然后工作线程会从队列里取任务进行调度。

这就好比是说,你去食堂打饭,点了一份水饺,因为你去的比较早,不用排队直接问食堂大妈说,我要一份水饺,猪肉大葱馅儿的。大妈盛了一份做好的水饺给你,你去结账就行了。

有一天你因为处理线上问题导致吃饭时间稍微耽误了,去了一看,我天,排了几十人的队伍,可是没办法,这家店的水饺做的实在是太好吃了。得嘞,为了吃饭只能自觉排队等着轮到自己。

在这个案例中,去得早直接让大妈盛一份水饺,就相当于是说直接让工作线程把任务执行了,去的迟了就相当于是说任务需要进队列,然后被工作线程慢慢调度。在这个场景里,食堂大妈就相当于工作线程,等待过程排的长队就相当于任务队列。


创建线程池的方式

事实上,在Java中,已经给我们准备好了现成的线程池供我们使用,这就是大名鼎鼎的Executors框架。它提供了多种创建线程池的方式。

图8

如图8所示,Executors框架提供了多种类型的线程池,我们依次说明一下:

l  newSingleThreadExecutor()方法:它返回的线程池实例中只有一个工作线程,如果提交超过一个任务到线程池中,那么任务会被保存在队列中。等工作线程空闲了就从队列中取出其他任务进行执行。获取任务遵循队列的先入先出原则。

l  newCachedThreadPool()方法:它返回的线程池中的线程数量是可变的,理论上可以创建Integer.MAX_VALUE个线程。当然,如果有空闲的线程能够被复用,就还是优先使用可被复用的线程。当目前所有的线程都处于工作状态,但是仍然有新任务被提交了,那么就会创建新的线程来调度新任务。是不是很熟悉啊?相信自己的直觉,这个方式其实就和不断创建线程执行任务的方式是一个原理。

l  newFixedThreadPool()方法:它返回一个带固定数量线程的线程池。这个线程池中的线程数量从线程池一开始创建就固定不变。如果提交一个任务到这个线程池里,线程池中恰好有空闲的线程,那么就会立即执行任务;否则,没有空闲的工作线程,那么新提交的任务就只能被暂存在一个任务队列里面,等待空闲线程去处理任务队列中的任务。

l  newSingleThreadScheduledExecutor():这个方法,实际上返回的是ScheduledExecutorService对象实例。如果看一下代码,我们会发现,ScheduledExecutorService实际上是继承了接口ExecutorService,并扩展出了周期性调度任务的能力。

l  newScheduledThreadPool():和newSingleThreadScheduledExecutor()类似,它也是返回一个ScheduledExecutorService对象实例,只不过它能够指定线程的数量。

通过一个脑图,我们简单总结一下Executors框架创建线程池的种类及其功能,如图9所示。

图9

这些线程池的使用方法也很简单,我们以newCachedThreadPool()为例,简单展示一下如何向线程池中提交任务,如图10所示。

图10

我们模拟向线程池中提交请求,打印请求id和当前线程名称,执行该测试代码,观察一下日志打印,运行结果如图11所示。

图11

可以看到,通过submit(Runnable task)方法向 newCachedThreadPool()提交任务,每个任务都是被一个新的线程所执行。我们一共提交了10个任务,就分配了10个线程进行调度。

那么我们回到开头的案例,如果要使用线程池来实现发送消息推送的需求,我们应该如何去做呢?

首先就排除掉newCachedThreadPool()这种方式!原因在于newCachedThreadPool()方式是一个最大线程数为Integer.MAX_VALUE的线程池,这意味这我们可以一次性创建2147483647个线程!足足21亿4000万个!这也太夸张了,如果系统面临同时并发发送大量的任务的场景,而且任务本身执行速度不是很快的情况下,系统是会开启等量线程进行处理的,这么多线程会在短时间快速耗尽系统资源,从而又出现java.lang.OutOfMemoryError这个恐怖的异常!

而且newCachedThreadPool()持有的是一个SynchronousQueue,后面的章节中我们会详细讲解这部分内容,现在只需要知道SynchronousQueue是一个缓存值为1的阻塞队列,通俗的说,它根本就没有缓冲任务的能力,具体的原理如图12所示。

 


12

那么我们就猜想,是不是应该控制一下线程池的数量,不要让系统无休止的疯狂创建新线程呢?

Bingo,你又答对了。

如果我们使用Executors.newFixedThreadPool()指定固定数量的线程池,那么系统至少不会无休止的创建线程了,比方说,我们指定了100个线程,那么就是通过Executors.newFixedThreadPool(100)的方式创建线程池。这么做就可以高枕无忧了吧?你是不是想松一口气然后去打一盘游戏呢?

先别着急,我们说,newFixedThreadPool()确实是控制了线程数量,但是别忘了,线程池里还有一个东西叫做任务队列!这玩意儿也是要占用内存空间的。Talk is cheap,Show me the Code。我们来看看newFixedThreadPool()的方法签名吧,如下图13所示。

图13

看到了吧,它用的队列是newLinkedBlockingQueue(),是个无界队列!这意味着,就算我们限制了线程数量是固定不能一直创建,当海量并发任务提交过来的时候,会因为线程数不够而将任务入队列,要看到这个队列是无界队列,所以这就是说,任务会一直入队。但是这在现实中是不可能发生的,为什么呢?很简单啊兄弟,内存是有限的啊,如果一直提交任务,内存肯定是会被耗尽的!

通过上文的分析,我们发现,线程池的核心其实就是工作线程,还有任务队列,用一张图来总结一下线程池的基本架构,如图14所示。

14

这张图是线程池ThreadPoolExecutor的较为完整的架构原理了,大家先自己仔细研究一下,关于这张图的详细分析,我们在接下来的章节将会细致的进行分析。

回到解决开头的案例的思路中来,让我们继续看看Executors框架还有没有其他合适的方法能够供我们高效的进行消息推送。到目前为止,就剩newSingleThreadExecutor()这个线程池创建方式了。它的方法签名如图15所示:

图15

可以看到,它本质上就是newFixedThreadPool(1),对吧,那其实它也会面临newFixedThreadPool()的问题,那就是无界队列可能会因为海量任务一直提交而不断入队消耗内存最后导致内存爆满,原理如图16所示。

16

    有一丝丝失望,怎么感觉每一个线程池都有问题啊。别灰心,其实技术本身就没有完美一说,都是权衡的结果,每个线程池创建方式都有其适用场景,只不过极限情况下,满足不了就会出问题。而作为软件开发者,我们有义务去深入学习原理,去避免出现问题。

    那有没有一种方式,能够让我们灵活的定制线程池,去适配自己的业务场景呢?答案是显而易见的,当然有。之后的文章中,我们会一起深入学习线程池的工作原理并实现自定义的线程池定制,达到灵活运用线程池满足实战条件的业务场景。

 



浏览 148
点赞
评论
收藏
分享

手机扫一扫分享

举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

举报