// Copyright (c) 2012 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "base/threading/sequenced_worker_pool.h"

#include <algorithm>

#include "base/bind.h"
#include "base/compiler_specific.h"
#include "base/memory/ref_counted.h"
#include "base/memory/scoped_ptr.h"
#include "base/message_loop.h"
#include "base/message_loop_proxy.h"
#include "base/synchronization/condition_variable.h"
#include "base/synchronization/lock.h"
#include "base/test/sequenced_worker_pool_owner.h"
#include "base/test/sequenced_task_runner_test_template.h"
#include "base/test/task_runner_test_template.h"
#include "base/test/test_timeouts.h"
#include "base/threading/platform_thread.h"
#include "base/time.h"
#include "base/tracked_objects.h"
#include "testing/gtest/include/gtest/gtest.h"

namespace base {

// IMPORTANT NOTE:
//
// Many of these tests have failure modes where they'll hang forever. These
// tests should not be flaky, and hanging indicates a type of failure. Do not
// mark as flaky if they're hanging, it's likely an actual bug.

namespace {

// Number of worker threads that the pools created by these tests are
// constructed with.
const size_t kNumWorkerThreads = 3;

// Allows a number of threads to all be blocked on the same event, and
// provides a way to unblock a certain number of them.
//
// Threads park themselves in Block(). Another thread calls Unblock(count) to
// make |count| "permits" available; every thread leaving Block() consumes
// exactly one permit.
class ThreadBlocker {
 public:
  ThreadBlocker() : lock_(), cond_var_(&lock_), unblock_counter_(0) {}

  // Blocks the calling thread until a permit is available, then consumes it.
  void Block() {
    {
      base::AutoLock lock(lock_);
      // Loop on the predicate so a wakeup with no permit left goes back to
      // waiting.
      while (unblock_counter_ == 0)
        cond_var_.Wait();
      unblock_counter_--;
    }
    // Unblock() signals only once, so every thread that gets through passes
    // the signal on to wake the next waiter.
    cond_var_.Signal();
  }

  // Makes |count| permits available and wakes a waiter. All previously
  // granted permits must already have been consumed (DCHECKed).
  void Unblock(size_t count) {
    {
      base::AutoLock lock(lock_);
      DCHECK(unblock_counter_ == 0);
      unblock_counter_ = count;
    }
    cond_var_.Signal();
  }

 private:
  // Protects unblock_counter_.
  base::Lock lock_;
  base::ConditionVariable cond_var_;

  // Number of threads that may still leave Block().
  size_t unblock_counter_;
};

// Records which test tasks have started and in what order they completed, and
// lets the test thread wait for a given number of starts or completions.
// Instances are ref-counted (RefCountedThreadSafe).
class TestTracker : public base::RefCountedThreadSafe<TestTracker> {
 public:
  TestTracker()
      : lock_(),
        cond_var_(&lock_),
        started_events_(0) {
  }

  // Each of these tasks appends the argument to the complete sequence vector
  // so calling code can see what order they finished in.

  // Completes immediately.
  void FastTask(int id) {
    SignalWorkerDone(id);
  }

  // Sleeps for one second before completing.
  void SlowTask(int id) {
    base::PlatformThread::Sleep(base::TimeDelta::FromSeconds(1));
    SignalWorkerDone(id);
  }

  // Records that it has started, then waits on |blocker| before completing.
  void BlockTask(int id, ThreadBlocker* blocker) {
    // Note that this task has started and signal anybody waiting for that
    // to happen.
    {
      base::AutoLock lock(lock_);
      started_events_++;
    }
    cond_var_.Signal();

    blocker->Block();
    SignalWorkerDone(id);
  }

  // Tries to post one FastTask (id 100) to |pool| for each shutdown behavior.
  // The CONTINUE_ON_SHUTDOWN and SKIP_ON_SHUTDOWN posts are expected to be
  // rejected; the result of the BLOCK_SHUTDOWN post is not checked. Finally
  // records |id| as complete.
  void PostAdditionalTasks(int id, SequencedWorkerPool* pool) {
    Closure fast_task = base::Bind(&TestTracker::FastTask, this, 100);
    EXPECT_FALSE(
        pool->PostWorkerTaskWithShutdownBehavior(
            FROM_HERE, fast_task,
            SequencedWorkerPool::CONTINUE_ON_SHUTDOWN));
    EXPECT_FALSE(
        pool->PostWorkerTaskWithShutdownBehavior(
            FROM_HERE, fast_task,
            SequencedWorkerPool::SKIP_ON_SHUTDOWN));
    pool->PostWorkerTaskWithShutdownBehavior(
        FROM_HERE, fast_task,
        SequencedWorkerPool::BLOCK_SHUTDOWN);
    SignalWorkerDone(id);
  }

  // Waits until the given number of tasks have started executing.
  void WaitUntilTasksBlocked(size_t count) {
    {
      base::AutoLock lock(lock_);
      while (started_events_ < count)
        cond_var_.Wait();
    }
    // Pass the signal on in case another thread is waiting on cond_var_.
    cond_var_.Signal();
  }

  // Blocks the current thread until at least the given number of tasks are in
  // the completed vector, and then returns a copy.
  std::vector<int> WaitUntilTasksComplete(size_t num_tasks) {
    std::vector<int> ret;
    {
      base::AutoLock lock(lock_);
      while (complete_sequence_.size() < num_tasks)
        cond_var_.Wait();
      ret = complete_sequence_;
    }
    // Pass the signal on in case another thread is waiting on cond_var_.
    cond_var_.Signal();
    return ret;
  }

