diff options
author | brettw@chromium.org <brettw@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2012-01-19 04:11:57 +0000 |
---|---|---|
committer | brettw@chromium.org <brettw@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2012-01-19 04:11:57 +0000 |
commit | 3189013eebc55ea5493186b29f2cee2bc7c0dafc (patch) | |
tree | f9c867f92c5fd9325a7d7e6ea9fed638103c4f4b /base | |
parent | 1946e0c4351fbdfd5b012d3c2aeac2c9218ad027 (diff) | |
download | chromium_src-3189013eebc55ea5493186b29f2cee2bc7c0dafc.zip chromium_src-3189013eebc55ea5493186b29f2cee2bc7c0dafc.tar.gz chromium_src-3189013eebc55ea5493186b29f2cee2bc7c0dafc.tar.bz2 |
Hook up the SequencedWorkerPool to the browser thread.
[re-land of 116816 http://codereview.chromium.org/9065009]
This does some refactoring of the static data in the browser thread so we only have one global object instead of a bunch fo separate arrays.
It also hooks up the visited link master's I/O to use this new system as a proof of concept.
Review URL: https://chromiumcodereview.appspot.com/9124033
git-svn-id: svn://svn.chromium.org/chrome/trunk/src@118236 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'base')
-rw-r--r-- | base/threading/sequenced_worker_pool.cc | 79 | ||||
-rw-r--r-- | base/threading/sequenced_worker_pool.h | 16 |
2 files changed, 80 insertions, 15 deletions
diff --git a/base/threading/sequenced_worker_pool.cc b/base/threading/sequenced_worker_pool.cc index 05ccff1..6d47068 100644 --- a/base/threading/sequenced_worker_pool.cc +++ b/base/threading/sequenced_worker_pool.cc @@ -57,20 +57,33 @@ class SequencedWorkerPool::Inner Inner(size_t max_threads, const std::string& thread_name_prefix); virtual ~Inner(); - // Backends for SequenceWorkerPool. SequenceToken GetSequenceToken(); + SequenceToken GetNamedSequenceToken(const std::string& name); - bool PostTask(int sequence_token_id, + + // This function accepts a name and an ID. If the name is null, the + // token ID is used. This allows us to implement the optional name lookup + // from a single function without having to enter the lock a separate time. + bool PostTask(const std::string* optional_token_name, + int sequence_token_id, SequencedWorkerPool::WorkerShutdown shutdown_behavior, const tracked_objects::Location& from_here, const base::Closure& task); + + void Flush(); + void Shutdown(); + void SetTestingObserver(SequencedWorkerPool::TestingObserver* observer); // Runs the worker loop on the background thread. void ThreadLoop(Worker* this_worker); private: + // Called from within the lock, this converts the given token name into a + // token ID, creating a new one if necessary. + int LockedGetNamedTokenID(const std::string& name); + // The calling code should clear the given delete_these_oustide_lock // vector the next time the lock is released. See the implementation for // a more detailed description. @@ -235,18 +248,11 @@ SequencedWorkerPool::SequenceToken SequencedWorkerPool::Inner::GetNamedSequenceToken( const std::string& name) { base::AutoLock lock(lock_); - std::map<std::string, int>::const_iterator found = - named_sequence_tokens_.find(name); - if (found != named_sequence_tokens_.end()) - return SequenceToken(found->second); // Got an existing one. - - // Create a new one for this name. - SequenceToken result = GetSequenceToken(); - named_sequence_tokens_.insert(std::make_pair(name, result.id_)); - return result; + return SequenceToken(LockedGetNamedTokenID(name)); } bool SequencedWorkerPool::Inner::PostTask( + const std::string* optional_token_name, int sequence_token_id, SequencedWorkerPool::WorkerShutdown shutdown_behavior, const tracked_objects::Location& from_here, @@ -263,6 +269,10 @@ bool SequencedWorkerPool::Inner::PostTask( if (terminating_) return false; + // Now that we have the lock, apply the named token rules. + if (optional_token_name) + sequenced.sequence_token_id = LockedGetNamedTokenID(*optional_token_name); + pending_tasks_.push_back(sequenced); pending_task_count_++; if (shutdown_behavior == BLOCK_SHUTDOWN) @@ -281,6 +291,15 @@ bool SequencedWorkerPool::Inner::PostTask( return true; } +void SequencedWorkerPool::Inner::Flush() { + { + base::AutoLock lock(lock_); + while (pending_task_count_ > 0 || waiting_thread_count_ < threads_.size()) + cond_var_.Wait(); + } + cond_var_.Signal(); +} + void SequencedWorkerPool::Inner::Shutdown() { if (shutdown_called_) return; @@ -366,6 +385,8 @@ void SequencedWorkerPool::Inner::ThreadLoop(Worker* this_worker) { if (terminating_) break; waiting_thread_count_++; + cond_var_.Signal(); // For Flush() that may be waiting on the + // waiting thread count to go up. cond_var_.Wait(); waiting_thread_count_--; } @@ -377,6 +398,22 @@ void SequencedWorkerPool::Inner::ThreadLoop(Worker* this_worker) { cond_var_.Signal(); } +int SequencedWorkerPool::Inner::LockedGetNamedTokenID( + const std::string& name) { + lock_.AssertAcquired(); + DCHECK(!name.empty()); + + std::map<std::string, int>::const_iterator found = + named_sequence_tokens_.find(name); + if (found != named_sequence_tokens_.end()) + return found->second; // Got an existing one. + + // Create a new one for this name. + SequenceToken result = GetSequenceToken(); + named_sequence_tokens_.insert(std::make_pair(name, result.id_)); + return result.id_; +} + bool SequencedWorkerPool::Inner::GetWork( SequencedTask* task, std::vector<base::Closure>* delete_these_outside_lock) { @@ -593,33 +630,45 @@ SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetNamedSequenceToken( bool SequencedWorkerPool::PostWorkerTask( const tracked_objects::Location& from_here, const base::Closure& task) { - return inner_->PostTask(0, BLOCK_SHUTDOWN, from_here, task); + return inner_->PostTask(NULL, 0, BLOCK_SHUTDOWN, from_here, task); } bool SequencedWorkerPool::PostWorkerTaskWithShutdownBehavior( const tracked_objects::Location& from_here, const base::Closure& task, WorkerShutdown shutdown_behavior) { - return inner_->PostTask(0, shutdown_behavior, from_here, task); + return inner_->PostTask(NULL, 0, shutdown_behavior, from_here, task); } bool SequencedWorkerPool::PostSequencedWorkerTask( SequenceToken sequence_token, const tracked_objects::Location& from_here, const base::Closure& task) { - return inner_->PostTask(sequence_token.id_, BLOCK_SHUTDOWN, + return inner_->PostTask(NULL, sequence_token.id_, BLOCK_SHUTDOWN, from_here, task); } +bool SequencedWorkerPool::PostNamedSequencedWorkerTask( + const std::string& token_name, + const tracked_objects::Location& from_here, + const base::Closure& task) { + DCHECK(!token_name.empty()); + return inner_->PostTask(&token_name, 0, BLOCK_SHUTDOWN, from_here, task); +} + bool SequencedWorkerPool::PostSequencedWorkerTaskWithShutdownBehavior( SequenceToken sequence_token, const tracked_objects::Location& from_here, const base::Closure& task, WorkerShutdown shutdown_behavior) { - return inner_->PostTask(sequence_token.id_, shutdown_behavior, + return inner_->PostTask(NULL, sequence_token.id_, shutdown_behavior, from_here, task); } +void SequencedWorkerPool::FlushForTesting() { + inner_->Flush(); +} + void SequencedWorkerPool::Shutdown() { inner_->Shutdown(); } diff --git a/base/threading/sequenced_worker_pool.h b/base/threading/sequenced_worker_pool.h index c6e0560..f5770a7 100644 --- a/base/threading/sequenced_worker_pool.h +++ b/base/threading/sequenced_worker_pool.h @@ -172,6 +172,12 @@ class BASE_EXPORT SequencedWorkerPool { const tracked_objects::Location& from_here, const base::Closure& task); + // Like PostSequencedWorkerTask above, but allows you to specify a named + // token, which saves an extra call to GetNamedSequenceToken. + bool PostNamedSequencedWorkerTask(const std::string& token_name, + const tracked_objects::Location& from_here, + const base::Closure& task); + // Same as PostSequencedWorkerTask but allows specification of the shutdown // behavior. bool PostSequencedWorkerTaskWithShutdownBehavior( @@ -180,6 +186,16 @@ class BASE_EXPORT SequencedWorkerPool { const base::Closure& task, WorkerShutdown shutdown_behavior); + // Blocks until all pending tasks are complete. This should only be called in + // unit tests when you want to validate something that should have happened. + // + // Note that calling this will not prevent other threads from posting work to + // the queue while the calling thread is waiting on Flush(). In this case, + // Flush will return only when there's no more work in the queue. Normally, + // this doesn't come up sine in a test, all the work is being posted from + // the main thread. + void FlushForTesting(); + // Implements the worker pool shutdown. This should be called during app // shutdown, and will discard/join with appropriate tasks before returning. // After this call, subsequent calls to post tasks will fail. |