summaryrefslogtreecommitdiffstats
path: root/base
diff options
context:
space:
mode:
authorbrettw@chromium.org <brettw@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2012-01-19 04:11:57 +0000
committerbrettw@chromium.org <brettw@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2012-01-19 04:11:57 +0000
commit3189013eebc55ea5493186b29f2cee2bc7c0dafc (patch)
treef9c867f92c5fd9325a7d7e6ea9fed638103c4f4b /base
parent1946e0c4351fbdfd5b012d3c2aeac2c9218ad027 (diff)
downloadchromium_src-3189013eebc55ea5493186b29f2cee2bc7c0dafc.zip
chromium_src-3189013eebc55ea5493186b29f2cee2bc7c0dafc.tar.gz
chromium_src-3189013eebc55ea5493186b29f2cee2bc7c0dafc.tar.bz2
Hook up the SequencedWorkerPool to the browser thread.
[re-land of 116816 http://codereview.chromium.org/9065009] This does some refactoring of the static data in the browser thread so we only have one global object instead of a bunch fo separate arrays. It also hooks up the visited link master's I/O to use this new system as a proof of concept. Review URL: https://chromiumcodereview.appspot.com/9124033 git-svn-id: svn://svn.chromium.org/chrome/trunk/src@118236 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'base')
-rw-r--r--base/threading/sequenced_worker_pool.cc79
-rw-r--r--base/threading/sequenced_worker_pool.h16
2 files changed, 80 insertions, 15 deletions
diff --git a/base/threading/sequenced_worker_pool.cc b/base/threading/sequenced_worker_pool.cc
index 05ccff1..6d47068 100644
--- a/base/threading/sequenced_worker_pool.cc
+++ b/base/threading/sequenced_worker_pool.cc
@@ -57,20 +57,33 @@ class SequencedWorkerPool::Inner
Inner(size_t max_threads, const std::string& thread_name_prefix);
virtual ~Inner();
- // Backends for SequenceWorkerPool.
SequenceToken GetSequenceToken();
+
SequenceToken GetNamedSequenceToken(const std::string& name);
- bool PostTask(int sequence_token_id,
+
+ // This function accepts a name and an ID. If the name is null, the
+ // token ID is used. This allows us to implement the optional name lookup
+ // from a single function without having to enter the lock a separate time.
+ bool PostTask(const std::string* optional_token_name,
+ int sequence_token_id,
SequencedWorkerPool::WorkerShutdown shutdown_behavior,
const tracked_objects::Location& from_here,
const base::Closure& task);
+
+ void Flush();
+
void Shutdown();
+
void SetTestingObserver(SequencedWorkerPool::TestingObserver* observer);
// Runs the worker loop on the background thread.
void ThreadLoop(Worker* this_worker);
private:
+ // Called from within the lock, this converts the given token name into a
+ // token ID, creating a new one if necessary.
+ int LockedGetNamedTokenID(const std::string& name);
+
// The calling code should clear the given delete_these_oustide_lock
// vector the next time the lock is released. See the implementation for
// a more detailed description.
@@ -235,18 +248,11 @@ SequencedWorkerPool::SequenceToken
SequencedWorkerPool::Inner::GetNamedSequenceToken(
const std::string& name) {
base::AutoLock lock(lock_);
- std::map<std::string, int>::const_iterator found =
- named_sequence_tokens_.find(name);
- if (found != named_sequence_tokens_.end())
- return SequenceToken(found->second); // Got an existing one.
-
- // Create a new one for this name.
- SequenceToken result = GetSequenceToken();
- named_sequence_tokens_.insert(std::make_pair(name, result.id_));
- return result;
+ return SequenceToken(LockedGetNamedTokenID(name));
}
bool SequencedWorkerPool::Inner::PostTask(
+ const std::string* optional_token_name,
int sequence_token_id,
SequencedWorkerPool::WorkerShutdown shutdown_behavior,
const tracked_objects::Location& from_here,
@@ -263,6 +269,10 @@ bool SequencedWorkerPool::Inner::PostTask(
if (terminating_)
return false;
+ // Now that we have the lock, apply the named token rules.
+ if (optional_token_name)
+ sequenced.sequence_token_id = LockedGetNamedTokenID(*optional_token_name);
+
pending_tasks_.push_back(sequenced);
pending_task_count_++;
if (shutdown_behavior == BLOCK_SHUTDOWN)
@@ -281,6 +291,15 @@ bool SequencedWorkerPool::Inner::PostTask(
return true;
}
+void SequencedWorkerPool::Inner::Flush() {
+ {
+ base::AutoLock lock(lock_);
+ while (pending_task_count_ > 0 || waiting_thread_count_ < threads_.size())
+ cond_var_.Wait();
+ }
+ cond_var_.Signal();
+}
+
void SequencedWorkerPool::Inner::Shutdown() {
if (shutdown_called_)
return;
@@ -366,6 +385,8 @@ void SequencedWorkerPool::Inner::ThreadLoop(Worker* this_worker) {
if (terminating_)
break;
waiting_thread_count_++;
+ cond_var_.Signal(); // For Flush() that may be waiting on the
+ // waiting thread count to go up.
cond_var_.Wait();
waiting_thread_count_--;
}
@@ -377,6 +398,22 @@ void SequencedWorkerPool::Inner::ThreadLoop(Worker* this_worker) {
cond_var_.Signal();
}
+int SequencedWorkerPool::Inner::LockedGetNamedTokenID(
+ const std::string& name) {
+ lock_.AssertAcquired();
+ DCHECK(!name.empty());
+
+ std::map<std::string, int>::const_iterator found =
+ named_sequence_tokens_.find(name);
+ if (found != named_sequence_tokens_.end())
+ return found->second; // Got an existing one.
+
+ // Create a new one for this name.
+ SequenceToken result = GetSequenceToken();
+ named_sequence_tokens_.insert(std::make_pair(name, result.id_));
+ return result.id_;
+}
+
bool SequencedWorkerPool::Inner::GetWork(
SequencedTask* task,
std::vector<base::Closure>* delete_these_outside_lock) {
@@ -593,33 +630,45 @@ SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetNamedSequenceToken(
bool SequencedWorkerPool::PostWorkerTask(
const tracked_objects::Location& from_here,
const base::Closure& task) {
- return inner_->PostTask(0, BLOCK_SHUTDOWN, from_here, task);
+ return inner_->PostTask(NULL, 0, BLOCK_SHUTDOWN, from_here, task);
}
bool SequencedWorkerPool::PostWorkerTaskWithShutdownBehavior(
const tracked_objects::Location& from_here,
const base::Closure& task,
WorkerShutdown shutdown_behavior) {
- return inner_->PostTask(0, shutdown_behavior, from_here, task);
+ return inner_->PostTask(NULL, 0, shutdown_behavior, from_here, task);
}
bool SequencedWorkerPool::PostSequencedWorkerTask(
SequenceToken sequence_token,
const tracked_objects::Location& from_here,
const base::Closure& task) {
- return inner_->PostTask(sequence_token.id_, BLOCK_SHUTDOWN,
+ return inner_->PostTask(NULL, sequence_token.id_, BLOCK_SHUTDOWN,
from_here, task);
}
+bool SequencedWorkerPool::PostNamedSequencedWorkerTask(
+ const std::string& token_name,
+ const tracked_objects::Location& from_here,
+ const base::Closure& task) {
+ DCHECK(!token_name.empty());
+ return inner_->PostTask(&token_name, 0, BLOCK_SHUTDOWN, from_here, task);
+}
+
bool SequencedWorkerPool::PostSequencedWorkerTaskWithShutdownBehavior(
SequenceToken sequence_token,
const tracked_objects::Location& from_here,
const base::Closure& task,
WorkerShutdown shutdown_behavior) {
- return inner_->PostTask(sequence_token.id_, shutdown_behavior,
+ return inner_->PostTask(NULL, sequence_token.id_, shutdown_behavior,
from_here, task);
}
+void SequencedWorkerPool::FlushForTesting() {
+ inner_->Flush();
+}
+
void SequencedWorkerPool::Shutdown() {
inner_->Shutdown();
}
diff --git a/base/threading/sequenced_worker_pool.h b/base/threading/sequenced_worker_pool.h
index c6e0560..f5770a7 100644
--- a/base/threading/sequenced_worker_pool.h
+++ b/base/threading/sequenced_worker_pool.h
@@ -172,6 +172,12 @@ class BASE_EXPORT SequencedWorkerPool {
const tracked_objects::Location& from_here,
const base::Closure& task);
+ // Like PostSequencedWorkerTask above, but allows you to specify a named
+ // token, which saves an extra call to GetNamedSequenceToken.
+ bool PostNamedSequencedWorkerTask(const std::string& token_name,
+ const tracked_objects::Location& from_here,
+ const base::Closure& task);
+
// Same as PostSequencedWorkerTask but allows specification of the shutdown
// behavior.
bool PostSequencedWorkerTaskWithShutdownBehavior(
@@ -180,6 +186,16 @@ class BASE_EXPORT SequencedWorkerPool {
const base::Closure& task,
WorkerShutdown shutdown_behavior);
+ // Blocks until all pending tasks are complete. This should only be called in
+ // unit tests when you want to validate something that should have happened.
+ //
+ // Note that calling this will not prevent other threads from posting work to
+ // the queue while the calling thread is waiting on Flush(). In this case,
+ // Flush will return only when there's no more work in the queue. Normally,
+ // this doesn't come up sine in a test, all the work is being posted from
+ // the main thread.
+ void FlushForTesting();
+
// Implements the worker pool shutdown. This should be called during app
// shutdown, and will discard/join with appropriate tasks before returning.
// After this call, subsequent calls to post tasks will fail.