文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

如何手搓一个自定义的RPC(远程过程调用框架)

2024-11-29 20:11

关注

首先简单介绍一下RPC 主要特点:

1.1 RPC远程过程调用的主要特点

1.2 RPC的应用场景

1.3 常见的RPC框架

2.实现自定义的RPC

理解,首先 MCube 会依据模板缓存状态判断是否需要网络获取最新模板,当获取到模板后进行模板加载,加载阶段会将产物转换为视图树的结构,转换完成后将通过表达式引擎解析表达式并取得正确的值,通过事件解析引擎解析用户自定义事件并完成事件的绑定,完成解析赋值以及事件绑定后进行视图的渲染,最终将

要实现一个自定义的RPC框架需解决以下几个主要问题:

1.客户端调用:客户端调用本地的代理函数(stub代码,这个函数负责将调用转换为RPC请求)。这其实就是一个接口描述文件,它可以有多种形式如JSON、XML、甚至是一份word文档或是口头约定均可,只要客户端及服务端都是遵守这份接口描述文件契约即可。在我们的实际开发中一种常见的方式是服务提供者发布一个包含服务接口类的jar包到maven 中央仓库,调用方通过pom文件将之依赖到本地。

2.参数序列化:代理函数将调用参数进行序列化,并将请求发送到服务器。

3.服务端数据接收:服务器端接收到请求,并将其反序列化,恢复成原始参数。

4.执行远程过程:服务端调用实际的服务过程(函数)并获取结果。

5.返回结果:服务端将调用结果进行序列化,并通过网络传给客户端。

6.客户端接收调用结果:客户到接收到服务端传输的字节流,进行反序列化,转换为实际的结果数据格式,并返回到原始调用方。

下面需我们通过代码一一展示上述各功能是如何实现的。

2.1 自定义通信协议

本文的目的是要实现一个自定义通信协议的远程调用框架,所以首先要定义一个通信协议数据格式。

整个自定义协议总体上分为Header 及 Body Content两部分;Header 占16个字节,又分为4个部分。

前2位为魔法值用于Netty编解码组件,解决网络通信中的粘包、半包等问题,此处不展开细讲。

msgtype用于表示消息的类型,如request(请求)、respone(响应)、heartbeat(心跳)等。

code 占1位,表示请求的响应状态,成功还是失败。

request id占8位,表示请求的序列号,用于后续调用结果的匹配,保证线程内唯一。

body size 占4位,表示实现请求内容的长度,在反序化时读取此长度的内容字节,解析出正确的数据。

客户端、服务端在通信过程中都要按照上述约定的通信协议进行数据的编、解码工作。

2.2 客户端调用

2.2.1 客户端的使用

客户端一般通过接口代理工厂通过动态代理技术来生成一个代理实例,所有的远程调用中的细节,如参数序列化,网络传输,异常处理等都隐藏在代理实例中实现,对调用方来说调用过程是透明的,就像调用本地方法一样。

首先看一下客户端的使用方式,本文假设一个IShoppingCartService (购物车)的接口类,基中有一个方法根据传入的用户pin,返回购物车详情。

//接口方法
ShoppingCart shopping(String pin);
//客户端通过代理工厂实现接口的一个代理实例
IShoppingCartService serviceProxy = ProxyFactory.factory(IShoppingCartService.class)                
                .setSerializerType(SerializerType.JDK) //客户端设置所使用的序列化工具,此处为JDK原生
                .newProxyInstance(); //返回代理 实现
//像调用本地方法一样,调用此代理实例的shopping 方法
ShoppingCart result = serviceProxy.shopping("userPin");
log.info("result={}", JSONObject.toJSONString(result));
2.2.2 客户端代理工厂的核心功能
public class ProxyFactory {
    //……省略
    
    public I newProxyInstance() {     
        //服务的元数据信息
        ServiceData serviceData = new ServiceData(
                group, //分组
                providerName, //服务名称,一般为接口的class的全限定名称
                StringUtils.isNotBlank(version) ? version : "1.0.0" //版本号
        );


        //调用器
        Calller caller = newCaller().timeoutMillis(timeoutMillis);
        //集群策略,用于实现快速失败或失败转等功能
        Strategy strategy = StrategyConfigContext.of(strategy, retries);
        Object handler = null;
        switch (invokeType) {
            case "syncCall":
                //同步调用handler
                handler = new SyncCaller(serviceData, caller);
                break;
            case "asyncCall":
                //异步调用handler
                handler = new AsyncCaller(client.appName(), serviceData, caller, strategy);
                break;
            default:
                throw new RuntimeException("未知类型: " + invokeType);
        }


        //返回代理实例
        return ProxyEnum.getDefault().newProxy(interfaceClass, handler);
    }
    //……省略
}

代码 ProxyEnum.getDefault().newProxy(interfaceClass, handler) 返回一个具体的代理实例,此方法要求传入两个参数,interfaceClass  被代理的接口类class,即服务方所发布的服务接口类。

handler 为动态代理所需要代码增强逻辑,即所有的调用细节都由此增强类完成。按照动态代理的实现方式的不同,本文支持两种动态代理方式:

1.JDK动态代码,如采用此方式,handler 需要实现接口 InvocationHandler

2.ByteBuddy,它是一个用于在运行时生成、修改和操作Java类的库,允许开发者通过简单的API生成新的类或修改已有的类,而无需手动编写字节码,它广泛应用于框架开发、动态代理、字节码操作和类加载等领域。

本文默认采用第二种方式,通过代码简单展示一下代理实例的的生成方式。

//方法newProxy 的具体实现
public  T newProxy(Class interfaceType, Object handler) {
            Class cls = new ByteBuddy()
                     //生成接口的子类
                    .subclass(interfaceType) 
                     //默认代理接口中所有声明的方法
                    .method(ElementMatchers.isDeclaredBy(interfaceType))
                     //代码增强,即接口中所有被代理的方法都
                     //委托给用户自定义的handler处理,这也是动态代理的意义所在
                   .intercept(MethodDelegation.to(handler, "handlerInstance"))
                    .make()
                     //通过类加载器加载
                   .load(interfaceType.getClassLoader(), ClassLoadingStrategy.Default.INJECTION)
                    .getLoaded();


            try {
                //通过newInstance构建一个代理实例并返回    
               return cls.newInstance();
            } catch (Throwable t) {
                ……
            }
        }

本文以同步调用为例,现在展示一下 SyncInvoker 的具体实现逻辑。

public class SyncCaller extends AbstractCaller {
    //……省略   
    
    @RuntimeType
    public Object syncCall(@Origin Method method, @AllArguments @RuntimeType Object[] args) throws Throwable {
        //封装请求的接口中的方法名及方法参数,组成一个request请求对象
        StarGateRequest request = createRequest(methodName, args);
        //集群容错策略调度器接口
        //提供快速失败,失败转移等策略供调用方选择,此处默认采用了快速失败的策略
        Invoker invoker = new FastFailInvoker();
        //returnType 的类型决定了泛型方法的实际结果类型,用于后续调用结果的类型转换
        Future future = invoker.invoke(request, method.getReturnType());
        if (sync) {
            //同步调用,线程会阻塞在get方法,直到超时或结果可用
            Object result = future.getResult();
            return result;
        } else {
            return future;
        }
    }
}


//同步,异步调用的关键点就在于InvokeFuture,它继承了Java的CompletionStage类,用于异步编程

通过以上核心代码,客户端就完成了服务调用环节,下一步RPC框架需要将客户端请求的接口方法及方法参数进行序列化并通过网络进行传输。下面通过代码片段展示一下序列化的实现方式。

2.2.3 请求参数序列化

我们将请求参数序列化的目的就是将具体的请求参数转换成字节组,填充进入上述自定义协议的 body content 部分。下面通过代码演示一下如何进行反序列化。

