彻底搞懂java并发ThreadPoolExecutor使用

 

前言

线程池是Java中使用较多的并发框架,合理使用线程池,可以:降低资源消耗提高响应速度提高线程的可管理性

本篇文章将从线程池简单原理线程池的创建线程池执行任务关闭线程池进行使用学习。

 

正文

一. 线程池的简单原理

当一个任务提交到线程池ThreadPoolExecutor时,该任务的执行如下图所示。

  • 如果当前运行的线程数小于corePoolSzie(核心线程数),则创建新线程来执行任务(需要获取全局锁);
  • 如果当前运行的线程数等于或大于corePoolSzie,则将任务加入BlockingQueue(任务阻塞队列);
  • 如果BlockingQueue已满,则创建新的线程来执行任务(需要获取全局锁);
  • 如果创建新线程会使当前线程数大于maximumPoolSize(最大线程数),则拒绝任务并调用RejectedExecutionHandlerrejectedExecution() 方法。

由于ThreadPoolExecutor存储工作线程使用的集合是HashSet,因此执行上述步骤1和步骤3时需要获取全局锁来保证线程安全,而获取全局锁会导致线程池性能瓶颈,因此通常情况下,线程池完成预热后(当前线程数大于等于corePoolSize),线程池的execute() 方法都是执行步骤2。

二. 线程池的创建

通过ThreadPoolExecutor能够创建一个线程池,ThreadPoolExecutor的构造函数签名如下。

public ThreadPoolExecutor(int corePoolSize,
                        int maximumPoolSize,
                        long keepAliveTime,
                        TimeUnit unit,
                        BlockingQueue<Runnable> workQueue)
public ThreadPoolExecutor(int corePoolSize,
                        int maximumPoolSize,
                        long keepAliveTime,
                        TimeUnit unit,
                        BlockingQueue<Runnable> workQueue,
                        ThreadFactory threadFactory)
public ThreadPoolExecutor(int corePoolSize,
                        int maximumPoolSize,
                        long keepAliveTime,
                        TimeUnit unit,
                        BlockingQueue<Runnable> workQueue,
                        RejectedExecutionHandler handler)
public ThreadPoolExecutor(int corePoolSize,
                        int maximumPoolSize,
                        long keepAliveTime,
                        TimeUnit unit,
                        BlockingQueue<Runnable> workQueue,
                        ThreadFactory threadFactory,
                        RejectedExecutionHandler handler)

通过ThreadPoolExecutor创建线程池时,需要指定线程池的核心线程数最大线程数线程保活时间线程保活时间单位任务阻塞队列,并按需指定线程工厂饱和拒绝策略,如果不指定线程工厂饱和拒绝策略,则ThreadPoolExecutor会使用默认的线程工厂饱和拒绝策略。下面分别介绍这些参数的含义。

参数含义
corePoolSize核心线程数,即线程池的基本大小。当一个任务被提交到线程池时,如果线程池的线程数小于corePoolSize,那么无论其余线程是否空闲,也需创建一个新线程来执行任务。
maximumPoolSize最大线程数。当线程池中线程数大于等于corePoolSize时,新提交的任务会加入任务阻塞队列,但是如果任务阻塞队列已满且线程数小于maximumPoolSize,此时会继续创建新的线程来执行任务。该参数规定了线程池允许创建的最大线程数
keepAliveTime线程保活时间。当线程池的线程数大于核心线程数时,多余的空闲线程会最大存活keepAliveTime的时间,如果超过这个时间且空闲线程还没有获取到任务来执行,则该空闲线程会被回收掉。
unit线程保活时间单位。通过TimeUnit指定线程保活时间的时间单位,可选单位有DAYS(天),HOURS(时),MINUTES(分),SECONDS(秒),MILLISECONDS(毫秒),MICROSECONDS(微秒)和NANOSECONDS(纳秒),但无论指定什么时间单位,ThreadPoolExecutor统一会将其转换为NANOSECONDS
workQueue任务阻塞队列。线程池的线程数大于等于corePoolSize时,新提交的任务会添加到workQueue中,所有线程执行完上一个任务后,会循环从workQueue中获取任务来执行。
threadFactory创建线程的工厂。可以通过线程工厂给每个创建出来的线程设置更有意义的名字。
handler饱和拒绝策略。如果任务阻塞队列已满且线程池中的线程数等于maximumPoolSize,说明线程池此时处于饱和状态,应该执行一种拒绝策略来处理新提交的任务。

三. 线程池执行任务

1. 执行无返回值任务

通过ThreadPoolExecutorexecute() 方法,能执行Runnable任务,示例如下。

