java中的线程池

为什么要使用线程池

  1. 重复利用已创建的线程降低线程创建和销毁造成的消耗
  2. 提高线程的可管理性,可进行统一的分配,调优和监控

线程池的处理流程

threadpool.jpg

ExecutorService

newCachedThreadPool();
newFixedThreadPool();
newScheduledThreadPool();
newSingleThreadExecutor();

ScheduledThreadPool使用优先级队列进行排序(距离下次调度间隔短的任务排在前面)
DelayedWorkQueue


package lu.tool.demo;

import java.util.concurrent.*;

/**
 * Created by xiaozi on 14-8-28.
 */
public class CachedThreadPoolDemo {

    public static void main(String[] args) {
        ExecutorService es = Executors.newCachedThreadPool();
        ThreadPoolExecutor tpe = (ThreadPoolExecutor) es;

//        tpe.setMaximumPoolSize(100);

        Future<?> future = null;
//        for (int i = 1; i < 8000; i++) {
        for (int i = 1; i < 100; i++) {
            future = tpe.submit(new TaskDemo());
        }

        System.out.println("largest pool size: " + tpe.getLargestPoolSize());
        System.out.println("task count: " + tpe.getTaskCount());

        if (future != null) {
            try {
                future.get();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }

//        tpe.shutdown();
    }
}
package lu.tool.demo;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

/**
 * Created by xiaozi on 14-8-28.
 */
public class FixedThreadPoolDemo {

    public static void main(String[] args) {
        ExecutorService es = Executors.newFixedThreadPool(100);
        ThreadPoolExecutor tpe = (ThreadPoolExecutor) es;

        for (int i = 1; i < 8000; i++) {
            tpe.submit(new TaskDemo());
        }

        tpe.shutdown();
    }
}
package lu.tool.demo;

import java.util.concurrent.*;

/**
 * Created by xiaozi on 14-8-28.
 */
public class ScheduledThreadPoolDemo {

    public static void main(String[] args) {
        ScheduledExecutorService es = Executors.newScheduledThreadPool(100);

        for (int i = 1; i < 8000; i++) {
            es.schedule(new TaskDemo(), 3, TimeUnit.SECONDS);
        }

        // scheduleAtFixedRate
        // scheduleWithFixedDelay

        es.shutdown();
    }
}
package lu.tool.demo;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Created by xiaozi on 14-8-28.
 */
public class SingleThreadExecutorDemo {

    public static void main(String[] args) {
        ExecutorService es = Executors.newSingleThreadExecutor();

        for (int i = 1; i < 8000; i++) {
            es.submit(new TaskDemo());
        }

        es.shutdown();
    }
}

ThreadPoolExecutor

ThreadPoolExecutor(int corePoolSize,
                    int maximumPoolSize,
                    long keepAliveTime,
                    TimeUnit unit,
                    BlockingQueue<Runnable> workQueue,
                    RejectedExecutionHandler handler);
参数解释
corePoolSize线程池维护线程的最少数量
maximumPoolSize线程池维护线程的最大数量
keepAliveTime线程池维护线程所允许的空闲时间
unit线程池维护线程所允许的空闲时间的单位
workQueue线程池所使用的缓冲队列
handler线程池对拒绝任务的处理策略
package lu.tool.demo;


import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * Created by xiaozi on 14-8-29.
 */
public class ThreadPoolExecutorDemo {

    public static void main(String[] args) {
        ThreadPoolExecutor tpe = new ThreadPoolExecutor(3, 5, 60,
                                                        TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(10),
                                                        new ThreadPoolExecutor.CallerRunsPolicy());

        for (int i = 1; i < 8000; i++) {
            tpe.submit(new TaskDemo());
        }

        tpe.shutdown();
    }
}

阻塞队列

ArrayBlockingQueu   // 有界
LinkedBlockingQueue // 无界
SynchronousQueue
PriorityBlockingQueue

线程池的监控

beforeExecute(Thread t, Runnable r)
afterExecute(Runnable r, Throwable t)

executor.getTaskCount();
executor.getCompletedTaskCount();

executor.getLargestPoolSize();

executor.isShutdown();  
executor.isTerminated();

多少线程合适

  1. CPU密集型任务(ncpu + 1)
  2. I/O密集型任务(2 * ncpu)
  3. 任务的执行时间
  4. 任务的依赖性

阻塞队列的实现

线程池队列
ScheduledThreadPoolDelayedWorkQueueReentrantLock.newConditionavailable
FixedThreadPoolLinkedBlockingQueueReentrantLock.newConditionput, take
CachedThreadPoolSynchronousQueueReentrantLock
SingleThreadLinkedBlockingQueueReentrantLock.newConditionput, take

引用

  1. 聊聊并发(三)——JAVA线程池的分析和使用
  2. 线程池
  3. Java四种线程池的使用
  4. 多线程之线程池探索
  5. 深入浅出多线程(4)对CachedThreadPool OutOfMemoryError问题的一些想法
  6. ScheduledThreadPoolExecutor实现原理
  7. JAVA线程池ThreadPoolExecutor
  8. JAVA线程池学习以及队列拒绝策略

标签: none

添加新评论