WinSock 重叠IO模型

之前介绍的WSAAsyncSelect和WSAEvent模型解决了收发数据的时机问题,但是网卡这种设备相比于CPU和内存来说仍然是慢速设备,而调用send和recv进行数据收发操作仍然是同步的操作,即使我们能够在恰当的时机调用对应的函数进行收发操作,但是仍然需要快速的CPU等待慢速的网卡。这样仍然存在等待的问题,这篇博文介绍的重叠IO模型将解决这个等待的问题

重叠IO简介

一般接触重叠IO最早是在读写磁盘时提出的一种异步操作模型,它主要思想是CPU只管发送读写的命令,而不用等待读写完成,CPU发送命令后接着去执行自己后面的命令,至于具体的读写操作由硬件的DMA来控制,当读写完成时会向CPU发送一个终端信号,此时CPU中断当前的工作转而去进行IO完成的处理。
这是在磁盘操作中的一种高效工作的方式,为什么在网络中又拿出来说呢?仔细想想,前面的模型解决了接收数据的时机问题,现在摆在面前的就是如何高效的读写数据,与磁盘操作做类比,当接收到WSAAsyncSelect对应的消息或者WSAEvent返回时就是执行读写操作的时机,下面紧接着就是调用对应的读写函数来进行读写数据了,而联想到linux中的一切皆文件的思想,我们是不是可以认为操作网卡也是在操作文件?这也是在WinSock1中,使用WriteFile和ReadFile来进行网络数据读写的原因。既然它本质上也是CPU需要等待慢速的设备,那么为了效率它必定可以支持异步操作,也就可以使用重叠IO。

创建重叠IO的socket

要想使用重叠IO,就不能在像之前那样使用socket函数来创建SOCKET, 这函数最多只能创建一个普通SOCKET然后设置它为非阻塞(请注意非阻塞与异步的区别)。要创建异步的SOCKET需要使用WinSock2.0函数 WSASocket

1
2
3
4
5
6
7
8
SOCKET WSASocket(
int af,
int type,
int protocol,
LPWSAPROTOCOL_INFO lpProtocolInfo,
GROUP g,
DWORD dwFlags
);

该函数的前3个参数与socket的参数含义相同,第4个参数是一个协议的具体信息,配合WSAEnumProtocols 使用可以将枚举出来的网络协议信息传入,这样不通过前三个参数就可以创建一个针对具体协议的SOCKET。第5个参数目前不受支持简单的传入0即可。第6个参数是一个标志,如果要创建重叠IO的SOCKET,需要将这个参数设置为WSA_FLAG_OVERLAPPED。否则普通的SOCKET直接传入0即可

使用重叠IO除了要将SOCKET设置为支持重叠IO外,还需要使用对应的支持重叠IO的函数,之前了解的巴克利套接字函数最多只能算是支持非阻塞而不支持异步。在WinSock1.0 中可以使用ReadFile和WriteFile来支持重叠IO,但是WinSock2.0 中重新设计的一套函数来支持重叠IO

  • WSASend (send的等价函数)
  • WSASendTo (sendto的等价函数)
  • WSARecv (recv的等价函数)
  • WSARecvFrom (recvfrom的等价函数)
  • WSAIoctl (ioctlsocket的等价函数)
  • WSARecvMsg (recv OOB版的等价函数)
  • AcceptEx (accept 等价函数)
  • ConnectEx (connect 等价函数)
  • TransmitFile (专门用于高效发送文件的扩展API)
  • TransmitPackets (专门用于高效发送大规模数据包的扩展API)
  • DisconnectEx (扩展的断开连接的Winsock API)
  • WSANSPIoctl (用于操作名字空间的重叠I/O版扩展控制API)

