文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

一文快速了解高性能网络通信框架 Netty

2024-11-28 15:41

关注

传统BIO与不完美的解决方案

1.BIO编程及其问题

Java程序员早期进行网络程序开发的时候,采用的都是传统BIO模式进行开发,这种模式工作流程非常简单:

这种做法在少量的客户端连接下还是可以保证可靠运行的,我们都知道每当服务器启动就会其一个端口监听连接,笔者以自己的服务器的1234号进程为例:

netstat -ano | findstr :1234

此时对应的端口使用情况为只有一个8080端口监听:

  TCP    0.0.0.0:8080           0.0.0.0:0              LISTENING       11312
  TCP    [::]:8080              [::]:0                 LISTENING       11312

每当我们一个客户端接入,服务器就会为其分配一个端口端口处理和该客户端的收发,以笔者的程序为例,可以看到此时该进程正使用58891与客户端socket进程交互:

 TCP    0.0.0.0:8080           0.0.0.0:0              LISTENING       11312
  TCP    127.0.0.1:8080         127.0.0.1:58891        ESTABLISHED     11312
  TCP    127.0.0.1:58891        127.0.0.1:8080         ESTABLISHED     4928
  TCP    [::]:8080              [::]:0                 LISTENING       11312

由此可知,一旦遇到高并发IO读写,由于一个客户端绑定一个线程的模式,所以每一个端口号的收发都需要一个线程进程处理,如果有大量连接接入势必导致频繁的线程上下文切换进而导致各种资源的消耗,由此导致著名的C10k问题:

这里笔者也给出一段比较基础的bio代码示例供读者参考一下这种实现,可以看到我们的主线程阻塞监听,每当收到一个新的连接就创建一个线程处理这个客户端的读写请求:

public class IOServer {
    public static void main(String[] args) throws IOException {
        ServerSocket serverSocket = new ServerSocket(8888);
        //创建一个线程等待连接进来的客户端
        new Thread(() -> waitConnect(serverSocket)).start();
    }

