Socket changes for portability

Billy Newport bnewport@eps.agfa.be
Tue, 2 Dec 1997 13:18:38 +0100


This is a multi-part message in MIME format.

------=_NextPart_000_01EF_01BCFF24.CDE73B00
Content-Type: multipart/alternative;
	boundary="----=_NextPart_001_01F0_01BCFF24.CDE73B00"


------=_NextPart_001_01F0_01BCFF24.CDE73B00
Content-Type: text/plain;
	charset="iso-8859-1"
Content-Transfer-Encoding: quoted-printable

I was wondering if the following changes could be included in omniORB's =
standard release.

I've abstracted all socket operations to a base class (baseSocket). I then =
changed tcpSocket_NT to use this abstraction instead of bsd calls. I =
then implemented a subclass of baseSocket for the normal NT bsd/WinSock =
calls. If you implement Sun ATMOS versions of this abstract class they =
should all work with the new tcpSocket_NT code.

There is a factory method baseSocket::getSocket which returns a new =
socket object for the platform.

This should improve the portability and maintainability of the code. =
I've attached the changes. Also, if someone could do something similar =
to OmniThread to avoid the IMPLEMENTATION macros in it, that would also =
improve matters. I mean make OmniThread a pure abstract base class along =
with all omni_xxx mutex classes. Include a factory method on each one to =
construct the platform specific version of it.

Billy Newport



------=_NextPart_001_01F0_01BCFF24.CDE73B00
Content-Type: text/html;
	charset="iso-8859-1"
Content-Transfer-Encoding: quoted-printable

<!DOCTYPE HTML PUBLIC "-//W3C//DTD W3 HTML//EN">
<HTML>
<HEAD>

<META content=3Dtext/html;charset=3Diso-8859-1 =
http-equiv=3DContent-Type>
<META content=3D'"MSHTML 4.71.1712.3"' name=3DGENERATOR>
</HEAD>
<BODY bgColor=3D#ffffff>
<DIV><FONT color=3D#000000 face=3DArial size=3D2>I was wondering if the =
following=20
changes could be included in omniORB's standard release.</FONT></DIV>
<DIV><FONT color=3D#000000 face=3DArial size=3D2></FONT>&nbsp;</DIV>
<DIV><FONT color=3D#000000 face=3DArial size=3D2>I've abstracted all =
socket operations=20
to a base class (baseSocket). I then changed tcpSocket_NT to use this=20
abstraction instead of bsd calls. I then implemented a subclass of =
baseSocket=20
for the normal NT bsd/WinSock calls. If you implement Sun ATMOS versions =
of this=20
abstract class they should all work with the new tcpSocket_NT =
code.</FONT></DIV>
<DIV><FONT color=3D#000000 face=3DArial size=3D2></FONT>&nbsp;</DIV>
<DIV><FONT face=3DArial size=3D2>There is a factory method =
baseSocket::getSocket=20
which returns a new socket object for the platform.</FONT></DIV>
<DIV><FONT face=3DArial size=3D2></FONT>&nbsp;</DIV>
<DIV><FONT face=3DArial size=3D2>This should improve the portability and =

maintainability of the code. I've attached the changes. Also, if =
someone could=20
do something similar to OmniThread to avoid the IMPLEMENTATION macros =
in it,=20
that would also improve matters. I mean make OmniThread a pure abstract =
base=20
class along with all omni_xxx mutex classes. Include a factory method on =
each=20
one to construct the platform specific version of it.</FONT></DIV>
<DIV><FONT face=3DArial size=3D2></FONT>&nbsp;</DIV>
<DIV><FONT color=3D#000000 face=3DArial size=3D2>Billy =
Newport</FONT></DIV>
<DIV><FONT color=3D#000000 face=3DArial size=3D2></FONT>&nbsp;</DIV>
<DIV>&nbsp;</DIV></BODY></HTML>

------=_NextPart_001_01F0_01BCFF24.CDE73B00--

------=_NextPart_000_01EF_01BCFF24.CDE73B00
Content-Type: application/octet-stream;
	name="baseSocket.h"
Content-Transfer-Encoding: 7bit
Content-Disposition: attachment;
	filename="baseSocket.h"

#ifndef BASE_SOCKET_H
#define BASE_SOCKET_H

// baseSocket
//
// Abstract interface for one stream socket. The platform specific work
// is done by a subclass (NTsocket in bsdSocket.h); tcpSocket_NT.cc only
// talks to sockets through this interface and obtains instances from
// getSocket().
//
// tcpSocketEndpoint must be declared before this header is included
// (tcpSocket_NT.h forward-declares it just before its #include).
class baseSocket
{
public:
	// Portable error categories reported by getErrNo().
	enum SocketErrors { eOK, eINTERRUPTED, eERROR, eINVALID};

	// Value recv()/send() return on failure; tcpSocket_NT.cc defines it
	// as -1.
	static const int SocketError;
protected:
	// Category of the most recent failure recorded by the subclass.
	SocketErrors m_errno;

	// Start every socket in a well defined "no error" state so that
	// getErrNo() never reads an uninitialised member.
	baseSocket() : m_errno(eOK) {}
public:
	// Returns the category recorded by the last failing operation
	// (eOK if nothing has failed yet).
	SocketErrors getErrNo() const
	{
		return m_errno;
	}

	// Connect to / listen on the given endpoint. Both return false on
	// failure, in which case getErrNo() gives the reason.
	virtual bool connect(tcpSocketEndpoint *pPoint) = 0;
	virtual bool bind(tcpSocketEndpoint *pPoint) = 0;

	virtual ~baseSocket();

	// Returns a new socket object for an incoming connection; callers
	// in tcpSocket_NT.cc treat a 0 result as failure.
	virtual baseSocket *accept() = 0;

	// Transfer up to NumBytes bytes. Callers in tcpSocket_NT.cc treat
	// SocketError as failure and 0 as "connection is gone".
	virtual int recv(char *pBuffer, int NumBytes) = 0;
	virtual int send(const char *pBuffer, int NumBytes) = 0;

	virtual void shutdown() = 0;

	// Factory for the platform's concrete socket class, and global
	// start-up / tear-down hooks.
	static baseSocket *getSocket();
	static bool Initialise();
	static bool Terminate();
};

#endif
------=_NextPart_000_01EF_01BCFF24.CDE73B00
Content-Type: application/octet-stream;
	name="bsdSocket.cc"
Content-Transfer-Encoding: quoted-printable
Content-Disposition: attachment;
	filename="bsdSocket.cc"

#include <omniORB2/CORBA.h>
#include "tcpSocket_NT.h"
#include "bsdSocket.h"
#include <winsock.h>

#include <sys/types.h>
#include <errno.h>
#include <limits.h>
#include <stdio.h>

#include "libcWrapper.h"

#ifdef QWERTY
// Platform factory and global start-up / tear-down hooks of baseSocket.
// They are compiled only when QWERTY is defined.
// NOTE(review): QWERTY looks like a placeholder guard - confirm which
// build is expected to define it.

// Hand out a fresh NTsocket that has no descriptor open yet.
baseSocket *baseSocket::getSocket() {
	return new NTsocket;
}

// Nothing to set up here; always reports success.
bool baseSocket::Initialise() {
	return true;
}

// Nothing to tear down here; always reports success.
bool baseSocket::Terminate() {
	return true;
}

#endif

// Create a socket object with no descriptor yet; -1 marks "not open".
// connect() or bind() creates the real socket later.
NTsocket::NTsocket()
	: m_socket(-1)
{
}

// Wrap a descriptor that is already open (accept() uses this for the
// descriptor returned by ::accept()).
NTsocket::NTsocket(int socket)
	: m_socket(socket)
{
}

