在阅读Dubbo源码之前,一定要先整明白Dubbo的SPI机制是什么,能够了解是怎么实现的就最好了。为此,Java架构杂谈输出了一篇Dubbo SPI机制的原理和源码解读文章,感兴趣的朋友可以先阅读了解。在Java架构杂谈
公众号发送SPI
获取文章链接。
本文会使用到以下的例子辅助解读Dubbo源码:
假设有如下系统:
消费者:dubbo-consumer-app
调用的接口:com.itzhai.dubbo.demo.DubboTestService.playGame
提供者:dubbo-provider-app
如下图所示:
在SpringBoot项目中,我们要启用Dubbo,一般会在启动类中配置:
1 @EnableDubbo (scanBasePackages = {"com.itzhai.dubbo.demo" })
@EnableDubbo注解:
1 2 3 4 5 6 7 8 9 @Target ({ElementType.TYPE})@Retention (RetentionPolicy.RUNTIME)@Inherited @Documented @EnableDubboConfig @DubboComponentScan public @interface EnableDubbo { ... }
其中最重要的两个注解为@EnableDubboConfig
以及@DubboComponentScan
。
@EnableDubboConfig
是用于解析生成DubboConfig配置类的;
@DubboComponentScan
是用于扫描Dubbo服务以及Dubbo服务引用的;
1. 解析生成DubboConfig配置类
我们继续查看@EnableDubboConfig注解:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 @Target ({ElementType.TYPE})@Retention (RetentionPolicy.RUNTIME)@Inherited @Documented @Import (DubboConfigConfigurationRegistrar.class)public @interface EnableDubboConfig { boolean multiple () default true ; }
这里引入了DubboConfigConfigurationRegistrar
类,这个是ImportBeanDefinitionRegistrar
接口的实现类。ImportBeanDefinitionRegistrar
是Spring中用于注册BeanDefinition的扩展接口,实现了该接口的类被@Import注解引入之后,会触发其registerBeanDefinitions
方法,然后生成BeanDefiniton对象,并注册到BeanDefinitionRegistry中,为实例化Bean做好准备。
下面看看DubboConfigConfigurationRegistrar
的registerBeanDefinitions
方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 @Override public void registerBeanDefinitions (AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) { AnnotationAttributes attributes = AnnotationAttributes.fromMap( importingClassMetadata.getAnnotationAttributes(EnableDubboConfig.class.getName())); boolean multiple = attributes.getBoolean("multiple" ); registerBeans(registry, DubboConfigConfiguration.Single.class); if (multiple) { registerBeans(registry, DubboConfigConfiguration.Multiple.class); } }
其中multiple默认为true,所以默认的会进入到
1 registerBeans(registry, DubboConfigConfiguration.Multiple.class);
这个方法中。跟踪registerBeans方法,可得知,是通过AnnotatedBeanDefinitionReader
这个类来读取DubboConfigConfiguration.Multiple.class
上面的注解的。DubboConfigConfiguration.Multiple类如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 @EnableDubboConfigBindings ({ @EnableDubboConfigBinding (prefix = "dubbo.applications" , type = ApplicationConfig.class, multiple = true ), @EnableDubboConfigBinding (prefix = "dubbo.modules" , type = ModuleConfig.class, multiple = true ), @EnableDubboConfigBinding (prefix = "dubbo.registries" , type = RegistryConfig.class, multiple = true ), @EnableDubboConfigBinding (prefix = "dubbo.protocols" , type = ProtocolConfig.class, multiple = true ), @EnableDubboConfigBinding (prefix = "dubbo.monitors" , type = MonitorConfig.class, multiple = true ), @EnableDubboConfigBinding (prefix = "dubbo.providers" , type = ProviderConfig.class, multiple = true ), @EnableDubboConfigBinding (prefix = "dubbo.consumers" , type = ConsumerConfig.class, multiple = true ), @EnableDubboConfigBinding (prefix = "dubbo.config-centers" , type = ConfigCenterBean.class, multiple = true ), @EnableDubboConfigBinding (prefix = "dubbo.metadata-reports" , type = MetadataReportConfig.class, multiple = true ), @EnableDubboConfigBinding (prefix = "dubbo.metricses" , type = MetricsConfig.class, multiple = true ) }) public static class Multiple {}
这里又用到了EnableDubboConfigBinding
注解:
1 2 3 4 5 6 7 8 @Target ({ElementType.TYPE, ElementType.ANNOTATION_TYPE})@Retention (RetentionPolicy.RUNTIME)@Documented @Repeatable (EnableDubboConfigBindings.class)@Import (DubboConfigBindingRegistrar.class)public @interface EnableDubboConfigBinding { ... }
可以发现,最终是通过DubboConfigBindingRegistrar
这个类来处理Multiple上面的注解信息的,这个类也是一个ImportBeanDefinitionRegistrar
类。接下来看看其registerBeanDefinitions
方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 @Override public void registerBeanDefinitions (AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) { AnnotationAttributes attributes = AnnotationAttributes.fromMap( importingClassMetadata.getAnnotationAttributes(EnableDubboConfigBinding.class.getName())); registerBeanDefinitions(attributes, registry); } protected void registerBeanDefinitions (AnnotationAttributes attributes, BeanDefinitionRegistry registry) { String prefix = environment.resolvePlaceholders(attributes.getString("prefix" )); Class<? extends AbstractConfig> configClass = attributes.getClass("type" ); boolean multiple = attributes.getBoolean("multiple" ); registerDubboConfigBeans(prefix, configClass, multiple, registry); }
可以发现,最终会读取到每个@EnableDubboConfigBinding注解的prefix和type属性,传入registerDubboConfigBeans
进行处理。
registerDubboConfigBeans内部会根据prefix去Spring的environment中对应的配置项,找到之后,实例化成对应type的bean,注册到Spring中。比如:
dubbo.application前缀的配置会生成一个ApplicationConfig的bean,dubbo.protocols的前缀,会生成ProtocolConfigd bean。
registerDubboConfigBeans
大致的实现思路如下:
每个@EnableDubboConfigBinding注解都会解析得到一个Config的BeanDefinition注册到Spring中,同时注册一个对应的DubboConfigBindingBeanPostProcessor后置处理器,该处理器内部会利用Spring的DataBinder技术,结合properties文件,为Config Bean对应的属性进行赋值。
具体的registerDubboConfigBeans
实现代码这里就不列出来了,感兴趣的自己跟踪阅读。
至此,Dubbo的Config类生成完毕。
2. 扫描Dubbo的@Service注解,生成ServiceBean
上面提到@EnableDubboConfig
注解会引入DubboComponentScan
。@DubboComponentScan
是用于扫描Dubbo服务以及Dubbo服务引用的,@DubboComponentScan
注解如下:
1 2 3 4 5 6 7 @Target (ElementType.TYPE)@Retention (RetentionPolicy.RUNTIME)@Documented @Import (DubboComponentScanRegistrar.class)public @interface DubboComponentScan { ... }
这里引入了DubboComponentScanRegistrar
,也是一个ImportBeanDefinitionRegistrar
接口实现类,继续看其registerBeanDefinitions
方法:
1 2 3 4 5 6 7 8 9 10 11 12 @Override public void registerBeanDefinitions (AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) { Set<String> packagesToScan = getPackagesToScan(importingClassMetadata); registerServiceAnnotationBeanPostProcessor(packagesToScan, registry); registerReferenceAnnotationBeanPostProcessor(registry); }
其中最主要的两个方法:
registerServiceAnnotationBeanPostProcessor
:
注册ServiceAnnotationBeanPostProcessor(是一个BeanDefinitionRegistryPostProcessor接口实现类)
在Spring启动时会调用postProcessBeanDefinitionRegistry方法扫描添加了@Service注解了的类
然后生成BeanDefinition(会生成两个,一个普通的bean,一个ServiceBean),后续的Spring周期中会生成Bean
在ServiceBean中会监听ContextRefreshedEvent事件,一旦Spring启动完后,就会发出事件触发进行服务导出;
registerReferenceAnnotationBeanPostProcessor
:
注册ReferenceAnnotationBeanPostProcessor(是一个AnnotationInjectedBeanPostProcessor抽象类子类);
在Spring启动时,会通过该类扫描服务引用@Reference的注入点,生成ReferenceBean,然后生成具体的代理类注入到对应的字段中。
本节我们看第一个方法,服务导出相关逻辑。
接下来继续跟踪ServiceAnnotationBeanPostProcessor
的postProcessBeanDefinitionRegistry
方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 @Override public void postProcessBeanDefinitionRegistry (BeanDefinitionRegistry registry) throws BeansException { Set<String> resolvedPackagesToScan = resolvePackagesToScan(packagesToScan); if (!CollectionUtils.isEmpty(resolvedPackagesToScan)) { registerServiceBeans(resolvedPackagesToScan, registry); } else { if (logger.isWarnEnabled()) { logger.warn("packagesToScan is empty , ServiceBean registry will be ignored!" ); } } }
关键方法为registerServiceBeans,该方法扫描并且注册ServiceBean,具体代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 private void registerServiceBeans (Set<String> packagesToScan, BeanDefinitionRegistry registry) { DubboClassPathBeanDefinitionScanner scanner = new DubboClassPathBeanDefinitionScanner(registry, environment, resourceLoader); BeanNameGenerator beanNameGenerator = resolveBeanNameGenerator(registry); scanner.setBeanNameGenerator(beanNameGenerator); scanner.addIncludeFilter(new AnnotationTypeFilter(Service.class)); scanner.addIncludeFilter(new AnnotationTypeFilter(com.alibaba.dubbo.config.annotation.Service.class)); for (String packageToScan : packagesToScan) { scanner.scan(packageToScan); Set<BeanDefinitionHolder> beanDefinitionHolders = findServiceBeanDefinitionHolders(scanner, packageToScan, registry, beanNameGenerator); if (!CollectionUtils.isEmpty(beanDefinitionHolders)) { for (BeanDefinitionHolder beanDefinitionHolder : beanDefinitionHolders) { registerServiceBean(beanDefinitionHolder, registry, scanner); } if (logger.isInfoEnabled()) { logger.info(beanDefinitionHolders.size() + " annotated Dubbo's @Service Components { " + beanDefinitionHolders + " } were scanned under package[" + packageToScan + "]" ); } } else { if (logger.isWarnEnabled()) { logger.warn("No Spring Bean annotating Dubbo's @Service was found under package[" + packageToScan + "]" ); } } } }
这里的逻辑比较简单,主要是通过DubboClassPathBeanDefinitionScanner
扫描器扫描标注有Dubbo的@Service注解的类,扫描得到相关的BeanDefinition,然后遍历处理每个BeanDefinition,处理方法:ServiceAnnotationBeanPostProcessor#registerServiceBean
。接下来看看ServiceAnnotationBeanPostProcessor#registerServiceBean
这个方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 private void registerServiceBean (BeanDefinitionHolder beanDefinitionHolder, BeanDefinitionRegistry registry, DubboClassPathBeanDefinitionScanner scanner) { Class<?> beanClass = resolveClass(beanDefinitionHolder); Annotation service = findServiceAnnotation(beanClass); AnnotationAttributes serviceAnnotationAttributes = getAnnotationAttributes(service, false , false ); Class<?> interfaceClass = resolveServiceInterfaceClass(serviceAnnotationAttributes, beanClass); String annotatedServiceBeanName = beanDefinitionHolder.getBeanName(); AbstractBeanDefinition serviceBeanDefinition = buildServiceBeanDefinition(service, serviceAnnotationAttributes, interfaceClass, annotatedServiceBeanName); String beanName = generateServiceBeanName(serviceAnnotationAttributes, interfaceClass); if (scanner.checkCandidate(beanName, serviceBeanDefinition)) { registry.registerBeanDefinition(beanName, serviceBeanDefinition); if (logger.isInfoEnabled()) { logger.info("The BeanDefinition[" + serviceBeanDefinition + "] of ServiceBean has been registered with name : " + beanName); } } else { if (logger.isWarnEnabled()) { logger.warn("The Duplicated BeanDefinition[" + serviceBeanDefinition + "] of ServiceBean[ bean name : " + beanName + "] was be found , Did @DubboComponentScan scan to same package in many times?" ); } } }
这段代码主要逻辑:
获取到服务实现类,以及服务实现类的注解信息,以及服务实现类的接口;
为服务实现类生成一个ServiceBean的Definition,ServiceBean的名称格式为:ServiceBean:org.apache.dubbo.demo.DemoService:1.0.1:itzhai;
把ServiceBeanDefinition注册到Spring容器中。
注意,在生成ServiceBean的Definition方法中,会把服务实现类的bean赋值给ServiceBean的ref属性:
1 2 3 4 5 6 7 8 private AbstractBeanDefinition buildServiceBeanDefinition (Annotation serviceAnnotation, AnnotationAttributes serviceAnnotationAttributes, Class<?> interfaceClass, String annotatedServiceBeanName) { ... addPropertyReference(builder, "ref" , annotatedServiceBeanName); ... }
基于ServiceBeanDefinition生成的bean结构如下图所示:
其中SpringBean即为服务实现类的Bean,ServiceBean为Dubbo服务的Bean,具体的服务导出逻辑就封装在ServiceBean中。
在Spring启动完成之后,会触发每个ServiceBean的onApplicationEvent方法,此方法会调用ServiceBean的export()方法,执行服务导出。
接下来就详细看看服务导出的实现逻辑。
3. ServiceBean服务导出流程
ServiceBean.export()方法如下:
1 2 3 4 5 6 @Override public void export () { super .export(); publishExportEvent(); }
继续跟踪super.export()方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 public synchronized void export () { checkAndUpdateSubConfigs(); if (!shouldExport()) { return ; } if (shouldDelay()) { DELAY_EXPORT_EXECUTOR.schedule(this ::doExport, getDelay(), TimeUnit.MILLISECONDS); } else { doExport(); } }
这里有两个核心方法调用:
checkAndUpdateSubConfigs():读取配置;
doExport():执行服务导出;
接下来详细可靠这两个方法的逻辑。
3.1 读取配置
checkAndUpdateSubConfigs()方法的详情如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 public void checkAndUpdateSubConfigs () { completeCompoundConfigs(); startConfigCenter(); checkDefault(); checkProtocol(); checkApplication(); if (!isOnlyInJvm()) { checkRegistry(); } this .refresh(); checkMetadataReport(); if (StringUtils.isEmpty(interfaceName)) { throw new IllegalStateException("<dubbo:service interface=\"\" /> interface not allow null!" ); } if (ref instanceof GenericService) { interfaceClass = GenericService.class; if (StringUtils.isEmpty(generic)) { generic = Boolean.TRUE.toString(); } } else { try { interfaceClass = Class.forName(interfaceName, true , Thread.currentThread() .getContextClassLoader()); } catch (ClassNotFoundException e) { throw new IllegalStateException(e.getMessage(), e); } checkInterfaceAndMethods(interfaceClass, methods); checkRef(); generic = Boolean.FALSE.toString(); } ... checkStubAndLocal(interfaceClass); checkMock(interfaceClass); }
主要逻辑如下:
completeCompoundConfigs()
:
补全ServiceConfig中的属性配置;
针对ServiceConfig中值为空的属性,会从ProviderConfig、ModuleConfig、ApplicationConfig中获取;
其中ProviderConfig、ModuleConfig、ApplicationConfig这些配置类Bean,在上面已经解析生成;
startConfigCenter()
:
从配置中心获取配置,包括应用配置和全局配置,保存到Environment的externalConfigurationMap和appExternalConfigurationMap中。这一步中,如果配置了ConfigCenter,则会会去连接配置中心,将当前应用的配置和全局配置都拉取到本地。
然后使用配置中心的配置覆盖刷新所有的Config类的属性(除了ServiceConfig);
配置优先级关系图如下:
详细的代码可以自己进一步阅读;
this.refresh()
:刷新ServiceConfig,在这一步,已经把远程配置中心的配置数据拉取下来,此时会对使用这些配置数据刷新本地的ServiceConfig从注解解析出来的配置。
下面看看重要的服务导出方法。
3.2 执行服务导出
ServiceConfig#doExport
方法如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 protected synchronized void doExport () { if (unexported) { throw new IllegalStateException("The service " + interfaceClass.getName() + " has already unexported!" ); } if (exported) { return ; } exported = true ; if (StringUtils.isEmpty(path)) { path = interfaceName; } doExportUrls(); }
继续查看ServiceConfig#doExportUrls
:
1 2 3 4 5 6 7 8 9 10 11 12 private void doExportUrls () { List<URL> registryURLs = loadRegistries(true ); for (ProtocolConfig protocolConfig : protocols) { String pathKey = URL.buildKey(getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), group, version); ProviderModel providerModel = new ProviderModel(pathKey, ref, interfaceClass); ApplicationModel.initProviderModel(pathKey, providerModel); doExportUrlsFor1Protocol(protocolConfig, registryURLs); } }
重点的方法调用:ServiceConfig#doExportUrlsFor1Protocol
,继续查看该方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 private void doExportUrlsFor1Protocol (ProtocolConfig protocolConfig, List<URL> registryURLs) { String name = protocolConfig.getName(); if (StringUtils.isEmpty(name)) { name = DUBBO; } Map<String, String> map = new HashMap<String, String>(); map.put(SIDE_KEY, PROVIDER_SIDE); appendRuntimeParameters(map); appendParameters(map, metrics); appendParameters(map, application); ... if (CollectionUtils.isNotEmpty(methods)) {...} if (ProtocolUtils.isGeneric(generic)) {...} else {...} if (!ConfigUtils.isEmpty(token)) {...} String host = this .findConfigedHosts(protocolConfig, registryURLs, map); Integer port = this .findConfigedPorts(protocolConfig, name, map); URL url = new URL(name, host, port, getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), map); if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class) .hasExtension(url.getProtocol())) { url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class) .getExtension(url.getProtocol()).getConfigurator(url).configure(url); } String scope = url.getParameter(SCOPE_KEY); if (!SCOPE_NONE.equalsIgnoreCase(scope)) { if (!SCOPE_REMOTE.equalsIgnoreCase(scope)) { exportLocal(url); } if (!SCOPE_LOCAL.equalsIgnoreCase(scope)) { ... } } this .urls.add(url); }
doExportUrlsFor1Protocol这个方法比较长,主要逻辑:
通过map收集服务url的参数;
解析单个方法的配置;
token机制参数处理;
构造要导出的服务的最终URL;
根据scope参数执行具体的服务导出逻辑;
根据不同的scope取值,走不同的逻辑:
我们先来看看远程导出逻辑。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 if (CollectionUtils.isNotEmpty(registryURLs)) { for (URL registryURL : registryURLs) { if (LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) { continue ; } url = url.addParameterIfAbsent(DYNAMIC_KEY, registryURL.getParameter(DYNAMIC_KEY)); URL monitorUrl = loadMonitor(registryURL); if (monitorUrl != null ) { url = url.addParameterAndEncoded(MONITOR_KEY, monitorUrl.toFullString()); } if (logger.isInfoEnabled()) { if (url.getParameter(REGISTER_KEY, true )) { logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL); } else { logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url); } } String proxy = url.getParameter(PROXY_KEY); if (StringUtils.isNotEmpty(proxy)) { registryURL = registryURL.addParameter(PROXY_KEY, proxy); } Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString())); DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this ); Exporter<?> exporter = protocol.export(wrapperInvoker); exporters.add(exporter); } } else { if (logger.isInfoEnabled()) { logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url); } Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, url); DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this ); Exporter<?> exporter = protocol.export(wrapperInvoker); exporters.add(exporter); } MetadataReportService metadataReportService = null ; if ((metadataReportService = getMetadataReportService()) != null ) { metadataReportService.publishProvider(url); }
如果配置了注册中心,首先会把服务URL注册到所有的注册中心,在这之前会通过ProxyFactory生成一个服务的代理类Invoker:
1 Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));
其中registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString())
会在注册中心的地址后面拼接上服务的URL地址,拼接的结果如下:
registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService
?application=dubbo-provider-app&dubbo=2.0.2&export
=dubbo://192.168.0.106:20800/com.itzhai.dubbo.demo.DubboTestService
?anyhost=true&application=dubbo-provider-app&bean.name=ServiceBean:com.itzhai.dubbo.demo.DubboTestService:1.0.1:itzhai&bind.ip=192.168.0.106&bind.port=20800&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&group=itzhai&interface=com.itzhai.dubbo.demo.DubboTestService&logger=log4j&methods=playGame&pid=11035&release=2.7.0&revision=1.0.1&side=provider×tamp=1668952315016&version=1.0.1&logger=log4j&pid=11035®istry=zookeeper&release=2.7.0&timeout=3000×tamp=1668952313811
通过ProxyFactory创建如下代理类:
然后再把代理类包装成DelegateProviderMetaDataInvoker:
1 DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this )
接着执行Exporter<?> exporter = protocol.export(wrapperInvoker)
进行服务导出。
这里基于以上的url和Dubbo SPI机制,可以推断用到的是RegistryProtocol#exort
方法,代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 @Override public <T> Exporter<T> export (final Invoker<T> originInvoker) throws RpcException { URL registryUrl = getRegistryUrl(originInvoker); URL providerUrl = getProviderUrl(originInvoker); final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl); final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker); overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener); providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener); final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl); final Registry registry = getRegistry(originInvoker); final URL registeredProviderUrl = getRegisteredProviderUrl(providerUrl, registryUrl); ProviderInvokerWrapper<T> providerInvokerWrapper = ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registeredProviderUrl); boolean register = providerUrl.getParameter(REGISTER_KEY, true ); if (register) { register(registryUrl, registeredProviderUrl); providerInvokerWrapper.setReg(true ); } registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener); exporter.setRegisterUrl(registeredProviderUrl); exporter.setSubscribeUrl(overrideSubscribeUrl); return new DestroyableExporter<>(exporter); }
这段代码主要处理以下逻辑:
getRegistryUrl(originInvoker)
:得到注册中心的url,这一步会把传进来的registry://xxx?xxx=xxx®istry=zookeeper
转换为:zookeeper://xxx?xxx=xxx
getProviderUrl(originInvoker)
:得到服务提供者的url,dubbo://xxx
;
getRegistry(originInvoker)
:得到注册中心;
exporter = doLocalExport(originInvoker, providerUrl)
:执行服务导出;
register(registryUrl, registeredProviderUrl)
:把服务的URL注册到注册中心;
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener)
:监听zookeeper节点;
registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString())生成的URL为:
我们先来重点看看RegistryProtocol#doLocalExport
这个方法,这个方法是执行本地导出的。
3.2.1 服务本地导出
代码如下:
1 2 3 4 5 6 7 8 private <T> ExporterChangeableWrapper<T> doLocalExport (final Invoker<T> originInvoker, URL providerUrl) { String key = getCacheKey(originInvoker); return (ExporterChangeableWrapper<T>) bounds.computeIfAbsent(key, s -> { Invoker<?> invokerDelegate = new InvokerDelegate<>(originInvoker, providerUrl); return new ExporterChangeableWrapper<>((Exporter<T>) protocol.export(invokerDelegate), originInvoker); }); }
这里最关键的导出方法是:protocol.export(invokerDelegate)
,这里用到了Dubbo SPI加载具体实现,基于providerUrl,加载到的为DubboProtocol类,最终会调用到以下几个方法:
1、Protocol包装类:org.apache.dubbo.rpc.protocol.ProtocolFilterWrapper#export:
1 2 3 4 5 6 7 @Override public <T> Exporter<T> export (Invoker<T> invoker) throws RpcException { if (REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) { return protocol.export(invoker); } return protocol.export(buildInvokerChain(invoker, SERVICE_FILTER_KEY, CommonConstants.PROVIDER)); }
这里主要用于添加dubbo protocol对应的invoker的过滤器链,自己实现的Filter扩展类就是在这里加载的。
执行完之后,生成的对象如下:
2、Protocol包装类:org.apache.dubbo.rpc.protocol.ProtocolListenerWrapper#export:
1 2 3 4 5 6 7 8 9 10 @Override public <T> Exporter<T> export (Invoker<T> invoker) throws RpcException { if (REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) { return protocol.export(invoker); } return new ListenerExporterWrapper<T>(protocol.export(invoker), Collections.unmodifiableList(ExtensionLoader.getExtensionLoader(ExporterListener.class) .getActivateExtension(invoker.getUrl(), EXPORTER_LISTENER_KEY))); }
这里如果是dubbo://开头的url,则会包装ListenerExporterWrapper,给dubbo protocol对应的invoker添加监听器,用于处理导出结果,这里可以进行扩展。
3、Dubbo SPI调用到的Protocol类:org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol#export:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 @Override public <T> Exporter<T> export (Invoker<T> invoker) throws RpcException { URL url = invoker.getUrl(); String key = serviceKey(url); DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap); exporterMap.put(key, exporter); Boolean isStubSupportEvent = url.getParameter(STUB_EVENT_KEY, DEFAULT_STUB_EVENT); Boolean isCallbackservice = url.getParameter(IS_CALLBACK_SERVICE, false ); if (isStubSupportEvent && !isCallbackservice) { String stubServiceMethods = url.getParameter(STUB_EVENT_METHODS_KEY); if (stubServiceMethods == null || stubServiceMethods.length() == 0 ) { if (logger.isWarnEnabled()) { logger.warn(new IllegalStateException("consumer [" + url.getParameter(INTERFACE_KEY) + "], has set stubproxy support event ,but no stub methods founded." )); } } else { stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods); } } openServer(url); optimizeSerialization(url); return exporter; }
最终会创建一个DubboExporter对象,并放入到exporterMap中,后续执行服务的时候,会从这个map中检索到需要执行的服务,如下图所示:
执行完export方法之后,会返回到org.apache.dubbo.registry.integration.RegistryProtocol#doLocalExport
方法,这里会包装多一层ExporterChangeableWrapper
:
而执行完org.apache.dubbo.registry.integration.RegistryProtocol#doLocalExport
之后,会返回到org.apache.dubbo.registry.integration.RegistryProtocol#export
这个方法,最后会多包装一层DestroyableExporter
:
最里面的代理类中包括了引用的服务实现类ZDubboTestServiceImpl,当通过Dubbo调用服务的时候,会从exporterMap中找到对应的DubboExporter,最终就会调用这个服务实现类的方法。
接下来的DubboProtocol#openServer
就是构建传输层链路与建立连接的了,下面详细看看。
构建传输层链路与启动服务
在Dubbo中,数据处理分为了数据交互层和数据传输层:
接下来看看这个连接是怎么建立的,跟踪DubboProtocol#openServer
方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 private void openServer (URL url) { String key = url.getAddress(); boolean isServer = url.getParameter(IS_SERVER_KEY, true ); if (isServer) { ExchangeServer server = serverMap.get(key); if (server == null ) { synchronized (this ) { server = serverMap.get(key); if (server == null ) { serverMap.put(key, createServer(url)); } } } else { server.reset(url); } } }
可以发现,这里会为每个 key(IP:port)启动一个服务:
继续跟踪,发现最终会调用到DubboProtocol#createServer
方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 private ExchangeServer createServer (URL url) { url = URLBuilder.from(url) .addParameterIfAbsent(CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString()) .addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT)) .addParameter(CODEC_KEY, DubboCodec.NAME) .build(); String str = url.getParameter(SERVER_KEY, DEFAULT_REMOTING_SERVER); if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) { throw new RpcException("Unsupported server type: " + str + ", url: " + url); } ExchangeServer server; try { server = Exchangers.bind(url, requestHandler); } catch (RemotingException e) { throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e); } str = url.getParameter(CLIENT_KEY); if (str != null && str.length() > 0 ) { Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(); if (!supportedTypes.contains(str)) { throw new RpcException("Unsupported client type: " + str); } } return server; }
这里最关键的是调用数据交换层的bind方法:
1 server = Exchangers.bind(url, requestHandler);
继续跟踪Exchangers#bind
方法:
1 2 3 4 5 6 7 8 9 10 11 public static ExchangeServer bind (URL url, ExchangeHandler handler) throws RemotingException { if (url == null ) { throw new IllegalArgumentException("url == null" ); } if (handler == null ) { throw new IllegalArgumentException("handler == null" ); } url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange" ); return getExchanger(url).bind(url, handler); }
最终会调用getExchanger得到一个Exchanger实现去启动服务器,这里基于SPI扩展机制,会获取到HeaderExchanger:
1 2 3 4 5 6 7 8 public static Exchanger getExchanger (URL url) { String type = url.getParameter(Constants.EXCHANGER_KEY, Constants.DEFAULT_EXCHANGER); return getExchanger(type); } public static Exchanger getExchanger (String type) { return ExtensionLoader.getExtensionLoader(Exchanger.class).getExtension(type); }
接着会调用HeaderExchanger的bind方法
去启动服务器,继续跟踪此方法:
1 2 3 4 @Override public ExchangeServer bind (URL url, ExchangeHandler handler) throws RemotingException { return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler)))); }
这里会把传进来的handler(ExchangeHandlerAdapter)包装两层:HeaderExchangeHandler,DecodeHandler,然后使用Transporters.bind来启动服务器,最后再包装成HeaderExchangeServer:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 public static Server bind (URL url, ChannelHandler... handlers) throws RemotingException { if (url == null ) { throw new IllegalArgumentException("url == null" ); } if (handlers == null || handlers.length == 0 ) { throw new IllegalArgumentException("handlers == null" ); } ChannelHandler handler; if (handlers.length == 1 ) { handler = handlers[0 ]; } else { handler = new ChannelHandlerDispatcher(handlers); } return getTransporter().bind(url, handler); } public static Transporter getTransporter () { return ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension(); }
最终基于Dubbo SPI机制,获取到传输层的实现类去connect,默认的传输层实现类为:NettyTransporter:
1 2 3 4 @Override public Server bind (URL url, ChannelHandler listener) throws RemotingException { return new NettyServer(url, listener); }
继续调用new NettyServer:
1 2 3 4 5 public NettyServer (URL url, ChannelHandler handler) throws RemotingException { super (url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME))); }
这里会调用wrapChannelHandler继续把handler包装多几层:
1 2 3 4 5 6 7 protected ChannelHandler wrapInternal (ChannelHandler handler, URL url) { return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class) .getAdaptiveExtension().dispatch(handler, url))); }
最终包装成如下几层:
MultiMessageHandler-->HeartbeatHandler-->AllChannelHandler
最终会调用到NettyServer的doOpen()方法
创建连接:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 @Override protected void doOpen () throws Throwable { bootstrap = new ServerBootstrap(); bossGroup = new NioEventLoopGroup(1 , new DefaultThreadFactory("NettyServerBoss" , true )); workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS), new DefaultThreadFactory("NettyServerWorker" , true )); final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this ); channels = nettyServerHandler.getChannels(); bootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE) .childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE) .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) .childHandler(new ChannelInitializer<NioSocketChannel>() { @Override protected void initChannel (NioSocketChannel ch) throws Exception { int idleTimeout = UrlUtils.getIdleTimeout(getUrl()); NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this ); ch.pipeline() .addLast("decoder" , adapter.getDecoder()) .addLast("encoder" , adapter.getEncoder()) .addLast("server-idle-handler" , new IdleStateHandler(0 , 0 , idleTimeout, MILLISECONDS)) .addLast("handler" , nettyServerHandler); } }); ChannelFuture channelFuture = bootstrap.bind(getBindAddress()); channelFuture.syncUninterruptibly(); channel = channelFuture.channel(); }
这里就是我们熟悉的Netty服务器开启的代码了。至此,服务导出的主线流程执行完毕。
传输层构建的链路如下:
可以发现,基于Dubbo SPI机制,数据传输层可以灵活切换到Netty、Mina等不同的网络框架。
3.2.2 服务导出到注册中心
RegistryProtocol.exort()
方法在执行完本地导出之后,会继续把服务URL注册到注册中心:
1 register(registryUrl, registeredProviderUrl);
继续调用以下代码:
1 2 3 4 public void register (URL registryUrl, URL registeredProviderUrl) { Registry registry = registryFactory.getRegistry(registryUrl); registry.register(registeredProviderUrl); }
这里底层会基于SPI的IoC,创建具体的注册中心,如ZookeeperRegistry。
最后通过register方法,把提供者的URL注册到注册中心。
3.3 服务导出相关URL订阅
最后,我们梳理下服务导出需要订阅到的URL。
相关代码在RegistryProtocol#exort
中,如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl);final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener); providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener); ... final Registry registry = getRegistry(originInvoker);final URL registeredProviderUrl = getRegisteredProviderUrl(providerUrl, registryUrl);ProviderInvokerWrapper<T> providerInvokerWrapper = ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registeredProviderUrl); ... registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener); ...
主要逻辑:
先生成老版本的动态配置监听URL:overrideSubscribeUrl;
创建OverrideListener用于处理监听到的overrideSubscribeUrl变更事件;
执行overrideUrlWithConfig(),处理以下任务:
通过 ProviderConfigurationListener 监听应用级别的动态配置,同时从配置中心获取动态配置,重写providerUrl;
通过 ServiceConfigurationListener 监听服务级别的动态配置,同时从配置中心获取动态配置,重写providerUrl;
执行registry.subscribe()监听旧版本的动态配置。
3.3.1 动态配置变更监听处理
监听到动态配置变更之后,最终会调用到这个方法:
org.apache.dubbo.registry.integration.RegistryProtocol.OverrideListener#doOverrideIfNecessary
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 public synchronized void doOverrideIfNecessary () { final Invoker<?> invoker; if (originInvoker instanceof InvokerDelegate) { invoker = ((InvokerDelegate<?>) originInvoker).getInvoker(); } else { invoker = originInvoker; } URL originUrl = RegistryProtocol.this .getProviderUrl(invoker); String key = getCacheKey(originInvoker); ExporterChangeableWrapper<?> exporter = bounds.get(key); if (exporter == null ) { logger.warn(new IllegalStateException("error state, exporter should not be null" )); return ; } URL currentUrl = exporter.getInvoker().getUrl(); URL newUrl = getConfigedInvokerUrl(configurators, originUrl); newUrl = getConfigedInvokerUrl(providerConfigurationListener.getConfigurators(), newUrl); newUrl = getConfigedInvokerUrl(serviceConfigurationListeners.get(originUrl.getServiceKey()) .getConfigurators(), newUrl); if (!currentUrl.equals(newUrl)) { RegistryProtocol.this .reExport(originInvoker, newUrl); logger.info("exported provider url changed, origin url: " + originUrl + ", old export url: " + currentUrl + ", new export url: " + newUrl); } }
最终,如果判断到providerUrl有变更,则执行重新导出方法reExport():
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 public <T> void reExport (final Invoker<T> originInvoker, URL newInvokerUrl) { ExporterChangeableWrapper exporter = doChangeLocalExport(originInvoker, newInvokerUrl); URL registryUrl = getRegistryUrl(originInvoker); final URL registeredProviderUrl = getRegisteredProviderUrl(newInvokerUrl, registryUrl); ProviderInvokerWrapper<T> providerInvokerWrapper = ProviderConsumerRegTable.getProviderWrapper(registeredProviderUrl, originInvoker); ProviderInvokerWrapper<T> newProviderInvokerWrapper = ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registeredProviderUrl); if (providerInvokerWrapper.isReg() && !registeredProviderUrl.equals(providerInvokerWrapper.getProviderUrl())) { unregister(registryUrl, providerInvokerWrapper.getProviderUrl()); register(registryUrl, registeredProviderUrl); newProviderInvokerWrapper.setReg(true ); } exporter.setRegisterUrl(registeredProviderUrl); }
这里代码不继续跟进了,主要逻辑:
订阅的URL如下所示:
至此,服务导出主线逻辑梳理完成。
4. 服务导出总结
服务导出主要做的事情:
解析配置生成Dubbo的Config配置类;
扫描Dubbo服务@Service注解,生成ServiceBean;
执行ServiceBean的export方法执行服务导出
先获取到各种配置,包括配置中心的配置,最后用这些配置覆盖刷新ServiceBean的配置;
基于以上配置信息,构造导出服务的URL;
执行本地导出,主要是用到Protocol的export方法,基于Dubbo的SPI机制,以及URL的参数,调用各种扩展去封装Invoker,Invoker最内部是一个代理类,包含了实际服务的引用;最后为不同的端口分别启动Netty服务,构造传输层的链路;
服务导出到注册中心,即把Invoker对应的URL注册到注册中心;
服务导出相关URL订阅,当感知到动态配置变更的时候,更新providerURL,最终执行RegistryProtocol的reExport方法。