解决线程池中ThreadGroup的坑

 

线程池中ThreadGroup的坑

在Java中每一个线程都归属于某个线程组管理的一员,例如在主函数main()主工作流程中产生一个线程,则产生的线程属于main这个线程组管理的一员。简单地说,线程组(ThreadGroup)就是由线程组成的管理线程的类,这个类是java.lang.ThreadGroup类。

定义一个线程组,通过以下代码可以实现。

ThreadGroup group=new ThreadGroup(“groupName”);
Thread thread=new Thread(group,”the first thread of group”);

ThreadGroup类中的某些方法,可以对线程组中的线程产生作用。例如,setMaxPriority()方法可以设定线程组中的所有线程拥有最大的优先权。

所有线程都隶属于一个线程组。那可以是一个默认线程组(不指定group),亦可是一个创建线程时明确指定的组。在创建之初,线程被限制到一个组里,而且不能改变到一个不同的组。每个应用都至少有一个线程从属于系统线程组。若创建多个线程而不指定一个组,它们就会自动归属于系统线程组。

线程组也必须从属于其他线程组。必须在构建器里指定新线程组从属于哪个线程组。若在创建一个线程组的时候没有指定它的归属,则同样会自动成为系统线程组的一名属下。因此,一个应用程序中的所有线程组最终都会将系统线程组作为自己的“父”。

那么假如我们需要在线程池中实现一个带自定义ThreadGroup的线程分组,该怎么实现呢?

我们在给线程池(ThreadPoolExecutor)提交任务的时候可以通过execute(Runnable command)来将一个线程任务加入到该线程池,那么我们是否可以通过new一个指定了ThreadGroup的Thread实例来加入线程池来达到前面说到的目的呢?

ThreadGroup是否可行

通过new Thread(threadGroup,runnable)实现线程池中任务分组

public static void main(String[] args) {
      ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newCachedThreadPool();
      final ThreadGroup group = new ThreadGroup("Main_Test_Group");
      for (int i = 0; i < 5; i++) {
          Thread thread = new Thread(group, new Runnable() {
              @Override
              public void run() {
                  int sleep = (int)(Math.random() * 10);
                  try {
                      Thread.sleep(1000 * 3);
                      System.out.println(Thread.currentThread().getName()+"执行完毕");
                      System.out.println("当前线程组中的运行线程数"+group.activeCount());
                  } catch (InterruptedException e) {
                      e.printStackTrace();
                  }
              }
          }, group.getName()+" #"+i+"");
          pool.execute(thread);
      }
  }

运行结果

pool-1-thread-3执行完毕
pool-1-thread-1执行完毕
当前线程组中的运行线程数0
pool-1-thread-2执行完毕
当前线程组中的运行线程数0
当前线程组中的运行线程数0
pool-1-thread-4执行完毕
pool-1-thread-5执行完毕
当前线程组中的运行线程数0
当前线程组中的运行线程数0

运行结果中可以看到group中的线程并没有因为线程池启动了这个线程任务而运行起来.因此通过线程组来对线程池中的线层任务分组不可行.

从java.util.concurrent.ThreadPoolExecutor源码中可以看到如下构造函数:

public ThreadPoolExecutor(int corePoolSize,
                            int maximumPoolSize,
                            long keepAliveTime,
                            TimeUnit unit,
                            BlockingQueue<Runnable> workQueue) {
      this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
           Executors.defaultThreadFactory(), defaultHandler);
  }

如果我们在实例化ThreadPoolExecutor时不指定ThreadFactory,那么将以默认的ThreadFactory来创建Thread.

Executors内部类DefaultThreadFactory

下面的源码即是默认的Thread工厂

static class DefaultThreadFactory implements ThreadFactory {
      private static final AtomicInteger poolNumber = new AtomicInteger(1);
      private final ThreadGroup group;
      private final AtomicInteger threadNumber = new AtomicInteger(1);
      private final String namePrefix;
      DefaultThreadFactory() {
          SecurityManager s = System.getSecurityManager();
          group = (s != null) ? s.getThreadGroup() :
                                Thread.currentThread().getThreadGroup();
          namePrefix = "pool-" +
                        poolNumber.getAndIncrement() +
                       "-thread-";
      }
      public Thread newThread(Runnable r) {
          Thread t = new Thread(group, r,
                                namePrefix + threadNumber.getAndIncrement(),
                                0);
          if (t.isDaemon())
              t.setDaemon(false);
          if (t.getPriority() != Thread.NORM_PRIORITY)
              t.setPriority(Thread.NORM_PRIORITY);
          return t;
      }
  }

