文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

阅读一个分布式框架,这些必备的 NIO 知识你要知道

2024-12-03 05:42

关注

 一、开篇

阅读一个分布式开源项目的时候,最重要的就是了解这个项目的通信框架。

因为一个分布式的开源框架,通常是集群部署的,不同的节点和节点之间需要相互通信来完成复杂的功能,而阅读到这些源码的时候,如果不了解它通信机制的话,就会迷失在代码里,像走进了一片原始森林。

比如 HDFS ,使用的通信框架是自己封装的 Hadoop Rpc;Spark 底层通信就是用的 Netty;而最近阅读的 Kafka 源码,底层使用的是原生的 Java NIO。

所以本次,我们来聊一聊 Java NIO 的那些主要的知识点。

二、多图弄懂 NIO 三大核心概念

谈到 NIO,就会有三个核心的概念:通道、缓冲、选择器。

直接开门见山,或许听起来会有点迷茫,我们需要从头开始说。

1、通道

以前在并发要求不是很高的情况下,是 CPU 来全权处理输入输出的(中断),如下图:

用户程序向服务端发起读写请求,cpu 直接处理这些请求。这样有一个弊端,当 IO 请求非常多的时候,会大量占用 CPU,使得整个系统的处理能力会下降。

随着计算机的发展,出现了一种新的方式,使用 DMA 来全权处理 IO 请求,如下图:

DMA 是 Direct Memory Access,直接内存访问控制。

为什么要增加这个设备呢?是因为 CPU 中断方式不能满足数据传输速度的要求,因为在中断方式下,每次中断需要保存断点和现场,中断返回时,要恢复断点和现场。

所有这些原因,使得中断方式难以满足高速外设对传输速度的要求。

所以,就有了 DMA 这样的设备,在 DMA 方式的数据传输过程中,当 I/O 设备需要进行数据传送时,通过 DMA 控制器向 CPU 提出 DMA 传送请求,CPU 响应之后将让出系统总线,由 DMA 控制器接管总线进行数据传输,而此时 CPU 除了做一些初始化操作之外,可以去做自己的事情。

但是有了 DMA,仍然满足不了业务快速发展的需要,因为当 I/O 请求过多时,会出现总线冲突的问题。

所以后面就出现了通道(Channel),它和 DMA 不同的地方是,通道有自己的指令系统和程序,是一个协处理器;而 DMA 只能实现固定的数据传送控制。

而 Java NIO 中的 Channel ,就是对上图中通道的实现。

2、缓冲

理解了通道的概念,缓冲区也很好理解了。

通道表示打开到 I/O 设备的(例如:文件、套接字)的连接,但是通道本身并不存储数据。真正作为数据传输载体的是缓冲区。

当应用程序要写数据时,需要先把数据写到缓冲区里,然后由通道负责把缓冲区的数据发送到目的地(文件、磁盘、网络),然后再从缓冲区把数据取出来。

若需要使用 NIO 系统,需要获取用于连接 I/O 设备的通道以及用于容纳数据的缓冲区,然后操作缓冲区,对数据进行处理。

3、选择器

选择器也叫做多路复用器,是一种非阻塞式的 I/O 。既然谈到了非阻塞式,必然要先谈谈阻塞式。阻塞式如下图所示:

客户端向服务端发出一个读写请求时,服务端的线程会一直看内核地址空间是否有数据了。

客户端没有数据发送过来时,服务端的线程会一直等待,在此期间是什么事情都做不了的。

直到客户端有数据发送过来,会把数据从内核地址空间拷贝到用户地址空间,然后才读取到了数据的。

这就导致如果有大量的请求过来,后面的请求要等待前面的请求执行完毕,会造成大量的排队,无法充分利用 cpu 资源,性能就会急剧下降。

再看看选择器是如何工作的。

现在客户端服务端之间通信是用通道+缓冲区的,那么所有的通道都会注册到选择器上来。选择器会监控这些通道的 I/O 状态,比如连接、读、写的情况。

当某一个通道上的某个事件完全就绪时,选择器才会把这个任务分配到服务端的一个或者多个线程上。

当客户端没有事件准备好时,服务端的线程是不会阻塞的,它可以做自己的事情,直到客户端事件就绪,才会去处理。

这种非阻塞式相比较阻塞式,可以进一步的利用 cpu 资源。

三、理解了概念,再来学 API

1、缓冲区的 API

