在阅读Dubbo源码之前,一定要先整明白Dubbo的SPI机制是什么,能够了解是怎么实现的就最好了。为此,Java架构杂谈输出了一篇Dubbo SPI机制的原理和源码解读文章,感兴趣的朋友可以先阅读了解。在
Java架构杂谈
公众号发送SPI
获取文章链接。
1. 消费端调用请求
基于对服务引入源码的分析,我们得出了如下的对象处理链:
我们可以得到消费端调用请求流程,如下图所示:
红色箭头方向为主流程,接下来详细介绍每个步骤执行的逻辑。
1.1 MockClusterInvoker
这层主要用于处理mock,以及系统降级相关逻辑。
相关功能参考官网:本地伪装[1]。
1.2 RegistryAwareClusterInvoker
如果有两个注册中心,则有这个Invoker,处理注册中心对应的Invoker,校验注册中心是否可用。
1.3 FailoverClusterInvoker
服务容错相关逻辑。
1.3.1 RouterChain
服务路由,获取到所有的服务提供者,并且通过路由链进行路由。
1.3.2 LoadBalance
使用负载均衡策略选择一个Invoker,最终得到的是一个InvokerWrapper。接着调用其invoke方法。
1.4 ListenerInvokerWrapper
接着会调用到ListenerInvokerWrapper的invoke方法。
1.5 CallbackRegistrationInvoker
接着执行Filter相关逻辑。
1.5.1 ConsumerContextFilter
设置Rpc的Context上下文属性。
1.5.2 FutureFilter
实现事件通知相关功能,可以借此完成一些通知功能,在调用方法之前,调用方法之后,出现异常时,会触发对应的事件。
1.5.3 MonitorFilter
实现Dubbo监控相关逻辑。
1.6 AsyncToSyncInvoker
异步转同步处理逻辑。
1.6.1 同步转异步逻辑
代码逻辑如下:
1 |
|
此方法最终会返回AsyncRpcResult
异步结果,异步转同步逻辑主要就是在这里调用以下方法一直等待处理结果:
1 | asyncResult.get(Integer.MAX_VALUE, TimeUnit.MILLISECONDS); |
继续跟进invoker.invoke(invocation)
看看内部是如何进行异步调用的,定位到DubboInvoker#doInvoke
方法:
1 |
|
这里创建了一个AsyncRpcResult
对象,然后调用currentClient.request(inv, timeout)
方法返回一个CompletableFuture对象,最后调用asyncRpcResult.subscribeTo(responseFuture)
让AsyncRpcResult对象订阅responseFuture,当responseFuture完成之后,会调用AsyncRpcResult中的方法。
最后跟进看看currentClient.request(inv, timeout)
的逻辑,最终会调用到HeaderExchangeChannel
的request方法:
1 |
|
可以发现,这里最终创建了一个DefaultFuture
对象,并且返回,在创建DefaultFuture
对象的逻辑中,会把请求id跟DefaultFuture的关系,以及请求id跟channel的关系保存到map中:
1 | private DefaultFuture(Channel channel, Request request, int timeout) { |
并且启动一个定时任务,用于判断调用是否超时:
1 | private static void timeoutCheck(DefaultFuture future) { |
当判断到超时之后,直接返回超时的响应结果。
1.6.2 异步响应结果处理
最后我们看看底层获取到响应结果之后,是如何依次通知到 DefaultFuture --> CompletableFuture --> AsyncRpcResult的。
HeaderExchangeHandler#received
方法调用HeaderExchangeHandler#handleResponse
,然后继续调用DefaultFuture#received(Channel, Response)
,最后调用到会调DefaultFuture#received(Channel, Response, boolean)
方法,接收数据,获取到异步结果:
1 | public static void received(Channel channel, Response response, boolean timeout) { |
最终根据响应id从map中找到DefaultFuture,并且把响应结果赋值给它,最终触发执行AsyncRpcResult的相关方法,最后触发了以下代码从阻塞处返回:
1 | asyncResult.get(Integer.MAX_VALUE, TimeUnit.MILLISECONDS); |
即完成了把异步转换为同步的处理工作。
1.7 DubboInvoker
会调用其org.apache.dubbo.rpc.protocol.dubbo.DubboInvoker#doInvoke方法,完成具体的方法调用。
此方法会从可用的clients中选择一个,然后执行异步调用,相关代码上面已经说明过了。
1.8 HeaderExchangeClient
调用ExchangeClient数据交换层的方法发送数据,该方法最终会调用到数据传输层的具体方法区发送数据,如org.apache.dubbo.remoting.transport.netty4.NettyChannel#send。
2. 提供端处理请求
基于服务导出的源码分析,我们得到了如下的对象处理链:
以及传输层链路:
为此,我们可以得到如下的提供端的请求处理流程:
接下来,我们就看看每一步都是做什么事情的。
2.1 NettyServerHandler#channelRead
接收数据,传给下层handler继续处理。
2.2 MultiMessageHandler#received
如果接收到的消息是MultiMessage,则进行转换遍历处理。
2.3 HeartbeatHandler#received
记录读取数据时间戳;
判断是否心跳请求,如果是心跳请求,则直接响应心跳请求,会把requestId响应回去,用户区分是哪个请求
。
如果是心跳响应,则不用处理,直接返回。
否则继续往下执行。
2.4 AllChannelHandler#received
跟Dubbo的线程模型
[2]有关。
如果Dispatcher使用的是all策略(默认策略),则会执行到这里,最终会把请求封装成ChannelEventRunnalbe交给线程池处理,IO线程的任务到此结束。
如果Dubbo Dispatcher使用的是direct策略,则不会执行这个handler。
2.5 DecodeHandler#received
如果传入的消息实现了Decodeable接口,则调用接口的decode实现进行decode,提供了一种自定义decode的扩展机制。
2.6 HeaderExchangeHandler#handleRequest
判断是否双向通信。
构造Response对象,最终调用ExchangeHandlerAdapter的reply方法,返回一个CompletionStage:
1 | CompletionStage<Object> future = handler.reply(channel, msg); |
拿到CompletionStage对象后:
-
如果是同步调用,则直接拿到结果,并发送到channel中去
-
如果是异步调用,则监听直到拿到服务执行结果,然后发送到channel中去
最终把结果封装为AppResponse对象,包括异常也封装成该对象(HeaderExchangeHandler#handleRequest):
1 | ... |
可以发现,最终获取到异步调用的结果之后,再写到Channel中。
底层会封装结果为CompletableFuture:
org.apache.dubbo.rpc.proxy.AbstractProxyInvoker#invoke
1 |
|
2.7 ExchangeHandlerAdapter#reply
根据请求对象拿到一个Invoker,最终执行Invoker得到结果。
实际调用的方法可以分为两种情况:
-
同步执行:
1
public String test() {...}
-
异步执行:
1
public CompletableFuture<String> test() {...}
最终该方法返回一个CompletionStage对象,给到上层处理。
底层执行方法的时候,不管是同步还是异步返回,都统一转换为CompletableFuture:
org.apache.dubbo.rpc.proxy.AbstractProxyInvoker#wrapWithFuture
1 | private CompletableFuture<Object> wrapWithFuture (Object value, Invocation invocation) { |
可以发现,为了实现异步调用的功能,Dubbo做了一些比较绕的对象转换逻辑,如下图所示:
梳理清楚了这个处理流程,也就很容易理解为啥可以实现跨服务的异步调用了。
2.8 DubboExporter
2.8.1 CallbackRegistrationInvoker
执行过滤器链。
EchoFilter
处理Dubbo的回声检测。回声检测相关说明参考官方文档:回声测试[3]
如果判断到是$echo方法,则直接构造一个结果返回:
1 |
|
ClassLoaderFilter
用于设置类加载器。
GenericFilter
用于处理泛化服务。
ContextFilter
设置Dubbo ThreadLocal(RpcContext)相关的属性。
TraceFilter
处理dubbo trace命令相关的Filter。
TimeoutFilter
用于处理服务端的超时。
MonitorFilter
实现监控相关功能。
ExceptionFIlter
处理服务异常信息,转换成消费端能够识别的异常。
2.8.2 DelegateProviderMetaDataInvoker
继续委托给下层进行处理。
2.8.3 AbstractProxyInvoker
通过AbstractProxyInvoker执行最终的服务的方法,不管底层是同步调用还是异步调用,统一包装成异步RPC结果AsyncRpcResult返回。
至此,Dubbo源码解析完成,完整服务导出和引入的源码主线流程图如下(获取高清大图,在Java架构杂谈
公众号回复“Dubbo”即可。):
服务调用处理流程图如下:
获取高清大图,在Java架构杂谈
公众号回复“Dubbo”即可。