Java并发编程:线程池的简单介绍

综合技术 2017-12-10

前言

Java从JDK1.5引入了java.uitl.concurrent包,在这个包中包含了一个Executor Framework,有关线程池的操作就位于该框架中。在没有引入线程池之前,在实际开发中为了提高系统性能会为每一个任务分配一个线程,实际上这种处理方式存在很大缺陷,尤其是在需要创建大量线程时。

  • 线程生命周期的开销非常高 。每一个线程在创建的时候都需要时间,并且需要JVM和操作系统提供一些辅助操作。
  • 资源消耗比较大 。活跃的线程会消耗系统资源,尤其是内存,如果正在运行的线程数量大于了处理器数量,那么会有大量线程闲置,不但会占用系统内存,同样也给垃圾回收带来压力,而且大量线程在竞争CPU资源时还会带来其它开销。如果已经有足够多的线程保持CPU在忙碌状态,这时候创建更多的线程反而会降低性能。这就好比在修建道路时就是为了应付同时可以并行4辆车,可是却突然由于节假日路上突然来了更多的车辆,反而造成了交通阻塞。
  • 稳定性降低 。有前面两个缺点自然会降低系统的稳定性,创建线程时开销大,在使用的时候消耗也大,再者又受平台操作系统线程数、JVM的启动参数以及Thread构造方法请求的栈的大小限制等,很容易导致系统内存溢出直接崩溃。

在一定的范围内,增加线程确实可以提高系统的吞吐量,但是如果超过了这个范围,反而会降低系统性能甚至造成整个应用崩溃。想要避免这种风险,就需要对创建的线程进行合理的控制,并且应该全面的测试整个应用,从而确保系统一直保持在较高的吞吐量而不会给系统造成负担。

Java中线程池的引入就是为了解决线程生命周期开销大以及资源不足的问题。在线程池中可以存放一定数量的线程,当有新任务创建需要线程时可以直接从线程池中取出,而不必重新创建,这样不仅可以消除创建线程所带来的时间延迟,而且可以减少频繁创建线程所带来的开销;另外可以给线程池设置一个合适的数量,防止存放过多线程消耗系统资源,也就是当请求的数目超过某个阈值时,就强制其它任何新到的请求一直等待,直到获得一个线程来处理为止,从而可以防止资源不足。

线程池相关的类

首先看一下比较常用的线程池几个相关类的继承关系:

Executor接口

public interface Executor {
 
    /**
     * Executes the given command at some time in the future.  The command
     * may execute in a new thread, in a pooled thread, or in the calling
     * thread, at the discretion of the {@code Executor} implementation.
     *
     * @param command the runnable task
     * @throws RejectedExecutionException if this task cannot be
     * accepted for execution
     * @throws NullPointerException if command is null
     */
    void execute(Runnable command);
}

该接口只有一个方法,方法的入参是一个Runnable类型的对象。该接口提供一种将任务提交与每个任务将如何运行的机制(包括线程使用的细节、调度等)分离开来的方法。通常使用Executor并不是显示地创建一个线程。通常我们使用线程执行异步任务都是这样的:

new Thread(new Runnable() {
 public void run() {
 // do something
 }
}).start();

但是使用了Executor可以像下面这样执行异步任务:

Executor executor = anExecutor;
executor.execute(new RunnableTask1());
executor.execute(new RunnableTask2());
...

但是Executor并没有严格要求执行的任务一定要是异步的,在最简单的情况下,执行程序可以在调用者的线程中立即运行已提交的任务:

class DirectExecutor implements Executor {
 public void execute(Runnable r) {
 r.run();//这里并没有启动一个新的线程
 }
}

当然了,更常见的一种方式还是执行异步任务。

class ThreadPerTaskExecutor implements Executor {
 public void execute(Runnable r) {
 new Thread(r).start();
 }
}

在JDK中还给出了这样一种实现方式的示例,如何顺序的执行Executor。Android开发的人应该都很熟悉这种用法,这正是在Android3.0以后AsyncTask中默认实现的一种顺序执行任务的线程池。

class SerialExecutor implements Executor {
 final Queue tasks = new ArrayDeque();
 final Executor executor;
 Runnable active;
 
 SerialExecutor(Executor executor) {
 this.executor = executor;
 }
 
 public synchronized void execute(final Runnable r) {
 tasks.offer(new Runnable() {
 public void run() {
 try {
 r.run();
 } finally {
 scheduleNext();
 }
 }
 });
 if (active == null) {
 scheduleNext();
 }
 }
 
 protected synchronized void scheduleNext() {
 if ((active = tasks.poll()) != null) {
 executor.execute(active);
 }
 }
}

Executor接口很简单,它只是一个任务提交功能的抽象。如果想要实现线程池的所有功能只有Executor是不可能的,接下来我们看一下Executor的实现接口ExecutorService。

