本文主要介绍dubbo消费者如何引用服务器端接口,以及如何像本地调用一样调用远程服务。
站在设计师的角度,想想为什么这么设计。
@Component
Public class DubboConsumer {
@Reference(check=false)
private hello service hello service;
public string say hello(string name){
返回(name);
}
} 
例如,DubboConsumer在创建实例时依赖HelloService的方法是HelloService实现类位于远程的其他服务器上。
Spring容器启动后,所有类都将加载到BeanDefinition中,然后在调整getBean方法时,您知道类将被实例化并注入相关性。
表示将使用@EnableDubbo启动Dubbo。多个post processor(包括referenceannotationbeanpost processor)已注册,并处理带有注释@Reference的属性(sprinne)
1.首先,从整体上看一下杜甫的引文
(1)整个体系结构中消费者端参考的生成过程。
很多人看到这幅画,脸都糊涂了,我也不例外。经过七七四十九天的考验,终于知道是怎么回事了。我会一点一点地分解,分给你。(莎士比亚)。
第一个是消费者接受服务端界面参考,核心门户网站是ReferenceConfig,整体上可以分为7个阶段。
1.调整ReferenceConfig的get方法以获取服务器端接口实例。2.调整Protocol的refer方法,获取服务器端接口使用的invoker。3.收到invoker后,封装在RegistryDirectory中,通过RegistryProtocol完成对提供程序和相关配置的订阅。4.实际的invoker包括消费者和供应商之间的连接客户端。5 .客户端生成门户是Exchangers类,Exchanger(默认实现类是HeaderExchanger)的connect方法用于获取连接到服务器的客户端。6 .实际客户端生成发生在Transporters类中,通过调用Transporter(默认实现类为NettyTransporter)的connect方法创建NettyClient7
通过沿着调用链接暂时忽略步骤3,您可以使用以下调用顺序图集中精力创建invoker:
上图显示了导入invoker最重要的调用序列,包括客户端生成、服务器端接口信息封装(invoker)和代理类生成。
(2)看一下注册中心
我知道客户端要想调节服务器端,首先要知道服务的IP和端口号等信息。那么,在Dubo消费端开始的时候,如何知道服务端接口的信息呢?
首先想到的方法是创建invoker的入口,传递服务器端信息,并在new NettyClient中建立与服务器端的TCP连接。
但是这种方式对用户特别不方便,代码也有入侵性。
那么,能否将提供者的接口信息放置在某个地方,在创建invoker时能否获取接口信息,自然会想起配置中心。
但是配置中心需要手动设置值,并推送到消费者端,所以界面非常多的时候无法维护。
所以引入注册中心,消费者端和服务端都可以与注册中心互动。
服务器端
接口信息自动暴露到注册中心,消费者可以从注册中心获取到接口信息。又有一个问题,如果消费者引用的接口发生变动,比如新增了一台提供者,或者服务端宕机了,消费者如何能够实时得感知到并及时做出调整呢?
这就需要消费者能够监听注册中心,注册中心发生变更,及时通知消费者。
最终消费者、服务提供者和注册中心的关系如下图:
也就要考虑我们的第3步了,这时调用序列图变为:
红色线框部分即为与注册中心相关的调用,三个核心类:
RegistryProtocol:处理注册中心相关的Protocol实现,如获取注册中心实例,关联注册中心与invoker
ZookeeperRegistry:代表Zookeeper为注册中心的实体,封装了与Zookeeper交互操作,如订阅、监听等
RegistryDirectory:是一个目录实现类,顾名思义,它持有Invoker列表,同时还有到路由、负载均衡等
另外,RegistryDirectory实现了NotifyListener,在注册中心信息发生变更的时候,会调notify方法,更新RegistryDirectory中的invoker列表,从而实现了消费端对服务端接口的动态同步。
2.对源码庖丁解牛
dubbo消费端注入提供者服务引用,可以认为从Re开始,
该类在dubbo-spring中,代码与注释如下:
//ReferenceAnnotationBeanPostProcessor.
@Override
protected Object doGetInjectedBean(AnnotationAttributes attributes, Object bean, String beanName, Class<?> injectedType,
                                   Injec injectedElement) throws Exception {
    /**
     * The name of bean that annotated Dubbo's {@link Service @Service} in local Spring {@link ApplicationContext}
     获取@Service注解的服务bean name,即被引用的bean
     */
    String referencedBeanName = buildReferencedBeanName(attributes, injectedType);
    /**
     * The name of bean that is declared by {@link Reference @Reference} annotation injection
     获取@Reference注解的服务引用bean name,即提供者服务bean name
     */
    String referenceBeanName = getReferenceBeanName(attributes, injectedType);
      //获取ReferenceBean实例,ReferenceConfig的实例
    ReferenceBean referenceBean = buildReferenceBeanIfAbsent(referenceBeanName, attributes, injectedType);
      //注册ReferenceBean到Spring容器中
    registerReferenceBean(referencedBeanName, referenceBean, attributes, injectedType);
    
    cacheInjectedReferenceBean(referenceBean, injectedElement);
        //获取并创建提供者服务接口的代理类,即使用者最终得到的实例,通过该实例完成RPC透明化调用
    return getOrCreateProxy(referencedBeanName, referenceBeanName, referenceBean, injectedType);
}
private Object getOrCreateProxy(String referencedBeanName, String referenceBeanName, ReferenceBean referenceBean, Class<?> serviceInterfaceType) {
              //如果引用的服务接口在本地,则直接使用本地Spring容器中的服务实例
        if (existsServiceBean(referencedBeanName)) { // If the local @Service Bean exists, build a proxy of ReferenceBean
            return newProxyInstance(getClassLoader(), new Class[]{serviceInterfaceType},
                    wrapInvocationHandler(referenceBeanName, referenceBean));
        } else { // ReferenceBean should be initialized and get immediately
            //获取远程服务接口实例
            return re();
        }
    }接下来,我们按之前的调用序列图,一步步往下看。
