文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

详细图解Netty Reactor启动全流程

2024-12-02 05:47

关注

本系列Netty源码解析文章基于 4.1.56.Final版本。

大家第一眼看到这幅流程图,是不是脑瓜子嗡嗡的呢?

大家先不要惊慌,问题不大,本文笔者的目的就是要让大家清晰的理解这幅流程图,从而深刻的理解Netty Reactor的启动全流程,包括其中涉及到的各种代码设计实现细节。

在上篇文章​​《聊聊Netty那些事儿之Reactor在Netty中的实现(创建篇)》​​中我们详细介绍了Netty服务端核心引擎组件主从Reactor组模型 NioEventLoopGroup以及Reactor模型 NioEventLoop的创建过程。最终我们得到了netty Reactor模型的运行骨架如下:

现在Netty服务端程序的骨架是搭建好了,本文我们就基于这个骨架来深入剖析下Netty服务端的启动过程。

我们继续回到上篇文章提到的Netty服务端代码模板中,在创建完主从Reactor线程组:bossGroup,workerGroup后,接下来就开始配置Netty服务端的启动辅助类ServerBootstrap了。

public final class EchoServer {
static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));
public static void main(String[] args) throws Exception {
// Configure the server.
//从Reactor线
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
final EchoServerHandler serverHandler = new EchoServerHandler();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)//从Reactor
.channel(NioServerSocketChannel.class)//主Reactor的channel
.option(ChannelOption.SO_BACKLOG, 100)//主Reactor中channel的option
.handler(new LoggingHandler(LogLevel.INFO))//主Reactor中Channel->pipline->handler
.childHandler(new ChannelInitializer>() {//从Reactor册channel的pipeline
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
//p.addLast(new LoggingHandler(LogLevel.INFO));
p.addLast(serverHandler);
}
});
// Start the server. 听accept
ChannelFuture f = b.bind(PORT).sync();
// Wait until the server socket is closed.
f.channel().closeFuture().sync();
} finally {
// Shut down all event loops to terminate all threads.
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}

在上篇文章中我们对代码模板中涉及到ServerBootstrap的一些配置方法做了简单的介绍,大家如果忘记的话,可以在返回去回顾一下。

ServerBootstrap类其实没有什么特别的逻辑,主要是对Netty启动过程中需要用到的一些核心信息进行配置管理,比如:


主ReactorGroup中的MainReactor管理的Channel类型为NioServerSocketChannel,如图所示主要用来监听端口,接收客户端连接,为客户端创建初始化NioSocketChannel,然后采用round-robin轮询的方式从图中从ReactorGroup中选择一个SubReactor与该客户端NioSocketChannel进行绑定。

从ReactorGroup中的SubReactor管理的Channel类型为NioSocketChannel,它是netty中定义客户端连接的一个模型,每个连接对应一个。如图所示SubReactor负责监听处理绑定在其上的所有NioSocketChannel上的IO事件。

不管是服务端用到的NioServerSocketChannel还是客户端用到的NioSocketChannel,每个Channel实例都会有一个Pipeline,Pipeline中有多个ChannelHandler用于编排处理对应Channel上感兴趣的IO事件。

ServerBootstrap结构中包含了netty服务端程序启动的所有配置信息,在我们介绍启动流程之前,先来看下ServerBootstrap的源码结构:

ServerBootstrap

ServerBootstrap的继承结构比较简单,继承层次的职责分工也比较明确。

ServerBootstrap主要负责对主从Reactor线程组相关的配置进行管理,其中带child前缀的配置方法是对从Reactor线程组的相关配置管理。从Reactor线程组中的Sub Reactor负责管理的客户端NioSocketChannel相关配置存储在ServerBootstrap结构中。

父类AbstractBootstrap则是主要负责对主Reactor线程组相关的配置进行管理,以及主Reactor线程组中的Main Reactor负责处理的服务端ServerSocketChannel相关的配置管理。

一、 配置主从Reactor线程组

ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)//从Reactor
public class ServerBootstrap extends AbstractBootstrap, ServerChannel> {
//Main Reactor线
volatile EventLoopGroup group;
//Sub Reactor线
private volatile EventLoopGroup childGroup;
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
//主Reactor线
super.group(parentGroup);
if (this.childGroup != null) {
throw new IllegalStateException("childGroup set already");
}
this.childGroup = ObjectUtil.checkNotNull(childGroup, "childGroup");
return this;
}
}

二、 配置服务端ServerSocketChannel

ServerBootstrap b = new ServerBootstrap();
b.channel(NioServerSocketChannel.class);
public class ServerBootstrap extends AbstractBootstrap, ServerChannel> {
//建ServerSocketChannel ReflectiveChannelFactory
private volatile ChannelFactory<? extends C> channelFactory;
public B channel(Class<? extends C> channelClass) {
return channelFactory(new ReflectiveChannelFactory>(
ObjectUtil.checkNotNull(channelClass, "channelClass")
));
}
@Deprecated
public B channelFactory(ChannelFactory<? extends C> channelFactory) {
ObjectUtil.checkNotNull(channelFactory, "channelFactory");
if (this.channelFactory != null) {
throw new IllegalStateException("channelFactory set already");
}
this.channelFactory = channelFactory;
return self();
}
}

在向ServerBootstrap配置服务端ServerSocketChannel的channel方法中,其实是创建了一个ChannelFactory工厂实例ReflectiveChannelFactory,在Netty服务端启动的过程中,会通过这个ChannelFactory去创建相应的Channel实例。

我们可以通过这个方法来配置netty的IO模型,下面为ServerSocketChannel在不同IO模型下的实现:

EventLoopGroupReactor线程组在不同IO模型下的实现:

我们只需要将IO模型的这些核心接口对应的实现类前缀改为对应IO模型的前缀,就可以轻松在Netty中完成对IO模型的切换。

1、 ReflectiveChannelFactory

public class ReflectiveChannelFactory extends Channel> implements ChannelFactory> {
//NioServerSocketChannelde
private final Constructor<? extends T> constructor;
public ReflectiveChannelFactory(Class<? extends T> clazz) {
ObjectUtil.checkNotNull(clazz, "clazz");
try {
//取NioServerSocketChannel
this.constructor = clazz.getConstructor();
} catch (NoSuchMethodException e) {
throw new IllegalArgumentException("Class " + StringUtil.simpleClassName(clazz) +
" does not have a public non-arg constructor", e);
}
}
@Override
public T newChannel() {
try {
//建NioServerSocketChannel
return constructor.newInstance();
} catch (Throwable t) {
throw new ChannelException("Unable to create Channel from class " + constructor.getDeclaringClass(), t);
}
}
}

从类的签名我们可以看出,这个工厂类是通过泛型加反射的方式来创建对应的Channel实例。

注意这时只是配置阶段,NioServerSocketChannel此时并未被创建。它是在启动的时候才会被创建出来。

三、为NioServerSocketChannel配置ChannelOption

ServerBootstrap b = new ServerBootstrap();
//被MainReactor的NioServerSocketChannel的Socket
b.option(ChannelOption.SO_BACKLOG, 100)
public abstract class AbstractBootstrap extends AbstractBootstrap, C>, C extends Channel> implements Cloneable {
//serverSocketChannel中的ChannelOption
private final Map<?>, Object> options = new LinkedHashMap<?>, Object>();
public > B option(ChannelOption> option, T value) {
ObjectUtil.checkNotNull(option, "option");
synchronized (options) {
if (value == null) {
options.remove(option);
} else {
options.put(option, value);
}
}
return self();
}
}

无论是服务端的NioServerSocketChannel还是客户端的NioSocketChannel它们的相关底层Socket选项ChannelOption配置全部存放于一个Map类型的数据结构中。

由于客户端NioSocketChannel是由从Reactor线程组中的Sub Reactor来负责处理,所以涉及到客户端NioSocketChannel所有的方法和配置全部是以child前缀开头。

ServerBootstrap b = new ServerBootstrap();
.childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
public class ServerBootstrap extends AbstractBootstrap, ServerChannel> {
//端SocketChannel的ChannelOption
private final Map<?>, Object> childOptions = new LinkedHashMap<?>, Object>();
public > ServerBootstrap childOption(ChannelOption> childOption, T value) {
ObjectUtil.checkNotNull(childOption, "childOption");
synchronized (childOptions) {
if (value == null) {
childOptions.remove(childOption);
} else {
childOptions.put(childOption, value);
}
}
return this;
}
}

相关的底层Socket选项,netty全部枚举在ChannelOption类中,笔者这里就不一一列举了,在本系列后续相关的文章中,笔者还会为大家详细的介绍这些参数的作用。

public class ChannelOption> extends AbstractConstant>> {
................................
public static final ChannelOption> SO_BROADCAST = valueOf("SO_BROADCAST");
public static final ChannelOption> SO_KEEPALIVE = valueOf("SO_KEEPALIVE");
public static final ChannelOption> SO_SNDBUF = valueOf("SO_SNDBUF");
public static final ChannelOption> SO_RCVBUF = valueOf("SO_RCVBUF");
public static final ChannelOption> SO_REUSEADDR = valueOf("SO_REUSEADDR");
public static final ChannelOption> SO_LINGER = valueOf("SO_LINGER");
public static final ChannelOption> SO_BACKLOG = valueOf("SO_BACKLOG");
public static final ChannelOption> SO_TIMEOUT = valueOf("SO_TIMEOUT");
................................
}

四、为服务端NioServerSocketChannel中的Pipeline配置ChannelHandler

   //serverSocketChannel中pipeline的handler(是acceptor)
private volatile ChannelHandler handler;
public B handler(ChannelHandler handler) {
this.handler = ObjectUtil.checkNotNull(handler, "handler");
return self();
}

向NioServerSocketChannel中的Pipeline添加ChannelHandler分为两种方式:

关于ChannelInitializer后面笔者会有详细介绍,这里大家只需要知道ChannelInitializer是一种特殊的ChannelHandler,用于初始化pipeline。适用于向pipeline中添加多个ChannelHandler的场景。

  ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)//从Reactor
.channel(NioServerSocketChannel.class)//主Reactor的channel
.handler(new ChannelInitializer>() {
@Override
protected void initChannel(NioServerSocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(channelhandler1)
.addLast(channelHandler2)
......
.addLast(channelHandler3);
}
})

隐式添加ServerBootstrapAcceptor是由Netty框架在启动的时候负责添加,用户无需关心。

在本例中,NioServerSocketChannel的PipeLine中只有两个ChannelHandler,一个由用户在外部显式添加的LoggingHandler,另一个是由Netty框架隐式添加的ServerBootstrapAcceptor。

其实我们在实际项目使用的过程中,不会向netty服务端NioServerSocketChannel添加额外的ChannelHandler,NioServerSocketChannel只需要专心做好自己最重要的本职工作接收客户端连接就好了。这里额外添加一个LoggingHandler只是为了向大家展示ServerBootstrap的配置方法。

五、 为客户端NioSocketChannel中的Pipeline配置ChannelHandler

  final EchoServerHandler serverHandler = new EchoServerHandler();
serverBootstrap.childHandler(new ChannelInitializer>() {//从Reactor册channel的pipeline
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new LoggingHandler(LogLevel.INFO));
p.addLast(serverHandler);
}
});
    //socketChannel中pipeline理handler
private volatile ChannelHandler childHandler;
public ServerBootstrap childHandler(ChannelHandler childHandler) {
this.childHandler = ObjectUtil.checkNotNull(childHandler, "childHandler");
return this;
}

向客户端NioSocketChannel中的Pipeline里添加ChannelHandler完全是由用户自己控制显式添加,添加的数量不受限制。

