Dubbo

Dubbo RPC框架
帅旋
关注
充电
IT宅站长,技术博主,共享单车手,全网id:arthinking。

图解Dubbo服务导出源码流程

发布于 2022-12-04 | 更新于 2024-04-09

在阅读Dubbo源码之前,一定要先整明白Dubbo的SPI机制是什么,能够了解是怎么实现的就最好了。为此,Java架构杂谈输出了一篇Dubbo SPI机制的原理和源码解读文章,感兴趣的朋友可以先阅读了解。在Java架构杂谈公众号发送SPI获取文章链接。

本文会使用到以下的例子辅助解读Dubbo源码:

假设有如下系统:

  • 消费者:dubbo-consumer-app
  • 调用的接口:com.itzhai.dubbo.demo.DubboTestService.playGame
  • 提供者:dubbo-provider-app

如下图所示:

image-20221129221252828


在SpringBoot项目中,我们要启用Dubbo,一般会在启动类中配置:

1
@EnableDubbo(scanBasePackages = {"com.itzhai.dubbo.demo"})

@EnableDubbo注解:

1
2
3
4
5
6
7
8
9
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
@Documented
@EnableDubboConfig
@DubboComponentScan
public @interface EnableDubbo {
...
}

其中最重要的两个注解为@EnableDubboConfig以及@DubboComponentScan

  • @EnableDubboConfig是用于解析生成DubboConfig配置类的;
  • @DubboComponentScan是用于扫描Dubbo服务以及Dubbo服务引用的;

1. 解析生成DubboConfig配置类

我们继续查看@EnableDubboConfig注解:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
@Documented
@Import(DubboConfigConfigurationRegistrar.class)
public @interface EnableDubboConfig {

/**
* It indicates whether binding to multiple Spring Beans.
*
* @return the default value is <code>false</code>
* @revised 2.5.9
*/
boolean multiple() default true;

}

这里引入了DubboConfigConfigurationRegistrar类,这个是ImportBeanDefinitionRegistrar接口的实现类。ImportBeanDefinitionRegistrar是Spring中用于注册BeanDefinition的扩展接口,实现了该接口的类被@Import注解引入之后,会触发其registerBeanDefinitions方法,然后生成BeanDefiniton对象,并注册到BeanDefinitionRegistry中,为实例化Bean做好准备。

下面看看DubboConfigConfigurationRegistrarregisterBeanDefinitions方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Override
public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
AnnotationAttributes attributes = AnnotationAttributes.fromMap(
importingClassMetadata.getAnnotationAttributes(EnableDubboConfig.class.getName()));

boolean multiple = attributes.getBoolean("multiple");

// Single Config Bindings
registerBeans(registry, DubboConfigConfiguration.Single.class);

if (multiple) { // Since 2.6.6 https://github.com/apache/dubbo/issues/3193
registerBeans(registry, DubboConfigConfiguration.Multiple.class);
}
}

其中multiple默认为true,所以默认的会进入到

1
registerBeans(registry, DubboConfigConfiguration.Multiple.class);

这个方法中。跟踪registerBeans方法,可得知,是通过AnnotatedBeanDefinitionReader这个类来读取DubboConfigConfiguration.Multiple.class上面的注解的。DubboConfigConfiguration.Multiple类如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/**
* Multiple Dubbo {@link AbstractConfig Config} Bean Binding
*/
@EnableDubboConfigBindings({
@EnableDubboConfigBinding(prefix = "dubbo.applications", type = ApplicationConfig.class, multiple = true),
@EnableDubboConfigBinding(prefix = "dubbo.modules", type = ModuleConfig.class, multiple = true),
@EnableDubboConfigBinding(prefix = "dubbo.registries", type = RegistryConfig.class, multiple = true),
@EnableDubboConfigBinding(prefix = "dubbo.protocols", type = ProtocolConfig.class, multiple = true),
@EnableDubboConfigBinding(prefix = "dubbo.monitors", type = MonitorConfig.class, multiple = true),
@EnableDubboConfigBinding(prefix = "dubbo.providers", type = ProviderConfig.class, multiple = true),
@EnableDubboConfigBinding(prefix = "dubbo.consumers", type = ConsumerConfig.class, multiple = true),
@EnableDubboConfigBinding(prefix = "dubbo.config-centers", type = ConfigCenterBean.class, multiple = true),
@EnableDubboConfigBinding(prefix = "dubbo.metadata-reports", type = MetadataReportConfig.class, multiple = true),
@EnableDubboConfigBinding(prefix = "dubbo.metricses", type = MetricsConfig.class, multiple = true)
})
public static class Multiple {

}

这里又用到了EnableDubboConfigBinding注解:

1
2
3
4
5
6
7
8
@Target({ElementType.TYPE, ElementType.ANNOTATION_TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Repeatable(EnableDubboConfigBindings.class)
@Import(DubboConfigBindingRegistrar.class)
public @interface EnableDubboConfigBinding {
...
}

可以发现,最终是通过DubboConfigBindingRegistrar这个类来处理Multiple上面的注解信息的,这个类也是一个ImportBeanDefinitionRegistrar类。接下来看看其registerBeanDefinitions方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Override
public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
AnnotationAttributes attributes = AnnotationAttributes.fromMap(
importingClassMetadata.getAnnotationAttributes(EnableDubboConfigBinding.class.getName()));

registerBeanDefinitions(attributes, registry);

}

protected void registerBeanDefinitions(AnnotationAttributes attributes, BeanDefinitionRegistry registry) {
String prefix = environment.resolvePlaceholders(attributes.getString("prefix"));

Class<? extends AbstractConfig> configClass = attributes.getClass("type");

boolean multiple = attributes.getBoolean("multiple");

registerDubboConfigBeans(prefix, configClass, multiple, registry);

}

可以发现,最终会读取到每个@EnableDubboConfigBinding注解的prefix和type属性,传入registerDubboConfigBeans进行处理。

registerDubboConfigBeans内部会根据prefix去Spring的environment中对应的配置项,找到之后,实例化成对应type的bean,注册到Spring中。比如:

image-20221118221801881

dubbo.application前缀的配置会生成一个ApplicationConfig的bean,dubbo.protocols的前缀,会生成ProtocolConfigd bean。

registerDubboConfigBeans大致的实现思路如下:

每个@EnableDubboConfigBinding注解都会解析得到一个Config的BeanDefinition注册到Spring中,同时注册一个对应的DubboConfigBindingBeanPostProcessor后置处理器,该处理器内部会利用Spring的DataBinder技术,结合properties文件,为Config Bean对应的属性进行赋值。

image-20221204113648720

具体的registerDubboConfigBeans实现代码这里就不列出来了,感兴趣的自己跟踪阅读。

至此,Dubbo的Config类生成完毕。

2. 扫描Dubbo的@Service注解,生成ServiceBean

上面提到@EnableDubboConfig注解会引入DubboComponentScan@DubboComponentScan是用于扫描Dubbo服务以及Dubbo服务引用的,@DubboComponentScan注解如下:

1
2
3
4
5
6
7
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(DubboComponentScanRegistrar.class)
public @interface DubboComponentScan {
...
}

这里引入了DubboComponentScanRegistrar,也是一个ImportBeanDefinitionRegistrar接口实现类,继续看其registerBeanDefinitions方法:

1
2
3
4
5
6
7
8
9
10
11
12
@Override
public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
// 获取DubboComponentScan注解配置的扫描路径
Set<String> packagesToScan = getPackagesToScan(importingClassMetadata);

// 处理服务导出相关逻辑
registerServiceAnnotationBeanPostProcessor(packagesToScan, registry);

// 处理服务引入相关逻辑
registerReferenceAnnotationBeanPostProcessor(registry);

}

其中最主要的两个方法:

  • registerServiceAnnotationBeanPostProcessor
    • 注册ServiceAnnotationBeanPostProcessor(是一个BeanDefinitionRegistryPostProcessor接口实现类)
    • 在Spring启动时会调用postProcessBeanDefinitionRegistry方法扫描添加了@Service注解了的类
    • 然后生成BeanDefinition(会生成两个,一个普通的bean,一个ServiceBean),后续的Spring周期中会生成Bean
    • 在ServiceBean中会监听ContextRefreshedEvent事件,一旦Spring启动完后,就会发出事件触发进行服务导出;
  • registerReferenceAnnotationBeanPostProcessor
    • 注册ReferenceAnnotationBeanPostProcessor(是一个AnnotationInjectedBeanPostProcessor抽象类子类);
    • 在Spring启动时,会通过该类扫描服务引用@Reference的注入点,生成ReferenceBean,然后生成具体的代理类注入到对应的字段中。

本节我们看第一个方法,服务导出相关逻辑。

接下来继续跟踪ServiceAnnotationBeanPostProcessorpostProcessBeanDefinitionRegistry方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Override
public void postProcessBeanDefinitionRegistry(BeanDefinitionRegistry registry) throws BeansException {

Set<String> resolvedPackagesToScan = resolvePackagesToScan(packagesToScan);

if (!CollectionUtils.isEmpty(resolvedPackagesToScan)) {
registerServiceBeans(resolvedPackagesToScan, registry);
} else {
if (logger.isWarnEnabled()) {
logger.warn("packagesToScan is empty , ServiceBean registry will be ignored!");
}
}

}

关键方法为registerServiceBeans,该方法扫描并且注册ServiceBean,具体代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
private void registerServiceBeans(Set<String> packagesToScan, BeanDefinitionRegistry registry) {

// 创建扫描类
DubboClassPathBeanDefinitionScanner scanner =
new DubboClassPathBeanDefinitionScanner(registry, environment, resourceLoader);

BeanNameGenerator beanNameGenerator = resolveBeanNameGenerator(registry);

scanner.setBeanNameGenerator(beanNameGenerator);

// 设置需要扫描的注解,这里只会扫描Dubbo的@Service注解
scanner.addIncludeFilter(new AnnotationTypeFilter(Service.class));

/**
* Add the compatibility for legacy Dubbo's @Service
*
* The issue : https://github.com/apache/dubbo/issues/4330
* @since 2.7.3
*/
scanner.addIncludeFilter(new AnnotationTypeFilter(com.alibaba.dubbo.config.annotation.Service.class));

for (String packageToScan : packagesToScan) {

// Registers @Service Bean first
scanner.scan(packageToScan);

// Finds all BeanDefinitionHolders of @Service whether @ComponentScan scans or not.
Set<BeanDefinitionHolder> beanDefinitionHolders =
findServiceBeanDefinitionHolders(scanner, packageToScan, registry, beanNameGenerator);

if (!CollectionUtils.isEmpty(beanDefinitionHolders)) {

// 处理扫描到的BeanDefinition
for (BeanDefinitionHolder beanDefinitionHolder : beanDefinitionHolders) {
registerServiceBean(beanDefinitionHolder, registry, scanner);
}

if (logger.isInfoEnabled()) {
logger.info(beanDefinitionHolders.size() + " annotated Dubbo's @Service Components { " +
beanDefinitionHolders +
" } were scanned under package[" + packageToScan + "]");
}

} else {

if (logger.isWarnEnabled()) {
logger.warn("No Spring Bean annotating Dubbo's @Service was found under package["
+ packageToScan + "]");
}

}

}

}

