You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
tdelibs/arts/kde/kioinputstream_impl.cpp

237 lines
6.4 KiB

/*
Copyright (C) 2001 Nikolas Zimmermann <wildfox@kde.org>
This library is free software; you can redistribute it and/or
modify it under the terms of the GNU Library General Public
License as published by the Free Software Foundation; either
version 2 of the License, or (at your option) any later version.
This library is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
Library General Public License for more details.
You should have received a copy of the GNU Library General Public License
along with this library; see the file COPYING.LIB. If not, write to
the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
Boston, MA 02110-1301, USA.
*/
/*
* How does it work?
* -----------------
*
* First the buffer has to be filled. When it reaches a defined size the outdata
* stream has to start pulling packets. If the buffer reaches a size of zero the
* stream has to stop. If the buffer gets to big the job has to be suspended
* until the buffer is small enough again.
*/
#include <tdeapplication.h>
#include <kdebug.h>
#include <tdeio/job.h>
#include <tdeio/kmimetype.h>
#include <tdeio/jobclasses.h>
#include <tqtimer.h>
#include <tqdatastream.h>
#include "artsversion.h"
#include "kioinputstream_impl.moc"
using namespace Arts;
const unsigned int TDEIOInputStream_impl::PACKET_COUNT = 10;
TDEIOInputStream_impl::TDEIOInputStream_impl() : m_packetSize(2048)
{
m_job = 0;
m_finished = false;
m_firstBuffer = false;
m_packetBuffer = 16;
m_streamStarted = false;
m_streamSuspended = false;
m_streamPulled = false;
m_size = 0;
}
TDEIOInputStream_impl::~TDEIOInputStream_impl()
{
if(m_job != 0)
m_job->kill();
}
void TDEIOInputStream_impl::streamStart()
{
// prevent kill/reconnect
if (m_streamStarted) {
kdDebug( 400 ) << "not restarting stream!\n";
if (m_job->isSuspended())
m_job->resume();
return;
}
kdDebug( 400 ) << "(re)starting stream\n";
if(m_job != 0)
m_job->kill();
m_job = TDEIO::get(m_url, false, false);
m_job->addMetaData("accept", "audio/x-mp3, video/mpeg, application/ogg");
m_job->addMetaData("UserAgent", TQString::fromLatin1("aRts/") + TQString::fromLatin1(ARTS_VERSION));
TQObject::connect(m_job, TQT_SIGNAL(data(TDEIO::Job *, const TQByteArray &)),
this, TQT_SLOT(slotData(TDEIO::Job *, const TQByteArray &)));
TQObject::connect(m_job, TQT_SIGNAL(result(TDEIO::Job *)),
this, TQT_SLOT(slotResult(TDEIO::Job *)));
TQObject::connect(m_job, TQT_SIGNAL(mimetype(TDEIO::Job *, const TQString &)),
this, TQT_SLOT(slotScanMimeType(TDEIO::Job *, const TQString &)));
TQObject::connect(m_job, TQT_SIGNAL(totalSize( TDEIO::Job *, TDEIO::filesize_t)),
this, TQT_SLOT(slotTotalSize(TDEIO::Job *, TDEIO::filesize_t)));
m_streamStarted = true;
}
void TDEIOInputStream_impl::streamEnd()
{
kdDebug( 400 ) << "streamEnd()\n";
if(m_job != 0)
{
TQObject::disconnect(m_job, TQT_SIGNAL(data(TDEIO::Job *, const TQByteArray &)),
this, TQT_SLOT(slotData(TDEIO::Job *, const TQByteArray &)));
TQObject::disconnect(m_job, TQT_SIGNAL(result(TDEIO::Job *)),
this, TQT_SLOT(slotResult(TDEIO::Job *)));
TQObject::disconnect(m_job, TQT_SIGNAL(mimetype(TDEIO::Job *, const TQString &)),
this, TQT_SLOT(slotScanMimeType(TDEIO::Job *, const TQString &)));
TQObject::disconnect(m_job, TQT_SIGNAL(totalSize( TDEIO::Job *, TDEIO::filesize_t)),
this, TQT_SLOT(slotTotalSize(TDEIO::Job *, TDEIO::filesize_t)));
if ( m_streamPulled )
outdata.endPull();
m_job->kill();
m_job = 0;
}
m_streamStarted = false;
}
bool TDEIOInputStream_impl::openURL(const std::string& url)
{
m_url = KURL(url.c_str());
m_size = 0;
return true;
}
void TDEIOInputStream_impl::slotData(TDEIO::Job *, const TQByteArray &data)
{
if(m_finished)
m_finished = false;
TQDataStream dataStream(m_data, IO_WriteOnly | IO_Append);
dataStream.writeRawBytes(data.data(), data.size());
//kdDebug( 400 ) << "STREAMING: buffersize = " << m_data.size() << " bytes" << endl;
processQueue();
}
void TDEIOInputStream_impl::slotResult(TDEIO::Job *job)
{
// jobs delete themselves after emitting their result
m_finished = true;
m_streamStarted = false;
m_job = 0;
if(job->error()) {
// break out of the event loop in case of
// connection error
emit mimeTypeFound("application/x-zerosize");
job->showErrorDialog();
}
}
void TDEIOInputStream_impl::slotScanMimeType(TDEIO::Job *, const TQString &mimetype)
{
kdDebug( 400 ) << "got mimetype: " << mimetype << endl;
emit mimeTypeFound(mimetype);
}
void TDEIOInputStream_impl::slotTotalSize(TDEIO::Job *, TDEIO::filesize_t size)
{
m_size = size;
}
bool TDEIOInputStream_impl::eof()
{
return (m_finished && m_data.size() == 0);
}
bool TDEIOInputStream_impl::seekOk()
{
return false;
}
long TDEIOInputStream_impl::size()
{
return m_size ? m_size : m_data.size();
}
long TDEIOInputStream_impl::seek(long)
{
return -1;
}
void TDEIOInputStream_impl::processQueue()
{
if(m_job != 0)
{
if(m_data.size() > (m_packetBuffer * m_packetSize * 2) && !m_job->isSuspended())
{
kdDebug( 400 ) << "STREAMING: suspend job" << endl;
m_job->suspend();
}
else if(m_data.size() < (m_packetBuffer * m_packetSize) && m_job->isSuspended())
{
kdDebug( 400 ) << "STREAMING: resume job" << endl;
m_job->resume();
}
}
if (!m_firstBuffer) {
if(m_data.size() < (m_packetBuffer * m_packetSize * 2) ) {
kdDebug( 400 ) << "STREAMING: Buffering in progress... (Needed bytes before it starts to play: " << ((m_packetBuffer * m_packetSize * 2) - m_data.size()) << ")" << endl;
return;
} else {
m_firstBuffer = true;
m_streamPulled = true;
outdata.setPull(PACKET_COUNT, m_packetSize);
}
}
}
void TDEIOInputStream_impl::request_outdata(DataPacket<mcopbyte> *packet)
{
processQueue();
packet->size = std::min(m_packetSize, (unsigned int)m_data.size());
kdDebug( 400 ) << "STREAMING: Filling one DataPacket with " << packet->size << " bytes of the stream!" << endl;
if (!m_finished) {
if( (unsigned)packet->size < m_packetSize || ! m_firstBuffer) {
m_firstBuffer = false;
packet->size = 0;
outdata.endPull();
}
}
if (packet->size > 0)
{
memcpy(packet->contents, m_data.data(), packet->size);
memmove(m_data.data(), m_data.data() + packet->size, m_data.size() - packet->size);
m_data.resize(m_data.size() - packet->size);
}
packet->send();
}
REGISTER_IMPLEMENTATION(TDEIOInputStream_impl);