Порт завершения

Модель порта завершения

Последняя модель ввода/вывода, которую мы рассмотрим, это модель «порта завершения» – completion port. Порт завершения представляет собой специальный механизм в составе ОС, с помощью которого приложение использует объединение (пул) нескольких потоков, предназначенных единственно для цели обработки асинхронных операций ввода/вывода с перекрытием.

Модель порта завершения

Приложения, которые вынуждены обрабатывать многочисленные асинхронные запросы (речь идет о сотнях и тысячах одновременно поступающих запросах – например, на поисковых серверах или популярных серверах типа www.microsoft.com), с помощью этого механизма могут обрабатывать I/O- запросы существенно быстрее и эффективнее, чем просто запускать новый поток для обработки поступившего запроса. Поддержка этого механизма включена в Windows NT, Windows 2000, Windows XP и Windows Server 2003 и особенно эффективна для мультипроцессорных систем. Так, демонстрационный программный код, который опубликован в MSDN, рассчитан на 16-ти процессорную аппаратную платформу.

Для функционирования этой модели необходимо создание специального программного объекта ядра системы, который и был назван «порт завершения». Это осуществляется с помощью функции CreateIoCompletionPort(), которая асссоциирует этот объект с одним или несколькими файловыми (сокетными) дескрипторами (см. ниже пример в разделе 4.5.1.1) и который будет управлять перекрывающимися I/O операциями, используя определенное количество потоков для обслуживания завершенных запросов.

Для начала нам необходимо создать программный объект — порт завершения I/O, который будет использоваться, чтобы управлять множественными I/O-запросами для любого  количества сокетных дескрипторов. Это выполняется  вызовом функции CreateIoCompletionPort(), которая определена как:

HANDLE CreateIoCompletionPort(
    HANDLE FileHandle,
    HANDLE ExistingCompletionPort,
    DWORD CompletionKey,
    DWORD NumberOfConcurrentThreads
);

Прежде чем рассматривать параметры подробно, следует отметить, что эта функция фактически используется для двух различных целей:

  1. Чтобы создать объект порта завершения
  2. Связать дескриптор с портом завершения

Когда Вы первоначально создаете объект порта завершения, интерес представляет единственный параметр — NumberOfConcurrentThreads; первые три параметра не существенны. Параметр NumberOfConcurrentThreads специфичен, потому что он определяет число потоков, которым позволяется выполниться одновременно на порте завершения. По идее, нам нужен только один поток для каждого отдельного процессора, чтобы обслужить порт завершения и избежать переключения контекста потока. Значение для этого параметра равное 0 сообщает системе разрешить иметь столько потоков, сколько  процессоров имеется в системе. Следующий вызов создает порт завершения I/O:

 CompletionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE,NULL, 0, 0);

В результате функция  возвратит дескриптор, который используется, чтобы идентифицировать порт завершения, когда на него будет назначен дескриптор сокета.

Рабочие потоки и порты завершения

После того, как порт завершения успешно создан, Вы можете начинать связывать дескрипторы сокета с объектом. Перед этим – и это важно — Вы должны создать один или большее количество рабочих потоков, чтобы обслужить порт завершения, когда  пакеты завершения операции для сокета отправлены в объект порта завершения. Можно задаться вопросом, сколько же потоков должно быть создано, чтобы обслужить порт завершения? Это один из более сложных аспектов модели порта завершения, потому что  количество потоков, необходимое для обслуживания запросов Ввода – вывода, зависит от общей концепции проекта вашего приложения. Важно обратить внимание на различие между числом заявленных параллельных потоков при работе с CreateIoCompletionPort() и числом рабочих потоков; они не представляют одно и то же.

Обычно рекомендуется, чтобы  функция CreateIoCompletionPort() задавала один поток для каждого отдельного процессора во избежание переключения контекста потока. Параметр NumberOfConcurrentThreads функции CreateIoCompletionPort() явно дает разрешение системе позволять только n потокам работать одновременно на порте завершения. И даже если Вы создаете больше чем n рабочих потоков для порта завершения, только n потокам будет разрешено работать одновременно. (Фактически, система могла бы превысить это значение на короткий промежуток времени, но она же быстро приведет количество потоков до значения, которое Вы определяете в CreateIoCompletionPort().)

Можно задаться вопросом, для каких целей надо создавать большее количество рабочих потоков, чем указано параметром вызова CreateIoCompletionPort()?

Как было упомянуто выше, это зависит от идеологии построения проекта приложения в целом. Некоторые потоки могут приостанавливать своё выполнение (например proxy-сервер — ждёт ответа с помощью функции WaitForSingleObject()) или поток «засыпает» наSleep() — тогда в это время вместо него с портом завершения сможет работать другой поток. Т.е. если программа будет блокировать поток — тогда лучше создать рабочих потоков несколько больше чем NumberOfConcurrentThreads, с другой стороны — если в вашей программе не будет блокировки — тогда не стоит создавать лишних потоков. Однако по большому счёту можно считать NumberOfConcurrentThread константой.