这里的逻辑比较简单,主要是通过DubboClassPathBeanDefinitionScanner扫描器扫描标注有Dubbo的@Service注解的类,扫描得到相关的BeanDefinition,然后遍历处理每个BeanDefinition,处理方法:ServiceAnnotationBeanPostProcessor#registerServiceBean。接下来看看ServiceAnnotationBeanPostProcessor#registerServiceBean这个方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
private void registerServiceBean(BeanDefinitionHolder beanDefinitionHolder, BeanDefinitionRegistry registry,
DubboClassPathBeanDefinitionScanner scanner) {
// 获取到服务实现类
Class<?> beanClass = resolveClass(beanDefinitionHolder);
// 获取服务实现类上面的@Service注解
Annotation service = findServiceAnnotation(beanClass);

/**
* The {@link AnnotationAttributes} of @Service annotation
*/
// 获取@Service注解上的信息
AnnotationAttributes serviceAnnotationAttributes = getAnnotationAttributes(service, false, false);

// 获取服务接口
Class<?> interfaceClass = resolveServiceInterfaceClass(serviceAnnotationAttributes, beanClass);
// 获取服务实现类bean名称
String annotatedServiceBeanName = beanDefinitionHolder.getBeanName();

// 生成服务的ServiceBeanDefinition
AbstractBeanDefinition serviceBeanDefinition =
buildServiceBeanDefinition(service, serviceAnnotationAttributes, interfaceClass, annotatedServiceBeanName);

// 生成服务对应的Bean名称
String beanName = generateServiceBeanName(serviceAnnotationAttributes, interfaceClass);

if (scanner.checkCandidate(beanName, serviceBeanDefinition)) { // check duplicated candidate bean

// ServiceBean注册到Spring容器中,beanName的格式:ServiceBean:org.apache.dubbo.demo.DemoService:1.0.1:itzhai
registry.registerBeanDefinition(beanName, serviceBeanDefinition);

if (logger.isInfoEnabled()) {
logger.info("The BeanDefinition[" + serviceBeanDefinition +
"] of ServiceBean has been registered with name : " + beanName);
}

} else {

if (logger.isWarnEnabled()) {
logger.warn("The Duplicated BeanDefinition[" + serviceBeanDefinition +
"] of ServiceBean[ bean name : " + beanName +
"] was be found , Did @DubboComponentScan scan to same package in many times?");
}

}

}

这段代码主要逻辑:

  • 获取到服务实现类,以及服务实现类的注解信息,以及服务实现类的接口;
  • 为服务实现类生成一个ServiceBean的Definition,ServiceBean的名称格式为:ServiceBean:org.apache.dubbo.demo.DemoService:1.0.1:itzhai;
  • 把ServiceBeanDefinition注册到Spring容器中。

注意,在生成ServiceBean的Definition方法中,会把服务实现类的bean赋值给ServiceBean的ref属性:

1
2
3
4
5
6
7
8
private AbstractBeanDefinition buildServiceBeanDefinition(Annotation serviceAnnotation,
AnnotationAttributes serviceAnnotationAttributes,
Class<?> interfaceClass,
String annotatedServiceBeanName) {
...
addPropertyReference(builder, "ref", annotatedServiceBeanName);
...
}

基于ServiceBeanDefinition生成的bean结构如下图所示:

image-20221119181110383

其中SpringBean即为服务实现类的Bean,ServiceBean为Dubbo服务的Bean,具体的服务导出逻辑就封装在ServiceBean中。

在Spring启动完成之后,会触发每个ServiceBean的onApplicationEvent方法,此方法会调用ServiceBean的export()方法,执行服务导出。

接下来就详细看看服务导出的实现逻辑。

3. ServiceBean服务导出流程

ServiceBean.export()方法如下:

1
2
3
4
5
6
@Override
public void export() {
super.export();
// 发布ServiceBeanExportedEvent事件
publishExportEvent();
}

继续跟踪super.export()方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public synchronized void export() {
// 读取配置
checkAndUpdateSubConfigs();

// 判断服务是否需要导出
if (!shouldExport()) {
return;
}

// 判断是否需要延迟发布
if (shouldDelay()) {
DELAY_EXPORT_EXECUTOR.schedule(this::doExport, getDelay(), TimeUnit.MILLISECONDS);
} else {
// 执行服务导出
doExport();
}
}

这里有两个核心方法调用:

  • checkAndUpdateSubConfigs():读取配置;
  • doExport():执行服务导出;

接下来详细可靠这两个方法的逻辑。

3.1 读取配置

checkAndUpdateSubConfigs()方法的详情如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
public void checkAndUpdateSubConfigs() {
// Use default configs defined explicitly on global configs
// 补全ServiceConfig中的属性配置,ServiceConfig中值为空的属性,会从ProviderConfig、ModuleConfig、ApplicationConfig中获取
completeCompoundConfigs();

// Config Center should always being started first.
// 从配置中心获取配置,包括应用配置和全局配置,保存到
// Environment的externalConfigurationMap和appExternalConfigurationMap中
// 然后使用配置中心的配置覆盖刷新所有的Config类的属性(除了ServiceConfig)
startConfigCenter();

checkDefault();

checkProtocol();

checkApplication();

// if protocol is not injvm checkRegistry
if (!isOnlyInJvm()) {
checkRegistry();
}

// 使用配置中心的数据覆盖刷新ServiceConfig
this.refresh();

// 如果配了metadataReportConfig,则刷新配置
checkMetadataReport();

if (StringUtils.isEmpty(interfaceName)) {
throw new IllegalStateException("<dubbo:service interface=\"\" /> interface not allow null!");
}

// 如果当前服务的实现类是GenericService接口的实例
if (ref instanceof GenericService) {
interfaceClass = GenericService.class;
if (StringUtils.isEmpty(generic)) {
generic = Boolean.TRUE.toString();
}
} else {
// 加载接口
try {
interfaceClass = Class.forName(interfaceName, true, Thread.currentThread()
.getContextClassLoader());
} catch (ClassNotFoundException e) {
throw new IllegalStateException(e.getMessage(), e);
}
// 刷新MethodConfig
checkInterfaceAndMethods(interfaceClass, methods);
checkRef();
generic = Boolean.FALSE.toString();
}
...
checkStubAndLocal(interfaceClass);
checkMock(interfaceClass);
}

主要逻辑如下:

  • completeCompoundConfigs()
    • 补全ServiceConfig中的属性配置;
    • 针对ServiceConfig中值为空的属性,会从ProviderConfig、ModuleConfig、ApplicationConfig中获取;
    • 其中ProviderConfig、ModuleConfig、ApplicationConfig这些配置类Bean,在上面已经解析生成;
  • startConfigCenter()
    • 从配置中心获取配置,包括应用配置和全局配置,保存到Environment的externalConfigurationMap和appExternalConfigurationMap中。这一步中,如果配置了ConfigCenter,则会会去连接配置中心,将当前应用的配置和全局配置都拉取到本地。
    • 然后使用配置中心的配置覆盖刷新所有的Config类的属性(除了ServiceConfig);
    • 配置优先级关系图如下:
      • image-20221120111638561
    • 详细的代码可以自己进一步阅读;
  • this.refresh():刷新ServiceConfig,在这一步,已经把远程配置中心的配置数据拉取下来,此时会对使用这些配置数据刷新本地的ServiceConfig从注解解析出来的配置。