public class ThreadPoolExecutorTest {
  @Test
  public void ThreadPoolExecutor执行简单无返回值任务() throws Exception {
      // 创建一个线程池
      ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 4,
              60, TimeUnit.SECONDS, new ArrayBlockingQueue&lt;&gt;(300));
      // 创建两个任务
      Runnable firstRunnable = new Runnable() {
          @Override
          public void run() {
              System.out.println("第一个任务执行");
          }
      };
      Runnable secondRunnable = new Runnable() {
          @Override
          public void run() {
              System.out.println("第二个任务执行");
          }
      };
      // 让线程池执行任务
      threadPoolExecutor.execute(firstRunnable);
      threadPoolExecutor.execute(secondRunnable);
      // 让主线程睡眠1秒,等待线程池中的任务被执行完毕
      Thread.sleep(1000);
  }
}

运行测试程序,结果如下。

2. 执行有返回值任务

通过ThreadPoolExecutorsubmit() 方法,能够执行Callable任务,通过submit() 方法返回的RunnableFuture能够拿到异步执行的结果。示例如下。

public class ThreadPoolExecutorTest {
  @Test
  public void ThreadPoolExecutor执行简单有返回值任务() throws Exception {
      // 创建一个线程池
      ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 4,
              60, TimeUnit.SECONDS, new ArrayBlockingQueue&lt;&gt;(300));
      // 创建两个任务,任务执行完有返回值
      Callable&lt;String&gt; firstCallable = new Callable&lt;String&gt;() {
          @Override
          public String call() throws Exception {
              return "第一个任务返回值";
          }
      };
      Callable&lt;String&gt; secondCallable = new Callable&lt;String&gt;() {
          @Override
          public String call() throws Exception {
              return "第二个任务返回值";
          }
      };
      // 让线程池执行任务
      Future&lt;String&gt; firstFuture = threadPoolExecutor.submit(firstCallable);
      Future&lt;String&gt; secondFuture = threadPoolExecutor.submit(secondCallable);
      // 获取执行结果,拿不到结果会阻塞在get()方法上
      System.out.println(firstFuture.get());
      System.out.println(secondFuture.get());
  }
}

运行测试程序,结果如下。

3. 执行有返回值任务时抛出错误

如果ThreadPoolExecutor在执行Callable任务时,在Callable任务中抛出了异常并且没有捕获,那么这个异常是可以通过Futureget() 方法感知到的。示例如下。

public class ThreadPoolExecutorTest {
  @Test
  public void ThreadPoolExecutor执行简单有返回值任务时抛出错误() {
      // 创建一个线程池
      ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 4,
              60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(300));
      // 创建一个任务,任务有返回值,但是执行过程中抛出异常
      Callable<String> exceptionCallable = new Callable<String>() {
          @Override
          public String call() throws Exception {
              throw new RuntimeException("发生了异常");
          }
      };
      // 让线程池执行任务
      Future<String> exceptionFuture = threadPoolExecutor.submit(exceptionCallable);
      try {
          System.out.println(exceptionFuture.get());
      } catch (Exception e) {
          System.out.println(e.getMessage());
      }
  }
}

运行测试程序,结果如下。

4. ThreadPoolExecutor通过submit方式执行Runnable

ThreadPoolExecutor可以通过submit() 方法来运行Runnable任务,并且还可以异步获取执行结果。示例如下。

public class ThreadPoolExecutorTest {
  @Test
  public void ThreadPoolExecutor通过submit的方式来提交并执行Runnable() throws Exception {
      // 创建一个线程池
      ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 4,
              60, TimeUnit.SECONDS, new ArrayBlockingQueue&lt;&gt;(300));
      // 创建结果对象
      MyResult myResult = new MyResult();
      // 创建Runnable对象
      Runnable runnable = new Runnable() {
          @Override
          public void run() {
              myResult.setResult("任务执行了");
          }
      };
      // 通过ThreadPoolExecutor的submit()方法提交Runnable
      Future&lt;MyResult&gt; resultFuture = threadPoolExecutor.submit(runnable, myResult);
      // 获取执行结果
      MyResult finalResult = resultFuture.get();
      // myResult和finalResult的地址实际相同
      Assert.assertEquals(myResult, finalResult);
      // 打印执行结果
      System.out.println(resultFuture.get().getResult());
  }
  private static class MyResult {
      String result;
      public MyResult() {}
      public MyResult(String result) {
          this.result = result;
      }
      public String getResult() {
          return result;
      }
      public void setResult(String result) {
          this.result = result;
      }
  }
}

