java基础巩固-宇宙第一AiYWM:为了维持生计,手写RPC~Version08 (借鉴Dubbo源码、gRPC源码、京东何小锋老师开源的RPC框架、Mini RPC、性能优化)能学多少是多少~整起
自己作为初学者,对Dubbo的源码还没有看的很明白很透彻,只是看了其中的一些模块,搭配着视频整理了一些笔记【特此感谢程序员田螺前辈、微观技术前辈、Dubbo源码以及javaGuide提供者各位前辈,的文章中的代码】,整起~

- 之前咱们用SSM或者Springboot写的项目,其实也算是提供的一个服务,准确来说应该是本地服务吧,这个本地服务咱们后台主要是controller—>service,后面---->dao结合框架的mybatis映射文件以及实体,将数据从DB和sercice中相互传递,
所以也可以说controller<----->service也相当于咱们的项目或者说某个应用(程序)向外提供了一个服务
- 所以,也可以这样总结咱们Version01-Version06中的主要工作。比如我新搞了一个XxxController层作为服务消费者,XxxService作为服务提供者。然后实现的过程主要是我服务消费者这边发一个id过去,服务提供者那边帮我通过这个id找一个实体Pilliow给我返还回来【
其实一个RPC其实就是可以让客户端直接调用服务端方法就像调用本地方法那样方便的一个框架,这个框架需要有的一些功能比如:服务发现(服务提供端 Server 向注册中心注册服务,服务消费者 Client 通过注册中心拿到服务相关信息,然后再通过网络请求服务提供端 Server)、负载均衡、容错等】。这个比较简易的过程中实现的重要点主要有以下三个:- 1.我每次都不能只发一个id过去呀,我要有其他参数怎么办,参数多了怎么办。所以我搞了动态代理进来,然后通过InvocationHandler接口的invoke()方法帮我去增强或者说处理一下这个调用过程,从而实现动态生成接口的代理类。
- 2.在网络调用代码中我用的是BIO阻塞模型,当我需求有变化时这里面有很多代码要我重复写好多次,所以我就将网络通信相关代码封装了起来,对外提供一个stub接口,这样调用起来也方便一点。
- 3.我当时比如说,我通过那个id在服务端这边找到这个Pilliow,但是呢,我返回回来的是这个实体的成员变量们,成员变量们说到底也不能和人家一个对象等效,所以我引入了ObjectXxxputStream,能直接传递对象。大概的过程就是这。但是后来呢,也是我由于工作需求,接触到了这里面的东西。因为说到底,哪怕上面这东西写的再完整,总归是单机版的。如果服务消费者由于访问量过大,搞出一个集群。而服务提供者端一般来说,比如说数据库呀、缓存呀都已经是搞成了集群,那么上面那个过程就远远不够了。所以我对自己的过程有了如下改进:分为不同的几个点去考虑设计:服务提供者、服务消费者、注册中心或者说提供服务注册与发现的东西、
- 比如说服务提供者和服务消费着,是不是得用比如SpringBoot,把他们俩搞成两个独立的jar包以便可以独立启动,
- 然后,因为服务提供者很多,服务消费者如何到注册中心中找到唯一的服务提供者,就得搞一个比如说AbstractLoadBalance接口,然后搞出自己的负载均衡实现,以便服务消费者从得到的可选的服务提供者列表中找到可用的服务提供者。
- 另外,服务提供者如果哪个坏了,怎样考虑找到一种合适的方法去下线,你下线不能光拍拍屁股走人了,人家服务消费者这边也得知道,或者说注册中心得通过某些机制保持和你服务提供者的联系,你某一天不干了人家可以及时知道,就不会让服务消费者再选你,浪费时间了
- 然后就是你对象在网络中传输,得在这边序列化压缩、传输过去之后在那边又反序列化解压、然后又序列化压缩传过来等等,那么这个过程也需要自己想负载均衡那样,搞一个什么接口,然后去弄出自己的一个网络传输实现。然后我翻了人家的一些源码,我觉的谈不上看源码,只是觉得确实自己考虑的东西太少了,大概就是自己做这个RPC的一些心得。
- 如果需要和 HTTP 协议打交道,解析和封装 HTTP 请求和响应。这类框架并不能算是“RPC 框架”,比如 Feign
- 所以,也可以这样总结咱们Version01-Version06中的主要工作。比如我新搞了一个XxxController层作为服务消费者,XxxService作为服务提供者。然后实现的过程主要是我服务消费者这边发一个id过去,服务提供者那边帮我通过这个id找一个实体Pilliow给我返还回来【
PART1:自己思考,模拟一下,写一个特简陋版本【咱们Version01-Version06是基于传统的 BIO 的方式 Socket 进行网络传输,然后利用 JDK 自带的序列化机制 来实现这个 RPC 框架的】。下面可能可以算是Version01-Version06的一个升级但是还算是一个简略版吧



