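The earlier examples used fixed input and output parameters and a fixed method implementation.
This section implements generic invocation so that the framework is more widely applicable.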
Basic idea
Every method call is resolved and executed through reflection.
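As a minimal sketch of the idea, the snippet below looks up a method by name and parameter types and invokes it reflectively. The `Calculator` class and the `invoke` helper are hypothetical names used only for illustration; they are not part of the framework.

```java
import java.lang.reflect.Method;

class ReflectInvokeDemo {

    // Resolve a public method by name and parameter types, then invoke it.
    static Object invoke(Object target, String methodName,
                         Class<?>[] paramTypes, Object[] args) throws Exception {
        Method method = target.getClass().getMethod(methodName, paramTypes);
        return method.invoke(target, args);
    }

    public static void main(String[] args) throws Exception {
        Object result = invoke(new Calculator(), "add",
                new Class<?>[]{int.class, int.class}, new Object[]{1, 2});
        System.out.println(result); // prints 3
    }
}

// Hypothetical service used only for this illustration.
class Calculator {
    public int add(int a, int b) {
        return a + b;
    }
}
```

The caller only needs strings and values to describe a call, which is what makes a generic request object possible.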
Server side
Core classes
- RpcServer
The adjustments are as follows:
- serverBootstrap.group(bossGroup, workerGroup)
- .channel(NioServerSocketChannel.class)
- // Print logs
- .handler(new LoggingHandler(LogLevel.INFO))
- .childHandler(new ChannelInitializer<Channel>() {
- @Override
- protected void initChannel(Channel ch) throws Exception {
- ch.pipeline()
- // Decode: bytes => request
- .addLast(new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)))
- // Encode: response => bytes
- .addLast(new ObjectEncoder())
- .addLast(new RpcServerHandler());
- }
- })
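- // Applies to connections that have not yet been taken off the queue by accept
- .option(ChannelOption.SO_BACKLOG, 128)
- // With keep-alive on, if the client is silent for a while, the server sends a probe packet to check whether the client is still alive.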
- .childOption(ChannelOption.SO_KEEPALIVE, true);
ObjectDecoder and ObjectEncoder are both built-in Netty implementations.
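These two handlers rely on Java serialization, so every object sent over the channel needs to be `Serializable`. The following sketch shows the same object => bytes => object round trip using only JDK streams. It does not reproduce Netty's actual framing, and `DemoMessage` is a hypothetical type used only for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

class SerializationRoundTrip {

    // object => bytes
    static byte[] encode(Serializable obj) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        return bytes.toByteArray();
    }

    // bytes => object
    static Object decode(byte[] data) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        DemoMessage copy = (DemoMessage) decode(encode(new DemoMessage("1", "sum")));
        System.out.println(copy.seqId + " " + copy.methodName);
    }
}

// Hypothetical message type, used only for this illustration.
class DemoMessage implements Serializable {
    private static final long serialVersionUID = 1L;
    final String seqId;
    final String methodName;

    DemoMessage(String seqId, String methodName) {
        this.seqId = seqId;
        this.methodName = methodName;
    }
}
```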
RpcServerHandler
- package com.github.houbb.rpc.server.handler;
-
-
- import com.github.houbb.log.integration.core.Log;
- import com.github.houbb.log.integration.core.LogFactory;
- import com.github.houbb.rpc.common.rpc.domain.RpcRequest;
- import com.github.houbb.rpc.common.rpc.domain.impl.DefaultRpcResponse;
- import com.github.houbb.rpc.server.service.impl.DefaultServiceFactory;
- import io.netty.channel.ChannelHandlerContext;
- import io.netty.channel.SimpleChannelInboundHandler;
-
-
-
- public class RpcServerHandler extends SimpleChannelInboundHandler {
-
-
- private static final Log log = LogFactory.getLog(RpcServerHandler.class);
-
-
- @Override
- public void channelActive(ChannelHandlerContext ctx) throws Exception {
- final String id = ctx.channel().id().asLongText();
- log.info("[Server] channel {} connected", id);
- }
-
-
- @Override
- protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
- final String id = ctx.channel().id().asLongText();
- log.info("[Server] channel read start: {}", id);
-
-
- // Receive the client request
- RpcRequest rpcRequest = (RpcRequest)msg;
- log.info("[Server] receive channel {} request: {}", id, rpcRequest);
-
-
- // Write the response back to the client
- DefaultRpcResponse rpcResponse = handleRpcRequest(rpcRequest);
- ctx.writeAndFlush(rpcResponse);
- log.info("[Server] channel {} response {}", id, rpcResponse);
- }
-
-
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
- cause.printStackTrace();
- ctx.close();
- }
-
-
-
- private DefaultRpcResponse handleRpcRequest(final RpcRequest rpcRequest) {
- DefaultRpcResponse rpcResponse = new DefaultRpcResponse();
- rpcResponse.seqId(rpcRequest.seqId());
-
-
- try {
- // Look up the corresponding service implementation
- // rpcRequest => invocationRequest
- // Execute invoke
- Object result = DefaultServiceFactory.getInstance()
- .invoke(rpcRequest.serviceId(),
- rpcRequest.methodName(),
- rpcRequest.paramTypeNames(),
- rpcRequest.paramValues());
- rpcResponse.result(result);
- } catch (Exception e) {
- rpcResponse.error(e);
- log.error("[Server] execute meet ex for request: {}", rpcRequest, e);
- }
-
-
- // Return the built response
- return rpcResponse;
- }
-
-
- }
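This is similar to the previous version, except that handleRpcRequest is slightly more involved.
Here the target method has to be invoked through reflection.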
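The flow of such a handler can be sketched as follows: invoke the method reflectively, then store either the result or the error in the response. `SimpleResponse` and `Divider` are hypothetical, simplified stand-ins and not the article's `DefaultRpcResponse` or a real service. Unwrapping `InvocationTargetException` is a choice made in this sketch so that the caller sees the exception thrown by the service itself.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

class HandleRequestDemo {

    static SimpleResponse handle(Object service, String methodName,
                                 Class<?>[] paramTypes, Object[] args) {
        SimpleResponse response = new SimpleResponse();
        try {
            Method method = service.getClass().getMethod(methodName, paramTypes);
            response.result = method.invoke(service, args);
        } catch (InvocationTargetException e) {
            // The service method itself threw: record the underlying cause.
            response.error = e.getCause();
        } catch (ReflectiveOperationException e) {
            // The method was not found or is not accessible.
            response.error = e;
        }
        return response;
    }

    public static void main(String[] args) {
        Class<?>[] types = {int.class, int.class};
        System.out.println(handle(new Divider(), "divide", types, new Object[]{6, 3}).result);
        System.out.println(handle(new Divider(), "divide", types, new Object[]{1, 0}).error);
    }
}

// Hypothetical, simplified response holder.
class SimpleResponse {
    Object result;
    Throwable error;
}

// Hypothetical service used only for this illustration.
class Divider {
    public int divide(int a, int b) {
        return a / b;
    }
}
```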
pojo
The request and response types used above are defined as follows:
RpcRequest
- package com.github.houbb.rpc.common.rpc.domain;
-
-
- import java.util.List;
-
-
-
- public interface RpcRequest extends BaseRpc {
-
-
-
- long createTime();
-
-
-
- String serviceId();
-
-
-
- String methodName();
-
-
-
- List<String> paramTypeNames();
-
-
- // Parameter values of the call
-
-
-
- Object[] paramValues();
-
-
- }
RpcResponse
- package com.github.houbb.rpc.common.rpc.domain;
-
-
-
- public interface RpcResponse extends BaseRpc {
-
-
-
- Throwable error();
-
-
-
- Object result();
-
-
- }
BaseRpc
- package com.github.houbb.rpc.common.rpc.domain;
-
-
- import java.io.Serializable;
-
-
-
- public interface BaseRpc extends Serializable {
-
-
-
- String seqId();
-
-
-
- BaseRpc seqId(final String traceId);
-
-
- }
ServiceFactory - service factory
To manage all service implementations in one place, a service factory class is defined here.
ServiceFactory
- package com.github.houbb.rpc.server.service;
-
-
- import com.github.houbb.rpc.server.config.service.ServiceConfig;
- import com.github.houbb.rpc.server.registry.ServiceRegistry;
-
-
- import java.util.List;
-
-
-
- public interface ServiceFactory {
-
-
-
- ServiceFactory registerServices(final List<ServiceConfig> serviceConfigList);
-
-
-
- Object invoke(final String serviceId, final String methodName,
- List<String> paramTypeNames, final Object[] paramValues);
-
-
- }
DefaultServiceFactory
The default implementation is as follows:
- package com.github.houbb.rpc.server.service.impl;
-
-
- import com.github.houbb.heaven.constant.PunctuationConst;
- import com.github.houbb.heaven.util.common.ArgUtil;
- import com.github.houbb.heaven.util.lang.reflect.ReflectMethodUtil;
- import com.github.houbb.heaven.util.util.CollectionUtil;
- import com.github.houbb.rpc.common.exception.RpcRuntimeException;
- import com.github.houbb.rpc.server.config.service.ServiceConfig;
- import com.github.houbb.rpc.server.service.ServiceFactory;
-
-
- import java.lang.reflect.InvocationTargetException;
- import java.lang.reflect.Method;
- import java.util.HashMap;
- import java.util.List;
- import java.util.Map;
-
-
-
- public class DefaultServiceFactory implements ServiceFactory {
-
-
-
- private Map<String, Object> serviceMap;
-
-
-
- private Map<String, Method> methodMap;
-
-
- private static final DefaultServiceFactory INSTANCE = new DefaultServiceFactory();
-
-
- private DefaultServiceFactory(){}
-
-
- public static DefaultServiceFactory getInstance() {
- return INSTANCE;
- }
-
-
-
- @Override
- public synchronized ServiceFactory registerServices(List<ServiceConfig> serviceConfigList) {
- ArgUtil.notEmpty(serviceConfigList, "serviceConfigList");
-
-
- // Initialize the maps
- serviceMap = new HashMap<>(serviceConfigList.size());
- // Rough estimate only: about 2 methods per service.
- methodMap = new HashMap<>(serviceConfigList.size()*2);
-
-
- for(ServiceConfig serviceConfig : serviceConfigList) {
- serviceMap.put(serviceConfig.id(), serviceConfig.reference());
- }
-
-
- // Index every method by its key
- for(Map.Entry<String, Object> entry : serviceMap.entrySet()) {
- String serviceId = entry.getKey();
- Object reference = entry.getValue();
-
-
- // Get the list of all public methods
- Method[] methods = reference.getClass().getMethods();
- for(Method method : methods) {
- String methodName = method.getName();
- if(ReflectMethodUtil.isIgnoreMethod(methodName)) {
- continue;
- }
-
-
- List<String> paramTypeNames = ReflectMethodUtil.getParamTypeNames(method);
- String key = buildMethodKey(serviceId, methodName, paramTypeNames);
- methodMap.put(key, method);
- }
- }
-
-
- return this;
- }
-
-
-
-
- @Override
- public Object invoke(String serviceId, String methodName, List<String> paramTypeNames, Object[] paramValues) {
- // Validate arguments
- ArgUtil.notEmpty(serviceId, "serviceId");
- ArgUtil.notEmpty(methodName, "methodName");
-
-
- // A cache lets us locate the method quickly from the first three values,
- // then invoke it through reflection.
- // The paramTypes are joined into a single string.
- final Object reference = serviceMap.get(serviceId);
- final String methodKey = buildMethodKey(serviceId, methodName, paramTypeNames);
- final Method method = methodMap.get(methodKey);
-
-
- try {
- return method.invoke(reference, paramValues);
- } catch (IllegalAccessException | InvocationTargetException e) {
- throw new RpcRuntimeException(e);
- }
- }
-
-
-
- private String buildMethodKey(String serviceId, String methodName, List<String> paramTypeNames) {
- String param = CollectionUtil.join(paramTypeNames, PunctuationConst.AT);
- return serviceId+PunctuationConst.COLON+methodName+PunctuationConst.COLON
- +param;
- }
-
-
- }
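The lookup scheme above can be sketched with JDK classes alone. This is an illustration under assumptions, not the library's code: the key layout `serviceId:methodName:type1@type2` assumes that `PunctuationConst.COLON` and `PunctuationConst.AT` are `":"` and `"@"`, filtering by declaring class stands in for `ReflectMethodUtil.isIgnoreMethod`, and `EchoService` is a hypothetical service. The sketch also adds a null check so that an unknown method fails with a clear message rather than a `NullPointerException`.

```java
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class MethodCacheDemo {

    private final Map<String, Object> services = new HashMap<>();
    private final Map<String, Method> methods = new HashMap<>();

    // Assumed key layout: serviceId:methodName:type1@type2
    static String key(String serviceId, String methodName, List<String> paramTypeNames) {
        return serviceId + ":" + methodName + ":" + String.join("@", paramTypeNames);
    }

    void register(String serviceId, Object service) {
        services.put(serviceId, service);
        for (Method method : service.getClass().getMethods()) {
            // Skip methods inherited from Object (toString, hashCode, ...).
            if (method.getDeclaringClass() == Object.class) {
                continue;
            }
            List<String> typeNames = new ArrayList<>();
            for (Class<?> type : method.getParameterTypes()) {
                typeNames.add(type.getName());
            }
            methods.put(key(serviceId, method.getName(), typeNames), method);
        }
    }

    Object invoke(String serviceId, String methodName,
                  List<String> paramTypeNames, Object[] args) throws Exception {
        String methodKey = key(serviceId, methodName, paramTypeNames);
        Method method = methods.get(methodKey);
        if (method == null) {
            // Fail with a clear message instead of a NullPointerException.
            throw new NoSuchMethodException(methodKey);
        }
        return method.invoke(services.get(serviceId), args);
    }

    public static void main(String[] args) throws Exception {
        MethodCacheDemo factory = new MethodCacheDemo();
        factory.register("echo", new EchoService());
        System.out.println(factory.invoke("echo", "echo",
                Collections.singletonList("java.lang.String"), new Object[]{"hi"}));
    }
}

// Hypothetical service used only for this illustration.
class EchoService {
    public String echo(String text) {
        return text;
    }
}
```

Including the parameter type names in the key is what lets overloaded methods with the same name coexist in the cache.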
ServiceRegistry - service registration class
Interface
- package com.github.houbb.rpc.server.registry;
-
-
-
- public interface ServiceRegistry {
-
-
-
- ServiceRegistry port(final int port);
-
-
-
- ServiceRegistry register(final String serviceId, final Object serviceImpl);
-
-
-
- ServiceRegistry expose();
-
-
- }
Implementation
- package com.github.houbb.rpc.server.registry.impl;
-
-
- import com.github.houbb.heaven.util.common.ArgUtil;
- import com.github.houbb.rpc.common.config.protocol.ProtocolConfig;
- import com.github.houbb.rpc.server.config.service.DefaultServiceConfig;
- import com.github.houbb.rpc.server.config.service.ServiceConfig;
- import com.github.houbb.rpc.server.core.RpcServer;
- import com.github.houbb.rpc.server.registry.ServiceRegistry;
- import com.github.houbb.rpc.server.service.impl.DefaultServiceFactory;
-
-
- import java.util.ArrayList;
- import java.util.List;
-
-
-
- public class DefaultServiceRegistry implements ServiceRegistry {
-
-
-
- private static final DefaultServiceRegistry INSTANCE = new DefaultServiceRegistry();
-
-
-
- private int rpcPort;
-
-
-
- private ProtocolConfig protocolConfig;
-
-
-
- private List<ServiceConfig> serviceConfigList;
-
-
- private DefaultServiceRegistry(){
- // Initialize default parameters
- this.serviceConfigList = new ArrayList<>();
- this.rpcPort = 9527;
- }
-
-
- public static DefaultServiceRegistry getInstance() {
- return INSTANCE;
- }
-
-
- @Override
- public ServiceRegistry port(int port) {
- ArgUtil.positive(port, "port");
-
-
- this.rpcPort = port;
- return this;
- }
-
-
-
- @Override
- @SuppressWarnings("unchecked")
- public synchronized DefaultServiceRegistry register(final String serviceId, final Object serviceImpl) {
- ArgUtil.notEmpty(serviceId, "serviceId");
- ArgUtil.notNull(serviceImpl, "serviceImpl");
-
-
- // Build the corresponding service configuration
- ServiceConfig serviceConfig = new DefaultServiceConfig();
- serviceConfig.id(serviceId).reference(serviceImpl);
- serviceConfigList.add(serviceConfig);
-
-
- return this;
- }
-
-
- @Override
- public ServiceRegistry expose() {
- // Register all services
- DefaultServiceFactory.getInstance()
- .registerServices(serviceConfigList);
-
-
- // Expose the netty server
- new RpcServer(rpcPort).start();
- return this;
- }
-
-
- }
ServiceConfig holds the configuration of a service. The interface is defined as follows:
- package com.github.houbb.rpc.server.config.service;
-
-
-
- public interface ServiceConfig<T> {
-
-
-
- String id();
-
-
-
- ServiceConfig<T> id(String id);
-
-
-
- T reference();
-
-
-
- ServiceConfig<T> reference(T reference);
-
-
- }
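The article does not show DefaultServiceConfig, so here is a minimal sketch of how an interface of this fluent shape might be implemented. `FluentServiceConfig` and `SimpleServiceConfig` are hypothetical names that only mirror the interface above; the real implementation may differ.

```java
class FluentConfigDemo {
    public static void main(String[] args) {
        // Setters return the config itself, so calls can be chained.
        FluentServiceConfig<String> config = new SimpleServiceConfig<String>()
                .id("calc")
                .reference("calculator-impl");
        System.out.println(config.id() + " -> " + config.reference());
    }
}

// Hypothetical interface mirroring the shape of ServiceConfig.
interface FluentServiceConfig<T> {
    String id();
    FluentServiceConfig<T> id(String id);
    T reference();
    FluentServiceConfig<T> reference(T reference);
}

// Hypothetical implementation: plain fields plus fluent setters.
class SimpleServiceConfig<T> implements FluentServiceConfig<T> {
    private String id;
    private T reference;

    @Override
    public String id() {
        return id;
    }

    @Override
    public SimpleServiceConfig<T> id(String id) {
        this.id = id;
        return this;
    }

    @Override
    public T reference() {
        return reference;
    }

    @Override
    public SimpleServiceConfig<T> reference(T reference) {
        this.reference = reference;
        return this;
    }
}
```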
Testing
Maven dependency
Add the server-side Maven package:
- <dependency>
- <groupId>com.github.houbb</groupId>
- <artifactId>rpc-server</artifactId>
- <version>0.0.6</version>
- </dependency>
Starting the server
- // Start the service
- DefaultServiceRegistry.getInstance()
- .register(ServiceIdConst.CALC, new CalculatorServiceImpl())
- .expose();
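This registers a calculator service and sets its implementation.
The rest is similar to the previous version and is not repeated here.
Startup log: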
- [DEBUG] [2021-10-05 13:39:42.638] [main] [c.g.h.l.i.c.LogFactory.setImplementation] - Logging initialized using 'class com.github.houbb.log.integration.adaptors.stdout.StdOutExImpl' adapter.
- [INFO] [2021-10-05 13:39:42.645] [Thread-0] [c.g.h.r.s.c.RpcServer.run] - RPC 服务开始启动服务端
- 十月 05, 2021 1:39:43 下午 io.netty.handler.logging.LoggingHandler channelRegistered
- 信息: [id: 0xec4dc74f] REGISTERED
- 十月 05, 2021 1:39:43 下午 io.netty.handler.logging.LoggingHandler bind
- 信息: [id: 0xec4dc74f] BIND: 0.0.0.0/0.0.0.0:9527
- 十月 05, 2021 1:39:43 下午 io.netty.handler.logging.LoggingHandler channelActive
- 信息: [id: 0xec4dc74f, L:/0:0:0:0:0:0:0:0:9527] ACTIVE
- [INFO] [2021-10-05 13:39:43.893] [Thread-0] [c.g.h.r.s.c.RpcServer.run] - RPC 服务端启动完成,监听【9527】端口
P.S. While writing this I realized I forgot to add a log for register; a registerListener extension could be added here.
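One possible shape for such an extension is sketched below. The article only mentions the idea, so everything here is hypothetical: `RegisterListener`, `ListenableRegistry`, and `addListener` are illustrative names, not an existing API of the framework.

```java
import java.util.ArrayList;
import java.util.List;

class RegisterListenerDemo {
    public static void main(String[] args) {
        new ListenableRegistry()
                // Log every registration as it happens.
                .addListener((serviceId, impl) ->
                        System.out.println("[Server] registered service " + serviceId))
                .register("calc", new Object());
    }
}

// Hypothetical callback invoked once per registered service.
interface RegisterListener {
    void onRegister(String serviceId, Object serviceImpl);
}

// Hypothetical registry that notifies its listeners on register.
class ListenableRegistry {
    private final List<RegisterListener> listeners = new ArrayList<>();
    private final List<String> serviceIds = new ArrayList<>();

    ListenableRegistry addListener(RegisterListener listener) {
        listeners.add(listener);
        return this;
    }

    ListenableRegistry register(String serviceId, Object serviceImpl) {
        serviceIds.add(serviceId);
        for (RegisterListener listener : listeners) {
            listener.onRegister(serviceId, serviceImpl);
        }
        return this;
    }
}
```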