// Close the descriptor if one is open and mark the object as closed.
NTsocket::~NTsocket()
{
	const bool is_open =3D (m_socket !=3D -1);
	if (is_open)
	{
		closesocket(m_socket);
	}
	m_socket =3D -1;
}
// Resolve the endpoint's host and open a TCP connection to it.
//
// r - endpoint supplying host() (a name or a dotted-quad string) and
//     port().
//
// Returns true on success. On failure returns false with m_errno set
// and no descriptor left open.
bool NTsocket::connect(tcpSocketEndpoint *r)
{
	struct sockaddr_in raddr;
	LibcWrapper::hostent_var h;
	int  rc;

	// Zero the whole address so that sin_zero and any padding handed to
	// ::connect() are well defined.
	memset((void*)&raddr, 0, sizeof(raddr));

	if (! LibcWrapper::isipaddr( (char*) r->host()))
	{
		if (LibcWrapper::gethostbyname((char *)r->host(),h,rc) < 0)
		{
			// XXX look at rc to decide what to do or if to give up what
			// XXX errno to return
			// XXX For the moment, just return EINVAL
			m_errno =3D Convert_errno(EINVAL);
			return false;
		}
		// We just pick the first address in the list, may be we should go
		// through the list and if possible pick the one that is on the
		// same subnet.
		memcpy((void*)&raddr.sin_addr,
		 (void*)h.hostent()->h_addr_list[0],
		 sizeof(raddr.sin_addr));
	}
	else
	{
		// The machine name is already an IP address
		CORBA::ULong ip_p =3D inet_addr( (char*) r->host() );
		if (ip_p =3D=3D INADDR_NONE)
		{
			// inet_addr() signals a malformed address only through its
			// return value, so classify the failure directly instead of
			// consulting an unrelated error variable.
			m_errno =3D eINVALID;
			return false;
		}

		memcpy((void*) &raddr.sin_addr, (void*) &ip_p,
		 sizeof(raddr.sin_addr));
	}

	raddr.sin_family =3D AF_INET;
	raddr.sin_port   =3D htons(r->port());

	m_socket =3D socket(PF_INET,SOCK_STREAM,0);
	if (m_socket =3D=3D INVALID_SOCKET)
	{
		// WinSock reports failures through WSAGetLastError(); errno is
		// not set by socket calls and may hold a stale value.
		m_errno =3D Convert_errno(::WSAGetLastError());
		m_socket =3D -1;
		return false;
	}
	if (::connect(m_socket,(struct sockaddr *)&raddr,
		  sizeof(struct sockaddr_in)) =3D=3D baseSocket::SocketError)
	{
		// Read the error before closesocket() can overwrite it.
		m_errno =3D Convert_errno(::WSAGetLastError());
		::closesocket(m_socket);
		m_socket =3D -1;
		return false;
	}
	return true;
}

// Wait for an incoming connection on this (listening) socket.
//
// Returns a new heap-allocated NTsocket wrapping the accepted
// descriptor, or 0 on failure with m_errno set.
baseSocket *NTsocket::accept()
{
	struct sockaddr_in raddr;
	int l =3D sizeof(struct sockaddr_in);

	int new_sock =3D ::accept(m_socket,(struct sockaddr *)&raddr,&l);
	if (new_sock =3D=3D INVALID_SOCKET)
	{
		// WinSock reports failures through WSAGetLastError(); errno is
		// not set by socket calls and may hold a stale value.
		m_errno =3D Convert_errno(::WSAGetLastError());
		return 0;
	}
	return new NTsocket(new_sock);
}

// Map a system error number onto the portable baseSocket categories.
//
// err - either a C library errno value or a WinSock error code as
//       returned by WSAGetLastError(). WinSock uses its own WSAE*
//       numbering, so both spellings of "interrupted" and "invalid
//       argument" are recognised.
//
// Returns eINTERRUPTED, eINVALID, or eERROR for everything else.
baseSocket::SocketErrors NTsocket::Convert_errno(int err) const
{
	switch(err)
	{
	case EINTR:
	case WSAEINTR:
		return eINTERRUPTED;
	case EINVAL:
	case WSAEINVAL:
		return eINVALID;
	default:
		return eERROR;
	}
}

int NTsocket::recv(char *pBuffer, int NumBytes)
{
	int rx;
	printf("Reading %d from %p\n", NumBytes, pBuffer);
    if ((rx =3D ::recv(m_socket,pBuffer,NumBytes,0)) =3D=3D =
baseSocket::SocketError)
		m_errno =3D Convert_errno(errno);
	return rx;
}

int NTsocket::send(const char *pBuffer, int NumBytes)
{
	int tx;
	printf("Sending %d from %p\n", NumBytes, pBuffer);
    if ((tx =3D ::send(m_socket,pBuffer,NumBytes,0)) =3D=3D =
baseSocket::SocketError)
		m_errno =3D Convert_errno(errno);
	return tx;
}

void NTsocket::shutdown()
{
	::shutdown(m_socket,2);
}

// Create a listening TCP socket for <me> and record where it ended up.
//
// me - in:  the requested port (SO_REUSEADDR is only applied when it is
//           non-zero);
//      out: the port reported by getsockname() and this host's address
//           as a dotted-quad string.
//
// Returns true on success. On a socket-level failure returns false with
// m_errno set and the descriptor closed. Throws omniORB::fatalException
// when the local host name or its address cannot be obtained.
bool NTsocket::bind(tcpSocketEndpoint *me)
{
	struct sockaddr_in myaddr;
	// Start from an all-zero address so that sin_zero is well defined.
	memset((void *)&myaddr, 0, sizeof(myaddr));

	m_socket =3D socket(PF_INET,SOCK_STREAM,0);
	if (m_socket =3D=3D INVALID_SOCKET)
	{
		// WinSock reports failures through WSAGetLastError(); errno is
		// not set by socket calls and may hold a stale value.
		m_errno =3D Convert_errno(::WSAGetLastError());
		m_socket =3D -1;
		return false;
	}
	myaddr.sin_family =3D AF_INET;
	myaddr.sin_addr.s_addr =3D INADDR_ANY;
	if (omniORB::traceLevel >=3D 10)
		printf("Port number wanted is %d\n", me->port());
	myaddr.sin_port =3D htons(me->port());

	if (me->port())
	{
		// A specific port was asked for: allow the address to be reused.
		int valtrue =3D 1;
		if (setsockopt(m_socket,SOL_SOCKET,SO_REUSEADDR,
		   (char*)&valtrue,sizeof(int)) =3D=3D baseSocket::SocketError)
		{
			m_errno =3D Convert_errno(::WSAGetLastError());
			closesocket(m_socket);
			m_socket =3D -1;
			return false;
		}
	}

	if (::bind(m_socket,(struct sockaddr *)&myaddr,
	   sizeof(struct sockaddr_in)) =3D=3D baseSocket::SocketError)
	{
		m_errno =3D Convert_errno(::WSAGetLastError());
		closesocket(m_socket);
		m_socket =3D -1;
		return false;
	}

	// Make it a passive socket
	if (listen(m_socket,5) =3D=3D baseSocket::SocketError)
	{
		m_errno =3D Convert_errno(::WSAGetLastError());
		closesocket(m_socket);
		m_socket =3D -1;
		return false;
	}

	{
		// Read back the port that was actually bound.
		int l =3D sizeof(struct sockaddr_in);
		if (getsockname(m_socket,(struct sockaddr *)&myaddr,&l)
		    =3D=3D baseSocket::SocketError)
		{
			m_errno =3D Convert_errno(::WSAGetLastError());
			closesocket(m_socket);
			m_socket =3D -1;
			return false;
		}

		me->port(ntohs(myaddr.sin_port));
		if (omniORB::traceLevel >=3D 10)
			printf("Port number given is %d\n", me->port());

		char self[64];
		if (gethostname(&self[0],64) =3D=3D baseSocket::SocketError)
		{
		  throw omniORB::fatalException(__FILE__,__LINE__,
						"Cannot get the name of this host");
		}

		LibcWrapper::hostent_var h;
		int rc;

		if (LibcWrapper::gethostbyname(self,h,rc) < 0)
		{
			throw omniORB::fatalException(__FILE__,__LINE__,
						"Cannot get the address of this host");
		}
		memcpy((void *)&myaddr.sin_addr,
		   (void *)h.hostent()->h_addr_list[0],
		   sizeof(myaddr.sin_addr));

		// Publish the host as a dotted-quad string; 16 bytes hold the
		// longest form "255.255.255.255" plus the terminator.
		char ipaddr[16];
		sprintf(ipaddr,"%d.%d.%d.%d",
			(int)((ntohl(myaddr.sin_addr.s_addr) & 0xff000000) >> 24),
			(int)((ntohl(myaddr.sin_addr.s_addr) & 0x00ff0000) >> 16),
			(int)((ntohl(myaddr.sin_addr.s_addr) & 0x0000ff00) >> 8),
			(int)((ntohl(myaddr.sin_addr.s_addr) & 0x000000ff)));
		me->host((const CORBA::Char *) ipaddr);
	}
	return true;
}

