参考文章:

一、为什么要使用线程池

线程池提供了一种限制和管理资源(线程、任务)的方式。

这里借用《Java 并发编程的艺术》提到的来说一下使用线程池的好处

  • 降低资源消耗:通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
  • 提高响应速度:当任务到达时,任务可以不需要等待创建线程就能立即执行。
  • 提高线程的可管理性:线程是稀缺资源,如果无线的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。

二、线程池的基本概念

线程池主要由Executor框架实现,其中最核心的类是ThreadPoolExecutorThreadPoolExecutor是一个实现了ExecutorService接口的类,它提供了丰富的功能来管理线程池的生命周期和任务调度。

三、线程池的工作原理

pk4tmtJ.png

当用户向线程池提交一个任务时,线程池会先将任务放入一个阻塞队列(workQueue)中。然后,线程池中的工作线程(workerSet)会从这个队列中不断获取任务并执行。如果工作线程数量小于核心线程数(coreSize),则会创建新的线程来处理任务;如果工作线程数量大于核心线程数,则任务会被放入队列中等待。


四、Java提供的线程池实现

Java提供了多种线程池实现方式,其中最常用的是ThreadPoolExecutor类和Executors类提供的静态工厂方法。以下是几种常见的线程池实现:

后我们来看看如何在程序中实现线程池。事实上,大多数程序都会有一个基本的线程池实现。

  • 在 Spring 中,我们可以利用 ThreadPoolTaskExecutor 配合 @Async 注解来实现线程池(不太建议)。

ps.虽然 Spring 框架提供了线程池的实现,但并不特别推荐使用。因为 Spring 毕竟是一个框架,它进行了一定程度的封装,可能隐藏了一些细节。更推荐大家直接使用 Java 并发包中的线程池。

  • 在 Java 中,可以使用JUC并发编程包中的 ThreadPoolExecutor->自定义线程池,来实现非常灵活地自定义线程池。

  • newFixedThreadPool(int nThreads):创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。

    FixedThreadPool - 线程池大小固定,任务队列无界

    下面是 Executors 类 newFixedThreadPool 方法的源码:

    1
    2
    3
    4
    5
    public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
    0L, TimeUnit.MILLISECONDS,
    new LinkedBlockingQueue<Runnable>());
    }

    可以看到 corePoolSize 和 maximumPoolSize 设置成了相同的值,此时不存在线程数量大于核心线程数量的情况,所以KeepAlive时间设置不会生效。任务队列使用的是不限制大小的 LinkedBlockingQueue ,由于是无界队列所以容纳的任务数量没有上限。

    因此,FixedThreadPool的行为如下:

    1. 从线程池中获取可用线程执行任务,如果没有可用线程则使用ThreadFactory创建新的线程,直到线程数达到nThreads
    2. 线程池线程数达到nThreads以后,新的任务将被放入队列

    FixedThreadPool的优点是能够保证所有的任务都被执行,永远不会拒绝新的任务;同时缺点是队列数量没有限制,在任务执行时间无限延长的这种极端情况下会造成内存问题

  • newCachedThreadPool():创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。

    SynchronousQueue是一个只有1个元素的队列,入队的任务需要一直等待直到队列中的元素被移出。核心线程数是0,意味着所有任务会先入队列;最大线程数是Integer.MAX_VALUE,可以认为线程数量是没有限制的。KeepAlive时间被设置成60秒,意味着在没有任务的时候线程等待60秒以后退出。CachedThreadPool对任务的处理策略是提交的任务会立即分配一个线程进行执行,线程池中线程数量会随着任务数的变化自动扩张和缩减,在任务执行时间无限延长的极端情况下会创建过多的线程。

  • newSingleThreadExecutor():创建一个单线程的线程池,所有任务都按顺序执行。

    这个工厂方法中使用无界LinkedBlockingQueue,并的将线程数设置成1,除此以外还使用FinalizableDelegatedExecutorService类进行了包装。这个包装类的主要目的是为了屏蔽ThreadPoolExecutor中动态修改线程数量的功能,仅保留ExecutorService中提供的方法。虽然是单线程处理,一旦线程因为处理异常等原因终止的时候,ThreadPoolExecutor会自动创建一个新的线程继续进行工作。

    SingleThreadExecutor 适用于在逻辑上需要单线程处理任务的场景,同时无界的LinkedBlockingQueue保证新任务都能够放入队列,不会被拒绝缺点和FixedThreadPool相同,当处理任务无限等待的时候会造成内存问题


