Dubbo源码解析一-以zookeeper为注册中心,深入了解服务提供者暴露服务过程

1、dubbo作用

提供rpc调用。架构如下:

2、服务提供者做了什么

根据之前的文章https://mp.csdn.net/editor/html/112392102可以大致推测出,如果要提供一个远程服务调用,那么必须要对服务提供者实现一个服务暴露的过程。

首先从官网下载源码,如果GitHub下载较慢的小伙伴可以通过https://gitee.com/wydhcws/dubbo.git下载。

导入idea项目后,可以发现有一个demo调用,同时我将提供者的配置文件中的注册方式改为了zookeeper,进行debug看看过程。

2.1 xml配置文件如何进行实现

<?xml version="1.0" encoding="UTF-8"?>
<!--
  Licensed to the Apache Software Foundation (ASF) under one or more
  contributor license agreements.  See the NOTICE file distributed with
  this work for additional information regarding copyright ownership.
  The ASF licenses this file to You under the Apache License, Version 2.0
  (the "License"); you may not use this file except in compliance with
  the License.  You may obtain a copy of the License at

      http://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License.
  -->
<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:dubbo="http://dubbo.apache.org/schema/dubbo"
       xmlns="http://www.springframework.org/schema/beans"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.3.xsd
       http://dubbo.apache.org/schema/dubbo http://dubbo.apache.org/schema/dubbo/dubbo.xsd">

    <!-- provider's application name, used for tracing dependency relationship -->
    <dubbo:application name="demo-provider"/>

<!--    <dubbo:registry address="multicast://224.5.6.7:1234" /> 对应的注册中心-->

    <dubbo:registry address="zookeeper://127.0.0.1:2181" />

    <!-- use dubbo protocol to export service on port 20880 暴露协议-->
    <dubbo:protocol name="dubbo"/>

    <!-- service implementation, as same as regular local bean 指定暴露接口demoService的具体实现-->
    <bean id="demoService" class="org.apache.dubbo.demo.provider.DemoServiceImpl"/>

    <!-- declare the service interface to be exported 指定服务暴露接口-->
    <dubbo:service interface="org.apache.dubbo.demo.DemoService" ref="demoService"/>

</beans>

spring中的知识,可以了解到  http://dubbo.apache.org/schema/dubbo对应的handler

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.dubbo.config.spring.schema;

import org.apache.dubbo.common.Version;

import org.apache.dubbo.config.ApplicationConfig;
import org.apache.dubbo.config.MetadataReportConfig;
import org.apache.dubbo.config.ModuleConfig;
import org.apache.dubbo.config.RegistryConfig;
import org.apache.dubbo.config.MonitorConfig;
import org.apache.dubbo.config.MetricsConfig;
import org.apache.dubbo.config.ProviderConfig;
import org.apache.dubbo.config.ConsumerConfig;
import org.apache.dubbo.config.ProtocolConfig;
import org.apache.dubbo.config.spring.ConfigCenterBean;
import org.apache.dubbo.config.spring.ReferenceBean;
import org.apache.dubbo.config.spring.ServiceBean;

import org.springframework.beans.factory.xml.NamespaceHandlerSupport;

/**
 * dubboNamespaceHandler
 *
 * @export
 */
public class dubboNamespaceHandler extends NamespaceHandlerSupport {

    static {
        Version.checkDuplicate(dubboNamespaceHandler.class);
    }

    @Override
    public void init() {
        registerBeanDeFinitionParser("application", new dubboBeanDeFinitionParser(ApplicationConfig.class, true));
        registerBeanDeFinitionParser("module", new dubboBeanDeFinitionParser(ModuleConfig.class, true));
        registerBeanDeFinitionParser("registry", new dubboBeanDeFinitionParser(RegistryConfig.class, true));
        registerBeanDeFinitionParser("config-center", new dubboBeanDeFinitionParser(ConfigCenterBean.class, true));
        registerBeanDeFinitionParser("Metadata-report", new dubboBeanDeFinitionParser(MetadataReportConfig.class, true));
        registerBeanDeFinitionParser("monitor", new dubboBeanDeFinitionParser(MonitorConfig.class, true));
        registerBeanDeFinitionParser("metrics", new dubboBeanDeFinitionParser(MetricsConfig.class, true));
        registerBeanDeFinitionParser("provider", new dubboBeanDeFinitionParser(ProviderConfig.class, true));
        registerBeanDeFinitionParser("consumer", new dubboBeanDeFinitionParser(ConsumerConfig.class, true));
        registerBeanDeFinitionParser("protocol", new dubboBeanDeFinitionParser(ProtocolConfig.class, true));
        registerBeanDeFinitionParser("service", new dubboBeanDeFinitionParser(ServiceBean.class, true));
        registerBeanDeFinitionParser("reference", new dubboBeanDeFinitionParser(ReferenceBean.class, false));
        registerBeanDeFinitionParser("annotation", new AnnotationBeanDeFinitionParser());
    }

}