- PART1:那咱们接下来自己模拟思考,如何在Version01-Version06基础上再仿照Dubbo的源码升华一下咱们RPCDemo呢~首先呢,咱们得先清楚RPC的工作流程以及其中各个解决的任务等,所以,你不得先看看这一篇能行?这篇很重要,记得这两篇结合看哦
- 第一步:(模拟)服务提供者Provider:dubbo中不会这样干,毕竟人家是远程服务调用嘛。
那咱们模拟其实就是写一个XxxService接口,再用一个子实现类实现这个XxxService接口就行【客户端通过网络传输将请求对象序列化、压缩之后的字节码传输到服务端之后,同样先通过解压、反序列化将字节码重建为请求对象。有了请求对象之后,就可以进行关键的方法调用环节了。】,这样就模拟出了咱们的服务提供者。模拟还是用的Spring。【在 RPC 中,服务提供者这个生产者和和服务消费者有一个共同的服务接口 API。如下,定义一个 XxxService接口】
服务提供者这个生产者要提供服务接口的实现,创建 XxxServiceImpl 实现类并重写接口中的抽象方法- 比如我们的服务接口是:
public interface HelloService {String hello(String name); } - 服务的实现类是:
@RpcService public class HelloServiceImpl implements HelloService {@Overridepublic String hello(String name) {return "a1";} } - 最终新生成的代理类是
//代理类 HelloService$proxy1649315143476 中有一个服务接口类型 HelloService 的静态属性 serviceProxy,值就是通过 ApplicationContext 上下文获取到的服务接口实现类 HelloServiceImpl 这个 Bean(SpringContext 已经被提前缓存到 Container 类中) public class HelloService$proxy1649315143476 {private static cn.ppphuang.rpcspringstarter.service.HelloService serviceProxy = ((org.springframework.context.ApplicationContext)cn.ppphuang.rpcspringstarter.server.Container.getSpringContext()).getBean("helloServiceImpl");public cn.ppphuang.rpcspringstarter.common.model.RpcResponse hello(cn.ppphuang.rpcspringstarter.common.model.RpcRequest request) throws java.lang.Exception {java.lang.Object[] params = request.getParameters();if(params.length == 1&& (params[0] == null||params[0].getClass().getSimpleName().equalsIgnoreCase("String"))){java.lang.String arg0 = null;arg0 = cn.ppphuang.rpcspringstarter.util.ConvertUtil.convertToString(params[0]);java.lang.String returnValue = serviceProxy.hello(arg0);return new cn.ppphuang.rpcspringstarter.common.model.RpcResponse(returnValue);}}public cn.ppphuang.rpcspringstarter.common.model.RpcResponse invoke(cn.ppphuang.rpcspringstarter.common.model.RpcRequest request) throws java.lang.Exception {String methodName = request.getMethod();if(methodName.equalsIgnoreCase("hello")){java.lang.Object returnValue = hello(request);return returnValue;}} } ... public class HelloService$proxy1649315143476 {private static HelloService serviceProxy = ((ApplicationContext)Container.getSpringContext()).getBean("helloServiceImpl");//public RpcResponse hello(RpcRequest request) throws Exception 该方法通过调用 serviceProxy.hello() 的方法获取结果public RpcResponse hello(RpcRequest request) throws Exception {Object[] params = request.getParameters();if(params.length == 1&& (params[0] == null|| params[0].getClass().getSimpleName().equalsIgnoreCase("String"))){String arg0 = ConvertUtil.convertToString(params[0]);String returnValue = serviceProxy.hello(arg0);return new RpcResponse(returnValue);}}//public RpcResponse invoke(RpcRequest request) throws Exception 该方法判断调用的方法名是 hello 来调用代理类中的hello方法。public RpcResponse invoke(RpcRequest request) throws Exception {String methodName = request.getMethod();if(methodName.equalsIgnoreCase("hello")){Object returnValue = hello(request);return returnValue;}} } ... public interface InvokeProxy {/*** invoke调用服务接口*/RpcResponse invoke(RpcRequest rpcRequest) throws Exception; } - 调用代理对象方法:代理对象的生成之后,开始调用代理对象的方法,【抽象类 RequestBaseHandler 有两个实现类 RequestJavassistHandler 和 RequestReflectHandler。】
- Java 反射调用
public class RequestReflectHandler extends RequestBaseHandler {@Overridepublic RpcResponse invoke(ServiceObject serviceObject, RpcRequest request) throws Exception {Method method = serviceObject.getClazz().getMethod(request.getMethod(), request.getParametersTypes());//用 Java 框架中最常见的反射来调用代理类中的方法,大部分 RPC 框架也都是这么来实现的。Object value = method.invoke(serviceObject.getObj(), request.getParameters());RpcResponse response = new RpcResponse(RpcStatusEnum.SUCCESS);response.setReturnValue(value);return response;} } - 通过 Javassists 【是用源码级别的 api 去修改字节码,Duboo、MyBatis 也都使用了 Javassist】生成的代理对象 invoke 方法调用:
public class RequestJavassistHandler extends RequestBaseHandler {@Overridepublic RpcResponse invoke(ServiceObject serviceObject, RpcRequest request) throws Exception {//直接将代理对象转为 InvokeProxy,调用 InvokeProxy.invoke() 方法获得返回值。调用代理对象的方法获取到结果,仍要通过序列化、压缩后,将字节流数据包通过网络传输到客户端,客户端拿到响应的结果再解压,反序列化得到结果对象。InvokeProxy invokeProxy = (InvokeProxy) serviceObject.getObj();return invokeProxy.invoke(request);} }
- Java 反射调用
- 比如我们的服务接口是:
- 或者说,
咱们搞一个抽象类 RequestBaseHandler ,RequestBaseHandler 是调用服务方法的抽象实现 handleRequest ,handleRequest 通过请求对象的服务名、服务分组、服务版本在 serverRegister.getServiceObject 获取代理对象。然后调用 invoke 抽象方法来真正通过代理对象调用方法获得结果。人家服务消费者有自己消费者,还有相应的代理类,到服务提供者咱们知道也得有一个Stub实现真正的服务调用,那有两个问题:【服务的代理对象怎么产生的?+如何通过代理对象调用方法?】//抽象类 RequestBaseHandler 是调用服务方法的抽象实现 handleRequest 通过请求对象的服务名、服务分组、服务版本在 serverRegister.getServiceObject 获取代理对象。然后调用 invoke 抽象方法来真正通过代理对象调用方法获得结果。 public abstract class RequestBaseHandler {public RpcResponse handleRequest(RpcRequest request) throws Exception {//1. 查找目标服务代理对象ServiceObject serviceObject = serverRegister.getServiceObject(request.getServiceName() + request.getGroup() + request.getVersion());RpcResponse response = null;//2. 调用对应的方法response = invoke(serviceObject, request);//响应客户端return response;}//具体代理调用public abstract RpcResponse invoke(ServiceObject serviceObject, RpcRequest request) throws Exception; }- DefaultRpcBaseProcessor 抽象类帮咱们生成服务代理对象:
//DefaultRpcBaseProcessor 抽象类也有两个实现类 DefaultRpcReflectProcessor 和 DefaultRpcJavassistProcessor,来实现关键的生成代理对象的 startServer 方法。 public abstract class DefaultRpcBaseProcessor implements ApplicationListener<ContextRefreshedEvent> {@Overridepublic void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {//Spring启动完毕会收到Eventif (Objects.isNull(contextRefreshedEvent.getApplicationContext().getParent())) {ApplicationContext applicationContext = contextRefreshedEvent.getApplicationContext();Container.setSpringContext(applicationContext);startServer(applicationContext);injectService(applicationContext);}}private void injectService(ApplicationContext context) {}protected abstract void startServer(ApplicationContext context); } - 服务接口实现类的 Bean 作为代理对象:
//DefaultRpcReflectProcessor 中获取到所有有 RpcService 注解的服务接口实现类 Bean,然后将该 Bean 作为服务代理对象注册到 serverRegister 中供上述的反射调用中使用。 public class DefaultRpcReflectProcessor extends DefaultRpcBaseProcessor {@Overrideprotected void startServer(ApplicationContext context) {Map<String, Object> beans = context.getBeansWithAnnotation(RpcService.class);if (beans.size() > 0) {boolean startServerFlag = true;for (Object obj : beans.values()) {Class<?> clazz = obj.getClass();Class<?>[] interfaces = clazz.getInterfaces();/* 如果只实现了一个接口就用接口的className作为服务名* 如果该类实现了多个接口,则使用注解里的value作为服务名*/RpcService service = clazz.getAnnotation(RpcService.class);if (interfaces.length != 1) {String value = service.value();ServiceObject so = new ServiceObject(value, Class.forName(value), obj, service.group(), service.version());} else {Class<?> supperClass = interfaces[0];ServiceObject so = new ServiceObject(supperClass.getName(), supperClass, obj, service.group(), service.version());}serverRegister.register(so);}}} } - 使用 Javassist 生成新的代理对象
//DefaultRpcJavassistProcessor 与 DefaultRpcReflectProcessor 的差异在于后者直接将服务实现类对象 Bean 作为服务代理对象,而前者通过 ProxyFactory.makeProxy(value, beanName, declaredMethods) 创建了新的代理对象,将新的代理对象注册到 serverRegister 中供后续调用调用中使用。该方法通过 Javassist 来生成代理类 public class DefaultRpcJavassistProcessor extends DefaultRpcBaseProcessor {@Overrideprotected void startServer(ApplicationContext context) {Map<String, Object> beans = context.getBeansWithAnnotation(RpcService.class);if (beans.size() > 0) {boolean startServerFlag = true;for (Map.Entry<String, Object> entry : beans.entrySet()) {String beanName = entry.getKey();Object obj = entry.getValue();Class<?> clazz = obj.getClass();Class<?>[] interfaces = clazz.getInterfaces();Method[] declaredMethods = clazz.getDeclaredMethods();/** 如果只实现了一个接口就用接口的className作为服务名* 如果该类实现了多个接口,则使用注解里的value作为服务名*/RpcService service = clazz.getAnnotation(RpcService.class);if (interfaces.length != 1) {String value = service.value();//bean实现多个接口时,javassist代理类中生成的方法只按照注解指定的服务类来生成declaredMethods = Class.forName(value).getDeclaredMethods();Object proxy = ProxyFactory.makeProxy(value, beanName, declaredMethods);ServiceObject so = new ServiceObject(value, Class.forName(value), proxy, service.group(), service.version());} else {Class<?> supperClass = interfaces[0];Object proxy = ProxyFactory.makeProxy(supperClass.getName(), beanName, declaredMethods);ServiceObject so = new ServiceObject(supperClass.getName(), supperClass, proxy, service.group(), service.version());}serverRegister.register(so);}}} }
- DefaultRpcBaseProcessor 抽象类帮咱们生成服务代理对象:
- 第二步:咱们使用 Spring 来管理 bean,采用自定义 xml 和解析器的方式来
将第二步中的服务实现类载入容器(当然也可以采用自定义注解的方式)并将服务接口信息注册到注册中心【后来就不能这样玩了,系统选用 Zookeeper 作为注册中心】。首先自定义xsd,然后分别指定schema和xmd、schema和对应handler的映射

- 将编写好的文件放入 classpath 下的 META-INF 目录下

- 然后是不是得在Spring配置文件中配置服务类,并编写对应的处理器类和解析类,就是实现
将服务实现类载入 Spring 容器中,且服务接口信息也注册到了注册中心。大概的思路如上,再看看人家Dubbo中咋搞的。
- 将编写好的文件放入 classpath 下的 META-INF 目录下
- 第三步:(模拟)服务消费者端Consumer
- 服务消费者的代理服务接口生成代理对象,然后连接 zookeeper,拿到服务地址列表,通过客户端负载策略获取合适的服务地址,通过 Netty等进行远程方法调用,也就是发送消息(也就是调用服务需要的一些参数之类的),并获取响应结果
//客户端调用本地方法一样调用远程方法的完美体验与 服务消费者端的Java 动态代理或者服务消费者端的Stub的强大密不可分。而咱们代码中的体现就是ClientProxyFactory 类的 getProxy来帮咱们创建了接口的代理类,对该接口的所有方法都会使用创建的代理类来调用 //DefaultRpcBaseProcessor 抽象类实现了 ApplicationListener public abstract class DefaultRpcBaseProcessor implements ApplicationListener<ContextRefreshedEvent> {@Override//ApplicationListener中的onApplicationEvent 方法在 Spring 项目启动完毕会收到时间通知,获取 ApplicationContext 上下文之后开始注入服务 injectService (依赖的其他服务)或者启动服务 startServer (自身服务实现)。public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {//Spring启动完毕会收到Eventif (Objects.isNull(contextRefreshedEvent.getApplicationContext().getParent())) {ApplicationContext applicationContext = contextRefreshedEvent.getApplicationContext();//保存spring上下文 后续使用Container.setSpringContext(applicationContext);startServer(applicationContext);injectService(applicationContext);}} //injectService 方法会遍历 ApplicationContext 上下文中的所有 Bean , Bean 中是否有属性使用了 InjectService 注解。有的话生成代理类,注入到 Bean 的属性中private void injectService(ApplicationContext context) {String[] names = context.getBeanDefinitionNames();for (String name : names) {Object bean = context.getBean(name);Class<?> clazz = bean.getClass();//clazz = clazz.getSuperclass(); aop增强的类生成cglib类,需要Superclass才能获取定义的字段Field[] declaredFields = clazz.getDeclaredFields();//设置InjectService的代理类for (Field field : declaredFields) {InjectService injectService = field.getAnnotation(InjectService.class);if (injectService == null) {continue;Class<?> fieldClass = field.getType();Object object = context.getBean(name);field.set(object, clientProxyFactory.getProxy(fieldClass, injectService.group(), injectService.version()));ServerDiscoveryCache.SERVER_CLASS_NAMES.add(fieldClass.getName());}}}protected abstract void startServer(ApplicationContext context); }//调用 ClientProxyFactory 类的 getProxy ,getProxy方法中也是逃脱不了咱们动态代理的原理,调用InvocationHandler 的invoke,根据服务接口、服务分组、服务版本、是否异步调用来创建该接口的代理类,对该接口的所有方法都会使用创建的代理类来调用。方法调用的实现细节都在 ClientInvocationHandler 中的 invoke 方法,主要内容是,获取服务节点信息,选择调用节点,构建 request 对象,最后调用网络模块发送请求。 public class ClientProxyFactory {public <T> T getProxy(Class<T> clazz, String group, String version, boolean async) {return (T) objectCache.computeIfAbsent(clazz.getName() + group + version, clz -> Proxy.newProxyInstance(clazz.getClassLoader(), new Class[]{clazz}, new ClientInvocationHandler(clazz, group, version, async)));}private class ClientInvocationHandler implements InvocationHandler {public ClientInvocationHandler(Class<?> clazz, String group, String version, boolean async) {}//调用 ClientProxyFactory 类的 getProxy ,getProxy方法中也是逃脱不了咱们动态代理的原理,调用InvocationHandler 的invoke,根据服务接口、服务分组、服务版本、是否异步调用来创建该接口的代理类,对该接口的所有方法都会使用创建的代理类来调用。方法调用的实现细节都在 ClientInvocationHandler 中的 invoke 方法,主要内容是,获取服务节点信息,选择调用节点,构建 request 对象,最后调用网络模块发送请求。@Overridepublic Object invoke(Object proxy, Method method, Object[] args) throws Throwable {//1. 获得服务信息String serviceName = clazz.getName();List<Service> serviceList = getServiceList(serviceName);Service service = loadBalance.selectOne(serviceList);//2. 构建request对象RpcRequest rpcRequest = new RpcRequest();rpcRequest.setServiceName(service.getName());rpcRequest.setMethod(method.getName());rpcRequest.setGroup(group);rpcRequest.setVersion(version);rpcRequest.setParameters(args);rpcRequest.setParametersTypes(method.getParameterTypes());//3. 协议编组RpcProtocolEnum messageProtocol = RpcProtocolEnum.getProtocol(service.getProtocol());RpcCompressEnum compresser = RpcCompressEnum.getCompress(service.getCompress());RpcResponse response = netClient.sendRequest(rpcRequest, service, messageProtocol, compresser);return response.getReturnValue();}} } - 咱们知道动态代理是在服务消费者端使用的,帮服务消费者生成一个代理类,完成参数信息的传送。咱们在细看一下这里面的细节:
- 在项目中,当我们要使用 RPC 的时候,
我们一般的做法是先找服务提供方要接口,通过 Maven 或者其他的工具把接口依赖到我们项目中。我们在编写业务逻辑的时候,如果要调用服务提供方的接口,我们就只需要通过依赖注入的方式把接口注入到项目中就行了,然后在代码里面直接调用接口的方法 。接口里并不会包含真实的业务逻辑,业务逻辑都在服务提供方应用里【也就是接口的很多很多实现类中】,但我们通过调用接口方法,确实拿到了想要的结果,这里面用到的核心技术就是动态代理【RPC 会自动给服务提供者端的暴露出来的接口生成一个代理类,当我们在项目中注入服务提供者端的提供服务的接口的时候,运行过程中实际绑定的是这个接口生成的代理类。这样在服务提供者端的暴露出来的接口方法被调用的时候,它实际上是被生成代理类拦截到了,这样我们就可以在生成的代理类里面,加入远程调用逻辑。通过这种“偷梁换柱”的手法,就可以帮用户屏蔽远程调用的细节,实现像调用本地一样地调用远程的体验】 - 举个例子【其实这个例子也就是在回忆咱们动态代理,也就是文章Spring右臂中的右臂AOP那里提到的动态代理而已】。
给 HelloDemoByHu接口生成一个动态代理类,并调用接口 sayAiYWM() 方法,但真实返回的值居然是来自 RealHelloDemoByHu里面的 invoke() 方法返回值/** *要代理的接口 */ public interface HelloDemoByHu{String sayAiYWM(); }/** * 真实调用对象 */ public class RealHelloDemoByHu{public String invoke(){return " Ai YWM";} }/** * JDK代理类生成 */ public class JDKProxy implements InvocationHandler{private Object target;JDKProxy(Object target){this.target = target;}@Overridepublic Object invoke(Object proxy, Method method, Object[] paramValue){return ((RealHelloDemoByHu)target).invoke();} }/** * 测试示例 */ public class TestProxy{public static void main(String[] args){//构造代理器JDKProxy proxy = new JDKProxy(new RealHelloDemoByHu());ClassLoader classLoader = ClassLoaderUtils.getCurrentClassLoader(); //把生成的代理类保存到文件 Sysyem.setProperty("sun.misc.ProxyGenerator.saveGeneratedFiles","true");//通过代理器生成代理类HelloDemoByHu hello = Proxy.newProxyInstance(classLoader, new Class[]{HelloDemoByHu.class}, proxy);// 方法调用 System.out.println(hello.sayAiYWM());} } - 里面通过咱们自己写的代理器【代理器一般要实现InvocationHandler接口并重写invoke的方法哦】生成代理类Proxy,生成代码主要就是这一句:Proxy.newProxyInstance(…),这一句的主要流程就是:
- 在 Java 领域,
除了 JDK 默认的 InvocationHandler 能完成代理功能,我们还有很多其他的第三方框架也可以,比如像 Javassist、Byte Buddy 这样的框架【这三种框架的区别就只是通过什么方式生成的代理类以及在生成的代理类里面是怎么完成的方法调用。同时呢,也正是因为这些细小的差异,才导致了不同的代理框架在性能方面的表现不同】
- 使用 JDK 默认的代理功能,最大的问题就是性能问题。它生成后的代理类是使用反射来完成方法调用的,而这种方式相对直接用编码调用来说,性能会降低,但好在 JDK8 及以上版本对反射调用的性能有很大的提升
- Javassist 的定位是能够操纵底层字节码,所以使用起来并不简单,要生成动态代理类恐怕是有点复杂了。但好的方面是,通过 Javassist 生成字节码,不需要通过反射完成方法调用,所以性能肯定是更胜一筹的。但是在使用中通过 Javassist 生成一个代理类后,此 CtClass 对象会被冻结起来,不允许再修改;否则,再次生成时会报错。
- Byte Buddy:像 Spring、Jackson 都用到了 Byte Buddy 来完成底层代理。相比 Javassist,Byte Buddy 提供了更容易操作的 API,编写的代码可读性更高。更重要的是,生成的代理类执行速度比 Javassist 更快。
- 在 Java 领域,
- 通过反编译工具打开class文件时会看到:
package com.sun.proxy;import com.proxy.Hello; import java.lang.reflect.InvocationHandler; import java.lang.reflect.Method; import java.lang.reflect.Proxy; import java.lang.reflect.UndeclaredThrowableException;public final class $Proxy0 extends Proxy implements Hello {private static Method m3;private static Method m1;private static Method m0;private static Method m2;//$Proxy0 类里面有一个跟 HelloDemoByHu 一样签名的 sayAiYWM() 方法,其中 this.h 绑定的是刚才传入的 JDKProxy 对象,所以当我们调用 HelloDemoByHu.sayAiYWM() 的时候,其实它是被转发到了 JDKProxy.invoke()public $Proxy0(InvocationHandler paramInvocationHandler) {super(paramInvocationHandler);}public final String sayAiYWM() {try {return (String)this.h.invoke(this, m3, null);} catch (Error|RuntimeException error) {throw null;} catch (Throwable throwable) {throw new UndeclaredThrowableException(throwable);} }public final boolean equals(Object paramObject) {try {return ((Boolean)this.h.invoke(this, m1, new Object[] { paramObject })).booleanValue();} catch (Error|RuntimeException error) {throw null;} catch (Throwable throwable) {throw new UndeclaredThrowableException(throwable);} }public final int hashCode() {try {return ((Integer)this.h.invoke(this, m0, null)).intValue();} catch (Error|RuntimeException error) {throw null;} catch (Throwable throwable) {throw new UndeclaredThrowableException(throwable);} }public final String toString() {try {return (String)this.h.invoke(this, m2, null);} catch (Error|RuntimeException error) {throw null;} catch (Throwable throwable) {throw new UndeclaredThrowableException(throwable);} }static {try {m3 = Class.forName("com.proxy.Hello").getMethod("say", new Class[0]);m1 = Class.forName("java.lang.Object").getMethod("equals", new Class[] { Class.forName("java.lang.Object") });m0 = Class.forName("java.lang.Object").getMethod("hashCode", new Class[0]);m2 = Class.forName("java.lang.Object").getMethod("toString", new Class[0]);return;} catch (NoSuchMethodException noSuchMethodException) {throw new NoSuchMethodError(noSuchMethodException.getMessage());} catch (ClassNotFoundException classNotFoundException) {throw new NoClassDefFoundError(classNotFoundException.getMessage());} } }
- 在项目中,当我们要使用 RPC 的时候,
- 服务消费者的代理服务接口生成代理对象,然后连接 zookeeper,拿到服务地址列表,通过客户端负载策略获取合适的服务地址,通过 Netty等进行远程方法调用,也就是发送消息(也就是调用服务需要的一些参数之类的),并获取响应结果
- 第四步:网络传输【
客户端封装调用请求对象之后需要通过网络将调用信息发送到服务端,在发送请求对象之前还需要经历序列化、压缩两个阶段。】
- 虽然网络传输这里的最终走向是Netty,但是咱们得先了解BIO到底有啥缺点被人家Netty给改进了。
- BIO的客户端与服务端
//服务端 public class ServerDemoByHu{//为了规范,提前部署好日志的记录环境private static final Logger logger = LoggerFactory.getLogger(ServerDemoByHu.class);public static void main(String[] args){ServertDemoByHu huServer = new ServerDemoByHu();huServer.stub(2221);//stub是我自己想把网络细节封装起来,对外提供的一个stub方法,这跟Version01~Version06一样}//你封装成啥样,你得让人家知道吧public void stub(int port){//try...catch的细节就省了,不过正式写代码还是加上好一些,规范ServerSocket server = new ServerSocket(port);//后期不就可以把这个port封装到配置文件中了嘛//ServerSocket的accept()方法是阻塞方法,accept()方法被调用时,也就是服务端在等待客户端的连接请求时会卡,直到服务端收到客户端连接时才会继续往下while((socket = server.accept()) != null){logger.info(“如果程序走到这里,打印出这个括号中的话就说明客户端的bind、listen、accept等步骤已经进行完了,客户端已经和服务端建立起连接了”);//...,下来就是客户端或者客户端代理通过输出流发送请求信息,服务器通过输入流读取客户端发送来的请求信息。同样的服务端通过输出流向客户端发送响应消息。一般都是固定写法:ObjectInputStream oi = new ObjectInputStream(socket.getInputStream());Message message = (Message)oi.readObject();logger.info(...);message.setContent("你想发啥呀");ObjectOutputStream oo = new ObjectOutputStream(socket.getOutputStream());oo.writeObject(message);oo.flush();...}} }//客户端 public class ClientDemoByHu{//为了规范,提前部署好日志的记录环境private static final Logger logger = LoggerFactory.getLogger(ClientDemoByHu.class);public static void main(String[] args){ClientDemoByHu huClient = new ClientDemoByHu();Message message = (Message) huClient.send(new Message("我客户端想发送这一坨东西"),"127.0.0.1",2221);}public Object send(Message message, String host, int port){//try...catch的细节就省了,不过正式写代码还是加上好一些,规范Socket socket = new Socket(host, port);//后期不就可以把这个host, port封装到配置文件中了嘛ObjectOutputStream oo = new ObjectOutputStream(socket.getOutputStream());//通过输出流向服务器端发送请求信息oo.writeObject(message);//通过输入流获取服务器响应的信息ObjectInputStream oi = new ObjectInputStream(socket.getInputStream());return oi.readObject();} }//发送的消息实体类 public class Message implements Serialiable{private String content; }- 在引入基于NIO的网络编程框架Netty之前,【为什么不用NIO,就是因为用同步非阻塞的NIO来进行网络编程简直太麻烦了,框架在使用方面还是挺想的,你想想为什么Spring能这么火?】,因为BIO下一个线程只能一次处理一个客户端的连接,如果需要管理多个客户端的连接就需要为我们请求的每一个单独的客户端单独创建一个线程。但是你这也太不现实了,太浪费资源了吧。
所以有一种比较简单且实际的改进方法就是使用线程池,可以指定创建线程的最大数量,就不会导致线程创建过多,让线程的创建和回收成本降低。但BIO就是BIO,竞技世界菜是原罪。还是得Netty来。
- 在引入基于NIO的网络编程框架Netty之前,【为什么不用NIO,就是因为用同步非阻塞的NIO来进行网络编程简直太麻烦了,框架在使用方面还是挺想的,你想想为什么Spring能这么火?】,因为BIO下一个线程只能一次处理一个客户端的连接,如果需要管理多个客户端的连接就需要为我们请求的每一个单独的客户端单独创建一个线程。但是你这也太不现实了,太浪费资源了吧。
- Netty:
- https://github.com/AIminminAI/DIYByHu_BIOUpgradeNetty
- https://gitee.com/aiminminai/diyby-hu_-bioupgrade-netty#diybyhu_bioupgradenetty
- BIO的客户端与服务端
- 序列化与反序列化:序列化与反序列化的核心作用就是
对象的保存与重建,方便客户端与服务端通过字节流传递对象,快速对接交互。【序列化就是指把 Java 对象转换为字节序列的过程。反序列化就是指把字节序列恢复为 Java 对象的过程。】Java序列化的方式有很多,诸如 JDK 自带的 Serializable 、 Protobuf 、 kryo(性能最高的是 Kryo 、其次是 Protobuf)public interface MessageProtocol {byte[] marshallingRequest(RpcRequest request) throws Exception;RpcRequest unmarshallingRequest(byte[] data) throws Exception;byte[] marshallingResponse(RpcResponse response) throws Exception;RpcResponse unmarshallingResponse(byte[] data) throws Exception; }-
自己做的简略版中用的是借鉴javaGuide老师的Kryo思路。https://github.com/AIminminAI/DIYByHu_BIOUpgradeNetty。【Kryo序列化代码】

-
在RPCVersion07版本中,提到了RPC的具体工作流程,其中涉及到了RpcRequest和RpcReponse两个客户端与服务端进行交互的实体类。
客户端的Stub接收到调用请求后封装出一个RpcRequest消息体出来,然后客户端将RpcRequest类型的对象发送到服务端,服务端再将响应结果封装为能够进行网络传输的RpcRequest对象返回给客户端。

-
- 压缩与解压:网络通信的成本很高,
为了减小网络传输数据包的体积,将序列化之后的字节码压缩不失为一种很好的选择。Gzip 压缩算法比率在3到10倍左右,可以大大节省服务器的网络带宽,各种流行的 web 服务器也都支持 Gzip 压缩算法。Java 接入也比较容易,接入代码可以查看下方接口的实现。public interface Compresser {byte[] compress(byte[] bytes);byte[] decompress(byte[] bytes); } - 网络通信:
将请求对象序列化成字节码,并且压缩体积之后,需要使用网络将字节码传输到服务器。常用网络传输协议有 HTTP 、 TCP 、 WebSocke t等。HTTP、WebSocket【 捡田螺的小男孩老师关于websocket协议的解释,值得一看】是应用层协议,TCP 是传输层协议。追求简洁、易用的 RPC 框架可以选择 HTTP 协议的。TCP传输的高可靠性和极致性能是主流RPC框架选择的最主要原因。选用 Netty 作为网络通信模块, TCP 数据流的粘包、拆包不可避免。- Dubbo 使用 Netty 作为底层的网络通信框架,采用了我们熟悉的主从 Reactor 线程模型。
- 其中 Boss 和 Worker 线程池就可以看作 I/O 线程【
I/O 线程可以理解为主要负责处理网络数据,例如事件轮询、编解码、数据传输等。如果业务逻辑能够立即完成,也可以使用 I/O 线程进行处理,这样可以省去线程上下文切换的开销。如果业务逻辑耗时较多,例如包含查询数据库、复杂规则计算等耗时逻辑,那么 I/O 必须将这些请求分发到业务线程池中进行处理,以免阻塞 I/O 线程】 哪些请求需要在 I/O 线程中执行,哪些又需要在业务线程池中执行呢?Dubbo 框架的做法值得借鉴,它给用户提供了多种选择,它一共提供了 5 种分发策略

- 其中 Boss 和 Worker 线程池就可以看作 I/O 线程【
- 粘包、拆包问题:TCP 传输协议是一种
面向连接的、可靠的、基于字节流的传输层通信协议。为了最大化传输效率。发送方可能将单个较小数据包合并发送,这种情况就需要接收方来拆包处理数据了。 - Netty 提供了
3种类型的解码器来处理 TCP 粘包/拆包问题:- 定长消息解码器:FixedLengthFrameDecoder 。
发送方和接收方规定一个固定的消息长度,不够用空格等字符补全,这样接收方每次从接受到的字节流中读取固定长度的字节即可,长度不够就保留本次接受的数据,再在下一个字节流中获取剩下数量的字节数据。 - 分隔符解码器:LineBasedFrameDecoder 或 DelimiterBasedFrameDecoder。LineBasedFrameDecoder 是行分隔符解码器,分隔符为 \n 或 \r\n ;DelimiterBasedFrameDecoder 是自定义分隔符解码器,可以定义一个或多个分隔符。
接收端在收到的字节流中查找分隔符,然后返回分隔符之前的数据,没找到就继续从下一个字节流中查找 - 数据长度解码器:LengthFieldBasedFrameDecoder。
将发送的消息分为 header 和 body,header 存储消息的长度(字节数),body 是发送的消息的内容。同时发送方和接收方要协商好这个 header 的字节数,因为 int 能表示长度,long 也能表示长度。接收方首先从字节流中读取前n(header的字节数)个字节(header),然后根据长度读取等量的字节,不够就从下一个数据流中查找。- Netty 也提供了很多开箱即用的拆包器,推荐最广泛使用的 LengthFieldBasedFrameDecoder,它可以满足实际项目中的大部分场景。如果对 LengthFieldBasedFrameDecoder 的参数不够熟悉,实际直接使用 ByteBuf 反而更加直观,根据个人喜好按需选择。
- 当然也可以自定义解码器哦
- 定长消息解码器:FixedLengthFrameDecoder 。
- Dubbo 使用 Netty 作为底层的网络通信框架,采用了我们熟悉的主从 Reactor 线程模型。
- 虽然网络传输这里的最终走向是Netty,但是咱们得先了解BIO到底有啥缺点被人家Netty给改进了。
- 第一步:(模拟)服务提供者Provider:dubbo中不会这样干,毕竟人家是远程服务调用嘛。
PART2:上面看完了咱们自己的想法,咱们再看看人家gRPC中的好想法gRPC源码、
- gRPC
- 特点:
- gRPC 是由 Google 开发并且开源的一款
高性能、跨语言的 RPC 框架,当前支持 C、Java 和 Go 等语言,当前 Java 版本最新 Release 版为 1.27.0 跨语言,通信协议是基于标准的 HTTP/2 设计的,序列化支持 PB(Protocol Buffer)和 JSON
- gRPC 是由 Google 开发并且开源的一款
- 调用过程:

- 实现一个Demo:
定义一个 say 方法,调用方通过 gRPC 调用服务提供方的对应服务,然后服务提供方通过调用具体实现,经过处理后会返回一个字符串给调用方【当然其中很细节的东西,去看Version07吧】- 为了保证调用方和服务提供方能够正常通信,我们需要先约定一个通信过程中的
契约,也就是我们在 Java 里面说的定义一个接口,这个接口里面只会包含一个 say 方法//在 gRPC 里面定义接口是通过写 Protocol Buffer 代码,从而把接口的定义信息通过 Protocol Buffer 语义表达出来syntax = "proto3";option java_multiple_files = true; option java_package = "io.grpc.hello"; option java_outer_classname = "HelloProto"; option objc_class_prefix = "HLW";package hello;service HelloService{ rpc Say(HelloRequest) returns (HelloReply) {} }message HelloRequest { string name = 1; }message HelloReply { string message = 1; } - 有了这段代码,
我们就可以为客户端和服务器端生成消息对象和 RPC 基础代码。我们可以利用 Protocol Buffer 的编译器 protoc,再配合 gRPC Java 插件(protoc-gen-grpc-java),通过命令行 protoc3 加上 plugin 和 proto 目录地址参数,我们就可以生成消息对象和 gRPC 通信所需要的基础代码。如果项目是 Maven 工程的话,你还可以直接选择使用 Maven 插件来生成同样的代码。 - 发送原理:生成完基础代码以后,我们就可以基于生成的代码写下调用端或者说服务消费者端代码。调用端代码大致分成三个步骤:
首先用 host 和 port 生成 channel 连接然后用前面生成的 HelloService gRPC 创建 Stub 类- 最后我们可以
用生成的这个 Stub 调用 say 方法发起真正的 RPC 调用【response = blockingStub.say(request); 这一句就是发起了请求】,后续其它的 RPC 通信细节就对我们使用者透明了
package io.grpc.helloimport io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; import io.grpc.StatusRuntimeException import java.util.concurrent.TimeUnit;public class HelloByHuClient{private final ManagedChannel channel;private final HelloServiceGrpc.HelloServiceBlockingStub blockingStub;/** 构建Channel连接*/public HelloByHuClient(String host, int port){this(ManagedChannelBuilder.forAddress(host, port).usePlaintext().build());}/*** 构建Stub用于发送服务消费者的调用请求*/HelloByHuClient(ManagedChannel channel){this.channel = channel;blockingStub = HelloServiceGrpc.newBlockingStub(channel);}//远程调用请求发送完手动关闭public void shutdown() throws InterruptedException {channel.shutdown().awaitTermination(5, TimeUnit.SECONDS); }/*** 发送rpc请求*/public void say(String name){HelloRequest request = HelloRequest.newBuilder().setName(name).build();HelloReply response;try { //发送请求 response = blockingStub.say(request); } catch (StatusRuntimeException e) { return; } System.out.println(response);}public static void main(String[] args) throws Exception { HelloByHuClient client = new HelloByHuClient("127.0.0.1", 50051);try { client.say("world"); } finally { client.shutdown(); } } } - 发送完了,咱们知道过程中肯定也得有序列化相关的,因为你服务消费者把请求参数不能直接扔到人家服务提供者端,要进行序列化、压缩等
- 二进制流经过网络传输后,怎么正确地还原请求前语义【
在请求收到后需要进行请求“断句”,那肯定就需要在发送的时候把断句的符号加上】?gRPC 的通信协议是基于标准的 HTTP/2 设计的,而 HTTP/2 相对于常用的 HTTP/1.X 来说,它最大的特点就是多路复用、双向流【HTTP/1.X 就是单行道,HTTP/2 就是双行道。】,因为 gRPC 是基于 HTTP/2 协议,而 HTTP/2 传输基本单位是 Frame,Frame 格式是以固定 9 字节长度的 header,后面加上不定长的 payload 组成

- 二进制流经过网络传输后,怎么正确地还原请求前语义【
- 接收原理:服务提供方收到请求后会怎么处理【
服务对外暴露的目的是让从服务消费者过来的请求在被还原成信息后,能找到对应接口的实现】//HelloServiceImpl 类是按照 gRPC 使用方式实现了 HelloService 接口逻辑,但是对于调用者来说并不能把它调用过来,因为我们没有把这个接口对外暴露。在 gRPC 里面我们是采用 Build 模式对底层服务进行绑定 static class HelloServiceImpl extends HelloServiceGrpc.HelloServiceImplBase {@Overridepublic void say(HelloRequest req, StreamObserver<HelloReply> responseObserver) {HelloReply reply = HelloReply.newBuilder().setMessage("Hello " + req.getName()).build();responseObserver.onNext(reply);responseObserver.onCompleted();} }package io.grpc.hello;import io.grpc.Server; import io.grpc.ServerBuilder; import io.grpc.stub.StreamObserver;import java.io.IOException;public class HelloWorldServer {private Server server;/*** 对外暴露服务**/private void start() throws IOException {int port = 50051;server = ServerBuilder.forPort(port).addService(new HelloServiceImpl()).build().start();Runtime.getRuntime().addShutdownHook(new Thread() {@Overridepublic void run() {HelloWorldServer.this.stop();}});}/*** 关闭端口**/private void stop() {if (server != null) {server.shutdown();}}/*** 优雅关闭**/private void blockUntilShutdown() throws InterruptedException {if (server != null) {server.awaitTermination();}}public static void main(String[] args) throws IOException, InterruptedException {final HelloWorldServer server = new HelloWorldServer();server.start();server.blockUntilShutdown();}}- 我们得先保证服务消费者那端能够正常把请求调用信息发送过来吧。我们需要先保证能正常接收请求,通俗地讲就是要
先开启一个 TCP 端口,让调用方也就是服务消费者方可以建立连接,并把二进制数据发送到这个连接通道里面

- 我们得先保证服务消费者那端能够正常把请求调用信息发送过来吧。我们需要先保证能正常接收请求,通俗地讲就是要
- 为了保证调用方和服务提供方能够正常通信,我们需要先约定一个通信过程中的
- 总的来说:可以简单地认为
gRPC 就是采用 HTTP/2 协议,并且默认采用 PB 序列化方式的一种 RPC,它充分利用了 HTTP/2 的多路复用特性,使得我们可以在同一条链路上双向发送不同的 Stream 数据,以解决 HTTP/1.X 存在的性能问题。【当然你Dubbo牛逼,人家gRPC也不差,都是RPC框架呀,都一样。】- 也就是说,在实际使用的时候,
我们的服务提供方通常都是以一个集群的方式对外提供服务的,所以在 gRPC 里面也可以看到负载均衡、服务发现等功能。而且 gRPC 采用的是 HTTP/2 协议,我们还可以通过 Stream 方式来调用服务,以提升调用性能。
- 也就是说,在实际使用的时候,
- 特点:
PART3:上面看完了gRPC的想法,咱们再看看人家Dubbo中的好想法:
-
Provider肯定不能这么写呀,得升级:
pub1ic class Provider {pub1ic static void main(String[] args) {//暴露服务。启动应用时就通过调用LocalRegister的regist把接口与其子实现类的对应关系存起来了LocalRegister.regist(HelloService.class.getName()+versionx, Hel1oServiceImp1.class);//假设当前用户的配置tomcat、netty、 jetty,然后接收的请求也就是Http请求,当然上面咱们说了,也可以接收dubbo请求,只是举一种例子HttpServer httpServer = new HttpServer();httpServer.start(hostname: "localhost", port: 8080);}pub1ic class Hel1oServiceImp1 imp1ements He11oService {pub1ic String sayHe11o(String userName){return "Hello:" + userName ; }-
dubbo中一个服务的唯一标识:接口的名字(HelloService.Class.getName ( ))+group+versionx
-
服务提供者接收到服务消费者发来的Http请求或者Dubbo请求之后---->会先接收请求并解析请求(所以,要从请求里面解析出来的所谓的“欲调用服务的四个参数”:我调用的是哪个接口(大服务)中的哪个小服务(方法),然后你调用的这个子服务有可能需要参数,那你是不是要不参数以及参数类型列表给人家传过去)---->调用某个大服务中的子服务(相当于远程调用咱们接口中的某个方法,仅此而已)---->所以你服务消费者是不是得把这四个传过去给服务提供者或者服务提供者的代理那里

-
当咱们在启动Provider这个应用时,咱们内嵌的tomcat已经启动起来了,咱们此时就可以像8080端口发送http请求了,由tomcat中的servelt去处理接收到的http请求。tomcat每接收到一个请求,就会将请求交给Dispatcherservlet 进行转发(Dispatcherservlet 负责接收请求并转发请求),然后Dispatcherservlet 将请求交给HttpServerHandler进行处理,也就是解析//Dispatcherservlet 负责接收请求并转发请求 pub1ic class Dispatcherservlet extends HttpServlet {@overrideprotected void service (HttpServletRequest req, HttpServletResponse resp) throws ServletExceptionnew HttpServerHandler().handler(res, resp);} }//Dispatcherservlet 负责接收请求并转发请求,处理请求我们还得另外写一个HttpServerHandler类,专门来处理请求 pubTic class HttpServerHandler {//handler()方法负责处理请求pub1ic void handler(HttpServletRequest req, HttpServletResponse resp) {//处理请求的逻辑目的,就是从请求中解析出来四个东西:我调用的是哪个接口(大服务)中的哪个小服务(方法),然后你调用的这个子服务有可能需要参数,那你是不是要不参数以及参数类型列表给人家传过去//解析请求--->调用哪个服务--->1. HelloService2. sayHello 3.参数类型列表4. 方法参数try {Invocation invocation = JSONobject.parseObject(req.getInputStream(),Invocation.class);//根据服务消费者发来的http请求或者dubbo请求,得到请求中接口的名字(大服务),然后由本地注册得到则个接口对应的子实现类String interfaceName = invocation.getInterfaceName();String methodName = invocation.getMethodName();Class[] paramType = invocation.getParamType();/***拿到方法后肯定得执行方法呀,我是通过反射执行方法的,但是我通过反射不可能执行接口(也就是大服务)里面的方法(小服务),肯定执行的是这个接口(也就是大服务)的子实现类中的方法(小服务)* 所以下面三行代码就是在执行服务提供者提供的大服务(众多接口中的某个接口的子实现类)中的子服务(也就是接口中的子实现类的方法)*///然后由本地注册得到则个接口对应的子实现类impl,然后执行(接口的子实现类中的)服务(或者叫方法)Class impl = LocalRegister.get(interfaceName);Method method = imp1.getMethold(methodName,paramType);//调用服务提供者中的哪个服务(也就是哪个方法),通过反射执行这个服务(也就是方法)object result = method.invoke (imp1.newInstance(),invocation.getParams());//返回执行结果,或者响应请求IOUtils.write(result, resp.getOutputStream());} catch (IOException e) {e.printStackTrace();}//代表服务消费者当前要调用的是哪一个服务pub1ic class Invocation implements Serializable {private String interfaceName;private Stri ng methodName;private Class[] paramType;private object[] params;pub1ic Invocation(String interfaceName, String methodName, object[] params, Class[] paramType) {this.interfaceName = interfaceName;this.methodName = methodName;this.params = params;this.paramType = paramType;}...//上面属性的get/set方法们 -
服务提供者其实实现有两种思路:
- 捋一下,如果咱们的web项目或者叫maven项目,
A要依赖B的话,一般第一种方法就是把B以依赖坐标的形式添加到A的pom.xml中(老早咱们是引入jar包的形式),然后导入包直接实例化出来,用就行,这不就是咱们之前干项目一直这样干的嘛。这种,咱们以后分布式项目一般都是服务提供者和服务消费者属于两个不同的项目或者应用程序,可能会分别部署在不同的机器上,你这样肯定行不通 - 另一种方法就是,让动态代理帮咱们针对被调用的人动态生成一个代理对象,意思就是相当于买房子,动态代理为咱们动态生成一个中介帮咱们买房子。咱们服务消费者要调用服务提供者所在机器或者主机上的服务时会先把要调用的“欲调用服务的四个参数”发给服务提供者所在主机的服务对应的代理类,然后代理类利用自己的invoke方法中的逻辑进行一系列的处理,然后返回调用结果。
- 代理类的产生
pubTic class ProxyFactory {pub1ic static <T> T getproxy(fina1 Class interfaceClass) {//用户的配置jdk,用了jdk的动态代理之后就写活了return (T) proxy.newProxyInstance(interfaceClass.getClassLoader(), new Class []{interfaceClass},@overridepub1ic object invoke(object proxy, Method method, object[] args) throws Throwable// 用咱们自己写的HttpC1ient来发送数据,发了一个啥,发了一个invocation对象HttpC1ient httpClient = new Httpc1ient();//代表服务消费者当前要调用的是哪一个服务,假设要调用He11oService这个接口中的sayHello()方法,然后调用这个方法需要传入的参数以及参数类型是啥Invocation invocation = new Invocation(interfaceClass.getName(),method.getName(), method.getParameterTypes(), args);//数据要发送到1oca1host主机的8080端口上,但是也不能把域名和端口像现在这样写死呀,所以用到了zookeeper注册中心//String result = httpClient.send( hostname: "1oca1host",port: 8080,invocation);/* 服务提供者可能会有集群,100台服务器上都有各自的服务提供者(因为咱们用集群横向扩展,是因为访问量太大的,所以这些集群上运行的俄式同样的代码),那我服务消费者发请求进行远程服务调用时我怎么知道调用的是哪一台机器上的服务呢(对于服务消费者而言调用哪一个都行,因为反正你部属的服务都是同样的嘛)* 此时dubbo加了一个zookeeper注册中心,dubbo在启动你里面的服务时要把你的服务调用地址注册到zookeeper注册中心,【注册中心中保存了这俩个东西:当前所注册的这个服务的名字(也就是接口的名字)和各个服务所在机器的ip[ip1, ip2, ...]】* 服务消费者先从zookeeper得到服务提供者对应的ipList,然后根据负载均衡策略(随机、哈希、轮询...)从ipList中选择出唯一的一个ip(相当于选择出来了一个机器以及机器上的服务)* 然后服务消费者就可以将自己的请求发送给服务提供者的代理类,代理类获知你要调的服务(接口)名字是啥,服务中的小服务是啥,调用所需的参数以及参数列表是啥,通过反射得到,传过来给我* **有个缺陷就是这个从注册中心查,再根据负载均衡策略得到唯一的服务提供者所暴露的服务,这个过程很费时间,那怎么办,搞个缓存,把服务提供者对应的服务(接口)以及机器ip等存在缓存中,以后先查缓存,没有再按照正常的路径进行下去,这样就相当于只调用了一次Zookeeper,性能提升了* 但是有个问题就是为了保证注册中心和本地缓存的数据同步(集群中的哪个机器挂了,本地不知道呀),所以用Redis实现注册中心的发布订阅,zookeeper中的watch其实也是一个道理*///配置 zookeeper注册中心,传入服务消费者要调用的服务提供者的服务(也就是接口名),然后返回这个服务对应的哪些服务器地址和ip等信息List<URL> urls = RemoteMapRegister.get(interfaceClass.getName());//负载均衡,选择出来这个服务提供者暴露的服务对应的集群中的其中一个机器的urlURL ur1 = LoadBalance.random(ur1s);|return result;}});} }pub1ic class LoadBalance {//当然可以写很多个方法,对应不同的负载均衡策略//通过负载均衡技术,将流量尽可能均摊到集群中的每台机器上,以此克服单台机器硬件资源的限制,做到横向扩展。/*** 随机的负载均衡策略,从list从随机取出一个url*/public static URL random(List<URL> list) {Random random =new Random( ) ;int n = random.nextInt(list.size());return list. get(n);} }
- 代理类的产生
- 捋一下,如果咱们的web项目或者叫maven项目,
-
-
(模拟)服务消费者Consumer:用Springmvc当作消费者去消费
服务提供者暴露的或者说提供的服务pub1ic class consumer {pub1ic static void main(String[] args) {//下面两种发送请求的写法/*** 亲自出马,发送请求*/// 用咱们自己写的HttpC1ient来发送数据,发了一个啥,发了一个invocation对象HttpC1ient httpClient = new Httpc1ient();//代表服务消费者当前要调用的是哪一个服务,假设要调用He11oService这个接口中的sayHello()方法,然后调用这个方法需要传入的参数以及参数类型是啥Invocation invocation = new Invocation(He11oService. class. getName(),sayHello(), new Class[]{string.class}, new object[]{"hhbmin"}]);//数据要发送到1oca1host主机的8080端口上httpClient.send( hostname: "1oca1host",port: 8080,invocation);/*** 不用亲自出门,由代理类帮我传递请求过去* 咱们用的是JDK的动态代理机制*///dubbo针对咱们服务消费者要调用的服务(接口)生成一个代理对象,dubbo会把这个代理对象放到spring容器中,这个代理对象成为spring容器中的一个bean,然后就可以通过@Autowired注入某个远程服务提供者(接口)然后紧接着就可以调用这个远程服务提供者中的服务(也就是接口中的方法)He11oService helloService = ProxyFactory. getProxy(He11oService.class);//dubbo+springhe11oService.sayHe11o( userName: "123");} }//然后返回这个结果 pub1ic class Httpclient {pub1ic string send(String hostname ,Integer port, Invocation invocation) {//用户的配置jdk11try{//构造了一个http请求request,制订了这个请求对应的url(含域名、端口以及要发送的数据(要发送的数据被Invocation给序列化了,所以可以说要发送的对象就是Invocation对象))var request = HttpRequest. newBuilder().uri(new URI(scheme: "http", userInfo: nu11, hostname, port, path: "/", query: nu11, frag.POST(HttpRequest.BodyPublishers.ofstring(JSONobject.toJSONString(invocation))).bui1d();var client = java.net.http.HttpClient.newHttpClient();//通过client去发送了这个请求,最终得到了一个结果responseHttpResponse<String> response = client.send(request,HttpResponse. BodyHandlers.ofstring());String result = response.body();return result;} catch (MalformedURLException e) {e.printStackTrace() ;} catch(IOException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();}} }- 咱们服务消费者调用服务提供者暴露的服务或者说提供的服务,
肯定不是凭空说调用就调用了,肯定是通过Netty\Tomcat\Jetty这些技术,但是不管是通过哪个,肯定是得有个启动类来启动这些技术,肯定不能手动吧。然后应用启动的时候要根据用户的配置来决定接下来要启动什么(这个启动的逻辑肯定是Dubbo这个框架来实现的,跟咱们业务逻辑没啥关系)-
你启动,咋启动,比如启动tomcat,虽然可以通过导入jar包等去写java代码调用OS中的启动tomcat的startup.bat脚本,来启动tomcat,但是这就有点子太手动了,这样肯定不行---->
解决办法就是模仿人家springboot那样,自己内嵌一个tomcat.<dependency><groupId>org.apache.tomcat.embedgroupId><artifactId>tomcat-embed-coreartifactId><version>9.0.12version> dependency>- 之前咱们是在server.xml中配置tomcat的,然后在tomcat中的四种容器中分别配置不同的信息,比如Connector容器中配置端口port,然后Host容器中配置域名
-
在Dubbo,
如果启动的是tomcat、jetty,那么相当于服务提供者就可以接收http请求。假设用的是tomcat,那么tomcat接收到数据以后,就会按照http协议去解析数据【表示我当前这个Provider应用下面定义了一个服务,这个服务叫HelloService,我现在这个服务或者叫应用可以通过接收HTTP请求或者Dubbo请求来调用我的HelloService这个服务,因为对于服务消费者而言你可能会发dubbo请求也可能会发http请求】,那么咱们在咱们的protocol协议底下肯定写两个:HttpServer和DubboServer分别启动不同的来接收HTTP请求的或者Dubbo请求... /** * 下面的代码用来完成tomcat的配置 */ pub1ic class HttpServer {public void start(String hostname, Integer port){//tomcat底层就是用的socket,tomcat就是在socket之上进行了一层封装Tomcat tomcat = new Tomcat() ;Server server = tomcat.getServer();Service service = server.findservice(s:"Tomcat");//指定我要绑定的端口Connector connector = new Connector() ;connector.setPort(port) ;Engine engine = new StandardEngine();engine.setDefaultHost(hostname);//指定当前对应的域名Host host = new StandardHost() ;host.setName(hostname);String contextPath ="" ;Context context = new StandardContext();context.setPath(contextPath);context.addLifecycleListener(new Tomcat.FixContextListener());host.addchi1d(context);engine.addChild(host);service.setContainer(engine) ;service.addConnector(connector);//实例化一个叫做dispatcher的DispatcherServlettomcat.addServlet(contextPath,"dispatcher", new DispatcherServlet());//"/*"代表我当前tomcat中的servlet接收到的所有请求,都交给一个叫做dispatcher的DispatcherServlet去处理。Dispatcherservlet 负责接收请求并转发请求context.addServletMappingDecoded("/*", "dispatcher");try {//启动tomcattomcat.start();tomcat.getServer().await();} catch (LifecycleException e) {e.printStackTrace();}}- dubbo请求和http请求的区别就是:数据格式不一样,并且底层实现也不一样
-
- 咱们服务消费者调用服务提供者暴露的服务或者说提供的服务,
-
序列化:

-
服务注册与发现:
本地注册与远程注册【系统选用 Zookeeper 作为注册中心, ZooKeeper 将数据保存在内存中,性能很高。在读多写少的场景中尤其适用,因为写操作会导致所有的服务器间同步状态。服务注册与发现是典型的读多写少的协调服务场景。Zookeeper 是一个典型的CP系统,在服务选举或者集群半数机器宕机时是不可用状态,相对于服务发现中主流的AP系统来说,可用性稍低】-
先大概看看Zookeeper的基本情况:
- ZooKeeper节点介绍:【
ZooKeeper存储的节点信息包括服务名,服务 IP:PORT ,序列化协议,压缩协议等】- 持久节点( PERSISENT ):一旦创建,除非主动调用删除操作,否则一直持久化存储
- 临时节点( EPHEMERAL ):与客户端会话绑定,客户端会话失效,这个客户端所创建的所有临时节点都会被删除。
- 节点顺序( SEQUENTIAL ):创建子节点时,如果设置SEQUENTIAL属性,则会自动在节点名后追加一个整形数字,上限是整形的最大值;同一目录下共享顺序,例如(/a0000000001,/b0000000002,/c0000000003,/test0000000004)。
- ZooKeeper服务注册:
在 ZooKeeper 根节点下根据服务名创建持久节点 /rpc/{serviceName}/service ,将该服务的所有服务节点使用临时节点创建在 /rpc/{serviceName}/service 目录下,相当于是一个又一个服务呗

public void exportService(Service serviceResource) {String name = serviceResource.getName();String uri = GSON.toJson(serviceResource);String servicePath = "rpc/" + name + "/service";zkClient.createPersistent(servicePath, true);String uriPath = servicePath + "/" + uri;//创建一个新的临时节点,当该节点宕机会话失效时,该临时节点会被清理zkClient.createEphemeral(uriPath); } - ZooKeeper服务发现:
客户端或者说服务消费者启动后,不会立即从注册中心获取可用服务节点,而是在调用远程方法时获取节点信息(懒加载),并放入本地缓存 MAP中,供后续调用,当注册中心通知目录变化时清空服务所有节点缓存//美团分布式 ID 生成系统Leaf就使用 Zookeeper 的顺序节点来注册 WorkerID ,临时节点保存节点 IP:PORT 信息。 public List<Service> getServices(String name) {Map<String, List<Service>> SERVER_MAP = new ConcurrentHashMap<>();String servicePath = "rpc/" + name + "/service";List<String> children = zkClient.getChildren(servicePath);List<Service> serviceList = Optional.ofNullable(children).orElse(new ArrayList<>()).stream().map(str -> {String deCh = URLDecoder.decode(str, StandardCharsets.UTF_8.toString());return gson.fromJson(deCh, Service.class);}).collect(Collectors.toList());SERVER_MAP.put(name, serviceList);return serviceList; }public class ZkChildListenerImpl implements IZkChildListener {//监听子节点的删除和新增事件@Overridepublic void handleChildChange(String parentPath, List<String> childList) throws Exception {//有变动就清空服务所有节点缓存String[] arr = parentPath.split("/");SERVER_MAP.remove(arr[2]);} }
- ZooKeeper节点介绍:【
-
本地注册:咱们不是服务提供者是一个HelloService,然后被调用的小服务其实指的是接口的子实现类中方法,但是如果接口的子实现类有很多,HelloServiceImpl1、HelloServiceImpl2、HelloServiceImpl3…,我服务提供者接收到一个请求之后,我到底选择哪一个服务(哪一个接口),用(大服务的)接口的哪一个子实现类(以及这个子实现类中的哪个方法、这个方法有啥形参、形参的参数列表是啥)呢。
-
利用本地注册,会将暴露的是哪些服务、每个服务对应的子实现类是什么存起来,所以请求来了之后就可以通过反射得到的接口名字之后你调用的是哪一个服务或者说当前你这个服务提供者里面所对应的具体的小服务(子实现类)是谁
pub1ic class LocalRegister {//String代表具体接口的名字,Class代表这个接口对应的子实现类的类对象private static Map<String, Class> map = new HashMap<>();//把接口和子实现类的对应关系存到map中pub1ic static void regist(String interfaceName, Class imp1class) { map.put(interfaceName,imp1class);}//get方法实现的是,传入接口的名字,我返回给你这个接口的对应的子实现类pub1ic static Class get(String interfaceName) {return map.get(interfaceName); } } -
远程注册,用到了Zookeeper或者Redis来实现注册中心:
pub1ic class RemoteMapRegister {private static Map<String, List<URL>> REGISTER = new HashMap<>();pub1ic static void regist(String interfaceName,URL ur1){List<URL> list = REGISTER.get(interfaceName);if (list == nu11) {list = new ArrayList<>();}list.add(url)}pub1ic static List<URL> get(String interfaceName) {List<URL> 1ist = REGISTER.get(interfaceName);return 1ist;}... }
-
-
网络通信
- 作为
服务提供者这个生产者对外提供 RPC 服务,必须有一个网络程序来来监听请求和做出响应。在 Java 领域 Netty 是一款高性能的 NIO 通信框架,很多的框架的通信都是采用 Netty 来实现的【Netty 的响应是异步的,为了在方法调用返回前获取到响应结果,需要将异步的结果同步化。】。那咱们咋用Netty呢: - 得先构建Netty并启动Netty服务监听指定端口呗。
向 Netty 服务的 pipeline 中添加了编解码和业务处理器,当接收到请求时,经过编解码后,真正处理业务的是业务处理器,即NettyServerInvokeHandler, 该处理器继承自SimpleChannelInboundHandler, 当数据读取完成将触发一个事件,并调用NettyServerInvokeHandler#channelRead0方法来处理请求。继承 MessageToByteEncoder 和 ByteToMessageDecoder 覆写对应的 encode 和 decode 方法即可自定义编解码器,使用到的序列化工具如 Hessian/Proto 也可以自定义 - 但是,总而言之,一般而言
- 还得,
请求和响应包装为便于封装请求和响应,定义两个 bean 来表示请求和响应。/*** @Descrption***/ public class StormRequest implements Serializable {private static final long serialVersionUID = -5196465012408804755L;//UUID, 唯一标识一次返回值private String uniqueKey;//服务提供者信息private ProviderService providerService;//调用的方法名称private String invokedMethodName;//传递参数private Object[] args;//消费端应用名private String appName;//消费请求超时时长private long invokeTimeout;// getter/setterpublic class StormResponse implements Serializable {private static final long serialVersionUID = 5785265307118147202L;//UUID, 唯一标识一次返回值private String uniqueKey;//客户端指定的服务超时时间private long invokeTimeout;//接口调用返回的结果对象private Object result;//getter/setter }
- 还得,
- 客户端(消费者):
客户端(消费者)在 RPC 调用中主要是生成服务接口的代理对象,并从注册中心获取对应的服务列表发起网络请求。客户端和服务端一样采用 Spring 来管理 bean 解析 xml 配置- 通过 jdk 动态代理来生成引入服务接口的代理对象
public Object getProxy() {return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), new Class<?>[]{targetInterface}, this); } - 从注册中心获取服务列表并依据某种策略选取其中一个服务节点
- 通过 Netty 建立连接,发起网络请求
- 通过 jdk 动态代理来生成引入服务接口的代理对象
- 作为
-
PART4:上面看完了gRPC和Dubbo的想法,咱们再看看人家京东何小锋老师开源的RPC框架里面的好想法:
- 整个调用过程:调用端向服务端发起调用时,会先经过动态代理,之后会调用 Refer 对象的 invoke 方法,Refer 对象会先对要透传的消息进行处理,再执行 Filter 链,调用端最后一个 Filter 会根据配置的路由规则选择出符合条件的一组服务端节点,之后调用 Route 对象的 route 方法,route 方法的内部逻辑会根据配置的负载均衡策略选择一个服务端节点,最后向这个服务端节点发送请求消息。服务端的传输层收到调用端发送过来的请求消息,在对请求消息进行一系列处理之后(如解码、反序列化、协议转换等等),会在业务线程池中处理消息,关键的逻辑就是调用 Exporter 对象的 invoke 方法,Exporter 对象的 invoke 方法会执行服务端配置的 Filter 链,最终通过反射或预编译对象执行业务逻辑,再将最终结果封装成响应消息,通过传输层响应给调用端。针对服务端启动流程、调用端启动流程、RPC 调用流程三个流程做以下介绍。
- 服务提供者端启动流程
- 服务提供者端启动代码示例
public static void main(String[] args) throws Exception {DemoService demoService = new DemoServiceImpl(); //服务提供者设置ProviderConfig<DemoService> providerConfig = new ProviderConfig<>();providerConfig.setServerConfig(new ServerConfig());providerConfig.setInterfaceClazz(DemoService.class.getName());providerConfig.setRef(demoService);providerConfig.setAlias("joyrpc-demo");providerConfig.setRegistry(new RegistryConfig("broadcast"));//providerConfig 是通过调用 exportAndOpen() 方法来启动服务端的providerConfig.exportAndOpen().whenComplete((v, t) -> {if (t != null) {logger.error(t.getMessage(), t);System.exit(1);}});System.in.read();} ...//服务的启动流程被分为了两个部分:export(创建 Export 对象)以及 open(打开服务) public CompletableFuture<Void> exportAndOpen() {CompletableFuture<Void> future = new CompletableFuture<>();export().whenComplete((v, t) -> {if (t != null) {future.completeExceptionally(t);} else {Futures.chain(open(), future);}});return future;} - 服务端的启动流程被分为了两部分:服务端的创建流程与服务端的开启流程
- 服务端的创建流程

- 服务端开启流程

在服务端的启动流程中,核心工作就是创建和开启 Exporter 对象。ProviderConfig 在创建 Exporter 对象之前会先创建 Registry 对象,从注册中心中订阅接口配置与全局配置,之后才会创建 Exporter 对象,在 Exporter 开启时,会启动一个 Server 对象来开启一个端口,Exporter 开启成功之后,才会通过 Registry 对象向注册中心发起注册。
- 服务端的创建流程
- 服务提供者端启动代码示例
- 调用端(服务消费者)启动流程
- 调用端启动代码示例
public static void main(String[] args) {ConsumerConfig<DemoService> consumerConfig = new ConsumerConfig<>(); //consumer设置consumerConfig.setInterfaceClazz(DemoService.class.getName());consumerConfig.setAlias("joyrpc-demo");consumerConfig.setRegistry(new RegistryConfig("broadcast"));try {//调用端流程的启动入口就是 ConsumerConfig 对象的 refer 方法,ConsumerConfig 对象就是调用端的配置对象。refer 方法的返回值是 CompletableFuture,与服务端相同,调用端的启动流程也完全是异步的CompletableFuture<DemoService> future = consumerConfig.refer();DemoService service = future.get();String echo = service.sayHello("hello"); //发起服务调用logger.info("Get msg: {} ", echo);} catch (Throwable e) {logger.error(e.getMessage(), e);}System.in.read();} - 调用端启动流程:【补充:Client 对象的 open 操作也是有着一系列的操作,比如创建 Transport 对象,创建 Channel 对象,生成并记录 session 信息等等;Refer 对象在构造调用链的时候,其最后一个调用链就是 Refer 对象的 distribute 方法,用来发送远程请求;动态代理对象内部的核心逻辑就是调用 ConsumerInvokerHandler 对象的 Invoke 方法,最终就是调用 Refer 对象】
在调用端的启动流程中,核心工作就是创建和开启 Refer 对象,开启 Refer 对象中处理逻辑最为复杂的就是对 Cluster 的 open 操作,Cluster 负责了调用端的集群管理操作,其中有注册中心服务节点变更事件的监听、与服务端节点建立连接以及服务端节点连接状态的管理等等
- 调用端启动代码示例
- RPC 调用流程:
- RPC 的整个调用流程就是
调用端发送请求消息以及服务端接收请求消息并处理,之后响应给调用端的流程。 - 调用端发送流程:

- 服务端接收流程:

- RPC 的整个调用流程就是
PART5:上面巴拉巴拉说了一大堆,都是从RPC的几个主要角色出发或者说RPC的基本工作原理出来而设计的、以及参考各位老师的源码学习总结的。但是呢,话说回来,假设咱们新进入了一家公司,老板让开发一个RPC框架供各服务间调用,咱们肯定是要从环境搭建开始吧,再到理清项目需求以及项目结构等等【虽然有Docker出现,可以看看这一篇关于Docker的文章,能够很有效解决环境搭建后的移植问题】…,那咱们来瞅瞅,特此感谢“ 技术专家若地老师的连载文章,没有他就没有这个总结文章”。 整起~~
- 环境搭建:

- 项目结构:
- 项目结构所涉及模块

- 梳理Maven的pom定义:

- 对需求分析除了对上面的某些细节以及角色做分析之外,对整体的实现效果也要做一定的整理,有个大体的思路:
// 模拟rpc-facade # HelloFacade public interface HelloFacade {String hello(String name); }// 模拟rpc-provider # HelloFacadeImpl @RpcService(serviceInterface = HelloFacade.class, serviceVersion = "1.0.0")//rpc-provider 通过 @RpcService 注解暴露 RPC 服务 HelloFacade public class HelloFacadeImpl implements HelloFacade {@Overridepublic String hello(String name) {return "hello" + name;} }// 模拟rpc-consumer # HelloController //rpc-consumer 通过 @RpcReference 注解引用 HelloFacade 服务并发起调用 @RestController public class HelloController {@RpcReference(serviceVersion = "1.0.0", timeout = 3000)//rpc-consumer 通过 @RpcReference 注解引用 HelloFacade 服务并发起调用private HelloFacade helloFacade;@RequestMapping(value = "/hello", method = RequestMethod.GET)public String sayHello() {return helloFacade.hello("mini rpc");} }- 同样的,不能耦合的太死了。为了方便在本地模拟客户端和服务端,我会
把 rpc-provider 和 rpc-consumer 两个模块能够做到独立启动。rpc-provider 通过 @RpcService 注解暴露 RPC 服务 HelloFacade,rpc-consumer 通过 @RpcReference 注解引用 HelloFacade 服务并发起调用,基本与我们常用的 RPC 框架使用方式保持一致。
- 同样的,不能耦合的太死了。为了方便在本地模拟客户端和服务端,我会
- 项目结构所涉及模块
- 然后呢,就是和上面PART1、PART2、PART3、PART4一样了。分角色逐个击破。
Netty 作为 NIO 的库,自然既可以作为服务端接受请求,也可以作为客户端发起请求。使用 Netty 开发客户端或服务端都是非常简单的,Netty 做了很好的封装,我们通常只要开发一个或多个 handler 用来处理我们的自定义逻辑就可以了。- 服务提供者rpc-provider发布服务:主要分为四个核心流程:
- 服务提供者rpc-provider启动服务,并暴露服务端口;
- 服务提供者的启动实现代码:
所有 Netty 服务端的启动类都可以采用如下代码结构进行开发。服务端的启动过程大致为:配置线程池【Netty 是采用 Reactor 模型进行开发的,在大多数场景下,我们采用的都是主从多线程 Reactor 模型。】、Channel 初始化【设置 Channel 类型,并向 ChannelPipeline 中注册 ChannelHandler,此外可以按需设置 Socket 参数以及用户自定义属性。】、最后绑定端口【调用 bind() 方法会真正触发启动,sync() 方法则会阻塞,直至整个启动过程完成。】,就可以完成服务器的启动了【Netty 提供了 ServerBootstrap 引导类作为程序启动入口,ServerBootstrap 将 Netty 核心组件像搭积木一样组装在一起,服务端启动过程我们需要完成上面三个基本启动步骤:】。- ServerBootstrap 类用于创建服务端实例,Bootstrap 用于创建客户端实例。
- 线程调用 b.bind(port) 这个方法会返回一个 ChannelFuture,bind() 是一个异步方法,当某个执行线程执行了真正的绑定操作后,那个执行线程一定会标记这个 future 为成功(我们假定 bind 会成功),然后这里的 sync() 方法(main 线程)就会返回了。
不管是服务端的 NioServerSocketChannel 还是客户端的 NioSocketChannel,在 bind 或 connect 时,都会先进入 initAndRegister 这个方法。【register 操作以后(register 操作是 Inbound 的,是从 head 开始的),将进入到 bind 或 connect 操作中。register 操作非常关键,它建立起来了很多的东西,它是 Netty 中 NioSocketChannel 和 NioServerSocketChannel 开始工作的起点。】。- 对于客户端 NioSocketChannel 来说,前面 register 完成以后,就要开始 connect 了,这一步将连接到服务端。connect 操作是交给 pipeline 来执行的。进入 pipeline 中,我们会发现,connect 这种 Outbound 类型的操作,是从 pipeline 的 tail 开始的:
- 接下来就是 pipeline 的操作了,从 tail 开始,执行 pipeline 上的 Outbound 类型的 handlers 的 connect(…) 方法

这里的 connect 成功以后,这个 TCP 连接就建立起来了,后续的操作会在 NioEventLoop.run() 方法中被 processSelectedKeys() 方法处理掉。
- 接下来就是 pipeline 的操作了,从 tail 开始,执行 pipeline 上的 Outbound 类型的 handlers 的 connect(…) 方法
- bind 操作也是要由 pipeline 来完成的,bind 操作和 connect 一样,都是 Outbound 类型的,所以都是 tail 开始,最后的 bind 操作又到了 head 中,由 head 来调用 unsafe 提供的 bind 方法:
- 对于客户端 NioSocketChannel 来说,前面 register 完成以后,就要开始 connect 了,这一步将连接到服务端。connect 操作是交给 pipeline 来执行的。进入 pipeline 中,我们会发现,connect 这种 Outbound 类型的操作,是从 pipeline 的 tail 开始的:
- 如果 bind(port) 失败,我们知道,sync() 方法会将异常抛出来,然后就会执行到 finally 块了。
private void startRpcServer() throws Exception {this.serverAddress = InetAddress.getLocalHost().getHostAddress();//配置线程池。Netty 是采用 Reactor 模型进行开发的,在大多数场景下,我们采用的都是主从多线程 Reactor 模型。。Netty 是采用 Reactor 模型进行开发的所以可以非常容易切换三种 Reactor 模式:单线程模式、多线程模式、主从多线程模式。在大多数场景下,我们采用的都是主从多线程 Reactor 模型。//Boss 是主 Reactor,Worker 是从 Reactor。它们分别使用不同的 NioEventLoopGroup,主 Reactor 负责处理 Accept,然后把 Channel 注册到从 Reactor 上,从 Reactor 主要负责 Channel 生命周期内的所有 I/O 事件。EventLoopGroup boss = new NioEventLoopGroup();//Boss 是主 ReactorEventLoopGroup worker = new NioEventLoopGroup();//Worker 是从 Reactortry {//Netty 提供了 ServerBootstrap 引导类作为程序启动入口,ServerBootstrap 将 Netty 核心组件像搭积木一样组装在一起,服务端启动过程我们需要完成三个基本步骤:ServerBootstrap bootstrap = new ServerBootstrap();bootstrap.group(boss, worker)//Channel 初始化,也就是设置 Channel 类型,并向 ChannelPipeline 中注册 ChannelHandler,此外可以按需设置 Socket 参数以及用户自定义属性。NIO 模型是 Netty 中最成熟且被广泛使用的模型。因此,推荐 Netty 服务端采用 NioServerSocketChannel 作为 Channel 的类型,客户端采用 NioSocketChannel。当然,Netty 提供了多种类型的 Channel 实现类,你可以按需切换,例如 OioServerSocketChannel、EpollServerSocketChannel 等。//设置Channel类型的方式就是这一句.channel(NioServerSocketChannel.class)//注册 ChannelHandler。在 Netty 中可以通过 ChannelPipeline 去注册多个 ChannelHandler,每个 ChannelHandler 各司其职,这样就可以实现最大化的代码复用,充分体现了 Netty 设计的优雅之处。Channel 初始化时都会绑定一个 Pipeline,它主要用于服务编排。Pipeline 管理了多个 ChannelHandler。I/O 事件依次在 ChannelHandler 中传播,ChannelHandler 负责业务逻辑处理。//ServerBootstrap 的 childHandler() 方法需要注册一个 ChannelHandler。ChannelInitializer是实现了 ChannelHandler接口的匿名类,通过实例化 ChannelInitializer 作为 ServerBootstrap 的参数。.childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {}})//设置 Channel 参数。Netty 提供了十分便捷的方法,用于设置 Channel 参数。关于 Channel 的参数数量非常多,如果每个参数都需要自己设置,那会非常繁琐。幸运的是 Netty 提供了默认参数设置,实际场景下默认参数已经满足我们的需求,我们仅需要修改自己关系的参数即可。//ServerBootstrap 设置 Channel 属性有option和childOption两个方法,option 主要负责设置 Boss 线程组,而 childOption 对应的是 Worker 线程组。.childOption(ChannelOption.SO_KEEPALIVE, true);//端口绑定。在完成上述 Netty 的配置之后,bind() 方法会真正触发启动,sync() 方法则会阻塞,直至整个启动过程完成,具体使用方式如下:ChannelFuture channelFuture = bootstrap.bind(this.serverAddress, this.serverPort).sync();log.info("server addr {} started on port {}", this.serverAddress, this.serverPort);//一旦绑定端口 bind 成功,进入下面一行,channelFuture.channel() 方法会返回该 future 关联的 channel。channel.closeFuture() 也会返回一个 ChannelFuture,然后调用了 sync() 方法,这个 sync() 方法返回的条件是:有其他的线程关闭了 NioServerSocketChannel,往往是因为需要停掉服务了,然后那个线程会设置 future 的状态( setSuccess(result) 或 setFailure(cause) ),这个 sync() 方法才会返回。channelFuture.channel().closeFuture().sync();} finally {boss.shutdownGracefully();worker.shutdownGracefully();} }- Netty 中的线程池 EventLoopGroup:
Netty 的线程池,指的就是 NioEventLoopGroup 的实例;线程池中的单个线程,指的是右边 NioEventLoop 的实例。NioEventLoopGroup 有多个构造方法用于参数设置,最简单地,我们采用无参构造函数,或仅仅设置线程数量就可以了,其他的参数采用默认值。线程池 NioEventLoopGroup 中的每一个线程 NioEventLoop 也可以当做一个线程池来用,只不过它只有一个线程。
- Netty 中的 Channel,没有直接使用 Java 原生的 ServerSocketChannel 和 SocketChannel,在这篇文章中ctrl+F搜就行,而是包装了 NioServerSocketChannel 和 NioSocketChannel 与之对应。
NioSocketChannel 和 JDK 底层的 SocketChannel它们是一对一的关系。
- NioSocketChannel 的构造方法了。【NioSocketChannel 在实例化过程中,会先实例化 JDK 底层的 SocketChannel,也就是创建SocketChannel实例】


- 其实主要就是实例化了 JDK 层的 SocketChannel 或 ServerSocketChannel,然后设置了 SocketChannel 的非阻塞模式
- NioSocketChannel 的构造方法了。【NioSocketChannel 在实例化过程中,会先实例化 JDK 底层的 SocketChannel,也就是创建SocketChannel实例】
- NioServerSocketChannel 和 ServerSocketChannel 同理,也是一对一的关系。在 Bootstrap(客户端) 和 ServerBootstrap(服务端) 的启动过程中都会调用 channel(…) 方法:
- 服务端 ServerSocketChannel 在 accept 一个连接以后,需要创建 SocketChannel 的实例,childHandler(…) 中设置的 handler 就是用于处理新创建的 SocketChannel 的,而不是用来处理 ServerSocketChannel 实例的。
- handler 可以指定多个(需要上面的 ChannelInitializer 类辅助),它们会组成了一个 pipeline,它们其实就类似拦截器的概念,现在只要记住一点,
每个 NioSocketChannel 或 NioServerSocketChannel 实例内部都会有一个 pipeline 实例。pipeline 中还涉及到 handler 的执行顺序。 - ChannelFuture:这个涉及到 Netty 中的异步编程,和 JDK 中的 Future 接口类似。
ChannelFuture是Future 接口的子接口。这个接口用得最多,它将和 IO 操作中的 Channel 关联在一起了,用于异步处理 Channel 中的事件。

- 在使用 Java 的线程池 ThreadPoolExecutor 在 submit 一个任务到线程池中的时候,返回的就是一个 Future 实例,通过它来获取提交的任务的执行状态和最终的执行结果,我们最常用它的 isDone() 和 get() 方法。
- JDK 中的 Future 接口 java.util.concurrent.Future:

- Netty 中的 Future 接口(同名)继承了 JDK 中的 Future 接口,然后添加了一些方法:Netty 的 Future 接口加了 sync() 和 await() 用于阻塞等待,还加了 Listeners,只要任务结束去回调 Listener 们就可以了,那么我们就不一定要主动调用 isDone() 来获取状态,或通过 get() 阻塞方法来获取值。

- 两个 EventLoopGroup:bossGroup 和 workerGroup,它们涉及的是 Netty 的线程模型,可以看到服务端有两个 group,而客户端只有一个,它们就是 Netty 中的线程池。

- HTTP 服务器是我们平时最常用的工具之一。同传统 Web 容器 Tomcat、Jetty 一样,Netty 也可以方便地开发一个 HTTP 服务器【Netty 的模块化设计非常优雅,客户端或者服务端的启动方式基本是固定的。作为开发者来说,大多数场景下,
只需要实现与业务逻辑相关的一系列 ChannelHandler,再加上 Netty 已经预置了 HTTP 相关的编解码器就可以快速完成服务端框架的搭建。所以,我们只需要两个类就可以完成一个最简单的 HTTP 服务器,它们分别为服务器启动类和业务逻辑处理类】。HTTP服务器最基本的请求-响应的流程是:搭建 HTTP 服务器,配置相关参数并启动------>从浏览器或者终端发起 HTTP 请求----->成功得到服务端的响应结果。

- 服务提供者采用的是主从 Reactor 线程模型,
启动过程包括配置线程池、Channel 初始化、端口绑定三个步骤,我们暂时先不关注 Channel 初始化中自定义的业务处理器 Handler 是如何设计和实现的。- Netty 是采用 Reactor 模型进行开发的,可以非常容易切换三种 Reactor 模式:单线程模式、多线程模式、主从多线程模式。【从上述三种 Reactor 线程模型的配置方法可以看出:Netty 线程模型的可定制化程度很高。它只需要简单配置不同的参数,便可启用不同的 Reactor 线程模型,而且无需变更其他的代码,很大程度上降低了用户开发和调试的成本。】

- Netty 是采用 Reactor 模型进行开发的,可以非常容易切换三种 Reactor 模式:单线程模式、多线程模式、主从多线程模式。【从上述三种 Reactor 线程模型的配置方法可以看出:Netty 线程模型的可定制化程度很高。它只需要简单配置不同的参数,便可启用不同的 Reactor 线程模型,而且无需变更其他的代码,很大程度上降低了用户开发和调试的成本。】
- 服务提供者的启动实现代码:
- 启动时扫描需要对外发布的服务,并将服务元数据信息发布到注册中心;
对于 RPC 框架而言,可扩展性是比较重要的一方面。借助 Spring Boot 的能力将服务提供者启动所依赖的参数做成可配置化。- 参数配置:
- 指的是服务提供者启动需要配置一些参数,
我们不应该把这些参数固定在代码里,而是以命令行参数或者配置文件的方式进行输入。我们可以**使用 Spring Boot 的 @ConfigurationProperties 注解很轻松地实现配置项的加载,并且可以把相同前缀类型的配置项自动封装成实体类**。接下来我们为服务提供者提供参数映射的对象:

- 下面我们在 rpc-provider 模块的 resources 目录下创建全局配置文件 application.properties,并配置以上三个参数:

- 如果
只配置 @ConfigurationProperties 注解,Spring 容器并不能获取配置文件的内容并映射为对象,这时@EnableConfigurationProperties注解就登场了。@EnableConfigurationProperties 注解的作用就是将声明 @ConfigurationProperties 注解的类注入为 Spring 容器中的 Bean。@Configuration //@EnableConfigurationProperties 注解的作用就是将声明 @ConfigurationProperties 注解的类注入为 Spring 容器中的 Bean @EnableConfigurationProperties(RpcProperties.class)//通过 @EnableConfigurationProperties 注解使得 RpcProperties 生效。并通过 @Configuration 和 @Bean 注解自定义了 RpcProvider 的生成方式 //@Configuration 主要用于定义配置类,配置类内部可以包含多个 @Bean 注解的方法,可以替换传统 XML 的定义方式 public class RpcProviderAutoConfiguration {@Resourceprivate RpcProperties rpcProperties;@Bean//被 @Bean 注解的方法会返回一个自定义的对象,@Bean 注解会将这个对象注册为 Bean 并装配到 Spring 容器中,@Bean 比 @Component 注解的自定义功能更强。public RpcProvider init() throws Exception {RegistryType type = RegistryType.valueOf(rpcProperties.getRegistryType());RegistryService serviceRegistry = RegistryFactory.getInstance(rpcProperties.getRegistryAddr(), type);return new RpcProvider(rpcProperties.getServicePort(), serviceRegistry);} } - 我们服务提供者启动的准备工作就完成了,下面需要添加 Spring Boot 的 main 方法,如下所示,然后就可以启动 rpc-provider 模块了。

- 指的是服务提供者启动需要配置一些参数,
- 除了上面的参数配置。在服务提供者启动时,我们需要思考一个核心问题,服务提供者需要将服务发布到注册中心,怎么知道哪些服务需要发布呢?服务提供者需要定义需要发布服务类型、服务版本等属性,
主流的 RPC 框架都采用 XML 文件或者注解的方式进行定义哪些服务需要发布以及这些服务的类型、服务版本等属性。以注解的方式暴露服务现在最为常用,省去了很多烦琐的 XML 配置过程。- 例如
Dubbo 框架中使用 @Service 注解替代 dubbo:service 的定义方式。服务消费者则使用 @Reference 注解替代 dubbo:reference。 - 作为服务提供者,如何通过注解暴露服务

- 那么 serviceInterface、serviceVersion 的属性值怎么才能和 Bean 关联起来呢?这就需要用到 Bean 的生命周期以及 Bean 的可扩展点相关知识了。
- 可以看看Spring的IOC理论知识
- 以及Spring源码系列
- 例如
- 接收 RPC 请求,解码后得到请求消息;
- 提交请求至自定义线程池进行处理,并将处理结果写回客户端。
- 服务提供者rpc-provider启动服务,并暴露服务端口;
- 服务消费者订阅服务:
- 与服务提供者不同的是,
服务消费者并不是一个常驻的服务,每次发起 RPC 调用时它才会去选择向哪个远端服务发送数据。所以服务消费者的实现要复杂一些,对于声明 @RpcReference 注解的成员变量,我们需要构造出一个可以真正进行 RPC 调用的 Bean【对声明 @RpcReference 注解的成员变量构造出 RpcReferenceBean。】,然后将它注册到 Spring 的容器中。或者说咱们这里主要对服务消费者和服务提供者干的活是实现:【服务提供者使用 @RpcService 注解是如何发布服务的,服务消费者相应需要一个能够注入服务接口的注解 @RpcReference,被 @RpcReference 修饰的成员变量都会被构造成 RpcReferenceBean,并为它生成动态代理类】- @RpcReference 注解的定义:

- Spring 的 FactoryBean 接口可以帮助我们实现自定义的 Bean,FactoryBean 是一种特种的工厂 Bean,通过 getObject() 方法返回对象,而并不是 FactoryBean 本身。

- 有了 @RpcReference 注解和 RpcReferenceBean 之后,我们可以使用 Spring 的扩展点 BeanFactoryPostProcessor 对 Bean 的定义进行修改。上面服务提供者使用的是 BeanPostProcessor,BeanFactoryPostProcessor 和 BeanPostProcessor 都是 Spring 的核心扩展点,它们之间有什么区别呢?
BeanFactoryPostProcessor 是 Spring 容器加载 Bean 的定义之后以及 Bean 实例化之前执行,所以 BeanFactoryPostProcessor 可以在 Bean 实例化之前获取 Bean 的配置元数据,并允许用户对其修改。而 BeanPostProcessor 是在 Bean 初始化前后执行,它并不能修改 Bean 的配置信息。 - 现在我们需要对声明 @RpcReference 注解的成员变量构造出 RpcReferenceBean,所以需要实现 BeanFactoryPostProcessor 修改 Bean 的定义
@Component @Slf4j //RpcConsumerPostProcessor 类中重写了 BeanFactoryPostProcessor 的 postProcessBeanFactory 方法,从 beanFactory 中获取所有 Bean 的定义信息,然后分别对每个 Bean 的所有 field 进行检测。 public class RpcConsumerPostProcessor implements ApplicationContextAware, BeanClassLoaderAware, BeanFactoryPostProcessor {private ApplicationContext context;private ClassLoader classLoader;private final Map<String, BeanDefinition> rpcRefBeanDefinitions = new LinkedHashMap<>();@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {this.context = applicationContext;}@Overridepublic void setBeanClassLoader(ClassLoader classLoader) {this.classLoader = classLoader;}//RpcConsumerPostProcessor 类中重写了 BeanFactoryPostProcessor 的 postProcessBeanFactory 方法,从 beanFactory 中获取所有 Bean 的定义信息,然后分别对每个 Bean 的所有 field 进行检测。@Overridepublic void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {for (String beanDefinitionName : beanFactory.getBeanDefinitionNames()) {//从 beanFactory 中获取所有 Bean 的定义信息BeanDefinition beanDefinition = beanFactory.getBeanDefinition(beanDefinitionName);String beanClassName = beanDefinition.getBeanClassName();if (beanClassName != null) {Class<?> clazz = ClassUtils.resolveClassName(beanClassName, this.classLoader);ReflectionUtils.doWithFields(clazz, this::parseRpcReference);//然后分别对每个 Bean 的所有 field 进行检测。如果 field 被声明了 @RpcReference 注解,通过 BeanDefinitionBuilder 构造 RpcReferenceBean 的定义,并为 RpcReferenceBean 的成员变量赋值,包括服务类型 interfaceClass、服务版本 serviceVersion、注册中心类型 registryType、注册中心地址 registryAddr 以及超时时间 timeout。}}BeanDefinitionRegistry registry = (BeanDefinitionRegistry) beanFactory;this.rpcRefBeanDefinitions.forEach((beanName, beanDefinition) -> {if (context.containsBean(beanName)) {throw new IllegalArgumentException("spring context already has a bean named " + beanName);}registry.registerBeanDefinition(beanName, rpcRefBeanDefinitions.get(beanName));log.info("registered RpcReferenceBean {} success.", beanName);});}//如果 field 被声明了 @RpcReference 注解,通过 BeanDefinitionBuilder 构造 RpcReferenceBean 的定义,并为 RpcReferenceBean 的成员变量赋值,包括服务类型 interfaceClass、服务版本 serviceVersion、注册中心类型 registryType、注册中心地址 registryAddr 以及超时时间 timeout。private void parseRpcReference(Field field) {RpcReference annotation = AnnotationUtils.getAnnotation(field, RpcReference.class);if (annotation != null) {//通过 BeanDefinitionBuilder 构造 RpcReferenceBean 的定义BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(RpcReferenceBean.class);//并为 RpcReferenceBean 的成员变量赋值,包括服务类型 interfaceClass、服务版本 serviceVersion、注册中心类型 registryType、注册中心地址 registryAddr 以及超时时间 timeout。builder.setInitMethodName(RpcConstants.INIT_METHOD_NAME);builder.addPropertyValue("interfaceClass", field.getType());builder.addPropertyValue("serviceVersion", annotation.serviceVersion());builder.addPropertyValue("registryType", annotation.registryType());builder.addPropertyValue("registryAddr", annotation.registryAddress());builder.addPropertyValue("timeout", annotation.timeout());//构造完 RpcReferenceBean 的定义之后,会将RpcReferenceBean 的 BeanDefinition 重新注册到 Spring 容器中。BeanDefinition beanDefinition = builder.getBeanDefinition();rpcRefBeanDefinitions.put(field.getName(), beanDefinition);}} }
- @RpcReference 注解的定义:
- 与服务提供者不同的是,
- 服务提供者和服务消费者两个模块之间的通信机制:通过向 ChannelPipeline 添加自定义的业务处理器,来完成
RPC 框架的远程通信机制【自定义协议、编解码、序列化/反序列化都是实现远程通信的必备基础知识】。- ChannelPipeline 和 ChannelHandler 也是我们在平时应用开发的过程中打交道最多的组件,这两个组件为用户提供了 I/O 事件的全部控制权。
ChannelPipeline 是双向链表结构,包含 ChannelInboundHandler 和 ChannelOutboundHandler 两种处理器。- 使用 Netty 的时候,我们通常就只要写一些自定义的 handler 就可以了,我们定义的这些 handler 会组成一个 pipeline,用于处理 IO 事件,这个和我们平时接触的 Filter 或 Interceptor 表达的差不多是一个意思。

- 一个 Channel 关联一个 pipeline,NioSocketChannel 和 NioServerSocketChannel 在执行构造方法的时候,都会走到它们的父类 AbstractChannel 的构造方法中。
- 在 Netty 中,IO 事件被分为 Inbound 事件和 Outbound 事件。Inbound 事件和 Outbound 事件的传播方向相反,Inbound 事件的传播方向为 Head -> Tail,而 Outbound 事件传播方向是 Tail -> Head。在设计之初一定要梳理清楚 Inbound 和 Outbound 处理的传递顺序,以及数据模型之间是如何转换的。【
比如客户端在发起请求的时候,需要 connect 到服务器,然后 write 数据传到服务器,再然后 read 服务器返回的数据,前面的 connect 和 write 就是 out 事件,后面的 read 就是 in 事件。客户端连接进来的时候,读取(read)客户端请求数据的操作是 Inbound 的。处理完数据后,返回给客户端数据的 write 操作是 Outbound 的】
- Outbound 的 out 指的是 出去,有哪些 IO 事件属于此类呢?比如 connect、write、flush 这些 IO 操作是往外部方向进行的,它们就属于 Outbound 事件。
- 诸如 accept、read 这种就属于 Inbound 事件。
- 使用 Netty 的时候,我们通常就只要写一些自定义的 handler 就可以了,我们定义的这些 handler 会组成一个 pipeline,用于处理 IO 事件,这个和我们平时接触的 Filter 或 Interceptor 表达的差不多是一个意思。
- 需要实现的主要功能如下:
- 服务消费者实现协议编码,向服务提供者发送调用数据,也就是实现RPC 请求调用过程

- 服务提供者收到数据后解码,然后向服务消费者发送响应数据,【暂时忽略 RPC 请求是如何被调用的】,也就是实现RPC结果响应过程
- RpcRequestHandler 处理成功后是如何向服务消费者返回响应结果的。
- 服务消费者入站操作,首先要经过 MiniRpcDecoder 解码器,根据报文类型 msgType 解码出 MiniRpcProtocol 响应结果,然后传递给 RpcResponseHandler 处理器,RpcResponseHandler 负责响应不同线程的请求结果

- 服务消费者入站操作,首先要经过 MiniRpcDecoder 解码器,根据报文类型 msgType 解码出 MiniRpcProtocol 响应结果,然后传递给 RpcResponseHandler 处理器,RpcResponseHandler 负责响应不同线程的请求结果
- RpcRequestHandler 处理成功后是如何向服务消费者返回响应结果的。
- 服务消费者收到响应数据后成功返回
- 服务消费者实现协议编码,向服务提供者发送调用数据,也就是实现RPC 请求调用过程
- 自定义 RPC 通信协议:理清了RPC通信流程之后,就得着手开始实现咱们的自定义 RPC 通信协议了。
- 协议是服务消费者和服务提供者之间通信的基础,主流的 RPC 框架都会自定义通信协议,相比于 HTTP、HTTPS、JSON 等通用的协议,自定义协议可以实现更好的性能、扩展性以及安全性。一个简易版的 RPC 自定义协议【
一个完备的网络协议需要具备的基本要素:魔数、协议版本号、序列化算法、报文类型、长度域字段、请求数据、保留字段。】

- 对应协议实体类的定义:
- MiniRpcRequest 主要包含 RPC 远程调用需要的必要参数:

- 在 RPC 结果响应的场景下,MiniRpcProtocol 中泛型 T 对应的 MiniRpcResponse 类型,MiniRpcResponse 实体类的定义如下所示。

- MiniRpcRequest 主要包含 RPC 远程调用需要的必要参数:
- 协议是服务消费者和服务提供者之间通信的基础,主流的 RPC 框架都会自定义通信协议,相比于 HTTP、HTTPS、JSON 等通用的协议,自定义协议可以实现更好的性能、扩展性以及安全性。一个简易版的 RPC 自定义协议【
- 设计完 RPC 自定义协议之后,我们接下来再来解决在通信过程中 MiniRpcRequest 和 MiniRpcResponse 如何进行编码的问题,也就是序列化选型问题。
- MiniRpcRequest 和 MiniRpcResponse 实体类表示的协议体内容都是不确定具体长度的,所以我们一般会选用通用且高效的序列化算法将其转换成二进制数据,这样可以有效减少网络传输的带宽,提升 RPC 框架的整体性能。
目前比较常用的序列化算法包括 Json、Kryo、Hessian、Protobuf 等,这些第三方序列化算法都比 Java 原生的序列化操作都更加高效。 首先我们定义了一个通用的序列化接口 RpcSerialization,所有序列化算法扩展都必须实现该接口,RpcSerialization 接口分别提供了序列化 serialize() 和反序列化 deserialize() 方法
- 假设我们接下来为 RpcSerialization 提供了 HessianSerialization 和 JsonSerialization 两种类型的实现类。
以 HessianSerialization 为例,

- 假设我们接下来为 RpcSerialization 提供了 HessianSerialization 和 JsonSerialization 两种类型的实现类。
为了能够支持不同序列化算法,我们采用工厂模式来实现不同序列化算法之间的切换,使用相同的序列化接口指向不同的序列化算法。对于使用者来说只需要知道序列化算法的类型即可,不用关心底层序列化是如何实现的。public class SerializationFactory {public static RpcSerialization getRpcSerialization(byte serializationType) {SerializationTypeEnum typeEnum = SerializationTypeEnum.findByType(serializationType);switch (typeEnum) {case HESSIAN:return new HessianSerialization();case JSON:return new JsonSerialization();default:throw new IllegalArgumentException("serialization type is illegal, " + serializationType);}} }- 完成序列化选型,就得实现咱么不自定义的协议编解码了呗。在实现协议编解码时经常用到两个重要的抽象类:MessageToByteEncoder 编码器和ByteToMessageDecoder 解码器。
- 使用 Netty 实现自定义的通信协议。
Netty 提供了两个最为常用的编解码抽象基类 MessageToByteEncoder 和 ByteToMessageDecoder,帮助我们很方便地扩展实现自定义协议。- 在实现协议编解码时经常用到两个重要的抽象类:MessageToByteEncoder 编码器和ByteToMessageDecoder 解码器。Netty 也提供了很多开箱即用的拆包器,推荐最广泛使用的 LengthFieldBasedFrameDecoder,它可以满足实际项目中的大部分场景。如果对 LengthFieldBasedFrameDecoder 的参数不够熟悉,实际直接使用 ByteBuf 反而更加直观,根据个人喜好按需选择。
- ByteBuf:
ByteBuf 是必须要掌握的核心工具类,并且能够理解 ByteBuf 的内部构造。ByteBuf 包含三个指针:读指针 readerIndex、写指针 writeIndex、最大容量 maxCapacity,根据指针的位置又可以将 ByteBuf 内部结构可以分为四个部分:废弃字节、可读字节、可写字节和可扩容字节。

- 协议编码实现:我们接下来要完成的
编码器 MiniRpcEncoder 需要继承 MessageToByteEncoder,并重写 encode() 方法//在服务消费者或者服务提供者调用 writeAndFlush() 将数据写给对方前,都已经封装成 MiniRpcRequest 或者 MiniRpcResponse,所以可以采用 MiniRpcProtocol public class MiniRpcEncoder extends MessageToByteEncoder<MiniRpcProtocol<Object>> {@Overrideprotected void encode(ChannelHandlerContext ctx, MiniRpcProtocol<Object> msg, ByteBuf byteBuf) throws Exception {MsgHeader header = msg.getHeader();byteBuf.writeShort(header.getMagic());byteBuf.writeByte(header.getVersion());byteBuf.writeByte(header.getSerialization());byteBuf.writeByte(header.getMsgType());byteBuf.writeByte(header.getStatus());byteBuf.writeLong(header.getRequestId());RpcSerialization rpcSerialization = SerializationFactory.getRpcSerialization(header.getSerialization());byte[] data = rpcSerialization.serialize(msg.getBody());byteBuf.writeInt(data.length);byteBuf.writeBytes(data);} } - 协议解码实现:
解码器 MiniRpcDecoder 需要继承 ByteToMessageDecoder,并重写 decode() 方法

- 使用 Netty 实现自定义的通信协议。
- MiniRpcRequest 和 MiniRpcResponse 实体类表示的协议体内容都是不确定具体长度的,所以我们一般会选用通用且高效的序列化算法将其转换成二进制数据,这样可以有效减少网络传输的带宽,提升 RPC 框架的整体性能。
- 协议格式也设计实现了,序列化类型选好后编解码也实现了,但是还有个很重要的东西没有实现,就是用于请求处理与响应的RpcRequestHandler。
在 RPC 请求调用的场景下,服务提供者的 MiniRpcDecoder 编码器将二进制数据解码成 MiniRpcProtocol。对象后,再传递给 RpcRequestHandler 执行 RPC 请求调用 RpcRequestHandler 也是一个 Inbound 处理器,它并不需要承担解码工作,所以 RpcRequestHandler 直接继承 SimpleChannelInboundHandler 即可,然后重写 channelRead0() 方法

- ChannelPipeline 和 ChannelHandler 也是我们在平时应用开发的过程中打交道最多的组件,这两个组件为用户提供了 I/O 事件的全部控制权。
- 在分布式系统中,服务消费者和服务提供者都存在多个节点,如果服务提供者出现部分机器节点负载过高,那么可能会导致该节点上接收的请求处理超时,从而导致服务提供者整体可用率下降。所以 RPC 框架需要实现合理的负载均衡算法,那么如何
控制流量能够均匀地分摊到每个服务提供者呢?主要分为以下几个步骤:- 注册中心选型:
- 服务消费者在发起 RPC 调用之前,
需要知道服务提供者有哪些节点是可用的,而且服务提供者节点会存在上线和下线的情况。所以服务消费者需要感知服务提供者的节点列表的动态变化,在 RPC 框架中一般采用注册中心来实现服务的注册和发现。- 目前主流的注册中心有 ZooKeeper、Eureka、Etcd、Consul、Nacos 等,选择一个高性能、高可用的注册中心对 RPC 框架至关重要。
说到高可用自然离不开 CAP 理论,一致性 Consistency、可用性 Availability 和分区容忍性 Partition tolerance 是无法同时满足的,注册中心一般分为 CP 类型注册中心和 AP 类型注册中心。- 【
使用最为广泛的 Zookeeper 就是 CP 类型的注册中心,集群中会有一个节点作为 Leader,如果 Leader 节点挂了,会重新进行 Leader 选举,ZooKeeper 保证了所有节点的强一致性,但是在 Leader 选举的过程中是无法对外提供服务的,牺牲了部分可用性。】 - 【
Eureka 是典型的 AP 类型注册中心,在实现服务发现的场景下有很大的优势,整个集群是不存在 Leader、Flower 概念的,如果其中一个节点挂了,请求会立刻转移到其他节点上。可能会存在的问题是如果不同分区无法进行节点通信,那么可能会造成节点之间的数据是有差异的,所以 AP 类型的注册中心通过牺牲强一致性来保证高可用性。】。对于 RPC 框架而言,即使注册中心出现问题,也不应该影响服务的正常调用,所以AP 类型的注册中心在该场景下相比于 CP 类型的注册中心更有优势。对于成熟的 RPC 框架而言,会提供多种注册中心的选择,接下来我们便设计一个通用的注册中心接口,然后每种注册中心的实现都按该接口规范行扩展。- 到底选择 CP 还是 AP 类型的注册中心呢?没有最好的选择,需要根据实际的业务场景进行技术选型。对于 RPC 框架而言,应当弱依赖于注册中心,即使注册中心出现问题,也不应该影响服务正常使用。
所以建议使用 AP 类型的注册中心,在实现服务发现的场景下相比 CP 类型的注册中心有性能优势,整个集群是不存在 Leader、Flower 概念的,如果其中一个节点挂了,请求会立刻转移到其他节点上,通过牺牲强一致性来保证高可用性。 - 当服务节点下线时,注册中心需要及时通知服务消费者该节点已经下线了,否则可能会造成部分服务调用出现问题。
实现服务优雅下线比较好的方式是采用主动通知 + 心跳检测的方案,心跳检测可以由节点或者注册中心负责,例如注册中心可以向服务节点每 60s 发送一次心跳包,如果 3 次心跳包都没有收到请求结果,可以认为该服务节点已经下线。心跳检测通常也是客户端和服务端之间通知对方存活状态的一种机制
- 到底选择 CP 还是 AP 类型的注册中心呢?没有最好的选择,需要根据实际的业务场景进行技术选型。对于 RPC 框架而言,应当弱依赖于注册中心,即使注册中心出现问题,也不应该影响服务正常使用。
- 【
- 目前主流的注册中心有 ZooKeeper、Eureka、Etcd、Consul、Nacos 等,选择一个高性能、高可用的注册中心对 RPC 框架至关重要。
- 服务消费者在发起 RPC 调用之前,
- 注册中心接口设计:
- 注册中心主要用于存储服务的元数据信息,首先我们需要
将服务元数据信息封装成一个对象,该对象包括服务名称、服务版本、服务地址和服务端口号

- 接下来我们提供一个通用的注册中心接口,该接口主要的操作对象是上面的 服务元数据信息封装成一个包括包括服务名称、服务版本、服务地址和服务端口号的对象ServiceMeta,不应该与其他任何第三方的注册中心工具库有任何联系

假设咱们以 ZooKeeper 注册中心实现为例,逐一实现上面四个接口。Zookeeper 常用的开源客户端工具包有 ZkClient 和 Apache Curator,目前都推荐使用 Apache Curator 客户端。Apache Curator 相比于 ZkClient,不仅提供的功能更加丰富,而且它的抽象层次更高,提供了更加易用的 API 接口以及 Fluent 流式编程风格。在使用 Apache Curator 之前,我们需要在 pom.xml 中引入 Maven 依赖。注意Apache Curator和Zookeeper的兼容性问题。
- 注册中心初始化和销毁:
- 构建 Zookeeeper 的客户端,
使用 Apache Curator 初始化 Zookeeeper 客户端的基于用法大多都与如下代码类似:public class ZookeeperRegistryService implements RegistryService {public static final int BASE_SLEEP_TIME_MS = 1000;public static final int MAX_RETRIES = 3;public static final String ZK_BASE_PATH = "/mini_rpc";private final ServiceDiscovery<ServiceMeta> serviceDiscovery;public ZookeeperRegistryService(String registryAddr) throws Exception {//通过 CuratorFrameworkFactory 采用工厂模式创建 CuratorFramework 实例。构造客户端唯一需你指定的是重试策略CuratorFramework client = CuratorFrameworkFactory.newClient(registryAddr, new ExponentialBackoffRetry(BASE_SLEEP_TIME_MS, MAX_RETRIES));//创建完 CuratorFramework 实例之后需要调用 start() 进行启动。client.start();JsonInstanceSerializer<ServiceMeta> serializer = new JsonInstanceSerializer<>(ServiceMeta.class);//然后我们需要创建 ServiceDiscovery 对象,由 ServiceDiscovery 完成服务的注册和发现。在系统退出的时候需要将初始化的实例进行关闭this.serviceDiscovery = ServiceDiscoveryBuilder.builder(ServiceMeta.class).client(client).serializer(serializer).basePath(ZK_BASE_PATH).build();this.serviceDiscovery.start();} }//destroy() 方法实现非常简单 @Override public void destroy() throws IOException {serviceDiscovery.close(); } - 服务注册实现:初始化得到 ServiceDiscovery 实例之后,我们就可以将服务元数据信息 ServiceMeta 发布到注册中心,
register() 方法的代码实现:@Override public void register(ServiceMeta serviceMeta) throws Exception {//ServiceInstance 对象代表一个服务实例,它包含名称 name、唯一标识 id、地址 address、端口 port 以及用户自定义的可选属性 payloadServiceInstance<ServiceMeta> serviceInstance = ServiceInstance.<ServiceMeta>builder().name(RpcServiceHelper.buildServiceKey(serviceMeta.getServiceName(), serviceMeta.getServiceVersion())).address(serviceMeta.getServiceAddr()).port(serviceMeta.getServicePort()).payload(serviceMeta).build();serviceDiscovery.registerService(serviceInstance); }
- 一般来说,我们会将相同版本的 RPC 服务归类在一起,所以可以将 ServiceInstance 的名称 name 根据服务名称和服务版本进行赋值

- RpcProvider 在启动过程中是如何根据 @RpcService 注解识别需要发布的服务,现在我们
可以使用 RegistryService 接口的 register() 方法将识别出的服务进行发布了,完善后的 RpcProvider#postProcessAfterInitialization() 方法实现如下。@Override public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {RpcService rpcService = bean.getClass().getAnnotation(RpcService.class);if (rpcService != null) {String serviceName = rpcService.serviceInterface().getName();String serviceVersion = rpcService.serviceVersion();try {ServiceMeta serviceMeta = new ServiceMeta();serviceMeta.setServiceAddr(serverAddress);serviceMeta.setServicePort(serverPort);serviceMeta.setServiceName(serviceName);serviceMeta.setServiceVersion(serviceVersion);serviceRegistry.register(serviceMeta); // 注册服务rpcServiceMap.put(RpcServiceHelper.buildServiceKey(serviceMeta.getServiceName(), serviceMeta.getServiceVersion()), bean);} catch (Exception e) {log.error("failed to register service {}#{}", serviceName, serviceVersion, e);}}return bean; } - 至此,
服务提供者在启动后就可以将 @RpcService 注解修饰的服务发布到注册中心了。 - 接下来我们就得把注册中心的服务发现 discovery() 方法补充完整了。服务发现的实现思路比较简单,
首先找出被调用服务所有的节点列表,然后通过 ZKConsistentHashLoadBalancer 提供的一致性 Hash 算法找出相应的服务节点@Override public ServiceMeta discovery(String serviceName, int invokerHashCode) throws Exception {Collection<ServiceInstance<ServiceMeta>> serviceInstances = serviceDiscovery.queryForInstances(serviceName);ServiceInstance<ServiceMeta> instance = new ZKConsistentHashLoadBalancer().select((List<ServiceInstance<ServiceMeta>>) serviceInstances, invokerHashCode);if (instance != null) {return instance.getPayload();}return null; }
- 注册中心主要用于存储服务的元数据信息,首先我们需要
- 服务消费者应当如何通过合理的负载均衡算法得到合适的服务节点呢?
- 负载均衡算法基础:
服务消费者在发起 RPC 调用之前,需要感知有多少服务端节点可用,然后从中选取一个进行调用。有几种常用的负载均衡策略:Round-Robin 轮询、Weighted Round-Robin 权重轮询、Least Connections 最少连接数、Consistent Hash 一致性 Hash 等。以基于一致性 Hash 的负载均衡算法为例。一致性 Hash 算法可以保证每个服务节点分摊的流量尽可能均匀,而且能够把服务节点扩缩容带来的影响降到最低。- 一致性 Hash 算法的设计思路:
- 在服务端节点扩缩容时,一致性 Hash 算法会尽可能保证客户端发起的 RPC 调用还是固定分配到相同的服务节点上。一致性 Hash 算法是采用哈希环来实现的,通过 Hash 函数将对象和服务器节点放置在哈希环上,
一般来说服务器可以选择 IP + Port 进行 Hash

- 假设现在服务节点扩容了一台 N4,经过 Hash 函数计算将其放入到哈希环中,哈希环变化如下图所示:

- 利用虚拟节点保持节点分布均匀:

- 在服务端节点扩缩容时,一致性 Hash 算法会尽可能保证客户端发起的 RPC 调用还是固定分配到相同的服务节点上。一致性 Hash 算法是采用哈希环来实现的,通过 Hash 函数将对象和服务器节点放置在哈希环上,
- 负载均衡算法基础:
- 负载均衡算法实现:
- 与注册中心类似,
我们也首先定义一个通用的负载均衡接口,Round-Robin 轮询、一致性 Hash 等负载均衡算法都需要实现该接口- 接口的定义:

- select() 方法的传入参数是一批服务节点以及客户端对象的 hashCode,针对 Zookeeper 的场景,我们可以实现一个
比较通用的一致性 Hash 算法。public class ZKConsistentHashLoadBalancer implements ServiceLoadBalancer<ServiceInstance<ServiceMeta>> {private final static int VIRTUAL_NODE_SIZE = 10;private final static String VIRTUAL_NODE_SPLIT = "#";@Overridepublic ServiceInstance<ServiceMeta> select(List<ServiceInstance<ServiceMeta>> servers, int hashCode) {//JDK 提供了 TreeMap 数据结构,可以非常方便地构造哈希环。通过计算出每个服务实例 ServiceInstance 的地址和端口对应的 hashCode,然后直接放入 TreeMap 中,TreeMap 会对 hashCode 默认从小到大进行排序。TreeMap<Integer, ServiceInstance<ServiceMeta>> ring = makeConsistentHashRing(servers); // 构造哈希环return allocateNode(ring, hashCode); // 根据 hashCode 分配节点}private ServiceInstance<ServiceMeta> allocateNode(TreeMap<Integer, ServiceInstance<ServiceMeta>> ring, int hashCode) {//在为客户端对象分配节点时,通过 TreeMap 的 ceilingEntry() 方法找出大于或等于客户端 hashCode 的第一个节点,即为客户端对应要调用的服务节点。如果没有找到大于或等于客户端 hashCode 的节点,那么直接去 TreeMap 中的第一个节点即可。Map.Entry<Integer, ServiceInstance<ServiceMeta>> entry = ring.ceilingEntry(hashCode); // 顺时针找到第一个节点if (entry == null) {entry = ring.firstEntry(); // 如果没有大于 hashCode 的节点,直接取第一个}return entry.getValue();}private TreeMap<Integer, ServiceInstance<ServiceMeta>> makeConsistentHashRing(List<ServiceInstance<ServiceMeta>> servers) {TreeMap<Integer, ServiceInstance<ServiceMeta>> ring = new TreeMap<>();for (ServiceInstance<ServiceMeta> instance : servers) {for (int i = 0; i < VIRTUAL_NODE_SIZE; i++) {ring.put((buildServiceInstanceKey(instance) + VIRTUAL_NODE_SPLIT + i).hashCode(), instance);}}return ring;}private String buildServiceInstanceKey(ServiceInstance<ServiceMeta> instance) {ServiceMeta payload = instance.getPayload();return String.join(":", payload.getServiceAddr(), String.valueOf(payload.getServicePort()));} }
- 接口的定义:
- 与注册中心类似,
- 注册中心选型:
- 还有其实就是动态代理的设计,但是这个确实公共的共用的,不管是哪个源码(虽然负载均衡、注册中心也算是,但是不同的厂家确实或许有不同的实现,所以…得在重复一遍)。所以偷个懒,动态代理相关原理概念在上面ctrl+F搜搜动态代理

- 服务消费者动态代理实现:
- 我们在注册中心那里@RpcReference 注解的实现时是通过一个自定义的 RpcReferenceBean 完成了所有执行方法的拦截,RpcReferenceBean 中 init() 方法是当时留下的 TODO 内容,这里就是代理对象的创建入口
- 代理对象的创建
public class RpcReferenceBean implements FactoryBean<Object> {// 省略其他代码public void init() throws Exception {RegistryService registryService = RegistryFactory.getInstance(this.registryAddr, RegistryType.valueOf(this.registryType));this.object = Proxy.newProxyInstance(interfaceClass.getClassLoader(),new Class<?>[]{interfaceClass},//RpcInvokerProxy 处理器是实现动态代理逻辑的核心所在,其中包含 RPC 调用时底层网络通信、服务发现、负载均衡等具体细节new RpcInvokerProxy(serviceVersion, timeout, registryService));}// 省略其他代码 } - 实现 RpcInvokerProxy 处理器
//RpcInvokerProxy 处理器必须要实现 InvocationHandler 接口的 invoke() 方法,被代理的 RPC 接口在执行方法调用时,都会转发到 invoke() 方法上。 public class RpcInvokerProxy implements InvocationHandler {private final String serviceVersion;private final long timeout;private final RegistryService registryService;public RpcInvokerProxy(String serviceVersion, long timeout, RegistryService registryService) {this.serviceVersion = serviceVersion;this.timeout = timeout;this.registryService = registryService;}//RpcInvokerProxy 处理器必须要实现 InvocationHandler 接口的 invoke() 方法,被代理的 RPC 接口在执行方法调用时,都会转发到 invoke() 方法上。invoke() 方法的核心流程主要分为三步:构造 RPC 协议对象、发起 RPC 远程调用、等待 RPC 调用执行结果。//发送 RPC 远程调用后如何等待调用结果返回呢?之前我们是使用 Netty 提供的 Promise 工具来实现 RPC 请求的同步等待,Promise 模式本质是一种异步编程模型,我们可以先拿到一个查看任务执行结果的凭证,不必等待任务执行完毕,当我们需要获取任务执行结果时,再使用凭证提供的相关接口进行获取。@Overridepublic Object invoke(Object proxy, Method method, Object[] args) throws Throwable {// 构造 RPC 协议对象。就是只要根据用户配置的接口参数对 MiniRpcProtocol 类的属性依次赋值即可MiniRpcProtocol<MiniRpcRequest> protocol = new MiniRpcProtocol<>();MsgHeader header = new MsgHeader();long requestId = MiniRpcRequestHolder.REQUEST_ID_GEN.incrementAndGet();header.setMagic(ProtocolConstants.MAGIC);header.setVersion(ProtocolConstants.VERSION);header.setRequestId(requestId);header.setSerialization((byte) SerializationTypeEnum.HESSIAN.getType());header.setMsgType((byte) MsgType.REQUEST.getType());header.setStatus((byte) 0x1);protocol.setHeader(header);MiniRpcRequest request = new MiniRpcRequest();request.setServiceVersion(this.serviceVersion);request.setClassName(method.getDeclaringClass().getName());request.setMethodName(method.getName());request.setParameterTypes(method.getParameterTypes());request.setParams(args);protocol.setBody(request);RpcConsumer rpcConsumer = new RpcConsumer();MiniRpcFuture<MiniRpcResponse> future = new MiniRpcFuture<>(new DefaultPromise<>(new DefaultEventLoop()), timeout);MiniRpcRequestHolder.REQUEST_MAP.put(requestId, future);// 发起 RPC 远程调用。构建完MiniRpcProtocol 协议对象后,就可以对远端服务节点发起 RPC 调用了,所以 sendRequest() 方法是我们需要重点实现的内容。rpcConsumer.sendRequest(protocol, this.registryService);// 等待 RPC 调用执行结果return future.getPromise().get(future.getTimeout(), TimeUnit.MILLISECONDS).getData();} }/****************************************************************/ // public void sendRequest(MiniRpcProtocol<MiniRpcRequest> protocol, RegistryService registryService) throws Exception {MiniRpcRequest request = protocol.getBody();Object[] params = request.getParams();String serviceKey = RpcServiceHelper.buildServiceKey(request.getClassName(), request.getServiceVersion());int invokerHashCode = params.length > 0 ? params[0].hashCode() : serviceKey.hashCode();//发起 RPC 调用之前,我们需要找到最合适的服务节点,直接调用注册中心服务 RegistryService 的 discovery() 方法即可,默认是采用一致性 Hash 算法实现的服务发现。这里有一个小技巧,为了尽可能使所有服务节点收到的请求流量更加均匀,需要为 discovery() 提供一个 invokerHashCode,一般可以采用 RPC 服务接口参数列表中第一个参数的 hashCode 作为参考依据。ServiceMeta serviceMetadata = registryService.discovery(serviceKey, invokerHashCode);if (serviceMetadata != null) {//找到服务节点地址后,接下来通过 Netty 建立 TCP 连接,然后调用 writeAndFlush() 方法将数据发送到远端服务节点。ChannelFuture future = bootstrap.connect(serviceMetadata.getServiceAddr(), serviceMetadata.getServicePort()).sync();future.addListener((ChannelFutureListener) arg0 -> {if (future.isSuccess()) {log.info("connect rpc server {} on port {} success.", serviceMetadata.getServiceAddr(), serviceMetadata.getServicePort());} else {log.error("connect rpc server {} on port {} failed.", serviceMetadata.getServiceAddr(), serviceMetadata.getServicePort());future.cause().printStackTrace();eventLoopGroup.shutdownGracefully();}});//然后调用 writeAndFlush() 方法将数据发送到远端服务节点。future.channel().writeAndFlush(protocol);} }- Promise 接口和 ChannelFuture 一样,也继承了 Netty 的 Future 接口,然后加了一些 Promise 的内容:


- ChannelPromise 接口同时继承了 ChannelFuture 和 Promise。

- Promise 接口和 ChannelFuture 一样,也继承了 Netty 的 Future 接口,然后加了一些 Promise 的内容:
- 我们在注册中心那里@RpcReference 注解的实现时是通过一个自定义的 RpcReferenceBean 完成了所有执行方法的拦截,RpcReferenceBean 中 init() 方法是当时留下的 TODO 内容,这里就是代理对象的创建入口
- 服务提供者反射调用实现
- RPC 请求数据经过 MiniRpcDecoder 解码成 MiniRpcProtocol 对象后,再交由 服务提供者的 Handler 处理器RpcRequestHandler 执行 RPC 请求调用。
- RpcRequestHandler 中 channelRead0() 方法的处理逻辑:

- handle() 方法的实现:
private Object handle(MiniRpcRequest request) throws Throwable {String serviceKey = RpcServiceHelper.buildServiceKey(request.getClassName(), request.getServiceVersion());//rpcServiceMap 中存放着服务提供者所有对外发布的服务接口,我们可以通过服务名和服务版本找到对应的服务接口。Object serviceBean = rpcServiceMap.get(serviceKey);if (serviceBean == null) {throw new RuntimeException(String.format("service not exist: %s:%s", request.getClassName(), request.getMethodName()));}//通过服务接口、方法名、方法参数列表、参数类型列表,我们一般可以使用反射的方式执行方法调用。Class<?> serviceClass = serviceBean.getClass();String methodName = request.getMethodName();Class<?>[] parameterTypes = request.getParameterTypes();Object[] parameters = request.getParams();//为了加速服务接口调用的性能,我们采用 Cglib 提供的 FastClass 机制直接调用方法,Cglib 中 MethodProxy 对象就是采用了 FastClass 机制,它可以和 Method 对象完成同样的事情,但是相比于反射性能更高。FastClass 机制并没有采用反射的方式调用被代理的方法,而是运行时动态生成一个新的 FastClass 子类,向子类中写入直接调用目标方法的逻辑。同时该子类会为代理类分配一个 int 类型的 index 索引,FastClass 即可通过 index 索引定位到需要调用的方法。FastClass fastClass = FastClass.create(serviceClass);int methodIndex = fastClass.getIndex(methodName, parameterTypes);return fastClass.invoke(methodIndex, serviceBean, parameters); }
- RpcRequestHandler 中 channelRead0() 方法的处理逻辑:
- RPC 请求数据经过 MiniRpcDecoder 解码成 MiniRpcProtocol 对象后,再交由 服务提供者的 Handler 处理器RpcRequestHandler 执行 RPC 请求调用。
- 服务消费者动态代理实现:
- 至此,整个 RPC 框架的原型我们已经实现完毕。
你可以在本地先启动 Zookeeper 服务器,然后启动 rpc-provider、rpc-consumer 两个模块,通过 HTTP 请求发起测试,如下所示:

- 虽然 RPC 框架原型已经可以运行起来了,但是离生产级使用还差得很远,例如
性能、高可用等。所以咱们得看看RPC 框架的性能优化的一些东西。RPC 框架的性能取决于很多因素,我们通常会关注几个方面:I/O 模型、网络参数、序列化方法、内存管理等。- I/O 模型:
- Netty 提供了高效的主从 Reactor 多线程模型,
主 Reactor 线程负责新的网络连接 Channel 创建,然后把 Channel 注册到从 Reactor,由从 Reactor 线程负责处理后续的 I/O 操作。主从 Reactor 多线程模型很好地解决了高并发场景下单个 NIO 线程无法承载海量客户端连接建立以及 I/O 操作的性能瓶颈。 - 通常我们使用如下的方式配置主从 Reactor 线程模型:
EventLoopGroup bossGroup = new NioEventLoopGroup(); //如果你没有指定 workerGroup 线程组初始化的线程数,那么 Netty 会默认创建 2 倍 CPU 核数作的线程,但这并不一定是一个最佳数量,可以根据实际的压测情况进行适当调整。一般来说,只要服务性能能够满足要求,workerGroup 初始化的线程数应该越少越好,这样可以有效地减少线程上下文切换。 EventLoopGroup workerGroup = new NioEventLoopGroup();ServerBootstrap b = new ServerBootstrap();b.group(bossGroup, workerGroup)- Netty 提供了一个参数 ioRatio,可以调整 I/O 事件处理和任务处理的时间比例,默认值为 50。对于高并发的 RPC 调用场景,ioRatio 可以适当调大,控制 Netty 有更多的时间比例在执行 I/O 任务。

- Netty 提供了一个参数 ioRatio,可以调整 I/O 事件处理和任务处理的时间比例,默认值为 50。对于高并发的 RPC 调用场景,ioRatio 可以适当调大,控制 Netty 有更多的时间比例在执行 I/O 任务。
- Netty 提供了高效的主从 Reactor 多线程模型,
- Netty 网络参数配置:Netty 提供了 ChannelOption 以便于我们优化 TCP 参数配置,为了提高网络通信的吞吐量,一些可选的网络参数我们有必要掌握。
- TCP_NODELAY,是否开启 Nagle 算法。Nagle 算法通过缓存的方式将网络数据包累积到一定量才会发送,从而避免频繁发送小的数据包。Nagle 算法 在海量流量的场景下非常有效,但是会造成一定的数据延迟。如果对数据传输延迟敏感,那么应该禁用该参数。
- SO_BACKLOG,已完成三次握手的请求队列最大长度。同一时刻服务端可能会处理多个连接,在高并发海量连接的场景下,该参数应适当调大。但是 SO_BACKLOG 也不能太大,否则无法防止 SYN-Flood 攻击。
- SO_SNDBUF/SO_RCVBUF,TCP 发送缓冲区和接收缓冲区的大小。为了能够达到最大的网络吞吐量,SO_SNDBUF 不应当小于带宽和时延的乘积。SO_RCVBUF 一直会保存数据到应用进程读取为止,如果 SO_RCVBUF 满了,接收端会通知对端 TCP 协议中的窗口关闭,保证 SO_RCVBUF 不会溢出。
- SO_KEEPALIVE,连接保活。启用了 TCP SO_KEEPALIVE 属性,TCP 会主动探测连接状态,Linux 默认设置了 2 小时的心跳频率。TCP KEEPALIVE 机制主要用于回收死亡时间较长的连接,不适合实时性高的场景。
- 序列化方法:
- 在网络通信过程中,必然涉及序列化和反序列化操作,即将对象编码成字节,再把字节解码成对象的过程。序列化和反序列化属于高频且较笨重的操作,属于 RPC 框架中一个重要的性能优化点。
在选择序列化方式时需要综合考虑各方面因素,如高性能、跨语言、可维护性、可扩展性等。 - 比较常用的序列化算法有 Kryo、Hessian、Protobuf 等,这些第三方序列化算法都比 Java 原生的序列化操作都更加高效。
- Kryo 序列化后占用字节数较少,网络传输效率更高,但是不支持跨语言。
Hessian 是目前业界使用较为广泛的序列化协议,它的兼容性好,支持跨语言,API 方便使用,序列化后的字节数适中。- Protobuf 是 gRPC 框架默认使用的序列化协议,属于 Google 出品的序列化框架。
Protobuf 支持跨语言、跨平台,具有较好的扩展性,并且性能优于 Hessian。但是 Protobuf 使用时需要编写特定的 prpto 文件,然后进行静态编译成不同语言的程序后拷贝到项目工程中,一定程序增加了开发者的复杂度。综合各方面因素以及实际口碑,个人比较推荐使用 Hessian 和 Protobuf 序列化协议。
- 关于 RPC 框架序列化进一步的性能优化我们可以采用以下方法:
- 减少不必要的字段以及精简字段的长度,从而降低序列化后占用的字节数。
- 提供不同的序列化策略。可以将不同的字段拆分至不同的线程里进行反序列化,例如
Netty I/O 线程可以只负责 className 和 消息头 Header 的反序列化,然后根据 Header 分发到不同的业务线程池中,由业务线程负责反序列化消息内容 Content,这样可以有效地降低 I/O 线程的压力。
- 在网络通信过程中,必然涉及序列化和反序列化操作,即将对象编码成字节,再把字节解码成对象的过程。序列化和反序列化属于高频且较笨重的操作,属于 RPC 框架中一个重要的性能优化点。
- 内存管理:
- Netty 会使用堆外内存 DirectBuffer 进行 Socket 读写,相比使用堆内存减少了一次内存拷贝。然而堆外内存的创建和销毁成本更高,所以通常会使用内存池来提高性能。
对于数据量较小的一些场景,可以考虑使用 HeapBuffer,由 JVM 负责内存的分配和回收可能效率更高。 - 此外,Netty 还提供了一些技巧来避免内存拷贝:
- CompositeByteBuf 是 Netty 中实现零拷贝机制非常重要的一个数据结构,它可以组合多个 Buffer 对象合并成一个逻辑上的对象,避免通过传统内存拷贝的方式将几个 Buffer 合并成一个大的 Buffer,我们经常使用 CompositeByteBuf 拼接协议数据的 头部信息 Header 和消息体数据 Body。
- 在失败重试的场景,我们想保留 ByteBuf 继续使用,你可以使用 copy() 方法拷贝原始 ByteBuf 的所有信息。但是深拷贝非常浪费性能的,你可以使用浅拷贝操作 oldBuffer.duplicate().retain() 复制出独立的读写索引,底层分配的内存、引用计数都是与原始 ByteBuf 共享的,其中 retain() 又会将 ByteBuffer 的引用计数加 1,从而避免了 ByteBuffer 被释放
- Netty 会使用堆外内存 DirectBuffer 进行 Socket 读写,相比使用堆内存减少了一次内存拷贝。然而堆外内存的创建和销毁成本更高,所以通常会使用内存池来提高性能。
- 提高 RPC 框架的可用性:高可用是分布式系统架构设计中一个重要的因素
- 连接空闲检测+心跳检测:
连接空闲检测是指每隔一段时间检测连接是否有数据读写,如果服务端一直能收到客户端连接发送过来的数据,说明连接处于活跃状态,对于假死的连接是收不到对端发送的数据的。如果一段时间内没收到客户端发送的数据,并不能说明连接一定处于假死状态,有可能客户端就是长时间没有数据需要发送,但是建立的连接还是健康状态,所以服务端还需要通过心跳检测的机制判断客户端是否存活。客户端可以定时向服务端发送一次心跳包,如果有 N 次没收到心跳数据,可以判断当前客户端已经下线或处于不健康状态。由此可见,连接空闲检测和心跳检测是应对连接假死的一种有效手段,通常空闲检测时间间隔要大于 2 个周期的心跳检测时间间隔,主要是为了排除网络抖动的造成心跳包未能成功收到。- Netty 中提供了开箱即用的 IdleStateHandler 实现连接空闲检测,如果我们想把一定时间间隔内没有读到数据的客户端连接进行关闭,可以采取如下的实现方式:

- IdleStateHandler 实现心跳检测本质是向任务队列中添加定时任务,判断 channelRead() 或 write() 方法是否发生空闲超时,IdleStateHandler 的构造函数支持设置读空闲时间、写空闲时间、读写空闲时间。super(60, 0, 0, TimeUnit.SECONDS) 表示我们只关注读空闲时间,如果服务端 60s 没未读到数据,就会回调 channelIdle() 方法,此时我们进行连接关闭,避免资源浪费。

- 线程池隔离:
- 如果你的 RPC 服务是公司的基础服务,可能会有非常多的调用方,例如用户接口、订单接口等等。
在我们实现的 RPC 框架中,业务线程池是共用的,所有的 RPC 请求都会由该线程池处理。如果有一天其中一个服务调用方的流量激增,导致线程池资源耗尽,那么其他服务调用方都会受到严重的影响。我们可以尝试将不同的服务调用方划分到不同等级的业务线程池中,通过分组的方式对服务调用方的流量进行隔离,从而避免其中一个调用方出现异常状态导致其他所有调用方都不可用,提高服务整体性能和可用率。- 流量隔离技术是服务治理中非常重要的一个措施,在很多大规模流量的业务系统中都有所应用,例如秒杀系统,可以根据特殊的请求头识别出是否是秒杀请求,从而跟日常请求的流量隔离开来。 那么对于 RPC 框架而言,如何对服务调用方进行合理的分组呢?一般来说,
根据应用的重要等级作为分组依据是一个很好的衡量标准,一定要保障核心业务不受影响,例如下单、支付等接口都需要有自己独立的业务线程池,避免受到其他服务调用方的影响。
- 流量隔离技术是服务治理中非常重要的一个措施,在很多大规模流量的业务系统中都有所应用,例如秒杀系统,可以根据特殊的请求头识别出是否是秒杀请求,从而跟日常请求的流量隔离开来。 那么对于 RPC 框架而言,如何对服务调用方进行合理的分组呢?一般来说,
- 如果你的 RPC 服务是公司的基础服务,可能会有非常多的调用方,例如用户接口、订单接口等等。
- 重试机制:
- 重试机制在平时的项目开发中一定经常用到。
为了保障服务的稳定性和容错性,重试机制是一般可以帮助我们解决不少问题,例如网络抖动、请求超时等场景都需要重试机制。 RPC 框架的重试机制有几点最佳实践和注意事项:- 被调用的服务接口的业务逻辑需要保证幂等才可以考虑使用重试机制,例如数据插入、更新操作,
无论重复请求多少次都不会产生任何影响 - 重试机制虽然可以提升服务可用性,但是重试可能会导致服务提供方流量倍增,极端情况下甚至造成雪崩。
服务调用方最好设置合理的服务调用超时时间以及失败后的重试次数,需要综合考虑接口依赖服务的平均耗时、TP99 响应时间、服务重要等级等因素作为参考依据。为了防止重试引发的流量风暴,服务提供方必须考虑熔断、限流、降级等保护措施。 - RPC 框架的重试机制一般会采取指数退避的策略,
两次重试之间指数级增加间隔时间,例如 1s、2s、4s、8s,以此类推,同时必须限制最大延迟时间。指数退避会存在负载峰值的问题,例如服务提供方可能发生 FullGC 导致同一时间产生超时重试的请求增多。为了解决负载峰值问题,可以在重试间隔中增加随机值,将请求分摊在不同的时间点中 - 在负载均衡选择服务节点时,应该剔除上次重试失败的节点,进一步提高重试的成功率
- 被调用的服务接口的业务逻辑需要保证幂等才可以考虑使用重试机制,例如数据插入、更新操作,
- 重试机制在平时的项目开发中一定经常用到。
- 集群容错:集群容错看这里
- 连接空闲检测+心跳检测:
- I/O 模型:
巨人的肩膀:
微观技术
Spring源码深度解析
bizhan上各位老师
程序员田螺~手写一个RPC框架
RPC实战与核心原理
《Netty 核心原理剖析与 RPC 实践》
极客时间
本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!