三种ExecutorService特性总结

类型 核心线程数 最大线程数 Keep Alive 时间 任务队列 任务处理策略
FixedThreadPool 固定大小 固定大小(与核心线程数相同) 0 LinkedBlockingQueue 线程池大小固定,没有可用线程的时候任务会放入队列等待,队列长度无限制
SingleThreadExecutor 1 1 0 LinkedBlockingQueue 与 FixedThreadPool 相同,区别在于线程池的大小为1,适用于业务逻辑上只允许1个线程进行处理的场景
CachedThreadPool 0 Integer.MAX_VALUE 1分钟 SynchronousQueue 线程池的数量无限大,新任务会直接分配或者创建一个线程进行执行

五、线程池的几个主要参数的作用

1
2
3
4
5
6
7
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
  • corePoolSize核心线程数量。这些线程就好比是公司的正式员工,他们在正常情况下都是随时待命处理任务的。如何去设定这个参数呢?比如,如果我们的 AI 服务只允许四个任务同时进行,那么我们的核心线程数应该就被设置为四。
  • maximumPoolSize最大线程数量,也就是线程池的容量。在极限情况下我们的系统或线程池能有多少个线程在工作。就算任务再多,你最多也只能雇佣这么多的人,因为你需要考虑成本和资源的问题。假设 AI 服务最多只允许四个任务同时执行,那么最大线程数应当设置为四。
  • keepAliveTime线程空闲等待时间,也和工作线程的生命周期有关。这个参数决定了当任务少的时候,临时雇佣的线程会等待多久才会被剔除。这个参数的设定是为了释放无用的线程资源。你可以理解为,多久之后会“解雇”没有任务做的临时工。
  • unit线程空闲时间的单位,最终会转为成纳秒。
  • workQueue:等待队列或者叫任务队列。也就是任务队列。这个队列存储所有等待执行的任务。也可以叫它阻塞队列,因为线程需要按顺序从队列中取出任务来执行。这个队列的长度一定要设定,因为无限长度的队列会消耗大量的系统资源。
  • ThreadFactory:创建线程的工厂,默认使用Executors.defaultThreadFactory()作为线程池工厂实例。它负责控制每个线程的生成,就像一个管理员,负责招聘、管理员工,比如设定员工的名字、工资,或者其他属性。
  • handler:线程池的执行执行处理器,更多的时候成为拒绝策略,拒绝策略执行的时机是当阻塞队列已满、没有空闲的线程(包含核心线程和非核心线程)并且继续提交任务。提供了4种拒绝策略实现:
    • AbortPolicy:直接拒绝策略,也就是不会执行任务,直接抛出RjectedExecutionExcetion错误,默认的拒绝策略。
    • DiscardPolicy:抛弃策略,也就是直接忽略提交的任务。
    • DiscardOldestPolicy:抛弃最老任务策略,也就是通过poll()方法取出任务队列头的任务抛弃,然后执行当前提交的任务。
    • CallerRunsPolicy:调用者执行策略,也就是当前调用Executor#execute()的线程直接调用任务Runnable#run()一般不希望任务丢失会选用这种策略,但从实际角度来看,原来的异步调用意图会退化成同步调用。

线程池参数总结

