Dubbo 源码解析2 - Dubbo 服务导出

直接进入服务导出入口 ServiceConfig#export 方法

    //服务导出的入口
    public synchronized void export() {
        if (!shouldExport()) {
            return;
        }

        //获取单例的 dubboBootstrap 并初始化 dubboBootstrap
        //这一步 boostrap 不是null, 因为 bootstrap 已经在 dubboBootstrapApplicationListener.onContextRefreshedEvent 方法中启动了
        if (bootstrap == null) {
            bootstrap = dubboBootstrap.getInstance();
            bootstrap.initialize();
        }

        //检查并更新配置信息
        checkAndUpdateSubConfigs();

        //init serviceMetadata
        serviceMetadata.setVersion(getVersion());
        serviceMetadata.setGroup(getGroup());
        serviceMetadata.setDefaultGroup(getGroup());
        serviceMetadata.setServiceType(getInterfaceClass());
        serviceMetadata.setServiceInterfaceName(getInterface());
        serviceMetadata.setTarget(getRef());

        //如果需要延时导出,导出服务加入定时任务,否则立即导出
        if (shouldDelay()) {
            DELAY_EXPORT_EXECUTOR.schedule(this::doExport, getDelay(), TimeUnit.MILLISECONDS);
        } else {
            doExport();
        }

        exported();
    }

 ServiceConfig#export 方法主要就是校验,配置服务元数据信息,执行真正的导入方法 ServiceConfig#exported,最后调用导入成功之后的方法 ServiceConfig#exported

 

 进入 ServiceConfig#exported 方法

    /**
     * 执行服务导入,由 {@link #export()} 调用
     */
    protected synchronized void doExport() {
        if (unexported) {
            throw new IllegalStateException("The service " + interfaceClass.getName() + " has already unexported!");
        }
        //检查是否需要导出,如果不需要导出,则返回,
        // 极少情况下是不需要导出的,如本地debug
        if (exported) {
            return;
        }
        exported = true;

        if (StringUtils.isEmpty(path)) {
            path = interfaceName;
        }
        //从需要导出的 URL 列表中导出服务
        doExportUrls();
    }

 这一步接近一个空壳方法,还是要调用 ServiceConfig#doExportUrls 方法从需要导出的 URL 列表中导出服务

 

 进入 ServiceConfig#doExportUrls 方法

    /**
     * 从需要导出的 URL 列表中导出服务,由 {@link #doExport}调用
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private void doExportUrls() {
        //向 repository 注册服务和服务提供者,服务提供者被包装成了一个 ProviderModel 模型
        ServiceRepository repository = ApplicationModel.getServiceRepository();
        ServiceDescriptor serviceDescriptor = repository.registerService(getInterfaceClass());
        repository.registerProvider(
                getUniqueServiceName(),
                ref,
                serviceDescriptor,
                this,
                serviceMetadata
        );

        //加载所有注册中心链接,这里是 dubbo 支持多协议多注册中心导出服务的关键
        List<URL> registryURLs = ConfigValidationUtils.loadRegistries(this, true);

        for (ProtocolConfig protocolConfig : protocols) {
            String pathKey = URL.buildKey(getcontextpath(protocolConfig)
                    .map(p -> p + "/" + path)
                    .orElse(path), group, version);
            //如果用户明确指定了路径,注册服务到其指定的路径上
            // In case user specified path, register service one more time to map it to path.
            repository.registerService(pathKey, interfaceClass);
            // Todo, uncomment this line once service key is unified
            serviceMetadata.setServiceKey(pathKey);
            //根据协议导出服务
            doExportUrlsFor1Protocol(protocolConfig, registryURLs);
        }
    }

 这一步是要导出URL 列表中的所有服务,它先向 repository 注册服务和服务提供者,服务提供者被包装成了一个 ProviderModel 模型,然后调用 ConfigValidationUtils#loadRegistries 方法加载所有注册中心连接,这一步是 dubbo 支持多协议多注册中心导出服务的关键,最后迭代不同的协议,调用 ServiceConfig#doExportUrlsFor1Protocol 方法根据协议导出服务。

 有关注册中心的模型看下图,主要包含了注册中心的URL字符串,协议类型,名称,密码,IP地址,端口等。

 

 进入 ConfigValidationUtils#loadRegistries 方法,看 dubbo 是如何加载注册中心

    public static List<URL> loadRegistries(AbstractInterfaceConfig interfaceConfig, boolean provider) {
        // check && override if necessary
        //检查并覆盖 URL中的一些属性
        List<URL> registryList = new ArrayList<URL>();
        //获取<dubbo:application .../>信息
        ApplicationConfig application = interfaceConfig.getApplication();
        //获取<dubbo:registry .../>信息
        List<RegistryConfig> registries = interfaceConfig.getRegistries();
        if (CollectionUtils.isNotEmpty(registries)) {
            for (RegistryConfig config : registries) {
                String address = config.getAddress();
                if (StringUtils.isEmpty(address)) {
                    // 若 address 为空,则将其设为 0.0.0.0
                    address = ANYHOST_VALUE;
                }
                if (!RegistryConfig.NO_AVAILABLE.equalsIgnoreCase(address)) {
                    //定义一个保存属性的map 并将 application, config, AbstractInterfaceConfig 中的一些属性加入 map
                    Map<String, String> map = new HashMap<String, String>();
                    AbstractConfig.appendParameters(map, application);
                    AbstractConfig.appendParameters(map, config);
                    map.put(PATH_KEY, RegistryService.class.getName());
                    AbstractInterfaceConfig.appendRuntimeParameters(map);
                    // 如果 protocol 为空,认使用 dubbo 协议
                    if (!map.containsKey(PROTOCOL_KEY)) {
                        map.put(PROTOCOL_KEY, dubBO_PROTOCOL);
                    }
                    // 通过address 和 map 解析并生成 registry URL 列表
                    List<URL> urls = UrlUtils.parseURLs(address, map);

                    for (URL url : urls) {

                        url = URLBuilder.from(url)
                                .addParameter(REGISTRY_KEY, url.getProtocol())
                                .setProtocol(extractRegistryType(url))  //// 将 URL 协议头设置为 registry 或 service-discovery-registry
                                .build();
                        // 通过判断条件,决定是否添加 url 到 registryList 中,条件如下:
                        // (服务提供者 && register = true 或 null)
                        //    || (非服务提供者 && subscribe = true 或 null)
                        if ((provider && url.getParameter(REGISTER_KEY, true))
                                || (!provider && url.getParameter(SUBSCRIBE_KEY, true))) {
                            registryList.add(url);
                        }
                    }
                }
            }
        }
        return registryList;
    }

 

进入 ServiceConfig#doExportUrlsFor1Protocol 方法

    /**
     * 不同的协议导出的 URL 不同
     */
    private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
        String name = protocolConfig.getName();
        // 如果协议名为空,或空串,则将协议名变量设置为 dubbo
        if (StringUtils.isEmpty(name)) {
            name = dubBO;
        }

        //将所有导出服务的信息添加到 map 中
        Map<String, String> map = new HashMap<String, String>();
        //side 为 provider, 表明导出的是服务的生产者
        map.put(SIDE_KEY, PROVIDER_SIDE);

        // 添加版本、时间戳以及进程号等信息到 map 中
        ServiceConfig.appendRuntimeParameters(map);
        AbstractConfig.appendParameters(map, getMetrics());

        // 通过反射将对象的字段信息添加到 map 中
        AbstractConfig.appendParameters(map, getApplication());
        AbstractConfig.appendParameters(map, getModule());
        // remove 'default.' prefix for configs from ProviderConfig
        // appendParameters(map, provider, Constants.DEFAULT_KEY);
        AbstractConfig.appendParameters(map, provider);
        AbstractConfig.appendParameters(map, protocolConfig);
        AbstractConfig.appendParameters(map, this);

        MetadataReportConfig MetadataReportConfig = getMetadataReportConfig();
        if (MetadataReportConfig != null && MetadataReportConfig.isValid()) {
            map.putIfAbsent(MetaDATA_KEY, REMOTE_MetaDATA_STORAGE_TYPE);
        }

        //添加method 信息,getmethods() 返回 MethodConfig 列表, MethodConfig 中存储了 <dubbo:method> 标签的配置信息
        if (CollectionUtils.isNotEmpty(getmethods())) {
            for (MethodConfig method : getmethods()) {
                // 添加 MethodConfig 对象的字段信息到 map 中,键 = 方法名.属性名。
                // 比如存储 <dubbo:method name="sayHello" retries="2"> 对应的 MethodConfig,
                // 键 = sayHello.retries,map = {"sayHello.retries": 2, "xxx": "yyy"}
                AbstractConfig.appendParameters(map, method, method.getName());

                // 检测 MethodConfig retry 是否为 false,若是,则设置重试次数为0
                String retryKey = method.getName() + ".retry";
                if (map.containsKey(retryKey)) {
                    String retryValue = map.remove(retryKey);
                    if ("false".equals(retryValue)) {
                        map.put(method.getName() + ".retries", "0");
                    }
                }

                // 获取 ArgumentConfig 列表
                List<ArgumentConfig> arguments = method.getArguments();
                if (CollectionUtils.isNotEmpty(arguments)) {
                    for (ArgumentConfig argument : arguments) {
                        // convert argument type
                        // 检测 type 属性是否为空,或者空串(分支1 ⭐️)
                        if (argument.getType() != null && argument.getType().length() > 0) {
                            Method[] methods = interfaceClass.getmethods();
                            // visit all methods
                            if (methods.length > 0) {
                                for (int i = 0; i < methods.length; i++) {
                                    String methodName = methods[i].getName();
                                    // target the method, and get its signature
                                    if (methodName.equals(method.getName())) {
                                        Class<?>[] argtypes = methods[i].getParameterTypes();
                                        // one callback in the method
                                        if (argument.getIndex() != -1) {
                                            if (argtypes[argument.getIndex()].getName().equals(argument.getType())) {
                                                AbstractConfig.appendParameters(map, argument, method.getName() + "." + argument.getIndex());
                                            } else {
                                                throw new IllegalArgumentException("Argument config error : the index attribute and type attribute not match :index :" + argument.getIndex() + ", type:" + argument.getType());
                                            }
                                        } else {
                                            // multiple callbacks in the method
                                            for (int j = 0; j < argtypes.length; j++) {
                                                Class<?> argclazz = argtypes[j];
                                                if (argclazz.getName().equals(argument.getType())) {
                                                    AbstractConfig.appendParameters(map, argument, method.getName() + "." + j);
                                                    if (argument.getIndex() != -1 && argument.getIndex() != j) {
                                                        throw new IllegalArgumentException("Argument config error : the index attribute and type attribute not match :index :" + argument.getIndex() + ", type:" + argument.getType());
                                                    }
                                                }
                                            }
                                        }
                                    }
                                }
                            }
                        } else if (argument.getIndex() != -1) {
                            AbstractConfig.appendParameters(map, argument, method.getName() + "." + argument.getIndex());
                        } else {
                            throw new IllegalArgumentException("Argument config must set index or type attribute.eg: <dubbo:argument index='0' .../> or <dubbo:argument type=xxx .../>");
                        }

                    }
                }
            } // end of methods for
        }

        // 检测 generic 是否为 "true",并根据检测结果向 map 中添加不同的信息
        if (ProtocolUtils.isGeneric(generic)) {
            map.put(GENERIC_KEY, generic);
            map.put(METHODS_KEY, ANY_VALUE);
        } else {
            String revision = Version.getVersion(interfaceClass, version);
            if (revision != null && revision.length() > 0) {
                map.put(REVISION_KEY, revision);
            }

            // 为接口生成包裹类 Wrapper,Wrapper 中包含了接口的详细信息,比如接口方法名数组,字段信息等
            // 获取接口类的所有方法名称
            String[] methods = Wrapper.getWrapper(interfaceClass).getmethodNames();
            // 添加方法名到 map 中,如果包含多个方法名,则用逗号隔开,比如 methods = init,destroy
            if (methods.length == 0) {
                logger.warn("No method found in service interface " + interfaceClass.getName());
                map.put(METHODS_KEY, ANY_VALUE);
            } else {
                // 将逗号作为分隔符连接方法名,并将连接后的字符串放入 map 中
                map.put(METHODS_KEY, StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ","));
            }
        }

        /**
         * Here the token value configured by the provider is used to assign the value to ServiceConfig#token
         */
        // 先用 token 属性保存 provider 的 token
        if(ConfigUtils.isEmpty(token) && provider != null) {
            token = provider.getToken();
        }

        // 如果token 不为空 token,根据 token 是否为 “true” 或者 “default” 来判断是生成随机 token 还是将现有 token 加入 map 中
        if (!ConfigUtils.isEmpty(token)) {
            if (ConfigUtils.isDefault(token)) {
                map.put(TOKEN_KEY, UUID.randomUUID().toString());
            } else {
                map.put(TOKEN_KEY, token);
            }
        }
        //init serviceMetadata attachments
        serviceMetadata.getAttachments().putAll(map);

        // export service
        //服务导出
        //生成导出服务的 URL, 协议://ip:端口号/服务接口名称?参数列表
        String host = findConfigedHosts(protocolConfig, registryURLs, map); //获取服务 IP 地址
        Integer port = findConfigedPorts(protocolConfig, name, map);
        URL url = new URL(name, host, port, getcontextpath(protocolConfig).map(p -> p + "/" + path).orElse(path), map);

        // You can customize Configurator to append extra parameters
        if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
                .hasExtension(url.getProtocol())) {
            // 加载 ConfiguratorFactory,并生成 Configurator 实例,然后通过实例配置 url
            url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
                    .getExtension(url.getProtocol()).getConfigurator(url).configure(url);
        }

        String scope = url.getParameter(ScopE_KEY);
        // scope 为 none 的时候不导出
        // don't export when none is configured
        if (!ScopE_NONE.equalsIgnoreCase(scope)) {

            // export to local if the config is not remote (export to remote only when config is remote)
            // scope != remote ,导出到本地
            if (!ScopE_REMOTE.equalsIgnoreCase(scope)) {
                exportLocal(url);
            }
            // export to remote if the config is not local (export to local only when config is local)
            // scope != local,导出到远程
            if (!ScopE_LOCAL.equalsIgnoreCase(scope)) {
                if (CollectionUtils.isNotEmpty(registryURLs)) {
                    for (URL registryURL : registryURLs) {
                        //if protocol is only injvm ,not register
                        // 如果 协议为 injvm, 不用注册协议
                        if (LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
                            continue;
                        }
                        url = url.addParameterIfAbsent(DYNAMIC_KEY, registryURL.getParameter(DYNAMIC_KEY));
                        // 加载监视器链接
                        URL monitorUrl = ConfigValidationUtils.loadMonitor(this, registryURL);
                        if (monitorUrl != null) {
                            // 将监视器链接作为参数添加到 url 中
                            url = url.addParameterandEncoded(MONITOR_KEY, monitorUrl.toFullString());
                        }
                        if (logger.isInfoEnabled()) {
                            if (url.getParameter(REGISTER_KEY, true)) {
                                logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
                            } else {
                                logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
                            }
                        }

                        // For providers, this is used to enable custom proxy to generate invoker
                        String proxy = url.getParameter(PROXY_KEY);
                        if (StringUtils.isNotEmpty(proxy)) {
                            registryURL = registryURL.addParameter(PROXY_KEY, proxy);
                        }

                        // 为服务提供类(ref)生成 Invoker
                        Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterandEncoded(EXPORT_KEY, url.toFullString()));
                        // DelegateProviderMetaDataInvoker 用于持有 Invoker 和 ServiceConfig
                        DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
                        // 导出服务,并生成 Exporter
                        /**
                         * 这里调用的是 {@link RegistryProtocol#export(org.apache.dubbo.rpc.Invoker)
                         */
                        Exporter<?> exporter = PROTOCOL.export(wrapperInvoker);
                        exporters.add(exporter);
                    }
                // 不存在注册中心,仅导出服务
                } else {
                    if (logger.isInfoEnabled()) {
                        logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
                    }
                    Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, url);
                    DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);

                    Exporter<?> exporter = PROTOCOL.export(wrapperInvoker);
                    exporters.add(exporter);
                }
                /**
                 * @since 2.7.0
                 * ServiceData Store
                 */
                WritableMetadataService MetadataService = WritableMetadataService.getExtension(url.getParameter(MetaDATA_KEY, DEFAULT_MetaDATA_STORAGE_TYPE));
                if (MetadataService != null) {
                    MetadataService.publishServiceDeFinition(url);
                }
            }
        }
        this.urls.add(url);
    }

 

  ServiceConfig#doExportUrlsFor1Protocol 方法的作用主要是根据协议 protocolConfig 来导出服务,它主要执行了以下几步:

  1. STEP1. 将所有导出服务的属性添加到 map 中;
  2. STEP2.  解析MethodConfig,MethodConfig 就是 <dubbo:method> 标签
  3. STEP3:获取要导出类的所有方法名称,将这些方法名称用 “,” 分隔保存在 map 中;
  4. STEP4:添加 token 信息、添加所有 serviceMetadata 的 attachments 属性到 map 中;
  5. STEP5:前几步都是导出的准备工作,主要是用 map 收集所有导出属性,从这一步才开始执行真正的导出工作。现获取当前服务的 host 和 port。然后用协议名称 name、主机地址host、端口 port、导出服务的全类名 path 和 收集到的 map 属性 生成 URL。生成的 URL 格式为 协议://ip:端口号/服务接口名称?参数列表,如 name = dubbo,host=192.168.11.11,port=20880,path = org.apache.dubbo.demo.DemoService,map={"side":"provider","methods":"sayHello,sayHelloAsync",...},那么生成的 URL 格式为:dubbo://192.168.11.11:20880/org.apache.dubbo.demo.DemoService?methods=sayHello,sayHelloAsync&side=provider;
  6. STEP6:获取 url 中的 scop 参数,如果 scop != none,代表要执行导出逻辑,先判断如果 scope != remote,调用 ServiceConfig#exportLocal 方法导出到本地,再判断如果 scope != local,代表要导出到远程,进一步再判断注册中心 registryURLs 是否为空,如果不为空,将服务注册注册中心,如果 protocol == injvm,直接跳过不用注册服务,否则,先加载监视器 url,并将监视器 url作为参数添加到导出服务 url 中,然后将导出服务url 作为参数加到注册中心 url 中, 再根据真正的服务提供类 ref、服务接口 interfaceClass、服务注册中心 registryURL 调用 ProxyFactory#getInvoker 方法生成一个 invoker,dubbo 认是用 Javassist 库作为字节码生成工具的,所有这里认的也就是 JavassistProxyFactory#getInvoker 方法,用 DelegateProviderMetaDataInvoker 类包装 invoker 和 serviceConfig 信息生成 wrapperInvoker,最后调用 Protocol#export 方法生成导出服务对象 exporter,注册中心导出服务的方法一般是 RegistryProtocol#export,最后添加 exporter 到缓存 exporters 中。但是如果注册中心 registryURLs 为空,代表不存在注册中心,仅调用 Protocol#export 方法导出服务即可;
  7. STEP7:将导出服务 url 添加到 urls 缓存中

  

 进入 ServiceConfig#exportLocal 方法,看看服务是如何导出到本地

    /**
     * 服务导出到本地,就是导出到 JVM
     */
    /**
     * always export injvm
     */
    private void exportLocal(URL url) {
        URL local = URLBuilder.from(url)
                .setProtocol(LOCAL_PROTOCOL)
                .setHost(LOCALHOST_VALUE)
                .setPort(0)
                .build();
        Exporter<?> exporter = PROTOCOL.export(
                PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, local));
        exporters.add(exporter);
        logger.info("Export dubbo service " + interfaceClass.getName() + " to local registry url : " + local);
    }

 ServiceConfig#exportLocal 方法很简单,就是用原 url 的值,替换 protocol=injvm,host=127.0.0.1,port=0 生成一个新的 url, 并调用 Protocol#export 方法导出服务即可

 

 进入 JavassistProxyFactory#getInvoker 方法

    @Override
    public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
        // Todo Wrapper cannot handle this scenario correctly: the classname contains '$'
        final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
        return new AbstractProxyInvoker<T>(proxy, type, url) {
            @Override
            protected Object doInvoke(T proxy, String methodName,
                                      Class<?>[] parameterTypes,
                                      Object[] arguments) throws Throwable {
                return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
            }
        };
    }

 JavassistProxyFactory#getInvoker 方法会先根据参数调用 Wrapper#getWrapper 方法生成一个 wrapper,再返回一个 AbstractProxyInvoker 匿名类对象。

  

 进入 Wrapper#getWrapper 方法

    /**
     * get wrapper.
     *
     * @param c Class instance.
     * @return Wrapper instance(not null).
     */
    public static Wrapper getWrapper(Class<?> c) {
        while (ClassGenerator.isDynamicclass(c)) // can not wrapper on dynamic class.
        {
            c = c.getSuperclass();
        }

        if (c == Object.class) {
            return OBJECT_WRAPPER;
        }

        return WRAPPER_MAP.computeIfAbsent(c, key -> makeWrapper(key));
    }

 Wrapper#getWrapper 方法调用 Wrapper#makeWrapper 方法创建一个 wrapper,加入 WRAPPER_MAP 并返回。

 

 进入 Wrapper#makeWrapper 方法

  Todo

 

 进入 RegistryProtocol#export 方法,看看是如何注册和导出的

    @Override
    public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
        // 获取注册中心 URL,以 zookeeper 注册中心为例,得到的示例 URL 如下:
        // zookeeper://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-provider&dubbo=2.0.2&export=dubbo%3A%2F%2F172.17.48.52%3A20880%2Fcom.alibaba.dubbo.demo.DemoService%3Fanyhost%3Dtrue%26application%3Ddemo-provider
        URL registryUrl = getRegistryUrl(originInvoker);
        // url to export locally
        // 获取注册的服务提供者 URL,比如:
        // dubbo://172.17.48.52:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&dubbo=2.0.2&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello
        URL providerUrl = getProviderUrl(originInvoker);

        // Subscribe the override data
        // FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call
        //  the same service. Because the subscribed is cached key with the name of the service, it causes the
        //  subscription information to cover.
        // 获取订阅 URL,比如:
        // provider://172.17.48.52:20880/com.alibaba.dubbo.demo.DemoService?category=configurators&check=false&anyhost=true&application=demo-provider&dubbo=2.0.2&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello
        final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl);
        // 创建监听器
        final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
        overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);

        providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener);
        //export invoker
        //导出服务
        final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);

        // url to registry
        final Registry registry = getRegistry(originInvoker);
        final URL registeredProviderUrl = getUrlToRegistry(providerUrl, registryUrl);

        // decide if we need to delay publish
        boolean register = providerUrl.getParameter(REGISTER_KEY, true);
        //向注册中心注册服务
        if (register) {
            register(registryUrl, registeredProviderUrl);
        }

        // register stated url on provider model
        registerStatedUrl(registryUrl, registeredProviderUrl, register);


        exporter.setRegisterUrl(registeredProviderUrl);
        exporter.setSubscribeUrl(overrideSubscribeUrl);

        // Deprecated! Subscribe to override rules in 2.6.x or before.
        registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);

        //通知服务导出的监听
        notifyExport(exporter);
        // 创建并返回 DestroyableExporter
        //Ensure that a new exporter instance is returned every time export
        return new DestroyableExporter<>(exporter);
    }

 

 RegistryProtocol#export 方法主要有两件事,导出服务和注册服务

  1. STEP1:前期准备:先获取支持中心URL,比如:zookeeper://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-provider&dubbo=2.0.2&export=dubbo%3A%2F%2F172.17.48.52%3A20880%2Fcom.alibaba.dubbo.demo.DemoService%3Fanyhost%3Dtrue%26application%3Ddemo-provider
    ,再获取注册服务的url ,比如:dubbo://172.17.48.52:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&dubbo=2.0.2&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello,再获取订阅 URL,比如:provider://172.17.48.52:20880/com.alibaba.dubbo.demo.DemoService?category=configurators&check=false&anyhost=true&application=demo-provider&dubbo=2.0.2&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello,根据 订阅url 和 originInvoker 生成监听器 OverrideListener 并加入 overrideListeners 缓存,根据 overrideSubscribeListener 覆盖原 providerUrl 中的一些信息。
  2. STEP2:调用 RegistryProtocol#doLocalExport 方法导出服务
  3. STEP3:调用 RegistryProtocol#register 方法注册中心注册服务
  4. STEP4:调用 RegistryProtocol#notifyExport 方法通知服务导出监听
  5. STEP5:创建并返回 DestroyableExporter

  

 进入 RegistryProtocol#doLocalExport 方法

    /**
     * 导出服务, 调用 protocol#export() 导出,认 export 为 {@link org.apache.dubbo.rpc.protocol.dubbo.dubboProtocol#export(org.apache.dubbo.rpc.Invoker)}
     * @param originInvoker
     * @param providerUrl
     * @param <T>
     * @return
     */
    @SuppressWarnings("unchecked")
    private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker, URL providerUrl) {
        String key = getCacheKey(originInvoker);

        return (ExporterChangeableWrapper<T>) bounds.computeIfAbsent(key, s -> {
            Invoker<?> invokerDelegate = new InvokerDelegate<>(originInvoker, providerUrl);
            return new ExporterChangeableWrapper<>((Exporter<T>) protocol.export(invokerDelegate), originInvoker);
        });
    }

 这个方法很简单,就是调用 Protocol#export 创建 Exporter, 包装成 ExporterChangeableWrapper 加入 bounds 缓存并返回,dubbo 认使用的protocol 为 dubboProtocol,所以这里调用的是 dubboProtocol#export 方法

 

 进入  dubboProtocol#export 方法,看看dubbo协议是如何导出服务的

    @Override
    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        URL url = invoker.getUrl();

        // 获取服务标识,理解成服务坐标也行。由服务组名,服务名,服务版本号以及端口组成。比如:
        // demogroup/com.alibaba.dubbo.demo.DemoService:1.0.1:20880
        // export service.
        String key = serviceKey(url);
        // 创建 dubboExporter
        dubboExporter<T> exporter = new dubboExporter<T>(invoker, key, exporterMap);
        // 将 <key, exporter> 键值对放入缓存中
        exporterMap.put(key, exporter);

        // 本地存根相关代码
        //export an stub service for dispatching event
        Boolean isstubSupportEvent = url.getParameter(STUB_EVENT_KEY, DEFAULT_STUB_EVENT);
        Boolean isCallbackservice = url.getParameter(IS_CALLBACK_SERVICE, false);
        if (isstubSupportEvent && !isCallbackservice) {
            String stubServiceMethods = url.getParameter(STUB_EVENT_METHODS_KEY);
            if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
                if (logger.isWarnEnabled()) {
                    logger.warn(new IllegalStateException("consumer [" + url.getParameter(INTERFACE_KEY) +
                            "], has set stubproxy support event ,but no stub methods founded."));
                }

            }
        }

        // 启动或刷新服务器
        openServer(url);
        // 优化序列化
        optimizeSerialization(url);

        return exporter;
    }

 dubboProtocol#export 方法会先获取服务标识 key,服务标识由服务组名,服务名,服务版本号以及端口组成。比如:demogroup/com.alibaba.dubbo.demo.DemoService:1.0.1:20880,然后创建 dubboExporter对象,加入 exporterMap 缓存,然后调用 openServer(URL url) 启动服务,调用optimizeSerialization(url) 优化序列化,最后返回 exporter。

 

 进入 openServer(URL url) 方法

    private void openServer(URL url) {
        // 获取 host:port,并将其作为服务器实例的 key,用于标识当前的服务器实例
        // find server.
        String key = url.getAddress();
        //client can export a service which's only for server to invoke
        boolean isServer = url.getParameter(IS_SERVER_KEY, true);
        if (isServer) {
            //双重检查锁创建服务器并加入缓存
            ProtocolServer server = serverMap.get(key);
            if (server == null) {
                synchronized (this) {
                    server = serverMap.get(key);
                    if (server == null) {
                        //创建服务器并加入缓存
                        serverMap.put(key, createServer(url));
                    }
                }
            } else {
                // server supports reset, use together with override
                server.reset(url);
            }
        }
    }

 openServer(URL url) 方法会先判断 server 是否为空,如果为空,DCL 双重检查锁调用 createServer(url) 方法创建一个 server 加入缓存并返回,如果不为空,reset 重置。

 

 进入 createServer(url) 方法

    /**
     * 创建服务器
     */

    private ProtocolServer createServer(URL url) {
        url = URLBuilder.from(url)
                // send readonly event when server closes, it's enabled by default
                //服务关闭时发送 readonly 事件
                .addParameterIfAbsent(CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString())
                // enable heartbeat by default
                //添加心跳检测
                .addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT))
                // 添加编码解码器参数
                .addParameter(CODEC_KEY, dubboCodec.NAME)
                .build();
        // 获取 server 参数,认为 netty
        String str = url.getParameter(SERVER_KEY, DEFAULT_REMOTING_SERVER);

        // 通过 SPI 检测是否存在 server 参数所代表的 Transporter 拓展,不存在则抛出异常
        if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
            throw new RpcException("Unsupported server type: " + str + ", url: " + url);
        }

        ExchangeServer server;
        try {
            // 创建 ExchangeServer
            server = Exchangers.bind(url, requestHandler);
        } catch (RemotingException e) {
            throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
        }

        // 获取 client 参数,可指定 netty,mina
        str = url.getParameter(CLIENT_KEY);
        if (str != null && str.length() > 0) {
            // 获取所有的 Transporter 实现类名称集合,比如 supportedTypes = [netty, mina]
            Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
            // 检测当前 dubbo 所支持的 Transporter 实现类名称列表中,
            // 是否包含 client 所表示的 Transporter,若不包含,则抛出异常
            if (!supportedTypes.contains(str)) {
                throw new RpcException("Unsupported client type: " + str);
            }
        }

        return new dubboProtocolServer(server);
    }

 

 createServer(url) 方法会先根据原 url 添加一些新的参数如readonly 事件,心跳检测,编解码生成一个新的 url,再获取 url 中的 server 参数,认为 netty,通过 SPI 检测是否存在 server 参数所代表的 Transporter 拓展,如果不存在,抛出异常,再调用 Exchangers#bind(URL, ExchangeHandler) 方法绑定url 和exchangeHandler 并返回一个 ExchangeServer,再通过 SPI 检测 url 中的 client 参数列表中是否都有支持的 Transporter,如果有一个不支持,抛出异常,最后生成一个 dubboProtocolServer 并返回

  

 进入 Exchangers#bind(URL, ExchangeHandler) 方法

  Todo

 

 进入 RegistryProtocol#register 方法查看服务注册逻辑

    private void register(URL registryUrl, URL registeredProviderUrl) {
        // 获取 Registry 并注册服务
        Registry registry = registryFactory.getRegistry(registryUrl);
        registry.register(registeredProviderUrl);
    }

 一个空壳方法,实际调用的是  RegistryService#register 方法,这里调用到的方法是 ListenerRegistryWrapper.register 方法

 

 进入 ListenerRegistryWrapper.register 方法

    @Override
    public void register(URL url) {
        try {
            registry.register(url);
        } finally {
            if (CollectionUtils.isNotEmpty(listeners)) {
                RuntimeException exception = null;
                for (RegistryServiceListener listener : listeners) {
                    if (listener != null) {
                        try {
                            listener.onRegister(url);
                        } catch (RuntimeException t) {
                            logger.error(t.getMessage(), t);
                            exception = t;
                        }
                    }
                }
                if (exception != null) {
                    throw exception;
                }
            }
        }
    }

 ListenerRegistryWrapper.register 方法一个 RegistryService 的装饰类,它内部保存了一个真正的 registry,比如注册中心使用zookeeper,那么就真正调用的是 ZookeeperRegistry#register 方法,并在注册完之后调用  RegistryServiceListener#onRegister 方法通知所有监听器

  

 至此服务注册和导出完成

相关文章

在网络请求时,总会有各种异常情况出现,我们需要提前处理这...
作者:宇曾背景软件技术的发展历史,从单体的应用,逐渐演进...
hello,大家好呀,我是小楼。最近一个技术群有同学at我,问我...
 一个软件开发人员,工作到了一定的年限(一般是3、4年左右...
当一个服务调用另一个远程服务出现错误时的外观Dubbo提供了多...
最近在看阿里开源RPC框架Dubbo的源码,顺带梳理了一下其中用...