由于在Netty的IO线程模型中,是由单个Sub Reactor线程负责执行客户端NioSocketChannel中的Pipeline,一个Sub Reactor线程负责处理多个NioSocketChannel上的IO事件,如果Pipeline中的ChannelHandler添加的太多,就会影响Sub Reactor线程执行其他NioSocketChannel上的Pipeline,从而降低IO处理效率,降低吞吐量。

所以Pipeline中的ChannelHandler不易添加过多,并且不能再ChannelHandler中执行耗时的业务处理任务。

在我们通过ServerBootstrap配置netty服务端启动信息的时候,无论是向服务端NioServerSocketChannel的pipeline中添加ChannelHandler,还是向客户端NioSocketChannel的pipeline中添加ChannelHandler,当涉及到多个ChannelHandler添加的时候,我们都会用到ChannelInitializer,那么这个ChannelInitializer究竟是何方圣神,为什么要这样做呢?我们接着往下看。

ChannelInitializer

首先ChannelInitializer它继承于ChannelHandler,它自己本身就是一个ChannelHandler,所以它可以添加到childHandler中。

其他的父类大家这里可以不用管,后面文章中笔者会一一为大家详细介绍。

那为什么不直接添加ChannelHandler而是选择用ChannelInitializer呢?

这里主要有两点原因:

当客户端NioSocketChannel注册到对应的Sub Reactor上后,紧接着就会初始化NioSocketChannel中的Pipeline,此时Netty框架会回调ChannelInitializer#initChannel执行用户自定义的添加逻辑。

public abstract class ChannelInitializer extends Channel> extends ChannelInboundHandlerAdapter {
@Override
@SuppressWarnings("unchecked")
public final void channelRegistered(ChannelHandlerContext ctx) throws Exception {
//当channelRegister用initChannel化pipeline
if (initChannel(ctx)) {
................................
} else {
................................
}
}
private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
if (initMap.add(ctx)) { // Guard against re-entrance.
try {
//单NioSocketChannel
initChannel((C) ctx.channel());
} catch (Throwable cause) {
................................
} finally {
................................
}
return true;
}
return false;
}
protected abstract void initChannel(C ch) throws Exception;
................................
}

这里由netty框架回调的ChannelInitializer#initChannel方法正是我们自定义的添加逻辑。

   final EchoServerHandler serverHandler = new EchoServerHandler();
serverBootstrap.childHandler(new ChannelInitializer>() {//从Reactor册channel的pipeline
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new LoggingHandler(LogLevel.INFO));
p.addLast(serverHandler);
}
});

到此为止,Netty服务端启动所需要的必要配置信息,已经全部存入ServerBootStrap启动辅助类中。

接下来要做的事情就是服务端的启动了。

// Start the server. 听accept
ChannelFuture f = serverBootStrap.bind(PORT).sync();

Netty服务端的启动

经过前面的铺垫终于来到了本文的核心内容----Netty服务端的启动过程。

如代码模板中的示例所示,Netty服务端的启动过程封装在io.netty.bootstrap.AbstractBootstrap#bind(int)函数中。

接下来我们看一下Netty服务端在启动过程中究竟干了哪些事情?

大家看到这副启动流程图先不要慌,接下来的内容笔者会带大家各个击破它,在文章的最后保证让大家看懂这副流程图。

我们先来从netty服务端启动的入口函数开始我们今天的源码解析旅程:

    public ChannelFuture bind(int inetPort) {
return bind(new InetSocketAddress(inetPort));
}
public ChannelFuture bind(SocketAddress localAddress) {
//验Netty
validate();
//
return doBind(ObjectUtil.checkNotNull(localAddress, "localAddress"));
}
private ChannelFuture doBind(final SocketAddress localAddress) {
//册ServerSocketChannel到main reactor上
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
if (regFuture.isDone()) {
........serverSocketChannel向Main Reactor注....,
} else {
//向regFuture加operationComplete
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {

........serverSocketChannel向Main Reactor注....,
});
return promise;
}
}

Netty服务端的启动流程总体如下:

当netty服务端启动成功之后,最终我们会得到如下结构的阵型,开始枕戈待旦,准备接收客户端的连接,Reactor开始运转。

接下来,我们就来看下Netty源码是如何实现以上步骤的。

1、initAndRegister

  final ChannelFuture initAndRegister() {
Channel channel = null;
try {
//建NioServerSocketChannel
//ReflectiveChannelFactory通的channel
channel = channelFactory.newChannel();
//化NioServerSocketChannel
init(channel);
} catch (Throwable t) {
...............................
}
//向MainReactor册ServerSocketChannel
ChannelFuture regFuture = config().group().register(channel);
...............................
return regFuture;
}

从函数命名中我们可以看出,这个函数主要做的事情就是首先创建NioServerSocketChannel,并对NioServerSocketChannel进行初始化,最后将NioServerSocketChannel注册到Main Reactor中。

1.1 创建NioServerSocketChannel

还记得我们在介绍ServerBootstrap启动辅助类配置服务端ServerSocketChannel类型的时候提到的工厂类ReflectiveChannelFactory吗?

因为当时我们在配置ServerBootstrap启动辅助类的时候,还没到启动阶段,而配置阶段并不是创建具体ServerSocketChannel的时机。

所以Netty通过工厂模式将要创建的ServerSocketChannel的类型(通过泛型指定)以及 创建的过程(封装在newChannel函数中)统统先封装在工厂类ReflectiveChannelFactory中。

ReflectiveChannelFactory通过泛型,反射,工厂的方式灵活创建不同类型的channel。

等待创建时机来临,我们调用保存在ServerBootstrap中的channelFactory直接进行创建。

public class ReflectiveChannelFactory extends Channel> implements ChannelFactory> {
private final Constructor<? extends T> constructor;
@Override
public T newChannel() {
try {
return constructor.newInstance();
} catch (Throwable t) {
throw new ChannelException("Unable to create Channel from class " + constructor.getDeclaringClass(), t);
}
}
}

下面我们来看下NioServerSocketChannel的构建过程:

1.1.1 NioServerSocketChannel

