Java多线程框架Executor

Executor简介

在Java中,使用线程来异步执行任务。

Java线程的创建与销毁需要一定的开销,如果我们为每一个任务创建一个新线程来执行,这些线程的创建与销毁将消耗大量的计算资源。 同时,为每一个任务创建一个新线程来执行,这种策略可能会使处于高负荷状态的应用最终崩溃。Java的线程既是工作单元,也是执行机制。

从JDK 5开始,把工作单元与执行机制分离开来。 工作单元包括Runnable和Callable,而执行机制由Executor框架提供,通过该框架来控制线程的启动、执行和关闭,可以简化并发编程的操作。

Executors框架结构

在上层,Java多线程程序通常把应用分解为若干个任务,然后使用用户级的调度器(Executor框架)将这些任务映射为固定数量的线程;在底层,操作系统内核将这些线程映射到硬件处理器上。 Executors框架结构.png Executor框架主要由3大部分组成如下:

  1. 任务 包括被执行任务需要实现的接口:Runnable接口或Callable接口。
  2. 任务的执行 包括任务执行机制的核心接口Executor,以及继承自Executor的ExecutorService接口。Executor框架有两个关键类实现了ExecutorService接口(ThreadPoolExecutor和ScheduledThreadPoolExecutor)。
  3. 异步计算的结果 包括接口Future和实现Future接口的FutureTask类。

主线程首先要创建实现Runnable或者Callable接口的任务对象。工具类Executors可以把一个Runnable对象封装为一个Callable对象(Executors.callable(Runnable task)或Executors.callable(Runnable task,Object resule))。

然后可以把Runnable对象直接交给ExecutorService执行(ExecutorService.execute(Runnable command));或者也可以把Runnable对象或Callable对象提交给ExecutorService执行(ExecutorService.submit(Runnable task)或ExecutorService.submit(Callabletask))。

如果执行ExecutorService.submit(…),ExecutorService将返回一个实现Future接口的对象(到目前为止的JDK中,返回的是FutureTask对象)。由于FutureTask实现了Runnable,程序员也可以创建FutureTask,然后直接交给ExecutorService执行。

最后,主线程可以执行FutureTask.get()方法来等待任务执行完成。主线程也可以FutureTask.cancel(boolean mayInterruptIfRunning)来取消此任务的执行。

生命周期

ThreadPoolExecutor中,使用CAPACITY的高3位来表示运行状态,分别是:

  1. RUNNING:接收新任务,并且处理任务队列中的任务
  2. SHUTDOWN:不接收新任务,但是处理任务队列的任务
  3. STOP:不接收新任务,不处理任务队列,同时中断所有进行中的任务
  4. TIDYING:所有任务已经被终止,工作线程数量为 0,到达该状态会执行terminated()
  5. TERMINATED:terminated()执行完毕
executor_lifecycle.png
executor_lifecycle.png

ThreadPoolExecutor核心参数

Executor框架最核心的类是ThreadPoolExecutor。

  • corePoolSize:最小存活的工作线程数量(如果设置allowCoreThreadTimeOut,那么该值为 0)
  • maximumPoolSize:最大的线程数量,受限于CAPACITY
  • keepAliveTime:对应线程的存活时间,时间单位由TimeUnit指定
  • workQueue:工作队列,存储待执行的任务
  • RejectExecutionHandler:拒绝策略,线程池满后会触发
  • 线程池的最大容量:CAPACITY中的前三位用作标志位,也就是说工作线程的最大容量为(2^29)-1

线程池种类

  • CachedThreadPool:一个可缓存的线程池,如果线程池的当前规模超过了处理需求时,那么将回收空闲的线程,当需求增加时,则可以添加新的线程,线程池的规模不存在任何的限制。
  • FixedThreadPool:一个固定大小的线程池,提交一个任务时就创建一个线程,直到达到线程池的最大数量,这时线程池的大小将不再变化。
  • SingleThreadPool:一个单线程的线程池,它只有一个工作线程来执行任务,可以确保按照任务在队列中的顺序来串行执行,如果这个线程异常结束将创建一个新的线程来执行任务。
  • ScheduledThreadPool:一个固定大小的线程池,并且以延迟、定时或给定间隔的方式来执行任务。