回归到我们的业务,要考虑系统最脆弱的环节(系统的瓶颈)在哪里? 现有条件:比如 AI 生成能力的并发是只允许 4 个任务同时去执行,AI 能力允许 20 个任务排队。

  • corePoolSize (核心线程数 => 正式员工数):正常情况下,我们的系统可以同时工作的线程数(随时就绪的状态)
  • maximumPoolSize (最大线程数 => 哪怕任务再多,你也最多招这些人):极限情况下,线程池最多可以拥有多少个线程?
  • keepAliveTime (空闲线程存活时间):非核心线程在没有任务的情况下,过多久要删除(理解为开除临时工),从而释放无用的线程资源。
  • workQueue (工作队列):用于存放给线程执行的任务,存在一个队列的长度(一定要设置,不要说队列长度无限,因为也会占用资源)
  • threadFactory (线程工厂):控制每个线程的生成、线程的属性(比如线程名)
  • RejectedExecutionHandler (拒绝策略):任务队列满的时候,我们采取什么措施,比如抛异常、不抛异常、自定义策略。

六、线程池的工作流程

刚开始,没有任何的线程和任务:

pk4twct.webp

当有新任务进来,发现当前员工数量还未达到设定的正式员工数(corePoolSize = 2),则会直接增聘一名新员工来处理这个任务:

pk4t0jP.webp

又来一个新任务,发现当前员工数量还未达到设定的正式员工数(corePoolSize = 2),则会再次增聘一名新员工来处理这个任务:

pk4tDnf.webp

又来了一个新任务,但是正式员工数已经达到上限(当前线程数 = corePoolSize = 2),这个新任务将被放到等待队列中(最大长度 workQueue.size 是 2) ,而不是立即增聘新员工:

pk4trB8.webp

又来了一个新任务,但是我们的任务队列已经满了(当前线程数 > corePoolSize = 2,已有任务数 = 最大长度 workQueue.size = 2),我们将增设新线程(最大线程数 maximumPoolSize = 4)来处理任务,而不是选择丢弃这个任务:

pk4tsHS.webp

当达到七个任务时,由于我们的任务队列已经满了、临时工也招满了(当前线程数 = maximumPoolSize = 4,已有任务数 = 最大长度 workQueue.size = 2),此时我们会采用 RejectedExecutionHandler(拒绝策略)来处理多余的任务:

pk4tIBT.webp

如果当前线程数超过 corePoolSize (正式员工数),并且这些额外的线程没有新的任务可执行,那么在 keepAliveTime 时间达到后,这些额外的线程将会被释放。

总结一下流程:

pk4toHU.webp


七、如何通过Guava实现重试机制

下面是一个示例,展示如何在你的线程池配置中加入基于Guava的重试机制:

  1. 创建线程池:使用你提供的配置创建线程池。
  2. 包装线程池:将线程池包装为ListeningExecutorService
  3. 实现重试逻辑:创建一个任务封装,用于捕获异常并在失败时重试。

首先,确保你的项目中已经添加了Guava依赖:

1
2
3
4
5
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>31.0.1-jre</version> <!-- 请使用最新的版本号 -->
</dependency>

然后,你可以按照以下方式实现重试机制:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
import com.google.common.util.concurrent.*;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

import java.util.concurrent.*;

@Component
public class ThreadPoolConfig {

@Value("${threadpool.corePoolSize}")
private int corePoolSize;

@Value("${threadpool.maximumPoolSize}")
private int maximumPoolSize;

@Value("${threadpool.keepAliveTime}")
private long keepAliveTime;

@Bean
public ThreadPoolExecutor threadPoolExecutor() {
ThreadFactory threadFactory = new ThreadFactory() {
private int count = 1;

@Override
public Thread newThread(@NotNull Runnable r) {
Thread thread = new Thread(r);
thread.setName("线程:" + count);
count++;
return thread;
}
};

ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
corePoolSize,
maximumPoolSize,
keepAliveTime,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(4),
threadFactory
);
return threadPoolExecutor;
}

@Bean
public ListeningExecutorService listeningExecutorService(ThreadPoolExecutor threadPoolExecutor) {
return MoreExecutors.listeningDecorator(threadPoolExecutor);
}

