ThreadPoolExecutor线程池最佳实践

线程池初始化示例:

private static final ThreadPoolExecutor pool; 
static {
  ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("po-d etail-pool-%d").build();
  pool = new ThreadPoolExecutor(4, 8, 60L, TimeUnit.MILLISECONDS, new Linked BlockingQueue<>(512), hreadFactory, new ThreadPoolExecutor.AbortPolicy());
  pool.allowCoreThreadTimeOut(true);
}
  • threadFactory:给出带业务语义的线程命名;
  • corePoolSize:快速启动 4 个线程处理该业务;
  • maximumPoolSize:IO 密集型业务,当服务器是 4C8G 的,最大线程数设置为 4*2=8;
  • keepAliveTime:服务器资源紧张,让空闲的线程快速释放;
  • pool.allowCoreThreadTimeOut(true):为了在资源紧张的时候,可以让线程释放,释放资源;
  • workQueue:一个任务的执行时长在 100~300ms,业务高峰期 8 个线程,按照 10s 超时(已经很高了)。10s →8 个线程,可以处理 10 * 1000ms / 200ms * 8 = 400 个任务左右,往上再取一点,512 已经很多了;
  • handler:极端情况下,一些任务只能丢弃,保护服务端。

1. 对线程池名称

创建线程或线程池时请指定有意义的线程名称,方便出错时回溯,即 threadFactory 参数要构造好。

建议不同类别的业务用不同的线程池。

2. 工作队列的使用

workQueue 不要使用无界队列,尽量使用有界队列。

当QPS很高,发送数据很大,大量的任务被添加到这个无界LinkedBlockingQueue 中,导致cpu和内存飙升服务器挂掉;而且会导致大量新任务在队列中堆积,最终导致OOM。

类型

代表

特点

无界队列

LinkedBlockingQueue

无界

有界队列

ArrayBlockingQueue、PriorityBlockingQueue

FIFO、优先级

同步移交队列

SynchronousQueue

线程之间移交的机制; 只有在使用无界线程池或者有饱和策略时才建议使用该队列

3. 避免用Executors 的创建线程池

使用 ThreadPoolExecutor 的构造函数声明线程池,避免使用 Executors 类的 newFixedThreadPool 和 newCachedThreadPool。

Executors常用方法:

类型

特点

newCachedThreadPool()

创建一个可缓存的线程池,调用 execute 将重用以前构造的线程(如果线程可用)。如果没有可用的线程,则创建一个新线程并添加到线程池中。终止并从缓存中移除那些已有 60 秒钟未被使用的线程。CachedThreadPool适用于并发执行大量短期耗时短的任务,或者负载较轻的服务器;

newFiexedThreadPool(int nThreads)

创建固定数目线程的线程池,线程数小于nThreads时,提交新的任务会创建新的线程,当线程数等于nThreads时,提交新的任务后任务会被加入到阻塞队列,正在执行的线程执行完毕后从队列中取任务执行,FiexedThreadPool适用于负载略重但任务不是特别多的场景,为了合理利用资源,需要限制线程数量;

newSingleThreadExecutor()

创建一个单线程化的 Executor,SingleThreadExecutor适用于串行执行任务的场景,每个任务按顺序执行,不需要并发执行;

newScheduledThreadPool(int corePoolSize)

创建一个支持定时及周期性的任务执行的线程池,多数情况下可用来替代 Timer 类。ScheduledThreadPool基于ThreadPoolExecutor,corePoolSize大小为传入的corePoolSize,maximumPoolSize大小为Integer.MAX_VALUE,超时时间为0,workQueue为DelayedWorkQueue。实际上ScheduledThreadPool是一个调度池,其实现了schedule、scheduleAtFixedRate、scheduleWithFixedDelay三个方法,可以实现延迟执行、周期执行等操作;

newSingleThreadScheduledExecutor()

创建一个corePoolSize为1的ScheduledThreadPoolExecutor;

newWorkStealingPool(int parallelism)

