Dubbo SPI机制

目录

1.SPI机制

2.ExtensionLoader源码

3.ExtensionFactory源码


最近两天忙里偷闲看了看dubbo的源码,参考书是诣极写的《深入理解Apache dubbo与实战》。上班的时候在公司的KM上做了些笔记,但是苦于文件发不出来,所以回来再记录一下,就当是备忘。我看的是2.6.*版本,最新的2.7.*相对旧版本是有些许改动的,不过影响不大,设计思想和流程都一样。

1.SPI机制

本质:一个接口(SPI),多种实现(扩展类)。

扩展类又分为三种:

(1)普通扩展类;

(2)包装扩展类;内部有对其他扩展类的引用,可以实现方法增强;装饰设计模式;

(3)自适应扩展类;通过自适应扩展类来调用方法时,可以根据url传入的参数或者其他参数动态决定加载哪一个扩展类实例来运行。可以用来“管理”普通扩展类,充当一个代理的角色。

2.ExtensionLoader源码

ExtensionLoader是SPI机制里最关键的一个类,可以实现为SPI加载扩展类的功能。其中有3个重要方法:getExtension,getActivateExtension,getAdaptiveExtension。从这3个方法入手阅读源码就差不多把这个类看完了。

(1)getExtension(根据name获取扩展类)

public T getExtension(String name) {
    if (StringUtils.isEmpty(name)) {
        throw new IllegalArgumentException("Extension name == null");
    }
 //name为“true”则获取认扩展类实例
    if ("true".equals(name)) {
        return getDefaultExtension();
    }
    final Holder<Object> holder = getorCreateHolder(name);
    Object instance = holder.get();
    if (instance == null) {
        synchronized (holder) {
            instance = holder.get();
            if (instance == null) {
    //double check确保单例;instance仍为空则创建实例
                instance = createExtension(name);
                holder.set(instance);
            }
        }
    }
    return (T) instance;
}

点进createExtension方法
private T createExtension(String name) {
 //先根据name拿到class对象
    Class<?> clazz = getExtensionClasses().get(name);
    if (clazz == null) {
        throw findException(name);
    }
    try {
  //再根据class对象获取实例
        T instance = (T) EXTENSION_INSTANCES.get(clazz);
        if (instance == null) {
        //没有获取到则创建实例
            EXTENSION_INSTANCES.putIfAbsent(clazz, clazz.newInstance());
            instance = (T) EXTENSION_INSTANCES.get(clazz);
        }
  //对实例完成依赖注入
        injectExtension(instance);
        Set<Class<?>> wrapperClasses = cachedWrapperClasses;
        if (CollectionUtils.isNotEmpty(wrapperClasses)) {
            for (Class<?> wrapperClass : wrapperClasses) {
    //对实例生成包装类实例,可以实现一些方法加强,本质上是装饰设计模式;包装类和扩展类一样,都实现了同一个SPI
                instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance));
            }
        }
        return instance;
    } catch (Throwable t) {
        throw new IllegalStateException("Extension instance (name: " + name + ", class: " +
                type + ") Couldn't be instantiated: " + t.getMessage(), t);
    }
}

点进getExtensionClasses方法;此方法获取扩展类name->扩展类class对象的Map
private Map<String, Class<?>> getExtensionClasses() {
    Map<String, Class<?>> classes = cachedClasses.get();
    if (classes == null) {
        synchronized (cachedClasses) {
            classes = cachedClasses.get();
            if (classes == null) {
    //double check;classes仍为空则进行加载
                classes = loadExtensionClasses();
                cachedClasses.set(classes);
            }
        }
    }
    return classes;
}