public class NioServerSocketChannel extends AbstractNioMessageChannel
implements io.netty.channel.socket.ServerSocketChannel {
//SelectorProvider(建Selector和Selectable Channels)
private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();
public NioServerSocketChannel() {
this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}
//建JDK NIO ServerSocketChannel
private static ServerSocketChannel newSocket(SelectorProvider provider) {
try {
return provider.openServerSocketChannel();
} catch (IOException e) {
throw new ChannelException(
"Failed to open a server socket.", e);
}
}
//ServerSocketChannel相
private final ServerSocketChannelConfig config;
public NioServerSocketChannel(ServerSocketChannel channel) {
//类AbstractNioChannel存JDK NIO原生ServerSocketChannel件OP_ACCEPT
super(null, channel, SelectionKey.OP_ACCEPT);
//DefaultChannelConfig中于Channel的buffer->AdaptiveRecvByteBufAllocator
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}

}

NioServerSocketChannelConfig没什么重要的东西,我们这里也不必深究,它就是管理NioServerSocketChannel相关的配置,这里唯一需要大家注意的是这个用于Channel接收数据用的Buffer分配器AdaptiveRecvByteBufAllocator,我们后面在介绍Netty如何接收连接的时候还会提到。

NioServerSocketChannel的整体构建过程介绍完了,现在我们来按照继承层次再回过头来看下NioServerSocketChannel的层次构建,来看下每一层都创建了什么,封装了什么,这些信息都是Channel的核心信息,所以有必要了解一下。

在NioServerSocketChannel的创建过程中,我们主要关注继承结构图中红框标注的三个类,其他的我们占时先不用管。

其中AbstractNioMessageChannel类主要是对NioServerSocketChannel底层读写行为的封装和定义,比如accept接收客户端连接。这个我们后续会介绍到,这里我们并不展开。

1.1.2 AbstractNioChannel

public abstract class AbstractNioChannel extends AbstractChannel {
//JDK NIO生Selectable Channel
private final SelectableChannel ch;
// Channel监 是SelectionKey.OP_ACCEPT事
protected final int readInterestOp;
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent);
this.ch = ch;
this.readInterestOp = readInterestOp;
try {
//置Channel 合IO
ch.configureBlocking(false);
} catch (IOException e) {
.............................
}
}
}

1.1.3 AbstractChannel

public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
//channel是如ServerSocketChannel SocketChannel的 parent
private final Channel parent;
//channel全一ID machineId+processId+sequence+timestamp+random
private final ChannelId id;
//unsafe用层socket
private final Unsafe unsafe;
//为channel的pipeline于IO
private final DefaultChannelPipeline pipeline;
protected AbstractChannel(Channel parent) {
this.parent = parent;
//channel全一ID machineId+processId+sequence+timestamp+random
id = newId();
//unsafe用对Channel
unsafe = newUnsafe();
//为channel的pipeline于IO
pipeline = newChannelPipeline();
}
}
   private DefaultChannelId() {
data = new byte[MACHINE_ID.length + PROCESS_ID_LEN + SEQUENCE_LEN + TIMESTAMP_LEN + RANDOM_LEN];
int i = 0;
// machineId
System.arraycopy(MACHINE_ID, 0, data, i, MACHINE_ID.length);
i += MACHINE_ID.length;
// processId
i = writeInt(i, PROCESS_ID);
// sequence
i = writeInt(i, nextSequence.getAndIncrement());
// timestamp (kind of)
i = writeLong(i, Long.reverse(System.nanoTime()) ^ System.currentTimeMillis());
// random
int random = PlatformDependent.threadLocalRandom().nextInt();
i = writeInt(i, random);
assert i == data.length;
hashCode = Arrays.hashCode(data);
}

Unsafe为Channel接口的一个内部接口,用于定义实现对Channel底层的各种操作,Unsafe接口定义的操作行为只能由Netty框架的Reactor线程调用,用户线程禁止调用。

interface Unsafe {       
//的Buffer
RecvByteBufAllocator.Handle recvBufAllocHandle();
//
SocketAddress localAddress();
//
SocketAddress remoteAddress();
//channel向Reactor
void register(EventLoop eventLoop, ChannelPromise promise);
//
void bind(SocketAddress localAddress, ChannelPromise promise);
//
void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise);
//闭channle
void close(ChannelPromise promise);
//
void beginRead();
//
void write(Object msg, ChannelPromise promise);
}

ChannelHandlerContext保存 ChannelHandler上下文信息,用于事件传播。后面笔者会单独开一篇文章介绍,这里我们还是聚焦于启动主线。

这里只是为了让大家简单理解pipeline的一个大致的结构,后面会写一篇文章专门详细讲解pipeline。

  protected DefaultChannelPipeline(Channel channel) {
this.channel = ObjectUtil.checkNotNull(channel, "channel");
succeededFuture = new SucceededChannelFuture(channel, null);
voidPromise = new VoidChannelPromise(channel, true);
tail = new TailContext(this);
head = new HeadContext(this);
head.next = tail;
tail.prev = head;
}

到了这里NioServerSocketChannel就创建完毕了,我们来回顾下它到底包含了哪些核心信息。

