Dubbo

Dubbo RPC框架
帅旋
关注
充电
IT宅站长,技术博主,共享单车手,全网id:arthinking。

图解Dubbo服务引入源码流程

发布于 2022-12-04 | 更新于 2024-02-28

在阅读Dubbo源码之前,一定要先整明白Dubbo的SPI机制是什么,能够了解是怎么实现的就最好了。为此,Java架构杂谈输出了一篇Dubbo SPI机制的原理和源码解读文章,感兴趣的朋友可以先阅读了解。在Java架构杂谈公众号发送SPI获取文章链接。

本文会使用到以下的例子辅助解读Dubbo源码:

假设有如下系统:

  • 消费者:dubbo-consumer-app
  • 调用的接口:com.itzhai.dubbo.demo.DubboTestService.playGame
  • 提供者:dubbo-provider-app

如下图所示:

image-20221129221238557


1. 识别@Reference注入点

首先,应用会注册一个ReferenceAnnotationBeanPostProcessor用于处理@Reference注解。

1
2
3
4
public class ReferenceAnnotationBeanPostProcessor extends AnnotationInjectedBeanPostProcessor implements
ApplicationContextAware, ApplicationListener {
...
}

该类继承AnnotationInjectedBeanPostProcessor,实现了自定义注解注入,这里的自定义注解为Dubbo的@Reference注解,大家也可以基于该抽象类实现自己的注入注解。

AnnotationInjectedBeanPostProcessor中,会寻找所有被@Reference注解标注的属性或者方法,作为注入点:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Override
public PropertyValues postProcessPropertyValues(
PropertyValues pvs, PropertyDescriptor[] pds, Object bean, String beanName) throws BeanCreationException {

// 通过自定义注解寻找需要注入的属性(对于ReferenceAnnotationBeanPostProcessor类既是@Reference注解标注的属性)
InjectionMetadata metadata = findInjectionMetadata(beanName, bean.getClass(), pvs);
try {
metadata.inject(bean, beanName, pvs);
} catch (BeanCreationException ex) {
throw ex;
} catch (Throwable ex) {
throw new BeanCreationException(beanName, "Injection of @" + getAnnotationType().getSimpleName()
+ " dependencies is failed", ex);
}
return pvs;
}

findInjectionMetadata方法会找到所有需要注入的字段(AnnotatedFieldElement)和方法(AnnotatedMethodElement),包装成AnnotatedInjectionMetadata返回。

最终调用metadata.inject(bean, beanName, pvs)进行依赖注入,该方法根据FieldElement的具体子类调用其inject方法,来完成属性或者方法的注入,这里就是Spring框架的依赖注入代码了:

1
2
3
4
5
6
7
8
9
10
11
12
13
public void inject(Object target, String beanName, PropertyValues pvs) throws Throwable {
Collection<InjectedElement> elementsToIterate =
(this.checkedElements != null ? this.checkedElements : this.injectedElements);
if (!elementsToIterate.isEmpty()) {
boolean debug = logger.isDebugEnabled();
for (InjectedElement element : elementsToIterate) {
if (debug) {
logger.debug("Processing injected element of bean '" + beanName + "': " + element);
}
element.inject(target, beanName, pvs);
}
}
}

而dubbo继承了Spring的InjectionMetadata.InjectedElement,我们挑一个AnnotatedFieldElement来看其具体实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Override
protected void
inject(Object bean, String beanName, PropertyValues pvs) throws Throwable {

Class<?> injectedType = field.getType();

// 获取需要注入的对象
Object injectedObject = getInjectedObject(attributes, bean, beanName, injectedType, this);

// 通过反射把对象注入到字段中
ReflectionUtils.makeAccessible(field);

field.set(bean, injectedObject);

}

关键是getInjectedObject方法,该方法获取到了需要依赖注入的值,最终通过反射注入属性。

2. 生成服务引入的对象

