每当我们需要使用线程的时候,直接创建一个线程并不是一件很麻烦的事情,但是如果我们频繁的创建、销毁线程,系统的效率会大大降低。
Java 提供了线程池,使得我们的线程在执行完任务之后可以不被销毁,而是继续执行或等待其他任务。线程池能够大大简化并发编程
ThreadPoolExecutor 详解
ThreadPoolExecutor 的完整构造方法签名:
1 | public ThreadPoolExecutor(int corePoolSize, |
corePoolSize - 核心池的大小,这个参数跟后面讲述的线程池的实现原理有非常大的关系。在创建了线程池后,默认情况下,线程池中并没有任何线程,而是等待有任务到来才创建线程去执行任务,除非调用了prestartAllCoreThreads()或者prestartCoreThread()方法,从这2个方法的名字就可以看出,是预创建线程的意思,即在没有任务到来之前就创建corePoolSize个线程或者一个线程。默认情况下,在创建了线程池后,线程池中的线程数为0,当有任务来之后,就会创建一个线程去执行任务,当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中;
maximumPoolSize - 线程池最大线程数,这个参数也是一个非常重要的参数,它表示在线程池中最多能创建多少个线程
keepAliveTime - 表示线程没有任务执行时最多保持多久时间会终止。默认情况下,只有当线程池中的线程数大于corePoolSize时,keepAliveTime才会起作用,直到线程池中的线程数不大于corePoolSize,即当线程池中的线程数大于corePoolSize时,如果一个线程空闲的时间达到keepAliveTime,则会终止,直到线程池中的线程数不超过corePoolSize。但是如果调用了allowCoreThreadTimeOut(boolean)方法,在线程池中的线程数不大于corePoolSize时,keepAliveTime参数也会起作用,直到线程池中的线程数为0;
unit - keepAliveTime 参数的时间单位。
workQueue - 一个阻塞队列,用来存储等待执行的任务,这个参数的选择也很重要,会对线程池的运行过程产生重大影响,一般来说,这里的阻塞队列有以下几种选择:
1
2
3
4ArrayBlockingQueue
LinkedBlockingQueue
SynchronousQueue
PriorityBlockingQueuethreadFactory - 用于设置创建线程的工厂,可以通过线程工厂给每个创建出来的线程做些更有意义的事情,比如设置daemon和优先级等等
handler - 表示当拒绝处理任务时的策略,有以下四种取值:
- CallerRunsPolicy:只用调用者所在线程来运行任务。
- AbortPolicy:直接抛出异常。
- DiscardOldestPolicy:丢弃队列里最近的一个任务,并执行当前任务。
- DiscardPolicy:不处理,丢弃掉。
- 也可以根据应用场景需要来实现RejectedExecutionHandler接口自定义策略。如记录日志或持久化不能处理的任务。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97/**
* A handler for rejected tasks that runs the rejected task
* directly in the calling thread of the {@code execute} method,
* unless the executor has been shut down, in which case the task
* is discarded.
*/
public static class CallerRunsPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code CallerRunsPolicy}.
*/
public CallerRunsPolicy() { }
/**
* Executes task r in the caller's thread, unless the executor
* has been shut down, in which case the task is discarded.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}
/**
* A handler for rejected tasks that throws a
* {@code RejectedExecutionException}.
*/
public static class AbortPolicy implements RejectedExecutionHandler {
/**
* Creates an {@code AbortPolicy}.
*/
public AbortPolicy() { }
/**
* Always throws RejectedExecutionException.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
* @throws RejectedExecutionException always
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
/**
* A handler for rejected tasks that silently discards the
* rejected task.
*/
public static class DiscardPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code DiscardPolicy}.
*/
public DiscardPolicy() { }
/**
* Does nothing, which has the effect of discarding task r.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}
/**
* A handler for rejected tasks that discards the oldest unhandled
* request and then retries {@code execute}, unless the executor
* is shut down, in which case the task is discarded.
*/
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code DiscardOldestPolicy} for the given executor.
*/
public DiscardOldestPolicy() { }
/**
* Obtains and ignores the next task that the executor
* would otherwise execute, if one is immediately available,
* and then retries execution of task r, unless the executor
* is shut down, in which case task r is instead discarded.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
}
怎么理解无界队列和有界队列
有界队列
- 初始的poolSize < corePoolSize,提交的runnable任务,会直接做为new一个Thread的参数,立马执行 。
- 当提交的任务数超过了corePoolSize,会将当前的runable提交到一个block queue中。
- 有界队列满了之后,如果poolSize < maximumPoolsize时,会尝试new 一个Thread的进行救急处理,立马执行对应的runnable任务。
- 如果3中也无法处理了,就会走到第四步执行reject操作。
无界队列
与有界队列相比,除非系统资源耗尽,否则无界的任务队列不存在任务入队失败的情况。当有新的任务到来,系统的线程数小于corePoolSize时,则新建线程执行任务。当达到corePoolSize后,就不会继续增加,若后续仍有新的任务加入,而没有空闲的线程资源,则任务直接进入队列等待。若任务创建和处理的速度差异很大,无界队列会保持快速增长,直到耗尽系统内存。
ThreadPoolExecutor是Executors类的底层实现。
在JDK帮助文档中,有如此一段话:
“强烈建议程序员使用较为方便的Executors工厂方法
Executors.newCachedThreadPool()(无界线程池,可以进行自动线程回收)
Executors.newFixedThreadPool(int)(固定大小线程池)
Executors.newSingleThreadExecutor()(单个后台线程)
ExecutorService 接口
ExecutorService
接口继承了 Executor
,并且在此基础上又提供了管理终止的方法,以及可为跟踪一个或多个异步任务执行状况而生成Future
的方法。可以关闭 ExecutorService
,使其无法接受新任务。ExecutorService
提供了两种终止方法:shutdown()
方法在终止前允许执行以前提交的任务,而 shutdownNow()
方法阻止等待任务的启动并试图停止当前正在执行的任务。
1 | ExecutorService exec = Executors.newCachedThreadPool(); |
常用线程池
newCachedThreadPool
创建一个可缓存线程池,如果线程池长度超过处理需要,可以灵活回收空闲线程,若线程不够,则创建新线程。
工作特点:
- 工作线程数量几乎不受限制(小于Integer.MAX_VALUE),可以灵活的在线程池中添加线程。
- 如果长时间没有使用线程,且到了线程池的指定空闲时间(默认一分钟),线程池将会自动终止。终止后如果有新的任务提交,则线程池会重新创建一个工作线程。
newFixedThreadPool
创建一个指定线程数量的线程池。每当提交一个任务,如果工作线程的数量达到线程池设定的大小,则将任务存到队列中,直到有空闲线程能够执行该任务。newFixedThreadPool
线程池具有线程池提高程序效率和节省创建线程时所耗的开销的优点。但是,在线程池空闲时,即线程池中没有可运行任务时,它不会释放工作线程,还会占用一定的系统资源。
newSingleThreadPool
创建一个单线程化的Executor,即只创建唯一的工作者线程来执行任务,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。如果这个线程异常结束,会有另一个取代它,保证顺序执行。单工作线程最大的特点是==可保证顺序地执行各个任务==,并且在任意给定的时间不会有多个线程是活动的。
newScheduleThreadPool
创建一个定长的线程池,而且支持定时的以及周期性的任务执行,支持定时及周期性任务执行。
newWorkStealingPool
工作窃取线程池,当一个线程执行完自己的任务之后,可以从别的未完成的线程中窃取任务继续工作。==该线程池中的线程是守护线程==