那么如果使用上述函数但是传入一个非阻塞的SOCKET会怎么样呢,这些函数只看是否传入OVERLAPPED结构而不管SOCKET是否是阻塞的,一律按重叠IO的方式来运行。这也就是说,要使用重叠I/O方式来操作SOCKET,那么不一定非要一开初就创建一个重叠I/O方式的SOCKET对象(但是针对AcceptEx 来说如果传入的是普通的SOCKET,它会以阻塞的方式执行。当时测试时我传入的是使用WSASocket创建的SOCKET,我将函数的最后一个标志设置为0,发现AcceptEx只有当客户端连接时才会返回)

重叠IO的通知模型

与文件的重叠IO类似,重叠IO的第一种模型就是事件通知模型.

  1. 利用该模型首先需要把一个event对象绑定到OVERLAPPED(WinSokc中一般是WSAOVERLAPPED)上,然后利用这个OVERLAPPED结构来进行IO操作.如:WSASend/WSARecv等
  2. 判断对应IO操作的返回值,如果使用重叠IO模式,IO操作函数不会返回成功,而是会返回失败,使用WSAGetLastError得到的错误码为WSA_IO_PENDING,此时认为函数进行一种待决状态,也就是CPU将命令发送出去了,而任务没有最终完成
  3. 然后CPU可以去做接下来的工作,而在需要操作结果的地方调用对应的等待函数来等待对应的事件对象。如果事件对象为有信号表示操作完成
  4. 接着可以设置事件对象为无信号,然后继续投递IO操作.

要等待这些事件句柄,可以调用WSAWaitForMultipleEvents函数,该函数原型如下:

1
2
3
4
5
6
7
DWORD WSAWaitForMultipleEvents(
__in DWORD cEvents,
__in const WSAEVENT* lphEvents,
__in BOOL fWaitAll,
__in DWORD dwTimeout,
__in BOOL fAlertable
);

第一个参数是事件对象的数目;第二个参数是事件对象的数组首地址;第三个参数是一个bool类型表示是否等待数组中所有的对象都变为有信号;第四个参数表示超时值;第五个参数是表示在等待的时候是否进入可警告状态

在函数返回后我们只知道IO操作完成了,但是完成的结果是成功还是失败是不知道的,此时可以使用WSAGetOverlappedResult来确定IO操作执行的结果,该函数原型如下:

1
2
3
4
5
6
7
BOOL WSAGetOverlappedResult(
SOCKET s,
LPWSAOVERLAPPED lpOverlapped,
LPDWORD lpcbTransfer,
BOOL fWait,
LPDWORD lpdwFlags
);

第一个参数是对应的socket;第二个参数是对应的OVERLAPPED结构;第三个参数是一个输出参数,表示完成IO操作的字节数,通常出错的时候返回0;第四个参数指明调用者是否等待一个重叠I/O操作完成,通常在成功等待到事件句柄后,这个参数在这个模型中没有意义了;第五个参数是一个输出参数负责接收完成结果的标志。
下面是一个事件通知模型的例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
typedef struct _tag_CLIENTCONTENT
{
OVERLAPPED Overlapped;
SOCKET sClient;
WSABUF DataBuf;
char szBuf[WSA_BUFFER_LENGHT];
WSAEVENT hEvent;
}CLIENTCONTENT, *LPCLIENTCONTENT;

int _tmain(int argc, TCHAR *argv[])
{
WSADATA wd = {0};
WSAStartup(MAKEWORD(2, 2), &wd);
CLIENTCONTENT ClientContent[WSA_MAXIMUM_WAIT_EVENTS] = {0};
WSAEVENT Event[WSA_MAXIMUM_WAIT_EVENTS] = {0};

int nTotal = 0;
SOCKET skServer = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_IP, NULL, 0, WSA_FLAG_OVERLAPPED);

SOCKADDR_IN ServerAddr = {0};
ServerAddr.sin_family = AF_INET;
ServerAddr.sin_port = htons(SERVER_PORT);
ServerAddr.sin_addr.s_addr = htonl(INADDR_ANY);

