线程池知识总结
参考文章:
一、为什么要使用线程池
线程池提供了一种限制和管理资源(线程、任务)的方式。
这里借用《Java 并发编程的艺术》提到的来说一下使用线程池的好处:
- 降低资源消耗:通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
- 提高响应速度:当任务到达时,任务可以不需要等待创建线程就能立即执行。
- 提高线程的可管理性:线程是稀缺资源,如果无线的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。
二、线程池的基本概念
线程池主要由Executor
框架实现,其中最核心的类是ThreadPoolExecutor
。ThreadPoolExecutor
是一个实现了ExecutorService
接口的类,它提供了丰富的功能来管理线程池的生命周期和任务调度。
三、线程池的工作原理
当用户向线程池提交一个任务时,线程池会先将任务放入一个阻塞队列(workQueue)中。然后,线程池中的工作线程(workerSet)会从这个队列中不断获取任务并执行。如果工作线程数量小于核心线程数(coreSize),则会创建新的线程来处理任务;如果工作线程数量大于核心线程数,则任务会被放入队列中等待。
四、Java提供的线程池实现
Java提供了多种线程池实现方式,其中最常用的是ThreadPoolExecutor
类和Executors
类提供的静态工厂方法。以下是几种常见的线程池实现:
后我们来看看如何在程序中实现线程池。事实上,大多数程序都会有一个基本的线程池实现。
- 在 Spring 中,我们可以利用 ThreadPoolTaskExecutor 配合 @Async 注解来实现线程池(不太建议)。
ps.虽然 Spring 框架提供了线程池的实现,但并不特别推荐使用。因为 Spring 毕竟是一个框架,它进行了一定程度的封装,可能隐藏了一些细节。更推荐大家直接使用 Java 并发包中的线程池。
在 Java 中,可以使用JUC并发编程包中的 ThreadPoolExecutor->自定义线程池,来实现非常灵活地自定义线程池。
newFixedThreadPool(int nThreads):创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
FixedThreadPool - 线程池大小固定,任务队列无界
下面是 Executors 类 newFixedThreadPool 方法的源码:
1
2
3
4
5public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}可以看到 corePoolSize 和 maximumPoolSize 设置成了相同的值,此时不存在线程数量大于核心线程数量的情况,所以KeepAlive时间设置不会生效。任务队列使用的是不限制大小的 LinkedBlockingQueue ,由于是无界队列所以容纳的任务数量没有上限。
因此,FixedThreadPool的行为如下:
- 从线程池中获取可用线程执行任务,如果没有可用线程则使用ThreadFactory创建新的线程,直到线程数达到nThreads
- 线程池线程数达到nThreads以后,新的任务将被放入队列
FixedThreadPool的优点是能够保证所有的任务都被执行,永远不会拒绝新的任务;同时缺点是队列数量没有限制,在任务执行时间无限延长的这种极端情况下会造成内存问题。
newCachedThreadPool():创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
SynchronousQueue是一个只有1个元素的队列,入队的任务需要一直等待直到队列中的元素被移出。核心线程数是0,意味着所有任务会先入队列;最大线程数是Integer.MAX_VALUE,可以认为线程数量是没有限制的。KeepAlive时间被设置成60秒,意味着在没有任务的时候线程等待60秒以后退出。CachedThreadPool对任务的处理策略是提交的任务会立即分配一个线程进行执行,线程池中线程数量会随着任务数的变化自动扩张和缩减,在任务执行时间无限延长的极端情况下会创建过多的线程。
newSingleThreadExecutor():创建一个单线程的线程池,所有任务都按顺序执行。
这个工厂方法中使用无界LinkedBlockingQueue,并的将线程数设置成1,除此以外还使用FinalizableDelegatedExecutorService类进行了包装。这个包装类的主要目的是为了屏蔽ThreadPoolExecutor中动态修改线程数量的功能,仅保留ExecutorService中提供的方法。虽然是单线程处理,一旦线程因为处理异常等原因终止的时候,ThreadPoolExecutor会自动创建一个新的线程继续进行工作。
SingleThreadExecutor 适用于在逻辑上需要单线程处理任务的场景,同时无界的LinkedBlockingQueue保证新任务都能够放入队列,不会被拒绝;缺点和FixedThreadPool相同,当处理任务无限等待的时候会造成内存问题。
三种ExecutorService特性总结
类型 | 核心线程数 | 最大线程数 | Keep Alive 时间 | 任务队列 | 任务处理策略 |
---|---|---|---|---|---|
FixedThreadPool | 固定大小 | 固定大小(与核心线程数相同) | 0 | LinkedBlockingQueue | 线程池大小固定,没有可用线程的时候任务会放入队列等待,队列长度无限制 |
SingleThreadExecutor | 1 | 1 | 0 | LinkedBlockingQueue | 与 FixedThreadPool 相同,区别在于线程池的大小为1,适用于业务逻辑上只允许1个线程进行处理的场景 |
CachedThreadPool | 0 | Integer.MAX_VALUE | 1分钟 | SynchronousQueue | 线程池的数量无限大,新任务会直接分配或者创建一个线程进行执行 |
五、线程池的几个主要参数的作用
1 | public ThreadPoolExecutor(int corePoolSize, |
corePoolSize
:核心线程数量。这些线程就好比是公司的正式员工,他们在正常情况下都是随时待命处理任务的。如何去设定这个参数呢?比如,如果我们的 AI 服务只允许四个任务同时进行,那么我们的核心线程数应该就被设置为四。maximumPoolSize
:最大线程数量,也就是线程池的容量。在极限情况下我们的系统或线程池能有多少个线程在工作。就算任务再多,你最多也只能雇佣这么多的人,因为你需要考虑成本和资源的问题。假设 AI 服务最多只允许四个任务同时执行,那么最大线程数应当设置为四。keepAliveTime
:线程空闲等待时间,也和工作线程的生命周期有关。这个参数决定了当任务少的时候,临时雇佣的线程会等待多久才会被剔除。这个参数的设定是为了释放无用的线程资源。你可以理解为,多久之后会“解雇”没有任务做的临时工。unit
:线程空闲时间的单位,最终会转为成纳秒。workQueue
:等待队列或者叫任务队列。也就是任务队列。这个队列存储所有等待执行的任务。也可以叫它阻塞队列,因为线程需要按顺序从队列中取出任务来执行。这个队列的长度一定要设定,因为无限长度的队列会消耗大量的系统资源。ThreadFactory
:创建线程的工厂,默认使用Executors.defaultThreadFactory()
作为线程池工厂实例。它负责控制每个线程的生成,就像一个管理员,负责招聘、管理员工,比如设定员工的名字、工资,或者其他属性。handler
:线程池的执行执行处理器,更多的时候成为拒绝策略,拒绝策略执行的时机是当阻塞队列已满、没有空闲的线程(包含核心线程和非核心线程)并且继续提交任务。提供了4种拒绝策略实现:AbortPolicy
:直接拒绝策略,也就是不会执行任务,直接抛出RjectedExecutionExcetion
错误,默认的拒绝策略。DiscardPolicy
:抛弃策略,也就是直接忽略提交的任务。DiscardOldestPolicy
:抛弃最老任务策略,也就是通过poll()
方法取出任务队列头的任务抛弃,然后执行当前提交的任务。CallerRunsPolicy
:调用者执行策略,也就是当前调用Executor#execute()
的线程直接调用任务Runnable#run()
,一般不希望任务丢失会选用这种策略,但从实际角度来看,原来的异步调用意图会退化成同步调用。
线程池参数总结
回归到我们的业务,要考虑系统最脆弱的环节(系统的瓶颈)在哪里? 现有条件:比如 AI 生成能力的并发是只允许 4 个任务同时去执行,AI 能力允许 20 个任务排队。
- corePoolSize (核心线程数 => 正式员工数):正常情况下,我们的系统可以同时工作的线程数(随时就绪的状态)
- maximumPoolSize (最大线程数 => 哪怕任务再多,你也最多招这些人):极限情况下,线程池最多可以拥有多少个线程?
- keepAliveTime (空闲线程存活时间):非核心线程在没有任务的情况下,过多久要删除(理解为开除临时工),从而释放无用的线程资源。
- workQueue (工作队列):用于存放给线程执行的任务,存在一个队列的长度(一定要设置,不要说队列长度无限,因为也会占用资源)
- threadFactory (线程工厂):控制每个线程的生成、线程的属性(比如线程名)
- RejectedExecutionHandler (拒绝策略):任务队列满的时候,我们采取什么措施,比如抛异常、不抛异常、自定义策略。
六、线程池的工作流程
刚开始,没有任何的线程和任务:
当有新任务进来,发现当前员工数量还未达到设定的正式员工数(corePoolSize = 2),则会直接增聘一名新员工来处理这个任务:
又来一个新任务,发现当前员工数量还未达到设定的正式员工数(corePoolSize = 2),则会再次增聘一名新员工来处理这个任务:
又来了一个新任务,但是正式员工数已经达到上限(当前线程数 = corePoolSize = 2),这个新任务将被放到等待队列中(最大长度 workQueue.size 是 2) ,而不是立即增聘新员工:
又来了一个新任务,但是我们的任务队列已经满了(当前线程数 > corePoolSize = 2,已有任务数 = 最大长度 workQueue.size = 2),我们将增设新线程(最大线程数 maximumPoolSize = 4)来处理任务,而不是选择丢弃这个任务:
当达到七个任务时,由于我们的任务队列已经满了、临时工也招满了(当前线程数 = maximumPoolSize = 4,已有任务数 = 最大长度 workQueue.size = 2),此时我们会采用 RejectedExecutionHandler(拒绝策略)来处理多余的任务:
如果当前线程数超过 corePoolSize (正式员工数),并且这些额外的线程没有新的任务可执行,那么在 keepAliveTime 时间达到后,这些额外的线程将会被释放。
总结一下流程:
七、如何通过Guava实现重试机制
下面是一个示例,展示如何在你的线程池配置中加入基于Guava的重试机制:
- 创建线程池:使用你提供的配置创建线程池。
- 包装线程池:将线程池包装为
ListeningExecutorService
。 - 实现重试逻辑:创建一个任务封装,用于捕获异常并在失败时重试。
首先,确保你的项目中已经添加了Guava依赖:
1 | <dependency> |
然后,你可以按照以下方式实现重试机制:
1 | import com.google.common.util.concurrent.*; |
在这个示例中,我们首先创建了一个线程池,并将其包装为ListeningExecutorService
。然后,我们定义了一个executeWithRetry
方法,它接受一个ListeningExecutorService
、一个任务、最大重试次数和重试延迟时间。在这个方法中,我们使用了一个循环来执行任务,并在捕获到异常时进行重试,直到任务成功执行或达到最大重试次数。
请注意,这个示例中的重试逻辑是同步的,即在任务失败后,它会在当前线程中等待指定的延迟时间后再次尝试执行任务。如果你需要异步重试,你可能需要使用ScheduledExecutorService
来安排重试任务。
八、ThreadPoolExecutor与Executors类的具体区别和适用场景是什么?
ThreadPoolExecutor与Executors类在Java中都是用于创建和管理线程池的工具,但它们在具体实现和适用场景上有一些区别。
- 具体区别:
- Executors类:提供了多种静态工厂方法来创建不同类型的线程池。它简化了线程管理,减少了代码量,并且通过复用线程,能够有效地处理大量短期异步任务。Executors类适用于需要快速创建线程池的场景,例如Web服务器处理大量并发请求或异步日志记录。
- ThreadPoolExecutor:是一个更高级的接口,可以理解为一个线程池和一个任务队列的组合。它允许用户自定义线程池的大小、核心线程数、最大线程数等参数。使用ThreadPoolExecutor可以创建自定义线程池,并且可以通过Future接口获取异步计算的结果。
- 适用场景:
- Executors类:适用于执行大量短期异步任务,尤其是任务执行时间不确定的情况。例如,Web服务器处理大量并发请求,或者异步日志记录。
- ThreadPoolExecutor:适用于需要精细控制线程池参数的场景。例如,当需要根据实际情况动态调整线程池大小时,可以选择ThreadPoolExecutor进行自定义配置。
总结来说,Executors类提供了一种快速且简便的方式来创建线程池,适合于那些对线程池要求不高的场景。
九、Java线程池在处理高并发场景下的最佳实践案例有哪些?
在处理高并发场景下,Java线程池的最佳实践案例包括以下几个方面:
- 选择合适的线程池类型:
newFixedThreadPool
:创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。这种类型的线程池适用于需要固定线程数的任务。newSingleThreadExecutor
:创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。这种类型的线程池适用于对任务执行顺序有严格要求的场景。
- 合理配置线程池参数:
- 核心线程数(corePoolSize):设置线程池的核心线程数,即线程池中始终存在的线程数。
- 最大线程数(maxPoolSize):设置线程池的最大线程数,当任务数量超过核心线程数时,会尝试从工作队列中获取更多的线程。
- 工作队列(workQueue):用于存储等待执行的任务,常见的实现有
LinkedBlockingQueue
等。 - 阻塞队列(blockQueue):用于存储待处理的任务,当所有核心线程都忙时,新任务将被放入阻塞队列中等待。
- 动态调整线程池规模:
- 使用动态化线程池解决方案,根据系统负载和资源情况动态调整线程池的大小。例如,可以通过监控系统资源使用情况(如CPU、内存等),动态增加或减少线程池中的线程数。
- 优化锁策略和避免死锁:
- 尽量减少共享状态的使用,以降低死锁的风险。
- 使用高级并发API,如
java.util.concurrent.locks ReentrantLock
,并结合条件变量(condition
)进行资源竞争的管理。
- 监控和调优:
- 定期监控线程池的性能指标,如任务提交率、完成率、线程池中的线程数等。
- 根据监控结果进行调优,比如调整核心线程数、最大线程数、工作队列大小等,以确保系统在高并发情况下能够高效运行。