Dubbo

Dubbo RPC框架
帅旋
关注
充电
IT宅站长,技术博主,共享单车手,全网id:arthinking。

图解Dubbo服务调用源码流程

发布于 2022-12-04 | 更新于 2024-02-28

在阅读Dubbo源码之前,一定要先整明白Dubbo的SPI机制是什么,能够了解是怎么实现的就最好了。为此,Java架构杂谈输出了一篇Dubbo SPI机制的原理和源码解读文章,感兴趣的朋友可以先阅读了解。在Java架构杂谈公众号发送SPI获取文章链接。

1. 消费端调用请求

基于对服务引入源码的分析,我们得出了如下的对象处理链:

image-20221126230437358

我们可以得到消费端调用请求流程,如下图所示:

image-20221204115305434

红色箭头方向为主流程,接下来详细介绍每个步骤执行的逻辑。

1.1 MockClusterInvoker

这层主要用于处理mock,以及系统降级相关逻辑。

相关功能参考官网:本地伪装[1]

1.2 RegistryAwareClusterInvoker

如果有两个注册中心,则有这个Invoker,处理注册中心对应的Invoker,校验注册中心是否可用。

1.3 FailoverClusterInvoker

服务容错相关逻辑。

1.3.1 RouterChain

服务路由,获取到所有的服务提供者,并且通过路由链进行路由。

1.3.2 LoadBalance

使用负载均衡策略选择一个Invoker,最终得到的是一个InvokerWrapper。接着调用其invoke方法。

1.4 ListenerInvokerWrapper

接着会调用到ListenerInvokerWrapper的invoke方法。

1.5 CallbackRegistrationInvoker

接着执行Filter相关逻辑。

1.5.1 ConsumerContextFilter

设置Rpc的Context上下文属性。

1.5.2 FutureFilter

实现事件通知相关功能,可以借此完成一些通知功能,在调用方法之前,调用方法之后,出现异常时,会触发对应的事件。

1.5.3 MonitorFilter

实现Dubbo监控相关逻辑。

1.6 AsyncToSyncInvoker

异步转同步处理逻辑。

1.6.1 同步转异步逻辑

代码逻辑如下:

1
2
3
4
5
6
7
8
9
10
11
@Override
public Result invoke(Invocation invocation) throws RpcException {
Result asyncResult = invoker.invoke(invocation);
try {
// 如果invocation指定为同步调用,则执行异步转同步逻辑
if (InvokeMode.SYNC == ((RpcInvocation) invocation).getInvokeMode()) {
asyncResult.get(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
}
...
return asyncResult;
}

此方法最终会返回AsyncRpcResult异步结果,异步转同步逻辑主要就是在这里调用以下方法一直等待处理结果:

1
asyncResult.get(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);

继续跟进invoker.invoke(invocation)看看内部是如何进行异步调用的,定位到DubboInvoker#doInvoke方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@Override
protected Result doInvoke(final Invocation invocation) throws Throwable {
...
try {
boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
int timeout = getUrl().getMethodPositiveParameter(methodName, TIMEOUT_KEY, DEFAULT_TIMEOUT);
if (isOneway) {
...
} else {
// 创建一个AsyncRpcResult,承载执行结果
AsyncRpcResult asyncRpcResult = new AsyncRpcResult(inv);
// 异步调用 client的request方法,返回CompletableFuture对象
CompletableFuture<Object> responseFuture = currentClient.request(inv, timeout);
// AsyncRpcResult对象订阅responseFuture,当responseFuture完成之后,会调用AsyncRpcResult中的方法
asyncRpcResult.subscribeTo(responseFuture);
// save for 2.6.x compatibility, for example, TraceFilter in Zipkin uses com.alibaba.xxx.FutureAdapter
FutureContext.getContext().setCompatibleFuture(responseFuture);
return asyncRpcResult;
}
...
}

这里创建了一个AsyncRpcResult对象,然后调用currentClient.request(inv, timeout)方法返回一个CompletableFuture对象,最后调用asyncRpcResult.subscribeTo(responseFuture)让AsyncRpcResult对象订阅responseFuture,当responseFuture完成之后,会调用AsyncRpcResult中的方法。

最后跟进看看currentClient.request(inv, timeout)的逻辑,最终会调用到HeaderExchangeChannel的request方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Override
public CompletableFuture<Object> request(Object request, int timeout) throws RemotingException {
if (closed) {
throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
}
// create request.
Request req = new Request();
req.setVersion(Version.getProtocolVersion());
req.setTwoWay(true);
req.setData(request);
DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout);
try {
channel.send(req);
} catch (RemotingException e) {
future.cancel();
throw e;
}
return future;
}

可以发现,这里最终创建了一个DefaultFuture对象,并且返回,在创建DefaultFuture对象的逻辑中,会把请求id跟DefaultFuture的关系,以及请求id跟channel的关系保存到map中:

1
2
3
4
5
6
7
8
9
private DefaultFuture(Channel channel, Request request, int timeout) {
this.channel = channel;
this.request = request;
this.id = request.getId();
this.timeout = timeout > 0 ? timeout : channel.getUrl().getPositiveParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT);
// put into waiting map.
FUTURES.put(id, this);
CHANNELS.put(id, channel);
}

并且启动一个定时任务,用于判断调用是否超时:

1
2
3
4
private static void timeoutCheck(DefaultFuture future) {
TimeoutCheckTask task = new TimeoutCheckTask(future.getId());
future.timeoutCheckTask = TIME_OUT_TIMER.newTimeout(task, future.getTimeout(), TimeUnit.MILLISECONDS);
}

当判断到超时之后,直接返回超时的响应结果。

1.6.2 异步响应结果处理

最后我们看看底层获取到响应结果之后,是如何依次通知到 DefaultFuture --> CompletableFuture --> AsyncRpcResult的。

HeaderExchangeHandler#received方法调用HeaderExchangeHandler#handleResponse,然后继续调用DefaultFuture#received(Channel, Response),最后调用到会调DefaultFuture#received(Channel, Response, boolean)方法,接收数据,获取到异步结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public static void received(Channel channel, Response response, boolean timeout) {
try {
// 获取到responseId,根据responseId从map中获取到DefaultFuture对象
DefaultFuture future = FUTURES.remove(response.getId());
if (future != null) {
Timeout t = future.timeoutCheckTask;
if (!timeout) {
// decrease Time
t.cancel();
}
// 最终完成DefaultFuture对象
future.doReceived(response);
} else {
logger.warn("The timeout response finally returned at "
+ (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()))
+ ", response " + response
+ (channel == null ? "" : ", channel: " + channel.getLocalAddress()
+ " -> " + channel.getRemoteAddress()));
}
} finally {
CHANNELS.remove(response.getId());
}
}