------=_NextPart_000_01EF_01BCFF24.CDE73B00
Content-Type: application/octet-stream;
	name="bsdSocket.h"
Content-Transfer-Encoding: 7bit
Content-Disposition: attachment;
	filename="bsdSocket.h"

#ifndef bsdSocket_H
#define bsdSocket_H

// NTsocket
//
// baseSocket implementation on top of the WinSock / BSD socket calls
// (see bsdSocket.cc). The object owns its descriptor: the destructor
// closes it.
class NTsocket : public baseSocket
{
protected:
	// Underlying descriptor, or -1 while no socket is open.
	int m_socket;

	// Translate a system error number into a baseSocket category.
	SocketErrors Convert_errno(int ErrorNumber) const;

	// Wrap an already-open descriptor; used by accept().
	NTsocket(int socket);
public:
	NTsocket();
	~NTsocket();

	bool connect(tcpSocketEndpoint *pPoint);
	bool bind(tcpSocketEndpoint *pPoint);
	baseSocket *accept();
	int recv(char *pBuffer, int NumBytes);
	int send(const char *pBuffer, int NumBytes);
	void shutdown();
private:
	// Declared but not implemented: a copy would share m_socket with
	// the original and both destructors would close the same
	// descriptor.
	NTsocket(const NTsocket &);
	NTsocket &operator=(const NTsocket &);
};

#endif

------=_NextPart_000_01EF_01BCFF24.CDE73B00
Content-Type: application/octet-stream;
	name="tcpSocket_NT.cc"
Content-Transfer-Encoding: quoted-printable
Content-Disposition: attachment;
	filename="tcpSocket_NT.cc"

// -*- Mode: C++; -*-
//                            Package   : omniORB2
// tcpSocket_NT.cc            Created on: 4/2/97
//                            Author    : Eoin Carroll (ewc)
//
//    Copyright (C) 1996, 1997 Olivetti & Oracle Research Laboratory
//
//    This file is part of the omniORB library
//
//    The omniORB library is free software; you can redistribute it =
and/or
//    modify it under the terms of the GNU Library General Public
//    License as published by the Free Software Foundation; either
//    version 2 of the License, or (at your option) any later version.
//
//    This library is distributed in the hope that it will be useful,
//    but WITHOUT ANY WARRANTY; without even the implied warranty of
//    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
//    Library General Public License for more details.
//
//    You should have received a copy of the GNU Library General Public
//    License along with this library; if not, write to the Free
//    Software Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA =
=20
//    02111-1307, USA
//
//
// Description:
//	Implementation of the Strand using TCP/IP and WinSock interface
//=09

/*
  $Log: /Apogee/dev/OmniORB/lib/omniORB2/tcpSocket_NT.cc $
 *=20
 * 2     11/28/97 3:12p Bnewport
 * Now works with AGS for possible Mac port
 *=20
 * 1     11/26/97 2:51p Bnewport
// Revision 1.6  1997/05/06  15:30:32  sll
// Public release.
//
  */

#include <omniORB2/CORBA.h>
#include "tcpSocket_NT.h"

#include <limits.h>
#include <stdio.h>

#include "libcWrapper.h"

#define  DO_NOT_AVOID_MISALIGNMENT   =20

// Size of transmit and receive buffers
const=20
unsigned int=20
tcpSocketStrand::buffer_size =3D 8192 + (int)omni::max_alignment;

// Active-side constructor: obtain a platform socket, connect it to <r>
// and allocate the transmit and receive buffers.
//
// Throws CORBA::COMM_FAILURE (minor code: the socket's error category)
// if the connection cannot be made. A destructor is not run for an
// object whose constructor throws, so everything acquired so far is
// released here before the exception leaves.
tcpSocketStrand::tcpSocketStrand(tcpSocketRope *rope,
				 tcpSocketEndpoint   *r,
				 CORBA::Boolean heapAllocated)
  : Strand(rope,heapAllocated)
{
  pd_tx_buffer =3D 0;
  pd_rx_buffer =3D 0;
  pd_socket =3D baseSocket::getSocket();

  if (pd_socket->connect(r) =3D=3D false)
  {
    // Take the error out of the socket before it is destroyed.
    baseSocket::SocketErrors why =3D pd_socket->getErrNo();
    delete pd_socket;
    pd_socket =3D 0;
    throw CORBA::COMM_FAILURE(why,CORBA::COMPLETED_NO);
  }

  try
  {
    pd_tx_buffer =3D (void *) new char[tcpSocketStrand::buffer_size];
    pd_rx_buffer =3D (void *) new char[tcpSocketStrand::buffer_size];
  }
  catch (...)
  {
    // pd_rx_buffer cannot have been allocated if we get here.
    delete [] (char *)pd_tx_buffer;
    pd_tx_buffer =3D 0;
    delete pd_socket;
    pd_socket =3D 0;
    throw;
  }

  pd_tx_begin  =3D pd_tx_end =3D pd_tx_reserved_end =3D pd_tx_buffer;
  pd_rx_begin =3D pd_rx_end =3D pd_rx_received_end =3D pd_rx_buffer;
  return;
}

// Passive-side constructor: adopt an already connected socket object
// (the destructor deletes it) and allocate both buffers, each starting
// out empty.
tcpSocketStrand::tcpSocketStrand(tcpSocketRope *r,
				 tcpSocketHandle_t sock,
				 CORBA::Boolean heapAllocated)
  : Strand(r,heapAllocated)
{
  pd_socket =3D sock;

  // Transmit side
  pd_tx_buffer =3D (void *) new char[tcpSocketStrand::buffer_size];
  pd_tx_reserved_end =3D pd_tx_buffer;
  pd_tx_end =3D pd_tx_buffer;
  pd_tx_begin =3D pd_tx_buffer;

  // Receive side
  pd_rx_buffer =3D (void *) new char[tcpSocketStrand::buffer_size];
  pd_rx_received_end =3D pd_rx_buffer;
  pd_rx_end =3D pd_rx_buffer;
  pd_rx_begin =3D pd_rx_buffer;
}