  // Forgets all recorded completions and resets the started-task counter.
  void ClearCompleteSequence() {
    base::AutoLock lock(lock_);
    complete_sequence_.clear();
    started_events_ = 0;
  }

 private:
  friend class base::RefCountedThreadSafe<TestTracker>;
  ~TestTracker() {}

  // Appends |id| to the completion list and signals cond_var_.
  void SignalWorkerDone(int id) {
    {
      base::AutoLock lock(lock_);
      complete_sequence_.push_back(id);
    }
    cond_var_.Signal();
  }

  // Protects complete_sequence_ and started_events_.
  base::Lock lock_;

  // Signalled whenever a task starts (BlockTask) or completes.
  base::ConditionVariable cond_var_;

  // Protected by lock_.
  std::vector<int> complete_sequence_;

  // Counter of the number of "block" workers that have started.
  size_t started_events_;
};

// Test fixture that owns a SequencedWorkerPool (through a
// SequencedWorkerPoolOwner) created with kNumWorkerThreads threads, plus a
// TestTracker used to observe task execution.
class SequencedWorkerPoolTest : public testing::Test {
 public:
  SequencedWorkerPoolTest()
      : tracker_(new TestTracker) {
    ResetPool();
  }

  virtual ~SequencedWorkerPoolTest() {}

  virtual void SetUp() OVERRIDE {}

  // Shuts down the current pool at the end of every test.
  virtual void TearDown() OVERRIDE {
    pool()->Shutdown();
  }

  // Returns the pool currently held by the owner.
  const scoped_refptr<SequencedWorkerPool>& pool() {
    return pool_owner_->pool();
  }
  TestTracker* tracker() { return tracker_.get(); }

  // Destroys the SequencedWorkerPool instance, blocking until it is fully shut
  // down, and creates a new instance.
  void ResetPool() {
    pool_owner_.reset(new SequencedWorkerPoolOwner(kNumWorkerThreads, "test"));
  }

  // Forwards |callback| to the pool owner's will-wait-for-shutdown hook.
  void SetWillWaitForShutdownCallback(const Closure& callback) {
    pool_owner_->SetWillWaitForShutdownCallback(callback);
  }

  // Ensures that the given number of worker threads is created by adding
  // tasks and waiting until they complete. Worker thread creation is
  // serialized, can happen on background threads asynchronously, and doesn't
  // happen any more at shutdown. This means that if a test posts a bunch of
  // tasks and calls shutdown, fewer workers will be created than the test may
  // expect.
  //
  // This function ensures that this condition can't happen so tests can make
  // assumptions about the number of workers active. See the comment in
  // PrepareToStartAdditionalThreadIfNecessary in the .cc file for more
  // details.
  //
  // It will post tasks to the queue with id -1. It also assumes this is the
  // first thing called in a test since it will clear the complete_sequence_.
  void EnsureAllWorkersCreated() {
    // Create a bunch of tasks, all waiting. This will cause that many
    // workers to be created.
    ThreadBlocker blocker;
    for (size_t i = 0; i < kNumWorkerThreads; i++) {
      pool()->PostWorkerTask(FROM_HERE,
                             base::Bind(&TestTracker::BlockTask,
                                        tracker(), -1, &blocker));
    }
    tracker()->WaitUntilTasksBlocked(kNumWorkerThreads);

    // Now wake them up and wait until they're done.
    blocker.Unblock(kNumWorkerThreads);
    tracker()->WaitUntilTasksComplete(kNumWorkerThreads);

    // Clean up the task IDs we added.
    tracker()->ClearCompleteSequence();
  }

  // Returns the pool owner's count of "has work" notifications.
  int has_work_call_count() const {
    return pool_owner_->has_work_call_count();
  }

 private:
  MessageLoop message_loop_;
  scoped_ptr<SequencedWorkerPoolOwner> pool_owner_;
  const scoped_refptr<TestTracker> tracker_;
};

// Verifies that |tracker| reports exactly |expected_tasks_to_complete|
// completed tasks, then releases |threads_to_awake| threads parked on
// |blocker|. This is used to wake up blocked background threads before
// blocking on shutdown.
void EnsureTasksToCompleteCountAndUnblock(scoped_refptr<TestTracker> tracker,
                                          size_t expected_tasks_to_complete,
                                          ThreadBlocker* blocker,
                                          size_t threads_to_awake) {
  // Snapshot of the completion list once it has reached the expected size.
  const std::vector<int> completed =
      tracker->WaitUntilTasksComplete(expected_tasks_to_complete);
  EXPECT_EQ(expected_tasks_to_complete, completed.size());

  blocker->Unblock(threads_to_awake);
}

// Ref-counted object whose only job is to report its own destruction: the
// destructor sets the shared |deleted_flag| to true.
class DeletionHelper : public base::RefCountedThreadSafe<DeletionHelper> {
 public:
  explicit DeletionHelper(
      const scoped_refptr<base::RefCountedData<bool> >& deleted_flag)
      : deleted_flag_(deleted_flag) {}

 private:
  friend class base::RefCountedThreadSafe<DeletionHelper>;

  // Private: destruction goes through the befriended ref-counting base.
  virtual ~DeletionHelper() {
    deleted_flag_->data = true;
  }