(1)Reference的get
源码非常长(包含URL的构建等),这里做了精简,只保留了关键部分,如下:
//ReferenceConfig
public synchronized T get() {
    if (ref == null) {
        init();
    }
    return ref;
}
public synchronized void init() {
  //1.参数准备和处理
  //2.创建代理
  ref = createProxy(Map);
}
private T createProxy(Map<String, String> map) {
  //获取注册中心
  Con(this, false);
  
  //1.根据注册中心构造注册URL,由此去构建invoker。存在多个注册中心时,会通过CLUSTER进行包装
  //此处,REF_PROTOCOL实例为RegistryProtocol,通过RegistryProtocol获取invoker
  invoker = REF_PROTOCOL.refer(interfaceClass, urls.get(0));
  
  //2.创建远程invoker的代理类,便于消费者无侵入使用,默认PROXY_FACTORY=JavassistProxyFactory
  return (T) PROXY_FACTORY.getProxy(invoker);
}其中,代码中涉及到一些ReferenceConfig关键属性,前两个是通过SPI获取的Protocol和ProxyFactory,ref为接口的最终代理实例,invoker为引用服务的封装,如下:
/**
 * Protocol的自适应类,与URL相关,协议类型为registry(如registry://224.5.6.7:1234/org.apache.dubbo.registry.RegistryService?application=dubbo-sample),对应的类为RegistryProtocol;协议类型为dubbo,对应的类为DubboProtocol;
 * 同时为Protocol实例自动包装两个类,ProtocolFilterWrapper和ProtocolListenerWrapper
 */
    private static final Protocol REF_PROTOCOL = Ex).getAdaptiveExtension();
/**
     * ProxyFactory自适应类,默认实现为JavassistProxyFactory
     */
    private static final ProxyFactory PROXY_FACTORY = Ex).getAdaptiveExtension();
    /**
     * The interface proxy reference
     */
    private transient volatile T ref;
    /**
     * The invoker of the reference service
     */
    private transient volatile Invoker<?> invoker;在REF_PROTOCOL通过自适应获取的时候,会封装几个关键包装类,分别是ProtocolFilterWrapper、ProtocolListenerWrapper、QosProtocolWrapper:
过滤器包装类ProtocolFilterWrapper:对非注册invoker增加过滤器
public class ProtocolFilterWrapper implements Protocol {
    private final Protocol protocol;
        //ProtocolFilterWrapper为SPI Protocol的包装类
    public ProtocolFilterWrapper(Protocol protocol) {
        if (protocol == null) {
            throw new IllegalArgumentException("protocol == null");
        }
         = protocol;
    }
  
      @Override
    public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
          //注册URL不需要增加过滤器
        if (url)) {
            return (type, url);
        }
          //其他invoker需要通过filter进行包装,实现过滤功能
        return buildInvokerChain((type, url), REFERENCE_FILTER_KEY, CommonCon);
    }
  
  private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
        Invoker<T> last = invoker;
        List<Filter> filters = Ex).getActivateExtension(), key, group);
                //对filters循环遍历,构建被filter修饰的Invoker链,真实的invoker在链表尾部
  }
}监听包装类ProtocolListenerWrapper:对非注册invoker注册监听器
public class ProtocolListenerWrapper implements Protocol {
  @Override
    public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
        if (url)) {
            return (type, url);
        }
          //为普通的invoker增加监听,从InvokerListener接口看,只有引用invoker和destroy时会触发listener
        return new ListenerInvokerWrapper<T>((type, url),
                Collec(
                        Ex)
                                .getActivateExtension(url, INVOKER_LISTENER_KEY)));
    }
}Qos包装类QosProtocolWrapper:该类只对注册URL时生效
public class QosProtocolWrapper implements Protocol {
  @Override
    public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
          //只有对注册URL,才开启QOS
        if (url)) {
            startQosServer(url);
            return (type, url);
        }
        return (type, url);
    }
}用张图总结上边的过程:
(2)RegistryProtocol的refer
RegistryProtocol作为注册中心与invoker之间的沟通桥梁,代码如下:
public class RegistryProtocol implements Protocol {
      @Override
    @SuppressWarnings("unchecked")
    public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
        url = getRegistryUrl(url);
          //1.根据URL获取注册中心,此处会创建注册中心客户端,并连接注册中心。如果之前已经创建过,则直接返回缓存值
          //此处默认使用ZookeeperRegistry,具体创建过程,后续再说,现在不用管
        Registry registry = regi(url);
          //如果获取的是注册服务对应的invoker,则直接通过代理工厂生成代理对象
        if (type)) {
            return ((T) registry, type, url);
        }
                
          //2.判断配置group,使用mergeable的cluster,可以暂时不关心
        // group="a,b" or group="*"
        Map<String, String> qs = S(REFER_KEY));
        String group = qs.get(GROUP_KEY);
        if (group != null && group.length() > 0) {
            if (group)).length > 1 || "*".equals(group)) {
                return doRefer(getMergeableCluster(), registry, type, url);
            }
        }
          //3.调doRefer获取invoker
        return doRefer(cluster, registry, type, url);
    }
}最终会调doRefer方法,如下:
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
          //1.构造RegistryDirectory,其对注册中心、路由、配置、负载均衡、invoker列表等信息进行封装
        RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
        direc(registry);
        direc(protocol);
        // all attributes of REFER_KEY
        Map<String, String> parameters = new HashMap<String, String>().getParameters());
        //2.构造消费者需要订阅的URL,用于后续订阅zk中配置、路由、provider等
        URL subscribeUrl = new URL(CONSUMER_PROTOCOL, (REGISTER_IP_KEY), 0, (), parameters);
        if (!ANY_VALUE.equal()) && url.getParameter(REGISTER_KEY, true)) {
            //3.获取并设置消费者的URL
            direc(getRegisteredConsumerUrl(subscribeUrl, url));
            将消费者URL注册到注册中心,如果没有consumer节点,则创建
            regi());
        }
        //4.构建路由链
        direc(subscribeUrl);
        //5 订阅注册中心的 provider、配置、路由等节点,当发生变动时,即时更新invoker信息
        direc(CATEGORY_KEY,
                PROVIDERS_CATEGORY + "," + CONFIGURATORS_CATEGORY + "," + ROUTERS_CATEGORY));
        //6 cluster对目录进行封装,暴露给使用者只有一个invoker,在实际调用时,通过路由、负载均衡等,发送请求到某一个invoker
        Invoker invoker = clu(directory);
        return invoker;
    }在doRefer方法中,我们看到构建了RegistryDirectory(每个提供者的服务接口都对应一个RegistryDirectory),并通过RegistryDirectory关联了注册中心与invoker;同时完成消费者在注册中心的注册,以及对注册中心的订阅。