// Release the socket object and both buffers. delete and delete [] are
// no-ops on null pointers, so no explicit checks are needed.
tcpSocketStrand::~tcpSocketStrand()
{
  if (omniORB::traceLevel>=3D5) {
    cerr << "tcpSocketStrand::~Strand() close socket no. "
	 << pd_socket << endl;
  }

  delete pd_socket;
  pd_socket =3D 0;

  delete [] (char *)pd_tx_buffer;
  pd_tx_buffer =3D 0;

  delete [] (char *)pd_rx_buffer;
  pd_rx_buffer =3D 0;
}

size_t=20
tcpSocketStrand::MaxMTU() const {
  // No limit
  return UINT_MAX;
}


// Hand out a window onto received data inside the internal buffer.
//
// size    - number of bytes wanted.
// exactly - when true the call keeps fetching until <size> bytes are
//           available; when false it returns whatever is buffered
//           (at most <size>).
// align   - required value of the returned address modulo
//           max_alignment (max_alignment itself stands for 0).
//
// Returns an sbuf whose buffer points into the receive buffer.
// Throws CORBA::INTERNAL if <align> exceeds max_alignment, or if
// <exactly> is set and <size> exceeds max_receive_buffer_size();
// fetch() may throw CORBA::COMM_FAILURE.
Strand::sbuf
tcpSocketStrand::receive(size_t size,
			 CORBA::Boolean exactly,
			 int align)=20
{
  // Anything handed out by the previous call counts as consumed.
  giveback_received(0);

  // Bytes already buffered and not yet handed out.
  size_t bsz =3D ((omni::ptr_arith_t) pd_rx_end -=20
		(omni::ptr_arith_t) pd_rx_begin);

  int current_alignment;
  omni::ptr_arith_t new_align_ptr;

  if (!bsz) {
    // No data left in receive buffer, fetch() and try again
    // rewind the buffer pointers to the beginning of the buffer and
    // at the same alignment as it is requested in <align>
    new_align_ptr =3D omni::align_to((omni::ptr_arith_t) pd_rx_buffer,
				   omni::max_alignment) + align;
    if (new_align_ptr >=3D ((omni::ptr_arith_t)pd_rx_buffer +=20
			  (int)omni::max_alignment)) {
      new_align_ptr -=3D (int) omni::max_alignment;
    }
    pd_rx_begin =3D pd_rx_received_end =3D pd_rx_end =3D (void =
*)new_align_ptr;

#ifndef DO_NOT_AVOID_MISALIGNMENT   =20
    fetch(size);
#else
    fetch();
#endif
    return receive(size,exactly,align);
  }

  if (align > (int)omni::max_alignment) {
    throw CORBA::INTERNAL(0,CORBA::COMPLETED_MAYBE);
  }

  // Alignment of the first buffered byte; 0 is reported as
  // max_alignment so it can be compared with <align> directly.
  current_alignment =3D (omni::ptr_arith_t) pd_rx_begin &
    ((int)omni::max_alignment - 1);
  if (current_alignment =3D=3D 0) {
    current_alignment =3D (int) omni::max_alignment;
  }

  if (current_alignment !=3D align) {
    // alignment is not right, move the data to the correct alignment
    new_align_ptr =3D omni::align_to((omni::ptr_arith_t) pd_rx_buffer,
				   omni::max_alignment) + align;
    if (new_align_ptr >=3D ((omni::ptr_arith_t)pd_rx_buffer +=20
			  (int)omni::max_alignment)) {
      new_align_ptr -=3D (int) omni::max_alignment;
    }
    memmove((void *)new_align_ptr,(void *)pd_rx_begin,bsz);
    pd_rx_begin =3D pd_rx_received_end =3D (void *)new_align_ptr;
    pd_rx_end =3D (void *)(new_align_ptr + bsz);
  }

  if (bsz < size) {
    if (exactly) {
      if (size > max_receive_buffer_size()) {
	throw CORBA::INTERNAL(0,CORBA::COMPLETED_MAYBE);
      }
      // Not enough data to satisfy the request, fetch() and try again
      // Check if there is enough empty space for fetch() to satisfy =
this
      // request. If necessary, make some space by moving existing data =
to
      // the beginning of the buffer. Always keep the buffer pointers at
      // the same alignment as they were previously

     =20
      // Free space after pd_rx_end plus the bytes already buffered.
      size_t avail =3D tcpSocketStrand::buffer_size -=20
	((omni::ptr_arith_t) pd_rx_end -=20
	 (omni::ptr_arith_t) pd_rx_buffer) + bsz;
      if (avail < size) {
	// Not enough empty space, got to move existing data
	current_alignment =3D (omni::ptr_arith_t) pd_rx_begin &
	  ((int)omni::max_alignment - 1);
	if (current_alignment =3D=3D 0) {
	  current_alignment =3D (int) omni::max_alignment;
	}
	new_align_ptr =3D omni::align_to((omni::ptr_arith_t) pd_rx_buffer,
				       omni::max_alignment) +=20
	  current_alignment;
	if (new_align_ptr >=3D ((omni::ptr_arith_t)pd_rx_buffer +=20
			      (int)omni::max_alignment)) {
	  new_align_ptr -=3D (int) omni::max_alignment;
	}
	memmove((void *)new_align_ptr,pd_rx_begin,bsz);
	pd_rx_begin =3D pd_rx_received_end =3D (void *)new_align_ptr;
	pd_rx_end =3D (void *)(new_align_ptr + bsz);
      }
#ifndef DO_NOT_AVOID_MISALIGNMENT
      fetch(size-bsz);
#else
      fetch();
#endif
      return receive(size,exactly,align);
    }
    else {
      // Best effort: hand out only what is buffered.
      size =3D bsz;
    }
  }
  // Mark the handed-out window; the next call consumes it.
  pd_rx_received_end =3D (void *)((omni::ptr_arith_t)pd_rx_begin + =
size);
  Strand::sbuf result;
  result.buffer =3D pd_rx_begin;
  result.size   =3D size;
  return result;
}

void=20
tcpSocketStrand::giveback_received(size_t leftover)=20
{
  size_t total =3D (omni::ptr_arith_t)pd_rx_received_end -
    (omni::ptr_arith_t)pd_rx_begin;
  if (total < leftover) {
    throw CORBA::MARSHAL(0,CORBA::COMPLETED_MAYBE);
  }
  total -=3D leftover;
  pd_rx_begin =3D (void *)((omni::ptr_arith_t)pd_rx_begin + total);
  pd_rx_received_end =3D pd_rx_begin;
  return;
}

size_t=20
tcpSocketStrand::max_receive_buffer_size()=20
{
  return tcpSocketStrand::buffer_size - (int)omni::max_alignment;
}

// Fill the caller's buffer <b> completely: first from data that is
// already buffered, then by reading straight from the socket.
// Marks the strand as dying and throws CORBA::COMM_FAILURE when the
// socket reports an error (other than an interrupted call, which is
// retried) or returns 0 bytes.
void
tcpSocketStrand::receive_and_copy(Strand::sbuf b)
{
  giveback_received(0);

  char  *dest =3D (char *)b.buffer;
  size_t remaining =3D b.size;

  // Step 1: drain what is already sitting in the receive buffer.
  const size_t buffered =3D ((omni::ptr_arith_t)pd_rx_end -
			   (omni::ptr_arith_t)pd_rx_begin);
  if (buffered !=3D 0) {
    const size_t chunk =3D (buffered > remaining) ? remaining : buffered;
    memcpy((void *)dest,pd_rx_begin,chunk);
    pd_rx_begin =3D (void *)((omni::ptr_arith_t) pd_rx_begin + chunk);
    pd_rx_received_end =3D pd_rx_begin;
    remaining -=3D chunk;
    dest +=3D chunk;
  }

  // Step 2: read the rest directly into the caller's memory.
  while (remaining !=3D 0) {
#ifdef TRACE_RECV
    if (omniORB::traceLevel>=3D10) {
      cerr << "tcpSocketStrand::receive_and_copy--- recv "
	   << pd_socket << endl;
    }
#endif

    const int rx =3D pd_socket->recv(dest, remaining);
    if (rx =3D=3D baseSocket::SocketError) {
      if (pd_socket->getErrNo() =3D=3D baseSocket::eINTERRUPTED) {
	continue;
      }
      setStrandIsDying();
      throw CORBA::COMM_FAILURE(pd_socket->getErrNo(),
				CORBA::COMPLETED_MAYBE);
    }
    if (rx =3D=3D 0) {
      setStrandIsDying();
      throw CORBA::COMM_FAILURE(0,CORBA::COMPLETED_MAYBE);
    }
#ifdef TRACE_RECV
    if (omniORB::traceLevel >=3D 10) {
      cerr << "tcpSocketStrand::receive_and_copy-- recv "
	   << pd_socket << " "
	   << rx << " bytes" << endl;
    }
#endif
    remaining -=3D rx;
    dest +=3D rx;
  }
}

