summaryrefslogtreecommitdiffstats
path: root/google_apis
diff options
context:
space:
mode:
authorzea <zea@chromium.org>2015-07-09 14:29:29 -0700
committerCommit bot <commit-bot@chromium.org>2015-07-09 21:30:06 +0000
commit4ec2e30709613f4062aa22e8fe91cf5eb7eb3bd2 (patch)
tree0fb9b6d7a53ab04e7e3eef61090d83bae0b6d708 /google_apis
parenta3913650bb06e7df15354703007729bdec7792cd (diff)
downloadchromium_src-4ec2e30709613f4062aa22e8fe91cf5eb7eb3bd2.zip
chromium_src-4ec2e30709613f4062aa22e8fe91cf5eb7eb3bd2.tar.gz
chromium_src-4ec2e30709613f4062aa22e8fe91cf5eb7eb3bd2.tar.bz2
[GCM] Add support for extra large data packets
Some special messages are actually allowed to have larger than normal payloads that can result in the data packet being longer than 4KiB (the normal limit). We now support this by having a dynamic in-memory buffer capable of handling arbitrary lengths (although in practice the limit is 32 bits, due to using a varint32 for the size packet). BUG=479974 Review URL: https://codereview.chromium.org/1223183002 Cr-Commit-Position: refs/heads/master@{#338153}
Diffstat (limited to 'google_apis')
-rw-r--r--google_apis/gcm/engine/connection_handler_impl.cc114
-rw-r--r--google_apis/gcm/engine/connection_handler_impl.h17
-rw-r--r--google_apis/gcm/engine/connection_handler_impl_unittest.cc45
3 files changed, 121 insertions, 55 deletions
diff --git a/google_apis/gcm/engine/connection_handler_impl.cc b/google_apis/gcm/engine/connection_handler_impl.cc
index ccb6362..31a6748 100644
--- a/google_apis/gcm/engine/connection_handler_impl.cc
+++ b/google_apis/gcm/engine/connection_handler_impl.cc
@@ -6,6 +6,7 @@
#include "base/message_loop/message_loop.h"
#include "google/protobuf/io/coded_stream.h"
+#include "google/protobuf/io/zero_copy_stream_impl_lite.h"
#include "google_apis/gcm/base/mcs_util.h"
#include "google_apis/gcm/base/socket_stream.h"
#include "google_apis/gcm/protocol/mcs.pb.h"
@@ -23,14 +24,17 @@ const int kVersionPacketLen = 1;
// # of bytes a tag packet consumes.
const int kTagPacketLen = 1;
// Max # of bytes a length packet consumes. A Varint32 can consume up to 5 bytes
-// (the MSB in each byte is reserved for denoting whether more bytes follow).
-// But, the protocol only allows for 4KiB payloads, and the socket stream buffer
-// is only of size 8KiB. As such we should never need more than 2 bytes (max
-// value of 16KiB). Anything higher than that will result in an error, either
-// because the socket stream buffer overflowed or too many bytes were required
-// in the size packet.
+// (the msb in each byte is reserved for denoting whether more bytes follow).
+// Although the protocol only allows for 4KiB payloads currently, and the socket
+// stream buffer is only of size 8KiB, it's possible for certain applications to
+// have larger message sizes. When payload is larger than 4KiB, an temporary
+// in-memory buffer is used instead of the normal in-place socket stream buffer.
const int kSizePacketLenMin = 1;
-const int kSizePacketLenMax = 2;
+const int kSizePacketLenMax = 5;
+
+// The normal limit for a data packet is 4KiB. Any data packet with a size
+// larger than this uses the temporary in-memory buffer,
+const int kDefaultDataPacketLimit = 1024 * 4;
// The current MCS protocol version.
const int kMCSVersion = 41;
@@ -50,6 +54,7 @@ ConnectionHandlerImpl::ConnectionHandlerImpl(
read_callback_(read_callback),
write_callback_(write_callback),
connection_callback_(connection_callback),
+ size_packet_so_far_(0),
weak_ptr_factory_(this) {
}
@@ -203,17 +208,23 @@ void ConnectionHandlerImpl::WaitForData(ProcessingState state) {
min_bytes_needed = kTagPacketLen + kSizePacketLenMin;
max_bytes_needed = kTagPacketLen + kSizePacketLenMax;
break;
- case MCS_FULL_SIZE:
- // If in this state, the minimum size packet length must already have been
- // insufficient, so set both to the max length.
- min_bytes_needed = kSizePacketLenMax;
+ case MCS_SIZE:
+ min_bytes_needed = size_packet_so_far_ + 1;
max_bytes_needed = kSizePacketLenMax;
break;
case MCS_PROTO_BYTES:
read_timeout_timer_.Reset();
- // No variability in the message size, set both to the same.
- min_bytes_needed = message_size_;
- max_bytes_needed = message_size_;
+ if (message_size_ < kDefaultDataPacketLimit) {
+ // No variability in the message size, set both to the same.
+ min_bytes_needed = message_size_;
+ max_bytes_needed = message_size_;
+ } else {
+ int bytes_left = message_size_ - payload_input_buffer_.size();
+ if (bytes_left > kDefaultDataPacketLimit)
+ bytes_left = kDefaultDataPacketLimit;
+ min_bytes_needed = bytes_left;
+ max_bytes_needed = bytes_left;
+ }
break;
default:
NOTREACHED();
@@ -265,7 +276,7 @@ void ConnectionHandlerImpl::WaitForData(ProcessingState state) {
case MCS_TAG_AND_SIZE:
OnGotMessageTag();
break;
- case MCS_FULL_SIZE:
+ case MCS_SIZE:
OnGotMessageSize();
break;
case MCS_PROTO_BYTES:
@@ -326,31 +337,29 @@ void ConnectionHandlerImpl::OnGotMessageSize() {
return;
}
- bool need_another_byte = false;
int prev_byte_count = input_stream_->UnreadByteCount();
{
CodedInputStream coded_input_stream(input_stream_.get());
- if (!coded_input_stream.ReadVarint32(&message_size_))
- need_another_byte = true;
- }
-
- if (need_another_byte) {
- DVLOG(1) << "Expecting another message size byte.";
- if (prev_byte_count >= kSizePacketLenMax) {
- // Already had enough bytes, something else went wrong.
- LOG(ERROR) << "Failed to process message size, too many bytes needed.";
- connection_callback_.Run(net::ERR_FILE_TOO_BIG);
+ if (!coded_input_stream.ReadVarint32(&message_size_)) {
+ DVLOG(1) << "Expecting another message size byte.";
+ if (prev_byte_count >= kSizePacketLenMax) {
+ // Already had enough bytes, something else went wrong.
+ LOG(ERROR) << "Failed to process message size";
+ connection_callback_.Run(net::ERR_FILE_TOO_BIG);
+ return;
+ }
+ // Back up by the amount read.
+ int bytes_read = prev_byte_count - input_stream_->UnreadByteCount();
+ input_stream_->BackUp(bytes_read);
+ size_packet_so_far_ = bytes_read;
+ WaitForData(MCS_SIZE);
return;
}
- // Back up by the amount read (should always be 1 byte).
- int bytes_read = prev_byte_count - input_stream_->UnreadByteCount();
- DCHECK_EQ(bytes_read, 1);
- input_stream_->BackUp(bytes_read);
- WaitForData(MCS_FULL_SIZE);
- return;
}
DVLOG(1) << "Proto size: " << message_size_;
+ size_packet_so_far_ = 0;
+ payload_input_buffer_.clear();
if (message_size_ > 0)
WaitForData(MCS_PROTO_BYTES);
@@ -386,9 +395,9 @@ void ConnectionHandlerImpl::OnGotMessageBytes() {
<< static_cast<unsigned int>(message_tag_);
connection_callback_.Run(net::ERR_INVALID_ARGUMENT);
return;
- }
+ }
- {
+ if (message_size_ < kDefaultDataPacketLimit) {
CodedInputStream coded_input_stream(input_stream_.get());
if (!protobuf->ParsePartialFromCodedStream(&coded_input_stream)) {
LOG(ERROR) << "Unable to parse GCM message of type "
@@ -397,6 +406,41 @@ void ConnectionHandlerImpl::OnGotMessageBytes() {
connection_callback_.Run(net::ERR_FAILED);
return;
}
+ } else {
+ // Copy any data in the input stream onto the end of the buffer.
+ const void* data_ptr = NULL;
+ int size = 0;
+ input_stream_->Next(&data_ptr, &size);
+ payload_input_buffer_.insert(payload_input_buffer_.end(),
+ static_cast<const uint8*>(data_ptr),
+ static_cast<const uint8*>(data_ptr) + size);
+ DCHECK_LE(payload_input_buffer_.size(), message_size_);
+
+ if (payload_input_buffer_.size() == message_size_) {
+ ArrayInputStream buffer_input_stream(payload_input_buffer_.data(),
+ payload_input_buffer_.size());
+ CodedInputStream coded_input_stream(&buffer_input_stream);
+ if (!protobuf->ParsePartialFromCodedStream(&coded_input_stream)) {
+ LOG(ERROR) << "Unable to parse GCM message of type "
+ << static_cast<unsigned int>(message_tag_);
+ // Reset the connection.
+ connection_callback_.Run(net::ERR_FAILED);
+ return;
+ }
+ } else {
+ // Continue reading data.
+ DVLOG(1) << "Continuing data read. Buffer size is "
+ << payload_input_buffer_.size()
+ << ", expecting " << message_size_;
+ input_stream_->RebuildBuffer();
+
+ read_timeout_timer_.Start(FROM_HERE,
+ read_timeout_,
+ base::Bind(&ConnectionHandlerImpl::OnTimeout,
+ weak_ptr_factory_.GetWeakPtr()));
+ WaitForData(MCS_PROTO_BYTES);
+ return;
+ }
}
input_stream_->RebuildBuffer();
@@ -431,6 +475,8 @@ void ConnectionHandlerImpl::CloseConnection() {
handshake_complete_ = false;
message_tag_ = 0;
message_size_ = 0;
+ size_packet_so_far_ = 0;
+ payload_input_buffer_.clear();
input_stream_.reset();
output_stream_.reset();
weak_ptr_factory_.InvalidateWeakPtrs();
diff --git a/google_apis/gcm/engine/connection_handler_impl.h b/google_apis/gcm/engine/connection_handler_impl.h
index fc965fb..399b397 100644
--- a/google_apis/gcm/engine/connection_handler_impl.h
+++ b/google_apis/gcm/engine/connection_handler_impl.h
@@ -5,6 +5,8 @@
#ifndef GOOGLE_APIS_GCM_ENGINE_CONNECTION_HANDLER_IMPL_H_
#define GOOGLE_APIS_GCM_ENGINE_CONNECTION_HANDLER_IMPL_H_
+#include <vector>
+
#include "base/memory/weak_ptr.h"
#include "base/time/time.h"
#include "base/timer/timer.h"
@@ -48,10 +50,8 @@ class GCM_EXPORT ConnectionHandlerImpl : public ConnectionHandler {
// Processing the tag and size packets (assuming minimum length size
// packet). Used for normal messages.
MCS_TAG_AND_SIZE,
- // Processing a maximum length size packet (for messages with length > 128).
- // Used when a normal size packet was not sufficient to read the message
- // size.
- MCS_FULL_SIZE,
+ // Processing the size packet alone.
+ MCS_SIZE,
// Processing the protocol buffer bytes (for those messages with non-zero
// sizes).
MCS_PROTO_BYTES
@@ -112,6 +112,15 @@ class GCM_EXPORT ConnectionHandlerImpl : public ConnectionHandler {
ProtoSentCallback write_callback_;
ConnectionChangedCallback connection_callback_;
+ // The number of bytes of the size packet read so far without finishing the
+ // read. This can be up to but no larger than 5 (the max number of bytes for
+ // a varint32).
+ uint8 size_packet_so_far_;
+ // A temporary input buffer for holding large messages. Will hold
+ // message_size_ bytes for messages larger than the normal size limit (and
+ // will be empty otherwise).
+ std::vector<uint8> payload_input_buffer_;
+
base::WeakPtrFactory<ConnectionHandlerImpl> weak_ptr_factory_;
DISALLOW_COPY_AND_ASSIGN(ConnectionHandlerImpl);
diff --git a/google_apis/gcm/engine/connection_handler_impl_unittest.cc b/google_apis/gcm/engine/connection_handler_impl_unittest.cc
index 9bf03a8..f15c841 100644
--- a/google_apis/gcm/engine/connection_handler_impl_unittest.cc
+++ b/google_apis/gcm/engine/connection_handler_impl_unittest.cc
@@ -671,17 +671,19 @@ TEST_F(GCMConnectionHandlerImplTest, SendMsgSocketDisconnected) {
EXPECT_EQ(net::ERR_CONNECTION_CLOSED, last_error());
}
-// Receive a message whose size field was corrupted and is larger than the
-// socket's buffer. Should fail gracefully with a size error.
-TEST_F(GCMConnectionHandlerImplTest, OutOfBuffer) {
+// Receive a message with a custom data packet that is larger than the
+// default data limit (and the socket buffer limit). Should successfully
+// read the packet by using the in-memory buffer.
+TEST_F(GCMConnectionHandlerImplTest, ExtraLargeDataPacket) {
std::string handshake_request = EncodeHandshakeRequest();
WriteList write_list(1, net::MockWrite(net::ASYNC,
handshake_request.c_str(),
handshake_request.size()));
std::string handshake_response = EncodeHandshakeResponse();
- // Fill a string with 9000 character zero.
- std::string data_message_proto(9000, '0');
+ const std::string kVeryLongFrom(20000, '0');
+ std::string data_message_proto = BuildDataMessage(kVeryLongFrom,
+ kDataMsgCategory);
std::string data_message_pkt =
EncodePacket(kDataMessageStanzaTag, data_message_proto);
ReadList read_list;
@@ -697,24 +699,25 @@ TEST_F(GCMConnectionHandlerImplTest, OutOfBuffer) {
Connect(&received_message);
WaitForMessage(); // The login send.
WaitForMessage(); // The login response.
- received_message.reset();
WaitForMessage(); // The data message.
- EXPECT_FALSE(received_message.get());
- EXPECT_EQ(net::ERR_FILE_TOO_BIG, last_error());
+ ASSERT_TRUE(received_message.get());
+ EXPECT_EQ(data_message_proto, received_message->SerializeAsString());
+ EXPECT_EQ(net::OK, last_error());
}
-// Receive a message whose size field was corrupted and takes more than two
-// bytes to encode. Should fail gracefully with a size error.
-TEST_F(GCMConnectionHandlerImplTest, InvalidSizePacket) {
+// Receive two messages with a custom data packet that is larger than the
+// default data limit (and the socket buffer limit). Should successfully
+// read the packet by using the in-memory buffer.
+TEST_F(GCMConnectionHandlerImplTest, 2ExtraLargeDataPacketMsgs) {
std::string handshake_request = EncodeHandshakeRequest();
WriteList write_list(1, net::MockWrite(net::ASYNC,
handshake_request.c_str(),
handshake_request.size()));
std::string handshake_response = EncodeHandshakeResponse();
- // Fill a string with 20000 character zero (which uses more than 2 bytes to
- // encode the size packet).
- std::string data_message_proto(20000, '0');
+ const std::string kVeryLongFrom(20000, '0');
+ std::string data_message_proto = BuildDataMessage(kVeryLongFrom,
+ kDataMsgCategory);
std::string data_message_pkt =
EncodePacket(kDataMessageStanzaTag, data_message_proto);
ReadList read_list;
@@ -724,16 +727,24 @@ TEST_F(GCMConnectionHandlerImplTest, InvalidSizePacket) {
read_list.push_back(net::MockRead(net::ASYNC,
data_message_pkt.c_str(),
data_message_pkt.size()));
+ read_list.push_back(net::MockRead(net::SYNCHRONOUS,
+ data_message_pkt.c_str(),
+ data_message_pkt.size()));
BuildSocket(read_list, write_list);
ScopedMessage received_message;
Connect(&received_message);
WaitForMessage(); // The login send.
WaitForMessage(); // The login response.
- received_message.reset();
WaitForMessage(); // The data message.
- EXPECT_FALSE(received_message.get());
- EXPECT_EQ(net::ERR_FILE_TOO_BIG, last_error());
+ ASSERT_TRUE(received_message.get());
+ EXPECT_EQ(data_message_proto, received_message->SerializeAsString());
+ EXPECT_EQ(net::OK, last_error());
+ received_message.reset();
+ WaitForMessage(); // The second data message.
+ ASSERT_TRUE(received_message.get());
+ EXPECT_EQ(data_message_proto, received_message->SerializeAsString());
+ EXPECT_EQ(net::OK, last_error());
}
// Make sure a message with an invalid tag is handled gracefully and resets