UDP收发包性能测试
自己写的UDP收发包性能测试工具,代码如下:
首先是两个头文件
#ifndef _TEST_HEADER_H
#define _TEST_HEADER_H
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#define __USE_GNU
#include
#include
#include //@@sem#define DEBUG 0
#if DEBUG
#define debug(...) printf(__VA_ARGS__)
#else
#define debug(...)
#endif#define RECV_BUFF_SIZE 64 //定义环形缓冲区的大小,单位为可存储的报文个数
//#define SEND_BUFF_SIZE 100 //目前没有使用到发送环形缓冲,后期的数据产生模块中可能会使用
#define PORTNUM 10000 //定义socket的发送或监听端口
#define ALLOWIP_ADDR "0.0.0.0" //接收端使用recvfrom时指定的接收地址
#define PRINTINTERVAL 3 //打印线程打印信息的间隔,单位seconds
#define WINDOWSIZE 1024 //使用滑动窗口机制时的窗口大小,当报文接收很快时,需要提高此值,否则会引起coredump
#define FRAGMAXSIZE 65507 //定义应用层进行分片操作的报文临界点,由于UDP协议长度16bits,不能超过65507#ifdef MULTICASTTEST
#define PEERNUM 2 //Multicast peer node number
#endif //@end MULTICASTTEST//TCP defination
#define MAX_LISTEN_NUM 5/*适配曾目前主要用于应用层使用UDP发送大报文时的分片和重组*/
typedef enum{false,true
}bool;
//适配层的报文头部
typedef struct
{bool isFrag;unsigned int numFrag;unsigned int indexFrag;unsigned int lenFrag;
}fragment;
//应用层的报文头部登记的信息
#if SENDNODEOTHER||RECVNODEOTHER
typedef struct {int flag; //0x0001--正常数据报文 x0010--通信终止请求报文,0x0100--通信终止响应报文unsigned int packetseq; //报文序号unsigned int packetrecvlen1; //远端节点登记接收的报文长度,用于接收端正确记录此次报文接收的长度unsigned int packetrecvlen2; //本地节点登记接收的报文长度,用于发送段正确记录此次报文接收的长度,一般在回送业务中使用struct timeval sendtime1; //发送端发送的时间戳struct timeval recvtime1; //接收端接收的时间戳struct timeval sendtime2; //接收端发送的时间戳struct timeval recvtime2; //发送端接收的时间戳fragment fragInfo;
}packet;
#elif MULTICASTTEST
typedef struct {int flag;unsigned int packetseq;struct sockaddr_in srcaddr;unsigned int packetrecvlen;fragment fragInfo;
}packet;
#else
typedef struct {int flag;unsigned int packetseq;unsigned int packetrecvlen;fragment fragInfo;
}packet;
#endif//单个缓冲区节点的结构
typedef struct loopnode_TAG{bool isValid; //figure out whether the node has been coped withpacket *packetptr; //通常指向报文存放的地址struct loopnode_TAG *next; //下一个缓冲区节点的地址
}loopnode;//用于统计环形缓冲区信息的结构体
typedef struct {int buffsize; //缓冲区大小loopnode *wbuffptr; //当前可写指针的地址loopnode *rbuffptr; //当前可读指针的地址loopnode *pbuffptr; //cope pointerint packetnum; //当前存放的报文数目,实时更新
}loopbuff;//应用程序的统计全局变量
typedef struct {struct sockaddr_in hostaddr; //登记节点自身的地址unsigned int packetsize; //登记由命令行输入的发送或接收应用层报文大小unsigned int sendinterval; //登记发送端由命令行输入的发送应用层报文间隔,涉及usleep和for延迟的实现unsigned int recvnum; //登记总共接收的应用层报文数目unsigned int sendnum; //登记总共发送的应用层报文数目unsigned int misspktnum; //关于丢包的统计变量unsigned int duplpktnum; //关于duplicate包的统计变量unsigned long totalrecvlen; //登记关于总共记录下的接收字节数unsigned long totalrelaylen; //relay total send length
#if SENDNODEOTHERunsigned int delaycount; //关于计算往返时延的统计变量unsigned int delaycumulate;
#elif RECVNODEOTHERunsigned long trembcumulate; //关于抖动时间的统计变量unsigned int trembnum;
#endif
}recordInfo;#ifdef MULTICASTTEST
typedef struct{struct sockaddr_in clientaddr;unsigned int sendnum;unsigned int recvnum;//unsigned long totalrecvlen;
}multiNodeInfo;
#endif //@end MULTICASTTEST//TCP function declaration
int setSendBuffsize(int socket,unsigned int size);void initpthreadmutex(void);
void destorypthreadmutex(void);
void initPthreadAttr(pthread_attr_t *attr);
int setCPUAffinity(int cpu_id);
unsigned long timeval2ul(struct timeval *time);
int campareIPaddr(struct sockaddr_in *firstaddr,struct sockaddr_in *secondaddr);
void initbuff( loopbuff *buff, int number, int size);
int recv2buff(int socket,loopbuff *buff, struct sockaddr_in *clientaddr,int length);
void recyclebuff(loopbuff *buff);void slidingWinscheme(int *minseq,int *maxseq,int *curseq,unsigned int array[],recordInfo * recordptr);
//unsigned long sendtoAdaption(int socket,void *sendbuff, unsigned long packetsize,int flag,struct sockaddr_in *remoteaddr);
unsigned long sendtoAdaption(int socket,void *sendbuff, unsigned long packetsize,int flag,struct sockaddr_in *remoteaddr,unsigned int counts);
int recvfromAdaption(int socket,void *recvbuf,unsigned long packetsize,int flag,struct sockaddr_in *clientaddr);//全局变量,注意有些程序未必会使用到全部的变量
pthread_cond_t qready;
sem_t loopbuffsem; //@@sem
pthread_mutex_t mutex; //对环形缓冲区的缓存报文数目进行加锁同步,loopbuff->packetnum
pthread_rwlock_t rwmutex; //slidingWinscheme函数中更新丢失包和重复包使用
pthread_rwlock_t rwmutex1; //send & recv thread 更新全局参数结构体时使用
pthread_rwlock_t rwmutex2; //deal thread 更新全局参数结构体时使用
pthread_attr_t attr;/*
void *sendbuf=NULL; //线程发送应用层报文的缓存区地址
char *recvbuf=NULL; //线程接收应用层报文的缓冲区地址
char *fragSendbuff=NULL; //线程发送适配层报文分片的缓存地址
char *fragRecvbuff=NULL; //线程接收适配层报文分片的缓存地址
*/#ifndef TCPTEST
unsigned int fragSeq; //线程接收分片处理的参数
int fragNum; //线程接收分片处理的参数
int fragIndex; //线程接收分片处理的参数
int fragFlag; //线程接收分片处理的参数
unsigned long totalrecvFragsize;
int fragRecvNum;
#endifrecordInfo recordvar;
#endif
#include "Testheader.h"extern char *sendbuf;
extern char *recvbuf;
extern char *fragSendbuff;
extern char *fragRecvbuff;
extern unsigned int fragSeq;
extern int fragNum;
extern int fragIndex;
extern int fragFlag;
extern unsigned long totalrecvFragsize;
extern recordInfo recordvar;
extern int fragRecvNum;
extern sem_t loopbuffsem;//@@sem//统一初始化线程同步的互斥锁、条件变量、读写锁等
void initpthreadmutex(void)
{pthread_cond_init(&qready, NULL);pthread_mutex_init(&mutex, NULL);pthread_rwlock_init(&rwmutex,NULL);pthread_rwlock_init(&rwmutex1,NULL);pthread_rwlock_init(&rwmutex2,NULL);return;
}void destorypthreadmutex(void)
{pthread_cond_destroy(&qready);pthread_mutex_destroy(&mutex);pthread_rwlock_destroy(&rwmutex);pthread_rwlock_destroy(&rwmutex1);pthread_rwlock_destroy(&rwmutex2);return;
}//主线程调用,设置线程属性为实时抢占式调度,更改线程优先级,供分支线程使用
void initPthreadAttr(pthread_attr_t *attr)
{int policy,inher;struct sched_param param;pthread_attr_init(attr);pthread_attr_setinheritsched(attr,PTHREAD_EXPLICIT_SCHED);pthread_attr_setschedpolicy(attr,SCHED_FIFO);//param.__sched_priority=sched_get_priority_max(SCHED_FIFO);param.__sched_priority=90;pthread_attr_setschedparam(attr,¶m);return;
}//分支线程调用,设置本线程对应CPU的亲和力
int setCPUAffinity(int cpu_id)
{if(sysconf(_SC_NPROCESSORS_CONF)<1){perror("sysconf error, no cpu exists!");return -1;}cpu_set_t mask;CPU_ZERO(&mask);CPU_SET(cpu_id,&mask);if(-1==sched_setaffinity(0,sizeof(mask),&mask)){perror("sched_setaffinity fails!");return -1;}return 0;
}unsigned long timeval2ul(struct timeval *time)
{unsigned long value=((time->tv_sec)*1000000+time->tv_usec);return value;
}int setSendBuffsize(int socket,unsigned int size)
{unsigned int recvbufsize;int size11 = sizeof(int);int resultret;resultret = getsockopt(socket, SOL_SOCKET, SO_SNDBUF, &recvbufsize,&size11);if (resultret != 0) {perror("ERROR!getsockopt");return -1;}int size12 = sizeof(int);resultret = setsockopt(socket, SOL_SOCKET, SO_SNDBUF, &size,size12);if (resultret != 0) {perror("ERROR!setsockopt");return -1;}return 0;
}int campareIPaddr(struct sockaddr_in *firstaddr,struct sockaddr_in *secondaddr)
{char addr1[20],addr2[20];memset(addr1,0,20);memset(addr2,0,20);memcpy(addr1,inet_ntoa(firstaddr->sin_addr),20);memcpy(addr2,inet_ntoa(secondaddr->sin_addr),20);return strcmp(addr1,addr2);
}void maketimeout(struct timespec *timespec, int second) {struct timeval now;gettimeofday(&now, NULL);timespec->tv_sec = now.tv_sec + second;timespec->tv_nsec = now.tv_usec * 1000;
}
//初始化环形缓冲区函数
void initbuff( loopbuff *buff, int number, int size) {int i;loopnode *node1, *node2;sem_init(&loopbuffsem,0,0);//@@semnode1 = malloc(sizeof(loopnode));node1->isValid=false;node1->packetptr = malloc(sizeof(char) * size);node1->next = NULL;buff->wbuffptr = node1;buff->rbuffptr = node1;buff->pbuffptr = node1;buff->buffsize = number;buff->packetnum = 0;for (i = 2; i <= number; i++) {node2 = malloc(sizeof(loopnode));node2->isValid=false;node2->packetptr = malloc(sizeof(char) * size);if (i != number) {node2->next = NULL;} else {node2->next = buff->wbuffptr;}node1->next = node2;node1 = node2;}return;
}//环形缓冲区回收处理函数
void recyclebuff(loopbuff * buff)
{//释放动态分配的缓冲区空间int i;loopnode *temp=(loopnode *)(buff->wbuffptr);loopnode *ptr;for(i=0;ibuffsize;i++){free(temp->packetptr);ptr=temp;temp=temp->next;free(ptr);}return;
}#ifdef TCPTEST
//TCP接收线程的接收操作函数
int recv2buff(int socket, loopbuff *buff, struct sockaddr_in *clientaddr,int length) {if ((buff->wbuffptr == buff->rbuffptr)&&(buff->rbuffptr->isValid==true)) {return 2;}loopnode *tempptr=buff->wbuffptr;tempptr->isValid=true;packet *pktptr=NULL;int len = recv(socket, tempptr->packetptr,recordvar.packetsize, MSG_WAITALL);if (len > 0) {pktptr = tempptr->packetptr;if ((pktptr->flag) == 1) {pktptr->packetrecvlen=len;#ifdef MULTICASTTESTpktptr->srcaddr=*clientaddr;
#endif //@end MULTITESTbuff->wbuffptr = tempptr->next;/*pthread_mutex_lock(&mutex);if (buff->packetnum == 0) {pthread_cond_signal(&qready);}buff->packetnum++;pthread_mutex_unlock(&mutex);*/pthread_mutex_lock(&mutex);buff->packetnum++;pthread_mutex_unlock(&mutex);sem_post(&loopbuffsem);return 0;}} else if (len == 0) {//peer socket disconnectreturn 1;} else if (len < 0) {perror("error! recvfrom error!");return -1;}
}#else
//UDP接收线程的接收操作函数
int recv2buff(int socket,loopbuff *buff, struct sockaddr_in *clientaddr,int length) {if ((buff->wbuffptr == buff->rbuffptr)&&(buff->rbuffptr->isValid==true)) {//缓冲区溢出//printf(" @# %d ",buff->packetnum);return 2;}loopnode *tempptr=buff->wbuffptr;packet *pktptr=NULL;debug("@@recv2buff a packet1!\n");int label = recvfromAdaption(socket, tempptr->packetptr, recordvar.packetsize,0, clientaddr);debug("@@recv2buff a packet2 label=%d!\n",label);if (label == 0) {tempptr->isValid=true;pktptr = tempptr->packetptr;if ((pktptr->flag) == 1) {//正确接收一个应用层报文并判断报文类型正常后,进行报文头部处理,以便处理线程操作
#ifdef SENDNODEOTHERgettimeofday(&(pktptr->recvtime2), NULL);pktptr->packetrecvlen2=totalrecvFragsize;
#elif RECVNODEOTHERgettimeofday(&(pktptr->recvtime1), NULL);pktptr->packetrecvlen1=totalrecvFragsize;
#elsepktptr->packetrecvlen=totalrecvFragsize;
#endif //@end SENDNODEOTHER#ifdef MULTICASTTESTpktptr->srcaddr=*clientaddr;
#endif //@end MULTITESTbuff->wbuffptr = tempptr->next;//method one condition vars/*pthread_mutex_lock(&mutex);if(buff->packetnum == 0) {//通过条件变量通知当前empty缓冲区有新的报文写入pthread_cond_signal(&qready);}buff->packetnum++;pthread_mutex_unlock(&mutex);*///method two sem mechanismpthread_mutex_lock(&mutex);buff->packetnum++;// printf("%d",buff->packetnum);pthread_mutex_unlock(&mutex);sem_post(&loopbuffsem);}debug("@@recv2buff a packet6!\n");return 0;} else if (label == 1) {//正确接收一个应用层碎片报文return 1;} else if (label < 0) {//接收出错perror("error! recvfrom error!");return -1;}
}//应用层报文发送处理函数
unsigned long sendtoAdaption(int socket,void *sendbuff, unsigned long packetsize,int flag,struct sockaddr_in *remoteaddr,unsigned int counts)
{packet *pktptr=NULL;packet fragHeader;int fragHLen=sizeof(packet);unsigned int mm=0;unsigned int NumFrag=0;unsigned int LengthFrag=0;unsigned int sendFragsize=0;unsigned long sendbuffOffset=0;unsigned long totalsendFragsize=0;if(packetsize<=FRAGMAXSIZE){LengthFrag=packetsize;pktptr=(packet *)sendbuff;pktptr->fragInfo.isFrag=false;pktptr->fragInfo.numFrag=0;pktptr->fragInfo.indexFrag=0;pktptr->fragInfo.lenFrag=packetsize;//应用层报文长度小于分片规定的大小,不进行分片操作sendFragsize=sendto(socket, sendbuff, LengthFrag, flag,(struct sockaddr *) remoteaddr, sizeof(struct sockaddr));debug("@@sendtoAdaption seq=%d len=%d\n",pktptr->packetseq,sendFragsize);totalsendFragsize=sendFragsize;int i;for(i=0;ifragInfo.isFrag=true;pktptr->fragInfo.numFrag=NumFrag;for(mm=1;mm<=NumFrag;mm++){pktptr->fragInfo.indexFrag=mm;if(mm!=NumFrag){pktptr->fragInfo.lenFrag=FRAGMAXSIZE;}else{pktptr->fragInfo.lenFrag=(packetsize-FRAGMAXSIZE)-(mm-2)*(FRAGMAXSIZE-fragHLen)+fragHLen;}LengthFrag=pktptr->fragInfo.lenFrag;debug("@@sendtoAdaption fraglen=%u\n",LengthFrag);if(mm!=1){sendbuffOffset=(mm-1)*FRAGMAXSIZE-(mm-2)*sizeof(packet);}debug("@@sendtoAdaption sendbuffOffset=%u\n",sendbuffOffset);memcpy(fragSendbuff+fragHLen,sendbuff+sendbuffOffset,LengthFrag-fragHLen);sendFragsize=sendto(socket, fragSendbuff, LengthFrag, 0,(struct sockaddr *) remoteaddr, sizeof(struct sockaddr));debug("@@sendtoAdaption frag seq=%d index=%d[%d] len=%d[%d]\n",pktptr->packetseq,pktptr->fragInfo.indexFrag,NumFrag,LengthFrag,sendFragsize);totalsendFragsize+=sendFragsize;int i;for(i=0;i<5000;i++){}}debug("@@sendtoAdaption totalsendFragsize=%u\n",totalsendFragsize);}return totalsendFragsize;
}//应用层报文接收处理函数,主要用于分片重组
int recvfromAdaption(int socket,void *recvbuf,unsigned long packetsize,int flag,struct sockaddr_in *clientaddr)
{packet *pktptr;int fragHLen=sizeof(packet);unsigned long recvFragsize=0;unsigned int ptrOffset=0;int length=sizeof(struct sockaddr);//memset(fragRecvbuff,0,FRAGMAXSIZE);recvFragsize = recvfrom(socket, fragRecvbuff, FRAGMAXSIZE,flag,(struct sockaddr *)clientaddr,&length);if(packetsizepacketseq;fragFlag=pktptr->fragInfo.isFrag;fragNum=pktptr->fragInfo.numFrag;fragRecvNum=0;debug("@@new packet fragSeq=%d\n",fragSeq);}/* ** 此次接收到的报文序号等于登记的fragseq,则:* 1. 本次处理的报文仍然为登记的应用firstaddr层报文中一个分片,* 2. 上一次处理的应用层报文完整接收,此次为新的应用报文* */if(fragSeq==pktptr->packetseq){if(fragFlag==false){//该应用层报文未分片//remove the header of adaption layer,and deliver the packet to application layer buffmemcpy(recvbuf,fragRecvbuff,recvFragsize);//fragSeq=0;totalrecvFragsize=recvFragsize;debug("@@recvfromAdaption a packet!\n");return 0;}else if(fragFlag==true){//该应用层报文分片fragIndex=pktptr->fragInfo.indexFrag;debug("@@new packet fragSeq=%d index=%d[%d]\n",fragSeq,fragIndex,fragNum);//ptrOffset locate the storage position for the fragment in this timeptrOffset=(fragIndex-1)*(FRAGMAXSIZE-fragHLen);memcpy(recvbuf+ptrOffset,fragRecvbuff+fragHLen,recvFragsize-fragHLen);totalrecvFragsize+=recvFragsize-fragHLen;fragRecvNum++;if(fragRecvNum==fragNum){//All fragments receivedreturn 0;}//there will be more fragment to be comereturn 1;}}else{//上一次处理的应用层报文没有完整接收,此次为新的应用报文到来//reset global variables,clear recvbufdebug("@@miss packet\n");fragSeq=0;totalrecvFragsize=0;memset(recvbuf,0,packetsize);goto deal_TAG;}
}#endif //@end TCPTEST//滑动窗口函数,主要用于丢包和重复包的统计,实现机制是动态拷贝滑动窗口的标识至滑动窗口开始的位置
/*
void slidingWinscheme(int *minseq,int *maxseq,int *curseq,unsigned int array[],recordInfo * recordptr)
{int k,i,j,dd1,dd2,index;//判断当前的报文序号与窗口可接受的序号大小关系if (*minseq > *curseq) {//当前序号小于窗口可接受最小序号,则不做处理,函数已经判断为丢包printf("packet arrives too late!\n");} else if (((*curseq > *minseq) && (*curseq < *maxseq)) || (*curseq == *minseq)|| (*curseq == *maxseq)) {//当前序号处于窗口可接受序号范围k = (*curseq - *minseq) % WINDOWSIZE;//当前报文序号所在的窗口索引,进行标识和重复包判断if (array[k] == 0) {array[k] = 1;} else {pthread_rwlock_wrlock(&rwmutex);recordptr->duplpktnum++;pthread_rwlock_unlock(&rwmutex);}} else if ((*curseq > *maxseq)) {//当前序号大于窗口可接受序号最大序号,进行窗口移动//dd1用于窗口从最小序号开始,第一个标识不为1(已接收)的索引位置for (dd1 = 0; dd1 < WINDOWSIZE; dd1++) {if (array[dd1] != 1)break;}//dd2用于以当前序号为新窗口的最大序号时,新窗口的最小序号的索引位置dd2 = *curseq - *minseq - WINDOWSIZE + 1;//根据dd1与dd2进行窗口的回拢操作if (dd2 > dd1) {for (i = dd1; i < dd2; i++) {if (array[i] == 0) {pthread_rwlock_wrlock(&rwmutex);recordptr->misspktnum++;pthread_rwlock_unlock(&rwmutex);}}}//根据旧窗口的标识,更新部分新窗口的标识index = (dd1 <= dd2) ? dd2 : dd1;*minseq = *minseq + index;*maxseq = *minseq + WINDOWSIZE - 1;for (j = index; j < WINDOWSIZE; j++) {array[j - index] = array[j];}for (j = 1; j <=index; j++) {array[WINDOWSIZE - j] = 0;}k = (*curseq - *minseq) % WINDOWSIZE;array[k] = 1;int offset,realIndex,virIndex;offset= index = (dd1 <= dd2) ? dd2 : dd1;*minseq = *minseq + offset;*maxseq = *minseq + WINDOWSIZE - 1;virIndex = (*curseq - *minseq) % WINDOWSIZE;realIndex=(virIndex+offset)%WINDOWSIZE;array[realIndex]=1;}
}
*///滑动窗口函数,主要用于丢包和重复包的统计,使用循环数组的方式进行实现,避免了循环拷贝操作,相比动态拷贝接收标识具有更高的运行效率
void slidingWinscheme(int *minseq,int *maxseq,int *curseq,unsigned int array[],recordInfo * recordptr)
{int i,dd1,dd2,index,temp;debug("@@slidingWinscheme minseq=%d,maxseq=%d,curseq=%d\n",*minseq,*maxseq,*curseq);//判断当前的报文序号与窗口可接受的序号大小关系if (*minseq > *curseq) {//当前序号小于窗口可接受最小序号,则不做处理,函数已经判断为丢包//printf("packet arrives too late!\n");} else if (((*curseq > *minseq) && (*curseq < *maxseq)) || (*curseq == *minseq)|| (*curseq == *maxseq)) {//定位curseq报文在滑动窗口中的下标index = (*curseq-1) % WINDOWSIZE;if (array[index] == 0) {array[index] = 1;} else {pthread_rwlock_wrlock(&rwmutex);recordptr->duplpktnum++;pthread_rwlock_unlock(&rwmutex);}debug("@@slidingWinscheme index=%d\n",index);} else if ((*curseq > *maxseq)) {//dd1与dd2来判断丢包率,都代表所指的报文序号,进而能够确定报文在滑动窗口的下标位置//dd2表示以curseq为滑动窗口可接收最大报文序号时,滑动窗口开始处的报文序号int dd2=*curseq-WINDOWSIZE+1;//dd1根据滑动窗口的标识,当前可进行回拢操作的结束位置for(dd1=*minseq;dd1<=*maxseq;dd1++){if(1!=array[(dd1-1)%WINDOWSIZE]){break;}array[(dd1-1)%WINDOWSIZE]=0;}debug("@@slidingWinscheme dd1=%d,dd2=%d\n",dd1,dd2);//根据dd1和dd2,计算丢包情况if(dd2>dd1){for (i = dd1; i < dd2; i++) {if (array[(i-1)%WINDOWSIZE] == 0) {pthread_rwlock_wrlock(&rwmutex);recordptr->misspktnum++;pthread_rwlock_unlock(&rwmutex);}}*minseq=dd2;}else{*minseq=dd1;}*maxseq = *minseq + WINDOWSIZE - 1;index = (*curseq-1) % WINDOWSIZE;debug("@@slidingWinscheme index=%d\n",index);array[index]=1;}
}
下面是发送文件
#include "Testheader.h"/***********************************************************************************************Program target:caculate VPX blade server's bandwidth(principle 40Gbps)----UDP senderCommand format:./aout [Local IP address] [Receiver IP address] [Interval(us)] [Packet Size(Bytes)][counts]Structure Description:Single-threadthe main thread --------- initial operationthe send thread --------- send the periodic packetthe print thread --------- print the info in real-timeOutcome:when we set the interval to send data, time cost for execute the operating of sending a packet is exclusive, in that way, the actual numberof packets which is sent by the program is less than the number we expected.***********************************************************************************************///Global variables
pthread_t thread_id1,thread_id2;
unsigned int count = 1;
struct sockaddr_in remoteaddr;
struct sockaddr_in hostaddr;
int sock;
void *sendbuf=NULL; //线程发送应用层报文的缓存区地址
void *recvbuf=NULL; //线程接收应用层报文的缓冲区地址
void *fragSendbuff=NULL; //线程发送适配层报文分片的缓存地址
void *fragRecvbuff=NULL; //线程接收适配层报文分片的缓存地址
extern pthread_attr_t attr;
extern recordInfo recordvar;void *process_print(void *arg);
void *process_send(void *arg);int counts=0;
int main(int argc, char *argv[]) {memset((void *) &remoteaddr, 0, sizeof(remoteaddr));memset((void *) &hostaddr, 0, sizeof(remoteaddr));remoteaddr.sin_family = AF_INET;if(1==argc||(2==argc&&strcmp(argv[1],"-help"))){printf("CMD Format: ./a.out [Local IP] [Remote IP ] [Interval(us)] [Packet Size(Bytes)] [counts]\n");return -1;}if(6==argc) {counts=atoi(argv[5]);}if (5>argc) {printf("Invalid Parameter Number, Please cheack the Input parameter! ,argc = %d\n",argc);printf("CMD Format: ./a.out [Local IP] [Remote IP ] [Interval(us)] [Packet Size(Bytes)] [counts]\n");return -1;}if (INADDR_NONE == (remoteaddr.sin_addr.s_addr = inet_addr(argv[2]))) {printf("Receiver IP address is invalid!");exit(1);} else {printf("Receiver IP address:%s ", argv[2]);}printf("Send Interval:%sus Single Packet Size:%s Bytes couns:%s\n",argv[3], argv[4],argv[5]);remoteaddr.sin_port = htons(PORTNUM);if ((sock = socket(AF_INET, SOCK_DGRAM, 0)) < 0) {printf("ERROR!create socket failed!\n");exit(1);}memset(&recordvar,0,sizeof(recordInfo));recordvar.hostaddr.sin_addr.s_addr=inet_addr(argv[1]);recordvar.sendinterval=atoi(argv[3]);recordvar.packetsize=atoi(argv[4]);recordvar.sendnum=0;recordvar.recvnum=0;fragSendbuff=malloc(FRAGMAXSIZE);sendbuf = malloc(recordvar.packetsize);memset(sendbuf, '1',recordvar.packetsize);fragSendbuff=malloc(FRAGMAXSIZE);printf("@@There are %d cpus\n",sysconf(_SC_NPROCESSORS_CONF));initpthreadmutex();initPthreadAttr(&attr);pthread_create(&thread_id1, &attr, process_send, NULL);pthread_create(&thread_id2, NULL, process_print, NULL);//收尾处理,等待send_thread print_thread的结束,回收线程占用的系统资源if (!pthread_join(thread_id1, NULL)) {perror("child thread terminal exceptionally");}if (!pthread_join(thread_id2, NULL)) {perror("child thread terminal exceptionally");}destorypthreadmutex();free(sendbuf);free(fragSendbuff);close(sock);return 0;
}void *process_send(void *arg)
{setCPUAffinity(9);packet pkt;while (1) {pkt.flag=0x01;pkt.packetseq = count;memcpy(sendbuf, &pkt, sizeof(packet));//len1=sendto(sock, sendbuf, atoi(argv[4]), 0,(struct sockaddr *) &remoteaddr, sizeof(remoteaddr));if (-1 ==sendtoAdaption(sock, sendbuf, recordvar.packetsize, 0,&remoteaddr,counts)) {//if (-1 ==sendto(sock, sendbuf, recordvar.packetsize, 0,(struct sockaddr *)&remoteaddr,sizeof(remoteaddr))) {perror("ERROR! send error!");exit(1);}count++;pthread_rwlock_wrlock(&rwmutex1);recordvar.sendnum++;pthread_rwlock_unlock(&rwmutex1);if(recordvar.sendinterval!=0){usleep(recordvar.sendinterval);}}
}void *process_print(void *arg)
{setCPUAffinity(0);struct timeval tt1,tt2;float datarate;float rate;int flags=0;unsigned long difftime;unsigned long printtime=PRINTINTERVAL*1000000;unsigned int lastsendnum=0;unsigned long lasttotalrecvlen=0;gettimeofday(&tt1,NULL);while(1){gettimeofday(&tt2,NULL);difftime=timeval2ul(&tt2)-timeval2ul(&tt1);if(difftime>printtime){pthread_rwlock_rdlock(&rwmutex1);datarate=1000000*(8.0)*recordvar.packetsize*(recordvar.sendnum-lastsendnum)/difftime;if(!flags){if(datarate/1024>1.0&&datarate/1024<1024)flags=1;else if(datarate/(1024*1024)>1.0&&datarate/(1024*1024)<1024)flags=2;else if(datarate/(1024*1024*1024)>1.0)flags=3;}//rate =100.0*(recordvar.sendnum-lastsendnum)/(printtime/recordvar.sendinterval);//printf("%d [%5.2f\%]packets send to %s during %u us : data rate comes to %10.3fbps\n",(recordvar.sendnum-lastsendnum),rate,inet_ntoa(remoteaddr.sin_addr),difftime,datarate);printf("%d Packets send to %s During %uus ",(recordvar.sendnum-lastsendnum),inet_ntoa(remoteaddr.sin_addr),difftime);if(flags==0){printf("BandWidths come up to %7.3fbps\n",datarate);}else if(flags==1){printf("BandWidths come up to %7.3f Kbps\n",datarate/1024);}else if(flags==2){printf("BandWidths come up to %7.3f Mbps\n",datarate/(1024*1024));}else if(flags==3){printf("BandWidths come up to %7.3f Gbps\n",datarate/(1024*1024*1024));}lastsendnum=recordvar.sendnum;pthread_rwlock_unlock(&rwmutex1);tt1=tt2;}}
}
下面是接收端函数
#include "Testheader.h"/**********************************************************************Program target:caculate VPX blade server's bandwidth(principle 40Gbps)--UDP receiverCommand format:./aout [Local IP address] [packet Size(Bytes)]Structure Description:Single-threadthe main thread --------- initial operationthe recv thread --------- recv the periodic packetthe print thread --------- print the info in real-time**********************************************************************///Global variables
unsigned int array[WINDOWSIZE] = { 0 };
pthread_t thread_id1,thread_id2;
struct sockaddr_in addr, clientaddr;
int sock;
void *sendbuf=NULL; //线程发送应用层报文的缓存区地址
void *recvbuf=NULL; //线程接收应用层报文的缓冲区地址
void *fragSendbuff=NULL; //线程发送适配层报文分片的缓存地址
void *fragRecvbuff=NULL; //线程接收适配层报文分片的缓存地址
extern pthread_attr_t attr;void *process_print(void *arg);
void *process_recv(void *arg);int main(int argc, char *argv[]) {memset((void *) &addr, 0, sizeof(addr));memset((void *) &clientaddr, 0, sizeof(addr));addr.sin_family = AF_INET;if(1==argc||(2==argc&&(!strcmp(argv[1],"-help")))){printf("CMD Format: ./aout [Local IP address] [packet Size(Bytes)]\n");return -1;}if (3 != argc) {printf("Invalid Parameter Number, Please cheack the Input parameter!\n");exit(1);}addr.sin_addr.s_addr = inet_addr(ALLOWIP_ADDR);addr.sin_port = htons(PORTNUM);if ((sock = socket(AF_INET, SOCK_DGRAM, 0)) < 0) {printf("ERROR!create socket failed!\n");exit(1);}if (bind(sock,(struct sockaddr *)&addr,sizeof(struct sockaddr))==-1) {perror("ERROR!");exit(1);}//initial the recordInfo structure before threads runningmemset(&recordvar,0,sizeof(recordInfo));recordvar.hostaddr.sin_addr.s_addr=inet_addr(argv[1]);recordvar.packetsize=atoi(argv[2]);recordvar.sendnum=0;recordvar.recvnum=0;recordvar.misspktnum=0;recordvar.duplpktnum=0;recordvar.totalrecvlen=0;recvbuf = malloc(recordvar.packetsize);memset(recvbuf,0,recordvar.packetsize);fragRecvbuff=malloc(FRAGMAXSIZE);fragSeq=0;totalrecvFragsize=0;fragNum=0;fragIndex=0;fragFlag=0;printf("@@there are %d CPUs\n",sysconf(_SC_NPROCESSORS_CONF));initpthreadmutex();initPthreadAttr(&attr);pthread_create(&thread_id1,&attr,process_recv,NULL);pthread_create(&thread_id2, NULL,process_print, NULL);if (!pthread_join(thread_id1, NULL)){perror("child thread terminal exceptionally");}if (!pthread_join(thread_id2, NULL)) {perror("child thread terminal exceptionally");}destorypthreadmutex();free(recvbuf);free(fragRecvbuff);close(sock);return 0;
}void *process_recv(void *arg)
{setCPUAffinity(9);fd_set fd;struct timeval tt;int ret;tt.tv_sec = 3;tt.tv_usec = 0;packet *recvinfo;unsigned int minseq, maxseq, curseq;minseq = 1;maxseq = minseq + WINDOWSIZE-1;int label;int len;int length=sizeof(struct sockaddr);printf("Ready to recv packets!\n");while (1) {FD_ZERO(&fd);FD_SET(sock, &fd);ret = select(sock + 1, &fd, NULL, NULL, &tt);if (-1 == ret) {//perror("select error!");//exit(1);printf("select error!\n");} else if (0 == ret) {//printf("No packet has been arrived!\n");continue;} else {if (FD_ISSET(sock, &fd)) {label =recvfromAdaption(sock,recvbuf,recordvar.packetsize,0,&clientaddr);/*len= recvfrom(sock,recvbuf,65500,0,(struct sockaddr *)&clientaddr,&length);if(len>recordvar.packetsize){printf("recvbuf is smaller than received packet\n");continue;}else if(len>0){recvinfo=(packet *)recvbuf;if (recvinfo->flag==1) {curseq = recvinfo->packetseq;slidingWinscheme(&minseq,&maxseq,&curseq,array,&recordvar);recordvar.recvnum++;recordvar.totalrecvlen+=len;} }*/if (label == 0) {recvinfo=(packet *)recvbuf;if (recvinfo->flag==1) {curseq = recvinfo->packetseq;slidingWinscheme(&minseq,&maxseq,&curseq,array,&recordvar);recordvar.recvnum++;recordvar.totalrecvlen+=totalrecvFragsize;} fragSeq=0;totalrecvFragsize=0;} else if (label ==1) {//fragment isn't overcontinue;}else if(label<0){printf("Recv error!\n");}}}}
}void *process_print(void *arg)
{setCPUAffinity(0);struct timeval tt1,tt2;float datarate;float rate;int flags=0;float localmissrate;float globalmissrate;unsigned long difftime;unsigned long printtime=PRINTINTERVAL*1000000;unsigned int lastrecvnum=0;unsigned int lastmisspktnum=0;unsigned int lastduplpktnum=0;unsigned long lasttotalrecvlen=0;gettimeofday(&tt1,NULL);while(1){gettimeofday(&tt2,NULL);difftime=timeval2ul(&tt2)-timeval2ul(&tt1);if(difftime>printtime){pthread_rwlock_rdlock(&rwmutex);pthread_rwlock_rdlock(&rwmutex1);float datarate=1000000*8.0*(recordvar.totalrecvlen-lasttotalrecvlen)/difftime;if(datarate/1024>1.0&&datarate/1024<1024)flags=1;else if(datarate/(1024*1024)>1.0&&datarate/(1024*1024)<1024)flags=2;else if(datarate/(1024*1024*1024)>1.0)flags=3;localmissrate=100.0*(recordvar.misspktnum-lastmisspktnum)/((recordvar.recvnum - lastrecvnum)+(recordvar.misspktnum-lastmisspktnum));globalmissrate=100.0*recordvar.misspktnum/(recordvar.recvnum+recordvar.misspktnum);printf("%d Packets received from %s :Duplicated[%u], Lost[%u],[%5.3f%/%5.3f%]",(recordvar.recvnum - lastrecvnum), inet_ntoa(clientaddr.sin_addr),(recordvar.duplpktnum-lastduplpktnum), (recordvar.misspktnum-lastmisspktnum),localmissrate,globalmissrate);if(flags==0){printf("BandWidths come up to %7.3fbps\n",datarate);}else if(flags==1){printf("BandWidths come up to %7.3f Kbps\n",datarate/1024);}else if(flags==2){printf("BandWidths come up to %7.3f Mbps\n",datarate/(1024*1024));}else if(flags==3){printf("BandWidths come up to %7.3f Gbps\n",datarate/(1024*1024*1024));}lastmisspktnum=recordvar.misspktnum;lastduplpktnum=recordvar.duplpktnum;pthread_rwlock_unlock(&rwmutex);lastrecvnum = recordvar.recvnum;lasttotalrecvlen=recordvar.totalrecvlen;pthread_rwlock_unlock(&rwmutex1);tt1 = tt2;}}
}
下面是发送端和接收端的截图,其中可以统计丢包情况。
本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!
