diff options
Diffstat (limited to 'base/threading/sequenced_worker_pool.cc')
-rw-r--r-- | base/threading/sequenced_worker_pool.cc | 79 |
1 files changed, 64 insertions, 15 deletions
diff --git a/base/threading/sequenced_worker_pool.cc b/base/threading/sequenced_worker_pool.cc index 05ccff1..6d47068 100644 --- a/base/threading/sequenced_worker_pool.cc +++ b/base/threading/sequenced_worker_pool.cc @@ -57,20 +57,33 @@ class SequencedWorkerPool::Inner Inner(size_t max_threads, const std::string& thread_name_prefix); virtual ~Inner(); - // Backends for SequenceWorkerPool. SequenceToken GetSequenceToken(); + SequenceToken GetNamedSequenceToken(const std::string& name); - bool PostTask(int sequence_token_id, + + // This function accepts a name and an ID. If the name is null, the + // token ID is used. This allows us to implement the optional name lookup + // from a single function without having to enter the lock a separate time. + bool PostTask(const std::string* optional_token_name, + int sequence_token_id, SequencedWorkerPool::WorkerShutdown shutdown_behavior, const tracked_objects::Location& from_here, const base::Closure& task); + + void Flush(); + void Shutdown(); + void SetTestingObserver(SequencedWorkerPool::TestingObserver* observer); // Runs the worker loop on the background thread. void ThreadLoop(Worker* this_worker); private: + // Called from within the lock, this converts the given token name into a + // token ID, creating a new one if necessary. + int LockedGetNamedTokenID(const std::string& name); + // The calling code should clear the given delete_these_oustide_lock // vector the next time the lock is released. See the implementation for // a more detailed description. @@ -235,18 +248,11 @@ SequencedWorkerPool::SequenceToken SequencedWorkerPool::Inner::GetNamedSequenceToken( const std::string& name) { base::AutoLock lock(lock_); - std::map<std::string, int>::const_iterator found = - named_sequence_tokens_.find(name); - if (found != named_sequence_tokens_.end()) - return SequenceToken(found->second); // Got an existing one. - - // Create a new one for this name. - SequenceToken result = GetSequenceToken(); - named_sequence_tokens_.insert(std::make_pair(name, result.id_)); - return result; + return SequenceToken(LockedGetNamedTokenID(name)); } bool SequencedWorkerPool::Inner::PostTask( + const std::string* optional_token_name, int sequence_token_id, SequencedWorkerPool::WorkerShutdown shutdown_behavior, const tracked_objects::Location& from_here, @@ -263,6 +269,10 @@ bool SequencedWorkerPool::Inner::PostTask( if (terminating_) return false; + // Now that we have the lock, apply the named token rules. + if (optional_token_name) + sequenced.sequence_token_id = LockedGetNamedTokenID(*optional_token_name); + pending_tasks_.push_back(sequenced); pending_task_count_++; if (shutdown_behavior == BLOCK_SHUTDOWN) @@ -281,6 +291,15 @@ bool SequencedWorkerPool::Inner::PostTask( return true; } +void SequencedWorkerPool::Inner::Flush() { + { + base::AutoLock lock(lock_); + while (pending_task_count_ > 0 || waiting_thread_count_ < threads_.size()) + cond_var_.Wait(); + } + cond_var_.Signal(); +} + void SequencedWorkerPool::Inner::Shutdown() { if (shutdown_called_) return; @@ -366,6 +385,8 @@ void SequencedWorkerPool::Inner::ThreadLoop(Worker* this_worker) { if (terminating_) break; waiting_thread_count_++; + cond_var_.Signal(); // For Flush() that may be waiting on the + // waiting thread count to go up. cond_var_.Wait(); waiting_thread_count_--; } @@ -377,6 +398,22 @@ void SequencedWorkerPool::Inner::ThreadLoop(Worker* this_worker) { cond_var_.Signal(); } +int SequencedWorkerPool::Inner::LockedGetNamedTokenID( + const std::string& name) { + lock_.AssertAcquired(); + DCHECK(!name.empty()); + + std::map<std::string, int>::const_iterator found = + named_sequence_tokens_.find(name); + if (found != named_sequence_tokens_.end()) + return found->second; // Got an existing one. + + // Create a new one for this name. + SequenceToken result = GetSequenceToken(); + named_sequence_tokens_.insert(std::make_pair(name, result.id_)); + return result.id_; +} + bool SequencedWorkerPool::Inner::GetWork( SequencedTask* task, std::vector<base::Closure>* delete_these_outside_lock) { @@ -593,33 +630,45 @@ SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetNamedSequenceToken( bool SequencedWorkerPool::PostWorkerTask( const tracked_objects::Location& from_here, const base::Closure& task) { - return inner_->PostTask(0, BLOCK_SHUTDOWN, from_here, task); + return inner_->PostTask(NULL, 0, BLOCK_SHUTDOWN, from_here, task); } bool SequencedWorkerPool::PostWorkerTaskWithShutdownBehavior( const tracked_objects::Location& from_here, const base::Closure& task, WorkerShutdown shutdown_behavior) { - return inner_->PostTask(0, shutdown_behavior, from_here, task); + return inner_->PostTask(NULL, 0, shutdown_behavior, from_here, task); } bool SequencedWorkerPool::PostSequencedWorkerTask( SequenceToken sequence_token, const tracked_objects::Location& from_here, const base::Closure& task) { - return inner_->PostTask(sequence_token.id_, BLOCK_SHUTDOWN, + return inner_->PostTask(NULL, sequence_token.id_, BLOCK_SHUTDOWN, from_here, task); } +bool SequencedWorkerPool::PostNamedSequencedWorkerTask( + const std::string& token_name, + const tracked_objects::Location& from_here, + const base::Closure& task) { + DCHECK(!token_name.empty()); + return inner_->PostTask(&token_name, 0, BLOCK_SHUTDOWN, from_here, task); +} + bool SequencedWorkerPool::PostSequencedWorkerTaskWithShutdownBehavior( SequenceToken sequence_token, const tracked_objects::Location& from_here, const base::Closure& task, WorkerShutdown shutdown_behavior) { - return inner_->PostTask(sequence_token.id_, shutdown_behavior, + return inner_->PostTask(NULL, sequence_token.id_, shutdown_behavior, from_here, task); } +void SequencedWorkerPool::FlushForTesting() { + inner_->Flush(); +} + void SequencedWorkerPool::Shutdown() { inner_->Shutdown(); } |