服务注册流程分析02

上一篇文章中、我们已经知道 Dubbo 会额外注册 ServiceBean 到 Spring 容器中、因为需要借助这个 ServiceBean 把服务注册到注册中心

@Override
public void postProcessBeanDefinitionRegistry(BeanDefinitionRegistry registry) throws BeansException {
// @since 2.7.5
registerBeans(registry, DubboBootstrapApplicationListener.class);
Set<String> resolvedPackagesToScan = resolvePackagesToScan(packagesToScan);
if (!CollectionUtils.isEmpty(resolvedPackagesToScan)) {registerServiceBeans(resolvedPackagesToScan, registry);
}
}
}

registerBeans(registry, DubboBootstrapApplicationListener.class) 注册 DubboBootstrapApplicationListener 到 Spring 容器中

/**
 * Routes a Spring context event to its handler: refresh events go to
 * {@code onContextRefreshedEvent}, close events go to
 * {@code onContextClosedEvent}, and every other event is ignored.
 *
 * @param event the context event to dispatch
 */
@Override
public void onApplicationContextEvent(ApplicationContextEvent event) {
    if (event instanceof ContextRefreshedEvent) {
        onContextRefreshedEvent((ContextRefreshedEvent) event);
        return;
    }
    if (event instanceof ContextClosedEvent) {
        onContextClosedEvent((ContextClosedEvent) event);
    }
}
/**
 * Handles a context-refreshed event by starting the Dubbo bootstrap.
 *
 * @param event the refresh event; it is not inspected here
 */
private void onContextRefreshedEvent(ContextRefreshedEvent event) {
    dubboBootstrap.start();
}
/*** Start the bootstrap*/
public DubboBootstrap start() {if (started.compareAndSet(false, true)) {ready.set(false);initialize();if (logger.isInfoEnabled()) {logger.info(NAME + " is starting...");}// 1. export Dubbo ServicesexportServices();// Not only provider registerif (!isOnlyRegisterProvider() || hasExportedServices()) {// 2. export MetadataServiceexportMetadataService();//3. Register the local ServiceInstance if requiredregisterServiceInstance();}referServices();if (asyncExportingFutures.size() > 0) {new Thread(() -> {try {this.awaitFinish();} catch (Exception e) {logger.warn(NAME + " exportAsync occurred an exception.");}ready.set(true);}).start();} else {ready.set(true);}}return this;
}

其中 exportServices(); 就是将服务注册到注册中心

private void exportServices() {configManager.getServices().forEach(sc -> {// TODO, compatible with ServiceConfig.export()ServiceConfig serviceConfig = (ServiceConfig) sc;serviceConfig.setBootstrap(this);if (exportAsync) {ExecutorService executor = executorRepository.getServiceExporterExecutor();Future<?> future = executor.submit(() -> {sc.export();exportedServices.add(sc);});asyncExportingFutures.add(future);} else {sc.export();exportedServices.add(sc);}});
}

那么什么时候 ServiceBean 加入到这个 configManager 里面?

在 AbstractConfig#addIntoConfigManager 中

/**
 * Adds this config object to the config manager obtained from
 * {@code ApplicationModel}. The method carries {@code @PostConstruct}.
 */
@PostConstruct
public void addIntoConfigManager() {
    ApplicationModel.getConfigManager().addConfig(this);
}

回到 exportServices 方法中、我们得知它会调用 ServiceConfig 的 export 方法

/**
 * Exports this service.
 *
 * <p>Returns immediately when {@code shouldExport()} is false. Otherwise it
 * obtains and initialises the bootstrap if none was set, validates the
 * sub-configs, fills {@code serviceMetadata}, and then either schedules
 * {@code doExport} after {@code getDelay()} milliseconds or calls it directly.
 * {@code exported()} is invoked last in both cases.
 */
public synchronized void export() {
    if (!shouldExport()) {
        return;
    }

    if (bootstrap == null) {
        bootstrap = DubboBootstrap.getInstance();
        bootstrap.init();
    }

    checkAndUpdateSubConfigs();

    //init serviceMetadata
    serviceMetadata.setVersion(version);
    serviceMetadata.setGroup(group);
    serviceMetadata.setDefaultGroup(group);
    serviceMetadata.setServiceType(getInterfaceClass());
    serviceMetadata.setServiceInterfaceName(getInterface());
    serviceMetadata.setTarget(getRef());

    if (shouldDelay()) {
        DELAY_EXPORT_EXECUTOR.schedule(this::doExport, getDelay(), TimeUnit.MILLISECONDS);
    } else {
        doExport();
    }

    exported();
}

这里我们没有进行延迟暴露、直接进入到 doExport 方法中

/**
 * Performs the export once.
 *
 * <p>Does nothing when the service is already marked exported. Otherwise it
 * marks it exported, falls back to {@code interfaceName} when {@code path} is
 * empty, and continues with {@code doExportUrls()}.
 *
 * @throws IllegalStateException if the service has already been unexported
 */
protected synchronized void doExport() {
    if (unexported) {
        throw new IllegalStateException("The service " + interfaceClass.getName() + " has already unexported!");
    }
    if (!exported) {
        exported = true;
        if (StringUtils.isEmpty(path)) {
            path = interfaceName;
        }
        doExportUrls();
    }
}

没啥好说的、进入到 doExportUrls 中

// NOTE(review): this listing repeats doExport(); the step being followed here is
// the doExportUrls() call on its last line.
/**
 * Performs the export once: rejects an unexported service, returns early when
 * already exported, defaults {@code path} to {@code interfaceName} when empty,
 * and then calls {@code doExportUrls()}.
 *
 * @throws IllegalStateException if the service has already been unexported
 */
protected synchronized void doExport() {
    if (unexported) {
        throw new IllegalStateException("The service " + interfaceClass.getName() + " has already unexported!");
    }
    if (exported) {
        return;
    }
    exported = true;
    if (StringUtils.isEmpty(path)) {
        path = interfaceName;
    }
    doExportUrls();
}

针对不同的协议将其注册到服务注册中心上

private void doExportUrls() {ServiceRepository repository = ApplicationModel.getServiceRepository();ServiceDescriptor serviceDescriptor = repository.registerService(getInterfaceClass());repository.registerProvider(getUniqueServiceName(),ref,serviceDescriptor,this,serviceMetadata);List<URL> registryURLs = ConfigValidationUtils.loadRegistries(this, true);for (ProtocolConfig protocolConfig : protocols) {String pathKey = URL.buildKey(getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), group, version);// In case user specified path, register service one more time to map it to path.repository.registerService(pathKey, interfaceClass);// TODO, uncomment this line once service key is unifiedserviceMetadata.setServiceKey(pathKey);doExportUrlsFor1Protocol(protocolConfig, registryURLs);}
}

主要涉及就是这三行代码

private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {.........Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);Exporter<?> exporter = PROTOCOL.export(wrapperInvoker);.........
}
Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));