Как только создано достаточно рабочих потоков, чтобы обслужить запросы ввода/вывода на порте завершения, можно начинать связывать дескрипторы сокета с портом завершения. Это требует вызова функции CreateIoCompletionPort() на уже существующем порте завершения и указания первых трех параметров — FileHandle, ExistingCompletionPort и CompletionKey с соответствующей сокетной информацией.

Параметр FileHandle представляет дескриптор сокета, ассоциированный с портом завершения. Параметр ExistingCompletionPort указывает существующий уже порт завершения, с которым будет связан дескриптор сокета. Параметр CompletionKey задает специфические данные «per-handle data»,  которые можно связать с конкретным сокетным дескриптором. Прикладные программы могут хранить любой тип информации, связанной с сокетом,  используя этот ключ. Мы называем эти данные «per-handle data» или «сокетная информация». Принято хранить дескриптор сокета, используя этот ключ как указатель на структуру данных, содержащую дескриптор сокета и другую «сокет-специфичную» информацию. Функции потока, которые обслуживают порт завершения, могут отыскивать эту специфичную для данного сокета информацию, используя этот ключ.

Поток использует функцию для ожидания пакета завершения, который будет поставлен в очередь в порту завершения, а не ожидает непосредственного исполнения асинхронного ввода/вывода. Потоки, которые блокируются на порту завершения,  освобождаются в порядке LIFO. Это означает, что, когда пакет завершения поставлен в очередь порта завершения, система освобождает последний поток, который был заблокирован на порту.

Опишем основной каркас прикладной программы-приложения, использующей для ввода/вывода модель порта завершения.  Это простой  ECHO-сервер, получающий информацию от клиента и ее же обратно отправляющего. Предлагается следующие шаги:

  1. Создать порт завершения. Четвертый параметр функции оставлен как 0 — только одному рабочему потоку на процессоре будут позволено выполняться в данное время на порте завершения.
    1. Определить, сколько процессоров существуют в системе.
    1. Создать рабочие потоки для обслуживания завершенных I/O-запросов на порте завершения, используя информацию процессора в шаге 2. В случае этого простого примера, мы создаем один рабочий поток для процессора, потому что мы не ожидаем, что наши потоки когда-либо войдут в временно приостановленное состояние, когда окажется, что рабочих потоков для исполнения не хватает. Когда вызывается функция создания  потока CreateThread(), вы должны указать ту функцию, которая будет исполняться в рабочем потоке.
    1. Сформировать слушающий сокет, чтобы принимать входные подключений на порту 5150.
    1. Принять поступившие подключения функцией accept().
    1. Создать структуру данных, чтобы представить «per-handle data» и сохранить дескриптор присоединенного сокета в структуре.
    1. Связать новый дескриптор сокета, возвращенный из accept(), с портом завершения,  вызывая CreateIoCompletionPort(). Передать структуру  с «per-handle data» в функцию CreateIoCompletionPort() через параметр ключа завершения.
    1. Начинаем выполнять операции I/O  на принятом подключении. По существу, мы будем выдавать один или более асинхронных вызовов WSARec() или WSASend() на новом сокете, используя  механизм ввода/вывода с перекрытием. Вызовы этих функций будут исполняться асинхронно и с перекрытием (т.е. одновременно). Когда эти функции завершатся (с тем или иным результатом), рабочий поток обслужит исполненные запросы и  будет ждать  следующие вызовы.

Далее повторяем шаги 5-8, пока сервер не завершит свою работу.

Итак., текст программы:

Листинг 4.2  Сервер на базе IOCompletionPort

DWORD WINAPI ServerThread (LPVOID CompletionPortID);//Прототип рабочего потока
HANDLE CompletionPort;         // Дескриптор порта завершения
WSADATA wsd;                   // Структура типа WSADATA
SYSTEM_INFO SystemInfo;        // Системная информация
SOCKADDR_IN InternetAddr;      // Структура адреса сокета
SOCKET Listen_socket;
int i;
typedef struct _PER_HANDLE_DATA 
{
SOCKET       Accept_socket;    // Дескриптор сокета клиента
SOCKADDR_STORAGE  ClientAddr;  // Адрес клиента
//Другая полезная информация,  связанная с дескриптором
} PER_HANDLE_DATA, * LPPER_HANDLE_DATA;
// Стартуем WinSock
StartWinsock(MAKEWORD(2,2), &wsd);
// Шаг 1:
// Создать порт завершения Ввода - вывода
CompletionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
// Шаг 2:

// Определяем, сколько процессоров находятся в системе

GetSystemInfo(&SystemInfo);
// Шаг 3:

// Создаем рабочие потоки, основанные на числе процессоров, доступных в

// системе. Для нашего случая мы создаем один рабочий поток для

// каждого процессора.

for(i = 0; i < SystemInfo.dwNumberOfProcessors; i++)
{
HANDLE ThreadHandle;

// Создаем рабочий поток сервера, и передаем порт завершения в поток.

// Замечание: вариант содержания функции ServerWorkerThread() определен ниже.

ThreadHandle = CreateThread(NULL, 0,ServerWorkerThread, CompletionPort,0, &ThreadId);

// Закрываем дескриптор потока

CloseHandle(ThreadHandle);
}
// Шаг 4:
// Создаем слушающий сокет с перекрытием (флаг WSA_FLAG_OVERLAPPED)
Listen_socket = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);
InternetAddr.sin_family = AF_INET;
InternetAddr.sin_addr.s_addr = htonl(INADDR_ANY);
InternetAddr.sin_port = htons(5150);
bind(Listen, (PSOCKADDR) &InternetAddr,sizeof(InternetAddr));
// Формируем очередь входящих запросов
listen(Listen_socket, 5);
while(TRUE)
{
    PER_HANDLE_DATA *PerHandleData=NULL;
    SOCKADDR_IN saRemote;
    SOCKET Accept_socket;
    int RemoteLen;
    // Шаг  5:
    // Принимаем запрос на соединение
    RemoteLen = sizeof(saRemote);
    Accept_socket = WSAAccept(Listen_socket, (SOCKADDR *)&saRemote,&RemoteLen);
    // Шаг 6:
    // Создаем структуру для информации " per-handle data", связанной с сокетом
    PerHandleData = (LPPER_HANDLE_DATA) GlobalAlloc(GPTR, sizeof(PER_HANDLE_DATA));
    printf("Socket number %d connected\n", Accept_socket);
    PerHandleData->Socket = Accept_socket;
    memcpy(&PerHandleData->ClientAddr, &saRemote, RemoteLen);
    // Шаг 7:
    // Ассоциируем  сокет клиента с портом завершения
    CreateIoCompletionPort((HANDLE)Accept_socket,CompletionPort,(DWORD) PerHandleData,0);
    // Шаг 8:

    // Начинаем обрабатывать операции ввода/вывода на присоединенном сокете.

    // Выдаем один или несколько вызовов WSASend() или WSARecv()
    // на сокете с использованием перекрытого I/O.
    // WSARecv() / WSASend()
    // Учтите, что функции будут возвращать ошибку SOCKET_ERROR со статусом
    // WSA_IO_PENDING, которую необходимо игнорировать (если вызов их не завершится
    // мгновенно). 
}
DWORD WINAPI ServerWorkerThread(LPVOID lpParam)
{
.....
return 0;
}

GetQueuedCompletionStatus

Каждая завершившаяся тем или иным образом операция ввода/вывода, назначенная на данный порт завершения, сообщает о своем возврате с помощью специального системного пакета, который поступает в очередь завершенных запросов порта. Для обработки этой очереди и предназначены наши рабочие потоки. Достигается это с использованием функции GetQueuedCompletionStatus(), которая пытается извлечь из очереди пакет завершения соответствующей операции. Если в очереди отсутствует пакет завершения, функция ожидает завершения операции I/O.

BOOL GetQueuedCompletionStatus(
  HANDLE CompletionPort,       // the I/O completion port of interest
  LPDWORD lpNumberOfBytesTransferred,// to receive number of bytes 
                                     // transferred during I/O
  LPDWORD lpCompletionKey,     // to receive file's completion key
  LPOVERLAPPED *lpOverlapped,  // to receive pointer to OVERLAPPED structure
  DWORD dwMilliseconds         // optional timeout value
);

Параметры этой функции следующие:

  • CompletionPort – дескриптор порта завершения
  • lpNumberOfBytesTransferred — число байт переданных во время ввода-вывода функциями WSASend/WSARecv
  • lpCompletionKey — здесь нам вернутся данные дескриптора сокета, которые мы задали при привязке сокета к порту заврешния
  • lpOverlapped – указатель на OVERLAPPED-структуру
  • dwMilliseconds — таймаут операции в милисекундах (INFINITE — бесконечное ожидание)

Если пакет завершения не появляется в пределах указанного времени, функция возвращает FALSE и устанавливает *lpOverlapped в NULL. Если dwMilliseconds =0  и в очереди отсутствует пакет завершения, функция завершается немедленно.