ExecutorService接口

ExecutorService也是一个接口,它对Executor的功能进行了扩展,增添了一些用于管理生命周期的方法。提供了管理终止的方法,以及可为跟踪一个或多个异步任务执行状况而生成Future的方法。ExecutorService的生命周期有3中状态:运行、关闭和已经终止,下面会对这3中状态相关的方法进行简单介绍。

首先需要明确一点,线程池并不是在执行submit()或者execute()方法时才进入运行状态,跟线程的状态不是一个概念,而是在初始创建时就进入了运行状态。当ExecutorService创建完毕才可以使用submit()方法或者execute()方法,如下是ExecutorService扩展的用于提交执行任务的方法:

 Future submit(Callable task);
 Future submit(Runnable task, T result);
 Future submit(Runnable task);

在父接口Executor中execute()方法没有返回值,所以这里定义了一个可以有返回值的submit()方法,这也是在使用线程池时一个很大的区别,如果想要获取返回结果直接使用submit()方法,否则可以使用execute()方法。

ExecutorService用于关闭线程池提供了两个方法,方法如下:

v

oid shutdown();
List shutdownNow();
boolean isShutdown()

方法的命名一般尽量做到简明释义,但是shutdown()方法并不像方法名看上去那样执行,shutdown()方法是一个平缓的关闭方法: 不再接受新任务,同时等待已经提交的任务执行完成-包括那些还未执行的任务。 shutdownNow()方法则是一个暴力的关闭方法: 它会尝试取消所有运行中的任务,并且不再执行队列中尚未开始执行的任务,返回从未开始执行的任务的列表 。isShutdown()方法在调用shutdown()方法后再次调用就会返回true。

最后看一下ExecutorService提供了有关线程池终止的方法:

boolean awaitTermination(long timeout,TimeUnit unit)throws InterruptedException
boolean isTerminated()

awaitTermination()方法JDK中是这样描述的,当请求关闭、发生超时或者当前线程中断,无论哪一个首先发生之后,都将导致阻塞,直到 所有任务完成执行 。如何来判断线程池是否已经终止了呢,这就是isTerminated()的作用了,如果关闭后 所有任务都已完成 ,则返回 true。注意,除非首先调用 shutdown 或 shutdownNow,否则 isTerminated 永不为 true。

还有一些方法这里不列出了,AbstractExecutorService和ThreadPoolExecutor都涉及到线程池具体方法实现了,源码分析将在下一篇博文中再做介绍。下面继续介绍一下ThreadPoolExecutor构造方法。

ThreadPoolExecutor构造方法