本文默认采用JDK原生的对象序列化及反序列化框架,也可通过SPI技术扩展支持Protocol Buffers等。

//上述代码行Future future = invoker.invoke(request, method.getReturnType());
//具体实现


public  Future invoke(StarGateRequest request, Class returnType) throws Exception {
        //对象序列化器,默认为JDK
        final Serializer _serializer = serializer();
        //message对象包含此次请求的接口名,方法名及实际参数列表
        final Message message = request.message();
        //通过软负载均衡选择一个 Netty channel
        Channel channel = selectChannel(message.getMetadata());
        byte code = _serializer.code();
        //将message对象序列成字节数组
        byte[] bytes = _serializer.writeObject(message);
        request.bytes(code, bytes);


        //数据写入 channel 并返回 future 约定,用于同步或异步获得调用结果
        return write(channel, request, returnType);
    }
//对象的序列化,JDK 原生方式
public  byte[] writeObject(T obj) {
        ByteArrayOutputStream buf = OutputStreams.getByteArrayOutputStream();
        try (ObjectOutputStream output = new ObjectOutputStream(buf)) {
            output.writeObject(obj);
            output.flush();
            return buf.toByteArray();
        } catch (IOException e) {
            ThrowUtil.throwException(e);
        } finally {
            OutputStreams.resetBuf(buf);
        }
        return null; 
    }
2.2.4 请求参数通过网络发送
//上述代码  write(channel, request, returnType);
//具体实现
protected  DefaultFuture write(final Channel channel,
                                               final StarGateRequest request,
                                               final Class returnType) {
        //……省略


        //调用结果占位 future对象,这也是promise编程模式
        final Future future = DefaultFuture.newFuture(request.invokeId(), channel, timeoutMillis, returnType);


        //将请求负载对象写入Netty channel通道,并绑定监听器处理写入结果
        channel.writeAndFlush(request).addListener((ChannelFutureListener) listener -> {
            if (listener.isSuccess()) {
                //网络写入成功
                ……
            } else {
                //异常时,构造造调用结果,供调用方进行处理
                DefaultFuture.errorFuture(channel, response, dispatchType);
            }
        });


        //因为Netty 是非阻塞的,所以写入后可立刻返回
        return future;
    }

2.2.4.1 Netty 消息编码器

消息写入Netty channel 后,会依次经过 channel pipline 上所安装的各种handler处理,然后再通过物理网络将数据发送出去,这里展示了客户端及服务端所使用的自定义编、解解器。

//自定义的编码器 继承自Netty 的 MessageToByteEncoder
public class StarGateEncoder extends MessageToByteEncoder {


    //……省略


    private void doEncodeRequest(RequestPayload request, ByteBuf out) {
        byte sign = StarGateProtocolHeader.toSign(request.serializerCode(), StarGateProtocolHeader.REQUEST);
        long invokeId = request.invokeId();
        byte[] bytes = request.bytes();
        int length = bytes.length;


        out.writeShort(StarGateProtocolHeader.Head)  //写入两个字节
                .writeByte(sign)              //写入1个字节
                .writeByte(0x00)            //写入1个字节
                .writeLong(invokeId)          //写入8个节节
                .writeInt(length)             //写入4个字节
                .writeBytes(bytes);
    }


}

至此,通过上述核心代码,客户的请求已经按照自定义的协议格式进行了序列化,并把数据写入到Netty channel中,最后通过物理网络传输到服务器端。

2.3 服务端接收数据

2.3.1 消息解码器

服务器端接收到客户端的发送的数据后,需要进行正确的消息解码,下面是解码器的实现。

//消息解码器,继承自Netty 的ReplayingDecoder,将客户端请求解码为 RequestPayload 对象,供业务处理handler使用
public class StarGateDecoder extends ReplayingDecoder {