При успешном завершении функция возвращает положительное число. Если  *lpOverlapped  указан как NULL и функция не извлекла пакет завершения из порта, она возвращает 0. Если функция не извлекла пакет завершения из порта по таймауту, GetLastError() вернет WAIT_TIMEOUT. Если сокет, ассоциированный с портом, закрыт, GetQueuedCompletionStatus() возвращает ERROR_SUCCESS, с  *lpOverlapped не-NULL и lpNumberOfBytes равным 0.

lpOverlapped указывает на структуру OVERLAPPED, снабженную тем, что мы называем данные операции Per-I/O, знание которых может быть важно для рабочего потока при обработке пакета завершения (возвращенные ECHO-данные, как в нашем примере, прием подключения, тип операции —  чтение/запись и так далее). Per-I/O данные операции – это любое количество байтов, содержащихся в отдельной структуре, также содержащей ту самую структуру OVERLAPPED, которая передается в функцию, ожидающую структуру OVERLAPPED. Самый простой способ выполнить это — определить новую структуру и разместить структуру OVERLAPPED как поле новой структуры. Например, мы объявляем, что  следующая структура данных управляет данными операции Per-I/O:

typedef struct {
        OVERLAPPED Overlapped;
        //любые полезные нам данные:
        WSABUF DataBuf;
        CHAR Buffer[DATA_BUFSIZE];
        DWORD BytesSend;
        DWORD BytesRecv;
        DWORD OperationType;
        DWORD TotalBytes;
        .....
} PER_IO_OPERATION_DATA;

После поля обязательной структуры OVERLAPPED мы можем разместить буффер для хранения данных, количество переданных/принятых байт, тип операции — отправка/приём, адрес и порт клиента -практически любую информацию, которая нам может быть полезна, исходя из требований к серверу.

Таким образом, при вызове WinSock-функций мы должны передавать OVERLAPPED-структуру как поле новой структуры. Например таким образом:

PER_IO_OPERATION_DATA PerIoData;
...
WSARecv(socket, &wbuf, 1, &Bytes, &Flags, &(PerIoData.Overlapped),NULL);

Позже в рабочем потоке функция GetQueuedCompletionStatus() возвращается со  структурой перекрытия и ключом завершения. Чтобы обеспечения доступа к per-I/O данным,  должна использоваться макрокоманда CONTAINING_RECORD. Например:

PER_IO_DATA  *PerIoData=NULL;
OVERLAPPED   *lpOverlapped=NULL;
ret = GetQueuedCompletionStatus(CompPortHandle,&Transferred,
         (PULONG_PTR)&CompletionKey,&lpOverlapped,INFINITE);
PerIoData = CONTAINING_RECORD(lpOverlapped, PER_IO_DATA, Overlapped);

Использование этого макроса позволяет записывать OVERLAPPED-член структуры PER_IO_DATA в любом месте, а не обязательно первым полем, что может быть важно для команды разработчиков.

Вы можете определить, какая операция была выполнена на конкретном сокетном дескрипторе,  используя поле структуры PerIoData для фиксации  типа операции — чтение, запись и т.д. Одна из самых больших выгод от этого — возможность управлять многократными операциями I/O на том же самом  дескрипторе сокета. Например, если Вы имеете многопроцессорную машину, то можно потенциально иметь несколько процессоров, посылающих и получающих данные по одному сокету одновременно.

Для завершения нашего сервера ECHO мы должны написать код функции ServerWorkerThread():

ServerWorkerThread

Листинг 4.3 Код функции ServerWorkerThread():

DWORD WINAPI ServerWorkerThread(LPVOID CompletionPortID)
{
HANDLE CompletionPort = (HANDLE) CompletionPortID;
DWORD BytesTransferred;
LPOVERLAPPED Overlapped;
LPPER_HANDLE_DATA PerHandleData;
LPPER_IO_DATA PerIoData;
DWORD SendBytes, RecvBytes;
DWORD Flags;
   while(TRUE)
   {
// Ожидаем завершения операции ввода/вывода  на любом сокете, 
// связанным с данным портом завершения
  if (GetQueuedCompletionStatus (CompletionPort, &BytesTransferred,
   (LPDWORD)&PerHandleData, (LPOVERLAPPED *) &PerIoData, INFINITE) == 0)
   {
   // Сначала проверяем возврат на возможную ошибку. 
   printf ("GetQueuedCompletionStatus failed with error %d\n", GetLastError ());
   return 0;
 }
// Если произошла ошибка типа BytesTransferred=0, что свидетельствует о
// закрытии сокета на удаленном хосте, закрываем свой сокет и очищаем данные, 
// связанные с сокетом
if (BytesTransferred == 0 &&(PerIoData->OperationType == RECV_POSTED ││
       PerIoData->OperationType == SEND_POSTED))
 {
 closesocket(PerHandleData->Socket);
 GlobalFree(PerHandleData);
 GlobalFree(PerIoData);
 continue;//Продолжаем цикл 
 }
// Обслуживаем завершенный запрос. Какая операция была закончена, определяем по 
// содержимому поля OperationTypefield в структуре PerIoData
if (PerIoData->OperationType == RECV_POSTED)
{
// Если тип операции был помечен как WSARecv(), выполняем необходимые действия с
// информацией, имеющейся в поле PerIoData->Buffer
}
// Выдаем следующий запрос на выполнение другой необходимой операции – WSASend()
// или WSARecv(). В нашем случае это WSARecv() – мы продолжаем получать данные
Flags = 0;
//Формируем данные для следующего  вызова операции с перекрытием
ZeroMemory(&(PerIoData->Overlapped),sizeof(OVERLAPPED));
PerIoData->DataBuf.len = DATA_BUFSIZE;
PerIoData->DataBuf.buf = PerIoData->Buffer;
PerIoData->OperationType = RECV_POSTED;
//Выполняем вызов WSARecv() и переходим опять к ожиданию завершения 
WSARecv(PerHandleData->Socket, &(PerIoData->DataBuf), 1, &RecvBytes, &Flags,
 &(PerIoData->Overlapped), NULL);
  }//End While 
}//End ServerWorkerThread()

Ошибка времени исполнения

Следует отметить следующее обстоятельство. Если происходит ошибка во время исполнения данной операции с перекрытием, функция GetQueuedCompletionStatus() вернет FALSE. Так как порт завершения – это объект Windows, то в этом случае надо вызывать  GetLastError(). Для получения эквивалентного WinSock-кода ошибки, надо обращаться к WSAGetOverlappedResult() с заданием дескриптора сокета и структуры WSAOVERLAPPED, после чего вызов WSAGetLastError() вернет правильный код WinSock-ошибки.

Тип операции

В нашем примере мы не очень заботимся о различении типа операции —  т.к.  здесь надо просто отправлять и принимать символы.  В более «серьёзных» программах использование параметра OperationType  в структуре PER_IO_OPERATION_DATA может привести к такому коду для точного понимания, что же  происходит с сервером:

switch (PerIoData->OperationType)
{
        case SRV_NEW_CONNECTION:
        // это первое подключение:
        ...
        case SRV_DATA_SEND:
        // посылаем данные
        ...
        case SRV_DATA_RECV:
        // принимаем данные
        ...
        case SRV_DISCONNECT:
        // отсоединение
        .......
  }

При коректном завершении работы порта завершения главное — не освобождать память со структурой OVERLAPPED, пока выполняется какая-нибудь операция на сокете. Так же надо знать, что закрытие сокета с помощью closesocket() прерывает любые продолжающиеся операции на данном порту завершения. После того как вы закроете все сокеты, необходимо завершить все рабочие потоки порта завершения. Для этого воспользуемся функцией PostQueuedCompletionStatus(), которая отправит потоку пакет, заставляющий прекратить работу:

BOOL PostQueuedCompletionStatus(
  HANDLE CompletionPort,            // дескриптор порта завершения
  DWORD dwNumberOfBytesTransferred, // возврат из GetQueuedCompletionStatus()
  DWORD dwCompletionKey,            // возврат из GetQueuedCompletionStatus()
  LPOVERLAPPED lpOverlapped         // возврат из GetQueuedCompletionStatus()
);

Параметр CompletionPort — задаёт порт завершения, а остальные параметры задают значения, которые поток получит из функции GetQueuedCompletionStatus() — те мы можем задать непосредственно тип операции для завершения работы — когда поток получит это значение и интерпретирует его соответвующим образом мы можем освободить какие-то ресурсы, сделать какую-то работу, Etc…

Рабочий поток получает эти три параметра GetQueuedCompletionStatus() и он может определить, когда ему необходимо завершить свою работу с помощью специального значения, переданного в одном из этих параметров. Например, можно передавать значение 0 в dwCompletionKey-параметре, который рабочий поток будет интерпретировать как команда завершения работы.

После закрытия всех рабочих потоков надо закрыть порт завершения через CloseHandle() и завершить программу.

Прoграммы  для работы с портом завершения IOCP

Программа-клиент с использованием потока

Листинг

#include <stdio.h>

#include <conio.h>

#include <malloc.h>

#include <winsock2.h>

#pragma comment (lib,»ws2_32.lib»)

#define PORT 28912                      // Номер порта сервера

DWORD WINAPI ClientPool(SOCKET client); // Функция потока клиента

//—————————————————————————