public ThreadPoolExecutor(int corePoolSize,
   int maximumPoolSize,
   long keepAliveTime,
   TimeUnit unit,
   BlockingQueue workQueue,
   ThreadFactory threadFactory,
   RejectedExecutionHandler handler) {
 if (corePoolSize < 0 ||
 maximumPoolSize <= 0="" ||="" maximumpoolsize=""  
 

ThreadPoolExecutor有4个构造方法,其余3个方法都是调用的上面这个构造方法,下面我们队构造方法中参数进行介绍一下:

  • corePoolSize 核心线程数,当调用线程池的execute()或者submit()方法时,如果运行的线程数量小于corePoolSize,则创建新线程执行任务,即使其它线程是空闲的。corePoolSize是线程池一直维持的核心线程数量,即使线程池中线程处于空闲状态,这些线程也不会被关闭掉,除非设置了allowCoreThreadTimeOut。当线程池创建以后默认线程池中的线程数量是0,除非调用了prestartAllCoreThreads()或者prestartCoreThread()方法,这两个方法是预创建线程的意思,即预创建corePoolSize个线程或者1个线程。
  • maximumPoolSize 线程池所能允许创建的最大数量的线程。
  • workQueue 等待队列,当任务提交时,如果线程池中的线程数量大于等于corePoolSize小于maximumPoolSize的时候,把该任务封装成一个Worker对象放入等待队列;在Executors工具类中使用了LinkedBlockingQueue和SynchronousQueue。
  • keepAliveTime 表示线程没有任务执行时最多保持多久时间会终止。默认情况下,只有当线程池中的线程数大于corePoolSize时,keepAliveTime才会起作用,直到线程池中的线程数不大于corePoolSize,即当线程池中的线程数大于corePoolSize时,如果一个线程空闲的时间达到keepAliveTime,则会终止,直到线程池中的线程数不超过corePoolSize。但是如果调用了allowCoreThreadTimeOut(boolean)方法,在线程池中的线程数不大于corePoolSize时,keepAliveTime参数也会起作用,直到线程池中的线程数为0;
  • unit 是keepAliveTime时间单位,参数可以取TimeUnit类中有7种静态属性。
  • threadFactory 线程工程,执行创建新线程时所使用的工程ThreadFactory。
  • handler 拒绝执行器,当线程池中创建的线程数量大于maximumPoolSize并且workQueue已满时,再次提交新任务将会被拒绝,这时候具体的拒绝执行策略便是由handler担当的。

corePoolSize、maximumPoolSize以及workQueue

  • 如果线程池中的线程数量少于corePoolSize,就创建新的线程来执行新添加的任务;
  • 如果线程池中的线程数量大于等于corePoolSize,但队列workQueue未满,则将新添加的任务放到workQueue中;
  • 如果线程池中的线程数量大于等于corePoolSize,且队列workQueue已满,但线程池中的线程数量小于maximumPoolSize,则会创建新的线程来处理被添加的任务;
  • 如果线程池中的线程数量等于了maximumPoolSize,就用RejectedExecutionHandler来执行拒绝策略。

线程工厂ThreadFactory

ThreadFactory接口定义很简单,就一个方法:

public interface ThreadFactory {
 
    /**
     * Constructs a new {@code Thread}.  Implementations may also initialize
     * priority, name, daemon status, {@code ThreadGroup}, etc.
     *
     * @param r a runnable to be executed by new thread instance
     * @return constructed thread, or {@code null} if the request to
     *         create a thread is rejected
     */
    Thread newThread(Runnable r);
}

根据需要创建新线程的对象。使用线程工厂就无需再手工编写对 new Thread 的调用了,从而允许应用程序使用特殊的线程子类、属性等等。

最简单的使用线程工程的方式如下:

class SimpleThreadFactory implements ThreadFactory {
   public Thread newThread(Runnable r) {
     return new Thread(r);
   }
 }

Executors.defaultThreadFactory()方法提供了更有用的简单实现,代码就不再这里贴出了。

线程池的拒绝执行策略

RejectedExecutionHandler定义了无法由ThreadPoolExecutor执行的任务的处理程序。ThreadPoolExecutor定义了4种拒绝处理策略处理器。

  • ThreadPoolExecutor.AbortPolicy 丢弃任务并抛出RejectedExecutionException异常;
  • ThreadPoolExecutor.DiscardPolicy 也是丢弃任务,但是不抛出异常;
  • ThreadPoolExecutor.DiscardOldestPolicy 丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程);
  • ThreadPoolExecutor.CallerRunsPolicy 由调用线程处理该任务 。

当然了如果上述4中拒绝策略都不满足需求,也可以根据需要自定义RejectedExecutionHandler。

线程池工具类Executors

ThreadPoolExecutor是线程池的核心类,它构造方法有很多参数可供选择,同时也提供了许多其它方法,那么在开发的时候开发者必须进行很好的衡量已确定参数的合理化。在JDK中该类的doc建议使用另外一个工具了Executors,改了提供了几个静态方法供调用。下面列出了几个常用的方法:

public static ExecutorService newCachedThreadPool() {
 return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
   60L, TimeUnit.SECONDS,
   new SynchronousQueue());
}
public static ExecutorService newSingleThreadExecutor() {
 return new FinalizableDelegatedExecutorService
 (new ThreadPoolExecutor(1, 1,
 0L, TimeUnit.MILLISECONDS,
 new LinkedBlockingQueue()));
}
public static ExecutorService newFixedThreadPool(int nThreads) {
 return new ThreadPoolExecutor(nThreads, nThreads,
   0L, TimeUnit.MILLISECONDS,
   new LinkedBlockingQueue());
}

newCachedThreadPool()方法是将corePoolSize设置为0,将maximumPoolSize设置为Integer.MAX_VALUE,阻塞队列workQueue使用的SynchronousQueue,keepAliveTime存活时间设置为60,单位unit是秒。如果

newSingleThreadExecutor()方法corePoolSize和maximumPoolSize设置相等都为1,创建一个使用单个worker线程的Executor,以无界队列LinkedBlockingQueue方式来运行该线程。 如果因为在关闭前的执行期间出现失败而终止了此单个线程,那么如果需要,一个新线程将代替它执行后续的任务 。可保证顺序地执行各个任务,并且在任意给定的时间不会有多个线程是活动的。

newFixedThreadPool()方法中corePoolSize和maximumPoolSize需要作为入参传入,并且值相等,也是以无界队列LinkedBlockingQueue方式来维护线程。如果在所有线程处于活动状态时提交附加任务,则在有可用线程之前,附加任务将在队列中等待。如果在关闭前的执行期间由于失败而导致任何线程终止,那么一个新线程将代替它执行后续的任务(如果需要)。在某个线程被显式地关闭之前,池中的线程将一直存在。

自定义线程池

