把学习当成一种习惯
选择往往大于努力,越努力越幸运

前言

  基于线程池:ThreadPoolExecutor源码分析(一) 这篇文章,我们对ThreadPoolExecutor的底层实现原理有了一个质的认识,本文将继续探讨,即ThreadPoolExecutor的构造器参数优化及线程池提供的扩展.

构造器参数优化 --- 建议

corePoolSize、maximumPoolSize、workQueue

线程池线程数的计算主要结合以下三种类型对应的计算公式 :

  • CPU密集型 :
    -线程数配置公式 : 【(CPU总核数)】 或者 【(CPU总核数+1)】
    -为什么+1 : 即使当计算密集型的线程偶尔由于缺失故障或者其他原因而暂停时,这个额外的线程也能确保CPU的时钟周期不会被浪费.
  • I/O密集型 : 类似 网络I/O、数据库、磁盘I/O
    -线程数配置公式 : 【(2 * CPU总核数)】
  • IO密集型和CPU密集型交叉运行 :
    -线程数配置公式 : 【CPU核数 / (1 – 阻塞系数)】(阻塞系数为0.8~0.9之间)

【参考链接 : 根据CPU核心数确定线程池并发线程数并发下线程池的最佳数量计算如何正确使用线程池I/O操作为什么不需要cpu

  • corePoolSize和maximumPoolSize这两个参数的设置比较复杂,需要结合上面三种类型的计算公式,而且并不是绝对的,还需要考虑实际生产环境的内存、网络、磁盘I/O读写能力等,最终还是需要结合公式 + 压力测试来设置这两个参数.
  • workQueue : 阻塞队列(生产者-消费者队列),有四种类型可供不同场景选择
    • 1.有界队列 : 一般使用 ArrayBlockingQueue , 是一个遵循FIFO原则的数组队列,值得注意的是,该队列在执行 添加(入队) 或者 删除(出队) 的时候,分别使用了putIndex 和 takeIndex 标记记录上一个线程的操作索引位置,如果当前操作的索引位置等于数组长度 - 1 , 那么将标记重置为0,这样就不用频繁的挪动数组减少不必要的性能消耗.
    • 2.无界队列 : 一般使用 LinkedBlockingQueue 的无参构造器创建无界队列 , 是一个遵循FIFO原则的链表队列,使用该参数则会导致maximumPoolSize参数无效,并且还可能会出现OOM风险,所以该参数是值得注意的.
    • 3.同步队列 : 我叫它无容量队列,使用的是SynchronousQueue,该队列不存储元素,每个插入操作必须等待另一个线程调用移除操作,否则插入操作会一直被阻塞.
    • 4.优先队列 : PriorityBlockingQueue , 一个支持优先级的无界阻塞队列

  同步队列和优先队列 一般使用于一些特殊场景,而无界队列我个人是不推荐使用的,而有界队列是常用的阻塞队列,一般是 ArrayBlockingQueue , 或者 通过LinkedBlockingQueue的有参构造器也可以使用有界队列.

  结合上面三个参数的分析使用,我们现在来谈谈线程池涉及到的两个核心性能问题就是 : 系统资源利用率 和 吞吐量 ,这两者是鱼与熊掌不能兼得,只能做到一种权衡 :

  • 1.提高系统资源利用率,则会减少吞吐量 : 减少内存资源的使用、CPU利用率也会降低、吞吐量也会降低.
    • 减少corePoolSize、maximumPoolSize线程数,提高有界阻塞队列的容量.
  • 2.降低系统资源利用率,可能也会减少吞吐量 : 过度增加内存资源的使用导致系统资源利用率降低、从而CPU利用率也会降低、吞吐量也会降低.
    • 过度增加corePoolSize、maximumPoolSize线程数,可能导致频繁的上下文切换这种不必要的性能消耗,进而导致CPU利用率也会降低、吞吐量也会降低.
  • 3.提高吞吐量 :
    • 线程数太小或者太大,可以考虑通过调用 setMaximumPoolSize() 方法来重新设定线程池的容量

keepAliveTime

  • keepAliveTime : 该值在阻塞队列没有任务的情况下 且 当前创建的线程数大于核心线程数才有效;触发该条件后,核心线程以外的线程在keepAliveTime时间内没有获取到任务则需要被回收,减少核心线程以外的空闲线程占有空间.
  • 该值如果设置为0,核心线程以外的线程将会直接被回收.
  • 该值如果设置得越大,那么核心线程以外的线程被回收的几率就越小,反之则越大.

threadFactory

  • threadFactory : 自定义线程的名字,daemon,优先级等,易于我们对日志信息的查询错误信息排错、统一管理、维护.

