You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
tdegames/libtdegames/kgame/kmessageio.cpp

483 lines
13 KiB

/*
This file is part of the TDE games library
Copyright (C) 2001 Burkhard Lehner (Burkhard.Lehner@gmx.de)
This library is free software; you can redistribute it and/or
modify it under the terms of the GNU Library General Public
License version 2 as published by the Free Software Foundation.
This library is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
Library General Public License for more details.
You should have received a copy of the GNU Library General Public License
along with this library; see the file COPYING.LIB. If not, write to
the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
Boston, MA 02110-1301, USA.
*/
/*
KMessageIO class and subclasses KMessageSocket and KMessageDirect
*/
#include "kmessageio.h"
#include <tqsocket.h>
#include <kdebug.h>
#include <kprocess.h>
#include <tqfile.h>
// ----------------------- KMessageIO -------------------------
// Base-class constructor: forwards parent and name to TQObject and starts
// with an id of 0 until setId() is called.
KMessageIO::KMessageIO (TQObject *parent, const char *name)
  : TQObject (parent, name)
{
  m_id = 0;
}
// The base class holds no resources of its own, so there is nothing to do.
KMessageIO::~KMessageIO ()
{
}
// Stores the given id in this IO object; it can be read back with id().
void KMessageIO::setId (TQ_UINT32 id)
{
  this->m_id = id;
}
// Returns the id last stored with setId() (0 if it was never set).
TQ_UINT32 KMessageIO::id ()
{
  return this->m_id;
}
// ----------------------KMessageSocket -----------------------
// Creates a new TQSocket, asks it to connect to the given host name and
// port, and then wires up the signal handling through initSocket().
KMessageSocket::KMessageSocket (TQString host, TQ_UINT16 port, TQObject *parent,
                                const char *name)
  : KMessageIO (parent, name)
{
  TQSocket *freshSocket = new TQSocket ();
  freshSocket->connectToHost (host, port);
  mSocket = freshSocket;
  initSocket ();
}
// Same as the host-name constructor, but takes the target as a
// TQHostAddress; the address is converted to its string form for
// connectToHost().
KMessageSocket::KMessageSocket (TQHostAddress host, TQ_UINT16 port,
                                TQObject *parent, const char *name)
  : KMessageIO (parent, name)
{
  TQSocket *freshSocket = new TQSocket ();
  freshSocket->connectToHost (host.toString(), port);
  mSocket = freshSocket;
  initSocket ();
}
// Wraps an already existing socket. The socket is stored as-is and is
// deleted by the destructor of this object, so the caller hands over
// ownership.
KMessageSocket::KMessageSocket (TQSocket *socket, TQObject *parent,
                                const char *name)
  : KMessageIO (parent, name)
{
  this->mSocket = socket;
  initSocket ();
}
// Creates a new TQSocket on top of the given socket file descriptor and
// wires up the signal handling through initSocket().
KMessageSocket::KMessageSocket (int socketFD, TQObject *parent,
                                const char *name)
  : KMessageIO (parent, name)
{
  TQSocket *freshSocket = new TQSocket ();
  freshSocket->setSocket (socketFD);
  mSocket = freshSocket;
  initSocket ();
}
// Destroys the socket object held by this instance, whichever constructor
// set it up.
KMessageSocket::~KMessageSocket ()
{
  TQSocket *ownedSocket = mSocket;
  delete ownedSocket;
}
// Returns true exactly when the socket reports the TQSocket::Connection
// state.
bool KMessageSocket::isConnected () const
{
  const bool connected = (mSocket->state() == TQSocket::Connection);
  return connected;
}
void KMessageSocket::send (const TQByteArray &msg)
{
TQDataStream str (mSocket);
str << TQ_UINT8 ('M'); // magic number for begin of message
str.writeBytes (msg.data(), msg.size()); // writes the length (as TQ_UINT32) and the data
}
// Slot connected to the socket's readyRead() signal (see initSocket()).
// Parses as many complete messages as are available and emits received()
// for each of them.
//
// The parser alternates between two states, tracked in mAwaitingHeader:
//  - waiting for a header: magic byte 'M' plus the packet length (5 bytes),
//  - waiting for the mNextBlockLength payload bytes announced by the header.
// Incomplete data is left in the socket until the next call.
//
// The loop is driven by the parser state rather than by "bytes available > 0"
// so that a message with an empty payload is delivered as soon as its header
// has been read, instead of waiting for unrelated data to arrive.
void KMessageSocket::processNewData ()
{
  // isRecursive blocks nested invocations while a call is already parsing
  // (presumably triggered from a slot connected to received()).
  if (isRecursive)
    return;
  isRecursive = true;

  TQDataStream str (mSocket);
  for (;;)
  {
    if (mAwaitingHeader)
    {
      // Header = magic number + packet length = 5 bytes
      if (mSocket->bytesAvailable() < 5)
        break;

      // Read the magic number first. If something unexpected is found,
      // start over again, ignoring the data that was read up to then.
      TQ_UINT8 v;
      str >> v;
      if (v != 'M')
      {
        kdWarning(11001) << k_funcinfo << ": Received unexpected data, magic number wrong!" << endl;
        continue;
      }
      str >> mNextBlockLength;
      mAwaitingHeader = false;
    }
    else
    {
      // Data not completely read => wait for more
      if (mSocket->bytesAvailable() < (TQ_ULONG) mNextBlockLength)
        break;

      TQByteArray msg (mNextBlockLength);
      // Nothing to read for an empty payload (msg.data() may be null then).
      if (mNextBlockLength > 0)
        str.readRawBytes (msg.data(), mNextBlockLength);

      // send the received message
      emit received (msg);

      // Waiting for the header of the next message
      mAwaitingHeader = true;
    }
  }
  isRecursive = false;
}
// Common set-up shared by all constructors: resets the parser state used by
// processNewData() and connects the socket's signals.
// Socket errors and a closed connection are both forwarded as
// connectionBroken(); readyRead() triggers processNewData().
void KMessageSocket::initSocket ()
{
  // Parser starts out waiting for a message header
  isRecursive = false;
  mNextBlockLength = 0;
  mAwaitingHeader = true;

  connect (mSocket, TQT_SIGNAL (error(int)), TQT_SIGNAL (connectionBroken()));
  connect (mSocket, TQT_SIGNAL (connectionClosed()), TQT_SIGNAL (connectionBroken()));
  connect (mSocket, TQT_SIGNAL (readyRead()), TQT_SLOT (processNewData()));
}
// Returns the peer port as reported by the underlying socket.
TQ_UINT16 KMessageSocket::peerPort () const
{
  const TQ_UINT16 port = mSocket->peerPort();
  return port;
}
// Returns the peer name as reported by the underlying socket.
TQString KMessageSocket::peerName () const
{
  TQString name = mSocket->peerName();
  return name;
}
// ----------------------KMessageDirect -----------------------
// Creates one end of a direct in-process connection.
// With partner == 0 the object stays unconnected. Otherwise both objects are
// linked to each other, unless the partner already has a connection, in
// which case a warning is logged and this object stays unconnected.
KMessageDirect::KMessageDirect (KMessageDirect *partner, TQObject *parent,
                                const char *name)
  : KMessageIO (parent, name), mPartner (0)
{
  // 0 as first parameter leaves the object unconnected
  if (partner == 0)
    return;

  // Refuse to steal a partner that is already connected elsewhere
  if (partner->mPartner != 0)
  {
    kdWarning(11001) << k_funcinfo << ": Object is already connected!" << endl;
    return;
  }

  // Link both directions: us -> partner and partner -> us
  mPartner = partner;
  partner->mPartner = this;
}
// If a partner is connected, unlinks it from this object and makes it emit
// connectionBroken() so its owner learns that the connection is gone.
KMessageDirect::~KMessageDirect ()
{
  KMessageDirect *peer = mPartner;
  if (!peer)
    return;

  // Clear the back pointer first so the partner never refers to a dead object
  peer->mPartner = 0;
  emit peer->connectionBroken();
}
// Connected means: a partner object is currently linked to this one.
bool KMessageDirect::isConnected () const
{
  return mPartner ? true : false;
}
// Delivers the message by making the partner emit received() with it.
// Without a partner nothing is delivered and an error is logged.
void KMessageDirect::send (const TQByteArray &msg)
{
  if (!mPartner)
  {
    kdError(11001) << k_funcinfo << ": Not yet connected!" << endl;
    return;
  }
  emit mPartner->received (msg);
}
// ----------------------- KMessageProcess ---------------------------
// Kills and deletes the child process (if it still exists) and releases all
// buffers that were not sent.
//
// The buffers are released regardless of whether the process still exists:
// slotProcessExited() resets mProcess to 0, and messages queued at that time
// (as well as the buffer of a write that was still in flight) would leak
// otherwise.
KMessageProcess::~KMessageProcess()
{
  kdDebug(11001) << "@@@KMessageProcess::Delete process" << endl;
  if (mProcess)
  {
    mProcess->kill();
    delete mProcess;
    mProcess=0;
  }

  // The process object is gone at this point, so nothing refers to the data
  // of the buffer that was handed to writeStdin() any more.
  delete mSendBuffer;
  mSendBuffer=0;

  // Remove buffers that were never sent
  mQueue.setAutoDelete(true);
  mQueue.clear();
}
// Starts the executable named by 'file' as a child process and connects its
// stdout/stderr/exit/stdin-written signals to the matching slots of this
// object. The child receives a single argument, the id (currently always 0)
// formatted as a string. All three communication channels are requested
// (TDEProcess::All) and exit notification is enabled.
KMessageProcess::KMessageProcess(TQObject *parent, TQString file) : KMessageIO(parent,0)
{
  // Start process
  kdDebug(11001) << "@@@KMessageProcess::Start process" << endl;
  mProcessName=file;
  mProcess=new TDEProcess;

  // Command line: <program> <id>
  const int childId=0;
  TDEProcess &child=*mProcess;
  child << mProcessName << TQString("%1").arg(childId);
  kdDebug(11001) << "@@@KMessageProcess::Init:Id= " << childId << endl;
  kdDebug(11001) << "@@@KMessgeProcess::Init:Processname: " << mProcessName << endl;

  // Data coming from the child
  connect(mProcess, TQT_SIGNAL(receivedStdout(TDEProcess *, char *, int )),
          this, TQT_SLOT(slotReceivedStdout(TDEProcess *, char * , int )));
  connect(mProcess, TQT_SIGNAL(receivedStderr(TDEProcess *, char *, int )),
          this, TQT_SLOT(slotReceivedStderr(TDEProcess *, char * , int )));
  // Life cycle and write completion
  connect(mProcess, TQT_SIGNAL(processExited(TDEProcess *)),
          this, TQT_SLOT(slotProcessExited(TDEProcess *)));
  connect(mProcess, TQT_SIGNAL(wroteStdin(TDEProcess *)),
          this, TQT_SLOT(slotWroteStdin(TDEProcess *)));

  mProcess->start(TDEProcess::NotifyOnExit,TDEProcess::All);

  // No write in flight, nothing received yet; the receive buffer grows in
  // steps of 1024 bytes when needed
  mSendBuffer=0;
  mReceiveCount=0;
  mReceiveBuffer.resize(1024);
}
// Connected means: the process object exists and reports that it is running.
bool KMessageProcess::isConnected() const
{
  kdDebug(11001) << "@@@KMessageProcess::Is conencted" << endl;
  return mProcess ? mProcess->isRunning() : false;
}
void KMessageProcess::send(const TQByteArray &msg)
{
kdDebug(11001) << "@@@KMessageProcess:: SEND("<<msg.size()<<") to process" << endl;
unsigned int size=msg.size()+2*sizeof(long);
char *tmpbuffer=new char[size];
long *p1=(long *)tmpbuffer;
long *p2=p1+1;
kdDebug(11001) << "p1="<<p1 << "p2="<< p2 << endl;
memcpy(tmpbuffer+2*sizeof(long),msg.data(),msg.size());
*p1=0x4242aeae;
*p2=size;
TQByteArray *buffer=new TQByteArray();
buffer->assign(tmpbuffer,size);
// buffer->duplicate(msg);
mQueue.enqueue(buffer);
writeToProcess();
}
void KMessageProcess::writeToProcess()
{
// Previous send ok and item in queue
if (mSendBuffer || mQueue.isEmpty()) return ;
mSendBuffer=mQueue.dequeue();
if (!mSendBuffer) return ;
// write it out to the process
// kdDebug(11001) << " @@@@@@ writeToProcess::SEND to process " << mSendBuffer->size() << " BYTE " << endl;
// char *p=mSendBuffer->data();
// for (int i=0;i<16;i++) printf("%02x ",(unsigned char)(*(p+i)));printf("\n");
mProcess->writeStdin(mSendBuffer->data(),mSendBuffer->size());
}
// Slot for the process' wroteStdin() signal: the buffer of the finished
// write is released and the next queued message (if any) is sent.
void KMessageProcess::slotWroteStdin(TDEProcess * )
{
  kdDebug(11001) << k_funcinfo << endl;

  // Deleting a null pointer is a no-op, so no explicit check is needed
  delete mSendBuffer;
  mSendBuffer=0;

  writeToProcess();
}
// Slot for the process' receivedStderr() signal. The received chunk is split
// at '\n' and every piece is written to the debug log, prefixed with the pid
// of the process (0 if no process pointer was passed).
void KMessageProcess::slotReceivedStderr(TDEProcess * proc, char *buffer, int buflen)
{
  if (buffer==0 || buflen==0) return ;

  const int pid=proc ? proc->pid() : 0;
  char *cursor=buffer;
  int remaining=buflen;

  do
  {
    // Length of the current line: up to the next newline, or all that is left
    char *newline=(char *)memchr(cursor,'\n',remaining);
    const int lineLen=newline ? (int)(newline-cursor) : remaining;

    // Wrap the line without copying it; the raw data has to be detached
    // again before the array goes out of scope
    TQByteArray view;
    view.setRawData(cursor,lineLen);
    TQString line(view);
    kdDebug(11001) << "PID" <<pid<< ":" << line << endl;
    view.resetRawData(cursor,lineLen);

    // Skip the line and its newline character
    if (newline) cursor=newline+1;
    remaining-=lineLen+1;
  }while(remaining>0);
}
// Slot for the process' receivedStdout() signal. Appends the received bytes
// to mReceiveBuffer and emits received() for every complete frame found.
//
// Frame layout (see send()): long cookie 0x4242aeae, long total frame size
// including the two header longs, then the payload.
//
// Changes compared to the previous version:
//  - empty or invalid input (null buffer, buflen <= 0) is ignored,
//  - a frame that consists of the header only is processed as soon as the
//    header is complete (the check used '>' instead of '>='),
//  - a wrong cookie or an impossible frame size discards the buffered data;
//    before, the corrupt bytes stayed at the start of the buffer forever, so
//    no later message could be parsed and the buffer kept growing,
//  - a frame is removed from the buffer before received() is emitted.
void KMessageProcess::slotReceivedStdout(TDEProcess * , char *buffer, int buflen)
{
  kdDebug(11001) << "$$$$$$ " << k_funcinfo << ": Received " << buflen << " bytes over inter process communication" << endl;
  if (!buffer || buflen<=0) return ;

  const unsigned int headerSize=2*sizeof(long);
  const unsigned int incoming=buflen;

  // TODO Make a plausibility check on buflen to avoid memory overflow
  while (mReceiveCount+incoming>=mReceiveBuffer.size()) mReceiveBuffer.resize(mReceiveBuffer.size()+1024);
  memcpy(mReceiveBuffer.data()+mReceiveCount,buffer,incoming);
  mReceiveCount+=incoming;

  // Possible message
  while (mReceiveCount>=headerSize)
  {
    long cookie;
    long frameSize;
    memcpy(&cookie,mReceiveBuffer.data(),sizeof(long));
    memcpy(&frameSize,mReceiveBuffer.data()+sizeof(long),sizeof(long));

    if (cookie!=0x4242aeae)
    {
      kdDebug(11001) << k_funcinfo << ": Cookie error...transmission failure...serious problem..." << endl;
      mReceiveCount=0;
      break;
    }
    if (frameSize<0 || (unsigned long)frameSize<headerSize)
    {
      kdDebug(11001) << k_funcinfo << ": Message size error" << endl;
      mReceiveCount=0;
      break;
    }

    // Frame not complete yet => wait for more data
    const unsigned long available=mReceiveCount;
    if ((unsigned long)frameSize>available) break;

    // Safe: frameSize is not larger than the buffered byte count
    const unsigned int len=(unsigned int)frameSize;
    kdDebug(11001) << k_funcinfo << ": Got message with len " << len << endl;

    // Deep copy of the payload, so the buffer can be reused right away
    TQByteArray msg;
    msg.duplicate(mReceiveBuffer.data()+headerSize,len-headerSize);

    // Shift buffer
    if (len<mReceiveCount)
    {
      memmove(mReceiveBuffer.data(),mReceiveBuffer.data()+len,mReceiveCount-len);
    }
    mReceiveCount-=len;

    emit received(msg);
  }
}
// Slot for the process' processExited() signal: forgets the process object,
// schedules it for deletion and announces the broken connection.
//
// The process object is the sender of the signal that is being handled here,
// so it is released with deleteLater() instead of being deleted while it is
// still emitting. connectionBroken() is emitted last and no member is
// touched afterwards, so a connected slot may delete this object.
void KMessageProcess::slotProcessExited(TDEProcess * /*p*/)
{
  kdDebug(11001) << "Process exited (slot)" << endl;

  TDEProcess *exited=mProcess;
  mProcess=0;
  if (exited) exited->deleteLater();

  emit connectionBroken();
}
// ----------------------- KMessageFilePipe ---------------------------
// Stores the two file objects used for reading and writing messages and
// prepares an empty receive buffer of 1024 bytes. The files are only
// referenced here; this class never deletes them.
KMessageFilePipe::KMessageFilePipe(TQObject *parent,TQFile *readfile,TQFile *writefile) : KMessageIO(parent,0)
{
  // Nothing received yet
  mReceiveBuffer.resize(1024);
  mReceiveCount=0;

  this->mReadFile=readfile;
  this->mWriteFile=writefile;
}
// Nothing to release: the read and write files are not deleted here.
KMessageFilePipe::~KMessageFilePipe()
{
}
// Connected means: both a read file and a write file were supplied.
bool KMessageFilePipe::isConnected () const
{
  if (mReadFile==0) return false;
  return mWriteFile!=0;
}
void KMessageFilePipe::send(const TQByteArray &msg)
{
unsigned int size=msg.size()+2*sizeof(long);
char *tmpbuffer=new char[size];
long *p1=(long *)tmpbuffer;
long *p2=p1+1;
memcpy(tmpbuffer+2*sizeof(long),msg.data(),msg.size());
*p1=0x4242aeae;
*p2=size;
TQByteArray buffer;
buffer.assign(tmpbuffer,size);
mWriteFile->writeBlock(buffer);
mWriteFile->flush();
/*
fprintf(stderr,"+++ KMessageFilePipe:: SEND(%d to parent) realsize=%d\n",msg.size(),buffer.size());
for (int i=0;i<buffer.size();i++) fprintf(stderr,"%02x ",buffer[i]);fprintf(stderr,"\n");
fflush(stderr);
*/
}
// Reads a single byte from the read file (blocking read is ok according to
// BL), appends it to the receive buffer and emits received() once the buffer
// holds exactly one complete frame.
//
// Frame layout (see send()): long cookie 0x4242aeae, long total frame size
// including the two header longs, then the payload.
//
// Changes compared to the previous version:
//  - a missing read file is tolerated,
//  - the end-of-file/error result of getch() (a negative value) is no longer
//    stored as if it were a data byte,
//  - a wrong cookie or an impossible frame size discards the buffered bytes;
//    before, the buffer could never match a frame again and grew without
//    bound.
void KMessageFilePipe::exec()
{
  if (!mReadFile) return ;

  // According to BL: Blocking read is ok
  // while(mReadFile->atEnd()) { usleep(100); }
  const int ch=mReadFile->getch();
  if (ch<0) return ;   // end of file or read error: no data byte available

  while (mReceiveCount>=mReceiveBuffer.size()) mReceiveBuffer.resize(mReceiveBuffer.size()+1024);
  mReceiveBuffer[mReceiveCount]=(char)ch;
  mReceiveCount++;

  // Check for message: nothing to parse before the header is complete
  const unsigned int headerSize=2*sizeof(long);
  if (mReceiveCount<headerSize) return ;

  long cookie;
  long frameSize;
  memcpy(&cookie,mReceiveBuffer.data(),sizeof(long));
  memcpy(&frameSize,mReceiveBuffer.data()+sizeof(long),sizeof(long));

  if (cookie!=0x4242aeae)
  {
    fprintf(stderr,"KMessageFilePipe::exec:: Cookie error...transmission failure...serious problem...\n");
    mReceiveCount=0;
    return ;
  }
  if (frameSize<0 || (unsigned long)frameSize<headerSize)
  {
    fprintf(stderr,"KMessageFilePipe::exec:: Message size error\n");
    mReceiveCount=0;
    return ;
  }

  // Bytes arrive one at a time, so a complete frame is present exactly when
  // the buffered byte count reaches the announced frame size
  const unsigned long available=mReceiveCount;
  if ((unsigned long)frameSize!=available) return ;

  const unsigned int len=(unsigned int)frameSize;
  //fprintf(stderr,"KMessageFilePipe::exec:: Got Message with len %d\n",len);

  // Deep copy of the payload, so the buffer can be reused right away
  TQByteArray msg;
  msg.duplicate(mReceiveBuffer.data()+headerSize,len-headerSize);
  mReceiveCount=0;
  emit received(msg);
}
#include "kmessageio.moc"