// Source path: sync/internal_api/public/attachments/task_queue.h
// Blob: e8ff1a0487cab57c478442712eda703dc8e9cc5d
// Copyright 2014 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#ifndef SYNC_INTERNAL_API_PUBLIC_ATTACHMENTS_TASK_QUEUE_H_
#define SYNC_INTERNAL_API_PUBLIC_ATTACHMENTS_TASK_QUEUE_H_

#include <stddef.h>
#include <deque>
#include <set>
#include <utility>

#include "base/bind.h"
#include "base/callback.h"
#include "base/macros.h"
#include "base/memory/weak_ptr.h"
#include "base/message_loop/message_loop.h"
#include "base/threading/non_thread_safe.h"
#include "base/time/time.h"
#include "base/timer/timer.h"
#include "net/base/backoff_entry.h"

namespace syncer {

// A queue that dispatches tasks, ignores duplicates, and provides backoff
// semantics.
//
// |T| is the task type.  The queue stores copies of tasks in a std::deque and
// a std::set, so |T| must be copyable and ordered by operator<.
//
// For each task added to the queue, the HandleTaskCallback will eventually be
// invoked.  For each invocation, the user of TaskQueue must call exactly one of
// |MarkAsSucceeded|, |MarkAsFailed|, or |Cancel|.
//
// To retry a failed task, call MarkAsFailed(task) then AddToQueue(task).
//
// Example usage:
//
// void Handle(const Foo& foo);
// ...
// TaskQueue<Foo> queue(base::Bind(&Handle),
//                      base::TimeDelta::FromSeconds(1),
//                      base::TimeDelta::FromMinutes(1));
// ...
// {
//   Foo foo;
//   // Add foo to the queue.  At some point, Handle will be invoked in this
//   // message loop.
//   queue.AddToQueue(foo);
// }
// ...
// void Handle(const Foo& foo) {
//   DoSomethingWith(foo);
//   // We must call one of the three methods to tell the queue how we're
//   // dealing with foo.  Of course, we are free to call in the context of
//   // this HandleTaskCallback or outside the context if we so choose.
//   if (SuccessfullyHandled(foo)) {
//     queue.MarkAsSucceeded(foo);
//   } else if (Failed(foo)) {
//     queue.MarkAsFailed(foo);
//     if (ShouldRetry(foo)) {
//       queue.AddToQueue(foo);
//     }
//   } else {
//     queue.Cancel(foo);
//   }
// }
//
template <typename T>
class TaskQueue : base::NonThreadSafe {
 public:
  // A callback provided by users of the TaskQueue to handle tasks.
  //
  // This callback is invoked by the queue with a task to be handled.  The
  // callee is expected to (eventually) call |MarkAsSucceeded|, |MarkAsFailed|,
  // or |Cancel| to signify completion of the task.
  using HandleTaskCallback = base::Callback<void(const T&)>;

  // Construct a TaskQueue.
  //
  // |callback| the callback to be invoked for handling tasks.
  //
  // |initial_backoff_delay| the initial amount of time the queue will wait
  // before dispatching tasks after a failed task (see |MarkAsFailed|).  May be
  // zero.  Subsequent failures will increase the delay up to
  // |max_backoff_delay|.
  //
  // |max_backoff_delay| the maximum amount of time the queue will wait before
  // dispatching tasks.  May be zero.  Must be greater than or equal to
  // |initial_backoff_delay|.
  TaskQueue(const HandleTaskCallback& callback,
            const base::TimeDelta& initial_backoff_delay,
            const base::TimeDelta& max_backoff_delay);

  // Add |task| to the end of the queue.
  //
  // If an equivalent task is already queued or currently being handled, |task|
  // is not added.  Equivalence is the one used by std::set<T>, i.e. it is
  // derived from operator<.
  void AddToQueue(const T& task);

  // Mark |task| as completing successfully.
  //
  // Marking a task as completing successfully resets any backoff delay in
  // effect.
  //
  // May only be called after the HandleTaskCallback has been invoked with
  // |task|.
  void MarkAsSucceeded(const T& task);

  // Mark |task| as failed.
  //
  // Marking a task as failed will cause a backoff, i.e. a delay in dispatching
  // of subsequent tasks.  Repeated failures will increase the delay.
  //
  // May only be called after the HandleTaskCallback has been invoked with
  // |task|.
  void MarkAsFailed(const T& task);