handler

  • handler : 拒绝策略,当前线程池的阻塞队列已经饱和状态 并且 创建的工作线程以达到最大线程数,那么需要执行拒绝策略,Java提供了四种拒绝策略 :
    • AbortPolicy : 默认的拒绝策略,该策略简单粗暴,直接抛出异常.
    • CallerRunsPolicy : 当前线程池处于运行状态(RUNNING)的情况下,直接使用调用者所在的线程来执行任务.
    • DiscardOldestPolicy : 删除队头(存放最久的任务)的任务,然后再执行任务尝试入队(阻塞队列);也就是说牺牲最久的任务保留最新的任务.
    • DiscardPolicy : 什么都没做,即该任务直接被抛弃了

小结

  • 核心的参数优化还是对于corePoolSize、maximumPoolSize、workQueue的调整,具体是对于资源利用率、吞吐量两者的权衡.

其他优化 --- 建议

  • 基于ThreadPoolExecutor的实现原理,我们可以知道刚开始阶段是调用execute()慢慢的通过热身去初始化线程;线程池提供了prestartAllCoreThreads()可以做到一次性初始化,直接跳过热身阶段,即在调用execute()之前调用 :

    public int prestartAllCoreThreads() {
        int n = 0;
        while (addWorker(null, true))
            ++n;
        return n;
    }

  • 全局变量 allowCoreThreadTimeOut : 该变量默认为false,即当阻塞队列为null时,不允许回收核心线程;那么如果设置为true,即阻塞队列为null时,不止回收核心线程外的线程,还要回收核心线程,通过调用 allowCoreThreadTimeOut() 来设置 :

    public void allowCoreThreadTimeOut(boolean value) {
        if (value && keepAliveTime <= 0)
            throw new IllegalArgumentException("Core threads must have nonzero keep alive times");
        if (value != allowCoreThreadTimeOut) {
            allowCoreThreadTimeOut = value;
            if (value)
                interruptIdleWorkers();
        }
    }

  • execute 运行的任务如果出现异常,那么会清理掉当前的工作线程(具体参考runWorker()的task.run()),即execute对于执行的Task任务如果抛出异常,执行该Task的工作线程会被回收,会这里就涉及到一个创建线程的操作
  • submit: 因为能够调用 future.get(). 所以有异常也会捕获,不会造成线程终止.
  • shutdown() 和 shutdownNow() 的调用,应该优先考虑调用shutdown().

线程池提供的一些扩展函数

  • beforeExecute() : 工作线程执行任务前的执行函数
  • afterExecute() : 工作线程执行任务后的执行函数
  • terminated() : 线程池进入TIDYING状态后执行的函数

具体看看以下ThreadPoolExpand类 :


   # 引入Maven包
   <dependency>
       <groupId>org.apache.commons</groupId>
       <artifactId>commons-lang3</artifactId>
       <version>3.9</version>
   </dependency>
   
   
   public class ThreadPoolExpand {

    // 创建线程池  : daemon设置为true时表示线程池一起死亡
    public ThreadPoolExecutor getThreadPoolExecutor() {
        return new ThreadPoolExecutor(
                5, 10,
                60, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(20),
                new BasicThreadFactory.Builder().namingPattern("thread-pool-%d").daemon(false).build()) {
            @Override
            protected void beforeExecute(Thread t, Runnable r) {
//                super.beforeExecute(t, r);
                System.out.println("beforeExecute : 当前工作线程执行任务前");
                System.out.println("beforeExecute start");
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("beforeExecute end");
            }

            @Override
            protected void afterExecute(Runnable r, Throwable t) {
//                super.afterExecute(r, t);
                System.out.println("afterExecute : 当前工作线程执行任务后");
                System.out.println("afterExecute start");
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("afterExecute end");
            }

            @Override
            protected void terminated() {
//                super.terminated();
                System.out.println("terminated : 线程池成功进入TIDYING状态");
            }
        };
    }

    public static void main(String[] args) {

        ThreadPoolExpand expand = new ThreadPoolExpand();
        ThreadPoolExecutor executor = expand.getThreadPoolExecutor();


        executor.execute(() -> {
            System.out.println("run : 开始干活");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("run : 干活结束");
        });

        // 关闭线程池
        executor.shutdown();
        
    }

}

ThreadPoolExpand类运行结果如下图 :

总结

  对于线程池的构造器参数优化的话题,是比较开放性的,并不是一定的,必须结合实际生产环境来设置这些参数,本文提供的建议只供参考.

结束语

  • 原创不易
  • 希望看完这篇文章的你有所收获!

相关参考资料

  • JDK1.8

目录