Dubbo 服务发表流程分析

2、原理和源码深入分析

露马脚远程服务时导出Invoker为Exporter

Invoker导出为Exporter分为三种情形,第3种是Registry类型的Invoker,第二种是其它协商项目标Invoker,分开解析。

代码入口:

Exporter<?> exporter = protocol.export;
首先,什么是dubbo?

Dubbo是一个布满式服务框架,致力于提供高质量和透明化的RPC远程服务调用方案,以及SOA服务治理方案。

二.陆 本地暴光

假设布署 scope=none,
则不会开展劳动暴光;假设未有布置 scope 或者scope=local,则会议及展览开本地暴光。

ServiceConfig.java

//public static final String LOCAL_PROTOCOL = "injvm";
//public static final String LOCALHOST = "127.0.0.1";

private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
    //......
    String scope = url.getParameter(Constants.SCOPE_KEY);
    //配置为none不暴露
    if (! Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) {
        //配置不是remote的情况下做本地暴露 (配置为remote,则表示只暴露远程服务)
        if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) {
            exportLocal(url);
        }
        //......
    }
    //......
}

private void exportLocal(URL url) {
    if (!Constants.LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
        URL local = URL.valueOf(url.toFullString())
                .setProtocol(Constants.LOCAL_PROTOCOL)
                .setHost(NetUtils.LOCALHOST)
                .setPort(0);
        Exporter<?> exporter = protocol.export(
                proxyFactory.getInvoker(ref, (Class) interfaceClass, local));
        exporters.add(exporter);
        logger.info("Export dubbo service " + interfaceClass.getName() +" to local registry");
    }
}

class=”token md md-strong”> class=”token strong ace_keyword ace_strong”> class=”token md md-strong”> class=”token strong ace_keyword ace_strong”> class=”token md md-strong”> class=”token strong ace_keyword ace_strong”> class=”token md md-strong”> class=”token li”> class=”token code ace_support ace_function”> class=”token md md-code”> class=”token md md-code”> class=”token lf”> class=”token md md-li”> class=”token code ace_support ace_function”> class=”token md md-code”> class=”token md md-code”> class=”token lf”> class=”token md md-li”> class=”token code ace_support ace_function”> class=”token md md-code”> class=”token md md-code”> class=”token lf”> class=”token md md-li”> class=”token code ace_support ace_function”> class=”token md md-code”> class=”token md md-code”> class=”token lf”> class=”token md md-li”> class=”token code ace_support ace_function”> class=”token md md-code”> class=”token md md-code”> class=”token lf”> class=”token md md-li”> class=”token code ace_support ace_function”> class=”token md md-code”> class=”token strong ace_keyword ace_strong”> class=”token md md-strong”> class=”token blockquote ace_string ace_constant ace_other”> class=”token md md-gt”> class=”token md md-li”>一. 爆出服务的时候,会通过代办创制 class=”token strong ace_keyword ace_strong”> class=”token md md-strong”>Invoker class=”token md md-strong”>;

class=”token md md-strong”> class=”token strong ace_keyword ace_strong”> class=”token md md-strong”> class=”token strong ace_keyword ace_strong”> class=”token md md-strong”> class=”token strong ace_keyword ace_strong”> class=”token md md-strong”> class=”token li”> class=”token code ace_support ace_function”> class=”token md md-code”> class=”token md md-code”> class=”token lf”> class=”token md md-li”> class=”token code ace_support ace_function”> class=”token md md-code”> class=”token md md-code”> class=”token lf”> class=”token md md-li”> class=”token code ace_support ace_function”> class=”token md md-code”> class=”token md md-code”> class=”token lf”> class=”token md md-li”> class=”token code ace_support ace_function”> class=”token md md-code”> class=”token md md-code”> class=”token lf”> class=”token md md-li”> class=”token code ace_support ace_function”> class=”token md md-code”> class=”token md md-code”> class=”token lf”> class=”token md md-li”> class=”token code ace_support ace_function”> class=”token md md-code”> class=”token strong ace_keyword ace_strong”> class=”token md md-strong”> class=”token blockquote ace_string ace_constant ace_other”> class=”token md md-gt”> class=”token md md-li”> class=”token strong ace_keyword ace_strong”> class=”token md md-strong”> class=”token lf”> class=”token blockquote ace_string ace_constant ace_other”> class=”token md md-gt”> class=”token md md-li”>二. 当地揭示时使用 class=”token strong ace_keyword ace_strong”> class=”token md md-strong”>injvm class=”token md md-strong”> 协议, class=”token strong ace_keyword ace_strong”> class=”token md md-strong”>injvm class=”token md md-strong”> 协议是一个伪协议,它不开启端口,不能被远程调用,只在
class=”token md md-strong”>JVM class=”token md md-strong”> 内一贯关乎,但实践 class=”token strong ace_keyword ace_strong”> class=”token md md-strong”>Dubbo 的
class=”token md md-strong”>Filter class=”token md md-strong”> 链。

利用dubbo协议导出

此间protocol.export(invokerDelegete)就要去具体的DubboProtocol中施行了,DubboProtocol的外场包裹着ProtocolFilterWrapper,再外面还包裹着ProtocolListenerWrapper。会先经过ProtocolListenerWrapper:

public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException { //Registry类型的Invoker if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol { return protocol.export; } //其他具体协议类型的Invoker //先进行导出protocol.export //然后获取自适应的监听器 //最后返回的是包装了监听器的Exporter //这里监听器的获取是getActivateExtension,如果指定了listener就加载实现,没有指定就不加载 return new ListenerExporterWrapper<T>(protocol.export, Collections.unmodifiableList(ExtensionLoader.getExtensionLoader(ExporterListener.class) .getActivateExtension(invoker.getUrl(), Constants.EXPORTER_LISTENER_KEY)));}

再经过ProtocolFilterWrapper:

public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException { //Registry类型的Invoker if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol { return protocol.export; } //其他具体协议类型的Invoker //先构建Filter链,然后再导出 return protocol.export(buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER));}

翻开下构建Invoker链的章程:

private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) { //我们要处理的那个Invoker作为处理链的最后一个 Invoker<T> last = invoker; //根据key和group获取自动激活的Filter List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group); if (filters.size { //把所有的过滤器都挨个连接起来,最后一个是我们真正的Invoker for (int i = filters.size() - 1; i >= 0; i --) { final Filter filter = filters.get; final Invoker<T> next = last; last = new Invoker<T>() { public Class<T> getInterface() { return invoker.getInterface(); } public URL getUrl() { return invoker.getUrl(); } public boolean isAvailable() { return invoker.isAvailable(); } public Result invoke(Invocation invocation) throws RpcException { return filter.invoke(next, invocation); } public void destroy() { invoker.destroy(); } @Override public String toString() { return invoker.toString(); } }; } } return last;}

随即就到了DubboProtocol的export方法,这里张开暴露服务:

public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException { //dubbo://10.42.0.1:20880/dubbo.common.hello.service.HelloService? //anyhost=true&application=dubbo-provider& //application.version=1.0&dubbo=2.5.3&environment=product& //interface=dubbo.common.hello.service.HelloService& //methods=sayHello URL url = invoker.getUrl(); // export service. //key由serviceName,port,version,group组成 //当nio客户端发起远程调用时,nio服务端通过此key来决定调用哪个Exporter,也就是执行的Invoker。 //dubbo.common.hello.service.HelloService:20880 String key = serviceKey; //将Invoker转换成Exporter //直接new一个新实例 //没做啥处理,就是做一些赋值操作 //这里的exporter就包含了invoker DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap); //缓存要暴露的服务,key是上面生成的 exporterMap.put(key, exporter); //export an stub service for dispaching event //是否支持本地存根 //远程服务后,客户端通常只剩下接口,而实现全在服务器端, //但提供方有些时候想在客户端也执行部分逻辑,比如:做ThreadLocal缓存, //提前验证参数,调用失败后伪造容错数据等等,此时就需要在API中带上Stub, //客户端生成Proxy实,会把Proxy通过构造函数传给Stub, //然后把Stub暴露组给用户,Stub可以决定要不要去调Proxy。 Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY,Constants.DEFAULT_STUB_EVENT); Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false); if (isStubSupportEvent && !isCallbackservice){ String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY); if (stubServiceMethods == null || stubServiceMethods.length{ } else { stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods); } } //根据URL绑定IP与端口,建立NIO框架的Server openServer; return exporter;}

上边得到的Exporter会被内置缓存中去,key便是上边生成的,客户端就能够发请求依据key找到Exporter,然后找到invoker进行调用了。接下来是创办服务器并监听端口。

随即调用openServer方法创制NIO Server举行监听:

private void openServer { // find server. //key是IP:PORT //192.168.110.197:20880 String key = url.getAddress(); //client 也可以暴露一个只有server可以调用的服务。 boolean isServer = url.getParameter(Constants.IS_SERVER_KEY,true); if  { ExchangeServer server = serverMap.get; //同一JVM中,同协议的服务,共享同一个Server, //第一个暴露服务的时候创建server, //以后相同协议的服务都使用同一个server if (server == null) { serverMap.put(key, createServer; } else { //同协议的服务后来暴露服务的则使用第一次创建的同一Server //server支持reset,配合override功能使用 //accept、idleTimeout、threads、heartbeat参数的变化会引起Server的属性发生变化 //这时需要重新设置Server server.reset; } }}

继续看createServer方法:

//url为://dubbo://192.168.110.197:20880/dubbo.common.hello.service.HelloService?//anyhost=true&application=dubbo-provider&//application.version=1.0&dubbo=2.5.3&environment=product&//interface=dubbo.common.hello.service.HelloService&//methods=sayHelloprivate ExchangeServer createServer { //默认开启server关闭时发送readonly事件 url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString; //默认开启heartbeat url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT)); //默认使用netty String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER); if (str != null && str.length() > 0 && ! ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension throw new RpcException("Unsupported server type: " + str + ", url: " + url); url = url.addParameter(Constants.CODEC_KEY, Version.isCompatibleVersion() ? COMPATIBLE_CODEC_NAME : DubboCodec.NAME); ExchangeServer server; try { //Exchangers是门面类,里面封装的是Exchanger的逻辑。 //Exchanger默认只有一个实现HeaderExchanger. //Exchanger负责数据交换和网络通信。 //从Protocol进入Exchanger,标志着程序进入了remote层。 //这里requestHandler是ExchangeHandlerAdapter server = Exchangers.bind(url, requestHandler); } catch (RemotingException e) { } str = url.getParameter(Constants.CLIENT_KEY); if (str != null && str.length { Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(); if (!supportedTypes.contains { throw new RpcException("Unsupported client type: " + str); } } return server;}

Exchangers.bind方法:

public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException { url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange"); //getExchanger方法根据url获取到一个默认的实现HeaderExchanger //调用HeaderExchanger的bind方法 return getExchanger.bind(url, handler);}

HeaderExchanger的bind方法:

public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException { //直接返回一个HeaderExchangeServer //先创建一个HeaderExchangeHandler //再创建一个DecodeHandler //最后调用Transporters.bind return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler));}

这里会先创制三个HeaderExchangerHandler,包括着ExchangeHandlerAdapter,接着制造3个DecodeHandler,会蕴藏前面包车型地铁handler,接下去调用Transporters的bind方法,重回二个Server,接着用HeaderExchangeServer包装一下,就赶回给Protocol层了。在HeaderExchangerServer包装的时候会运转心跳电磁照拂计时器startHeatbeatTimer();

Transports的bind方法:

public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException { ChannelHandler handler; if (handlers.length == 1) { handler = handlers[0]; } else { //如果有多个handler的话,需要使用分发器包装下 handler = new ChannelHandlerDispatcher; } //getTransporter()获取一个Adaptive的Transporter //然后调用bind方法(默认是NettyTransporter的bind方法) return getTransporter().bind(url, handler);}

getTransporter()生成的Transporter的代码如下:

import com.alibaba.dubbo.common.extension.ExtensionLoader;public class Transporter$Adpative implements com.alibaba.dubbo.remoting.Transporter { public com.alibaba.dubbo.remoting.Server bind(com.alibaba.dubbo.common.URL arg0, com.alibaba.dubbo.remoting.ChannelHandler arg1) throws com.alibaba.dubbo.common.URL { if (arg0 == null) throw new IllegalArgumentException("url == null"); com.alibaba.dubbo.common.URL url = arg0; //Server默认使用netty String extName = url.getParameter("server", url.getParameter("transporter", "netty")); if(extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.remoting.Transporter) name from url(" + url.toString use keys([server, transporter])"); //获取到一个NettyTransporter com.alibaba.dubbo.remoting.Transporter extension = (com.alibaba.dubbo.remoting.Transporter)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.remoting.Transporter.class).getExtension; //调用NettyTransporter的bind方法 return extension.bind(arg0, arg1); } public com.alibaba.dubbo.remoting.Client connect(com.alibaba.dubbo.common.URL arg0, com.alibaba.dubbo.remoting.ChannelHandler arg1) throws com.alibaba.dubbo.common.URL { if (arg0 == null) throw new IllegalArgumentException("url == null"); com.alibaba.dubbo.common.URL url = arg0; String extName = url.getParameter("client", url.getParameter("transporter", "netty")); if(extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.remoting.Transporter) name from url(" + url.toString use keys([client, transporter])"); com.alibaba.dubbo.remoting.Transporter extension = (com.alibaba.dubbo.remoting.Transporter)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.remoting.Transporter.class).getExtension; return extension.connect(arg0, arg1);}}

NettyTransporter的bind方法:

 public Server bind(URL url, ChannelHandler listener) throws RemotingException { //创建一个Server return new NettyServer(url, listener);}

public NettyServer(URL url, ChannelHandler handler) throws RemotingException{ //handler先经过ChannelHandlers的包装方法 //然后再初始化 super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));}

ChannelHandlers.wrap方法中会根据SPI扩张机制动态生成Dispatcher的自适应类,生成的代码不在列出,暗许使用AllDispatcher管理,会回来2个AllChannelHandler,会把线程池和DataStore都初步化了。然后通过HeartbeatHandler封装,再通过MultiMessageHandler封装后回到。

NettyServer构造,会挨个通过AbstractPeer,AbstractEndpoint,AbstractServer,NettyServer的开头化。注重看下AbstractServer的构造方法:

public AbstractServer(URL url, ChannelHandler handler) throws RemotingException { super(url, handler); localAddress = getUrl().toInetSocketAddress(); String host = url.getParameter(Constants.ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost.getHost ? NetUtils.ANYHOST : getUrl().getHost(); bindAddress = new InetSocketAddress(host, getUrl().getPort; this.accepts = url.getParameter(Constants.ACCEPTS_KEY, Constants.DEFAULT_ACCEPTS); this.idleTimeout = url.getParameter(Constants.IDLE_TIMEOUT_KEY, Constants.DEFAULT_IDLE_TIMEOUT); try { //初始化的时候会打开Server //具体实现这里是NettyServer中 doOpen(); } catch (Throwable t) { } if (handler instanceof WrappedChannelHandler ){ executor = ((WrappedChannelHandler)handler).getExecutor(); }}

下一场调用doOpen方法:

protected void doOpen() throws Throwable { NettyHelper.setNettyLoggerFactory(); //boss线程池 ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true)); //worker线程池 ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true)); //ChannelFactory,没有指定工作者线程数量,就使用cpu+1 ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS)); bootstrap = new ServerBootstrap(channelFactory); final NettyHandler nettyHandler = new NettyHandler, this); channels = nettyHandler.getChannels(); bootstrap.setPipelineFactory(new ChannelPipelineFactory() { public ChannelPipeline getPipeline() { NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec() ,getUrl(), NettyServer.this); ChannelPipeline pipeline = Channels.pipeline(); pipeline.addLast("decoder", adapter.getDecoder; pipeline.addLast("encoder", adapter.getEncoder; pipeline.addLast("handler", nettyHandler); return pipeline; } }); // bind之后返回一个Channel channel = bootstrap.bind(getBindAddress;}

doOpen方法创制Netty的Server端并展开,具体的专门的学问就交由Netty去管理了。

其基本部分含有:
  • 长距离通信:
    提供对多样基于长连接的NIO框架抽象封装,包涵种种线程模型,类别化,以及“请求-响应”格局的音讯交流格局。
  • 集群容错:
    提供基于接口方法的晶莹远程进度调用,包罗多协议帮助,以及软负载均衡,退步容错,地址路由,动态配置等集群援救。
  • 自行发现:
    基于注册宗旨目录服务,使服务消费方能动态的查找服务提供方,使地方透明,使劳动提供能够以平滑增添或调整和裁减机器。

2.5 组装URL

本着各种体协会议、每种注册宗旨,初叶建设构造 U宝马X5L。

ServiceConfig.java

private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
    String name = protocolConfig.getName();
    if (name == null || name.length() == 0) {
        name = "dubbo";
    }

    //处理host

    //处理port

    Map<String, String> map = new HashMap<String, String>();
    //设置参数到map

    // 导出服务
    String contextPath = protocolConfig.getContextpath();
    if ((contextPath == null || contextPath.length() == 0) && provider != null) {
        contextPath = provider.getContextpath();
    }
    URL url = new URL(name, host, port, (contextPath == null || contextPath.length() == 0 ? "" : contextPath + "/") + path, map);

    if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
            .hasExtension(url.getProtocol())) {
        url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
                .getExtension(url.getProtocol()).getConfigurator(url).configure(url);
    }

    //此处省略:服务暴露(详见 2.6 和 2.7)

    this.urls.add(url);
}

ServiceConfig的export

export的步调简单介绍

  • 首先会检讨各样配置音信,填充各类品质,同理可得正是保障本身在开首揭示服务从前,全部的事物都准备好了,并且是合情合理的。
  • 加载全数的挂号宗旨,因为我们表露服务必要注册到注册主题中去。
  • 基于铺排的具有协构和注册宗旨url分别开始展览导出。
  • 实行导出的时候,又是一波属性的拿走设置检查等操作。
  • 假定安顿的不是remote,则做本地导出。
  • 要是安插的不是local,则揭露为远程服务。
  • 不论是是当地依然长途服务暴露,首先都会猎取Invoker。
  • 收获完Invoker之后,转变来对外的Exporter,缓存起来。

export方法先判定是还是不是要求延期暴光(Thread.sleep,然后实践doExport方法。

 public synchronized void export() { if (provider != null) { if (export == null) { export = provider.getExport(); } if (delay == null) { delay = provider.getDelay(); } } if (delay != null && delay > 0) { Thread thread = new Thread(new Runnable() { public void run() { try { Thread.sleep; } catch (Throwable e) { } doExport; thread.setDaemon; thread.setName("DelayExportServiceThread"); thread.start(); } else { doExport(); }

doExport方法先实行一雨后春笋的自己批评措施,然后调用doExportUrls方法。检查方法会检查实验dubbo的配置是或不是在Spring配置文件中注明,未有的话读取properties文件初步化。

doExportUrls方法先调用loadRegistries获取具备的登记中央url,然后遍历调用doExportUrlsFor1Protocol艺术。对于在标签中钦赐了registry属性的Bean,会在加载BeanDefinition的时候就加载了挂号宗旨。

赢得注册中央url,会把注册的新闻都坐落2个UXC60L对象中,二个ULacrosseL内容如下:

registry://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=dubbo-provider&application.version=1.0&dubbo=2.5.3&environment=product&organization=&owner=&pid=2939&registry=zookeeper&timestamp=1488898049284

doExportUrlsFor壹Protocol基于不一样的构和将服务以UOdysseyL情势暴光。倘诺scope配置为none则不暴光,就算服务未配备成remote,则地面揭露exportLocal,如若未安插成local,则注册服务registryProcotol。

这里的URL是:

dubbo://192.168.1.100:20880/dubbo.common.hello.service.HelloService?anyhost=true&application=dubbo-provider&application.version=1.0&delay=5000&dubbo=2.5.3&environment=product&interface=dubbo.common.hello.service.HelloService&methods=sayHello&organization=&owner=&pid=2939&side=provider&timestamp=1488898464953
露马脚服务:

a、 直接暴光服务端口
在一向不运用登记大旨的情形,这种场地一般适用在开荒条件下,服务的调用那和提供在同贰个IP上,只需求开垦服务的端口就可以。
即,当配置 or ServiceConfig剖析出的UBMWX伍L的格式为:
Dubbo://service-host/com.xxx.TxxService?version=1.0.0
基于扩大点的Adaptiver机制,通过U奥迪Q3L的“dubbo://”协议头识别,直接调用DubboProtocol的export()方法,展开服务端口。

b、向登记大旨揭露服务:
和上1种的分别:必要将劳动的IP和端口一起暴光给登记核心。
ServiceConfig深入分析出的url格式为:
registry://registry-host/com.alibaba.dubbo.registry.RegistryService?export=URL.encode(“dubbo://service-host/com.xxx.TxxService?version=1.0.0”)
听新闻说扩充点的Adaptive机制,通过U途胜L的“registry://”协议头识别,调用RegistryProtocol的export方法,将export参数中的提供者U途胜L先挂号到注册核心,再另行传给Protocol扩充点进行暴光:
Dubbo://service-host/com.xxx.TxxService?version=一.0.0

1、框架设计

在官方《Dubbo
用户指南》架构部分,给出了服务调用的一体化架商谈流程:

图片 1

 

另外,在官方《Dubbo
开辟指南》框架设计有个别,给出了整机设计:

图片 2

以及揭示服务时序图:

图片 3

 

本文将基于以上几张图,深入分析服务暴光的兑现原理,并打开详细的代码追踪与分析。

Registry类型的Invoker处理进程

大抵的步子是:

  • 通过多个不要做任何管理的Wrapper类,然后达到RegistryProtocol中。
  • 通过具体的协议导出Invoker为Exporter。
  • 注册服务到注册主题。
  • 订阅注册核心的服务。
  • 变化1个新的Exporter实例,将地方的Exporter举办引进,然后重返。protocol是下边列出的动态变化的代码,会先调用ProtocolListenerWrapper,这一个Wrapper担负起先化揭示和引用服务的监听器。对于Registry类型的不做拍卖,代码如下:

public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException { //registry类型的Invoker,不需要做处理 if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol { return protocol.export; } //非Registry类型的Invoker,需要被监听器包装 return new ListenerExporterWrapper<T>(protocol.export, Collections.unmodifiableList(ExtensionLoader.getExtensionLoader(ExporterListener.class) .getActivateExtension(invoker.getUrl(), Constants.EXPORTER_LISTENER_KEY)));}

继而调用ProtocolFilterWrapper中的export方法,ProtocolFilterWrapper负担起头化invoker全数的Filter。那么些类特别关键,dubbo机制里面日志记录、超时等等成效都以在那1有的实现的。这几个类有二个特色:一.它有一个参数为Protocol
protocol的构造函数;二.它完成了Protocol接口;三.它选择权利链情势,对export和refer函数举行了包装。

echo=com.alibaba.dubbo.rpc.filter.EchoFiltergeneric=com.alibaba.dubbo.rpc.filter.GenericFiltergenericimpl=com.alibaba.dubbo.rpc.filter.GenericImplFiltertoken=com.alibaba.dubbo.rpc.filter.TokenFilteraccesslog=com.alibaba.dubbo.rpc.filter.AccessLogFilteractivelimit=com.alibaba.dubbo.rpc.filter.ActiveLimitFilterclassloader=com.alibaba.dubbo.rpc.filter.ClassLoaderFiltercontext=com.alibaba.dubbo.rpc.filter.ContextFilterconsumercontext=com.alibaba.dubbo.rpc.filter.ConsumerContextFilterexception=com.alibaba.dubbo.rpc.filter.ExceptionFilterexecutelimit=com.alibaba.dubbo.rpc.filter.ExecuteLimitFilterdeprecated=com.alibaba.dubbo.rpc.filter.DeprecatedFiltercompatible=com.alibaba.dubbo.rpc.filter.CompatibleFiltertimeout=com.alibaba.dubbo.rpc.filter.TimeoutFilter

那在那之中涉嫌到广大职能,包罗权力验证、极度、超时、计算调用时间等都在那些类达成。

public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException { //Registry类型的Invoker不做处理 if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol { return protocol.export; } //非Registry类型的Invoker需要先构建调用链,然后再导出 return protocol.export(buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER));}

这里我们先剖判的是Registry类型的Invoker,接着就能够调用RegistryProtocol的export方法,RegistryProtocol肩负登记服务到注册中央和向登记中央订阅服务。代码如下:

public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException { //export invoker //这里就交给了具体的协议去暴露服务(先不解析,留在后面,可以先去后面看下导出过程) final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker); //registry provider //根据invoker中的url获取Registry实例 //并且连接到注册中心 //此时提供者作为消费者引用注册中心核心服务RegistryService final Registry registry = getRegistry(originInvoker); //注册到注册中心的URL final URL registedProviderUrl = getRegistedProviderUrl(originInvoker); //调用远端注册中心的register方法进行服务注册 //若有消费者订阅此服务,则推送消息让消费者引用此服务。 //注册中心缓存了所有提供者注册的服务以供消费者发现。 registry.register(registedProviderUrl); // 订阅override数据 // FIXME 提供者订阅时,会影响同一JVM即暴露服务,又引用同一服务的的场景,因为subscribed以服务名为缓存的key,导致订阅信息覆盖。 final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl); final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl); overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener); //提供者向注册中心订阅所有注册服务的覆盖配置 //当注册中心有此服务的覆盖配置注册进来时,推送消息给提供者,重新暴露服务,这由管理页面完成。 registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener); //保证每次export都返回一个新的exporter实例 //返回暴露后的Exporter给上层ServiceConfig进行缓存,便于后期撤销暴露。 return new Exporter<T>() { public Invoker<T> getInvoker() { return exporter.getInvoker(); } public void unexport() { try { exporter.unexport(); } catch (Throwable t) { logger.warn(t.getMessage; } try { registry.unregister(registedProviderUrl); } catch (Throwable t) { logger.warn(t.getMessage; } try { overrideListeners.remove(overrideSubscribeUrl); registry.unsubscribe(overrideSubscribeUrl, overrideSubscribeListener); } catch (Throwable t) { logger.warn(t.getMessage; } } };}
引用服务:

a、直接引用服务:
在未有注册大旨的,直连提供者景况下, ReferenceConfig剖析出的U奥迪Q3L格式为:
Dubbo://service-host/com.xxx.Txx瑟维斯?version=一.0.0
依据增加点的Adaptive机制,通过url的“dubbo://”协议头识别,直接调用DubboProtocol的refer方法,重回提供者引用。

b、从注册中央意识引用服务:
此刻,ReferenceConfig分析出的UPRADOL的格式为:
registry://registry-host/com.alibaba.dubbo.registry.RegistryService?refer=URL.encode(“consumer://consumer-host/com.foo.FooService?version=1.0.0”)
听闻扩张点的Apaptive机制,通过U奥迪Q5L的“registry://”协议头识别,就可以调用RegistryProtocol的refer方法,基于refer参数总的条件,查询提供者U奥迪Q7L,如:
Dubbo://service-host/com.xxx.TxxService?version=一.0.0
基于扩大点的Adaptive机制,通过提供者UCRUISERL的“dubbo://”协议头识别,就能够调用DubboProtocol的refer()方法,获得提供者引用。
然后RegistryProtocol将多少个提供者引用,通过Cluster扩张点,伪装成单个提供这引用再次来到。

二.八 揭发服务

不采纳注册大旨时,直接调用对应协议(如 Dubbo 协议)的 export() 揭穿服务。以 Dubbo 协议为例:

DubboProtocol.java

public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
    URL url = invoker.getUrl();

    // export service.
    String key = serviceKey(url);
    DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
    exporterMap.put(key, exporter);

    //export an stub service for dispaching event
    Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY,Constants.DEFAULT_STUB_EVENT);
    Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
    if (isStubSupportEvent && !isCallbackservice){
        String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
        if (stubServiceMethods == null || stubServiceMethods.length() == 0 ){
            if (logger.isWarnEnabled()){
                logger.warn(new IllegalStateException("consumer [" +url.getParameter(Constants.INTERFACE_KEY) +
                        "], has set stubproxy support event ,but no stub methods founded."));
            }
        } else {
            stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
        }
    }

    openServer(url);

    return exporter;
}

调用 openServer() 方法成立并运营 Server:

DubboProtocol.java

private void openServer(URL url) {
    // find server.
    String key = url.getAddress();
    //client 也可以暴露一个只有server可以调用的服务。
    boolean isServer = url.getParameter(Constants.IS_SERVER_KEY,true);
    if (isServer) {
        ExchangeServer server = serverMap.get(key);
        if (server == null) {
            serverMap.put(key, createServer(url));
        } else {
            //server支持reset,配合override功能使用
            server.reset(url);
        }
    }
}

private ExchangeServer createServer(URL url) {
    //默认开启server关闭时发送readonly事件
    url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString());
    //默认开启heartbeat
    url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
    String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);

    if (str != null && str.length() > 0 && ! ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str))
        throw new RpcException("Unsupported server type: " + str + ", url: " + url);

    url = url.addParameter(Constants.CODEC_KEY, Version.isCompatibleVersion() ? COMPATIBLE_CODEC_NAME : DubboCodec.NAME);
    ExchangeServer server;
    try {
        server = Exchangers.bind(url, requestHandler);
    } catch (RemotingException e) {
        throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
    }
    str = url.getParameter(Constants.CLIENT_KEY);
    if (str != null && str.length() > 0) {
        Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
        if (!supportedTypes.contains(str)) {
            throw new RpcException("Unsupported client type: " + str);
        }
    }
    return server;
}

Exchanger (暗中同意 HeaderExchanger)封装请求响应情势,同步转异步,以 Request、Response 为主导:

HeaderExchager.java

public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
    return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
}

Transporters.java

public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {
    if (url == null) {
        throw new IllegalArgumentException("url == null");
    }
    if (handlers == null || handlers.length == 0) {
        throw new IllegalArgumentException("handlers == null");
    }
    ChannelHandler handler;
    if (handlers.length == 1) {
        handler = handlers[0];
    } else {
        handler = new ChannelHandlerDispatcher(handlers);
    }
    return getTransporter().bind(url, handler);
}

底层传输暗许使用 NettyTransporter,最终是创造 NettyServer:

NettyTransporter.java

public Server bind(URL url, ChannelHandler listener) throws RemotingException {
    return new NettyServer(url, listener);
}

NettyServer.java

public class NettyServer extends AbstractServer implements Server {
    public NettyServer(URL url, ChannelHandler handler) throws RemotingException{
        super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
    }
}

AbstractServer.java

public abstract class AbstractServer extends AbstractEndpoint implements Server {
    public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
        super(url, handler);
        localAddress = getUrl().toInetSocketAddress();
        String host = url.getParameter(Constants.ANYHOST_KEY, false) 
                        || NetUtils.isInvalidLocalHost(getUrl().getHost()) 
                        ? NetUtils.ANYHOST : getUrl().getHost();
        bindAddress = new InetSocketAddress(host, getUrl().getPort());
        this.accepts = url.getParameter(Constants.ACCEPTS_KEY, Constants.DEFAULT_ACCEPTS);
        this.idleTimeout = url.getParameter(Constants.IDLE_TIMEOUT_KEY, Constants.DEFAULT_IDLE_TIMEOUT);
        try {
            doOpen();
            if (logger.isInfoEnabled()) {
                logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress());
            }
        } catch (Throwable t) {
            throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind " + getClass().getSimpleName() 
                                        + " on " + getLocalAddress() + ", cause: " + t.getMessage(), t);
        }
        if (handler instanceof WrappedChannelHandler ){
            executor = ((WrappedChannelHandler)handler).getExecutor();
        }
    }
}

NettyServer.java

public class NettyServer extends AbstractServer implements Server {
    protected void doOpen() throws Throwable {
        NettyHelper.setNettyLoggerFactory();
        ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));
        ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));
        ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));
        bootstrap = new ServerBootstrap(channelFactory);

        final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
        channels = nettyHandler.getChannels();
        // final Timer timer = new HashedWheelTimer(new NamedThreadFactory("NettyIdleTimer", true));
        bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
            public ChannelPipeline getPipeline() {
                NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec() ,getUrl(), NettyServer.this);
                ChannelPipeline pipeline = Channels.pipeline();
                /*int idleTimeout = getIdleTimeout();
                if (idleTimeout > 10000) {
                    pipeline.addLast("timer", new IdleStateHandler(timer, idleTimeout / 1000, 0, 0));
                }*/
                pipeline.addLast("decoder", adapter.getDecoder());
                pipeline.addLast("encoder", adapter.getEncoder());
                pipeline.addLast("handler", nettyHandler);
                return pipeline;
            }
        });
        // bind
        channel = bootstrap.bind(getBindAddress());
    }
}

连年注册主旨并拿走Registry实例

切切实实的协商实行揭露并且再次来到了2个ExporterChangeableWrapper之后,接下去看下一步连接注册主旨并注册到注册宗旨,代码是在RegistryProtocol的export方法:

//先假装此步已经分析完final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);//得到具体的注册中心,连接注册中心,此时提供者作为消费者引用注册中心核心服务RegistryServicefinal Registry registry = getRegistry(originInvoker);final URL registedProviderUrl = getRegistedProviderUrl(originInvoker);//调用远端注册中心的register方法进行服务注册//若有消费者订阅此服务,则推送消息让消费者引用此服务registry.register(registedProviderUrl);//提供者向注册中心订阅所有注册服务的覆盖配置registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);//返回暴露后的Exporter给上层ServiceConfig进行缓存return new Exporter<T>() {。。。}

getRegistry(originInvoker)方法:

//根据invoker的地址获取registry实例private Registry getRegistry(final Invoker<?> originInvoker){ //获取invoker中的registryUrl URL registryUrl = originInvoker.getUrl(); if (Constants.REGISTRY_PROTOCOL.equals(registryUrl.getProtocol { //获取registry的值,这里获得是zookeeper,默认值是dubbo String protocol = registryUrl.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_DIRECTORY); //这里获取到的url为: //zookeeper://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService? //application=dubbo-provider&application.version=1.0&dubbo=2.5.3& //environment=product&export=dubbo%3A%2F%2F192.168.1.100%3A20880%2F //dubbo.common.hello.service.HelloService%3Fanyhost%3Dtrue%26application%3Ddubbo-provider%26 //application.version%3D1.0%26dubbo%3D2.5.3%26environment%3Dproduct%26 //interface%3Ddubbo.common.hello.service.HelloService%26methods%3DsayHello%26 //organization%3Dchina%26owner%3Dcheng.xi%26pid%3D9457%26side%3Dprovider%26timestamp%3D1489807681627 registryUrl = registryUrl.setProtocol.removeParameter(Constants.REGISTRY_KEY); } //根据SPI机制获取具体的Registry实例,这里获取到的是ZookeeperRegistry return registryFactory.getRegistry(registryUrl);}

这里的registryFactory是动态变化的代码,如下:

import com.alibaba.dubbo.common.extension.ExtensionLoader;public class RegistryFactory$Adpative implements com.alibaba.dubbo.registry.RegistryFactory { public com.alibaba.dubbo.registry.Registry getRegistry(com.alibaba.dubbo.common.URL arg0) { if (arg0 == null) throw new IllegalArgumentException("url == null"); com.alibaba.dubbo.common.URL url = arg0; String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol; if(extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.registry.RegistryFactory) name from url(" + url.toString use keys([protocol])"); com.alibaba.dubbo.registry.RegistryFactory extension = (com.alibaba.dubbo.registry.RegistryFactory)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.registry.RegistryFactory.class).getExtension; return extension.getRegistry; }}

为此这里registryFactory.getRegistry(registryUrl)用的是ZookeeperRegistryFactory。

先看下getRegistry方法,会意识该方法会在AbstractRegistryFactory中落到实处:

public Registry getRegistry { url = url.setPath(RegistryService.class.getName .addParameter(Constants.INTERFACE_KEY, RegistryService.class.getName .removeParameters(Constants.EXPORT_KEY, Constants.REFER_KEY); //这里key为: //zookeeper://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService String key = url.toServiceString(); // 锁定注册中心获取过程,保证注册中心单一实例 LOCK.lock(); try { //先从缓存中获取Registry实例 Registry registry = REGISTRIES.get; if (registry != null) { return registry; } //创建registry,会直接new一个ZookeeperRegistry返回 //具体创建实例是子类来实现的 registry = createRegistry; if (registry == null) { throw new IllegalStateException("Can not create registry " + url); } //放到缓存中 REGISTRIES.put(key, registry); return registry; } finally { // 释放锁 LOCK.unlock(); }}

createRegistry;是在子类中落到实处的,这里是ZookeeperRegistry,首先供给经过AbstractRegistry的构造:

public AbstractRegistry { //url保存起来 setUrl; // 启动文件保存定时器 // syncSaveFile = url.getParameter(Constants.REGISTRY_FILESAVE_SYNC_KEY, false); //保存的文件为: ///home/xxx/.dubbo/dubbo-registry-127.0.0.1.cache String filename = url.getParameter(Constants.FILE_KEY, System.getProperty("user.home") + "/.dubbo/dubbo-registry-" + url.getHost() + ".cache"); File file = null; if (ConfigUtils.isNotEmpty) { file = new File; if(! file.exists() && file.getParentFile() != null && ! file.getParentFile().exists{ if(! file.getParentFile().mkdirs{ throw new IllegalArgumentException("Invalid registry store file " + file + ", cause: Failed to create directory " + file.getParentFile; } } } this.file = file; //加载文件中的属性 loadProperties(); //通知订阅 notify(url.getBackupUrls;}

一、初入Dubbo

二.九 服务注册

设若采用了登记中央,则在经过实际协议(如 Dubbo 协议)暴光服务之后(即在 2.八基础之上)进入劳动注册流程,将劳动节点注册到注册中央。

RegistryProtocol.java

public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
    //export invoker
    final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
    //registry provider
    final Registry registry = getRegistry(originInvoker);
    final URL registedProviderUrl = getRegistedProviderUrl(originInvoker);
    //向Zookeeper注册节点
    registry.register(registedProviderUrl);
    // 订阅override数据
    // FIXME 提供者订阅时,会影响同一JVM即暴露服务,又引用同一服务的的场景,因为subscribed以服务名为缓存的key,导致订阅信息覆盖。
    final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl);
    final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl);
    overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
    registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
    //保证每次export都返回一个新的exporter实例
    return new Exporter<T>() {
        public Invoker<T> getInvoker() {
            return exporter.getInvoker();
        }
        public void unexport() {
            try {
                exporter.unexport();
            } catch (Throwable t) {
                logger.warn(t.getMessage(), t);
            }
            try {
                registry.unregister(registedProviderUrl);
            } catch (Throwable t) {
                logger.warn(t.getMessage(), t);
            }
            try {
                overrideListeners.remove(overrideSubscribeUrl);
                registry.unsubscribe(overrideSubscribeUrl, overrideSubscribeListener);
            } catch (Throwable t) {
                logger.warn(t.getMessage(), t);
            }
        }
    };
}

private Registry getRegistry(final Invoker<?> originInvoker){
    URL registryUrl = originInvoker.getUrl();
    if (Constants.REGISTRY_PROTOCOL.equals(registryUrl.getProtocol())) {
        String protocol = registryUrl.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_DIRECTORY);
        registryUrl = registryUrl.setProtocol(protocol).removeParameter(Constants.REGISTRY_KEY);
    }
    return registryFactory.getRegistry(registryUrl);
}

getRegistry() 方法依照登记大旨项目(私下认可 Zookeeper)获取注册主旨客户端,由注册中央客户端实例来举行真正的劳动登记。

注册中央客户端将节点注册到注册宗旨,同期订阅对应的 override 数据,实时监听服务的习性变动实现动态配置效应。

末尾回到的 Exporter 完毕了 unexport() 方法,那样在服务下线时清理相关能源。

 

至此,服务揭露流程截至。

 

 

 

 

付出具体的磋商进行劳动揭破

此处也正是非Registry类型的Invoker的导出进度。首要的步骤是将当地ip和20880端口开垦,举办监听。最终打包成exporter重回。doLocalExport:

private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker){ //原始的invoker中的url: //registry://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService? //application=dubbo-provider&application.version=1.0&dubbo=2.5.3 //&environment=product&export=dubbo%3A%2F%2F10.42.0.1%3A20880%2F //dubbo.common.hello.service.HelloService%3Fanyhost%3Dtrue%26application%3Ddubbo-provider%26 //application.version%3D1.0%26dubbo%3D2.5.3%26environment%3Dproduct%26 //interface%3Ddubbo.common.hello.service.HelloService%26methods%3DsayHello%26 //organization%3Dchina%26owner%3Dcheng.xi%26pid%3D7876%26side%3Dprovider%26timestamp%3D1489057305001& //organization=china&owner=cheng.xi&pid=7876&registry=zookeeper&timestamp=1489057304900 //从原始的invoker中得到的key: //dubbo://10.42.0.1:20880/dubbo.common.hello.service.HelloService?anyhost=true&application=dubbo-provider& //application.version=1.0&dubbo=2.5.3&environment=product&interface=dubbo.common.hello.service.HelloService&//methods=sayHello String key = getCacheKey(originInvoker); ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get; if (exporter == null) { synchronized  { exporter = (ExporterChangeableWrapper<T>) bounds.get; if (exporter == null) { //得到一个Invoker代理,里面包含原来的Invoker final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker)); //此处protocol还是最上面生成的代码,调用代码中的export方法,会根据协议名选择调用具体的实现类 //这里我们需要调用DubboProtocol的export方法 //这里的使用具体协议进行导出的invoker是个代理invoker //导出完之后,返回一个新的ExporterChangeableWrapper实例 exporter = new ExporterChangeableWrapper<T>((Exporter<T>)protocol.export(invokerDelegete), originInvoker); bounds.put(key, exporter); } } } return (ExporterChangeableWrapper<T>) exporter;}
深入分析服务:

一)、基于dubbo.jar内的Meta-inf/spring.handlers配置,spring在碰到dubbo名称空间时,会回调DubboNamespaceHandler类。
二)、全部的dubbo标签,都统一用DubboBeanDefinitionParser举行分析,基于一对一属性映射,将XML标签剖析为Bean对象。生产者也许消费者开端化的时候,会将Bean对象转会为url格式,将全体Bean属性转成url的参数。
然后将UQX56L传给Protocol扩充点,基于扩展点的Adaptive机制,依据UCRUISERL的协议头,进行分裂协商的劳动暴光和引用。

github新添仓库 “dubbo-read”(点此查看),群集全体《Dubbo原理和源码剖析》连串作品,后续将持续补充该体系,同不时候将本着Dubbo所做的法力扩大也进展分享。不定时更新,招待Follow。

在起首分析以前,有必须熟知一下Dubbo源码的目录结构,以及各模块的职能。

二、Dubbo结构图

duubo结构图

小编们批注以下那几个架构图:
Consumer服务消费者,Provider服务提供者。Container服务容器。消费当然是invoke提供者了,invoke那条实线遵照图上的求证当然同步的情趣了。可是在实际调用进度中,Provider的职位对于Consumer来讲是晶莹剔透的,上三次调用服务的地方(IP地址)和下一回调用劳动的岗位,是不鲜明的。那些地点就须求选用注册宗旨来兑现软负载。
Register
劳务提供者先运维start,然后注册register服务。消费订阅subscribe服务,如果未有订阅到温馨想获取的服务,它会不停的尝尝订阅。新的劳务登记到注册中央现在,注册中心会将这么些劳动通过notify到消费者。
Monitor
那是三个督察,图中虚线注解Consumer
和Provider通过异步的不二诀窍发送消息至Monitor,Consumer和Provider会将消息寄存在该地磁盘,平均一min会发送一次消息。Monitor在任何框架结构中是可选的(图中的虚线并不是可选的乐趣),Monitor功用须求独自安顿,不陈设只怕配置将来,Monitor挂掉并不会影响服务的调用。

二.柒 远程揭破

例如未有铺排scope 或许scope=remote,则会进展长距离揭露。

ServiceConfig.java

private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
    String scope = url.getParameter(Constants.SCOPE_KEY);
    //配置为none不暴露
    if (! Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) {
        //......
        //如果配置不是local则暴露为远程服务.(配置为local,则表示只暴露远程服务)
        if (! Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope) ){
            if (logger.isInfoEnabled()) {
                logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
            }
            if (registryURLs != null && registryURLs.size() > 0
                    && url.getParameter("register", true)) {
                for (URL registryURL : registryURLs) {
                    url = url.addParameterIfAbsent("dynamic", registryURL.getParameter("dynamic"));
                    URL monitorUrl = loadMonitor(registryURL);
                    if (monitorUrl != null) {
                        url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());
                    }
                    if (logger.isInfoEnabled()) {
                        logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
                    }
                    Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));

                    Exporter<?> exporter = protocol.export(invoker);
                    exporters.add(exporter);
                }
            } else {
                Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);

                Exporter<?> exporter = protocol.export(invoker);
                exporters.add(exporter);
            }
        }
    }
}

