超硬核!ThreadPoolExecutor线程池源码解析

本文阅读 19 分钟
首页 代码,Java 正文

 

目录

1 Executor & 概述

2 Executors中对线程池的实现

2.1 CachedThreadPool

2.2 FixedThreadPool

2.3 SingleThreadExecutor

2.3.1 newSingleThreadExecutor() 与newFixedThreadPool(1)

2.4 SingleThreadScheduledExrcutor

2.5 ScheduledThreadPool

5 ThreadPoolExecutor

6 线程池的工作流程

7 ThreadPoolExecutor 的执行方法

8 线程的拒绝策略

8.1 自定义拒绝策略

Executor是顶级接口。关于线程池的总览示意图如下图所示:

img

申请线程实例时会先从核心线程corePool中获取,如果核心线程满了之后线程会先加入到工作队列中,工作队列也满了的话也允许继续申请,直至maxnumPoolSize。之后会执行拒绝策略RejectedExecutionHandler。

ThreadFactory是worker中构建线程实例的工厂。

 

使用线程池的好处如下:

  • 可以复用线程、控制最大并发数。
  • 实现任务线程队列缓存策略和拒绝机制。
  • 实现如定时执行、周期执行等与时间相关的功能。
  • 隔离线程环境。
    - 比如,为交易服务和搜索服务分别开两个线程池,交易线程的资源消耗明显要大;因此,通过配置独立的线程池,将较慢的交易服务与搜索服务隔开,避免线程间互相影响。
Executor executor = new ExecutorSubClass(); //线程池实现类
executor.execute(new RunnableTask1());
executor.execute(new RunnableTask2());

Executors 类为 Executor 提供了工厂方法。ExecutorService 是 Executor 接口的默认实现,下面是使用 ExecutorService 创建线程的几种方式。

 

2.1 CachedThreadPool

public static void main(String[] args) {
    ExecutorService service = Executors.newCachedThreadPool();
    for(int i = 0;i < 5;i++){
        service.execute(new TestThread());
    }
    service.shutdown();
}

CachedThreadPool 会为每个任务都创建一个线程。

ExecutorService 对象是使用静态的 Executors 创建的,这个方法可以确定Executor类型。调用 shutDown 可以防止新任务提交给 ExecutorService,这个线程在 Executor 中所有任务完成后退出。

 

2.2 FixedThreadPool

FixedThreadPool 可以使用有限的线程集来启动多线程。

public static void main(String[] args) {
    ExecutorService service = Executors.newFixedThreadPool(5);
    for(int i = 0;i < 5;i++){
        service.execute(new TestThread());
    }
    service.shutdown();
}

FixedThreadPool 可以一次性的预先执行高昂的线程分配,因此也就可以限制线程的数量。因为不必为每个任务都固定的付出创建线程的时间开销,所以可以节省时间。

 

2.3 SingleThreadExecutor

SingleThreadExecutor 就是线程数量为1的 FixedThreadPool,如果向SingleThreadPool一次性提交了多个任务,那么这些任务将会排队,所有的任务都将使用相同的线程。SingleThreadPool 会序列化所有提交给他的任务,并会维护一个隐藏的挂起队列。

public static void main(String[] args) {
    ExecutorService service = Executors.newSingleThreadExecutor();
    for(int i = 0;i < 5;i++){
        service.execute(new TestThread());
    }
    service.shutdown();
}

可以用 SingleThreadExecutor 来确保任意时刻都只有唯一一个任务在运行。

 

2.3.1 newSingleThreadExecutor() 与newFixedThreadPool(1)

结合上面的介绍,自然会想到一个问题:既然已经有了newFixedThreadPool,为什么还要存在newSingleThreadExecutor这个方法?

结合jdk中的说明“Unlike the otherwise equivalent newFixedThreadPool(1) the returned executor is guaranteed not to be reconfigurable to use additional threads.”。newSingleThreadExecutor和newFixedThreadPool(1)确实是有区别的,区别在于newSingleThreadExecutor返回的线程池保证不能被重新配置(重新调整线程池大小等)

对比源码:

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
        0L, TimeUnit.MILLISECONDS,
        new LinkedBlockingQueue < Runnable > ());
}
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
        0L, TimeUnit.MILLISECONDS,
        new LinkedBlockingQueue < Runnable > ());
}

