为什么要使用线程池
- 重复利用已创建的线程降低线程创建和销毁造成的消耗
- 提高线程的可管理性,可进行统一的分配,调优和监控
线程池的处理流程
ExecutorService
newCachedThreadPool();
newFixedThreadPool();
newScheduledThreadPool();
newSingleThreadExecutor();
ScheduledThreadPool使用优先级队列进行排序(距离下次调度间隔短的任务排在前面)
DelayedWorkQueue
package lu.tool.demo;
import java.util.concurrent.*;
/**
* Created by xiaozi on 14-8-28.
*/
public class CachedThreadPoolDemo {
public static void main(String[] args) {
ExecutorService es = Executors.newCachedThreadPool();
ThreadPoolExecutor tpe = (ThreadPoolExecutor) es;
// tpe.setMaximumPoolSize(100);
Future<?> future = null;
// for (int i = 1; i < 8000; i++) {
for (int i = 1; i < 100; i++) {
future = tpe.submit(new TaskDemo());
}
System.out.println("largest pool size: " + tpe.getLargestPoolSize());
System.out.println("task count: " + tpe.getTaskCount());
if (future != null) {
try {
future.get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
// tpe.shutdown();
}
}
package lu.tool.demo;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
/**
* Created by xiaozi on 14-8-28.
*/
public class FixedThreadPoolDemo {
public static void main(String[] args) {
ExecutorService es = Executors.newFixedThreadPool(100);
ThreadPoolExecutor tpe = (ThreadPoolExecutor) es;
for (int i = 1; i < 8000; i++) {
tpe.submit(new TaskDemo());
}
tpe.shutdown();
}
}
package lu.tool.demo;
import java.util.concurrent.*;
/**
* Created by xiaozi on 14-8-28.
*/
public class ScheduledThreadPoolDemo {
public static void main(String[] args) {
ScheduledExecutorService es = Executors.newScheduledThreadPool(100);
for (int i = 1; i < 8000; i++) {
es.schedule(new TaskDemo(), 3, TimeUnit.SECONDS);
}
// scheduleAtFixedRate
// scheduleWithFixedDelay
es.shutdown();
}
}
package lu.tool.demo;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* Created by xiaozi on 14-8-28.
*/
public class SingleThreadExecutorDemo {
public static void main(String[] args) {
ExecutorService es = Executors.newSingleThreadExecutor();
for (int i = 1; i < 8000; i++) {
es.submit(new TaskDemo());
}
es.shutdown();
}
}
ThreadPoolExecutor
ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler);
参数 | 解释 |
---|
corePoolSize | 线程池维护线程的最少数量 |
maximumPoolSize | 线程池维护线程的最大数量 |
keepAliveTime | 线程池维护线程所允许的空闲时间 |
unit | 线程池维护线程所允许的空闲时间的单位 |
workQueue | 线程池所使用的缓冲队列 |
handler | 线程池对拒绝任务的处理策略 |
package lu.tool.demo;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* Created by xiaozi on 14-8-29.
*/
public class ThreadPoolExecutorDemo {
public static void main(String[] args) {
ThreadPoolExecutor tpe = new ThreadPoolExecutor(3, 5, 60,
TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(10),
new ThreadPoolExecutor.CallerRunsPolicy());
for (int i = 1; i < 8000; i++) {
tpe.submit(new TaskDemo());
}
tpe.shutdown();
}
}
阻塞队列
ArrayBlockingQueu // 有界
LinkedBlockingQueue // 无界
SynchronousQueue
PriorityBlockingQueue
线程池的监控
beforeExecute(Thread t, Runnable r)
afterExecute(Runnable r, Throwable t)
executor.getTaskCount();
executor.getCompletedTaskCount();
executor.getLargestPoolSize();
executor.isShutdown();
executor.isTerminated();
多少线程合适
- CPU密集型任务(ncpu + 1)
- I/O密集型任务(2 * ncpu)
- 任务的执行时间
- 任务的依赖性
阻塞队列的实现
锁
线程池 | 队列 | 锁 | |
---|
ScheduledThreadPool | DelayedWorkQueue | ReentrantLock.newCondition | available |
FixedThreadPool | LinkedBlockingQueue | ReentrantLock.newCondition | put, take |
CachedThreadPool | SynchronousQueue | ReentrantLock | |
SingleThread | LinkedBlockingQueue | ReentrantLock.newCondition | put, take |
引用
- 聊聊并发(三)——JAVA线程池的分析和使用
- 线程池
- Java四种线程池的使用
- 多线程之线程池探索
- 深入浅出多线程(4)对CachedThreadPool OutOfMemoryError问题的一些想法
- ScheduledThreadPoolExecutor实现原理
- JAVA线程池ThreadPoolExecutor
- JAVA线程池学习以及队列拒绝策略