相信很多人需要的是这个吧?copy完先别急着走,往下看
/**
* 线程池配置类
*/
@Configuration
@ConfigurationProperties(prefix = "thread.pool")
public class ThreadPoolConfig {
/**
* 工作线程数
* 核心线程数最好是一个固定值,因为如果配置成Runtime.getRuntime().availableProcessors() * 2,
* 很有可能引发这样一个错误:Failed to instantiate [java.util.concurrent.Executor]: .... nested exception is java.lang.IllegalArgumentException
* 因为线程池要求核心线程数不能大于最大线程数,当你的服务发布在核心数比较多的机器上时(比如你的maxSize配置的就是64,而Runtime.getRuntime().availableProcessors()>32时)
* 就会引发以上错误,导致无法启动
* (通常开发测试环境都没有线上的配置高,很可能发生测试没问题,但是在线上服务却启动不了的情况,
* 最最重要的是一旦发生了可能就要熬夜查问题(-_-。))。
*/
//private int coreSize = Runtime.getRuntime().availableProcessors() * 2;
private int coreSize = 8;
/**
* 最大工作线程数
*/
private int maxSize = 64;
/**
* 工作线任务队列
*/
private int queueCapacity = 1000;
@Bean("taskExecutor")
public Executor taskExecutor() {
//注意这一行日志:2. do submit,taskCount [101], completedTaskCount [87], activeCount [5], queueSize [9]
//这说明提交任务到线程池的时候,调用的是submit(Callable task)这个方法,当前已经提交了101个任务,完成了87个,当前有5个线程在处理任务,还剩9个任务在队列中等待,线程池的基本情况一路了然;
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
//核心线程数10:线程池创建时候初始化的线程数
executor.setCorePoolSize(coreSize);
//最大线程数128:线程池最大的线程数,只有在缓冲队列满了之后才会申请超过核心线程数的线程
//maxPoolSize 当缓冲队列满了以后会增加活动线程数,直到MaxPoolSize;
executor.setMaxPoolSize(maxSize);
//缓冲队列:用来缓冲执行任务的队列
executor.setQueueCapacity(queueCapacity);
//允许线程的空闲时间30秒:当超过了核心线程出之外的线程在空闲时间到达之后会被销毁
executor.setKeepAliveSeconds(30);
//线程池名的前缀:设置好了之后可以方便我们定位处理任务所在的线程池
executor.setThreadNamePrefix("taskExecutor");
//理线程池对拒绝任务的处策略:这里采用了CallerRunsPolicy策略,当线程池没有处理能力的时候,该策略会直接在 execute 方法的调用线程中运行被拒绝的任务;如果执行程序已关闭,则会丢弃该任务
/*CallerRunsPolicy:线程调用运行该任务的 execute 本身。此策略提供简单的反馈控制机制,能够减缓新任务的提交速度。
这个策略显然不想放弃执行任务。但是由于池中已经没有任何资源了,那么就直接使用调用该execute的线程本身来执行。(开始我总不想丢弃任务的执行,但是对某些应用场景来讲,很有可能造成当前线程也被阻塞。如果所有线程都是不能执行的,很可能导致程序没法继续跑了。需要视业务情景而定吧。)
AbortPolicy:处理程序遭到拒绝将抛出运行时 RejectedExecutionException
这种策略直接抛出异常,丢弃任务。(jdk默认策略,队列满并线程满时直接拒绝添加新任务,并抛出异常,所以说有时候放弃也是一种勇气,为了保证后续任务的正常进行,丢弃一些也是可以接收的,记得做好记录)
DiscardPolicy:不能执行的任务将被删除
这种策略和AbortPolicy几乎一样,也是丢弃任务,只不过他不抛出异常。
DiscardOldestPolicy:如果执行程序尚未关闭,则位于工作队列头部的任务将被删除,然后重试执行程序(如果再次失败,则重复此过程)
该策略就稍微复杂一些,在pool没有关闭的前提下首先丢掉缓存在队列中的最早的任务,然后重新尝试运行该任务。这个策略需要适当小心*/
executor.setRejectedExecutionHandler(rejectedExecutionHandler());
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.setAwaitTerminationSeconds(30);
executor.initialize();
return executor;
}
/**
* 这个策略就是忽略缓冲队列限制,继续往里边塞
*/
public RejectedExecutionHandler rejectedExecutionHandler() {
return (r, executor) -> {
try {
executor.getQueue().put(r);
} catch (InterruptedException e) {
e.printStackTrace();
}
};
}
public int getQueueCapacity() {
return queueCapacity;
}
public void setQueueCapacity(int queueCapacity) {
this.queueCapacity = queueCapacity;
}
public int getCoreSize() {
return coreSize;
}
public void setCoreSize(int coreSize) {
this.coreSize = coreSize;
}
public int getMaxSize() {
return maxSize;
}
public void setMaxSize(int maxSize) {
this.maxSize = maxSize;
}
}
坑
ThreadPoolTaskExecutor的MaxPoolSize设计经常会让人产生误区,只有在等待队列满了以后才会增加core线程。感觉这点比较坑,还是数据库连接池的设计好一点。因为当队列满了以后core线程数再增长的话,很可能会来不及,这时就很容易触发拒绝策略。如果你的拒绝策略设置不当很可能带来灾难性的后果。这里推荐上面代码中的拒绝策略,在触发拒绝策略时直接强行塞入等待队列。
一个死锁问题的分析
现象:当同时送入线程池的任务大于配置core-size时,会引起程序死锁。
测试代码如下:
/**
* 创建i个nestedCall任务,送入线程池
*/
public static void main(String[] args) throws InterruptedException {
ThreadPoolTaskExecutor taskExecutor = taskExecutor();
for (int i = 0; i < 3; i++) {
int finalI = i;
taskExecutor.submit(() -> outerCall(taskExecutor, finalI));
}
System.out.println("task push over");
}
/**
* 先睡2s,再将一个任务送入线程池,再等待其完成
*/
public static void outerCall(Executor taskExecutor, int i){
try {
System.out.println("outer " + i);
TimeUnit.SECONDS.sleep(2);
CountDownLatch latch = new CountDownLatch(1);
taskExecutor.execute(() -> {
nestedCall(latch, i);
});
latch.await();
}catch (Exception e){
e.printStackTrace();
}
}
public static void nestedCall(CountDownLatch latch, int i){
System.out.println("nested " + i);
latch.countDown();
}
public static ThreadPoolTaskExecutor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(3);
executor.setMaxPoolSize(6);
executor.setQueueCapacity(12);
executor.setKeepAliveSeconds(30);
executor.setRejectedExecutionHandler(rejectedExecutionHandler());
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.setAwaitTerminationSeconds(30);
executor.initialize();
return executor;
}
public static RejectedExecutionHandler rejectedExecutionHandler() {
return (r, executor) -> {
try {
executor.getQueue().put(r);
} catch (InterruptedException e) {
e.printStackTrace();
}
};
}
如果运行main方法会产生什么结果呢,如下:
task push over
outer 0
outer 1
outer 2
不会再有新的输出了,程序已经死锁。
为什么呢?
我们来看线程池里边发生的事情(数字的顺序可能是乱的,这里只是其中一种情况):
执行outerCall i=0
执行outerCall i=1
执行outerCall i=2
送入等待队列 nestedCall i=0
送入等待队列 nestedCall i=1
送入等待队列 nestedCall i=2
outerCall的执行占据了所有的活动线程(3个),等待nestedCall完成。然而nestedCall在当前线程池的等待队列中如果outerCall不退出,他永远也得不到执行的机会,死锁了。
结论
在使用java线程池的时候要特别注意最好不要有这种嵌套的任务。如果有这种嵌套使用线程池的情况的话一定要保证外层的线程不会等待内层线程完成再返回,或者为等待添加超时时间。
注意:本文归作者所有,未经作者允许,不得转载