You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
550 lines
12 KiB
550 lines
12 KiB
/*
|
|
This file implements the Weaver, Job and Thread classes.
|
|
|
|
$ Author: Mirko Boehm $
|
|
$ Copyright: (C) 2004, Mirko Boehm $
|
|
$ Contact: mirko@kde.org
|
|
http://www.kde.org
|
|
http://www.hackerbuero.org $
|
|
$ License: LGPL with the following explicit clarification:
|
|
This code may be linked against any version of the TQt toolkit
|
|
from Troll Tech, Norway. $
|
|
|
|
*/
|
|
|
|
extern "C" {
|
|
#include <signal.h>
|
|
}
|
|
|
|
#include <tqevent.h>
|
|
#include <tqapplication.h>
|
|
|
|
#include "weaver.h"
|
|
|
|
namespace KPIM {
|
|
namespace ThreadWeaver {
|
|
|
|
bool Debug = true;
|
|
int DebugLevel = 2;
|
|
|
|
Job::Job (TQObject* parent, const char* name)
|
|
: TQObject (parent, name),
|
|
m_finished (false),
|
|
m_mutex (new TQMutex (true) ),
|
|
m_thread (0)
|
|
{
|
|
}
|
|
|
|
Job::~Job()
|
|
{
|
|
}
|
|
|
|
void Job::lock()
|
|
{
|
|
m_mutex->lock();
|
|
}
|
|
|
|
void Job::unlock()
|
|
{
|
|
m_mutex->unlock();
|
|
}
|
|
|
|
void Job::execute(Thread *th)
|
|
{
|
|
m_mutex->lock();
|
|
m_thread = th;
|
|
m_mutex->unlock();
|
|
|
|
run ();
|
|
|
|
m_mutex->lock();
|
|
setFinished (true);
|
|
m_thread = 0;
|
|
m_mutex->unlock();
|
|
}
|
|
|
|
Thread *Job::thread ()
|
|
{
|
|
TQMutexLocker l (m_mutex);
|
|
return m_thread;
|
|
}
|
|
|
|
bool Job::isFinished() const
|
|
{
|
|
TQMutexLocker l (m_mutex);
|
|
return m_finished;
|
|
}
|
|
|
|
void Job::setFinished(bool status)
|
|
{
|
|
TQMutexLocker l (m_mutex);
|
|
m_finished = status;
|
|
}
|
|
|
|
void Job::processEvent (Event *e)
|
|
{
|
|
switch ( e->action() )
|
|
{
|
|
case Event::JobStarted:
|
|
emit ( started() );
|
|
break;
|
|
case Event::JobFinished:
|
|
emit ( done() );
|
|
break;
|
|
case Event::JobSPR:
|
|
emit ( SPR () );
|
|
m_wc->wakeOne ();
|
|
break;
|
|
case Event::JobAPR:
|
|
emit ( APR () );
|
|
// no wake here !
|
|
break;
|
|
default:
|
|
break;
|
|
}
|
|
}
|
|
|
|
void Job::triggerSPR ()
|
|
{
|
|
m_mutex->lock ();
|
|
m_wc = new TQWaitCondition;
|
|
m_mutex->unlock ();
|
|
|
|
thread()->post (KPIM::ThreadWeaver::Event::JobSPR, this);
|
|
m_wc->wait ();
|
|
|
|
m_mutex->lock ();
|
|
delete m_wc;
|
|
m_wc = 0;
|
|
m_mutex->unlock ();
|
|
}
|
|
|
|
void Job::triggerAPR ()
|
|
{
|
|
m_mutex->lock ();
|
|
m_wc = new TQWaitCondition;
|
|
m_mutex->unlock ();
|
|
|
|
thread()->post (KPIM::ThreadWeaver::Event::JobAPR, this);
|
|
m_wc->wait ();
|
|
}
|
|
|
|
void Job::wakeAPR ()
|
|
{
|
|
TQMutexLocker l(m_mutex);
|
|
if ( m_wc!=0 )
|
|
{
|
|
m_wc->wakeOne ();
|
|
delete m_wc;
|
|
m_wc = 0;
|
|
}
|
|
}
|
|
|
|
const int Event::Type = TQEvent::User + 1000;
|
|
|
|
Event::Event ( Action action, Thread *thread, Job *job)
|
|
: TQCustomEvent ( type () ),
|
|
m_action (action),
|
|
m_thread (thread),
|
|
m_job (job)
|
|
{
|
|
}
|
|
|
|
int Event::type ()
|
|
{
|
|
return Type;
|
|
}
|
|
|
|
Thread* Event::thread () const
|
|
{
|
|
if ( m_thread != 0)
|
|
{
|
|
return m_thread;
|
|
} else {
|
|
return 0;
|
|
}
|
|
}
|
|
|
|
Job* Event::job () const
|
|
{
|
|
return m_job;
|
|
}
|
|
|
|
Event::Action Event::action () const
|
|
{
|
|
return m_action;
|
|
}
|
|
|
|
unsigned int Thread::sm_Id;
|
|
|
|
Thread::Thread (Weaver *parent)
|
|
: TQThread (),
|
|
m_parent ( parent ),
|
|
m_id ( makeId() )
|
|
{
|
|
}
|
|
|
|
Thread::~Thread()
|
|
{
|
|
}
|
|
|
|
unsigned int Thread::makeId()
|
|
{
|
|
static TQMutex mutex;
|
|
TQMutexLocker l (&mutex);
|
|
|
|
return ++sm_Id;
|
|
}
|
|
|
|
unsigned int Thread::id() const
|
|
{
|
|
return m_id;
|
|
}
|
|
|
|
void Thread::run()
|
|
{
|
|
Job *job = 0;
|
|
|
|
post ( Event::ThreadStarted );
|
|
|
|
while (true)
|
|
{
|
|
debug ( 3, "Thread::run [%u]: trying to execute the next job.\n", id() );
|
|
|
|
job = m_parent->applyForWork ( this, job );
|
|
|
|
if (job == 0)
|
|
{
|
|
break;
|
|
} else {
|
|
post ( Event::JobStarted, job );
|
|
job->execute (this);
|
|
post ( Event::JobFinished, job );
|
|
}
|
|
}
|
|
|
|
post ( Event::ThreadExiting );
|
|
}
|
|
|
|
void Thread::post (Event::Action a, Job *j)
|
|
{
|
|
m_parent->post ( a, this, j);
|
|
}
|
|
|
|
void Thread::msleep(unsigned long msec)
|
|
{
|
|
TQThread::msleep(msec);
|
|
}
|
|
|
|
Weaver::Weaver(TQObject* parent, const char* name,
|
|
int inventoryMin, int inventoryMax)
|
|
: TQObject(parent, name),
|
|
m_active(0),
|
|
m_inventoryMin(inventoryMin),
|
|
m_inventoryMax(inventoryMax),
|
|
m_shuttingDown(false),
|
|
m_running (false),
|
|
m_suspend (false),
|
|
m_mutex ( new TQMutex(true) )
|
|
{
|
|
lock();
|
|
|
|
for ( int count = 0; count < m_inventoryMin; ++count)
|
|
{
|
|
Thread *th = new Thread(this);
|
|
m_inventory.append(th);
|
|
// this will idle the thread, waiting for a job
|
|
th->start();
|
|
|
|
emit (threadCreated (th) );
|
|
}
|
|
|
|
unlock();
|
|
}
|
|
|
|
Weaver::~Weaver()
|
|
{
|
|
lock();
|
|
|
|
debug ( 1, "Weaver dtor: destroying inventory.\n" );
|
|
|
|
m_shuttingDown = true;
|
|
|
|
unlock();
|
|
|
|
m_jobAvailable.wakeAll();
|
|
|
|
// problem: Some threads might not be asleep yet, just finding
|
|
// out if a job is available. Those threads will suspend
|
|
// waiting for their next job (a rare case, but not impossible).
|
|
// Therefore, if we encounter a thread that has not exited, we
|
|
// have to wake it again (which we do in the following for
|
|
// loop).
|
|
|
|
for ( Thread *th = m_inventory.first(); th; th = m_inventory.next() )
|
|
{
|
|
if ( !th->finished() )
|
|
{
|
|
m_jobAvailable.wakeAll();
|
|
th->wait();
|
|
}
|
|
|
|
emit (threadDestroyed (th) );
|
|
delete th;
|
|
|
|
}
|
|
|
|
m_inventory.clear();
|
|
|
|
delete m_mutex;
|
|
|
|
debug ( 1, "Weaver dtor: done\n" );
|
|
|
|
}
|
|
|
|
void Weaver::lock()
|
|
{
|
|
debug ( 3 , "Weaver::lock: lock (mutex is %s).\n",
|
|
( m_mutex->locked() ? "locked" : "not locked" ) );
|
|
m_mutex->lock();
|
|
}
|
|
|
|
void Weaver::unlock()
|
|
{
|
|
m_mutex->unlock();
|
|
|
|
debug ( 3 , "Weaver::unlock: unlock (mutex is %s).\n",
|
|
( m_mutex->locked() ? "locked" : "not locked" ) );
|
|
}
|
|
|
|
int Weaver::threads () const
|
|
{
|
|
TQMutexLocker l (m_mutex);
|
|
return m_inventory.count ();
|
|
}
|
|
|
|
void Weaver::enqueue(Job* job)
|
|
{
|
|
lock();
|
|
|
|
m_assignments.append(job);
|
|
m_running = true;
|
|
|
|
unlock();
|
|
|
|
assignJobs();
|
|
}
|
|
|
|
void Weaver::enqueue (TQPtrList <Job> jobs)
|
|
{
|
|
lock();
|
|
|
|
for ( Job * job = jobs.first(); job; job = jobs.next() )
|
|
{
|
|
m_assignments.append (job);
|
|
}
|
|
|
|
unlock();
|
|
|
|
assignJobs();
|
|
}
|
|
|
|
bool Weaver::dequeue ( Job* job )
|
|
{
|
|
TQMutexLocker l (m_mutex);
|
|
return m_assignments.remove (job);
|
|
}
|
|
|
|
void Weaver::dequeue ()
|
|
{
|
|
TQMutexLocker l (m_mutex);
|
|
m_assignments.clear();
|
|
}
|
|
|
|
void Weaver::suspend (bool state)
|
|
{
|
|
lock();
|
|
|
|
if (state)
|
|
{
|
|
// no need to wake any threads here
|
|
m_suspend = true;
|
|
if ( m_active == 0 && isEmpty() )
|
|
{ // instead of waking up threads:
|
|
post (Event::Suspended);
|
|
}
|
|
} else {
|
|
m_suspend = false;
|
|
// make sure we emit suspended () even if all threads are sleeping:
|
|
assignJobs ();
|
|
debug (2, "Weaver::suspend: queueing resumed.\n" );
|
|
}
|
|
|
|
unlock();
|
|
}
|
|
|
|
void Weaver::assignJobs()
|
|
{
|
|
m_jobAvailable.wakeAll();
|
|
}
|
|
|
|
bool Weaver::event (TQEvent *e )
|
|
{
|
|
if ( e->type() >= TQEvent::User )
|
|
{
|
|
|
|
if ( e->type() == Event::type() )
|
|
{
|
|
Event *event = (Event*) e;
|
|
|
|
switch (event->action() )
|
|
{
|
|
case Event::JobFinished:
|
|
if ( event->job() !=0 )
|
|
{
|
|
emit (jobDone (event->job() ) );
|
|
}
|
|
break;
|
|
case Event::Finished:
|
|
emit ( finished() );
|
|
break;
|
|
case Event::Suspended:
|
|
emit ( suspended() );
|
|
break;
|
|
case Event::ThreadSuspended:
|
|
if (!m_shuttingDown )
|
|
{
|
|
emit (threadSuspended ( event->thread() ) );
|
|
}
|
|
break;
|
|
case Event::ThreadBusy:
|
|
if (!m_shuttingDown )
|
|
{
|
|
emit (threadBusy (event->thread() ) );
|
|
}
|
|
break;
|
|
default:
|
|
break;
|
|
}
|
|
|
|
if ( event->job() !=0 )
|
|
{
|
|
event->job()->processEvent (event);
|
|
}
|
|
} else {
|
|
debug ( 0, "Weaver::event: Strange: received unknown user event.\n" );
|
|
}
|
|
return true;
|
|
} else {
|
|
// others - please make sure we are a TQObject!
|
|
return TQObject::event ( e );
|
|
}
|
|
}
|
|
|
|
void Weaver::post (Event::Action a, Thread* t, Job* j)
|
|
{
|
|
Event *e = new Event ( a, t, j);
|
|
TQApplication::postEvent (this, e);
|
|
}
|
|
|
|
bool Weaver::isEmpty() const
|
|
{
|
|
TQMutexLocker l (m_mutex);
|
|
return m_assignments.count()==0;
|
|
}
|
|
|
|
Job* Weaver::applyForWork(Thread *th, Job* previous)
|
|
{
|
|
Job *rc = 0;
|
|
bool lastjob = false;
|
|
bool suspended = false;
|
|
|
|
while (true)
|
|
{
|
|
lock();
|
|
|
|
if (previous != 0)
|
|
{ // cleanup and send events:
|
|
--m_active;
|
|
|
|
debug ( 3, "Weaver::applyForWork: job done, %i jobs left, "
|
|
"%i active jobs left.\n",
|
|
queueLength(), m_active );
|
|
|
|
if ( m_active == 0 && isEmpty() )
|
|
{
|
|
lastjob = true;
|
|
m_running = false;
|
|
post (Event::Finished);
|
|
debug ( 3, "Weaver::applyForWork: last job.\n" );
|
|
}
|
|
|
|
if (m_active == 0 && m_suspend == true)
|
|
{
|
|
suspended = true;
|
|
post (Event::Suspended);
|
|
debug ( 2, "Weaver::applyForWork: queueing suspended.\n" );
|
|
}
|
|
|
|
m_jobFinished.wakeOne();
|
|
}
|
|
|
|
previous = 0;
|
|
|
|
if (m_shuttingDown == true)
|
|
{
|
|
unlock();
|
|
|
|
return 0;
|
|
} else {
|
|
if ( !isEmpty() && m_suspend == false )
|
|
{
|
|
rc = m_assignments.getFirst();
|
|
m_assignments.removeFirst ();
|
|
++m_active;
|
|
|
|
debug ( 3, "Weaver::applyForWork: job assigned, "
|
|
"%i jobs in queue (%i active).\n",
|
|
m_assignments.count(), m_active );
|
|
unlock();
|
|
|
|
post (Event::ThreadBusy, th);
|
|
|
|
return rc;
|
|
} else {
|
|
unlock();
|
|
|
|
post (Event::ThreadSuspended, th);
|
|
m_jobAvailable.wait();
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
int Weaver::queueLength()
|
|
{
|
|
TQMutexLocker l (m_mutex);
|
|
return m_assignments.count();
|
|
}
|
|
|
|
bool Weaver::isIdle () const
|
|
{
|
|
TQMutexLocker l (m_mutex);
|
|
return isEmpty() && m_active == 0;
|
|
}
|
|
|
|
void Weaver::finish()
|
|
{
|
|
while ( !isIdle() )
|
|
{
|
|
debug (2, "Weaver::finish: not done, waiting.\n" );
|
|
m_jobFinished.wait();
|
|
}
|
|
debug (1, "Weaver::finish: done.\n\n\n" );
|
|
}
|
|
|
|
}
|
|
}
|
|
|
|
#include "weaver.moc"
|