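The previous article showed how to implement a generic, reflection-based server.
In this section we look at how to implement a generic client.
Because there is a lot to cover, the material is split into 2 parts.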
Basic idea
All method calls are handled through reflection.
Core class
To make it easier to extend, the core class is adjusted as follows:
- package com.github.houbb.rpc.client.core;
-
-
- import com.github.houbb.heaven.annotation.ThreadSafe;
- import com.github.houbb.log.integration.core.Log;
- import com.github.houbb.log.integration.core.LogFactory;
- import com.github.houbb.rpc.client.core.context.RpcClientContext;
- import com.github.houbb.rpc.client.handler.RpcClientHandler;
- import com.github.houbb.rpc.common.constant.RpcConstant;
- import io.netty.bootstrap.Bootstrap;
- import io.netty.channel.*;
- import io.netty.channel.nio.NioEventLoopGroup;
- import io.netty.channel.socket.nio.NioSocketChannel;
- import io.netty.handler.codec.serialization.ClassResolvers;
- import io.netty.handler.codec.serialization.ObjectDecoder;
- import io.netty.handler.codec.serialization.ObjectEncoder;
- import io.netty.handler.logging.LogLevel;
- import io.netty.handler.logging.LoggingHandler;
-
-
-
- @ThreadSafe
- public class RpcClient {
-
-
- private static final Log log = LogFactory.getLog(RpcClient.class);
-
-
-
- private final String address;
-
-
-
- private final int port;
-
-
-
- private final ChannelHandler channelHandler;
-
-
- public RpcClient(final RpcClientContext clientContext) {
- this.address = clientContext.address();
- this.port = clientContext.port();
- this.channelHandler = clientContext.channelHandler();
- }
-
-
-
- public ChannelFuture connect() {
- // Start the client
- log.info("RPC 服务开始启动客户端");
-
-
- EventLoopGroup workerGroup = new NioEventLoopGroup();
-
-
-
- ChannelFuture channelFuture;
- try {
- Bootstrap bootstrap = new Bootstrap();
- channelFuture = bootstrap.group(workerGroup)
- .channel(NioSocketChannel.class)
- .option(ChannelOption.SO_KEEPALIVE, true)
- .handler(new ChannelInitializer<Channel>() {
- @Override
- protected void initChannel(Channel ch) throws Exception {
- ch.pipeline()
- // decode: bytes => response
- .addLast(new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)))
- // encode: request => bytes
- .addLast(new ObjectEncoder())
- // log output
- .addLast(new LoggingHandler(LogLevel.INFO))
- .addLast(channelHandler);
- }
- })
- .connect(address, port)
- .syncUninterruptibly();
- log.info("RPC 服务启动客户端完成,监听地址 {}:{}", address, port);
- } catch (Exception e) {
- log.error("RPC 客户端遇到异常", e);
- throw new RuntimeException(e);
- }
- // Do not shut down the thread pool here!
-
-
- return channelFuture;
- }
-
-
- }
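The server address and port can be specified flexibly.
The ChannelHandler is passed in as a parameter.
ObjectDecoder, ObjectEncoder, and LoggingHandler are netty built-ins, used the same way as on the server side.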
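Since ObjectEncoder and ObjectDecoder are built on Java serialization, the objects travelling through the pipeline need to be Serializable. The following is only a rough sketch of that round trip using plain JDK streams, not netty itself; `SerializationSketch` and `DemoRequest` are hypothetical names introduced for illustration and are not part of the rpc project.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SerializationSketch {

    /** Hypothetical stand-in for a request object; not the real DefaultRpcRequest. */
    static class DemoRequest implements Serializable {
        private static final long serialVersionUID = 1L;
        final String seqId;
        final String methodName;

        DemoRequest(String seqId, String methodName) {
            this.seqId = seqId;
            this.methodName = methodName;
        }
    }

    /** request => bytes, roughly what the encoder does on the way out. */
    static byte[] encode(Object obj) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        return bytes.toByteArray();
    }

    /** bytes => object, roughly what the decoder does on the way in. */
    static Object decode(byte[] data) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        DemoRequest copy = (DemoRequest) decode(encode(new DemoRequest("seq-1", "sum")));
        System.out.println(copy.seqId + " " + copy.methodName);
    }
}
```

If a class in the request or response graph does not implement Serializable, `encode` fails with a NotSerializableException, which is the kind of error worth checking for first when a write to the channel fails.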
RpcClientHandler
The client handler is implemented as follows:
-
-
-
- package com.github.houbb.rpc.client.handler;
-
-
- import com.github.houbb.log.integration.core.Log;
- import com.github.houbb.log.integration.core.LogFactory;
- import com.github.houbb.rpc.client.core.RpcClient;
- import com.github.houbb.rpc.client.invoke.InvokeService;
- import com.github.houbb.rpc.common.rpc.domain.RpcResponse;
- import io.netty.channel.ChannelHandlerContext;
- import io.netty.channel.SimpleChannelInboundHandler;
-
-
-
- public class RpcClientHandler extends SimpleChannelInboundHandler {
-
-
- private static final Log log = LogFactory.getLog(RpcClient.class);
-
-
-
- private final InvokeService invokeService;
-
-
- public RpcClientHandler(InvokeService invokeService) {
- this.invokeService = invokeService;
- }
-
-
- @Override
- protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
- RpcResponse rpcResponse = (RpcResponse)msg;
- invokeService.addResponse(rpcResponse.seqId(), rpcResponse);
- log.info("[Client] response is :{}", rpcResponse);
- }
-
-
- @Override
- public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
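- // Close after each use, otherwise the response cannot be obtained. I am not sure why (this probably requires a deeper understanding of netty).
- // Personal understanding: if the channel is not closed, the caller stays blocked forever.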
- ctx.flush();
- ctx.close();
- }
-
-
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
- cause.printStackTrace();
- ctx.close();
- }
- }
Only channelRead0 has changed: it now hands the result to InvokeService for processing.
InvokeService
Interface
- package com.github.houbb.rpc.client.invoke;
-
-
- import com.github.houbb.rpc.common.rpc.domain.RpcResponse;
-
-
-
- public interface InvokeService {
-
-
-
- InvokeService addRequest(final String seqId);
-
-
-
- InvokeService addResponse(final String seqId, final RpcResponse rpcResponse);
-
-
-
- RpcResponse getResponse(final String seqId);
-
-
- }
It mainly covers registering the request, storing the response, and retrieving the response.
Implementation
- package com.github.houbb.rpc.client.invoke.impl;
-
-
- import com.github.houbb.heaven.util.guava.Guavas;
- import com.github.houbb.heaven.util.lang.ObjectUtil;
- import com.github.houbb.log.integration.core.Log;
- import com.github.houbb.log.integration.core.LogFactory;
- import com.github.houbb.rpc.client.core.RpcClient;
- import com.github.houbb.rpc.client.invoke.InvokeService;
- import com.github.houbb.rpc.common.exception.RpcRuntimeException;
- import com.github.houbb.rpc.common.rpc.domain.RpcResponse;
-
-
- import java.util.Set;
- import java.util.concurrent.ConcurrentHashMap;
-
-
-
- public class DefaultInvokeService implements InvokeService {
-
-
- private static final Log LOG = LogFactory.getLog(DefaultInvokeService.class);
-
-
-
- private final Set<String> requestSet;
-
-
-
- private final ConcurrentHashMap<String, RpcResponse> responseMap;
-
-
- public DefaultInvokeService() {
- requestSet = Guavas.newHashSet();
- responseMap = new ConcurrentHashMap<>();
- }
-
-
- @Override
- public InvokeService addRequest(String seqId) {
- LOG.info("[Client] start add request for seqId: {}", seqId);
- requestSet.add(seqId);
- return this;
- }
-
-
- @Override
- public InvokeService addResponse(String seqId, RpcResponse rpcResponse) {
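- // A check could be added before storing the response here.
- // For example, only accept it if its seqId is in the request set; otherwise ignore and discard it.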
- LOG.info("[Client] 获取结果信息,seq: {}, rpcResponse: {}", seqId, rpcResponse);
- responseMap.putIfAbsent(seqId, rpcResponse);
-
-
- // Notify all waiting threads
- LOG.info("[Client] seq 信息已经放入,通知所有等待方", seqId);
-
-
- synchronized (this) {
- this.notifyAll();
- }
-
-
- return this;
- }
-
-
- @Override
- public RpcResponse getResponse(String seqId) {
- try {
- RpcResponse rpcResponse = this.responseMap.get(seqId);
- if(ObjectUtil.isNotNull(rpcResponse)) {
- LOG.info("[Client] seq {} 对应结果已经获取: {}", seqId, rpcResponse);
- return rpcResponse;
- }
-
-
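- // Wait for the response
- while (rpcResponse == null) {
- LOG.info("[Client] seq {} 对应结果为空,进入等待", seqId);
- synchronized (this) {
- // Re-check under the lock so that a notifyAll() sent just before wait() is not missed
- rpcResponse = this.responseMap.get(seqId);
- if (rpcResponse == null) {
- this.wait();
- rpcResponse = this.responseMap.get(seqId);
- }
- }
-
- LOG.info("[Client] seq {} 对应结果已经获取: {}", seqId, rpcResponse);
- }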
-
-
- return rpcResponse;
- } catch (InterruptedException e) {
- throw new RpcRuntimeException(e);
- }
- }
- }
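requestSet stores the seqId of each outgoing request.
responseMap stores the response for each request; the getter waits in a synchronized while loop until the result is available.
Waiting and wake-up are done with wait() and notifyAll().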
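The wait()/notifyAll() hand-off can be tried in isolation. The sketch below is not the article's DefaultInvokeService: `ResponseWaitSketch` is a hypothetical class, the response is simplified to a String, and a timeout is added as an assumption so that a lost response cannot block the caller forever.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class ResponseWaitSketch {

    private final Map<String, String> responseMap = new ConcurrentHashMap<>();

    /** Called by the I/O thread when a response arrives. */
    public void addResponse(String seqId, String response) {
        synchronized (this) {
            // Store and notify under the same lock the waiter uses.
            responseMap.put(seqId, response);
            notifyAll();
        }
    }

    /** Blocks until the response for seqId arrives; returns null on timeout. */
    public String getResponse(String seqId, long timeoutMillis) throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        synchronized (this) {
            // remove() instead of get(), so finished entries do not pile up in the map.
            String response = responseMap.remove(seqId);
            while (response == null) {
                long remaining = deadline - System.currentTimeMillis();
                if (remaining <= 0) {
                    return null;
                }
                // The loop also guards against spurious wake-ups and
                // against wake-ups meant for a different seqId.
                wait(remaining);
                response = responseMap.remove(seqId);
            }
            return response;
        }
    }

    public static void main(String[] args) throws Exception {
        ResponseWaitSketch service = new ResponseWaitSketch();
        Thread ioThread = new Thread(() -> service.addResponse("seq-1", "sum=30"));
        ioThread.start();
        System.out.println(service.getResponse("seq-1", 1000)); // sum=30
        System.out.println(service.getResponse("seq-2", 50));   // null (timed out)
    }
}
```

Because the check and the wait happen under the same lock that the producer takes before notifyAll(), a response that arrives between the check and the wait is not lost.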
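ReferenceConfig - server-side configuration
Description
To call the server, we first have to define what we want to call.
ReferenceConfig tells the rpc framework which server-side service to call.
Interface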
- package com.github.houbb.rpc.client.config.reference;
-
-
- import com.github.houbb.rpc.common.config.component.RpcAddress;
-
-
- import java.util.List;
-
-
-
- public interface ReferenceConfig<T> {
-
-
- ReferenceConfig<T> serviceId(final String serviceId);
-
-
- String serviceId();
-
-
- Class<T> serviceInterface();
-
-
- ReferenceConfig<T> serviceInterface(final Class<T> serviceInterface);
-
-
- ReferenceConfig<T> addresses(final String addresses);
-
-
-
- T reference();
-
-
- }
Implementation
- package com.github.houbb.rpc.client.config.reference.impl;
-
-
- import com.github.houbb.heaven.constant.PunctuationConst;
- import com.github.houbb.heaven.util.common.ArgUtil;
- import com.github.houbb.heaven.util.guava.Guavas;
- import com.github.houbb.heaven.util.lang.NumUtil;
- import com.github.houbb.rpc.client.config.reference.ReferenceConfig;
- import com.github.houbb.rpc.client.core.RpcClient;
- import com.github.houbb.rpc.client.core.context.impl.DefaultRpcClientContext;
- import com.github.houbb.rpc.client.handler.RpcClientHandler;
- import com.github.houbb.rpc.client.invoke.InvokeService;
- import com.github.houbb.rpc.client.invoke.impl.DefaultInvokeService;
- import com.github.houbb.rpc.client.proxy.ReferenceProxy;
- import com.github.houbb.rpc.client.proxy.context.ProxyContext;
- import com.github.houbb.rpc.client.proxy.context.impl.DefaultProxyContext;
- import com.github.houbb.rpc.common.config.component.RpcAddress;
- import io.netty.channel.ChannelFuture;
- import io.netty.channel.ChannelHandler;
-
-
- import java.util.List;
-
-
-
- public class DefaultReferenceConfig<T> implements ReferenceConfig<T> {
-
-
- private String serviceId;
-
-
- private Class<T> serviceInterface;
-
-
- private List<RpcAddress> rpcAddresses;
-
-
- private List<ChannelFuture> channelFutures;
-
-
-
- @Deprecated
- private RpcClientHandler channelHandler;
-
-
-
- private InvokeService invokeService;
-
-
- public DefaultReferenceConfig() {
- // Initialize fields
- this.rpcAddresses = Guavas.newArrayList();
- this.channelFutures = Guavas.newArrayList();
- this.invokeService = new DefaultInvokeService();
- }
-
-
- @Override
- public String serviceId() {
- return serviceId;
- }
-
-
- @Override
- public DefaultReferenceConfig<T> serviceId(String serviceId) {
- this.serviceId = serviceId;
- return this;
- }
-
-
- @Override
- public Class<T> serviceInterface() {
- return serviceInterface;
- }
-
-
- @Override
- public DefaultReferenceConfig<T> serviceInterface(Class<T> serviceInterface) {
- this.serviceInterface = serviceInterface;
- return this;
- }
-
-
- @Override
- public ReferenceConfig<T> addresses(String addresses) {
- ArgUtil.notEmpty(addresses, "addresses");
-
-
- String[] addressArray = addresses.split(PunctuationConst.COMMA);
- ArgUtil.notEmpty(addressArray, "addresses");
-
-
- for(String address : addressArray) {
- String[] addressSplits = address.split(PunctuationConst.COLON);
- if(addressSplits.length < 2) {
- throw new IllegalArgumentException("Address must have ip and port, like 127.0.0.1:9527");
- }
- String ip = addressSplits[0];
- int port = NumUtil.toIntegerThrows(addressSplits[1]);
- // Optional weight information
- int weight = 1;
- if(addressSplits.length >= 3) {
- weight = NumUtil.toInteger(addressSplits[2], 1);
- }
-
-
- RpcAddress rpcAddress = new RpcAddress(ip, port, weight);
- this.rpcAddresses.add(rpcAddress);
- }
-
-
- return this;
- }
-
-
-
- @Override
- public T reference() {
- // 1. Open the connections from the client to the server.
- // 1.1 To improve performance, all client => server connections could be handled by a single thread.
- // 1.2 For simplicity, the first version connects in a synchronous loop.
- // Create a handler
- // Connect in a loop
- for(RpcAddress rpcAddress : rpcAddresses) {
- final ChannelHandler channelHandler = new RpcClientHandler(invokeService);
- final DefaultRpcClientContext context = new DefaultRpcClientContext();
- context.address(rpcAddress.address()).port(rpcAddress.port()).channelHandler(channelHandler);
- ChannelFuture channelFuture = new RpcClient(context).connect();
- // Wait synchronously inside the loop
- // On failure: abort immediately, or catch the exception and continue?
- channelFutures.add(channelFuture);
- }
-
-
- // 2. Dynamic proxy for the interface
- ProxyContext<T> proxyContext = buildReferenceProxyContext();
- return ReferenceProxy.newProxyInstance(proxyContext);
- }
-
-
-
- private ProxyContext<T> buildReferenceProxyContext() {
- DefaultProxyContext<T> proxyContext = new DefaultProxyContext<>();
- proxyContext.serviceId(this.serviceId);
- proxyContext.serviceInterface(this.serviceInterface);
- proxyContext.channelFutures(this.channelFutures);
- proxyContext.invokeService(this.invokeService);
- return proxyContext;
- }
-
-
- }
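This mainly initializes the proxy implementation from the specified server information.
A weight can also be specified here, which leaves room for load balancing later; it is not implemented in this installment.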
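The address format accepted by addresses() is a comma-separated list of `ip:port` or `ip:port:weight` entries. A minimal sketch of that parsing with only the JDK is shown below. `AddressParseSketch` and its `Address` type are hypothetical stand-ins for the project's RpcAddress, and falling back to weight 1 on an unparsable weight is an assumption about what `NumUtil.toInteger(value, 1)` does.

```java
import java.util.ArrayList;
import java.util.List;

public class AddressParseSketch {

    /** Hypothetical value type; the article's RpcAddress source is not shown. */
    static final class Address {
        final String host;
        final int port;
        final int weight;

        Address(String host, int port, int weight) {
            this.host = host;
            this.port = port;
            this.weight = weight;
        }

        @Override
        public String toString() {
            return host + ":" + port + " weight=" + weight;
        }
    }

    /** Parses "ip:port[:weight],ip:port[:weight],..." into a list of addresses. */
    static List<Address> parse(String addresses) {
        if (addresses == null || addresses.trim().isEmpty()) {
            throw new IllegalArgumentException("addresses must not be empty");
        }
        List<Address> result = new ArrayList<>();
        for (String item : addresses.split(",")) {
            String[] parts = item.trim().split(":");
            if (parts.length < 2) {
                throw new IllegalArgumentException("Address must have ip and port, like 127.0.0.1:9527");
            }
            // A malformed port fails fast with NumberFormatException.
            int port = Integer.parseInt(parts[1]);
            int weight = 1;
            if (parts.length >= 3) {
                try {
                    weight = Integer.parseInt(parts[2]);
                } catch (NumberFormatException e) {
                    weight = 1; // assumed default when the weight cannot be parsed
                }
            }
            result.add(new Address(parts[0], port, weight));
        }
        return result;
    }

    public static void main(String[] args) {
        for (Address address : parse("127.0.0.1:9527,localhost:9528:3")) {
            System.out.println(address);
        }
    }
}
```

Carrying the weight on each address from the start means a later load balancer can use it without changing the configuration format.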
ReferenceProxy
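Description
In any rpc call, the client only has the server's interface.
So how can a remote method be called just like a local one?
The answer is a dynamic proxy.
Implementation
The implementation is as follows: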
- package com.github.houbb.rpc.client.proxy;
-
-
- import com.github.houbb.heaven.util.lang.ObjectUtil;
- import com.github.houbb.heaven.util.lang.reflect.ReflectMethodUtil;
- import com.github.houbb.log.integration.core.Log;
- import com.github.houbb.log.integration.core.LogFactory;
- import com.github.houbb.rpc.client.proxy.context.ProxyContext;
- import com.github.houbb.rpc.common.rpc.domain.RpcResponse;
- import com.github.houbb.rpc.common.rpc.domain.impl.DefaultRpcRequest;
- import com.github.houbb.rpc.common.support.id.impl.Uuid;
- import com.github.houbb.rpc.common.support.time.impl.DefaultSystemTime;
- import io.netty.channel.Channel;
-
-
- import java.lang.reflect.InvocationHandler;
- import java.lang.reflect.Method;
- import java.lang.reflect.Proxy;
-
-
-
- public class ReferenceProxy<T> implements InvocationHandler {
-
-
- private static final Log LOG = LogFactory.getLog(ReferenceProxy.class);
-
-
- private final ProxyContext<T> proxyContext;
-
-
- private ReferenceProxy(ProxyContext<T> proxyContext) {
- this.proxyContext = proxyContext;
- }
-
-
-
- @Override
- public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
- // Turn the reflection information into an rpcRequest
- final String seqId = Uuid.getInstance().id();
- final long createTime = DefaultSystemTime.getInstance().time();
- DefaultRpcRequest rpcRequest = new DefaultRpcRequest();
- rpcRequest.serviceId(proxyContext.serviceId());
- rpcRequest.seqId(seqId);
- rpcRequest.createTime(createTime);
- rpcRequest.paramValues(args);
- rpcRequest.paramTypeNames(ReflectMethodUtil.getParamTypeNames(method));
- rpcRequest.methodName(method.getName());
-
-
- // Call the remote side
- LOG.info("[Client] start call remote with request: {}", rpcRequest);
- proxyContext.invokeService().addRequest(seqId);
-
-
- // A load balancer would pick the channel to write to here.
- final Channel channel = getChannel();
- LOG.info("[Client] start call channel id: {}", channel.id().asLongText());
-
-
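- // Writing the message has strict requirements.
- // writeAndFlush is actually asynchronous; calling sync() directly makes any exception visible.
- // The channel ultimately only accepts ByteBuf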
- channel.writeAndFlush(rpcRequest).sync();
-
-
- // Loop until the result is available
- // Locally this is done with loop + match + wait/notifyAll
- // In a distributed setup it could be based on redis + queue + loop
- LOG.info("[Client] start get resp for seqId: {}", seqId);
- RpcResponse rpcResponse = proxyContext.invokeService().getResponse(seqId);
- LOG.info("[Client] start get resp for seqId: {}", seqId);
- Throwable error = rpcResponse.error();
- if(ObjectUtil.isNotNull(error)) {
- throw error;
- }
- return rpcResponse.result();
- }
-
-
-
- private Channel getChannel() {
- return proxyContext.channelFutures().get(0).channel();
- }
-
-
-
- @SuppressWarnings("unchecked")
- public static <T> T newProxyInstance(ProxyContext<T> proxyContext) {
- final Class<T> interfaceClass = proxyContext.serviceInterface();
- ClassLoader classLoader = interfaceClass.getClassLoader();
- Class<?>[] interfaces = new Class[]{interfaceClass};
- ReferenceProxy proxy = new ReferenceProxy(proxyContext);
- return (T) Proxy.newProxyInstance(classLoader, interfaces, proxy);
- }
-
-
- }
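When the client is initialized, newProxyInstance creates the proxy.
When the client calls a remote method, it is actually calling invoke.
(1) Build the request from the reflection information of the invocation and attach a seqId
(2) Call the server remotely through netty
(3) Synchronously wait for the response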
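The three steps above can be sketched with nothing but the JDK's dynamic proxy. This is a simplified illustration, not the project's ReferenceProxy: `ProxySketch`, `Calculator`, and `LocalCalculator` are hypothetical names, and the "remote" call is replaced by a reflective call on a local object so the example runs without netty.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.UUID;

public class ProxySketch {

    /** Hypothetical service interface, standing in for CalculatorService. */
    public interface Calculator {
        int sum(int one, int two);
    }

    /** Local implementation that plays the role of the remote server. */
    public static class LocalCalculator implements Calculator {
        @Override
        public int sum(int one, int two) {
            return one + two;
        }
    }

    static class SketchHandler implements InvocationHandler {
        private final Object target;

        SketchHandler(Object target) {
            this.target = target;
        }

        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            // (1) Collect the request information from the reflective call.
            String seqId = UUID.randomUUID().toString();
            String methodName = method.getName();
            Class<?>[] paramTypes = method.getParameterTypes();
            System.out.println("[sketch] seqId=" + seqId + " method=" + methodName);

            // (2) "Remote" call: look the method up by name and parameter types,
            //     as a reflection-based server would, and invoke it.
            Method serverMethod = target.getClass().getMethod(methodName, paramTypes);

            // (3) Return the result synchronously to the caller.
            return serverMethod.invoke(target, args);
        }
    }

    @SuppressWarnings("unchecked")
    static <T> T newProxy(Class<T> serviceInterface, Object target) {
        return (T) Proxy.newProxyInstance(
                serviceInterface.getClassLoader(),
                new Class<?>[]{serviceInterface},
                new SketchHandler(target));
    }

    public static void main(String[] args) {
        Calculator calculator = newProxy(Calculator.class, new LocalCalculator());
        System.out.println(calculator.sum(10, 20)); // 30
    }
}
```

The caller only ever sees the `Calculator` interface; swapping the reflective local call for a network write plus a blocking wait on the response gives the shape of the real client.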
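Test
Maven dependency
- <dependency>
- <groupId>com.github.houbb</groupId>
- <artifactId>rpc-client</artifactId>
- <version>0.0.6</version>
- </dependency>
Test code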
- public static void main(String[] args) {
- // Service configuration
- ReferenceConfig<CalculatorService> config = new DefaultReferenceConfig<CalculatorService>();
- config.serviceId(ServiceIdConst.CALC);
- config.serviceInterface(CalculatorService.class);
- config.addresses("localhost:9527");
-
-
- CalculatorService calculatorService = config.reference();
- CalculateRequest request = new CalculateRequest();
- request.setOne(10);
- request.setTwo(20);
-
-
- CalculateResponse response = calculatorService.sum(request);
- System.out.println(response);
- }
Test log:
- [DEBUG] [2021-10-05 14:16:17.534] [main] [c.g.h.l.i.c.LogFactory.setImplementation] - Logging initialized using 'class com.github.houbb.log.integration.adaptors.stdout.StdOutExImpl' adapter.
- [INFO] [2021-10-05 14:16:17.625] [main] [c.g.h.r.c.c.RpcClient.connect] - RPC 服务开始启动客户端
- ...
- [INFO] [2021-10-05 14:16:19.328] [main] [c.g.h.r.c.c.RpcClient.connect] - RPC 服务启动客户端完成,监听地址 localhost:9527
- [INFO] [2021-10-05 14:16:19.346] [main] [c.g.h.r.c.p.ReferenceProxy.invoke] - [Client] start call remote with request: DefaultRpcRequest{seqId='a525c5a6196545f5a5241b2cdc2ec2c2', createTime=1633414579339, serviceId='calc', methodName='sum', paramTypeNames=[com.github.houbb.rpc.server.facade.model.CalculateRequest], paramValues=[CalculateRequest{one=10, two=20}]}
- [INFO] [2021-10-05 14:16:19.347] [main] [c.g.h.r.c.i.i.DefaultInvokeService.addRequest] - [Client] start add request for seqId: a525c5a6196545f5a5241b2cdc2ec2c2
- [INFO] [2021-10-05 14:16:19.348] [main] [c.g.h.r.c.p.ReferenceProxy.invoke] - [Client] start call channel id: 00e04cfffe360988-000017bc-00000000-399b9d7e1b88839d-5ccc4a29
- 十月 05, 2021 2:16:19 下午 io.netty.handler.logging.LoggingHandler write
- 信息: [id: 0x5ccc4a29, L:/127.0.0.1:50596 - R:localhost/127.0.0.1:9527] WRITE: DefaultRpcRequest{seqId='a525c5a6196545f5a5241b2cdc2ec2c2', createTime=1633414579339, serviceId='calc', methodName='sum', paramTypeNames=[com.github.houbb.rpc.server.facade.model.CalculateRequest], paramValues=[CalculateRequest{one=10, two=20}]}
- 十月 05, 2021 2:16:19 下午 io.netty.handler.logging.LoggingHandler flush
- 信息: [id: 0x5ccc4a29, L:/127.0.0.1:50596 - R:localhost/127.0.0.1:9527] FLUSH
- [INFO] [2021-10-05 14:16:19.412] [main] [c.g.h.r.c.p.ReferenceProxy.invoke] - [Client] start get resp for seqId: a525c5a6196545f5a5241b2cdc2ec2c2
- [INFO] [2021-10-05 14:16:19.413] [main] [c.g.h.r.c.i.i.DefaultInvokeService.getResponse] - [Client] seq a525c5a6196545f5a5241b2cdc2ec2c2 对应结果为空,进入等待
- 十月 05, 2021 2:16:19 下午 io.netty.handler.logging.LoggingHandler channelRead
- 信息: [id: 0x5ccc4a29, L:/127.0.0.1:50596 - R:localhost/127.0.0.1:9527] READ: DefaultRpcResponse{seqId='a525c5a6196545f5a5241b2cdc2ec2c2', error=null, result=CalculateResponse{success=true, sum=30}}
- ...
- [INFO] [2021-10-05 14:16:19.505] [nioEventLoopGroup-2-1] [c.g.h.r.c.i.i.DefaultInvokeService.addResponse] - [Client] 获取结果信息,seq: a525c5a6196545f5a5241b2cdc2ec2c2, rpcResponse: DefaultRpcResponse{seqId='a525c5a6196545f5a5241b2cdc2ec2c2', error=null, result=CalculateResponse{success=true, sum=30}}
- [INFO] [2021-10-05 14:16:19.505] [nioEventLoopGroup-2-1] [c.g.h.r.c.i.i.DefaultInvokeService.addResponse] - [Client] seq 信息已经放入,通知所有等待方
- [INFO] [2021-10-05 14:16:19.506] [nioEventLoopGroup-2-1] [c.g.h.r.c.c.RpcClient.channelRead0] - [Client] response is :DefaultRpcResponse{seqId='a525c5a6196545f5a5241b2cdc2ec2c2', error=null, result=CalculateResponse{success=true, sum=30}}
- [INFO] [2021-10-05 14:16:19.506] [main] [c.g.h.r.c.i.i.DefaultInvokeService.getResponse] - [Client] seq a525c5a6196545f5a5241b2cdc2ec2c2 对应结果已经获取: DefaultRpcResponse{seqId='a525c5a6196545f5a5241b2cdc2ec2c2', error=null, result=CalculateResponse{success=true, sum=30}}
- [INFO] [2021-10-05 14:16:19.507] [main] [c.g.h.r.c.p.ReferenceProxy.invoke] - [Client] start get resp for seqId: a525c5a6196545f5a5241b2cdc2ec2c2
- CalculateResponse{success=true, sum=30}
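Summary
There is one small problem at this point: the server address must include a port, which is not quite reasonable (for example, when a proxied domain name is used), so this needs to be optimized later.
The way the client is declared and started here is also fairly basic; integration with spring can be considered later.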