You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
289 lines
5.3 KiB
289 lines
5.3 KiB
/*
|
|
reads input data
|
|
Copyright (C) 1999 Martin Vogt
|
|
|
|
This program is free software; you can redistribute it and/or modify
|
|
it under the terms of the GNU Library General Public License as published by
|
|
the Free Software Foundation.
|
|
|
|
For more information look at the file COPYRIGHT in this package
|
|
|
|
*/
|
|
|
|
|
|
#include "bufferInputStream.h"
|
|
#include "simpleRingBuffer.h"
|
|
|
|
|
|
BufferInputStream::BufferInputStream(int size,int minlinSize,
|
|
const char* name) {
|
|
ringBuffer=new SimpleRingBuffer(size,minlinSize);
|
|
leof=false;
|
|
bytePos=0;
|
|
fillgrade=0;
|
|
setUrl(name);
|
|
lLock=false;
|
|
abs_thread_mutex_init(&writeInMut);
|
|
}
|
|
|
|
|
|
BufferInputStream::~BufferInputStream() {
|
|
delete ringBuffer;
|
|
abs_thread_mutex_destroy(&writeInMut);
|
|
}
|
|
|
|
|
|
int BufferInputStream::open(const char*) {
|
|
leof=false;
|
|
setBlocking(true);
|
|
return true;
|
|
}
|
|
|
|
void BufferInputStream::close() {
|
|
leof=true;
|
|
setBlocking(false);
|
|
}
|
|
|
|
int BufferInputStream::eof() {
|
|
return (leof && (fillgrade==0));
|
|
}
|
|
|
|
int BufferInputStream::isOpen() {
|
|
return !leof;
|
|
}
|
|
|
|
void BufferInputStream::setBlocking(int lblock) {
|
|
ringBuffer->setCanWaitForSpace(lblock);
|
|
ringBuffer->setCanWaitForData(lblock);
|
|
}
|
|
|
|
int BufferInputStream::getHold() {
|
|
return lLock;
|
|
}
|
|
|
|
|
|
void BufferInputStream::setHold(int lLock) {
|
|
if (lLock) {
|
|
lockBuffer();
|
|
} else {
|
|
unlockBuffer();
|
|
}
|
|
this->lLock=lLock;
|
|
}
|
|
|
|
int BufferInputStream::read(char* ptr,int size) {
|
|
int i=0;
|
|
int n=size;
|
|
int canCopy=n;
|
|
char* readPtr;
|
|
// here we read even if leof is true
|
|
// we make sure that we empty the whole buffer!
|
|
while((eof()==false) && (n > 0)) {
|
|
canCopy=n;
|
|
ringBuffer->getReadArea(readPtr,canCopy);
|
|
if (canCopy <= 0){
|
|
ringBuffer->waitForData(1);
|
|
continue;
|
|
}
|
|
if (n < canCopy) {
|
|
canCopy=n;
|
|
}
|
|
memcpy((char*)ptr+i,readPtr,canCopy);
|
|
i=i+canCopy;
|
|
n=n-canCopy;
|
|
ringBuffer->forwardReadPtr(canCopy);
|
|
ringBuffer->forwardLockPtr(canCopy);
|
|
lockBuffer();
|
|
bytePos+=canCopy;
|
|
fillgrade-=canCopy;
|
|
unlockBuffer();
|
|
}
|
|
return i;
|
|
}
|
|
|
|
|
|
int BufferInputStream::write(char* ptr,int len,TimeStamp* stamp) {
|
|
int i=0;
|
|
int n=len;
|
|
int canWrite=n;
|
|
char* writePtr;
|
|
|
|
if (stamp) {
|
|
lockBuffer();
|
|
long key;
|
|
key=bytePos+fillgrade;
|
|
InputStream::insertTimeStamp(stamp,key,len);
|
|
unlockBuffer();
|
|
}
|
|
// if eof is set we do not insert any more data
|
|
// we do not call eof() !!!
|
|
while((leof==false) && (n > 0)) {
|
|
canWrite=n;
|
|
ringBuffer->getWriteArea(writePtr,canWrite);
|
|
if (canWrite <= 0){
|
|
ringBuffer->waitForSpace(1);
|
|
continue;
|
|
}
|
|
if (canWrite > n) {
|
|
canWrite=n;
|
|
}
|
|
memcpy(writePtr,(char*)ptr+i,canWrite);
|
|
i=i+canWrite;
|
|
n=n-canWrite;
|
|
ringBuffer->forwardWritePtr(canWrite);
|
|
lockBuffer();
|
|
fillgrade+=canWrite;
|
|
unlockBuffer();
|
|
}
|
|
|
|
return i;
|
|
}
|
|
|
|
|
|
|
|
int BufferInputStream::write(InputStream* input,int len,TimeStamp* stamp) {
|
|
int i=0;
|
|
int n=len;
|
|
int canWrite=n;
|
|
int didWrite;
|
|
char* writePtr;
|
|
|
|
if (stamp) {
|
|
lockBuffer();
|
|
long key;
|
|
key=bytePos+fillgrade;
|
|
InputStream::insertTimeStamp(stamp,key,len);
|
|
unlockBuffer();
|
|
}
|
|
// if eof is set we do not insert any more data
|
|
// we do not call eof() !!!
|
|
while((leof==false) && (n > 0)) {
|
|
canWrite=n;
|
|
ringBuffer->getWriteArea(writePtr,canWrite);
|
|
if (canWrite <= 0){
|
|
ringBuffer->waitForSpace(1);
|
|
continue;
|
|
}
|
|
if (canWrite > n) {
|
|
canWrite=n;
|
|
}
|
|
didWrite=input->read(writePtr,canWrite);
|
|
if (input->eof()) break;
|
|
i=i+didWrite;
|
|
n=n-didWrite;
|
|
ringBuffer->forwardWritePtr(didWrite);
|
|
lockBuffer();
|
|
fillgrade+=canWrite;
|
|
unlockBuffer();
|
|
}
|
|
|
|
return i;
|
|
}
|
|
|
|
|
|
|
|
long BufferInputStream::getByteLength() {
|
|
return ringBuffer->getFillgrade();
|
|
}
|
|
|
|
int BufferInputStream::getFillgrade() {
|
|
return ringBuffer->getFillgrade();
|
|
}
|
|
|
|
|
|
int BufferInputStream::getFreeRead() {
|
|
return ringBuffer->getFreeRead();
|
|
}
|
|
|
|
|
|
int BufferInputStream::getFreeSpace() {
|
|
return ringBuffer->getFreeWrite();
|
|
}
|
|
|
|
|
|
|
|
long BufferInputStream::getBytePosition() {
|
|
return bytePos;
|
|
}
|
|
|
|
void BufferInputStream::setBytePosition(long bytePos) {
|
|
this->bytePos=bytePos;
|
|
}
|
|
|
|
|
|
int BufferInputStream::seek(long) {
|
|
return false;
|
|
}
|
|
|
|
|
|
void BufferInputStream::clear() {
|
|
|
|
ringBuffer->emptyBuffer();
|
|
ringBuffer->exitWaitForData();
|
|
ringBuffer->exitWaitForSpace();
|
|
timeStampArray->clear();
|
|
|
|
lockBuffer();
|
|
bytePos=0;
|
|
fillgrade=0;
|
|
unlockBuffer();
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// remote read extension
|
|
int BufferInputStream::readRemote(char** ptr,int size) {
|
|
int n=0;
|
|
char* readPtr;
|
|
while((eof()==false)) {
|
|
n=size;
|
|
ringBuffer->getReadArea(readPtr,n);
|
|
if (n < size){
|
|
ringBuffer->waitForData(size);
|
|
if (ringBuffer->getCanWaitForData()==false) {
|
|
break;
|
|
}
|
|
continue;
|
|
}
|
|
break;
|
|
}
|
|
*ptr=readPtr;
|
|
return n;
|
|
}
|
|
|
|
|
|
void BufferInputStream::forwardReadPtr(int bytes) {
|
|
|
|
ringBuffer->forwardReadPtr(bytes);
|
|
ringBuffer->forwardLockPtr(bytes);
|
|
lockBuffer();
|
|
bytePos+=bytes;
|
|
fillgrade-=bytes;
|
|
unlockBuffer();
|
|
getTimeStamp(bytePos);
|
|
}
|
|
|
|
|
|
void BufferInputStream::setCanWaitForData(int lBlock) {
|
|
ringBuffer->setCanWaitForData(lBlock);
|
|
}
|
|
|
|
|
|
void BufferInputStream::lockBuffer() {
|
|
abs_thread_mutex_lock(&writeInMut);
|
|
}
|
|
|
|
|
|
void BufferInputStream::unlockBuffer() {
|
|
abs_thread_mutex_unlock(&writeInMut);
|
|
}
|
|
|
|
|
|
int BufferInputStream::getSize() {
|
|
return ringBuffer->getSize();
|
|
}
|
|
|
|
|