12 多线程与高并发 - ScheduledThreadPoolExecutor 源码解析

ScheduledThreadPoolExecutor 介绍

ScheduledThreadPoolExecutor 是 ThreadPoolExecutor 的一个子类,在线程池的基础上实现了延迟执行任务以及周期性执行任务的功能。

简单使用

public static void main(String[] args) throws InterruptedException {
    ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(10);

    //1. execute
    // 和普通线程池执行一样
    executor.execute(() -> {
        System.out.println("execute");
    });

    //2. schedule
    // 指定延迟时间,一次性执行任务
    executor.schedule(() -> {
        System.out.println("schedule");
    },2000,TimeUnit.MILLISECONDS);

    //3. AtFixedRate
    // 周期性执行任务(周期时间:执行时间和延迟时间的最大值)
    executor.scheduleAtFixedRate(() -> {
        try {
            Thread.sleep(4000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("at:" + System.currentTimeMillis());
    },3000,2000,TimeUnit.MILLISECONDS);

    //4. WithFixedDelay
    // 周期性执行任务(周期时间:执行时间 + 延迟时间)
    executor.scheduleWithFixedDelay(() -> {
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("with:" + System.currentTimeMillis());
    },3000,2000,TimeUnit.MILLISECONDS);
}

核心内容

在这里插入图片描述

ScheduledFutureTask - 任务

ScheduledFutureTask 实现了 RunnableScheduledFuture 接口,间接的实现了 Delayed 接口,让任务可以放到延迟队列中,并且基于二叉堆做排序

private class ScheduledFutureTask<V>
            extends FutureTask<V> implements RunnableScheduledFuture<V> {

        /** 计数器,每个任务都有一个全局唯一的序号
        	如果任务的执行时间一模一样,比对sequenceNumber */
        private final long sequenceNumber;

        /** 任务执行的时间,单位是纳秒 */
        private long time;

        /* 
        * period == 0:表示一次性执行的任务 
        * period > 0:表示使用的是 scheduleAtFixedRate
        * period < 0:表示使用的是 scheduleWithFixedDelay 
        * */
        private final long period;

        /** 周期性执行任务时,引用具体任务,方便后面重新扔到阻塞队列 */
        RunnableScheduledFuture<V> outerTask = this;

        /**
         * Index into delay queue, to support faster cancellation.
         */
        int heapIndex;
		// 实现Delayed接口重写的方法,执行时间
        public long getDelay(TimeUnit unit) {
            return unit.convert(time - now(), NANOSECONDS);
        }
		// 实现Delayed接口重写的方法,比较方式
        public int compareTo(Delayed other) {
            if (other == this) // compare zero if same object
                return 0;
            if (other instanceof ScheduledFutureTask) {
                ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
                long diff = time - x.time;
                if (diff < 0)
                    return -1;
                else if (diff > 0)
                    return 1;
                else if (sequenceNumber < x.sequenceNumber)
                    return -1;
                else
                    return 1;
            }
            long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
            return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
        }
    }

DelayedWorkQueue - 队列

DelayedWorkQueue 包装了 RunnableScheduledFuture<?>[]

static class DelayedWorkQueue extends AbstractQueue<Runnable>
        implements BlockingQueue<Runnable> {
		// 初始长度
        private static final int INITIAL_CAPACITY = 16;
        // 数组
        private RunnableScheduledFuture<?>[] queue =
            new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
        // 锁
        private final ReentrantLock lock = new ReentrantLock();
        // 长度
        private int size = 0;
		// 等待拿堆顶数据的线程
        private Thread leader = null;
		// Condition 队列
        private final Condition available = lock.newCondition();
}

源码分析

execute()

直接调用 schedule(),入参 delay = 0

public void execute(Runnable command) {
        schedule(command, 0, NANOSECONDS);
    }

schedule()

延迟一段时间,执行一次任务

public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                           long delay,
                                           TimeUnit unit) {
        // 非空校验
        if (callable == null || unit == null)
            throw new NullPointerException();
        // 将任务封装成 ScheduledFutureTask 
        RunnableScheduledFuture<V> t = decorateTask(callable,
            new ScheduledFutureTask<V>(callable,
                                       triggerTime(delay, unit)));
        // 延迟执行任务                               
        delayedExecute(t);
        return t;
    }


	// 返回当前任务要执行的系统时间
    private long triggerTime(long delay, TimeUnit unit) {
        return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
    }
    long triggerTime(long delay) {
        return now() +
            ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
    }

delayedExecute() 延迟执行任务

private void delayedExecute(RunnableScheduledFuture<?> task) {
		// 线程池状态不是RUNNING,就拒绝任务
        if (isShutdown())
            reject(task);
        else {
			// 将任务放入延迟队列中(二叉堆)
            super.getQueue().add(task);
			// 1.线程池状态
			// 2.根据策略决定是否能执行
			// 3.将任务从延迟队列移除
            if (isShutdown() &&
                !canRunInCurrentRunState(task.isPeriodic()) &&
                remove(task))
                task.cancel(false);
            else
				// 是否需要创建线程
                ensurePrestart();
        }
    }
	
	
	
// periodic - true:代表是周期性执行的任务
// periodic - false:代表是一次性的延迟任务
boolean canRunInCurrentRunState(boolean periodic) {
	// 默认情况下,如果任务扔到了延迟队列中,有两个策略
    // 如果任务是周期性执行的,默认为false(continueExistingPeriodicTasksAfterShutdown)
    // 如果任务是一次性的延迟任务,默认为true(executeExistingDelayedTasksAfterShutdown)
	
	// 此时,周期性任务返回false,一次性任务返回true
    return isRunningOrShutdown(periodic ?
                               continueExistingPeriodicTasksAfterShutdown :
                               executeExistingDelayedTasksAfterShutdown);
    
}

// 判断当前任务到底执行不执行
final boolean isRunningOrShutdown(boolean shutdownOK) {
    // 重新拿到线程池的ctl
    int rs = runStateOf(ctl.get());
    // 如果线程池是RUNNING,返回true
    // 如果线程池状态是SHUTDOWN,那么就配合策略返回true、false
    return rs == RUNNING || (rs == SHUTDOWN && shutdownOK);
}



// 是否需要创建线程
void ensurePrestart() {
    // 获取线程池中的工作线程个数。
    int wc = workerCountOf(ctl.get());
    // 如果工作线程个数,小于核心线程数,
    if (wc < corePoolSize)
        // 创建核心线程,一致在阻塞队列的位置take,等待拿任务执行
        addWorker(null, true);
    // 如果工作线程数不小于核心线程,但是值为0,创建非核心线程执行任务
    else if (wc == 0)
        // 创建非核心线程处理阻塞队列任务,而且只要阻塞队列没有任务了,当前线程立即销毁
        addWorker(null, false);
}

scheduleAtFixedRate()

scheduleAtFixedRate 在包装 ScheduledFutureTask 时会将 period 设置为正数,代表固定周期执行

public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit) {
        if (command == null || unit == null)
            throw new NullPointerException();
        if (period <= 0)
            throw new IllegalArgumentException();
        ScheduledFutureTask<Void> sft =
            new ScheduledFutureTask<Void>(command,
                                          null,
                                          triggerTime(initialDelay, unit),
                                          unit.toNanos(period));
        RunnableScheduledFuture<Void> t = decorateTask(command, sft);
        // 将任务设置给outerTask属性,方便后期重新扔到延迟队列
        sft.outerTask = t;
        // 延迟执行任务
        delayedExecute(t);
        return t;
    }