下面看看getInjectedObject是怎么获取到注入的属性的值的,该方法最终会调用到org.apache.dubbo.config.spring.beans.factory.annotation.ReferenceAnnotationBeanPostProcessor#doGetInjectedBean这个方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
@Override
protected Object doGetInjectedBean(AnnotationAttributes attributes, Object bean, String beanName, Class<?> injectedType,
InjectionMetadata.InjectedElement injectedElement) throws Exception {

/**
* The name of bean that annotated Dubbo's {@link Service @Service} in local Spring {@link ApplicationContext}
*/
// referencedBeanName即为当前所引入服务对应的ServiceBean的beanName, 命名规则:ServiceBean:interfaceClassName:version:group
String referencedBeanName = buildReferencedBeanName(attributes, injectedType);

/**
* The name of bean that is declared by {@link Reference @Reference} annotation injection
*/
// 根据@Reference注解的所有信息以及属性接口类型,生成referenceBeanName
String referenceBeanName = getReferenceBeanName(attributes, injectedType);

// 获取一个referenceBean对象,会先判断referenceBeanCache是否存在ReferenceBean,如果不存在则创建一个
ReferenceBean referenceBean = buildReferenceBeanIfAbsent(referenceBeanName, attributes, injectedType);

// 把referenceBean注册到Spring容器中,该方法内部逻辑:
// 如果Spring容器中存在referencedBeanName对应的ServiceBean,说明引用的是本地服务,则获取到该ServiceBean的ref属性,命名一个referenceBeanName别名
// 如果Spring容器中不存在referencedBeanName对应的ServiceBean,则说明是远程服务,则注册刚刚创建的referenceBean到Spring容器中,名称为:referenceBeanName
registerReferenceBean(referencedBeanName, referenceBean, attributes, injectedType);

cacheInjectedReferenceBean(referenceBean, injectedElement);

// 创建一个代理对象,属性注入的就是这个代理对象,内部会继续调用referenceBean.get()
return getOrCreateProxy(referencedBeanName, referenceBeanName, referenceBean, injectedType);
}

接着看getOrCreateProxy方法:

1
2
3
4
5
6
7
8
private Object getOrCreateProxy(String referencedBeanName, String referenceBeanName, ReferenceBean referenceBean, Class<?> serviceInterfaceType) {
if (existsServiceBean(referencedBeanName)) { // If the local @Service Bean exists, build a proxy of ReferenceBean
return newProxyInstance(getClassLoader(), new Class[]{serviceInterfaceType},
wrapInvocationHandler(referenceBeanName, referenceBean));
} else { // ReferenceBean should be initialized and get immediately
return referenceBean.get();
}
}

最终都会调用到referenceBean.get()方法获取实例,ReferenceBean是ReferenceConfig的子类,最终会调用到ReferenceConfig的init()方法进行初始化,此方法最终会调用org.apache.dubbo.config.ReferenceConfig#createProxy方法来创建一个代理对象赋值给ReferenceBean的ref属性,该方法处理的逻辑分支比较多,我们这里假设只配置了一个注册中心的情况下,代码会走到这里来:

1
2
3
4
if (urls.size() == 1) {
invoker = REF_PROTOCOL.refer(interfaceClass, urls.get(0));
} else {
...

urls.get(0)为注册中心的url,最终REF_PROTOCOL.refer会调用到以下几个方法:

  • 1、Protocol包装类:org.apache.dubbo.rpc.protocol.ProtocolFilterWrapper#refer:
1
2
3
4
5
6
7
@Override
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
if (REGISTRY_PROTOCOL.equals(url.getProtocol())) { // dubbo://
return protocol.refer(type, url);
}
return buildInvokerChain(protocol.refer(type, url), REFERENCE_FILTER_KEY, CommonConstants.CONSUMER);
}

这里主要用于添加dubbo protocol对应的invoker的过滤器链,自己实现的Filter扩展类就是在这里加载的。

  • 2、Protocol包装类:org.apache.dubbo.rpc.protocol.ProtocolListenerWrapper#refer:
1
2
3
4
5
6
7
8
9
10
@Override
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
if (REGISTRY_PROTOCOL.equals(url.getProtocol())) { // dubbo://
return protocol.refer(type, url);
}
return new ListenerInvokerWrapper<T>(protocol.refer(type, url),
Collections.unmodifiableList(
ExtensionLoader.getExtensionLoader(InvokerListener.class)
.getActivateExtension(url, INVOKER_LISTENER_KEY)));
}

这里如果是dubbo://开头的url,则会包装ListenerInvokerWrapper,给dubbo protocol对应的invoker添加监听器,用于处理结果,这里可以进行扩展。

  • 3、Dubbo SPI调用到的Protocol类:org.apache.dubbo.registry.integration.RegistryProtocol#refer:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
@Override
@SuppressWarnings("unchecked")
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
// url由 registry:// 改为 zookeeper://
url = URLBuilder.from(url)
.setProtocol(url.getParameter(REGISTRY_KEY, DEFAULT_REGISTRY))
.removeParameter(REGISTRY_KEY)
.build();

// 这里基于Dubbo SPI的Adaptive机制,拿到注册中心实现,ZookeeperRegistry
// url.protocol="zookeeper"
Registry registry = registryFactory.getRegistry(url);

if (RegistryService.class.equals(type)) {
return proxyFactory.getInvoker((T) registry, type, url);
}

// qs,queryString, 消费者引入服务时所配置的参数
Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(REFER_KEY));

String group = qs.get(GROUP_KEY);
if (group != null && group.length() > 0) {
if ((COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) {
return doRefer(getMergeableCluster(), registry, type, url);
}
}

// 继续调用doRefer
return doRefer(cluster, registry, type, url);
}

2.1 创建动态服务目录

继续调用RegistryProtocol#doRefer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
// 创建RegistryDirectory动态服务目录,每个服务都对应一个RegistryDirectory
// url为注册中心地址
RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
directory.setRegistry(registry);
directory.setProtocol(protocol);

// 被引入的服务配置的参数
Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());

// 消费者url
// consumer://xxxx/com.itzhai.dubbo.demo.DubboTestService?application=duboo-consumer-app&...
URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters);
if (!ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(REGISTER_KEY, true)) {
directory.setRegisteredConsumerUrl(getRegisteredConsumerUrl(subscribeUrl, url));
// 注册消费者url到注册中心(url经过简化处理)
registry.register(directory.getRegisteredConsumerUrl());
}

// 构造路由链
directory.buildRouterChain(subscribeUrl);

// 订阅注册中心配置变更,这里会主动拉取配置,初始化好所有的提供者的DubboInvoker
directory.subscribe(subscribeUrl.addParameter(CATEGORY_KEY,
PROVIDERS_CATEGORY + "," + CONFIGURATORS_CATEGORY + "," + ROUTERS_CATEGORY));

// 利用传进来的cluster,join得到invoker:MockClusterInvoker --> FailoverClusterInvoker
Invoker invoker = cluster.join(directory);
ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);
return invoker;
}

其中

1
Invoker invoker = cluster.join(directory);

用到了Dubbo SPI,会依次执行MockClusterWrapper, FailoverCluster,最终生成MockClusterInvoker和FailoverClusterInvoker,代码跟踪到这里,我们可以得出以下对象结构:

image-20221118173502695

2.2 构造路由链

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
private RouterChain(URL url) {
// 获取RouterFactory接口扩展实现类:
// 0 = {MockRouterFactory@3505}
// 1 = {TagRouterFactory@3506} 标签路由
// 2 = {AppRouterFactory@3507} 应用条件路由
// 3 = {ServiceRouterFactory@3508} 服务条件路由
List<RouterFactory> extensionFactories = ExtensionLoader.getExtensionLoader(RouterFactory.class)
.getActivateExtension(url, (String[]) null);

// 基于url,使用RouterFactory生成各个类型的Router
List<Router> routers = extensionFactories.stream()
.map(factory -> factory.getRouter(url))
.collect(Collectors.toList());

// 通过priority对routers进行排序
initWithRouters(routers);
}

其中:

  • AppRouterFactory:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Activate(order = 200)
