Netty的线程模型编解码粘包拆包心跳检测
BIO、基于Epoll的NIO和Reactor线程模型
几个基本概念
IO就是Input Output,我们从网络接受数据,放到内存,然后处理后输出到其他介质,这就是一个IO的过程。要理解Java的IO模型,我们需要先理解一下几个概念。
- 同步:客户端发起调用,服务端在处理完之前,是不会通知客户端的,只有处理完成后,才会通知客户端,给客户端想要的结果
- 异步:客户端发起调用,服务端先告诉客户端我收到了你的请求,你先忙其他事情吧,我处理好了告诉你
- 阻塞:客户端发起调用,一直等待服务端的结果,被阻塞住,无法做其他的事情
- 非阻塞:客户端发起调用,但是客户端在等服务端的结果的时候,可以做其他事情,不是阻塞的
因此说,同步和异步主要是针对服务端的通知方式而言的,而阻塞非阻塞是针对客户端而言的。然后我们结合一个例子再来深入理解下这几个概念。以前我等班车的时候,只能干等着,班车不会通知我,我也不敢走开,这就叫同步阻塞。后来,车站附近开了彩票店,我会在等车的时候去彩票店刮彩票,但是怕错过车,所以隔几分钟就跑出来看一下,这叫同步非阻塞。后来有了车来了的App,我可以设置到站提醒,他会在车到站后提醒我,但是我还是傻傻的在车站等着,这就叫异步阻塞。那如果我开了到站提醒,然后去旁边的彩票店刮彩票,等车来了App提醒我车到了,我在跑去车站,就叫异步非阻塞。Java中给我们提供了BIO(同步阻塞IO)、NIO(同步非阻塞IO)、AIO(异步非阻塞IO)。
BIO(Blocking IO)
BIO是一种同步阻塞的IO模型,数据的读取和写入会阻塞在一个线程上。BIO程序可以有两种基本模型
- 服务端只有一个线程处理,这样的话,会导致大量的请求被阻塞,类似客人去饭店吃饭,这种相当于去很多客人,但是只有一个服务员,招待完一个,在招待下一个。这样的弊端是大量的请求被阻塞,服务器的并发度上不去,也就是大量的客人站在门口排队,你想想会不会很生气。
- 服务端为每一个socket创建一个线程去处理,接着上面的例子,类似来一个客人,安排一个服务员,期间一个服务员只负责招待一个客人。这样的话,有两个弊端:
- 每次有一个客户端连接都创建一个线程去处理,并且是阻塞的,那么如果有的客户端只是连接而不发送数据,则是一种资源浪费。比如有些客人只是来看看菜单,一看半天,我们还安排一个服务员一直伺候他,这合理吗?
- 如果基于上述情况,并且同时有大量客户端连接,则会造成服务器创建大量线程,负载过大。比如一时间来100个客人,你也安排100个服务员,酒楼还好,如果你是个早餐店,不得撑爆了。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-LbXeoL0c-1616406401774)(https://intranetproxy.alipay.com/skylark/lark/0/2021/png/314404/1615772958147-3510843e-4a20-48eb-965f-c7086cd112a3.png)]
代码示例:
public static void main(String[] args) throws IOException {ServerSocket serverSocket = new ServerSocket(8080);while (true) {log.info("wait client connect...");// 这个方法会阻塞直到有客户端连接final Socket clientSocket = serverSocket.accept();log.info("client connected...");// 每次有一个socket连接,都分配一个线程去负责处理,这个处理也是阻塞的new Thread(() -> {try {handler(clientSocket);} catch (IOException e) {e.printStackTrace();}}).start();}
}private static void handler(Socket clientSocket) throws IOException {byte[] bytes = new byte[1024];log.info("wait read...");// 这一行方法会阻塞,直到客户端收到数据int read = clientSocket.getInputStream().read(bytes);log.info("read form client success...");if (read != -1) {log.info("accept data form client:" + new String(bytes, 0, read));}clientSocket.getOutputStream().write("HelloClient".getBytes());clientSocket.getOutputStream().flush();
}
NIO模型(Non Blocking IO)
基础版本
上述BIO模型最大的问题就是阻塞,多线程资源浪费,因此为了解决上面问题,我们先实现一个基于NIO的简单版本
static List<SocketChannel> channelList = new ArrayList<>();public static void main(String[] args) throws IOException {ServerSocketChannel serverSocket = ServerSocketChannel.open();serverSocket.socket().bind(new InetSocketAddress(8080));serverSocket.configureBlocking(false);log.info("服务启动成功");while (true) {SocketChannel socketChannel = serverSocket.accept();if (null != socketChannel) {log.info("连接成功");// 设置非阻塞,将每一个socketChannel先放到一个全局的listsocketChannel.configureBlocking(false);channelList.add(socketChannel);}// 遍历连接进行数据读取Iterator<SocketChannel> iterator = channelList.iterator();while (iterator.hasNext()) {SocketChannel channel = iterator.next();ByteBuffer byteBuffer = ByteBuffer.allocate(128);int read = channel.read(byteBuffer);if (read > 0) {log.info("接收到消息:" + new String(byteBuffer.array()));} else if (read == -1) { // 如果客户端断开,把socket从集合中去掉iterator.remove();log.info("客户端断开连接");}}}
}
上述代码中,如果有客户端连接我们不会阻塞,而是先放到一个全局的List中,然后不断的循环遍历整个List,如果发现这个List有数据读写,我们就接受数据进行处理。这个如果以饭店的例子作比较,就相当于现在如果来了可客人,您先坐着看菜单,喝茶水,我只有一个服务员,过一会我就一个一个来问,客官您要不要点菜啊。这样优化后虽然只用一个线程就解决了多个线程阻塞,导致资源浪费的问题。但是依然有他的缺点:
- 如果我们有1000个连接在channelList中,那么每次的迭代都需要遍历这1000个连接,但是也许可能这1000个连接中只有一个链接有数据发送到Server,那么这个遍历的过程也是很耗费性能的。
多路复用器
那么结合上面的例子,如果你是老板,那么聪明的你,准备如何干啊,那肯定是不能让店小二一个一个问啊,那么我们可以让有需求的顾客看好了在通知店小二,或者如果那个客人看好了菜单,就坐到点菜区,然后小二每次只要去点菜区域一个一个问要点什么菜就OK。我们知道NIO通过Selector多路复用器为我们实现了一种同步非阻塞的IO模型很好的解决了上面的问题,那么我们一起看看是如何解决的。想学习NIO,要知道NIO的三大核心组件
- Buffer(缓冲区)
Buffer是一个抽象类,他的子类有ByteBuffer、CharBuffer、LongBuffer等。他的底层是一个数组,可以通过allocate(int capacity)方法创建一个指定容量的Buffer。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-yKAiOPOG-1616406401779)(https://intranetproxy.alipay.com/skylark/lark/0/2021/png/314404/1615773012925-719bf2cb-59ef-4402-b150-a7d7dd2a462e.png)]
- Channel(通道)
Channel表示IO源和目标打开的链接,类似于传统意义的流,Channel本身不能读写数据,每一个Channel会和Buffer对应起来。常见的Channel有FileChannel(读取、写入、映射和操作文件的通道)、SocketChannel(读写网络中的数据)、ServerSocketChannel(监听连接,对每一个连接都会创建一个SocketChannel)
- Selector(多路复用器)
Selector是NIO的核心,可以通过Selector.open()方法传建一个Selector。然后还用到了到了Selector的相关三个重要Api:open()、serverSocket.register(selector, SelectionKey.OP_ACCEPT);和selector.select();。所以我们要理解Selector的原理,必须要知道这三个api是干啥的。这里给出了一段基本代码以及总结了一张图,供参考。
代码示例:
public static void main(String[] args) throws IOException {ServerSocketChannel serverSocket = ServerSocketChannel.open();serverSocket.socket().bind(new InetSocketAddress(9000));serverSocket.configureBlocking(false);// 1. 创建一个SelectorSelector selector = Selector.open();// 2. 将socket注册到Selector上SelectionKey selectionKey = serverSocket.register(selector, SelectionKey.OP_ACCEPT);log.info("服务启动成功");while (true) {// 3. 阻塞方法,如果我们注册的事件有发生,就会结束阻塞,往下走,就像上面我们注册了接受客户端连接的事件,那么这种事件发生后就会结束阻塞往下走selector.select();// 获取所有的SelectionKey,每一个SelectionKey和每一个socket是绑定的,通过他可以获取到对应的socket// 这里获取的只是我们注册过,关心的就绪事件列表,解决了上面说的需要循环遍历所有socket的弊端Set<SelectionKey> selectionKeys = selector.selectedKeys();Iterator<SelectionKey> iterator = selectionKeys.iterator();while (iterator.hasNext()) {SelectionKey key = iterator.next();if (key.isAcceptable()) {// 通过SelectionKey拿到这个socket的事件类型,如果是accept,那么表示客户端连接成功,我们给对应的socket注册读事件ServerSocketChannel server = (ServerSocketChannel)key.channel();SocketChannel socketChannel = server.accept();socketChannel.configureBlocking(false);socketChannel.register(selector, SelectionKey.OP_READ);log.info("客户端连接成功");} else if (key.isReadable()) {SocketChannel socketChannel = (SocketChannel)key.channel();ByteBuffer byteBuffer = ByteBuffer.allocate(128);int read = socketChannel.read(byteBuffer);if (read > 0) {log.info("接收到消息:" + new String(byteBuffer.array()));} else if (read == -1) { // 如果客户端断开连接,关闭Socketlog.info("客户端断开连接");socketChannel.close();}}iterator.remove();}}
}
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-yQlr0CJq-1616406401787)(https://intranetproxy.alipay.com/skylark/lark/0/2021/png/314404/1615773034873-5ced6f24-fdee-4d05-a4f1-d2f738892b0e.png)]
结合图以及上述代码,可以看出开发NIO程序主要可以分为以下三步:
- 通过
Selector.open();方法创建一个Selector多路复用器。其实底层是调用了Linux的内核函数int epoll_create(int size)创建了一个Epoll对象。 - 通过
serverSocket.register(selector, SelectionKey.OP_ACCEPT);将相关socket注册到Selector。其实是将这个socket通过pollWrapper.add(fd);添加到EPollArrayWrapper中。这里第二个参数SelectionKey一共有四种值,表示四种事件
// 连接事件
public static final int OP_CONNECT = 1 << 3;
// 接收事件
public static final int OP_ACCEPT = 1 << 4;
// 读事件
public static final int OP_READ = 1 << 0;
// 写事件
public static final int OP_WRITE = 1 << 2;
- 调用
selector(),这是一个阻塞方法,他会阻塞等到有socket准备就绪,返回的是一个int类型,表示有多少个socket准备OK。底层调用了两个方法Linux的内核方法,一个是int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event),将我们感兴趣的socket添加到epoll对象中,另一个方法是int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);,等到我们感兴趣的事件产生。这几个函数图中有较为详细的解释。
从上面的分析可以看出NIO的多路复用器在Linux中就是通过Epoll实现的。结合上面的例子,我们这里在通过一张图简单总结一下Epoll。
- 通过
epoll_create创建一个epoll对象,其中有两个重要的属性:rbr和rdlist - 通过
epoll_ctl将socket和我们关注的事件注册到第一步创建的epoll对象的rbr上,这里通过红黑树进行维护 - 内核系统监听到对应的事件发生后,将这个节点从红黑树上移到rdlist上。rdlist是一个双向链表维护的就绪列表
- 程序调用
epoll_wait拿到就绪列表的长度,遍历就绪列表。解析对应的事件进行处理。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-uTEHrwgR-1616406401789)(https://intranetproxy.alipay.com/skylark/lark/0/2021/png/314404/1615773049677-bafa5808-ef73-42a1-845d-fa8afc8e028c.png)]
Redis中的Epoll模型
Redis中也是用Epoll模型去做的,他的源代码在ae_epoll.c中。
/** 创建一个新的 epoll 实例,并将它赋值给 eventLoop*/
static int aeApiCreate(aeEventLoop *eventLoop) {aeApiState *state = zmalloc(sizeof(aeApiState));if (!state) return -1;state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize);if (!state->events) {zfree(state);return -1;}// 创建 epoll 实例state->epfd = epoll_create(1024); /* 1024 is just a hint for the kernel */if (state->epfd == -1) {zfree(state->events);zfree(state);return -1;}// 赋值给 eventLoopeventLoop->apidata = state;return 0;
}/** 关联给定事件到 fd*/
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {aeApiState *state = eventLoop->apidata;struct epoll_event ee;/* If the fd was already monitored for some event, we need a MOD* operation. Otherwise we need an ADD operation. ** 如果 fd 没有关联任何事件,那么这是一个 ADD 操作。* 如果已经关联了某个/某些事件,那么这是一个 MOD 操作。*/int op = eventLoop->events[fd].mask == AE_NONE ?EPOLL_CTL_ADD : EPOLL_CTL_MOD;ee.events = 0;mask |= eventLoop->events[fd].mask; /* Merge old events */if (mask & AE_READABLE) ee.events |= EPOLLIN;if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;ee.data.u64 = 0; /* avoid valgrind warning */ee.data.fd = fd;// 注册事件到 epollif (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1;return 0;
}/** 从 fd 中删除给定事件*/
static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int delmask) {aeApiState *state = eventLoop->apidata;struct epoll_event ee;int mask = eventLoop->events[fd].mask & (~delmask);ee.events = 0;if (mask & AE_READABLE) ee.events |= EPOLLIN;if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;ee.data.u64 = 0; /* avoid valgrind warning */ee.data.fd = fd;if (mask != AE_NONE) {epoll_ctl(state->epfd,EPOLL_CTL_MOD,fd,&ee);} else {/* Note, Kernel < 2.6.9 requires a non null event pointer even for* EPOLL_CTL_DEL. */epoll_ctl(state->epfd,EPOLL_CTL_DEL,fd,&ee);}
}/** 获取可执行事件*/
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {aeApiState *state = eventLoop->apidata;int retval, numevents = 0;// 等待时间retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);// 有至少一个事件就绪?if (retval > 0) {int j;// 为已就绪事件设置相应的模式// 并加入到 eventLoop 的 fired 数组中numevents = retval;for (j = 0; j < numevents; j++) {int mask = 0;struct epoll_event *e = state->events+j;if (e->events & EPOLLIN) mask |= AE_READABLE;if (e->events & EPOLLOUT) mask |= AE_WRITABLE;if (e->events & EPOLLERR) mask |= AE_WRITABLE;if (e->events & EPOLLHUP) mask |= AE_WRITABLE;eventLoop->fired[j].fd = e->data.fd;eventLoop->fired[j].mask = mask;}}// 返回已就绪事件个数return numevents;
}
从NIO到Reactor模型
理解了NIO,我们可以进一步理解一个概念,Netty底层是基于NIO开发的,而Netty的线程模型就是Reactor模型,这是整个Netty框架的核心。什么是Reactor模型呢,这个概念是著名的并发编程大师Doug Lea提出的。我们这里从最基础的线程模型一步步看看Reactor的发展过程。
经典的线程模型
经典线程模型和我们上面说的BIO模型是一样的,每次有一个客户端连接后,我们从线程池分配一个线程去处理(读数据、解码、处理、编码、发送到客户端)。那这种模式的弊端我们前面也讲过了,这里在赘述一遍
- 如果客户端连接太多,会导致线程太多,服务器压力过大
- 大部分客户端只连接不发送数据,导致大量线程阻塞住,造成资源大量浪费
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-6BNnAWkN-1616406401792)(https://intranetproxy.alipay.com/skylark/lark/0/2021/png/314404/1615773089935-f89093e7-1730-4914-9583-1bc99a214823.png)]
基于事件响应的模型
明显一个专门的线程负责一个客户端有点浪费,所以我们有了基于事件的响应模型,类似于AWT开发的规则,每一个按钮可能有很多事件,比如单击事件、双击事件,每一个事件我们绑定一个监听器,等如果发生了单击事件,我们通知这个监听器,监听器里面写业务逻辑,最终异步的给客户端响应。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-49ZQfphi-1616406401794)(https://intranetproxy.alipay.com/skylark/lark/0/2021/png/314404/1615773110015-6859a99d-f654-4a7b-bfcc-7b6136477ebc.png)]
Reactor单线程模型设计
有了以上基于事件的模型的参考,为了更好的解决经典模型的设计,所以有了Reactor单线程模型的出现。这种模型呢,所有的操作由一个线程完成,内部有一个dispatcher负责事件的派发,如果有客户端连接是Accept事件,那么派发给Acceptor处理,如果是读写等事件呢,派发给我们定义的Handler去处理,这样避免了创建大量线程的,将一个线程压榨到了极致。
到了这里已经和我们前面说的NIO很像了,事实上,可以理解NIO的Selector多路复用器就是这种模式的一个实现。大量的客户端连接到服务端后,我们通过将socket和相关的事件注册到Selector。然后用一个线程轮询,如果是连接事件,我们进行相关的处理,如果是读写事件,我们进行另外的处理。并且对事件的处理都是非阻塞的。代码实现可以参考我们NIO部分的实例代码。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-o2uxVMDS-1616406401795)(https://intranetproxy.alipay.com/skylark/lark/0/2021/png/314404/1615773125404-9781406c-84d5-462a-b908-4c3cbf0464c0.png)]
Reactor多线程模型设计
上述单线程模型版本依然是有缺陷的,业务连接事件并不很耗时,一个线程处理可以,但是读写等逻辑处理往往是比较耗时的,如果同时有上百万连接有读写事件发生,那么一个线程是处理不过来的,所以有了多线程版本,我们创建一个线程池,每次有读写事件来,我们交给线程池去处理。就很大程度提高了并发度。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-MWka1TNH-1616406401796)(https://intranetproxy.alipay.com/skylark/lark/0/2021/png/314404/1615773135754-5afba7c3-5883-4078-872e-62e96c9247d6.png)]
实例代码改写自NIO多路复用器部分的代码:
while (true) {selector.select();Set<SelectionKey> selectionKeys = selector.selectedKeys();Iterator<SelectionKey> iterator = selectionKeys.iterator();while (iterator.hasNext()) {SelectionKey key = iterator.next();if (key.isAcceptable()) {ServerSocketChannel server = (ServerSocketChannel)key.channel();SocketChannel socketChannel = server.accept();socketChannel.configureBlocking(false);socketChannel.register(selector, SelectionKey.OP_READ);System.out.println("客户端连接成功");} else if (key.isReadable()) {// 这部分交给线程池去处理threadPool.execute(() -> {try {SocketChannel socketChannel = (SocketChannel)key.channel();ByteBuffer byteBuffer = ByteBuffer.allocate(128);int read = socketChannel.read(byteBuffer);if (read > 0) {System.out.println("接收到消息:" + new String(byteBuffer.array()));} else if (read == -1) { // 如果客户端断开连接,关闭SocketSystem.out.println("客户端断开连接");socketChannel.close();}} catch (Exception e) {e.printStackTrace();}});}iterator.remove();}
}
Reactor主从模型
当然上述的多线程模型固然能提高并发度,但是依然有很多问题,假设如果连接的客户端数量在加一个量级,那应该怎么办呢。所以就有了下面的主从Reactor模型,相当于有NIO有两个Selector,主Reactor只负责客户端的接受事件,从Reactor只负责处理客户端的读写事件。下图是一主一从,如果一主一从不够的话,我们也可以设计多主多从。Netty的线程模型用的就是这种Reactor主从模型。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-ybeLEjRh-1616406401798)(https://intranetproxy.alipay.com/skylark/lark/0/2021/png/314404/1615773157445-937b19c0-dafa-4275-acf4-36869a011a71.png)]
我们看一段Netty代码如何体现Reactor主从模型的。
public static void main(String[] args) throws InterruptedException {// 创建两个线程组bossGroup和workerGroup, 含有的子线程NioEventLoop的个数默认为cpu核数的两倍// bossGroup只是处理连接请求 ,真正的和客户端业务处理,会交给workerGroup完成// 这里就相当于设置主Reactor和从Reactor,根据需要可以指定线程数EventLoopGroup bossGroup = new NioEventLoopGroup(1);EventLoopGroup workerGroup = new NioEventLoopGroup();try {ServerBootstrap bootstrap = new ServerBootstrap();// 设置启动器bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 1024).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) {//对workerGroup的SocketChannel设置处理器ch.pipeline().addLast(new NettyServerHandler());}});System.out.println("netty server start...");// 绑定一个端口并且同步启动服务器ChannelFuture cf = bootstrap.bind(9000).sync();cf.channel().closeFuture().sync();} finally {bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}
编解码
什么是编解码
关于编解码,这里先和大家看一下例子,下面这段代码实现了一个很简单的功能,启动服务端,在启动客户端,当客户端启动成功建立连接后,会向服务端发送一个字符串Hello World,服务端收到数据后打印这条消息。但是如果我们把flag1和flag2处的代码注释掉,会发现服务端是收不到这条消息的。当我们把上述两行代码放开,服务端才能收到消息。这是为什么呢?因为网络只能传输二进制的流数据,是不能传递字符串对象这些东西的。因此我们必须在将数据发送到服务端之前将其转化成二进制,这个过程就叫做编码,而服务端收到二进制的数据,也不能直接用,需要反序列化成原本的字符串或者对象,这个过程就叫解码。flag1和flag2标记处的代码就是Netty为我们提供的编解码器。
public class NettyServer {public static void main(String[] args) {EventLoopGroup boss = new NioEventLoopGroup();EventLoopGroup worker = new NioEventLoopGroup();ServerBootstrap bootstrap = new ServerBootstrap();bootstrap.group(boss, worker).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();// falg1pipeline.addLast(new StringDecoder());pipeline.addLast(new NettyServerHandler());}});System.out.println("netty server start...");ChannelFuture future;try {future = bootstrap.bind(8080).sync();future.channel().closeFuture().sync();} catch (InterruptedException e) {e.printStackTrace();} finally {boss.shutdownGracefully();worker.shutdownGracefully();}}
}public class NettyServerHandler extends SimpleChannelInboundHandler<String> {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {System.out.println(" ======> [server] received message: " + msg);}
}
public class NettyClient {public static void main(String[] args) {EventLoopGroup group = new NioEventLoopGroup();Bootstrap bootstrap = new Bootstrap();bootstrap.group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();// falg2pipeline.addLast(new StringEncoder());pipeline.addLast(new NettyClientHandler());}});System.out.println("netty client start...");try {bootstrap.connect("127.0.0.1", 8080).sync().channel();} catch (InterruptedException e) {e.printStackTrace();} finally {group.shutdownGracefully();}}
}public class NettyClientHandler extends SimpleChannelInboundHandler<String> {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {}@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {ctx.writeAndFlush("hello world");}
}
责任链模式
这里我们先复习一下责任链模式和管道模式,因为下面会用到。那么什么是责任链模式呢,我们还是通过一段代码来看。假设我现在要实现一个需求就是打印日志,但是我需要根据日志级别判断是打印ERROR级别还是INFO级别。可能最简单的方式就是下面这种方式:
public class PrintLogger {public static final int INFO = 1;public static final int DEBUG = 2;public static final int ERROR = 3;protected int level;public void logMessage(int level, String message){if(this.level = INFO) {// 打印INFO日志} else if (this.level = DEBUG) {// 打印DEBUG日志} else {// 打印ERROR日志}}
}
这样写虽然容易理解,但是有两个严重的问题
- 代码耦合度高,如果我们将来又要加一个TRANCE级别的日志,那么就得在这里在加一个if条件,而且要想改动打印INFO级别的日志的逻辑,也得改这个代码
- 不好维护,如果我们的业务越来越复杂,这里的if条件越来越多,慢慢的这个代码也会越来越长,越难维护。因此最好能将每个if条件下的逻辑解耦出来。这里就用到了前辈们给我们总结的责任链模式。
所谓责任链模式结合上述示例,就是类似于维护一个链表,如果有请求来,那么我第一个节点先看能否处理请求,如果能就直接处理返回,如果不能,这个节点则会将请求转交给下一个责任节点去处理。这样未来如果有新的逻辑,我们就加到这个链表中去就行。说起来可能有些抽象,不如直接上代码。
// 抽象的处理器
public abstract class AbstractLogger {public static final int INFO = 1;public static final int DEBUG = 2;public static final int ERROR = 3;protected int level;// 责任链中的下一个元素protected AbstractLogger nextLogger;public void setNextLogger(AbstractLogger nextLogger){this.nextLogger = nextLogger;}// 核心逻辑,负责接受请求,如果能处理就处理,否则交给责任链中的下一个节点处理public void logMessage(int level, String message){if(this.level <= level){write(message);return;}if(nextLogger != null){nextLogger.logMessage(level, message);}}// 写日志的方法,供子类实现abstract protected void write(String message);
}
// 不同的处理器
public class InfoLogger extends AbstractLogger{public InfoLogger(int level){this.level = level;}@Overrideprotected void write(String message) {System.out.println("Info ::Logger: " + message);}
}public class ErrorLogger extends AbstractLogger{public ErrorLogger(int level){this.level = level;}@Overrideprotected void write(String message) {System.out.println("Error ::Logger: " + message);}
}public class DebugLogger extends AbstractLogger{public DebugLogger(int level) {this.level = level;}@Overrideprotected void write(String message) {System.out.println("Debug ::Logger: " + message);}
}
public class Main {private static AbstractLogger getChainOfLoggers(){AbstractLogger errorLogger = new ErrorLogger(AbstractLogger.ERROR);AbstractLogger debugLogger = new DebugLogger(AbstractLogger.DEBUG);AbstractLogger consoleLogger = new InfoLogger(AbstractLogger.INFO);errorLogger.setNextLogger(debugLogger);debugLogger.setNextLogger(consoleLogger);return errorLogger;}public static void main(String[] args) {AbstractLogger loggerChain = getChainOfLoggers();loggerChain.logMessage(AbstractLogger.INFO, "This is a info message..");}
}
pipeLine(管道)模式
那责任链模式呢,他核心是一个责任的传递,找到能处理这个请求的责任节点处理完就结束了,那有的场景可能就不合适了,比如我们下单买东西可能经过这么几个步骤:检查库存、计算金额、下单、提交物流、发送短信、发送邮件等,这些步骤每一步都是必经的,并且有一个严格的顺序。这个时候也许你可能会写出这样的代码。
void buy() {checkStock();caculateAmount();...
}
这样也许很容易理解。但是不易扩展(比如日后如果这个流程业务变化还要改核心代码),而且不好实现代码重用(也许检查库存,计算金额这些代码其他模块也能用到呢)。那么如果这个时候我们将上面的责任链模式进行一些变化,我们将检查库存、计算金额这些业务都抽象成一个个单独的类,然后加到责任链(管道)中,让他们按顺序经过每一个节点完成对应的处理(一个节点处理完成转给下一个节点不结束程序)。然后在加一个上下文的Context维护每一个节点都用到的共享数据。岂不是既解决了问题,而且解决了上面说到的不好扩展和不好实现代码重用(将每个业务抽象出来成单独的类,谁都可以用)的问题。
也许讲了这么一大堆还是有些糊涂,我们画一张图解释一下管道模式吧。这次我们就说netty处理网络请求的例子。我们知道一个请求从客户端发出,要经过编码-》服务端接受到进行解码-》服务端进行业务逻辑的处理-》在编码将消息发给客户端这么多流程。其中编码、解码等逻辑是处理每一个网络请求都能复用的。因此我们将这些都抽象成一个个handler,放在一个有方向的管道中,让请求经过这个管道中的每一个节点去处理。其中context存储上下文数据。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-3aFrUd7z-1616406401800)(https://intranetproxy.alipay.com/skylark/lark/0/2021/png/314404/1615773353737-2ceb1ab2-90c3-43eb-8809-da9da2da7af4.png)]
下面我们给出代码实现,方便进一步理解:
/*** 上下文容器* * @author : HXY* @date : 2021-03-04 21:07**/
@Data
public class BaseContext {private String request;public BaseContext(String request) {this.request = request;}
}
/*** Handler抽象接口,提供一个deal方法让实现类实现* * @param */
public interface Handler<C extends BaseContext> {void deal(C context);
}
// 三个具体的handler,都实现了Handler接口public class EncodeHandler implements Handler<BaseContext>{@Overridepublic void deal(BaseContext context) {System.out.println("encode..." + context.getRequest());}
}public class DecodeHandler implements Handler<BaseContext>{@Overridepublic void deal(BaseContext context) {System.out.println("decode..." + context.getRequest());}
}public class ComputeHandler implements Handler<BaseContext> {@Overridepublic void deal(BaseContext context) {System.out.println("deal..." + context.getRequest());}
}
/*** 管道* * @author : HXY* @date : 2021-03-04 21:20**/
@Data
public class ChainPipeLine {// 上下文private BaseContext context;// 借用一个list来存储handlerprivate List<Handler<BaseContext>> pipeLine = new ArrayList<>();public ChainPipeLine(BaseContext context) {this.context = context;}// 核心逻辑,遍历所有handler,让数据流转到每一个handlerpublic void doHandler() {for (Handler<BaseContext> handler : pipeLine) {handler.deal(context);}}// 提供向管道中动态添加handler的方法public void addHandler(Handler<BaseContext> handler) {pipeLine.add(handler);}
}
public class Main {public static void main(String[] args) {BaseContext context = new BaseContext("message");ChainPipeLine pipeLine = new ChainPipeLine(context);pipeLine.addHandler(new EncodeHandler());pipeLine.addHandler(new ComputeHandler());pipeLine.addHandler(new DecodeHandler());pipeLine.doHandler();}
}// 执行结果
encode...message
deal...message
decode...message
Netty对管道模式的实践
了解了管道模式,那么我们也就了解了最开始看到的Netty两个核心组件:ChannelPipeline和ChannelHandler。ChannelPipeline就是管道模式中的管道,其实底层是一个有头节点有尾节点的双向链表。而ChannelHandler就是管道模式中的Handler,它作为一个顶级的接口,提供了很多方法供其他的handler实现,那么我们编解码的StringDecoder和StringEncoder就实现了编解码的功能。ChannelHandlerContext就是管道模式中那个上下文了。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-FGQqFNWG-1616406401801)(https://intranetproxy.alipay.com/skylark/lark/0/2021/png/314404/1615773368855-52de3fc7-4d7f-41b4-9659-366818e143e1.png)]
这里我们可能着重讲的是ChannelHandler,他又有两个核心的子接口,ChannelOutboundHandler和ChannelInboundHandler。从命名提炼出来核心的区别就是Out和In。究竟有什么用呢。我们这样想 ,请求从客户端发出,到服务端,服务端响应给客户端,对客户端或者服务端来说,都有两个动作,出和入。那么针对出和入这两个不同的动作,有些处理总是有区别的。比如编码就应该在消息发出这个过程进行,而解码就应该在消息收入这个过程进行。那么一个管道中既有编码又有解码的Handler,那么如何防止在消息发出的时候错走了解码,消息接受的时候错走了编码呢?这个时候ChannelOutboundHandler和ChannelOutboundHandler就有用武之地了。凡是实现了ChannelOutboundHandler接口的Handler,我们就知道他是负责处理消息出逻辑的,ChannelOutboundHandler反之。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-K7pyhT2e-1616406401802)(https://intranetproxy.alipay.com/skylark/lark/0/2021/png/314404/1615773403410-c545af26-0dca-43a1-9cd6-7d1a09bdae53.png)]
Netty提供的编解码器
Netty为我们提供了很多编解码器,比如编解码字符串的StringDecoder和StringEncoder和编解码对象的ObjectDecoder和ObjectEncoder。我们通过字符串编解码器看他是如何实现编解码的。
首先看StringDecoder,作为一个解码器,他是在消息入的时候用的,所以肯定是实现了ChannelInboundHandler的。下文是他的部分源码,其中继承的MessageToMessageDecoder实现了ChannelInboundHandler。
@Sharable
public class StringDecoder extends MessageToMessageDecoder<ByteBuf> {// 解码的核心逻辑,其实就是把二进制的字节转换成字符串。这个方法什么时候被调用的呢?看父类@Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {out.add(msg.toString(charset));}
}
// channelRead,当通道有读取事件的时候会触发
// 父类定义如果有数据读取到,就调用解码方法,而解码的逻辑又交给不同的子类实现,又是对模板方法设计模式的应用
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {CodecOutputList out = CodecOutputList.newInstance();try {if (acceptInboundMessage(msg)) {@SuppressWarnings("unchecked")I cast = (I) msg;try {// 这里调用了解码的方法decode(ctx, cast, out);} finally {ReferenceCountUtil.release(cast);}} else {out.add(msg);}} catch (DecoderException e) {throw e;} catch (Exception e) {throw new DecoderException(e);} finally {try {int size = out.size();for (int i = 0; i < size; i++) {ctx.fireChannelRead(out.getUnsafe(i));}} finally {out.recycle();}}
}protected abstract void decode(ChannelHandlerContext ctx, I msg, List<Object> out) throws Exception;
StringEncoder逻辑和上面大概雷同,不在赘述。
自定义编解码器
了解了基本套路后,如何实现一个自定义编解码,其实大体的模板代码只是照猫画虎的事情,核心在于选用一个高效给力的序列化工具来进行编解码。这里推荐protostuff,据说性能很不错,比java提供的序列化工具要优秀很多。
protostuff使用案例
- 引入依赖
<dependency><groupId>com.dyuproject.protostuffgroupId><artifactId>protostuff-apiartifactId><version>1.0.10version>
dependency>
<dependency><groupId>com.dyuproject.protostuffgroupId><artifactId>protostuff-coreartifactId><version>1.0.10version>
dependency>
<dependency><groupId>com.dyuproject.protostuffgroupId><artifactId>protostuff-runtimeartifactId><version>1.0.10version>
dependency>
- 实现序列化反序列化的方法
public class ProtostuffUtil {private static Map<Class<?>, Schema<?>> cachedSchema = new ConcurrentHashMap<>();private static <T> Schema<T> getSchema(Class<T> clazz) {@SuppressWarnings("unchecked")Schema<T> schema = (Schema<T>) cachedSchema.get(clazz);if (schema == null) {schema = RuntimeSchema.getSchema(clazz);if (schema != null) {cachedSchema.put(clazz, schema);}}return schema;}/*** 序列化** @param obj* @return*/public static <T> byte[] serializer(T obj) {@SuppressWarnings("unchecked")Class<T> clazz = (Class<T>) obj.getClass();LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);try {Schema<T> schema = getSchema(clazz);return ProtostuffIOUtil.toByteArray(obj, schema, buffer);} catch (Exception e) {throw new IllegalStateException(e.getMessage(), e);} finally {buffer.clear();}}/*** 反序列化** @param data* @param clazz* @return*/public static <T> T deserializer(byte[] data, Class<T> clazz) {try {T obj = clazz.newInstance();Schema<T> schema = getSchema(clazz);ProtostuffIOUtil.mergeFrom(data, obj, schema);return obj;} catch (Exception e) {throw new IllegalStateException(e.getMessage(), e);}}public static void main(String[] args) {byte[] userBytes = ProtostuffUtil.serializer(new User(1, "hanxingyu"));User user = ProtostuffUtil.deserializer(userBytes, User.class);System.out.println(user);}
}
- 实现自定义的编解码器的Handler,在其中的decode和encode方法中使用上述工具类。
粘包拆包
粘包拆包问题重现
关于什么是粘包拆包呢,我们先通过一个demo来看一下。
Server端代码
public class NettyServer {public static void main(String[] args) {EventLoopGroup boss = new NioEventLoopGroup();EventLoopGroup worker = new NioEventLoopGroup();ServerBootstrap bootstrap = new ServerBootstrap();bootstrap.group(boss, worker).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast(new StringDecoder());pipeline.addLast(new StringEncoder());pipeline.addLast(new NettyServerHandler());}});System.out.println("netty server start...");ChannelFuture future;try {future = bootstrap.bind(8080).sync();future.channel().closeFuture().sync();} catch (InterruptedException e) {e.printStackTrace();} finally {boss.shutdownGracefully();worker.shutdownGracefully();}}
}public class NettyServerHandler extends SimpleChannelInboundHandler<String> {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {System.out.println(" ======> [server] received message: " + msg);}
}
Client端代码
public class NettyClient {public static void main(String[] args) {EventLoopGroup group = new NioEventLoopGroup();Bootstrap bootstrap = new Bootstrap();bootstrap.group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast(new StringEncoder());pipeline.addLast(new StringDecoder());pipeline.addLast(new NettyClientHandler());}});System.out.println("netty client start...");try {bootstrap.connect("127.0.0.1", 8080).sync().channel();} catch (InterruptedException e) {e.printStackTrace();} finally {group.shutdownGracefully();}}
}public class NettyClientHandler extends SimpleChannelInboundHandler<String> {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {}@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {for (int i = 0; i < 100; i++) {ctx.writeAndFlush("hello world");}}
}
上面这段代码主要实现一个功能,启动服务端,然后在启动客户端,客户端启动后会循环向服务端发送100次hello world,然后服务端收到消息打印一下。那么我们看一下执行的结果会是什么样。执行后我们发现并没有像我们预期的一样,到一次收到消息打印一次hello world。而是每次收到的很没有规律,有时候一次收到三四个消息,有时候一连收到十几个消息,这个呢就叫粘包。
======> [server] received message: hello worldhello world======> [server] received message: hello worldhello worldhello worldhello world======> [server] received message: hello worldhello worldhello worldhello worldhello worldhello worldhello worldhello worldhello world======> [server] received message: hello worldhello world======> [server] received message: hello world// 删去一部分
粘包拆包的原因
那么为什么会造成上面这种状况呢?主要是因为TCP协议是基于流传输的协议,数据在整个传输过程中是没有界限区分的,如果一个包过大的话,可能会被切成多个包进行分批传输,如果一个包过小的话,也会放到缓冲区,等到将好几个小包整合成一个大包进行传输。因此我们读取数据,往往不一定能获取到一个完整的预期的数据包。
粘包拆包的描述
上面我们复现了一个粘包的现象,那么我们知道除了粘包,还有拆包。这里呢,我们通过几张图详细的解释一下粘包拆包。
-
这张图中客户端向服务端发送两个数据包:Hello 和 World。服务端按照预期收到两个数据包Hello 和 World。属于正常现象。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-mw65ZFUW-1616406401803)(https://intranetproxy.alipay.com/skylark/lark/0/2021/png/314404/1615773581290-0a136169-39af-4760-bdd0-33f77781664c.png)] -
这张图中客户端向服务端发送两个数据包:Hello 和 World。然而服务端只收到一个数据包HelloWorld。两个本来要分开的包粘在了一起。就叫粘包。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-G9V3HUEB-1616406401804)(https://intranetproxy.alipay.com/skylark/lark/0/2021/png/314404/1615773594062-19e30b50-600a-48f8-a1b1-8820d7bb2ec1.png)] -
这张图中又分为出现两种情况。虽然两种情况服务端收到的数据不一样,但都有一个特点,那就是一个完整的包被拆成两部分,我们就叫做拆包。
- 第一张图中客户端向服务端发送一个数据包:HelloWorld,然而服务端收到两个包:HelloWo和rld。
- 第二张图中客户端向服务端发送一个数据包:HelloWorld,然而服务端收到两个包:Hel和loWorld。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-vFhqwnM5-1616406401805)(https://intranetproxy.alipay.com/skylark/lark/0/2021/png/314404/1615773603627-56934d91-e30e-443b-9ca6-0f2214d6c459.png)]
粘包拆包的解决方案
Netty为了解决粘包拆包的问题,给我们提供了很多方案,我们简单介绍常用的几种:
-
根据固定长度来分包
上面的示例代码中我们常常收到多个数据包粘到一起的情况,那么如果我们在服务端限定每次收到的消息长度固定为11,如果收到超过11位,也只截取11位作为一个包,那么其实就可以解决粘包的问题。这种方案Netty已经为我们提供了一个Handler,就是
FixedLengthFrameDecoder,它支持传入一个int类型的参数frameLength,如果出现粘包,那么我们根据frameLength对数据包进行拆分;如果出现拆包,则可以等待下一个数据包过来,然后根据frameLength对下一个包进行截取和上一个不完整的包进行拼接,直到得到一个完整的包。然后我们对上述示例代码进行改造,就可以解决粘包拆包的问题。
public class NettyServer {public static void main(String[] args) {EventLoopGroup boss = new NioEventLoopGroup();EventLoopGroup worker = new NioEventLoopGroup();ServerBootstrap bootstrap = new ServerBootstrap();bootstrap.group(boss, worker).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();// 通过FixedLengthFrameDecoder解决粘包拆包问题pipeline.addLast(new FixedLengthFrameDecoder(11));pipeline.addLast(new StringDecoder());pipeline.addLast(new StringEncoder());pipeline.addLast(new NettyServerHandler());}});System.out.println("netty server start...");ChannelFuture future;try {future = bootstrap.bind(8080).sync();future.channel().closeFuture().sync();} catch (InterruptedException e) {e.printStackTrace();} finally {boss.shutdownGracefully();worker.shutdownGracefully();}} } -
特殊分隔符来分包
除过上述的方案,我们还可以根据某一个特殊符号来进行分包,比如我们每次发送消息的时候在结尾多加一个自定义的特殊符号,那么服务端在收到数据包后,如果粘包,就根据自定义的特殊符号进行拆分;如果出现拆包,就等下一个数据包发过来拼接到特殊符号为止凑成一个完整的包,也可以解决粘包拆包。这种方案,针对这种方案,Netty为我们提供了
DelimiterBasedFrameDecoder这个handler,他的最基础的构造函数如下:// maxFrameLength 最大的长度限制,超过这个长度,会抛异常,防止数据过大,导致内存溢出 // delimiter就是我们那个自定义的特殊符号 public DelimiterBasedFrameDecoder(int maxFrameLength, ByteBuf delimiter) {this(maxFrameLength, true, delimiter); }同样对上述示例代码进行改造,也可以解决粘包拆包的问题
public class NettyServer {private static final String DELIMITER = "-";public static void main(String[] args) {EventLoopGroup boss = new NioEventLoopGroup();EventLoopGroup worker = new NioEventLoopGroup();ServerBootstrap bootstrap = new ServerBootstrap();bootstrap.group(boss, worker).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();// DelimiterBasedFrameDecoder解决粘包拆包pipeline.addLast(new DelimiterBasedFrameDecoder(1024, Unpooled.wrappedBuffer(DELIMITER.getBytes())));pipeline.addLast(new StringDecoder());pipeline.addLast(new StringEncoder());// 自定义收到心跳包如何处理的handlerpipeline.addLast(new NettyServerHandler());}});System.out.println("netty server start...");ChannelFuture future;try {future = bootstrap.bind(8080).sync();future.channel().closeFuture().sync();} catch (InterruptedException e) {e.printStackTrace();} finally {boss.shutdownGracefully();worker.shutdownGracefully();}} } -
自定义分包解码器解决粘包拆包
上面的方式总是有局限性的,比如我们按照
-这个特殊字符分包,那么假如我们本身发的消息内容就包括-,就会导致一个完整的包也会被拆分。所以我们还可以自定义编解码器去解决分包拆包。这里我们提供一种方案就是每次客户端发送消息的时候都把消息本身的内容和消息的长度都发送过去,这样在服务端解码的时候就可以根据长度来解析数据包。@Data public class MessageProtocol {// 消息长度private int length;// 消息内容private byte[] content; }public class MessageEncoder extends MessageToByteEncoder<MessageProtocol> {@Overrideprotected void encode(ChannelHandlerContext ctx, MessageProtocol msg, ByteBuf out) throws Exception {out.writeInt(msg.getLength());out.writeBytes(msg.getContent());} }public class MessageDecoder extends ByteToMessageDecoder {int length = 0;@Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {if(in.readableBytes() >= 4) {// 先解析出消息的长度if (length == 0){length = in.readInt();}// 在读取收到的内容,如果收到的内容长度小于length,说明发生了拆包,在等新的包发来if (in.readableBytes() < length) {return;}// 走到这里说明完整的包已经收到了,按照根据长度读出内容byte[] content = new byte[length];if (in.readableBytes() >= length){in.readBytes(content);// 封装成MyMessageProtocol对象,传递到下一个handler业务处理MessageProtocol messageProtocol = new MessageProtocol();messageProtocol.setLength(length);messageProtocol.setContent(content);out.add(messageProtocol);}length = 0;}} }
心跳检测
何为心跳检测
我们常常听到TCP连接有长连接和短链接。那么什么是长连接,什么又是短连接呢?
短连接
短连接一般经过这么几个步骤:
- 客户端向服务端发送请求,服务端收到请求,建立连接
- 客户端发送数据,服务端收到数据并回应客户端
- 连接断开
长连接
长连接一般经过这么几个步骤:
- 客户端向服务端发送请求,服务端收到请求,建立连接
- 客户端发送数据,服务端收到数据并回应客户端
- 连接保持,不断开
- 客户端有数据继续发送,服务端正常处理
那么这里就发现了长连接的一个问题,我们都知道网络是不稳定的,那么如果在一个长连接的过程中,客户端因为网络故障挂了,而这时服务端还一直保存这对应的通道连接,那如果这种情况多了,岂不是一种对资源的浪费吗?又或者一个客户端和服务端建立长连接后,只发送了一次数据,然后长时间都没有数据发送,那么我们仍然保存这他这个连接,是不是也不是很合理,这不是占着茅坑不拉屎吗?所以这个时候就需要一种机制来检测客户端和服务端之间的连接是否正常或者检测你客户端在一定时间内还没有没有数据发给我,我服务端要不要把你的连接关掉,把资源腾出来,让给其他需要的客户端。这个时候就出现了我们今天的主角:心跳检测。心跳检测的原理呢就是说客户端和服务端定时发送一种特殊的数据包**(心跳包)**,来确保这个TCP长连接是有效的。如果超过一段时间我没有收到你的心跳,那对不起,我就要把你断开了。
如何使用Netty的心跳检测
Netty作为一个优秀的网络通信开发框架,也给我们提供了心跳检测的机制。那么首先我们看看Netty的心跳检测如何使用,在来研究他是如何设计的?我们直接通过一个demo来看吧。
首先我们创建一个Netty的服务端,大部分都是模板代码,核心的一行是17行和19行。17行我们向pipeline中加入了一个IdleStateHandler。这是心跳检测的核心类。他的构造器有四个参数:
long readerIdleTime: 读超时,在该参数给定的时间间隔内,如果没有从Channel中读取到数据,就会触发一个READER_IDLE的事件long writerIdleTime: 写超时,在该参数给定的时间间隔内,如果没有数据写入到Channel中,就会触发一个WRITER_IDLE的事件long allIdleTime: 读/写超时,在该参数给定的时间间隔内,如果没有读或写操作时, 会触发一个 ALL_IDLE 的事件TimeUnit unit: 时间单位,就是上面三个参数的单位,如果不指定,默认是秒
public class HeartBeatServer {public static void main(String[] args) {EventLoopGroup boss = new NioEventLoopGroup();EventLoopGroup worker = new NioEventLoopGroup();ServerBootstrap bootstrap = new ServerBootstrap();bootstrap.group(boss, worker).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();// 编解码器pipeline.addLast(new StringDecoder());pipeline.addLast(new StringEncoder());// 心跳机制的关键handlerpipeline.addLast(new IdleStateHandler(3, 0, 0, TimeUnit.SECONDS));// 自定义收到心跳包如何处理的handlerpipeline.addLast(new HeartBeatServerHandler());}});System.out.println("netty server start...");ChannelFuture future;try {future = bootstrap.bind(8080).sync();future.channel().closeFuture().sync();} catch (InterruptedException e) {e.printStackTrace();} finally {boss.shutdownGracefully();worker.shutdownGracefully();}}
}
除了17行的代码,19行依然重要,这里我们加了一个自定义的handler,用来处理收到心跳包的逻辑。代码如下:
public class HeartBeatServerHandler extends SimpleChannelInboundHandler<String> {private static final int IDLE_NUM = 3;private int readIdleTimes = 0;// 打印收到的客户端发送来的心跳包,我们发送的是字符串@Overrideprotected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {System.out.println(" ======> [server] received message: " + msg);}// 上面说会触发三种事件:READER_IDLE、WRITER_IDLE、ALL_IDLE,那么触发后如何感知到呢,就需要复写这个方法,在方法里就能处理这三种事件。// 我们在服务端定义的是READER_IDLE事件, 所以这里关注如果是读超时事件,就给readIdleTimes++, 当readIdleTimes大于3的时候,我们断开连接// 并且给客户端发送一个消息@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {if (evt instanceof IdleStateEvent) {IdleStateEvent event = (IdleStateEvent) evt;if (event.state() == IdleState.READER_IDLE) {readIdleTimes++;}}if (readIdleTimes > IDLE_NUM) {System.out.println("idle num more than three times, channel closed...");ctx.channel().writeAndFlush("channel close");ctx.channel().close();}}@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {System.err.println("=== " + ctx.channel().remoteAddress() + " is active ===");}
}
上述是服务端的代码,我们在看客户端的代码
public class HeartBeatClient {public static void main(String[] args) {EventLoopGroup group = new NioEventLoopGroup();Bootstrap bootstrap = new Bootstrap();bootstrap.group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast(new StringEncoder());pipeline.addLast(new StringDecoder());// 同样这里我们也定义了一个客户端处理心跳的逻辑pipeline.addLast(new HeartBeatClientHandler());}});System.out.println("netty client start...");try {Channel channel = bootstrap.connect("127.0.0.1", 8080).sync().channel();String text = "Heartbeat Packet";Random random = new Random();// 随机休息几秒,模拟网络不稳定,间断给服务端发送心跳的情况。while (channel.isActive()) {int num = random.nextInt(8);Thread.sleep(num * 1000);channel.writeAndFlush(text);}} catch (InterruptedException e) {e.printStackTrace();} finally {group.shutdownGracefully();}}
}
public class HeartBeatClientHandler extends SimpleChannelInboundHandler<String> {// 客户端收到服务端的消息,也断开连接@Overrideprotected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {System.out.println("client received :" + msg);if ("channel close".equals(msg)) {System.out.println("client channel close...");ctx.channel().closeFuture();}}
}
Netty如何实现心跳检测的源码
知道了如何用,当然我们也应该知道为什么可以这么用。首先我们看他的构造方法里做了什么
private final long readerIdleTimeNanos;
private final long writerIdleTimeNanos;
private final long allIdleTimeNanos;public IdleStateHandler(long readerIdleTime, long writerIdleTime, long allIdleTime,TimeUnit unit) {this(false, readerIdleTime, writerIdleTime, allIdleTime, unit);
}// 构造方法真正调用的逻辑,给readerIdleTimeNanos writerIdleTimeNanos allIdleTimeNanos三个成员变量就行了赋值
public IdleStateHandler(boolean observeOutput,long readerIdleTime, long writerIdleTime, long allIdleTime,TimeUnit unit) {ObjectUtil.checkNotNull(unit, "unit");this.observeOutput = observeOutput;if (readerIdleTime <= 0) {readerIdleTimeNanos = 0;} else {readerIdleTimeNanos = Math.max(unit.toNanos(readerIdleTime), MIN_TIMEOUT_NANOS);}if (writerIdleTime <= 0) {writerIdleTimeNanos = 0;} else {writerIdleTimeNanos = Math.max(unit.toNanos(writerIdleTime), MIN_TIMEOUT_NANOS);}if (allIdleTime <= 0) {allIdleTimeNanos = 0;} else {allIdleTimeNanos = Math.max(unit.toNanos(allIdleTime), MIN_TIMEOUT_NANOS);}
}
然后他的核心逻辑在channelActive这个方法,这个方法是通道一被激活就会触发的方法。
public void channelActive(ChannelHandlerContext ctx) throws Exception {// This method will be invoked only if this handler was added// before channelActive() event is fired. If a user adds this handler// after the channelActive() event, initialize() will be called by beforeAdd().initialize(ctx);super.channelActive(ctx);
}private void initialize(ChannelHandlerContext ctx) {// Avoid the case where destroy() is called before scheduling timeouts.// See: https://github.com/netty/netty/issues/143switch (state) {case 1:case 2:return;}state = 1;initOutputChanged(ctx);lastReadTime = lastWriteTime = ticksInNanos();// 核心的逻辑就在下面这三个if条件if (readerIdleTimeNanos > 0) {// 检测读超时的定时任务readerIdleTimeout = schedule(ctx, new ReaderIdleTimeoutTask(ctx),readerIdleTimeNanos, TimeUnit.NANOSECONDS);}if (writerIdleTimeNanos > 0) {// 检测写超时的定时任务writerIdleTimeout = schedule(ctx, new WriterIdleTimeoutTask(ctx),writerIdleTimeNanos, TimeUnit.NANOSECONDS);}if (allIdleTimeNanos > 0) {// 检测读写超时的定时任务allIdleTimeout = schedule(ctx, new AllIdleTimeoutTask(ctx),allIdleTimeNanos, TimeUnit.NANOSECONDS);}
}// 接着往下看schedule方法的实现,这里我们发现第二个参数其实是一个Runnable任务,第三个参数就是我们传过来的超时时间
ScheduledFuture<?> schedule(ChannelHandlerContext ctx, Runnable task, long delay, TimeUnit unit) {// ctx.executor()得到一个EventExecutor,是一个定时任务线程池,下面看代码结构,// 因此上核心逻辑应该在task的run()方法里定义的,再去一探究竟,以ReaderIdleTimeoutTask为例return ctx.executor().schedule(task, delay, unit);
}private final class ReaderIdleTimeoutTask extends AbstractIdleTask {ReaderIdleTimeoutTask(ChannelHandlerContext ctx) {super(ctx);}@Overrideprotected void run(ChannelHandlerContext ctx) {// 我们设置的时间,假定为3秒long nextDelay = readerIdleTimeNanos;if (!reading) {// nextDelay = nextDelay - (ticksInNanos() - lastReadTime)// ticksInNanos() - lastReadTime当前时间减去最后一次调用channelRead方法的时间,假设4秒,说明上次调用已经是4秒之前的事情了// nextDelay = nextDelay - (ticksInNanos() - lastReadTime) => 3 - 4 = -1// 这一步<0,说明超时了,否则说明还没超时,算出来的差就是还有几秒超时。然后赋值给nextDelaynextDelay -= ticksInNanos() - lastReadTime;}// 超时的逻辑if (nextDelay <= 0) {// 假设超时了,那么3秒后(我们设置的时间)在调用定时任务readerIdleTimeout = schedule(ctx, this, readerIdleTimeNanos, TimeUnit.NANOSECONDS);boolean first = firstReaderIdleEvent;firstReaderIdleEvent = false;try {IdleStateEvent event = newIdleStateEvent(IdleState.READER_IDLE, first);// 超时的核心逻辑,代码走到:标记1,触发下一个handler的userEventTriggered(evt)方法,// 发送读超时的事件,就到了我们前面自定义的那个handlerchannelIdle(ctx, event);} catch (Throwable t) {ctx.fireExceptionCaught(t);}} else {// 没超时,假设62行算出来是1,那么1秒后执行定时任务readerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);}}
}// 标记1
protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {ctx.fireUserEventTriggered(evt);
}
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-qQRzHi6G-1616406401807)(https://intranetproxy.alipay.com/skylark/lark/0/2021/png/314404/1615773729282-e31859f4-df4f-40b2-b69f-ff04f96d2a01.png)]
Dubbo2.7.x版本的心跳检测如何通过Netty实现
Dubbo2.7.x之后(这里取得是2.7.8)的版本的心跳检测就是通过Netty实现的。我们一起看看他的代码是怎么写的:
代码路径:org.apache.dubbo.remoting.transport.netty4
服务端代码
@Override
protected void doOpen() throws Throwable {bootstrap = new ServerBootstrap();bossGroup = NettyEventLoopFactory.eventLoopGroup(1, "NettyServerBoss");workerGroup = NettyEventLoopFactory.eventLoopGroup(getUrl().getPositiveParameter(IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),"NettyServerWorker");final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);channels = nettyServerHandler.getChannels();bootstrap.group(bossGroup, workerGroup).channel(NettyEventLoopFactory.serverSocketChannelClass()).option(ChannelOption.SO_REUSEADDR, Boolean.TRUE).childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE).childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {// FIXME: should we use getTimeout()?int idleTimeout = UrlUtils.getIdleTimeout(getUrl());NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);if (getUrl().getParameter(SSL_ENABLED_KEY, false)) {ch.pipeline().addLast("negotiation",SslHandlerInitializer.sslServerHandler(getUrl(), nettyServerHandler));}ch.pipeline().addLast("decoder", adapter.getDecoder()).addLast("encoder", adapter.getEncoder())// 定义IdleStateHandler.addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS)).addLast("handler", nettyServerHandler);}});// bindChannelFuture channelFuture = bootstrap.bind(getBindAddress());channelFuture.syncUninterruptibly();channel = channelFuture.channel();}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {// 原文注释:server will close channel when server don't receive any heartbeat from client util timeout.if (evt instanceof IdleStateEvent) {NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);try {logger.info("IdleStateEvent triggered, close channel " + channel);channel.close();} finally {NettyChannel.removeChannelIfDisconnected(ctx.channel());}}super.userEventTriggered(ctx, evt);
}
客户端代码:
@Override
protected void doOpen() throws Throwable {final NettyClientHandler nettyClientHandler = new NettyClientHandler(getUrl(), this);bootstrap = new Bootstrap();bootstrap.group(NIO_EVENT_LOOP_GROUP).option(ChannelOption.SO_KEEPALIVE, true).option(ChannelOption.TCP_NODELAY, true).option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)//.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getTimeout()).channel(socketChannelClass());bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, Math.max(3000, getConnectTimeout()));bootstrap.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {int heartbeatInterval = UrlUtils.getHeartbeat(getUrl());if (getUrl().getParameter(SSL_ENABLED_KEY, false)) {ch.pipeline().addLast("negotiation", SslHandlerInitializer.sslClientHandler(getUrl(), nettyClientHandler));}NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug.addLast("decoder", adapter.getDecoder()).addLast("encoder", adapter.getEncoder())// 设置IdleStateHandler.addLast("client-idle-handler", new IdleStateHandler(heartbeatInterval, 0, 0, MILLISECONDS)).addLast("handler", nettyClientHandler);String socksProxyHost = ConfigUtils.getProperty(SOCKS_PROXY_HOST);if(socksProxyHost != null) {int socksProxyPort = Integer.parseInt(ConfigUtils.getProperty(SOCKS_PROXY_PORT, DEFAULT_SOCKS_PROXY_PORT));Socks5ProxyHandler socks5ProxyHandler = new Socks5ProxyHandler(new InetSocketAddress(socksProxyHost, socksProxyPort));ch.pipeline().addFirst(socks5ProxyHandler);}}});
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {// send heartbeat when read idle.if (evt instanceof IdleStateEvent) {try {NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);if (logger.isDebugEnabled()) {logger.debug("IdleStateEvent triggered, send heartbeat to channel " + channel);}Request req = new Request();req.setVersion(Version.getProtocolVersion());req.setTwoWay(true);req.setEvent(HEARTBEAT_EVENT);channel.send(req);} finally {NettyChannel.removeChannelIfDisconnected(ctx.channel());}} else {super.userEventTriggered(ctx, evt);}
}
看完是不是发现都是一样的套路。。
零拷贝
堆内存和直接内存
什么是直接内存
要说零拷贝,那我们先要知道什么是直接内存和堆内存。所谓堆内存就是分配给JVM堆区的内存,而直接内存是java通过调用navite方法可以直接分配JVM之外的内存区域,所以我们也把这块内存叫做堆外内存。而我们可以在堆区通过一个DirectByteBuffer去引用这块内存。因为这块内存是没有交给JVM去管理的,比较容易发生内存溢出,为了避免一直没有FULL GC,导致耗完物理内存,我们可以通过参数-XX:MaxDirectMemorySize 指定直接内存的大小,这样当直接内存到达一个阈值的时候,就会被动触发Full GC。直接内存和堆内存的关系如下所示:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-yfM5upz7-1616406401808)(https://intranetproxy.alipay.com/skylark/lark/0/2021/png/314404/1615773791957-1219d8d7-3cc0-4618-94c9-e12403f3cd21.png)]
JDK给我们提供了Api可以查看默认的堆外内存大小:
System.out.println("direct memory : "+ VM.maxDirectMemory() / 1024 / 1024);
System.out.println("heap memory : "+ Runtime.getRuntime().maxMemory() / 1024 / 1024);
直接内存如何申请使用
那么了解了什么是直接内存,我们接着看一下如何使用直接内存,代码如下:
// 通过这个方法我们可以申请1000字节的直接内存
ByteBuffer buffer = ByteBuffer.allocateDirect(1000);// 跟踪这个方法到DirectByteBuffer.java:120行,我们看到他是调用了一个navite方法去申请的内存,返回来的是一个地址
// 如果有openJDK的代码,可以看到这个方法底层的代码
base = unsafe.allocateMemory(size);// 这是navite方法调用操作系统的方法为我们申请内存 unsafe.cpp:628
void* x = os::malloc(sz, mtInternal);
直接内存和堆内存对比
下面这段代码通过分别对堆内存和直接内存的反复申请读写,测试了一下耗时,发现直接内存的访问性能普遍高于堆内存的访问性能,这是因为使用直接内存只需要一次拷贝,而使用堆内存需要两次拷贝,详情参考下文零拷贝。而直接内存的申请效率普遍低于堆内存的申请。
public class MemoryTest {/*** 操作堆内存*/private static void heapAccess() {long startTime = System.currentTimeMillis();// 分配堆内存ByteBuffer buffer = ByteBuffer.allocate(1000);readAndWrite(buffer);long endTime = System.currentTimeMillis();System.out.println("堆内存访问:" + (endTime - startTime) + "ms");}/*** 操作直接内存*/private static void directAccess() {long startTime = System.currentTimeMillis();// 分配直接内存ByteBuffer buffer = ByteBuffer.allocateDirect(1000);readAndWrite(buffer);long endTime = System.currentTimeMillis();System.out.println("直接内存访问:" + (endTime - startTime) + "ms");}private static void readAndWrite(ByteBuffer buffer) {for (int i = 0; i < 100000; i++) {for (int j = 0; j < 200; j++) {buffer.putInt(j);}buffer.flip();for (int j = 0; j < 200; j++) {buffer.getInt();}buffer.clear();}}private static void heapAllocate() {long startTime = System.currentTimeMillis();for (int i = 0; i < 100000; i++) {ByteBuffer.allocate(100);}long endTime = System.currentTimeMillis();System.out.println("堆内存申请:" + (endTime - startTime) + "ms");}private static void directAllocate() {long startTime = System.currentTimeMillis();for (int i = 0; i < 100000; i++) {ByteBuffer.allocateDirect(100);}long endTime = System.currentTimeMillis();System.out.println("直接内存申请:" + (endTime - startTime) + "ms");}public static void main(String[] args) {for (int i = 0; i < 10; i++) {heapAccess();directAccess();}System.out.println("---------------------------");for (int i = 0; i < 10; i++) {heapAllocate();directAllocate();}}
}// 执行结果
堆内存访问:54ms
直接内存访问:151ms
堆内存访问:84ms
直接内存访问:41ms
堆内存访问:78ms
直接内存访问:40ms
堆内存访问:83ms
直接内存访问:40ms
堆内存访问:99ms
直接内存访问:49ms
堆内存访问:166ms
直接内存访问:55ms
堆内存访问:123ms
直接内存访问:54ms
堆内存访问:102ms
直接内存访问:49ms
堆内存访问:79ms
直接内存访问:39ms
堆内存访问:78ms
直接内存访问:40ms
---------------------------
堆内存申请:13ms
直接内存申请:71ms
堆内存申请:6ms
直接内存申请:23ms
堆内存申请:23ms
直接内存申请:29ms
堆内存申请:2ms
直接内存申请:28ms
堆内存申请:7ms
直接内存申请:77ms
堆内存申请:3ms
直接内存申请:26ms
堆内存申请:2ms
直接内存申请:21ms
堆内存申请:213ms
直接内存申请:41ms
堆内存申请:2ms
直接内存申请:40ms
堆内存申请:2ms
直接内存申请:38ms
零拷贝
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-ln5jz9rb-1616406401809)(https://intranetproxy.alipay.com/skylark/lark/0/2021/png/314404/1615773812361-bcb3f164-0d82-4292-8d9f-577884987a7f.png)]
何为零拷贝呢,不是说一次拷贝都没有,而是说相比较操作堆内存少了两次拷贝。平时发生一次读写,如果是操作堆内存的话,往往需要四次拷贝,读的时候将数据从直接内存拷贝到堆内存,再从堆内存拷贝到缓冲区,写的时候将数据从缓冲区拷贝到堆内存,在从堆内存拷贝到直接内存。而操作直接内存就不一样了,少去了堆内存和直接内存之间的拷贝过程。我们把这个叫做零拷贝。而Netty接受和发送数据用了直接内存来提升效率。
代码附件:neety.7z
本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!
