把学习当成一种习惯
选择往往大于努力,越努力越幸运

前言

  本文主要梳理一下使用无限时阻塞api与线程池的一个踩坑点,理解这个踩坑点前我们先了解一下JDK为什么会提供这些具有风险的无限时阻塞的api来给我们使用,如Object.wait() 、LockSupport.park(),这些阻塞api的共同点就是会让当前调用线程进入阻塞状态,释放CPU给其他线程使用,当其他线程调用唤醒api如Object.notify() 、 Object.notifyAll(),LockSupport.unpark()时,或者 Thread.interrupt() 中断请求,阻塞线程才会被唤醒;这些所谓的阻塞、唤醒操作,都是JVM来做,Java的线程模型是1:1,可以简单理解成是JVM通过调用OS的api来执行。
  现在假设当线程A调用一个无限时阻塞的Object.wait() 或者 LockSupport.park(),此时线程A其实已经进入无限时的阻塞状态了,等待被唤醒或者中断才能被继续调度;那么有没有这种可能:我们写的代码导致线程A没有被唤醒,或者说没有调用到唤醒或者中断,导致线程A永远的阻塞了。答案是有的,所以这边文章主要要讲的就是这个场景。如下这段代码,表达的是线程A调用无限时阻塞api后进入阻塞状态,5s后主线程进行唤醒:


        long now = System.currentTimeMillis();

        Thread thread_a = new Thread(() -> {
            try {
                System.out.println(System.currentTimeMillis() - now + ":" + Thread.currentThread().getName() + ",进入阻塞状态");
                LockSupport.park();
                System.out.println(System.currentTimeMillis() - now + ":" + Thread.currentThread().getName() + ",被唤醒");
            } finally {
                System.out.println(System.currentTimeMillis() - now + ":" + Thread.currentThread().getName() + ",执行结束");
            }
        });
        thread_a.setName("thread_a");

        thread_a.start();


        Thread.sleep(5000L);

        System.out.println(System.currentTimeMillis() - now + ":" + Thread.currentThread().getName() + ",唤醒" + thread_a.getName());
        LockSupport.unpark(thread_a);

        Thread.sleep(3000L);

运行结果如下,符合预期:

Future、FutureTask

  ThreadPoolExecutor提供了execute()来执行任务,但是这个api只能执行Runnable类型的任务,如果线程想要获取任务返回的结果,那么execute()是不支持的;但是有提供了submit()这个api来执行支持获取任务返回结果,这个api的入参是Callable类型的任务,最终会被包装成一个FutureTask对象,底层最终执行其实还是调用了execute(),只是FutureTask做了很多功能的增强,如阻塞等待获取任务返回结果、取消任务等。如下图的红线:

如下调用submit()执行获取任务返回结果的代码, task2.get() 将会无限时的阻塞等待任务被完成执行后才返回:


        long now = System.currentTimeMillis();

        System.err.println(System.currentTimeMillis() - now + ":将task_2丢进线程池");
        String taskName2 = null;
        Future<String> task2 = submitTp.submit(new NamedCallable<>(() -> {
            try {
                System.out.println(System.currentTimeMillis() - now + ":我是task_2,我会延时执行3s");
                Thread.sleep(3000);
                System.out.println(System.currentTimeMillis() - now + ":我是task_2,我已经执行完毕");
            } catch (InterruptedException e) {
                System.out.println(System.currentTimeMillis() - now + ":sleep被中断了");
            }
            return "qrb";
        }, "task_2"));
        
        
         try {
            //name = task2.get(1000, TimeUnit.MILLISECONDS);
            System.out.println(System.currentTimeMillis() - now + ":开始阻塞方式task_2返回的结果");
            taskName2 = task2.get();
        } catch (InterruptedException | ExecutionException e) {
            if (Thread.currentThread().isInterrupted()) {
                System.out.println(System.currentTimeMillis() - now + ":sleep被中断了");
            }
            System.out.println(System.currentTimeMillis() - now + ":异常了");
            //throw new RuntimeException(e);
        }
        System.out.println(System.currentTimeMillis() - now + ":获取到task_2结果:" + taskName2);
        System.out.println(System.currentTimeMillis() - now + ":执行完毕");

  当线程调用submit()会返回一个FutureTask对象,并且当需要获取结果的线程通过FutureTask对象调用get(),那么调用该api的线程会进入阻塞状态,而当任务执行完成后,等待结果的所有阻塞线程都会被唤醒,如下图:

用代码示例:


 private static final ThreadPoolExecutor THREAD_POOL_EXECUTOR = new ThreadPoolExecutor(1, 1, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1024));


    public static void main(String[] args) throws InterruptedException {

        long now = System.currentTimeMillis();

        //主线程将任务丢进线程池执行,并且返回FutureTask对象(这里)
        Future<String> futureTask = THREAD_POOL_EXECUTOR.submit(() -> {
            System.out.println(System.currentTimeMillis() - now + ":" + Thread.currentThread().getName() + ",任务开始执行,模拟当前任务需要执行5s");
            Thread.sleep(5000L);
            System.out.println(System.currentTimeMillis() - now + ":" + Thread.currentThread().getName() + ",任务执行完毕");
            return "qrb";
        });

        Thread thread_2 = new Thread(() -> {
            try {
                System.out.println(System.currentTimeMillis() - now + ":" + Thread.currentThread().getName() + ",开始获取结果");
                String result = futureTask.get();
                System.out.println(System.currentTimeMillis() - now + ":" + Thread.currentThread().getName() + ",获取到结果:" + result);
            } catch (InterruptedException | ExecutionException e) {
                throw new RuntimeException(e);
            }
        });
        thread_2.setName("thread_2");

        //线程2获取结果
        thread_2.start();

        try {
            //主线程获取
            System.out.println(System.currentTimeMillis() - now + ":" + Thread.currentThread().getName() + ",开始获取结果");
            String result = futureTask.get();
            System.out.println(System.currentTimeMillis() - now + ":" + Thread.currentThread().getName() + ",获取到结果:" + result);
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        }
        
        THREAD_POOL_EXECUTOR.shutdown();
    }

运行结果符合预期:

  • 1、主线程调用submit()将任务丢给线程池执行;
  • 2、线程池执行的任务模拟了5s的延时时间,主线程、线程2调用get()无限时阻塞等待结果;
  • 3、线程池完成任务的执行;
  • 4、主线程、线程2被唤醒,并且获取到了结果。

  从源码、上面的流程图以及执行结果可以知道,只有在任务被执行后,无限时等待结果的阻塞线程才会被唤醒;那么有没有这种可能:任务没有被执行到,从而导致无限时等待结果的阻塞线程没有被唤醒到?答案是有的,这也是本文要主讲的内容。

触发线程池拒绝策略导致任务没有被执行

  JDK提供了四种拒绝策略,其中的DiscardPolicy、DiscardOldestPolicy这两种拒绝策略就会出现任务未被执行,进而导致获取这个任务结果的线程进入了无限时等待。

  DiscardOldestPolicy: 这个拒绝策略的执行逻辑就是会丢弃
队列中第一个任务
,然后再尝试将当前执行的任务重新丢进线程池继续执行。现在问题来了,这个丢弃的任务就有可能是某些阻塞等待线程的FutureTask,那么这些调用了get()的无限时等待api的线程将无限时阻塞了,没人去唤醒他,如下图:

  DiscardPolicy: 没有任何的拒绝策略逻辑,说明这个任务是直接丢弃了,那么这个丢弃的FutureTask,一些调用了get()的无限时等待api的线程将无限时阻塞了,没人去唤醒他,跟DiscardOldestPolicy是一个道理。

  接下来开始实际代码复现一下,线程池准备:submitTp线程池的核心线程数1、最大线程数1、阻塞队列为ArrayBlockingQueue,阻塞队列容量为1、拒绝策略为DiscardOldestPolicy;设置这样的线程池参数是为了更快的触发拒绝策略的场景。

