A Brief Analysis of the Dubbo Service Export Flow

Published in 2022

This article is based on Dubbo 2.6.3.

Presented by: 某人zZ

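Dubbo's core architecture

I. Preface

As is well known, Dubbo can be configured in two ways: with XML or with annotations.

What does DubboNamespaceHandler do?

Since our company mostly uses XML, this article focuses on the XML flow. Dubbo extends Spring's configuration files with its own custom parsing: when the Spring container (AbstractApplicationContext) starts, its refresh() method ends up calling the init() method of DubboNamespaceHandler, shown below:

(PS: how Spring gets to the DubboNamespaceHandler methods is not analyzed here.)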

public class DubboNamespaceHandler extends NamespaceHandlerSupport {

    static {
        Version.checkDuplicate(DubboNamespaceHandler.class);
    }

    @Override
    public void init() {
        registerBeanDefinitionParser("application", new DubboBeanDefinitionParser(ApplicationConfig.class, true));
        registerBeanDefinitionParser("module", new DubboBeanDefinitionParser(ModuleConfig.class, true));
        registerBeanDefinitionParser("registry", new DubboBeanDefinitionParser(RegistryConfig.class, true));
        registerBeanDefinitionParser("monitor", new DubboBeanDefinitionParser(MonitorConfig.class, true));
        registerBeanDefinitionParser("provider", new DubboBeanDefinitionParser(ProviderConfig.class, true));
        registerBeanDefinitionParser("consumer", new DubboBeanDefinitionParser(ConsumerConfig.class, true));
        registerBeanDefinitionParser("protocol", new DubboBeanDefinitionParser(ProtocolConfig.class, true));
        registerBeanDefinitionParser("service", new DubboBeanDefinitionParser(ServiceBean.class, true));
        registerBeanDefinitionParser("reference", new DubboBeanDefinitionParser(ReferenceBean.class, false));
        registerBeanDefinitionParser("annotation", new AnnotationBeanDefinitionParser());
    }
}

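Its main job is to map each XML element to the parser that handles it. The call chain is as follows:

Why is ServiceBean loaded?

When the Spring container starts, it scans the XML for Dubbo's custom tags such as dubbo:application, dubbo:registry and dubbo:service, and uses the parsers registered in DubboNamespaceHandler to turn each of them into the corresponding BeanDefinition.

In the end, the following BeanDefinitions are loaded:

ServiceBean implements ApplicationListener, and the export() call inside its onApplicationEvent method is the protagonist of this article.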

public class ServiceBean<T> extends ServiceConfig<T> implements InitializingBean, DisposableBean,
        ApplicationContextAware, ApplicationListener<ContextRefreshedEvent>, BeanNameAware,
        ApplicationEventPublisherAware {

    @Override
    public void onApplicationEvent(ContextRefreshedEvent event) {
        // Deferred export: runs when the Spring context has been refreshed
        if (isDelay() && !isExported() && !isUnexported()) {
            if (logger.isInfoEnabled()) {
                logger.info("The service ready on spring started. service: " + getInterface());
            }
            export();
        }
    }
}

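PS: the call chain that leads to ApplicationListener's onApplicationEvent is shown below; if you have forgotten it, step through the code along this flow:

II. The Dubbo service export flow

Diving straight into the code can get confusing, so let us first sketch the overall flow.

1. Validate the configuration and initialize

Dubbo's configuration classes

Service discovery: the setting is used for service registration and discovery, so that consumers can find providers.
Service governance: the setting is used to govern the relationships between services, or to make development and testing easier.
Performance tuning: the setting is used to tune performance; different options affect performance differently.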

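Config class: meaning

ApplicationConfig: application configuration, <dubbo:application/>. Configures the current application, whether it is a provider or a consumer.

RegistryConfig: registry configuration, <dubbo:registry/>. Configures how to connect to the registry.

ProtocolConfig: provider protocol configuration, <dubbo:protocol/>. Configures the protocol used to serve requests; the protocol is chosen by the provider and passively accepted by the consumer.

MonitorConfig: monitor configuration, <dubbo:monitor/>. Configures how to connect to the monitoring center. Optional.

ServiceConfig: provider service export configuration, <dubbo:service/>. Exports a service and defines its metadata. A service can be exported over several protocols and registered with several registries.

ProviderConfig: provider defaults, <dubbo:provider/>. Supplies the default value when a property of ProtocolConfig or ServiceConfig is not set. Optional.

ReferenceConfig: consumer service reference configuration, <dubbo:reference/>. Creates a proxy for a remote service. A reference can point to several registries.

ConsumerConfig: consumer defaults, <dubbo:consumer/>. Supplies the default value when a property of ReferenceConfig is not set. Optional.

ModuleConfig: module configuration, <dubbo:module/>. Configures the current module. Optional.

MethodConfig: method-level configuration, <dubbo:method/>. Specifies method-level settings for ServiceConfig and ReferenceConfig.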

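Class hierarchy of Dubbo's Config classes

2. Build the URLs from the configuration

(the URL for local export and the URL for remote export)

URL for local export

Why export locally? When another service in the same JVM calls this service, it does not need to go through a remote call; it can invoke the injvm service directly.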

injvm://127.0.0.1/org.apache.dubbo.samples.api.client.HelloService?anyhost=true&application=demo-provider&bind.ip=192.168.0.100&bind.port=20890&dubbo=2.0.2&generic=false&interface=org.apache.dubbo.samples.api.client.HelloService&methods=sayHello&pid=49599&side=provider&timestamp=1666425598425
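To make the relationship between the provider URL and its injvm counterpart concrete, here is a minimal sketch that rewrites one into the other with plain java.net.URI. The class LocalUrlSketch and the method toLocalUrl are hypothetical names for illustration, not Dubbo APIs; the sketch only assumes that the local URL keeps the path and parameters while replacing the protocol with injvm, the host with 127.0.0.1, and dropping the port.

```java
import java.net.URI;

class LocalUrlSketch {

    // Hypothetical helper (not a Dubbo API): keep path and query,
    // replace scheme/host and drop the port.
    static String toLocalUrl(String providerUrl) {
        URI uri = URI.create(providerUrl);
        StringBuilder local = new StringBuilder("injvm://127.0.0.1");
        local.append(uri.getRawPath());
        if (uri.getRawQuery() != null) {
            local.append('?').append(uri.getRawQuery());
        }
        return local.toString();
    }

    public static void main(String[] args) {
        System.out.println(toLocalUrl(
                "dubbo://192.168.0.100:20890/org.apache.dubbo.samples.api.client.HelloService"
                        + "?side=provider&bind.port=20890"));
    }
}
```

Note that bind.ip and bind.port survive in the parameters, which matches the sample URL above: only the address part of the URL changes.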

URLs for remote export

The registry URL

registry://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-provider&dubbo=2.0.2&export=dubbo://192.168.0.100:20890/org.apache.dubbo.samples.api.client.HelloService?anyhost=true&application=demo-provider&bind.ip=192.168.0.100&bind.port=20890&dubbo=2.0.2&generic=false&interface=org.apache.dubbo.samples.api.client.HelloService&methods=sayHello&pid=51020&side=provider&timestamp=1666426018092&group=aaa&pid=51020&registry=zookeeper&timestamp=1666426018075

The Dubbo URL

dubbo://192.168.0.100:20890/org.apache.dubbo.samples.api.client.HelloService?anyhost=true&application=demo-provider&bind.ip=192.168.0.100&bind.port=20890&dubbo=2.0.2&generic=false&interface=org.apache.dubbo.samples.api.client.HelloService&methods=sayHello&pid=49599&side=provider&timestamp=1666425598425
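The registry URL carries the whole Dubbo URL inside its export parameter. The URLs in this article are printed decoded for readability; since the provider URL itself contains ? and &, it has to be encoded before it can travel as a single parameter value (the source code later in this article uses addParameterAndEncoded for this). The sketch below shows the idea with the JDK only. ExportParamSketch, withExport and readExport are hypothetical names, not Dubbo APIs.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.net.URLEncoder;

class ExportParamSketch {

    // Hypothetical helper: append the provider URL as an encoded "export" parameter.
    static String withExport(String registryUrl, String providerUrl) {
        try {
            String separator = registryUrl.contains("?") ? "&" : "?";
            return registryUrl + separator + "export=" + URLEncoder.encode(providerUrl, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException(e);
        }
    }

    // Hypothetical helper: read the "export" parameter back and decode it.
    static String readExport(String registryUrl) {
        int queryStart = registryUrl.indexOf('?');
        if (queryStart < 0) {
            return null;
        }
        for (String pair : registryUrl.substring(queryStart + 1).split("&")) {
            if (pair.startsWith("export=")) {
                try {
                    return URLDecoder.decode(pair.substring("export=".length()), "UTF-8");
                } catch (UnsupportedEncodingException e) {
                    throw new IllegalStateException(e);
                }
            }
        }
        return null;
    }

    public static void main(String[] args) {
        String registry = "registry://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-provider";
        String provider = "dubbo://192.168.0.100:20890/org.apache.dubbo.samples.api.client.HelloService?side=provider&pid=51020";
        String full = withExport(registry, provider);
        System.out.println(full);
        System.out.println(readExport(full));
    }
}
```

Because the provider URL is encoded, its own pid and timestamp parameters do not collide with the parameters of the same name on the registry URL.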

3. Export the service locally and remotely

Local export

1. Build the URL for local export.

injvm://127.0.0.1/org.apache.dubbo.samples.api.client.HelloService?anyhost=true&application=demo-provider&bind.ip=192.168.0.100&bind.port=20890&dubbo=2.0.2&generic=false&interface=org.apache.dubbo.samples.api.client.HelloService&methods=sayHello&pid=94019&side=provider&timestamp=1666446463499

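2. Through proxyFactory.getInvoker, wrap the service to be exported in a Wrapper and build an Invoker from it. The call chain is as follows:

3. Through protocol.export(invoker), build an Exporter. The call chain is as follows:

4. Cache the exporter in exporters.

About Protocol

Protocol is a Dubbo SPI interface. Its SPI configuration file, com.alibaba.dubbo.rpc.Protocol, looks like this: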

# SPI configuration for Protocol
filter=com.alibaba.dubbo.rpc.protocol.ProtocolFilterWrapper
listener=com.alibaba.dubbo.rpc.protocol.ProtocolListenerWrapper
mock=com.alibaba.dubbo.rpc.support.MockProtocol
dubbo=com.alibaba.dubbo.rpc.protocol.dubbo.DubboProtocol
injvm=com.alibaba.dubbo.rpc.protocol.injvm.InjvmProtocol
rmi=com.alibaba.dubbo.rpc.protocol.rmi.RmiProtocol
hessian=com.alibaba.dubbo.rpc.protocol.hessian.HessianProtocol
com.alibaba.dubbo.rpc.protocol.http.HttpProtocol
com.alibaba.dubbo.rpc.protocol.webservice.WebServiceProtocol
thrift=com.alibaba.dubbo.rpc.protocol.thrift.ThriftProtocol
memcached=com.alibaba.dubbo.rpc.protocol.memcached.MemcachedProtocol
redis=com.alibaba.dubbo.rpc.protocol.redis.RedisProtocol
rest=com.alibaba.dubbo.rpc.protocol.rest.RestProtocol
registry=com.alibaba.dubbo.registry.integration.RegistryProtocol
qos=com.alibaba.dubbo.qos.protocol.QosProtocolWrapper

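Dubbo builds an adaptive Protocol class through ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension(). If the details are unclear, see the previous article, 《Dubbo的SPI简介》 (An Introduction to Dubbo SPI).

The key classes involved are:

1. QosProtocolWrapper: if the Invoker's URL uses the registry protocol, it starts the QoS server.

QoS (Quality of Service) refers to a network's ability to use various underlying technologies to provide better service for specified network traffic. It is a safeguard mechanism of the network, a technique for dealing with problems such as network latency and congestion. Dubbo offers users a similar facility for taking services online and offline in order to address such problems.

2. ProtocolFilterWrapper: if the Invoker's URL does not use the registry protocol, it assembles an invoker chain: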

EchoFilter -> ClassLoaderFilter -> GenericFilter -> ContextFilter -> TraceFilter -> TimeoutFilter -> MonitorFilter -> ExceptionFilter

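As shown in the figure:

3. ProtocolListenerWrapper: if the Invoker's URL does not use the registry protocol, it builds a ListenerExporterWrapper that holds the collection of ExporterListeners. (Dubbo has no listener by default.)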
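The invoker chain that ProtocolFilterWrapper assembles can be illustrated with a small sketch. It assumes the usual way of building such a chain: walk the filter list from the last filter to the first, wrapping the current invoker each time, so that the first filter ends up outermost. Invoker and Filter below are simplified hypothetical stand-ins, not Dubbo's interfaces, and only three of the filter names are used.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class FilterChainSketch {

    // Simplified stand-ins for illustration only
    interface Invoker { String invoke(String request); }
    interface Filter { String invoke(Invoker next, String request); }

    // Wrap from the last filter to the first so the first filter runs first.
    static Invoker buildChain(Invoker target, List<Filter> filters) {
        Invoker last = target;
        for (int i = filters.size() - 1; i >= 0; i--) {
            final Filter filter = filters.get(i);
            final Invoker next = last;
            last = request -> filter.invoke(next, request);
        }
        return last;
    }

    // A filter that records its name and passes the call on.
    static Filter named(String name, List<String> trace) {
        return (next, request) -> {
            trace.add(name);
            return next.invoke(request);
        };
    }

    static List<String> demo() {
        List<String> trace = new ArrayList<String>();
        Invoker service = request -> {
            trace.add("service");
            return "hello " + request;
        };
        Invoker chain = buildChain(service, Arrays.asList(
                named("EchoFilter", trace),
                named("ClassLoaderFilter", trace),
                named("ExceptionFilter", trace)));
        chain.invoke("world");
        return trace;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Building the chain back to front is what lets a caller hold a single Invoker while every request still passes through all filters in list order.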

About Filter

Filter is also a Dubbo SPI interface.

Its SPI configuration file, com.alibaba.dubbo.rpc.Filter, looks like this:

# SPI configuration for Filter
cache=com.alibaba.dubbo.cache.filter.CacheFilter
validation=com.alibaba.dubbo.validation.filter.ValidationFilter
echo=com.alibaba.dubbo.rpc.filter.EchoFilter
generic=com.alibaba.dubbo.rpc.filter.GenericFilter
genericimpl=com.alibaba.dubbo.rpc.filter.GenericImplFilter
token=com.alibaba.dubbo.rpc.filter.TokenFilter
accesslog=com.alibaba.dubbo.rpc.filter.AccessLogFilter
activelimit=com.alibaba.dubbo.rpc.filter.ActiveLimitFilter
classloader=com.alibaba.dubbo.rpc.filter.ClassLoaderFilter
context=com.alibaba.dubbo.rpc.filter.ContextFilter
consumercontext=com.alibaba.dubbo.rpc.filter.ConsumerContextFilter
exception=com.alibaba.dubbo.rpc.filter.ExceptionFilter
executelimit=com.alibaba.dubbo.rpc.filter.ExecuteLimitFilter
deprecated=com.alibaba.dubbo.rpc.filter.DeprecatedFilter
compatible=com.alibaba.dubbo.rpc.filter.CompatibleFilter
timeout=com.alibaba.dubbo.rpc.filter.TimeoutFilter
trace=com.alibaba.dubbo.rpc.protocol.dubbo.filter.TraceFilter
future=com.alibaba.dubbo.rpc.protocol.dubbo.filter.FutureFilter
monitor=com.alibaba.dubbo.monitor.support.MonitorFilter

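Dubbo obtains the list of Filters through ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group).

This method sorts the Filter list:

1. Filters that declare an order are sorted by order, smallest first.

2. Filters without an order end up in the reverse of their order in cachedActivates.

FAQ:
1. The order of Filters in cachedActivates follows the hash of each Filter's name.
2. Why do Filters without an order end up in the reverse of their cachedActivates order? See the sorting rule in ActivateComparator.COMPARATOR. Roughly, the rule is:

n1 > n2 ? 1 : -1, that is, if filter1's order is not greater than filter2's order, filter1 is placed before filter2.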
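The tie-breaking behaviour of that last comparison is easy to check in isolation. The sketch below reproduces only the quoted expression; ActivateOrderSketch and compareOrder are hypothetical names, and the rest of the real comparator is not modelled here.

```java
class ActivateOrderSketch {

    // Only the final comparison quoted above: never returns 0.
    static int compareOrder(int n1, int n2) {
        return n1 > n2 ? 1 : -1;
    }

    public static void main(String[] args) {
        System.out.println(compareOrder(0, 0)); // tie: the left operand still counts as "smaller"
        System.out.println(compareOrder(1, 0));
        System.out.println(compareOrder(0, 1));
    }
}
```

For two filters with the same order, the comparison reports "smaller" whichever one is passed first. Such a comparator is not symmetric, so the final position of tied elements depends on the order in which the sort algorithm happens to compare them, which is consistent with the reversed order described above.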

Remote export

1. Build the URL

registry://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-provider&dubbo=2.0.2&group=aaa&pid=94019&registry=zookeeper&timestamp=1666446463342

2. Through proxyFactory.getInvoker, wrap the service to be exported in a Wrapper and build an Invoker from it (the same as for local export).

3. Wrap the invoker in a DelegateProviderMetaDataInvoker.

4. (Important) Through protocol.export(invoker), build an Exporter.

The call chain is as follows:

RegistryProtocol

1. Export the service (build the Dubbo URL, then call the export method of DubboProtocol).
2. Build the real registry URL (the URL of the ZK registry):
zookeeper://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-provider&dubbo=2.0.2&export=dubbo://192.168.0.100:20890/org.apache.dubbo.samples.api.client.HelloService?anyhost=true&application=demo-provider&bind.ip=192.168.0.100&bind.port=20890&dubbo=2.0.2&generic=false&interface=org.apache.dubbo.samples.api.client.HelloService&methods=sayHello&pid=62781&side=provider&timestamp=1666429076800&group=aaa&pid=62781&timestamp=1666429076786
3. Obtain the ZK Registry.
4. Build the Dubbo URL.
5. Cache the invoker in providerInvokers.
6. Register the service in ZK and set the registered flag (isReg) of the service in the registration table to true.
7. Create a listener, put it in the cache, and subscribe to the override data of the registry (deprecated after 2.7.x).

DubboProtocol

1. Build the exporter and cache it in exporterMap.
2. Start the Netty server.

5. Cache the exporter in exporters.

III. Source code walkthrough

We start with the onApplicationEvent() method of ServiceBean:

@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
    // Export only when isDelay() is true and the service has been neither exported nor unexported
    if (isDelay() && !isExported() && !isUnexported()) {
        if (logger.isInfoEnabled()) {
            logger.info("The service ready on spring started. service: " + getInterface());
        }
        // Call the export() method of ServiceConfig
        export();
    }
}

The export() method of ServiceConfig:

public synchronized void export() {
    // If a ProviderConfig is present (it is null by default), inherit export and delay from it
    if (provider != null) {
        if (export == null) {
            export = provider.getExport();
        }
        if (delay == null) {
            delay = provider.getDelay();
        }
    }
    if (export != null && !export) {
        return;
    }
    // If a positive delay is configured, schedule the export after that delay
    if (delay != null && delay > 0) {
        delayExportExecutor.schedule(new Runnable() {
            @Override
            public void run() {
                doExport();
            }
        }, delay, TimeUnit.MILLISECONDS);
    } else {
        // Export the service right away
        doExport();
    }
}
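The branch on delay can be tried out without Dubbo. The following is a minimal sketch under the assumption that a single-threaded ScheduledExecutorService plays the role of delayExportExecutor; DelayedExportSketch and its members are hypothetical names, and doExport here only counts down a latch so that the effect is observable.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class DelayedExportSketch {

    // Daemon thread so the sketch never keeps the JVM alive
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "delay-export-sketch");
                t.setDaemon(true);
                return t;
            });

    private final CountDownLatch exported = new CountDownLatch(1);

    // Same decision as above: a positive delay schedules, anything else exports now.
    void export(Integer delayMillis) {
        if (delayMillis != null && delayMillis > 0) {
            scheduler.schedule(this::doExport, delayMillis, TimeUnit.MILLISECONDS);
        } else {
            doExport();
        }
    }

    private void doExport() {
        exported.countDown();
    }

    boolean isExported() {
        return exported.getCount() == 0;
    }

    // Waits up to timeoutMillis for the export to happen.
    boolean awaitExported(long timeoutMillis) {
        try {
            return exported.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        DelayedExportSketch delayed = new DelayedExportSketch();
        delayed.export(100);
        System.out.println("exported right after the call: " + delayed.isExported());
        System.out.println("exported within 5s: " + delayed.awaitExported(5000));
    }
}
```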

The doExport() method of ServiceConfig:

protected synchronized void doExport() {
    if (unexported) {
        throw new IllegalStateException("Already unexported!");
    }
    if (exported) {
        return;
    }
    exported = true;
    if (interfaceName == null || interfaceName.length() == 0) {
        throw new IllegalStateException(" interface not allow null!");
    }
    // Check whether ProviderConfig is null; if so, create one and initialize it
    checkDefault();
    if (provider != null) {
        if (application == null) {
            application = provider.getApplication();
        }
        if (module == null) {
            module = provider.getModule();
        }
        if (registries == null) {
            registries = provider.getRegistries();
        }
        if (monitor == null) {
            monitor = provider.getMonitor();
        }
        if (protocols == null) {
            protocols = provider.getProtocols();
        }
    }
    if (module != null) {
        if (registries == null) {
            registries = module.getRegistries();
        }
        if (monitor == null) {
            monitor = module.getMonitor();
        }
    }
    if (application != null) {
        if (registries == null) {
            registries = application.getRegistries();
        }
        if (monitor == null) {
            monitor = application.getMonitor();
        }
    }
    if (ref instanceof GenericService) {
        interfaceClass = GenericService.class;
        if (StringUtils.isEmpty(generic)) {
            generic = Boolean.TRUE.toString();
        }
    } else {
        try {
            interfaceClass = Class.forName(interfaceName, true, Thread.currentThread().getContextClassLoader());
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e.getMessage(), e);
        }
        checkInterfaceAndMethods(interfaceClass, methods);
        checkRef();
        generic = Boolean.FALSE.toString();
    }
    if (local != null) {
        if ("true".equals(local)) {
            local = interfaceName + "Local";
        }
        Class<?> localClass;
        try {
            localClass = ClassHelper.forNameWithThreadContextClassLoader(local);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e.getMessage(), e);
        }
        if (!interfaceClass.isAssignableFrom(localClass)) {
            throw new IllegalStateException("The local implementation class " + localClass.getName() + " not implement interface " + interfaceName);
        }
    }
    if (stub != null) {
        if ("true".equals(stub)) {
            stub = interfaceName + "Stub";
        }
        Class<?> stubClass;
        try {
            stubClass = ClassHelper.forNameWithThreadContextClassLoader(stub);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e.getMessage(), e);
        }
        if (!interfaceClass.isAssignableFrom(stubClass)) {
            throw new IllegalStateException("The stub implementation class " + stubClass.getName() + " not implement interface " + interfaceName);
        }
    }
    // Make sure ApplicationConfig is not null and append system properties
    checkApplication();
    // Make sure RegistryConfig is not null and append system properties
    checkRegistry();
    // Make sure ProtocolConfig is not null and append system properties
    checkProtocol();
    // Append system properties to ServiceConfig
    appendProperties(this);
    // Sanity-check stub and mock (for example, whether the required constructor exists)
    checkStubAndMock(interfaceClass);
    if (path == null || path.length() == 0) {
        path = interfaceName;
    }
    // Export the service
    doExportUrls();
    // Build the ProviderModel and cache it in providedServices
    ProviderModel providerModel = new ProviderModel(getUniqueServiceName(), this, ref);
    ApplicationModel.initProviderModel(getUniqueServiceName(), providerModel);
}
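A large part of doExport() is a fallback chain: a value that is not set on the service itself is looked up on the provider, then on the module, then on the application. The sketch below condenses that pattern. ConfigFallbackSketch, firstNonNull and resolveRegistries are hypothetical names, and registries are represented as plain strings for brevity.

```java
import java.util.Arrays;
import java.util.List;

class ConfigFallbackSketch {

    // Returns the first non-null candidate, or null if all are null.
    @SafeVarargs
    static <T> T firstNonNull(T... candidates) {
        for (T candidate : candidates) {
            if (candidate != null) {
                return candidate;
            }
        }
        return null;
    }

    // Same precedence as in doExport(): service, provider, module, application.
    static List<String> resolveRegistries(List<String> service, List<String> provider,
                                          List<String> module, List<String> application) {
        return firstNonNull(service, provider, module, application);
    }

    public static void main(String[] args) {
        List<String> fromModule = Arrays.asList("zookeeper://127.0.0.1:2181");
        List<String> fromApplication = Arrays.asList("zookeeper://10.0.0.1:2181");
        System.out.println(resolveRegistries(null, null, fromModule, fromApplication));
    }
}
```

In the listing above, monitor follows the same provider, module, application order, while application, module and protocols only fall back to the provider.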

Next, the doExportUrls() method of ServiceConfig is called:

private void doExportUrls() {
    // Load the configured registries (ZK here)
    List<URL> registryURLs = loadRegistries(true);
    // Iterate over the protocols (the dubbo protocol by default)
    for (ProtocolConfig protocolConfig : protocols) {
        // Export the service for each protocol
        doExportUrlsFor1Protocol(protocolConfig, registryURLs);
    }
}

Then the doExportUrlsFor1Protocol method of ServiceConfig is called:

/**
 * The service export really starts here.
 * registryURLs: registry://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-provider&dubbo=2.0.2&group=aaa&pid=4939&registry=zookeeper&timestamp=1666491937481
 */
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
    // Get the protocol name; it defaults to dubbo
    String name = protocolConfig.getName();
    if (name == null || name.length() == 0) {
        name = "dubbo";
    }
    // Collect the URL parameters
    Map<String, String> map = new HashMap<String, String>();
    // side, dubbo, timestamp and pid
    map.put(Constants.SIDE_KEY, Constants.PROVIDER_SIDE);
    map.put(Constants.DUBBO_VERSION_KEY, Version.getProtocolVersion());
    map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis()));
    if (ConfigUtils.getPid() > 0) {
        map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid()));
    }
    // Parameters from application, module, provider, protocol and the service itself
    appendParameters(map, application);
    appendParameters(map, module);
    appendParameters(map, provider, Constants.DEFAULT_KEY);
    appendParameters(map, protocolConfig);
    appendParameters(map, this);
    // Iterate over methods and put their settings into the map
    if (methods != null && !methods.isEmpty()) {
        for (MethodConfig method : methods) {
            appendParameters(map, method, method.getName());
            String retryKey = method.getName() + ".retry";
            if (map.containsKey(retryKey)) {
                String retryValue = map.remove(retryKey);
                if ("false".equals(retryValue)) {
                    map.put(method.getName() + ".retries", "0");
                }
            }
            List<ArgumentConfig> arguments = method.getArguments();
            if (arguments != null && !arguments.isEmpty()) {
                for (ArgumentConfig argument : arguments) {
                    // convert argument type
                    if (argument.getType() != null && argument.getType().length() > 0) {
                        Method[] methods = interfaceClass.getMethods();
                        // visit all methods
                        if (methods != null && methods.length > 0) {
                            for (int i = 0; i < methods.length; i++) {
                                String methodName = methods[i].getName();
                                // target the method, and get its signature
                                if (methodName.equals(method.getName())) {
                                    Class<?>[] argtypes = methods[i].getParameterTypes();
                                    // one callback in the method
                                    if (argument.getIndex() != -1) {
                                        if (argtypes[argument.getIndex()].getName().equals(argument.getType())) {
                                            appendParameters(map, argument, method.getName() + "." + argument.getIndex());
                                        } else {
                                            throw new IllegalArgumentException("argument config error : the index attribute and type attribute not match :index :" + argument.getIndex() + ", type:" + argument.getType());
                                        }
                                    } else {
                                        // multiple callbacks in the method
                                        for (int j = 0; j ...
    // ... the source listing is cut off at this point and resumes inside the revision check ...
        ... 0) {
            map.put("revision", revision);
        }
        String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
        if (methods.length == 0) {
            logger.warn("NO method found in service interface " + interfaceClass.getName());
            map.put(Constants.METHODS_KEY, Constants.ANY_VALUE);
        } else {
            map.put(Constants.METHODS_KEY, StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ","));
        }
    }
    if (!ConfigUtils.isEmpty(token)) {
        if (ConfigUtils.isDefault(token)) {
            map.put(Constants.TOKEN_KEY, UUID.randomUUID().toString());
        } else {
            map.put(Constants.TOKEN_KEY, token);
        }
    }
    if (Constants.LOCAL_PROTOCOL.equals(protocolConfig.getName())) {
        protocolConfig.setRegister(false);
        map.put("notify", "false");
    }
    // export service
    String contextPath = protocolConfig.getContextpath();
    if ((contextPath == null || contextPath.length() == 0) && provider != null) {
        contextPath = provider.getContextpath();
    }
    // Resolve host and port, then assemble the URL
    String host = this.findConfigedHosts(protocolConfig, registryURLs, map);
    Integer port = this.findConfigedPorts(protocolConfig, name, map);
    URL url = new URL(name, host, port, (contextPath == null || contextPath.length() == 0 ? "" : contextPath + "/") + path, map);
    // Not executed by default
    if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class).hasExtension(url.getProtocol())) {
        url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class).getExtension(url.getProtocol()).getConfigurator(url).configure(url);
    }
    // Read the scope from the URL
    String scope = url.getParameter(Constants.SCOPE_KEY);
    // don't export when none is configured
    if (!Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) {
        // export to local if the config is not remote (export to remote only when config is remote)
        if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) {
            // Local export
            exportLocal(url);
        }
        // export to remote if the config is not local (export to local only when config is local)
        if (!Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope)) {
            if (logger.isInfoEnabled()) {
                logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
            }
            // registryURLs is not empty
            if (registryURLs != null && !registryURLs.isEmpty()) {
                // Iterate over the registry URLs and export the service remotely for each one
                for (URL registryURL : registryURLs) {
                    // Rebuild the Dubbo URL
                    url = url.addParameterIfAbsent(Constants.DYNAMIC_KEY, registryURL.getParameter(Constants.DYNAMIC_KEY));
                    // Build the monitor URL
                    URL monitorUrl = loadMonitor(registryURL);
                    if (monitorUrl != null) {
                        url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());
                    }
                    if (logger.isInfoEnabled()) {
                        logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
                    }
                    // Read the proxy type from the URL
                    String proxy = url.getParameter(Constants.PROXY_KEY);
                    if (StringUtils.isNotEmpty(proxy)) {
                        registryURL = registryURL.addParameter(Constants.PROXY_KEY, proxy);
                    }
                    // Obtain the invoker through proxyFactory.getInvoker
                    Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
                    DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
                    // Remote export
                    Exporter<?> exporter = protocol.export(wrapperInvoker);
                    exporters.add(exporter);
                }
            } else {
                // registryURLs is empty
                Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);
                DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
                Exporter<?> exporter = protocol.export(wrapperInvoker);
                exporters.add(exporter);
            }
        }
    }
    this.urls.add(url);
}
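The nested scope checks near the end of this method boil down to a small decision table, sketched below. ExportScopeSketch and plan are hypothetical names; the literal values none, remote and local are assumed to be the values behind Constants.SCOPE_NONE, Constants.SCOPE_REMOTE and Constants.SCOPE_LOCAL.

```java
import java.util.ArrayList;
import java.util.List;

class ExportScopeSketch {

    // Returns which exports would run for a given scope value.
    static List<String> plan(String scope) {
        List<String> steps = new ArrayList<String>();
        if (!"none".equalsIgnoreCase(scope)) {
            if (!"remote".equalsIgnoreCase(scope)) {
                steps.add("local");
            }
            if (!"local".equalsIgnoreCase(scope)) {
                steps.add("remote");
            }
        }
        return steps;
    }

    public static void main(String[] args) {
        for (String scope : new String[] {null, "local", "remote", "none"}) {
            System.out.println(scope + " -> " + plan(scope));
        }
    }
}
```

When no scope is configured, both branches run, which is why a service is exported both locally and remotely by default.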

Local export

The exportLocal method of ServiceConfig:

Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();

/**
 * url: dubbo://192.168.0.100:20890/org.apache.dubbo.samples.api.client.HelloService?anyhost=true&application=demo-provider&bind.ip=192.168.0.100&bind.port=20890&dubbo=2.0.2&generic=false&interface=org.apache.dubbo.samples.api.client.HelloService&methods=sayHello&pid=12906&side=provider&timestamp=1666493114123
 */
@SuppressWarnings({"unchecked", "rawtypes"})
private void exportLocal(URL url) {
    // Only when the protocol is not injvm already
    if (!Constants.LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
        // Build the URL for local export:
        // injvm://127.0.0.1/org.apache.dubbo.samples.api.client.HelloService?anyhost=true&application=demo-provider&bind.ip=192.168.0.100&bind.port=20890&dubbo=2.0.2&generic=false&interface=org.apache.dubbo.samples.api.client.HelloService&methods=sayHello&pid=12906&side=provider&timestamp=1666493114123
        URL local = URL.valueOf(url.toFullString())
                .setProtocol(Constants.LOCAL_PROTOCOL)
                .setHost(LOCALHOST)
                .setPort(0);
        ServiceClassHolder.getInstance().pushServiceClass(getServiceClass(ref));
        // Call protocol.export to export locally (important)
        Exporter<?> exporter = protocol.export(proxyFactory.getInvoker(ref, (Class) interfaceClass, local));
        // Put it in the cache
        exporters.add(exporter);
        logger.info("Export dubbo service " + interfaceClass.getName() + " to local registry");
    }
}

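The protocol here is a Dubbo SPI interface obtained through ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension(), so the adaptive class is what gets called first. For the SPI code itself, see the previous article, 《Dubbo的SPI机制简介解析》 (An Introduction to and Analysis of the Dubbo SPI Mechanism).

The export flow for local export:

The export method of Protocol$Adaptive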

@Override
public Exporter export(Invoker arg0) throws RpcException {
    // Null check on the invoker
    if (arg0 == null) {
        throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");
    }
    // Null check on the URL
    if (arg0.getUrl() == null) {
        throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");
    }
    // If the URL has no protocol, default to dubbo
    URL url = arg0.getUrl();
    String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
    if (extName == null) {
        throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
    }
    // Look up the Protocol extension by name; getExtension returns it wrapped in its wrapper classes
    // (see the previous article, 《Dubbo SPI核心源码解析》)
    Protocol extension = ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(extName);
    // Then call its export method
    return extension.export(arg0);
}
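The adaptive class is essentially a dispatcher keyed by the protocol of the URL. The following sketch shows that idea with a plain map standing in for ExtensionLoader. AdaptiveProtocolSketch and its simplified Protocol interface are hypothetical and work on URL strings instead of Dubbo's URL and Invoker types.

```java
import java.util.HashMap;
import java.util.Map;

class AdaptiveProtocolSketch {

    // Simplified stand-in: returns the name of the implementation that handled the export.
    interface Protocol { String export(String url); }

    // Plays the role of the extension registry (name -> implementation).
    private final Map<String, Protocol> extensions = new HashMap<String, Protocol>();

    void register(String name, Protocol protocol) {
        extensions.put(name, protocol);
    }

    // Pick the extension from the URL's protocol, defaulting to "dubbo".
    String export(String url) {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        int schemeEnd = url.indexOf("://");
        String extName = schemeEnd > 0 ? url.substring(0, schemeEnd) : "dubbo";
        Protocol extension = extensions.get(extName);
        if (extension == null) {
            throw new IllegalStateException("No such extension: " + extName);
        }
        return extension.export(url);
    }

    static AdaptiveProtocolSketch demo() {
        AdaptiveProtocolSketch adaptive = new AdaptiveProtocolSketch();
        adaptive.register("injvm", url -> "InjvmProtocol");
        adaptive.register("dubbo", url -> "DubboProtocol");
        adaptive.register("registry", url -> "RegistryProtocol");
        return adaptive;
    }

    public static void main(String[] args) {
        AdaptiveProtocolSketch adaptive = demo();
        System.out.println(adaptive.export("injvm://127.0.0.1/org.apache.dubbo.samples.api.client.HelloService"));
        System.out.println(adaptive.export("registry://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService"));
    }
}
```

This is why the same protocol.export(...) call in ServiceConfig ends up in InjvmProtocol for the local URL and in RegistryProtocol for the registry URL.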

The export method of QosProtocolWrapper

@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
    // If the protocol is registry
    if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
        // start the QoS server
        startQosServer(invoker.getUrl());
        return protocol.export(invoker);
    }
    // Not registry
    return protocol.export(invoker);
}

The export method of ProtocolListenerWrapper

@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
    // If the protocol is registry
    if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
        // just pass the call on
        return protocol.export(invoker);
    }
    // Otherwise, load the ExporterListener extensions through Dubbo SPI
    // and build a ListenerExporterWrapper around the exporter
    return new ListenerExporterWrapper<T>(protocol.export(invoker),
            Collections.unmodifiableList(ExtensionLoader.getExtensionLoader(ExporterListener.class)
                    .getActivateExtension(invoker.getUrl(), Constants.EXPORTER_LISTENER_KEY)));
}

The export method of InjvmProtocol

@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
    // Build an InjvmExporter
    return new InjvmExporter<T>(invoker, invoker.getUrl().getServiceKey(), exporterMap);
}
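The wrappers above form a decorator chain around the real protocol. The sketch below traces what each layer does for an injvm URL and for a registry URL. All names are hypothetical, the Protocol interface is simplified to a protocol name plus a trace list, and the nesting order (QoS, then filter, then listener) is an assumption that follows the order in which the wrappers are listed in this article.

```java
import java.util.ArrayList;
import java.util.List;

class ProtocolWrapperSketch {

    // Simplified stand-in: records what happens instead of returning an Exporter.
    interface Protocol { void export(String urlProtocol, List<String> trace); }

    static Protocol base() {
        return (p, trace) -> trace.add(p + " export");
    }

    static Protocol qos(Protocol next) {
        return (p, trace) -> {
            if ("registry".equals(p)) {
                trace.add("start QoS server");
            }
            next.export(p, trace);
        };
    }

    static Protocol filter(Protocol next) {
        return (p, trace) -> {
            if (!"registry".equals(p)) {
                trace.add("build filter chain");
            }
            next.export(p, trace);
        };
    }

    static Protocol listener(Protocol next) {
        return (p, trace) -> {
            next.export(p, trace);
            if (!"registry".equals(p)) {
                trace.add("wrap exporter with listeners");
            }
        };
    }

    static List<String> run(String urlProtocol) {
        List<String> trace = new ArrayList<String>();
        qos(filter(listener(base()))).export(urlProtocol, trace);
        return trace;
    }

    public static void main(String[] args) {
        System.out.println(run("injvm"));
        System.out.println(run("registry"));
    }
}
```

For a registry URL the filter and listener layers simply pass the call through, and they do their real work later, when RegistryProtocol exports the embedded dubbo:// URL through the same wrapped protocol.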

Remote export

The protocol here is the same as for local export.

The remote export flow

The export method of RegistryProtocol

@Override
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
    // export invoker
    // This is where the service is actually exported
    final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
    /**
     * Build the URL of the ZK registry:
     * zookeeper://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-provider&dubbo=2.0.2&export=dubbo://192.168.0.100:20890/org.apache.dubbo.samples.api.client.HelloService?anyhost=true&application=demo-provider&bind.ip=192.168.0.100&bind.port=20890&dubbo=2.0.2&generic=false&interface=org.apache.dubbo.samples.api.client.HelloService&methods=sayHello&pid=28778&side=provider&timestamp=1666495390735&group=aaa&pid=28778&timestamp=1666495390726
     */
    URL registryUrl = getRegistryUrl(originInvoker);
    // Obtain the registry (a ZookeeperRegistry)
    final Registry registry = getRegistry(originInvoker);
    /**
     * Build the URL that is registered in ZK:
     * dubbo://192.168.0.100:20890/org.apache.dubbo.samples.api.client.HelloService?anyhost=true&application=demo-provider&dubbo=2.0.2&generic=false&interface=org.apache.dubbo.samples.api.client.HelloService&methods=sayHello&pid=28778&side=provider&timestamp=1666495390735
     */
    final URL registedProviderUrl = getRegistedProviderUrl(originInvoker);
    // Whether the service should be registered
    boolean register = registedProviderUrl.getParameter("register", true);
    // Add it to the registration table
    ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registedProviderUrl);
    // If it should be registered, register it and set its registered flag (isReg) in the table to true
    if (register) {
        register(registryUrl, registedProviderUrl);
        ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true);
    }
    // Subscribe the override data
    // Deprecated in versions after 2.7.0, so it is not analyzed in detail
    // Build the URL for the override subscription
    final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl);
    // Build the listener for the override subscription
    final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
    overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
    // Subscribe to the override data of the registry
    registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
    // Return the exporter
    return new DestroyableExporter<T>(exporter, originInvoker, overrideSubscribeUrl, registedProviderUrl);
}

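What is subscribing to ZK actually for?

Dubbo supports dynamic configuration. After a service has been published, users can change its configuration dynamically through the admin console; the provider notices these changes and applies them locally and, if necessary, re-exports the service after merging the configuration. For example, when we use dubbo-admin for dynamic configuration, dubbo-admin creates configuration nodes in the registry or in the configuration center (Dubbo 2.6 has no configuration center, so the nodes are created in the registry, whereas Dubbo 2.7 creates them in the configuration center), and the provider only has to watch those nodes for changes. The first step of watching is to generate a URL that represents the node to watch, which is what this step does.

The doLocalExport method of RegistryProtocol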

/**
 * registry://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-provider&dubbo=2.0.2&export=dubbo://192.168.0.100:20890/org.apache.dubbo.samples.api.client.HelloService?anyhost=true&application=demo-provider&bind.ip=192.168.0.100&bind.port=20890&dubbo=2.0.2&generic=false&interface=org.apache.dubbo.samples.api.client.HelloService&methods=sayHello&pid=28778&side=provider&timestamp=1666495390735&group=aaa&pid=28778&registry=zookeeper&timestamp=1666495390726
 */
private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker) {
    /**
     * Read the value of the export parameter:
     * dubbo://192.168.0.100:20890/org.apache.dubbo.samples.api.client.HelloService?anyhost=true&application=demo-provider&bind.ip=192.168.0.100&bind.port=20890&dubbo=2.0.2&generic=false&interface=org.apache.dubbo.samples.api.client.HelloService&methods=sayHello&pid=28778&side=provider&timestamp=1666495390735
     */
    String key = getCacheKey(originInvoker);
    ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
    // Null the first time (double-checked locking)
    if (exporter == null) {
        synchronized (bounds) {
            exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
            if (exporter == null) {
                // Build the invoker
                final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker));
                // Call the export method of DubboProtocol
                exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker);
                bounds.put(key, exporter);
            }
        }
    }
    return exporter;
}
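The double-checked lookup in doLocalExport guarantees that the expensive export runs once per key even if several threads arrive at the same time. Here is a minimal, self-contained sketch of the same pattern. ExportOnceSketch is a hypothetical name, exporters are represented by strings, and a counter stands in for the actual protocol.export call.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

class ExportOnceSketch {

    private final Map<String, String> bounds = new ConcurrentHashMap<String, String>();
    final AtomicInteger exportCalls = new AtomicInteger();

    String doLocalExport(String key) {
        String exporter = bounds.get(key);
        if (exporter == null) {                 // first check, without the lock
            synchronized (bounds) {
                exporter = bounds.get(key);
                if (exporter == null) {         // second check, with the lock held
                    exportCalls.incrementAndGet();   // stands in for protocol.export(...)
                    exporter = "exporter-for-" + key;
                    bounds.put(key, exporter);
                }
            }
        }
        return exporter;
    }

    public static void main(String[] args) {
        ExportOnceSketch sketch = new ExportOnceSketch();
        String key = "dubbo://192.168.0.100:20890/org.apache.dubbo.samples.api.client.HelloService";
        sketch.doLocalExport(key);
        sketch.doLocalExport(key);
        System.out.println("export calls: " + sketch.exportCalls.get());
    }
}
```

With a ConcurrentHashMap, computeIfAbsent would express the same guarantee more compactly on Java 8 and later; the explicit form is kept here because it mirrors the listing above.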

The export method of DubboProtocol

/**
 * 1. Build the exporter and cache it in exporterMap.
 * 2. Start the Netty server.
 * 3. Optimize serialization for the URL (a no-op by default).
 * URL: dubbo://192.168.0.100:20890/org.apache.dubbo.samples.api.client.HelloService?anyhost=true&application=demo-provider&bind.ip=192.168.0.100&bind.port=20890&dubbo=2.0.2&generic=false&interface=org.apache.dubbo.samples.api.client.HelloService&methods=sayHello&pid=53733&side=provider&timestamp=1666500518340
 */
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
    URL url = invoker.getUrl();
    // export service.
    // Key of the service to export (org.apache.dubbo.samples.api.client.HelloService:20890)
    String key = serviceKey(url);
    // Build the Dubbo exporter
    DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
    // Put it in the cache
    exporterMap.put(key, exporter);
    // export an stub service for dispatching event
    // Read the dubbo.stub.event and is_callback_service parameters; both default to false,
    // so this branch is not executed in Dubbo's default flow
    Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT);
    Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
    if (isStubSupportEvent && !isCallbackservice) {
        String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
        if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
            if (logger.isWarnEnabled()) {
                logger.warn(new IllegalStateException("consumer [" + url.getParameter(Constants.INTERFACE_KEY) +
                        "], has set stubproxy support event ,but no stub methods founded."));
            }
        } else {
            stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
        }
    }
    // Start the server
    openServer(url);
    // Optimize serialization
    optimizeSerialization(url);
    return exporter;
}

The core code path by which Dubbo starts the Netty server

The openServer method of DubboProtocol

private void openServer(URL url) {
    // find server.
    // The address, for example 192.168.0.100:20890
    String key = url.getAddress();
    // Read isserver from the URL; it defaults to true
    boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);
    if (isServer) {
        // The server is null on the first call and non-null afterwards
        ExchangeServer server = serverMap.get(key);
        if (server == null) {
            // Create a Netty server and cache it in serverMap
            serverMap.put(key, createServer(url));
        } else {
            // server supports reset, use together with override
            server.reset(url);
        }
    }
}

/**
 * Create a Netty server
 */
private ExchangeServer createServer(URL url) {
    // Set channel.readonly.sent to true on the URL if absent
    url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString());
    // Set heartbeat to 60s on the URL if absent
    url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
    // Read server from the URL; it defaults to netty
    String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);
    if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str))
        throw new RpcException("Unsupported server type: " + str + ", url: " + url);
    // Set codec to dubbo on the URL
    url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);
    ExchangeServer server;
    try {
        // This is where the Netty server is actually started
        server = Exchangers.bind(url, requestHandler);
    } catch (RemotingException e) {
        throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
    }
    str = url.getParameter(Constants.CLIENT_KEY);
    if (str != null && str.length() > 0) {
        Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
        if (!supportedTypes.contains(str)) {
            throw new RpcException("Unsupported client type: " + str);
        }
    }
    return server;
}

The bind method of Exchangers

public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
    if (url == null) {
        throw new IllegalArgumentException("url == null");
    }
    if (handler == null) {
        throw new IllegalArgumentException("handler == null");
    }
    url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
    // Get the Exchanger and bind the URL and the handler
    return getExchanger(url).bind(url, handler);
}

The getExchanger method of Exchangers, which ends up returning the HeaderExchanger extension

/**
 * Get the Exchanger through Dubbo SPI
 */
public static Exchanger getExchanger(URL url) {
    // The default type is header
    String type = url.getParameter(Constants.EXCHANGER_KEY, Constants.DEFAULT_EXCHANGER);
    return getExchanger(type);
}

/**
 * Get the Exchanger through SPI -> HeaderExchanger
 */
public static Exchanger getExchanger(String type) {
    return ExtensionLoader.getExtensionLoader(Exchanger.class).getExtension(type);
}

The bind method of HeaderExchanger is called next

public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
    // Call the bind method of Transporters to obtain the Server
    return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
}

// Transporters.bind
public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {
    if (url == null) {
        throw new IllegalArgumentException("url == null");
    }
    if (handlers == null || handlers.length == 0) {
        throw new IllegalArgumentException("handlers == null");
    }
    ChannelHandler handler;
    if (handlers.length == 1) {
        handler = handlers[0];
    } else {
        handler = new ChannelHandlerDispatcher(handlers);
    }
    // Get the Transporter through Dubbo SPI and bind the URL and the handler
    return getTransporter().bind(url, handler);
}

Obtaining the Transporter extension (through Dubbo SPI)

// Get the Transporter through Dubbo SPI
public static Transporter getTransporter() {
    return ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension();
}

The bind method of NettyTransporter

public class NettyTransporter implements Transporter {

    public static final String NAME = "netty";

    @Override
    public Server bind(URL url, ChannelHandler listener) throws RemotingException {
        return new NettyServer(url, listener);
    }

    @Override
    public Client connect(URL url, ChannelHandler listener) throws RemotingException {
        return new NettyClient(url, listener);
    }
}

What new NettyServer does

// NettyServer
public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
    super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
}

/**
 * AbstractServer (parent class of NettyServer): the actual netty startup logic
 */
public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
    super(url, handler);
    localAddress = getUrl().toInetSocketAddress();

    String bindIp = getUrl().getParameter(Constants.BIND_IP_KEY, getUrl().getHost());
    int bindPort = getUrl().getParameter(Constants.BIND_PORT_KEY, getUrl().getPort());
    if (url.getParameter(Constants.ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(bindIp)) {
        bindIp = NetUtils.ANYHOST;
    }
    bindAddress = new InetSocketAddress(bindIp, bindPort);
    this.accepts = url.getParameter(Constants.ACCEPTS_KEY, Constants.DEFAULT_ACCEPTS);
    this.idleTimeout = url.getParameter(Constants.IDLE_TIMEOUT_KEY, Constants.DEFAULT_IDLE_TIMEOUT);
    try {
        // Initialize the ServerBootstrap and bind the address
        doOpen();
        if (logger.isInfoEnabled()) {
            logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress());
        }
    } catch (Throwable t) {
        throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind " + getClass().getSimpleName()
                + " on " + getLocalAddress() + ", cause: " + t.getMessage(), t);
    }
    //fixme replace this with better method
    DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
    executor = (ExecutorService) dataStore.get(Constants.EXECUTOR_SERVICE_COMPONENT_KEY, Integer.toString(url.getPort()));
}

// NettyServer
@Override
protected void doOpen() throws Throwable {
    NettyHelper.setNettyLoggerFactory();
    ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));
    ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));
    ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));
    bootstrap = new ServerBootstrap(channelFactory);

    final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
    channels = nettyHandler.getChannels();
    // https://issues.jboss.org/browse/NETTY-365
    // https://issues.jboss.org/browse/NETTY-379
    // final Timer timer = new HashedWheelTimer(new NamedThreadFactory("NettyIdleTimer", true));
    bootstrap.setOption("child.tcpNoDelay", true);
    bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
        @Override
        public ChannelPipeline getPipeline() {
            NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
            ChannelPipeline pipeline = Channels.pipeline();
            /*int idleTimeout = getIdleTimeout();
            if (idleTimeout > 10000) {
                pipeline.addLast("timer", new IdleStateHandler(timer, idleTimeout / 1000, 0, 0));
            }*/
            pipeline.addLast("decoder", adapter.getDecoder());
            pipeline.addLast("encoder", adapter.getEncoder());
            pipeline.addLast("handler", nettyHandler);
            return pipeline;
        }
    });
    // bind
    channel = bootstrap.bind(getBindAddress());
}
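The bind-address decision in the AbstractServer constructor is easy to miss in the middle of the startup code, so here is a minimal, standalone sketch of just that step. The class name BindAddressSketch and its methods are hypothetical, and isInvalidLocalHost is a simplified stand-in for NetUtils.isInvalidLocalHost (an assumption about its behavior, not a copy of Dubbo's implementation).

```java
import java.net.InetSocketAddress;
import java.util.regex.Pattern;

public class BindAddressSketch {
    // Stand-in for NetUtils.ANYHOST
    public static final String ANYHOST = "0.0.0.0";

    // Loopback addresses such as 127.0.0.1 are treated as unusable for binding
    private static final Pattern LOOPBACK = Pattern.compile("127(\\.\\d{1,3}){3}$");

    // Simplified stand-in for NetUtils.isInvalidLocalHost (assumed behavior)
    public static boolean isInvalidLocalHost(String host) {
        return host == null
                || host.isEmpty()
                || host.equalsIgnoreCase("localhost")
                || host.equals(ANYHOST)
                || LOOPBACK.matcher(host).matches();
    }

    // Same decision as in the constructor: fall back to 0.0.0.0 when
    // anyhost=true or when the configured bind IP is not usable.
    public static String resolveBindIp(String bindIp, boolean anyhost) {
        if (anyhost || isInvalidLocalHost(bindIp)) {
            return ANYHOST;
        }
        return bindIp;
    }

    public static InetSocketAddress bindAddress(String bindIp, int bindPort, boolean anyhost) {
        return new InetSocketAddress(resolveBindIp(bindIp, anyhost), bindPort);
    }

    public static void main(String[] args) {
        // Values taken from the service URL used in this article (anyhost=true)
        System.out.println(bindAddress("192.168.0.100", 20890, true));
        // The same URL without anyhost would bind to the concrete IP
        System.out.println(bindAddress("192.168.0.100", 20890, false));
    }
}
```

With anyhost=true, as in the service URL used in this article, the server listens on every local interface, while localAddress still carries the concrete IP taken from the URL.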

Registering the service with Zookeeper

The register method of RegistryProtocol

/**
 * Registry URL: zookeeper://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-provider&dubbo=2.0.2&export=dubbo://192.168.0.100:20890/org.apache.dubbo.samples.api.client.HelloService?anyhost=true&application=demo-provider&bind.ip=192.168.0.100&bind.port=20890&dubbo=2.0.2&generic=false&interface=org.apache.dubbo.samples.api.client.HelloService&methods=sayHello&pid=76029&side=provider&timestamp=1666504784182&group=aaa&pid=76029&timestamp=1666504784173
 * Service URL: dubbo://192.168.0.100:20890/org.apache.dubbo.samples.api.client.HelloService?anyhost=true&application=demo-provider&dubbo=2.0.2&generic=false&interface=org.apache.dubbo.samples.api.client.HelloService&methods=sayHello&pid=76029&side=provider&timestamp=1666504784182
 */
public void register(URL registryUrl, URL registedProviderUrl) {
    // Obtain the registry through Dubbo's SPI (ZookeeperRegistry here, since the registry URL
    // uses the zookeeper protocol; it is created via new ZookeeperRegistry())
    Registry registry = registryFactory.getRegistry(registryUrl);
    // Register the service with the ZK registry
    registry.register(registedProviderUrl);
}

The register method of ZookeeperRegistry:

ZookeeperRegistry extends FailbackRegistry and inherits register from it, so the method to read is FailbackRegistry's register:

@Override
public void register(URL url) {
    // Cache the URL in registered
    super.register(url);
    failedRegistered.remove(url);
    failedUnregistered.remove(url);
    try {
        // Sending a registration request to the server side
        // This is where the URL is actually registered
        doRegister(url);
    } catch (Exception e) {
        Throwable t = e;

        // If the startup detection is opened, the Exception is thrown directly.
        boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
                && url.getParameter(Constants.CHECK_KEY, true)
                && !Constants.CONSUMER_PROTOCOL.equals(url.getProtocol());
        boolean skipFailback = t instanceof SkipFailbackWrapperException;
        if (check || skipFailback) {
            if (skipFailback) {
                t = t.getCause();
            }
            throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
        } else {
            logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t);
        }

        // Record a failed registration request to a failed list, retry regularly
        failedRegistered.add(url);
    }
}
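The register method above boils down to a small "failback" pattern: try once, fail fast if check is on, otherwise remember the URL and retry later. The sketch below isolates that control flow under simplifying assumptions: URLs are plain strings, check is a single boolean, and the class FailbackSketch and its Backend interface are hypothetical names, not Dubbo types. The retry pass is triggered by hand here, where a real registry would run it on a schedule.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class FailbackSketch {
    /** Hypothetical stand-in for the remote call made by doRegister. */
    public interface Backend {
        void doRegister(String url) throws Exception;
    }

    private final Set<String> registered = ConcurrentHashMap.newKeySet();
    private final Set<String> failedRegistered = ConcurrentHashMap.newKeySet();
    private final Backend backend;

    public FailbackSketch(Backend backend) {
        this.backend = backend;
    }

    /** Fail fast when check is true; otherwise queue the URL for a later retry. */
    public void register(String url, boolean check) {
        registered.add(url);
        failedRegistered.remove(url);
        try {
            backend.doRegister(url);
        } catch (Exception e) {
            if (check) {
                throw new IllegalStateException(
                        "Failed to register " + url + ", cause: " + e.getMessage(), e);
            }
            // Remember the failure so a later retry pass can pick it up
            failedRegistered.add(url);
        }
    }

    /** One retry pass over the failed URLs; the caller decides when to run it. */
    public void retry() {
        for (String url : failedRegistered) {
            try {
                backend.doRegister(url);
                failedRegistered.remove(url);
            } catch (Exception ignored) {
                // Still failing: keep the URL for the next pass
            }
        }
    }

    public Set<String> failed() {
        return failedRegistered;
    }

    public static void main(String[] args) {
        boolean[] down = {true};
        FailbackSketch registry = new FailbackSketch(url -> {
            if (down[0]) {
                throw new Exception("registry unreachable");
            }
        });
        registry.register("dubbo://192.168.0.100:20890/HelloService", false);
        System.out.println("failed after first attempt: " + registry.failed());
        down[0] = false;
        registry.retry();
        System.out.println("failed after retry: " + registry.failed());
    }
}
```

Keeping the failed URLs in a concurrent set lets the retry pass remove entries while iterating, which matters once the retry runs on a separate thread from the one calling register.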

The doRegister method of ZookeeperRegistry

@Override
protected void doRegister(URL url) {
    try {
        // Call zkClient's create method to create the node
        zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
    } catch (Throwable e) {
        throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
    }
}
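To make the result of doRegister more concrete, here is a rough sketch of what the node path might look like. It assumes the layout /{group}/{interface}/{category}/{URL-encoded provider URL}, with the group taken from the registry URL (group=aaa in this article) and the category defaulting to providers. The class ZkPathSketch and its helpers are hypothetical and only approximate what toUrlPath and zkClient.create do.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.List;

public class ZkPathSketch {
    // URL-encode one path segment so that '/' and ':' inside it cannot split the path
    public static String encode(String s) {
        try {
            return URLEncoder.encode(s, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException(e);
        }
    }

    /** Assumed layout: /{group}/{interface}/{category}/{encoded provider URL}. */
    public static String toUrlPath(String group, String serviceInterface,
                                   String category, String providerUrl) {
        String root = group.startsWith("/") ? group : "/" + group;
        return root + "/" + encode(serviceInterface) + "/" + category + "/" + encode(providerUrl);
    }

    /**
     * Lists every node along the path, parents first. In this sketch the parents
     * are assumed to be persistent and only the last node follows the "dynamic" flag.
     */
    public static List<String> nodesToCreate(String path) {
        List<String> nodes = new ArrayList<>();
        int i = path.indexOf('/', 1);
        while (i > 0) {
            nodes.add(path.substring(0, i));
            i = path.indexOf('/', i + 1);
        }
        nodes.add(path);
        return nodes;
    }

    public static void main(String[] args) {
        String path = toUrlPath(
                "aaa",
                "org.apache.dubbo.samples.api.client.HelloService",
                "providers",
                "dubbo://192.168.0.100:20890/org.apache.dubbo.samples.api.client.HelloService?side=provider");
        for (String node : nodesToCreate(path)) {
            System.out.println(node);
        }
    }
}
```

Because dynamic defaults to true in doRegister, the leaf node is created as an ephemeral node, so it disappears when the provider's Zookeeper session ends.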

Over.