在服务揭露时,有三种情景:

  • 不行使登记中央:直接揭示对应协议的劳务,引用服务时只好通过直连格局引用
  • 运用登记中央:揭露对应协议的劳务后,会将服务节点注册到注册宗旨,引用服务时能够透过注册中央动态获取服务提供者列表,也得以因而直连格局引用

订阅注册大旨的服务

在注册到注册宗旨之后,registry会去订阅覆盖配置的劳务,这一步之后就能够在/dubbo/dubbo.common.hello.service/HelloService节点下多1个configurators节点。

4、服务提供与开销详细进度

劳务提供者暴露3个劳务的详细经过

纸包不住火服务的主进程:
第二ServiceConfig类得到对外提供劳动的实在类ref,然后将ProxyFactory类的getInvoker方法使用ref生成一个AbstractProxyInvoker实例,到这一步就完结具体服务到invoker的倒车。接下来就是Invoker转形成Exporter的进度。
Dubbo管理服务揭破的重要就在Invoker转造成Exporter的经过,下边我们以Dubbo和rmi这二种标准协议的达成来拓展验证:
Dubbo的达成:
Dubbo协议的Invoker转为Exporter发生在DubboProtocol类的export方法,它根本是开荒socket侦听服务,并摄取客户端发来的各类请求,通信细节由dubbo自个儿落成。
中华Vmi的落到实处:
中华VMI商业事务的Invoker转为Exporter产生在HavalmiProtocol类的export方法,他经过Spring或Dubbo或JDK来促成劳务,通信细节由JDK底层来完成。

劳务消费的主进度

服务消费的主进度:
先是ReferenceConfig类的init方法调用Protocol的refer方法生成Invoker实例。接下来把Invoker转为客户端需求的接口

先是节的Dubbo介绍就到这里呀,后续会更新怎么样引进dubbo到项目中应用的篇章。
今天的享受希望对您有用,每日分享一丝丝

发表评论

电子邮件地址不会被公开。 必填项已用*标注