二者及其相似,连ThreadPoolExecutor对象的参数值都一样的,只不过newFixedThreadPool返回了一个ThreadPoolExecutor对象,newSingleThreadExecutor返回了一个被FinalizableDelegatedExecutorService包装过的ThreadPoolExecutor对象,问题其实就出在FinalizableDelegatedExecutorService上。

  • 容量为1的FixedThreadPool的属性(容量等)可以通过将其强转为ThreadPoolExecutor而被重新进行配置;
  • SingleThreadPool实际是一个FinalizableDelegatedExecutorService类的对象,把诸如setCorePoolSize的方法给去掉了,并且该类没有继承任何可以配置线程池的类,因此可以保证它不能被再次配置。

 

2.4 SingleThreadScheduledExrcutor

创建一个可以周期性执行任务的单线程线程池。

public class TestMain {
    //格式化
    static SimpleDateFormat sim = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    //AtomicInteger用来计数
    static AtomicInteger number = new AtomicInteger();


    public static void main(String[] args) throws Exception {
        ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
        for (int i = 0; i < 3; i++) {
            executorService.schedule(new Runnable() {
                @Override
                public void run() {
                    System.out.println("第" + number.incrementAndGet() + "周期线程运行当前时间【" + sim.format(new Date()) + "】");
                    try {
                        Thread.sleep(3000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }, 3L, TimeUnit.SECONDS);
        }
        System.out.println("主线程运行当前时间【" + sim.format(new Date()) + "】");
    }
}

img

 

2.5 ScheduledThreadPool

ScheduledThreadPoolExecutor,它可另行安排在给定的延迟后运行命令,或者定期执行命令。

ScheduledThreadPoolExecutor  scheduled = new ScheduledThreadPoolExecutor(2);
scheduled.scheduleAtFixedRate(new Runnable() {
      @Override
      public void run() {
            loge("time:");
      }
}, 0, 40, TimeUnit.MILLISECONDS);
//0表示首次执行任务的延迟时间,40表示每次执行任务的间隔时间,TimeUnit.MILLISECONDS执行的时间间隔数值单位

 

5.1 概述

上面说了那么多,其实都不是推荐的方法,阿里巴巴的编程手册中有这样的描述:

线程池不允许使用Executors去创建。

具体原因是:

  • fixedThreadPool 和 singleThreadExecutor 对于排队的队列没有数量限制,最大支持Integer.MAX_VALUE个;
  • cachedThreadPool 和 scheduledThreadPool 中最大线程数可以达到 Integer.MAX_VALUE 个;
  • 当线程过多的时候,这些方法就容易造成OOM了

img

img

当我们去看 Executors 的源码就会发现,Executors.newFixedThreadPool 、Executors.newSingleThreadPool 、Executors.newCachedThreadPool 、Executors.newScheduledThreadPool等方法的底层都是  ThreadPoolExecutor 实现的,其中执行周期任务得益于DelayedWorkedQueue的使用,而这些线程池又不是被推荐使用的,所以有必要好好研究一下 ThreadPoolExecutor 以便自定义线程池,这样才可以更加明确的线程池的运行规则,规避资源耗尽的风险。

 

ThreadPoolExecutor 的核心参数值得是他在构造时需要传递的参数,其构造参数如下:

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }
public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
  • int corePoolSize: 线程池的常驻核心线程数
    -     如果设置为0,则表示在没有任何任务时,销毁线程池; -     如果大于0,即使没有任务时也会保证线程池的线程数量等于此值。

    需要注意的是,如果此值设置的比较小,则会频繁的创建和销毁线程。如果设置的比较大,则会浪费系统资源。

 

  • int maximumPoolSize:线程池最大可以创建的线程数

   官方规定此参数必须大于0,也必须大于等于 corePoolSize ,此值只有在任务比较多,且任务队列中已被存满时才会用到。    

  • long keepAliveTime : 线程的存活时间

  当线程池空闲时并且超过了此时间,多余的线程就会销毁,直到线程池中的线程数量等于corePoolSize。   如果 maximumPoolSize 等于 corePoolSize,那么线程池在空闲的时候不会销毁任何线程。   

  • TimeUnit unit:存活时间的单位,是配合 keepAliveTime  参数共同使用的。

 

  • BlockingQueue<Runnable> workQueue : 线程池执行的任务队列

  当线程池中的所有线程都在处理任务时,如果来了新任务就会缓存到次任务队列中排队等待执行。   

  • ThreadFactory threadFactory:线程的创建工厂

    此参数一般用的较少,如果创建线程时不指定此参数,则会使用默认的现场创建工厂的方法来创建线程。

 

  • RejectedExecutionHandler handler :指定线程池的拒绝策略。

  当线程池的任务已经在缓存队列 workQueue 中存储满了之后,并且不能创建新的线程来执此任务时,就会用到此拒绝策略。

 

5.2 先从CAPACITY的初始化开始说起:

@Native public static final int SIZE = 32;
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

private static final int RUNNING    = -1 << COUNT_BITS;    // 1110 0000 0000 0000 0000 0000 0000
private static final int SHUTDOWN   =  0 << COUNT_BITS;    // 0000 0000 0000 0000 0000 0000 0000
private static final int STOP       =  1 << COUNT_BITS;    // 0010 0000 0000 0000 0000 0000 0000
private static final int TIDYING    =  2 << COUNT_BITS;    // 0100 0000 0000 0000 0000 0000 0000
private static final int TERMINATED =  3 << COUNT_BITS;    // 0110 0000 0000 0000 0000 0000 0000

 COUNT_BITS 值为29,CAPACITY的计算如下图:

img

 为什么最后要 - 1,原因是和一下两个方法有关,获得运行状态和获得当前活动线程数:

// 获取运行状态 RUNNING/SHUTDOWN/STOP/TIDYING/TERMINATED
private static int runStateOf(int c)     { 
    // - CAPACITY = 1110 0000 0000 0000 0000 0000 0000 0000
    return c & ~CAPACITY; 
}

// 取出低29位的值,表示获得当前活动的线程数
private static int workerCountOf(int c)  { 
    // CAPACITY = 0001 1111 1111 1111 1111 1111 1111 1111
    return c & CAPACITY; 
}

前三位是0,后29位是1的序列可以非常方便的取出c的前三位和后29位,即,runState和workerCount,是存储在一个叫ctl的变量中的。

 

5.3 线程池运行状态和活动线程数

img

RUNNING状态可以转换为SHUTDOWN或STOP状态,具体如下:

img

SHUTDOWN和STOP相当于一个中间状态,最终所有任务都停止了时会进入TIDYING状态。

 

5.4 上述线程池的构造函数

img

5.5 execute

执行流程如下图:

img

简化版的流程图:

img

对应的带注释的源代码如下:

public void execute(Runnable command) {
            if (command == null)
                throw new NullPointerException();

            /** ctl记录着workCount和runState */
            int c = ctl.get();

            /** case1:如果线程池中的线程数量小于核心线程数,那么创建线程并执行 */
            if (workerCountOf(c) < corePoolSize) {  // workerCountof(c):获取当前活动线程数
                /**
                 * 在线程池中新建一个新的线程
                 * command:需要执行的Runnable线程
                 * true:新增线程时,【当前活动的线程数】是否 < corePoolSize
                 * false:新增线程时,【当前活动的线程数】是否 < maximumPoolSize
                 */
                if (addWorker(command, true))
                    return;
                // 添加新线程失败,则重新获取【当前活动的线程数】
                c = ctl.get();
            }

            /** 第二步:如果当前线程池是运行状态 且 任务添加到队列成功(即,case2:如果workCount >= corePoolSize) */
            if (isRunning(c) && workQueue.offer(command)) { // 添加command到workQueue队列中
                // 重新获取ctl
                int recheck = ctl.get();
                // 再次check一下,当前线程池是否是运行状态,如果不是运行时状态,则把刚刚放入到workQueue队列中的command移除掉
                if (! isRunning(recheck) && remove(command))
                    reject(command);
                else if (workerCountOf(recheck) == 0){  // 如果【当前活动的线程数】为0,则执行addWorker方法
                    /**
                     * null:只创建线程,但不去启动
                     * false:添加线程时,根据maximumPoolSize来判断
                     *
                     * 如果workerCountOf(recheck) > 0,则直接返回,在队列中的command稍后会出队列并且执行
                     */
                    addWorker(null, false);
                }
                /**
                 * 第三步:满足以下两种条件之一,进入第三步判断语句
                 *  case1:线程池不是正在运行状态,即:isRunning(c)==false
                 *  case2:workCount>=corePoolSize 并且添加workQueue队列失败。即:workQueue.offer(command)== false
                 *  由于第二个参数传的是false,所以如果workCount < maximumPoolSize,则创建执行线程;否则进入方法体执行reject(command)
                 *  如果是true的话则和核心线程数进行比较
                 */
            }
            else if (!addWorker(command, false))
                reject(command);    // 执行线程创建失败的拒绝策略
        }

5.5.1 addWorker

execute中经常使用到的一个重要方法就是addWorker(),效果是在线程池中添加一个新的线程。

img

private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        // 步骤1:试图将workerCount + 1
        for (;;) {
            int c = ctl.get();
            // 获得运行状态runState
            int rs = runStateOf(c);

            /**
             * 只有如下两种情况可以新增worker,继续执行下去:
             * case one:rs==RUNNING
             * case two:rs==SHUTDOWN && firstTask ==null&&!workQueue.isEmpty()
             */
            if (rs >= SHUTDOWN &&   // 非RUNNING状态。线程池异常,表示不再去接收新的线程任务了,返回false
                    /**
                     * 当线程池是SHUTDOWN状态时,表示不再接收新的任务了,所以:
                     * case1:如果firstTask!=nul1,表示要添加新任务,则:新增worker失败,返回false。
                     * case2:如果firstTask==null并且workQueue为空,表示队列中的任务已经处理完毕,不需要添加新任务了。则:新增worker失败,返回false
                     */
                    ! (rs == SHUTDOWN &&
                            firstTask == null &&
                            ! workQueue.isEmpty()))
                return false;

            /**
             * 试图将workerCount + 1
             */
            for (;;) {
                // 获取当前线程池里的线程数
                int wc = workerCountOf(c);
                /**
                 * 满足如下任意情况,则新增worker失败,返回false
                 * case1:大于等于最大线程容量,即:536870911
                 * case2:当core是true时;>=  核心线程数
                 *        当core是false时:>= 最大线程数
                 */
                if (wc >= CAPACITY ||
                        wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                // 当前工作线程数加1
                if (compareAndIncrementWorkerCount(c))
                    break retry;    // 成功加1,则跳出retry标识的这两层for循环
                c = ctl.get();  // Re-read ctl
                // 如果线程数加1操作失败,则获取当前最新的线程池运行状态,来判断与rs是否相同,如果不同,则说明方法处理期间线程池运行状态发生了变化,里新获取最新runState
                if (runStateOf(c) != rs)
                    continue retry; // 跳出内层for循环,继续从第一个for执行
            }
        }

            /**
             * 步骤二:创建Worker,加入集合workers中,并启动Worker线程
             */
        boolean workerStarted = false;  // 用于判断新的worker实例是否已经开始执行Thread.start
        boolean workerAdded = false;    // 用于判断新的worker实例是否已经被添加到线程池
        Worker w = null;    // AQS.Worker
        try {
            // 创建Worker实例,每个Worker对象都会创建一个线程
            w = new Worker(firstTask);
            // 获取包含work的线程
            final Thread t = w.thread;
            if (t != null) {
                // 重入锁
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();    // 如果能获得全局mainlock锁,则执行,否则阻塞
                try {
                    // 获得线程池当前的运行状态runStatus
                    int rs = runStateOf(ctl.get());

                    /**
                     * 满足如下任意条件,即可向线程池中添加线程:
                     * case1:线程池状态为RUNNING。
                     * case2:线程池状态为SHUTDOWN并且firstTask为空。
                     */
                    if (rs < SHUTDOWN ||    // 只有rs=RUNNING才满足
                            (rs == SHUTDOWN && firstTask == null)) {    //  线程池关闭,传入线程任务为null
                        if (t.isAlive()) // 因为t是新构建的线程,还没有启动,所以如果是alive状态,说明已经被启动
                            throw new IllegalThreadStateException();
                        workers.add(w); // workers中保存线程池中存在的所有work实例集合
                        int s = workers.size();
                        if (s > largestPoolSize)    // largestPoolSize用于记录线程池中曾经存在的最大的线程数量
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();  // 开启线程
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)    // 如果没有开启线程
                addWorkerFailed(w);
        }
        return workerStarted;
    }

addWorker()中会取出当前队列中的第一个线程并调用start()方法开启img

其中线程 t 由以下代码获取

img

观察Worker的构造方法,使用 getThreadFactory工厂创建一个线程:

Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

线程中的run方法调用runWorker方法,对应上图绿色部分

/** Delegates main run loop to outer runWorker  */
        public void run() {
            runWorker(this);
        }

上图中标记为绿色的的runWorker和tryTerminate方法要着重分析一下。

 

5.5.2 tryTerminate*

img

 

 

5.5.3 runWorker*

执行流程如下图所示:

img

final void runWorker(Worker w) {
            Thread wt = Thread.currentThread();
            Runnable task = w.firstTask;
            w.firstTask = null;
            w.unlock(); // allow interrupts
            boolean completedAbruptly = true;
            try {
                while (task != null || (task = getTask()) != null) {
                    w.lock();
                    /**
                     * 同时满足如下两个条件,则执行wt.interrupt()
                     * 1> 线程状态为STOP、TIDYING、TERMINATED或者(当前线程被中断(清除中断标记)并且线程状态为STOP、TIDYING、TERMINATED)
                     * 2> 当前线程wt是否被标记中断
                     */
                    if ((runStateAtLeast(ctl.get(), STOP) ||
                            (Thread.interrupted() &&
                                    runStateAtLeast(ctl.get(), STOP))) &&
                            !wt.isInterrupted())
                        wt.interrupt();
                    try {
                        beforeExecute(wt, task);
                        Throwable thrown = null;
                        try {
                            task.run();
                        } catch (RuntimeException x) {
                            thrown = x; throw x;
                        } catch (Error x) {
                            thrown = x; throw x;
                        } catch (Throwable x) {
                            thrown = x; throw new Error(x);
                        } finally {
                            afterExecute(task, thrown);
                        }
                    } finally {
                        task = null;
                        w.completedTasks++;
                        w.unlock();
                    }
                }
                completedAbruptly = false;
            } finally {
                processWorkerExit(w, completedAbruptly);
            }
        }

runWorker方法的核心是调用了getTask方法,即上图中绿色框部分。

 

5.5.4 getTask*

img

private Runnable getTask() {
            // 表示上次从阻塞队列中获取任务是否超时
            boolean timedOut = false;

            for (;;) {
                int c = ctl.get();
                int rs = runStateOf(c);

                /**
                 * 同时满足如下两点,则线程池中工作线程数减1,并返回null
                 * 1> rs >= SHUTDOWN,表示线程池不是RUNNING状态
                 * 2> rs >= STOP 表示STOP、TIDYING和TERMINATED这三个状态,它们共同点就是【不接收新任务】也【不处理workQueue里的线程任务】 or 阻塞队列workQueue为空
                 */
                if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                    decrementWorkerCount(); // 线程池中工作线程数 - 1
                    return null;
                }

                int wc = workerCountOf(c);

                // timed用于判断是否需要进行超时控制,当allowCoreThreadTimeOut被设置为ture或者活跃线程数大于核心线程数,则需要进行超时控制
                // allowCoreThreadTimeOut默认为false,则表明核心线程不允许超时
                boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

                /**
                 * 同时满足以下两种情况,则线程池中工作线程数减1并返回nul1:
                 * case1:当前活动线程数workCount大于最大线程数,或者需要超时控制(timed = true)并且上次从阻塞队列中获取任务
                 * case2:如果有效线程数大于1,或者阻塞队列为空。
                 */
                if ((wc > maximumPoolSize   // 因为在执行该方法的同时被执行了setMaximumPoolSize,导致最大线程数被缩小
                        || (timed && timedOut))
                        && (wc > 1 || workQueue.isEmpty())) {
                    if (compareAndDecrementWorkerCount(c))  // 线程池中工作线程数 - 1
                        return null;
                    // 如果 - 1失败,则循环重试
                    continue;
                }

                try {
                    // 如果需要超时控制,则通过阻塞队列的pol1方法进行超时控制,
                    // 否则,直接获取,如果队列为空,task方法会阻塞直到队列不为空
                    Runnable r = timed ?
                            workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :   // pol1-->若队列为空,返回null
                            workQueue.take();   // take-->若队列为空,发生阻塞,等待元素
                    if (r != null)
                        return r;
                    // 如果r=nul1,表示超时了,则timeOut设置为true,标记为上一次超时状态
                    timedOut = true;
                } catch (InterruptedException retry) {
                    timedOut = false;
                }
            }
        }

 

 

执行方法有两个,分别是 execute() 和 submit(),最主要的区别就是 submit() 方法可以接受线程池执行的返回值,而 execute() 不能接收返回值。

示例代码:

public static void main(String[] args) throws Exception{
        ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 10, 10L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(20));
        // execute 使用例子
        executor.execute(new Runnable() {
            @Override
            public void run() {
                System.out.println("我是一个 execute");
            }
        });
        // submit 使用例子
        Future<String> future = executor.submit(new Callable<String>() {
            @Override
            public String call() throws Exception {
                return "我是一个 submit";
            }
        });
        System.out.println(future.get());
    }

另一个区别是  execute() 方法 属于 顶级接口 Executor 的方法 ,而  submit() 属于 子类接口 ExecutorService 的方法。

 

当线程池中的任务队列已满,再有任务来添加时会先判断当前线程池中的线程数是否大于等于线程池的最大值,如果是,则会触发线程池的拒绝策略。

 

自带的拒绝策略有4种:

  • AbortPolicy :终止策略,线程池抛出一个异常并终止执行,是默认策略
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            throw new RejectedExecutionException("Task " + r.toString() +
                                                 " rejected from " +
                                                 e.toString());
        }
  • CallerRunsPolicy :把任务交给当前线程执行
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                r.run();
            }
        }
  • DiscardPolicy : 丢弃新进来的任务
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        }
  • DiscardOldestPolicy :丢弃最早的任务(最先加入队列中的任务)
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                e.getQueue().poll();
                e.execute(r);
            }
        }