void
tcpSocketStrand::skip(size_t size)
{
  giveback_received(0);

  while (size) {
    size_t sz;
    sz =3D max_receive_buffer_size();
    if (sz > size) {
      sz =3D size;
    }
    int current_alignment =3D (omni::ptr_arith_t) pd_rx_begin &
      ((int)omni::max_alignment - 1);
    if (current_alignment =3D=3D 0) {
      current_alignment =3D (int) omni::max_alignment;
    }
    Strand::sbuf sb =3D receive(sz,0,current_alignment);
    size -=3D sb.size;
  }
  return;
}

// Read from the socket into the free space behind pd_rx_end.
//
// max - upper bound on the number of bytes to read; 0 means "as much
//       as fits".
//
// An interrupted read is retried. Any other error, or a 0-byte read,
// marks the strand as dying and throws CORBA::COMM_FAILURE.
void
tcpSocketStrand::fetch(CORBA::ULong max)
{
  size_t room =3D tcpSocketStrand::buffer_size -
    ((omni::ptr_arith_t) pd_rx_end - (omni::ptr_arith_t) pd_rx_buffer);

  if (max !=3D 0 && room > max) {
    room =3D max;
  }
  if (room =3D=3D 0) {
    return;
  }

  int rx;
  for (;;) {
#ifdef TRACE_RECV
    if (omniORB::traceLevel >=3D 10) {
      cerr << "tcpSocketStrand::fetch--- recv " << pd_socket << endl;
    }
#endif

    rx =3D pd_socket->recv((char *)pd_rx_end, room);
    if (rx !=3D baseSocket::SocketError) {
      break;
    }
    if (pd_socket->getErrNo() !=3D baseSocket::eINTERRUPTED) {
      setStrandIsDying();
      throw CORBA::COMM_FAILURE(pd_socket->getErrNo(),
				CORBA::COMPLETED_MAYBE);
    }
    // interrupted: go round again
  }

  if (rx =3D=3D 0) {
    setStrandIsDying();
    throw CORBA::COMM_FAILURE(0,CORBA::COMPLETED_MAYBE);
  }
#ifdef TRACE_RECV
  if (omniORB::traceLevel >=3D 10) {
    cerr << "tcpSocketStrand::fetch-- recv " << pd_socket << " "
	 << rx << " bytes" << endl;
  }
#endif

  pd_rx_end =3D (void *)((omni::ptr_arith_t) pd_rx_end + rx);
}


// Hand out a window of free space inside the transmit buffer.
//
// size    - number of bytes wanted.
// exactly - when true the buffer is transmitted as often as needed to
//           make <size> bytes available; when false the window may be
//           shorter than <size>.
// align   - required value of the returned address modulo
//           max_alignment (max_alignment itself stands for 0).
// tx      - passed on to giveback_reserved(), which transmits the
//           buffered data first when it is set.
//
// Returns an sbuf whose buffer points into the transmit buffer.
// Throws CORBA::INTERNAL if <align> exceeds max_alignment, or if
// <exactly> is set and <size> exceeds max_reserve_buffer_size();
// transmit() may throw CORBA::COMM_FAILURE.
Strand::sbuf=20
tcpSocketStrand::reserve(size_t size,
			 CORBA::Boolean exactly,
			 int align,
			 CORBA::Boolean tx)=20
{

  // Commit whatever the previous call reserved.
  giveback_reserved(0,tx);
 =20
  // Free space left behind pd_tx_end.
  size_t bsz =3D tcpSocketStrand::buffer_size -
    ((omni::ptr_arith_t) pd_tx_end - (omni::ptr_arith_t) pd_tx_buffer);
 =20
  if (!bsz) {
    // No space left, transmit and try again
    transmit();
    return reserve(size,exactly,align,tx);
  }

  if (align > (int)omni::max_alignment) {
    throw CORBA::INTERNAL(0,CORBA::COMPLETED_MAYBE);
  }

  // Alignment of the next free byte; 0 is reported as max_alignment so
  // it can be compared with <align> directly.
  int current_alignment =3D (omni::ptr_arith_t) pd_tx_end &=20
    ((int)omni::max_alignment - 1);
  if (current_alignment =3D=3D 0) {
    current_alignment =3D (int)omni::max_alignment;
  }

  if (current_alignment !=3D align) {
    // alignment is not right
    if (pd_tx_end =3D=3D pd_tx_begin) {
      // There is nothing in the buffer, we could adjust
      // pd_tx_begin and pd_tx_end to the required alignment
      omni::ptr_arith_t new_align_ptr;

      new_align_ptr =3D omni::align_to((omni::ptr_arith_t) pd_tx_buffer,
				     omni::max_alignment) + align;
      if (new_align_ptr >=3D ((omni::ptr_arith_t)pd_tx_buffer +=20
			    (int)omni::max_alignment)) {
	new_align_ptr -=3D (int) omni::max_alignment;
      }
      pd_tx_begin =3D pd_tx_end =3D pd_tx_reserved_end =3D (void =
*)new_align_ptr;
      bsz =3D tcpSocketStrand::buffer_size - ((omni::ptr_arith_t) =
pd_tx_end=20
					    - (omni::ptr_arith_t) pd_tx_buffer);
    }
    else {
      // transmit what is left and try again
      transmit();
      return reserve(size,exactly,align,tx);
    }
  }

  if (bsz < size) {
    if (exactly) {
      if (size > max_reserve_buffer_size()) {
	throw CORBA::INTERNAL(0,CORBA::COMPLETED_MAYBE);
      }
      // Not enough space to satisfy the request, transmit what is
      // left and try again
      transmit();
      return reserve(size,exactly,align,tx);
    }
    else {
      // Best effort: hand out only the space that is left.
      size =3D bsz;
    }
  }
  // Mark the reserved window; the next call commits it.
  pd_tx_reserved_end =3D (void *)((omni::ptr_arith_t)pd_tx_end + size);
  Strand::sbuf result;
  result.buffer =3D pd_tx_end;
  result.size   =3D size;
  return result;
}

void
tcpSocketStrand::giveback_reserved(size_t leftover,
				   CORBA::Boolean tx)=20
{
  size_t total =3D (omni::ptr_arith_t)pd_tx_reserved_end -
    (omni::ptr_arith_t)pd_tx_end;
  if (total < leftover) {
    throw CORBA::MARSHAL(0,CORBA::COMPLETED_MAYBE);
  }
  total -=3D leftover;
  pd_tx_end =3D (void *)((omni::ptr_arith_t)pd_tx_end + total);
  pd_tx_reserved_end =3D pd_tx_end;
  if (tx) {
    transmit();
  }
  return;
}