    private static void waitConnect(ServerSocket serverSocket) {
        while (true) {
            try {
                // 1. 阻塞方法获取新连接
                Socket socket = serverSocket.accept();

                // 2. 每个客户端来了,就专门创建一个新的连接处理
                new Thread(() -> {
                    int len;
                    byte[] data = new byte[1024];
                    try {
                        InputStream inputStream = socket.getInputStream();
                        // 3. 按字节流方式读取数据
                        while ((len = inputStream.read(data)) != -1) {
                            System.out.println(Thread.currentThread().getName() + " receive msg:" + new String(data, 0, len));
                        }
                    } catch (IOException e) {
                        throw new RuntimeException(e);
                    }
                }).start();

            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

2.epoll事件驱动编程

于是就有了epoll事件驱动编程这一方案,也就是我们常说的IO多路复用,该方案的理念是将所有socket的读写事件注册到epoll上。 以我们的服务端为例,创建socket监听连接时就会将自己的感兴趣的连接事件注册到epoll上,随后服务端就可以在循环中非阻塞的获取是否有连接接入,每当有连接接入就会为请求客户端建立连接并将其读写事件注册到处理客户都安的epoll上,后续所有客户端读写请求都会交给这个epoll处理,由此实现最少的线程做最多的事情,提升性能同时还降低消耗:

对此我们也用一段伪代码展示一下事件驱动编程:

//创建epoll
EpollFd epollFd=createEpoll();
//将文件描述符注册到epoll上
epollCreateCtl(epollFd,socketFdList)

while(true){
 //收到epoll推送过来的事件
 List eventList=epollWait(epollFd);
 //遍历并处理事件
 eventList.foreach(e->handler(e));
}

3.JDK传统事件驱动编程

基于上述描述我们对事件驱动编程有了初步的了解,接下来我们就来看看原生的jdk是如何实现NIO事件驱动编程的。

首先我们需要创建一个serverSelector用于非阻塞查询是否有就绪的socket事件,一旦收到客户端的请求后,为其建立连接之后,将客户端的读写事件注册到clientSelector,由clientSelector的线程处理这些客户端读写,而serverSelector依然负责非阻塞轮询监听是否有新连接:

简单介绍之后我们给出Selector 声明:

 //负责轮询是否有新连接
  Selector serverSelector = Selector.open();
  //负责处理每个客户端是否有数据可读
  Selector clientSelector = Selector.open();

然后我们使用这个socket非阻塞轮询就绪的连接事件并注册到客户端的epoll模型上:

new Thread(() -> {
            try {
                //创建服务端socket监听通道
                ServerSocketChannel listenerChannel = ServerSocketChannel.open();
                //绑定端口
                listenerChannel.socket().bind(new InetSocketAddress(8888));
                //设置为非阻塞监听
                listenerChannel.configureBlocking(false);
                //注册感兴趣的事件为OP_ACCEPT事件,即可处理当前socket的ACCEPT连接接入事件
                listenerChannel.register(serverSelector, SelectionKey.OP_ACCEPT);

                //循环非阻塞获取就绪事件
                while (true) {
                    //阻塞1毫秒查看是否有新的连接进来
                    if (serverSelector.select(1) > 0) {
                        //查看是否有就绪的事件
                        Set set = serverSelector.selectedKeys();
                        Iterator keyIterator = set.iterator();
                        //遍历事件
                        while (keyIterator.hasNext()) {
                            SelectionKey key = keyIterator.next();
                            //判断是否是新的socket连接加入
                            if (key.isAcceptable()) {
                                System.out.println("有新的socket连接加入");
                                //接收此通道与socket的连接
                                SocketChannel clientChannel = ((ServerSocketChannel) key.channel()).accept();
                                clientChannel.configureBlocking(false);
                                //服务端监测到新连接之后,不再创建一个新线程,而是直接将
                                //新连接绑定到clientSelector上
                                clientChannel.register(clientSelector, SelectionKey.OP_READ);
                                keyIterator.remove();
                            }
                        }
                    }
                }


            } catch (Exception e) {

            }
        }).start();

我们再来看看客户端处理线程逻辑,和上文差不多,都是非阻塞轮询客户端就绪的事件,我们以输出的方式模拟事件处理,然后进入下一次循环:

new Thread(() -> {
            while (true) {
                try {
                    //通过clientSelector.select(1)方法可以轮询出来,进而批量处理
                    if (clientSelector.select(1) > 0) {
                        //获取就绪的客户端事件
                        Set set = clientSelector.selectedKeys();
                        Iterator keyIterator = set.iterator();
                        //循环遍历处理客户端事件,完成后将该key移除,并在此注册一个OP_READ等待下一次该socket就绪
                        while (keyIterator.hasNext()) {
                            SelectionKey key = keyIterator.next();
                            if (key.isReadable()) {
                                try {
                                    //获取事件的通道
                                    SocketChannel clientChannel = (SocketChannel) key.channel();
                                    //数据的读写面向Buffer
                                    ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                                    //读取数据到buffer中
                                    clientChannel.read(byteBuffer);
                                    byteBuffer.flip();
                                    System.out.println(Thread.currentThread().getName() + ":" + Charset.defaultCharset().newDecoder().decode(byteBuffer).toString());
                                } catch (Exception e) {

                                } finally {
                                    keyIterator.remove();
                                    key.interestOps(SelectionKey.OP_READ);
                                }
                            }
                        }
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }).start();

可以看出原生nio虽然相对bio减小了一定开销且提高一定的性能,但是缺点也很明显:

原生的JDK的NIO概念非常多,使用非常复杂对新手不友好。

高性能网络通信框架Netty

相对与JDK的原生nio,Netty与之相比有着一下的优势:

同样以以上述客户端服务端通信,Netty实现就比较简单了,我们编写服务端时,只需通过NioEventLoopGroup 完成上图所说两个slector创建,再通过channel指明当前事件轮询采用NIO非阻塞方式,最后将事件处理器FirstServerHandler添加到当前服务端childHandler的pipeline上即可处理所有客户端读写请求:

public static void main(String[] args) {
        ServerBootstrap serverBootstrap = new ServerBootstrap();
  //创建处理连接的事件轮询eventLoop 
        NioEventLoopGroup boss = new NioEventLoopGroup();
        //创建处理客户端读写请求的eventLoop 
        NioEventLoopGroup worker = new NioEventLoopGroup();
  
        serverBootstrap.group(boss, worker)
          //设置为非阻塞轮询
          .channel(NioServerSocketChannel.class)
          //childHandler添加ServerHandler客户端读写请求
                .childHandler(new ChannelInitializer() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {                   
                        ch.pipeline().addLast(new FirstServerHandler());    
                    }
                });

        serverBootstrap.bind("127.0.0.1", 8080);
    }

最后我们给出FirstServerHandler 的代码,可以看到我们直接继承ChannelInboundHandlerAdapter 处理客户端发送的数据,每当服务端收到客户端数据时就会回调channelRead,我们的逻辑也很简单,收到数据之后直接回复Hello Netty client:

public class FirstServerHandler extends ChannelInboundHandlerAdapter {

    
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf byteBuf = (ByteBuf) msg;
        //打印读取到的数据
        System.out.println(new Date() + ": 服务端读到数据 -> " + byteBuf.toString(StandardCharsets.UTF_8));

        // 回复客户端数据
        System.out.println(new Date() + ": 服务端写出数据");
        //组装数据并发送
        ByteBuf out = getByteBuf(ctx);
        ctx.channel().writeAndFlush(out);
    }

    private ByteBuf getByteBuf(ChannelHandlerContext ctx) {
        ByteBuf buffer = ctx.alloc().buffer();

        byte[] bytes = "Hello Netty client ".getBytes(StandardCharsets.UTF_8);

        buffer.writeBytes(bytes);

        return buffer;
    }
}

此时我们通过telnet 127.0.0.1 8080进行数据发送即可收到服务端的响应了:

来源:写代码的SharkChili内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