FutureTask

  如下代码示例:向submitTp线程池分别丢了task_1、task_2、task_3 3个任务,其中task_1先执行,占用了核心线程数5s,接着task_2是放在阻塞队列中,此时阻塞队列满了,再接着task_3会触发拒绝策略,那么task_2将会被丢弃,不会被执行;而disposeFutureTaskTp线程池通过task_2返回的FutureTask对象调用get()无限时阻塞等待返回结果。


//当前submitTp线程池的核心线程数1、最大线程数1、阻塞队列为ArrayBlockingQueue,阻塞队列容量为1、拒绝策略为DiscardOldestPolicy
        ThreadPoolExecutor submitTp = GlobalDtpManager.getThreadPoolByName("submitTp");
        if (Objects.isNull(submitTp)) {
            return;
        }
        long now = System.currentTimeMillis();

        System.err.println(System.currentTimeMillis() - now + ":将task_1丢进线程池");
        submitTp.submit(new NamedCallable<>(() -> {
            try {
                System.out.println(System.currentTimeMillis() - now + ":我是task_1,执行的任务需要5s");
                Thread.sleep(5000);
                System.out.println(System.currentTimeMillis() - now + ":我是task_1,我已经执行完毕");
            } catch (InterruptedException e) {
                System.out.println("sleep被中断了");
            }
            return "qrb";
        }, "task_1"));


        Thread.sleep(500);
        System.err.println(System.currentTimeMillis() - now + ":将task_2丢进线程池");
        Future<String> task2 = submitTp.submit(new NamedCallable<>(() -> {
            try {
                System.out.println(System.currentTimeMillis() - now + ":我是task_2,我会延时执行3s");
                Thread.sleep(3000);
                System.out.println(System.currentTimeMillis() - now + ":我是task_2,我已经执行完毕");
            } catch (InterruptedException e) {
                System.out.println(System.currentTimeMillis() - now + ":sleep被中断了");
            }
            return "qrb";
        }, "task_2"));


        Thread.sleep(500);
        System.err.println(System.currentTimeMillis() - now + ":将task_3丢进线程池");
        submitTp.execute(new NamedRunnable(() -> {
            try {
                System.out.println(System.currentTimeMillis() - now + ":我是task_3,我会延时执行3s");
                Thread.sleep(3000);
                System.out.println(System.currentTimeMillis() - now + ":我是task_3,我已经执行完毕");
            } catch (InterruptedException e) {
                System.out.println(System.currentTimeMillis() - now + ":sleep被中断了");
            }
        }, "task_3"));


        //模拟线程获取task_2的结果
        ThreadPoolExecutor disposeFutureTaskTp = GlobalDtpManager.getThreadPoolByName("disposeFutureTask");
        disposeFutureTaskTp.execute(() -> {
            String taskName2 = null;
            try {
                //name = task2.get(1000, TimeUnit.MILLISECONDS);
                System.out.println(System.currentTimeMillis() - now + ":开始阻塞方式task_2返回的结果");
                taskName2 = task2.get();
            } catch (InterruptedException | ExecutionException e) {
                if (Thread.currentThread().isInterrupted()) {
                    System.out.println(System.currentTimeMillis() - now + ":sleep被中断了");
                }
                System.out.println(System.currentTimeMillis() - now + ":异常了");
                //throw new RuntimeException(e);
            }
            System.out.println(System.currentTimeMillis() - now + ":获取到task_2结果:" + taskName2);
            System.out.println(System.currentTimeMillis() - now + ":执行完毕");
        });

执行符合预期,task_2没有打印任何信息,确实被丢弃了:

task_3触发了拒绝策略,也符合预期:

disposeFutureTaskTp线程池中的线程阻塞了,负载100%,也符合预期:

CompletableFuture

  CompletableFuture也提供了阻塞等待的api,并且允许自定义线程池来执行CompletableFuture任务。
  代码示例:disposeFutureTaskTp线程池调用CompletableFuture.allOf().join(),等待3个CompletableFuture任务都执行完毕后,就会被唤醒,然后开始获取结果;但是由于task_2被丢弃了,导致任务永远无法完成,进而导致disposeFutureTask线程无限阻塞了。


ThreadPoolExecutor submitTp = GlobalDtpManager.getThreadPoolByName("submitTp");
        if (Objects.isNull(submitTp)) {
            return;
        }
        long now = System.currentTimeMillis();

        System.err.println(System.currentTimeMillis() - now + ":将task_1丢进线程池");
        CompletableFuture<String> task1 = CompletableFuture.supplyAsync(() -> {
            try {
                System.out.println(System.currentTimeMillis() - now + ":我是task_1,执行的任务需要5s");
                Thread.sleep(5000);
                System.out.println(System.currentTimeMillis() - now + ":我是task_1,我已经执行完毕");
            } catch (InterruptedException e) {
                System.out.println("sleep被中断了");
            }
            return "qrb1";
        }, submitTp);


        System.err.println(System.currentTimeMillis() - now + ":将task_2丢进线程池");
        CompletableFuture<String> task2 = CompletableFuture.supplyAsync(() -> {
            try {
                System.out.println(System.currentTimeMillis() - now + ":我是task_2,我会延时执行3s");
                Thread.sleep(3000);
                System.out.println(System.currentTimeMillis() - now + ":我是task_2,我已经执行完毕");
            } catch (InterruptedException e) {
                System.out.println(System.currentTimeMillis() - now + ":sleep被中断了");
            }
            return "qrb2";
        }, submitTp);


        System.err.println(System.currentTimeMillis() - now + ":将task_3丢进线程池");
        CompletableFuture<String> task3 = CompletableFuture.supplyAsync(() -> {
            try {
                System.out.println(System.currentTimeMillis() - now + ":我是task_3,我会延时执行3s");
                Thread.sleep(3000);
                System.out.println(System.currentTimeMillis() - now + ":我是task_3,我已经执行完毕");
            } catch (InterruptedException e) {
                System.out.println(System.currentTimeMillis() - now + ":sleep被中断了");
            }
            return "qrb3";
        }, submitTp);


        //模拟线程获取task的结果
        ThreadPoolExecutor disposeFutureTaskTp = GlobalDtpManager.getThreadPoolByName("disposeFutureTask");
        disposeFutureTaskTp.execute(() -> {
            System.out.println(System.currentTimeMillis() - now + ":开始阻塞时等待返回的结果");
            CompletableFuture.allOf(task1, task2, task3).join();
            try {
                System.out.println(System.currentTimeMillis() - now + ":获取到task_1结果:" + task1.get());
                System.out.println(System.currentTimeMillis() - now + ":获取到task_2结果:" + task2.get());
                System.out.println(System.currentTimeMillis() - now + ":获取到task_3结果:" + task3.get());
                System.out.println(System.currentTimeMillis() - now + ":执行完毕");
            } catch (ExecutionException | InterruptedException e) {
                throw new RuntimeException(e);
            }
        });

  结果也和上边的FutureTask分析的一样,由于task_2被丢弃,disposeFutureTaskTp线程调用CompletableFuture.allOf()进入了无限时等待,符合执行预期。

