概述
暴露服务是指服务的提供者向注册中心(也可以不用注册中心)暴露自己的信息,这样服务的消费者才能发现并且调用。逻辑上分为两步:
- ref(服务的实际实现者) --> Invoker,使用ProxyFactory#getInvoker()
- Invoker --> Exporter,使用Protocol#export()
ServiceConfig
暴露服务的逻辑在ServiceConfig#export(),ServiceConfig表示<dubbo:service />
private static final ProxyFactory proxyFactory = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();//伪代码if (registryURLs != null && registryURLs.size() > 0) { // 暴露服务,并且向注册中心注册 for (URL registryURL : registryURLs) { url = url.addParameterIfAbsent("dynamic", registryURL.getParameter("dynamic")); URL monitorUrl = loadMonitor(registryURL); if (monitorUrl != null) { // 将监控url添加到url对象中,monitor参数 url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString()); } if (logger.isInfoEnabled()) { logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL); } Invoker invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, // 将url对象添加到注册url中,export参数 registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString())); Exporter exporter = protocol.export(wrapperInvoker); exporters.add(exporter); }} else { // 注册中心配置地址为N/A,表示不像注册中心注册 // 只暴露服务,消费者,可以直接配置url,不从注册中心获取,点对点直连的方式调用服务 Invoker invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url); Exporter exporter = protocol.export(wrapperInvoker); exporters.add(exporter);}
- 暴露给注册中心,URL表现为:registry://registry-host/com.alibaba.dubbo.registry.RegistryService?export=URL.encode("dubbo://service-host/com.foo.FooService?version=1.0.0"),根据扩展自适配机制,先由RegistryProtocol执行,在export()方法里会想注册中心注册提供者的url,并根据URL:dubbo://service-host/com.foo.FooService?version=1.0.0,交给DubboProtocol进行暴露。
- 不适用注册中心,URL表现为:dubbo://service-host/com.foo.FooService?version=1.0.0,根据扩展自适配机制,由DubboProtocol进行暴露。
RegistryProtocol
真正的暴露服务的行为交个属性Protocol,本身只是负责向注册中心注册提供者。
- 先根据提供者url,用Protocol进行暴露
- 想注册中注册提供者url
publicExporter export(final Invoker originInvoker) throws RpcException { //export invoker final ExporterChangeableWrapper exporter = doLocalExport(originInvoker); //registry provider final Registry registry = getRegistry(originInvoker); // 获得服务提供者的url final URL registedProviderUrl = getRegistedProviderUrl(originInvoker); // 向注册中心注册提供者的url registry.register(registedProviderUrl);}private ExporterChangeableWrapper doLocalExport(final Invoker originInvoker) { String key = getCacheKey(originInvoker); ExporterChangeableWrapper exporter = (ExporterChangeableWrapper ) bounds.get(key); if (exporter == null) { synchronized (bounds) { exporter = (ExporterChangeableWrapper ) bounds.get(key); if (exporter == null) { final Invoker invokerDelegete = new InvokerDelegete (originInvoker, getProviderUrl(originInvoker)); exporter = new ExporterChangeableWrapper ((Exporter ) protocol.export(invokerDelegete), originInvoker); bounds.put(key, exporter); } } } return exporter;}
DubboProtocol
实现暴露服务的逻辑,包含通信模块,创建服务端等待客户端的调用。
//伪代码publicExporter export(Invoker invoker) throws RpcException { URL url = invoker.getUrl(); // export service. String key = serviceKey(url); DubboExporter exporter = new DubboExporter (invoker, key, exporterMap); exporterMap.put(key, exporter); openServer(url); return exporter;}private void openServer(URL url) { // find server. String key = url.getAddress(); //client can export a service which's only for server to invoke boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true); if (isServer) { ExchangeServer server = serverMap.get(key); if (server == null) { serverMap.put(key, createServer(url)); } else { // server supports reset, use together with override // 多个ServiceConfig公用一个ProtocolConfig时,key会相同,server会重置 server.reset(url); } }}private ExchangeServer createServer(URL url) { // 设置默认的,服务关闭时,发送readonly时间 // send readonly event when server closes, it's enabled by default url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString()); // enable heartbeat by default // 设置默认心跳时间 url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT)); // 获取并检查Transporter的扩展实现 String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER); if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) throw new RpcException("Unsupported server type: " + str + ", url: " + url); // 获取编码器,默认DubboCountCodec url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME); ExchangeServer server; try { server = Exchangers.bind(url, requestHandler); } catch (RemotingException e) { throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e); } str = url.getParameter(Constants.CLIENT_KEY); if (str != null && str.length() > 0) { Set supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(); if (!supportedTypes.contains(str)) { throw new RpcException("Unsupported client type: " + str); } } return server;}
通信模块在接收到客户端的调用时,会根据客户端的信息构建出serviceKey,然后从exportMap中找出与之对应的Export对象,从而找到应该调用的Invoker(这个Invoker对象,代理真正的服务实现)完成服务的调用。
serviceKey,格式:${group}/${path}:${version}:${port},比如:test/com.alibaba.dubbo.demo.DemoService:1.0.0:20888 或者 com.alibaba.dubbo.demo.DemoService:1.0.0:20888
附录
参照: