Навигация
Главная »  Delphi 

Windows Sockets, IOCP и Delphi


Источник: habrahabr
Mr_Developer

Пролог


Недавно я столкнулся с необходимостью эффективной работы с сокетами в Windows приложении. Задача типичная для нагруженного сервера. Нетипичным тут будет казаться только язык реализации - Delphi.
Я хочу описать способ массовой асинхронной работы с большим количеством сокетов с использованием I/O Completion Ports. Microsoft в своей литературе рекомендует использовать именно эту технологию. Я думаю, многие с ней знакомы, но на всякий случай укажу ссылку на MSDN. Суть технологии в том, что система организует высокоэффективную очередь событий, а программа обрабатывает её из тред-пула, размер которого подобран по количеству вычислительных ядер. Данный подход имеет преимущества при большом количестве одновременно производимых асинхронных операций ввода вывода для разных конечных точек. Готовый исходник можно (лучше) сразу гляуть здесь. Не всё идеально, но для эксперементов сойдёт.

Roadmap


Я, в некотором смысле, буду придерживаться идеологии Node.Js во всём, что касается организации объектов и операций ввода вывода. 
В случае с серверной частью понадобиться реализовать следующее:
  • Прослушивание сокета. Принятием или отклонением новых соединений.
  • Отслеживание сигнала закрытия клиентских сокетов.

Для клиента первый пункт этого списка не актуален, но необходимо реализовать асинхронное подключение к серверу. В обоих классах будет возможность одновременного и чтения и записи на одну конечную точку.
Все созданные экземпляры клиентских и серверных сокетов будут использовать одну общую очередь сообщений и один тред-пул. Это нужно для возможности использовать оба типа сокетов в одном приложении оптимальным образом.

Реализация


Приступим. Для начала отмечу, что в связи с абсолютно асинхронной событийной моделью построения я буду реализовывать не классы а интерфейсы. Это очень удобно в данном случае, так как с конечного программиста снимается ответственность за выделенную память. Да и вообще, отследить тут её использование другим способом либо очень затратно либо вовсе невозможно. Очень много работы должно происходить при инициализации модуля. 
  • Создание списков сокетов разных типов.
  • Инициализация подсистемы сокетов.
  • Создание очереди сообщений.
  • Создание пула для обработки очереди.
  • Создание событий для сокетов.
  • Создание потоков отслеживающих сокетные события( например подключение нового клиента).

И так, секция инициализации содержит следующую процедуру, которая реализует список пункт за пунктом.
procedure Init; var WSAData: TWsaData; i: Integer; begin gClients := TProtoStore.Create; gListeners := TProtoStore.Create; gServerClients := TProtoStore.Create; if WSAStartup(MAKEWORD(2, 2), WSAData) <> 0 then raise IOCPClientException.Create(sErrorInit_WSAtartup); gIOCP := CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, CPUCount * 2); if gIOCP = INVALID_HANDLE_VALUE then raise IOCPClientException.Create(sErrorInit_CreateIoCompletionPort); for i := 1 to CPUCount * 2 do begin SetLength(gWorkers, Length(gWorkers) + 1); gWorkers[Length(gWorkers) - 1] := TWorkerThread.Create(); end; gListenerAcceptEvent := WSACreateEvent; if gListenerAcceptEvent = WSA_INVALID_EVENT then raise IOCPClientException.Create(sErrorInit_WSACreateEvent); gServerClientsCloseEvent := WSACreateEvent; if gServerClientsCloseEvent = WSA_INVALID_EVENT then raise IOCPClientException.Create(sErrorInit_WSACreateEvent); gClisentsConnectAndCloseEvents := WSACreateEvent; if gClisentsConnectAndCloseEvents = WSA_INVALID_EVENT then raise IOCPClientException.Create(sErrorInit_WSACreateEvent); gClientSocketEventThread := TSocketEventThread.Create (gClisentsConnectAndCloseEvents, gClients, ET_EVENT_SIGNALED); gClientSocketEventThread.Start; gServerClientsSocketEventThread := TSocketEventThread.Create (gServerClientsCloseEvent, gServerClients, ET_EVENT_SIGNALED); gServerClientsSocketEventThread.Start; gServerSocketEventThread := TSocketEventThread.Create(gListenerAcceptEvent, gListeners, ET_EVENT_SIGNALED); gServerSocketEventThread.Start; end; 

Функция CreateIoCompletionPort в данном случае выполняет создание специальной очереди сообщений.
Видно, что для отслеживания событий на сокетах с разным назначением используется один и тот же класс потока TSocketEventThread. Потоки этого класса выполняют процедуру, которая ожидает сокетные события, и сразу же ставят в очередь сообщения (для каждого сокета относящегося к типу, который облуживает этот поток) о том что произошло какое-то событие. 
procedure TSocketEventThread.WaitForClientsEvents; var WaitResult: DWORD; const TimeOut: DWORD = 100; begin WaitResult := WSAWaitForMultipleEvents(1, @fEvent, FALSE, TimeOut, FALSE); if WaitResult = WSA_WAIT_FAILED then raise IOCPClientException.Create (sErrorWaitForClientsEvents_WSAWaitForMultipleEvents); if WaitResult = WSA_WAIT_EVENT_0 then begin if not WSAResetEvent(fEvent) then raise IOCPClientException.Create (sErrorWaitForClientsEvents_WSAResetEvent); fStore.Post(fKey); end; end;

Тут метод fStore.Post(fKey); как раз и выполняет отправку сообщений в очередь.
procedure TProtoStore.Post(CompletionKey: DWORD); var i: Integer; begin fLock.Enter; try for i := 0 to Length(ProtoArray) - 1 do begin ProtoArray[i]._AddRef; if not PostQueuedCompletionStatus(gIOCP, 0, CompletionKey, POverlapped(ProtoArray[i])) then begin ProtoArray[i]._Release; raise IOCPClientException.Create(sErrorPost_PostQueuedCompletionStatus); end; end; finally fLock.Leave; end; end;

Особое внимание заслуживает тут использование объектов с интерфейсами.
Метод _AddRef используется для того, чтобы обозначить тот факт, что объект "находится в очереди" и его не следует уничтожать. (Позже после обработки будет вызван _Release). Процедура PostQueuedCompletionStatus непосредственно выполняет постановку сообщения в очередь.
Пул обработает каждое сообщение в асинхронном режиме.
Для этого он выполняет следующую процедуру.
procedure TWorkerThread.ProcessIOCP; var NumberOfBytes: DWORD; CompletionKey: NativeUInt; Overlapped: POverlapped; Proto: TIOCPSocketProto; begin if not((not GetQueuedCompletionStatus(gIOCP, NumberOfBytes, CompletionKey, Overlapped, INFINITE)) and (Overlapped = nil)) then begin if CompletionKey = ET_EVENT_SIGNALED then begin Proto := TIOCPSocketProto(Overlapped); with Proto do begin IOCPProcessEventsProc(); _Release; end end else if CompletionKey <> 0 then begin Proto := TIOCPSocketProto(CompletionKey); if Proto.IOCPProcessIOProc(NumberOfBytes, Overlapped) then Proto._Release; end; end end;

Процедура GetQueuedCompletionStatus служит для получения сообщения из очереди. Далее определяется является ли это сообщение сообщением о завершенном вводе/выводе или это сообщение о произошедшем событии. Тут продемонстрированы два способа передать через очередь какую-то информацию, в данном случае это ссылка на конкретный экземпляр класса сокетов. 
Обработка ведётся унифицировано для всех типов сокетов, это достигнуто с помощью наследования от общего предка который содержит общие обработчики, допускается их переопределение. 
Рассмотрим механизм обработки сокетных событий.
procedure TIOCPSocketProto.IOCPProcessEventsProc(); var WSAEvents: TWsaNetworkEvents; AcceptedSocket: TSocket; RemoteAddress: string; begin if fStateLock <> CLI_SOCKET_LOCK_CLOSED then begin fClosingLock.BeginRead; try if (fStateLock <> CLI_SOCKET_LOCK_CLOSED) then if WSAEnumNetworkEvents(fSocket, 0, WSAEvents) <> SOCKET_ERROR then begin if ((WSAEvents.lNetworkEvents and FD_CONNECT) <> 0) then begin if 0 = WSAEvents.iErrorCode[FD_CONNECT_BIT] then InterlockedExchange(fStateLock, CLI_SOCKET_LOCK_CONNECTED); CallOnConnect; end; if ((WSAEvents.lNetworkEvents and FD_CLOSE) <> 0) and (0 = WSAEvents.iErrorCode[FD_CLOSE_BIT]) then CallOnClose; if ((WSAEvents.lNetworkEvents and FD_ACCEPT) <> 0) and (0 = WSAEvents.iErrorCode[FD_ACCEPT_BIT]) then begin AcceptedSocket := DoAccept(RemoteAddress); if AcceptedSocket <> INVALID_SOCKET then begin fClientClass.Create(AcceptedSocket, fOnConnect, fOnClose, RemoteAddress).Prepare; end; end; end finally fClosingLock.EndRead; end; end; end;

Здесь интересно применён класс TMultiReadExclusiveWriteSynchronizer. Он используется для предотвращения попытки закрыть сокет и уничтожить объект из другой нити пула (fClosingLock.BeginRead). Все операции с сокетом проходят как операции чтения для этого объекта синхронизации, кроме операции создания и операции закрытия сокета - они являются операциями записи и потому могут выполняться только при монопольном владении ресурсом.
Во всём же остальном работа с сокетами в данной процедуре совершенно обыкновенная.
Единственное что в этой процедуре стоит рассмотреть дополнительно - это подключение нового клиента к серверу, метод DoAccept. 
function TIOCPSocketProto.DoAccept(var RemoteAddress: string): TSocket; var addr: TSockAddr; addrlen: Integer; dwCallbackData: NativeUInt; RemoteAddrLen: DWORD; begin dwCallbackData := NativeUInt(self); addrlen := SizeOf(addr); Result := WSAAccept(fSocket, @addr, @addrlen, ServerAcceptCallBack, dwCallbackData); if Result <> INVALID_SOCKET then begin SetLength(RemoteAddress, 255); RemoteAddrLen := Length(RemoteAddress); if WSAAddressToString(addr, addrlen, nil, PChar([1]), RemoteAddrLen) = SOCKET_ERROR then raise IOCPClientException.Create(sErrorAccept_WSAAddressToString); SetLength(RemoteAddress, RemoteAddrLen - 1) end end;

Здесь ключевым моментом является использование WSAAccept. Эта функция позволяет отклонять подключение клиентов таким образом, что клиент на самом деле получает событие FD_CONNECT.
Это предочтительный путь для организации так называемых чёрных списков.
Идём далее. Расмотрим организацию ввода вывода. Сделаем это на примере операции чтения.
procedure TIOCPSocketProto.Read(Length: DWORD; OnRead, OnReadProcess: TOnReadEvent); var Bytes, Flags: DWORD; WsaBuf: TWsaBuf; begin fClosingLock.BeginRead; try if fStateLock = CLI_SOCKET_LOCK_CONNECTED then begin if InterlockedCompareExchange(fReadLock, IO_PROCESS, IO_IDLE) = IO_IDLE then begin fOnRead := OnRead; fOnReadProcess := OnReadProcess; fReaded := 0; fReadBufLength := Length; fReadBuffer := nil; GetMem(fReadBuffer, Length); if fReadBuffer <> nil then begin Bytes := 0; FillChar(fOverlappedRead, SizeOf(fOverlappedRead), 0); WsaBuf.buf := fReadBuffer; WsaBuf.len := fReadBufLength; Flags := 0; Bytes := 0; _AddRef; if (WSARecv(fSocket, @WsaBuf, 1, Bytes, Flags, @fOverlappedRead, nil) = SOCKET_ERROR) and (WSAGetLastError <> WSA_IO_PENDING) then begin FreeMem(fReadBuffer, Length); InterlockedExchange(fReadLock, IO_IDLE); _Release; raise IOCPClientException.Create(sErrorRead_WSARecv); end; end else raise IOCPClientException.Create(sErrorRead_GetMem); end else raise IOCPClientException.Create(sErrorRead_InProcess); end else raise IOCPClientException.Create(sErrorRead_NotConnected); finally fClosingLock.EndRead; end; end;

Здесь пришлось использовать интерлокед блокировку, т.к. она очень быстрая и удовлетворяет потребность в отсечении попытки повторного вызова опрации ввода/вывода. Память выделяется под буфер единажды в каждой операции. Далее вызывается чтение из сокета в асинхронном режиме. Объект также "помечается" с помощью AddRef, для невозможности его удаления во время нахождения в очереди. По завершении вычитывания пакета сообщения об этом автоматически выставляется в очередь.
Рассмотрим, что происходит при выборке из очереди сообщения о завершенном вводе/выводе.
function TIOCPSocketProto.IOCPProcessIOProc(NumberOfBytes: DWORD; Overlapped: POverlapped): Boolean; var Bytes, Flags: DWORD; WsaBuf: TWsaBuf; begin Result := FALSE; fClosingLock.BeginRead; try if Overlapped = @fOverlappedRead then begin if NumberOfBytes <> 0 then begin if fReadLock = IO_PROCESS then begin inc(fReaded, NumberOfBytes); if fReaded < fReadBufLength then begin CallOnReadProcess; WsaBuf.buf := fReadBuffer; inc(WsaBuf.buf, fReaded); WsaBuf.len := fReadBufLength; dec(WsaBuf.len, fReaded); Flags := 0; Bytes := 0; if (WSARecv(fSocket, @WsaBuf, 1, Bytes, Flags, @fOverlappedRead, nil) = SOCKET_ERROR) and (WSAGetLastError <> WSA_IO_PENDING) then begin CallOnRead; Result := True; end end else begin CallOnReadProcess; CallOnRead; Result := True; end; end end else begin CallOnRead; Result := True; end; end else if Overlapped = @fOverlappedWrite then begin if NumberOfBytes <> 0 then begin if fWriteLock = IO_PROCESS then begin inc(fWrited, NumberOfBytes); if fWrited < fWriteBufLength then begin CallOnWriteProcess; WsaBuf.buf := fWriteBuffer; inc(WsaBuf.buf, fWrited); WsaBuf.len := fWriteBufLength; dec(WsaBuf.len, fWrited); Flags := 0; Bytes := 0; if (WSASend(fSocket, @WsaBuf, 1, Bytes, Flags, @fOverlappedWrite, nil) = SOCKET_ERROR) and (WSAGetLastError <> WSA_IO_PENDING) then begin CallOnWrite; Result := True; end end else begin CallOnWriteProcess; CallOnWrite; Result := True; end; end end else begin CallOnWrite; Result := True; end; end finally fClosingLock.EndRead; end; end; 

Суть этой процедуры в том, что она вызывает чтение или запись в сокет до того момента, когда выделенный буфер не окажется заполненным. Интересный момент в данном случае, это определение типа операции по ссылке на оверлапед структуру. Эту ссылку предоставляет очередь и необходимо лишь сравнить её с соответсвующими полями класса, в которых храняться структуры для чтения и записи.
Так же примечательно, то что если операция чтения/записи выполнилась мгновенно, то она всё равно попадает в очередь, однако это можно настроить через апи.
Стоит так же рассмотреть создание класса сокета и внедрение в очередь.
constructor TIOCPClientSocket.Create(RemoteAddress: string; OnConnect, OnClose: TOnSimpleSocketEvenet); var lRemoteAddress: TSockAddr; lRemoteAddressLength: Integer; begin inherited Create(); fStore := gClients; fOnConnect := OnConnect; fOnClose := OnClose; fStateLock := 0; fRemoteAddressStr := RemoteAddress; fSocket := WSASocket(AF_INET, SOCK_STREAM, 0, nil, 0, WSA_FLAG_OVERLAPPED); if fSocket = INVALID_SOCKET then raise IOCPClientException.Create(sErrorTIOCPClientSocket_WSASocket); if (WSAEventSelect(fSocket, gClisentsConnectAndCloseEvents, FD_CONNECT or FD_CLOSE) = SOCKET_ERROR) then raise IOCPClientException.Create(sErrorTIOCPClientSocket_WSAEventSelect); if CreateIoCompletionPort(fSocket, gIOCP, NativeUInt(self), 0) = 0 then raise IOCPClientException.Create (sErrorTIOCPClientSocket_CreateIoCompletionPort); fStateLock := CLI_SOCKET_LOCK_CREATED; fStore.Add(self); lRemoteAddressLength := SizeOf(lRemoteAddress); lRemoteAddress.sa_family := AF_INET; if WSAStringToAddress(PChar(@fRemoteAddressStr[1]), AF_INET, nil, lRemoteAddress, lRemoteAddressLength) = SOCKET_ERROR then raise IOCPClientException.Create (sErrorTIOCPClientSocket_WSAStringToAddress); if (WSAConnect(fSocket, lRemoteAddress, lRemoteAddressLength, nil, nil, nil, nil) = SOCKET_ERROR) and (WSAGetLastError <> WSAEWOULDBLOCK) then raise IOCPClientException.Create(sErrorTIOCPClientSocket_WSAConnect); end; 

В конструкторе клиентского сокета создаётся непосрественно сокет (WSASocket), регестрируется в очереди (CreateIoCompletionPort), асоциируется с событием и вызывает асинхронную функцию подключения(WSAConnect). Сам факт подключения ожидается в потоке который был рассмотрен первым(поток ожидания событий в сокетах). Тот в свою очередь поставит это событие в очередь.

Эпилог


В данной статье кратко рассмотрены, на мой взгляд, удачные приёмы создания классов для событийного программирования. 
Удалось создать класс для выскопроизводительной работы с сокетами для Delphi. Тема эта освещена в целом крайне слабо и я планирую продолжить эту публикацию ещё 2 - 3 постами по темам контекстов сокетов при использовании интерфейсов и создание защищённых соединений при использовании IOCP (криптопровайдеры и Winsock Secure Socket Extensions). Полный код примера здесь.

 

 Разработка DLL для CTD с использованием Delphi.
 Использование инструментов криптографии в Delphi-приложениях (исходники).
 Windows Sockets, IOCP и Delphi.
 BORLAND DELPHI: Базы данных "на DELPHI" - BDE.
 Borland InterBase 7.5.


Главная »  Delphi 

© 2018 Team.Furia.Ru.
Частичное копирование материалов разрешено.