从唯一的构造函数可以看到DefaultThreadFactory以SecurityManager 实例中的ThreadGroup来指定线程的group,如果SecurityManager 获取到的ThreadGroup为null才默认以当前线程的group来指定.public Thread newThread(Runnable r) 则以group来new 一个Thead.这样我们可以在实例化ThreadPoolExecutor对象的时候在其构造函数内传入自定义的ThreadFactory实例即可达到目的.

public class MyTheadFactory implements ThreadFactory {
  private static final AtomicInteger poolNumber = new AtomicInteger(1);
  private final AtomicInteger threadNumber = new AtomicInteger(1);
  private final String namePrefix;
  private ThreadGroup defaultGroup;
  public MyTheadFactory() {
      SecurityManager s = System.getSecurityManager();
      defaultGroup = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
      namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-";
  }
  public MyTheadFactory(ThreadGroup group) {
     this.defaultGroup = group;
      namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-";
  }
  public Thread newThread(Runnable r) {
      Thread t = new Thread(defaultGroup, null, namePrefix + threadNumber.getAndIncrement(), 0);
      if (t.isDaemon())
          t.setDaemon(false);
      if (t.getPriority() != Thread.NORM_PRIORITY)
          t.setPriority(Thread.NORM_PRIORITY);
      return t;
  }
}

 

ThreadGroup的使用及手写线程池

监听线程异常关闭

以下代码在window下不方便测试,需在linux 上 测试

// 以下线程如果强制关闭的话,是无法打印`线程被杀掉了`
// 模拟关闭 kill PID
public static void main(String[] args)  {
      Runtime.getRuntime().addShutdownHook(new Thread( () -> {
          System.out.println("线程被杀掉了");
      }));
      while(true){
          System.out.println("i am working ...");
          try {
              Thread.sleep(1000);
          } catch (InterruptedException e) {
              e.printStackTrace();
          }
      }
  }

如何拿到Thread线程中异常

public static void main(String[] args) {
      Thread thread = new Thread(() -> {
          try {
              Thread.sleep(1000);
              int i = 10/0;
          } catch (InterruptedException e) {
              e.printStackTrace();
          }
      });
      thread.setUncaughtExceptionHandler((t,e)->{
          System.out.println("线程的名字"+ t.getName());
          System.out.println(e);
      });  // 通过注入接口的方式
      thread.start();
  }

ThreadGroup

注意: threadGroup 设置为isDaemon 后,会随最后一个线程结束而销毁,如果没有设置isDaemon ,则需要手动调用 destory()

线程池使用

自己搭建的简单线程池实现

其中ThreadGroup 的应用没有写,但是我们可以观察线程关闭后,检查ThreadGroup 中是否还有活跃的线程等,具体参考ThreadGroup API