执行任务execute核心逻辑

  1. 当前线程数量 < corePoolSize,直接开启新的核心线程执行任务addWorker(command, true) 当前线程数量 >= corePoolSize,且任务加入工作队列成功
  2. 检查线程池当前状态是否处于RUNNING
  • 如果否,则拒绝该任务
  • 如果是,判断当前线程数量是否为 0,如果为 0,就增加一个工作线程。
  • 开启普通线程执行任务addWorker(command, false),开启失败就拒绝该任务

线程池运行的四个阶段

  1. poolSize < corePoolSize 且队列为空,此时会新建线程来处理提交的任务。
  2. poolSize == corePoolSize,此时提交的任务进入工作队列,工作线程从队列中获取任务执行,此时队列不为空且未满。
  3. poolSize == corePoolSize,并且队列已满,此时也会新建线程来处理提交的任务,但是poolSize < maxPoolSize。
  4. poolSize == maxPoolSize,并且队列已满,此时会触发拒绝策略。

拒绝策略

任务无法执行会被拒绝,RejectedExecutionHandler是处理被拒绝任务的接口。下面是四种拒绝策略。

  • AbortPolicy:默认策略,终止任务,抛出RejectedException
  • CallerRunsPolicy:在调用者线程执行当前任务,不抛异常
  • DiscardPolicy: 抛弃策略,直接丢弃任务,不抛异常
  • DiscardOldersPolicy:抛弃最老的任务,执行当前任务,不抛异常

使用Executor框架

实现Runnable接口或者Callable接口

主线程首先要创建实现Runnable接口或者Callable接口的任务对象。Executors可以把一个Runnable对象封装为一个Callable对象,如下

1
Executors.callable(Runnale task);

或者

1
Executors.callable(Runnable task, Object result);

把Runnable对象直接交给ExecutorService执行

1
ExecutorService.execute(Runnable command);

或者也可以把Runnable对象或Callable对象提交给ExecutorService执行。 如果执行ExecutorService.submit(…),ExecutorService将返回一个实现Future接口的对象(到目前为止的JDK中,返回的是FutureTask对象)。由于FutureTask实现了Runnable接口,我们也可以创建FutureTask类,然后直接交给ExecutorService执行。  

1
ExecutorService.submit(Runnable task);

等待/取消任务执行

最后,主线程可以执行FutureTask.get()方法来等待任务执行完成。主线程也可以执行FutureTask.cancel(boolean mayInterruptIfRunning)来取消此任务的执行。

线程池中的 Worker

Worker继承了AbstractQueuedSynchronizer和Runnable,前者给Worker提供锁的功能,后者执行工作线程的主要方法runWorker(Worker w)(从任务队列捞任务执行)。Worker 引用存在workers集合里面,用mainLock守护。

1
2
private final ReentrantLock mainLock = new ReentrantLock();
private final HashSet<Worker> workers = new HashSet<Worker>();

核心函数runWorker

下面是简化的逻辑,注意:每个工作线程的run都执行下面的函数

1
2
3
4
5
6
7
8
9
10
11
12
13
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
while (task != null || (task = getTask()) != null) {
w.lock();
beforeExecute(wt, task);
task.run();
afterExecute(task, thrown);
w.unlock();
}
processWorkerExit(w, completedAbruptly);
}

  1. 从getTask()中获取任务
  2. 锁住 worker
  3. 执行beforeExecute(wt, task),这是ThreadPoolExecutor提供给子类的扩展方法
  4. 运行任务,如果该worker有配置了首次任务,则先执行首次任务且只执行一次。
  5. 执行afterExecute(task, thrown);
  6. 解锁 worker
  7. 如果获取到的任务为 null,关闭 worker

获取任务getTask

线程池内部的任务队列是一个阻塞队列,具体实现在构造时传入。

1
private final BlockingQueue<Runnable> workQueue;

getTask()从任务队列中获取任务,支持阻塞和超时等待任务,四种情况会导致返回null,让worker关闭。

  1. 现有的线程数量超过最大线程数量
  2. 线程池处于STOP状态
  3. 线程池处于SHUTDOWN状态且工作队列为空
  4. 线程等待任务超时,且线程数量超过保留线程数量

核心逻辑:根据timed在阻塞队列上超时等待或者阻塞等待任务,等待任务超时会导致工作线程被关闭。

1
2
3
4
timed = allowCoreThreadTimeOut || wc > corePoolSize;
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();

在以下两种情况下等待任务会超时:

  1. 允许核心线程等待超时,即allowCoreThreadTimeOut(true)
  2. 当前线程是普通线程,此时wc > corePoolSize

示例

获取线程的返回值