要彻底理解缓冲区,必须知道缓冲区的四个属性,mark,position,limit,capacity,只需要跑一遍代码就知道了。

(1)分配一定大小的缓冲区

  1. //1.分配一个指定大小的缓冲区 
  2. ByteBuffer buffer = ByteBuffer.allocate(10); 
  3. System.out.println("---------alocate"); 
  4. System.out.println("position:" + buffer.position()); 
  5. System.out.println("limit:" + buffer.limit()); 
  6. System.out.println("capacity:" + buffer.capacity()); 

运行结果:

  1. ---------alocate----------- 
  2. position:0 
  3. limit:10 
  4. capacity:10 

这里我们分配了 10 个字节的缓冲区,也就是在 ByteBuffer 的 final byte[] hb; 属性上开辟了 10 个字节的空间。

所以容量 capacity 为 10 , limit 可读写数据的最大位置 也是 10 ,position 为可以操作数据的位置为 0 。

(2)往缓冲区写数据

  1. // 2.写入数据到缓冲区 
  2. String str = "abcde"
  3. System.out.println("------------put------------"); 
  4. buffer.put(str.getBytes(StandardCharsets.UTF_8)); 
  5. System.out.println("position:" + buffer.position()); 
  6. System.out.println("limit:" + buffer.limit()); 
  7. System.out.println("capacity:" + buffer.capacity()); 

运行结果:

  1. ------------put------------ 
  2. position:5 
  3. limit:10 
  4. capacity:10 

这里我们往缓冲区写了 5 个字节的数据,那么 capacity 和 limit 都还是10,但是 position 为 5 了,因为前面已经写入了 5 个了

(3)切换成读数据的模式

  1. // 3.切换成读数据的模式 
  2. buffer.flip(); 
  3. System.out.println("------------flip------------"); 
  4. System.out.println("position:" + buffer.position()); 
  5. System.out.println("limit:" + buffer.limit()); 
  6. System.out.println("capacity:" + buffer.capacity()); 

那我们现在想从缓冲区读取一些数据出来,就需要切换成 flip 模式,flip 会改变一些属性的值

运行结果:

  1. ------------flip------------ 
  2. position:0 
  3. limit:5 
  4. capacity:10 

flip 会改变 position 的值为 0 ,并且 limit 为5,表示我要从头开始读,并且只能读到 5 的位置

(4)读取一些数据

  1. // 4. 读取数据 
  2. System.out.println("------------get------------"); 
  3. byte[] dest = new byte[buffer.limit()]; 
  4. buffer.get(dest); 
  5. System.out.println(new String(dest,0,dest.length)); 
  6. System.out.println("position:" + buffer.position()); 
  7. System.out.println("limit:" + buffer.limit()); 
  8. System.out.println("capacity:" + buffer.capacity()); 

运行结果:

  1. ------------get------------ 
  2. abcde 
  3. position:5 
  4. limit:5 
  5. capacity:10 

读取了数据之后,position 就变成 5 了,表示我已经读取到 5 了。

(5)重复读

  1. //5.rewind() 
  2. buffer.rewind(); 
  3. System.out.println("------------rewind------------"); 
  4. System.out.println("position:" + buffer.position()); 
  5. System.out.println("limit:" + buffer.limit()); 
  6. System.out.println("capacity:" + buffer.capacity()); 

运行结果:

  1. ------------rewind------------ 
  2. position:0 
  3. limit:5 
  4. capacity:10 

rewind 表示重复读取 buffer 里面的数据

(6)清除数据

  1. //6.clear() 
  2. buffer.clear(); 
  3. System.out.println("------------clear------------"); 
  4. System.out.println("position:" + buffer.position()); 
  5. System.out.println("limit:" + buffer.limit()); 
  6. System.out.println("capacity:" + buffer.capacity()); 

运行结果:

  1. ------------clear------------ 
  2. position:0 
  3. limit:10 
  4. capacity:10 

clear() 之后,position 回到了 0 ,limit 回到了 10,又可以重头开始写数据了,能写 10 个字节。

但是要注意的是,缓冲里面的数据并没有清空掉,数据还在里面,处于被“遗忘”状态。这几个指针回到了最初的状态。

(7)标记

这是第四个属性:mark。

