Executor执行器

2016年07月25日

Executor集合框架体系

Executor用来代替显示创造线程(例如:new Thread(new(RunnableTask())).start()),接口并不要求所有实现都是异步的.也可想下面这样同步执行:

    class DirectExecutor implements Executor {
          public void execute(Runnable r) {
              r.run();
          }
    }

java.util.concurrent.Executors提供各种执行器的工厂方法.

杂记

ExecutorService

  • 可产生Future用来跟踪异步任务处理进度.
  • 可通过shutdown()(拒绝新任务,已提交的任务会执行完)或者shutdownNow()(立即关闭,已提交的任务会中断.)

AbstractExecutorService

invokeAny(Collection<? extends Callable<T>> tasks)invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) 底层都是调用doInvokeAny(Collection<? extends Callable<T>> tasks, boolean timed, long nanos) 方法执行的.

源码:

             // 记录所有已提交执行的任务,方便最后关闭
            List<Future<T>> futures= new ArrayList<Future<T>>(ntasks);

            // 任务执行结果获取器,若提交的任务完成,则会被放入一个队列中.
            ExecutorCompletionService<T> ecs =
                new ExecutorCompletionService<T>(this);

            // 为了更有效率,尤其是在有并发限制的executor执行器上.在提交更多任务前,检查前一个提交的任务是否完成.
            // This interleaving plus the exception mechanics account for messiness of main loop. (异常叠加造成主循环混乱?)

            try {
                // 如果没有获取到任何结果,则抛出最后一个异常
                ExecutionException ee = null;

                // 记录上一个任务执行结束的时间点
                long lastTime = timed ? System.nanoTime() : 0;

                Iterator<? extends Callable<T>> it = tasks.iterator();

                // 将第一个任务提交并保存.
                futures.add(ecs.submit(it.next()));
                --ntasks;
                int active = 1;// 记录正在执行的任务个数

                for (;;) {
                    // 获取前面已提交任务的执行结果,若无结果 poll() 方法会立即返回null
                    Future<T> f = ecs.poll();
                    if (f == null) {
                       // 若前面的提交的任务都无执行结果
                        if (ntasks > 0) {
                           // 若还有未提交的任务,则提交一个任务.
                            --ntasks;
                            futures.add(ecs.submit(it.next()));
                            ++active;
                        }
                        else if (active == 0) // 若已经没有正在执行的任务.则跳出循环.
                            break;
                        else if (timed) {
                           // 若有超时限制, 则在时限内去获取最快完成任务的结果
                            f = ecs.poll(nanos, TimeUnit.NANOSECONDS);

                            //若无结果,则任务执行超时
                            if (f == null)
                                throw new TimeoutException();

                            // 记录最新完成的任务的时间点,并从时限中减去任务完成花费的时间
                            long now = System.nanoTime();
                            nanos -= now - lastTime;
                            lastTime = now;
                        }
                        else
                            f = ecs.take(); // 若无超时时限,则一直阻塞,直到有任务执行完成.
                    }
                    if (f != null) {
                      // 获取到结果,则有任务执行完成.执行中任务个数减一
                        --active;

                        // 获取任务执行结果失败, 则记录异常.所有任务都执行完成,仍无法获取结果时,抛出最后一个异常
                        try {
                            return f.get();
                        } catch (ExecutionException eex) {
                            ee = eex;
                        } catch (RuntimeException rex) {
                            ee = new ExecutionException(rex);
                        }
                    }
                }

                // 所有任务都执行完成,仍无法获取结果时,抛出最后一个异常
                if (ee == null)
                    ee = new ExecutionException();
                throw ee;

            } finally {
              // 方法返回前,关闭所有提交的任务(若有任务完成并成功返回结果.可能还有其他任务正在执行)
                for (Future<T> f : futures)
                    f.cancel(true);
            } -----

invokeAll(Collection<? extends Callable> tasks, long timeout, TimeUnit unit) 方法中在提交每一个任务时也会进行计时操作,也就是说,如果在任务尚未全部提交时,就可能已经超时,则会返回所有已提交任务的`Future`,并且会将未执行完成的任务cancel掉.

      while (it.hasNext()) {
            execute((Runnable)(it.next()));
            long now = System.nanoTime();
            nanos -= now - lastTime;
            lastTime = now;
            if (nanos <= 0)
                return futures;
        }

ThreadPoolExecutor

corePoolSize:核心线程数

maximumPoolSize: 最大线程数

当一个新任务提交的时候, 如果当前正在运行的线程数少于corePoolSize,则直接创建一个新的线程用来执行任务, 如果正在运行的线程数比corePoolSize多(或者相等),则只有在任务队列(queue)满了的情况下,才会创建一个新的 线程用来执行任务.

默认情况下,只有新任务到来的时候,才会创建核心线程.但是可以通过prestartCoreThread方法提前启动核心线程.

新线程的的创建使用ThreadFactory,如果没有指定.则使用Executors.defaultThreadFactory(创建的线程属于相同的线程组,拥有相同的优先级,并且都是非后台线程),如果ThreadFactory创建线程失败(返回null),executor也会继续,但是不会执行任何任务

如果线程池线程个数超过了corePoolSize,则超出的线程超过keepAliveTime(可通过setKeepAliveTime()设置,默认为0)时间后,会被关闭.对核心线程,也可通过allowCoreThreadTimeOut(boolean)来设置是否在空闲时间超过keepAliveTime后,被关闭.