通过FutureTask包装一个Callable的实例,再通过Thread包装FutureTask的实例,然后调用Thread的start()方法,示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicInteger;
public class ThreadTest implements Callable {
private AtomicInteger atomicInteger = new AtomicInteger(0);
@Override
public Object call() throws Exception {
for (int i = 0; i <= 100; i++) {
//Do something what U want
atomicInteger.set(atomicInteger.get() + 1);
}
return atomicInteger.get();
}
@org.junit.Test
public void test() throws ExecutionException, InterruptedException {
FutureTask future = new FutureTask(new ThreadTest());
Thread thread = new Thread(future);
thread.start();
if (future.isDone()) {
future.cancel(true);
}
System.out.println(future.get());
}
}

通过ExecutorService执行线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
public class ThreadTest implements Callable {
private AtomicInteger atomicInteger = new AtomicInteger(0);
@Override
public Object call() throws Exception {
for (int i = 0; i <= 100; i++) {
//Do something what U want
atomicInteger.set(atomicInteger.get() + 1);
}
return atomicInteger.get();
}
@org.junit.Test
public void test() throws ExecutionException, InterruptedException {
ExecutorService executorService = Executors.newSingleThreadExecutor();
Future submit = executorService.submit(new ThreadTest());
System.out.println(submit.get());
}
}

批量提交任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
public class ThreadTest<T> implements Callable {
private AtomicInteger atomicInteger = new AtomicInteger(0);
@Override
public Object call() throws Exception {
synchronized (this) {
for (int i = 0; i <= 100; i++) {
//Do something what U want
atomicInteger.set(atomicInteger.get() + 1);
}
TimeUnit.SECONDS.sleep(1);
}
return atomicInteger.get();
}
@org.junit.Test
public void test() throws ExecutionException, InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(5);
List<Callable<T>> tasks = new ArrayList<>();
ThreadTest threadTest = new ThreadTest();
for (int i = 0; i < 5; i++) {
tasks.add(threadTest);
}
//该方法返回某个完成的任务
Object o = executorService.invokeAny(tasks);
System.out.println(o);
System.out.println("One completed!");
long start = System.currentTimeMillis();
threadTest = new ThreadTest();
tasks.clear();
for (int i = 0; i < 5; i++) {
tasks.add(threadTest);
}
List<Future<T>> futures = executorService.invokeAll(tasks);
long end = System.currentTimeMillis();
System.out.println(end - start + " ms之后,返回运行结果!");
for (int i = 0; i < 5; i++) {
System.out.println(futures.get(i).get());
}
}
}

运行结果

1
184

One completed! 5001 ms之后,返回运行结果!

1
2
3
4
5
101
505
404
404
202

invokeAny方法提交所有任务到一个Callable对象的集合中,并且返回某个已经完成了的任务的结果,返回的任务是不确定的。invokeAll方法则返回所有任务的结果,但是其一个弊端是,如果第一个任务花费了很长时间,则不得不等待,待所有任务都完成之后,才返回。在某些情况下,可能只需要一个任务出了结果就可以中止所有任务,这样就得不偿失。将结果按照可获得的顺序保存起来可能更好,这时可以使用ExecutorCompletionService,其中的take()方法会移除下一个已经完成的结果(Future),如果没有可用结果则阻塞。

通过CompletionService提交多组任务并获取返回值

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
public class ThreadTest implements Callable {
private AtomicInteger atomicInteger = new AtomicInteger(0);
@Override
public Object call() throws Exception {
synchronized (this) {
for (int i = 0; i <= 100; i++) {
//Do something what U want
atomicInteger.set(atomicInteger.get() + 1);
}
TimeUnit.SECONDS.sleep(1);
}
return atomicInteger.get();
}
@org.junit.Test
public void test() throws InterruptedException, ExecutionException {
ExecutorService executorService = Executors.newFixedThreadPool(5);
CompletionService<Integer> completionService = new ExecutorCompletionService(executorService);
ThreadTest threadTest = new ThreadTest();
long start = System.currentTimeMillis();
for (int i = 0; i < 5; ++i) {
completionService.submit(threadTest);//提交五组任务
}
long end = System.currentTimeMillis();
System.out.println(end - start + " ms之后,返回运行结果!");
for (int i = 0; i < 5; ++i) {
Integer res = completionService.take().get();//通过take
System.out.println(res);
}
}
}

输出结果:

1 ms之后,返回运行结果!

1
2
3
4
5
101
202
303
404
505

