创建一个能用的SOCKET是非常简单的,因为GLIBC已经为你做了很多简化工作,但是从另一个角度来说,一个通用的SOCKET不代表一个高效性能的网络应用。我们前面说到sockfd其实同真正的FD是一样的。都是LINUX下的一个打开的设备描述符。内核通过这个描述符进行I/O操作。进行I/O操作就有一个性能问题,这个性能问题在于两个条件,一个条件是对同一个FD,有多个客户进行操作时如何更好的排队。另一个就是一个客户如果有多个FD,那应该怎么排队选择问题。因为我们知道不管是READ还是READFREOM它其实都是阻塞操作。一旦占用就始终等到有新数据来到。那么如何解决这个问题呢?首先我们看第一个排队问题,就是多个客户使用同一个SOCKET,如果当前来的数据不是占据的客户,那显然会导致阻塞。所以我们想出另一个方法,就是当一个或多个I/O条件满足,如输入数据已准备好被读或者描述字可以承接更多输出时的时候,作为消费者的客户端可以被通知到,这样的能力称之为I/O复用。这个在GLIBC中设计了两个新的函数就是SELECT/POLL。以下是几种I/O模型的比较图:
1)阻塞I/O模型,缺省的套接口都是阻塞的,你使用READ时一定要有数据时进程才会进行下去。如下图:
2)非阻塞I/O,在将套接口设置为非阻塞方式下,内核就让请求的I/O操作在没有数据的情况直接返回一个错误,不再等特。显然这种操作需要不停的尝试,消耗非常多的CPU。
非阻塞I/O可以使用fcntl参数进行设置READ/RECVFROM,但很明显它的尝试次数非常多。
[windriver@windriver-machine ltest]$ gcc noread.c -o noread
[windriver@windriver-machine ltest]$ strace -e read -o out.txt ./noread
aaa
aaa
bbbbb
bbbbb
ccccc
ccccc
[windriver@windriver-machine ltest]$ cat out.txt
read(3, "\177ELF\1\1\1\0\0\0\0\0\0\0\0\0\3\0\3\0\1\0\0\0\320\20"..., 512) = 512
read(0, 0x80498c0, 4096) = -1 EAGAIN (Resource temporarily unavailable)
read(0, 0x80498c0, 4096) = -1 EAGAIN (Resource temporarily unavailable)
read(0, 0x80498c0, 4096) = -1 EAGAIN (Resource temporarily unavailable)
read(0, "aaa\n", 4096) = 4
read(0, 0x80498c0, 4096) = -1 EAGAIN (Resource temporarily unavailable)
read(0, 0x80498c0, 4096) = -1 EAGAIN (Resource temporarily unavailable)
read(0, "bbbbb\n", 4096) = 6
read(0, 0x80498c0, 4096) = -1 EAGAIN (Resource temporarily unavailable)
read(0, 0x80498c0, 4096) = -1 EAGAIN (Resource temporarily unavailable)
read(0, "ccccc\n", 4096) = 6
read(0, 0x80498c0, 4096) = -1 EAGAIN (Resource temporarily unavailable)
read(0, "\n", 4096) = 1
read(0, 0x80498c0, 4096) = -1 EAGAIN (Resource temporarily unavailable)
read(0, 0x80498c0, 4096) = -1 EAGAIN (Resource temporarily unavailable)
read(0, 0x80498c0, 4096) = -1 EAGAIN (Resource temporarily unavailable)
read(0, 0x80498c0, 4096) = -1 EAGAIN (Resource temporarily unavailable)
read(0, 0x80498c0, 4096) = -1 EAGAIN (Resource temporarily unavailable)
read(0, 0x80498c0, 4096) = -1 EAGAIN (Resource temporarily unavailable)
read(0, 0x80498c0, 4096) = -1 EAGAIN (Resource temporarily unavailable)
read(0, 0x80498c0, 4096) = -1 EAGAIN (Resource temporarily unavailable)
--- SIGINT (Interrupt) @ 0 (0) ---
+++ killed by SIGINT +++
[windriver@windriver-machine ltest]$
[windriver@windriver-machine ltest]$ cat noread.c
#include<stdio.h>
#include<unistd.h>
#include<fcntl.h>
#include<stdlib.h>
#include<errno.h>
char buffer[4096];
int main(int argc,char** argv)
{
int delay = 1,n,m = 0;
if(argc>1)
delay = atoi(argv[1]);
fcntl(0,F_SETFL,fcntl(0,F_GETFL)|O_NONBLOCK);
fcntl(1,F_SETFL,fcntl(1,F_GETFL)|O_NONBLOCK);
while(1)
{
n = read(0,buffer,4096);
if(n>0)
m = write(1,buffer,n);
if((n<0||m<0) && (errno!=EAGAIN))
break;
sleep(delay);
}
perror(n<0 ?"stdin":"stdout");
exit(1);
}
3)I/O复用,I/O复用的基本想法就是使用一个数据结构来存储I/O操作与FD的映射关系,使用专门的函数select和poll来检测socketFD,一旦FD上数据准备好,则直接就可以使用I/O操作。
3.1)I/O复用之信号驱动方式 这种方式不使用select 函数,只是在应用刚建立时安装好相应的信号,然后在信号处理程序中进行数据I/O操作。这方法一般不建议使用,因为毕竟信号量是一种比较大的中断操作,会导致系统停顿,而且内核是否支持这种SIGIO信号量,还有SIGIO是否是SOCKET操作以及I/O操作集中于信号处理程序中。这些都是需要考虑的因素,一般不建议使用。
3.2)I/O复用之异步I/O模式,这只见于POSIX.1的1993版本中,是2.6内核的一个标准特性,简称AIO,基本思想是允许进程发起很多I/O操作,而不用阻塞或者等特操作完成。虽然是2.6标准特性,但由于复杂,目前没有很好的实现。
3.3)select 函数是建立在fd_set这个数据类型基础之上,本质上对FD集合的枚举过程,它的操作过程非常简单,就是在三种类型的FD集合中,在指定时间范围内检测是否有数据准备好,如果准备好,则返回大于0数值表示,如下图所示,
其中maxfdp1是需要检查的文件描述符个数,通常是后面三种文件描述符集合中FD值最大值加上1。这主要是为了枚举性能考虑,而不是将所有FD都算上。返回值是响应I/O操作的操作文件描述符的数量的最大值,因此,如果有多个FD,需要使用FD_ISSET进行测试是否是当前返回。另外因为在不同平台的fd_set长度不同,通常系统默认是FD_SET_SIZE是1024,可以通过修改参数,并且内部实现不一样,所以通常使用一些宏操作进行增删清空。
这里说到select函数,实际上还有一个非常类似的函数是poll,通常poll是System V标准,而select是BSD标准。 但是LINUX比较搞,它是上层用SELECT,实际上底层还是用的POLL. 在LINUX下实际上POLL性能比SELECT要高一点,POLL也是监视FD集合,不过将这个FD集合单独使用一个数据结构pollfd.
struct pollfd {
int fd;
short events;
short revents;
};
然后用poll(struct pollfd *ufds,unsigned int nfds, int timeout),所以poll枚举一个确定的FD集合,并且使用确定的事件来关注,通过返回这些确定的事件,可以确定I/O读写。相对于select来说,没有FD_SETSIZE限制。但是仍然需要对FD集合进行线性扫描。
[windriver@windriver-machine ltest]$ gcc pollexam.c -o pollexam
[windriver@windriver-machine ltest]$ ./pollexam
sdfdasdftimeout :0,0
[windriver@windriver-machine ltest]$ sdfdasdf
bash: sdfdasdf: command not found
[windriver@windriver-machine ltest]$ ./pollexam
sfsdfdasdf
============
input string is :sfsdfdasdf
[windriver@windriver-machine ltest]$ ./pollexam
timeout :0,0
[windriver@windriver-machine ltest]$ cat pollexam.c
#include <sys/poll.h>
#include<stdio.h>
#include<string.h>
int main(void)
{
struct pollfd pfds;
int retval;
char str[256];
memset(&pfds,0,sizeof(pfds));
pfds.fd=0;
pfds.events =POLLIN;
retval= poll(&pfds,1,2*1000);
if(pfds.revents & POLLIN){
fscanf(stdin,"%s",str);
printf("============\ninput string is :%s\n",str);
}
else {
printf("timeout :%d,%x\n",retval,pfds.revents);
}
}
poll使用事件进行返回,应用通过获取指定FD上关联的事件来进行相应的处理。常用的事件包括读事件POLLIN,写事件POLLOUT如下所示:
4)改进版I/O复用之epoll。是LINUX内核在2.6之后为处理大量客户端的socketFd而改进的poll,它也可以称之为select/poll的增强版本。虽然是增强版本,但是也是适用于特定场景下的,这个特定场景是大量并发连接中只有少量活跃的情况。在这种情况下如何避免扫描FD集合的开销和如何有效触发活跃I/O操作。这里有两个关键改进,一个相对于select时FD_SETSIZE无限制,它实际上就是LINUX能够打开的FD的最大数量,通常可以cat /porc/sys/fs/file-max来设制或用ulimit –n 来设置。epoll所支持的FD上限理论上就是最大可以打开文件的数量,也就是说如果你有1G的内存,理论上可以打开10W个FD。这一点可以极大的满足大量用户的服务器开发。另外一个相对于poll需要对保存FD集合的数据结构进行线性扫描,并返回对应的事件这种模式的改进,FD不再进行线性扫描,它只针对活跃的SOCKET进行操作,具体实现就是内核会对FD进行回调实现,只有活跃SOCKET的这些回调函数才能触发IO事件,返回给用户。因此,对一个大并发量的应用服务器,如果有很多连接,但一时时段的活跃连接并不多时,采用EPOLL效率非常高。Epoll对事件触发的方式提供了两种选择,一种是默认的LT模式,即Level trigger,适应于非阻塞和阻塞I/O,缺省。这种模式下内核会一直触发,直到事件被用户消费掉。也就是说在这种情况下FD上的数据一定被写完或者读完才不会有下一次的触发事件。而ET模式,俗称Edge trigger,边沿触发适用于非阻塞模式下,在这种模式下,当文件描述符从未就绪变为就绪时,内核通过epoll告诉你,然后内核假设你已知道文件描述符已就绪,可以进行读写。后面内核不再发送通知,只有当新的文件描述符来到时候才会发出就绪通知。因此,在这种情况下如果没有读完的FD,内核不会继续通知,所以进行文件读写时需要进行循环读写到EATRAN。
[windriver@windriver-machine ltest]$ g++ epollserv.cpp -o epollserv
[windriver@windriver-machine ltest]$ ./epollserv
accept a connect from 127.0.0.1
EPOLLIN
read 12345
EPOLLIN
read 67890
[windriver@windriver-machine ltest]$ cat epollserv.cpp
#include<iostream>
#include<sys/socket.h>
#include<sys/epoll.h>
#include<netinet/in.h>
#include<arpa/inet.h>
#include<fcntl.h>
#include<unistd.h>
#include<stdio.h>
#include<errno.h>
using namespace std;
#define MAXLINE 5
#define OPEN_MAX 100
#define LISTENQ 20
#define SERV_PORT 5000
#define INFTIM 1000
void setnonblocking(int sock)
{
int opts;
opts = fcntl(sock,F_GETFL);
if(opts<0)
{
perror("fcntl(sock,GETFL)");
exit(1);
}
opts = opts|O_NONBLOCK;
if(fcntl(sock,F_SETFL,opts)<0)
{
perror("fcntl(sock,SETFL,opts)");
exit(1);
}
}
int main()
{
int i,maxi,listenfd, connfd,sockfd,epfd,nfds;
ssize_t n;
char line[MAXLINE];
socklen_t clilen;
struct epoll_event ev,events[20];
epfd = epoll_create(256);
struct sockaddr_in clientaddr;
struct sockaddr_in serveraddr;
listenfd = socket(AF_INET,SOCK_STREAM,0);
setnonblocking(listenfd);
ev.data.fd = listenfd;
ev.events=EPOLLIN|EPOLLET;
//ev.events=EPOLLIN;
epoll_ctl(epfd,EPOLL_CTL_ADD,listenfd,&ev);
bzero(&serveraddr,sizeof(serveraddr));
serveraddr.sin_family = AF_INET;
char* local_addr="127.0.0.1";
inet_aton(local_addr,&(serveraddr.sin_addr));
serveraddr.sin_port = htons(SERV_PORT);
bind(listenfd,(sockaddr*)&serveraddr,sizeof(serveraddr));
listen(listenfd,LISTENQ);
maxi = 0;
for(;;){
nfds = epoll_wait(epfd,events,20,500);
for(i=0;i<nfds;++i)
{
if(events[i].data.fd == listenfd)
{
connfd = accept(listenfd,(sockaddr*)&clientaddr,&clilen);
if(connfd<0)
{
perror("connfd<0");
exit(1);
}
setnonblocking(connfd);
char *str = inet_ntoa(clientaddr.sin_addr);
cout<<"accept a connect from "<<str<<endl;
ev.data.fd = connfd;
ev.events = EPOLLIN|EPOLLET;
//ev.events = EPOLLIN;
epoll_ctl(epfd,EPOLL_CTL_ADD,connfd,&ev);
}
else if(events[i].events & EPOLLIN)
{
cout<<"EPOLLIN"<<endl;
if((sockfd = events[i].data.fd)<0)
continue;
if((n=read(sockfd,line,MAXLINE))<0)
{
if(errno == ECONNRESET)
{
close(sockfd);
events[i].data.fd = -1;
}
else
cout<<"readline error"<<endl;
}
else if(n == 0)
{
close(sockfd);
events[i].data.fd = -1;
}
line[n]='\0';
cout<<"read "<<line<<endl;
ev.data.fd = sockfd;
ev.events = EPOLLOUT|EPOLLET;
//epoll_ctl(epfd,EPOLL_CTL_MOD,sockfd,&ev);
}
else if(events[i].events & EPOLLOUT)
{
sockfd = events[i].data.fd;
write(sockfd,line,n);
ev.data.fd = sockfd;
ev.events = EPOLLIN|EPOLLET;
epoll_ctl(epfd,EPOLL_CTL_MOD,sockfd,&ev);
}
}
}
return 0;
}
[windriver@windriver-machine ltest]$
[windriver@windriver-machine ltest]$ cat cli.pl
#!/usr/bin/perl
use IO::Socket;
my $host = "127.0.0.1";
my $port = 5000;
my $socket = IO::Socket::INET->new("$host:$port") or die "create socket error $@";
my $msg_out = "1234567890";
print $socket $msg_out;
print "now send over,go to sleep...\n";
while(1)
{
sleep(1);
}
[windriver@windriver-machine ltest]$