import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.stream.IntStream;
/**
* @Author: shengjm
* @Date: 2020/2/10 9:52
* @Description:
*/
public class SimpleThreadPool extends Thread{
  /**
   * 线程数量
   */
  private int size;
  private final int queueSize;
  /**
   * 默认线程队列数量
   */
  private final static int DEFAULR_TASK_QUEUE_SIZE = 2000;
  private static volatile int seq = 0;
  private final static String THREAD_PREFIX = "SIMPLE_THREAD_POLL_";
  private final static ThreadGroup GROUP = new ThreadGroup("Pool_Group");
  private final static LinkedList<Runnable> TASK_QUEUE = new LinkedList<>();
  private final static List<WorkerTask> THREAD_QUEUE = new ArrayList<>();
  private final DiscardPolicy discardPolicy;
  private volatile boolean destory = false;
  private int min;
  private int max;
  private int active;
  /**
   * 定义异常策略的实现
   */
  private final static DiscardPolicy DEFAULT_DISCARD_POLICY = () -> {
      throw new DiscardException("线程池已经被撑爆了,后继多余的人将丢失");
  };
  /**
   *
   */
  public SimpleThreadPool(){
      this(4,8,12,DEFAULR_TASK_QUEUE_SIZE,DEFAULT_DISCARD_POLICY);
  }
  /**
   *
   */
  public SimpleThreadPool(int min , int active , int max , int queueSize,DiscardPolicy discardPolicy) {
      this.min = min;
      this.active = active;
      this.max = max;
      this.queueSize = queueSize;
      this.discardPolicy = discardPolicy;
      init();
  }
/**
* 初始化
*/
  private void init() {
      for(int i = 0; i < min; i++){
          createWorkTask();
      }
      this.size = min;
      this.start();
  }
  private void createWorkTask(){
      WorkerTask task = new WorkerTask(GROUP,THREAD_PREFIX+(seq++));
      task.start();
      THREAD_QUEUE.add(task);
  }
/**
* 线程池自动扩充
*/
  @Override
  public void run() {
      while(!destory){
          System.out.println(this.min +" --- "+this.active+" --- "+this.max + " --- "+ this.size + " --- "+  TASK_QUEUE.size());
          try {
              Thread.sleep(1000);
              if(TASK_QUEUE.size() > active && size < active){
                  for (int i = size; i < active;i++){
                      createWorkTask();
                  }
                  size = active;
              }else if(TASK_QUEUE.size() > max && size < max){
                  for (int i = size; i < max;i++){
                      createWorkTask();
                  }
                  size = max;
              }
              synchronized (THREAD_QUEUE){
                  if(TASK_QUEUE.isEmpty() && size > active){
                      int release = size - active;
                      for (Iterator<WorkerTask> it = THREAD_QUEUE.iterator();it.hasNext();){
                          if(release <=0){
                              break;
                          }
                          WorkerTask task = it.next();
                          task.close();
                          task.interrupt();
                          it.remove();
                          release--;
                      }
                      size = active;
                  }
              }
          } catch (InterruptedException e) {
              break;
          }
      }
  }
  public void submit(Runnable runnable){
      synchronized (TASK_QUEUE){
          if(destory){
              throw new DiscardException("线程池已经被摧毁了...");
          }
          if(TASK_QUEUE.size() > queueSize){
              discardPolicy.discard();
          }
          TASK_QUEUE.addLast(runnable);
          TASK_QUEUE.notifyAll();
      }
  }
/**
* 关闭
*/
  public void shutdown(){
      while(!TASK_QUEUE.isEmpty()){
          try {
              Thread.sleep(10);
          } catch (InterruptedException e) {
              e.printStackTrace();
          }
      }
      synchronized (THREAD_QUEUE) {
          int initVal = THREAD_QUEUE.size();
          while (initVal > 0) {
              for (WorkerTask workerTask : THREAD_QUEUE) {
                  if (workerTask.getTaskState() == TaskState.BLOCKED) {
                      workerTask.interrupt();
                      workerTask.close();
                      initVal--;
                  } else {
                      try {
                          Thread.sleep(10);
                      } catch (InterruptedException e) {
                          e.printStackTrace();
                      }
                  }
              }
          }
          this.destory = true;
      }
  }
  public int getSize() {
      return size;
  }
  public int getMin() {
      return min;
  }
  public int getMax() {
      return max;
  }
  public int getActive() {
      return active;
  }
  /**
   * 线程状态
   */
  private enum TaskState{
      FREE , RUNNING , BLOCKED , DEAD
  }
  /**
   * 自定义异常类
   */
  public static class DiscardException extends RuntimeException{
      public DiscardException(String message){
          super(message);
      }
  }
  /**
   * 定义异常策略
   */
  @FunctionalInterface
  public interface DiscardPolicy{
      void discard() throws DiscardException;
  }
  private static class WorkerTask extends Thread{
      private volatile TaskState taskState = TaskState.FREE;
      public TaskState getTaskState(){
          return this.taskState;
      }
      public WorkerTask(ThreadGroup group , String name){
          super(group , name);
      }
      @Override
      public void run(){
          OUTER:
          while(this.taskState != TaskState.DEAD){
              Runnable runnable;
              synchronized (TASK_QUEUE){
                  while(TASK_QUEUE.isEmpty()){
                      try {
                          taskState = TaskState.BLOCKED;
                          TASK_QUEUE.wait();
                      } catch (InterruptedException e) {
                          break OUTER;
                      }
                  }
                  runnable = TASK_QUEUE.removeFirst();
              }
              if(runnable != null){
                  taskState = TaskState.RUNNING;
                  runnable.run();
                  taskState = TaskState.FREE;
              }
          }
      }
      public void close(){
          this.taskState = TaskState.DEAD;
      }
  }
  /**
   * 测试
   * @param args
   */
  public static void main(String[] args) {
      SimpleThreadPool simpleThreadPool = new SimpleThreadPool();
//        SimpleThreadPool simpleThreadPool = new SimpleThreadPool(6,15,SimpleThreadPool.DEFAULT_DISCARD_POLICY);
      IntStream.rangeClosed(0,40).forEach(i -> {
          simpleThreadPool.submit(() -> {
              try {
                  Thread.sleep(1000);
              } catch (InterruptedException e) {
                  e.printStackTrace();
              }
              System.out.println("the runnable " + i + "be servered by " + Thread.currentThread());
          });
      });
//        try {
//            Thread.sleep(15000);
//        } catch (InterruptedException e) {
//            e.printStackTrace();
//        }
      simpleThreadPool.shutdown();
  }
}

以上为个人经验,希望能给大家一个参考,也希望大家多多支持编程宝库

由于maven 使用上手很容易所以很多时候可以囫囵吞枣能够使用就可以了,由于作者最近在做的持续集成的代码扫描的时候,发现私有云里面大型工程maven依赖,如果没有弄清楚里面的配置复杂的项目很难在私有环境里面 ...