WinSock 完成端口模型

之前写了关于Winsock的重叠IO模型,按理来说重叠IO模型与之前的模型相比,它的socket即是非阻塞的,也是异步的,它基本上性能非常高,但是它主要的缺点在于,即使我们使用历程来处理完成通知,但是我们知道历程它本身是在对应线程暂停,它借用当前线程的线程环境来执行完成通知,也就是说要执行完成通知就必须暂停当前线程的工作。这对工作线程来说也是一个不必要的性能浪费,这样我们自然就会想到,另外开辟一个线程来执行完成通知,而本来的线程就不需要暂停,而是一直执行它自身的任务。处于这个思想,WinSock提供了一个新的模型——完成端口模型。

完成端口简介

完成端口本质上是一个线程池的模型,它需要我们创建对应的线程放在那,当完成通知到来时,他会直接执行线程。在这5中模型中它的性能是最高的。
在文件中我们也提到过完成端口,其实我们利用Linux上一切皆文件的思想来考虑这个问题就可以很方便的理解,既然我们需要异步的方式来读写网卡的信息,这与读写文件的方式类似,既然文件中存在完成端口模型,网络上存在也就不足为奇了。
对于完成端口Windows没有引入新的API函数,而是仍然采用文件中一堆相关的函数。可以使用CreateIoCompletionPort来创建完成端口的句柄,该函数原型如下:

1
2
3
4
5
6
HANDLE WINAPI CreateIoCompletionPort(
__in HANDLE FileHandle,
__in_opt HANDLE ExistingCompletionPort,
__in ULONG_PTR CompletionKey,
__in DWORD NumberOfConcurrentThreads
);

第一个参数是与完成端口绑定的文件句柄,如果我们要创建完成端口句柄,这个值必须传入INVALID_HANDLE_VALUE。如果是要将文件句柄与完成端口绑定,这个参数必须穿入一个支持完成端口的文件句柄。在Winsock中如果要绑定SOCKET到完成端口只需要将SOCKET强转为HANDLE。
第二个参数是一个已知的完成端口句柄,如果是创建完成端口,这个参数填入NULL。
第三个参数是一个LONG型的指针,它作为一个标志,由完成通知传入完成线程中,用来标识不同的完成通知。一般我们会定义一个扩展来OVERLAPPED结构来标识不同的完成通知,所以这个参数一般不用传入NULL。
第四个参数是同时执行的线程数,如果是绑定文件句柄到完成端口,则这个参数填入0

我们可以在对应的完成线程中调用GetQueuedCompletionStatus函数来获取完成通知,这个函数只有当有IO操作完成时才会返回,函数原型如下:

1
2
3
4
5
6
7
BOOL WINAPI GetQueuedCompletionStatus(
__in HANDLE CompletionPort,
__out LPDWORD lpNumberOfBytes,
__out PULONG_PTR lpCompletionKey,
__out LPOVERLAPPED* lpOverlapped,
__in DWORD dwMilliseconds
);

它的第一个参数是一个完成端口的句柄。
第二个参数表示当前有多少字节的数据完成IO操作。
第三个参数是一个标记值,用来标识不同文件句柄对应的完成通知,它是通过 CreateIoCompletionPort 函数设置的那个标识。
第四个参数是OVERLAPPED结构。
第五个参数表示等待的时间,如果填入INFINITE则会一直等到有IO操作完成。

完成端口的示例:

下面是一个完成端口的示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
typedef struct _tag_MY_OVERLAPPED
{
OVERLAPPED m_overlapped;
SOCKET m_sClient;
long m_lEvent;
DWORD m_dwNumberOfBytesRecv;
DWORD m_dwFlags;
char *m_pszBuf;
LONG m_dwBufSize;
}MY_OVERLAPPED, *LPMY_OVERLAPPED;

unsigned int __stdcall IOCPThread(LPVOID lpParameter);

#define BUFFER_SIZE 1024
#define SERVER_PORT 6000

int _tmain(int argc, TCHAR *argv)
{
WSADATA wd = {0};
WSAStartup(MAKEWORD(2, 2), &wd);

SYSTEM_INFO si = {0};
GetSystemInfo(&si);

//创建完成端口对象
HANDLE hIocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, NULL, si.dwNumberOfProcessors);

//创建完成端口对应的线程对象
HANDLE *pThreadArray = (HANDLE *)HeapAlloc(GetProcessHeap(), HEAP_ZERO_MEMORY, 2 * si.dwNumberOfProcessors);
for (int i = 0; i < 2 * si.dwNumberOfProcessors; i++)
{
pThreadArray[i] = (HANDLE)_beginthreadex(NULL, 0, IOCPThread, &hIocp, 0, NULL);
}

SOCKET SrvSocket = socket(AF_INET, SOCK_STREAM, IPPROTO_IP);
SOCKADDR_IN SockAddr = {0};
SockAddr.sin_family = AF_INET;
SockAddr.sin_port = htons(SERVER_PORT);
SockAddr.sin_addr.s_addr = htonl(INADDR_ANY);

bind(SrvSocket, (SOCKADDR*)&SockAddr, sizeof(SOCKADDR));
listen(SrvSocket, 5);

SOCKET sClient = accept(SrvSocket, NULL, NULL);
CreateIoCompletionPort((HANDLE)sClient, hIocp, NULL, 0);

WSABUF buf = {0};
buf.buf = (char*)HeapAlloc(GetProcessHeap(), HEAP_ZERO_MEMORY, BUFFER_SIZE);
buf.len = BUFFER_SIZE;

MY_OVERLAPPED AcceptOverlapped = {0};
AcceptOverlapped.m_dwBufSize = BUFFER_SIZE;
AcceptOverlapped.m_lEvent = FD_READ;
AcceptOverlapped.m_pszBuf = buf.buf;
AcceptOverlapped.m_sClient = sClient;

WSARecv(sClient, &buf, 1, &AcceptOverlapped.m_dwNumberOfBytesRecv, &AcceptOverlapped.m_dwFlags, &AcceptOverlapped.m_overlapped, NULL);

while (TRUE)
{
int nVirtKey = GetAsyncKeyState(VK_ESCAPE); //用户按下退出键(ESC)
{
break;
}
}

for (int i = 0; i < si.dwNumberOfProcessors * 2; i++)
{
//向IOCP发送FD_CLOSE消息,以便对应线程退出
AcceptOverlapped.m_lEvent = FD_CLOSE;
PostQueuedCompletionStatus(hIocp, si.dwNumberOfProcessors * 2, 0, &AcceptOverlapped.m_overlapped);
}

WaitForMultipleObjects(2 * si.dwNumberOfProcessors, pThreadArray, TRUE, INFINITE);

for (int i = 0; i < si.dwNumberOfProcessors * 2; i++)
{
CloseHandle(pThreadArray[i]);
}

HeapFree(GetProcessHeap(), 0, buf.buf);

shutdown(sClient, SD_BOTH);
closesocket(sClient);

CloseHandle(hIocp);
WSACleanup();
return 0;
}

unsigned int __stdcall IOCPThread(LPVOID lpParameter)
{
HANDLE hIocp = *(HANDLE*)lpParameter;
DWORD dwNumberOfBytes = 0;
MY_OVERLAPPED *lpOverlapped = NULL;
ULONG key = 0;
BOOL bLoop = TRUE;
while (bLoop)
{
BOOL bRet = GetQueuedCompletionStatus(hIocp, &dwNumberOfBytes, &key, (LPOVERLAPPED*)&lpOverlapped, INFINITE);
if (!bRet)
{
continue;
}

switch (lpOverlapped->m_lEvent)
{
case FD_CLOSE: //退出
{
bLoop = FALSE;
printf("线程[%08x]准备退出......\n", GetCurrentThreadId());
}
break;
case FD_WRITE:
{
printf("数据发送完成......\n");
shutdown(lpOverlapped->m_sClient, SD_BOTH);
closesocket(lpOverlapped->m_sClient);
}
break;

case FD_READ:
{
printf("client>%s", lpOverlapped->m_pszBuf);
lpOverlapped->m_lEvent = FD_WRITE;
WSABUF buf = {0};
buf.buf = lpOverlapped->m_pszBuf;
buf.len = dwNumberOfBytes;
lpOverlapped->m_dwFlags = 0;
WSASend(lpOverlapped->m_sClient, &buf, 1, &lpOverlapped->m_dwNumberOfBytesRecv, lpOverlapped->m_dwFlags, &lpOverlapped->m_overlapped, NULL);
}
}
}
return 0;
}

在上述代码中,首先定义了一个结构体用来保存额外的数据。在main函数中首先查询CPU的核数,然后创建这个数目2倍的线程。接着创建一个完成端口对象。然后进行SOCKET的创建、绑定、监听、接收连接的操作。当有连接进来的时候。创建对应的扩展结构并调用WSARecv投递一个接收操作。由于后面的收发操作都在对应的线程中操作,因此在主线程中只需要等待即可。当用户确定退出时。先调用PostQueuedCompletionStatus函数向完成线程中发送完成通知,并将网络事件设置为FD_CLOSE,表示让线程退出。在这里没有使用TerminateThread这种暴力的方式,而选择了一种让线程自动退出的温和的方式。接着进行资源的回收,最后退出。
在线程中,我们首先在循环中调用 GetQueuedCompletionStatus函数来获取完成通知,当发生完成事件时,我们在switch中根据不同的额网络事件来处理,针对FD_CLOSE事件,直接退出线程。针对FD_READ事件,先打印客户端发送的信息,然后调用WSASend将信息原样返回,接着设置网络事件为FD_WRITE,以便断开与客户端的链接。

几种模型的比较

最后针对5种模型和两种socket工作模式来做一个归纳说明。

  1. 最先学习的是SOCKET的阻塞模式,它的效率最低,它会一直等待有客户端连接或者有数据发送过来才会返回。这就好像我们在等某个人的信,但是不知道这封信什么时候能送到,于是我们在自家门口的收信箱前一直等待,直到有信到来。
  2. 为了解决这个问题,提出了SOCKET的非阻塞模式,它不会等待连接或者收发数据的操作完成,当我们调用对应的accept或者send、recv时会立即返回,但是我们不知道它什么时候有数据要处理,如果针对每个socket都等待直到有数据到来,那么跟之前的阻塞模式相比没有任何改进,于是就有了socket模式,它会等待多个socket,只要其中有一个有数据就返回,并处理。用收信的模型类比的话,现在我们不用在邮箱前等待了。但是我们会每隔一段时间就去邮箱那看看,有没有信,有信就将它收回否则空手而归。
  3. 我们说select模型的最大问题在于不知道什么时候有待决的SOCKET,因此我们需要在循环中不停的等待。为了解决这个时机问题,又提出了WSAAsyncSelect模型和WSAEvent模型,它们主要用来解决调用对应函数的时机。用收信的例子类比就是现在我在邮箱上装了一个报警的按钮,只有有信,警报就会响,这个时候我们就去收信。而不用向之前那样每隔一段时间就去邮箱看看
  4. 我们说解决了时机的问题,但是调用send和recv对网卡进行读写操作仍然是同步的操作,CPU需要傻傻的等着数据从网卡读到内存或者从内存写到网卡上。因此又有了重叠IO的模型和一些列的新的API,向WSARecv和WSASend等等函数。这样就相当于当有信来的警报响起时,我们不需要自己去取信了,另外派了一个人帮我们拿信,这样我们的工作效率又提高了一些。节约了我们的时间
  5. 重叠IO也有它的问题,如果使用重叠IO的事件模型时,也需要在合适的时候等待,就好像我们虽然派了一个人来帮忙拿信,但是我们自己却需要停下手头上的工作,询问拿信的人回来了。而使用完成历程也存在自己的问题,因为它需要使用主线程的资源来执行历程,它需要主线程暂停下来,这样就可能出现两种情况:1)有通知事件到来,但是并没有进入可警告状态;2)进入可警告状态却没有客户端发送请求。这就相当于可能我们不停的等待但是拿信的那个人却没有回来,或者拿信的人回来了,我们却没有时间处理信件。
  6. 针对重叠IO的上述问题,提出了完成端口的解决方案,完成事件由对应的线程处理,而主线程只需要专注于它自己的工作就好了,这就相当于警报响了,我们知道信来了,直接派一个人去拿信,后面的我就不管了,而拿信的人把信拿回来的时候将信放好。当我们忙完之后去处理这封信。没忙完的话信就一直放在那,甚至让拿信的人处理这封信,这样就能更高效的集中注意力来处理眼前的工作。