public class AppRouterFactory implements RouterFactory {
public static final String NAME = "app";

private volatile Router router;

@Override
public Router getRouter(URL url) {
if (router != null) {
return router;
}
synchronized (this) {
if (router == null) {
router = createRouter(url);
}
}
return router;
}

private Router createRouter(URL url) {
return new AppRouter(DynamicConfiguration.getDynamicConfiguration(), url);
}
}

最终AppRouter的构造函数中,会绑定一个监听器,监听路径:/dubbo/config/dubbo/dubbo-consumer-app.condition-router

同时会主动从配置中心获取当前服务对应的路径的路由配置,根据配置生成AppRouter,具体代码大家可以自行详细阅读。

  • ServiceRouterFactory:

与AppRouterFactory类似,最终ServiceRouter构造函数会绑定一个监听器,监听路径:/dubbo/config/dubbo/com.itzhai.dubbo.demo.DubboTestService:1.0.1:itzhai.condition-router

同时会主动从配置中心获取当前服务对应的路径的路由配置,根据配置生成ServiceRouter,具体代码大家可以自行详细阅读。

  • TagRouterFactory.getRouter:

TagRouter与上面两个Router有所不同,因为TagRouter监听的路径为:/dubbo/config/dubbo/dubbo-provider-app.tag-router,是需要读取提供者配置的tag-router的,所以只有在获取到提供者的应用名之后,才可以读取配置,在TagRouterFactory.getRouter()方法中,并不会监听和拉取配置,只是先初始化好一个空的TagRouter,等到服务提供者的服务url从配置中心拉取到之后,获取到提供者的名称之后,才会拉取tag-router配置,并且构造TagRouter。TODO 触发点

最终构造如下路由链:

image-20221115230537607

到这里构建的对象结构如下:

image-20221118173446557

2.3 监听节点

上面最核心的方法就是directory.subscribe(),下面详细看看这个方法(RegistryDirectory#subscribe):

1
2
3
4
5
6
7
8
9
public void subscribe(URL url) {
setConsumerUrl(url);
// 监听应用动态配置 /dubbo/config/dubbo/dubbo-consumer-app.configurators
CONSUMER_CONFIGURATION_LISTENER.addNotifyListener(this);
// 监听所引入的服务的动态配置 /dubbo/config/dubbo/com.itzhai.DemoTest.configurators
serviceConfigurationListener = new ReferenceConfigurationListener(this, url);
// 监听其他的节点
registry.subscribe(url, this);
}

这里的registry为:ZookeeperRegistry extends FailbackRegistry,而RegistryDirectory也是一个NotifyListener:

1
public class RegistryDirectory<T> extends AbstractDirectory<T> implements NotifyListener

继续跟踪 registry.subscribe(url, this),最终会调用:org.apache.dubbo.registry.zookeeper.ZookeeperRegistry#doSubscribe,此方法我们先关注订阅单个服务的逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
} else {
// 订阅单个服务
List<URL> urls = new ArrayList<>();
// toCategoriesPath(url) 获取到要监听的zk路径:
// 0 = "/dubbo/com.itzhai.dubbo.demo.DubboTestService/providers"
// 1 = "/dubbo/com.itzhai.dubbo.demo.DubboTestService/configurators"
// 2 = "/dubbo/com.itzhai.dubbo.demo.DubboTestService/routers"
for (String path : toCategoriesPath(url)) {
// 根据监听地址获取listeners,没有则先生成一个
ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
if (listeners == null) {
zkListeners.putIfAbsent(url, new ConcurrentHashMap<>());
listeners = zkListeners.get(url);
}

// listener为传入的参数RegistryDirectory
// 通过NotifyListener获取ChildListener,为ZookeeperRegistry
ChildListener zkListener = listeners.get(listener);
if (zkListener == null) {
// 初始化监听器
listeners.putIfAbsent(listener, (parentPath, currentChilds)
-> ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds))
);
zkListener = listeners.get(listener);
}
// 为当前处理的path创建zk节点
zkClient.create(path, false);