可以了解到

service为通过ServiceBean进行加载,

先是执行对应的afterPropertiesSet对配置进行一些注册等,然后判断是支持supportedApplicationListener,如果不支持则进行服务暴露

        if (!supportedApplicationListener) {
            export();
        }

否则通过监听实现export

    @Override
    public void onApplicationEvent(ContextRefreshedEvent event) {
        if (!isExported() && !isUnexported()) {
            if (logger.isInfoEnabled()) {
                logger.info("The service ready on spring started. service: " + getInterface());
            }
            export();
        }
    }

具体export如下,调用父类export

    @Override
    public void export() {
        super.export();
        // Publish ServiceBeanExportedEvent
        publishExportEvent();
    }

    public synchronized void export() {
        // 校验和更新暴露配置
        checkAndUpdateSubConfigs();

        // 判断是否进行服务暴露
        if (!shouldExport()) {
            return;
        }

        // 是否需要延迟暴露,需要则进行延迟暴露
        if (shouldDelay()) {
            DELAY_EXPORT_EXECUTOR.schedule(this::doExport, getDelay(), TimeUnit.MILLISECONDS);
        } else {
            // 进行服务暴露
            doExport();
        }
    }

继续跟进,一些简单的校验:

    protected synchronized void doExport() {
        if (unexported) {
            throw new IllegalStateException("The service " + interfaceClass.getName() + " has already unexported!");
        }
        if (exported) {
            return;
        }
        exported = true;

        if (StringUtils.isEmpty(path)) {
            path = interfaceName;
        }
        doExportUrls();
    }

根据Urls进行服务暴露:

    private void doExportUrls() {
        // 获取注册中心对应的url
        List<URL> registryURLs = loadRegistries(true);
        for (ProtocolConfig protocolConfig : protocols) {
            //获取暴露的接口interface的全限定名 本案例为org.apache.dubbo.demo.DemoService
            String pathKey = URL.buildKey(getcontextpath(protocolConfig).map(p -> p + "/" + path).orElse(path), group, version);
            // 获取需要暴露的对象信息,pathKey接口全限定名、ref具体接口的实例impl,interfaceClass接口
            ProviderModel providerModel = new ProviderModel(pathKey, ref, interfaceClass);
            // 放入PROVIDED_SERVICES 缓存中
            ApplicationModel.initProviderModel(pathKey, providerModel);
            // 进行服务暴露
            doExportUrlsFor1Protocol(protocolConfig, registryURLs);
        }
    }

具体的loadRegistires主要为通过配置文件中的注册中心等等生成注册中心的url列表,本次结果如下:

接着根据指定的服务暴露协议进行服务暴露,认情况为使用dubbo协议,配置中使用dubbo。

总结出:即为多个注册中心和不同协议进行服务暴露。

具体协议中暴露:

doExportUrlsFor1Protocol方法很长主要过程

2.2 若暴露协议为空则用dubbo协议进行服务暴露

        String name = protocolConfig.getName();
        // 若为空认使用dubbo进行暴露
        if (StringUtils.isEmpty(name)) {
            name = dubBO;
        }

2.3 根据各种配置生成对应的map,并获取的对应的URL

        Map<String, String> map = new HashMap<String, String>();
        map.put(SIDE_KEY, PROVIDER_SIDE);

        appendRuntimeParameters(map);
        appendParameters(map, metrics);
        appendParameters(map, application);
        appendParameters(map, module);
        。。。。。

本次生成如下:

然后根据map等获取到服务暴露方的ip和host

        // export service 获取服务暴露的host和port
        String host = this.findConfigedHosts(protocolConfig, registryURLs, map);
        Integer port = this.findConfigedPorts(protocolConfig, name, map);

在封装成对应的URL(根据对应的协议、组成中心已经对应的服务暴露方ip、port等)

URL url = new URL(name, host, port, getcontextpath(protocolConfig).map(p -> p + "/" + path).orElse(path), map);

2.4 进行服务暴露

先是判断是否有指定对应的服务暴露方式

