1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
|
// Copyright (c) 2010 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "remoting/base/protocol_decoder.h"
#include "base/logging.h"
#include "remoting/base/multiple_array_input_stream.h"
#include "talk/base/byteorder.h"
namespace remoting {
ProtocolDecoder::ProtocolDecoder()
: last_read_position_(0),
available_bytes_(0),
next_payload_(0),
next_payload_known_(false) {
}
void ProtocolDecoder::ParseClientMessages(scoped_refptr<media::DataBuffer> data,
ClientMessageList* messages) {
ParseMessages<ChromotingClientMessage>(data, messages);
}
void ProtocolDecoder::ParseHostMessages(scoped_refptr<media::DataBuffer> data,
HostMessageList* messages) {
ParseMessages<ChromotingHostMessage>(data, messages);
}
template <typename T>
void ProtocolDecoder::ParseMessages(scoped_refptr<media::DataBuffer> data,
std::vector<T*>* messages) {
// If this is the first data in the processing queue, then set the
// last read position to 0.
if (data_list_.empty())
last_read_position_ = 0;
// First enqueue the data received.
data_list_.push_back(data);
available_bytes_ += data->GetDataSize();
// Then try to parse one message until we can't parse anymore.
T* message;
while (ParseOneMessage<T>(&message)) {
messages->push_back(message);
}
}
template <typename T>
bool ProtocolDecoder::ParseOneMessage(T** message) {
// Determine the payload size. If we already know it, then skip this
// part.
// We have the value set to -1 for checking later.
int next_payload = -1;
if (!next_payload_known_ && GetPayloadSize(&next_payload)) {
DCHECK_NE(-1, next_payload);
next_payload_ = next_payload;
next_payload_known_ = true;
}
// If the next payload size is still not known or we don't have enough
// data for parsing then exit.
if (!next_payload_known_ || available_bytes_ < next_payload_)
return false;
next_payload_known_ = false;
// Extract data from |data_list_| used to form a full protocol buffer.
DataList buffers;
std::deque<const uint8*> buffer_pointers;
std::deque<int> buffer_sizes;
while (next_payload_ > 0 && !data_list_.empty()) {
scoped_refptr<media::DataBuffer> buffer = data_list_.front();
size_t read_bytes = std::min(buffer->GetDataSize() - last_read_position_,
next_payload_);
buffers.push_back(buffer);
buffer_pointers.push_back(buffer->GetData() + last_read_position_);
buffer_sizes.push_back(read_bytes);
// Adjust counters.
last_read_position_ += read_bytes;
next_payload_ -= read_bytes;
available_bytes_ -= read_bytes;
// If the front buffer is fully read, remove it from the queue.
if (buffer->GetDataSize() == last_read_position_) {
data_list_.pop_front();
last_read_position_ = 0;
}
}
DCHECK_EQ(0UL, next_payload_);
DCHECK_EQ(buffers.size(), buffer_pointers.size());
DCHECK_EQ(buffers.size(), buffer_sizes.size());
// Create a MultipleArrayInputStream for parsing.
MultipleArrayInputStream stream(buffers.size());
for (size_t i = 0; i < buffers.size(); ++i) {
stream.SetBuffer(i, buffer_pointers[i], buffer_sizes[i]);
}
// And finally it is parsing.
*message = new T();
bool ret = (*message)->ParseFromZeroCopyStream(&stream);
if (!ret)
delete *message;
return ret;
}
bool ProtocolDecoder::GetPayloadSize(int* size) {
// The header has a size of 4 bytes.
const size_t kHeaderSize = sizeof(int32);
if (available_bytes_ < kHeaderSize)
return false;
std::string header;
while (header.length() < kHeaderSize && !data_list_.empty()) {
scoped_refptr<media::DataBuffer> buffer = data_list_.front();
// Find out how many bytes we need and how many bytes are available in this
// buffer.
int needed_bytes = kHeaderSize - header.length();
int available_bytes = buffer->GetDataSize() - last_read_position_;
// Then append the required bytes into the header and advance the last
// read position.
int read_bytes = std::min(needed_bytes, available_bytes);
header.append(
reinterpret_cast<const char*>(buffer->GetData()) + last_read_position_,
read_bytes);
last_read_position_ += read_bytes;
available_bytes_ -= read_bytes;
// If the buffer is depleted then remove it from the queue.
if (last_read_position_ == buffer->GetDataSize()) {
last_read_position_ = 0;
data_list_.pop_front();
}
}
if (header.length() == kHeaderSize) {
*size = talk_base::GetBE32(header.c_str());
return true;
}
NOTREACHED() << "Unable to extract payload size";
return false;
}
} // namespace remoting
|