可能有人疑惑,invoker在哪,没看到怎么就去订阅了呢?
这个地方应该是设计者为了对代码复用进行的设计,因为在应用运行过程中,需要监听注册中心,判断提供者是否有变动。
这也是为什么RegistryDirectory实现监听接口,同时持有注册中心和invoker。在变动的情况下,会更新对应的invoker列表。
接下来,我们会考到会调到RegistryDirectory的notify接口。
首先看上边代码的第5步,通过RegistryDirectory实现消费者对注册中心的订阅,代码如下:
//RegistryDirectory
public void subscribe(URL url) {
    setConsumerUrl(url);
    CONSUMER_CONFIGURATION_LISTENER.addNotifyListener(this);
    serviceConfigurationListener = new ReferenceConfigurationListener(this, url);
    //注册中心发起订阅,注册中心为ZookeeperRegistry,会调其父类FailbackRegi,其中包含了失败重试的策略
    regi(url, this);
}其中registry为ZookeeperRegistry实例,其父类为FailbackRegistry,此处会调肤类的订阅方法。FailbackRegistry在注册中心实例的基础上,增加了失败重试的功能。
其中参数this是NotifyListener,即RegistryDirectory本身。
//FailbackRegistry
@Override
public void subscribe(URL url, NotifyListener listener) {
    (url, listener);
    //1 从失败订阅列表中删除对应的订阅请求,取消定时重试
    removeFailedSubscribed(url, listener);
    try {
        //2 Sending a subscription request to the server side,调ZookeeperRegistry实现订阅
        doSubscribe(url, listener);
    } catch (Exception e) {
       
        //3 Record a failed registration request to a failed list, retry regularly 失败后加入失败订阅列表进行重试
        addFailedSubscribed(url, listener);
    }
}订阅逻辑发生在doSubscribe中,由ZookeeperRegistry实例进行的实现。代码如下:
//ZookeeperRegistry
@Override
public void doSubscribe(final URL url, final NotifyListener listener) {
    try {
        if (ANY_VALUE.equal())) {
            //省略
        } else {
            List<URL> urls = new ArrayList<>();
            //1 遍历provider、配置、路由node,并注册监听到这些节点上,当节点发生变化,会调用ZookeeperRegistry的notify接口
            for (String path : toCategoriesPath(url)) {
                ConcurrentMap<NotifyListener, ChildListener> listeners = zkLi(url);
                if (listeners == null) {
                    zkLi(url, new ConcurrentHashMap<>());
                    listeners = zkLi(url);
                }
                ChildListener zkListener = li(listener);
                if (zkListener == null) {
                    //2 创建zk监听器
                    li(listener, (parentPath, currentChilds) -> Zookee(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds)));
                    zkListener = li(listener);
                }
                //3 在zk创建provider、配置、路由对应的路径
                zkClient.create(path, false);
                //4 增加监听器
                List<String> children = zkClient.addChildListener(path, zkListener);
                if (children != null) {
                    urls.addAll(toUrlsWithEmpty(url, path, children));
                }
            }
            //5 通知监听器,根据urls变化更新服务端invoker列表,在初次启动时,构建invoker
            notify(url, listener, urls);
        }
    } catch (Throwable e) {
        throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
    }
}我们看到,在ZookeeperRegistry的订阅方法中,其持有到zk的客户端,可以创建相应的节点,并实现监听。
同时,for循环是对provider、配置、路由进行创建于订阅,即完成消费者对提供者服务接口、配置、路由规则的订阅,从而可以实现对提供者变化时得到通知,配置货路由发生变化时也能得到通知。
第5步,notify方法,会调父类FailbackRegistry的notify方法,如下:
//FailbackRegistry
@Override
protected void notify(URL url, NotifyListener listener, List<URL> urls) {
    try {
        //1 通知
        doNotify(url, listener, urls);
    } catch (Exception t) {
        //2 Record a failed registration request to a failed list, retry regularly 通知失败后,加入失败通知列表,用于重试
        addFailedNotified(url, listener, urls);
        logger.error("Failed to notify for subscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t);
    }
}FailbackRegistry再调父类AbstractRegistry的notify方法,里边会调用监听器即RegistryDirectory,进行更新或构建invoker,代码如下:
//AbstractRegistry
/**
 * Notify changes from the Provider side.
 *
 * @param url      consumer side url
 * @param listener listener
 * @param urls     provider latest urls  最新的服务端URLs(包括provider、配置、路由的URL)
 */
protected void notify(URL url, NotifyListener listener, List<URL> urls) {
    //1 keep every provider's category. 按类别划分URL
    Map<String, List<URL>> result = new HashMap<>();
    for (URL u : urls) {
        if (url, u)) {
            String category = u.getParameter(CATEGORY_KEY, DEFAULT_CATEGORY);
            List<URL> categoryList = re(category, k -> new ArrayList<>());
            ca(u);
        }
    }
    if () == 0) {
        return;
    }
    //2 根据urls,通知变更所有invoker、配置等信息
    Map<String, List<URL>> categoryNotified = no(url, u -> new ConcurrentHashMap<>());
    for ;String, List<URL>> entry : re()) {
        String category = en();
        List<URL> categoryList = en();
        ca(category, categoryList);
        //3 通知监听器RegistryDirectory,更新其中配置、路由、provider、invoker等信息
        li(categoryList);
        saveProperties(url);
    }
}第3步,调监听器RegistryDirectory,更新服务端信息
//RegistryDirectory
@Override
public synchronized void notify(List<URL> urls) {
    //1 按类别(provider、配置、路由)划分需要更新的urls
    Map<String, List<URL>> categoryUrls = urls.stream()
            .filter(Objects::nonNull)
            .filter(this::isValidCategory)
            .filter(this::isNotCompatibleFor26x)
            .collec(url -> {
                if (url)) {
                    return CONFIGURATORS_CATEGORY;
                } else if (url)) {
                    return ROUTERS_CATEGORY;
                } else if (url)) {
                    return PROVIDERS_CATEGORY;
                }
                return "";
            }));
    //2 更新配置信息
    List<URL> configuratorURLs = ca(CONFIGURATORS_CATEGORY, Collec());
     = Con(configuratorURLs).orElse();
    //3 更新路由信息
    List<URL> routerURLs = ca(ROUTERS_CATEGORY, Collec());
    toRouters(routerURLs).ifPresent(this::addRouters);
    //4 获取最新的 providers URL
    List<URL> providerURLs = ca(PROVIDERS_CATEGORY, Collec());
    
    //5 更新消费端对应的invoker列表
    refreshOverrideAndInvoker(providerURLs);
}第5步,通过refreshOverrideAndInvoker更新invoker列表
private void refreshOverrideAndInvoker(List<URL> urls) {
        overrideDirectoryUrl();
        //更新invokers
        refreshInvoker(urls);
}继续往后走,就是具体更新逻辑
private void refreshInvoker(List<URL> invokerUrls) {
    //invokerUrls为空,表示更新配置或路由
    A(invokerUrls, "invokerUrls should not be null");
    //1 如果只有一个invokerUrl,同时协议为empty,一般表示接口没有可用提供者,会注销所有invoker
    if () == 1
            && invokerUrls.get(0) != null
            && EMPTY_PROTOCOL.equal(0).getProtocol())) {
         = true; // Forbid to access
         = Collec();
        rou();
        destroyAllInvokers(); // Close all invokers
    } else {
        //2 有可用的提供者,更新invoker的缓存urlInvokerMap
         = false; // Allow to access
        Map<String, Invoker<T>> oldUrlInvokerMap = ; // local reference
        if (invokerUrls == Collections.<URL>emptyList()) {
            invokerUrls = new ArrayList<>();
        }
        //3。如果invokerUrls为空,则继续使用缓存的invokerUrls。否则使用最新的
        if () &&  != null) {
            invokerUrls.addAll();
        } else {
             = new HashSet<>();
            .addAll(invokerUrls);//Cached invoker urls, convenient for comparison
        }
        if ()) {
            return;
        }
        //4 转换新的invokerUrls为invoker
        Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);// Translate url list to Invoker map
        List<Invoker<T>> newInvokers = Collec(new ArrayList<>()));
        // pre-route and build cache, notice that route cache should build on original Invoker list.
        // toMergeMethodInvokerMap() will wrap some invokers having different groups, those wrapped invokers not should be routed.
        rou(newInvokers);
         = multiGroup ? toMergeInvokerList(newInvokers) : newInvokers;
          //5 替换最新的invoker
         = newUrlInvokerMap;
        try {
            //6 销毁不用的invoker
            destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // Close the unused Invoker
        } catch (Exception e) {
            logger.warn("destroyUnusedInvokers error. ", e);
        }
    }
}调toInvokers方法,将URL转换为invoker,并缓存起来
private Map<String, Invoker<T>> toInvokers(List<URL> urls) {
  for (URL providerUrl : urls) {
    // 构造InvokerDelegate,其中返回的为原invoker
    invoker = new InvokerDelegate<>((serviceType, url), url, providerUrl);
    newUrlInvokerMap.put(key, invoker);
  }
}到此为止,dubbo完成了从注册中心获取服务端接口信息,并将其转换为invoker列表;
同时这些invoker列表存储在RegistryDirectory中,可以实时监听注册中心的变更。
在toInvokers方法中,会调用DubboProtocol的refer方法实现服务提供者URL到invoker的转变,具体见后边文章。
这里有个问题,为什么要封装invoker到在InvokerDelegate中呢?
官网的解释:
The delegate class, which is mainly used to store the URL address sent by the registry,and can be reassembled on the basis of providerURL queryMap overrideMap for re-refer.其实就是对invoker和服务端URL的封装,便于后续使用。
具体如何使用的,我们后边再说。
用张图总结上边的过程:起于RegistryProtocol,终于RegistryDirectory。
(3)DubboProtocol的refer
在调用DubboProtocol的refer方法的过程中,也还会调用ProtocolFilterWrapper、ProtocolListenerWrapper、QosProtocolWrapper这三个包装类,实现对invoker的拦截与监听。
首先会调到父类AbstractProtocol的refer方法,如下
//AbstractProtocol
@Override
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
    //构造异步实现同步的invoker
    return new AsyncToSyncInvoker<>(protocolBindingRefer(type, url));
}我们看到,会将DubboProtocol创建的invoker封装为AsyncToSyncInvoker,为何要进行这样封装呢?
原因是,dubbo调用底层是基于netty进行的,是异步的过程,AsyncToSyncInvoker可以实现同步的调用,具体细节后边再说,暂时可以不管,只知道进行封装就行了。
千呼万唤始出来,终于找到URL转换为invoker的根了,我们看到invoker实际就是DubboInvoker的实例
//DubboProtocol
@Override
public <T> Invoker<T> protocolBindingRefer(Class<T> serviceType, URL url) throws RpcException {
    optimizeSerialization(url);
    // create rpc invoker.创建真实的rpc invoker,其中包含客户端的创建
    DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
    invokers.add(invoker);
    return invoker;
}到此,我们追踪到invoker它祖先DubboInvoker,在ReferenceConfig拿到的invoker是DubboInvoker经过层层包装的结果。
也正是由于这些包装,我们可以对invoker进行一些定制化和扩展性的控制。
用张图总结上边的过程:
(4) 创建Client
getClients(url)根据URL获取客户端,建立消费者与提供者之间的TCP连接。
默认情况下,对服务端是共享的,即一个消费者与提供者之间保持一个TCP连接。
private ExchangeClient[] getClients(URL url) {
    // whether to share connection
    boolean useShareConnect = false;
    int connections = url.getParameter(CONNECTIONS_KEY, 0);
    List<ReferenceCountExchangeClient> shareClients = null;
    // if not configured, connection is shared, otherwise, one connection for one service
    //1 默认使用共享的1个客户端
    if (connections == 0) {
        useShareConnect = true;
        String shareConnectionsStr = url.getParameter(SHARE_CONNECTIONS_KEY, (String) null);
        //2 默认保持消费者与提供者之间只有一个TCP连接
        connections = In(shareConnectionsStr) ? Con(SHARE_CONNECTIONS_KEY,
                DEFAULT_SHARE_CONNECTIONS) : shareConnectionsStr);
        //3 获取共享客户端
        shareClients = getSharedClient(url, connections);
    }
    //4 构造ExchangeClient数组,若不共享,需要创建多个消费者与提供者之间的TCP连接
    ExchangeClient[] clients = new ExchangeClient[connections];
    for (int i = 0; i < clien; i++) {
        if (useShareConnect) {
            clients[i] = (i);
        } else {
            clients[i] = initClient(url);
        }
    }
    return clients;
}共享客户端是如何实现的呢?
DubboProtocol持有客户端缓存referenceClientMap,key为服务端host:port,value为ExchangeClient的封装类ReferenceCountExchangeClient列表,其中包含引用计数。
/**
     * <host:port,Exchanger>
     */