// 若为空则进行本地服务暴露+远程服务暴露  如为指定本地则只进行本地暴露等
String scope = url.getParameter(ScopE_KEY);

2.4.1  本地服务暴露

    private void exportLocal(URL url) {
        URL local = URLBuilder.from(url)
                .setProtocol(LOCAL_PROTOCOL)
                .setHost(LOCALHOST_VALUE)
                .setPort(0)
                .build();
        Exporter<?> exporter = protocol.export(
                PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, local));
        exporters.add(exporter);
        logger.info("Export dubbo service " + interfaceClass.getName() + " to local registry url : " + local);
    }

local如下:

// 根据url和ref以及interfaceClass生成对应的代理类Invoker
PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, local)

//认使用javassist生成对应代理,通过生成代码中进行重新编写一个对应Clsee的方法进行代理,不需要通过反射直接调用。

在根据对应的暴露协议
本地为injvm协议进行服务暴露:
injvm协议进行export方法如下:

    @Override
    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        return new InjvmExporter<T>(invoker, invoker.getUrl().getServiceKey(), exporterMap);
    }


// 在对应的缓存中放入对应的AbstractExporter 对象实现缓存暴露完成
    InjvmExporter(Invoker<T> invoker, String key, Map<String, Exporter<?>> exporterMap) {
        super(invoker);
        this.key = key;
        this.exporterMap = exporterMap;
        exporterMap.put(key, this);
    }

// 并放入exporters列表 本地暴露完成

2.4.2  远程服务暴露

                    // 对所有注册中心列表的url进行分别的服务暴露
                    for (URL registryURL : registryURLs) {
                        //if protocol is only injvm ,not register
                        if (LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
                            continue;
                        }
                        url = url.addParameterIfAbsent(DYNAMIC_KEY, registryURL.getParameter(DYNAMIC_KEY));
                        URL monitorUrl = loadMonitor(registryURL);
                        if (monitorUrl != null) {
                            url = url.addParameterandEncoded(MONITOR_KEY, monitorUrl.toFullString());
                        }
                        if (logger.isInfoEnabled()) {
                            logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
                        }

                        // For providers, this is used to enable custom proxy to generate invoker
                        String proxy = url.getParameter(PROXY_KEY);
                        if (StringUtils.isNotEmpty(proxy)) {
                            registryURL = registryURL.addParameter(PROXY_KEY, proxy);
                        }
                        // 根据url等进行封装invoker
                        Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterandEncoded(EXPORT_KEY, url.toFullString()));
                        DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
                        // 根据对应的协议进行服务暴露,认为dubbo协议
                        Exporter<?> exporter = protocol.export(wrapperInvoker);
                        exporters.add(exporter);
                    }

进行RegistryProtocol的export:

    @Override
    public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
        // 获取注册中心的url 进行注册中心暴露
        URL registryUrl = getRegistryUrl(originInvoker);
        // url to export locally 获取dubbo本地的服务远端使用暴露url
        URL providerUrl = getProviderUrl(originInvoker);

        // Subscribe the override data
        // FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call
        //  the same service. Because the subscribed is cached key with the name of the service, it causes the
        //  subscription information to cover.
        final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl);
        final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
        overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);

        providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener);
        //export invoker 首先是dubbo协议的远端暴露
        final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);

        // url to registry 进行组测中心的注入
        final Registry registry = getRegistry(originInvoker);
        final URL registeredProviderUrl = getRegisteredProviderUrl(providerUrl, registryUrl);
        ProviderInvokerWrapper<T> providerInvokerWrapper = ProviderConsumerRegTable.registerProvider(originInvoker,
                registryUrl, registeredProviderUrl);
        //to judge if we need to delay publish
        boolean register = registeredProviderUrl.getParameter("register", true);
        if (register) {
            register(registryUrl, registeredProviderUrl);
            providerInvokerWrapper.setReg(true);
        }

        // Deprecated! Subscribe to override rules in 2.6.x or before.
        registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);

        exporter.setRegisterUrl(registeredProviderUrl);
        exporter.setSubscribeUrl(overrideSubscribeUrl);
        //Ensure that a new exporter instance is returned every time export
        return new DestroyableExporter<>(exporter);
    }