下面看看重要的服务导出方法。

3.2 执行服务导出

ServiceConfig#doExport方法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
protected synchronized void doExport() {
if (unexported) {
throw new IllegalStateException("The service " + interfaceClass.getName() + " has already unexported!");
}
if (exported) {
return;
}
exported = true;

if (StringUtils.isEmpty(path)) {
path = interfaceName;
}
doExportUrls();
}

继续查看ServiceConfig#doExportUrls

1
2
3
4
5
6
7
8
9
10
11
12
private void doExportUrls() {
// 根据注册中心配置生成所有的注册中心URL
List<URL> registryURLs = loadRegistries(true);

for (ProtocolConfig protocolConfig : protocols) {
String pathKey = URL.buildKey(getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), group, version);
ProviderModel providerModel = new ProviderModel(pathKey, ref, interfaceClass);
ApplicationModel.initProviderModel(pathKey, providerModel);
// 服务导出的方法
doExportUrlsFor1Protocol(protocolConfig, registryURLs);
}
}

重点的方法调用:ServiceConfig#doExportUrlsFor1Protocol,继续查看该方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
/**
* @param protocolConfig 表示一个协议
* @param registryURLs 所有注册中心的URL
*/
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
// 协议默认为dubbo
String name = protocolConfig.getName();
if (StringUtils.isEmpty(name)) {
name = DUBBO;
}

// 通过这个map收集服务url的参数
Map<String, String> map = new HashMap<String, String>();
map.put(SIDE_KEY, PROVIDER_SIDE);

appendRuntimeParameters(map);

// 监控相关参数
appendParameters(map, metrics);
// 应用相关参数
appendParameters(map, application);
...

// 解析每个方法的参数配置,如:
// @Service(versions="1.0.1", grousp="itzhai", methods = {@Method(name = "playGame", timeout = 3000)})
// 解析出来之后,把这些参数拼接到Dubbo服务的导出URL上
if (CollectionUtils.isNotEmpty(methods)) {...}

if (ProtocolUtils.isGeneric(generic)) {...} else {...}

// token机制:https://dubbo.apache.org/en/docs/v2.7/user/examples/token-authorization/
if (!ConfigUtils.isEmpty(token)) {...}

// export service
String host = this.findConfigedHosts(protocolConfig, registryURLs, map);
Integer port = this.findConfigedPorts(protocolConfig, name, map);
// 构造最终的要导出的服务url
URL url = new URL(name, host, port, getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), map);
// 生成的URL:dubbo://192.168.0.106:20800/com.itzhai.dubbo.demo.DubboTestService?anyhost=true&application=dubbo-provider-app&bean.name=ServiceBean:com.itzhai.dubbo.demo.DubboTestService:1.0.1:itzhai&bind.ip=192.168.0.106&bind.port=20800&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&group=itzhai&interface=com.itzhai.dubbo.demo.DubboTestService&logger=log4j&methods=playGame&pid=11035&release=2.7.0&revision=1.0.1&side=provider&timestamp=1668952315016&version=1.0.1

// 扩展点,可以通过ConfiguratorFactory,对服务url再次进行配置
if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
.hasExtension(url.getProtocol())) {
url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
.getExtension(url.getProtocol()).getConfigurator(url).configure(url);
}

// scope可能取值:null, remote, local, none
String scope = url.getParameter(SCOPE_KEY);
// don't export when none is configured
if (!SCOPE_NONE.equalsIgnoreCase(scope)) {
// export to local if the config is not remote (export to remote only when config is remote)
if (!SCOPE_REMOTE.equalsIgnoreCase(scope)) {
// scope != remote,则进行本地导出,url的protocol会改为injvm
exportLocal(url);
}
// export to remote if the config is not local (export to local only when config is local)
if (!SCOPE_LOCAL.equalsIgnoreCase(scope)) {
// 远程导出逻辑
...
}
}
this.urls.add(url);
}

doExportUrlsFor1Protocol这个方法比较长,主要逻辑:

  • 通过map收集服务url的参数;
  • 解析单个方法的配置;
  • token机制参数处理;
  • 构造要导出的服务的最终URL;
  • 根据scope参数执行具体的服务导出逻辑;

根据不同的scope取值,走不同的逻辑:

image-20221120225809987

我们先来看看远程导出逻辑。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
// scope != local, 则进行远程导出
if (CollectionUtils.isNotEmpty(registryURLs)) {
// 如果配置了注册中心,则将服务注册到所有的注册中心
for (URL registryURL : registryURLs) {
//if protocol is only injvm ,not register
if (LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
continue;
}

// 设置是否动态服务参数,用于设置对应的zookeeper上的节点是否临时节点
url = url.addParameterIfAbsent(DYNAMIC_KEY, registryURL.getParameter(DYNAMIC_KEY));

// 获取监控中心地址
URL monitorUrl = loadMonitor(registryURL);

// 设置监控中心地址
if (monitorUrl != null) {
url = url.addParameterAndEncoded(MONITOR_KEY, monitorUrl.toFullString());
}

if (logger.isInfoEnabled()) {
if (url.getParameter(REGISTER_KEY, true)) {
logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
} else {
logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
}
}

// For providers, this is used to enable custom proxy to generate invoker,默认为javassist
String proxy = url.getParameter(PROXY_KEY);
if (StringUtils.isNotEmpty(proxy)) {
registryURL = registryURL.addParameter(PROXY_KEY, proxy);
}

// 生成ServiceBean对应服务的Invoker代理对象
// registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString())生成的URL为:registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=dubbo-provider-app&dubbo=2.0.2&export=dubbo://192.168.0.106:20800/com.itzhai.dubbo.demo.DubboTestService?anyhost=true&application=dubbo-provider-app&bean.name=ServiceBean:com.itzhai.dubbo.demo.DubboTestService:1.0.1:itzhai&bind.ip=192.168.0.106&bind.port=20800&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&group=itzhai&interface=com.itzhai.dubbo.demo.DubboTestService&logger=log4j&methods=playGame&pid=11035&release=2.7.0&revision=1.0.1&side=provider&timestamp=1668952315016&version=1.0.1&logger=log4j&pid=11035&registry=zookeeper&release=2.7.0&timeout=3000&timestamp=1668952313811
Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));

DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);

// 通过特定协议来执行服务导出,得到Exporter对象,这里的协议为RegistryProtocol
Exporter<?> exporter = protocol.export(wrapperInvoker);
exporters.add(exporter);
}
} else {
// 不用注册服务到注册中心,直接进行服务导出
if (logger.isInfoEnabled()) {
logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
}

Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, url);
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);

// 导出的核心方法
Exporter<?> exporter = protocol.export(wrapperInvoker);
exporters.add(exporter);
}

/**
* @since 2.7.0
* ServiceData Store
*/
MetadataReportService metadataReportService = null;
if ((metadataReportService = getMetadataReportService()) != null) {
metadataReportService.publishProvider(url);
}

如果配置了注册中心,首先会把服务URL注册到所有的注册中心,在这之前会通过ProxyFactory生成一个服务的代理类Invoker:

1
Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));

其中registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString())会在注册中心的地址后面拼接上服务的URL地址,拼接的结果如下:

registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=dubbo-provider-app&dubbo=2.0.2&export=dubbo://192.168.0.106:20800/com.itzhai.dubbo.demo.DubboTestService?anyhost=true&application=dubbo-provider-app&bean.name=ServiceBean:com.itzhai.dubbo.demo.DubboTestService:1.0.1:itzhai&bind.ip=192.168.0.106&bind.port=20800&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&group=itzhai&interface=com.itzhai.dubbo.demo.DubboTestService&logger=log4j&methods=playGame&pid=11035&release=2.7.0&revision=1.0.1&side=provider&timestamp=1668952315016&version=1.0.1&logger=log4j&pid=11035&registry=zookeeper&release=2.7.0&timeout=3000&timestamp=1668952313811

通过ProxyFactory创建如下代理类:

image-20221121231440343

然后再把代理类包装成DelegateProviderMetaDataInvoker:

1
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this)

image-20221121231821328

接着执行Exporter<?> exporter = protocol.export(wrapperInvoker)进行服务导出。

这里基于以上的url和Dubbo SPI机制,可以推断用到的是RegistryProtocol#exort方法,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
@Override
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
// 源url:registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?xxx&registry=zookeeper&xxx
// 转换为:zookeeper://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?xxx
URL registryUrl = getRegistryUrl(originInvoker);
// 得到服务提供者url,表示服务提供者:dubbo://192.168.0.106:20800/com.itzhai.dubbo.demo.DubboTestService?xxx
URL providerUrl = getProviderUrl(originInvoker);

// Subscribe the override data
// FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call
// the same service. Because the subscribed is cached key with the name of the service, it causes the
// subscription information to cover.

// 得到overrideSubscribeUrl,表示需要监听的服务以及监听的类型为configurators,这个是老版本的动态配置监听url,生成的URL格式:providerUrl协议改为provider,再加上category=configurators&check=false
// provider://192.168.0.106:20800/com.itzhai.dubbo.demo.DubboTestService?anyhost=true&application=dubbo-provider-app&bean.name=ServiceBean:com.itzhai.dubbo.demo.DubboTestService:1.0.1:itzhai&bind.ip=192.168.0.106&bind.port=20800&category=configurators&check=false&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&group=itzhai&interface=com.itzhai.dubbo.demo.DubboTestService&logger=log4j&methods=playGame&pid=11035&release=2.7.0&revision=1.0.1&side=provider&timestamp=1668952315016&version=1.0.1
final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl);

// 创建overrideSubscribeUrl的监听器OverrideListener,用于监听overrideSubscribeUrl的变化
final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);


// 使用providerConfigurationListener和serviceConfigurationListener重写providerUrl
// dubbo://192.168.0.106:20800/com.itzhai.dubbo.demo.DubboTestService?anyhost=true&application=dubbo-provider-app&bean.name=ServiceBean:com.itzhai.dubbo.demo.DubboTestService:1.0.1:itzhai&bind.ip=192.168.0.106&bind.port=20800&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&group=itzhai&interface=com.itzhai.dubbo.demo.DubboTestService&logger=log4j&methods=playGame&pid=11035&release=2.7.0&revision=1.0.1&side=provider&timestamp=1668952315016&version=1.0.1
providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener);

// export invoker
// 根据动态配置重写了providerUrl之后,就会调用DubboProtocol或HttpProtocol去进行导出服务了dd
final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);

// url to registry
final Registry registry = getRegistry(originInvoker);

// 简化providerUrl参数,得到存入到注册中心去的providerUrl
final URL registeredProviderUrl = getRegisteredProviderUrl(providerUrl, registryUrl);

// 把服务提供者Invoker、注册中心地址,以及简化后的服务url存入ProviderConsumerRegTable
ProviderInvokerWrapper<T> providerInvokerWrapper = ProviderConsumerRegTable.registerProvider(originInvoker,
registryUrl, registeredProviderUrl);

//to judge if we need to delay publish
// 判断是否需要注册到注册中心
boolean register = providerUrl.getParameter(REGISTER_KEY, true);
if (register) {
// 注册简化后的服务URL到注册中心
register(registryUrl, registeredProviderUrl);
providerInvokerWrapper.setReg(true);
}

// 监听路径内容
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);

exporter.setRegisterUrl(registeredProviderUrl);
exporter.setSubscribeUrl(overrideSubscribeUrl);
//Ensure that a new exporter instance is returned every time export
return new DestroyableExporter<>(exporter);
}

这段代码主要处理以下逻辑:

  • getRegistryUrl(originInvoker):得到注册中心的url,这一步会把传进来的registry://xxx?xxx=xxx&registry=zookeeper转换为:zookeeper://xxx?xxx=xxx
  • getProviderUrl(originInvoker):得到服务提供者的url,dubbo://xxx
  • getRegistry(originInvoker):得到注册中心;
  • exporter = doLocalExport(originInvoker, providerUrl):执行服务导出;
  • register(registryUrl, registeredProviderUrl):把服务的URL注册到注册中心;
  • registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener):监听zookeeper节点;

registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString())生成的URL为:

我们先来重点看看RegistryProtocol#doLocalExport这个方法,这个方法是执行本地导出的。

3.2.1 服务本地导出

代码如下:

1
2
3
4
5
6
7
8
private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker, URL providerUrl) {
String key = getCacheKey(originInvoker);

return (ExporterChangeableWrapper<T>) bounds.computeIfAbsent(key, s -> {
Invoker<?> invokerDelegate = new InvokerDelegate<>(originInvoker, providerUrl);
return new ExporterChangeableWrapper<>((Exporter<T>) protocol.export(invokerDelegate), originInvoker);
});
}

这里最关键的导出方法是:protocol.export(invokerDelegate),这里用到了Dubbo SPI加载具体实现,基于providerUrl,加载到的为DubboProtocol类,最终会调用到以下几个方法:

  • 1、Protocol包装类:org.apache.dubbo.rpc.protocol.ProtocolFilterWrapper#export:
1
2
3
4
5
6
7
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
if (REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
return protocol.export(invoker);
}
return protocol.export(buildInvokerChain(invoker, SERVICE_FILTER_KEY, CommonConstants.PROVIDER));
}

这里主要用于添加dubbo protocol对应的invoker的过滤器链,自己实现的Filter扩展类就是在这里加载的。

执行完之后,生成的对象如下:

image-20221121235737554

  • 2、Protocol包装类:org.apache.dubbo.rpc.protocol.ProtocolListenerWrapper#export:
1
2
3
4
5
6
7
8
9
10
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
if (REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
return protocol.export(invoker);
}
return new ListenerExporterWrapper<T>(protocol.export(invoker),
// 加载ExporterListener接口扩展点,使用url和EXPORTER_LISTENER_KEY筛选到导出服务相关的扩展点
Collections.unmodifiableList(ExtensionLoader.getExtensionLoader(ExporterListener.class)
.getActivateExtension(invoker.getUrl(), EXPORTER_LISTENER_KEY)));
}

这里如果是dubbo://开头的url,则会包装ListenerExporterWrapper,给dubbo protocol对应的invoker添加监听器,用于处理导出结果,这里可以进行扩展。

  • 3、Dubbo SPI调用到的Protocol类:org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol#export:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
URL url = invoker.getUrl();

// export service.
String key = serviceKey(url);
// 构造DubboExporter,用于服务导出
DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
exporterMap.put(key, exporter);

//export an stub service for dispatching event
Boolean isStubSupportEvent = url.getParameter(STUB_EVENT_KEY, DEFAULT_STUB_EVENT);
Boolean isCallbackservice = url.getParameter(IS_CALLBACK_SERVICE, false);
if (isStubSupportEvent && !isCallbackservice) {
String stubServiceMethods = url.getParameter(STUB_EVENT_METHODS_KEY);
if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
if (logger.isWarnEnabled()) {
logger.warn(new IllegalStateException("consumer [" + url.getParameter(INTERFACE_KEY) +
"], has set stubproxy support event ,but no stub methods founded."));
}

} else {
stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
}
}

// 启动Server
openServer(url);

// 一些特殊的序列化机制处理
optimizeSerialization(url);

return exporter;
}

最终会创建一个DubboExporter对象,并放入到exporterMap中,后续执行服务的时候,会从这个map中检索到需要执行的服务,如下图所示:

image-20221121235703655

执行完export方法之后,会返回到org.apache.dubbo.registry.integration.RegistryProtocol#doLocalExport方法,这里会包装多一层ExporterChangeableWrapper

image-20221121235640404

而执行完org.apache.dubbo.registry.integration.RegistryProtocol#doLocalExport之后,会返回到org.apache.dubbo.registry.integration.RegistryProtocol#export这个方法,最后会多包装一层DestroyableExporter

image-20221121233718510

最里面的代理类中包括了引用的服务实现类ZDubboTestServiceImpl,当通过Dubbo调用服务的时候,会从exporterMap中找到对应的DubboExporter,最终就会调用这个服务实现类的方法。

接下来的DubboProtocol#openServer就是构建传输层链路与建立连接的了,下面详细看看。

构建传输层链路与启动服务

在Dubbo中,数据处理分为了数据交互层和数据传输层:

image-20221118165218534

接下来看看这个连接是怎么建立的,跟踪DubboProtocol#openServer方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
private void openServer(URL url) {
// find server. key = IP + port, 如:192.168.1.101:20880
String key = url.getAddress();
// client can export a service which's only for server to invoke
boolean isServer = url.getParameter(IS_SERVER_KEY, true);
if (isServer) {
ExchangeServer server = serverMap.get(key);
if (server == null) {
synchronized (this) {
server = serverMap.get(key);
if (server == null) {
serverMap.put(key, createServer(url));
}
}
} else {
// server supports reset, use together with override
server.reset(url);
}
}
}

可以发现,这里会为每个 key(IP:port)启动一个服务:

image-20221121234750405

继续跟踪,发现最终会调用到DubboProtocol#createServer方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
private ExchangeServer createServer(URL url) {
url = URLBuilder.from(url)
// send readonly event when server closes, it's enabled by default
.addParameterIfAbsent(CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString())
// enable heartbeat by default
.addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT))
.addParameter(CODEC_KEY, DubboCodec.NAME)
.build();

// 获取协议名称,如netty,netty,jetty等
String str = url.getParameter(SERVER_KEY, DEFAULT_REMOTING_SERVER);

if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
throw new RpcException("Unsupported server type: " + str + ", url: " + url);
}

ExchangeServer server;
try {
server = Exchangers.bind(url, requestHandler);
} catch (RemotingException e) {
throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
}

// 校验客户端协议
str = url.getParameter(CLIENT_KEY);
if (str != null && str.length() > 0) {
Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
if (!supportedTypes.contains(str)) {
throw new RpcException("Unsupported client type: " + str);
}
}

return server;
}

这里最关键的是调用数据交换层的bind方法:

1
server = Exchangers.bind(url, requestHandler);

继续跟踪Exchangers#bind方法:

