再接上回
final ChannelFuture initAndRegister() {Channel channel = null;try {channel = channelFactory.newChannel();init(channel);} catch (Throwable t) {if (channel != null) {// channel can be null if newChannel crashed (eg SocketException("too many open files"))channel.unsafe().closeForcibly();// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutorreturn new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);}// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutorreturn new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);}// step into ...// 这里的group()所返回的是ServerBootstrap构建过程中bossGroupChannelFuture regFuture = config().group().register(channel);if (regFuture.cause() != null) {if (channel.isRegistered()) {channel.close();} else {channel.unsafe().closeForcibly();}}// If we are here and the promise is not failed, it's one of the following cases:// 1) If we attempted registration from the event loop, the registration has been completed at this point.// i.e. It's safe to attempt bind() or connect() now because the channel has been registered.// 2) If we attempted registration from the other thread, the registration request has been successfully// added to the event loop's task queue for later execution.// i.e. It's safe to attempt bind() or connect() now:// because bind() or connect() will be executed *after* the scheduled registration task is executed// because register(), bind(), and connect() are all bound to the same thread.return regFuture;}
// io.netty.channel.MultithreadEventLoopGroup#register(io.netty.channel.Channel)// 为什么是这个类?// 因为:我们是通过group()返回的bossGroup调用的,通过继承关系可知@Overridepublic ChannelFuture register(Channel channel) {// step into next() ...return next().register(channel);}// io.netty.channel.MultithreadEventLoopGroup#register(io.netty.channel.Channel)@Overridepublic EventLoop next() {// step into ...// 注意此时的类型: bossGroup 此时以 EventLoop的身份调用父类的next()return (EventLoop) super.next();}// io.netty.util.concurrent.MultithreadEventExecutorGroup#next@Overridepublic EventExecutor next() {// 先停下脚本,了解一下 chooser,都是很简单的东西 ...// 我们能知道的是: 构建过程chooser是缺省的,所以关注默认的chooser即可return chooser.next();}
这一块的算法比较简单,不多复述
@UnstableApi
public final class DefaultEventExecutorChooserFactory implements EventExecutorChooserFactory {public static final DefaultEventExecutorChooserFactory INSTANCE = new DefaultEventExecutorChooserFactory();private DefaultEventExecutorChooserFactory() { }@Overridepublic EventExecutorChooser newChooser(EventExecutor[] executors) {// 线程调度器的数量来决定使用哪种实现if (isPowerOfTwo(executors.length)) {return new PowerOfTwoEventExecutorChooser(executors);} else {return new GenericEventExecutorChooser(executors);}}private static boolean isPowerOfTwo(int val) {return (val & -val) == val;}private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {private final AtomicInteger idx = new AtomicInteger();private final EventExecutor[] executors;PowerOfTwoEventExecutorChooser(EventExecutor[] executors) {this.executors = executors;}@Overridepublic EventExecutor next() {return executors[idx.getAndIncrement() & executors.length - 1];}}private static final class GenericEventExecutorChooser implements EventExecutorChooser {// Use a 'long' counter to avoid non-round-robin behaviour at the 32-bit overflow boundary.// The 64-bit long solves this by placing the overflow so far into the future, that no system// will encounter this in practice.private final AtomicLong idx = new AtomicLong();private final EventExecutor[] executors;GenericEventExecutorChooser(EventExecutor[] executors) {this.executors = executors;}@Overridepublic EventExecutor next() {return executors[(int) Math.abs(idx.getAndIncrement() % executors.length)];}}
}