由运行结果可知,ExecutorCompletionService并不会阻塞,在提交任务之后,继续向下运行,哪个任务完成即返回,并不受任务提交顺序的影响。

通过自维护列表管理多组任务并获取返回值

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
public class ThreadTest implements Callable {
private AtomicInteger atomicInteger = new AtomicInteger(0);
@Override
public Object call() throws Exception {
for (int i = 0; i <= 100; i++) {
//Do something what U want
atomicInteger.set(atomicInteger.get() + 1);
}
return atomicInteger.get();
}
@org.junit.Test
public void test() throws ExecutionException, InterruptedException {
ExecutorService executorService = Executors.newSingleThreadExecutor();
List<Future> futures = new ArrayList<>();
ThreadTest threadTest = new ThreadTest();
for (int i = 0; i < 5; ++i) {
Future submit = executorService.submit(threadTest);
futures.add(submit);
}
Iterator<Future> iterator = futures.iterator();
while (iterator.hasNext()) {
Future next = iterator.next();
System.out.println(next.get());
}
}
}

采用自维护Future集合方法,submit的task不一定是按照加入自己维护的列表顺序完成的。从list中遍历的每个Future对象并不一定处于完成状态,这时调用get()方法就会被阻塞住,如果系统是设计成每个线程完成后就能根据其结果继续做后面的事,这样对于处于list后面的但是先完成的线程就会增加了额外的等待时间。

而CompletionService的实现是维护一个保存Future对象的BlockingQueue。只有当这个Future对象状态是结束的时候,才会加入到这个Queue中,take()方法其实就是Producer-Consumer中的Consumer。它会从Queue中取出Future对象,如果Queue是空的,就会阻塞在那里,直到有完成的Future对象加入到Queue中。 所以,先完成的必定先被取出。这样就减少了不必要的等待时间。 ## ScheduledExecutor任务调度 ScheduledExecutor提供了基于开始时间与重复间隔的任务调度,可以实现简单的任务调度需求。每一个被调度的任务都会由线程池中一个线程去执行,因此任务是并发执行的,相互之间不会受到干扰。需要注意的是,只有当任务的执行时间到来时,ScheduedExecutor才会真正启动一个线程,其余时间ScheduledExecutor都是在轮询任务的状态。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class ScheduledExecutorTest implements Runnable {
private String jobName = "";
public ScheduledExecutorTest(String jobName) {
super();
this.jobName = jobName;
}
@Override
public void run() {
System.out.println("execute " + jobName);
}
public static void main(String[] args) {
ScheduledExecutorService service = Executors.newScheduledThreadPool(10);
long initialDelay1 = 1;
long period1 = 1;
// 从现在开始1秒钟之后,每隔1秒钟执行一次job1
service.scheduleAtFixedRate(
new ScheduledExecutorTest("job" + period1), initialDelay1,
period1, TimeUnit.SECONDS);
long initialDelay2 = 2;
long period2 = 2;
// 从现在开始2秒钟之后,每隔2秒钟执行一次job2
service.scheduleWithFixedDelay(
new ScheduledExecutorTest("job" + period2), initialDelay2,
period2, TimeUnit.SECONDS);
}
}

运行结果

1
2
3
4
5
6
7
8
9
execute job1
execute job1
execute job2
execute job1
execute job1
execute job2
execute job1
execute job1
execute job2

取消向线程池提交的某个任务

在ExecutorService中提供了submit()方法,用于向线程池提交任务,该方法返回一个包含结果集的Future实例。而Future提供了cancel(boolean mayInterruptIfRunning)方法用于取消提交的运行任务,如果向该函数传递true,那么不管该任务是否运行结束,立即停止,如果向该函数传递false,那么等待该任务运行完成再结束之。同样Future还提供了isDone()用于测试该任务是否结束,isCancelled()用于测试该任务是否在运行结束前已取消。

关闭线程池

在ExecutorService中提供了shutdown()和List shutdownNow()方法用来关闭线程池,前一个方法将启动一次顺序关闭,有任务在执行的话,则等待该任务运行结束,同时不再接受新任务运行。后一个方法将取消所有未开始的任务并且试图中断正在执行的任务,返回从未开始执行的任务列表,不保证能够停止正在执行的任务,但是会尽力尝试。例如,通过Thread.interrupt()来取消正在执行的任务,但是对于任何无法响应中断的任务,都可能永远无法终止。

坚持原创技术分享,您的支持将鼓励我继续创作!
0%