CountDownLatch

  CountDownLatch提供了await()来陷入无限时阻塞等待,当通过构造器初始化的数量没有递减到0时,调用await()的线程线程是不会被唤醒的。
  代码示例:submitTp提交的每个任务,在执行完毕后才会调用 countDownLatch.countDown(),而disposeFutureTaskTp线程池必须等待3个任务都执行完毕后调用countDownLatch.countDown()才会被唤醒,但是由于task_2被丢弃了,导致任务永远无法执行,所以countDownLatch.countDown()是不会被调用到的,进而导致disposeFutureTask线程无限阻塞了。


 ThreadPoolExecutor submitTp = GlobalDtpManager.getThreadPoolByName("submitTp");
        if (Objects.isNull(submitTp)) {
            return;
        }

        long now = System.currentTimeMillis();
        //初始化3个
        countDownLatch = new CountDownLatch(3);

        System.err.println(System.currentTimeMillis() - now + ":将task_1丢进线程池");
        submitTp.execute(new NamedRunnable(() -> {
            try {
                System.out.println(System.currentTimeMillis() - now + ":我是task_1,执行的任务需要5s");
                Thread.sleep(5000);
                System.out.println(System.currentTimeMillis() - now + ":我是task_1,我已经执行完毕");
            } catch (InterruptedException e) {
                System.out.println("sleep被中断了");
            } finally {
                countDownLatch.countDown();
            }
        }, "task_1"));


        System.err.println(System.currentTimeMillis() - now + ":将task_2丢进线程池");
        submitTp.execute(new NamedRunnable(() -> {
            try {
                System.out.println(System.currentTimeMillis() - now + ":我是task_2,我会延时执行3s");
                Thread.sleep(3000);
                System.out.println(System.currentTimeMillis() - now + ":我是task_2,我已经执行完毕");
            } catch (InterruptedException e) {
                System.out.println(System.currentTimeMillis() - now + ":sleep被中断了");
            } finally {
                countDownLatch.countDown();
            }
        }, "task_2"));


        System.err.println(System.currentTimeMillis() - now + ":将task_3丢进线程池");
        submitTp.execute(new NamedRunnable(() -> {
            try {
                System.out.println(System.currentTimeMillis() - now + ":我是task_3,我会延时执行3s");
                Thread.sleep(3000);
                System.out.println(System.currentTimeMillis() - now + ":我是task_3,我已经执行完毕");
            } catch (InterruptedException e) {
                System.out.println(System.currentTimeMillis() - now + ":sleep被中断了");
            } finally {
                countDownLatch.countDown();
            }
        }, "task_3"));


        //模拟线程获取task的结果
        ThreadPoolExecutor disposeFutureTaskTp = GlobalDtpManager.getThreadPoolByName("disposeFutureTask");
        disposeFutureTaskTp.execute(() -> {
            System.out.println(System.currentTimeMillis() - now + ":开始阻塞等待3个任务的执行完成");
            try {
                countDownLatch.await();
            } catch (InterruptedException e) {
                System.out.println("我被中断了");
            }
        });

  执行结果也和上边的FutureTask预期的一样,由于task_2被丢弃,导致没有调用到countDownLatch.countDown(),所以不会执行唤醒操作。


        if (Objects.nonNull(countDownLatch)) {
            return countDownLatch.getCount();
        }
        throw new NullPointerException();

执行结果符合预期:返回的结果是1,有一个任务没有被完成。

总结

1、在线程池使用的拒绝策略DiscardPolicy或者DiscardOldestPolicy,并且触发了拒绝策略的情况下丢弃的任务刚好是被某些阻塞线程无限时等待着,那么这些线程就永远不会被唤醒了。
2、可以重写DiscardPolicy或者DiscardOldestPolicy,针对Future对象进行调用取消api,这样就会触发唤醒操作了,但是这个方案只能针对Future对象,像CountDownLatch等其他的非Future对象,解决不了。
3、比较建议的一种方案就是尽量别使用无限时阻塞api,而是使用超时阻塞api;超时阻塞api可以避免这种无限时阻塞,只要在设置的时间内还没完成任务,那么JVM会帮我们唤醒。像这些阻塞api,一般都是会提供超时阻塞api。


目录