void=20
tcpSocketStrand::reserve_and_copy(Strand::sbuf b,
				  CORBA::Boolean transmit)
{
  // transmit anything that is left in the transmit buffer
  giveback_reserved(0,1);
 =20
  // Transmit directly from the supplied buffer
  int tx;
  size_t sz =3D b.size;
  char *p =3D (char *)b.buffer;
  while (sz) {
#ifdef TRACE_SEND
    if (omniORB::traceLevel >=3D 10) {
      cerr << "tcpSocketStrand::reserve_and_copy-- send " <<
	pd_socket << " " << sz << " bytes" << endl;
    }
#endif
	tx =3D pd_socket->send(p, sz);
    if (tx =3D=3D baseSocket::SocketError) {
		if (pd_socket->getErrNo() =3D=3D baseSocket::eINTERRUPTED)
			continue;
		else=20
		{
			setStrandIsDying();
			throw =
CORBA::COMM_FAILURE(pd_socket->getErrNo(),CORBA::COMPLETED_MAYBE);
		}
    }
    else
      if (tx =3D=3D 0) {
	setStrandIsDying();
	throw CORBA::COMM_FAILURE(0,CORBA::COMPLETED_MAYBE);
      }
    sz -=3D tx;
    p +=3D tx;
  }
}

size_t
tcpSocketStrand::max_reserve_buffer_size()=20
{
  return tcpSocketStrand::buffer_size - (int)omni::max_alignment;
}

void
tcpSocketStrand::transmit()=20
{
  size_t sz =3D (omni::ptr_arith_t)pd_tx_end -=20
    (omni::ptr_arith_t)pd_tx_begin;
  int tx;
  char *p =3D (char *)pd_tx_begin;
  while (sz) {
#ifdef TRACE_SEND
    if (omniORB::traceLevel >=3D 10) {
      cerr << "tcpSocketStrand::transmit-- send " <<
	pd_socket << " " << sz << " bytes" << endl;
    }
#endif
	tx =3D pd_socket->send(p, sz);
    if (tx =3D=3D baseSocket::SocketError) {
		if (pd_socket->getErrNo() =3D=3D baseSocket::eINTERRUPTED)
			continue;
		else=20
		{
			setStrandIsDying();
			throw =
CORBA::COMM_FAILURE(pd_socket->getErrNo(),CORBA::COMPLETED_MAYBE);
		}
    }
    else
		if (tx =3D=3D 0)=20
		{
			setStrandIsDying();
			throw CORBA::COMM_FAILURE(0,CORBA::COMPLETED_MAYBE);
		}
    sz -=3D tx;
    p +=3D tx;
  }
  pd_tx_begin =3D pd_tx_end =3D pd_tx_reserved_end =3D pd_tx_buffer;
  return;
}

// Mark the strand as dying and shut the connection down.
//
// The socket object is deliberately NOT deleted here. receive(),
// fetch(), transmit() and friends dereference pd_socket without a null
// check, so deleting it and zeroing the pointer would turn any later
// use of the strand into a null-pointer dereference or a
// use-after-free. The destructor deletes the socket object.
void
tcpSocketStrand::shutdown()
{
  setStrandIsDying();

  if (pd_socket) {
    pd_socket->shutdown();
  }

  return;
}

// Create a platform socket and bind it to <me> so that accept() can
// take incoming connections for rope <r>.
//
// Throws CORBA::COMM_FAILURE (minor code: the socket's error category)
// if the bind fails; exceptions raised by bind() itself pass through.
// A destructor is not run for an object whose constructor throws, so
// the socket object is released here before any exception leaves.
tcpSocketRendezvous::tcpSocketRendezvous(tcpSocketRope *r,
					 tcpSocketEndpoint *me)
{
  pd_rope =3D r;
  pd_socket =3D baseSocket::getSocket();

  bool bound =3D false;
  try
  {
    bound =3D pd_socket->bind(me);
  }
  catch (...)
  {
    delete pd_socket;
    pd_socket =3D 0;
    throw;
  }

  if (!bound)
  {
    // Take the error out of the socket before it is destroyed.
    baseSocket::SocketErrors why =3D pd_socket->getErrNo();
    delete pd_socket;
    pd_socket =3D 0;
    throw CORBA::COMM_FAILURE(why,CORBA::COMPLETED_NO);
  }
  return;
}

// Adopt an existing socket object for rope <r>; the destructor deletes
// it.
tcpSocketRendezvous::tcpSocketRendezvous(tcpSocketRope *r,
					 tcpSocketHandle_t sock)
  : pd_rope(r),
    pd_socket(sock)
{
}

// Release the socket object; delete is a no-op on a null pointer.
tcpSocketRendezvous::~tcpSocketRendezvous()
{
  delete pd_socket;
  pd_socket =3D 0;
}


// Accept one incoming connection and wrap it in a new strand that
// belongs to this rendezvous' rope.
//
// Returns the new strand with its reference count incremented.
// Throws CORBA::COMM_FAILURE if the socket accept fails, and
// CORBA::NO_MEMORY if the strand cannot be allocated.
// The rope's pd_lock is held while the strand is created and is
// released exactly once on every path.
tcpSocketStrand *
tcpSocketRendezvous::accept()
{
  tcpSocketHandle_t new_sock =3D pd_socket->accept();
  if (new_sock =3D=3D 0)
  {
    throw CORBA::COMM_FAILURE(pd_socket->getErrNo(),
			      CORBA::COMPLETED_NO);
  }

  tcpSocketStrand *ns =3D 0;

  pd_rope->pd_lock.lock();
  try
  {
    ns =3D new tcpSocketStrand(pd_rope,new_sock,1);
    if (!ns)
    {
      // No unlock here: the handler below does it. Unlocking in both
      // places would release the mutex twice.
      throw CORBA::NO_MEMORY(0,CORBA::COMPLETED_NO);
    }
    ns->incrRefCount(1);
  }
  catch (...)
  {
    pd_rope->pd_lock.unlock();
    if (!ns)
    {
      // No strand took ownership of the accepted socket, so it has to
      // be released here.
      delete new_sock;
    }
    throw;
  }
  pd_rope->pd_lock.unlock();
  return ns;
}

// Build a rope for endpoint <e>.
//
// An active rope just remembers a copy of the remote endpoint. A
// passive rope copies the endpoint, creates the rendezvous that listens
// on it, and writes the resulting endpoint back into <e>.
// Throws CORBA::INTERNAL if <e> is not a TCP/IP endpoint.
tcpSocketRope::tcpSocketRope(Anchor *a,
			     unsigned int maxStrands,
			     Endpoint *e,
			     CORBA::Boolean passive,
			     CORBA::Boolean heapAllocated)
  : Rope(a,maxStrands,heapAllocated)
{
  tcpSocketEndpoint *te =3D tcpSocketEndpoint::castup(e);
  if (te =3D=3D 0) {
    throw CORBA::INTERNAL(0,CORBA::COMPLETED_NO);
  }

  pd_is_passive =3D passive;

  if (passive) {
    pd_endpoint.me =3D new tcpSocketEndpoint(te);
    pd_rendezvous =3D new tcpSocketRendezvous(this,pd_endpoint.me);
    // Report the endpoint as filled in by the rendezvous to the caller.
    *te =3D *pd_endpoint.me;
  }
  else {
    pd_endpoint.remote =3D new tcpSocketEndpoint(te);
  }
}

// Release what the constructor created for this kind of rope. delete
// is a no-op on a null pointer, so no explicit checks are needed.
tcpSocketRope::~tcpSocketRope()
{
  if (!is_passive()) {
    delete pd_endpoint.remote;
    pd_endpoint.remote =3D 0;
    return;
  }

  delete pd_rendezvous;
  pd_rendezvous =3D 0;

  delete pd_endpoint.me;
  pd_endpoint.me =3D 0;
}