点进loadExtensionClasses方法;此方法可以从配置文件加载扩展类信息
private Map<String, Class<?>> loadExtensionClasses() {
 //获取SPI注解的value值,作为认扩展类的名称
    cacheDefaultExtensionName();
 //从各目录下的配置文件加载
    Map<String, Class<?>> extensionClasses = new HashMap<>();
    loadDirectory(extensionClasses, dubBO_INTERNAL_DIRECTORY, type.getName());
    loadDirectory(extensionClasses, dubBO_INTERNAL_DIRECTORY, type.getName().replace("org.apache", "com.alibaba"));
    loadDirectory(extensionClasses, dubBO_DIRECTORY, type.getName());
    loadDirectory(extensionClasses, dubBO_DIRECTORY, type.getName().replace("org.apache", "com.alibaba"));
    loadDirectory(extensionClasses, SERVICES_DIRECTORY, type.getName());
    loadDirectory(extensionClasses, SERVICES_DIRECTORY, type.getName().replace("org.apache", "com.alibaba"));
    return extensionClasses;
}

点进loadDirectory方法;
private void loadDirectory(Map<String, Class<?>> extensionClasses, String dir, String type) {
 //目录+接口名称=文件全路径
    String fileName = dir + type;
    try {
        Enumeration<java.net.URL> urls;
        ClassLoader classLoader = findClassLoader();
        if (classLoader != null) {
            urls = classLoader.getResources(fileName);
        } else {
            urls = ClassLoader.getSystemResources(fileName);
        }
        if (urls != null) {
            while (urls.hasMoreElements()) {
                java.net.URL resourceURL = urls.nextElement();
    //对每个url进行加载,我理解是文件里定义的每一行
                loadResource(extensionClasses, classLoader, resourceURL);
            }
        }
    } catch (Throwable t) {
        logger.error("Exception occurred when loading extension class (interface: " +
                type + ", description file: " + fileName + ").", t);
    }
}

点进loadResource;
private void loadResource(Map<String, Class<?>> extensionClasses, ClassLoader classLoader, java.net.URL resourceURL) {
    try {
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(resourceURL.openStream(), StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                final int ci = line.indexOf('#');
                if (ci >= 0) {
     //去掉“#”后的注释部分
                    line = line.substring(0, ci);
                }
                line = line.trim();
                if (line.length() > 0) {
                    try {
                        String name = null;
      //用“=”分开,左边是扩展类名称,右边是扩展类的类路径
                        int i = line.indexOf('=');
                        if (i > 0) {
                            name = line.substring(0, i).trim();
                            line = line.substring(i + 1).trim();
                        }
                        if (line.length() > 0) {
       //加载这个扩展类
                            loadClass(extensionClasses, resourceURL, Class.forName(line, true, classLoader), name);
                        }
                    } catch (Throwable t) {
                        IllegalStateException e = new IllegalStateException("Failed to load extension class (interface: " + type + ", class line: " + line + ") in " + resourceURL + ", cause: " + t.getMessage(), t);
                        exceptions.put(line, e);
                    }
                }
            }
        }
    } catch (Throwable t) {
        logger.error("Exception occurred when loading extension class (interface: " +
                type + ", class file: " + resourceURL + ") in " + resourceURL, t);
    }
}

点进loadClass;注意:
只是保存了扩展类的class对象,还没有真正实例化,第一次用到的时候再实例化
private void loadClass(Map<String, Class<?>> extensionClasses, java.net.URL resourceURL, Class<?> clazz, String name) throws NoSuchMethodException {
    if (!type.isAssignableFrom(clazz)) {
  //如果这个扩展类没有实现此SPI,则抛异常
        throw new IllegalStateException("Error occurred when loading extension class (interface: " +
                type + ", class line: " + clazz.getName() + "), class "
                + clazz.getName() + " is not subtype of interface.");
    }
    if (clazz.isAnnotationPresent(Adaptive.class)) {
  //这个扩展类是自适应扩展类,则缓存之
        cacheAdaptiveClass(clazz);
    } else if (isWrapperClass(clazz)) {
  //这个扩展类是包装类,缓存之
        cacheWrapperClass(clazz);
    } else {
  //这个扩展类是普通的扩展类
        clazz.getConstructor();
        if (StringUtils.isEmpty(name)) {
            name = findAnnotationName(clazz);
            if (name.length() == 0) {
                throw new IllegalStateException("No such extension name for the class " + clazz.getName() + " in the config " + resourceURL);
            }
        }
  //同一个扩展类可能有多个名称
        String[] names = NAME_SEParaTOR.split(name);
        if (ArrayUtils.isNotEmpty(names)) {
   //如果这个扩展类打了@Activate注解,则缓存起来(getActivateExtension会用到)
            cacheActivateClass(clazz, names[0]);
            for (String n : names) {
    //缓存class对象->名字的映射
                cacheName(clazz, n);
    //缓存名字->class对象的映射
                saveInExtensionClass(extensionClasses, clazz, n);
            }
        }
    }
}