  // Cancel |task|.
  //
  // |task| is removed from the queue and will not be retried.  Does not affect
  // the backoff delay.
  //
  // May only be called after the HandleTaskCallback has been invoked with
  // |task|.
  void Cancel(const T& task);

  // Reset any backoff delay and resume dispatching of tasks.
  //
  // Useful for when you know the cause of previous failures has been resolved
  // and you don't want to wait for the accumulated backoff delay to elapse.
  void ResetBackoff();

  // Use |timer| for scheduled events.
  //
  // Used in tests.  See also MockTimer.
  void SetTimerForTest(scoped_ptr<base::Timer> timer);

 private:
  // Bookkeeping shared by MarkAsSucceeded, MarkAsFailed and Cancel: |task| is
  // no longer in progress and no longer known to the queue.
  void FinishTask(const T& task);
  // Arm the timer to run Dispatch, unless it is already running or there is
  // nothing to dispatch right now.
  void ScheduleDispatch();
  // Post the task at the front of the queue to the current message loop.
  void Dispatch();
  // Return true if we should dispatch tasks.
  bool ShouldDispatch();

  // Invoked (via a posted task) for every dispatched task.
  const HandleTaskCallback process_callback_;
  // Declared before |backoff_entry_|, which is constructed with a pointer to
  // this policy.
  net::BackoffEntry::Policy backoff_policy_;
  scoped_ptr<net::BackoffEntry> backoff_entry_;
  // The number of tasks currently being handled.
  int num_in_progress_;
  // Tasks waiting to be dispatched, in FIFO order.
  std::deque<T> queue_;
  // The set of tasks in queue_ or currently being handled.
  std::set<T> tasks_;
  // Runs Dispatch through a weak pointer to this object.
  base::Closure dispatch_closure_;
  // Delays dispatching while a backoff is in effect.
  scoped_ptr<base::Timer> backoff_timer_;

  // Must be last data member.
  base::WeakPtrFactory<TaskQueue> weak_ptr_factory_;

