Invoker 表示远程通信的对象
Directory 表示服务地址列表
服务发布过程
dubbo源码使用样例(不使用Spring-Boot的Starter组件):
public class Application {
public static void main(String[] args) throws Exception {
AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(ProviderConfiguration.class);
context.start();
system.in.read();
}
@Configuration
@Enabledubbo(scanBasePackages = "com.anto.dubbo.dubboprovider")
@PropertySource("classpath:/spring/dubbo-provider.properties")
static class ProviderConfiguration {
@Bean
public RegistryConfig registryConfig() {
RegistryConfig registryConfig = new RegistryConfig();
registryConfig.setAddress("zookeeper://172.30.2.7:2181");
return registryConfig;
}
}
}
而在dubbo-spring-boot-starter
组件中,则可以直接不带@Enabledubbo
直接在properties文件配置扫描路径即可。
dubbo.registry.address=zookeeper://172.30.2.7:2181
dubbo.scan.base-packages=com.anto.dubbo.dubboprovider
是因为自动装配类中,dubboRelaxedBinding2AutoConfiguration
会将上述配置绑定至指定的Bean中。
@Bean(name = BASE_PACKAGES_PROPERTY_RESOLVER_BEAN_NAME)//dubboScanBasePackagesPropertyResolver
public PropertyResolver dubboScanBasePackagesPropertyResolver(ConfigurableEnvironment environment) {
ConfigurableEnvironment propertyResolver = new AbstractEnvironment() {
@Override
protected void customizePropertySources(MutablePropertySources propertySources) {
//查找properties文件中的 dubbo.scan. 配置
Map<String, Object> dubboScanProperties = getSubProperties(environment.getPropertySources(), dubBO_SCAN_PREFIX);
propertySources.addLast(new MapPropertySource("dubboScanProperties", dubboScanProperties));
}
};
ConfigurationPropertySources.attach(propertyResolver);
return new DelegatingPropertyResolver(propertyResolver);
}
查找待发布的服务--扫描xml或注解
dubbo服务发布的形式
- xml形式
- 注解形式
@Enabledubbo
包含了两个注解@EnabledubboConfig
、@dubboComponentScan
疑问:dubbo启动时 @Enabledubbo
是否是必须的注解?
非必须的注解,当用Spring-Boot方式集成Starter组件时,扫描路径是直接读取application.properties文件的;
至于ServiceAnnotationBeanPostProcessor则在dubboAutoConfiguration声明了该Bean。
以下都是基于注解的方式来进行初始化的。
- ServiceAnnotationBeanPostProcessor
在@dubboComponentScan
注解中会import类dubboComponentScanRegistrar
,然后预先往IOC容器中注册几个BeanDeFinition。
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(dubboComponentScanRegistrar.class)
public @interface dubboComponentScan {//该注解Import了dubboComponentScanRegistrar类
private void registerServiceAnnotationBeanPostProcessor(Set<String> packagesToScan, BeanDeFinitionRegistry registry) {
//注册ServiceAnnotationBeanPostProcessor的BeanDeFinition
BeanDeFinitionBuilder builder = rootBeanDeFinition(ServiceAnnotationBeanPostProcessor.class);
builder.addConstructorArgValue(packagesToScan);
builder.setRole(BeanDeFinition.ROLE_INFRASTRUCTURE);
AbstractBeanDeFinition beanDeFinition = builder.getBeanDeFinition();
BeanDeFinitionReaderUtils.registerWithGeneratedname(beanDeFinition, registry);
}
注册完ServiceAnnotationBeanPostProcessor
的BeanDeFinition后,就应该是将该Bean进行实例化。
//AbstractApplicationContext 调用refresh()方法时触发
invokebeanfactoryPostProcessors(beanfactory);
//PostProcessorRegistrationDelegate 触发getBean的过程
currentRegistryProcessors.add(beanfactory.getBean(ppName, BeanDeFinitionRegistryPostProcessor.class));
初始化完成后,就应该是真正开始其作用了。
而它实现了BeanDeFinitionRegistryPostProcessor
,那么就应该是调用其postProcessBeanDeFinitionRegistry()
方法。
public void postProcessBeanDeFinitionRegistry(BeanDeFinitionRegistry registry) throws BeansException {
// 再次保证注册了dubboBootstrapApplicationListener 其实在`@dubboComponentScan注解中,
//导入`dubboComponentScanRegistrar`类时已经注册了
registerBeans(registry, dubboBootstrapApplicationListener.class);
Set<String> resolvedPackagesToScan = resolvePackagesToScan(packagesToScan);
if (!CollectionUtils.isEmpty(resolvedPackagesToScan)) {
//开始注册带有dubbo注解的Bean
registerServiceBeans(resolvedPackagesToScan, registry);
}
//...略
}
跟Mybatis中类似,dubbo也定义了专门用来扫描指定路径的类dubboClasspathBeanDeFinitionScanner
。
private void registerServiceBeans(Set<String> packagesToScan, BeanDeFinitionRegistry registry) {
dubboClasspathBeanDeFinitionScanner scanner =
new dubboClasspathBeanDeFinitionScanner(registry, environment, resourceLoader);
BeanNameGenerator beanNameGenerator = resolveBeanNameGenerator(registry);
scanner.setBeanNameGenerator(beanNameGenerator);
// serviceAnnotationTypes是一个list 包含dubboService.class 和Service.class
//使得scanner只扫描带这俩路径的注解
serviceAnnotationTypes.forEach(annotationType -> {
scanner.addIncludeFilter(new AnnotationTypeFilter(annotationType));
});
for (String packagetoScan : packagesToScan) {
// Registers @Service Bean first
scanner.scan(packagetoScan);
// Finds all BeanDeFinitionHolders of @Service whether @ComponentScan scans or not.
Set<BeanDeFinitionHolder> beanDeFinitionHolders =
findServiceBeanDeFinitionHolders(scanner, packagetoScan, registry, beanNameGenerator);
if (!CollectionUtils.isEmpty(beanDeFinitionHolders)) {
for (BeanDeFinitionHolder beanDeFinitionHolder : beanDeFinitionHolders) {
registerServiceBean(beanDeFinitionHolder, registry, scanner);
}
//...略
}
}
服务的发布入口
当dubbo的服务扫描完成后,需要发布服务,发布服务我们需要考虑以下的要点:
服务以什么协议发布
服务发布的端口
- 服务发布的入口
dubboBootstrapApplicationListener
监听了ContextRefreshedEvent
事件,当Spring完成Bean的装载后,会触发事件的接口,为真正发布服务入口
private void onContextRefreshedEvent(ContextRefreshedEvent event) {
dubboBootstrap.start();
}
那么该类是在哪里进行注册的呢?
ServiceClasspostProcessor
触发方法postProcessBeanDeFinitionRegistry()
时会显式的注册dubboBootstrapApplicationListener
的Bean。
// 此处是为了保证有dubboBootstrapApplicationListener的Bean
//其实在`@dubboComponentScan注解中,导入`dubboComponentScanRegistrar`类时已经注册了
registerBeans(registry, dubboBootstrapApplicationListener.class);
- 服务发布的流程
真正开始进行dubbo服务的发布是通过dubboBootstrap
类的start()
来完成的。
public dubboBootstrap start() {
if (started.compareAndSet(false, true)) {
//...略
//初始化必要的组件 配置中心、注册中心、校验必要的配置等
initialize();
// 1. 暴露dubbo服务
exportServices();
// Not only provider register
if (!isOnlyRegisterProvider() || hasExportedServices()) {
// 2. export MetadataService
exportMetadataService();
//3. Register the local ServiceInstance if required
registerServiceInstance();
}
referServices();
//...略
return this;
}
接下里调用 dubboBootstrap
类的exportServices()
方法
private void exportServices() {
//逐个服务暴露
configManager.getServices().forEach(sc -> {
// Todo, compatible with ServiceConfig.export()
ServiceConfig serviceConfig = (ServiceConfig) sc;
serviceConfig.setBootstrap(this);
if (exportAsync) {
ExecutorService executor = executorRepository.getServiceExporterExecutor();
Future<?> future = executor.submit(() -> {
sc.export();
exportedServices.add(sc);
});
asyncExportingFutures.add(future);
} else {
sc.export();
exportedServices.add(sc);
}
});
}
- 关键对象
ServiceConfig
服务发布过程大致可以分为三个过程:
1、生成具体服务的Invoker对象
2、发布协议服务(默认以dubbo发布),Invoker转换生成成Exporter
//ServiceConfig.doExportUrls()
private void doExportUrls() {
//得到服务仓库ServiceRepository 将服务注册进去全局唯一的服务仓库对象中
ServiceRepository repository = ApplicationModel.getServiceRepository();
ServiceDescriptor serviceDescriptor = repository.registerService(getInterfaceClass());
repository.registerProvider(
getUniqueServiceName(),
ref,
serviceDescriptor,
this,
serviceMetadata
);
//注册中心地址集合
List<URL> registryURLs = ConfigValidationUtils.loadRegistries(this, true);
//默认只有dubbo协议 默认的ProtocolConfig对象 <dubbo:protocol name="dubbo" port="20880" />
for (ProtocolConfig protocolConfig : protocols) {
String pathKey = URL.buildKey(getcontextpath(protocolConfig)
.map(p -> p + "/" + path)
.orElse(path), group, version);
// In case user specified path, register service one more time to map it to path.
repository.registerService(pathKey, interfaceClass);
// Todo, uncomment this line once service key is unified
serviceMetadata.setServiceKey(pathKey);
doExportUrlsFor1Protocol(protocolConfig, registryURLs);
}
}
根据 RegistryConfig
的配置,组装 registryURL,形成的 URL 格式如下:
registry://172.30.2.7:2181/org.apache.dubbo.registry.RegistryService?application=dubbo-demo-annotation-provider&dubbo=2.0.2&pid=65324®istry=zookeeper&release=2.7.7×tamp=1604025007827
这个 URL 表示它是一个 registry 协议(RegistryProtocol),地址是注册中心的ip:port,服务接口是 RegistryService,registry 的类型为 zookeeper。在有多个注册中心时,会生成多个registryURL。
接下来开始根据具体的协议(默认的dubbo协议)暴露服务,同时将服务注册到一(多)个注册中心。
doExportUrlsFor1Protocol()
本地暴露服务
在ServiceConfig
中调用doExportUrlsFor1Protocol()
方法进行服务暴露时,会有如下判断:
//当没有显式的指定scope的值为remote时,会进行本地暴露
if (!ScopE_REMOTE.equalsIgnoreCase(scope)) {
exportLocal(url);
}
injvm://127.0.0.1/com.anto.dubbo.HelloService?anyhost=true&application=dubbo-demo-annotation-provider&bind.ip=172.30.60.208&bind.port=20880&cluster=failsafe&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=com.anto.dubbo.HelloService&methods=sayHello&pid=65324&release=2.7.7&side=provider×tamp=1604025491044
通过指定scope的值,显式指定以何种方式暴露服务。
<dubbo:service interface="org.apache.dubbo.samples.local.api.DemoService" ref="target" scope="remote"/>
#指定消费者提供端的暴露服务方式 不指定将以dubbo、injvm同时暴露
dubbo.provider.scope=remote
使用 dubbo 本地调用不需做特殊配置,按正常 dubbo 服务暴露服务即可。
任一服务在暴露远程服务的同时,也会同时以 injvm 的协议暴露本地服务。injvm 是一个伪协议,不会像其他协议那样对外开启端口,只用于本地调用的目的。
Exporter<?> exporter = PROTOCOL.export(
PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, local));
因为发布injvm协议时,其协议头是injvm,所以PROTOCOL根据自适应扩展点得到的是InjvmProtocol
。所以此处生成的的Exporter对象是InjvmExporter
类型。
那么通过injvm方式来暴露服务有什么好好处呢?
与本地对象上方法调用不同的是,dubbo 本地调用会经过 Filter 链,其中包括了 Consumer 端的 Filter 链以及 Provider 端的 Filter 链。通过这样的机制,本地消费者和其他消费者都是统一对待,统一监控,服务统一进行治理。
本地调用何时是无用的?
第一,泛化调用的时候无法使用本地调用。
第二,消费者明确指定 URL 发起直连调用。
生成Invoker对象
调用器,是dubbo领域比较重要的一个对象,在服务的发布和调用过程中,服务本身会以Invoker对象存在。不管是发布dubbo服务还是发布本地的injvm服务,都需要生成一个Invoker对象。
//发布dubbo服务
Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterandEncoded(EXPORT_KEY, url.toFullString()));
dubbo中都是通过生成一个Invoker对象,然后PROTOCOL.export(wrapperInvoker);
来完成服务的发布。
//发布本地的injvm服务
Exporter<?> exporter = PROTOCOL.export(
PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, local));
PROXY_FACTORY
是一个自适应扩展点得到的一个对象。
private static final ProxyFactory PROXY_FACTORY = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();
看到这,那么肯定在/dubbo/meta-inf/internal
路径下会有一个名称为org.apache.dubbo.rpc.ProxyFactory
文件的ProxyFactory
接口的配置。
stub=org.apache.dubbo.rpc.proxy.wrapper.StubProxyFactoryWrapper
jdk=org.apache.dubbo.rpc.proxy.jdk.JdkProxyFactory
javassist=org.apache.dubbo.rpc.proxy.javassist.JavassistProxyFactory
- 首先根据URL中的参数来判断,proxy="jdk",若配置了则直接根据URL中的配置查找。此处逻辑是在生成的
ProxyFactory$Adaptive
类中 - 未配置,则按照默认的
@SPI("javassist")
则为JavassistProxyFactory类型。
根据之前的分析知道,只要该接口被@SPI
修饰,且方法上有@Adaptive
修饰时,会生成一个$Adaptive结尾的代理类。所以这里会生成一个由dubbo框架生成的一个类ProxyFactory$Adaptive
。
public class ProxyFactory$Adaptive implements org.apache.dubbo.rpc.ProxyFactory {
public java.lang.Object getProxy(org.apache.dubbo.rpc.Invoker arg0) throws org.apache.dubbo.rpc.RpcException {
if (arg0 == null) throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument == null");
if (arg0.getUrl() == null)
throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument getUrl() == null");
org.apache.dubbo.common.URL url = arg0.getUrl();
String extName = url.getParameter("proxy", "javassist");
if (extName == null)
throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.ProxyFactory) name from url (" + url.toString() + ") use keys([proxy])");
org.apache.dubbo.rpc.ProxyFactory extension = (org.apache.dubbo.rpc.ProxyFactory) ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.ProxyFactory.class).getExtension(extName);
return extension.getProxy(arg0);
}
public java.lang.Object getProxy(org.apache.dubbo.rpc.Invoker arg0, boolean arg1) throws org.apache.dubbo.rpc.RpcException {
if (arg0 == null) throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument == null");
if (arg0.getUrl() == null)
throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument getUrl() == null");
org.apache.dubbo.common.URL url = arg0.getUrl();
String extName = url.getParameter("proxy", "javassist");
if (extName == null)
throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.ProxyFactory) name from url (" + url.toString() + ") use keys([proxy])");
org.apache.dubbo.rpc.ProxyFactory extension = (org.apache.dubbo.rpc.ProxyFactory) ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.ProxyFactory.class).getExtension(extName);
return extension.getProxy(arg0, arg1);
}
public org.apache.dubbo.rpc.Invoker getInvoker(java.lang.Object arg0, java.lang.class arg1, org.apache.dubbo.common.URL arg2) throws org.apache.dubbo.rpc.RpcException {
if (arg2 == null) throw new IllegalArgumentException("url == null");
org.apache.dubbo.common.URL url = arg2;
//获取url中的proxy参数 无则是javassist
String extName = url.getParameter("proxy", "javassist");
if (extName == null)
throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.ProxyFactory) name from url (" + url.toString() + ") use keys([proxy])");
org.apache.dubbo.rpc.ProxyFactory extension = (org.apache.dubbo.rpc.ProxyFactory) ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.ProxyFactory.class).getExtension(extName);
//再次调用JavassistProxyFactory.getInvoker()方法
return extension.getInvoker(arg0, arg1, arg2);
}
}
所以ProxyFactory$Adaptive
的作用主要是根据url中的proxy参数,决定需要用ProxyFactory
接口的哪个实现,当没有配置时,则用@SPI("javassist")
配置的值。
此时参数为
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url)
proxy: com.anto.dubbo.dubboprovider.HelloServiceImpl
type: com.anto.dubbo.HelloService
url: injvm://127.0.0.1/com.anto.dubbo.HelloService?anyhost=true&application=dubbo-demo-annotation-provider&bind.ip=172.30.60.208&bind.port=20880&cluster=failsafe&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=com.anto.dubbo.HelloService&methods=sayHello&pid=16136&release=2.7.7&side=provider×tamp=1606266760859
然后就是调用JavassistProxyFactory
的getInvoker
方法了。
//JavassistProxyFactory
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
// 创建一个动态代理
final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
return new AbstractProxyInvoker<T>(proxy, type, url) {
@Override
protected Object doInvoke(T proxy, String methodName,
Class<?>[] parameterTypes,
Object[] arguments) throws Throwable {
//调用构建的动态代理类的invokeMethod()
return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
}
};
}
通过Wrapper类来创建一个动态代理,(Wrapper类的258行)其核心方法如下:
public Object invokeMethod(Object o, String n, Class[] p, Object[] v) throws java.lang.reflect.InvocationTargetException {
com.anto.dubbo.dubboprovider.HelloServiceImpl w;
try {
w = ((com.anto.dubbo.dubboprovider.HelloServiceImpl) $1);
} catch (Throwable e) {
throw new IllegalArgumentException(e);
}
try {
//判断参数是否String
if ("sayHello".equals($2) && $3.length == 1 && $3[0].getName().equals("java.lang.String")) {
return ($w) w.sayHello((java.lang.String) $4[0]);
}
if ("sayHello".equals($2) && $3.length == 1 && $3[0].getName().equals("com.anto.dubbo.User")) {
return ($w) w.sayHello((com.anto.dubbo.User) $4[0]);
}
} catch (Throwable e) {
throw new java.lang.reflect.InvocationTargetException(e);
}
throw new org.apache.dubbo.common.bytecode.NoSuchMethodException("Not found method \"" + $2 + "\" in class com.anto.dubbo.dubboprovider.HelloServiceImpl.");
}
看到这个动态代理类是否有一丝丝的亲切感?
这不就是根据不同的方法名和参数类型来决定调用接口的哪个方法嘛!
构建好由Wrapper类生成的动态代理后,返回一个匿名的AbstractProxyInvoker
类型的Invoker对象。那么它有什么特点呢?
可以看到它重写了doInvoke()
方法,最终是调用动态代理类的invokeMethod()
,那本质上也就是调用dubbo接口的方法。
回顾下生成Invoker对象的过程:
1.dubbo框架生成一个ProxyFactory$Adaptive代理类---决定用哪个ProxyFactory
2.Wrapper类为具体要发布的服务创建一个动态代理类
3.生成一个重写了doInvoke()方法的AbstractProxyInvoker类型的匿名类--用以转发消费发起的请求到Wrapper生成的代理类的invokeMethod()
所以,简单总结一下Invoke本质上应该是一个代理,经过层层包装最终进行了发布。当消费者发起请求的时候,会获得这个Invoker进行调用。
最终发布出去的Invoker, 也不是单纯的一个代理,也是经过多层包装
InvokerDelegate(DelegateProviderMetaDataInvoker(AbstractProxyInvoker()))
远程暴露服务
在ServiceConfig
类中doExportUrlsFor1Protocol()
方法中,首先是本地服务的暴露,然后是远程服务的暴露。
远程服务暴露的过程其实也就是伴随着生成Exporter对象过程。
//通过某种协议来暴露服务
Exporter<?> exporter = PROTOCOL.export(wrapperInvoker);
远程服务暴露时,首先需要得到一个PROTOCOL
对象,它是一个自适应扩展点接口的得到Protocol
的对象。
得到Protocol对象
private static final Protocol PROTOCOL = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
由于PROTOCOL
接口的方法标注了@Adaptive,所以会为其生成代理类对象。
@Adaptive
<T> Exporter<T> export(Invoker<T> invoker) throws RpcException;
@Adaptive
<T> Invoker<T> refer(Class<T> type, URL url) throws RpcException;
动态生成的类如下Protocol$Adaptive
package org.apache.dubbo.rpc;
import org.apache.dubbo.common.extension.ExtensionLoader;
public class Protocol$Adaptive implements org.apache.dubbo.rpc.Protocol {
//...略
public org.apache.dubbo.rpc.Exporter export(org.apache.dubbo.rpc.Invoker arg0) throws org.apache.dubbo.rpc.RpcException {
if (arg0 == null) throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument == null");
if (arg0.getUrl() == null)
throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument getUrl() == null");
org.apache.dubbo.common.URL url = arg0.getUrl();
//根据url中的协议头参数 决定加载哪个协议的实现,最开始协议头为registry
String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
if (extName == null)
throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.Protocol) name from url (" + url.toString() + ") use keys([protocol])");
org.apache.dubbo.rpc.Protocol extension = (org.apache.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.Protocol.class).getExtension(extName);
return extension.export(arg0);
}
public org.apache.dubbo.rpc.Invoker refer(java.lang.class arg0, org.apache.dubbo.common.URL arg1) throws org.apache.dubbo.rpc.RpcException {
if (arg1 == null) throw new IllegalArgumentException("url == null");
org.apache.dubbo.common.URL url = arg1;
String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
if (extName == null)
throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.Protocol) name from url (" + url.toString() + ") use keys([protocol])");
org.apache.dubbo.rpc.Protocol extension = (org.apache.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.Protocol.class).getExtension(extName);
return extension.refer(arg0, arg1);
}
//...略
}
思考下:为什么需要这么设计呢?每个标注了@Adaptive扩展类都听过Compile来生成代码,而不是单独设计一个Protocol$Adaptive
类呢?
dubbo针对@SPI扩展接口中,方法标注了@Adaptive注解的类都会生成一个代理类,名称为接口名$Adaptive。
这样设计主要是扩展性和灵活性。
通过注解就能够去声明一个动态的适配类,同时用户在使用的时候,可以根据配置中声明的属性来决定适配到的目标类。
可扩展性体现在spi的机制上,当我们自己开发扩展的实现时,同样可以利用这个动态适配的功能来实现目标类的路由。
在上面一步生成的Invoker对象中,它的URL为:
registry://172.30.2.7:2181/org.apache.dubbo.registry.RegistryService?application=dubbo-demo-annotation-provider&dubbo=2.0.2&export=dubbo://172.30.60.208:20880/com.anto.dubbo.HelloService?anyhost=true&application=dubbo-demo-annotation-provider&bind.ip=172.30.60.208&bind.port=20880&cluster=failsafe&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=com.anto.dubbo.HelloService&methods=sayHello&pid=12248&release=2.7.7&side=provider×tamp=1606719802290&pid=12248®istry=zookeeper&release=2.7.7×tamp=1606719800257
org.apache.dubbo.rpc.Protocol extension = (org.apache.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.Protocol.class).getExtension(extName);
所以在上面的getExtension("registry")
会去查找RegistryProtocol
,但是在ExtensionLoader中有如下语句:
Set<Class<?>> wrapperClasses = cachedWrapperClasses;
if (CollectionUtils.isNotEmpty(wrapperClasses)) {
for (Class<?> wrapperClass : wrapperClasses) {
instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance));
}
}
此时的wrapperClasses则是这三个
当在解析meta-inf/dubbo/internal/org.apache.dubbo.rpc.Protocol
时,会把Protocol接口的包装类放在缓存属性cachedWrapperClasses中。
思考下:怎么判断这个实现SPI接口是一个包装类型呢?
private boolean isWrapperClass(Class<?> clazz) {
try {
//当实现类有将接口自身传进来的构造函数时,认为其是一个包装类型
clazz.getConstructor(type);
return true;
} catch (NoSuchMethodException e) {
return false;
}
}
dubbo运用装饰器模式对协议的spi接口起到一个装饰增强作用。
所以暴露服务的代码Exporter<?> exporter = PROTOCOL.export(wrapperInvoker);
此时PROTOCOL
为ProtocolListenerWrapper
对象。然后依次调用QosProtocolWrapper
、ProtocolFilterWrapper
的export()
方法。
ProtocolListenerWrapper
:用于服务export时候插入监听机制 。
QosprotocolWrapper
:如果当前配置了注册中心,则会启动一个Qos server.qos是dubbo的在线运维命令,dubbo2.5.8新版本重构了telnet模块,提供了新的telnet命令支持,新版本的telnet端口与dubbo协议的端口是不同的端口,默认为22222 。
ProtocolFilterWrapper
:对invoker进行filter的包装,实现请求的过滤 。
调用链路如下:
ProtocolListenerWrapper.export()--->QosProtocolWrapper.export()---->ProtocolFilterWrapper.export()--->RegistryProtocol.export()
一句话,最后我们得到的是RegistryProtocol
对象。
启动Netty监听服务
接下来将调用RegistryProtocol.export()
方法。
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
//这里获得的是zookeeper注册中心的url: zookeeper://ip:port
URL registryUrl = getRegistryUrl(originInvoker);
// 这里是获得服务提供者的url, dubbo://ip:port...
URL providerUrl = getProviderUrl(originInvoker);
//订阅override数据。在admin控制台可以针对服务进行治理,比如修改权重,修改路由机制等,当注册中心有此服务的覆盖配置注册进来时,推送消息给提供者,重新暴露服务
final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl);
final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener);
//这里就交给了具体的协议去暴露服务(如dubbo
final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);
// url to registry
final Registry registry = getRegistry(originInvoker);
//获取要注册到注册中心的URL: dubbo://ip:port
final URL registeredProviderUrl = getUrlToRegistry(providerUrl, registryUrl);
// 若配置了注册中心,向注册中心如zookeeper中注册服务
boolean register = providerUrl.getParameter(REGISTER_KEY, true);
if (register) {
register(registryUrl, registeredProviderUrl);
}
// register stated url on provider model
registerStatedUrl(registryUrl, registeredProviderUrl, register);
// Deprecated! Subscribe to override rules in 2.6.x or before.
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
exporter.setRegisterUrl(registeredProviderUrl);
exporter.setSubscribeUrl(overrideSubscribeUrl);
notifyExport(exporter);
//保证每次export都返回一个新的exporter实例
return new DestroyableExporter<>(exporter);
}
在RegistryProtocol.expor()
t中,有两个核心流程:
接下来看下doLocalExport(originInvoker, providerUrl);
方法。
private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker, URL providerUrl) {
//key的值为发布该dubbo服务的一个协议串 dubbo://172.30.60.208:20880/com.anto.dubbo.HelloService?...
String key = getCacheKey(originInvoker);
//当bouds这个Map中不存在该服务的key时,会生成一个该服务的Exporter
return (ExporterChangeableWrapper<T>) bounds.computeIfAbsent(key, s -> {
Invoker<?> invokerDelegate = new InvokerDelegate<>(originInvoker, providerUrl);
return new ExporterChangeableWrapper<>((Exporter<T>) protocol.export(invokerDelegate), originInvoker);
});
}
先将要发布的服务生成一个唯一的带有dubbo协议串的key值。
dubbo://172.30.60.208:20880/com.anto.dubbo.HelloService?anyhost=true&application=dubbo-demo-annotation-provider&bind.ip=172.30.60.208&bind.port=20880&cluster=failsafe&deprecated=false&dubbo=2.0.2&generic=false&interface=com.anto.dubbo.HelloService&methods=sayHello&pid=3876&release=2.7.7&side=provider×tamp=1606963046596
此时再次调用protocol.export(invokerDelegate)
,会再次进入到protocol$Adaptive.export()
方法中。
ProtocolListenerWrapper.export()--->QosProtocolWrapper.export()---->ProtocolFilterWrapper.export()--->dubboProtocol.export()
所以自来就来到了dubboProtocol.export()
方法:
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
URL url = invoker.getUrl();
// 将服务名称端口作为key Invoker作为dubboExporter的参数存储 一起放进一个exportMap中
//key com.anto.dubbo.HelloService:20880
String key = serviceKey(url);
dubboExporter<T> exporter = new dubboExporter<T>(invoker, key, exporterMap);
exporterMap.put(key, exporter);
//是否配置了参数回调机制
Boolean isstubSupportEvent = url.getParameter(STUB_EVENT_KEY, DEFAULT_STUB_EVENT);
Boolean isCallbackservice = url.getParameter(IS_CALLBACK_SERVICE, false);
if (isstubSupportEvent && !isCallbackservice) {
String stubServiceMethods = url.getParameter(STUB_EVENT_METHODS_KEY);
if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
if (logger.isWarnEnabled()) {
logger.warn(new IllegalStateException("consumer [" + url.getParameter(INTERFACE_KEY) +
"], has set stubproxy support event ,but no stub methods founded."));
}
}
}
//开启一个Netty服务
openServer(url);
optimizeSerialization(url);
return exporter;
}
- openServer()
private void openServer(URL url) {
// 获取 host:port,并将其作为服务器实例的 key,用于标识当前的服务器实例
String key = url.getAddress();
//client 也可以暴露一个只有server可以调用的服务
boolean isServer = url.getParameter(IS_SERVER_KEY, true);
if (isServer) {
ProtocolServer server = serverMap.get(key);
if (server == null) {
synchronized (this) {
server = serverMap.get(key);
if (server == null) {
//创建服务器实例
serverMap.put(key, createServer(url));
}
}
} else {
// 服务器已创建,则根据 url 中的配置重置服务器
server.reset(url);
}
}
}
往下则是基于org.apache.dubbo.remoting.transport.netty4.NettyTransporter
开启一个Netty服务的过程了。
在基于dubboProtocol协议发布服务的过程中,有几个重要的步骤
- 构建一个exporterMap,以服务路径名称作为key,把invoker包装成了dubboExporter作为value存储 ;
- 针对同一台机器上的多个服务,只启动一个服务实例 ;
- 采用Netty4来发布服务 。
注册服务
当配置了诸如Zookeeper的注册中心时,会将服务的节点信息在相应的地方写入。
// 向zookeeper中注册服务
boolean register = providerUrl.getParameter(REGISTER_KEY, true);
if (register) {
register(registryUrl, registeredProviderUrl);
}
根据URL的key 来动态的找到需要注册的服务中心,registryFactory是个动态扩展点,先经过包装的扩展点,然后当为zookeeper时,则为ZookeeperRegistryFactory
。
此时的registryURL已经是解析成Zookeeper开头的url了。
zookeeper://172.30.2.7:2181/org.apache.dubbo.registry.RegistryService?application=dubbo-demo-annotation-provider&dubbo=2.0.2&export=dubbo%3A%2F%2F172.30.60.208%3A20880%2Fcom.anto.dubbo.HelloService%3Fanyhost%3Dtrue%26application%3Ddubbo-demo-annotation-provider%26bind.ip%3D172.30.60.208%26bind.port%3D20880%26cluster%3Dfailsafe%26deprecated%3Dfalse%26dubbo%3D2.0.2%26dynamic%3Dtrue%26generic%3Dfalse%26interface%3Dcom.anto.dubbo.HelloService%26methods%3DsayHello%26pid%3D16172%26release%3D2.7.7%26side%3Dprovider%26timestamp%3D1606978398061&pid=16172&release=2.7.7×tamp=1606978397349
registeredProviderUrl
则是dubbo开头的服务提供地址。
private void register(URL registryUrl, URL registeredProviderUrl) {
//registry为ListenerRegistryWrapper registryFactory同样也是由动态扩展点生成的对象
Registry registry = registryFactory.getRegistry(registryUrl);
registry.register(registeredProviderUrl);
}
ListenerRegistryWrapper.register()
public void register(URL url) {
try {
//调用ZookeeperRegistry.register()
registry.register(url);
} finally {
if (CollectionUtils.isNotEmpty(listeners)) {
RuntimeException exception = null;
for (RegistryServiceListener listener : listeners) {
if (listener != null) {
try {
listener.onRegister(url);
} catch (RuntimeException t) {
logger.error(t.getMessage(), t);
exception = t;
}
}
}
if (exception != null) {
throw exception;
}
}
}
}
ListenerRegistryWrapper
是对ZookeeperRegistry
做了一层包装,增加监听器相应的功能。
FailbackRegistry.register()
FailbackRegistry是一个提供的重试机制的父类,是ZookeeperRegistry
、NacosRegistry
、Sofaregistry
等具体注册中心的父类。
ZookeeperRegistry 类中并没有register()
,所以将进入父类FailbackRegistry
的方法中。
public void register(URL url) {
//...略
super.register(url);
removeFailedRegistered(url);
removeFailedUnregistered(url);
try {
// 真正调用ZookeeperRegistry.doRegister()
doRegister(url);
} catch (Exception e) {
Throwable t = e;
// 若配置的属性check为true 则直接抛出异常 不再进行重试
boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
&& url.getParameter(Constants.CHECK_KEY, true)
&& !CONSUMER_PROTOCOL.equals(url.getProtocol());
boolean skipFailback = t instanceof SkipFailbackWrapperException;
if (check || skipFailback) {
if (skipFailback) {
t = t.getCause();
}
throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
} else {
logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t);
}
// 如果注册失败抛异常了,会将注册失败的url放入注册失败的容器中
addFailedRegistered(url);
}
}
ZookeeperRegistry.doRegister()
public void doRegister(URL url) {
try {
zkClient.create(toUrlPath(url), url.getParameter(DYNAMIC_KEY, true));
} catch (Throwable e) {
throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}