ThreadPoolExecutor是Executor框架最核心的类,也是线程池的实现类,有以下4个组件构成。
- corePool:核心线程池大小
- maximumPool:最大线程池大小
- BlockingQueue:用来暂时保存任务的工作队列
- RejectedExecutionHandler:当ThreadPoolExecutor已经关闭或者已经饱和(达到最大线程池大小并且工作队列已满),execute()方法将调用Handler(拒绝策略)
通过工具类Executors可以创建三种类型的ThreadPoolExecutor
- FixedThreadPool
- SingleThreadExecutors
- CacheThreadPool
FixedThreadPool
FixedThreadPool被称为可重用固定线程数的线程池
1 | public static ExecutorService newFixedThreadPool(int nThreads) { |
可以看到FixedThreadPool的其实是通过创建ThreadPoolExecutor来实现的
1 | new ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) |
当线程池的线程数大于corePoolSize的时候,keepAliveTime为多余的空闲线程等待任务的最长时间,超过时间后多余的线程被终止。FixedThreadPool的创建中把keepAliveTime为多余的空闲线程等待任务的最长时间设置为0L表示多余的空闲线程立即被终止。
FixedThreadPool执行任务的顺序:
- 如果当前运行的线程数少于corePoolSize,则创建新的线程来执行任务;
- 当前运行的线程数量等于corePoolSize,新的任务会被放入LinkedBlockingQueue中;
- 当前线程任务执行完之后,会循环反复从LinkedBlockingQueue中获取任务执行;
FixedThreadPool使用无界队列LinkedBlockingQueue带来的影响:
- 当线程池中数量达到corePoolSize后,新的任务会在队列中等待;
- 使用无界队列时maximumPoolSize参数和keepAliveTime参数无效;
- 运行中的FixedThreadPool不会拒绝任务;
SingleThreadExecutor
SingleThreadExecutor是使用单个worker线程的Executor,下面是SingleThreadExecutor的源码实现
1 | public static ExecutorService newSingleThreadExecutor() { |
SingleThreadExecutor的corePoolSize和maximumPoolSize设置为1,其他参数和newFixedThreadPool一样。
SingleThreadExecutor的execute()方法
- 如果当前运行线程数少于corePollSize(即线程池中无运行的线程),则创建一个新的线程执行任务。
- 在线程池完成预热之后(当前线程中有一个运行线程)将任务加入LinkedBlockingQueue。
- 线程执行完1中的任务后,会在一个无限循环中反复从LinkedBlockingQueue获取任务来完成。
CacheThreadPool
CacheThreadPool是一个会根据需要创建新线程的线程池。下面是创建CacheThreadPool的源码。
1 | public static ExecutorService newCachedThreadPool() { |
CacheThreadPool的corePoolSize被设置为0,即corePool为空,maximumPoolSize设置为Integer.MAX_VALUE,表示maximumPool是无界的。
CacheThreadPool使用的是没有容量的SynchronousQueue作为线程池的工资队列,但是maximumPool是无界的,这意味着当主线程提交任务的速度高于maximumPool中线程处理任务的速度时,CacheThreadPool会不断创建新的线程,极端情况下,CacheThreadPool会耗尽CPU和内存资源。
ScheduledThreadPoolExecutor
ScheduledThreadPoolExecutor继承自ThreadPoolExecutor,主要用来在给定的延迟之后执行任务或者定期执行任务。
1 | public ScheduledThreadPoolExecutor(int corePoolSize) { |
从ScheduledThreadPoolExecutor的几个构造函数可以看出,ScheduledThreadPoolExecutor中使用的队列是DelayedWorkQueue,由于DelayedWorkQueue是无界队列,所以maximumPoolSize参数设置Integer.MAX_VALUE。
拒绝使用Executors创建线程池
之所以会出现这样的规范,是因为jdk已经封装好的线程池存在潜在风险:
FixedThreadPool 和 SingleThreadPool:
允许的请求队列长度为 Integer.MAX_VALUE ,会堆积大量请求OOMCachedThreadPool 和 ScheduledThreadPool:
允许的创建线程数量为 Integer.MAX_VALUE,可能会创建大量线程OOM
拒绝策略
JDK自带的拒绝策略都实现了RejectedExecutionHandler 接口
CallerRunsPolicy(调用者运行策略)
1 | public static class CallerRunsPolicy implements RejectedExecutionHandler { |
当触发拒绝策略时,只要线程池没有关闭,就由提交任务的当前线程处理。
AbortPolicy(中止策略)
1 | public static class AbortPolicy implements RejectedExecutionHandler { |
当触发拒绝策略时,直接抛出拒绝执行的异常,ThreadPoolExecutor中默认的策略就是AbortPolicy,但是Executors中的线程池队列都是无界的不会执行拒绝策略。
DiscardPolicy(丢弃策略)
1 | public static class DiscardPolicy implements RejectedExecutionHandler { |
不做任何操作,偷偷地丢掉任务。
DiscardOldestPolicy(弃老策略)
1 | public static class DiscardOldestPolicy implements RejectedExecutionHandler { |
如果线程池未关闭则弹出头部任务执行(丢弃的是队列中的头部任务,执行的是优先级较高的任务)。