  // Shared flag flipped to true when this object is destroyed.
  const scoped_refptr<base::RefCountedData<bool> > deleted_flag_;

  DISALLOW_COPY_AND_ASSIGN(DeletionHelper);
};

// Task body that must never execute. Both arguments are unused; it reports a
// test failure if it is ever invoked.
void HoldPoolReference(
    const scoped_refptr<base::SequencedWorkerPool>& /* pool */,
    const scoped_refptr<DeletionHelper>& /* helper */) {
  ADD_FAILURE() << "Should never run";
}

// Tests that delayed tasks are deleted upon shutdown of the pool.
TEST_F(SequencedWorkerPoolTest, DelayedTaskDuringShutdown) {
  // Post something to verify the pool is started up.
  EXPECT_TRUE(pool()->PostTask(
      FROM_HERE, base::Bind(&TestTracker::FastTask, tracker(), 1)));

  // Set to true by DeletionHelper's destructor once the delayed task (which
  // owns the helper) has been destroyed.
  scoped_refptr<base::RefCountedData<bool> > deleted_flag(
      new base::RefCountedData<bool>(false));

  // Measure elapsed time with the monotonic clock: wall-clock time
  // (base::Time) can jump forwards or backwards while the test runs, which
  // would make the duration check below unreliable.
  const base::TimeTicks posted_at(base::TimeTicks::Now());
  // Post something that shouldn't run.
  EXPECT_TRUE(pool()->PostDelayedTask(
      FROM_HERE,
      base::Bind(&HoldPoolReference,
                 pool(),
                 make_scoped_refptr(new DeletionHelper(deleted_flag))),
      TestTimeouts::action_timeout()));

  std::vector<int> completion_sequence = tracker()->WaitUntilTasksComplete(1);
  ASSERT_EQ(1u, completion_sequence.size());
  ASSERT_EQ(1, completion_sequence[0]);

  pool()->Shutdown();
  // Shutdown is asynchronous, so use ResetPool() to block until the pool is
  // fully destroyed (and thus shut down).
  ResetPool();

  // Verify that we didn't block until the task was due.
  ASSERT_LT(base::TimeTicks::Now() - posted_at,
            TestTimeouts::action_timeout());

  // Verify that the deferred task has not only not run, but has also been
  // destroyed.
  ASSERT_TRUE(deleted_flag->data);
}

// Verifies that named sequence tokens are stable per name: asking for the
// same name twice yields equal tokens, while different names (and unnamed
// tokens) yield distinct ones.
TEST_F(SequencedWorkerPoolTest, NamedTokens) {
  const std::string kHello("hello");
  const std::string kGoodbye("goodbye");

  // Acquire one named, one unnamed and another named token, in that order.
  const SequencedWorkerPool::SequenceToken hello_token =
      pool()->GetNamedSequenceToken(kHello);
  const SequencedWorkerPool::SequenceToken unnamed_token =
      pool()->GetSequenceToken();
  const SequencedWorkerPool::SequenceToken goodbye_token =
      pool()->GetNamedSequenceToken(kGoodbye);

  // The three tokens must be pairwise different.
  EXPECT_FALSE(hello_token.Equals(unnamed_token));
  EXPECT_FALSE(hello_token.Equals(goodbye_token));
  EXPECT_FALSE(unnamed_token.Equals(goodbye_token));

  // Looking a name up again must return the token handed out before.
  const SequencedWorkerPool::SequenceToken hello_again =
      pool()->GetNamedSequenceToken(kHello);
  EXPECT_TRUE(hello_token.Equals(hello_again));

  const SequencedWorkerPool::SequenceToken goodbye_again =
      pool()->GetNamedSequenceToken(kGoodbye);
  EXPECT_TRUE(goodbye_token.Equals(goodbye_again));
}

// Posts far more tasks than there are worker threads (one slow task followed
// by many fast ones) and verifies that every one of them completes.
TEST_F(SequencedWorkerPoolTest, LotsOfTasks) {
  const size_t kNumTasks = 20;

  // Task 0 is the slow one.
  const base::Closure slow_task =
      base::Bind(&TestTracker::SlowTask, tracker(), 0);
  pool()->PostWorkerTask(FROM_HERE, slow_task);

  // Tasks 1..kNumTasks-1 finish immediately.
  for (size_t task_id = 1; task_id < kNumTasks; ++task_id) {
    const base::Closure fast_task =
        base::Bind(&TestTracker::FastTask, tracker(), task_id);
    pool()->PostWorkerTask(FROM_HERE, fast_task);
  }

  const std::vector<int> completed =
      tracker()->WaitUntilTasksComplete(kNumTasks);
  EXPECT_EQ(kNumTasks, completed.size());
}

// Same as LotsOfTasks, but feeds two independent pools at the same time and
// expects every task to run once per pool. This test is meant to shake out
// any concurrency issues between pools (like histograms).
TEST_F(SequencedWorkerPoolTest, LotsOfTasksTwoPools) {
  const size_t kNumTasks = 20;

  SequencedWorkerPoolOwner first_owner(kNumWorkerThreads, "test1");
  SequencedWorkerPoolOwner second_owner(kNumWorkerThreads, "test2");

  // Task 0 is slow and goes to both pools.
  const base::Closure slow_task =
      base::Bind(&TestTracker::SlowTask, tracker(), 0);
  first_owner.pool()->PostWorkerTask(FROM_HERE, slow_task);
  second_owner.pool()->PostWorkerTask(FROM_HERE, slow_task);

  // The remaining tasks are fast; each is posted to both pools.
  for (size_t task_id = 1; task_id < kNumTasks; ++task_id) {
    const base::Closure fast_task =
        base::Bind(&TestTracker::FastTask, tracker(), task_id);
    first_owner.pool()->PostWorkerTask(FROM_HERE, fast_task);
    second_owner.pool()->PostWorkerTask(FROM_HERE, fast_task);
  }

  // Every task ran in both pools, hence twice the count.
  const size_t kExpectedCompletions = 2 * kNumTasks;
  const std::vector<int> completed =
      tracker()->WaitUntilTasksComplete(kExpectedCompletions);
  EXPECT_EQ(kExpectedCompletions, completed.size());

  second_owner.pool()->Shutdown();
  first_owner.pool()->Shutdown();
}

// Test that tasks with the same sequence token are executed in order but don't
// affect other tasks.
TEST_F(SequencedWorkerPoolTest, Sequence) {
  // Fill all the worker threads except one.
  const size_t kNumBackgroundTasks = kNumWorkerThreads - 1;
  ThreadBlocker background_blocker;
  for (size_t i = 0; i < kNumBackgroundTasks; i++) {
    pool()->PostWorkerTask(FROM_HERE,
                           base::Bind(&TestTracker::BlockTask,
                                      tracker(), i, &background_blocker));
  }
  tracker()->WaitUntilTasksBlocked(kNumBackgroundTasks);

  // Create two tasks with the same sequence token, one that will block on the
  // event, and one which will just complete quickly when it's run. Since there
  // is one worker thread free, the first task will start and then block, and
  // the second task should be waiting.
  ThreadBlocker blocker;
  SequencedWorkerPool::SequenceToken token1 = pool()->GetSequenceToken();
  pool()->PostSequencedWorkerTask(
      token1, FROM_HERE,
      base::Bind(&TestTracker::BlockTask, tracker(), 100, &blocker));
  pool()->PostSequencedWorkerTask(
      token1, FROM_HERE,
      base::Bind(&TestTracker::FastTask, tracker(), 101));
  // Waiting for zero completions returns immediately with a snapshot, which
  // must still be empty.
  EXPECT_EQ(0u, tracker()->WaitUntilTasksComplete(0).size());

  // Create another two tasks as above with a different token. These will be
  // blocked since there are no slots to run.
  SequencedWorkerPool::SequenceToken token2 = pool()->GetSequenceToken();
  pool()->PostSequencedWorkerTask(
      token2, FROM_HERE,
      base::Bind(&TestTracker::FastTask, tracker(), 200));
  pool()->PostSequencedWorkerTask(
      token2, FROM_HERE,
      base::Bind(&TestTracker::FastTask, tracker(), 201));
  EXPECT_EQ(0u, tracker()->WaitUntilTasksComplete(0).size());

  // Let one background task complete. This should then let both tasks of
  // token2 run to completion in order. The second task of token1 should still
  // be blocked.
  background_blocker.Unblock(1);
  std::vector<int> result = tracker()->WaitUntilTasksComplete(3);
  ASSERT_EQ(3u, result.size());
  // result[0] is the id of whichever background task was released; it is not
  // checked here.
  EXPECT_EQ(200, result[1]);
  EXPECT_EQ(201, result[2]);

  // Finish the rest of the background tasks. This should leave some workers
  // free with the second token1 task still blocked on the first.
  background_blocker.Unblock(kNumBackgroundTasks - 1);
  EXPECT_EQ(kNumBackgroundTasks + 2,
            tracker()->WaitUntilTasksComplete(kNumBackgroundTasks + 2).size());

  // Allow the first task of token1 to complete. This should run the second.
  blocker.Unblock(1);
  result = tracker()->WaitUntilTasksComplete(kNumBackgroundTasks + 4);
  ASSERT_EQ(kNumBackgroundTasks + 4, result.size());
  // The two token1 tasks must be the last completions, in posting order.
  EXPECT_EQ(100, result[result.size() - 2]);
  EXPECT_EQ(101, result[result.size() - 1]);
}

// Tests that any tasks posted after Shutdown are ignored.
// Disabled for flakiness.  See http://crbug.com/166451.
TEST_F(SequencedWorkerPoolTest, DISABLED_IgnoresAfterShutdown) {
  // Start tasks to take all the threads and block them.
  EnsureAllWorkersCreated();
  ThreadBlocker blocker;
  for (size_t i = 0; i < kNumWorkerThreads; i++) {
    pool()->PostWorkerTask(FROM_HERE,
                           base::Bind(&TestTracker::BlockTask,
                                      tracker(), i, &blocker));
  }
  tracker()->WaitUntilTasksBlocked(kNumWorkerThreads);

  // When the callback is invoked it checks that no task has completed yet and
  // then releases all of the blocked workers.
  SetWillWaitForShutdownCallback(
      base::Bind(&EnsureTasksToCompleteCountAndUnblock,
                 scoped_refptr<TestTracker>(tracker()), 0,
                 &blocker, kNumWorkerThreads));

  // Shutdown the worker pool. The argument is presumably the number of new
  // blocking tasks that may still be posted after shutdown -- confirm against
  // SequencedWorkerPool::Shutdown().
  const int kMaxNewBlockingTasksAfterShutdown = 100;
  pool()->Shutdown(kMaxNewBlockingTasksAfterShutdown);

  // Snapshot the "has work" count so we can verify below that the rejected
  // posts did not change it.
  int old_has_work_call_count = has_work_call_count();

  std::vector<int> result =
      tracker()->WaitUntilTasksComplete(kNumWorkerThreads);

  // The kNumWorkerThread items should have completed, in no particular order.
  ASSERT_EQ(kNumWorkerThreads, result.size());
  for (size_t i = 0; i < kNumWorkerThreads; i++) {
    EXPECT_TRUE(std::find(result.begin(), result.end(), static_cast<int>(i)) !=
                result.end());
  }

  // No further tasks, regardless of shutdown mode, should be allowed.
  EXPECT_FALSE(pool()->PostWorkerTaskWithShutdownBehavior(
      FROM_HERE,
      base::Bind(&TestTracker::FastTask, tracker(), 100),
      SequencedWorkerPool::CONTINUE_ON_SHUTDOWN));
  EXPECT_FALSE(pool()->PostWorkerTaskWithShutdownBehavior(
      FROM_HERE,
      base::Bind(&TestTracker::FastTask, tracker(), 101),
      SequencedWorkerPool::SKIP_ON_SHUTDOWN));
  EXPECT_FALSE(pool()->PostWorkerTaskWithShutdownBehavior(
      FROM_HERE,
      base::Bind(&TestTracker::FastTask, tracker(), 102),
      SequencedWorkerPool::BLOCK_SHUTDOWN));

  ASSERT_EQ(old_has_work_call_count, has_work_call_count());
}

// Tests that a limited number of new BLOCK_SHUTDOWN tasks can still be posted
// after Shutdown() has been called, as long as a running task posts them.
TEST_F(SequencedWorkerPoolTest, AllowsAfterShutdown) {
  // Test that <n> new blocking tasks are allowed provided they're posted
  // by a running task.
  EnsureAllWorkersCreated();
  ThreadBlocker blocker;

  // Start tasks to take all the threads and block them.
  const int kNumBlockTasks = static_cast<int>(kNumWorkerThreads);
  for (int i = 0; i < kNumBlockTasks; ++i) {
    EXPECT_TRUE(pool()->PostWorkerTask(
        FROM_HERE,
        base::Bind(&TestTracker::BlockTask, tracker(), i, &blocker)));
  }
  tracker()->WaitUntilTasksBlocked(kNumWorkerThreads);

  // Queue up shutdown blocking tasks behind those which will attempt to post
  // additional tasks when run. PostAdditionalTasks attempts to post 3
  // new FastTasks, one for each shutdown_behavior.
  const int kNumQueuedTasks = static_cast<int>(kNumWorkerThreads);
  for (int i = 0; i < kNumQueuedTasks; ++i) {
    EXPECT_TRUE(pool()->PostWorkerTaskWithShutdownBehavior(
        FROM_HERE,
        base::Bind(&TestTracker::PostAdditionalTasks, tracker(), i, pool()),
        SequencedWorkerPool::BLOCK_SHUTDOWN));
  }

  // Setup to open the floodgates from within Shutdown().
  SetWillWaitForShutdownCallback(
      base::Bind(&EnsureTasksToCompleteCountAndUnblock,
                 scoped_refptr<TestTracker>(tracker()),
                 0, &blocker, kNumBlockTasks));

  // Allow half of the additional blocking tasks thru.
  const int kNumNewBlockingTasksToAllow = kNumWorkerThreads / 2;
  pool()->Shutdown(kNumNewBlockingTasksToAllow);

  // Ensure that the correct number of tasks actually got run. Note that this
  // only waits for at least that many completions; the returned list is not
  // inspected.
  tracker()->WaitUntilTasksComplete(static_cast<size_t>(
      kNumBlockTasks + kNumQueuedTasks + kNumNewBlockingTasksToAllow));

  // Clean up the task IDs we added and go home.
  tracker()->ClearCompleteSequence();
}

// Tests that unrun tasks are discarded properly according to their shutdown
// mode.
TEST_F(SequencedWorkerPoolTest, DiscardOnShutdown) {
  // Start tasks to take all the threads and block them.
  EnsureAllWorkersCreated();
  ThreadBlocker blocker;
  for (size_t i = 0; i < kNumWorkerThreads; i++) {
    pool()->PostWorkerTask(FROM_HERE,
                           base::Bind(&TestTracker::BlockTask,
                                      tracker(), i, &blocker));
  }
  tracker()->WaitUntilTasksBlocked(kNumWorkerThreads);

  // Create some tasks with different shutdown modes. All workers are busy, so
  // these stay queued until Shutdown() is called.
  pool()->PostWorkerTaskWithShutdownBehavior(
      FROM_HERE,
      base::Bind(&TestTracker::FastTask, tracker(), 100),
      SequencedWorkerPool::CONTINUE_ON_SHUTDOWN);
  pool()->PostWorkerTaskWithShutdownBehavior(
      FROM_HERE,
      base::Bind(&TestTracker::FastTask, tracker(), 101),
      SequencedWorkerPool::SKIP_ON_SHUTDOWN);
  pool()->PostWorkerTaskWithShutdownBehavior(
      FROM_HERE,
      base::Bind(&TestTracker::FastTask, tracker(), 102),
      SequencedWorkerPool::BLOCK_SHUTDOWN);

  // Shutdown the worker pool. This should discard all non-blocking tasks.
  // The callback checks that nothing has completed yet and then releases the
  // blocked workers.
  SetWillWaitForShutdownCallback(
      base::Bind(&EnsureTasksToCompleteCountAndUnblock,
                 scoped_refptr<TestTracker>(tracker()), 0,
                 &blocker, kNumWorkerThreads));
  pool()->Shutdown();

  std::vector<int> result =
      tracker()->WaitUntilTasksComplete(kNumWorkerThreads + 1);

  // The kNumWorkerThread items should have completed, plus the BLOCK_SHUTDOWN
  // one, in no particular order.
  ASSERT_EQ(kNumWorkerThreads + 1, result.size());
  for (size_t i = 0; i < kNumWorkerThreads; i++) {
    EXPECT_TRUE(std::find(result.begin(), result.end(), static_cast<int>(i)) !=
                result.end());
  }
  // Id 102 is the BLOCK_SHUTDOWN task.
  EXPECT_TRUE(std::find(result.begin(), result.end(), 102) != result.end());
}

// Tests that CONTINUE_ON_SHUTDOWN tasks don't block shutdown.
TEST_F(SequencedWorkerPoolTest, ContinueOnShutdown) {
  // Exercise all three ways of posting a CONTINUE_ON_SHUTDOWN task: directly
  // on the pool, through a TaskRunner and through a SequencedTaskRunner.
  scoped_refptr<TaskRunner> runner(pool()->GetTaskRunnerWithShutdownBehavior(
      SequencedWorkerPool::CONTINUE_ON_SHUTDOWN));
  scoped_refptr<SequencedTaskRunner> sequenced_runner(
      pool()->GetSequencedTaskRunnerWithShutdownBehavior(
          pool()->GetSequenceToken(),
          SequencedWorkerPool::CONTINUE_ON_SHUTDOWN));
  EnsureAllWorkersCreated();
  ThreadBlocker blocker;
  pool()->PostWorkerTaskWithShutdownBehavior(
      FROM_HERE,
      base::Bind(&TestTracker::BlockTask,
                 tracker(), 0, &blocker),
      SequencedWorkerPool::CONTINUE_ON_SHUTDOWN);
  runner->PostTask(
      FROM_HERE,
      base::Bind(&TestTracker::BlockTask,
                 tracker(), 1, &blocker));
  sequenced_runner->PostTask(
      FROM_HERE,
      base::Bind(&TestTracker::BlockTask,
                 tracker(), 2, &blocker));

  // Wait until all three tasks are running (and blocked on |blocker|).
  tracker()->WaitUntilTasksBlocked(3);

  // This should not block. If this test hangs, it means it failed.
  pool()->Shutdown();

  // The tasks should not have completed yet.
  EXPECT_EQ(0u, tracker()->WaitUntilTasksComplete(0).size());

  // Posting more tasks should fail.
  EXPECT_FALSE(pool()->PostWorkerTaskWithShutdownBehavior(
      FROM_HERE, base::Bind(&TestTracker::FastTask, tracker(), 0),
      SequencedWorkerPool::CONTINUE_ON_SHUTDOWN));
  EXPECT_FALSE(runner->PostTask(
      FROM_HERE, base::Bind(&TestTracker::FastTask, tracker(), 0)));
  EXPECT_FALSE(sequenced_runner->PostTask(
      FROM_HERE, base::Bind(&TestTracker::FastTask, tracker(), 0)));

  // Continue the background threads and make sure the tasks can complete.
  blocker.Unblock(3);
  std::vector<int> result = tracker()->WaitUntilTasksComplete(3);
  EXPECT_EQ(3u, result.size());
}

// Tests that SKIP_ON_SHUTDOWN tasks that have been started block Shutdown
// until they stop, but tasks not yet started do not.
TEST_F(SequencedWorkerPoolTest, SkipOnShutdown) {
  // Start tasks to take all the threads and block them.
  EnsureAllWorkersCreated();
  ThreadBlocker blocker;

  // Now block all the threads with SKIP_ON_SHUTDOWN. Shutdown() should not
  // return until these tasks have completed.
  for (size_t i = 0; i < kNumWorkerThreads; i++) {
    pool()->PostWorkerTaskWithShutdownBehavior(
        FROM_HERE,
        base::Bind(&TestTracker::BlockTask, tracker(), i, &blocker),
        SequencedWorkerPool::SKIP_ON_SHUTDOWN);
  }
  tracker()->WaitUntilTasksBlocked(kNumWorkerThreads);

  // Now post an additional task as SKIP_ON_SHUTDOWN, which should not be
  // executed once Shutdown() has been called.
  // NOTE(review): this task reuses id 0, so the id check at the end cannot
  // tell it apart from the first started task; only the exact size check
  // detects it running.
  pool()->PostWorkerTaskWithShutdownBehavior(
      FROM_HERE,
      base::Bind(&TestTracker::BlockTask,
                 tracker(), 0, &blocker),
      SequencedWorkerPool::SKIP_ON_SHUTDOWN);

  // This callback will only be invoked if SKIP_ON_SHUTDOWN tasks that have
  // been started block shutdown.
  SetWillWaitForShutdownCallback(
      base::Bind(&EnsureTasksToCompleteCountAndUnblock,
                 scoped_refptr<TestTracker>(tracker()), 0,
                 &blocker, kNumWorkerThreads));

  // No tasks should have completed yet.
  EXPECT_EQ(0u, tracker()->WaitUntilTasksComplete(0).size());

  // Shutdown() is expected to wait for the already-started tasks; the
  // callback set above releases them so that this call can return. If this
  // test hangs here, it means it failed.
  pool()->Shutdown();

  // Shutdown should not return until all of the tasks have completed.
  std::vector<int> result =
      tracker()->WaitUntilTasksComplete(kNumWorkerThreads);

  // Only tasks marked SKIP_ON_SHUTDOWN that were already started should be
  // allowed to complete. No additional non-blocking tasks should have been
  // started.
  ASSERT_EQ(kNumWorkerThreads, result.size());
  for (size_t i = 0; i < kNumWorkerThreads; i++) {
    EXPECT_TRUE(std::find(result.begin(), result.end(), static_cast<int>(i)) !=
                result.end());
  }
}

// Regression test for http://crbug.com/117469. With every worker thread
// already created, a single spurious "has work" signal must be counted exactly
// once and must not trigger any further work signals.
TEST_F(SequencedWorkerPoolTest, SpuriousWorkSignal) {
  EnsureAllWorkersCreated();

  const int calls_before_signal = has_work_call_count();
  pool()->SignalHasWorkForTesting();

  // Give the workers time to react. This is inherently racy, but can only
  // produce false positives.
  const base::TimeDelta settle_time = base::TimeDelta::FromMilliseconds(100);
  base::PlatformThread::Sleep(settle_time);

  const int expected_calls = calls_before_signal + 1;
  EXPECT_EQ(expected_calls, has_work_call_count());
}

// Task body for the IsRunningOnCurrentThread test. It expects to be running
// on |pool| under |test_positive_token| and not under |test_negative_token|,
// and expects |unused_pool| to report that it is not running anything on this
// thread.
void IsRunningOnCurrentThreadTask(
    SequencedWorkerPool::SequenceToken test_positive_token,
    SequencedWorkerPool::SequenceToken test_negative_token,
    SequencedWorkerPool* pool,
    SequencedWorkerPool* unused_pool) {
  SequencedWorkerPool* const running_pool = pool;
  SequencedWorkerPool* const idle_pool = unused_pool;

  // The pool running this task recognizes only the positive token.
  EXPECT_TRUE(running_pool->RunsTasksOnCurrentThread());
  EXPECT_TRUE(
      running_pool->IsRunningSequenceOnCurrentThread(test_positive_token));
  EXPECT_FALSE(
      running_pool->IsRunningSequenceOnCurrentThread(test_negative_token));

  // The other pool must answer "no" to every query.
  EXPECT_FALSE(idle_pool->RunsTasksOnCurrentThread());
  EXPECT_FALSE(
      idle_pool->IsRunningSequenceOnCurrentThread(test_positive_token));
  EXPECT_FALSE(
      idle_pool->IsRunningSequenceOnCurrentThread(test_negative_token));
}

// Verify correctness of the IsRunningSequenceOnCurrentThread method.
TEST_F(SequencedWorkerPoolTest, IsRunningOnCurrentThread) {
  SequencedWorkerPool::SequenceToken token1 = pool()->GetSequenceToken();
  SequencedWorkerPool::SequenceToken token2 = pool()->GetSequenceToken();
  // Default-constructed token, used below for the unsequenced task.
  SequencedWorkerPool::SequenceToken unsequenced_token;

  // A second pool that never runs any task. The checks below expect its first
  // two tokens to equal token1 and token2, so the order of the
  // GetSequenceToken() calls matters. NOTE(review): presumably this makes the
  // queries against |unused_pool| use token values that are valid in both
  // pools -- confirm against the token allocation in the .cc file.
  scoped_refptr<SequencedWorkerPool> unused_pool =
      new SequencedWorkerPool(2, "unused_pool");
  EXPECT_TRUE(token1.Equals(unused_pool->GetSequenceToken()));
  EXPECT_TRUE(token2.Equals(unused_pool->GetSequenceToken()));

  // The test's own thread is not a worker of either pool.
  EXPECT_FALSE(pool()->RunsTasksOnCurrentThread());
  EXPECT_FALSE(pool()->IsRunningSequenceOnCurrentThread(token1));
  EXPECT_FALSE(pool()->IsRunningSequenceOnCurrentThread(token2));
  EXPECT_FALSE(pool()->IsRunningSequenceOnCurrentThread(unsequenced_token));
  EXPECT_FALSE(unused_pool->RunsTasksOnCurrentThread());
  EXPECT_FALSE(unused_pool->IsRunningSequenceOnCurrentThread(token1));
  EXPECT_FALSE(unused_pool->IsRunningSequenceOnCurrentThread(token2));
  EXPECT_FALSE(
      unused_pool->IsRunningSequenceOnCurrentThread(unsequenced_token));

  // Run the checks from inside the pool: once per sequence token and once as
  // an unsequenced task. Each task gets the token it runs under as the
  // positive token and a different one as the negative token.
  pool()->PostSequencedWorkerTask(
      token1, FROM_HERE,
      base::Bind(&IsRunningOnCurrentThreadTask,
                 token1, token2, pool(), unused_pool));
  pool()->PostSequencedWorkerTask(
      token2, FROM_HERE,
      base::Bind(&IsRunningOnCurrentThreadTask,
                 token2, unsequenced_token, pool(), unused_pool));
  pool()->PostWorkerTask(
      FROM_HERE,
      base::Bind(&IsRunningOnCurrentThreadTask,
                 unsequenced_token, token1, pool(), unused_pool));
  pool()->Shutdown();
  unused_pool->Shutdown();
}

class SequencedWorkerPoolTaskRunnerTestDelegate {
 public:
  SequencedWorkerPoolTaskRunnerTestDelegate() {}

