文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

Java 从零开始手写 RPC—Reflect 反射实现通用调用之服务端

2024-12-02 18:42

关注

前面我们的例子是一个固定的出参和入参,固定的方法实现。

本节将实现通用的调用,让框架具有更广泛的实用性。

基本思路

所有的方法调用,基于反射进行相关处理实现。

[[430217]]

服务端

核心类

调整如下:

  1. serverBootstrap.group(workerGroup, bossGroup) 
  2.     .channel(NioServerSocketChannel.class) 
  3.     // 打印日志 
  4.     .handler(new LoggingHandler(LogLevel.INFO)) 
  5.     .childHandler(new ChannelInitializer() { 
  6.         @Override 
  7.         protected void initChannel(Channel ch) throws Exception { 
  8.             ch.pipeline() 
  9.             // 解码 bytes=>resp 
  10.             .addLast(new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null))) 
  11.              // request=>bytes 
  12.              .addLast(new ObjectEncoder()) 
  13.              .addLast(new RpcServerHandler()); 
  14.         } 
  15.     }) 
  16.     // 这个参数影响的是还没有被accept 取出的连接 
  17.     .option(ChannelOption.SO_BACKLOG, 128) 
  18.     // 这个参数只是过一段时间内客户端没有响应,服务端会发送一个 ack 包,以判断客户端是否还活着。 
  19.     .childOption(ChannelOption.SO_KEEPALIVE, true); 

其中 ObjectDecoder 和 ObjectEncoder 都是 netty 内置的实现。