(2)getActivateExtension

入口方法:
/**
 * This is equivalent to {@code getActivateExtension(url, url.getParameter(key).split(","), null)}
 *
 * @param url   url
 * @param key   url parameter key which used to get extension point names
 * @param group group
 * @return extension list which are activated.
 * @see #getActivateExtension(org.apache.dubbo.common.URL, String[], String)
 */
public List<T> getActivateExtension(URL url, String key, String group) {
    String value = url.getParameter(key);
    return getActivateExtension(url, StringUtils.isEmpty(value) ? null : COMMA_SPLIT_PATTERN.split(value), group);
}

//1.根据url和group对打了@Activate且没有被values指定的扩展类进行筛选,符合条件的就激活;
//2.values是url中指定的扩展类名称(可能有多个);第二步就是激活url指定的这些扩展类
public List<T> getActivateExtension(URL url, String[] values, String group) {
    List<T> exts = new ArrayList<>();
    List<String> names = values == null ? new ArrayList<>(0) : Arrays.asList(values);
 //如果url中指定了“-default”,则不激活打了@Activate注解的扩展类(直接跳过这个if语句块),只有url指定的扩展类才能激活
    if (!names.contains(REMOVE_VALUE_PREFIX + DEFAULT_KEY)) {
  //getExtensionClasses,确保下一步的cachedActivates已经被初始化
        getExtensionClasses();
  //对打了@Activate注解的扩展类进行检查,满足条件则加到exts中
        for (Map.Entry<String, Object> entry : cachedActivates.entrySet()) {
            String name = entry.getKey();
            Object activate = entry.getValue();

            String[] activateGroup, activateValue;

            if (activate instanceof Activate) {
                activateGroup = ((Activate) activate).group();
                activateValue = ((Activate) activate).value();
            } else if (activate instanceof com.alibaba.dubbo.common.extension.Activate) {
                activateGroup = ((com.alibaba.dubbo.common.extension.Activate) activate).group();
                activateValue = ((com.alibaba.dubbo.common.extension.Activate) activate).value();
            } else {
                continue;
            }
   //先匹配group,再匹配value
   //注意:如果url中指定了这个扩展类,则先不add,后面遍历names时再add(防止重复add);如果url中指定了“-name”,则name对应的扩展类也不会被激活
   //如果一个扩展类加了@Activate注解,group和value不匹配,但url指定了这个类,则最终还是可能被激活的
            if (isMatchGroup(group, activateGroup)
                    && !names.contains(name)
                    && !names.contains(REMOVE_VALUE_PREFIX + name)
                    && isActive(activateValue, url)) {
                exts.add(getExtension(name));
            }
        }
  //排序;先根据before和after来排,再根据order来排
        exts.sort(ActivateComparator.COMParaTOR);
    }
 //处理完打了@Activate注解的扩展类后,逐一处理url指定的扩展类
    List<T> usrs = new ArrayList<>();
    for (int i = 0; i < names.size(); i++) {
        String name = names.get(i);
  //name以“-”开头/url指定了“-name”,都会导致name对应的扩展类不被激活
        if (!name.startsWith(REMOVE_VALUE_PREFIX)
                && !names.contains(REMOVE_VALUE_PREFIX + name)) {
            if (DEFAULT_KEY.equals(name)) {
    //如果name为“default”且usrs不为空,则添加usrs到exts的开头部分。这个不太明白啥意思
                if (!usrs.isEmpty()) {
                    exts.addAll(0, usrs);
                    usrs.clear();
                }
            } else {
                usrs.add(getExtension(name));
            }
        }
    }
    if (!usrs.isEmpty()) {
        exts.addAll(usrs);
    }
    return exts;
}