dubbo协议暴露过程如下:

    @Override
    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        URL url = invoker.getUrl();

        // export service. 获取对象的export service和对应的端口,认使用20880 如本案例org.apache.dubbo.demo.DemoService:20880
        String key = serviceKey(url);
        // 封装成对应的exporter对象
        dubboExporter<T> exporter = new dubboExporter<T>(invoker, key, exporterMap);
        exporterMap.put(key, exporter);

        //export an stub service for dispatching event
        Boolean isstubSupportEvent = url.getParameter(STUB_EVENT_KEY, DEFAULT_STUB_EVENT);
        Boolean isCallbackservice = url.getParameter(IS_CALLBACK_SERVICE, false);
        if (isstubSupportEvent && !isCallbackservice) {
            String stubServiceMethods = url.getParameter(STUB_EVENT_METHODS_KEY);
            if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
                if (logger.isWarnEnabled()) {
                    logger.warn(new IllegalStateException("consumer [" + url.getParameter(INTERFACE_KEY) +
                            "], has set stubproxy support event ,but no stub methods founded."));
                }

            } else {
                stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
            }
        }
        // 根据暴露url进行开启服务端的连接
        openServer(url);
        optimizeSerialization(url);

        return exporter;
    }

服务端的连接开启:

    // 根据url 开启server,并放入对应的map
    private void openServer(URL url) {
        // find server.
        String key = url.getAddress();
        //client can export a service which's only for server to invoke
        boolean isServer = url.getParameter(IS_SERVER_KEY, true);
        if (isServer) {
            ExchangeServer server = serverMap.get(key);
            if (server == null) {
                synchronized (this) {
                    server = serverMap.get(key);
                    if (server == null) {
                        serverMap.put(key, createServer(url));
                    }
                }
            } else {
                // server supports reset, use together with override
                server.reset(url);
            }
        }
    }



    private ExchangeServer createServer(URL url) {
        url = URLBuilder.from(url)
                // send readonly event when server closes, it's enabled by default
                .addParameterIfAbsent(CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString())
                // enable heartbeat by default
                .addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT))
                .addParameter(CODEC_KEY, dubboCodec.NAME)
                .build();
        // 认使用netty进行开启服务端连接
        String str = url.getParameter(SERVER_KEY, DEFAULT_REMOTING_SERVER);

        if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
            throw new RpcException("Unsupported server type: " + str + ", url: " + url);
        }

        ExchangeServer server;
        try {
            // 通过Exchangers进行服务的绑定
            server = Exchangers.bind(url, requestHandler);
        } catch (RemotingException e) {
            throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
        }

        str = url.getParameter(CLIENT_KEY);
        if (str != null && str.length() > 0) {
            Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
            if (!supportedTypes.contains(str)) {
                throw new RpcException("Unsupported client type: " + str);
            }
        }

        return server;
    }

认使用headerExchanger

    @Override
    public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
        return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
    }


//Transporters.bind

    public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        if (handlers == null || handlers.length == 0) {
            throw new IllegalArgumentException("handlers == null");
        }
        ChannelHandler handler;
        if (handlers.length == 1) {
            handler = handlers[0];
        } else {
            handler = new ChannelHandlerdispatcher(handlers);
        }
        return getTransporter().bind(url, handler);
    }

// 根据exporter进行bind如下
    @Override
    public Server bind(URL url, ChannelHandler listener) throws RemotingException {
        return new NettyServer(url, listener);
    }
//生成一个nettyServer,调用模板方法,进行open
    public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
        super(url, handler);
        localAddress = getUrl().toInetSocketAddress();

        String bindIp = getUrl().getParameter(Constants.BIND_IP_KEY, getUrl().getHost());
        int bindPort = getUrl().getParameter(Constants.BIND_PORT_KEY, getUrl().getPort());
        if (url.getParameter(ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(bindIp)) {
            bindIp = ANYHOST_VALUE;
        }
        bindAddress = new InetSocketAddress(bindIp, bindPort);
        this.accepts = url.getParameter(ACCEPTS_KEY, DEFAULT_ACCEPTS);
        this.idleTimeout = url.getParameter(IDLE_TIMEOUT_KEY, DEFAULT_IDLE_TIMEOUT);
        try {
            doopen();
            if (logger.isInfoEnabled()) {
                logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress());
            }
        } catch (Throwable t) {
            throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind " + getClass().getSimpleName()
                    + " on " + getLocalAddress() + ", cause: " + t.getMessage(), t);
        }
        //fixme replace this with better method
        DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
        executor = (ExecutorService) dataStore.get(Constants.EXECUTOR_SERVICE_COMPONENT_KEY, Integer.toString(url.getPort()));
    }