8.1 自定义拒绝策略

自定义拒绝策略只需要新建一个 RejectedExecutionHandler 对象,然后重写其 rejectedExecution 方法即可。

ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                1,
                3,
                10L,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(2)
                , new RejectedExecutionHandler() {
            @Override
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                System.out.println("我是自定义的拒绝策略");
            }
        });

        for (int i = 0; i < 6; i++) {
            threadPoolExecutor.execute(new Runnable() {
                @Override
                public void run() {
                    System.out.println(Thread.currentThread().getName());
                }
            });
        }

1、如果线程池的当前大小还没有达到基本大小(poolSize < corePoolSize),那么就新增加一个线程处理新提交的任务;

2、如果当前大小已经达到了基本大小,就将新提交的任务提交到阻塞队列排队,等候处理workQueue.offer(command);

3、如果队列容量已达上限,并且当前大小poolSize没有达到maximumPoolSize,那么就新增线程来处理任务;

4、如果队列已满,并且当前线程数目也已经达到上限,那么意味着线程池的处理能力已经达到了极限,此时需要拒绝新增加的任务。至于如何拒绝处理新增的任务,取决于线程池的饱和策略RejectedExecutionHandler。

 

 

corePoolSize=10, maximumPoolSize=10,queueSize = 10

 

20个并发任务过来,有多少个活跃线程?

