文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

Java 从零开始手写 RPC -序列化

2024-12-02 19:08

关注

前面几节我们实现了最基础的客户端调用服务端,这一节来学习一下通讯中的对象序列化。

为什么需要序列化

netty 底层都是基于 ByteBuf 进行通讯的。

前面我们通过编码器/解码器专门为计算的入参/出参进行处理,这样方便我们直接使用 pojo。

但是有一个问题,如果想把我们的项目抽象为框架,那就需要为所有的对象编写编码器/解码器。

显然,直接通过每一个对象写一对的方式是不现实的,而且用户如何使用,也是未知的。

序列化的方式

基于字节的实现,性能好,可读性不高。

基于字符串的实现,比如 json 序列化,可读性好,性能相对较差。

ps: 可以根据个人还好选择,相关序列化可参考下文,此处不做展开。

json 序列化框架简介[1]

实现思路

可以将我们的 Pojo 全部转化为 byte,然后 Byte 转换为 ByteBuf 即可。

反之亦然。

代码实现

maven

引入序列化包:

  1.  
  2.     com.github.houbb 
  3.     json 
  4.     0.1.1 
  5.  

服务端

核心

服务端的代码可以大大简化:

  1. serverBootstrap.group(workerGroup, bossGroup) 
  2.     .channel(NioServerSocketChannel.class) 
  3.     // 打印日志 
  4.     .handler(new LoggingHandler(LogLevel.INFO)) 
  5.     .childHandler(new ChannelInitializer() { 
  6.         @Override 
  7.         protected void initChannel(Channel ch) throws Exception { 
  8.             ch.pipeline() 
  9.                     .addLast(new RpcServerHandler()); 
  10.         } 
  11.     }) 
  12.     // 这个参数影响的是还没有被accept 取出的连接 
  13.     .option(ChannelOption.SO_BACKLOG, 128) 
  14.     // 这个参数只是过一段时间内客户端没有响应,服务端会发送一个 ack 包,以判断客户端是否还活着。 
  15.     .childOption(ChannelOption.SO_KEEPALIVE, true); 

这里只需要一个实现类即可。

RpcServerHandler

服务端的序列化/反序列化调整为直接使用 JsonBs 实现。

  1. package com.github.houbb.rpc.server.handler; 
  2.  
  3.  
  4. import com.github.houbb.json.bs.JsonBs; 
  5. import com.github.houbb.log.integration.core.Log; 
  6. import com.github.houbb.log.integration.core.LogFactory; 
  7. import com.github.houbb.rpc.common.model.CalculateRequest; 
  8. import com.github.houbb.rpc.common.model.CalculateResponse; 
  9. import com.github.houbb.rpc.common.service.Calculator; 
  10. import com.github.houbb.rpc.server.service.CalculatorService; 
  11.  
  12.  
  13. import io.netty.buffer.ByteBuf; 
  14. import io.netty.buffer.Unpooled; 
  15. import io.netty.channel.ChannelHandlerContext; 
  16. import io.netty.channel.SimpleChannelInboundHandler; 
  17.  
  18.  
  19.  
  20. public class RpcServerHandler extends SimpleChannelInboundHandler { 
  21.  
  22.  
  23.     private static final Log log = LogFactory.getLog(RpcServerHandler.class); 
  24.  
  25.  
  26.     @Override 
  27.     public void channelActive(ChannelHandlerContext ctx) throws Exception { 
  28.         final String id = ctx.channel().id().asLongText(); 
  29.         log.info("[Server] channel {} connected " + id); 
  30.     } 
  31.  
  32.  
  33.     @Override 
  34.     protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { 
  35.         final String id = ctx.channel().id().asLongText(); 
  36.  
  37.  
  38.         ByteBuf byteBuf = (ByteBuf)msg; 
  39.         byte[] bytes = new byte[byteBuf.readableBytes()]; 
  40.         byteBuf.readBytes(bytes); 
  41.         CalculateRequest request = JsonBs.deserializeBytes(bytes, CalculateRequest.class); 
  42.         log.info("[Server] receive channel {} request: {} from ", id, request); 
  43.  
  44.  
  45.         Calculator calculator = new CalculatorService(); 
  46.         CalculateResponse response = calculator.sum(request); 
  47.  
  48.  
  49.         // 回写到 client 端 
  50.         byte[] responseBytes = JsonBs.serializeBytes(response); 
  51.         ByteBuf responseBuffer = Unpooled.copiedBuffer(responseBytes); 
  52.         ctx.writeAndFlush(responseBuffer); 
  53.         log.info("[Server] channel {} response {}", id, response); 
  54.     } 
  55.  
  56.  

客户端

核心

客户端可以简化如下:

  1. channelFuture = bootstrap.group(workerGroup) 
  2.     .channel(NioSocketChannel.class) 
  3.     .option(ChannelOption.SO_KEEPALIVE, true
  4.     .handler(new ChannelInitializer(){ 
  5.         @Override 
  6.         protected void initChannel(Channel ch) throws Exception { 
  7.             channelHandler = new RpcClientHandler(); 
  8.             ch.pipeline() 
  9.                     .addLast(new LoggingHandler(LogLevel.INFO)) 
  10.                     .addLast(channelHandler); 
  11.         } 
  12.     }) 
  13.     .connect(RpcConstant.ADDRESS, port) 
  14.     .syncUninterruptibly(); 

RpcClientHandler

客户端的序列化/反序列化调整为直接使用 JsonBs 实现。

  1. package com.github.houbb.rpc.client.handler; 
  2.  
  3.  
  4. import com.github.houbb.json.bs.JsonBs; 
  5. import com.github.houbb.log.integration.core.Log; 
  6. import com.github.houbb.log.integration.core.LogFactory; 
  7. import com.github.houbb.rpc.client.core.RpcClient; 
  8. import com.github.houbb.rpc.common.model.CalculateResponse; 
  9.  
  10.  
  11. import io.netty.buffer.ByteBuf; 
  12. import io.netty.channel.ChannelHandlerContext; 
  13. import io.netty.channel.SimpleChannelInboundHandler; 
  14.  
  15.  
  16.  
  17. public class RpcClientHandler extends SimpleChannelInboundHandler { 
  18.  
  19.  
  20.     private static final Log log = LogFactory.getLog(RpcClient.class); 
  21.  
  22.  
  23.      
  24.     private CalculateResponse response; 
  25.  
  26.  
  27.     @Override 
  28.     protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { 
  29.         ByteBuf byteBuf = (ByteBuf)msg; 
  30.         byte[] bytes = new byte[byteBuf.readableBytes()]; 
  31.         byteBuf.readBytes(bytes); 
  32.  
  33.  
  34.         this.response = JsonBs.deserializeBytes(bytes, CalculateResponse.class); 
  35.         log.info("[Client] response is :{}", response); 
  36.     } 
  37.  
  38.  
  39.     @Override 
  40.     public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { 
  41.         // 每次用完要关闭,不然拿不到response,我也不知道为啥(目测得了解netty才行) 
  42.         // 个人理解:如果不关闭,则永远会被阻塞。 
  43.         ctx.flush(); 
  44.         ctx.close(); 
  45.     } 
  46.  
  47.  
  48.     public CalculateResponse getResponse() { 
  49.         return response; 
  50.     } 
  51.  
  52.  

 

来源:今日头条内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