scheduleWithFixedDelay()

scheduleWithFixedDelay 在包装 ScheduledFutureTask 时会将 period 设置为负数,代表在执行任务完毕后,再计算下次执行的时间

public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                     long initialDelay,
                                                     long delay,
                                                     TimeUnit unit) {
        if (command == null || unit == null)
            throw new NullPointerException();
        if (delay <= 0)
            throw new IllegalArgumentException();
        ScheduledFutureTask<Void> sft =
            new ScheduledFutureTask<Void>(command,
                                          null,
                                          triggerTime(initialDelay, unit),
                                          unit.toNanos(-delay));
        RunnableScheduledFuture<Void> t = decorateTask(command, sft);
        // 将任务设置给outerTask属性,方便后期重新扔到延迟队列
        sft.outerTask = t;
        // 延迟执行任务
        delayedExecute(t);
        return t;
    }

run()

执行addWorker方法,会创建一个工作线程,工作线程在创建成功后,会执行start方法。在start方法执行后,会调用Worker的run方法,最终执行了runWorker方法,在runWorker方法中会在阻塞队列的位置执行take方法一直阻塞拿Runnable任务,拿到任务后就返回,然后执行。

// 执行任务
public void run() {
    // 获取任务是否是周期执行
    // true:周期执行
    // false:一次的延迟执行
    boolean periodic = isPeriodic();
    // 再次判断线程池状态是否不是RUNNING,如果不是RUNNING,并且SHUTDOWN情况也不允许执行,或者是STOP状态
    if (!canRunInCurrentRunState(periodic))
        // 取消任务
        cancel(false);
    else if (!periodic)
        // 当前任务是一次性的延迟执行。执行任务具体的run方法
        ScheduledFutureTask.super.run();
        // 周期性任务
    else if (ScheduledFutureTask.super.runAndReset()) {
    			// 计算下次任务运行时间
                setNextRunTime();
                // 重新将任务扔到延迟队列中
                reExecutePeriodic(outerTask);
            }
}


// 计算任务下次执行时间,time是任务执行的时间,而这里是time的上次的执行时间
private void setNextRunTime() {
    // 拿到当前任务的period
    long p = period;
    // period > 0:At
    if (p > 0)
        // 直接拿上次执行的时间,添加上周期时间,来计算下次执行的时间。
        time = time + p;
    else
        // period < 0:With
        // 任务执行完,拿当前系统时间计算下次执行的时间点
        time = now() + p;
}

// 重新将任务扔到延迟队列中
void reExecutePeriodic(RunnableScheduledFuture<?> task) {
    // 线程池状态的判断
    if (canRunInCurrentRunState(true)) {
        // 将任务扔到了延迟队列中
        super.getQueue().add(task);
        // 扔到延迟队列后,再次判断线程池状态,是否需要取消任务!
        if (!canRunInCurrentRunState(true) && remove(task))
            task.cancel(false);
        else
            // 是否需要创建线程
            ensurePrestart();
    }
}

相关文章

学习编程是顺着互联网的发展潮流,是一件好事。新手如何学习...
IT行业是什么工作做什么?IT行业的工作有:产品策划类、页面...
女生学Java好就业吗?女生适合学Java编程吗?目前有不少女生...
Can’t connect to local MySQL server through socket \'/v...
oracle基本命令 一、登录操作 1.管理员登录 # 管理员登录 ...
一、背景 因为项目中需要通北京网络,所以需要连vpn,但是服...