任务排队情况说明(queue):

  • 如果当前运行的线程少于corePoolSize,则创建新线程来执行任务.
  • 如果请求不能排队(队列已满的情况?),则在未超过最大线程数时,会创建新线程执行任务,否则,拒绝任务(rejected).

排队的三种策略:

  • Direct handoffs , 直接传递, 一个良好的默认选择是SyncronousQueue,如果没有立即可用的线程来执行任务,则排队任务将会失败,所以会创建新的线程来执行任务(什么鬼?什么是排队失败?)
  • Unbounded queues , 无界队列.在处理web请求的瞬时爆发比较有用, 要保证处理速度大于请求任务速度,否则队列将被无限扩大.
  • Bounded queues , 有界队列. 适用于会经常阻塞的任务(例如I/O操作).请求任务大于线程池大小,可以保持CPU忙碌的同时,减少调度开销,但同时也会减少吞吐量.

任务拒绝:

如果executor已被关闭,或者工作线程到达最大线程数并且队列已满的情况下,会执行RejectedExecutionHandler.rejectedExecution()方法,提供的一些拒绝策略:

  • ThreadPoolExecutor.AbortPolicy抛出RejectedExecutionException异常
  • ThreadPoolExecutor.CallerRunsPolicy,在调用线程直接执行Runnable的run方法
  • ThreadPoolExecutor.DiscardPolicy, 直接将任务丢弃,没有任何提示和感知
  • ThreadPoolExecutor.DiscardOldestPolicy , 将队列中第一个任务丢弃,并加入新任务

回调方法: 支持子类覆盖beforeExecuteafterExecute方法, terminated方法在终止时被调用(调用shutdown()shotdownNow()的时候会调用)

队列维护: remove()删除队列中某个指定任务,purge()删除所有队列中的任务.

在什么地方调用的tread.start()方法?让线程启动的?

private boolean addWorker(Runnable firstTask, boolean core)中若新增worker成功,则调用Tread.start()启动线程.

ScheduledExecutorService

ScheduledExecutorServiceExecutorService 增加了可调度时间的一些方法,可延迟一段时间执行任务,或者周期性执行任务

            public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);

            public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);

            // 执行时间点为 initialDelay + execCount * period , exec 是执行次数
            public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit);

            // 执行时间点为 initialDelay + period
            public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);

DelayedWorkQueuetake()逻辑, worker线程拿到第一个任务之后, 如果还没到执行时间,应该会wait(), 导致后面的线程过来拿到的都是这个任务,从而所有线程阻塞, 知道到达地一个任务的执行时间,所以如果第一个任务等待10s执行, 第二个任务等待5s执行, 那应该是先执行第一个任务,然后再执行第二个任务. 但是实际测试发现是先执行第二个任务,再执行的第一个任务. why?

ScheduledExecutorService内部维护一个DelayedWorkQueue(实际是一个以任务执行时间点(纳秒级别)为比较条件的最小堆), 每次worker线程在获取任务时(会先获取锁lock.lockInterruptibly()), 拿到的是离当前时间最近的待执行任务, 如果任务需延期执行, 则将当前线程挂起,等待需延迟执行的时间(并且记录当前线程header). 如果在等待过程中, 有新的任务进入队列(获取锁lock.lock()),并且新的任务为堆顶(执行时间点最小), 则唤醒一个线程,用来获取任务并执行. 如果任务需延期执行,则重复以上过程.

worker获取任务过程:

    public RunnableScheduledFuture take() throws InterruptedException {
               final ReentrantLock lock = this.lock;

               // 获取锁, 下面的`(Condition)available`调用的await()会释放锁.
               // 这个方法获取锁时,会校验线程是否已被中断, 若中断则抛出`InterruptedException`
               lock.lockInterruptibly();
               try {
                   for (;;) {
                       // 获取最小等待时间的待执行任务
                       RunnableScheduledFuture first = queue[0];

                       // 没有任务,则无限期等待,并释放锁.直到 DelayedWorkQueue.offer() 中调用 available.signal(),唤醒线程
                       if (first == null)
                           available.await();
                       else {
                           // 任务不为空,则获取任务执行还需要等待的时间
                           long delay = first.getDelay(TimeUnit.NANOSECONDS);

                           // 等待时间小于0, 则直接返回任务
                           if (delay <= 0)
                               return finishPoll(first);

                           // 等待时间大于0, 但是已有另一个线程,在等待任务可执行,则当前线程无限期等待
                           else if (leader != null)
                               available.await();
                           else {
                             // 若无其他线程等待该任务可调度,则将当前线程设置为等待该任务执行, 并等待任务延期执行时间.
                               Thread thisThread = Thread.currentThread();
                               leader = thisThread;
                               try {
                                   available.awaitNanos(delay);
                               } finally {
                                  // 如果唤醒的是leader线程,则需要将leader置null,
                                  // 不然别的线程后续被唤醒时,可能会进入上面if的无限期等待.  
                                   if (leader == thisThread)
                                       leader = null;
                               }
                           }
                       }
                   }
               } finally {
                 // 若下面的if条件满足,则表示其他所有获取任务的线程都处于无限期等待中,需要手动唤醒一个线程
                   if (leader == null && queue[0] != null)
                       available.signal();
                   lock.unlock();
               }
           }

ForkJoinPool

用来执行ForkJoinTask任务的执行器,适用与当一个任务会产生多个其他子任务的情况(分治思想).fork/join框架的独特之处在与它使用工作窃取(work-stealing)算法。完成自己的工作而处于空闲的工作线程能够从其他仍然处于忙碌(busy)状态的工作线程处窃取等待执行的任务。