一般情况下我们直接使用Executors工具类提供的几个静态方法就可以满足需求了,但是在某些情况下可能还需要按照需求个性化配置ThreadPoolExecutor中的各个参数等,一般自定义线程池也就如下两种方式:

  1. 个性化配置ThreadPoolExecutor中的各个参数;
  2. 扩展线程池添加辅助功能。

个性化配置ThreadPoolExecutor中的各个参数

这里直接举一个AsyncTask中线程池配置的示例,自定义配置了corePoolSize、maximumPoolSize、keepAliveTime、workQueue以及threadFactory,几乎每一项都根据移动端特性进行的设置,这时候JDK中所提供的工具类很显然不能满足我们开发需要。

private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors();
private static final int CORE_POOL_SIZE = Math.max(2, Math.min(CPU_COUNT - 1, 4));
private static final int MAXIMUM_POOL_SIZE = CPU_COUNT * 2 + 1;
private static final int KEEP_ALIVE_SECONDS = 30;
 
private static final ThreadFactory sThreadFactory = new ThreadFactory() {
 private final AtomicInteger mCount = new AtomicInteger(1);
 
 public Thread newThread(Runnable r) {
 return new Thread(r, "AsyncTask #" + mCount.getAndIncrement());
 }
};
 
private static final BlockingQueue sPoolWorkQueue =
 new LinkedBlockingQueue(128);
 
/**
 * An {@link Executor} that can be used to execute tasks in parallel.
 */
public static final Executor THREAD_POOL_EXECUTOR;
 
static {
 ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
 CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE_SECONDS, TimeUnit.SECONDS,
 sPoolWorkQueue, sThreadFactory);
 threadPoolExecutor.allowCoreThreadTimeOut(true);
 THREAD_POOL_EXECUTOR = threadPoolExecutor;
}

扩展线程池添加辅助功能日志记录或者统计工作

这里举一个扩展线程池添加日志记录或者统计工作的自定义线程池,代码摘自《Java并发编程实践》。

public class TimingThreadPool extends ThreadPoolExecutor  
{  
    private final ThreadLocal startTime = new ThreadLocal();  
    private final Logger log = Logger.getAnonymousLogger();  
    private final AtomicLong numTasks = new AtomicLong();  
    private final AtomicLong totalTime = new AtomicLong();  
      
    public TimingThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,  
            BlockingQueue workQueue)  
    {  
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);  
    }  
  
    protected void beforeExecute(Thread t, Runnable r){  
        super.beforeExecute(t, r);  
        log.info(String.format("Thread %s: start %s", t,r));  
        startTime.set(System.nanoTime());  
    }  
      
    protected void afterExecute(Runnable r, Throwable t){  
        try{  
            long endTime = System.nanoTime();  
            long taskTime = endTime-startTime.get();  
            numTasks.incrementAndGet();  
            totalTime.addAndGet(taskTime);  
            log.info(String.format("Thread %s: end %s, time=%dns", t,r,taskTime));  
        }  
        finally  
        {  
            super.afterExecute(r, t);  
        }  
    }  
      
    protected void terminated()  
    {  
        try  
        {  
            log.info(String.format("Terminated: avg time=%dns",totalTime.get()/numTasks.get()));  
        }  
        finally  
        {  
            super.terminated();  
        }  
    }  
}

本篇文章先介绍到这里,后续再继续从源码层面介绍下ThreadPoolExecutor的任务提交与关闭逻辑。

责编内容by:SUNNY空间 (源链)。感谢您的支持!

您可能感兴趣的

全栈虚拟机GraalVM初体验 官方介绍 近日Oracle开源了一个实验性的产品GraalVM,官方称之为Universal GraalVM。它打通了不同语言之间的鸿沟,让我们可以进行混合式多语言编程。 在GraalVM之上,我们可以编写Java、Python、Ruby、R、Scala、Kotlin,甚至是C、C++语言。 ...
Cases and possibilities of using Java enums mutes&... I don't know if I'm the only one to know that, but the values of an enum are not implicitly final and can be modified. enum EnumTest { TOTO("TOT...
《数据结构与抽象:Java语言描述(原书第4版)》一第2章... 本节书摘来华章计算机《数据结构与抽象:Java语言描述(原书第4版)》一书中的第1章 ,第1.1节,[美]弗兰克M.卡拉诺(Frank M. Carrano) 蒂莫西M.亨利(Timothy M. Henry) 著 罗得岛大学 新英格兰理工学院 辛运帏 饶一梅 译 更多章节内容可以访问云栖社区...
Connecting to PostgreSQL in the java application e... I'm trying to connect to PostgreSQL server from java app engine. On my local machine code works perfectly, but when I deploy code to appengine, I get ...
The Java Eclipse and Command Line output differs Could there be any obvious reason why a Java program I run from the Eclipse IDE's output is different from what I get if I do the same with the comman...