1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
|
// Copyright 2014 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "sync/engine/worker_entity_tracker.h"
#include <stdint.h>
#include "base/logging.h"
#include "sync/internal_api/public/base/model_type.h"
#include "sync/internal_api/public/non_blocking_sync_common.h"
#include "sync/syncable/syncable_util.h"
#include "sync/util/time.h"
namespace syncer_v2 {
// Creates a tracker for the entity described by an update response.
//
// The new tracker takes its id and client tag hash from |data.entity|, records
// |data.response_version| as the highest update version seen so far, and
// starts with a commit response version of 0.
scoped_ptr<WorkerEntityTracker> WorkerEntityTracker::FromUpdateResponse(
    const UpdateResponseData& data) {
  const int64_t initial_commit_response_version = 0;
  return scoped_ptr<WorkerEntityTracker>(new WorkerEntityTracker(
      data.entity->id, data.entity->client_tag_hash,
      initial_commit_response_version, data.response_version));
}
// Creates a tracker for the entity described by a commit request.
//
// The new tracker takes its id and client tag hash from |data.entity|. Both
// the commit response version and the update response version start at 0.
scoped_ptr<WorkerEntityTracker> WorkerEntityTracker::FromCommitRequest(
    const CommitRequestData& data) {
  const int64_t initial_version = 0;
  return scoped_ptr<WorkerEntityTracker>(new WorkerEntityTracker(
      data.entity->id, data.entity->client_tag_hash, initial_version,
      initial_version));
}
// Constructs a tracker from an entity id, its client tag hash, and the initial
// values of the two response-version counters. The factory functions
// FromUpdateResponse() and FromCommitRequest() in this file call this
// constructor.
WorkerEntityTracker::WorkerEntityTracker(
const std::string& id,
const std::string& client_tag_hash,
int64_t highest_commit_response_version,
int64_t highest_gu_response_version)
: id_(id),
client_tag_hash_(client_tag_hash),
highest_commit_response_version_(highest_commit_response_version),
highest_gu_response_version_(highest_gu_response_version),
// No commit request has been recorded yet: the sequence number starts at 0
// and the base version at kUncommittedVersion, which IsServerKnown() reports
// as "not known to the server".
sequence_number_(0),
base_version_(kUncommittedVersion) {}
// Nothing to release by hand; members clean up after themselves.
WorkerEntityTracker::~WorkerEntityTracker() = default;
// Returns true while this tracker is holding a commit request, i.e. while
// |pending_commit_| is non-null.
bool WorkerEntityTracker::HasPendingCommit() const {
  return static_cast<bool>(pending_commit_);
}
// Fills |commit_entity| from the pending commit and writes the tracker's
// current sequence number to |*sequence_number|.
//
// Preconditions (DCHECKed): a commit is pending, the client tag hash is
// non-empty and matches the pending entity's, and the pending entity has no
// parent id.
void WorkerEntityTracker::PrepareCommitProto(sync_pb::SyncEntity* commit_entity,
                                             int64_t* sequence_number) const {
  DCHECK(HasPendingCommit());
  DCHECK(!client_tag_hash_.empty());

  // The id field is only populated when we actually have an id.
  if (!id_.empty())
    commit_entity->set_id_string(id_);

  const EntityData& pending_entity = pending_commit_->entity.value();
  DCHECK_EQ(client_tag_hash_, pending_entity.client_tag_hash);

  const bool deleted = pending_entity.is_deleted();

  commit_entity->set_client_defined_unique_tag(client_tag_hash_);
  commit_entity->set_version(base_version_);
  commit_entity->set_deleted(deleted);

  // TODO(stanisc): This doesn't support bookmarks yet.
  DCHECK(pending_entity.parent_id.empty());
  commit_entity->set_folder(false);

  commit_entity->set_name(pending_entity.non_unique_name);

  // Timestamps and specifics are only sent for entities that still exist.
  if (!deleted) {
    commit_entity->set_ctime(
        syncer::TimeToProtoTime(pending_entity.creation_time));
    commit_entity->set_mtime(
        syncer::TimeToProtoTime(pending_entity.modification_time));
    commit_entity->mutable_specifics()->CopyFrom(pending_entity.specifics);
  }

  *sequence_number = sequence_number_;
}
// Records a commit request for this entity.
//
// The base version and sequence number counters are always updated. The
// request itself is kept as the pending commit unless it is the deletion of an
// item the server does not know about, or the counters show it is in conflict;
// in both of those cases any pending commit is dropped.
void WorkerEntityTracker::RequestCommit(const CommitRequestData& data) {
  DCHECK_GE(data.base_version, base_version_)
      << "Base version should never decrease";
  DCHECK_GE(data.sequence_number, sequence_number_)
      << "Sequence number should never decrease";

  // Book-keeping first: IsServerKnown() and IsInConflict() below both read
  // the freshly stored values.
  base_version_ = data.base_version;
  sequence_number_ = data.sequence_number;

  // Deletions of server-unknown items are never committed.
  const bool deletes_server_unknown_item =
      data.entity->is_deleted() && !IsServerKnown();

  if (!deletes_server_unknown_item) {
    // |id_| is deliberately left alone. Good ID values come from the server
    // and always pass through the sync thread first, so the model thread can
    // never have a better ID value than we do.

    // The client tag identifies this entity and can never change.
    DCHECK_EQ(client_tag_hash_, data.entity->client_tag_hash);

    // TODO(stanisc): consider simply copying CommitRequestData instead of
    // allocating one dynamically.
    pending_commit_.reset(new CommitRequestData(data));

    // The conflict check needs |pending_commit_| to be set already. When
    // there is no conflict, the data stays put so it can be committed at the
    // next possible opportunity.
    if (!IsInConflict())
      return;

    // Conflict: fall through and drop the commit. The model thread does not
    // need to be told; the conflicting update has already been posted to its
    // task runner and it will notice as soon as it runs that task.
  }

  ClearPendingCommit();
}
// Applies a successful commit response to this tracker: adopts the id from the
// response, records |response_version| as the highest commit response version
// and clears the pending commit.
//
// DCHECKs that |response_version| is greater than any previous commit response
// version and that |sequence_number| equals the tracker's current one.
void WorkerEntityTracker::ReceiveCommitResponse(const std::string& response_id,
                                                int64_t response_version,
                                                int64_t sequence_number) {
  // The ID may change as a result of a commit, most notably the first one.
  // It is assigned up front so the DCHECK messages below report the new value.
  id_ = response_id;

  DCHECK_GT(response_version, highest_commit_response_version_)
      << "Had expected higher response version."
      << " id: " << id_;

  // Commits are synchronous, so the sequence numbers are expected to agree.
  DCHECK_EQ(sequence_number_, sequence_number)
      << "Unexpected sequence number mismatch."
      << " id: " << id_;

  highest_commit_response_version_ = response_version;

  // An in-progress commit blocks the sync thread, so nothing can have touched
  // the item while the commit was in flight: what was just committed is
  // exactly what we hold. We are in sync with the server and the pending
  // commit is no longer needed.
  ClearPendingCommit();
}
// Notes that an update with the given |version| was received.
//
// Versions that are not strictly greater than the highest one seen so far are
// ignored. Otherwise the version is recorded, any pending update is discarded
// and, if the tracker is now in conflict, the pending commit is dropped too.
void WorkerEntityTracker::ReceiveUpdate(int64_t version) {
  if (version > highest_gu_response_version_) {
    highest_gu_response_version_ = version;

    // This applicable update is newer than any pending update, so an older
    // pending update (if any) can safely be thrown away.
    ClearPendingUpdate();

    // The incoming update clobbers the pending commit on the sync thread.
    // The model thread can re-request the commit later if it wants to.
    if (IsInConflict())
      ClearPendingCommit();
  }
}
// Stores |data| as the pending update for this entity.
//
// Returns false and changes nothing when |data.response_version| is lower than
// the highest update version already seen. Otherwise records the version,
// keeps a copy of |data| as the pending update, drops any pending commit and
// returns true. Note that an update whose version equals the highest one seen
// is accepted.
bool WorkerEntityTracker::ReceivePendingUpdate(const UpdateResponseData& data) {
  const bool is_stale = data.response_version < highest_gu_response_version_;
  if (!is_stale) {
    highest_gu_response_version_ = data.response_version;
    pending_update_.reset(new UpdateResponseData(data));
    ClearPendingCommit();
  }
  return !is_stale;
}
// Returns true while this tracker is holding a pending update, i.e. while
// |pending_update_| is non-null.
bool WorkerEntityTracker::HasPendingUpdate() const {
  return static_cast<bool>(pending_update_);
}
// Returns a copy of the pending update held by this tracker.
//
// Callers must check HasPendingUpdate() first. |pending_update_| is null until
// ReceivePendingUpdate() stores an update and again after ClearPendingUpdate()
// runs, and dereferencing it in that state is undefined behavior. The DCHECK
// turns such misuse into an immediate, diagnosable failure in debug builds,
// mirroring the HasPendingCommit() check in PrepareCommitProto().
UpdateResponseData WorkerEntityTracker::GetPendingUpdate() const {
  DCHECK(HasPendingUpdate());
  return *pending_update_;
}
// Discards the pending update, if there is one. Afterwards HasPendingUpdate()
// returns false.
void WorkerEntityTracker::ClearPendingUpdate() {
  pending_update_.reset(nullptr);
}
// Decides whether the pending commit conflicts with what we know of the
// server's state.
//
// Returns false when nothing is pending for commit and true when a pending
// update is being held alongside the commit. Otherwise the answer comes from
// comparing the version counters, as described inline.
bool WorkerEntityTracker::IsInConflict() const {
  // Without a pending commit there is nothing that could conflict.
  if (!HasPendingCommit())
    return false;

  // A pending commit together with a pending update is always a conflict.
  if (HasPendingUpdate())
    return true;

  // If the most recent server state was created by a commit from this client,
  // we are fully up to date and therefore not in conflict.
  const bool latest_state_is_ours =
      highest_gu_response_version_ <= highest_commit_response_version_;
  if (latest_state_is_ours)
    return false;

  // The most recent server state was written by someone else. It is a
  // conflict exactly when the model thread did not have that latest version
  // at the time it issued the commit request.
  return base_version_ < highest_gu_response_version_;
}
// Returns true when the base version recorded for this entity is anything
// other than kUncommittedVersion.
bool WorkerEntityTracker::IsServerKnown() const {
  const bool never_committed = (base_version_ == kUncommittedVersion);
  return !never_committed;
}
// Discards the pending commit, if there is one. Afterwards HasPendingCommit()
// returns false.
void WorkerEntityTracker::ClearPendingCommit() {
  pending_commit_.reset(nullptr);
}
} // namespace syncer_v2
|