10个。corePoolSize  =  maximumPoolSize 定长线程池,corePoolSize先打满,queueSize也满

 

队列里面有几个线程?

10个。corePoolSize先打满,queueSize也满。

 

 

如果有21个并发队列过来呢?

corePoolSize先打满,queueSize也满还多了一个,这个时候如果是丢弃策略就丢弃。

 

 

corePoolSize=10, maximumPoolSize=20,queueSize = 10?

20个并发任务过来,有多少个活跃线程?

10个。corePoolSize打满,queueSize 也满

 

21个并发任务过来,有多少个活跃线程?

11个。corePoolSize打满,queueSize 也满还多一个,maximumPoolSize = 20,所以corePoolSize + 1此时活跃的为11个。

 

30个并发任务过来,有多少个活跃线程?

20个。corePoolSize打满,queueSize 也满,corePoolSize扩充至20,此时有20个活跃任务。

 

31个并发任务过来,有多少个活跃线程?

20个。corePoolSize打满,queueSize 也满,corePoolSize扩充至20还多一个,如果是丢弃策略,此时有20个活跃任务。

 

本文为互联网自动采集或经作者授权后发布,本文观点不代表立场,若侵权下架请联系我们删帖处理!文章出自:https://wangjiawei.blog.csdn.net/article/details/108685273
-- 展开阅读全文 --
安全面试之XSS(跨站脚本攻击)
« 上一篇 07-24

发表评论

成为第一个评论的人

热门文章

标签TAG

最近回复