Scalable IPC Using Asynchronous Named Pipes and IO Completion Ports



Introduction

This article presents a client/server implementation of inter-process communication (IPC) using asynchronous named pipes with I/O completion ports. It is aimed at scalability in situations where several applications need to exchange large chunks of data with each other or with a central (server) application.
Named pipes provide a flexible and fairly easy-to-implement IPC model. In message-read mode, a "client" process can send a message to a "server" process and receive an answer to that message.
I/O completion ports, although better known for their use with sockets in server applications, can also be used with files, directories and pipes, and they provide an efficient way of handling concurrent asynchronous I/O requests.

Using the Code

Creating the server:

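//this function will be called when a message is received;
//it can write an answer into pAns to pass back to the sender.
//*pdwAnsLen holds the size of pAns on entry and should be set
//to the length of the answer before returning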
VOID WINAPI Callback(
  LPCVOID	pMessage,
  DWORD		dwMessageLen,
  LPVOID	pAns,
  DWORD*	pdwAnsLen)
{

}

CAsyncPipesServer2 myPipeServer;
myPipeServer.Create(_T("SAMPLEIPC1"), Callback);
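
The callback body above is left empty. As an illustration of the contract it has to honor, here is a minimal sketch of an echo callback that copies the request into the answer buffer. It assumes, based on how the server calls the callback in OnRead further down, that *pdwAnsLen carries the size of pAns on entry and the answer length on return. EchoCallback is a hypothetical name, and the typedefs in the #else branch are stand-ins so the sketch also compiles off Windows.

```cpp
#include <cassert>
#include <cstring>

#ifdef _WIN32
#include <windows.h>
#else
// Stand-ins so the sketch also compiles off Windows (assumption; the real
// definitions come from <windows.h>).
#define VOID void
#define WINAPI
typedef const void*  LPCVOID;
typedef void*        LPVOID;
typedef unsigned int DWORD;
#endif

// Hypothetical echo callback: copies the request into the answer buffer.
// Assumes *pdwAnsLen holds the capacity of pAns on entry and must hold the
// answer length on return.
VOID WINAPI EchoCallback(
  LPCVOID	pMessage,
  DWORD		dwMessageLen,
  LPVOID	pAns,
  DWORD*	pdwAnsLen)
{
	assert(pdwAnsLen != NULL);

	if (!pMessage || !pAns)
	{
		*pdwAnsLen = 0; //nothing to answer with
		return;
	}

	//never write more than the answer buffer can hold
	const DWORD dwCopy = dwMessageLen < *pdwAnsLen ? dwMessageLen : *pdwAnsLen;
	std::memcpy(pAns, pMessage, dwCopy);
	*pdwAnsLen = dwCopy;
}
```

It would be registered the same way as the empty callback: myPipeServer.Create(_T("SAMPLEIPC1"), EchoCallback);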



Sending a message to the server:

	CAsyncPipesClient2 myPipe;

	if (!myPipe.SendMsg(_T("SAMPLEIPC1"),
                  (PVOID)pBuff,
                  nLenBuff,
                  szAns,
                  nAnsLen,
                  INFINITE))
	{ .... }


How Does It Work

Server Side

After creating a completion port, we start a separate thread whose only job is to create pipe instances that wait for new client connections. This thread creates a predefined number of instances and then waits for a signal from the main thread. As clients connect and use up pending pipe instances, the main thread signals the pipe-instance thread to create more. This way the main thread, which polls the completion queue, is not held up for long, and used-up pending instances are replaced as soon as possible. Once a connection completes, the main thread issues a ReadFile call to read the data the newly connected client has sent:
		case OP_CONNECT:
			{
				////////////////////////////////////////////////////////////

				{
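					//the locker must be a named object; an unnamed temporary
					//would release the lock before the decrement
					CXCritSec::CLocker lock(p->m_csOlp);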
					--p->m_nPendingConnects;
				}

				p->m_eventMoreConnects.Set();


				///////////////////////////////////////////////////////////

				pOverlapPlus->nOpCode = OP_READ;
				if (!(pOverlapPlus->pBuff = new CBuffer))
				{
					_ASSERT(NULL);
					p->RemoveOverlapped(pOverlapPlus);
					break;
				}

				DWORD dwBytesRead = 0;

				if (!ReadFile(pOverlapPlus->hPipe,
						pOverlapPlus->pBuff->GetPTR(),
						pOverlapPlus->pBuff->GetSize(),
						&dwBytesRead,
						&pOverlapPlus->ol))
				{
					DWORD dwErr;
					if ((dwErr = GetLastError()) != ERROR_IO_PENDING
						&& dwErr != ERROR_PIPE_LISTENING)
					{
						_ASSERT(NULL);
						p->RemoveOverlapped(pOverlapPlus);
						break;
					}

				}
				else //completed immediately
				{
					pOverlapPlus->pBuff->SetCurrLen(dwBytesRead);

					PostQueuedCompletionStatus(p->m_hIOCP,
						dwBytesRead,
						(ULONG_PTR)pOverlapPlus->hPipe,
						&pOverlapPlus->ol);
				}

			}
			break;
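
The handler above works on pOverlapPlus, which carries an op code and a pipe handle alongside the OVERLAPPED that is passed to ReadFile. The article does not show how that structure is declared or how it is recovered from the pointer that GetQueuedCompletionStatus returns. A common approach, sketched here as an assumption rather than as the actual declaration, is to make the OVERLAPPED the first member of a larger context struct and cast back. OverlappedCtx, FromOverlapped and OP_WRITE are hypothetical names, and the #else branch only provides stand-ins so the sketch compiles off Windows.

```cpp
#include <cassert>
#include <cstddef>

#ifdef _WIN32
#include <windows.h>
#else
// Stand-ins so the sketch also compiles off Windows (assumption; the real
// definitions come from <windows.h>).
struct OVERLAPPED { void* Internal; void* InternalHigh; void* Pointer; void* hEvent; };
typedef void* HANDLE;
#endif

// Op codes for the state of one pipe instance. OP_CONNECT and OP_READ
// appear in the article; OP_WRITE is assumed.
enum OpCode { OP_CONNECT, OP_READ, OP_WRITE };

// Hypothetical per-operation context. The OVERLAPPED must stay the first
// member so that a pointer to it is also a pointer to the whole context.
struct OverlappedCtx
{
	OVERLAPPED	ol;
	OpCode		nOpCode;
	HANDLE		hPipe;
};

static_assert(offsetof(OverlappedCtx, ol) == 0,
	"OVERLAPPED must be the first member");

// Recovers the context from the OVERLAPPED pointer handed back by the
// completion queue.
inline OverlappedCtx* FromOverlapped(OVERLAPPED* pOl)
{
	assert(pOl != NULL);
	return reinterpret_cast<OverlappedCtx*>(pOl);
}
```

With this layout the polling loop can switch on FromOverlapped(pOl)->nOpCode to decide whether a completed operation was a connect, a read or a write.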




Once the read completes, we create a new thread in which we call the callback function supplied when the server was created. We do this because we want to return to GetQueuedCompletionStatus as soon as possible; otherwise we would unnecessarily delay both new connections and pending read/write requests. It also lets the callback function safely perform lengthy operations if needed. After the callback returns, we write any answer data it wishes to pass on to the client to the pipe:
//called from a separate thread upon read completion
void CAsyncPipesServer2::OnRead(SOverlapped* pOlp)
{
	CBuffer myBuffAns;
	DWORD dwAns = myBuffAns.GetSize();

	m_pCallback(pOlp->pBuff->GetPTR(),
				pOlp->pBuff->GetCurrLen(),
				myBuffAns.GetPTR(),
				&dwAns);


	myBuffAns.SetCurrLen(dwAns);
	pOlp->pBuff->Assign(myBuffAns);


	DWORD dwWritten = 0;

	if (WriteFile(pOlp->hPipe,
			pOlp->pBuff->GetPTR(),
			pOlp->pBuff->GetCurrLen(),
			&dwWritten,
			&pOlp->ol))
	{
		PostQueuedCompletionStatus(m_hIOCP,
						dwWritten,
						(ULONG_PTR)pOlp->hPipe,
						&pOlp->ol);
	}
	else if (GetLastError() != ERROR_IO_PENDING)
	{
		//probably pipe is being closed, or pipe not connected
		RemoveOverlapped(pOlp, true);
	}


}


Client Side

There are a couple of ways to send a message to the pipe server, depending on the level of control you want. I wanted to control the timeouts for connection establishment and message transfer individually, so I use the second-easiest method: CreateFile + WaitNamedPipe + TransactNamedPipe. The easiest method is CallNamedPipe, which is equivalent to calling CreateFile, WaitNamedPipe, WriteFile, ReadFile and CloseHandle.
BOOL CAsyncPipesClient2::SendMsg(LPCTSTR  pIpc,
		PVOID   pMessage,
		DWORD   dwMessageLen,
		PVOID   pAns,
		DWORD   dwAnsLen,
		DWORD   dwAnsTimeOut)
{
	tstring strPipeName = _T("\\\\.\\pipe\\");
	strPipeName += pIpc;

	HANDLE hPipe;


retry_connect:

	if ((hPipe = CreateFile(strPipeName.c_str(),
				GENERIC_READ |
				GENERIC_WRITE,
				FILE_SHARE_READ | FILE_SHARE_WRITE,
				NULL,
				OPEN_EXISTING,
				FILE_FLAG_OVERLAPPED,
				NULL)) == INVALID_HANDLE_VALUE)
	{
		//all instances busy: wait for a free one, then try again
		if (GetLastError() == ERROR_PIPE_BUSY
			&& WaitNamedPipe(strPipeName.c_str(), MAX_WAIT_CONNECT))
			goto retry_connect;

		//the pipe does not exist, or the wait timed out
		return FALSE;
	}


	// Pipe connected; change to message-read mode. 
	DWORD dwMode = PIPE_READMODE_MESSAGE; 
	if (!SetNamedPipeHandleState( 
					hPipe,    // pipe handle 
					&dwMode,  // new pipe mode 
					NULL,
					NULL))
	{
		_ASSERT(NULL);
		CloseHandle(hPipe);
		return FALSE;
	}


	OVERLAPPED olp;
	CXEvent eventTransact;

	memset(&olp, 0, sizeof(olp));
	olp.hEvent = eventTransact.GetEvent();

	DWORD dwBytesRead = 0;
	if (!TransactNamedPipe( 
					hPipe,
					pMessage,
					dwMessageLen,
					pAns,
					dwAnsLen,
					&dwBytesRead,
					&olp))
	{
		DWORD dwLastErr = GetLastError();
		if (dwLastErr == ERROR_MORE_DATA) 
		{
			//ReadFile..
		}
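		else if (dwLastErr == ERROR_IO_PENDING)
		{
			if (pAns
				&& WaitForSingleObject(olp.hEvent, dwAnsTimeOut) != WAIT_OBJECT_0)
			{
				//timed out: cancel the transaction and let it finish
				//before olp goes out of scope
				CancelIo(hPipe);
				GetOverlappedResult(hPipe, &olp, &dwBytesRead, TRUE);
				CloseHandle(hPipe);
				return FALSE;
			}
		}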
		else
		{
			CloseHandle(hPipe);
			return FALSE;
		}
	}


	CloseHandle(hPipe); 


	return TRUE;
}
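
The ERROR_MORE_DATA branch in SendMsg is left open. In message-read mode that error means the answer was larger than the supplied buffer and the rest of the message is still waiting in the pipe, to be fetched with further reads. One way to collect the remainder is a loop that keeps reading until a read reports that the message is complete. The sketch below shows only that loop: the actual ReadFile call is hidden behind a hypothetical readChunk callable, and ReadStatus, ChunkReader and DrainMessage are illustrative names that are not part of the article's classes.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <vector>

// Outcome of one read attempt. In real code, Done would correspond to
// ReadFile succeeding and MoreData to it failing with ERROR_MORE_DATA.
enum class ReadStatus { Done, MoreData, Failed };

// Hypothetical reader: fills buf (capacity cap) and stores the number of
// bytes it produced in got.
using ChunkReader =
	std::function<ReadStatus(char* buf, std::size_t cap, std::size_t& got)>;

// Appends the rest of the current message to 'message'.
// Returns false if a read fails or reports more bytes than the buffer holds.
inline bool DrainMessage(const ChunkReader& readChunk,
		std::vector<char>& message,
		std::size_t chunkSize = 4096)
{
	assert(chunkSize > 0);
	std::vector<char> chunk(chunkSize);

	for (;;)
	{
		std::size_t got = 0;
		const ReadStatus st = readChunk(chunk.data(), chunk.size(), got);

		if (st == ReadStatus::Failed || got > chunk.size())
			return false;

		message.insert(message.end(), chunk.begin(), chunk.begin() + got);

		if (st == ReadStatus::Done)
			return true; //whole message received
	}
}
```

In SendMsg, the bytes already placed in pAns by TransactNamedPipe would form the start of the message, and the drained bytes would follow them.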