这个主要就是生成一个 Wrapper 。这个 Wrapper 比反射更加高效、因为它是强转类型然后直接调用方法。

当服务消费者来消费的时候就可以直接调用提供服务的 service 了、不需要进行反射

JavassistProxyFactory
/**
 * Creates an invoker that dispatches calls through a {@code Wrapper} of the
 * service class.
 *
 * <p>The wrapper is built for the proxy's runtime class unless that class name
 * contains {@code '$'}, in which case it is built for {@code type} instead.
 *
 * @param proxy the service implementation to invoke
 * @param type  the service interface
 * @param url   the URL passed on to the created invoker
 * @return an invoker whose {@code doInvoke} delegates to
 *         {@code wrapper.invokeMethod}
 */
@Override
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
    // TODO Wrapper cannot handle this scenario correctly: the classname contains '$'
    final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
    return new AbstractProxyInvoker<T>(proxy, type, url) {
        @Override
        protected Object doInvoke(T proxy, String methodName,
                                  Class<?>[] parameterTypes,
                                  Object[] arguments) throws Throwable {
            return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
        }
    };
}

生成的 Wrapper 部分代码如下

// Part of the source text generated for the Wrapper of com.demo.api.DemoService.
// NOTE(review): $1..$4 and ($w) are not plain Java. Presumably they are Javassist
// placeholders: $1..$4 for the parameters o, n, p, v by position, and ($w) for
// the wrapper-type cast. Confirm against the Javassist documentation.
public Object invokeMethod(Object o, String n, Class[] p, Object[] v) throws java.lang.reflect.InvocationTargetException {
    com.demo.api.DemoService w;
    try {
        // Cast the target object to the service type up front.
        w = ((com.demo.api.DemoService) $1);
    } catch (Throwable e) {
        throw new IllegalArgumentException(e);
    }
    try {
        // Match on method name and argument count, then call the method directly.
        if ("sayHello".equals($2) && $3.length == 1) {
            return ($w) w.sayHello((java.lang.String) $4[0]);
        }
    } catch (Throwable e) {
        throw new java.lang.reflect.InvocationTargetException(e);
    }
    // No branch matched the requested method.
    throw new org.apache.dubbo.common.bytecode.NoSuchMethodException("Not found method \"" + $2 + "\" in class com.demo.api.DemoService.");
}

