在阅读Dubbo源码之前,一定要先整明白Dubbo的SPI机制是什么,能够了解是怎么实现的就最好了。为此,Java架构杂谈输出了一篇Dubbo SPI机制的原理和源码解读文章,感兴趣的朋友可以先阅读了解。在Java架构杂谈
公众号发送SPI
获取文章链接。
本文会使用到以下的例子辅助解读Dubbo源码:
假设有如下系统:
消费者:dubbo-consumer-app
调用的接口:com.itzhai.dubbo.demo.DubboTestService.playGame
提供者:dubbo-provider-app
如下图所示:
1. 识别@Reference注入点
首先,应用会注册一个ReferenceAnnotationBeanPostProcessor
用于处理@Reference
注解。
1 2 3 4 public class ReferenceAnnotationBeanPostProcessor extends AnnotationInjectedBeanPostProcessor implements ApplicationContextAware , ApplicationListener { ... }
该类继承AnnotationInjectedBeanPostProcessor
,实现了自定义注解注入,这里的自定义注解为Dubbo的@Reference注解,大家也可以基于该抽象类实现自己的注入注解。
在AnnotationInjectedBeanPostProcessor
中,会寻找所有被@Reference
注解标注的属性或者方法,作为注入点:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 @Override public PropertyValues postProcessPropertyValues ( PropertyValues pvs, PropertyDescriptor[] pds, Object bean, String beanName) throws BeanCreationException { InjectionMetadata metadata = findInjectionMetadata(beanName, bean.getClass(), pvs); try { metadata.inject(bean, beanName, pvs); } catch (BeanCreationException ex) { throw ex; } catch (Throwable ex) { throw new BeanCreationException (beanName, "Injection of @" + getAnnotationType().getSimpleName() + " dependencies is failed" , ex); } return pvs; }
findInjectionMetadata方法会找到所有需要注入的字段(AnnotatedFieldElement)和方法(AnnotatedMethodElement),包装成AnnotatedInjectionMetadata返回。
最终调用metadata.inject(bean, beanName, pvs)
进行依赖注入,该方法根据FieldElement的具体子类调用其inject方法,来完成属性或者方法的注入,这里就是Spring框架的依赖注入代码了:
1 2 3 4 5 6 7 8 9 10 11 12 13 public void inject (Object target, String beanName, PropertyValues pvs) throws Throwable { Collection<InjectedElement> elementsToIterate = (this .checkedElements != null ? this .checkedElements : this .injectedElements); if (!elementsToIterate.isEmpty()) { boolean debug = logger.isDebugEnabled(); for (InjectedElement element : elementsToIterate) { if (debug) { logger.debug("Processing injected element of bean '" + beanName + "': " + element); } element.inject(target, beanName, pvs); } } }
而dubbo继承了Spring的InjectionMetadata.InjectedElement
,我们挑一个AnnotatedFieldElement
来看其具体实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 @Override protected void inject (Object bean, String beanName, PropertyValues pvs) throws Throwable { Class<?> injectedType = field.getType(); Object injectedObject = getInjectedObject(attributes, bean, beanName, injectedType, this ); ReflectionUtils.makeAccessible(field); field.set(bean, injectedObject); }
关键是getInjectedObject
方法,该方法获取到了需要依赖注入的值,最终通过反射注入属性。
2. 生成服务引入的对象
下面看看getInjectedObject
是怎么获取到注入的属性的值的,该方法最终会调用到org.apache.dubbo.config.spring.beans.factory.annotation.ReferenceAnnotationBeanPostProcessor#doGetInjectedBean这个方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 @Override protected Object doGetInjectedBean (AnnotationAttributes attributes, Object bean, String beanName, Class<?> injectedType, InjectionMetadata.InjectedElement injectedElement) throws Exception { String referencedBeanName = buildReferencedBeanName(attributes, injectedType); String referenceBeanName = getReferenceBeanName(attributes, injectedType); ReferenceBean referenceBean = buildReferenceBeanIfAbsent(referenceBeanName, attributes, injectedType); registerReferenceBean(referencedBeanName, referenceBean, attributes, injectedType); cacheInjectedReferenceBean(referenceBean, injectedElement); return getOrCreateProxy(referencedBeanName, referenceBeanName, referenceBean, injectedType); }
接着看getOrCreateProxy方法:
1 2 3 4 5 6 7 8 private Object getOrCreateProxy (String referencedBeanName, String referenceBeanName, ReferenceBean referenceBean, Class<?> serviceInterfaceType) { if (existsServiceBean(referencedBeanName)) { return newProxyInstance(getClassLoader(), new Class []{serviceInterfaceType}, wrapInvocationHandler(referenceBeanName, referenceBean)); } else { return referenceBean.get(); } }
最终都会调用到referenceBean.get()方法获取实例,ReferenceBean是ReferenceConfig的子类,最终会调用到ReferenceConfig的init()方法进行初始化,此方法最终会调用org.apache.dubbo.config.ReferenceConfig#createProxy
方法来创建一个代理对象赋值给ReferenceBean的ref属性,该方法处理的逻辑分支比较多,我们这里假设只配置了一个注册中心的情况下,代码会走到这里来:
1 2 3 4 if (urls.size() == 1 ) { invoker = REF_PROTOCOL.refer(interfaceClass, urls.get(0 )); } else { ...
urls.get(0)为注册中心的url,最终REF_PROTOCOL.refer会调用到以下几个方法:
1、Protocol包装类:org.apache.dubbo.rpc.protocol.ProtocolFilterWrapper#refer:
1 2 3 4 5 6 7 @Override public <T> Invoker<T> refer (Class<T> type, URL url) throws RpcException { if (REGISTRY_PROTOCOL.equals(url.getProtocol())) { return protocol.refer(type, url); } return buildInvokerChain(protocol.refer(type, url), REFERENCE_FILTER_KEY, CommonConstants.CONSUMER); }
这里主要用于添加dubbo protocol对应的invoker的过滤器链,自己实现的Filter扩展类就是在这里加载的。
2、Protocol包装类:org.apache.dubbo.rpc.protocol.ProtocolListenerWrapper#refer:
1 2 3 4 5 6 7 8 9 10 @Override public <T> Invoker<T> refer (Class<T> type, URL url) throws RpcException { if (REGISTRY_PROTOCOL.equals(url.getProtocol())) { return protocol.refer(type, url); } return new ListenerInvokerWrapper <T>(protocol.refer(type, url), Collections.unmodifiableList( ExtensionLoader.getExtensionLoader(InvokerListener.class) .getActivateExtension(url, INVOKER_LISTENER_KEY))); }
这里如果是dubbo://开头的url,则会包装ListenerInvokerWrapper,给dubbo protocol对应的invoker添加监听器,用于处理结果,这里可以进行扩展。
3、Dubbo SPI调用到的Protocol类:org.apache.dubbo.registry.integration.RegistryProtocol#refer:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 @Override @SuppressWarnings("unchecked") public <T> Invoker<T> refer (Class<T> type, URL url) throws RpcException { url = URLBuilder.from(url) .setProtocol(url.getParameter(REGISTRY_KEY, DEFAULT_REGISTRY)) .removeParameter(REGISTRY_KEY) .build(); Registry registry = registryFactory.getRegistry(url); if (RegistryService.class.equals(type)) { return proxyFactory.getInvoker((T) registry, type, url); } Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(REFER_KEY)); String group = qs.get(GROUP_KEY); if (group != null && group.length() > 0 ) { if ((COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*" .equals(group)) { return doRefer(getMergeableCluster(), registry, type, url); } } return doRefer(cluster, registry, type, url); }
2.1 创建动态服务目录
继续调用RegistryProtocol#doRefer
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 private <T> Invoker<T> doRefer (Cluster cluster, Registry registry, Class<T> type, URL url) { RegistryDirectory<T> directory = new RegistryDirectory <T>(type, url); directory.setRegistry(registry); directory.setProtocol(protocol); Map<String, String> parameters = new HashMap <String, String>(directory.getUrl().getParameters()); URL subscribeUrl = new URL (CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0 , type.getName(), parameters); if (!ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(REGISTER_KEY, true )) { directory.setRegisteredConsumerUrl(getRegisteredConsumerUrl(subscribeUrl, url)); registry.register(directory.getRegisteredConsumerUrl()); } directory.buildRouterChain(subscribeUrl); directory.subscribe(subscribeUrl.addParameter(CATEGORY_KEY, PROVIDERS_CATEGORY + "," + CONFIGURATORS_CATEGORY + "," + ROUTERS_CATEGORY)); Invoker invoker = cluster.join(directory); ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory); return invoker; }
其中
1 Invoker invoker = cluster.join(directory);
用到了Dubbo SPI,会依次执行MockClusterWrapper, FailoverCluster,最终生成MockClusterInvoker和FailoverClusterInvoker,代码跟踪到这里,我们可以得出以下对象结构:
2.2 构造路由链
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 private RouterChain (URL url) { List<RouterFactory> extensionFactories = ExtensionLoader.getExtensionLoader(RouterFactory.class) .getActivateExtension(url, (String[]) null ); List<Router> routers = extensionFactories.stream() .map(factory -> factory.getRouter(url)) .collect(Collectors.toList()); initWithRouters(routers); }
其中:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 @Activate(order = 200) public class AppRouterFactory implements RouterFactory { public static final String NAME = "app" ; private volatile Router router; @Override public Router getRouter (URL url) { if (router != null ) { return router; } synchronized (this ) { if (router == null ) { router = createRouter(url); } } return router; } private Router createRouter (URL url) { return new AppRouter (DynamicConfiguration.getDynamicConfiguration(), url); } }
最终AppRouter的构造函数中,会绑定一个监听器,监听路径:/dubbo/config/dubbo/dubbo-consumer-app.condition-router
同时会主动从配置中心获取当前服务对应的路径的路由配置,根据配置生成AppRouter,具体代码大家可以自行详细阅读。
与AppRouterFactory类似,最终ServiceRouter构造函数会绑定一个监听器,监听路径:/dubbo/config/dubbo/com.itzhai.dubbo.demo.DubboTestService:1.0.1:itzhai.condition-router
同时会主动从配置中心获取当前服务对应的路径的路由配置,根据配置生成ServiceRouter,具体代码大家可以自行详细阅读。
TagRouterFactory.getRouter:
TagRouter与上面两个Router有所不同,因为TagRouter监听的路径为:/dubbo/config/dubbo/dubbo-provider-app.tag-router
,是需要读取提供者配置的tag-router的,所以只有在获取到提供者的应用名之后,才可以读取配置,在TagRouterFactory.getRouter()方法中,并不会监听和拉取配置,只是先初始化好一个空的TagRouter,等到服务提供者的服务url从配置中心拉取到之后,获取到提供者的名称之后,才会拉取tag-router配置,并且构造TagRouter。TODO 触发点
最终构造如下路由链:
到这里构建的对象结构如下:
2.3 监听节点
上面最核心的方法就是directory.subscribe(),下面详细看看这个方法(RegistryDirectory#subscribe
):
1 2 3 4 5 6 7 8 9 public void subscribe (URL url) { setConsumerUrl(url); CONSUMER_CONFIGURATION_LISTENER.addNotifyListener(this ); serviceConfigurationListener = new ReferenceConfigurationListener (this , url); registry.subscribe(url, this ); }
这里的registry为:ZookeeperRegistry extends FailbackRegistry,而RegistryDirectory也是一个NotifyListener:
1 public class RegistryDirectory <T> extends AbstractDirectory <T> implements NotifyListener
继续跟踪 registry.subscribe(url, this),最终会调用:org.apache.dubbo.registry.zookeeper.ZookeeperRegistry#doSubscribe,此方法我们先关注订阅单个服务的逻辑:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 } else { List<URL> urls = new ArrayList <>(); for (String path : toCategoriesPath(url)) { ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url); if (listeners == null ) { zkListeners.putIfAbsent(url, new ConcurrentHashMap <>()); listeners = zkListeners.get(url); } ChildListener zkListener = listeners.get(listener); if (zkListener == null ) { listeners.putIfAbsent(listener, (parentPath, currentChilds) -> ZookeeperRegistry.this .notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds)) ); zkListener = listeners.get(listener); } zkClient.create(path, false ); List<String> children = zkClient.addChildListener(path, zkListener); if (children != null ) { urls.addAll(toUrlsWithEmpty(url, path, children)); } } notify(url, listener, urls); }
以上代码主要会监听以下三个路径:
/dubbo/com.itzhai.dubbo.demo.DubboTestService/providers
/dubbo/com.itzhai.dubbo.demo.DubboTestService/configurators
/dubbo/com.itzhai.dubbo.demo.DubboTestService/routers
完整的监听节点和处理逻辑如下图:
2.4 初始化DubboInvoker
在监听的同时,会拉取监听节点的数据。
然后调用notify(url, listener, urls)处理拉取到的数据。这里我们重点观察/dubbo/com.itzhai.dubbo.demo.DubboTestService/providers
的处理流程,调用以下代码监听该节点:
1 List<String> children = zkClient.addChildListener(path, zkListener);
同时返回的children(经过URLDecode解码处理),为引入的服务的所有提供者的url:
0 = “dubbo://192.168.0.106:20810/com.itzhai.dubbo.demo.DubboTestService
?anyhost=true&application=dubbo-provider-app&bean.name=ServiceBean:com.itzhai.dubbo.demo.DubboTestService:1.0.1:itzhai&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&group=itzhai&interface=com.itzhai.dubbo.demo.DubboTestService&logger=log4j&methods=playGame&pid=40846&release=2.7.0&revision=1.0.1&side=provider×tamp=1668588840665&version=1.0.1”
1 = “dubbo://192.168.0.106:20800/com.itzhai.dubbo.demo.DubboTestService
?anyhost=true&application=dubbo-provider-app&bean.name=ServiceBean:com.itzhai.dubbo.demo.DubboTestService:1.0.1:itzhai&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&group=itzhai&interface=com.itzhai.dubbo.demo.DubboTestService&logger=log4j&methods=playGame&pid=40846&release=2.7.0&revision=1.0.1&side=provider×tamp=1668588840598&version=1.0.1”
接着会调用org.apache.dubbo.registry.support.FailbackRegistry#notify
方法,跟踪notify方法,最终会调用到这里:AbstractRegistry#notify(URL, NotifyListener, List<URL>
),关键的处理逻辑如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 Map<String, List<URL>> result = new HashMap <>(); for (URL u : urls) { if (UrlUtils.isMatch(url, u)) { String category = u.getParameter(CATEGORY_KEY, DEFAULT_CATEGORY); List<URL> categoryList = result.computeIfAbsent(category, k -> new ArrayList <>()); categoryList.add(u); } } if (result.size() == 0 ) { return ; } Map<String, List<URL>> categoryNotified = notified.computeIfAbsent(url, u -> new ConcurrentHashMap <>()); for (Map.Entry<String, List<URL>> entry : result.entrySet()) { String category = entry.getKey(); List<URL> categoryList = entry.getValue(); categoryNotified.put(category, categoryList); listener.notify(categoryList); saveProperties(url); }
这里会先把传进来的urls转换为Map:
接着遍历map进行处理,遍历的过程中,调用listener.notify(categoryList)进行处理,这里的listener为RegistryDirectory,categoryList即为两个服务提供者的URL,最终会调用到org.apache.dubbo.registry.integration.RegistryDirectory#notify
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 public synchronized void notify (List<URL> urls) { Map<String, List<URL>> categoryUrls = urls.stream() .filter(Objects::nonNull) .filter(this ::isValidCategory) .filter(this ::isNotCompatibleFor26x) .collect(Collectors.groupingBy(url -> { if (UrlUtils.isConfigurator(url)) { return CONFIGURATORS_CATEGORY; } else if (UrlUtils.isRoute(url)) { return ROUTERS_CATEGORY; } else if (UrlUtils.isProvider(url)) { return PROVIDERS_CATEGORY; } return "" ; })); List<URL> configuratorURLs = categoryUrls.getOrDefault(CONFIGURATORS_CATEGORY, Collections.emptyList()); this .configurators = Configurator.toConfigurators(configuratorURLs).orElse(this .configurators); List<URL> routerURLs = categoryUrls.getOrDefault(ROUTERS_CATEGORY, Collections.emptyList()); toRouters(routerURLs).ifPresent(this ::addRouters); List<URL> providerURLs = categoryUrls.getOrDefault(PROVIDERS_CATEGORY, Collections.emptyList()); refreshOverrideAndInvoker(providerURLs); }
这里主要是根据传入的url,获取到需要的信息进行加工,最后存储到RegistryDirectory中:
获取动态配置URL,并且生成configurators;
获取老版本的路由URL,生成Router,添加到路由链中;
获取服务提供者的URL,进一步处理,生成DubboInvoker。
以上这些结果最终都存储到RegistryDirectory中。
我们这里重点跟踪生成DubboInvoker的逻辑,当传入的urls为服务提供者的URL时,会执行以下两行代码:
1 2 3 List<URL> providerURLs = categoryUrls.getOrDefault(PROVIDERS_CATEGORY, Collections.emptyList()); refreshOverrideAndInvoker(providerURLs);
生成DubboInvoker的核心方法是refreshOverrideAndInvoker:
1 2 3 4 5 6 7 private void refreshOverrideAndInvoker (List<URL> urls) { overrideDirectoryUrl(); refreshInvoker(urls); }
最关键的就是这个refreshInvoker方法,跟踪这个方法,其中最核心的一行代码:
1 Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);
继续跟踪该方法,最终会调用到这行代码生成invoker:
1 invoker = new InvokerDelegate <>(protocol.refer(serviceType, url), url, providerUrl);
这里基于Dubbo SPI,最终调用到的是DubboProtocol.refer方法,同时,由于在dubbo-rpci-api模块中,配置了Dubbo SPI的Wrapper类:
1 2 3 filter=org.apache.dubbo.rpc.protocol.ProtocolFilterWrapper listener=org.apache.dubbo.rpc.protocol.ProtocolListenerWrapper mock=org.apache.dubbo.rpc.support.MockProtocol
所以,这里的执行顺序是:
ProtocolListenerWrapper.refer:该Wrapper类中判断到URL如果是dubbo://开通的,会给DubboProtocol对应的invoker添加监听器,包装成ListenerInvokerWrapper
类,用于处理refer结果,这里可以进行扩展,标签路由器就是在这里设置了监听器,处理完refer之后,进行初始化的 ;
ProtocolFilterWrapper.refer:这个Wrapper会给dubbo protocol的url对应的invoker添加Filter过滤器,我们给dubbo添加的自定义过滤器就是在这里集成进来的
;
DubboProtocol.refer:生成DubboInvoker的类。
其中DubboProtocol.refer会调用到父类AbstractProtocol的refer方法
:
1 2 3 4 5 6 7 @Override public <T> Invoker<T> refer (Class<T> type, URL url) throws RpcException { return new AsyncToSyncInvoker <>(protocolBindingRefer(type, url)); }
所以,最终生成DubboInvoker的就是DubboProtocol#protocolBindingRefer
这个方法了:
1 2 3 4 5 6 7 8 9 10 11 @Override public <T> Invoker<T> protocolBindingRefer (Class<T> serviceType, URL url) throws RpcException { optimizeSerialization(url); DubboInvoker<T> invoker = new DubboInvoker <T>(serviceType, url, getClients(url), invokers); invokers.add(invoker); return invoker; }
可以发现在DubboInvoker中维护了和服务提供者的socket连接。
3. 构建传输层链路与建立连接
在Dubbo中,数据处理分为了数据交互层和数据传输层:
接下来看看这个连接是怎么建立的,跟踪getClients方法,发现最终会调用到DubboProtocol#initClient
方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 private ExchangeClient initClient (URL url) { String str = url.getParameter(CLIENT_KEY, url.getParameter(SERVER_KEY, DEFAULT_REMOTING_CLIENT)); url = url.addParameter(CODEC_KEY, DubboCodec.NAME); url = url.addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT)); if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) { throw new RpcException ("Unsupported client type: " + str + "," + " supported client type is " + StringUtils.join(ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(), " " )); } ExchangeClient client; try { if (url.getParameter(LAZY_CONNECT_KEY, false )) { client = new LazyConnectExchangeClient (url, requestHandler); } else { client = Exchangers.connect(url, requestHandler); } } catch (RemotingException e) { throw new RpcException ("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e); } return client; }
最核心的一行代码为:
1 client = Exchangers.connect(url, requestHandler);
继续跟踪该方法:
1 2 3 4 5 6 7 8 9 10 public static ExchangeClient connect (URL url, ExchangeHandler handler) throws RemotingException { if (url == null ) { throw new IllegalArgumentException ("url == null" ); } if (handler == null ) { throw new IllegalArgumentException ("handler == null" ); } url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange" ); return getExchanger(url).connect(url, handler); }
最终会调用getExchanger得到一个Exchanger实现去创建连接,这里基于SPI扩展机制,会获取到HeaderExchanger:
1 2 3 4 5 6 7 8 public static Exchanger getExchanger (URL url) { String type = url.getParameter(Constants.EXCHANGER_KEY, Constants.DEFAULT_EXCHANGER); return getExchanger(type); } public static Exchanger getExchanger (String type) { return ExtensionLoader.getExtensionLoader(Exchanger.class).getExtension(type); }
接着会调用HeaderExchanger的connect方法去创建连接,继续跟踪此方法:
1 2 3 4 @Override public ExchangeClient connect (URL url, ExchangeHandler handler) throws RemotingException { return new HeaderExchangeClient (Transporters.connect(url, new DecodeHandler (new HeaderExchangeHandler (handler))), true ); }
这里会把传进来的handler(ExchangeHandlerAdapter)包装两层:HeaderExchangeHandler,DecodeHandler,然后使用Transporters.connect来创建Client连接对象:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 public static Client connect (URL url, ChannelHandler... handlers) throws RemotingException { if (url == null ) { throw new IllegalArgumentException ("url == null" ); } ChannelHandler handler; if (handlers == null || handlers.length == 0 ) { handler = new ChannelHandlerAdapter (); } else if (handlers.length == 1 ) { handler = handlers[0 ]; } else { handler = new ChannelHandlerDispatcher (handlers); } return getTransporter().connect(url, handler); } public static Transporter getTransporter () { return ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension(); }
最后再包装成HeaderExchangeClient,HeaderExchangeClient中会把Client对象包装成HeaderExchangeChannel对象,同时开启重连任务和心跳检测任务:
1 2 3 4 5 6 7 8 9 10 11 public HeaderExchangeClient (Client client, boolean startTimer) { Assert.notNull(client, "Client can't be null" ); this .client = client; this .channel = new HeaderExchangeChannel (client); if (startTimer) { URL url = client.getUrl(); startReconnectTask(url); startHeartBeatTask(url); } }
在Transporters.connect方法中最终基于Dubbo SPI机制,获取到传输层的实现类去connect,默认的传输层实现类为:NettyTransporter:
1 2 3 4 @Override public Client connect (URL url, ChannelHandler listener) throws RemotingException { return new NettyClient (url, listener); }
继续调用new NettyClient:
1 2 3 4 5 public NettyClient (final URL url, final ChannelHandler handler) throws RemotingException { super (url, wrapChannelHandler(url, handler)); }
这里会调用wrapChannelHandler继续把handler包装多几层:
1 2 3 4 5 6 7 protected ChannelHandler wrapInternal (ChannelHandler handler, URL url) { return new MultiMessageHandler (new HeartbeatHandler (ExtensionLoader.getExtensionLoader(Dispatcher.class) .getAdaptiveExtension().dispatch(handler, url))); }
最终包装成如下几层:
MultiMessageHandler–>HeartbeatHandler–>AllChannelHandler
最终会调用到NettyClient的doOpen()方法创建连接:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 @Override protected void doOpen () throws Throwable { final NettyClientHandler nettyClientHandler = new NettyClientHandler (getUrl(), this ); bootstrap = new Bootstrap (); bootstrap.group(nioEventLoopGroup) .option(ChannelOption.SO_KEEPALIVE, true ) .option(ChannelOption.TCP_NODELAY, true ) .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) .channel(NioSocketChannel.class); bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, Math.max(3000 , getConnectTimeout())); bootstrap.handler(new ChannelInitializer () { @Override protected void initChannel (Channel ch) throws Exception { int heartbeatInterval = UrlUtils.getHeartbeat(getUrl()); NettyCodecAdapter adapter = new NettyCodecAdapter (getCodec(), getUrl(), NettyClient.this ); ch.pipeline() .addLast("decoder" , adapter.getDecoder()) .addLast("encoder" , adapter.getEncoder()) .addLast("client-idle-handler" , new IdleStateHandler (heartbeatInterval, 0 , 0 , MILLISECONDS)) .addLast("handler" , nettyClientHandler); String socksProxyHost = ConfigUtils.getProperty(SOCKS_PROXY_HOST); if (socksProxyHost != null ) { int socksProxyPort = Integer.parseInt(ConfigUtils.getProperty(SOCKS_PROXY_PORT, DEFAULT_SOCKS_PROXY_PORT)); Socks5ProxyHandler socks5ProxyHandler = new Socks5ProxyHandler (new InetSocketAddress (socksProxyHost, socksProxyPort)); ch.pipeline().addFirst(socks5ProxyHandler); } } }); }
这里就是我们熟悉的Netty创建客户端连接的代码了。至此,服务引入的主线流程执行完毕。
传输层构建的链路如下:
可以发现,基于Dubbo SPI机制,数据传输层可以灵活切换到Netty、Mina等不同的网络框架。
最终主线流程构造的完整处理链路如下:
在服务调用章节,会为你详细介绍每个类的作用。在这之前,你需要了解服务引用都创建了什么样的东西。
4. 服务引入总结
服务引入流程主要做如下事情:
扫描项目识别@Reference注入点;
获取需要注入的对象:
获取到@Reference注解的信息,封装成ReferenceBean;
最终执行ReferenceBean的refer方法执行服务引入:
为引入的服务创建动态服务目录;
构造路由链,存储到动态服务目录中;
监听动态配置节点,以及服务提供者节点,并且先从注册中心拉取节点配置;
获取到服务提供者的URL之后,最终包装成DubboInvoker;
为DubboInvoker包装异步转同步类,以及过滤器链处理类;
最后为该DubboInvoker构造传输层链路,以及创建底层网络连接;
通过反射进行注入。
可以发现,Dubbo的整个服务引入流程还是挺复杂的,很多的SPI扩展机制,增加了扩展性,但是跟踪代码的时候也会更难跟踪。
整个服务引入的流程,就是构造以上的请求链路,以及创建好socket连接。