最终根据响应id从map中找到DefaultFuture,并且把响应结果赋值给它,最终触发执行AsyncRpcResult的相关方法,最后触发了以下代码从阻塞处返回:

1
asyncResult.get(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);

即完成了把异步转换为同步的处理工作。

1.7 DubboInvoker

会调用其org.apache.dubbo.rpc.protocol.dubbo.DubboInvoker#doInvoke方法,完成具体的方法调用。

此方法会从可用的clients中选择一个,然后执行异步调用,相关代码上面已经说明过了。

1.8 HeaderExchangeClient

调用ExchangeClient数据交换层的方法发送数据,该方法最终会调用到数据传输层的具体方法区发送数据,如org.apache.dubbo.remoting.transport.netty4.NettyChannel#send。

2. 提供端处理请求

基于服务导出的源码分析,我们得到了如下的对象处理链:

image-20221121233718510

以及传输层链路:

image-20221121235355200

为此,我们可以得到如下的提供端的请求处理流程:

image-20221204155457497

接下来,我们就看看每一步都是做什么事情的。

2.1 NettyServerHandler#channelRead

接收数据,传给下层handler继续处理。

2.2 MultiMessageHandler#received

如果接收到的消息是MultiMessage,则进行转换遍历处理。

2.3 HeartbeatHandler#received

记录读取数据时间戳;

判断是否心跳请求,如果是心跳请求,则直接响应心跳请求,会把requestId响应回去,用户区分是哪个请求

如果是心跳响应,则不用处理,直接返回。

否则继续往下执行。

2.4 AllChannelHandler#received

跟Dubbo的线程模型[2]有关。

如果Dispatcher使用的是all策略(默认策略),则会执行到这里,最终会把请求封装成ChannelEventRunnalbe交给线程池处理,IO线程的任务到此结束。

如果Dubbo Dispatcher使用的是direct策略,则不会执行这个handler。

2.5 DecodeHandler#received

如果传入的消息实现了Decodeable接口,则调用接口的decode实现进行decode,提供了一种自定义decode的扩展机制。

2.6 HeaderExchangeHandler#handleRequest

判断是否双向通信。

构造Response对象,最终调用ExchangeHandlerAdapter的reply方法,返回一个CompletionStage:

1
CompletionStage<Object> future = handler.reply(channel, msg);

拿到CompletionStage对象后:

  • 如果是同步调用,则直接拿到结果,并发送到channel中去

  • 如果是异步调用,则监听直到拿到服务执行结果,然后发送到channel中去