  ~SequencedWorkerPoolTaskRunnerTestDelegate() {}

  void StartTaskRunner() {
    pool_owner_.reset(
        new SequencedWorkerPoolOwner(10, "SequencedWorkerPoolTaskRunnerTest"));
  }

  scoped_refptr<SequencedWorkerPool> GetTaskRunner() {
    return pool_owner_->pool();
  }

  void StopTaskRunner() {
    // Make sure all tasks (including delayed ones) are run before shutting
    // down.
    pool_owner_->pool()->FlushForTesting();
    pool_owner_->pool()->Shutdown();
    // Don't reset |pool_owner_| here, as the test may still hold a
    // reference to the pool.
  }

  bool TaskRunnerHandlesNonZeroDelays() const {
    return true;
  }

 private:
  MessageLoop message_loop_;
  scoped_ptr<SequencedWorkerPoolOwner> pool_owner_;
};

// Instantiates the typed TaskRunnerTest suite with the pool itself as the
// task runner.
INSTANTIATE_TYPED_TEST_CASE_P(
    SequencedWorkerPool, TaskRunnerTest,
    SequencedWorkerPoolTaskRunnerTestDelegate);

class SequencedWorkerPoolTaskRunnerWithShutdownBehaviorTestDelegate {
 public:
  SequencedWorkerPoolTaskRunnerWithShutdownBehaviorTestDelegate() {}

