author      brettw@chromium.org <brettw@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>   2011-01-01 23:16:20 +0000
committer   brettw@chromium.org <brettw@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>   2011-01-01 23:16:20 +0000
commit      bc581a6829fe49e43f4869075781d6dc94843f09 (patch)
tree        a94363488dadff28fe2c03f3a169b6ad2eeb02e8 /base/synchronization
parent      10f33b1bd6c6adb6306759a45bf3a5c18221d878 (diff)
download    chromium_src-bc581a6829fe49e43f4869075781d6dc94843f09.zip
            chromium_src-bc581a6829fe49e43f4869075781d6dc94843f09.tar.gz
            chromium_src-bc581a6829fe49e43f4869075781d6dc94843f09.tar.bz2
Move base/lock and base/condition_variable to base/synchronization/
I kept a base/lock.h in place with a using statement to avoid updating
all callers in one CL.
TEST=it compiles
BUG=none
Review URL: http://codereview.chromium.org/6018013
git-svn-id: svn://svn.chromium.org/chrome/trunk/src@70363 0039d316-1c4b-4281-b951-d872f2087c98
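The base/lock.h forwarding header mentioned in the message above lives outside base/synchronization/ and therefore does not appear in the diffstat below. The following is only a sketch of the kind of compatibility shim the message describes; the exact set of using declarations kept in place is an assumption.

// base/lock.h: compatibility shim (illustrative sketch, not part of this diff).
#ifndef BASE_LOCK_H_
#define BASE_LOCK_H_
#pragma once

#include "base/synchronization/lock.h"

// Keep the old unqualified spellings compiling until callers are migrated to
// the base namespace in follow-up CLs.
using base::AutoLock;
using base::AutoUnlock;
using base::Lock;

#endif  // BASE_LOCK_H_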
Diffstat (limited to 'base/synchronization')
-rw-r--r--   base/synchronization/condition_variable.h            189
-rw-r--r--   base/synchronization/condition_variable_posix.cc      77
-rw-r--r--   base/synchronization/condition_variable_unittest.cc  750
-rw-r--r--   base/synchronization/condition_variable_win.cc       447
-rw-r--r--   base/synchronization/lock.cc                           41
-rw-r--r--   base/synchronization/lock.h                           131
-rw-r--r--   base/synchronization/lock_impl.h                       63
-rw-r--r--   base/synchronization/lock_impl_posix.cc                54
-rw-r--r--   base/synchronization/lock_impl_win.cc                  36
-rw-r--r--   base/synchronization/lock_unittest.cc                 214
10 files changed, 2002 insertions, 0 deletions
diff --git a/base/synchronization/condition_variable.h b/base/synchronization/condition_variable.h new file mode 100644 index 0000000..3acd0ac --- /dev/null +++ b/base/synchronization/condition_variable.h @@ -0,0 +1,189 @@ +// Copyright (c) 2011 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +// ConditionVariable wraps pthreads condition variable synchronization or, on +// Windows, simulates it. This functionality is very helpful for having +// several threads wait for an event, as is common with a thread pool managed +// by a master. The meaning of such an event in the (worker) thread pool +// scenario is that additional tasks are now available for processing. It is +// used in Chrome in the DNS prefetching system to notify worker threads that +// a queue now has items (tasks) which need to be tended to. A related use +// would have a pool manager waiting on a ConditionVariable, waiting for a +// thread in the pool to announce (signal) that there is now more room in a +// (bounded size) communications queue for the manager to deposit tasks, or, +// as a second example, that the queue of tasks is completely empty and all +// workers are waiting. +// +// USAGE NOTE 1: spurious signal events are possible with this and +// most implementations of condition variables. As a result, be +// *sure* to retest your condition before proceeding. The following +// is a good example of doing this correctly: +// +// while (!work_to_be_done()) Wait(...); +// +// In contrast do NOT do the following: +// +// if (!work_to_be_done()) Wait(...); // Don't do this. +// +// Especially avoid the above if you are relying on some other thread only +// issuing a signal up *if* there is work-to-do. There can/will +// be spurious signals. Recheck state on waiting thread before +// assuming the signal was intentional. Caveat caller ;-). +// +// USAGE NOTE 2: Broadcast() frees up all waiting threads at once, +// which leads to contention for the locks they all held when they +// called Wait(). This results in POOR performance. A much better +// approach to getting a lot of threads out of Wait() is to have each +// thread (upon exiting Wait()) call Signal() to free up another +// Wait'ing thread. Look at condition_variable_unittest.cc for +// both examples. +// +// Broadcast() can be used nicely during teardown, as it gets the job +// done, and leaves no sleeping threads... and performance is less +// critical at that point. +// +// The semantics of Broadcast() are carefully crafted so that *all* +// threads that were waiting when the request was made will indeed +// get signaled. Some implementations mess up, and don't signal them +// all, while others allow the wait to be effectively turned off (for +// a while while waiting threads come around). This implementation +// appears correct, as it will not "lose" any signals, and will guarantee +// that all threads get signaled by Broadcast(). +// +// This implementation offers support for "performance" in its selection of +// which thread to revive. Performance, in direct contrast with "fairness," +// assures that the thread that most recently began to Wait() is selected by +// Signal to revive. Fairness would (if publicly supported) assure that the +// thread that has Wait()ed the longest is selected. The default policy +// may improve performance, as the selected thread may have a greater chance of +// having some of its stack data in various CPU caches. 
+// +// For a discussion of the many very subtle implementation details, see the FAQ +// at the end of condition_variable_win.cc. + +#ifndef BASE_SYNCHRONIZATION_CONDITION_VARIABLE_H_ +#define BASE_SYNCHRONIZATION_CONDITION_VARIABLE_H_ +#pragma once + +#include "build/build_config.h" + +#if defined(OS_WIN) +#include <windows.h> +#elif defined(OS_POSIX) +#include <pthread.h> +#endif + +#include "base/basictypes.h" +#include "base/lock.h" + +namespace base { + +class TimeDelta; + +class ConditionVariable { + public: + // Construct a cv for use with ONLY one user lock. + explicit ConditionVariable(Lock* user_lock); + + ~ConditionVariable(); + + // Wait() releases the caller's critical section atomically as it starts to + // sleep, and the reacquires it when it is signaled. + void Wait(); + void TimedWait(const TimeDelta& max_time); + + // Broadcast() revives all waiting threads. + void Broadcast(); + // Signal() revives one waiting thread. + void Signal(); + + private: + +#if defined(OS_WIN) + + // Define Event class that is used to form circularly linked lists. + // The list container is an element with NULL as its handle_ value. + // The actual list elements have a non-zero handle_ value. + // All calls to methods MUST be done under protection of a lock so that links + // can be validated. Without the lock, some links might asynchronously + // change, and the assertions would fail (as would list change operations). + class Event { + public: + // Default constructor with no arguments creates a list container. + Event(); + ~Event(); + + // InitListElement transitions an instance from a container, to an element. + void InitListElement(); + + // Methods for use on lists. + bool IsEmpty() const; + void PushBack(Event* other); + Event* PopFront(); + Event* PopBack(); + + // Methods for use on list elements. + // Accessor method. + HANDLE handle() const; + // Pull an element from a list (if it's in one). + Event* Extract(); + + // Method for use on a list element or on a list. + bool IsSingleton() const; + + private: + // Provide pre/post conditions to validate correct manipulations. + bool ValidateAsDistinct(Event* other) const; + bool ValidateAsItem() const; + bool ValidateAsList() const; + bool ValidateLinks() const; + + HANDLE handle_; + Event* next_; + Event* prev_; + DISALLOW_COPY_AND_ASSIGN(Event); + }; + + // Note that RUNNING is an unlikely number to have in RAM by accident. + // This helps with defensive destructor coding in the face of user error. + enum RunState { SHUTDOWN = 0, RUNNING = 64213 }; + + // Internal implementation methods supporting Wait(). + Event* GetEventForWaiting(); + void RecycleEvent(Event* used_event); + + RunState run_state_; + + // Private critical section for access to member data. + Lock internal_lock_; + + // Lock that is acquired before calling Wait(). + Lock& user_lock_; + + // Events that threads are blocked on. + Event waiting_list_; + + // Free list for old events. + Event recycling_list_; + int recycling_list_size_; + + // The number of allocated, but not yet deleted events. + int allocation_counter_; + +#elif defined(OS_POSIX) + + pthread_cond_t condition_; + pthread_mutex_t* user_mutex_; +#if !defined(NDEBUG) + Lock* user_lock_; // Needed to adjust shadow lock state on wait. 
+#endif + +#endif + + DISALLOW_COPY_AND_ASSIGN(ConditionVariable); +}; + +} // namespace base + +#endif // BASE_SYNCHRONIZATION_CONDITION_VARIABLE_H_ diff --git a/base/synchronization/condition_variable_posix.cc b/base/synchronization/condition_variable_posix.cc new file mode 100644 index 0000000..eff7053 --- /dev/null +++ b/base/synchronization/condition_variable_posix.cc @@ -0,0 +1,77 @@ +// Copyright (c) 2011 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "base/synchronization/condition_variable.h" + +#include <errno.h> +#include <sys/time.h> + +#include "base/logging.h" +#include "base/synchronization/lock.h" +#include "base/time.h" + +namespace base { + +ConditionVariable::ConditionVariable(Lock* user_lock) + : user_mutex_(user_lock->lock_.os_lock()) +#if !defined(NDEBUG) + , user_lock_(user_lock) +#endif +{ + int rv = pthread_cond_init(&condition_, NULL); + DCHECK(rv == 0); +} + +ConditionVariable::~ConditionVariable() { + int rv = pthread_cond_destroy(&condition_); + DCHECK(rv == 0); +} + +void ConditionVariable::Wait() { +#if !defined(NDEBUG) + user_lock_->CheckHeldAndUnmark(); +#endif + int rv = pthread_cond_wait(&condition_, user_mutex_); + DCHECK(rv == 0); +#if !defined(NDEBUG) + user_lock_->CheckUnheldAndMark(); +#endif +} + +void ConditionVariable::TimedWait(const TimeDelta& max_time) { + int64 usecs = max_time.InMicroseconds(); + + // The timeout argument to pthread_cond_timedwait is in absolute time. + struct timeval now; + gettimeofday(&now, NULL); + + struct timespec abstime; + abstime.tv_sec = now.tv_sec + (usecs / Time::kMicrosecondsPerSecond); + abstime.tv_nsec = (now.tv_usec + (usecs % Time::kMicrosecondsPerSecond)) * + Time::kNanosecondsPerMicrosecond; + abstime.tv_sec += abstime.tv_nsec / Time::kNanosecondsPerSecond; + abstime.tv_nsec %= Time::kNanosecondsPerSecond; + DCHECK(abstime.tv_sec >= now.tv_sec); // Overflow paranoia + +#if !defined(NDEBUG) + user_lock_->CheckHeldAndUnmark(); +#endif + int rv = pthread_cond_timedwait(&condition_, user_mutex_, &abstime); + DCHECK(rv == 0 || rv == ETIMEDOUT); +#if !defined(NDEBUG) + user_lock_->CheckUnheldAndMark(); +#endif +} + +void ConditionVariable::Broadcast() { + int rv = pthread_cond_broadcast(&condition_); + DCHECK(rv == 0); +} + +void ConditionVariable::Signal() { + int rv = pthread_cond_signal(&condition_); + DCHECK(rv == 0); +} + +} // namespace base diff --git a/base/synchronization/condition_variable_unittest.cc b/base/synchronization/condition_variable_unittest.cc new file mode 100644 index 0000000..8cfe4fe --- /dev/null +++ b/base/synchronization/condition_variable_unittest.cc @@ -0,0 +1,750 @@ +// Copyright (c) 2011 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +// Multi-threaded tests of ConditionVariable class. 
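Before the unit tests, here is a minimal usage sketch of the API declared in condition_variable.h above: a tiny task queue whose consumer re-tests its predicate in a while loop (USAGE NOTE 1) and chains Signal() calls instead of relying on Broadcast() (USAGE NOTE 2). The std::queue payload and function names are illustrative only and not part of this CL.

#include <queue>

#include "base/synchronization/condition_variable.h"
#include "base/synchronization/lock.h"

namespace {

base::Lock g_lock;                                  // Guards |g_tasks|.
base::ConditionVariable g_work_available(&g_lock);  // Signaled when |g_tasks| gains an item.
std::queue<int> g_tasks;

// Producer: enqueue one task and wake a single waiting worker.
void PostTask(int task) {
  base::AutoLock auto_lock(g_lock);
  g_tasks.push(task);
  g_work_available.Signal();
}

// Consumer: re-test the predicate in a while loop (USAGE NOTE 1), because
// spurious wake-ups are possible. Wait() releases |g_lock| while sleeping and
// reacquires it before returning.
int TakeTask() {
  base::AutoLock auto_lock(g_lock);
  while (g_tasks.empty())
    g_work_available.Wait();
  int task = g_tasks.front();
  g_tasks.pop();
  // USAGE NOTE 2: rather than Broadcast()ing and stampeding every waiter onto
  // |g_lock|, each revived worker wakes at most one more if work remains.
  if (!g_tasks.empty())
    g_work_available.Signal();
  return task;
}

}  // namespace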
+ +#include <time.h> +#include <algorithm> +#include <vector> + +#include "base/synchronization/condition_variable.h" +#include "base/lock.h" +#include "base/logging.h" +#include "base/scoped_ptr.h" +#include "base/spin_wait.h" +#include "base/threading/platform_thread.h" +#include "base/threading/thread_collision_warner.h" +#include "base/time.h" +#include "testing/gtest/include/gtest/gtest.h" +#include "testing/platform_test.h" + +namespace base { + +namespace { +//------------------------------------------------------------------------------ +// Define our test class, with several common variables. +//------------------------------------------------------------------------------ + +class ConditionVariableTest : public PlatformTest { + public: + const TimeDelta kZeroMs; + const TimeDelta kTenMs; + const TimeDelta kThirtyMs; + const TimeDelta kFortyFiveMs; + const TimeDelta kSixtyMs; + const TimeDelta kOneHundredMs; + + explicit ConditionVariableTest() + : kZeroMs(TimeDelta::FromMilliseconds(0)), + kTenMs(TimeDelta::FromMilliseconds(10)), + kThirtyMs(TimeDelta::FromMilliseconds(30)), + kFortyFiveMs(TimeDelta::FromMilliseconds(45)), + kSixtyMs(TimeDelta::FromMilliseconds(60)), + kOneHundredMs(TimeDelta::FromMilliseconds(100)) { + } +}; + +//------------------------------------------------------------------------------ +// Define a class that will control activities an several multi-threaded tests. +// The general structure of multi-threaded tests is that a test case will +// construct an instance of a WorkQueue. The WorkQueue will spin up some +// threads and control them throughout their lifetime, as well as maintaining +// a central repository of the work thread's activity. Finally, the WorkQueue +// will command the the worker threads to terminate. At that point, the test +// cases will validate that the WorkQueue has records showing that the desired +// activities were performed. +//------------------------------------------------------------------------------ + +// Callers are responsible for synchronizing access to the following class. +// The WorkQueue::lock_, as accessed via WorkQueue::lock(), should be used for +// all synchronized access. +class WorkQueue : public PlatformThread::Delegate { + public: + explicit WorkQueue(int thread_count); + ~WorkQueue(); + + // PlatformThread::Delegate interface. + void ThreadMain(); + + //---------------------------------------------------------------------------- + // Worker threads only call the following methods. + // They should use the lock to get exclusive access. + int GetThreadId(); // Get an ID assigned to a thread.. + bool EveryIdWasAllocated() const; // Indicates that all IDs were handed out. + TimeDelta GetAnAssignment(int thread_id); // Get a work task duration. + void WorkIsCompleted(int thread_id); + + int task_count() const; + bool allow_help_requests() const; // Workers can signal more workers. + bool shutdown() const; // Check if shutdown has been requested. + + void thread_shutting_down(); + + + //---------------------------------------------------------------------------- + // Worker threads can call them but not needed to acquire a lock. + Lock* lock(); + + ConditionVariable* work_is_available(); + ConditionVariable* all_threads_have_ids(); + ConditionVariable* no_more_tasks(); + + //---------------------------------------------------------------------------- + // The rest of the methods are for use by the controlling master thread (the + // test case code). 
+ void ResetHistory(); + int GetMinCompletionsByWorkerThread() const; + int GetMaxCompletionsByWorkerThread() const; + int GetNumThreadsTakingAssignments() const; + int GetNumThreadsCompletingTasks() const; + int GetNumberOfCompletedTasks() const; + TimeDelta GetWorkTime() const; + + void SetWorkTime(TimeDelta delay); + void SetTaskCount(int count); + void SetAllowHelp(bool allow); + + // The following must be called without locking, and will spin wait until the + // threads are all in a wait state. + void SpinUntilAllThreadsAreWaiting(); + void SpinUntilTaskCountLessThan(int task_count); + + // Caller must acquire lock before calling. + void SetShutdown(); + + // Compares the |shutdown_task_count_| to the |thread_count| and returns true + // if they are equal. This check will acquire the |lock_| so the caller + // should not hold the lock when calling this method. + bool ThreadSafeCheckShutdown(int thread_count); + + private: + // Both worker threads and controller use the following to synchronize. + Lock lock_; + ConditionVariable work_is_available_; // To tell threads there is work. + + // Conditions to notify the controlling process (if it is interested). + ConditionVariable all_threads_have_ids_; // All threads are running. + ConditionVariable no_more_tasks_; // Task count is zero. + + const int thread_count_; + int waiting_thread_count_; + scoped_array<PlatformThreadHandle> thread_handles_; + std::vector<int> assignment_history_; // Number of assignment per worker. + std::vector<int> completion_history_; // Number of completions per worker. + int thread_started_counter_; // Used to issue unique id to workers. + int shutdown_task_count_; // Number of tasks told to shutdown + int task_count_; // Number of assignment tasks waiting to be processed. + TimeDelta worker_delay_; // Time each task takes to complete. + bool allow_help_requests_; // Workers can signal more workers. + bool shutdown_; // Set when threads need to terminate. + + DFAKE_MUTEX(locked_methods_); +}; + +//------------------------------------------------------------------------------ +// The next section contains the actual tests. +//------------------------------------------------------------------------------ + +TEST_F(ConditionVariableTest, StartupShutdownTest) { + Lock lock; + + // First try trivial startup/shutdown. + { + ConditionVariable cv1(&lock); + } // Call for cv1 destruction. + + // Exercise with at least a few waits. + ConditionVariable cv(&lock); + + lock.Acquire(); + cv.TimedWait(kTenMs); // Wait for 10 ms. + cv.TimedWait(kTenMs); // Wait for 10 ms. + lock.Release(); + + lock.Acquire(); + cv.TimedWait(kTenMs); // Wait for 10 ms. + cv.TimedWait(kTenMs); // Wait for 10 ms. + cv.TimedWait(kTenMs); // Wait for 10 ms. + lock.Release(); +} // Call for cv destruction. + +TEST_F(ConditionVariableTest, TimeoutTest) { + Lock lock; + ConditionVariable cv(&lock); + lock.Acquire(); + + TimeTicks start = TimeTicks::Now(); + const TimeDelta WAIT_TIME = TimeDelta::FromMilliseconds(300); + // Allow for clocking rate granularity. + const TimeDelta FUDGE_TIME = TimeDelta::FromMilliseconds(50); + + cv.TimedWait(WAIT_TIME + FUDGE_TIME); + TimeDelta duration = TimeTicks::Now() - start; + // We can't use EXPECT_GE here as the TimeDelta class does not support the + // required stream conversion. + EXPECT_TRUE(duration >= WAIT_TIME); + + lock.Release(); +} + +// Test serial task servicing, as well as two parallel task servicing methods. 
+TEST_F(ConditionVariableTest, MultiThreadConsumerTest) { + const int kThreadCount = 10; + WorkQueue queue(kThreadCount); // Start the threads. + + const int kTaskCount = 10; // Number of tasks in each mini-test here. + + Time start_time; // Used to time task processing. + + { + AutoLock auto_lock(*queue.lock()); + while (!queue.EveryIdWasAllocated()) + queue.all_threads_have_ids()->Wait(); + } + + // If threads aren't in a wait state, they may start to gobble up tasks in + // parallel, short-circuiting (breaking) this test. + queue.SpinUntilAllThreadsAreWaiting(); + + { + // Since we have no tasks yet, all threads should be waiting by now. + AutoLock auto_lock(*queue.lock()); + EXPECT_EQ(0, queue.GetNumThreadsTakingAssignments()); + EXPECT_EQ(0, queue.GetNumThreadsCompletingTasks()); + EXPECT_EQ(0, queue.task_count()); + EXPECT_EQ(0, queue.GetMaxCompletionsByWorkerThread()); + EXPECT_EQ(0, queue.GetMinCompletionsByWorkerThread()); + EXPECT_EQ(0, queue.GetNumberOfCompletedTasks()); + + // Set up to make one worker do 30ms tasks sequentially. + queue.ResetHistory(); + queue.SetTaskCount(kTaskCount); + queue.SetWorkTime(kThirtyMs); + queue.SetAllowHelp(false); + + start_time = Time::Now(); + } + + queue.work_is_available()->Signal(); // Start up one thread. + // Wait till we at least start to handle tasks (and we're not all waiting). + queue.SpinUntilTaskCountLessThan(kTaskCount); + + { + // Wait until all 10 work tasks have at least been assigned. + AutoLock auto_lock(*queue.lock()); + while (queue.task_count()) + queue.no_more_tasks()->Wait(); + // The last of the tasks *might* still be running, but... all but one should + // be done by now, since tasks are being done serially. + EXPECT_LE(queue.GetWorkTime().InMilliseconds() * (kTaskCount - 1), + (Time::Now() - start_time).InMilliseconds()); + + EXPECT_EQ(1, queue.GetNumThreadsTakingAssignments()); + EXPECT_EQ(1, queue.GetNumThreadsCompletingTasks()); + EXPECT_LE(kTaskCount - 1, queue.GetMaxCompletionsByWorkerThread()); + EXPECT_EQ(0, queue.GetMinCompletionsByWorkerThread()); + EXPECT_LE(kTaskCount - 1, queue.GetNumberOfCompletedTasks()); + } + + // Wait to be sure all tasks are done. + queue.SpinUntilAllThreadsAreWaiting(); + + { + // Check that all work was done by one thread id. + AutoLock auto_lock(*queue.lock()); + EXPECT_EQ(1, queue.GetNumThreadsTakingAssignments()); + EXPECT_EQ(1, queue.GetNumThreadsCompletingTasks()); + EXPECT_EQ(0, queue.task_count()); + EXPECT_EQ(kTaskCount, queue.GetMaxCompletionsByWorkerThread()); + EXPECT_EQ(0, queue.GetMinCompletionsByWorkerThread()); + EXPECT_EQ(kTaskCount, queue.GetNumberOfCompletedTasks()); + + // Set up to make each task include getting help from another worker, so + // so that the work gets done in paralell. + queue.ResetHistory(); + queue.SetTaskCount(kTaskCount); + queue.SetWorkTime(kThirtyMs); + queue.SetAllowHelp(true); + + start_time = Time::Now(); + } + + queue.work_is_available()->Signal(); // But each worker can signal another. + // Wait till we at least start to handle tasks (and we're not all waiting). + queue.SpinUntilTaskCountLessThan(kTaskCount); + // Wait to allow the all workers to get done. + queue.SpinUntilAllThreadsAreWaiting(); + + { + // Wait until all work tasks have at least been assigned. + AutoLock auto_lock(*queue.lock()); + while (queue.task_count()) + queue.no_more_tasks()->Wait(); + + // To avoid racy assumptions, we'll just assert that at least 2 threads + // did work. 
We know that the first worker should have gone to sleep, and + // hence a second worker should have gotten an assignment. + EXPECT_LE(2, queue.GetNumThreadsTakingAssignments()); + EXPECT_EQ(kTaskCount, queue.GetNumberOfCompletedTasks()); + + // Try to ask all workers to help, and only a few will do the work. + queue.ResetHistory(); + queue.SetTaskCount(3); + queue.SetWorkTime(kThirtyMs); + queue.SetAllowHelp(false); + } + queue.work_is_available()->Broadcast(); // Make them all try. + // Wait till we at least start to handle tasks (and we're not all waiting). + queue.SpinUntilTaskCountLessThan(3); + // Wait to allow the 3 workers to get done. + queue.SpinUntilAllThreadsAreWaiting(); + + { + AutoLock auto_lock(*queue.lock()); + EXPECT_EQ(3, queue.GetNumThreadsTakingAssignments()); + EXPECT_EQ(3, queue.GetNumThreadsCompletingTasks()); + EXPECT_EQ(0, queue.task_count()); + EXPECT_EQ(1, queue.GetMaxCompletionsByWorkerThread()); + EXPECT_EQ(0, queue.GetMinCompletionsByWorkerThread()); + EXPECT_EQ(3, queue.GetNumberOfCompletedTasks()); + + // Set up to make each task get help from another worker. + queue.ResetHistory(); + queue.SetTaskCount(3); + queue.SetWorkTime(kThirtyMs); + queue.SetAllowHelp(true); // Allow (unnecessary) help requests. + } + queue.work_is_available()->Broadcast(); // Signal all threads. + // Wait till we at least start to handle tasks (and we're not all waiting). + queue.SpinUntilTaskCountLessThan(3); + // Wait to allow the 3 workers to get done. + queue.SpinUntilAllThreadsAreWaiting(); + + { + AutoLock auto_lock(*queue.lock()); + EXPECT_EQ(3, queue.GetNumThreadsTakingAssignments()); + EXPECT_EQ(3, queue.GetNumThreadsCompletingTasks()); + EXPECT_EQ(0, queue.task_count()); + EXPECT_EQ(1, queue.GetMaxCompletionsByWorkerThread()); + EXPECT_EQ(0, queue.GetMinCompletionsByWorkerThread()); + EXPECT_EQ(3, queue.GetNumberOfCompletedTasks()); + + // Set up to make each task get help from another worker. + queue.ResetHistory(); + queue.SetTaskCount(20); // 2 tasks per thread. + queue.SetWorkTime(kThirtyMs); + queue.SetAllowHelp(true); + } + queue.work_is_available()->Signal(); // But each worker can signal another. + // Wait till we at least start to handle tasks (and we're not all waiting). + queue.SpinUntilTaskCountLessThan(20); + // Wait to allow the 10 workers to get done. + queue.SpinUntilAllThreadsAreWaiting(); // Should take about 60 ms. + + { + AutoLock auto_lock(*queue.lock()); + EXPECT_EQ(10, queue.GetNumThreadsTakingAssignments()); + EXPECT_EQ(10, queue.GetNumThreadsCompletingTasks()); + EXPECT_EQ(0, queue.task_count()); + EXPECT_EQ(20, queue.GetNumberOfCompletedTasks()); + + // Same as last test, but with Broadcast(). + queue.ResetHistory(); + queue.SetTaskCount(20); // 2 tasks per thread. + queue.SetWorkTime(kThirtyMs); + queue.SetAllowHelp(true); + } + queue.work_is_available()->Broadcast(); + // Wait till we at least start to handle tasks (and we're not all waiting). + queue.SpinUntilTaskCountLessThan(20); + // Wait to allow the 10 workers to get done. + queue.SpinUntilAllThreadsAreWaiting(); // Should take about 60 ms. + + { + AutoLock auto_lock(*queue.lock()); + EXPECT_EQ(10, queue.GetNumThreadsTakingAssignments()); + EXPECT_EQ(10, queue.GetNumThreadsCompletingTasks()); + EXPECT_EQ(0, queue.task_count()); + EXPECT_EQ(20, queue.GetNumberOfCompletedTasks()); + + queue.SetShutdown(); + } + queue.work_is_available()->Broadcast(); // Force check for shutdown. 
+ + SPIN_FOR_TIMEDELTA_OR_UNTIL_TRUE(TimeDelta::FromMinutes(1), + queue.ThreadSafeCheckShutdown(kThreadCount)); +} + +TEST_F(ConditionVariableTest, LargeFastTaskTest) { + const int kThreadCount = 200; + WorkQueue queue(kThreadCount); // Start the threads. + + Lock private_lock; // Used locally for master to wait. + AutoLock private_held_lock(private_lock); + ConditionVariable private_cv(&private_lock); + + { + AutoLock auto_lock(*queue.lock()); + while (!queue.EveryIdWasAllocated()) + queue.all_threads_have_ids()->Wait(); + } + + // Wait a bit more to allow threads to reach their wait state. + queue.SpinUntilAllThreadsAreWaiting(); + + { + // Since we have no tasks, all threads should be waiting by now. + AutoLock auto_lock(*queue.lock()); + EXPECT_EQ(0, queue.GetNumThreadsTakingAssignments()); + EXPECT_EQ(0, queue.GetNumThreadsCompletingTasks()); + EXPECT_EQ(0, queue.task_count()); + EXPECT_EQ(0, queue.GetMaxCompletionsByWorkerThread()); + EXPECT_EQ(0, queue.GetMinCompletionsByWorkerThread()); + EXPECT_EQ(0, queue.GetNumberOfCompletedTasks()); + + // Set up to make all workers do (an average of) 20 tasks. + queue.ResetHistory(); + queue.SetTaskCount(20 * kThreadCount); + queue.SetWorkTime(kFortyFiveMs); + queue.SetAllowHelp(false); + } + queue.work_is_available()->Broadcast(); // Start up all threads. + // Wait until we've handed out all tasks. + { + AutoLock auto_lock(*queue.lock()); + while (queue.task_count() != 0) + queue.no_more_tasks()->Wait(); + } + + // Wait till the last of the tasks complete. + queue.SpinUntilAllThreadsAreWaiting(); + + { + // With Broadcast(), every thread should have participated. + // but with racing.. they may not all have done equal numbers of tasks. + AutoLock auto_lock(*queue.lock()); + EXPECT_EQ(kThreadCount, queue.GetNumThreadsTakingAssignments()); + EXPECT_EQ(kThreadCount, queue.GetNumThreadsCompletingTasks()); + EXPECT_EQ(0, queue.task_count()); + EXPECT_LE(20, queue.GetMaxCompletionsByWorkerThread()); + EXPECT_EQ(20 * kThreadCount, queue.GetNumberOfCompletedTasks()); + + // Set up to make all workers do (an average of) 4 tasks. + queue.ResetHistory(); + queue.SetTaskCount(kThreadCount * 4); + queue.SetWorkTime(kFortyFiveMs); + queue.SetAllowHelp(true); // Might outperform Broadcast(). + } + queue.work_is_available()->Signal(); // Start up one thread. + + // Wait until we've handed out all tasks + { + AutoLock auto_lock(*queue.lock()); + while (queue.task_count() != 0) + queue.no_more_tasks()->Wait(); + } + + // Wait till the last of the tasks complete. + queue.SpinUntilAllThreadsAreWaiting(); + + { + // With Signal(), every thread should have participated. + // but with racing.. they may not all have done four tasks. + AutoLock auto_lock(*queue.lock()); + EXPECT_EQ(kThreadCount, queue.GetNumThreadsTakingAssignments()); + EXPECT_EQ(kThreadCount, queue.GetNumThreadsCompletingTasks()); + EXPECT_EQ(0, queue.task_count()); + EXPECT_LE(4, queue.GetMaxCompletionsByWorkerThread()); + EXPECT_EQ(4 * kThreadCount, queue.GetNumberOfCompletedTasks()); + + queue.SetShutdown(); + } + queue.work_is_available()->Broadcast(); // Force check for shutdown. + + // Wait for shutdowns to complete. + SPIN_FOR_TIMEDELTA_OR_UNTIL_TRUE(TimeDelta::FromMinutes(1), + queue.ThreadSafeCheckShutdown(kThreadCount)); +} + +//------------------------------------------------------------------------------ +// Finally we provide the implementation for the methods in the WorkQueue class. 
+//------------------------------------------------------------------------------ + +WorkQueue::WorkQueue(int thread_count) + : lock_(), + work_is_available_(&lock_), + all_threads_have_ids_(&lock_), + no_more_tasks_(&lock_), + thread_count_(thread_count), + waiting_thread_count_(0), + thread_handles_(new PlatformThreadHandle[thread_count]), + assignment_history_(thread_count), + completion_history_(thread_count), + thread_started_counter_(0), + shutdown_task_count_(0), + task_count_(0), + allow_help_requests_(false), + shutdown_(false) { + EXPECT_GE(thread_count_, 1); + ResetHistory(); + SetTaskCount(0); + SetWorkTime(TimeDelta::FromMilliseconds(30)); + + for (int i = 0; i < thread_count_; ++i) { + PlatformThreadHandle pth; + EXPECT_TRUE(PlatformThread::Create(0, this, &pth)); + thread_handles_[i] = pth; + } +} + +WorkQueue::~WorkQueue() { + { + AutoLock auto_lock(lock_); + SetShutdown(); + } + work_is_available_.Broadcast(); // Tell them all to terminate. + + for (int i = 0; i < thread_count_; ++i) { + PlatformThread::Join(thread_handles_[i]); + } + EXPECT_EQ(0, waiting_thread_count_); +} + +int WorkQueue::GetThreadId() { + DFAKE_SCOPED_RECURSIVE_LOCK(locked_methods_); + DCHECK(!EveryIdWasAllocated()); + return thread_started_counter_++; // Give out Unique IDs. +} + +bool WorkQueue::EveryIdWasAllocated() const { + DFAKE_SCOPED_RECURSIVE_LOCK(locked_methods_); + return thread_count_ == thread_started_counter_; +} + +TimeDelta WorkQueue::GetAnAssignment(int thread_id) { + DFAKE_SCOPED_RECURSIVE_LOCK(locked_methods_); + DCHECK_LT(0, task_count_); + assignment_history_[thread_id]++; + if (0 == --task_count_) { + no_more_tasks_.Signal(); + } + return worker_delay_; +} + +void WorkQueue::WorkIsCompleted(int thread_id) { + DFAKE_SCOPED_RECURSIVE_LOCK(locked_methods_); + completion_history_[thread_id]++; +} + +int WorkQueue::task_count() const { + DFAKE_SCOPED_RECURSIVE_LOCK(locked_methods_); + return task_count_; +} + +bool WorkQueue::allow_help_requests() const { + DFAKE_SCOPED_RECURSIVE_LOCK(locked_methods_); + return allow_help_requests_; +} + +bool WorkQueue::shutdown() const { + lock_.AssertAcquired(); + DFAKE_SCOPED_RECURSIVE_LOCK(locked_methods_); + return shutdown_; +} + +// Because this method is called from the test's main thread we need to actually +// take the lock. Threads will call the thread_shutting_down() method with the +// lock already acquired. +bool WorkQueue::ThreadSafeCheckShutdown(int thread_count) { + bool all_shutdown; + AutoLock auto_lock(lock_); + { + // Declare in scope so DFAKE is guranteed to be destroyed before AutoLock. 
+ DFAKE_SCOPED_RECURSIVE_LOCK(locked_methods_); + all_shutdown = (shutdown_task_count_ == thread_count); + } + return all_shutdown; +} + +void WorkQueue::thread_shutting_down() { + lock_.AssertAcquired(); + DFAKE_SCOPED_RECURSIVE_LOCK(locked_methods_); + shutdown_task_count_++; +} + +Lock* WorkQueue::lock() { + return &lock_; +} + +ConditionVariable* WorkQueue::work_is_available() { + return &work_is_available_; +} + +ConditionVariable* WorkQueue::all_threads_have_ids() { + return &all_threads_have_ids_; +} + +ConditionVariable* WorkQueue::no_more_tasks() { + return &no_more_tasks_; +} + +void WorkQueue::ResetHistory() { + for (int i = 0; i < thread_count_; ++i) { + assignment_history_[i] = 0; + completion_history_[i] = 0; + } +} + +int WorkQueue::GetMinCompletionsByWorkerThread() const { + int minumum = completion_history_[0]; + for (int i = 0; i < thread_count_; ++i) + minumum = std::min(minumum, completion_history_[i]); + return minumum; +} + +int WorkQueue::GetMaxCompletionsByWorkerThread() const { + int maximum = completion_history_[0]; + for (int i = 0; i < thread_count_; ++i) + maximum = std::max(maximum, completion_history_[i]); + return maximum; +} + +int WorkQueue::GetNumThreadsTakingAssignments() const { + int count = 0; + for (int i = 0; i < thread_count_; ++i) + if (assignment_history_[i]) + count++; + return count; +} + +int WorkQueue::GetNumThreadsCompletingTasks() const { + int count = 0; + for (int i = 0; i < thread_count_; ++i) + if (completion_history_[i]) + count++; + return count; +} + +int WorkQueue::GetNumberOfCompletedTasks() const { + int total = 0; + for (int i = 0; i < thread_count_; ++i) + total += completion_history_[i]; + return total; +} + +TimeDelta WorkQueue::GetWorkTime() const { + return worker_delay_; +} + +void WorkQueue::SetWorkTime(TimeDelta delay) { + worker_delay_ = delay; +} + +void WorkQueue::SetTaskCount(int count) { + task_count_ = count; +} + +void WorkQueue::SetAllowHelp(bool allow) { + allow_help_requests_ = allow; +} + +void WorkQueue::SetShutdown() { + lock_.AssertAcquired(); + shutdown_ = true; +} + +void WorkQueue::SpinUntilAllThreadsAreWaiting() { + while (true) { + { + AutoLock auto_lock(lock_); + if (waiting_thread_count_ == thread_count_) + break; + } + PlatformThread::Sleep(30); + } +} + +void WorkQueue::SpinUntilTaskCountLessThan(int task_count) { + while (true) { + { + AutoLock auto_lock(lock_); + if (task_count_ < task_count) + break; + } + PlatformThread::Sleep(30); + } +} + + +//------------------------------------------------------------------------------ +// Define the standard worker task. Several tests will spin out many of these +// threads. +//------------------------------------------------------------------------------ + +// The multithread tests involve several threads with a task to perform as +// directed by an instance of the class WorkQueue. +// The task is to: +// a) Check to see if there are more tasks (there is a task counter). +// a1) Wait on condition variable if there are no tasks currently. +// b) Call a function to see what should be done. +// c) Do some computation based on the number of milliseconds returned in (b). +// d) go back to (a). + +// WorkQueue::ThreadMain() implements the above task for all threads. +// It calls the controlling object to tell the creator about progress, and to +// ask about tasks. 
+ +void WorkQueue::ThreadMain() { + int thread_id; + { + AutoLock auto_lock(lock_); + thread_id = GetThreadId(); + if (EveryIdWasAllocated()) + all_threads_have_ids()->Signal(); // Tell creator we're ready. + } + + Lock private_lock; // Used to waste time on "our work". + while (1) { // This is the main consumer loop. + TimeDelta work_time; + bool could_use_help; + { + AutoLock auto_lock(lock_); + while (0 == task_count() && !shutdown()) { + ++waiting_thread_count_; + work_is_available()->Wait(); + --waiting_thread_count_; + } + if (shutdown()) { + // Ack the notification of a shutdown message back to the controller. + thread_shutting_down(); + return; // Terminate. + } + // Get our task duration from the queue. + work_time = GetAnAssignment(thread_id); + could_use_help = (task_count() > 0) && allow_help_requests(); + } // Release lock + + // Do work (outside of locked region. + if (could_use_help) + work_is_available()->Signal(); // Get help from other threads. + + if (work_time > TimeDelta::FromMilliseconds(0)) { + // We could just sleep(), but we'll instead further exercise the + // condition variable class, and do a timed wait. + AutoLock auto_lock(private_lock); + ConditionVariable private_cv(&private_lock); + private_cv.TimedWait(work_time); // Unsynchronized waiting. + } + + { + AutoLock auto_lock(lock_); + // Send notification that we completed our "work." + WorkIsCompleted(thread_id); + } + } +} + +} // namespace + +} // namespace base diff --git a/base/synchronization/condition_variable_win.cc b/base/synchronization/condition_variable_win.cc new file mode 100644 index 0000000..3030178 --- /dev/null +++ b/base/synchronization/condition_variable_win.cc @@ -0,0 +1,447 @@ +// Copyright (c) 2011 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "base/synchronization/condition_variable.h" + +#include <stack> + +#include "base/logging.h" +#include "base/synchronization/lock.h" +#include "base/time.h" + +namespace base { + +ConditionVariable::ConditionVariable(Lock* user_lock) + : user_lock_(*user_lock), + run_state_(RUNNING), + allocation_counter_(0), + recycling_list_size_(0) { + DCHECK(user_lock); +} + +ConditionVariable::~ConditionVariable() { + AutoLock auto_lock(internal_lock_); + run_state_ = SHUTDOWN; // Prevent any more waiting. + + DCHECK_EQ(recycling_list_size_, allocation_counter_); + if (recycling_list_size_ != allocation_counter_) { // Rare shutdown problem. + // There are threads of execution still in this->TimedWait() and yet the + // caller has instigated the destruction of this instance :-/. + // A common reason for such "overly hasty" destruction is that the caller + // was not willing to wait for all the threads to terminate. Such hasty + // actions are a violation of our usage contract, but we'll give the + // waiting thread(s) one last chance to exit gracefully (prior to our + // destruction). + // Note: waiting_list_ *might* be empty, but recycling is still pending. + AutoUnlock auto_unlock(internal_lock_); + Broadcast(); // Make sure all waiting threads have been signaled. + Sleep(10); // Give threads a chance to grab internal_lock_. + // All contained threads should be blocked on user_lock_ by now :-). + } // Reacquire internal_lock_. + + DCHECK_EQ(recycling_list_size_, allocation_counter_); +} + +void ConditionVariable::Wait() { + // Default to "wait forever" timing, which means have to get a Signal() + // or Broadcast() to come out of this wait state. 
+ TimedWait(TimeDelta::FromMilliseconds(INFINITE)); +} + +void ConditionVariable::TimedWait(const TimeDelta& max_time) { + Event* waiting_event; + HANDLE handle; + { + AutoLock auto_lock(internal_lock_); + if (RUNNING != run_state_) return; // Destruction in progress. + waiting_event = GetEventForWaiting(); + handle = waiting_event->handle(); + DCHECK(handle); + } // Release internal_lock. + + { + AutoUnlock unlock(user_lock_); // Release caller's lock + WaitForSingleObject(handle, static_cast<DWORD>(max_time.InMilliseconds())); + // Minimize spurious signal creation window by recycling asap. + AutoLock auto_lock(internal_lock_); + RecycleEvent(waiting_event); + // Release internal_lock_ + } // Reacquire callers lock to depth at entry. +} + +// Broadcast() is guaranteed to signal all threads that were waiting (i.e., had +// a cv_event internally allocated for them) before Broadcast() was called. +void ConditionVariable::Broadcast() { + std::stack<HANDLE> handles; // See FAQ-question-10. + { + AutoLock auto_lock(internal_lock_); + if (waiting_list_.IsEmpty()) + return; + while (!waiting_list_.IsEmpty()) + // This is not a leak from waiting_list_. See FAQ-question 12. + handles.push(waiting_list_.PopBack()->handle()); + } // Release internal_lock_. + while (!handles.empty()) { + SetEvent(handles.top()); + handles.pop(); + } +} + +// Signal() will select one of the waiting threads, and signal it (signal its +// cv_event). For better performance we signal the thread that went to sleep +// most recently (LIFO). If we want fairness, then we wake the thread that has +// been sleeping the longest (FIFO). +void ConditionVariable::Signal() { + HANDLE handle; + { + AutoLock auto_lock(internal_lock_); + if (waiting_list_.IsEmpty()) + return; // No one to signal. + // Only performance option should be used. + // This is not a leak from waiting_list. See FAQ-question 12. + handle = waiting_list_.PopBack()->handle(); // LIFO. + } // Release internal_lock_. + SetEvent(handle); +} + +// GetEventForWaiting() provides a unique cv_event for any caller that needs to +// wait. This means that (worst case) we may over time create as many cv_event +// objects as there are threads simultaneously using this instance's Wait() +// functionality. +ConditionVariable::Event* ConditionVariable::GetEventForWaiting() { + // We hold internal_lock, courtesy of Wait(). + Event* cv_event; + if (0 == recycling_list_size_) { + DCHECK(recycling_list_.IsEmpty()); + cv_event = new Event(); + cv_event->InitListElement(); + allocation_counter_++; + CHECK(cv_event->handle()); + } else { + cv_event = recycling_list_.PopFront(); + recycling_list_size_--; + } + waiting_list_.PushBack(cv_event); + return cv_event; +} + +// RecycleEvent() takes a cv_event that was previously used for Wait()ing, and +// recycles it for use in future Wait() calls for this or other threads. +// Note that there is a tiny chance that the cv_event is still signaled when we +// obtain it, and that can cause spurious signals (if/when we re-use the +// cv_event), but such is quite rare (see FAQ-question-5). +void ConditionVariable::RecycleEvent(Event* used_event) { + // We hold internal_lock, courtesy of Wait(). + // If the cv_event timed out, then it is necessary to remove it from + // waiting_list_. If it was selected by Broadcast() or Signal(), then it is + // already gone. 
+ used_event->Extract(); // Possibly redundant + recycling_list_.PushBack(used_event); + recycling_list_size_++; +} +//------------------------------------------------------------------------------ +// The next section provides the implementation for the private Event class. +//------------------------------------------------------------------------------ + +// Event provides a doubly-linked-list of events for use exclusively by the +// ConditionVariable class. + +// This custom container was crafted because no simple combination of STL +// classes appeared to support the functionality required. The specific +// unusual requirement for a linked-list-class is support for the Extract() +// method, which can remove an element from a list, potentially for insertion +// into a second list. Most critically, the Extract() method is idempotent, +// turning the indicated element into an extracted singleton whether it was +// contained in a list or not. This functionality allows one (or more) of +// threads to do the extraction. The iterator that identifies this extractable +// element (in this case, a pointer to the list element) can be used after +// arbitrary manipulation of the (possibly) enclosing list container. In +// general, STL containers do not provide iterators that can be used across +// modifications (insertions/extractions) of the enclosing containers, and +// certainly don't provide iterators that can be used if the identified +// element is *deleted* (removed) from the container. + +// It is possible to use multiple redundant containers, such as an STL list, +// and an STL map, to achieve similar container semantics. This container has +// only O(1) methods, while the corresponding (multiple) STL container approach +// would have more complex O(log(N)) methods (yeah... N isn't that large). +// Multiple containers also makes correctness more difficult to assert, as +// data is redundantly stored and maintained, which is generally evil. + +ConditionVariable::Event::Event() : handle_(0) { + next_ = prev_ = this; // Self referencing circular. +} + +ConditionVariable::Event::~Event() { + if (0 == handle_) { + // This is the list holder + while (!IsEmpty()) { + Event* cv_event = PopFront(); + DCHECK(cv_event->ValidateAsItem()); + delete cv_event; + } + } + DCHECK(IsSingleton()); + if (0 != handle_) { + int ret_val = CloseHandle(handle_); + DCHECK(ret_val); + } +} + +// Change a container instance permanently into an element of a list. +void ConditionVariable::Event::InitListElement() { + DCHECK(!handle_); + handle_ = CreateEvent(NULL, false, false, NULL); + CHECK(handle_); +} + +// Methods for use on lists. +bool ConditionVariable::Event::IsEmpty() const { + DCHECK(ValidateAsList()); + return IsSingleton(); +} + +void ConditionVariable::Event::PushBack(Event* other) { + DCHECK(ValidateAsList()); + DCHECK(other->ValidateAsItem()); + DCHECK(other->IsSingleton()); + // Prepare other for insertion. + other->prev_ = prev_; + other->next_ = this; + // Cut into list. + prev_->next_ = other; + prev_ = other; + DCHECK(ValidateAsDistinct(other)); +} + +ConditionVariable::Event* ConditionVariable::Event::PopFront() { + DCHECK(ValidateAsList()); + DCHECK(!IsSingleton()); + return next_->Extract(); +} + +ConditionVariable::Event* ConditionVariable::Event::PopBack() { + DCHECK(ValidateAsList()); + DCHECK(!IsSingleton()); + return prev_->Extract(); +} + +// Methods for use on list elements. +// Accessor method. 
+HANDLE ConditionVariable::Event::handle() const { + DCHECK(ValidateAsItem()); + return handle_; +} + +// Pull an element from a list (if it's in one). +ConditionVariable::Event* ConditionVariable::Event::Extract() { + DCHECK(ValidateAsItem()); + if (!IsSingleton()) { + // Stitch neighbors together. + next_->prev_ = prev_; + prev_->next_ = next_; + // Make extractee into a singleton. + prev_ = next_ = this; + } + DCHECK(IsSingleton()); + return this; +} + +// Method for use on a list element or on a list. +bool ConditionVariable::Event::IsSingleton() const { + DCHECK(ValidateLinks()); + return next_ == this; +} + +// Provide pre/post conditions to validate correct manipulations. +bool ConditionVariable::Event::ValidateAsDistinct(Event* other) const { + return ValidateLinks() && other->ValidateLinks() && (this != other); +} + +bool ConditionVariable::Event::ValidateAsItem() const { + return (0 != handle_) && ValidateLinks(); +} + +bool ConditionVariable::Event::ValidateAsList() const { + return (0 == handle_) && ValidateLinks(); +} + +bool ConditionVariable::Event::ValidateLinks() const { + // Make sure both of our neighbors have links that point back to us. + // We don't do the O(n) check and traverse the whole loop, and instead only + // do a local check to (and returning from) our immediate neighbors. + return (next_->prev_ == this) && (prev_->next_ == this); +} + + +/* +FAQ On subtle implementation details: + +1) What makes this problem subtle? Please take a look at "Strategies +for Implementing POSIX Condition Variables on Win32" by Douglas +C. Schmidt and Irfan Pyarali. +http://www.cs.wustl.edu/~schmidt/win32-cv-1.html It includes +discussions of numerous flawed strategies for implementing this +functionality. I'm not convinced that even the final proposed +implementation has semantics that are as nice as this implementation +(especially with regard to Broadcast() and the impact on threads that +try to Wait() after a Broadcast() has been called, but before all the +original waiting threads have been signaled). + +2) Why can't you use a single wait_event for all threads that call +Wait()? See FAQ-question-1, or consider the following: If a single +event were used, then numerous threads calling Wait() could release +their cs locks, and be preempted just before calling +WaitForSingleObject(). If a call to Broadcast() was then presented on +a second thread, it would be impossible to actually signal all +waiting(?) threads. Some number of SetEvent() calls *could* be made, +but there could be no guarantee that those led to to more than one +signaled thread (SetEvent()'s may be discarded after the first!), and +there could be no guarantee that the SetEvent() calls didn't just +awaken "other" threads that hadn't even started waiting yet (oops). +Without any limit on the number of requisite SetEvent() calls, the +system would be forced to do many such calls, allowing many new waits +to receive spurious signals. + +3) How does this implementation cause spurious signal events? The +cause in this implementation involves a race between a signal via +time-out and a signal via Signal() or Broadcast(). The series of +actions leading to this are: + +a) Timer fires, and a waiting thread exits the line of code: + + WaitForSingleObject(waiting_event, max_time.InMilliseconds()); + +b) That thread (in (a)) is randomly pre-empted after the above line, +leaving the waiting_event reset (unsignaled) and still in the +waiting_list_. 
+ +c) A call to Signal() (or Broadcast()) on a second thread proceeds, and +selects the waiting cv_event (identified in step (b)) as the event to revive +via a call to SetEvent(). + +d) The Signal() method (step c) calls SetEvent() on waiting_event (step b). + +e) The waiting cv_event (step b) is now signaled, but no thread is +waiting on it. + +f) When that waiting_event (step b) is reused, it will immediately +be signaled (spuriously). + + +4) Why do you recycle events, and cause spurious signals? First off, +the spurious events are very rare. They can only (I think) appear +when the race described in FAQ-question-3 takes place. This should be +very rare. Most(?) uses will involve only timer expiration, or only +Signal/Broadcast() actions. When both are used, it will be rare that +the race will appear, and it would require MANY Wait() and signaling +activities. If this implementation did not recycle events, then it +would have to create and destroy events for every call to Wait(). +That allocation/deallocation and associated construction/destruction +would be costly (per wait), and would only be a rare benefit (when the +race was "lost" and a spurious signal took place). That would be bad +(IMO) optimization trade-off. Finally, such spurious events are +allowed by the specification of condition variables (such as +implemented in Vista), and hence it is better if any user accommodates +such spurious events (see usage note in condition_variable.h). + +5) Why don't you reset events when you are about to recycle them, or +about to reuse them, so that the spurious signals don't take place? +The thread described in FAQ-question-3 step c may be pre-empted for an +arbitrary length of time before proceeding to step d. As a result, +the wait_event may actually be re-used *before* step (e) is reached. +As a result, calling reset would not help significantly. + +6) How is it that the callers lock is released atomically with the +entry into a wait state? We commit to the wait activity when we +allocate the wait_event for use in a given call to Wait(). This +allocation takes place before the caller's lock is released (and +actually before our internal_lock_ is released). That allocation is +the defining moment when "the wait state has been entered," as that +thread *can* now be signaled by a call to Broadcast() or Signal(). +Hence we actually "commit to wait" before releasing the lock, making +the pair effectively atomic. + +8) Why do you need to lock your data structures during waiting, as the +caller is already in possession of a lock? We need to Acquire() and +Release() our internal lock during Signal() and Broadcast(). If we tried +to use a callers lock for this purpose, we might conflict with their +external use of the lock. For example, the caller may use to consistently +hold a lock on one thread while calling Signal() on another, and that would +block Signal(). + +9) Couldn't a more efficient implementation be provided if you +preclude using more than one external lock in conjunction with a +single ConditionVariable instance? Yes, at least it could be viewed +as a simpler API (since you don't have to reiterate the lock argument +in each Wait() call). One of the constructors now takes a specific +lock as an argument, and a there are corresponding Wait() calls that +don't specify a lock now. It turns that the resulting implmentation +can't be made more efficient, as the internal lock needs to be used by +Signal() and Broadcast(), to access internal data structures. 
As a +result, I was not able to utilize the user supplied lock (which is +being used by the user elsewhere presumably) to protect the private +member access. + +9) Since you have a second lock, how can be be sure that there is no +possible deadlock scenario? Our internal_lock_ is always the last +lock acquired, and the first one released, and hence a deadlock (due +to critical section problems) is impossible as a consequence of our +lock. + +10) When doing a Broadcast(), why did you copy all the events into +an STL queue, rather than making a linked-loop, and iterating over it? +The iterating during Broadcast() is done so outside the protection +of the internal lock. As a result, other threads, such as the thread +wherein a related event is waiting, could asynchronously manipulate +the links around a cv_event. As a result, the link structure cannot +be used outside a lock. Broadcast() could iterate over waiting +events by cycling in-and-out of the protection of the internal_lock, +but that appears more expensive than copying the list into an STL +stack. + +11) Why did the lock.h file need to be modified so much for this +change? Central to a Condition Variable is the atomic release of a +lock during a Wait(). This places Wait() functionality exactly +mid-way between the two classes, Lock and Condition Variable. Given +that there can be nested Acquire()'s of locks, and Wait() had to +Release() completely a held lock, it was necessary to augment the Lock +class with a recursion counter. Even more subtle is the fact that the +recursion counter (in a Lock) must be protected, as many threads can +access it asynchronously. As a positive fallout of this, there are +now some DCHECKS to be sure no one Release()s a Lock more than they +Acquire()ed it, and there is ifdef'ed functionality that can detect +nested locks (legal under windows, but not under Posix). + +12) Why is it that the cv_events removed from list in Broadcast() and Signal() +are not leaked? How are they recovered?? The cv_events that appear to leak are +taken from the waiting_list_. For each element in that list, there is currently +a thread in or around the WaitForSingleObject() call of Wait(), and those +threads have references to these otherwise leaked events. They are passed as +arguments to be recycled just aftre returning from WaitForSingleObject(). + +13) Why did you use a custom container class (the linked list), when STL has +perfectly good containers, such as an STL list? The STL list, as with any +container, does not guarantee the utility of an iterator across manipulation +(such as insertions and deletions) of the underlying container. The custom +double-linked-list container provided that assurance. I don't believe any +combination of STL containers provided the services that were needed at the same +O(1) efficiency as the custom linked list. The unusual requirement +for the container class is that a reference to an item within a container (an +iterator) needed to be maintained across an arbitrary manipulation of the +container. This requirement exposes itself in the Wait() method, where a +waiting_event must be selected prior to the WaitForSingleObject(), and then it +must be used as part of recycling to remove the related instance from the +waiting_list. A hash table (STL map) could be used, but I was embarrased to +use a complex and relatively low efficiency container when a doubly linked list +provided O(1) performance in all required operations. 
Since other operations +to provide performance-and/or-fairness required queue (FIFO) and list (LIFO) +containers, I would also have needed to use an STL list/queue as well as an STL +map. In the end I decided it would be "fun" to just do it right, and I +put so many assertions (DCHECKs) into the container class that it is trivial to +code review and validate its correctness. + +*/ + +} // namespace base diff --git a/base/synchronization/lock.cc b/base/synchronization/lock.cc new file mode 100644 index 0000000..6445ce8 --- /dev/null +++ b/base/synchronization/lock.cc @@ -0,0 +1,41 @@ +// Copyright (c) 2011 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +// This file is used for debugging assertion support. The Lock class +// is functionally a wrapper around the LockImpl class, so the only +// real intelligence in the class is in the debugging logic. + +#if !defined(NDEBUG) + +#include "base/synchronization/lock.h" +#include "base/logging.h" + +namespace base { + +Lock::Lock() : lock_() { + owned_by_thread_ = false; + owning_thread_id_ = static_cast<PlatformThreadId>(0); +} + +void Lock::AssertAcquired() const { + DCHECK(owned_by_thread_); + DCHECK_EQ(owning_thread_id_, PlatformThread::CurrentId()); +} + +void Lock::CheckHeldAndUnmark() { + DCHECK(owned_by_thread_); + DCHECK_EQ(owning_thread_id_, PlatformThread::CurrentId()); + owned_by_thread_ = false; + owning_thread_id_ = static_cast<PlatformThreadId>(0); +} + +void Lock::CheckUnheldAndMark() { + DCHECK(!owned_by_thread_); + owned_by_thread_ = true; + owning_thread_id_ = PlatformThread::CurrentId(); +} + +} // namespace base + +#endif // NDEBUG diff --git a/base/synchronization/lock.h b/base/synchronization/lock.h new file mode 100644 index 0000000..f7c9c49 --- /dev/null +++ b/base/synchronization/lock.h @@ -0,0 +1,131 @@ +// Copyright (c) 2011 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef BASE_SYNCHRONIZATION_LOCK_H_ +#define BASE_SYNCHRONIZATION_LOCK_H_ +#pragma once + +#include "base/synchronization/lock_impl.h" +#include "base/threading/platform_thread.h" + +namespace base { + +// A convenient wrapper for an OS specific critical section. The only real +// intelligence in this class is in debug mode for the support for the +// AssertAcquired() method. +class Lock { + public: +#if defined(NDEBUG) // Optimized wrapper implementation + Lock() : lock_() {} + ~Lock() {} + void Acquire() { lock_.Lock(); } + void Release() { lock_.Unlock(); } + + // If the lock is not held, take it and return true. If the lock is already + // held by another thread, immediately return false. This must not be called + // by a thread already holding the lock (what happens is undefined and an + // assertion may fail). + bool Try() { return lock_.Try(); } + + // Null implementation if not debug. + void AssertAcquired() const {} +#else + Lock(); + ~Lock() {} + + // NOTE: Although windows critical sections support recursive locks, we do not + // allow this, and we will commonly fire a DCHECK() if a thread attempts to + // acquire the lock a second time (while already holding it). 
+  void Acquire() {
+    lock_.Lock();
+    CheckUnheldAndMark();
+  }
+  void Release() {
+    CheckHeldAndUnmark();
+    lock_.Unlock();
+  }
+
+  bool Try() {
+    bool rv = lock_.Try();
+    if (rv) {
+      CheckUnheldAndMark();
+    }
+    return rv;
+  }
+
+  void AssertAcquired() const;
+#endif  // NDEBUG
+
+#if defined(OS_POSIX)
+  // The POSIX implementation of ConditionVariable needs to be able
+  // to see our lock and tweak our debugging counters, as it releases
+  // and acquires locks inside of pthread_cond_{timed,}wait.
+  // Windows doesn't need to do this as it calls the Lock::* methods.
+  friend class ConditionVariable;
+#endif
+
+ private:
+#if !defined(NDEBUG)
+  // Members and routines taking care of lock assertions.
+  // Note that these checks catch recursive acquisition, which the underlying
+  // implementation allows on Windows but not on POSIX; the checks are
+  // strictly redundant on POSIX, but it's worth it to share the code.
+  void CheckHeldAndUnmark();
+  void CheckUnheldAndMark();
+
+  // All private data is implicitly protected by lock_.
+  // Be VERY careful to only access members under that lock.
+
+  // Determines validity of owning_thread_id_.  Needed as we don't have
+  // a null owning_thread_id_ value.
+  bool owned_by_thread_;
+  base::PlatformThreadId owning_thread_id_;
+#endif  // NDEBUG
+
+  // Platform-specific underlying lock implementation.
+  internal::LockImpl lock_;
+
+  DISALLOW_COPY_AND_ASSIGN(Lock);
+};
+
+// A helper class that acquires the given Lock while the AutoLock is in scope.
+class AutoLock {
+ public:
+  explicit AutoLock(Lock& lock) : lock_(lock) {
+    lock_.Acquire();
+  }
+
+  ~AutoLock() {
+    lock_.AssertAcquired();
+    lock_.Release();
+  }
+
+ private:
+  Lock& lock_;
+  DISALLOW_COPY_AND_ASSIGN(AutoLock);
+};
+
+// AutoUnlock is a helper that will Release() the |lock| argument in the
+// constructor, and re-Acquire() it in the destructor.
+class AutoUnlock {
+ public:
+  explicit AutoUnlock(Lock& lock) : lock_(lock) {
+    // We require our caller to have the lock.
+    lock_.AssertAcquired();
+    lock_.Release();
+  }
+
+  ~AutoUnlock() {
+    lock_.Acquire();
+  }
+
+ private:
+  Lock& lock_;
+  DISALLOW_COPY_AND_ASSIGN(AutoUnlock);
+};
+
+}  // namespace base
+
+#endif  // BASE_SYNCHRONIZATION_LOCK_H_
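
[Editor's note -- the usage sketch below is an illustration added for context
and is not part of the commit.  It shows how the Lock and AutoLock classes in
lock.h above are typically used; SharedCounter, value_ and the method names
are hypothetical.

    #include "base/synchronization/lock.h"

    class SharedCounter {
     public:
      SharedCounter() : value_(0) {}

      void Increment() {
        base::AutoLock scoped(lock_);  // Acquire()s here, Release()s on scope exit.
        ++value_;
      }

      // Caller must already hold lock_, e.g. via an AutoLock in a wider scope.
      void IncrementLocked() {
        lock_.AssertAcquired();  // DCHECKs in debug builds if lock_ isn't held.
        ++value_;
      }

      int value() const {
        base::AutoLock scoped(lock_);
        return value_;
      }

     private:
      mutable base::Lock lock_;  // Guards value_.
      int value_;
    };

AutoUnlock inverts this pattern: given an already-held lock, it Release()s the
lock for the duration of a scope (for example, around a blocking call) and
re-Acquire()s it on exit.]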
diff --git a/base/synchronization/lock_impl.h b/base/synchronization/lock_impl.h new file mode 100644 index 0000000..2994610 --- /dev/null +++ b/base/synchronization/lock_impl.h @@ -0,0 +1,63 @@
+// Copyright (c) 2011 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef BASE_SYNCHRONIZATION_LOCK_IMPL_H_
+#define BASE_SYNCHRONIZATION_LOCK_IMPL_H_
+#pragma once
+
+#include "build/build_config.h"
+
+#if defined(OS_WIN)
+#include <windows.h>
+#elif defined(OS_POSIX)
+#include <pthread.h>
+#endif
+
+#include "base/basictypes.h"
+
+namespace base {
+namespace internal {
+
+// This class implements the underlying platform-specific lock mechanism
+// (a CRITICAL_SECTION on Windows, a pthread_mutex_t on POSIX) used for the
+// Lock class.  Most users should not use LockImpl directly, but should
+// instead use Lock.
+class LockImpl {
+ public:
+#if defined(OS_WIN)
+  typedef CRITICAL_SECTION OSLockType;
+#elif defined(OS_POSIX)
+  typedef pthread_mutex_t OSLockType;
+#endif
+
+  LockImpl();
+  ~LockImpl();
+
+  // If the lock is not held, take it and return true.  If the lock is already
+  // held by something else, immediately return false.
+  bool Try();
+
+  // Take the lock, blocking until it is available if necessary.
+  void Lock();
+
+  // Release the lock.  This must only be called by the lock's holder: after
+  // a successful call to Try, or a call to Lock.
+  void Unlock();
+
+  // Return the native underlying lock.  Not supported for Windows builds.
+  // TODO(awalker): refactor lock and condition variables so that this is
+  // unnecessary.
+#if !defined(OS_WIN)
+  OSLockType* os_lock() { return &os_lock_; }
+#endif
+
+ private:
+  OSLockType os_lock_;
+
+  DISALLOW_COPY_AND_ASSIGN(LockImpl);
+};
+
+}  // namespace internal
+}  // namespace base
+
+#endif  // BASE_SYNCHRONIZATION_LOCK_IMPL_H_
diff --git a/base/synchronization/lock_impl_posix.cc b/base/synchronization/lock_impl_posix.cc new file mode 100644 index 0000000..f638fcd --- /dev/null +++ b/base/synchronization/lock_impl_posix.cc @@ -0,0 +1,54 @@
+// Copyright (c) 2011 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "base/synchronization/lock_impl.h"
+
+#include <errno.h>
+
+#include "base/logging.h"
+
+namespace base {
+namespace internal {
+
+LockImpl::LockImpl() {
+#ifndef NDEBUG
+  // In debug, set up attributes for lock error checking.
+  pthread_mutexattr_t mta;
+  int rv = pthread_mutexattr_init(&mta);
+  DCHECK_EQ(rv, 0);
+  rv = pthread_mutexattr_settype(&mta, PTHREAD_MUTEX_ERRORCHECK);
+  DCHECK_EQ(rv, 0);
+  rv = pthread_mutex_init(&os_lock_, &mta);
+  DCHECK_EQ(rv, 0);
+  rv = pthread_mutexattr_destroy(&mta);
+  DCHECK_EQ(rv, 0);
+#else
+  // In release, go with the default lock attributes.
+  pthread_mutex_init(&os_lock_, NULL);
+#endif
+}
+
+LockImpl::~LockImpl() {
+  int rv = pthread_mutex_destroy(&os_lock_);
+  DCHECK_EQ(rv, 0);
+}
+
+bool LockImpl::Try() {
+  int rv = pthread_mutex_trylock(&os_lock_);
+  DCHECK(rv == 0 || rv == EBUSY);
+  return rv == 0;
+}
+
+void LockImpl::Lock() {
+  int rv = pthread_mutex_lock(&os_lock_);
+  DCHECK_EQ(rv, 0);
+}
+
+void LockImpl::Unlock() {
+  int rv = pthread_mutex_unlock(&os_lock_);
+  DCHECK_EQ(rv, 0);
+}
+
+}  // namespace internal
+}  // namespace base
diff --git a/base/synchronization/lock_impl_win.cc b/base/synchronization/lock_impl_win.cc new file mode 100644 index 0000000..bb8a23d --- /dev/null +++ b/base/synchronization/lock_impl_win.cc @@ -0,0 +1,36 @@
+// Copyright (c) 2011 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "base/synchronization/lock_impl.h"
+
+namespace base {
+namespace internal {
+
+LockImpl::LockImpl() {
+  // The second parameter is the spin count; for short-held locks it keeps the
+  // contending thread from immediately going to sleep, which helps
+  // performance greatly.
+  ::InitializeCriticalSectionAndSpinCount(&os_lock_, 2000);
+}
+
+LockImpl::~LockImpl() {
+  ::DeleteCriticalSection(&os_lock_);
+}
+
+bool LockImpl::Try() {
+  if (::TryEnterCriticalSection(&os_lock_) != FALSE) {
+    return true;
+  }
+  return false;
+}
+
+void LockImpl::Lock() {
+  ::EnterCriticalSection(&os_lock_);
+}
+
+void LockImpl::Unlock() {
+  ::LeaveCriticalSection(&os_lock_);
+}
+
+}  // namespace internal
+}  // namespace base
diff --git a/base/synchronization/lock_unittest.cc b/base/synchronization/lock_unittest.cc new file mode 100644 index 0000000..5ac3e6b --- /dev/null +++ b/base/synchronization/lock_unittest.cc @@ -0,0 +1,214 @@
+// Copyright (c) 2011 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+ +#include <stdlib.h> + +#include "base/synchronization/lock.h" +#include "base/threading/platform_thread.h" +#include "testing/gtest/include/gtest/gtest.h" + +namespace base { + +// Basic test to make sure that Acquire()/Release()/Try() don't crash ---------- + +class BasicLockTestThread : public PlatformThread::Delegate { + public: + BasicLockTestThread(Lock* lock) : lock_(lock), acquired_(0) {} + + virtual void ThreadMain() { + for (int i = 0; i < 10; i++) { + lock_->Acquire(); + acquired_++; + lock_->Release(); + } + for (int i = 0; i < 10; i++) { + lock_->Acquire(); + acquired_++; + PlatformThread::Sleep(rand() % 20); + lock_->Release(); + } + for (int i = 0; i < 10; i++) { + if (lock_->Try()) { + acquired_++; + PlatformThread::Sleep(rand() % 20); + lock_->Release(); + } + } + } + + int acquired() const { return acquired_; } + + private: + Lock* lock_; + int acquired_; + + DISALLOW_COPY_AND_ASSIGN(BasicLockTestThread); +}; + +TEST(LockTest, Basic) { + Lock lock; + BasicLockTestThread thread(&lock); + PlatformThreadHandle handle = kNullThreadHandle; + + ASSERT_TRUE(PlatformThread::Create(0, &thread, &handle)); + + int acquired = 0; + for (int i = 0; i < 5; i++) { + lock.Acquire(); + acquired++; + lock.Release(); + } + for (int i = 0; i < 10; i++) { + lock.Acquire(); + acquired++; + PlatformThread::Sleep(rand() % 20); + lock.Release(); + } + for (int i = 0; i < 10; i++) { + if (lock.Try()) { + acquired++; + PlatformThread::Sleep(rand() % 20); + lock.Release(); + } + } + for (int i = 0; i < 5; i++) { + lock.Acquire(); + acquired++; + PlatformThread::Sleep(rand() % 20); + lock.Release(); + } + + PlatformThread::Join(handle); + + EXPECT_GE(acquired, 20); + EXPECT_GE(thread.acquired(), 20); +} + +// Test that Try() works as expected ------------------------------------------- + +class TryLockTestThread : public PlatformThread::Delegate { + public: + TryLockTestThread(Lock* lock) : lock_(lock), got_lock_(false) {} + + virtual void ThreadMain() { + got_lock_ = lock_->Try(); + if (got_lock_) + lock_->Release(); + } + + bool got_lock() const { return got_lock_; } + + private: + Lock* lock_; + bool got_lock_; + + DISALLOW_COPY_AND_ASSIGN(TryLockTestThread); +}; + +TEST(LockTest, TryLock) { + Lock lock; + + ASSERT_TRUE(lock.Try()); + // We now have the lock.... + + // This thread will not be able to get the lock. + { + TryLockTestThread thread(&lock); + PlatformThreadHandle handle = kNullThreadHandle; + + ASSERT_TRUE(PlatformThread::Create(0, &thread, &handle)); + + PlatformThread::Join(handle); + + ASSERT_FALSE(thread.got_lock()); + } + + lock.Release(); + + // This thread will.... + { + TryLockTestThread thread(&lock); + PlatformThreadHandle handle = kNullThreadHandle; + + ASSERT_TRUE(PlatformThread::Create(0, &thread, &handle)); + + PlatformThread::Join(handle); + + ASSERT_TRUE(thread.got_lock()); + // But it released it.... + ASSERT_TRUE(lock.Try()); + } + + lock.Release(); +} + +// Tests that locks actually exclude ------------------------------------------- + +class MutexLockTestThread : public PlatformThread::Delegate { + public: + MutexLockTestThread(Lock* lock, int* value) : lock_(lock), value_(value) {} + + // Static helper which can also be called from the main thread. 
+ static void DoStuff(Lock* lock, int* value) { + for (int i = 0; i < 40; i++) { + lock->Acquire(); + int v = *value; + PlatformThread::Sleep(rand() % 10); + *value = v + 1; + lock->Release(); + } + } + + virtual void ThreadMain() { + DoStuff(lock_, value_); + } + + private: + Lock* lock_; + int* value_; + + DISALLOW_COPY_AND_ASSIGN(MutexLockTestThread); +}; + +TEST(LockTest, MutexTwoThreads) { + Lock lock; + int value = 0; + + MutexLockTestThread thread(&lock, &value); + PlatformThreadHandle handle = kNullThreadHandle; + + ASSERT_TRUE(PlatformThread::Create(0, &thread, &handle)); + + MutexLockTestThread::DoStuff(&lock, &value); + + PlatformThread::Join(handle); + + EXPECT_EQ(2 * 40, value); +} + +TEST(LockTest, MutexFourThreads) { + Lock lock; + int value = 0; + + MutexLockTestThread thread1(&lock, &value); + MutexLockTestThread thread2(&lock, &value); + MutexLockTestThread thread3(&lock, &value); + PlatformThreadHandle handle1 = kNullThreadHandle; + PlatformThreadHandle handle2 = kNullThreadHandle; + PlatformThreadHandle handle3 = kNullThreadHandle; + + ASSERT_TRUE(PlatformThread::Create(0, &thread1, &handle1)); + ASSERT_TRUE(PlatformThread::Create(0, &thread2, &handle2)); + ASSERT_TRUE(PlatformThread::Create(0, &thread3, &handle3)); + + MutexLockTestThread::DoStuff(&lock, &value); + + PlatformThread::Join(handle1); + PlatformThread::Join(handle2); + PlatformThread::Join(handle3); + + EXPECT_EQ(4 * 40, value); +} + +} // namespace base |
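
[Editor's note -- the sketch below is an illustration added for context and is
not part of the commit.  It ties the pieces of this change together: a
base::Lock guarding shared state and a base::ConditionVariable used to wake a
worker when work arrives.  WorkQueue and its members are hypothetical names,
and the constructor-takes-a-Lock* signature is assumed from the surrounding
code (the FAQ's "user supplied lock" and the friend declaration in lock.h).

    #include <queue>
    #include <string>

    #include "base/synchronization/condition_variable.h"
    #include "base/synchronization/lock.h"

    class WorkQueue {
     public:
      WorkQueue() : cv_(&lock_) {}  // The condition variable uses our lock.

      void Add(const std::string& task) {
        base::AutoLock locked(lock_);
        tasks_.push(task);
        cv_.Signal();  // Wake one waiting worker.
      }

      std::string Take() {
        base::AutoLock locked(lock_);
        while (tasks_.empty())  // Re-test the predicate; wakeups can be spurious.
          cv_.Wait();           // Atomically releases lock_ while blocked.
        std::string task = tasks_.front();
        tasks_.pop();
        return task;
      }

     private:
      base::Lock lock_;                // Declared before cv_, which points at it.
      base::ConditionVariable cv_;
      std::queue<std::string> tasks_;
    };

Per FAQ item 9 above, any internal lock inside ConditionVariable is always
acquired after, and released before, the user's lock, which is what rules out
lock-ordering deadlocks in this pattern.]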