1.2 初始化NioServerSocketChannel

  void init(Channel channel) {
//向NioServerSocketChannelConfig置ServerSocketChannelOption
setChannelOptions(channel, newOptionsArray(), logger);
//向netty的NioServerSocketChannel置attributes
setAttributes(channel, attrs0().entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY));
ChannelPipeline p = channel.pipeline();
//从Reactor线
final EventLoopGroup currentChildGroup = childGroup;
//端NioSocketChannel的ChannelInitializer
final ChannelHandler currentChildHandler = childHandler;
//端SocketChannel的channelOption及attributes
final Entry<?>, Object>[] currentChildOptions;
synchronized (childOptions) {
currentChildOptions = childOptions.entrySet().toArray(EMPTY_OPTION_ARRAY);
}
final Entry<?>, Object>[] currentChildAttrs = childAttrs.entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY);
//向NioServerSocketChannel的pipeline化ChannelHandler
p.addLast(new ChannelInitializer>() {
@Override
public void initChannel(final Channel ch) {
final ChannelPipeline pipeline = ch.pipeline();
//ServerBootstrap中的channelHandler
ChannelHandler handler = config.handler();
if (handler != null) {
//LoggingHandler
pipeline.addLast(handler);
}
//的acceptor
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}

Netty自定义的SocketChannel类型均继承AttributeMap接口以及DefaultAttributeMap类,正是它们定义了ChannelAttributes。用于向Channel添加用户自定义的一些信息。

这个ChannelAttributes的用处大有可为,Netty后边的许多特性都是依靠这个ChannelAttributes来实现的。这里先卖个关子,大家可以自己先想一下可以用这个ChannelAttributes做哪些事情?

问题来了,这里为什么不干脆直接将ChannelHandler添加到pipeline中,而是又使用到了ChannelInitializer呢?

其实原因有两点:

初始化NioServerSocketChannel中pipeline的时机是:当NioServerSocketChannel注册到Main Reactor之后,绑定端口地址之前。

前边在介绍ServerBootstrap配置childHandler时也用到了ChannelInitializer,还记得吗?

问题又来了,大家注意下ChannelInitializer#initChannel方法,在该初始化回调方法中,添加LoggingHandler是直接向pipeline中添加,而添加Acceptor为什么不是直接添加而是封装成异步任务呢?

这里先给大家卖个关子,笔者会在后续流程中为大家解答。

此时NioServerSocketChannel中的pipeline结构如下图所示:

1.3 向Main Reactor注册NioServerSocketChannel

从ServerBootstrap获取主Reactor线程组NioEventLoopGroup,将NioServerSocketChannel注册到NioEventLoopGroup中。

ChannelFuture regFuture = config().group().register(channel);

下面我们来看下具体的注册过程:

主Reactor线程组中选取一个Main Reactor进行注册

    @Override
public ChannelFuture register(Channel channel) {
return next().register(channel);
}
@Override
public EventExecutor next() {
return chooser.next();
}
//
@Override
public EventExecutorChooser newChooser(EventExecutor[] executors) {
if (isPowerOfTwo(executors.length)) {
return new PowerOfTwoEventExecutorChooser(executors);
} else {
return new GenericEventExecutorChooser(executors);
}
}
//询round-robin择Reactor
@Override
public EventExecutor next() {
return executors[(int) Math.abs(idx.getAndIncrement() % executors.length)];
}

Netty通过next()方法根据上篇文章​​《聊聊Netty那些事儿之Reactor在Netty中的实现(创建篇)》​​提到的channel到reactor的绑定策略,从ReactorGroup中选取一个Reactor进行注册绑定。之后Channel生命周期内的所有IO 事件都由这个Reactor 负责处理,如 accept、connect、read、write等 IO 事件。

一个channel只能绑定到一个Reactor上,一个Reactor负责监听多个channel。

由于这里是NioServerSocketChannle向Main Reactor进行注册绑定,所以Main Reactor主要负责处理的IO事件是OP_ACCEPT事件。

向绑定后的Main Reactor进行注册

向Reactor进行注册的行为定义在NioEventLoop的父类SingleThreadEventLoop中,印象模糊的同学可以在回看下上篇文章中的NioEventLoop继承结构小节内容。

public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {
@Override
public ChannelFuture register(Channel channel) {
//册channel的Reactor
return register(new DefaultChannelPromise(channel, this));
}
@Override
public ChannelFuture register(final ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
//unsafe负责channel
promise.channel().unsafe().register(this, promise);
return promise;
}
}

通过NioServerSocketChannel中的Unsafe类执行底层具体的注册动作。

protected abstract class AbstractUnsafe implements Unsafe {

@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
ObjectUtil.checkNotNull(eventLoop, "eventLoop");
if (isRegistered()) {
promise.setFailure(new IllegalStateException("registered to an event loop already"));
return;
}
//EventLoop的与Channel Nio Oio Aio
if (!isCompatible(eventLoop)) {
promise.setFailure(
new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
return;
}
//在channel的Reactor
AbstractChannel.this.eventLoop = eventLoop;

if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
..............................
}
}
}
}

上篇文章我们介绍过 Netty对三种IO模型:Oio,Nio,Aio的支持,用户可以通过改变Netty核心类的前缀轻松切换IO模型。isCompatible方法目的就是需要保证Reactor和Channel使用的是同一种IO模型。

在Channel中保存其绑定的Reactor实例。

执行Channel向Reactor注册的动作必须要确保是在Reactor线程中执行。

当前执行线程并不是Reactor线程,而是用户程序的启动线程Main线程。

Reactor线程的启动

上篇文章中我们在介绍NioEventLoopGroup的创建过程中提到了一个构造器参数executor,它用于启动Reactor线程,类型为ThreadPerTaskExecutor。

当时笔者向大家卖了一个关子,“Reactor线程是何时启动的?”

那么现在就到了为大家揭晓谜底的时候了。

Reactor线程的启动是在向Reactor提交第一个异步任务的时候启动的。

Netty中的主Reactor线程组NioEventLoopGroup中的Main ReactorNioEventLoop是在用户程序Main线程向Main Reactor提交用于注册NioServerSocketChannel的异步任务时开始启动。

  eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});

接下来我们关注下NioEventLoop的execute方法。

public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
@Override
public void execute(Runnable task) {
ObjectUtil.checkNotNull(task, "task");
execute(task, !(task instanceof LazyRunnable) && wakesUpForTask(task));
}
private void execute(Runnable task, boolean immediate) {
//线为Reactor线
boolean inEventLoop = inEventLoop();
//addTaskWakesUp = true addTask唤醒Reactor线
addTask(task);
if (!inEventLoop) {
//线是Reactor线动Reactor线
//出Reactor线 向NioEventLoop
startThread();
..........................................
}
..........................................
}
}

startThread