(3)getAdaptiveExtension


public T getAdaptiveExtension() {
 //从缓存获取实例
    Object instance = cachedAdaptiveInstance.get();
    if (instance == null) {
  //没有获取到,且存在创建时抛出的异常,则抛出
        if (createAdaptiveInstanceError != null) {
            throw new IllegalStateException("Failed to create adaptive instance: " +
                    createAdaptiveInstanceError.toString(),
                    createAdaptiveInstanceError);
        }
  //尝试加锁创建实例
        synchronized (cachedAdaptiveInstance) {
            instance = cachedAdaptiveInstance.get();
            if (instance == null) {
    //double check;确保单例
                try {
     //创建实例并缓存
                    instance = createAdaptiveExtension();
                    cachedAdaptiveInstance.set(instance);
                } catch (Throwable t) {
                    createAdaptiveInstanceError = t;
                    throw new IllegalStateException("Failed to create adaptive instance: " + t.toString(), t);
                }
            }
        }
    }

    return (T) instance;
}


private T createAdaptiveExtension() {
    try {
  //创建实例并完成依赖注入
        return injectExtension((T) getAdaptiveExtensionClass().newInstance());
    } catch (Exception e) {
        throw new IllegalStateException("Can't create adaptive extension " + type + ", cause: " + e.getMessage(), e);
    }
}


private Class<?> getAdaptiveExtensionClass() {
 //执行这个方法可能触发加载,可能给cachedAdaptiveClass完成赋值
    getExtensionClasses();
    if (cachedAdaptiveClass != null) {
  //存在缓存则直接返回
        return cachedAdaptiveClass;
    }
 //不存在已定义好的自适应扩展类,则自动生成一个
    return cachedAdaptiveClass = createAdaptiveExtensionClass();
}

//Compiler本质上也是一个SPI,有多种实现;
//所以是先获取Compiler的自适应扩展类,通过它调用compile方法,从而根据不同的情况动态加载不同的扩展类来编译
//最终认使用的是javassist扩展类;
//这里相当于是SPI的一个应用案例
private Class<?> createAdaptiveExtensionClass() {
    String code = new AdaptiveClassCodeGenerator(type, cachedDefaultName).generate();
    ClassLoader classLoader = findClassLoader();
    org.apache.dubbo.common.compiler.Compiler compiler = ExtensionLoader.getExtensionLoader(org.apache.dubbo.common.compiler.Compiler.class).getAdaptiveExtension();
    return compiler.compile(code, classLoader);
}

3.ExtensionFactory源码

ExtensionFactory就是一个SPI,有三个扩展类:AdaptiveExtensionFactory,SpiExtensionFactory和SpringExtensionFactory。

AdaptiveExtensionFactory;看名字就知道它就是一个自适应扩展类。

调用它的getExtension方法时,本质上是调用了其他两个普通扩展类的方法


/**
 * AdaptiveExtensionFactory
 */
@Adaptive
public class AdaptiveExtensionFactory implements ExtensionFactory {
 //缓存ExtensionFactory的两个普通扩展类:SpiExtensionFactory和SpringExtensionFactory,由于字典顺序原因,前者在前
    private final List<ExtensionFactory> factories;

    public AdaptiveExtensionFactory() {
        ExtensionLoader<ExtensionFactory> loader = ExtensionLoader.getExtensionLoader(ExtensionFactory.class);
        List<ExtensionFactory> list = new ArrayList<ExtensionFactory>();
        for (String name : loader.getSupportedExtensions()) {
            list.add(loader.getExtension(name));
        }
        factories = Collections.unmodifiableList(list);
    }

    @Override
    public <T> T getExtension(Class<T> type, String name) {
  //先从SpiExtensionFactory找,再从SpringExtensionFactory找
        for (ExtensionFactory factory : factories) {
            T extension = factory.getExtension(type, name);
            if (extension != null) {
                return extension;
            }
        }
        return null;
    }

}


//getSupportedExtensions()方法可以触发加载,并获取普通扩展类的list
public Set<String> getSupportedExtensions() {
    Map<String, Class<?>> clazzes = getExtensionClasses();
    return Collections.unmodifiableSet(new TreeSet<>(clazzes.keySet()));
}

SpiExtensionFactory;一个普通扩展类;

通过getExtension获取的永远是自适应扩展类;


/**
 * SpiExtensionFactory
 */
public class SpiExtensionFactory implements ExtensionFactory {

    @Override
    public <T> T getExtension(Class<T> type, String name) {
        if (type.isInterface() && type.isAnnotationPresent(SPI.class)) {
            ExtensionLoader<T> loader = ExtensionLoader.getExtensionLoader(type);
            if (!loader.getSupportedExtensions().isEmpty()) {
    //外界获取的永远是自适应扩展类
                return loader.getAdaptiveExtension();
            }
        }
        return null;
    }

}

SpringExtensionFactory;一个普通扩展类;

可以实现从Spring上下文获取扩展类;

/**
 * SpringExtensionFactory
 */
public class SpringExtensionFactory implements ExtensionFactory {
    @Override
    @SuppressWarnings("unchecked")
    public <T> T getExtension(Class<T> type, String name) {

        //SPI should be get from SpiExtensionFactory
        if (type.isInterface() && type.isAnnotationPresent(SPI.class)) {
   //SPI的扩展类必须由SpiExtensionFactory获取
            return null;
        }

        for (ApplicationContext context : CONTEXTS) {
   //遍历上下文先根据名字找
            if (context.containsBean(name)) {
                Object bean = context.getBean(name);
                if (type.isinstance(bean)) {
                    return (T) bean;
                }
            }
        }

        logger.warn("No spring extension (bean) named:" + name + ", try to find an extension (bean) of type " + type.getName());

        if (Object.class == type) {
            return null;
        }

        for (ApplicationContext context : CONTEXTS) {
   //根据名字找不到,再根据类型找
            try {
                return context.getBean(type);
            } catch (NoUniqueBeanDeFinitionException multiBeanExe) {
                logger.warn("Find more than 1 spring extensions (beans) of type " + type.getName() + ", will stop auto injection. Please make sure you have specified the concrete parameter type and there's only one extension of that type.");
            } catch (NoSuchBeanDeFinitionException noBeanExe) {
                if (logger.isDebugEnabled()) {
                    logger.debug("Error when get spring extension(bean) for type:" + type.getName(), noBeanExe);
                }
            }
        }

        logger.warn("No spring extension (bean) named:" + name + ", type:" + type.getName() + " found, stop get bean.");

        return null;
    }
}

 

初学dubbo源码,对SPI机制印象深刻的代码就是这些。SPI机制也是学习dubbo的基础,没有SPI就没有dubbo。初看的时候感觉很难,但跟着一个方法点进去看,感觉还是能看懂的。过几天又要忙起来了,等闲了继续往后面看。

相关文章

在网络请求时,总会有各种异常情况出现,我们需要提前处理这...
作者:宇曾背景软件技术的发展历史,从单体的应用,逐渐演进...
hello,大家好呀,我是小楼。最近一个技术群有同学at我,问我...
 一个软件开发人员,工作到了一定的年限(一般是3、4年左右...
当一个服务调用另一个远程服务出现错误时的外观Dubbo提供了多...
最近在看阿里开源RPC框架Dubbo的源码,顺带梳理了一下其中用...