bind(skServer, (SOCKADDR*)&ServerAddr, sizeof(SOCKADDR));
listen(skServer, 5);
printf("开始监听...........\n");
Event[nTotal] = WSACreateEvent();
ClientContent[nTotal].hEvent = Event[nTotal];
ClientContent[nTotal].Overlapped.hEvent = Event[nTotal];
ClientContent[nTotal].DataBuf.len = WSA_BUFFER_LENGHT;
ClientContent[nTotal].sClient = skServer;

//针对监听套接字做特殊的处理
WSAEventSelect(skServer, Event[0], FD_ACCEPT | FD_CLOSE);
nTotal++;

while (TRUE)
{
DWORD dwTransfer = 0;
DWORD dwFlags = 0;
DWORD dwNumberOfBytesRecv = 0;
int nIndex = WSAWaitForMultipleEvents(nTotal, Event, FALSE, WSA_INFINITE, FALSE);
WSAResetEvent(Event[nIndex - WSA_WAIT_EVENT_0]);
//监听socket返回
if (nIndex - WSA_WAIT_EVENT_0 == 0)
{
SOCKADDR_IN ClientAddr = {AF_INET};
int nClientAddrSize = sizeof(SOCKADDR);
SOCKET skClient = WSAAccept(skServer, (SOCKADDR*)&ClientAddr, &nClientAddrSize, NULL, NULL);
if (SOCKET_ERROR == skClient)
{
printf("接受客户端连接请求失败,错误码为:%08x\n", WSAGetLastError());
continue;
}
printf("有客户端连接进来[%s:%u]\n", inet_ntoa(ClientAddr.sin_addr), ntohs(ClientAddr.sin_port));

Event[nTotal] = WSACreateEvent();
ClientContent[nTotal].hEvent = Event[nTotal];
ClientContent[nTotal].Overlapped.hEvent = Event[nTotal];
ClientContent[nTotal].DataBuf.len = WSA_BUFFER_LENGHT;
ClientContent[nTotal].DataBuf.buf = ClientContent[nTotal].szBuf;
ClientContent[nTotal].sClient = skClient;

//获取客户端发送数据,这是为了触发后面的等待
WSARecv(ClientContent[nTotal].sClient, &ClientContent[nTotal].DataBuf, 1, &dwNumberOfBytesRecv, &dwFlags, &ClientContent[nTotal].Overlapped, NULL);
nTotal++;
continue;

}else
{
//等待发送完成
WSAGetOverlappedResult(ClientContent[nIndex - WSA_WAIT_EVENT_0].sClient, &ClientContent[nIndex - WSA_WAIT_EVENT_0].Overlapped, &dwTransfer, TRUE, &dwFlags);
if (dwTransfer == 0)
{
printf("接受数据失败:%08x\n", WSAGetLastError());
closesocket(ClientContent[nIndex - WSA_WAIT_EVENT_0].sClient);
WSACloseEvent(ClientContent[nIndex - WSA_WAIT_EVENT_0].hEvent);

for (int i = nIndex - WSA_WAIT_EVENT_0; i < nTotal; i++)
{
ClientContent[i] = ClientContent[i];
Event[i] = Event[i];
nTotal--;
}
}

if (strcmp("exit", ClientContent[nIndex - WSA_WAIT_EVENT_0].DataBuf.buf) == 0)
{
closesocket(ClientContent[nIndex - WSA_WAIT_EVENT_0].sClient);
WSACloseEvent(ClientContent[nIndex - WSA_WAIT_EVENT_0].hEvent);

for (int i = nIndex - WSA_WAIT_EVENT_0; i < nTotal; i++)
{
ClientContent[i] = ClientContent[i];
Event[i] = Event[i];
nTotal--;
}

continue;
}

send(ClientContent[nIndex - WSA_WAIT_EVENT_0].sClient, ClientContent[nIndex - WSA_WAIT_EVENT_0].DataBuf.buf, dwTransfer, 0);
WSARecv(ClientContent[nIndex - WSA_WAIT_EVENT_0].sClient, &ClientContent[nIndex - WSA_WAIT_EVENT_0].DataBuf, 1, &dwNumberOfBytesRecv, &dwFlags, &ClientContent[nIndex - WSA_WAIT_EVENT_0].Overlapped, NULL);
}
}

WSACleanup();
return 0;
}

上述代码中定义了一个结构,方便我们根据事件对象获取一些重要信息。
在main函数中首先完成了WinSock环境的初始化然后创建监听套接字,绑定,监听。然后定义一个事件对象让他与对应的WSAOVERLAPPED绑定,然后WSAEventSelect来投递监听SOCKET以便获取到客户端的连接请求(这里没有使用AcceptEx,因为它需要特殊的加载方式)
接着在循环中首先调用WSAWaitForMultipleEvents等待所有信号,当函数返回时判断当前是否为监听套接字,如果是那么调用WSAAccept函数接收连接,并准备对应的事件和WSAOVERLAPPED结构,接着调用WSARecv接收客户端传入数据
如果不是监听套接字则表明客户端发送数据过来,此时调用WSAGetOverlappedResult获取重叠IO执行的结果,如果成功则判断是否为exit,如果是exit关闭当前与客户端的链接,否则调用send函数原样返回数据接着调用WSARecv再次等待客户端传送数据。

完成过程模型

对于重叠I/O模型来说,前面的事件通知模型在资源的消耗上有时是惊人的。这主要是因为对于每个重叠I/O操作(WSASend/WSARecv等)来说,都必须额外创建一个Event对象。对于一个I/O密集型SOCKET应用来说,这种消耗会造成资源的严重浪费。由于Event对象是一个内核对象,它在应用层表现为一个4字节的句柄值,但是在内核中它对应的是一个具体的结构,而且所有的进程共享同一块内核的内存,因此某几个进程创建大量的内核对象的话,会影响整个系统的性能。

为此重叠I/O又提供了一种称之为完成过程方式的模型。该模型不需要像前面那样提供对应的事件句柄。它需要为每个I/O操作提供一个完成之后回调处理的函数。

完成历程的本质是一个历程它仍然是使用当前线程的环境。它主要向系统注册一些完成函数,当对应的IO操作完成时,系统会将函数放入到线程的APC队列,当线程陷入可警告状态时,它利用线程的环境来依次执行队列中的APC函数、
要使用重叠I/O完成过程模型,那么也需要为每个I/O操作提供WSAOVERLAPPED结构体,只是此时不需要Event对象了。取而代之的是提供一个完成过程的函数

完成历程的原型如下:

1
void CALLBACK CompletionROUTINE(DWORD dwError, DWORD cbTransferred,LPWSAOVERLAPPED lpOverlapped, DWORD dwFlags);

要使对应的完成函数能够执行需要在恰当的时机让对应线程进入可警告状态,一般的方式是调用SleepEx函数,还有就是调用Wait家族的相关Ex函数,但是如果使用Wait函数就需要使用一个内核对象进行等待,如果使用Event对象这样就与之前的事件通知模式有相同的资源消耗大的问问题了。此时我们可以考虑使用线程的句柄来进行等待,但是等待线程句柄时必须设置一个超时值而不能直接使用INFINIT了,因为等待线程就是要等到线程结束,而如果使用INFINIT,这样Wait函数永远不会返回,线程永远不会结束,此时就造成了死锁。

下面是一个使用完成过程的模型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
typedef struct _tag_OVERLAPPED_COMPILE
{
WSAOVERLAPPED overlapped;
LONG lNetworks;
SOCKET sClient;
WSABUF pszBuf;
DWORD dwTransfer;
DWORD dwFlags;
DWORD dwNumberOfBytesRecv;
DWORD dwNumberOfBytesSend;
}OVERLAPPED_COMPILE, *LPOVERLAPPED_COMPILE;

void CALLBACK CompletionROUTINE(DWORD dwError, DWORD cbTransferred, LPWSAOVERLAPPED lpOverlapped, DWORD dwFlags);

int _tmain(int argc, TCHAR *argv[])
{
WSADATA wd = {0};
WSAStartup(MAKEWORD(2, 2), &wd);

SOCKET skServer = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_IP, NULL, 0, WSA_FLAG_OVERLAPPED);
SOCKADDR_IN ServerClient = {0};
ServerClient.sin_family = AF_INET;
ServerClient.sin_port = htons(SERVER_PORT);
ServerClient.sin_addr.s_addr = htonl(INADDR_ANY);

bind(skServer, (SOCKADDR*)&ServerClient, sizeof(SOCKADDR));

listen(skServer, 0);

while (TRUE)
{
SOCKADDR_IN AddrClient = {0};
int AddrSize = sizeof(SOCKADDR);
SOCKET skClient = WSAAccept(skServer, (SOCKADDR*)&AddrClient, &AddrSize, NULL, NULL);
printf("有客户端[%s:%u]连接进来....\n", inet_ntoa(AddrClient.sin_addr), ntohs(AddrClient.sin_port));

LPOVERLAPPED_COMPILE lpOc = new OVERLAPPED_COMPILE;
ZeroMemory(lpOc, sizeof(OVERLAPPED_COMPILE));
lpOc->dwFlags = 0;
lpOc->dwTransfer = 0;
lpOc->lNetworks = FD_READ;
lpOc->pszBuf.buf = new char[1024];
ZeroMemory(lpOc->pszBuf.buf, 1024);
lpOc->pszBuf.len = 1024;
lpOc->sClient = skClient;
lpOc->dwNumberOfBytesRecv = 0;
WSARecv(skClient, &(lpOc->pszBuf), 1, &(lpOc->dwNumberOfBytesRecv), &(lpOc->dwFlags), &(lpOc->overlapped), CompletionROUTINE);

SleepEx(2000, TRUE);
}
WSACleanup();
return 0;
}

void CALLBACK CompletionROUTINE(DWORD dwError, DWORD cbTransferred, LPWSAOVERLAPPED lpOverlapped, DWORD dwFlags)
{
LPOVERLAPPED_COMPILE lpOc = (LPOVERLAPPED_COMPILE)lpOverlapped;
if (0 != dwError || 0 == cbTransferred)
{
printf("与客户端通信发生错误,错误码为:%08x\n", WSAGetLastError());
closesocket(lpOc->sClient);
delete[] lpOc->pszBuf.buf;
delete lpOc;
return;
}

if (lpOc->lNetworks == FD_READ)
{
if (0 == strcmp(lpOc->pszBuf.buf, "exit"))
{
closesocket(lpOc->sClient);
delete[] lpOc->pszBuf.buf;
delete lpOc;
return;
}

send(lpOc->sClient, lpOc->pszBuf.buf, cbTransferred, 0);
lpOc->dwNumberOfBytesRecv = 0;
ZeroMemory(lpOc->pszBuf.buf, 1024);
lpOc->dwFlags = 0;
lpOc->dwTransfer = 0;
lpOc->lNetworks = FD_READ;
WSARecv(lpOc->sClient, &(lpOc->pszBuf), 1, &(lpOc->dwNumberOfBytesRecv), &(lpOc->dwFlags), &(lpOc->overlapped), CompletionROUTINE);
}
}

主函数的写法与之前的例子中的写法类似。也是先初始化环境,绑定,监听等等。在循环中接收连接,当有新客户端连接进来时创建对应的客户端结构,然后调用WSARecv函数接收数据,接下来就是使用SleepEx进入可警告状态,以便让完成历程有机会执行。
在完成历程中就不需要像之前那样调用WSAGetOverlappedResult了,因为调用完成历程就一定意味着重叠IO操作已经完成了。在完成历程中根据第一个参数来判断IO操作执行是否成功。如果失败则会直接断开与客户端的连接然后清理对应的结构。如果成功则直接获取获取IO操作得到的数据,如果是exit则需要关闭连接,否则原样返回并准备下一次接收数据