summaryrefslogtreecommitdiffstats
path: root/chrome/browser/chromeos/file_system_provider/queue.cc
blob: dca78ea4d9e7fd0b57e0d86847d261237e09aa28 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
// Copyright 2015 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "base/bind.h"
#include "base/location.h"
#include "base/logging.h"
#include "base/single_thread_task_runner.h"
#include "base/thread_task_runner_handle.h"
#include "chrome/browser/chromeos/file_system_provider/queue.h"

namespace chromeos {
namespace file_system_provider {

Queue::Task::Task() : token(0) {
}

Queue::Task::Task(size_t token, const AbortableCallback& callback)
    : token(token), callback(callback) {
}

Queue::Task::~Task() {
}

Queue::Queue(size_t max_in_parallel)
    : max_in_parallel_(max_in_parallel),
      next_token_(1),
      weak_ptr_factory_(this) {
  CHECK_LT(0u, max_in_parallel);
}

Queue::~Queue() {
}

size_t Queue::NewToken() {
  return next_token_++;
}

void Queue::Enqueue(size_t token, const AbortableCallback& callback) {
#if !NDEBUG
  CHECK(executed_.find(token) == executed_.end());
  for (auto& task : pending_) {
    CHECK(token != task.token);
  }
#endif
  pending_.push_back(Task(token, callback));
  base::ThreadTaskRunnerHandle::Get()->PostTask(
      FROM_HERE, base::Bind(&Queue::MaybeRun, weak_ptr_factory_.GetWeakPtr()));
}

void Queue::Complete(size_t token) {
  const auto it = executed_.find(token);
  DCHECK(it != executed_.end());
  executed_.erase(it);
  base::ThreadTaskRunnerHandle::Get()->PostTask(
      FROM_HERE, base::Bind(&Queue::MaybeRun, weak_ptr_factory_.GetWeakPtr()));
}

void Queue::MaybeRun() {
  if (executed_.size() == max_in_parallel_ || !pending_.size())
    return;

  CHECK_GT(max_in_parallel_, executed_.size());
  Task task = pending_.front();
  pending_.pop_front();

  executed_[task.token] = task;
  AbortCallback abort_callback = task.callback.Run();

  // It may happen that the task is completed and removed synchronously. Hence,
  // we need to check if the task is still in the executed collection.
  const auto executed_task_it = executed_.find(task.token);
  if (executed_task_it != executed_.end())
    executed_task_it->second.abort_callback = abort_callback;
}

void Queue::Abort(size_t token) {
  // Check if it's running. If so, then abort and expect a Complete() call soon.
  const auto it = executed_.find(token);
  if (it != executed_.end()) {
    Task& task = it->second;
    AbortCallback abort_callback = task.abort_callback;
    task.abort_callback = AbortCallback();
    DCHECK(!abort_callback.is_null());
    abort_callback.Run();
    return;
  }

  // Aborting not running tasks is linear. TODO(mtomasz): Optimize if feasible.
  for (auto it = pending_.begin(); it != pending_.end(); ++it) {
    if (token == it->token) {
      pending_.erase(it);
      base::ThreadTaskRunnerHandle::Get()->PostTask(
          FROM_HERE,
          base::Bind(&Queue::MaybeRun, weak_ptr_factory_.GetWeakPtr()));
      return;
    }
  }

  // The task is already removed, marked as completed or aborted.
  NOTREACHED();
}

}  // namespace file_system_provider
}  // namespace chromeos