返回一个ForkJoinPool实例,ForkJoinPool 主要用于实现“分而治之”的算法,适合于计算密集型的任务;

Executors类看起来功能比较强大、用起来还比较方便,但存在如下弊端

  1. FiexedThreadPool和SingleThreadPool任务队列长度为Integer.MAX_VALUE,可能会堆积大量的请求,从而导致OOM;
  2. CachedThreadPool和ScheduledThreadPool允许创建的线程数量为Integer.MAX_VALUE,可能会创建大量的线程,从而导致OOM;

使用线程时,可以直接调用 ThreadPoolExecutor 的构造函数来创建线程池,并根据业务实际场景来设置corePoolSize、blockingQueue、RejectedExecuteHandler等参数。

4. 避免使用局部线程池

使用局部线程池时,若任务执行完后没有执行shutdown()方法或有其他不当引用,极易造成系统资源耗尽。

5. 合理设置线程池参数

业界的一些线程池参数配置方案:

在工程实践中,通常使用下述公式来计算核心线程数:

n = (w+c)/c*n*u = (w/c+1)*n*u

其中,w为等待时间,c为计算时间,n为CPU核心数(通常可通过 Runtime.getRuntime().availableProcessors()方法获取),u为CPU目标利用率(取值区间为[0, 1]);在最大化CPU利用率的情况下,当处理的任务为计算密集型任务时,即等待时间w为0,此时核心线程数等于CPU核心数。

上述计算公式是理想情况下的建议核心线程数,而不同系统/应用在运行不同的任务时可能会有一定的差异,因此最佳线程数参数还需要根据任务的实际运行情况和压测表现进行微调。

针对不同的任务类型的建议:

类型

使用建议

CPU 密集型任务(N+1)

这种任务消耗的主要是 CPU 资源,可以将线程数设置为 N(CPU 核心数)+1,比 CPU 核心数多出来的一个线程是为了防止线程偶 发的缺页中断,或者其它原因导致的任务暂停而带来的影响。一旦任务暂停,CPU就会处于空闲状态,而在这种情况下多出来的一个线程就可以充分利用 CPU 的空闲时间。

I/O 密集型任务(2N)

这种任务应用起来,系统会用大部分的时间来处理 I /O 交互,而线程在处理 I/O 的时间段内不会占用 CPU 来处理,这时就可以将 CPU 交出给其它线程使用。因此,在 I/O 密集型任务的应用中,可以多配置一些线程, 具体的计算方法是:2N。

6. 增加异常处理

为了更好地发现、分析和解决问题,建议在使用多线程时增加对异常的处理,异常处理通常有下述方案:

  1. 在任务代码处增加try...catch异常处理
  2. 如果使用的Future方式,则可通过Future对象的get方法接收抛出的异常
  3. 为工作线程设置setUncaughtExceptionHandler,在uncaughtException方法中处理异常

虽然使用线程池有多种异常处理的方式,但在任务代码中,使用 try-catch 最通用,也能给不同任务的异常处理做精细化。

7. 关闭线程池

public void destroy() {
  try {
    poolExecutor.shutdown();
    if (!poolExecutor.awaitTermination(AWAIT_TIMEOUT, TimeUnit.SECONDS)) {
      poolExecutor.shutdownNow();
    }
  } catch (InterruptedException e) {
    //如果当前线程被中断,重新取消所有任务
    pool.shutdownNow();
    //保持中断状态
    Thread.currentThread().interrupt();
  }
}

为了实现优雅停机的目标,应当先调用shutdown方法,调用这个方法也就意味着,这个线程池不会再接收任何新的任务,但是已经提交的任务还会继续执行。之后,还应当调用awaitTermination方法,这个方法可以设定线程池在关闭之前的最大超时时间,如果在超时时间结束之前线程池能够正常关闭则会返回true,否则,超时会返回false。通常需要根据业务场景预估一个合理的超时时间,然后调用该方法。

