前面我们的例子是一个固定的出参和入参,固定的方法实现。
本节将实现通用的调用,让框架具有更高的实用性。
所有的方法调用,基于反射进行相关处理实现。
<dependency>
<groupId>com.github.houbb</groupId>
<artifactId>rpc-server</artifactId>
<version>0.0.6</version>
</dependency>
和原来一样,计算接口实现及入参/出参,此处不再赘述。
- RpcServiceMain.java
import com.github.houbb.rpc.example.server.service.CalculatorServiceImpl;
import com.github.houbb.rpc.server.facade.constant.ServiceIdConst;
import com.github.houbb.rpc.server.registry.impl.DefaultServiceRegistry;
/**
* @author binbin.hou
* @since 1.0.0
*/
public class RpcServiceMain {
public static void main(String[] args) {
// 启动服务
DefaultServiceRegistry.getInstance()
.register(ServiceIdConst.CALC, new CalculatorServiceImpl())
.expose();
}
}
- 日志
[DEBUG] [2019-11-01 15:45:25.406] [main] [c.g.h.l.i.c.LogFactory.setImplementation] - Logging initialized using 'class com.github.houbb.log.integration.adaptors.stdout.StdOutExImpl' adapter.
[INFO] [2019-11-01 15:45:25.411] [Thread-0] [c.g.h.r.s.c.RpcServer.run] - RPC 服务开始启动服务端
[INFO] [2019-11-01 15:45:26.580] [Thread-0] [c.g.h.r.s.c.RpcServer.run] - RPC 服务端启动完成,监听【9527】端口
- 实现代码
import com.github.houbb.rpc.client.config.reference.ReferenceConfig;
import com.github.houbb.rpc.client.config.reference.impl.DefaultReferenceConfig;
import com.github.houbb.rpc.server.facade.constant.ServiceIdConst;
import com.github.houbb.rpc.server.facade.model.CalculateRequest;
import com.github.houbb.rpc.server.facade.model.CalculateResponse;
import com.github.houbb.rpc.server.facade.service.CalculatorService;
/**
* @author binbin.hou
* @since 1.0.0
*/
public class RpcClientMain {
public static void main(String[] args) {
// 服务配置信息
ReferenceConfig<CalculatorService> config = new DefaultReferenceConfig<CalculatorService>();
config.serviceId(ServiceIdConst.CALC);
config.serviceInterface(CalculatorService.class);
config.addresses("localhost:9527");
CalculatorService calculatorService = config.reference();
CalculateRequest request = new CalculateRequest();
request.setOne(10);
request.setTwo(20);
CalculateResponse response = calculatorService.sum(request);
System.out.println(response);
}
}
- 日志
[DEBUG] [2019-11-01 15:46:42.437] [main] [c.g.h.l.i.c.LogFactory.setImplementation] - Logging initialized using 'class com.github.houbb.log.integration.adaptors.stdout.StdOutExImpl' adapter.
[INFO] [2019-11-01 15:46:42.467] [main] [c.g.h.r.c.c.RpcClient.connect] - RPC 服务开始启动客户端
[INFO] [2019-11-01 15:46:43.687] [main] [c.g.h.r.c.c.RpcClient.connect] - RPC 服务启动客户端完成,监听地址 localhost:9527
[INFO] [2019-11-01 15:46:43.780] [main] [c.g.h.r.c.p.ReferenceProxy.invoke] - [Client] start call remote with request: DefaultRpcRequest{seqId='6267e9869f094f688565726893eeb36d', createTime=1572594403773, serviceId='calc', methodName='sum', paramTypeNames=[com.github.houbb.rpc.server.facade.model.CalculateRequest], paramValues=[CalculateRequest{one=10, two=20}]}
[INFO] [2019-11-01 15:46:43.780] [main] [c.g.h.r.c.i.i.DefaultInvokeService.addRequest] - [Client] start add request for seqId: 6267e9869f094f688565726893eeb36d
[INFO] [2019-11-01 15:46:43.781] [main] [c.g.h.r.c.p.ReferenceProxy.invoke] - [Client] start call channel id: d89c67fffe99d9d7-00002ed0-00000000-e080b63d353748a8-8dcc0ae6
[INFO] [2019-11-01 15:46:43.862] [main] [c.g.h.r.c.p.ReferenceProxy.invoke] - [Client] start get resp for seqId: 6267e9869f094f688565726893eeb36d
[INFO] [2019-11-01 15:46:43.862] [main] [c.g.h.r.c.i.i.DefaultInvokeService.getResponse] - [Client] seq 6267e9869f094f688565726893eeb36d 对应结果为空,进入等待
[INFO] [2019-11-01 15:46:43.988] [nioEventLoopGroup-2-1] [c.g.h.r.c.i.i.DefaultInvokeService.addResponse] - [Client] 获取结果信息,seq: 6267e9869f094f688565726893eeb36d, rpcResponse: DefaultRpcResponse{seqId='6267e9869f094f688565726893eeb36d', error=null, result=CalculateResponse{success=true, sum=30}}
[INFO] [2019-11-01 15:46:43.989] [nioEventLoopGroup-2-1] [c.g.h.r.c.i.i.DefaultInvokeService.addResponse] - [Client] seq 信息已经放入,通知所有等待方
[INFO] [2019-11-01 15:46:43.990] [nioEventLoopGroup-2-1] [c.g.h.r.c.c.RpcClient.channelRead0] - [Client] response is :DefaultRpcResponse{seqId='6267e9869f094f688565726893eeb36d', error=null, result=CalculateResponse{success=true, sum=30}}
[INFO] [2019-11-01 15:46:43.990] [main] [c.g.h.r.c.i.i.DefaultInvokeService.getResponse] - [Client] seq 6267e9869f094f688565726893eeb36d 对应结果已经获取: DefaultRpcResponse{seqId='6267e9869f094f688565726893eeb36d', error=null, result=CalculateResponse{success=true, sum=30}}
[INFO] [2019-11-01 15:46:43.992] [main] [c.g.h.r.c.p.ReferenceProxy.invoke] - [Client] start get resp for seqId: 6267e9869f094f688565726893eeb36d
CalculateResponse{success=true, sum=30}
本节内容较多,只节选部分核心内容进行讲解。
/**
* 获取对应的引用实现
* (1)处理所有的反射代理信息-方法可以抽离,启动各自独立即可。
* (2)启动对应的长连接
* @return 引用代理类
* @since 0.0.6
*/
@Override
public T reference() {
// 1. 启动 client 端到 server 端的连接信息
// 1.1 为了提升性能,可以将所有的 client=>server 的连接都调整为一个 thread。
// 1.2 初期为了简单,直接使用同步循环的方式。
// 创建 handler
// 循环连接
for(RpcAddress rpcAddress : rpcAddresses) {
final ChannelHandler channelHandler = new RpcClientHandler(invokeService);
final DefaultRpcClientContext context = new DefaultRpcClientContext();
context.address(rpcAddress.address()).port(rpcAddress.port()).channelHandler(channelHandler);
ChannelFuture channelFuture = new RpcClient(context).connect();
// 循环同步等待
// 如果出现异常,直接中断?捕获异常继续进行??
channelFutures.add(channelFuture);
}
// 2. 接口动态代理
ProxyContext<T> proxyContext = buildReferenceProxyContext();
return ReferenceProxy.newProxyInstance(proxyContext);
}
这里会根据 address 创建 client-server 之间的连接信息。
ReferenceProxy.newProxyInstance(proxyContext)
是服务端代理创建的核心实现。
这里是直接使用 java 动态代理实现的。
/**
* 获取代理实例
* (1)接口只是为了代理。
* (2)实际调用中更加关心 的是 serviceId
* @param proxyContext 代理上下文
* @param <T> 泛型
* @return 代理实例
* @since 0.0.6
*/
@SuppressWarnings("unchecked")
public static <T> T newProxyInstance(ProxyContext<T> proxyContext) {
final Class<T> interfaceClass = proxyContext.serviceInterface();
ClassLoader classLoader = interfaceClass.getClassLoader();
Class<?>[] interfaces = new Class[]{interfaceClass};
ReferenceProxy proxy = new ReferenceProxy(proxyContext);
return (T) Proxy.newProxyInstance(classLoader, interfaces, proxy);
}
核心流程如下:
(1)根据 proxyContext 构建 rpcRequest
(2)将 rpcRequest 写入到服务端
(3)同步等待服务端响应。
public class ReferenceProxy<T> implements InvocationHandler {
private static final Log LOG = LogFactory.getLog(ReferenceProxy.class);
/**
* 服务标识
* @since 0.0.6
*/
private final ProxyContext<T> proxyContext;
/**
* 暂时私有化该构造器
* @param proxyContext 代理上下文
* @since 0.0.6
*/
private ReferenceProxy(ProxyContext<T> proxyContext) {
this.proxyContext = proxyContext;
}
/**
* 反射调用
* @param proxy 代理
* @param method 方法
* @param args 参数
* @return 结果
* @throws Throwable 异常
* @since 0.0.6
* @see Method#getGenericSignature() 通用标识,可以根据这个来优化代码。
*/
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
// 反射信息处理成为 rpcRequest
final String seqId = Uuid.getInstance().id();
final long createTime = DefaultSystemTime.getInstance().time();
DefaultRpcRequest rpcRequest = new DefaultRpcRequest();
rpcRequest.serviceId(proxyContext.serviceId());
rpcRequest.seqId(seqId);
rpcRequest.createTime(createTime);
rpcRequest.paramValues(args);
rpcRequest.paramTypeNames(ReflectMethodUtil.getParamTypeNames(method));
rpcRequest.methodName(method.getName());
// 调用远程
LOG.info("[Client] start call remote with request: {}", rpcRequest);
proxyContext.invokeService().addRequest(seqId);
// 这里使用 load-balance 进行选择 channel 写入。
final Channel channel = getChannel();
LOG.info("[Client] start call channel id: {}", channel.id().asLongText());
// 对于信息的写入,实际上有着严格的要求。
// writeAndFlush 实际是一个异步的操作,直接使用 sync() 可以看到异常信息。
// 支持的必须是 ByteBuf
channel.writeAndFlush(rpcRequest).sync();
// 循环获取结果
// 通过 Loop+match wait/notifyAll 来获取
// 分布式根据 redis+queue+loop
LOG.info("[Client] start get resp for seqId: {}", seqId);
RpcResponse rpcResponse = proxyContext.invokeService().getResponse(seqId);
LOG.info("[Client] start get resp for seqId: {}", seqId);
Throwable error = rpcResponse.error();
if(ObjectUtil.isNotNull(error)) {
throw error;
}
return rpcResponse.result();
}
}
服务端会在启动的时候,将所有的方法信息进行处理,便于后期调用。
- DefaultServiceFactory#registerServices
/**
* 服务注册一般在项目启动的时候,进行处理。
* 属于比较重的操作,而且一个服务按理说只应该初始化一次。
* 此处加锁为了保证线程安全。
* @param serviceConfigList 服务配置列表
* @return this
*/
@Override
public synchronized ServiceFactory registerServices(List<ServiceConfig> serviceConfigList) {
ArgUtil.notEmpty(serviceConfigList, "serviceConfigList");
// 集合初始化
serviceMap = new HashMap<>(serviceConfigList.size());
// 这里只是预估,一般为2个服务。
methodMap = new HashMap<>(serviceConfigList.size()*2);
for(ServiceConfig serviceConfig : serviceConfigList) {
serviceMap.put(serviceConfig.id(), serviceConfig.reference());
}
// 存放方法名称
for(Map.Entry<String, Object> entry : serviceMap.entrySet()) {
String serviceId = entry.getKey();
Object reference = entry.getValue();
//获取所有方法列表
Method[] methods = reference.getClass().getMethods();
for(Method method : methods) {
String methodName = method.getName();
if(ReflectMethodUtil.isIgnoreMethod(methodName)) {
continue;
}
List<String> paramTypeNames = ReflectMethodUtil.getParamTypeNames(method);
String key = buildMethodKey(serviceId, methodName, paramTypeNames);
methodMap.put(key, method);
}
}
return this;
}
整体流程
(1)接收到请求信息后,整理出方法的相关信息
(2)根据方法信息,去初始化的方法集合中选取对应的方法
(3)反射调用,并且返回结果
/**
* 处理请求信息
* @param rpcRequest 请求信息
* @return 结果信息
* @since 0.0.6
*/
private DefaultRpcResponse handleRpcRequest(final RpcRequest rpcRequest) {
DefaultRpcResponse rpcResponse = new DefaultRpcResponse();
rpcResponse.seqId(rpcRequest.seqId());
try {
// 获取对应的 service 实现类
// rpcRequest=>invocationRequest
// 执行 invoke
Object result = DefaultServiceFactory.getInstance()
.invoke(rpcRequest.serviceId(),
rpcRequest.methodName(),
rpcRequest.paramTypeNames(),
rpcRequest.paramValues());
rpcResponse.result(result);
} catch (Exception e) {
rpcResponse.error(e);
log.error("[Server] execute meet ex for request", rpcRequest, e);
}
// 构建结果值
return rpcResponse;
}
方法节选如下
public Object invoke(String serviceId, String methodName, List<String> paramTypeNames, Object[] paramValues) {
//参数校验
ArgUtil.notEmpty(serviceId, "serviceId");
ArgUtil.notEmpty(methodName, "methodName");
// 提供 cache,可以根据前三个值快速定位对应的 method
// 根据 method 进行反射处理。
// 对于 paramTypes 进行 string 连接处理。
final Object reference = serviceMap.get(serviceId);
final String methodKey = buildMethodKey(serviceId, methodName, paramTypeNames);
final Method method = methodMap.get(methodKey);
try {
return method.invoke(reference, paramValues);
} catch (IllegalAccessException | InvocationTargetException e) {
throw new RpcRuntimeException(e);
}
}
看代码之前需要掌握整体的流程。
这样看起来顺着流程,会比较轻松。
java 反射
netty 网络通讯
序列化