// 添加节点变更监听,同时获取到节点下的内容
List<String> children = zkClient.addChildListener(path, zkListener);
if (children != null) {
urls.addAll(toUrlsWithEmpty(url, path, children));
}
}
// 调用监听器方法处理获取到的节点内容,初始化DubboInvoker
notify(url, listener, urls);
}

以上代码主要会监听以下三个路径:

  • /dubbo/com.itzhai.dubbo.demo.DubboTestService/providers
  • /dubbo/com.itzhai.dubbo.demo.DubboTestService/configurators
  • /dubbo/com.itzhai.dubbo.demo.DubboTestService/routers

完整的监听节点和处理逻辑如下图:

image-20221118172919438

2.4 初始化DubboInvoker

在监听的同时,会拉取监听节点的数据。

然后调用notify(url, listener, urls)处理拉取到的数据。这里我们重点观察/dubbo/com.itzhai.dubbo.demo.DubboTestService/providers的处理流程,调用以下代码监听该节点:

1
List<String> children = zkClient.addChildListener(path, zkListener);

同时返回的children(经过URLDecode解码处理),为引入的服务的所有提供者的url:

  • 0 = “dubbo://192.168.0.106:20810/com.itzhai.dubbo.demo.DubboTestService?anyhost=true&application=dubbo-provider-app&bean.name=ServiceBean:com.itzhai.dubbo.demo.DubboTestService:1.0.1:itzhai&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&group=itzhai&interface=com.itzhai.dubbo.demo.DubboTestService&logger=log4j&methods=playGame&pid=40846&release=2.7.0&revision=1.0.1&side=provider&timestamp=1668588840665&version=1.0.1”
  • 1 = “dubbo://192.168.0.106:20800/com.itzhai.dubbo.demo.DubboTestService?anyhost=true&application=dubbo-provider-app&bean.name=ServiceBean:com.itzhai.dubbo.demo.DubboTestService:1.0.1:itzhai&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&group=itzhai&interface=com.itzhai.dubbo.demo.DubboTestService&logger=log4j&methods=playGame&pid=40846&release=2.7.0&revision=1.0.1&side=provider&timestamp=1668588840598&version=1.0.1”

接着会调用org.apache.dubbo.registry.support.FailbackRegistry#notify方法,跟踪notify方法,最终会调用到这里:AbstractRegistry#notify(URL, NotifyListener, List<URL>),关键的处理逻辑如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// 先把urls转成result这个Map
Map<String, List<URL>> result = new HashMap<>();
for (URL u : urls) {
if (UrlUtils.isMatch(url, u)) {
String category = u.getParameter(CATEGORY_KEY, DEFAULT_CATEGORY);
List<URL> categoryList = result.computeIfAbsent(category, k -> new ArrayList<>());
categoryList.add(u);
}
}
if (result.size() == 0) {
return;
}
Map<String, List<URL>> categoryNotified = notified.computeIfAbsent(url, u -> new ConcurrentHashMap<>());
// 遍历处理map
for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
String category = entry.getKey();
List<URL> categoryList = entry.getValue();
categoryNotified.put(category, categoryList);
listener.notify(categoryList);
// We will update our cache file after each notification.
// When our Registry has a subscribe failure due to network jitter, we can return at least the existing cache URL.
saveProperties(url);
}

这里会先把传进来的urls转换为Map:

image-20221116170957418

