以上是 JDK 提供的一些拒绝策略,这四个用的比较多的是第一种 AbortPolicy,也是默认拒绝策略。
而我们在业务开发过程中,往往会自定义线程池拒绝策略进行处理。
线程池拒绝策略接口
public interface RejectedExecutionHandler {void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}
/** 线程池核心属性之一 ctl* 高3位表示当前线程池运行状态,低29位表示当前线程池中所拥有的线程数量* 是一个原子类 AtomicInteger*/private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));/** 表示在ctl中,低COUNT_BITS位 是用于存放当前线程数量的位* Integer.SIZE = 32 => 32 - 3 = 29 表示低29位用来存放当前线程数量的位*/private static final int COUNT_BITS = Integer.SIZE - 3;/** 表示低29位能表示的最大的线程数 就是 1 << 29 - 1 (大概是5亿多)* CAPACITY = 000 11111111111111111111111111111*/private static final int CAPACITY = (1 << COUNT_BITS) - 1;/** 下面的表示线程池的5种状态* 状态从上到下依次递增*/// 111 00000000000000000000000000000(二进制) 转换成10进制是一个负数private static final int RUNNING = -1 << COUNT_BITS;// 000 00000000000000000000000000000 private static final int SHUTDOWN = 0 << COUNT_BITS;// 001 00000000000000000000000000000 private static final int STOP = 1 << COUNT_BITS;// 010 00000000000000000000000000000 private static final int TIDYING = 2 << COUNT_BITS;// 011 00000000000000000000000000000 private static final int TERMINATED = 3 << COUNT_BITS;/** 获取当前线程池的运行状态* CAPACITY = 000 11111111111111111111111111111 取反后 => ~CAPACITY = 111 00000000000000000000000000000* 因为要进行一个&运算,而~CAPACITY的值是固定的,根据这个值并且我们知道ctl的高三位* 表示线程池的运行状态,所以进行&运算后就能获取到ctl的高三位的状态,即线程池的状态* c = ctl = 111 00000000000000000000000000111(表示当前线程池RUNNING状态 并且有7个线程)* &* 111 00000000000000000000000000000* =* 111 00000000000000000000000000000(最终只会保留高三位 即线程池的状态)*/private static int runStateOf(int c) { return c & ~CAPACITY; }/** 获取当前线程池的线程数量* CAPACITY = 000 11111111111111111111111111111* 这个值跟ctl进行&运算,取出ctl的低29位的值,即表示获取线程池中的线程数量*/private static int workerCountOf(int c) { return c & CAPACITY; }/* * 用在重置当前线程池ctl值时会用到 * rs 表示线程池状态, wc表示当前线程池中worker(线程)数量* |表示的就是不进位加法 表示的就是通过rs 和 wc重新构建一个ctl* 111 000000000000000000* 000 000000000000000111* 111 000000000000000111*/private static int ctlOf(int rs, int wc) { return rs | wc; }/** 表示当前线程池ctl所表示的状态是否小于某个状态s* RUNNING < SHUTDOWN < STOP < TIDYING < TERMINATED*/private static boolean runStateLessThan(int c, int s) {return c < s;}/** 表示当前线程池ctl所表示的状态是否大于等于某个状态s*/private static boolean runStateAtLeast(int c, int s) {return c >= s;}/** 判断线程池是否处于RUNNING状态* 小于SHUTDOWN的状态一定是RUNNING状态 */private static boolean isRunning(int c) {return c < SHUTDOWN;}
/* * 使用CAS的方式让 ctl值+1,成功返回true 失败返回false* 即尝试添加一个线程(Worker实际上就是工作者线程)*/private boolean compareAndIncrementWorkerCount(int expect) {return ctl.compareAndSet(expect, expect + 1);}/** 使用CAS的方式让 ctl值-1,成功返回true 失败返回false* 即尝试干掉一个线程(Worker实际上就是工作者线程)*/private boolean compareAndDecrementWorkerCount(int expect) {return ctl.compareAndSet(expect, expect - 1);}/** 将ctl的值-1, 这个方法一定成功,使用的是 自旋 + CAS 的方式保证*/private void decrementWorkerCount() {do {} while (!compareAndDecrementWorkerCount(ctl.get()));}
/** 线程池全局锁* 增加worker(线程) 减少worker 时需要持有mainLock,修改线程池运行状态时也需要*/private final ReentrantLock mainLock = new ReentrantLock();// 线程池中真正存到 worker(Thread)的地方 工作者集合private final HashSet workers = new HashSet();/** 条件队列* 当外部线程调用awaitTermination()方法时,外部线程会阻塞等待当前线程池状态为Termination为止* 底层类似AQS的原理,等待就是将当前线程封装成一个Node,然后进入Condition的等待队列中(线程被park)。当线程池状态变为termination时,* 会通过调用termination.signalAll()将这些线程全部唤醒,进入到阻塞队列中(AQS),继续去争抢锁(每次只有头节点可以获得锁)* 抢占到的线程,会继续执行awaitTermination() 后面程序。这些线程最后,都会正常执行。* 简单理解:termination.await() 会将线程阻塞在这* termination.signalAll() 会将阻塞在这的线程依次唤醒*/private final Condition termination = mainLock.newCondition();// 记录线程池生命周期内,线程数的最大值private int largestPoolSize;// 记录线程池所完成的任务总数,当一个worker退出时,会将worker完成的任务累加到这个属性中private long completedTaskCount;/* * 线程池7大核心参数之一:任务队列:BlockingQueue(阻塞队列)是一个接口* 当线程池中的正在工作的线程达到核心线程数时,这时再提交的任务会直接放到workQueue中* 常用的实现类有基于数组的阻塞队列 ArrayBlockingQueue* 基于链表的阻塞队列 LinkedBlockingQueue */private final BlockingQueue workQueue;/** 线程池的7大参数之一,线程的创建工厂,创建线程时会使用,是一个接口* 当我们使用 Executors.newFix... newCache... 创建线程池时,使用的是 DefaultThreadFactory* 一般不推荐使用默认的实现类DefaultThreadFactory,推荐自己实现ThreadFactory*/private volatile ThreadFactory threadFactory;/** 线程池7大核心参数之一,拒绝策略,是一个接口,有四种实现,默认是直接丢弃并抛出异常* DiscardOldestPolicy ---> 丢弃队列中最老(最先入队)的任务* AbortPolicy ---> 直接丢弃新来的任务 抛出异常 (默认的)* CallerRunsPolicy ---> 直接调用run方法,相当于同步方法* DiscardPolicy ---> 直接丢弃新来的任务 不抛出异常*/private volatile RejectedExecutionHandler handler;/** 线程池7大核心参数之一:空闲线程存活时间* 当allowCoreThreadTimeOut为false时,只有当非核心线程空闲时间达到指定时间时才会被回收* 当allowCoreThreadTimeOut为true时,线程池内所有的线程到达指定的时间均会被回收* 此参数常常和 TimeUnit一起使用,指定超时时间的单位(也是线程池的7大核心参数之一) */private volatile long keepAliveTime;// 控制线程池内核心线程空闲时间达到指定时间时能否被回收 true 可以 false不可以private volatile boolean allowCoreThreadTimeOut;/** 线程池7大核心参数之一:核心线程数*/private volatile int corePoolSize;/** 线程池7大核心参数之一:最大线程数*/private volatile int maximumPoolSize;// 缺省拒绝策略,采用的是AbortPolicy 抛出异常的方式private static final RejectedExecutionHandler defaultHandler =new AbortPolicy();
/** Worker采用了AQS的 独占 模式* 独占模式:两个重要属性 state 和 ExclusiveOwnerThread* state:0时表示表示未被占用,> 0时表示被占用,< 0时表示初始状态,这种情况下不能被抢锁,只有等于0时才能尝试去抢锁* ExclusiveOwnerThread表示独占(抢到)锁的线程*/ private final class Workerextends AbstractQueuedSynchronizer // 是AQS的子类implements Runnable // 实现了Runnable接口{private static final long serialVersionUID = 6138294804551838833L;// worker内部封装的工作线程final Thread thread;// 假设firstTask不为空,那么当worker启动后(内部的线程启动)会优先执行firstTask,当执行完firstTask后,会到队列中去获取下一个任务 Runnable firstTask;// 记录当前worker所完成的任务数量volatile long completedTasks;/** 构造器 传来的Runnable任务可以为null,firstTask为null的线程启动后会去队列中获取任务*/Worker(Runnable firstTask) {// 设置AQS独占模式为初始化中状态,这个时候不能被抢占锁setState(-1); // 为内部的firstTask赋值this.firstTask = firstTask; /** 使用线程工厂创建了一个线程,并且将当前worker指定为Runnable,也就是说当thread启动的时候会议worker.run为入口*/this.thread = getThreadFactory().newThread(this);}/** 当worker启动时,会执行run()方法 当前的这个Worker就是一个任务(Runnable)* 底层调用runWorker()直接将this传入了*/public void run() {// 直接将当前对象传入进行执行// ThreadPoolExecutor->runWorker() 这个是核心方法,等后面分析worker启动后逻辑时会以这里切入runWorker(this);}/** 判断当前worker的独占锁是否被占用* state为0 表示为被占用* state为1 表示被占用*/protected boolean isHeldExclusively() {return getState() != 0;}// 尝试去占用worker的独占锁 // 返回值 表示是否抢占成功protected boolean tryAcquire(int unused) {// CAS的方式,将state设置为1,尝试抢占锁if (compareAndSetState(0, 1)) {// CAS成功,成功抢到锁,则将exclusiveOwnerThread设置为当前线程setExclusiveOwnerThread(Thread.currentThread());return true;}return false;}/* * 尝试释放锁 * 外部不会直接调用这个方法,这个方法是 AQS内调用的* 外部调用unlock时,unlock -> AQS.release -> tryRelease (模板方法模式)*/protected boolean tryRelease(int unused) {setExclusiveOwnerThread(null);setState(0);return true;}// 加锁,加锁失败时会阻塞当前线程(类似ReentarntLock),直到获取到锁public void lock() { acquire(1); }/** 尝试去加锁,如果锁是未被持有状态,那么加锁成功后会返回true* 否则 不会阻塞当前线程 直接返回false*/ public boolean tryLock() { return tryAcquire(1); }/** 释放锁* 一般情况下,咱们调用unlock 要保证 当前线程是持有锁的* 特殊情况,当worker的 state == -1 时,调用unlock 表示初始化state 设置state == 0* 启动worker之前会先调用unlock()这个方法 会强制刷新ExclusiveOwnerThread == null state==0 之后看源码就明白了。*/public void unlock() { release(1); }// 返回当前worker的lock是否被占用public boolean isLocked() { return isHeldExclusively(); }// 回头再说void interruptIfStarted() {Thread t;if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {try {t.interrupt();} catch (SecurityException ignore) {}}}}
/** 7个参数的构造方法 传入7大核心参数,为内部属性赋值*/ public ThreadPoolExecutor(int corePoolSize, // 核心线程数int maximumPoolSize, // 最大线程数long keepAliveTime, // 空闲线程存活时间TimeUnit unit, // 时间单位 seconds nano..BlockingQueue workQueue, // 阻塞(任务)队列ThreadFactory threadFactory, // 线程工厂RejectedExecutionHandler handler) { // 拒绝策略// 判断参数是否合法if (corePoolSize < 0 ||maximumPoolSize <= 0 ||maximumPoolSize < corePoolSize ||keepAliveTime < 0)throw new IllegalArgumentException();// 工作队列 和 线程工厂 和 拒绝策略 都不能为空if (workQueue == null || threadFactory == null || handler == null)throw new NullPointerException();this.acc = System.getSecurityManager() == null ?null :AccessController.getContext();// 为属性赋值this.corePoolSize = corePoolSize;this.maximumPoolSize = maximumPoolSize;this.workQueue = workQueue;this.keepAliveTime = unit.toNanos(keepAliveTime);this.threadFactory = threadFactory;this.handler = handler;}
参考
- 视频参考
- b站_小刘讲源码付费课
- 文章参考
- shstart7_线程池源码解析2.工作原理与内部结构
- 肆华_线程池阅读理解