    //……省略


    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception {
        switch (state()) {
            case HEAD:
                checkMagic(in.readShort());         // HEAD
                checkpoint(State.HEAD);
            case SIGN:
                header.sign(in.readByte());         // 消息标志位
                checkpoint(State.STATUS);
            case STATUS:
                header.status(in.readByte());       // 状态位
                checkpoint(State.ID);
            case ID:
                header.id(in.readLong());           // 消息id
                checkpoint(State.BODY_SIZE);
            case BODY_SIZE:
                header.bodySize(in.readInt());      // 消息体长度
                checkpoint(State.BODY);
            case BODY:
                switch (header.messageCode()) {
                    //……省略
                    case StarGateProtocolHeader.REQUEST: {
                        //消息体长度信息
                        int length = checkBodySize(header.bodySize());
                        byte[] bytes = new byte[length];
                        //读取指定长度字节
                        in.readBytes(bytes);
                        //调用请求
                        RequestPayload request = new RequestPayload(header.id());
                        //设置序列化器编码,有效载荷
                        request.bytes(header.serializerCode(), bytes);
                        out.add(request);
                        break;
                    }


                    default:
                        throw new Exception("错误标志位");
                }
                checkpoint(State.HEAD);
        }
    }


   //……省略
}
2.3.2 请求参数反序列化
//服务端 Netty channel pipline 上所安装的业务处理 handler
//业务处理handler 对RequestPayload 所携带的字节数组进行反序列化,解析出客户端所传递的实际参数
public class ServiceHandler extends ChannelInboundHandlerAdapter {


    //……省略


    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        Channel ch = ctx.channel();
        if (msg instanceof RequestPayload) {       
            StarGateRequest request = new StarGateRequest((RequestPayload) msg);
            //约定的反序列化器, 由客户端设置
            byte code = request.serializerCode();
            Serializer serializer = SerializerFactory.getSerializer(code);
            //实际请求参数字组数组
            byte[] bytes = payload.bytes();
            //对象反序列化
            Message message = serializer.readObject(bytes, Message.class);
            log.info("message={}", JSONObject.toJSONString(message));


            request.message(message);


            //业务处理
            process(message);
        } else {
            //引用释放
            ReferenceCountUtil.release(msg);
        }
    }


    //……省略
}
2.3.3 处理客户端请求

经过反序列化后,服务端可以知道用户所请求的是哪个接口、方法、以及实际的参数值,下一步就可进行真实的方法调用。

//处理调用
public void process(Message message) {         
   try {
       ServiceMetadata metadata = msg.getMetadata(); //客户端请求的元数据
       String providerName = metadata.getProviderName(); //服务名,即接口类名


       //根据接口类名,查找服务端实现此接口的类的全限定类名
       providerName = findServiceImpl(providerName);
       String methodName = msg.getMethodName();  //方法名
       Object[] args = msg.getArgs();    //客户设置的实际参数


       //线程上下文类加载器
       ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
       //加载具体实现类
       Class clazz = classLoader.loadClass(providerName);
       //创建接口类实例
       Object instance = clazz.getDeclaredConstructor().newInstance();


       Method method = null;
       Class[] parameterTypes = new Class[args.length];
       for (int i = 0; i < args.length; i++) {
           parameterTypes[i] = args[i].getClass();
       }
       method = clazz.getMethod(methodName, parameterTypes);


       //反射调用 
       Object invokeResult = method.invoke(instance, args);
       } catch (Exception e) {
            log.error("调用异常:", e);
           throw new RuntimeException(e);
       }


       //处理同步调用结果
      doProcess(invokeResult);


}
2.3.4 返回调用结果

通过反射调用接口实现类,获取调用结果,然后对结果进行序列化并包装成response响应消息,将消息写入到channel, 经过channel pipline 上所安装的编码器对消息对象进行编码,最后发送给调用客户端。