public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
//义Reactor线
private static final int ST_NOT_STARTED = 1;
private static final int ST_STARTED = 2;
private static final int ST_SHUTTING_DOWN = 3;
private static final int ST_SHUTDOWN = 4;
private static final int ST_TERMINATED = 5;
//Reactor线
private volatile int state = ST_NOT_STARTED;
//Reactor线段state
private static final AtomicIntegerFieldUpdater> STATE_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(SingleThreadEventExecutor.class, "state");
private void startThread() {
if (state == ST_NOT_STARTED) {
if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
boolean success = false;
try {
doStartThread();
success = true;
} finally {
if (!success) {
STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);
}
}
}
}
}
}
  //ThreadPerTaskExecutor 动Reactor线
private final Executor executor;
private void doStartThread() {
assert thread == null;
executor.execute(new Runnable() {
@Override
public void run() {
thread = Thread.currentThread();
if (interrupted) {
thread.interrupt();
}
boolean success = false;
updateLastExecutionTime();
try {
//Reactor线
SingleThreadEventExecutor.this.run();
success = true;
}

..............................
}

这里就来到了ThreadPerTaskExecutor类型的executor的用武之地了。

public final class ThreadPerTaskExecutor implements Executor {
private final ThreadFactory threadFactory;
@Override
public void execute(Runnable command) {
//动Reactor线
threadFactory.newThread(command).start();
}
}

此时Reactor线程已经启动,后面的工作全部都由这个Reactor线程来负责执行了。

而用户启动线程在向Reactor提交完NioServerSocketChannel的注册任务register0后,就逐步退出调用堆栈,回退到最开始的启动入口处ChannelFuture f = b.bind(PORT).sync()。

此时Reactor中的任务队列中只有一个任务register0,Reactor线程启动后,会从任务队列中取出任务执行。

至此NioServerSocketChannel的注册工作正式拉开帷幕。

register0

    //true if the channel has never been registered, false otherwise 
private boolean neverRegistered = true;
private void register0(ChannelPromise promise) {
try {
//应channel
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
boolean firstRegistration = neverRegistered;
//
doRegister();
//
neverRegistered = false;
registered = true;
//调pipeline的ChannelInitializer的handlerAdded化channelPipeline
pipeline.invokeHandlerAddedIfNeeded();
//置regFuture为success发operationComplete,将bind入Reactor待Reactor线
safeSetSuccess(promise);
//发channelRegister
pipeline.fireChannelRegistered();
//端ServerSocketChannel channel的是active
//在Reactor的isActive()是false
if (isActive()) {
if (firstRegistration) {
//发channelActive
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
beginRead();
}
}
} catch (Throwable t) {
.........................
}
}

register0是驱动整个Channel注册绑定流程的关键方法,下面我们来看下它的核心逻辑:

public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {

protected void doRegister() throws Exception {
// NOOP
}
}

初始化ChannelPipeline的时机是当Channel向对应的Reactor注册成功后,在handlerAdded事件回调中利用ChannelInitializer进行初始化。

还记得这个regFuture在哪里出现的吗?它是在哪里被创建,又是在哪里添加的ChannelFutureListener呢?大家还有印象吗?回忆不起来也没关系,笔者后面还会提到。

pipeline中channelHandler的channelRegistered方法被回调。

当Reactor线程执行完register0方法后,才会去执行绑定任务。

下面我们来看下register0方法中这些核心步骤的具体实现:

doRegister()

public abstract class AbstractNioChannel extends AbstractChannel {
//channel注到Selector的SelectKey
volatile SelectionKey selectionKey;
@Override
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException e) {
...................................
}
}
}

}

调用底层JDK NIO Channel方法java.nio.channels.SelectableChannel#register(java.nio.channels.Selector, int, java.lang.Object),将NettyNioServerSocketChannel中包装的JDK NIO ServerSocketChannel注册到Reactor中的JDK NIO Selector上。

简单介绍下SelectableChannel#register方法参数的含义:

SelectionKey可以理解为Channel在Selector上的特殊表示形式, SelectionKey中封装了Channel感兴趣的IO事件集合~interestOps,以及IO就绪的事件集合~~readyOps, 同时也封装了对应的JDK NIO Channel以及注册的Selector。最后还有一个重要的属性attachment,可以允许我们在SelectionKey上附加一些自定义的对象。

这里NioServerSocketChannel向Reactor中的Selector注册的IO事件为0,这个操作的主要目的是先获取到Channel在Selector中对应的SelectionKey,完成注册。当绑定操作完成后,在去向SelectionKey添加感兴趣的IO事件~OP_ACCEPT事件。

同时通过SelectableChannel#register方法将Netty自定义的NioServerSocketChannel(这里的this指针)附着在SelectionKey的attechment属性上,完成Netty自定义Channel与JDK NIO Channel的关系绑定。这样在每次对Selector进行IO就绪事件轮询时,Netty 都可以从 JDK NIO Selector返回的SelectionKey中获取到自定义的Channel对象(这里指的就是NioServerSocketChannel)。

HandlerAdded事件回调中初始化ChannelPipeline

当NioServerSocketChannel注册到Main Reactor上的Selector后,Netty通过调用pipeline.invokeHandlerAddedIfNeeded()开始回调NioServerSocketChannel中pipeline里的ChannelHandler的handlerAdded方法。

此时NioServerSocketChannel的pipeline结构如下:

此时pipeline中只有在初始化NioServerSocketChannel时添加的ChannelInitializer。

我们来看下ChannelInitializer中handlerAdded回调方法具体作了哪些事情。

public abstract class ChannelInitializer extends Channel> extends ChannelInboundHandlerAdapter {
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
if (ctx.channel().isRegistered()) {
if (initChannel(ctx)) {
//从pipeline
removeState(ctx);
}
}
}
//ChannelInitializer实的Channel化ChannelPipeline
//过Set的ChannelPipeline一ChannelPipeline
private final Set> initMap = Collections.newSetFromMap(
new ConcurrentHashMap, Boolean>());