  ~SequencedWorkerPoolTaskRunnerWithShutdownBehaviorTestDelegate() {
  }

  void StartTaskRunner() {
    pool_owner_.reset(
        new SequencedWorkerPoolOwner(10, "SequencedWorkerPoolTaskRunnerTest"));
    task_runner_ = pool_owner_->pool()->GetTaskRunnerWithShutdownBehavior(
        SequencedWorkerPool::BLOCK_SHUTDOWN);
  }

  scoped_refptr<TaskRunner> GetTaskRunner() {
    return task_runner_;
  }

  void StopTaskRunner() {
    // Make sure all tasks (including delayed ones) are run before shutting
    // down.
    pool_owner_->pool()->FlushForTesting();
    pool_owner_->pool()->Shutdown();
    // Don't reset |pool_owner_| here, as the test may still hold a
    // reference to the pool.
  }

  bool TaskRunnerHandlesNonZeroDelays() const {
    return true;
  }

 private:
  MessageLoop message_loop_;
  scoped_ptr<SequencedWorkerPoolOwner> pool_owner_;
  scoped_refptr<TaskRunner> task_runner_;
};

// Instantiates the typed TaskRunnerTest suite with a BLOCK_SHUTDOWN
// TaskRunner obtained from the pool.
INSTANTIATE_TYPED_TEST_CASE_P(
    SequencedWorkerPoolTaskRunner, TaskRunnerTest,
    SequencedWorkerPoolTaskRunnerWithShutdownBehaviorTestDelegate);

class SequencedWorkerPoolSequencedTaskRunnerTestDelegate {
 public:
  SequencedWorkerPoolSequencedTaskRunnerTestDelegate() {}