最终把结果封装为AppResponse对象,包括异常也封装成该对象(HeaderExchangeHandler#handleRequest):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
...
Object msg = req.getData();
try {
CompletionStage<Object> future = handler.reply(channel, msg);

future.whenComplete((appResult, t) -> {
try {
if (t == null) {
res.setStatus(Response.OK);
res.setResult(appResult);
} else {
res.setStatus(Response.SERVICE_ERROR);
res.setErrorMessage(StringUtils.toString(t));
}
channel.send(res);
} catch (RemotingException e) {
logger.warn("Send result to consumer failed, channel is " + channel + ", msg is " + e);
} finally {
// HeaderExchangeChannel.removeChannelIfDisconnected(channel);
}
});
} catch (Throwable e) {
res.setStatus(Response.SERVICE_ERROR);
res.setErrorMessage(StringUtils.toString(e));
channel.send(res);
}

可以发现,最终获取到异步调用的结果之后,再写到Channel中。

底层会封装结果为CompletableFuture:

org.apache.dubbo.rpc.proxy.AbstractProxyInvoker#invoke

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26

Object value = doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments());


CompletableFuture<Object> future = wrapWithFuture(value, invocation);


AsyncRpcResult asyncRpcResult = new AsyncRpcResult(invocation);


future.whenComplete((obj, t) -> {

AppResponse result = new AppResponse();
if (t != null) {
if (t instanceof CompletionException) {
result.setException(t.getCause());
} else {
result.setException(t);
}
} else {
result.setValue(obj);
}
asyncRpcResult.complete(result);
});

return asyncRpcResult;

2.7 ExchangeHandlerAdapter#reply

根据请求对象拿到一个Invoker,最终执行Invoker得到结果。

实际调用的方法可以分为两种情况:

  • 同步执行:

    • public String test() {...}
      <!--code11-->
      
      
      

最终该方法返回一个CompletionStage对象,给到上层处理。

底层执行方法的时候,不管是同步还是异步返回,都统一转换为CompletableFuture:

org.apache.dubbo.rpc.proxy.AbstractProxyInvoker#wrapWithFuture

1
2
3
4
5
6
7
8
private CompletableFuture<Object> wrapWithFuture (Object value, Invocation invocation) {
if (RpcContext.getContext().isAsyncStarted()) {
return ((AsyncContextImpl)(RpcContext.getContext().getAsyncContext())).getInternalFuture();
} else if (value instanceof CompletableFuture) {
return (CompletableFuture<Object>) value;
}
return CompletableFuture.completedFuture(value);
}

可以发现,为了实现异步调用的功能,Dubbo做了一些比较绕的对象转换逻辑,如下图所示:

image-20221204164229351

梳理清楚了这个处理流程,也就很容易理解为啥可以实现跨服务的异步调用了。

2.8 DubboExporter

2.8.1 CallbackRegistrationInvoker

执行过滤器链。

EchoFilter

处理Dubbo的回声检测。回声检测相关说明参考官方文档:回声测试[3]

如果判断到是$echo方法,则直接构造一个结果返回:

1
2
3
4
5
6
7
@Override
public Result invoke(Invoker<?> invoker, Invocation inv) throws RpcException {
if (inv.getMethodName().equals($ECHO) && inv.getArguments() != null && inv.getArguments().length == 1) {
return AsyncRpcResult.newDefaultAsyncResult(inv.getArguments()[0], inv);
}
return invoker.invoke(inv);
}

ClassLoaderFilter

用于设置类加载器。

GenericFilter

用于处理泛化服务。

ContextFilter

设置Dubbo ThreadLocal(RpcContext)相关的属性。

TraceFilter

处理dubbo trace命令相关的Filter。

TimeoutFilter

用于处理服务端的超时。

MonitorFilter

实现监控相关功能。

ExceptionFIlter

处理服务异常信息,转换成消费端能够识别的异常。

2.8.2 DelegateProviderMetaDataInvoker

继续委托给下层进行处理。

2.8.3 AbstractProxyInvoker

通过AbstractProxyInvoker执行最终的服务的方法,不管底层是同步调用还是异步调用,统一包装成异步RPC结果AsyncRpcResult返回。


至此,Dubbo源码解析完成,完整服务导出和引入的源码主线流程图如下(获取高清大图,在Java架构杂谈公众号回复“Dubbo”即可。):

image-20221206225909095

服务调用处理流程图如下:

image-20221206225803370

获取高清大图,在Java架构杂谈公众号回复“Dubbo”即可。

References


  1. 本地伪装. Retrieved from https://dubbo.apache.org/zh/docs/advanced/local-mock/ ↩︎

  2. 线程模型. Retrieved from https://dubbo.apache.org/zh/docs/v2.7/user/examples/thread-model/ ↩︎

  3. 回声测试. Retrieved from https://dubbo.apache.org/zh/docs/advanced/echo-service/ ↩︎

本文作者: 帅旋

本文链接: https://www.itzhai.com/columns/jvm/dubbo-service-call-source-code.html

版权声明: 版权归作者所有,未经许可不得转载,侵权必究!联系作者请加公众号。

×
IT宅

关注公众号及时获取网站内容更新。