RpcServerHandler

  1. package com.github.houbb.rpc.server.handler; 
  2.  
  3.  
  4. import com.github.houbb.log.integration.core.Log; 
  5. import com.github.houbb.log.integration.core.LogFactory; 
  6. import com.github.houbb.rpc.common.rpc.domain.RpcRequest; 
  7. import com.github.houbb.rpc.common.rpc.domain.impl.DefaultRpcResponse; 
  8. import com.github.houbb.rpc.server.service.impl.DefaultServiceFactory; 
  9. import io.netty.channel.ChannelHandlerContext; 
  10. import io.netty.channel.SimpleChannelInboundHandler; 
  11.  
  12.  
  13.  
  14. public class RpcServerHandler extends SimpleChannelInboundHandler { 
  15.  
  16.  
  17.     private static final Log log = LogFactory.getLog(RpcServerHandler.class); 
  18.  
  19.  
  20.     @Override 
  21.     public void channelActive(ChannelHandlerContext ctx) throws Exception { 
  22.         final String id = ctx.channel().id().asLongText(); 
  23.         log.info("[Server] channel {} connected " + id); 
  24.     } 
  25.  
  26.  
  27.     @Override 
  28.     protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { 
  29.         final String id = ctx.channel().id().asLongText(); 
  30.         log.info("[Server] channel read start: {}", id); 
  31.  
  32.  
  33.         // 接受客户端请求 
  34.         RpcRequest rpcRequest = (RpcRequest)msg; 
  35.         log.info("[Server] receive channel {} request: {}", id, rpcRequest); 
  36.  
  37.  
  38.         // 回写到 client 端 
  39.         DefaultRpcResponse rpcResponse = handleRpcRequest(rpcRequest); 
  40.         ctx.writeAndFlush(rpcResponse); 
  41.         log.info("[Server] channel {} response {}", id, rpcResponse); 
  42.     } 
  43.  
  44.  
  45.     @Override 
  46.     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { 
  47.         cause.printStackTrace(); 
  48.         ctx.close(); 
  49.     } 
  50.  
  51.  
  52.      
  53.     private DefaultRpcResponse handleRpcRequest(final RpcRequest rpcRequest) { 
  54.         DefaultRpcResponse rpcResponse = new DefaultRpcResponse(); 
  55.         rpcResponse.seqId(rpcRequest.seqId()); 
  56.  
  57.  
  58.         try { 
  59.             // 获取对应的 service 实现类 
  60.             // rpcRequest=>invocationRequest 
  61.             // 执行 invoke 
  62.             Object result = DefaultServiceFactory.getInstance() 
  63.                     .invoke(rpcRequest.serviceId(), 
  64.                             rpcRequest.methodName(), 
  65.                             rpcRequest.paramTypeNames(), 
  66.                             rpcRequest.paramValues()); 
  67.             rpcResponse.result(result); 
  68.         } catch (Exception e) { 
  69.             rpcResponse.error(e); 
  70.             log.error("[Server] execute meet ex for request", rpcRequest, e); 
  71.         } 
  72.  
  73.  
  74.         // 构建结果值 
  75.         return rpcResponse; 
  76.     } 
  77.  
  78.  

和以前类似,不过 handleRpcRequest 要稍微麻烦一点。

这里需要根据发射,调用对应的方法。

pojo

其中使用的出参、入参实现如下:

RpcRequest

  1. package com.github.houbb.rpc.common.rpc.domain; 
  2.  
  3.  
  4. import java.util.List; 
  5.  
  6.  
  7.  
  8. public interface RpcRequest extends BaseRpc { 
  9.  
  10.  
  11.      
  12.     long createTime(); 
  13.  
  14.  
  15.      
  16.     String serviceId(); 
  17.  
  18.  
  19.      
  20.     String methodName(); 
  21.  
  22.  
  23.      
  24.     List paramTypeNames(); 
  25.  
  26.  
  27.     // 调用参数信息列表 
  28.  
  29.  
  30.      
  31.     Object[] paramValues(); 
  32.  
  33.  

RpcResponse

  1. package com.github.houbb.rpc.common.rpc.domain; 
  2.  
  3.  
  4.  
  5. public interface RpcResponse extends BaseRpc { 
  6.  
  7.  
  8.      
  9.     Throwable error(); 
  10.  
  11.  
  12.      
  13.     Object result(); 
  14.  
  15.  

BaseRpc

  1. package com.github.houbb.rpc.common.rpc.domain; 
  2.  
  3.  
  4. import java.io.Serializable
  5.  
  6.  
  7.  
  8. public interface BaseRpc extends Serializable { 
  9.  
  10.  
  11.      
  12.     String seqId(); 
  13.  
  14.  
  15.      
  16.     BaseRpc seqId(final String traceId); 
  17.  
  18.  

ServiceFactory-服务工厂

为了便于对所有的 service 实现类统一管理,这里定义 service 工厂类。

ServiceFactory

  1. package com.github.houbb.rpc.server.service; 
  2.  
  3.  
  4. import com.github.houbb.rpc.server.config.service.ServiceConfig; 
  5. import com.github.houbb.rpc.server.registry.ServiceRegistry; 
  6.  
  7.  
  8. import java.util.List; 
  9.  
  10.  
  11.  
  12. public interface ServiceFactory { 
  13.  
  14.  
  15.      
  16.     ServiceFactory registerServices(final List serviceConfigList); 
  17.  
  18.  
  19.      
  20.     Object invoke(final String serviceId, final String methodName, 
  21.                   List paramTypeNames, final Object[] paramValues); 
  22.  
  23.  

DefaultServiceFactory

作为默认实现,如下:

  1. package com.github.houbb.rpc.server.service.impl; 
  2.  
  3.  
  4. import com.github.houbb.heaven.constant.PunctuationConst; 
  5. import com.github.houbb.heaven.util.common.ArgUtil; 
  6. import com.github.houbb.heaven.util.lang.reflect.ReflectMethodUtil; 
  7. import com.github.houbb.heaven.util.util.CollectionUtil; 
  8. import com.github.houbb.rpc.common.exception.RpcRuntimeException; 
  9. import com.github.houbb.rpc.server.config.service.ServiceConfig; 
  10. import com.github.houbb.rpc.server.service.ServiceFactory; 
  11.  
  12.  
  13. import java.lang.reflect.InvocationTargetException; 
  14. import java.lang.reflect.Method; 
  15. import java.util.HashMap; 
  16. import java.util.List; 
  17. import java.util.Map; 
  18.  
  19.  
  20.  
  21. public class DefaultServiceFactory implements ServiceFactory { 
  22.  
  23.  
  24.      
  25.     private Map serviceMap; 
  26.  
  27.  
  28.      
  29.     private Map methodMap; 
  30.  
  31.  
  32.     private static final DefaultServiceFactory INSTANCE = new DefaultServiceFactory(); 
  33.  
  34.  
  35.     private DefaultServiceFactory(){} 
  36.  
  37.  
  38.     public static DefaultServiceFactory getInstance() { 
  39.         return INSTANCE; 
  40.     } 
  41.  
  42.  
  43.      
  44.     @Override 
  45.     public synchronized ServiceFactory registerServices(List serviceConfigList) { 
  46.         ArgUtil.notEmpty(serviceConfigList, "serviceConfigList"); 
  47.  
  48.  
  49.         // 集合初始化 
  50.         serviceMap = new HashMap<>(serviceConfigList.size()); 
  51.         // 这里只是预估,一般为2个服务。 
  52.         methodMap = new HashMap<>(serviceConfigList.size()*2); 
  53.  
  54.  
  55.         for(ServiceConfig serviceConfig : serviceConfigList) { 
  56.             serviceMap.put(serviceConfig.id(), serviceConfig.reference()); 
  57.         } 
  58.  
  59.  
  60.         // 存放方法名称 
  61.         for(Map.Entry entry : serviceMap.entrySet()) { 
  62.             String serviceId = entry.getKey(); 
  63.             Object reference = entry.getValue(); 
  64.  
  65.  
  66.             //获取所有方法列表 
  67.             Method[] methods = reference.getClass().getMethods(); 
  68.             for(Method method : methods) { 
  69.                 String methodName = method.getName(); 
  70.                 if(ReflectMethodUtil.isIgnoreMethod(methodName)) { 
  71.                     continue
  72.                 } 
  73.  
  74.  
  75.                 List paramTypeNames = ReflectMethodUtil.getParamTypeNames(method); 
  76.                 String key = buildMethodKey(serviceId, methodName, paramTypeNames); 
  77.                 methodMap.put(key, method); 
  78.             } 
  79.         } 
  80.  
  81.  
  82.         return this; 
  83.     } 
  84.  
  85.  
  86.  
  87.  
  88.     @Override 
  89.     public Object invoke(String serviceId, String methodName, List paramTypeNames, Object[] paramValues) { 
  90.         //参数校验 
  91.         ArgUtil.notEmpty(serviceId, "serviceId"); 
  92.         ArgUtil.notEmpty(methodName, "methodName"); 
  93.  
  94.  
  95.         // 提供 cache,可以根据前三个值快速定位对应的 method 
  96.         // 根据 method 进行反射处理。 
  97.         // 对于 paramTypes 进行 string 连接处理。 
  98.         final Object reference = serviceMap.get(serviceId); 
  99.         final String methodKey = buildMethodKey(serviceId, methodName, paramTypeNames); 
  100.         final Method method = methodMap.get(methodKey); 
  101.  
  102.  
  103.         try { 
  104.             return method.invoke(reference, paramValues); 
  105.         } catch (IllegalAccessException | InvocationTargetException e) { 
  106.             throw new RpcRuntimeException(e); 
  107.         } 
  108.     } 
  109.  
  110.  
  111.      
  112.     private String buildMethodKey(String serviceId, String methodName, List paramTypeNames) { 
  113.         String param = CollectionUtil.join(paramTypeNames, PunctuationConst.AT); 
  114.         return serviceId+PunctuationConst.COLON+methodName+PunctuationConst.COLON 
  115.                 +param; 
  116.     } 
  117.  
  118.  

ServiceRegistry-服务注册类

接口

  1. package com.github.houbb.rpc.server.registry; 
  2.  
  3.  
  4.  
  5. public interface ServiceRegistry { 
  6.  
  7.  
  8.      
  9.     ServiceRegistry port(final int port); 
  10.  
  11.  
  12.      
  13.     ServiceRegistry register(final String serviceId, final Object serviceImpl); 
  14.  
  15.  
  16.      
  17.     ServiceRegistry expose(); 
  18.  
  19.  

实现

  1. package com.github.houbb.rpc.server.registry.impl; 
  2.  
  3.  
  4. import com.github.houbb.heaven.util.common.ArgUtil; 
  5. import com.github.houbb.rpc.common.config.protocol.ProtocolConfig; 
  6. import com.github.houbb.rpc.server.config.service.DefaultServiceConfig; 
  7. import com.github.houbb.rpc.server.config.service.ServiceConfig; 
  8. import com.github.houbb.rpc.server.core.RpcServer; 
  9. import com.github.houbb.rpc.server.registry.ServiceRegistry; 
  10. import com.github.houbb.rpc.server.service.impl.DefaultServiceFactory; 
  11.  
  12.  
  13. import java.util.ArrayList; 
  14. import java.util.List; 
  15.  
  16.  
  17.  
  18. public class DefaultServiceRegistry implements ServiceRegistry { 
  19.  
  20.  
  21.      
  22.     private static final DefaultServiceRegistry INSTANCE = new DefaultServiceRegistry(); 
  23.  
  24.  
  25.      
  26.     private int rpcPort; 
  27.  
  28.  
  29.      
  30.     private ProtocolConfig protocolConfig; 
  31.  
  32.  
  33.      
  34.     private List serviceConfigList; 
  35.  
  36.  
  37.     private DefaultServiceRegistry(){ 
  38.         // 初始化默认参数 
  39.         this.serviceConfigList = new ArrayList<>(); 
  40.         this.rpcPort = 9527; 
  41.     } 
  42.  
  43.  
  44.     public static DefaultServiceRegistry getInstance() { 
  45.         return INSTANCE; 
  46.     } 
  47.  
  48.  
  49.     @Override 
  50.     public ServiceRegistry port(int port) { 
  51.         ArgUtil.positive(port, "port"); 
  52.  
  53.  
  54.         this.rpcPort = port; 
  55.         return this; 
  56.     } 
  57.  
  58.  
  59.      
  60.     @Override 
  61.     @SuppressWarnings("unchecked"
  62.     public synchronized DefaultServiceRegistry register(final String serviceId, final Object serviceImpl) { 
  63.         ArgUtil.notEmpty(serviceId, "serviceId"); 
  64.         ArgUtil.notNull(serviceImpl, "serviceImpl"); 
  65.  
  66.  
  67.         // 构建对应的其他信息 
  68.         ServiceConfig serviceConfig = new DefaultServiceConfig(); 
  69.         serviceConfig.id(serviceId).reference(serviceImpl); 
  70.         serviceConfigList.add(serviceConfig); 
  71.  
  72.  
  73.         return this; 
  74.     } 
  75.  
  76.  
  77.     @Override 
  78.     public ServiceRegistry expose() { 
  79.         // 注册所有服务信息 
  80.         DefaultServiceFactory.getInstance() 
  81.                 .registerServices(serviceConfigList); 
  82.  
  83.  
  84.         // 暴露 netty server 信息 
  85.         new RpcServer(rpcPort).start(); 
  86.         return this; 
  87.     } 
  88.  
  89.  

ServiceConfig 是一些服务的配置信息,接口定义如下:

  1. package com.github.houbb.rpc.server.config.service; 
  2.  
  3.  
  4.  
  5. public interface ServiceConfig { 
  6.  
  7.  
  8.      
  9.     String id(); 
  10.  
  11.  
  12.      
  13.     ServiceConfig id(String id); 
  14.  
  15.  
  16.      
  17.     T reference(); 
  18.  
  19.  
  20.      
  21.     ServiceConfig reference(T reference); 
  22.  
  23.  

测试

maven 引入

引入服务端的对应 maven 包:

  1.  
  2.     com.github.houbb 
  3.     rpc-server 
  4.     0.0.6 
  5.  

服务端启动

  1. // 启动服务 
  2. DefaultServiceRegistry.getInstance() 
  3.         .register(ServiceIdConst.CALC, new CalculatorServiceImpl()) 
  4.         .expose(); 

这里注册了一个计算服务,并且设置对应的实现。

和以前实现类似,此处不再赘述。

启动日志:

  1. [DEBUG] [2021-10-05 13:39:42.638] [main] [c.g.h.l.i.c.LogFactory.setImplementation] - Logging initialized using 'class com.github.houbb.log.integration.adaptors.stdout.StdOutExImpl' adapter. 
  2. [INFO] [2021-10-05 13:39:42.645] [Thread-0] [c.g.h.r.s.c.RpcServer.run] - RPC 服务开始启动服务端 
  3. 十月 05, 2021 1:39:43 下午 io.netty.handler.logging.LoggingHandler channelRegistered 
  4. 信息: [id: 0xec4dc74f] REGISTERED 
  5. 十月 05, 2021 1:39:43 下午 io.netty.handler.logging.LoggingHandler bind 
  6. 信息: [id: 0xec4dc74f] BIND: 0.0.0.0/0.0.0.0:9527 
  7. 十月 05, 2021 1:39:43 下午 io.netty.handler.logging.LoggingHandler channelActive 
  8. 信息: [id: 0xec4dc74f, L:/0:0:0:0:0:0:0:0:9527] ACTIVE 
  9. [INFO] [2021-10-05 13:39:43.893] [Thread-0] [c.g.h.r.s.c.RpcServer.run] - RPC 服务端启动完成,监听【9527】端口 

ps: 写到这里忽然发现忘记添加对应的 register 日志了,这里可以添加对应的 registerListener 拓展。

 

来源:今日头条内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