  ~SequencedWorkerPoolSequencedTaskRunnerTestDelegate() {
  }

  void StartTaskRunner() {
    pool_owner_.reset(new SequencedWorkerPoolOwner(
        10, "SequencedWorkerPoolSequencedTaskRunnerTest"));
    task_runner_ = pool_owner_->pool()->GetSequencedTaskRunner(
        pool_owner_->pool()->GetSequenceToken());
  }

  scoped_refptr<SequencedTaskRunner> GetTaskRunner() {
    return task_runner_;
  }

  void StopTaskRunner() {
    // Make sure all tasks (including delayed ones) are run before shutting
    // down.
    pool_owner_->pool()->FlushForTesting();
    pool_owner_->pool()->Shutdown();
    // Don't reset |pool_owner_| here, as the test may still hold a
    // reference to the pool.
  }

  bool TaskRunnerHandlesNonZeroDelays() const {
    return true;
  }

 private:
  MessageLoop message_loop_;
  scoped_ptr<SequencedWorkerPoolOwner> pool_owner_;
  scoped_refptr<SequencedTaskRunner> task_runner_;
};

// Instantiates both the generic TaskRunnerTest suite and the
// SequencedTaskRunnerTest suite with the sequenced-task-runner delegate.
INSTANTIATE_TYPED_TEST_CASE_P(
    SequencedWorkerPoolSequencedTaskRunner, TaskRunnerTest,
    SequencedWorkerPoolSequencedTaskRunnerTestDelegate);

INSTANTIATE_TYPED_TEST_CASE_P(
    SequencedWorkerPoolSequencedTaskRunner, SequencedTaskRunnerTest,
    SequencedWorkerPoolSequencedTaskRunnerTestDelegate);

}  // namespace

}  // namespace base