网络编程 AIO
AIO模型介绍
AIO(Asynchronous I/O) 异步非阻塞模型, 在java jdk.17版本开始支持AIO,AIO模型需要操作系统的支持。AIO最大的特性是异步能力,对socket和I/O起作用。

与NIO模型不同,一读写操作为例,只需直接调用read和write的API即可。对于读操作:当有流可读时,系统会将可读的流传入到read方法的缓冲区,并通知应用程序。读写都是异步的,完成之后会主动调用回调函数。
在JDK 1.7中,aio也称为nio.2.0,主要在java.nio.channels包下新增了四个异步通道:
- AsynchronousSocketChannel:异步操作TCP通道,主要连接AsynchronousServerSocketChannel,一般在客户端实现;
- AsynchronousServerSocketChannel:异步操作TCP通道,主要接收客户端的连接,一般在服务端实现;
- AsynchronousFileChannel:操作文件;
- AsynchronousDatagramChanel:异步操作UDP的通道。
AsynchronousServerSocketChannel:AIO中网络通信服务端的socket
AIO中实现方法,以accept方法为例。
由于异步IO实际IO操作是交给操作系统来做,应用程序只负责通知操作系统进行IO和接口操作系统IO完成的通知,所以异步的accept的方法调用是不会阻塞的。
异步IO中有两种实现方式:
1、future方法
Future accept();
提交一个IO操作请求(Accept/read/write),返回future,就可以对future进行检查,future.get()方法,future方法会让用户程序阻塞直至操作正常完成,使用future方法比较简单,但是future.get()是同步的,使用该方式容易进入到同步的编程模式,这种方式会使AIO的异步操作成为摆设。
2、callback回调方式
void accept(A attachment, CompletionHandler handler)
开始接收客户端的连接,连接成功或者失败都是触发CompletionHandler对象的响应方法。
CompletionHandler接口提供了两个方法:
void completed(V result, A attachment);
当IO完成时触发该方法,该方法的第一个参数代表IO操作的返回的对象,第二个参数代表发起IO操作时传入的附加参数。
void failed(Throwable exc, A attachment);
当IO失败时触发该方法,第一个参数表示IO操作失败引起的异常或错误,第二个参数代表发起IO操作时传入的附加参数。
即提交一个IO操作请求(Accept/read/write),指定一个CompletionHandler,当异步IO操作完成时,发送一个通知,这个时候CompletionHandler对象的completed或者failed方法将会被调用。
AIO的实现需要充分调用操作系统参数,IO需要操作系统的支持,并发也同样需要操作系统的支持,所以性能方面不同的操作系统差异会比较明显。
AIO 的回调方式编程
AIO服务端(Server.java):
/*** AIO服务端代码* @Author : quwenjing* @Date : 2021/11/18 19:01**/
public class Server {public static void main(String[] args) {try {//创建服务端通道AsynchronousServerSocketChannel asynchronousServerSocketChannel = AsynchronousServerSocketChannel.open();//绑定端口asynchronousServerSocketChannel.bind(new InetSocketAddress(6666));System.out.println("服务端启动");//接受客户端的连接acceptasynchronousServerSocketChannel.accept(null,new AcceptCompletionHandler(asynchronousServerSocketChannel));//BIO accept操作返回Socket实例//AIO accept操作返回AsynchronousSocketChannel
//accept是异步操作,防止当前程序直接执行结束//方法一:while(ture) +sleepwhile (true){try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}}} catch (IOException e) {e.printStackTrace();}}
}
接受连接accept的回调(AcceptCompletionHandler.java):
/*** 接收连接accept的回调* @Author : quwenjing* @Date : 2021/11/18 19:07**/
public class AcceptCompletionHandler implements CompletionHandler {
//返回参数,传入参数
private AsynchronousServerSocketChannel channel;public AcceptCompletionHandler(AsynchronousServerSocketChannel channel){this.channel = channel;}@Overridepublic void completed(AsynchronousSocketChannel result, Object attachment) {System.out.println("有新客户的连接");//完成accept连接操作//读写操作,需要使用buffer//创建新的bufferByteBuffer byteBuffer = ByteBuffer.allocate(1024);//读客户端的数据,读操作是异步操作,需要实现CompletionHandler对象/*void read(ByteBuffer dst,A attachment,CompletionHandler handler);读操作异步方式方法解读dst:数据读取目的地 attachment:给读回调传递的信息CompletionHandler:当读数据完成后CompletionHandler对象*/result.read(byteBuffer,byteBuffer,new ReadCompletionHandler(result));
//再次接受其他客户端的连接,调用accept方法channel.accept(null,new AcceptCompletionHandler(channel));}@Overridepublic void failed(Throwable exc, Object attachment) {
}
}
读操作的回调(ReadCompletionHandler.java):
/*** 读操作的回调* @Author : quwenjing* @Date : 2021/11/18 19:21**/
public class ReadCompletionHandler implements CompletionHandler {//读操作返回的结果是读取的个数,应该是Integer
//用户接收或者发送操作的通道private AsynchronousSocketChannel asynchronousSocketChannel;public ReadCompletionHandler(AsynchronousSocketChannel channel){this.asynchronousSocketChannel = channel;}@Overridepublic void completed(Integer result, ByteBuffer attachment) {//读数据完成后//数据已经完成并写入ByteBuffer类型的result变量中attachment.flip();byte[] bytes = new byte[attachment.remaining()];attachment.get(bytes);String string = null;try {string = new String(bytes,"utf-8");} catch (UnsupportedEncodingException e) {e.printStackTrace();}System.out.println("服务端接收的数据:"+string);attachment.clear();
//重复接收消息,再次调用异步读操作this.asynchronousSocketChannel.read(attachment,attachment,new ReadCompletionHandler(this.asynchronousSocketChannel));}@Overridepublic void failed(Throwable exc, ByteBuffer attachment) {
}
}
客户端(Client.java):
public class Client {public static void main(String[] args) {//创建异步通道try {AsynchronousSocketChannel asynchronousSocketChannel = AsynchronousSocketChannel.open();
//连接服务端异步方式asynchronousSocketChannel.connect(new InetSocketAddress("127.0.0.1",6666),asynchronousSocketChannel,new ConnectionCompletionHandler());
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);//写操作Scanner scanner = new Scanner(System.in);while (scanner.hasNext()){String msg = scanner.nextLine();if (msg!=null && !"".equals(msg.trim())){byteBuffer.put(msg.getBytes());byteBuffer.flip();asynchronousSocketChannel.write(byteBuffer);byteBuffer.clear();}}} catch (IOException e) {e.printStackTrace();}}
}
连接服务端(ConnectionCompletionHandler.java):
public class ConnectionCompletionHandler implements CompletionHandler {@Overridepublic void completed(Void result, AsynchronousSocketChannel attachment) {//连接服务端成功System.out.println("连接服务端成功");}
@Overridepublic void failed(Throwable exc, AsynchronousSocketChannel attachment) {
}
}
本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!