// Query or compare the remote endpoint of an active rope.
//
// e - if null on entry, receives a new copy of the remote endpoint and
//     1 is returned; otherwise it is compared with the remote endpoint.
//
// Returns 0 for a passive rope, for a non TCP/IP endpoint, or when the
// endpoints differ. Throws CORBA::NO_MEMORY if the copy cannot be made.
CORBA::Boolean
tcpSocketRope::remote_is(Endpoint *&e)
{
  if (is_passive()) {
    return 0;
  }

  if (!e) {
    e =3D new tcpSocketEndpoint(pd_endpoint.remote);
    if (!e) {
      throw CORBA::NO_MEMORY(0,CORBA::COMPLETED_NO);
    }
    return 1;
  }

  tcpSocketEndpoint *te =3D tcpSocketEndpoint::castup(e);
  if (!te) {
    return 0;
  }
  return (*te =3D=3D pd_endpoint.remote) ? 1 : 0;
}

// Query or compare the local endpoint of a passive rope.
//
// e - if null on entry, receives a new copy of the local endpoint and
//     1 is returned; otherwise it is compared with the local endpoint.
//
// Returns 0 for an active rope, for a non TCP/IP endpoint, or when the
// endpoints differ. Throws CORBA::NO_MEMORY if the copy cannot be made.
CORBA::Boolean
tcpSocketRope::this_is(Endpoint *&e)
{
  if (!is_passive()) {
    return 0;
  }

  if (!e) {
    e =3D new tcpSocketEndpoint(pd_endpoint.me);
    if (!e) {
      throw CORBA::NO_MEMORY(0,CORBA::COMPLETED_NO);
    }
    return 1;
  }

  tcpSocketEndpoint *te =3D tcpSocketEndpoint::castup(e);
  if (!te) {
    return 0;
  }
  return (*te =3D=3D pd_endpoint.me) ? 1 : 0;
}

void
tcpSocketRope::iopProfile(const _CORBA_Octet *objkey,const size_t =
objkeysize,
			  IOP::TaggedProfile &p)
{
  p.tag =3D IOP::TAG_INTERNET_IOP;

  IIOP::ProfileBody b;
  b.iiop_version.major =3D IIOP::current_major;
  b.iiop_version.minor =3D IIOP::current_minor;
  if (is_passive()) {
    b.host =3D pd_endpoint.me->host();
    b.port =3D pd_endpoint.me->port();
  }
  else {
    b.host =3D pd_endpoint.remote->host();
    b.port =3D pd_endpoint.remote->port();
  }
  b.object_key.length((CORBA::ULong)objkeysize);
  memcpy((void *)&b.object_key[0],(void *)objkey,objkeysize);
  IIOP::profileToEncapStream(b,p.profile_data);
  return;
}


// Open a new heap-allocated strand to the remote endpoint.
// Only meaningful for an active rope; a passive rope throws
// CORBA::INTERNAL.
Strand *
tcpSocketRope::newStrand()
{
  if (!is_passive()) {
    return new tcpSocketStrand(this,pd_endpoint.remote,1);
  }
  throw CORBA::INTERNAL(0,CORBA::COMPLETED_NO);
}

// Failure value for baseSocket::recv() and send(); the strand code in
// this file compares their results against it.
const int baseSocket::SocketError =3D -1;

// Out-of-line definition of the virtual destructor. It does nothing
// itself; subclasses release their own resources.
baseSocket::~baseSocket()
{
}

------=_NextPart_000_01EF_01BCFF24.CDE73B00
Content-Type: application/octet-stream;
	name="tcpSocket_NT.h"
Content-Transfer-Encoding: quoted-printable
Content-Disposition: attachment;
	filename="tcpSocket_NT.h"

// -*- Mode: C++; -*-
//                            Package   : omniORB2
// TCPSocket_NT.h             Created on: 4/2/97
//                            Author    : Eoin Carroll (ewc)
//
//    Copyright (C) 1996, 1997 Olivetti & Oracle Research Laboratory
//
//    This file is part of the omniORB library
//
//    The omniORB library is free software; you can redistribute it =
and/or
//    modify it under the terms of the GNU Library General Public
//    License as published by the Free Software Foundation; either
//    version 2 of the License, or (at your option) any later version.
//
//    This library is distributed in the hope that it will be useful,
//    but WITHOUT ANY WARRANTY; without even the implied warranty of
//    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
//    Library General Public License for more details.
//
//    You should have received a copy of the GNU Library General Public
//    License along with this library; if not, write to the Free
//    Software Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA =
=20
//    02111-1307, USA
//
//
// Description:
//	Implementation of the Strand using TCP/IP and WinSock interface
//=09

/*
  $Log: /Apogee/dev/OmniORB/lib/omniORB2/tcpSocket_NT.h $
 *=20
 * 2     11/28/97 3:12p Bnewport
 * Now works with AGS for possible Mac port
 *=20
 * 1     11/26/97 2:51p Bnewport
 * Revision 1.3  1997/05/06  15:29:54  sll
 * Public release.
 *
  */

#ifndef __TCPSOCKET_NT_H__
#define __TCPSOCKET_NT_H__

class tcpSocketRope;
class tcpSocketStrand;
class tcpSocketEndpoint;

#include "baseSocket.h"
typedef baseSocket *tcpSocketHandle_t;


class tcpSocketEndpoint : public Endpoint {
public:
  tcpSocketEndpoint(CORBA::Char *h,CORBA::UShort p)=20
    : Endpoint((CORBA::Char *)"TCPIP")=20
  {
    pd_host =3D 0;
    pd_port =3D 0;
    host(h);
    port(p);
    return;
  }

  tcpSocketEndpoint(const tcpSocketEndpoint *e)=20
    : Endpoint((CORBA::Char *)"TCPIP")=20
  {
    pd_host =3D 0;
    pd_port =3D 0;
    host(e->host());
    port(e->port());
    return;
  }

  tcpSocketEndpoint &operator=3D(const tcpSocketEndpoint &e)=20
  {
    host(e.host());
    port(e.port());
    return *this;
  }
 =20
  CORBA::Boolean operator=3D=3D(const tcpSocketEndpoint *e)
  {
    if ((strcmp((const char *)pd_host,
		(const char *)e->host())=3D=3D0) && (pd_port =3D=3D e->port()))
      return 1;
    else
      return 0;
  }=09

  virtual ~tcpSocketEndpoint() {
    if (pd_host) delete [] pd_host;
    return;
  }

  CORBA::Char * host() const { return pd_host; }

  void host(const CORBA::Char *p) {
    if (pd_host) delete [] pd_host;
    if (p) {
      pd_host =3D new CORBA::Char [strlen((char *)p) + 1];
      strcpy((char *)pd_host,(char *)p);
    }
    else {
      pd_host =3D new CORBA::Char [1];
      pd_host[0] =3D '\0';
    }
    return;
  }
  CORBA::UShort port() const { return pd_port; }
  void port(const CORBA::UShort p) { pd_port =3D p; }
 =20
  static tcpSocketEndpoint *castup(Endpoint *e) {
    if (e->is_protocol((CORBA::Char *)"TCPIP")) {
      return (tcpSocketEndpoint *)e;
    }
    else {
      return 0;
    }
  }
 =20
private:
  CORBA::Char  *pd_host;
  CORBA::UShort pd_port;
 =20
  tcpSocketEndpoint();
};