private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
if (initMap.add(ctx)) { // Guard against re-entrance.
try {
initChannel((C) ctx.channel());
} catch (Throwable cause) {
exceptionCaught(ctx, cause);
} finally {
ChannelPipeline pipeline = ctx.pipeline();
if (pipeline.context(this) != null) {
//从pipeline
pipeline.remove(this);
}
}
return true;
}
return false;
}
//
protected abstract void initChannel(C ch) throws Exception;

private void removeState(final ChannelHandlerContext ctx) {
//从initMap重Set除ChannelInitializer
if (ctx.isRemoved()) {
initMap.remove(ctx);
} else {
ctx.executor().execute(new Runnable() {
@Override
public void run() {
initMap.remove(ctx);
}
});
}
}
}

ChannelInitializer中的初始化逻辑比较简单明了:

   p.addLast(new ChannelInitializer>() {
@Override
public void initChannel(final Channel ch) {
final ChannelPipeline pipeline = ch.pipeline();
//ServerBootstrap中的channelHandler
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});

还记得在初始化NioServerSocketChannel时。io.netty.bootstrap.ServerBootstrap#init方法中向pipeline中添加的ChannelInitializer吗?

当初始化完pipeline时,此时pipeline的结构再次发生了变化:

此时Main Reactor中的任务队列taskQueue结构变化为:

添加ServerBootstrapAcceptor的任务是在初始化NioServerSocketChannel的时候向main reactor提交过去的。还记得吗?

回调regFuture的ChannelFutureListener

在本小节《Netty服务端的启动》的最开始,我们介绍了服务端启动的入口函数io.netty.bootstrap.AbstractBootstrap#doBind,在函数的最开头调用了initAndRegister()方法用来创建并初始化NioServerSocketChannel,之后便会将NioServerSocketChannel注册到Main Reactor中。

注册的操作是一个异步的过程,所以在initAndRegister()方法调用后返回一个代表注册结果的ChannelFuture regFuture。

public abstract class AbstractBootstrap extends AbstractBootstrap, C>, C extends Channel> implements Cloneable {
private ChannelFuture doBind(final SocketAddress localAddress) {
//册ServerSocketChannel
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
if (regFuture.isDone()) {
//
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
//
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
..............................
// ,Reactor线
doBind0(regFuture, channel, localAddress, promise);
}
}
});
return promise;
}
}
}

之后会向ChannelFuture regFuture添加一个注册完成后的回调函数~ ChannelFutureListener。在回调函数operationComplete中开始发起绑端口地址流程。

那么这个回调函数在什么时候?什么地方发起的呢?

让我们在回到本小节的主题register0方法的流程中:

当调用doRegister()方法完成NioServerSocketChannel向Main Reactor的注册后,紧接着会调用pipeline.invokeHandlerAddedIfNeeded()方法中触发ChannelInitializer#handlerAdded回调中对pipeline进行初始化。

最后在safeSetSuccess方法中,开始回调注册在regFuture上的ChannelFutureListener。

   protected final void safeSetSuccess(ChannelPromise promise) {
if (!(promise instanceof VoidChannelPromise) && !promise.trySuccess()) {
logger.warn("Failed to mark a promise as success because it is done already: {}", promise);
}
}
@Override
public boolean trySuccess() {
return trySuccess(null);
}
@Override
public boolean trySuccess(V result) {
return setSuccess0(result);
}
private boolean setSuccess0(V result) {
return setValue0(result == null ? SUCCESS : result);
}
private boolean setValue0(Object objResult) {
if (RESULT_UPDATER.compareAndSet(this, null, objResult) ||
RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {
if (checkNotifyWaiters()) {
//在promise的listeners
notifyListeners();
}
return true;
}
return false;
}

safeSetSuccess的逻辑比较简单,首先设置regFuture结果为success,并且回调注册在regFuture上的ChannelFutureListener。

需要提醒的是,执行safeSetSuccess方法,以及后边回调regFuture上的ChannelFutureListener这些动作都是由Reactor线程执行的。

关于Netty中的Promise模型后边我会在写一篇专门的文章进行分析,这里大家只需清楚大体的流程即可。不必在意过多的细节。

下面我们把视角切换到regFuture上的ChannelFutureListener回调中,看看在Channel注册完成后,Netty又会做哪些事情?

2、doBind0

public abstract class AbstractBootstrap extends AbstractBootstrap, C>, C extends Channel> implements Cloneable {
private static void doBind0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise) {
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (regFuture.isSuccess()) {
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}

}

这里Netty又将绑定端口地址的操作封装成异步任务,提交给Reactor执行。

但是这里有一个问题,其实此时执行doBind0方法的线程正是Reactor线程,那为什么不直接在这里去执行bind操作,而是再次封装成异步任务提交给Reactor中的taskQueue呢?

反正最终都是由Reactor线程执行,这其中又有什么分别呢?

经过上小节的介绍我们知道,bind0方法的调用是由io.netty.channel.AbstractChannel.AbstractUnsafe#register0方法在将NioServerSocketChannel注册到Main Reactor之后,并且NioServerSocketChannel的pipeline已经初始化完毕后,通过safeSetSuccess方法回调过来的。

这个过程全程是由Reactor线程来负责执行的,但是此时register0方法并没有执行完毕,还需要执行后面的逻辑。

而绑定逻辑需要在注册逻辑执行完之后执行,所以在doBind0方法中Reactor线程会将绑定操作封装成异步任务先提交给taskQueue中保存,这样可以使Reactor线程立马从safeSetSuccess中返回,继续执行剩下的register0方法逻辑。

 private void register0(ChannelPromise promise) {
try {
............................
doRegister();
pipeline.invokeHandlerAddedIfNeeded();
safeSetSuccess(promise);
//发channelRegister
pipeline.fireChannelRegistered();
if (isActive()) {
............................
}
} catch (Throwable t) {
............................
}
}

当Reactor线程执行完register0方法后,就会从taskQueue中取出异步任务执行。

此时Reactor线程中的taskQueue结构如下:

此时NioServerSocketChannel中pipeline的结构如下:

3、绑定端口地址

对Channel的操作行为全部定义在ChannelOutboundInvoker接口中。

public interface ChannelOutboundInvoker {

ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise);
}