// 使用Guava进行重试的示例方法
public void executeWithRetry(ListeningExecutorService service, Runnable task, int maxRetries, long delayMillis) {
service.submit(() -> {
boolean success = false;
int attempts = 0;
while (!success && attempts < maxRetries) {
try {
task.run();
success = true;
} catch (Exception e) {
attempts++;
System.out.println("任务执行失败,正在进行第 " + attempts + " 次重试");
if (attempts >= maxRetries) {
System.out.println("达到最大重试次数,任务执行失败");
break;
}
try {
Thread.sleep(delayMillis);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
}
}
}
});
}
}

在这个示例中,我们首先创建了一个线程池,并将其包装为ListeningExecutorService。然后,我们定义了一个executeWithRetry方法,它接受一个ListeningExecutorService、一个任务、最大重试次数和重试延迟时间。在这个方法中,我们使用了一个循环来执行任务,并在捕获到异常时进行重试,直到任务成功执行或达到最大重试次数。

请注意,这个示例中的重试逻辑是同步的,即在任务失败后,它会在当前线程中等待指定的延迟时间后再次尝试执行任务。如果你需要异步重试,你可能需要使用ScheduledExecutorService来安排重试任务。


八、ThreadPoolExecutor与Executors类的具体区别和适用场景是什么?

ThreadPoolExecutor与Executors类在Java中都是用于创建和管理线程池的工具,但它们在具体实现和适用场景上有一些区别。

  1. 具体区别
    • Executors类:提供了多种静态工厂方法来创建不同类型的线程池。它简化了线程管理,减少了代码量,并且通过复用线程,能够有效地处理大量短期异步任务。Executors类适用于需要快速创建线程池的场景,例如Web服务器处理大量并发请求或异步日志记录。
    • ThreadPoolExecutor:是一个更高级的接口,可以理解为一个线程池和一个任务队列的组合。它允许用户自定义线程池的大小、核心线程数、最大线程数等参数。使用ThreadPoolExecutor可以创建自定义线程池,并且可以通过Future接口获取异步计算的结果。
  2. 适用场景
    • Executors类:适用于执行大量短期异步任务,尤其是任务执行时间不确定的情况。例如,Web服务器处理大量并发请求,或者异步日志记录。
    • ThreadPoolExecutor:适用于需要精细控制线程池参数的场景。例如,当需要根据实际情况动态调整线程池大小时,可以选择ThreadPoolExecutor进行自定义配置。

总结来说,Executors类提供了一种快速且简便的方式来创建线程池,适合于那些对线程池要求不高的场景。

九、Java线程池在处理高并发场景下的最佳实践案例有哪些?

在处理高并发场景下,Java线程池的最佳实践案例包括以下几个方面:

  1. 选择合适的线程池类型
    • newFixedThreadPool:创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。这种类型的线程池适用于需要固定线程数的任务。
    • newSingleThreadExecutor:创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。这种类型的线程池适用于对任务执行顺序有严格要求的场景。
  2. 合理配置线程池参数
    • 核心线程数(corePoolSize):设置线程池的核心线程数,即线程池中始终存在的线程数。
    • 最大线程数(maxPoolSize):设置线程池的最大线程数,当任务数量超过核心线程数时,会尝试从工作队列中获取更多的线程。
    • 工作队列(workQueue):用于存储等待执行的任务,常见的实现有LinkedBlockingQueue等。
    • 阻塞队列(blockQueue):用于存储待处理的任务,当所有核心线程都忙时,新任务将被放入阻塞队列中等待。
  3. 动态调整线程池规模
    • 使用动态化线程池解决方案,根据系统负载和资源情况动态调整线程池的大小。例如,可以通过监控系统资源使用情况(如CPU、内存等),动态增加或减少线程池中的线程数。
  4. 优化锁策略和避免死锁
    • 尽量减少共享状态的使用,以降低死锁的风险。
    • 使用高级并发API,如java.util.concurrent.locks ReentrantLock,并结合条件变量(condition)进行资源竞争的管理。
  5. 监控和调优
    • 定期监控线程池的性能指标,如任务提交率、完成率、线程池中的线程数等。
    • 根据监控结果进行调优,比如调整核心线程数、最大线程数、工作队列大小等,以确保系统在高并发情况下能够高效运行。

文章结束!