// Author: Max Howell (C) Copyright 2004 // (c) 2005 Jeff Mitchell // See COPYING file that comes with this distribution // // the asserts we use in this module prevent crashes, so best to abort the application if they fail #define TQT_FATAL_ASSERT #define DEBUG_PREFIX "ThreadManager" #include #include #include //we're emulating features of TQt 4, so this can be removed for Amarok 2.0 #include "debug.h" #include "statusbar.h" #include "threadmanager.h" #include "collectiondb.h" #include "amarokconfig.h" using Amarok::StatusBar; volatile uint ThreadManager::threadIdCounter = 1; //main thread grabs zero TQMutex* ThreadManager::threadIdMutex = new TQMutex(); ThreadManager::ThreadManager() { startTimer( 5 * 60 * 1000 ); // prunes the thread pool every 5 minutes } ThreadManager::~ThreadManager() { DEBUG_BLOCK for( ThreadList::Iterator it = m_threads.begin(), end = m_threads.end(); it != end; ++it ) { #ifdef HAVE_INOTIFY // we don't delete the thread's job as amarok is gone // and the Job dtor may expect amarok to be there etc. if ( (*it)->job() && (*it)->job()->name() == TQCString( "INotify" ) ) { debug() << "Forcibly terminating INotify thread...\n"; (*it)->terminate(); continue; } #endif if( (*it)->job() && (*it)->job()->name() ) debug() << "Waiting on thread " << (*it)->job()->name() << "...\n"; else debug() << "Waiting on thread...\n"; (*it)->wait(); } } uint ThreadManager::jobCount( const TQCString &name ) { uint count = 0; for( JobList::Iterator it = m_jobs.begin(), end = m_jobs.end(); it != end; ++it ) if ( name == (*it)->name() ) count++; return count; } int ThreadManager::queueJob( Job *job ) { SHOULD_BE_GUI if (!job) return -1; // this list contains all pending and running jobs m_jobs += job; const uint count = jobCount( job->name() ); if ( count == 1 ) gimmeThread()->runJob( job ); return count; } int ThreadManager::queueJobs( const JobList &jobs ) { SHOULD_BE_GUI if ( jobs.isEmpty() ) return -1; m_jobs += jobs; const TQCString name = jobs.front()->name(); const uint count = jobCount( name ); if ( count == jobs.count() ) gimmeThread()->runJob( jobs.front() ); return count; } void ThreadManager::onlyOneJob( Job *job ) { SHOULD_BE_GUI const TQCString name = job->name(); // first cause all current jobs with this name to be aborted abortAllJobsNamed( name ); // now queue this job. // if there is a running Job of its type this one will be // started when that one returns to the GUI thread. m_jobs += job; // if there weren't any jobs of this type running, we must // start this job. if ( jobCount( name ) == 1 ) gimmeThread()->runJob( job ); } int ThreadManager::abortAllJobsNamed( const TQCString &name ) { SHOULD_BE_GUI int count = 0; for( JobList::Iterator it = m_jobs.begin(), end = m_jobs.end(); it != end; ++it ) if ( name == (*it)->name() ) { count++; (*it)->abort(); } return count; } ThreadManager::Thread* ThreadManager::gimmeThread() { for( ThreadList::ConstIterator it = m_threads.begin(), end = m_threads.end(); it != end; ++it ) if ( !(*it)->running() && (*it)->job() == 0 ) return *it; Thread *thread = new Thread; m_threads += thread; return thread; } bool ThreadManager::event( TQEvent *e ) { switch( e->type() ) { case JobEvent: { Job *job = static_cast( e ); DebugStream d = debug() << "Job "; const TQCString name = job->name(); Thread *thread = job->m_thread; TQApplication::postEvent( ThreadManager::instance(), new TQCustomEvent( ThreadManager::RestoreOverrideCursorEvent ) ); if ( !job->isAborted() ) { d << "completed"; job->completeJob(); } else d << "aborted"; m_jobs.remove( job ); d << ": " << name; d << ". Jobs pending: " << jobCount( name ); d << endl; for( JobList::ConstIterator it = m_jobs.begin(), end = m_jobs.end(); it != end; ++it ) if ( name == (*it)->name() ) { thread->runJob( (*it) ); return true; } // this thread is done thread->m_job = 0; break; } case TQEvent::Timer: debug() << "Threads in pool: " << m_threads.count() << endl; // for( ThreadList::Iterator it = m_threads.begin(), end = m_threads.end(); it != end; ++it ) // if ( (*it)->readyForTrash() ) { // m_threads.remove( it ); // delete *it; // break; // only delete 1 thread every 5 minutes // } break; case OverrideCursorEvent: // we have to do this for the PlaylistLoader case, as TQt uses the same // function for drag and drop operations. if (tqApp->type() != TQApplication::Tty) TQApplication::setOverrideCursor( KCursor::workingCursor() ); break; case RestoreOverrideCursorEvent: // we have to do this for the PlaylistLoader case, as TQt uses the same // function for drag and drop operations. if (tqApp->type() != TQApplication::Tty) TQApplication::restoreOverrideCursor(); break; default: return false; } return true; } //Taken from TQt 4 src/corelib/thread/qthread_unix.cpp static pthread_once_t current_thread_key_once = PTHREAD_ONCE_INIT; static pthread_key_t current_thread_key; static void create_current_thread_key() { debug() << "Creating pthread key, exit value is " << pthread_key_create(¤t_thread_key, NULL) << endl; } /// @class ThreadManager::Thread ThreadManager::Thread::Thread() : TQThread() {} ThreadManager::Thread::~Thread() { Q_ASSERT( finished() ); } TQThread* ThreadManager::Thread::getRunning() { pthread_once( ¤t_thread_key_once, create_current_thread_key ); return reinterpret_cast( pthread_getspecific( current_thread_key ) ); } TQString ThreadManager::Thread::threadId() { if (!getRunning()) return "None"; else { TQString s; return s.sprintf( "%p", getRunning() ); } } void ThreadManager::Thread::runJob( Job *job ) { job->m_thread = this; job->m_parentThreadId = m_threadId; if ( job->isAborted() ) TQApplication::postEvent( ThreadManager::instance(), job ); else { m_job = job; start( Thread::IdlePriority ); //will wait() first if necessary TQApplication::postEvent( ThreadManager::instance(), new TQCustomEvent( ThreadManager::OverrideCursorEvent ) ); } } void ThreadManager::Thread::run() { // BE THREAD-SAFE! DEBUG_BLOCK //keep this first, before anything that uses the database, or STQLite may error out if ( AmarokConfig::databaseEngine().toInt() == DbConnection::sqlite ) CollectionDB::instance()->releasePreviousConnection( this ); //register this thread so that it can be returned in a static getRunning() function m_threadId = ThreadManager::getNewThreadId(); pthread_once( ¤t_thread_key_once, create_current_thread_key ); pthread_setspecific( current_thread_key, this ); if( m_job ) { m_job->m_aborted |= !m_job->doJob(); TQApplication::postEvent( ThreadManager::instance(), m_job ); } // almost always the thread doesn't finish until after the // above event is already finished processing } /// @class ProgressEvent /// @short Used by ThreadManager::Job internally class ProgressEvent : public TQCustomEvent { public: ProgressEvent( int progress ) : TQCustomEvent( 30303 ) , progress( progress ) {} const int progress; }; /// @class ThreadManager::Job ThreadManager::Job::Job( const char *name ) : TQCustomEvent( ThreadManager::JobEvent ) , m_name( name ) , m_thread( 0 ) , m_percentDone( 0 ) , m_progressDone( 0 ) , m_totalSteps( 1 ) // no divide by zero {} ThreadManager::Job::~Job() { if( m_thread->running() && m_thread->job() == this ) warning() << "Deleting a job before its thread has finished with it!\n"; } void ThreadManager::Job::setProgressTotalSteps( uint steps ) { if ( steps == 0 ) { warning() << k_funcinfo << "You can't set steps to 0!\n"; steps = 1; } m_totalSteps = steps; TQApplication::postEvent( this, new ProgressEvent( -1 ) ); } void ThreadManager::Job::setProgress( uint steps ) { m_progressDone = steps; uint newPercent = uint( (100 * steps) / m_totalSteps); if ( newPercent != m_percentDone ) { m_percentDone = newPercent; TQApplication::postEvent( this, new ProgressEvent( newPercent ) ); } } void ThreadManager::Job::settqStatus( const TQString &status ) { m_status = status; TQApplication::postEvent( this, new ProgressEvent( -2 ) ); } void ThreadManager::Job::incrementProgress() { setProgress( m_progressDone + 1 ); } void ThreadManager::Job::customEvent( TQCustomEvent *e ) { int progress = static_cast(e)->progress; switch( progress ) { case -2: StatusBar::instance()->setProgresstqStatus( this, m_status ); break; case -1: StatusBar::instance()->newProgressOperation( this ) .setDescription( m_description ) .setAbortSlot( this, TQT_SLOT(abort()) ) .setTotalSteps( 100 ); break; default: StatusBar::instance()->setProgress( this, progress ); } } ThreadManager::DependentJob::DependentJob( TQObject *dependent, const char *name ) : Job( name ) , m_dependent( dependent ) { connect( dependent, TQT_SIGNAL(destroyed()), TQT_SLOT(abort()) ); TQApplication::postEvent( dependent, new TQCustomEvent( JobStartedEvent ) ); } void ThreadManager::DependentJob::completeJob() { //synchronous, so we don't get deleted twice TQApplication::sendEvent( m_dependent, this ); } #include "threadmanager.moc" #undef TQT_FATAL_ASSERT //enable-final