接着遍历map进行处理,遍历的过程中,调用listener.notify(categoryList)进行处理,这里的listener为RegistryDirectory,categoryList即为两个服务提供者的URL,最终会调用到org.apache.dubbo.registry.integration.RegistryDirectory#notify

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public synchronized void notify(List<URL> urls) {
Map<String, List<URL>> categoryUrls = urls.stream()
.filter(Objects::nonNull)
.filter(this::isValidCategory)
.filter(this::isNotCompatibleFor26x)
.collect(Collectors.groupingBy(url -> {
if (UrlUtils.isConfigurator(url)) {
return CONFIGURATORS_CATEGORY;
} else if (UrlUtils.isRoute(url)) {
return ROUTERS_CATEGORY;
} else if (UrlUtils.isProvider(url)) {
return PROVIDERS_CATEGORY;
}
return "";
}));

// 获取动态配置的URL,然后生成configurators,赋值给configurators
List<URL> configuratorURLs = categoryUrls.getOrDefault(CONFIGURATORS_CATEGORY, Collections.emptyList());
this.configurators = Configurator.toConfigurators(configuratorURLs).orElse(this.configurators);

// 获取老版本路由的URL,然后生成Router,并调用addRouters添加到路由链中
List<URL> routerURLs = categoryUrls.getOrDefault(ROUTERS_CATEGORY, Collections.emptyList());
toRouters(routerURLs).ifPresent(this::addRouters);

// 获取服务提供者的URL,进一步处理,生成DubboInvoker
List<URL> providerURLs = categoryUrls.getOrDefault(PROVIDERS_CATEGORY, Collections.emptyList());
refreshOverrideAndInvoker(providerURLs);
}

这里主要是根据传入的url,获取到需要的信息进行加工,最后存储到RegistryDirectory中:

  • 获取动态配置URL,并且生成configurators;
  • 获取老版本的路由URL,生成Router,添加到路由链中;
  • 获取服务提供者的URL,进一步处理,生成DubboInvoker。

以上这些结果最终都存储到RegistryDirectory中。

我们这里重点跟踪生成DubboInvoker的逻辑,当传入的urls为服务提供者的URL时,会执行以下两行代码:

1
2
3
// 获取服务提供者的URL,进一步处理,生成DubboInvoker
List<URL> providerURLs = categoryUrls.getOrDefault(PROVIDERS_CATEGORY, Collections.emptyList());
refreshOverrideAndInvoker(providerURLs);

生成DubboInvoker的核心方法是refreshOverrideAndInvoker:

1
2
3
4
5
6
7
private void refreshOverrideAndInvoker(List<URL> urls) {
// mock zookeeper://xxx?mock=return null
// 这里会拿到应用动态配置,重写dubbo服务URL,得到最终的URL
overrideDirectoryUrl();
// 生成DubboInvoker的方法
refreshInvoker(urls);
}

最关键的就是这个refreshInvoker方法,跟踪这个方法,其中最核心的一行代码:

1
Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);// Translate url list to Invoker map

继续跟踪该方法,最终会调用到这行代码生成invoker:

1
invoker = new InvokerDelegate<>(protocol.refer(serviceType, url), url, providerUrl);

这里基于Dubbo SPI,最终调用到的是DubboProtocol.refer方法,同时,由于在dubbo-rpci-api模块中,配置了Dubbo SPI的Wrapper类:

1
2
3
filter=org.apache.dubbo.rpc.protocol.ProtocolFilterWrapper
listener=org.apache.dubbo.rpc.protocol.ProtocolListenerWrapper
mock=org.apache.dubbo.rpc.support.MockProtocol

所以,这里的执行顺序是:

  • ProtocolListenerWrapper.refer:该Wrapper类中判断到URL如果是dubbo://开通的,会给DubboProtocol对应的invoker添加监听器,包装成ListenerInvokerWrapper类,用于处理refer结果,这里可以进行扩展,标签路由器就是在这里设置了监听器,处理完refer之后,进行初始化的
  • ProtocolFilterWrapper.refer:这个Wrapper会给dubbo protocol的url对应的invoker添加Filter过滤器,我们给dubbo添加的自定义过滤器就是在这里集成进来的
  • DubboProtocol.refer:生成DubboInvoker的类。

其中DubboProtocol.refer会调用到父类AbstractProtocol的refer方法

1
2
3
4
5
6
7
@Override
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
// 包装一个异步转同步的Invoker
// type是接口
// url是服务地址
return new AsyncToSyncInvoker<>(protocolBindingRefer(type, url));
}

所以,最终生成DubboInvoker的就是DubboProtocol#protocolBindingRefer这个方法了:

1
2
3
4
5
6
7
8
9
10
11
@Override
public <T> Invoker<T> protocolBindingRefer(Class<T> serviceType, URL url) throws RpcException {

optimizeSerialization(url);

// create rpc invoker.
DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
invokers.add(invoker);

return invoker;
}

image-20221118173418260

可以发现在DubboInvoker中维护了和服务提供者的socket连接。

3. 构建传输层链路与建立连接

在Dubbo中,数据处理分为了数据交互层和数据传输层:

image-20221118165218534

接下来看看这个连接是怎么建立的,跟踪getClients方法,发现最终会调用到DubboProtocol#initClient方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
private ExchangeClient initClient(URL url) {

// client type setting.
String str = url.getParameter(CLIENT_KEY, url.getParameter(SERVER_KEY, DEFAULT_REMOTING_CLIENT));

url = url.addParameter(CODEC_KEY, DubboCodec.NAME);
// enable heartbeat by default
url = url.addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT));

// BIO is not allowed since it has severe performance issue.
if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
throw new RpcException("Unsupported client type: " + str + "," +
" supported client type is " + StringUtils.join(ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(), " "));
}

ExchangeClient client;
try {
// connection should be lazy
if (url.getParameter(LAZY_CONNECT_KEY, false)) {
client = new LazyConnectExchangeClient(url, requestHandler);

} else {
client = Exchangers.connect(url, requestHandler);
}

} catch (RemotingException e) {
throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e);
}

return client;
}

最核心的一行代码为:

1
client = Exchangers.connect(url, requestHandler);

继续跟踪该方法:

1
2
3
4
5
6
7
8
9
10
public static ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
if (handler == null) {
throw new IllegalArgumentException("handler == null");
}
url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
return getExchanger(url).connect(url, handler);
}

最终会调用getExchanger得到一个Exchanger实现去创建连接,这里基于SPI扩展机制,会获取到HeaderExchanger:

1
2
3
4
5
6
7
8
public static Exchanger getExchanger(URL url) {
String type = url.getParameter(Constants.EXCHANGER_KEY, Constants.DEFAULT_EXCHANGER);
return getExchanger(type);
}

public static Exchanger getExchanger(String type) {
return ExtensionLoader.getExtensionLoader(Exchanger.class).getExtension(type);
}

接着会调用HeaderExchanger的connect方法去创建连接,继续跟踪此方法:

1
2
3
4
@Override
public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
}

这里会把传进来的handler(ExchangeHandlerAdapter)包装两层:HeaderExchangeHandler,DecodeHandler,然后使用Transporters.connect来创建Client连接对象:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public static Client connect(URL url, ChannelHandler... handlers) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
ChannelHandler handler;
if (handlers == null || handlers.length == 0) {
handler = new ChannelHandlerAdapter();
} else if (handlers.length == 1) {
handler = handlers[0];
} else {
handler = new ChannelHandlerDispatcher(handlers);
}
return getTransporter().connect(url, handler);
}

public static Transporter getTransporter() {
return ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension();
}

最后再包装成HeaderExchangeClient,HeaderExchangeClient中会把Client对象包装成HeaderExchangeChannel对象,同时开启重连任务和心跳检测任务:

1
2
3
4
5
6
7
8
9
10
11
public HeaderExchangeClient(Client client, boolean startTimer) {
Assert.notNull(client, "Client can't be null");
this.client = client;
this.channel = new HeaderExchangeChannel(client);

if (startTimer) {
URL url = client.getUrl();
startReconnectTask(url);
startHeartBeatTask(url);
}
}

在Transporters.connect方法中最终基于Dubbo SPI机制,获取到传输层的实现类去connect,默认的传输层实现类为:NettyTransporter:

1
2
3
4
@Override
public Client connect(URL url, ChannelHandler listener) throws RemotingException {
return new NettyClient(url, listener);
}

继续调用new NettyClient:

1
2
3
4
5
public NettyClient(final URL url, final ChannelHandler handler) throws RemotingException {
// you can customize name and type of client thread pool by THREAD_NAME_KEY and THREADPOOL_KEY in CommonConstants.
// the handler will be warped: MultiMessageHandler->HeartbeatHandler->handler
super(url, wrapChannelHandler(url, handler));
}

这里会调用wrapChannelHandler继续把handler包装多几层:

1
2
3
4
5
6
7
protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
// 先通过调用ExtensionLoader.getExtensionLoader(Dispatcher.class).getAdaptiveExtension().dispatch(handler, url)
// 把handler包装成AllChannelHandler
// 然后继续把AllChannelHandler包装成HeartbeatHandler,HeartbeatHandler包装成MultiMessageHandler
return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class)
.getAdaptiveExtension().dispatch(handler, url)));
}

最终包装成如下几层:

MultiMessageHandler–>HeartbeatHandler–>AllChannelHandler

最终会调用到NettyClient的doOpen()方法创建连接:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
@Override
protected void doOpen() throws Throwable {
final NettyClientHandler nettyClientHandler = new NettyClientHandler(getUrl(), this);
bootstrap = new Bootstrap();
bootstrap.group(nioEventLoopGroup)
.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
//.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getTimeout())
.channel(NioSocketChannel.class);

bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, Math.max(3000, getConnectTimeout()));
bootstrap.handler(new ChannelInitializer() {

@Override
protected void initChannel(Channel ch) throws Exception {
int heartbeatInterval = UrlUtils.getHeartbeat(getUrl());
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);
ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
.addLast("decoder", adapter.getDecoder())
.addLast("encoder", adapter.getEncoder())
.addLast("client-idle-handler", new IdleStateHandler(heartbeatInterval, 0, 0, MILLISECONDS))
.addLast("handler", nettyClientHandler);
String socksProxyHost = ConfigUtils.getProperty(SOCKS_PROXY_HOST);
if(socksProxyHost != null) {
int socksProxyPort = Integer.parseInt(ConfigUtils.getProperty(SOCKS_PROXY_PORT, DEFAULT_SOCKS_PROXY_PORT));
Socks5ProxyHandler socks5ProxyHandler = new Socks5ProxyHandler(new InetSocketAddress(socksProxyHost, socksProxyPort));
ch.pipeline().addFirst(socks5ProxyHandler);
}
}
});
}

这里就是我们熟悉的Netty创建客户端连接的代码了。至此,服务引入的主线流程执行完毕。

传输层构建的链路如下:

image-20221126230402799

可以发现,基于Dubbo SPI机制,数据传输层可以灵活切换到Netty、Mina等不同的网络框架。

最终主线流程构造的完整处理链路如下:

image-20221126230437358

在服务调用章节,会为你详细介绍每个类的作用。在这之前,你需要了解服务引用都创建了什么样的东西。

4. 服务引入总结

服务引入流程主要做如下事情:

  • 扫描项目识别@Reference注入点;
  • 获取需要注入的对象:
    • 获取到@Reference注解的信息,封装成ReferenceBean;
    • 最终执行ReferenceBean的refer方法执行服务引入:
      • 为引入的服务创建动态服务目录;
      • 构造路由链,存储到动态服务目录中;
      • 监听动态配置节点,以及服务提供者节点,并且先从注册中心拉取节点配置;
      • 获取到服务提供者的URL之后,最终包装成DubboInvoker;
      • 为DubboInvoker包装异步转同步类,以及过滤器链处理类;
      • 最后为该DubboInvoker构造传输层链路,以及创建底层网络连接;
  • 通过反射进行注入。

可以发现,Dubbo的整个服务引入流程还是挺复杂的,很多的SPI扩展机制,增加了扩展性,但是跟踪代码的时候也会更难跟踪。

整个服务引入的流程,就是构造以上的请求链路,以及创建好socket连接。

本文作者: 帅旋

本文链接: https://www.itzhai.com/columns/jvm/dubbo-service-reference-source-code.html

版权声明: 版权归作者所有,未经许可不得转载,侵权必究!联系作者请加公众号。

×
IT宅

关注公众号及时获取网站内容更新。