  DISALLOW_COPY_AND_ASSIGN(TaskQueue);
};

// Upper bound on how many tasks may be handed to the HandleTaskCallback and
// still be awaiting completion at the same time.  Think carefully before
// raising it: how backoff should behave with more than one task in flight is
// not obvious.
constexpr int kMaxConcurrentTasks = 1;

// Builds an empty queue that hands tasks to |callback| and derives its backoff
// policy from |initial_backoff_delay| and |max_backoff_delay|.
//
// |initial_backoff_delay| must not exceed |max_backoff_delay| (DCHECKed).
template <typename T>
TaskQueue<T>::TaskQueue(const HandleTaskCallback& callback,
                        const base::TimeDelta& initial_backoff_delay,
                        const base::TimeDelta& max_backoff_delay)
    : process_callback_(callback),
      backoff_policy_({}),
      num_in_progress_(0),
      backoff_timer_(new base::Timer(false, false)),
      weak_ptr_factory_(this) {
  DCHECK_LE(initial_backoff_delay.InMicroseconds(),
            max_backoff_delay.InMicroseconds());

  // Delay bounds supplied by the caller, converted to milliseconds.
  backoff_policy_.initial_delay_ms = initial_backoff_delay.InMilliseconds();
  backoff_policy_.maximum_backoff_ms = max_backoff_delay.InMilliseconds();

  // Fixed tuning parameters of the backoff policy.
  backoff_policy_.multiply_factor = 2.0;
  backoff_policy_.jitter_factor = 0.1;
  backoff_policy_.entry_lifetime_ms = -1;
  backoff_policy_.always_use_initial_delay = false;

  // The entry is created only once the policy has been fully populated.
  backoff_entry_.reset(new net::BackoffEntry(&backoff_policy_));

  // Bound through a weak pointer so that a dispatch pending on the timer is
  // dropped rather than run on a destroyed queue.
  dispatch_closure_ =
      base::Bind(&TaskQueue::Dispatch, weak_ptr_factory_.GetWeakPtr());
}

// Appends |task| to the queue unless an equivalent task is already queued or
// in progress, then makes sure a dispatch is scheduled.
template <typename T>
void TaskQueue<T>::AddToQueue(const T& task) {
  DCHECK(CalledOnValidThread());
  // insert() reports through .second whether |task| was new; duplicates leave
  // both containers untouched.
  const bool is_new_task = tasks_.insert(task).second;
  if (is_new_task) {
    queue_.push_back(task);
  }
  // Scheduling is attempted even for duplicates.
  ScheduleDispatch();
}

// Records that |task| completed successfully and clears any backoff.
template <typename T>
void TaskQueue<T>::MarkAsSucceeded(const T& task) {
  DCHECK(CalledOnValidThread());
  FinishTask(task);
  // A success wipes out the accumulated backoff: ResetBackoff() stops any
  // pending timer, resets the backoff entry and reschedules a dispatch.
  ResetBackoff();
}

// Records that |task| failed and reports the failure to the backoff entry
// before scheduling the next dispatch.
template <typename T>
void TaskQueue<T>::MarkAsFailed(const T& task) {
  DCHECK(CalledOnValidThread());
  FinishTask(task);

  // Report the outcome to the backoff entry: the request did not succeed.
  const bool request_succeeded = false;
  backoff_entry_->InformOfRequest(request_succeeded);

  ScheduleDispatch();
}

// Drops |task| without touching the backoff state, then schedules the next
// dispatch.
template <typename T>
void TaskQueue<T>::Cancel(const T& task) {
  DCHECK(CalledOnValidThread());

  // Only the bookkeeping changes; |backoff_entry_| is deliberately left alone.
  FinishTask(task);

  ScheduleDispatch();
}

// Clears any backoff in effect and resumes dispatching.
//
// Stops a pending backoff timer, resets the backoff entry, and schedules a
// dispatch so that queued tasks need not wait out the old delay.
template <typename T>
void TaskQueue<T>::ResetBackoff() {
  // Like every other entry point, verify the calling thread before mutating
  // any state.
  DCHECK(CalledOnValidThread());
  backoff_timer_->Stop();
  backoff_entry_->Reset();
  ScheduleDispatch();
}

// Replaces the timer used for scheduling dispatches with |timer|, which must
// be non-null.  Intended for tests.
template <typename T>
void TaskQueue<T>::SetTimerForTest(scoped_ptr<base::Timer> timer) {
  DCHECK(CalledOnValidThread());
  DCHECK(timer.get());
  // Take ownership of the replacement; the previous timer is destroyed.
  backoff_timer_.reset(timer.release());
}

// Common completion bookkeeping: |task| stops counting as in progress and is
// forgotten by the duplicate-detection set.
template <typename T>
void TaskQueue<T>::FinishTask(const T& task) {
  DCHECK(CalledOnValidThread());

  // At least one task must be in progress for a completion to make sense.
  DCHECK_GE(num_in_progress_, 1);
  num_in_progress_ -= 1;

  // The task must have been known to the queue exactly once.
  const size_t erased_count = tasks_.erase(task);
  DCHECK_EQ(1U, erased_count);
}

// Arms the timer to run Dispatch once the current backoff has elapsed.  Does
// nothing if the timer is already running or if no task can be dispatched.
template <typename T>
void TaskQueue<T>::ScheduleDispatch() {
  DCHECK(CalledOnValidThread());

  // A dispatch is already pending.
  if (backoff_timer_->IsRunning()) {
    return;
  }
  // Nothing is queued, or the concurrency limit has been reached.
  if (!ShouldDispatch()) {
    return;
  }

  const base::TimeDelta time_until_release =
      backoff_entry_->GetTimeUntilRelease();
  backoff_timer_->Start(FROM_HERE, time_until_release, dispatch_closure_);
}

// Takes the task at the front of the queue and posts the handler callback for
// it to the current message loop.  Does nothing if dispatching is not
// currently allowed.
template <typename T>
void TaskQueue<T>::Dispatch() {
  DCHECK(CalledOnValidThread());
  if (!ShouldDispatch()) {
    return;
  }

  DCHECK(!queue_.empty());
  // Binding copies the front task into the closure, so it is safe to pop the
  // element afterwards.
  const base::Closure handle_front_task =
      base::Bind(process_callback_, queue_.front());
  queue_.pop_front();

  ++num_in_progress_;
  DCHECK_LE(num_in_progress_, kMaxConcurrentTasks);

  // The handler runs asynchronously rather than inline.
  base::MessageLoop::current()->PostTask(FROM_HERE, handle_front_task);
}

// Returns true when a task is waiting and fewer than kMaxConcurrentTasks are
// in progress.
template <typename T>
bool TaskQueue<T>::ShouldDispatch() {
  if (queue_.empty()) {
    return false;
  }
  return num_in_progress_ < kMaxConcurrentTasks;
}

}  // namespace syncer

#endif  // SYNC_INTERNAL_API_PUBLIC_ATTACHMENTS_TASK_QUEUE_H_