bind方法由子类AbstractChannel实现。

public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
@Override
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
return pipeline.bind(localAddress, promise);
}
}

调用pipeline.bind(localAddress, promise)在pipeline中传播bind事件,触发回调pipeline中所有ChannelHandler的bind方法。

事件在pipeline中的传播具有方向性:

inbound事件只能被pipeline中的ChannelInboundHandler响应处理outbound事件只能被pipeline中的ChannelOutboundHandler响应处理。

然而这里的bind事件在Netty中被定义为outbound事件,所以它在pipeline中是反向传播。先从TailContext开始反向传播直到HeadContext。

然而bind的核心逻辑也正是实现在HeadContext中。

3.1 HeadContext

  final class HeadContext extends AbstractChannelHandlerContext
implements ChannelOutboundHandler, ChannelInboundHandler {
@Override
public void bind(
ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {
//发AbstractChannel->bind方 行JDK NIO SelectableChannel
unsafe.bind(localAddress, promise);
}
}

在HeadContext#bind回调方法中,调用Channel里的unsafe操作类执行真正的绑定操作。

protected abstract class AbstractUnsafe implements Unsafe {
@Override
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
.................................
//时channel wasActive = false
boolean wasActive = isActive();
try {
//io.netty.channel.socket.nio.NioServerSocketChannel.doBind
//体channel
doBind(localAddress);
} catch (Throwable t) {
.................................
return;
}
// channel激 发channelActive
if (!wasActive && isActive()) {
invokeLater(new Runnable() {
@Override
public void run() {
//pipeline中发channelActive
pipeline.fireChannelActive();
}
});
}
//在promise的ChannelFutureListener
safeSetSuccess(promise);
}
protected abstract void doBind(SocketAddress localAddress) throws Exception;
}
  @Override
protected void doBind(SocketAddress localAddress) throws Exception {
//用JDK NIO 层SelectableChannel
if (PlatformDependent.javaVersion() >= 7) {
javaChannel().bind(localAddress, config.getBacklog());
} else {
javaChannel().socket().bind(localAddress, config.getBacklog());
}
}

还是同样的问题,当前执行线程已经是Reactor线程了,那么为何不直接触发pipeline中的ChannelActive事件而是又封装成异步任务呢?

因为如果直接在这里触发ChannelActive事件,那么Reactor线程就会去执行pipeline中的ChannelHandler的channelActive事件回调。

这样的话就影响了safeSetSuccess(promise)的执行,延迟了注册在promise上的ChannelFutureListener的回调。

到现在为止,Netty服务端就已经完成了绑定端口地址的操作,NioServerSocketChannel的状态现在变为Active。

最后还有一件重要的事情要做,我们接着来看pipeline中对channelActive事件处理。

3.2 channelActive事件处理

channelActive事件在Netty中定义为inbound事件,所以它在pipeline中的传播为正向传播,从HeadContext一直到TailContext为止。

在channelActive事件回调中需要触发向Selector指定需要监听的IO事件~~OP_ACCEPT事件。

这块的逻辑主要在HeadContext中实现。

    final class HeadContext extends AbstractChannelHandlerContext
implements ChannelOutboundHandler, ChannelInboundHandler {
@Override
public void channelActive(ChannelHandlerContext ctx) {
//pipeline中播channelActive
ctx.fireChannelActive();
//是autoRead 发read
//在read 发OP_ACCEPT
readIfIsAutoRead();
}
private void readIfIsAutoRead() {
if (channel.config().isAutoRead()) {
//是autoRead 发read
channel.read();
}
}
//AbstractChannel
public Channel read() {
//发read
pipeline.read();
return this;
}
@Override
public void read(ChannelHandlerContext ctx) {
//册OP_ACCEPT者OP_READ
unsafe.beginRead();
}
}

3.3 beginRead

protected abstract class AbstractUnsafe implements Unsafe {
@Override
public final void beginRead() {
assertEventLoop();
//channel必是Active
if (!isActive()) {
return;
}
try {
// 在selector册channel
doBeginRead();
} catch (final Exception e) {
...........................
}
}
}
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
//
protected abstract void doBeginRead() throws Exception;
}
public abstract class AbstractNioChannel extends AbstractChannel {
//channel注到Selector的SelectKey
volatile SelectionKey selectionKey;
// Channel监
protected final int readInterestOp;
@Override
protected void doBeginRead() throws Exception {
final SelectionKey selectionKey = this.selectionKey;
if (!selectionKey.isValid()) {
return;
}
readPending = true;

final int interestOps = selectionKey.interestOps();

if ((interestOps & readInterestOp) == 0) {
//加OP_ACCEPT到interestOps
selectionKey.interestOps(interestOps | readInterestOp);
}
}
}

Main Reactor中主要监听的是OP_ACCEPT事件。

流程走到这里,Netty服务端就真正的启动起来了,下一步就开始等待接收客户端连接了。大家此刻在来回看这副启动流程图,是不是清晰了很多呢?

此时Netty的Reactor模型结构如下:

总结

本文我们通过图解源码的方式完整地介绍了整个Netty服务端启动流程,并介绍了在启动过程中涉及到的ServerBootstrap相关的属性以及配置方式。NioServerSocketChannel的创建初始化过程以及类的继承结构。

其中重点介绍了NioServerSocketChannel向Reactor的注册过程以及Reactor线程的启动时机和pipeline的初始化时机。

最后介绍了NioServerSocketChannel绑定端口地址的整个流程。

上述介绍的这些流程全部是异步操作,各种回调绕来绕去的,需要反复回想下,读异步代码就是这样,需要理清各种回调之间的关系,并且时刻提醒自己当前的执行线程是什么?

好了,现在Netty服务端已经启动起来,接着就该接收客户端连接了。

来源:bin的技术小屋内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