至于 PROTOCOL.export(wrapperInvoker); 里面涉及到 Dubbo SPI 机制

RegistryProtocol

/**
 * Exports the invoker locally and, unless disabled through
 * {@code REGISTER_KEY}, registers the provider URL with the registry.
 *
 * @param originInvoker the invoker carrying the registry and provider URLs
 * @return a new {@code DestroyableExporter} wrapping the local exporter
 * @throws RpcException declared by the overridden method
 */
@Override
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
    URL registryUrl = getRegistryUrl(originInvoker);
    // url to export locally
    URL providerUrl = getProviderUrl(originInvoker);

    final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl);
    final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
    overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);

    providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener);
    //export invoker
    final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);

    // url to registry
    final Registry registry = getRegistry(originInvoker);
    final URL registeredProviderUrl = getUrlToRegistry(providerUrl, registryUrl);

    // decide if we need to delay publish
    boolean register = providerUrl.getParameter(REGISTER_KEY, true);
    if (register) {
        register(registryUrl, registeredProviderUrl);
    }

    // register stated url on provider model
    registerStatedUrl(registryUrl, registeredProviderUrl, register);

    // Deprecated! Subscribe to override rules in 2.6.x or before.
    registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);

    exporter.setRegisterUrl(registeredProviderUrl);
    exporter.setSubscribeUrl(overrideSubscribeUrl);

    notifyExport(exporter);
    //Ensure that a new exporter instance is returned every time export
    return new DestroyableExporter<>(exporter);
}

final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);

这里去启动 NettyServer

/**
 * Exports the invoker through the underlying protocol, at most once per cache
 * key.
 *
 * <p>The result is stored in {@code bounds} under
 * {@code getCacheKey(originInvoker)}; when an entry already exists, it is
 * returned and no new export happens.
 *
 * @param originInvoker the invoker to export
 * @param providerUrl   the provider URL the delegate invoker is created with
 * @return the cached or newly created exporter wrapper
 */
private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker, URL providerUrl) {
    final String cacheKey = getCacheKey(originInvoker);
    Object cached = bounds.computeIfAbsent(cacheKey, unusedKey -> {
        Invoker<?> delegate = new InvokerDelegate<>(originInvoker, providerUrl);
        Exporter<T> localExporter = (Exporter<T>) protocol.export(delegate);
        return new ExporterChangeableWrapper<>(localExporter, originInvoker);
    });
    return (ExporterChangeableWrapper<T>) cached;
}

ProtocolFilterWrapper 这里为这个 Invoker 添加一系列的 Filter 在真正执行服务代码前执行

/**
 * Exports the invoker, wrapping it in the provider-side filter chain first
 * unless its URL is a registry URL.
 *
 * @param invoker the invoker to export
 * @return the exporter produced by the wrapped protocol
 * @throws RpcException declared by the overridden method
 */
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
    // Registry URLs are passed through without filters.
    if (UrlUtils.isRegistry(invoker.getUrl())) {
        return protocol.export(invoker);
    }
    return protocol.export(buildInvokerChain(invoker, SERVICE_FILTER_KEY, CommonConstants.PROVIDER));
}
/**
 * Records an exporter for the invoker under its service key, then opens the
 * server for the invoker's URL and optimizes serialization.
 *
 * @param invoker the invoker to export
 * @return the {@code DubboExporter} stored in {@code exporterMap}
 * @throws RpcException declared by the overridden method
 */
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
    URL url = invoker.getUrl();

    // export service.
    String key = serviceKey(url);
    DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
    exporterMap.put(key, exporter);

    //export an stub service for dispatching event
    Boolean isStubSupportEvent = url.getParameter(STUB_EVENT_KEY, DEFAULT_STUB_EVENT);
    Boolean isCallbackservice = url.getParameter(IS_CALLBACK_SERVICE, false);
    if (isStubSupportEvent && !isCallbackservice) {
        // NOTE(review): the value is read but not used in this listing.
        String stubServiceMethods = url.getParameter(STUB_EVENT_METHODS_KEY);
    }

    openServer(url);
    optimizeSerialization(url);

    return exporter;
}
/**
 * Ensures a server exists for the URL's address when the URL is on the server
 * side: creates and caches one when none is in {@code serverMap}, otherwise
 * resets the existing one with the URL.
 *
 * @param url the provider URL
 */