// Pairs a tcpSocketRope with a socket handle and hands out strands
// through accept().
// NOTE(review): presumably this is the listening side of a passive rope;
// the implementation is not visible in this header - confirm.
class tcpSocketRendezvous {
public:
  // Construct from a rope <r> and an endpoint <me>.
  // NOTE(review): presumably <me> is the local address to listen on.
  tcpSocketRendezvous(tcpSocketRope *r,tcpSocketEndpoint *me);
  // Construct from a rope <r> and an already existing socket handle.
  tcpSocketRendezvous(tcpSocketRope *r,tcpSocketHandle_t sock);
  virtual ~tcpSocketRendezvous();

  // Returns a pointer to a tcpSocketStrand.
  // NOTE(review): presumably one strand per accepted connection; who
  // owns the returned object cannot be told from here - confirm.
  tcpSocketStrand * accept();

  // Class-wide flag. NOTE(review): the name suggests it records whether
  // the rendezvous threads have been started; where it is set is not
  // visible in this header.
  static CORBA::Boolean has_spawned_rendevous_threads;

private:
  // Rope pointer and socket handle (a baseSocket pointer, see the
  // tcpSocketHandle_t typedef).
  tcpSocketRope    *pd_rope;
  tcpSocketHandle_t pd_socket;
};

// Strand subclass built on a tcpSocketHandle_t. Besides the socket
// handle it holds raw pointers describing a transmit buffer and a
// receive buffer (see the private section).
// NOTE(review): the virtual member functions below presumably override
// the interface of Strand, which is not visible here - confirm.
class tcpSocketStrand : public Strand {
public:

  // Class-wide constant; the value is defined outside this header.
  // NOTE(review): presumably the size of the tx/rx buffers - confirm.
  static const unsigned int buffer_size;

  // Construct a strand of rope <r> for the endpoint <remote>.
  tcpSocketStrand(tcpSocketRope *r,
		  tcpSocketEndpoint *remote,
		  _CORBA_Boolean heapAllocated =3D 0);
  // Concurrency Control:
  //    MUTEX =3D r->pd_lock
  // Pre-condition:
  //    Must hold <MUTEX> on entry
  // Post-condition:
  //    Still hold <MUTEX> on exit, even if an exception is raised

  // Construct a strand of rope <r> around an existing socket handle.
  tcpSocketStrand(tcpSocketRope *r,
		  tcpSocketHandle_t sock,
		  _CORBA_Boolean heapAllocated =3D 0);
  // Concurrency Control:
  //    MUTEX =3D r->pd_lock
  // Pre-condition:
  //    Must hold <MUTEX> on entry
  // Post-condition:
  //    Still hold <MUTEX> on exit, even if an exception is raised

  virtual ~tcpSocketStrand();
  // MUTEX:
  //    pd_rope->pd_lock
  // Pre-condition:
  //    Must hold <MUTEX> on entry
  // Post-condition:
  //    Still hold <MUTEX> on exit

  virtual size_t MaxMTU() const;

  // Receive-side operations.
  // NOTE(review): exact semantics are defined by Strand and by the
  // implementation file; neither is visible here.
  virtual Strand::sbuf receive(size_t size,
			       CORBA::Boolean exactly,
			       int align);

  virtual void giveback_received(size_t leftover);
  virtual size_t max_receive_buffer_size();
  virtual void receive_and_copy(Strand::sbuf b);
  virtual void skip(size_t size);

  // Send-side operations. The <transmit> arguments default to 0.
  // NOTE(review): presumably a non-zero <transmit> asks for the data to
  // be sent immediately - confirm.
  virtual Strand::sbuf reserve(size_t size,
			       CORBA::Boolean exactly,
			       int align,
			       CORBA::Boolean transmit=3D0);
  virtual void giveback_reserved(size_t leftover,
				 CORBA::Boolean transmit=3D0);
  virtual size_t max_reserve_buffer_size();
  virtual void reserve_and_copy(Strand::sbuf b,
				CORBA::Boolean transmit=3D0);
  virtual void shutdown();

private:

  // NOTE(review): presumably sends the pending transmit buffer contents
  // over the socket - confirm.
  void transmit();
  void fetch(CORBA::ULong max=3D0);
  // fetch data from the network to the internal buffer.
  // If <max>=3D0, fetch as much as possible, otherwise fetch at most =
<max>
  // bytes.

  // Socket handle (a baseSocket pointer).
  tcpSocketHandle_t pd_socket;
  // Transmit buffer bookkeeping.
  // NOTE(review): presumably buffer start, begin/end of the pending
  // data and end of the reserved region - confirm.
  void    *pd_tx_buffer;
  void    *pd_tx_begin;
  void    *pd_tx_end;
  void    *pd_tx_reserved_end;

  // Receive buffer bookkeeping, laid out like the transmit side.
  void    *pd_rx_buffer;
  void    *pd_rx_begin;
  void    *pd_rx_end;
  void    *pd_rx_received_end;
   =20
};

// Rope subclass for TCP sockets. A rope records whether it is passive
// (pd_is_passive), one endpoint (pd_endpoint) and a rendezvous pointer
// (pd_rendezvous).
class tcpSocketRope : public Rope {
public:
  tcpSocketRope(Anchor *a,
		unsigned int maxStrands,
		Endpoint *e,=20
		_CORBA_Boolean passive =3D 0,
		_CORBA_Boolean heapAllocated =3D 0);
  // Create a tcpSocket Rope.
  // If passive =3D=3D 1,
  //      create a passive socket. e->port() specifies the port number =
to
  //      bind to (or 0 if an arbitrary port number can be assigned).=20
  //      e->host() can either be the host's fully qualified domain name =

  //      (FQDN) or a 0 length string. If it is the latter, the =
constructor
  //      will initialise the host field with the host's IP address.
  //      This is a dot separated numeric string of the form =
"xxx.xxx.xxx.xxx".
  // If passive =3D=3D 0,
  //      The endpoint <e> is the remote endpoint and should contain the
  //      host name in FQDN form or as a dot separated numeric string.
  // Concurrency Control:
  //    MUTEX =3D a->pd_lock
  // Pre-condition:
  //    Must hold <MUTEX> on entry
  // Post-condition:
  //    Still hold <MUTEX> on exit, even if an exception is raised

  virtual ~tcpSocketRope();
  // Concurrency Control:
  //    MUTEX =3D pd_anchor->pd_lock
  // Pre-condition:
  //    Must hold <MUTEX> on entry
  // Post-condition:
  //    Still hold <MUTEX> on exit

  // NOTE(review): <e> is a reference to a pointer, so these two calls
  // may replace the caller's pointer; the exact contract is defined by
  // Rope and by the implementation file - confirm.
  virtual CORBA::Boolean remote_is(Endpoint *&e);
   =20
  virtual CORBA::Boolean this_is(Endpoint *&e);
 =20
  // Takes an object key of <objkeysize> octets and a TaggedProfile <p>
  // passed by non-const reference.
  // NOTE(review): presumably fills <p> in - confirm.
  virtual void iopProfile(const _CORBA_Octet *objkey,const size_t =
objkeysize,
			  IOP::TaggedProfile &p);
  virtual Strand *newStrand();

  // Plain accessors for the passive flag and the rendezvous pointer.
  CORBA::Boolean is_passive() { return pd_is_passive; }
  tcpSocketRendezvous * getRendezvous() { return pd_rendezvous; }

  friend class tcpSocketRendezvous;
private:
  CORBA::Boolean pd_is_passive;
  // Both union members share the same storage: one endpoint pointer
  // under two names.
  // NOTE(review): presumably <me> is used by passive ropes and <remote>
  // by active ones, as the constructor comment suggests - confirm.
  union {
    tcpSocketEndpoint *remote;
    tcpSocketEndpoint *me;
  } pd_endpoint;
  tcpSocketRendezvous * pd_rendezvous;
};

#endif // __TCPSOCKET_NT_H__

------=_NextPart_000_01EF_01BCFF24.CDE73B00--