如果awaitTermination方法返回false,但又希望尽可能在线程池关闭之后再做其他资源回收工作,可以考虑再调用一下shutdownNow方法,此时队列中所有尚未被处理的任务都会被丢弃,同时会设置线程池中每个线程的中断标志位。shutdownNow并不保证一定可以让正在运行的线程停止工作,除非提交给线程的任务能够正确响应中断。

8. 使用allowsCoreThreadTimeOut参数

如果是资源紧张的应用,使用 allowsCoreThreadTimeOut 可以提高资源利用率。

在 JDK1.6 之前,线程池会尽量保持 corePoolSize 个核心线程,即使这些线程闲置 了很长时间。这一点曾被开发者诟病,所以从 JDK1.6 开始,提供了方法 allowsCoreThr eadTimeOut,如果传参为 true,则允许闲置的核心线程被终止。

9. 考虑初始化所有核心线程

对于请求量比较大的服务,可以在创建线程池的时候直接初始化所有核心线程,减少创建线程池带来的服务毛刺。

10. 监控线程池

除了参数动态化之外,为了更好地使用线程池,需要对线程池的运行状况有感知,比如当前线程池的负载、分配的资源、任务的执行情况、任务类型(长任务、短任务)。

美团的动态线程池的监控主要包括:线程池活跃度、任务的执行Transaction(频率、耗时)、Reject异常、线程池内部统计信息等等,既能帮助用户从多个维度分析线程池的使用情况,又能在出现问题第一时间通知到用户,从而避免故障或加速故障恢复。

(1) 运行时状态实时查看

对于资源紧张的应用,如果担心线程池资源使用不当,可以利用 ThreadPoolExecutor 的 API 实现简单的监控,然后进行分析和优化。

用户基于JDK原生线程池ThreadPoolExecutor提供的几个public的getter方法,可以读取到当前线程池的运行状态以及参数。

动态化线程池基于这几个接口封装了运行时状态实时查看的功能,用户基于这个功能可以了解线程池的实时状态,比如当前有多少个工作线程,执行了多少个任务,队列中等待的任务数等等。

(2) 负载监控和告警

线程池负载关注的核心问题是:基于当前线程池参数分配的资源够不够。

  • 事前,美团线程池定义了“活跃度”这个概念,让用户在发生Reject异常之前能够感知线程池负载问题(计算公式:线程池活跃度 = activeCount/maximumPoolSize),当活跃线程数趋向于maximumPoolSize时,表示线程负载趋高。
  • 事中,从两方面来看线程池的过载判定条件,一个是发生了Reject异常,一个是队列中有等待任务(支持定制阈值)。

(3) 任务级精细化监控

在传统的线程池应用场景中,线程池中的任务执行情况对于用户来说是透明的。比如在一个具体的业务场景中,业务开发申请了一个线程池同时用于执行两种任务,一个是发消息任务、一个是发短信任务,这两类任务实际执行的频率和时长对于用户来说没有一个直观的感受,很可能这两类任务不适合共享一个线程池,但是由于用户无法感知,因此也无从优化。

【美团动态化线程池内部实现了任务级别的埋点,且允许为不同的业务任务指定具有业务含义的名称,线程池内部基于这个名称做Transaction打点,基于这个功能,用户可以看到线程池内部任务级别的执行情况,且区分业务。

主要可以监控以下信息:线程池名称、核心线程数、最大线程数、活跃线程数、队列类型、队列容量、队列使用情况、已完成任务数、拒绝任务数等】

参考:

https://mp.weixin.qq.com/s/BdVqvm2wLNv05vMTieevMg

https://tech.meituan.com/2020/04/02/java-pooling-pratice-in-meituan.html

...

本站文章资源均来源自网络,除非特别声明,否则均不代表站方观点,并仅供查阅,不作为任何参考依据!
如有侵权请及时跟我们联系,本站将及时删除!
如遇版权问题,请查看 本站版权声明
THE END
分享
二维码
海报
ThreadPoolExecutor线程池最佳实践
创建线程或线程池时请指定有意义的线程名称,方便出错时回溯,即 threadFactory 参数要构造好。
<<上一篇
下一篇>>