九又四分之三站台

0%

线程池

ThreadPoolExecutor是Executor框架最核心的类,也是线程池的实现类,有以下4个组件构成。

  1. corePool:核心线程池大小
  2. maximumPool:最大线程池大小
  3. BlockingQueue:用来暂时保存任务的工作队列
  4. RejectedExecutionHandler:当ThreadPoolExecutor已经关闭或者已经饱和(达到最大线程池大小并且工作队列已满),execute()方法将调用Handler(拒绝策略)

通过工具类Executors可以创建三种类型的ThreadPoolExecutor

  1. FixedThreadPool
  2. SingleThreadExecutors
  3. CacheThreadPool

FixedThreadPool

FixedThreadPool被称为可重用固定线程数的线程池

1
2
3
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
}

可以看到FixedThreadPool的其实是通过创建ThreadPoolExecutor来实现的

1
new  ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue)

当线程池的线程数大于corePoolSize的时候,keepAliveTime为多余的空闲线程等待任务的最长时间,超过时间后多余的线程被终止。FixedThreadPool的创建中把keepAliveTime为多余的空闲线程等待任务的最长时间设置为0L表示多余的空闲线程立即被终止。

FixedThreadPool执行任务的顺序:

  1. 如果当前运行的线程数少于corePoolSize,则创建新的线程来执行任务;
  2. 当前运行的线程数量等于corePoolSize,新的任务会被放入LinkedBlockingQueue中;
  3. 当前线程任务执行完之后,会循环反复从LinkedBlockingQueue中获取任务执行;

FixedThreadPool使用无界队列LinkedBlockingQueue带来的影响:

  1. 当线程池中数量达到corePoolSize后,新的任务会在队列中等待;
  2. 使用无界队列时maximumPoolSize参数和keepAliveTime参数无效;
  3. 运行中的FixedThreadPool不会拒绝任务;

SingleThreadExecutor

SingleThreadExecutor是使用单个worker线程的Executor,下面是SingleThreadExecutor的源码实现

1
2
3
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()));
}

SingleThreadExecutor的corePoolSize和maximumPoolSize设置为1,其他参数和newFixedThreadPool一样。

SingleThreadExecutor的execute()方法

  1. 如果当前运行线程数少于corePollSize(即线程池中无运行的线程),则创建一个新的线程执行任务。
  2. 在线程池完成预热之后(当前线程中有一个运行线程)将任务加入LinkedBlockingQueue。
  3. 线程执行完1中的任务后,会在一个无限循环中反复从LinkedBlockingQueue获取任务来完成。

CacheThreadPool

CacheThreadPool是一个会根据需要创建新线程的线程池。下面是创建CacheThreadPool的源码。

1
2
3
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
}

CacheThreadPool的corePoolSize被设置为0,即corePool为空,maximumPoolSize设置为Integer.MAX_VALUE,表示maximumPool是无界的。
CacheThreadPool使用的是没有容量的SynchronousQueue作为线程池的工资队列,但是maximumPool是无界的,这意味着当主线程提交任务的速度高于maximumPool中线程处理任务的速度时,CacheThreadPool会不断创建新的线程,极端情况下,CacheThreadPool会耗尽CPU和内存资源。

ScheduledThreadPoolExecutor

ScheduledThreadPoolExecutor继承自ThreadPoolExecutor,主要用来在给定的延迟之后执行任务或者定期执行任务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}

public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), threadFactory);
}


public ScheduledThreadPoolExecutor(int corePoolSize,
RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), handler);
}


public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), threadFactory, handler);
}

从ScheduledThreadPoolExecutor的几个构造函数可以看出,ScheduledThreadPoolExecutor中使用的队列是DelayedWorkQueue,由于DelayedWorkQueue是无界队列,所以maximumPoolSize参数设置Integer.MAX_VALUE。

拒绝使用Executors创建线程池

之所以会出现这样的规范,是因为jdk已经封装好的线程池存在潜在风险:

  • FixedThreadPool 和 SingleThreadPool:
    允许的请求队列长度为 Integer.MAX_VALUE ,会堆积大量请求OOM

  • CachedThreadPool 和 ScheduledThreadPool:
    允许的创建线程数量为 Integer.MAX_VALUE,可能会创建大量线程OOM

拒绝策略

JDK自带的拒绝策略都实现了RejectedExecutionHandler 接口

CallerRunsPolicy(调用者运行策略)

1
2
3
4
5
6
7
8
9
10
public static class CallerRunsPolicy implements RejectedExecutionHandler {

public CallerRunsPolicy() { }

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}

当触发拒绝策略时,只要线程池没有关闭,就由提交任务的当前线程处理。

AbortPolicy(中止策略)

1
2
3
4
5
6
7
8
9
10
public static class AbortPolicy implements RejectedExecutionHandler {

public AbortPolicy() { }

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}

当触发拒绝策略时,直接抛出拒绝执行的异常,ThreadPoolExecutor中默认的策略就是AbortPolicy,但是Executors中的线程池队列都是无界的不会执行拒绝策略。

DiscardPolicy(丢弃策略)

1
2
3
4
5
6
7
public static class DiscardPolicy implements RejectedExecutionHandler {

public DiscardPolicy() { }

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}

不做任何操作,偷偷地丢掉任务。

DiscardOldestPolicy(弃老策略)

1
2
3
4
5
6
7
8
9
10
11
public static class DiscardOldestPolicy implements RejectedExecutionHandler {

public DiscardOldestPolicy() { }

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
}

如果线程池未关闭则弹出头部任务执行(丢弃的是队列中的头部任务,执行的是优先级较高的任务)。