C++网络编程(3)

**重点内容**EPOLLONESHOT模式

即使我们使用ET模式,一个socket上的某个事件还是可能被触发多次。这在并发程序中就会引起一个问题。比如一个线程(或进程,下同)在读取完某个socket上的数据后开始处理这些数据,而在数据的处理过程中该socket上又有新数据可读(EPOLLIN再次被触发),此时另外一个线程被唤醒来读取这些新的数据。于是就出现了两个线程同时操作一个socket的局面。这当然不是我们期望的。我们期望的是一个socket连接在任一时刻都只被一个线程处理。这一点可以使用epoll的EPOLLONESHOT事件实现。

对于注册EPOLLONESHOT事件的文件描述符,操作系统最多触发其上注册的一个可读、可写或者异常事件。这样,当一个线程在处理某个socket时,其他线程是不可能有机会操作该socket的。但反过来思考,注册了EPOLLONESHOT事件的socket一旦被某个线程处理完毕,该线程就应该立即重置这个socket上的EPOLLONESHOT事件,以确保这个socket下次一次可读时,其EPOLLIN事件能被触发,进而让其他工作线程有机会继续处理这个socket。

#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include #define MAX_EVENT_NUMBER 1024
#define BUFFER_SIZE 1024struct fds
{int epollfd;int sockfd;
};int setnonblocking( int fd )
{int old_option = fcntl( fd, F_GETFL );int new_option = old_option | O_NONBLOCK;fcntl( fd, F_SETFL, new_option );return old_option;
}void addfd( int epollfd, int fd, bool oneshot )
{epoll_event event;event.data.fd = fd;event.events = EPOLLIN | EPOLLET;if( oneshot ) //只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个socket的话,需要再次把这个socket加入到EPOLL队列里{event.events |= EPOLLONESHOT;}epoll_ctl( epollfd, EPOLL_CTL_ADD, fd, &event );setnonblocking( fd );
}void reset_oneshot( int epollfd, int fd )
{epoll_event event;event.data.fd = fd;event.events = EPOLLIN | EPOLLET | EPOLLONESHOT;epoll_ctl( epollfd, EPOLL_CTL_MOD, fd, &event ); //将fd重新添加到epoll队列中
}void* worker( void* arg )
{int sockfd = ( (fds*)arg )->sockfd;int epollfd = ( (fds*)arg )->epollfd;printf( "start new thread to receive data on fd: %d\n", sockfd );char buf[ BUFFER_SIZE ];memset( buf, '\0', BUFFER_SIZE );while( 1 ){int ret = recv( sockfd, buf, BUFFER_SIZE-1, 0 );if( ret == 0 ){close( sockfd );printf( "foreiner closed the connection\n" );break;}else if( ret < 0 ){if( errno == EAGAIN ){reset_oneshot( epollfd, sockfd ); //将事件重新添加到epoll监听列表中printf( "read later\n" );break;}}else{printf( "get content: %s\n", buf ); //添加线程执行计划sleep( 5 );}}printf( "end thread receiving data on fd: %d\n", sockfd );
}int main( int argc, char* argv[] )
{if( argc <= 2 ){printf( "usage: %s ip_address port_number\n", basename( argv[0] ) );return 1;}const char* ip = argv[1];int port = atoi( argv[2] );int ret = 0;struct sockaddr_in address;bzero( &address, sizeof( address ) );address.sin_family = AF_INET;inet_pton( AF_INET, ip, &address.sin_addr );address.sin_port = htons( port );int listenfd = socket( PF_INET, SOCK_STREAM, 0 );assert( listenfd >= 0 );ret = bind( listenfd, ( struct sockaddr* )&address, sizeof( address ) );assert( ret != -1 );ret = listen( listenfd, 5 );assert( ret != -1 );epoll_event events[ MAX_EVENT_NUMBER ];int epollfd = epoll_create( 5 );assert( epollfd != -1 );addfd( epollfd, listenfd, false );while( 1 ){int ret = epoll_wait( epollfd, events, MAX_EVENT_NUMBER, -1 );if ( ret < 0 ){printf( "epoll failure\n" );break;}for ( int i = 0; i < ret; i++ ){int sockfd = events[i].data.fd;if ( sockfd == listenfd ){struct sockaddr_in client_address;socklen_t client_addrlength = sizeof( client_address );int connfd = accept( listenfd, ( struct sockaddr* )&client_address, &client_addrlength );addfd( epollfd, connfd, true );}else if ( events[i].events & EPOLLIN ){pthread_t thread;fds fds_for_new_worker;fds_for_new_worker.epollfd = epollfd;fds_for_new_worker.sockfd = sockfd;pthread_create( &thread, NULL, worker, ( void* )&fds_for_new_worker );}else{printf( "something else happened \n" );}}}close( listenfd );return 0;
}


本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!

相关文章

立即
投稿

微信公众账号

微信扫一扫加关注

返回
顶部