private final Map<String, List<ReferenceCountExchangeClient>> referenceClientMap = new ConcurrentHashMap<>();
private List<ReferenceCountExchangeClient> getSharedClient(URL url, int connectNum) {
    //1 共享的client,key为提供者ip和端口号
    String key = url.getAddress();
    List<ReferenceCountExchangeClient> clients = re(key);
    if (checkClientCanUse(clients)) {
        batchClientRefIncr(clients);
        return clients;
    }
    locks.putIfAbsent(key, new Object());
    synchronized (key)) {
        clients = re(key);
        // dubbo check
        if (checkClientCanUse(clients)) {
            batchClientRefIncr(clients);
            return clients;
        }
        // connectNum must be greater than or equal to 1
        connectNum = Ma(connectNum, 1);
        // If the clients is empty, then the first initialization is
        if (clients)) {
            //2 构建引用计数的ExchangeClient
            clients = buildReferenceCountExchangeClientList(url, connectNum);
            re(key, clients);
        } else {
            for (int i = 0; i < clien(); i++) {
                ReferenceCountExchangeClient referenceCountExchangeClient = clien(i);
                // If there is a client in the list that is no longer available, create a new one to replace him.
                if (referenceCountExchangeClient == null || re()) {
                    clien(i, buildReferenceCountExchangeClient(url));
                    continue;
                }
                //引用计数增1
                re();
            }
        }
        /**
         * I understand that the purpose of the remove operation here is to avoid the expired url key
         * always occupying this memory space.
         */
        locks.remove(key);
        return clients;
    }
}在buildReferenceCountExchangeClientList方法中,会调用initClient方法,创建客户端。
/**
 * Create new connection
 *
 * @param url
 */