// 通过netty进行开启一个服务监听
    @Override
    protected void doopen() throws Throwable {
        bootstrap = new ServerBootstrap();

        bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true));
        workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
                new DefaultThreadFactory("NettyServerWorker", true));

        final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
        channels = nettyServerHandler.getChannels();

        bootstrap.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
                .childOption(ChannelOption.so_REUSEADDR, Boolean.TRUE)
                .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                .childHandler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        // FIXME: should we use getTimeout()?
                        int idleTimeout = UrlUtils.getIdleTimeout(getUrl());
                        NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
                        ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
                                .addLast("decoder", adapter.getDecoder())
                                .addLast("encoder", adapter.getEncoder())
                                .addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS))
                                .addLast("handler", nettyServerHandler);
                    }
                });
        // bind
        ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
        channelFuture.syncUninterruptibly();
        channel = channelFuture.channel();

    }

总结出dubbo协议底层是通过netty进行开启对应的ServerSocket

开启了对应的ServerSocket后进行注册中心暴露

注册中心注入

    // 对url进行注入 zookeeper为在provider中创建对应的临时节点
    public void register(URL registryUrl, URL registeredProviderUrl) {
        Registry registry = registryFactory.getRegistry(registryUrl);
        registry.register(registeredProviderUrl);
    }

    // zookeeper中对应
    @Override
    public void doRegister(URL url) {
        try {
            zkClient.create(toUrlPath(url), url.getParameter(DYNAMIC_KEY, true));
        } catch (Throwable e) {
            throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }

   // 创建完成后进行订阅,provider会创建对应configurators如/dubbo/org.apache.dubbo.demo.DemoService/configurators 进行监听

    @Override
    public void doSubscribe(final URL url, final NotifyListener listener) {
        try {
            if (ANY_VALUE.equals(url.getServiceInterface())) {
                String root = toRootPath();
                ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
                if (listeners == null) {
                    zkListeners.putIfAbsent(url, new ConcurrentHashMap<>());
                    listeners = zkListeners.get(url);
                }
                ChildListener zkListener = listeners.get(listener);
                if (zkListener == null) {
                    listeners.putIfAbsent(listener, (parentPath, currentChilds) -> {
                        // 如果子节点有变化则会收到通知遍历所有的子节点
                        for (String child : currentChilds) {
                            child = URL.decode(child);
                            if (!anyServices.contains(child)) { // 如果存在子节点还未订阅,说明是新节点进行订阅
                                anyServices.add(child);
                                subscribe(url.setPath(child).addParameters(INTERFACE_KEY, child,
                                        Constants.CHECK_KEY, String.valueOf(false)), listener);
                            }
                        }
                    });
                    zkListener = listeners.get(listener);
                }
                zkClient.create(root, false); // 创建持久节点,接下来订阅持久节点的子节点
                List<String> services = zkClient.addChildListener(root, zkListener);
                if (CollectionUtils.isNotEmpty(services)) {// 遍历所有子节点进行订阅
                    for (String service : services) {
                        service = URL.decode(service);
                        anyServices.add(service);
                        subscribe(url.setPath(service).addParameters(INTERFACE_KEY, service,
                                Constants.CHECK_KEY, String.valueOf(false)), listener);
                    }
                }
            } else {
                List<URL> urls = new ArrayList<>();
                for (String path : toCategoriesPath(url)) {
                    ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
                    if (listeners == null) {
                        zkListeners.putIfAbsent(url, new ConcurrentHashMap<>());
                        listeners = zkListeners.get(url);
                    }
                    ChildListener zkListener = listeners.get(listener);
                    if (zkListener == null) {
                        listeners.putIfAbsent(listener, (parentPath, currentChilds) -> ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds)));
                        zkListener = listeners.get(listener);
                    }
                    zkClient.create(path, false);
                    List<String> children = zkClient.addChildListener(path, zkListener);
                    if (children != null) {
                        urls.addAll(toUrlsWithEmpty(url, path, children));
                    }
                }
                notify(url, listener, urls); // 回调NotifyListener,更新本地缓存
            }
        } catch (Throwable e) {
            throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }

 

相关文章

在网络请求时,总会有各种异常情况出现,我们需要提前处理这...
作者:宇曾背景软件技术的发展历史,从单体的应用,逐渐演进...
hello,大家好呀,我是小楼。最近一个技术群有同学at我,问我...
 一个软件开发人员,工作到了一定的年限(一般是3、4年左右...
当一个服务调用另一个远程服务出现错误时的外观Dubbo提供了多...
最近在看阿里开源RPC框架Dubbo的源码,顺带梳理了一下其中用...