void main(void){

  int j;                   // Счетчик

  WORD wVersionRequested;  // Запрашиваемая версия

  WSADATA wsaData;         // Структура инф-ции о сокетах

  int err;                 // Возвращяемое значение

  char address[16]={0};    // Адрес удаленного компьютера(сервера)

  char buffer[128];        // Буфер для сообщений

  SOCKET sd;               // Сокет клиента

  HANDLE hThread;          // Хендл потока

  DWORD ThreadId;          // Идентификатор потока

  //Инициализируем процесс библиотеки ws2_32.dll

  wVersionRequested=MAKEWORD(2,2);

  err=WSAStartup(wVersionRequested,&wsaData);

  if(err==SOCKET_ERROR){

    strcpy(buffer,»Ошибка ф-ции WSAStartup № «);

    CharToOem(buffer,buffer);

    printf(«%s %d\n»,buffer, WSAGetLastError());

    WSACleanup(); // Завершение работы при неудаче

    getch();

    return;

  }

  strcpy(buffer,»Введите адрес удаленного компьютера»);

  CharToOem(buffer,buffer);

  printf(«%s\n»,buffer);

  scanf(«%s»,address);

  // Cоздаем сокет

  sd = socket(AF_INET,SOCK_STREAM,IPPROTO_TCP);

 if(sd==INVALID_SOCKET){

    strcpy(buffer,»Ошибка ф-ции socket «);

    CharToOem(buffer,buffer);

    printf(«%s %d\n»,buffer, WSAGetLastError());

    WSACleanup(); // Завершение работы

    getch();

    return;

       }

  SOCKADDR_IN sin;//Структура для размещения адреса сервера

  ZeroMemory(&sin, sizeof(sin));

  sin.sin_family = AF_INET;

  sin.sin_port = htons(PORT);

  sin.sin_addr.s_addr =inet_addr(address);

  if(connect(sd, (PSOCKADDR)&sin, sizeof(SOCKADDR))==-1)

  {

   strcpy(buffer,»Не могу подключиться к серверу №»);

   CharToOem(buffer,buffer);

   printf(«%s %d\n»,buffer, GetLastError());

   WSACleanup(); // Завершение работы

   getch();

   return;

  }

  strcpy(buffer,»Подключение к серверу успешно»);

  CharToOem(buffer,buffer);

  printf(«%s\n»,buffer);

  // Создаем поток для исполнения функции ClientPool()

  hThread = CreateThread(NULL,0,(LPTHREAD_START_ROUTINE)ClientPool,(void*)sd, 0, &ThreadId);

  while(true)

  {

  scanf(«%s»,buffer);

  if(strcmp(buffer,»EXIT»)==0)

  {

  TerminateThread(hThread,0);

  if (sd!=INVALID_SOCKET)closesocket(sd);       // Закрываем сокет

  break;

  }

  send(sd,buffer,strlen(buffer),0);

  }

  WSACleanup();               // Завершение работы

return;

}

//—————————————————————————

//       Функция потока клиента

//—————————————————————————

DWORD WINAPI ClientPool(SOCKET client)

{

int bytes;

char buffer[128];

while(true)

{

bytes=recv(client,buffer,sizeof(buffer),0);

buffer[strlen(buffer)]=’\0′;

if(strlen(buffer)!=0)printf(«%s\n»,buffer);

}

return 0;

}

Программа-сервер с использованием модели IOCP

#include <stdio.h>

#include <conio.h>

#include <malloc.h>

#include <Winsock2.h>

#include <list>                         // Используем STL

#pragma comment (lib,»ws2_32.lib»)     // Линковка библиотеки ws2_32.lib

using namespace std;                   // Использовать пространство имен std

#define BUFF_SIZE     1024              // Размер буфера

#define PORT 28912                      // Номер порта

DWORD WINAPI ServerPool(HANDLE hp);

void SendToAll(char *buffer,unsigned long bytes);    //Функция отсылки сообщения всем 

// клиентам

SOCKET server_sock;                     // Прослушивающий сокет сервера

int ClientCount;                        // Счетчик клиентов

list <SOCKET> ClientList;               // Список клиентов

//—————————————————————————

struct ovpConnection: public OVERLAPPED

{

  int client_number;       // Номер клиента

  SOCKET c;                   // Сокет клиента

  char * buffer;           // Буфер сообщений

  enum

  {

    op_type_send,          // Посылка

    op_type_recv           // Прием

  }op_type;                // Тип операции

};

//—————————————————————————

void main(int argc, char *argv[])

{

int err;                 // Возвращаемое значение

char buffer[128];        // Буфер для сообщений

WORD wVersionRequested;  // Запрашиваемая версия

WSADATA wsaData;         // Структура инф-ции о сокетах

HANDLE hCp;              // Описатель порта завершения

LPOVERLAPPED overlapped; // Структура асинхронного I/O

HANDLE hThread;          // Хендл потока

DWORD ThreadId;          // Идентификатор потока

DWORD flags;             // Флаги ф-ции WSARecv

//Инициализация библиотеки ws2_32.dll

wVersionRequested=MAKEWORD(2,2);

err=WSAStartup(wVersionRequested,&wsaData);

if(err==SOCKET_ERROR)

{

    strcpy(buffer,»Ошибка ф-ции WSAStartup»);

    CharToOem(buffer,buffer);

    printf(«%s %d\n»,buffer, WSAGetLastError());

    WSACleanup(); // Завершение работы

    getch();

    return;

}

//Создаем порт завершения

hCp=CreateIoCompletionPort(INVALID_HANDLE_VALUE,NULL,0,0);

if(hCp==NULL)

{

    strcpy(buffer,»Ошибка ф-ции CreateIoCompletionPort»);

    CharToOem(buffer,buffer);

    printf(«%s %d\n»,buffer, GetLastError());

    WSACleanup(); // Завершение работы

    getch();

    return;

}

// Задаем параметры для прослушивающего сокета сервера

server_sock=WSASocket(AF_INET,SOCK_STREAM,IPPROTO_TCP,NULL,NULL, WSA_FLAG_OVERLAPPED);

if(server_sock==INVALID_SOCKET)

{

    strcpy(buffer,»Ошибка ф-ции WSASocket»);

    CharToOem(buffer,buffer);

    printf(«%s %d\n»,buffer, WSAGetLastError());

    WSACleanup(); //Завершение работы с сокетами

    getch();

    return;

}

else

{

// Используем ранее созданный порт завершения

    if(CreateIoCompletionPort((HANDLE)server_sock,hCp,0,0)==NULL)

{

      strcpy(buffer,»Ошибка ф-ции CreateIoCompletionPort»);

      CharToOem(buffer,buffer);

      printf(«%s %d\n»,buffer, GetLastError());

      WSACleanup(); //Завершение работы

      getch();

      return;

       }

}

//Заполняем структуру адреса и подключаем сокет к коммуникационной среде

SOCKADDR_IN sinServer;

sinServer.sin_family = AF_INET;

sinServer.sin_port = htons(PORT);

sinServer.sin_addr.s_addr = INADDR_ANY;

err = bind( server_sock,(LPSOCKADDR)&sinServer,sizeof(sinServer) );

if(err==-1)

{

    strcpy(buffer,»Ошибка ф-ции bind»);

    CharToOem(buffer,buffer);

    printf(«%s %d\n»,buffer, GetLastError());

    WSACleanup(); //Завершение работы

    getch();

    return;

}

//Создаем очередь для ожидания запросов от клиентов на соединение

err = listen(server_sock, SOMAXCONN);

if(err==-1)

{

    strcpy(buffer,»Ошибка ф-ции listen №»);

    CharToOem(buffer,buffer);

    printf(«%s %d\n»,buffer, GetLastError());

    WSACleanup(); // Завершение работы

    getch();

    return;

  }

//Cоздаем рабочий поток для обслуживания сообщений от порта завершения

hThread = CreateThread(NULL,0,(LPTHREAD_START_ROUTINE)ServerPool,hCp,0, &ThreadId);

ClientCount=0;

strcpy(buffer,»Сервер запущен\n»);

CharToOem(buffer,buffer);

printf(«%s»,buffer);

//Бесконечный цикл для многократного обслушивания запросов от клиентов

while(true)

{

//Принимаем запрос от программы-клиента на установление связи

SOCKADDR_IN sinClient;

int lenClient=sizeof(sinClient);

SOCKET client = accept(server_sock,(struct sockaddr*)&sinClient, &lenClient);

CreateIoCompletionPort((HANDLE)client,hCp,0,0);

//Добавляем клиента в список

ClientList.insert(ClientList.end(),client);

// Создаем overlapped-структуру

ovpConnection * op = new ovpConnection;

//Заполняем overlapped-структуру

op->sock_handle = client;

op->op_type = ovpConnection::op_type_recv;

op->buffer = new char[BUFF_SIZE];

op->hEvent = 0;

op->client_number=++ClientCount;

strcpy(buffer,»Клиент № %d подключился, активных клиентов %d\n»);

CharToOem(buffer,buffer);

printf(buffer,ClientCount,ClientList.size());

unsigned long b;

WSABUF buf;

buf.buf = op->buffer;

buf.len = BUFF_SIZE;

flags=0;

err=WSARecv(op->sock_handle, &buf, 1, &b, &flags, op, 0);

if(!err)

{

   strcpy(buffer,»Ошибка ф-ции WSARecv»);

   CharToOem(buffer,buffer);

   printf(«%s %d\n»,buffer, WSAGetLastError());

}

}

return;

}

//—————————————————————————

//Функция потока сервера для обслуживания порта завершения

//—————————————————————————

DWORD WINAPI ServerPool(HANDLE hp)

{

  int err;                 // Возвращяемое значение

  unsigned long bytes;     // Кол-во байтов

  unsigned long key;       // Значение, асоциированное с хендлом порта

  char buffer[128];        // Буфер для сообщений

  LPOVERLAPPED overlapped; // Структура асинхронного I/O

  HANDLE hport=hp;         // Дескриптор порта

  DWORD flags;             // Флаги ф-ции WSARecv()

  while(true)

  {

  // Получаем информацию о завершении операции

  if(GetQueuedCompletionStatus(hport, &bytes, &key, &overlapped, INFINITE))

   {

       // Операция завершена успешно

   ovpConnection * op = (ovpConnection*)overlapped;

// Определяем тип завершенной операции и выполняем соответствующие действия

   switch(op->op_type)

{

      //Завершена отправка данных

      case ovpConnection::op_type_send:

      delete [] op->buffer;

      delete op;

      break;

      //Завершен приём данных

      case ovpConnection::op_type_recv:

      if(!bytes)

{

//Соединение с данным клиентом закрыто

            ClientList.remove(op->sock_handle);

            closesocket(op->sock_handle);

            strcpy(buffer,»Клиент № %d отключился, активных клиентов %d\n»);

            CharToOem(buffer,buffer);

            printf(buffer,op->client_number,ClientList.size());

            break;

       }

   op->buffer[bytes]=’\0′;

   if(op->buffer[0]==’*’) //Звездочка * — признак приема сообщения, которое

//должно быть переслано всем подключенным клиентам

   {

   strcpy(buffer,»От клиента № %d получено сообщение для всех  %s\n»);

   CharToOem(buffer,buffer);

   printf(buffer,op->client_number,(op->buffer+1));

   SendToAll(op->buffer, bytes);  //Отправляем данные всем

   }

   else

   {

   strcpy(buffer,»От клиента № %d получено сообщение %s\n»);

   CharToOem(buffer,buffer);

   printf(buffer,op->client_number,op->buffer);

   }

   unsigned long b;

   WSABUF buf;

   buf.buf = op->buffer;

   buf.len = BUFF_SIZE;   // buffer_len – постоянная величина

   err=WSARecv(op->sock_handle, &buf, 1, &b, &flags, op, 0);

   if(!err)

   {

   strcpy(buffer,»Ошибка ф-ции WSARecv»);

   CharToOem(buffer,buffer);

   printf(«%s %d\n»,buffer, WSAGetLastError());

   }

   }

}

else

{

  if(!overlapped)

{

       // Ошибка с портом

      // Закрываем все сокеты, закрываем порт, очищаем список

      for(list<SOCKET>::iterator i=ClientList.begin();i!=ClientList.end();i++)

{

          closesocket(*i);

      }

      ClientList.clear();

      closesocket(server_sock);

      CloseHandle(hport);

      strcpy(buffer,»Ошибка порта № %d, сервер завершает работу\n»);

      CharToOem(buffer,buffer);

      printf(buffer,GetLastError());

      getch();

      exit(0);

      }

      else

{

      //Закрываем соединение с клиентом

      closesocket(((ovpConnection*)overlapped)->sock_handle);

      ClientList.remove(((ovpConnection*)overlapped)->sock_handle);

      strcpy(buffer,»Клиент № %d отключился, активных клиентов %d\n»);

      CharToOem(buffer,buffer);

printf(buffer,((ovpConnection*)overlapped)->client_number,           ClientList.size());

      }

    }

  }

  return 0;

}

//—————————————————————————

//Функция отсылки данных всем клиентам

//—————————————————————————

void SendToAll(char *buffer,unsigned long bytes)

{

//Перебираем все соединения

for(list<SOCKET>::iterator i=ClientList.begin();i!=ClientList.end();i++)

{

ovpConnection * op = new ovpConnection;

op->sock_handle = *i;

op->op_type = ovpConnection::op_type_send;

op->buffer = new char[bytes-1];

memcpy(op->buffer, (buffer+1), bytes-1);

op->buffer[bytes-1]=’\0′;

unsigned long b;

WSABUF buf;

buf.buf = op->buffer;

buf.len = BUFF_SIZE;

WSASend(op->sock_handle,&buf,1,&b, 0, op, 0);

}

  return;

}

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *

Этот сайт защищен reCAPTCHA и применяются Политика конфиденциальности и Условия обслуживания применять.