private void openServer(URL url) {
    // find server.
    String key = url.getAddress();
    //client can export a service which's only for server to invoke
    boolean isServer = url.getParameter(IS_SERVER_KEY, true);
    if (isServer) {
        ProtocolServer server = serverMap.get(key);
        if (server == null) {
            synchronized (this) {
                // Look up again under the lock before creating the server.
                server = serverMap.get(key);
                if (server == null) {
                    serverMap.put(key, createServer(url));
                }
            }
        } else {
            // server supports reset, use together with override
            server.reset(url);
        }
    }
}
private ProtocolServer createServer(URL url) {url = URLBuilder.from(url)// send readonly event when server closes, it's enabled by default.addParameterIfAbsent(CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString())// enable heartbeat by default.addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT)).addParameter(CODEC_KEY, DubboCodec.NAME).build();String str = url.getParameter(SERVER_KEY, DEFAULT_REMOTING_SERVER);if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {throw new RpcException("Unsupported server type: " + str + ", url: " + url);}ExchangeServer server;try {server = Exchangers.bind(url, requestHandler);} catch (RemotingException e) {throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);}str = url.getParameter(CLIENT_KEY);if (str != null && str.length() > 0) {Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();if (!supportedTypes.contains(str)) {throw new RpcException("Unsupported client type: " + str);}}return new DubboProtocolServer(server);
}

当我们收到消费者的消息时、通过 requestHandler 处理

server = Exchangers.bind(url, requestHandler);
private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {@Overridepublic CompletableFuture<Object> reply(ExchangeChannel channel, Object message) throws RemotingException {Invocation inv = (Invocation) message;Invoker<?> invoker = getInvoker(channel, inv);..........RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());Result result = invoker.invoke(inv);return result.thenApply(Function.identity());}

服务启动介绍之后我们回到服务注册

@Override
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {URL registryUrl = getRegistryUrl(originInvoker);// url to export locallyURL providerUrl = getProviderUrl(originInvoker);// Subscribe the override data// FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call//  the same service. Because the subscribed is cached key with the name of the service, it causes the//  subscription information to cover.final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl);final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener);//export invokerfinal ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);// url to registryfinal Registry registry = getRegistry(originInvoker);final URL registeredProviderUrl = getUrlToRegistry(providerUrl, registryUrl);// decide if we need to delay publishboolean register = providerUrl.getParameter(REGISTER_KEY, true);if (register) {register(registryUrl, registeredProviderUrl);}.........

register(registryUrl, registeredProviderUrl);

/**
 * Obtains the registry for {@code registryUrl} from the registry factory and
 * registers the provider URL with it.
 *
 * @param registryUrl           the URL used to look up the registry
 * @param registeredProviderUrl the provider URL to register
 */
private void register(URL registryUrl, URL registeredProviderUrl) {
    registryFactory.getRegistry(registryUrl).register(registeredProviderUrl);
}

根据你配置的注册中心地址、选择出 Registry

最终去到 ZookeeperRegistry 的 doRegister 中

/**
 * Creates the node for the URL through {@code zkClient}, passing along the
 * URL's {@code DYNAMIC_KEY} parameter (default {@code true}).
 *
 * @param url the provider URL to register
 * @throws RpcException wrapping any failure raised while creating the node
 */
@Override
public void doRegister(URL url) {
    try {
        final String nodePath = toUrlPath(url);
        final boolean dynamic = url.getParameter(DYNAMIC_KEY, true);
        zkClient.create(nodePath, dynamic);
    } catch (Throwable e) {
        throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
    }
}

到此服务注册基本结束了、只是细节还没去详细讲


本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!

相关文章

立即
投稿

微信公众账号

微信扫一扫加关注

返回
顶部