1
2
3
4
5
6
7
8
9
10
11
public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
if (handler == null) {
throw new IllegalArgumentException("handler == null");
}
// codec 协议编码方式
url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
return getExchanger(url).bind(url, handler);
}

最终会调用getExchanger得到一个Exchanger实现去启动服务器,这里基于SPI扩展机制,会获取到HeaderExchanger:

1
2
3
4
5
6
7
8
public static Exchanger getExchanger(URL url) {
String type = url.getParameter(Constants.EXCHANGER_KEY, Constants.DEFAULT_EXCHANGER);
return getExchanger(type);
}

public static Exchanger getExchanger(String type) {
return ExtensionLoader.getExtensionLoader(Exchanger.class).getExtension(type);
}

接着会调用HeaderExchanger的bind方法去启动服务器,继续跟踪此方法:

1
2
3
4
@Override
public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
}

这里会把传进来的handler(ExchangeHandlerAdapter)包装两层:HeaderExchangeHandler,DecodeHandler,然后使用Transporters.bind来启动服务器,最后再包装成HeaderExchangeServer:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
if (handlers == null || handlers.length == 0) {
throw new IllegalArgumentException("handlers == null");
}
ChannelHandler handler;
if (handlers.length == 1) {
handler = handlers[0];
} else {
handler = new ChannelHandlerDispatcher(handlers);
}

// 默认调用到NettyTransporter的bind方法
return getTransporter().bind(url, handler);
}

public static Transporter getTransporter() {
return ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension();
}

最终基于Dubbo SPI机制,获取到传输层的实现类去connect,默认的传输层实现类为:NettyTransporter:

1
2
3
4
@Override
public Server bind(URL url, ChannelHandler listener) throws RemotingException {
return new NettyServer(url, listener);
}

继续调用new NettyServer:

1
2
3
4
5
public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
// you can customize name and type of client thread pool by THREAD_NAME_KEY and THREADPOOL_KEY in CommonConstants.
// the handler will be warped: MultiMessageHandler->HeartbeatHandler->handler
super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
}

这里会调用wrapChannelHandler继续把handler包装多几层:

1
2
3
4
5
6
7
protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
// 先通过调用ExtensionLoader.getExtensionLoader(Dispatcher.class).getAdaptiveExtension().dispatch(handler, url)
// 把handler包装成AllChannelHandler
// 然后继续把AllChannelHandler包装成HeartbeatHandler,HeartbeatHandler包装成MultiMessageHandler
return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class)
.getAdaptiveExtension().dispatch(handler, url)));
}

最终包装成如下几层:

MultiMessageHandler–>HeartbeatHandler–>AllChannelHandler

最终会调用到NettyServer的doOpen()方法创建连接:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
@Override
protected void doOpen() throws Throwable {
bootstrap = new ServerBootstrap();

bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true));
workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
new DefaultThreadFactory("NettyServerWorker", true));

final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
channels = nettyServerHandler.getChannels();

bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
.childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
// FIXME: should we use getTimeout()?
int idleTimeout = UrlUtils.getIdleTimeout(getUrl());
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
.addLast("decoder", adapter.getDecoder())
.addLast("encoder", adapter.getEncoder())
.addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS))
.addLast("handler", nettyServerHandler);
}
});
// bind
ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
channelFuture.syncUninterruptibly();
channel = channelFuture.channel();

}

这里就是我们熟悉的Netty服务器开启的代码了。至此,服务导出的主线流程执行完毕。

传输层构建的链路如下:

image-20221121235355200

可以发现,基于Dubbo SPI机制,数据传输层可以灵活切换到Netty、Mina等不同的网络框架。

3.2.2 服务导出到注册中心

RegistryProtocol.exort()方法在执行完本地导出之后,会继续把服务URL注册到注册中心:

1
register(registryUrl, registeredProviderUrl);

继续调用以下代码:

1
2
3
4
public void register(URL registryUrl, URL registeredProviderUrl) {
Registry registry = registryFactory.getRegistry(registryUrl);
registry.register(registeredProviderUrl);
}

这里底层会基于SPI的IoC,创建具体的注册中心,如ZookeeperRegistry。

最后通过register方法,把提供者的URL注册到注册中心。

3.3 服务导出相关URL订阅

最后,我们梳理下服务导出需要订阅到的URL。

相关代码在RegistryProtocol#exort中,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// 得到overrideSubscribeUrl,表示需要监听的服务以及监听的类型为configurators,这个是老版本的动态配置监听url,生成的URL格式:providerUrl协议改为provider,再加上category=configurators&check=false
// provider://192.168.0.106:20800/com.itzhai.dubbo.demo.DubboTestService?anyhost=true&application=dubbo-provider-app&bean.name=ServiceBean:com.itzhai.dubbo.demo.DubboTestService:1.0.1:itzhai&bind.ip=192.168.0.106&bind.port=20800&category=configurators&check=false&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&group=itzhai&interface=com.itzhai.dubbo.demo.DubboTestService&logger=log4j&methods=playGame&pid=11035&release=2.7.0&revision=1.0.1&side=provider&timestamp=1668952315016&version=1.0.1
final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl);

// 创建overrideSubscribeUrl的监听器OverrideListener,用于监听overrideSubscribeUrl的变化
final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);


// 使用providerConfigurationListener和serviceConfigurationListener重写providerUrl
// dubbo://192.168.0.106:20800/com.itzhai.dubbo.demo.DubboTestService?anyhost=true&application=dubbo-provider-app&bean.name=ServiceBean:com.itzhai.dubbo.demo.DubboTestService:1.0.1:itzhai&bind.ip=192.168.0.106&bind.port=20800&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&group=itzhai&interface=com.itzhai.dubbo.demo.DubboTestService&logger=log4j&methods=playGame&pid=11035&release=2.7.0&revision=1.0.1&side=provider&timestamp=1668952315016&version=1.0.1
providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener);

...

// url to registry
final Registry registry = getRegistry(originInvoker);

// 简化providerUrl参数,得到存入到注册中心去的providerUrl
final URL registeredProviderUrl = getRegisteredProviderUrl(providerUrl, registryUrl);

// 把服务提供者Invoker、注册中心地址,以及简化后的服务url存入ProviderConsumerRegTable
ProviderInvokerWrapper<T> providerInvokerWrapper = ProviderConsumerRegTable.registerProvider(originInvoker,
registryUrl, registeredProviderUrl);

...

// 监听旧版本的动态配置
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);

...

主要逻辑:

  • 先生成老版本的动态配置监听URL:overrideSubscribeUrl;
  • 创建OverrideListener用于处理监听到的overrideSubscribeUrl变更事件;
  • 执行overrideUrlWithConfig(),处理以下任务:
    • 通过 ProviderConfigurationListener 监听应用级别的动态配置,同时从配置中心获取动态配置,重写providerUrl;
    • 通过 ServiceConfigurationListener 监听服务级别的动态配置,同时从配置中心获取动态配置,重写providerUrl;
  • 执行registry.subscribe()监听旧版本的动态配置。

3.3.1 动态配置变更监听处理

监听到动态配置变更之后,最终会调用到这个方法:

org.apache.dubbo.registry.integration.RegistryProtocol.OverrideListener#doOverrideIfNecessary

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public synchronized void doOverrideIfNecessary() {
final Invoker<?> invoker;
if (originInvoker instanceof InvokerDelegate) {
invoker = ((InvokerDelegate<?>) originInvoker).getInvoker();
} else {
invoker = originInvoker;
}
//The origin invoker
URL originUrl = RegistryProtocol.this.getProviderUrl(invoker);
String key = getCacheKey(originInvoker);

ExporterChangeableWrapper<?> exporter = bounds.get(key);
if (exporter == null) {
logger.warn(new IllegalStateException("error state, exporter should not be null"));
return;
}

//The current, may have been merged many times
URL currentUrl = exporter.getInvoker().getUrl();

//Merged with this configuration
URL newUrl = getConfigedInvokerUrl(configurators, originUrl);

newUrl = getConfigedInvokerUrl(providerConfigurationListener.getConfigurators(), newUrl);
newUrl = getConfigedInvokerUrl(serviceConfigurationListeners.get(originUrl.getServiceKey())
.getConfigurators(), newUrl);

if (!currentUrl.equals(newUrl)) {
// 重新导出
RegistryProtocol.this.reExport(originInvoker, newUrl);
logger.info("exported provider url changed, origin url: " + originUrl +
", old export url: " + currentUrl + ", new export url: " + newUrl);
}
}

最终,如果判断到providerUrl有变更,则执行重新导出方法reExport():

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public <T> void reExport(final Invoker<T> originInvoker, URL newInvokerUrl) {

// update local exporter
ExporterChangeableWrapper exporter = doChangeLocalExport(originInvoker, newInvokerUrl);

// update registry
URL registryUrl = getRegistryUrl(originInvoker);
final URL registeredProviderUrl = getRegisteredProviderUrl(newInvokerUrl, registryUrl);

//decide if we need to re-publish
ProviderInvokerWrapper<T> providerInvokerWrapper = ProviderConsumerRegTable.getProviderWrapper(registeredProviderUrl, originInvoker);
ProviderInvokerWrapper<T> newProviderInvokerWrapper = ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registeredProviderUrl);

/**
* Only if the new url going to Registry is different with the previous one should we do unregister and register.
*/
if (providerInvokerWrapper.isReg() && !registeredProviderUrl.equals(providerInvokerWrapper.getProviderUrl())) {
unregister(registryUrl, providerInvokerWrapper.getProviderUrl());
register(registryUrl, registeredProviderUrl);
newProviderInvokerWrapper.setReg(true);
}

exporter.setRegisterUrl(registeredProviderUrl);
}

这里代码不继续跟进了,主要逻辑:

  • 如果newInvokerUrl有变更,则调用doChangeLocalExport()重新执行本地导出,这里又会继续执行DubboProtocol#export方法:

    • 会重新生成一个InvokerDelegate方法,把newInvokerUrl保存在新生成的对象中;

    • 注意,如果是重新导出,并不会关掉Netty服务重新开启,相关代码:

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      private void openServer(URL url) {
      // find server. key = IP + port, 如:192.168.1.101:20880
      String key = url.getAddress();
      // client can export a service which's only for server to invoke
      boolean isServer = url.getParameter(IS_SERVER_KEY, true);
      if (isServer) {
      ExchangeServer server = serverMap.get(key);
      if (server == null) {
      synchronized (this) {
      server = serverMap.get(key);
      if (server == null) {
      serverMap.put(key, createServer(url));
      }
      }
      } else {
      // server supports reset, use together with override
      server.reset(url);
      }
      }
      }
      • 可以发现,最终调用到了server.reset(url),该方法内部会检测心跳间隔时间有没有变更,如果有变更,则取消原来的心跳任务,根据新的心跳时间重新创建任务;
  • 如果providerUrl有变更,则删除掉配置中心原来的url,重新注册新的providerUrl。

订阅的URL如下所示:

image-20221123233751658

至此,服务导出主线逻辑梳理完成。

4. 服务导出总结

服务导出主要做的事情:

  • 解析配置生成Dubbo的Config配置类;
  • 扫描Dubbo服务@Service注解,生成ServiceBean;
  • 执行ServiceBean的export方法执行服务导出
    • 先获取到各种配置,包括配置中心的配置,最后用这些配置覆盖刷新ServiceBean的配置;
    • 基于以上配置信息,构造导出服务的URL;
    • 执行本地导出,主要是用到Protocol的export方法,基于Dubbo的SPI机制,以及URL的参数,调用各种扩展去封装Invoker,Invoker最内部是一个代理类,包含了实际服务的引用;最后为不同的端口分别启动Netty服务,构造传输层的链路;
    • 服务导出到注册中心,即把Invoker对应的URL注册到注册中心;
  • 服务导出相关URL订阅,当感知到动态配置变更的时候,更新providerURL,最终执行RegistryProtocol的reExport方法。

本文作者: 帅旋

本文链接: https://www.itzhai.com/columns/jvm/dubbo-service-export-source-code.html

版权声明: 版权归作者所有,未经许可不得转载,侵权必究!联系作者请加公众号。

×
IT宅

关注公众号及时获取网站内容更新。