//处理同步调用结果,并将结果写回到 Netty channel
private void doProcess(Object realResult) {
        ResultWrapper result = new ResultWrapper();
        result.setResult(realResult);
        byte code = request.serializerCode();
        Serializer serializer = SerializerFactory.getSerializer(code);
        //new response 响应消息对象
        Response response = new Response(request.invokeId());
        //调用结果序列成字节数组
        byte[] bytes = serializer.writeObject(result);
        response.bytes(code, bytes);
        response.status(Status.OK.value());


        //响应消息对象 response 写入 Netty channel
        channel.writeAndFlush(response).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture channelFuture) throws Exception {
                if (channelFuture.isSuccess()) {
                    log.info("响应成功");
                } else {
                    //记录调用失败日志
                    log.error("响应失败, channel: {}, cause: {}.", channel, channelFuture.cause());
                }
            }
        });
    }

同样的,消息写入channel 后,先依次经过pipline 上所安装的 消息编码器,再发送给客户端。具体编码方式同客户端编码器类似,此处不再赘述。

2.4 客户端接收调用结果

客户端收到服务端写入响应消息后,同样经过Netty channel pipline 上所安装的解码器,进行正确的解码。然后再对解码后的对象进行正确的反序列化,最终获得调用结果 。具体的解码,反序列化过程不再赘述,流程基本同上面服务端的解码及反序列化类似。

public class consumerHandler extends ChannelInboundHandlerAdapter {


    //……省略


    //客户端处理所接收到的消息
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        Channel ch = ctx.channel();
        if (msg instanceof ResponseMessage) {
            try {
                //类型转换
                ResponseMessage responseMessage= (ResponseMessage)msg
                StarGateResponse response = new StarGateResponse(ResponseMessage.getMsg());
                byte code = response.serializerCode();
                Serializer serializer = SerializerFactory.getSerializer(code);
                byte[] bytes = responseMessage.bytes();
                //反序列化成调用结果的包装类
                Result result = serializer.readObject(bytes, Result.class);
                response.result(result);


                //处理调用结果
                long invokeId = response.id();
                //通过 rnvokeid,从地缓存中拿到客户端调用的结果点位对象 futrue
                DefaultFuture future = FUTURES_MAP.remove(invokeId);


                //判断调用是否成功
                byte status = response.status();
                if (status == Status.OK.value()) {
                    //对调用结果进行强制类型转换,并设置future结果,对阻塞在future.get()的客户端同步调用来说,调用返回。
                    complete((V) response.getResult());
                } else {
                    //todo 处理异常
                }


            } catch (Throwable t) {
                log.error("调用记录: {}, on {} #channelRead().", t, ch);
            }
        } else {
            log.warn("消息类型不匹配: {}, channel: {}.", msg.getClass(), ch);
            //计数器减1
            ReferenceCountUtil.release(msg);
        }
    }


}

下面再通过一个简单的调用时序图展示一下一次典型的Rpc调用所经历的步骤。

3.结尾

本文首先简单介绍了一下RPC的概念、应用场景及常用的RPC框架,然后讲述了一下如何自己手动实现一个RPC框架的基本功能。目的是想让大家对RPC框架的实现有一个大概思路,并对Netty 这一高效网络编程框架有一个了解,通过对Netty 的编、解码器的学习,了解如何自定义一个私有的通信协议。限于篇幅本文只简单讲解了RPC的核心的调用逻辑的实现。真正生产可用的RPC框架还需要有更多复杂的功能,如限流、负载均衡、融断、降级、泛型调用、自动重连、自定义可扩展的拦截器等等。

另外RPC框架中一般有三种角色,服务提供者、服务消费者、注册中心,本文并没有介绍注册中心如何实现。并假定服务提供者已经将服务发布到了注册中心,服务消费者跟服务提供者之间建立起了TCP 长连接。

后续会通过其它篇章介绍注册中心,服务自动注册,服务发现等功能的实现原理。

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 资料下载
  • 历年真题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容
咦!没有更多了?去看看其它编程学习网 内容吧