private ExchangeClient initClient(URL url) {
    // client type setting.
    //1 client类型,默认为netty
    String str = url.getParameter(CLIENT_KEY, url.getParameter(SERVER_KEY, DEFAULT_REMOTING_CLIENT));
    //2 编码方式为DubboCodec
    url = url.addParameter(CODEC_KEY, DubboCodec.NAME);
    // enable heartbeat by default
    //3 增加心跳检测,默认事件间隔为1分钟
    url = url.addParameterIfAbsent(HEARTBEAT_KEY, S(DEFAULT_HEARTBEAT));
    ExchangeClient client;
    try {
        // connection should be lazy
        if (LAZY_CONNECT_KEY, false)) {
            //4 构造懒连接,在使用的时候才会真正创建服务端连接
            client = new LazyConnectExchangeClient(url, requestHandler);
        } else {
            //5 默认直接创建连接
            client = Exc(url, requestHandler);
        }
    } catch (RemotingException e) {
        throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e);
    }
    return client;
}接下来,我们看看客户端是如何创建的。
(5)ExchangeClient的connect
上边所有的代码都是在协议层(Protocol),接下来主要聚焦在交换层(Exchanger)。
客户端创建是通过工具类Exchangers进行创建,通过URL获取Exchanger(默认实现为HeaderExchanger)
public class Exchangers {
  public static ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
        