运行测试程序,结果如下。

实际上ThreadPoolExecutorsubmit() 方法无论是提交Runnable任务还是Callable任务,都是将任务封装成了RunnableFuture接口的子类FutureTask,然后调用ThreadPoolExecutorexecute() 方法来执行FutureTask

四. 关闭线程池

关闭线程池可以通过ThreadPoolExecutorshutdown() 方法,但是shutdown() 方法不会去中断正在执行任务的线程,所以如果线程池里有Worker正在执行一个永远不会结束的任务,那么shutdown() 方法是无法关闭线程池的。示例如下。

public class ThreadPoolExecutorTest {
  @Test
  public void 通过shutdown关闭线程池() {
      // 创建一个线程池
      ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 4,
              60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(300));
      // 创建Runnable对象
      Runnable runnable = new Runnable() {
          @Override
          public void run() {
              while (!Thread.currentThread().isInterrupted()) {
                  LockSupport.parkNanos(1000 * 1000 * 1000);
              }
              System.out.println(Thread.currentThread().getName() + " 被中断");
          }
      };
      // 让线程池执行任务
      threadPoolExecutor.execute(runnable);
      threadPoolExecutor.execute(runnable);
      // 调用shutdown方法关闭线程池
      threadPoolExecutor.shutdown();
      // 等待3秒观察现象
      LockSupport.parkNanos(1000 * 1000 * 1000 * 3L);
  }
}

运行测试程序,会发现在主线程中等待3秒后,也没有得到预期的打印结果。如果上述测试程序中使用shutdownNow,则是可以得到预期打印结果的,示例如下。

public class ThreadPoolExecutorTest {
  @Test
  public void 通过shutdownNow关闭线程池() {
      // 创建一个线程池
      ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 4,
              60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(300));
      // 创建Runnable对象
      Runnable runnable = new Runnable() {
          @Override
          public void run() {
              while (!Thread.currentThread().isInterrupted()) {
                  LockSupport.parkNanos(1000 * 1000 * 1000);
              }
              System.out.println(Thread.currentThread().getName() + " 被中断");
          }
      };
      // 让线程池执行任务
      threadPoolExecutor.execute(runnable);
      threadPoolExecutor.execute(runnable);
      // 调用shutdown方法关闭线程池
      threadPoolExecutor.shutdownNow();
      // 等待3秒观察现象
      LockSupport.parkNanos(1000 * 1000 * 1000 * 3L);
  }
}

运行测试程序,打印如下。

因为测试程序中的任务是响应中断的,而ThreadPoolExecutorshutdownNow() 方法会中断所有Worker,所以执行shutdownNow() 方法后,正在运行的任务会响应中断并结束运行,最终线程池关闭。

假如线程池中运行着一个永远不会结束的任务,且这个任务不响应中断,那么无论是shutdown() 方法还是shutdownNow() 方法,都是无法关闭线程池的。

 

总结

ThreadPoolExecutor的使用总结如下。

  • 通过ThreadPoolExecutorexecute() 方法能够执行Runnable任务;
  • 通过ThreadPoolExecutorsubmit() 方法能够执行Runnable任务和Callable任务,并且能够获取异步的执行结果;
  • ThreadPoolExecutorsubmit() 方法会返回一个Future对象(实际就是FutureTask),如果任务执行过程中发生了异常且未捕获,那么可以通过Futureget() 方法感知到异常;
  • ThreadPoolExecutorsubmit() 方法无论是提交Runnable任务还是Callable任务,都是将任务封装成了RunnableFuture接口的子类FutureTask,然后调用ThreadPoolExecutorexecute() 方法来执行FutureTask
  • 关闭线程池时,如果运行的任务可以在有限时间内运行完毕,那么可以使用shutdown() 方法来关闭线程池,这能够保证在关闭线程池时,正在运行的任务会顺利运行完毕;
  • 关闭线程池时,如果运行的任务永远不会结束但是响应中断,那么可以使用shutdownNow() 方法来关闭线程池,这种方式不保证任务顺利运行完毕;
  • 如果任务永远不会结束且不响应中断,那么无论是shutdown() 方法还是shutdownNow() 方法,都无法关闭线程池。

以上就是彻底搞懂java并发ThreadPoolExecutor使用的详细内容,更多关于java并发ThreadPoolExecutor的资料请关注编程宝库其它相关文章!

 一、Netty简介Gateway和Netty都有盲区的感觉;Netty是一个异步的,事件驱动的网络应用框架,用以快速开发高可靠、高性能的网络应用程序。传输服务:提供网络传输能力的管 ...