mark 可以记录 position 的位置。可以通过 reset() 方法回到 mark 的位置。

  1.  @Test 
  2.     public void test2() { 
  3.         // 分配 10 个字节 
  4.         String str = "abcde"
  5.         ByteBuffer buffer = ByteBuffer.allocate(10); 
  6.         buffer.put(str.getBytes(StandardCharsets.UTF_8)); 
  7.  
  8.         // 切换到读模式,读取 2 个字节 
  9.         buffer.flip(); 
  10.         byte[] dest = new byte[buffer.limit()]; 
  11.         buffer.get(dest, 0, 2); 
  12.         System.out.println(new String(dest, 0, 2)); 
  13.         System.out.println(buffer.position()); 
  14.  
  15.         // mark 一下记录当前位置 
  16.         buffer.mark(); 
  17.  
  18.         // 又读取两个字节 
  19.         buffer.get(dest, 2, 2); 
  20.         System.out.println(new String(dest, 2, 2)); 
  21.         System.out.println(buffer.position()); 
  22.  
  23.         // reset,回到 mark 的位置 
  24.         buffer.reset(); 
  25.         System.out.println(buffer.position()); 
  26.     } 
  27.  
  28. 执行结果: 
  29.  
  30. ```tex 
  31. ab 
  32. cd 

2、使用通道、缓冲区、选择器完成一个网络程序

(1)服务端

  1. @Test 
  2.  public void testServer() throws IOException { 
  3.      ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); 
  4.      serverSocketChannel.configureBlocking(false); 
  5.  
  6.      serverSocketChannel.bind(new InetSocketAddress(8989)); 
  7.  
  8.      Selector selector = Selector.open(); 
  9.      serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); 
  10.  
  11.      while (selector.select() > 0) { 
  12.          Iterator iterator = selector.selectedKeys().iterator(); 
  13.          while (iterator.hasNext()) { 
  14.              SelectionKey key = iterator.next(); 
  15.              if (key.isAcceptable()) { 
  16.                  SocketChannel socketChannel = serverSocketChannel.accept(); 
  17.                  socketChannel.configureBlocking(false); 
  18.                  socketChannel.register(selector, SelectionKey.OP_READ); 
  19.              } else if (key.isReadable()) { 
  20.                  SocketChannel channel = (SocketChannel) key.channel(); 
  21.                  ByteBuffer byteBuffer = ByteBuffer.allocate(1024); 
  22.                  int len = 0; 
  23.                  while ((len = channel.read(byteBuffer)) > 0) { 
  24.                      byteBuffer.flip(); 
  25.                      System.out.println(new String(byteBuffer.array(), 0, len)); 
  26.                      byteBuffer.clear(); 
  27.                  } 
  28.              } 
  29.          } 
  30.  
  31.          iterator.remove(); 
  32.      } 
  33.  } 

首先使用 ServerSocketChannel.open(),打开一个通道,设置成非阻塞模式;

绑定到 8989 端口上;

把通道注册到选择器上;

while 循环,选择器上是否有事件,如果事件是客户端的连接事件,则打开一个 SocketChannel,注册成非阻塞模式,并且往选择器上注册一个读数据的事件;

当客户端发送数据过来的时候,就可以打开一个通道,读取缓冲区上的数据;

并且此时,服务端是可以同时接受多个客户端的请求的。

(2)客户端

  1. @Test 
  2.  public void testClient() throws IOException { 
  3.      SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 8989)); 
  4.      socketChannel.configureBlocking(false); 
  5.  
  6.      ByteBuffer byteBuffer = ByteBuffer.allocate(1024); 
  7.      byteBuffer.put(new Date().toString().getBytes(StandardCharsets.UTF_8)); 
  8.      byteBuffer.flip(); 
  9.      socketChannel.write(byteBuffer); 
  10.      byteBuffer.clear(); 
  11.  
  12.      socketChannel.close(); 
  13.  
  14.  } 

客户端打开一个 SocketChannel,配置成非阻塞模式;

使用 ByteBuffer 发送数据(注意发送之前,要 flip);

关闭通道。

四、总结

本次我们初步探究了一下 Java NIO 的几个核心概念,通道、缓冲区、选择器。

但是你要知道,这是冰山一角,通道和选择器如果要深究的话,会涉及到操作系统底层和很多计算机组成原理的知识。

比如选择器就涉及到了 select,poll,epoll 的概念,这几个概念如果再打开的话,还会牵涉到硬件中断,内核的一些知识。

所以学海无涯苦作舟,越来越对这句话感同身受。

 

来源:KK架构师内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