        url = url.addParameterIfAbsen, "exchange");
        //先获取Exchanger,默认为HeaderExchanger
        return getExchanger(url).connect(url, handler);
    }
}从代码可以看出,HeaderExchanger为消费端和服务端创建的关键类,其创建client和server,分别为HeaderExchangeClient和HeaderExchangeServer。
public class HeaderExchanger implements Exchanger {
    public static final String NAME = "header";
    @Override
    public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
        //1 Tran创建连接,返回Client;
        //2 构造HeaderExchangeClient,其中包含HeaderExchangeChannel,用于发送请求,并且失败重试
        return new HeaderExchangeClient(Tran(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
    }
    @Override
    public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
        return new HeaderExchangeServer(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
    }
}又出现一个问题,为什么通过Transporter connect获取到的client,要经过HeaderExchangeClient封装呢?
按官网的描述,是为上层的调用构建request- response的语义,相当于对netty client的封装。除此之外,HeaderExchangeClient中还有重新建立连接的功能,具体后边有时间再说怎么进行重新连接的。
(6)Transporter的connect
到这个地方,我们来到了传输层(Transporter),即用于发送和接受数据的地方。
工具类Transporters,创建客户端连接。
public class Transporters {
  public static Client connect(URL url, ChannelHandler... handlers) throws RemotingException {
        
        ChannelHandler handler;
        if (handlers == null ||  == 0) {
            handler = new ChannelHandlerAdapter();
        } else if ( == 1) {
            handler = handlers[0];
        } else {
            handler = new ChannelHandlerDispatcher(handlers);
        }
        //获取Transporter,默认为NettyTransporter
        return getTransporter().connect(url, handler);
    }
}Transporter的默认实现类为NettyTransporter,可以构建NettyClient和NettyServer,他们是数据发送和接受的执行实体,代码如下:
public class NettyTransporter implements Transporter {
    public static final String NAME = "netty";
    @Override
    public RemotingServer bind(URL url, ChannelHandler listener) throws RemotingException {
        //构建NettyServer
        return new NettyServer(url, listener);
    }
    @Override
    public Client connect(URL url, ChannelHandler listener) throws RemotingException {
        //构建NettyClient
        return new NettyClient(url, listener);
    }
}可以看到,底层的客户端是NettyClient,它持有URL和一系列ChannelHandler。
我们接下来看看客户端是怎么进行实例化的。
public class NettyClient extends AbstractClient {
  public NettyClient(final URL url, final ChannelHandler handler) throws RemotingException {
        // you can customize name and type of client thread pool by THREAD_NAME_KEY and THREADPOOL_KEY in CommonConstants.
        // the handler will be warped: MultiMessageHandler->HeartbeatHandler->handler
        super(url, wrapChannelHandler(url, handler));
    }
}NettyClient实例化主要通过父类来完成的,在调父类之前,通过wrapChannelHandler给ChannelHandler封装了2个Handler,分别是MultiMessageHandler和HeartbeatHandler,用于多消息处理和心跳检测处理。
接下来看下父类进行了什么操作。
public AbstractClient(URL url, ChannelHandler handler) throws RemotingException {
    super(url, handler);
    needReconnect = url.getParameter, false);
    initExecutor(url);
    try {
        //1 打开client
        doOpen();
    } catch (Throwable t) {
        close();
    }
    try {
        //2 client发起连接
        connect();
    } catch (RemotingException t) {
        close();
    } catch (Throwable t) {
        close();
    }
}可这是一个模板方法,doOpen和connect借助子类,即NettyClient完成的,我们再看看具体是怎么完成客户端创建的。
//NettyClient
@Override
protected void doOpen() throws Throwable {
    //NettyClientHandler 为dubbo主要处理器
    final NettyClientHandler nettyClientHandler = new NettyClientHandler(getUrl(), this);
    bootstrap = new Bootstrap();
    boo(nioEventLoopGroup)
            .option, true)
            .option, true)
            .option, PooledBy)
            //.option, getTimeout())
            .channel);
    boo, Ma(3000, getConnectTimeout()));
    boo(new ChannelInitializer() {
        @Override
        protected void initChannel(Channel ch) throws Exception {
            int heartbeatInterval = UrlU(getUrl());
            if (getUrl().getParameter(SSL_ENABLED_KEY, false)) {
                ch.pipeline().addLast("negotiation", S(getUrl(), nettyClientHandler));
            }
            NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), Ne);
            ch.pipeline()//.addLast("logging",new LoggingHandler))//for debug
                    .addLast("decoder", ada())
                    .addLast("encoder", ada())
                    //空闲处理器,用于心跳检测
                    .addLast("client-idle-handler", new IdleStateHandler(heartbeatInterval, 0, 0, MILLISECONDS))
                    .addLast("handler", nettyClientHandler);
            String socksProxyHost = Con(SOCKS_PROXY_HOST);
            if(socksProxyHost != null) {
                int socksProxyPort = In(Con(SOCKS_PROXY_PORT, DEFAULT_SOCKS_PROXY_PORT));
                Socks5ProxyHandler socks5ProxyHandler = new Socks5ProxyHandler(new InetSocketAddress(socksProxyHost, socksProxyPort));
                ch.pipeline().addFirst(socks5ProxyHandler);
            }
        }
    });
}上边的代码是不是非常熟悉,这就是netty创建客户端的标准代码,如果不熟悉也没关系,以后有机会再分享下netty。
ChannelPipeline中配置了编解码器、空闲处理器、处理dubbo消息的NettyClientHandler。
doOpen代码只是初始化好了Bootstrap,连接发生在doConnect方法中,如下:
//NettyClient
@Override
protected void doConnect() throws Throwable {
    long start = Sy();
    //与netty服务端连接,闭关获取ChannelFuture
    ChannelFuture future = boo(getConnectAddress());
    boolean ret = (getConnectTimeout(), MILLISECONDS);
    if (ret && ()) {
       Channel newChannel = ();
       Channel oldChannel = Ne.channel;
       oldC();
       Ne.channel = newChannel;
    }
}至此,我们了解了dubbo消费者是如何通过Exchanger和Transport,利用底层netty创建客户端连接的。
将创建好的客户端,封装到Protocol层获取到的invoker,在消费者发起调用的时候,直接可以请求到服务端。
用张图总结上边Exchanger和Transport两层的过程:
通过Protocol、Exchange、Transport三层的支撑下,完成了最开始图中的1-6步,获得到了代表服务端的invoker。
为了减少dubbo框架对使用者的代码侵入,还需要对服务端接口进行代理,
这样真正做到消费者如同调用本地一样,调用远程服务,接下来我们看看是如何代理的。
(7)ProxyFactory的getProxy
生成服务端接口代理,主要涉及ReferenceConfig中createProxy的第二步getProxy。
return (T) PROXY_FACTORY.getProxy(invoker);经过StubProxyFactoryWrapper包装类,最终调用到默认实现JavassistProxyFactory,其通过反射获取服务端接口的实现。代码如下:
public class JavassistProxyFactory extends AbstractProxyFactory {
    @Override
    @SuppressWarnings("unchecked")
    public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
        //通过反射创建invoker的代理,处理器为InvokerInvocationHandler
        return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
    }
    @Override
    public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
        // TODO Wrapper cannot handle this scenario correctly: the classname contains '$'
        final Wrapper wrapper = Wra().getName().indexOf('$') < 0 ? () : type);
        return new AbstractProxyInvoker<T>(proxy, type, url) {
            @Override
            protected Object doInvoke(T proxy, String methodName,
                                      Class<?>[] parameterTypes,
                                      Object[] arguments) throws Throwable {
                return wra(proxy, methodName, parameterTypes, arguments);
            }
        };
    }
}该类包含两个方法,getProxy和getInvoker,前者用于消费端引用获取代理类,后者用于服务端暴露服务时获取对应的invoker。
此处关注getProxy方法,通过Proxy.getProxy反射获取代理,并且InvokerInvocationHandler为代理处理器。代码如下:
public class InvokerInvocationHandler implements InvocationHandler {
    private static final Logger logger = LoggerFac);
    private final Invoker<?> invoker;
    public InvokerInvocationHandler(Invoker<?> handler) {
         = handler;
    }
    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        if () == Object.class) {
            return me(invoker, args);
        }
        String methodName = me();
        Class<?>[] parameterTypes = me();
        if  == 0) {
            if ("toString".equals(methodName)) {
                return invoker.toString();
            } else if ("$destroy".equals(methodName)) {
                invoker.destroy();
                return null;
            } else if ("hashCode".equals(methodName)) {
                return invoker.hashCode();
            }
        } else if  == 1 && "equals".equals(methodName)) {
            return invoker.equals(args[0]);
        }
        RpcInvocation rpcInvocation = new RpcInvocation(method, invoker.getInterface().getName(), args);
        r().getServiceKey());
        return invoker.invoke(rpcInvocation).recreate();
    }
}其中invoke方法,就是使用者调接口时,会被代理到该方法上。
我们看到服务接口等信息被封装到RpcInvocation中,通过持有的invoker进行调用。
调用关系如下图:
3.用一个例子,追踪数据的流转
举一个简单的例子,定义接口
public interface HelloService {
    String sayHello(String name);
}服务提供者
@Service
public class HelloServiceImpl implements HelloService {
    @Override
    public String sayHello(String name) {
        return "hello:"+name;
    }
}dubbo-配置:
dubbo.a
dubbo.
dubbo.启动类:
public class DubboProviderMain {
    public static void main(String[] args) throws Exception {
        AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContex);
        con();
        Sy();
    }
    @Configuration
    @EnableDubbo(scanBasePackages = "com.exm.;)
    @PropertySource("classpath:/dubbo-")
    static class ProviderConfiguration {
        @Bean
        public RegistryConfig registryConfig() {
            RegistryConfig registryConfig = new RegistryConfig();
            regi("zookeeper://127.0.0.1:2181?timeout=10000");
            return registryConfig;
        }
    }
}服务消费者
@Component
public class DubboConsumer {
    @Reference(check = false)
    private HelloService helloService;
    public String sayHello(String name) {
        return (name);
    }
}配置:
dubbo.a
dubbo.registry.address=zookeeper://127.0.0.1:2181启动类:
public class DubboConsumerMain {
    public static void main(String[] args) throws IOException, InterruptedException {
        AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContex);
        con();
        DubboConsumer service = con);
        while (true) {
            Sy();
            try {
                String hello = ("world");
                Sy("result :" + hello);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
    @Configuration
    @PropertySource("classpa;)
    @ComponentScan("com.exm.bean")
    @EnableDubbo
    static class ConsumerConfiguration {
    }
}跟踪代码,将流转的URL记录如下,可以参考着阅读源码
//注册协议的URL
RegistryProtocol#Invoker<T> refer(Class<T> type, URL url)
  type:interface com.exm.
  url:registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=dubbo-annotation-consumer&dubbo=2.0.2&pid=44159&refer=application%3Ddubbo-annotation-consumer%26check%3Dfalse%26dubbo%3D2.0.2%26init%3Dfalse%26interface%3Dcom.exm.%26methods%3DsayHello%26pid%3D44159%26register.ip%3D192.168.1.65%26release%3D2.7.5%26side%3Dconsumer%26sticky%3Dfalse%26timestamp%3D1654352381559®istry=zookeeper&release=2.7.5×tamp=1654352409705
//costumer的URL
RegistryDirectory#void subscribe(URL url)
  ZookeeperRegistry#void doSubscribe(final URL url, final NotifyListener listener)
  url:consumer://192.168.1.65/com.exm.?application=dubbo-annotation-consumer&category=providers,configurators,routers&check=false&dubbo=2.0.2&init=false&interface=com.exm.&methods=sayHello&pid=44159&release=2.7.5&side=consumer&sticky=false×tamp=1654352381559
//provider、配置、路由对应的URL
FailbackRegistry#void notify(URL url, NotifyListener listener, List<URL> urls)
  url:consumer://192.168.1.65/com.exm.?application=dubbo-annotation-consumer&category=providers,configurators,routers&check=false&dubbo=2.0.2&init=false&interface=com.exm.&methods=sayHello&pid=44159&release=2.7.5&side=consumer&sticky=false×tamp=1654352381559
    urls:
        0: dubbo://192.168.1.65:20885/com.exm.?anyhost=true&application=dubbo-annotation-provider&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=com.exm.&methods=sayHello&pid=44153&release=2.7.5&side=provider×tamp=1654352300763
        1: empty://192.168.1.65/com.exm.?application=dubbo-annotation-consumer&category=configurators&check=false&dubbo=2.0.2&init=false&interface=com.exm.&methods=sayHello&pid=44159&release=2.7.5&side=consumer&sticky=false×tamp=1654352381559
        2: empty://192.168.1.65/com.exm.?application=dubbo-annotation-consumer&category=routers&check=false&dubbo=2.0.2&init=false&interface=com.exm.&methods=sayHello&pid=44159&release=2.7.5&side=consumer&sticky=false×tamp=1654352381559
RegistryDirectory#void refreshInvoker(List<URL> invokerUrls)
  invokerUrls:dubbo://192.168.1.65:20885/com.exm.?anyhost=true&application=dubbo-annotation-provider&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=com.exm.&methods=sayHello&pid=44153&release=2.7.5&side=provider×tamp=1654352300763
InvokerDelegate#new InvokerDelegate<>((serviceType, url), url, providerUrl)
  //url表示消费端,要创建服务端的连接
    url:dubbo://192.168.1.65:20885/com.exm.?anyhost=true&application=dubbo-annotation-consumer&check=false&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&init=false&interface=com.exm.&methods=sayHello&pid=44159®i;release=2.7.5&remo;side=consumer&sticky=false×tamp=1654352300763
    providerUrl:dubbo://192.168.1.65:20885/com.exm.?anyhost=true&application=dubbo-annotation-provider&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=com.exm.&methods=sayHello&pid=44153&release=2.7.5&side=provider×tamp=1654352300763
DubboProtocol#Invoker<T> protocolBindingRefer(Class<T> serviceType, URL url)
  DubboProtocol#ExchangeClient initClient(URL url)
      url:dubbo://192.168.1.65:20885/com.exm.?anyhost=true&application=dubbo-annotation-consumer&check=false&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&init=false&interface=com.exm.&methods=sayHello&pid=44259®i;release=2.7.5&remo;side=consumer&sticky=false×tamp=1654352300763
Exchangers#ExchangeClient connect(URL url, ExchangeHandler handler)
  NettyTransporter#Client connect(URL url, ChannelHandler listener)
      AbstractClient#AbstractClient(URL url, ChannelHandler handler)
          url:dubbo://192.168.1.65:20885/com.exm.?anyhost=true&application=dubbo-annotation-consumer&check=false&codec=dubbo&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&heartbeat=60000&init=false&interface=com.exm.&methods=sayHello&pid=44259®i;release=2.7.5&remo;side=consumer&sticky=false×tamp=1654352300763到此为止,将dubbo时如何为消费端创造远程引用实例的(invoker+代理),可能依然有讲述不清晰的地方,请大家指出来,一块研读学习。
1.《【applicationisinterrupted】Dubbo的消费者如何获得供应商服务接口参考?》援引自互联网,旨在传递更多网络信息知识,仅代表作者本人观点,与本网站无关,侵删请联系页脚下方联系方式。
2.《【applicationisinterrupted】Dubbo的消费者如何获得供应商服务接口参考?》仅供读者参考,本网站未对该内容进行证实,对其原创性、真实性、完整性、及时性不作任何保证。
3.文章转载时请保留本站内容来源地址,https://www.lu-xu.com/gl/2517260.html


