juc的线程模型比较简单,篇幅较短,先补充一些线程相关的概念,水字数
public class Thread implements Runnable {private ThreadGroup group;public Thread() {init(null, null, "Thread-" + nextThreadNum(), 0);}private void init(ThreadGroup g, Runnable target, String name, long stackSize) {init(g, target, name, stackSize, null, true);}private void init(ThreadGroup g, Runnable target, String name,long stackSize, AccessControlContext acc,boolean inheritThreadLocals) {if (name == null) {throw new NullPointerException("name cannot be null");}this.name = name;Thread parent = currentThread();SecurityManager security = System.getSecurityManager();if (g == null) {/* Determine if it's an applet or not *//* If there is a security manager, ask the security managerwhat to do. */if (security != null) {g = security.getThreadGroup();}// 使用当前线程所在的组// 一般直接创建的线程都是如此if (g == null) {g = parent.getThreadGroup();}}/* checkAccess regardless of whether or not threadgroup isexplicitly passed in. */g.checkAccess();/** Do we have the required permissions?*/if (security != null) {if (isCCLOverridden(getClass())) {security.checkPermission(SUBCLASS_IMPLEMENTATION_PERMISSION);}}g.addUnstarted();this.group = g;this.daemon = parent.isDaemon();this.priority = parent.getPriority();if (security == null || isCCLOverridden(parent.getClass()))this.contextClassLoader = parent.getContextClassLoader();elsethis.contextClassLoader = parent.contextClassLoader;this.inheritedAccessControlContext =acc != null ? acc : AccessController.getContext();this.target = target;setPriority(priority);if (inheritThreadLocals && parent.inheritableThreadLocals != null)this.inheritableThreadLocals =ThreadLocal.createInheritedMap(parent.inheritableThreadLocals);/* Stash the specified stack size in case the VM cares */this.stackSize = stackSize;/* Set thread ID */tid = nextThreadID();}
}
// 这个类非常之熟悉
// 命名(Xxxs)中也可以看出:用于构造各种类型的调度器(Executor)
public class Executors {/*** The default thread factory*/static class DefaultThreadFactory implements ThreadFactory {private static final AtomicInteger poolNumber = new AtomicInteger(1);private final ThreadGroup group;private final AtomicInteger threadNumber = new AtomicInteger(1);private final String namePrefix;DefaultThreadFactory() {SecurityManager s = System.getSecurityManager();// 默认的线程组基本如此group = (s != null) ? s.getThreadGroup() :Thread.currentThread().getThreadGroup();namePrefix = "pool-" +poolNumber.getAndIncrement() +"-thread-";}// 线程工厂的接口方法// 从命名也可见一斑:newXxx()public Thread newThread(Runnable r) {Thread t = new Thread(group, r,namePrefix + threadNumber.getAndIncrement(),0);if (t.isDaemon())t.setDaemon(false);if (t.getPriority() != Thread.NORM_PRIORITY)t.setPriority(Thread.NORM_PRIORITY);return t;}}
}
executor 不是线程池,好吧,这个命名规则源于命令行设计模式(Executor/command)
public class ThreadPoolExecutor extends AbstractExecutorService {// 全参构造public ThreadPoolExecutor(int corePoolSize, // 核心线程数int maximumPoolSize, // 最大线程数long keepAliveTime, // 存活时间TimeUnit unit, // 存活时间对应的 时间单位BlockingQueue workQueue, // 任务的阻塞队列ThreadFactory threadFactory, // 创建任务线程的线程工厂RejectedExecutionHandler handler // 拒绝调度的处理器) {if (corePoolSize < 0 ||maximumPoolSize <= 0 ||maximumPoolSize < corePoolSize ||keepAliveTime < 0)throw new IllegalArgumentException();if (workQueue == null || threadFactory == null || handler == null)throw new NullPointerException();this.acc = System.getSecurityManager() == null ?null :AccessController.getContext();this.corePoolSize = corePoolSize;this.maximumPoolSize = maximumPoolSize;this.workQueue = workQueue;this.keepAliveTime = unit.toNanos(keepAliveTime);this.threadFactory = threadFactory;this.handler = handler;}// 调度逻辑在此public void execute(Runnable command) {if (command == null)throw new NullPointerException();// 下面这段说明挺不戳的,懒得再复述一遍了/** Proceed in 3 steps:** 1. If fewer than corePoolSize threads are running, try to* start a new thread with the given command as its first* task. The call to addWorker atomically checks runState and* workerCount, and so prevents false alarms that would add* threads when it shouldn't, by returning false.** 2. If a task can be successfully queued, then we still need* to double-check whether we should have added a thread* (because existing ones died since last checking) or that* the pool shut down since entry into this method. So we* recheck state and if necessary roll back the enqueuing if* stopped, or start a new thread if there are none.** 3. If we cannot queue task, then we try to add a new* thread. If it fails, we know we are shut down or saturated* and so reject the task.*/// private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));// The main pool control state, ctl, is an atomic integer packing two conceptual fields// private static int ctlOf(int rs, int wc) { return rs | wc; }int c = ctl.get();if (workerCountOf(c) < corePoolSize) {// private final HashSet workers = new HashSet();// worker:任务的封装类型,内部维护了线程执行方法// private boolean addWorker(Runnable firstTask, boolean core)if (addWorker(command, true))return;c = ctl.get();}if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();if (! isRunning(recheck) && remove(command))reject(command);else if (workerCountOf(recheck) == 0)addWorker(null, false);}// false 表明非核心线程数(最大线程数)else if (!addWorker(command, false))reject(command);}
}
上一篇:数据仓库的设计思想