Executor集合框架体系
Executor
用来代替显示创造线程(例如:new Thread(new(RunnableTask())).start()
),接口并不要求所有实现都是异步的.也可想下面这样同步执行:
class DirectExecutor implements Executor {
public void execute(Runnable r) {
r.run();
}
}
java.util.concurrent.Executors
提供各种执行器的工厂方法.
杂记
ExecutorService
- 可产生
Future
用来跟踪异步任务处理进度. - 可通过
shutdown()
(拒绝新任务,已提交的任务会执行完)或者shutdownNow()
(立即关闭,已提交的任务会中断.)
AbstractExecutorService
invokeAny(Collection<? extends Callable<T>> tasks)
和 invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
底层都是调用doInvokeAny(Collection<? extends Callable<T>> tasks, boolean timed, long nanos)
方法执行的.
源码:
// 记录所有已提交执行的任务,方便最后关闭
List<Future<T>> futures= new ArrayList<Future<T>>(ntasks);
// 任务执行结果获取器,若提交的任务完成,则会被放入一个队列中.
ExecutorCompletionService<T> ecs =
new ExecutorCompletionService<T>(this);
// 为了更有效率,尤其是在有并发限制的executor执行器上.在提交更多任务前,检查前一个提交的任务是否完成.
// This interleaving plus the exception mechanics account for messiness of main loop. (异常叠加造成主循环混乱?)
try {
// 如果没有获取到任何结果,则抛出最后一个异常
ExecutionException ee = null;
// 记录上一个任务执行结束的时间点
long lastTime = timed ? System.nanoTime() : 0;
Iterator<? extends Callable<T>> it = tasks.iterator();
// 将第一个任务提交并保存.
futures.add(ecs.submit(it.next()));
--ntasks;
int active = 1;// 记录正在执行的任务个数
for (;;) {
// 获取前面已提交任务的执行结果,若无结果 poll() 方法会立即返回null
Future<T> f = ecs.poll();
if (f == null) {
// 若前面的提交的任务都无执行结果
if (ntasks > 0) {
// 若还有未提交的任务,则提交一个任务.
--ntasks;
futures.add(ecs.submit(it.next()));
++active;
}
else if (active == 0) // 若已经没有正在执行的任务.则跳出循环.
break;
else if (timed) {
// 若有超时限制, 则在时限内去获取最快完成任务的结果
f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
//若无结果,则任务执行超时
if (f == null)
throw new TimeoutException();
// 记录最新完成的任务的时间点,并从时限中减去任务完成花费的时间
long now = System.nanoTime();
nanos -= now - lastTime;
lastTime = now;
}
else
f = ecs.take(); // 若无超时时限,则一直阻塞,直到有任务执行完成.
}
if (f != null) {
// 获取到结果,则有任务执行完成.执行中任务个数减一
--active;
// 获取任务执行结果失败, 则记录异常.所有任务都执行完成,仍无法获取结果时,抛出最后一个异常
try {
return f.get();
} catch (ExecutionException eex) {
ee = eex;
} catch (RuntimeException rex) {
ee = new ExecutionException(rex);
}
}
}
// 所有任务都执行完成,仍无法获取结果时,抛出最后一个异常
if (ee == null)
ee = new ExecutionException();
throw ee;
} finally {
// 方法返回前,关闭所有提交的任务(若有任务完成并成功返回结果.可能还有其他任务正在执行)
for (Future<T> f : futures)
f.cancel(true);
} -----
invokeAll(Collection<? extends Callable
while (it.hasNext()) {
execute((Runnable)(it.next()));
long now = System.nanoTime();
nanos -= now - lastTime;
lastTime = now;
if (nanos <= 0)
return futures;
}
ThreadPoolExecutor
corePoolSize
:核心线程数
maximumPoolSize
: 最大线程数
当一个新任务提交的时候, 如果当前正在运行的线程数少于corePoolSize,则直接创建一个新的线程用来执行任务, 如果正在运行的线程数比corePoolSize多(或者相等),则只有在任务队列(queue)满了的情况下,才会创建一个新的 线程用来执行任务.
默认情况下,只有新任务到来的时候,才会创建核心线程.但是可以通过prestartCoreThread
方法提前启动核心线程.
新线程的的创建使用ThreadFactory
,如果没有指定.则使用Executors.defaultThreadFactory
(创建的线程属于相同的线程组,拥有相同的优先级,并且都是非后台线程),如果ThreadFactory
创建线程失败(返回null),executor也会继续,但是不会执行任何任务
如果线程池线程个数超过了corePoolSize,则超出的线程超过keepAliveTime
(可通过setKeepAliveTime()
设置,默认为0)时间后,会被关闭.对核心线程,也可通过allowCoreThreadTimeOut(boolean)
来设置是否在空闲时间超过keepAliveTime后,被关闭.
任务排队情况说明(queue):
- 如果当前运行的线程少于corePoolSize,则创建新线程来执行任务.
- 如果请求不能排队(队列已满的情况?),则在未超过最大线程数时,会创建新线程执行任务,否则,拒绝任务(rejected).
排队的三种策略:
- Direct handoffs , 直接传递, 一个良好的默认选择是
SyncronousQueue
,如果没有立即可用的线程来执行任务,则排队任务将会失败,所以会创建新的线程来执行任务(什么鬼?什么是排队失败?) - Unbounded queues , 无界队列.在处理web请求的瞬时爆发比较有用, 要保证处理速度大于请求任务速度,否则队列将被无限扩大.
- Bounded queues , 有界队列. 适用于会经常阻塞的任务(例如I/O操作).请求任务大于线程池大小,可以保持CPU忙碌的同时,减少调度开销,但同时也会减少吞吐量.
任务拒绝:
如果executor已被关闭,或者工作线程到达最大线程数并且队列已满的情况下,会执行RejectedExecutionHandler.rejectedExecution()
方法,提供的一些拒绝策略:
ThreadPoolExecutor.AbortPolicy
抛出RejectedExecutionException
异常ThreadPoolExecutor.CallerRunsPolicy
,在调用线程直接执行Runnable
的run方法ThreadPoolExecutor.DiscardPolicy
, 直接将任务丢弃,没有任何提示和感知ThreadPoolExecutor.DiscardOldestPolicy
, 将队列中第一个任务丢弃,并加入新任务
回调方法:
支持子类覆盖beforeExecute
和afterExecute
方法, terminated
方法在终止时被调用(调用shutdown()
和shotdownNow()
的时候会调用)
队列维护:
remove()
删除队列中某个指定任务,purge()
删除所有队列中的任务.
在什么地方调用的tread.start()方法?让线程启动的?
在private boolean addWorker(Runnable firstTask, boolean core)
中若新增worker成功,则调用Tread.start()启动线程.
ScheduledExecutorService
ScheduledExecutorService
在 ExecutorService
增加了可调度时间的一些方法,可延迟一段时间执行任务,或者周期性执行任务
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);
// 执行时间点为 initialDelay + execCount * period , exec 是执行次数
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit);
// 执行时间点为 initialDelay + period
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);
看 DelayedWorkQueue
的take()
逻辑, worker线程拿到第一个任务之后, 如果还没到执行时间,应该会wait()
, 导致后面的线程过来拿到的都是这个任务,从而所有线程阻塞, 知道到达地一个任务的执行时间,所以如果第一个任务等待10s执行, 第二个任务等待5s执行, 那应该是先执行第一个任务,然后再执行第二个任务. 但是实际测试发现是先执行第二个任务,再执行的第一个任务. why?
ScheduledExecutorService
内部维护一个DelayedWorkQueue
(实际是一个以任务执行时间点(纳秒级别)为比较条件的最小堆), 每次worker线程在获取任务时(会先获取锁lock.lockInterruptibly()
), 拿到的是离当前时间最近的待执行任务, 如果任务需延期执行, 则将当前线程挂起,等待需延迟执行的时间(并且记录当前线程header
). 如果在等待过程中, 有新的任务进入队列(获取锁lock.lock()
),并且新的任务为堆顶(执行时间点最小), 则唤醒一个线程,用来获取任务并执行. 如果任务需延期执行,则重复以上过程.
worker获取任务过程:
public RunnableScheduledFuture take() throws InterruptedException {
final ReentrantLock lock = this.lock;
// 获取锁, 下面的`(Condition)available`调用的await()会释放锁.
// 这个方法获取锁时,会校验线程是否已被中断, 若中断则抛出`InterruptedException`
lock.lockInterruptibly();
try {
for (;;) {
// 获取最小等待时间的待执行任务
RunnableScheduledFuture first = queue[0];
// 没有任务,则无限期等待,并释放锁.直到 DelayedWorkQueue.offer() 中调用 available.signal(),唤醒线程
if (first == null)
available.await();
else {
// 任务不为空,则获取任务执行还需要等待的时间
long delay = first.getDelay(TimeUnit.NANOSECONDS);
// 等待时间小于0, 则直接返回任务
if (delay <= 0)
return finishPoll(first);
// 等待时间大于0, 但是已有另一个线程,在等待任务可执行,则当前线程无限期等待
else if (leader != null)
available.await();
else {
// 若无其他线程等待该任务可调度,则将当前线程设置为等待该任务执行, 并等待任务延期执行时间.
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
available.awaitNanos(delay);
} finally {
// 如果唤醒的是leader线程,则需要将leader置null,
// 不然别的线程后续被唤醒时,可能会进入上面if的无限期等待.
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
// 若下面的if条件满足,则表示其他所有获取任务的线程都处于无限期等待中,需要手动唤醒一个线程
if (leader == null && queue[0] != null)
available.signal();
lock.unlock();
}
}
ForkJoinPool
用来执行ForkJoinTask
任务的执行器,适用与当一个任务会产生多个其他子任务的情况(分治思想).fork/join框架的独特之处在与它使用工作窃取(work-stealing)算法。完成自己的工作而处于空闲的工作线程能够从其他仍然处于忙碌(busy)状态的工作线程处窃取等待执行的任务。