summaryrefslogtreecommitdiffstats
path: root/google_apis/gcm
diff options
context:
space:
mode:
authorzea@chromium.org <zea@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2013-10-19 10:23:20 +0000
committerzea@chromium.org <zea@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2013-10-19 10:23:20 +0000
commit98fb81d13e14ebd9919803fa5d41f0718606c341 (patch)
tree009f99469c3fa662b68bcbc336f454ab6497f23c /google_apis/gcm
parent8a48e732c0dc67b22f0b2cdce8bf750e273a4628 (diff)
downloadchromium_src-98fb81d13e14ebd9919803fa5d41f0718606c341.zip
chromium_src-98fb81d13e14ebd9919803fa5d41f0718606c341.tar.gz
chromium_src-98fb81d13e14ebd9919803fa5d41f0718606c341.tar.bz2
[GCM] Initial work to set up directory structure and introduce socket integration
Create new google_api/gcm directory, and add some of the initial base logic for integrating protobufs with chrome sockets. A SocketStream is an implementation of the ZeroCopyStream interfaces that allows for asynchronously retrieving a message (with timeout support) which can then be parsed by a CodedInputStream, or alternatively allows a CodedOutputStream to write into a Chrome socket. Once a SocketStream closes itself (which will happen on error or timeout), the stream cannot be reused, and a new one should be created (typically with a socket that has been reconnected). In general, the state of the socket stream must be checked before interacting with it. Also fixes issue in SocketTestUtil where mock Sockets aren't taking ownership of IOBuffers that they're using (exposed by tests). TBR=darin@chromium.org BUG=284553 Review URL: https://codereview.chromium.org/23684017 git-svn-id: svn://svn.chromium.org/chrome/trunk/src@229533 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'google_apis/gcm')
-rw-r--r--google_apis/gcm/DEPS12
-rw-r--r--google_apis/gcm/OWNERS3
-rw-r--r--google_apis/gcm/base/gcm_export.h29
-rw-r--r--google_apis/gcm/base/socket_stream.cc332
-rw-r--r--google_apis/gcm/base/socket_stream.h205
-rw-r--r--google_apis/gcm/base/socket_stream_unittest.cc406
-rw-r--r--google_apis/gcm/gcm.gyp56
7 files changed, 1043 insertions, 0 deletions
diff --git a/google_apis/gcm/DEPS b/google_apis/gcm/DEPS
new file mode 100644
index 0000000..757c0da
--- /dev/null
+++ b/google_apis/gcm/DEPS
@@ -0,0 +1,12 @@
+include_rules = [
+ # Repeat these from the top-level DEPS file so one can just run
+ #
+ # checkdeps.py google_apis/gcm
+ #
+ # to test.
+ "+base",
+ "+testing",
+
+ "+google", # For third_party/protobuf/src.
+ "+net",
+]
diff --git a/google_apis/gcm/OWNERS b/google_apis/gcm/OWNERS
new file mode 100644
index 0000000..493c918
--- /dev/null
+++ b/google_apis/gcm/OWNERS
@@ -0,0 +1,3 @@
+zea@chromium.org
+dimich@chromium.org
+tim@chromium.org
diff --git a/google_apis/gcm/base/gcm_export.h b/google_apis/gcm/base/gcm_export.h
new file mode 100644
index 0000000..b66eb8e
--- /dev/null
+++ b/google_apis/gcm/base/gcm_export.h
@@ -0,0 +1,29 @@
+// Copyright 2013 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef GOOGLE_APIS_GCM_GCM_EXPORT_H_
+#define GOOGLE_APIS_GCM_GCM_EXPORT_H_
+
+#if defined(COMPONENT_BUILD)
+#if defined(WIN32)
+
+#if defined(GCM_IMPLEMENTATION)
+#define GCM_EXPORT __declspec(dllexport)
+#else
+#define GCM_EXPORT __declspec(dllimport)
+#endif // defined(GCM_IMPLEMENTATION)
+
+#else // defined(WIN32)
+#if defined(GCM_IMPLEMENTATION)
+#define GCM_EXPORT __attribute__((visibility("default")))
+#else
+#define GCM_EXPORT
+#endif // defined(GCM_IMPLEMENTATION)
+#endif
+
+#else // defined(COMPONENT_BUILD)
+#define GCM_EXPORT
+#endif
+
+#endif // GOOGLE_APIS_GCM_GCM_EXPORT_H_
diff --git a/google_apis/gcm/base/socket_stream.cc b/google_apis/gcm/base/socket_stream.cc
new file mode 100644
index 0000000..1a0b29d
--- /dev/null
+++ b/google_apis/gcm/base/socket_stream.cc
@@ -0,0 +1,332 @@
+// Copyright 2013 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "google_apis/gcm/base/socket_stream.h"
+
+#include "base/bind.h"
+#include "base/callback.h"
+#include "net/base/io_buffer.h"
+#include "net/socket/stream_socket.h"
+
+namespace gcm {
+
+namespace {
+
+// TODO(zea): consider having dynamically-sized buffers if this becomes too
+// expensive.
+const uint32 kDefaultBufferSize = 8*1024;
+
+} // namespace
+
+SocketInputStream::SocketInputStream(net::StreamSocket* socket)
+ : socket_(socket),
+ io_buffer_(new net::IOBuffer(kDefaultBufferSize)),
+ read_buffer_(new net::DrainableIOBuffer(io_buffer_.get(),
+ kDefaultBufferSize)),
+ next_pos_(0),
+ last_error_(net::OK),
+ weak_ptr_factory_(this) {
+ DCHECK(socket->IsConnected());
+}
+
+SocketInputStream::~SocketInputStream() {
+}
+
+bool SocketInputStream::Next(const void** data, int* size) {
+ if (GetState() != EMPTY && GetState() != READY) {
+ NOTREACHED() << "Invalid input stream read attempt.";
+ return false;
+ }
+
+ if (GetState() == EMPTY) {
+ DVLOG(1) << "No unread data remaining, ending read.";
+ return false;
+ }
+
+ DCHECK_EQ(GetState(), READY)
+ << " Input stream must have pending data before reading.";
+ DCHECK_LT(next_pos_, read_buffer_->BytesConsumed());
+ *data = io_buffer_->data() + next_pos_;
+ *size = UnreadByteCount();
+ next_pos_ = read_buffer_->BytesConsumed();
+ DVLOG(1) << "Consuming " << *size << " bytes in input buffer.";
+ return true;
+}
+
+void SocketInputStream::BackUp(int count) {
+ DCHECK(GetState() == READY || GetState() == EMPTY);
+ DCHECK_GT(count, 0);
+ DCHECK_LE(count, next_pos_);
+
+ next_pos_ -= count;
+ DVLOG(1) << "Backing up " << count << " bytes in input buffer. "
+ << "Current position now at " << next_pos_
+ << " of " << read_buffer_->BytesConsumed();
+}
+
+bool SocketInputStream::Skip(int count) {
+ NOTIMPLEMENTED();
+ return false;
+}
+
+int64 SocketInputStream::ByteCount() const {
+ DCHECK_NE(GetState(), CLOSED);
+ DCHECK_NE(GetState(), READING);
+ return next_pos_;
+}
+
+size_t SocketInputStream::UnreadByteCount() const {
+ DCHECK_NE(GetState(), CLOSED);
+ DCHECK_NE(GetState(), READING);
+ return read_buffer_->BytesConsumed() - next_pos_;
+}
+
+net::Error SocketInputStream::Refresh(const base::Closure& callback,
+ int byte_limit) {
+ DCHECK_NE(GetState(), CLOSED);
+ DCHECK_NE(GetState(), READING);
+ DCHECK_GT(byte_limit, 0);
+
+ if (byte_limit > read_buffer_->BytesRemaining()) {
+ NOTREACHED() << "Out of buffer space, closing input stream.";
+ CloseStream(net::ERR_UNEXPECTED, base::Closure());
+ return net::OK;
+ }
+
+ if (!socket_->IsConnected()) {
+ LOG(ERROR) << "Socket was disconnected, closing input stream";
+ CloseStream(net::ERR_CONNECTION_CLOSED, base::Closure());
+ return net::OK;
+ }
+
+ DVLOG(1) << "Refreshing input stream, limit of " << byte_limit << " bytes.";
+ int result = socket_->Read(
+ read_buffer_,
+ byte_limit,
+ base::Bind(&SocketInputStream::RefreshCompletionCallback,
+ weak_ptr_factory_.GetWeakPtr(),
+ callback));
+ DVLOG(1) << "Read returned " << result;
+ if (result == net::ERR_IO_PENDING) {
+ last_error_ = net::ERR_IO_PENDING;
+ return net::ERR_IO_PENDING;
+ }
+
+ RefreshCompletionCallback(base::Closure(), result);
+ return net::OK;
+}
+
+void SocketInputStream::RebuildBuffer() {
+ DVLOG(1) << "Rebuilding input stream, consumed "
+ << next_pos_ << " bytes.";
+ DCHECK_NE(GetState(), READING);
+ DCHECK_NE(GetState(), CLOSED);
+
+ int unread_data_size = 0;
+ const void* unread_data_ptr = NULL;
+ Next(&unread_data_ptr, &unread_data_size);
+ ResetInternal();
+
+ if (unread_data_ptr != io_buffer_->data()) {
+ DVLOG(1) << "Have " << unread_data_size
+ << " unread bytes remaining, shifting.";
+ // Move any remaining unread data to the start of the buffer;
+ std::memmove(io_buffer_->data(), unread_data_ptr, unread_data_size);
+ } else {
+ DVLOG(1) << "Have " << unread_data_size << " unread bytes remaining.";
+ }
+ read_buffer_->DidConsume(unread_data_size);
+}
+
+net::Error SocketInputStream::last_error() const {
+ return last_error_;
+}
+
+SocketInputStream::State SocketInputStream::GetState() const {
+ if (last_error_ < net::ERR_IO_PENDING)
+ return CLOSED;
+
+ if (last_error_ == net::ERR_IO_PENDING)
+ return READING;
+
+ DCHECK_EQ(last_error_, net::OK);
+ if (read_buffer_->BytesConsumed() == next_pos_)
+ return EMPTY;
+
+ return READY;
+}
+
+void SocketInputStream::RefreshCompletionCallback(
+ const base::Closure& callback, int result) {
+ // If an error occurred before the completion callback could complete, ignore
+ // the result.
+ if (GetState() == CLOSED)
+ return;
+
+ // Result == 0 implies EOF, which is treated as an error.
+ if (result == 0)
+ result = net::ERR_CONNECTION_CLOSED;
+
+ DCHECK_NE(result, net::ERR_IO_PENDING);
+
+ if (result < net::OK) {
+ DVLOG(1) << "Failed to refresh socket: " << result;
+ CloseStream(static_cast<net::Error>(result), callback);
+ return;
+ }
+
+ DCHECK_GT(result, 0);
+ last_error_ = net::OK;
+ read_buffer_->DidConsume(result);
+
+ DVLOG(1) << "Refresh complete with " << result << " new bytes. "
+ << "Current position " << next_pos_
+ << " of " << read_buffer_->BytesConsumed() << ".";
+
+ if (!callback.is_null())
+ callback.Run();
+}
+
+void SocketInputStream::ResetInternal() {
+ read_buffer_->SetOffset(0);
+ next_pos_ = 0;
+ last_error_ = net::OK;
+ weak_ptr_factory_.InvalidateWeakPtrs(); // Invalidate any callbacks.
+}
+
+void SocketInputStream::CloseStream(net::Error error,
+ const base::Closure& callback) {
+ DCHECK_LT(error, net::ERR_IO_PENDING);
+ ResetInternal();
+ last_error_ = error;
+ LOG(ERROR) << "Closing stream with result " << error;
+ if (!callback.is_null())
+ callback.Run();
+}
+
+SocketOutputStream::SocketOutputStream(net::StreamSocket* socket)
+ : socket_(socket),
+ io_buffer_(new net::IOBuffer(kDefaultBufferSize)),
+ write_buffer_(new net::DrainableIOBuffer(io_buffer_.get(),
+ kDefaultBufferSize)),
+ next_pos_(0),
+ last_error_(net::OK),
+ weak_ptr_factory_(this) {
+ DCHECK(socket->IsConnected());
+}
+
+SocketOutputStream::~SocketOutputStream() {
+}
+
+bool SocketOutputStream::Next(void** data, int* size) {
+ DCHECK_NE(GetState(), CLOSED);
+ DCHECK_NE(GetState(), FLUSHING);
+ if (next_pos_ == write_buffer_->size())
+ return false;
+
+ *data = write_buffer_->data() + next_pos_;
+ *size = write_buffer_->size() - next_pos_;
+ next_pos_ = write_buffer_->size();
+ return true;
+}
+
+void SocketOutputStream::BackUp(int count) {
+ DCHECK_GE(count, 0);
+ if (count > next_pos_)
+ next_pos_ = 0;
+ next_pos_ -= count;
+ DVLOG(1) << "Backing up " << count << " bytes in output buffer. "
+ << next_pos_ << " bytes used.";
+}
+
+int64 SocketOutputStream::ByteCount() const {
+ DCHECK_NE(GetState(), CLOSED);
+ DCHECK_NE(GetState(), FLUSHING);
+ return next_pos_;
+}
+
+net::Error SocketOutputStream::Flush(const base::Closure& callback) {
+ DCHECK_EQ(GetState(), READY);
+
+ if (!socket_->IsConnected()) {
+ LOG(ERROR) << "Socket was disconnected, closing output stream";
+ last_error_ = net::ERR_CONNECTION_CLOSED;
+ return net::OK;
+ }
+
+ DVLOG(1) << "Flushing " << next_pos_ << " bytes into socket.";
+ int result = socket_->Write(
+ write_buffer_,
+ next_pos_,
+ base::Bind(&SocketOutputStream::FlushCompletionCallback,
+ weak_ptr_factory_.GetWeakPtr(),
+ callback));
+ DVLOG(1) << "Write returned " << result;
+ if (result == net::ERR_IO_PENDING) {
+ last_error_ = net::ERR_IO_PENDING;
+ return net::ERR_IO_PENDING;
+ }
+
+ FlushCompletionCallback(base::Closure(), result);
+ return net::OK;
+}
+
+SocketOutputStream::State SocketOutputStream::GetState() const{
+ if (last_error_ < net::ERR_IO_PENDING)
+ return CLOSED;
+
+ if (last_error_ == net::ERR_IO_PENDING)
+ return FLUSHING;
+
+ DCHECK_EQ(last_error_, net::OK);
+ if (next_pos_ == 0)
+ return EMPTY;
+
+ return READY;
+}
+
+net::Error SocketOutputStream::last_error() const {
+ return last_error_;
+}
+
+void SocketOutputStream::FlushCompletionCallback(
+ const base::Closure& callback, int result) {
+ // If an error occurred before the completion callback could complete, ignore
+ // the result.
+ if (GetState() == CLOSED)
+ return;
+
+ // Result == 0 implies EOF, which is treated as an error.
+ if (result == 0)
+ result = net::ERR_CONNECTION_CLOSED;
+
+ DCHECK_NE(result, net::ERR_IO_PENDING);
+
+ if (result < net::OK) {
+ LOG(ERROR) << "Failed to flush socket.";
+ last_error_ = static_cast<net::Error>(result);
+ if (!callback.is_null())
+ callback.Run();
+ return;
+ }
+
+ DCHECK_GT(result, net::OK);
+ last_error_ = net::OK;
+
+ if (write_buffer_->BytesConsumed() + result < next_pos_) {
+ DVLOG(1) << "Partial flush complete. Retrying.";
+ // Only a partial write was completed. Flush again to finish the write.
+ write_buffer_->DidConsume(result);
+ Flush(callback);
+ return;
+ }
+
+ DVLOG(1) << "Socket flush complete.";
+ write_buffer_->SetOffset(0);
+ next_pos_ = 0;
+ if (!callback.is_null())
+ callback.Run();
+}
+
+} // namespace gcm
diff --git a/google_apis/gcm/base/socket_stream.h b/google_apis/gcm/base/socket_stream.h
new file mode 100644
index 0000000..a458420
--- /dev/null
+++ b/google_apis/gcm/base/socket_stream.h
@@ -0,0 +1,205 @@
+// Copyright 2013 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+//
+// Protobuf ZeroCopy[Input/Output]Stream implementations capable of using a
+// net::StreamSocket. Built to work with Protobuf CodedStreams.
+
+#ifndef GOOGLE_APIS_GCM_BASE_SOCKET_STREAM_H_
+#define GOOGLE_APIS_GCM_BASE_SOCKET_STREAM_H_
+
+#include "base/basictypes.h"
+#include "base/callback_forward.h"
+#include "base/compiler_specific.h"
+#include "base/memory/ref_counted.h"
+#include "base/memory/weak_ptr.h"
+#include "google/protobuf/io/zero_copy_stream.h"
+#include "google_apis/gcm/base/gcm_export.h"
+#include "net/base/net_errors.h"
+
+namespace net {
+class DrainableIOBuffer;
+class IOBuffer;
+class StreamSocket;
+} // namespace net
+
+namespace gcm {
+
+// A helper class for interacting with a net::StreamSocket that is receiving
+// protobuf encoded messages. A SocketInputStream does not take ownership of
+// the socket itself, and it is expected that the life of the input stream
+// should match the life of the socket itself (while the socket remains
+// connected). If an error is encounters, the input stream will store the error
+// in |last_error_|, and GetState() will be set to CLOSED.
+// Typical usage:
+// 1. Check the GetState() of the input stream before using it. If CLOSED, the
+// input stream must be rebuilt (and the socket likely needs to be
+// reconnected as an error was encountered).
+// 2. If GetState() is EMPTY, call Refresh(..), passing the maximum byte size
+// for a message, and wait until completion. It is invalid to attempt to
+// Refresh an input stream or read data from the stream while a Refresh is
+// pending.
+// 3. Check GetState() again to ensure the Refresh was successful.
+// 4. Use a CodedInputStream to read from the ZeroCopyInputStream interface of
+// the SocketInputStream. Next(..) will return true until there is no data
+// remaining.
+// 5. Call RebuildBuffer when done reading, to shift any unread data to the
+// start of the buffer.
+// 6. Repeat as necessary.
+class GCM_EXPORT SocketInputStream
+ : public google::protobuf::io::ZeroCopyInputStream {
+ public:
+ enum State {
+ // No valid data to read. This means the buffer is either empty or all data
+ // in the buffer has already been consumed.
+ EMPTY,
+ // Valid data to read.
+ READY,
+ // In the process of reading new data from the socket.
+ READING,
+ // An permanent error occurred and the stream is now closed.
+ CLOSED,
+ };
+
+ // |socket| should already be connected.
+ explicit SocketInputStream(net::StreamSocket* socket);
+ virtual ~SocketInputStream();
+
+ // ZeroCopyInputStream implementation.
+ virtual bool Next(const void** data, int* size) OVERRIDE;
+ virtual void BackUp(int count) OVERRIDE;
+ virtual bool Skip(int count) OVERRIDE; // Not implemented.
+ virtual int64 ByteCount() const OVERRIDE;
+
+ // The remaining amount of valid data available to be read.
+ size_t UnreadByteCount() const;
+
+ // Reads from the socket, appending a max of |byte_limit| bytes onto the read
+ // buffer. net::ERR_IO_PENDING is returned if the refresh can't complete
+ // synchronously, in which case the callback is invoked upon completion. If
+ // the refresh can complete synchronously, even in case of an error, returns
+ // net::OK without invoking callback.
+ // Note: GetState() (and possibly last_error()) should be checked upon
+ // completion to determine whether the Refresh encountered an error.
+ net::Error Refresh(const base::Closure& callback, int byte_limit);
+
+ // Rebuilds the buffer state by copying over any unread data to the beginning
+ // of the buffer and resetting the buffer read/write positions.
+ // Note: it is not valid to call Rebuild() if GetState() == CLOSED. The stream
+ // must be recreated from scratch in such a scenario.
+ void RebuildBuffer();
+
+ // Returns the last fatal error encountered. Only valid if GetState() ==
+ // CLOSED.
+ net::Error last_error() const;
+
+ // Returns the current state.
+ State GetState() const;
+
+ private:
+ // Clears the local state.
+ void ResetInternal();
+
+ // Callback for Socket::Read calls.
+ void RefreshCompletionCallback(const base::Closure& callback, int result);
+
+ // Permanently closes the stream.
+ void CloseStream(net::Error error, const base::Closure& callback);
+
+ // Internal net components.
+ net::StreamSocket* const socket_;
+ const scoped_refptr<net::IOBuffer> io_buffer_;
+ // IOBuffer implementation that wraps the data within |io_buffer_| that hasn't
+ // been written to yet by Socket::Read calls.
+ const scoped_refptr<net::DrainableIOBuffer> read_buffer_;
+
+ // Starting position of the data within |io_buffer_| to consume on subsequent
+ // Next(..) call. 0 <= next_pos_ <= read_buffer_.BytesConsumed()
+ // Note: next_pos == read_buffer_.BytesConsumed() implies GetState() == EMPTY.
+ int next_pos_;
+
+ // If < net::ERR_IO_PENDING, the last net error received.
+ // Note: last_error_ == net::ERR_IO_PENDING implies GetState() == READING.
+ net::Error last_error_;
+
+ base::WeakPtrFactory<SocketInputStream> weak_ptr_factory_;
+
+ DISALLOW_COPY_AND_ASSIGN(SocketInputStream);
+};
+
+// A helper class for writing to a SocketStream with protobuf encoded data.
+// A SocketOutputStream does not take ownership of the socket itself, and it is
+// expected that the life of the output stream should match the life of the
+// socket itself (while the socket remains connected).
+// Typical usage:
+// 1. Check the GetState() of the output stream before using it. If CLOSED, the
+// output stream must be rebuilt (and the socket likely needs to be
+// reconnected, as an error was encountered).
+// 2. If EMPTY, the output stream can be written via a CodedOutputStream using
+// the ZeroCopyOutputStream interface.
+// 3. Once done writing, GetState() should be READY, so call Flush(..) to write
+// the buffer into the StreamSocket. Wait for the callback to be invoked
+// (it's invalid to write to an output stream while it's flushing).
+// 4. Check the GetState() again to ensure the Flush was successful. GetState()
+// should be EMPTY again.
+// 5. Repeat.
+class GCM_EXPORT SocketOutputStream
+ : public google::protobuf::io::ZeroCopyOutputStream {
+ public:
+ enum State {
+ // No valid data yet.
+ EMPTY,
+ // Ready for flushing (some data is present).
+ READY,
+ // In the process of flushing into the socket.
+ FLUSHING,
+ // A permanent error occurred, and the stream is now closed.
+ CLOSED,
+ };
+
+ // |socket| should already be connected.
+ explicit SocketOutputStream(net::StreamSocket* socket);
+ virtual ~SocketOutputStream();
+
+ // ZeroCopyOutputStream implementation.
+ virtual bool Next(void** data, int* size) OVERRIDE;
+ virtual void BackUp(int count) OVERRIDE;
+ virtual int64 ByteCount() const OVERRIDE;
+
+ // Writes the buffer into the Socket.
+ net::Error Flush(const base::Closure& callback);
+
+ // Returns the last fatal error encountered. Only valid if GetState() ==
+ // CLOSED.
+ net::Error last_error() const;
+
+ // Returns the current state.
+ State GetState() const;
+
+ private:
+ void FlushCompletionCallback(const base::Closure& callback, int result);
+
+ // Internal net components.
+ net::StreamSocket* const socket_;
+ const scoped_refptr<net::IOBuffer> io_buffer_;
+ // IOBuffer implementation that wraps the data within |io_buffer_| that hasn't
+ // been written to the socket yet.
+ const scoped_refptr<net::DrainableIOBuffer> write_buffer_;
+
+ // Starting position of the data within |io_buffer_| to consume on subsequent
+ // Next(..) call. 0 <= write_buffer_.BytesConsumed() <= next_pos_
+ // Note: next_pos == 0 implies GetState() == EMPTY.
+ int next_pos_;
+
+ // If < net::ERR_IO_PENDING, the last net error received.
+ // Note: last_error_ == net::ERR_IO_PENDING implies GetState() == FLUSHING.
+ net::Error last_error_;
+
+ base::WeakPtrFactory<SocketOutputStream> weak_ptr_factory_;
+
+ DISALLOW_COPY_AND_ASSIGN(SocketOutputStream);
+};
+
+} // namespace gcm
+
+#endif // GOOGLE_APIS_GCM_BASE_SOCKET_STREAM_H_
diff --git a/google_apis/gcm/base/socket_stream_unittest.cc b/google_apis/gcm/base/socket_stream_unittest.cc
new file mode 100644
index 0000000..d7ba679
--- /dev/null
+++ b/google_apis/gcm/base/socket_stream_unittest.cc
@@ -0,0 +1,406 @@
+// Copyright 2013 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "google_apis/gcm/base/socket_stream.h"
+
+#include "base/basictypes.h"
+#include "base/bind.h"
+#include "base/memory/scoped_ptr.h"
+#include "base/run_loop.h"
+#include "base/stl_util.h"
+#include "base/strings/string_piece.h"
+#include "net/socket/socket_test_util.h"
+#include "testing/gtest/include/gtest/gtest.h"
+
+namespace gcm {
+namespace {
+
+typedef std::vector<net::MockRead> ReadList;
+typedef std::vector<net::MockWrite> WriteList;
+
+const char kReadData[] = "read_data";
+const uint64 kReadDataSize = arraysize(kReadData) - 1;
+const char kReadData2[] = "read_alternate_data";
+const uint64 kReadData2Size = arraysize(kReadData2) - 1;
+const char kWriteData[] = "write_data";
+const uint64 kWriteDataSize = arraysize(kWriteData) - 1;
+
+class GCMSocketStreamTest : public testing::Test {
+ public:
+ GCMSocketStreamTest();
+ virtual ~GCMSocketStreamTest();
+
+ // Build a socket with the expected reads and writes.
+ void BuildSocket(const ReadList& read_list, const WriteList& write_list);
+
+ // Pump the message loop until idle.
+ void PumpLoop();
+
+ // Simulates a google::protobuf::io::CodedInputStream read.
+ base::StringPiece DoInputStreamRead(uint64 bytes);
+ // Simulates a google::protobuf::io::CodedOutputStream write.
+ uint64 DoOutputStreamWrite(const base::StringPiece& write_src);
+
+ // Synchronous Refresh wrapper.
+ void WaitForData(size_t msg_size);
+
+ base::MessageLoop* message_loop() { return &message_loop_; };
+ net::DelayedSocketData* data_provider() { return data_provider_.get(); }
+ SocketInputStream* input_stream() { return socket_input_stream_.get(); }
+ SocketOutputStream* output_stream() { return socket_output_stream_.get(); }
+ net::StreamSocket* socket() { return socket_.get(); }
+
+ private:
+ void OpenConnection();
+ void ResetInputStream();
+ void ResetOutputStream();
+
+ void ConnectCallback(int result);
+
+ // SocketStreams and their data providers.
+ ReadList mock_reads_;
+ WriteList mock_writes_;
+ scoped_ptr<net::DelayedSocketData> data_provider_;
+ scoped_ptr<SocketInputStream> socket_input_stream_;
+ scoped_ptr<SocketOutputStream> socket_output_stream_;
+
+ // net:: components.
+ scoped_ptr<net::StreamSocket> socket_;
+ net::MockClientSocketFactory socket_factory_;
+ net::AddressList address_list_;
+
+ base::MessageLoopForIO message_loop_;
+};
+
+GCMSocketStreamTest::GCMSocketStreamTest() {
+ net::IPAddressNumber ip_number;
+ net::ParseIPLiteralToNumber("127.0.0.1", &ip_number);
+ address_list_ = net::AddressList::CreateFromIPAddress(ip_number, 5228);
+}
+
+GCMSocketStreamTest::~GCMSocketStreamTest() {}
+
+void GCMSocketStreamTest::BuildSocket(const ReadList& read_list,
+ const WriteList& write_list) {
+ mock_reads_ = read_list;
+ mock_writes_ = write_list;
+ data_provider_.reset(
+ new net::DelayedSocketData(
+ 0,
+ vector_as_array(&mock_reads_), mock_reads_.size(),
+ vector_as_array(&mock_writes_), mock_writes_.size()));
+ socket_factory_.AddSocketDataProvider(data_provider_.get());
+ OpenConnection();
+ ResetInputStream();
+ ResetOutputStream();
+}
+
+void GCMSocketStreamTest::PumpLoop() {
+ base::RunLoop run_loop;
+ run_loop.RunUntilIdle();
+}
+
+base::StringPiece GCMSocketStreamTest::DoInputStreamRead(uint64 bytes) {
+ uint64 total_bytes_read = 0;
+ const void* initial_buffer = NULL;
+ const void* buffer = NULL;
+ int size = 0;
+
+ do {
+ DCHECK(socket_input_stream_->GetState() == SocketInputStream::EMPTY ||
+ socket_input_stream_->GetState() == SocketInputStream::READY);
+ if (!socket_input_stream_->Next(&buffer, &size))
+ break;
+ total_bytes_read += size;
+ if (initial_buffer) { // Verify the buffer doesn't skip data.
+ EXPECT_EQ(static_cast<const uint8*>(initial_buffer) + total_bytes_read,
+ static_cast<const uint8*>(buffer) + size);
+ } else {
+ initial_buffer = buffer;
+ }
+ } while (total_bytes_read < bytes);
+
+ if (total_bytes_read > bytes) {
+ socket_input_stream_->BackUp(total_bytes_read - bytes);
+ total_bytes_read = bytes;
+ }
+
+ return base::StringPiece(static_cast<const char*>(initial_buffer),
+ total_bytes_read);
+}
+
+uint64 GCMSocketStreamTest::DoOutputStreamWrite(
+ const base::StringPiece& write_src) {
+ DCHECK_EQ(socket_output_stream_->GetState(), SocketOutputStream::EMPTY);
+ uint64 total_bytes_written = 0;
+ void* buffer = NULL;
+ int size = 0;
+ size_t bytes = write_src.size();
+
+ do {
+ if (!socket_output_stream_->Next(&buffer, &size))
+ break;
+ uint64 bytes_to_write = (static_cast<uint64>(size) < bytes ? size : bytes);
+ memcpy(buffer,
+ write_src.data() + total_bytes_written,
+ bytes_to_write);
+ if (bytes_to_write < static_cast<uint64>(size))
+ socket_output_stream_->BackUp(size - bytes_to_write);
+ total_bytes_written += bytes_to_write;
+ } while (total_bytes_written < bytes);
+
+ base::RunLoop run_loop;
+ if (socket_output_stream_->Flush(run_loop.QuitClosure()) ==
+ net::ERR_IO_PENDING) {
+ run_loop.Run();
+ }
+
+ return total_bytes_written;
+}
+
+void GCMSocketStreamTest::WaitForData(size_t msg_size) {
+ while (input_stream()->UnreadByteCount() < msg_size) {
+ base::RunLoop run_loop;
+ if (input_stream()->Refresh(run_loop.QuitClosure(),
+ msg_size - input_stream()->UnreadByteCount()) ==
+ net::ERR_IO_PENDING) {
+ run_loop.Run();
+ }
+ if (input_stream()->GetState() == SocketInputStream::CLOSED)
+ return;
+ }
+}
+
+void GCMSocketStreamTest::OpenConnection() {
+ socket_ = socket_factory_.CreateTransportClientSocket(
+ address_list_, NULL, net::NetLog::Source());
+ socket_->Connect(
+ base::Bind(&GCMSocketStreamTest::ConnectCallback,
+ base::Unretained(this)));
+ PumpLoop();
+}
+
+void GCMSocketStreamTest::ConnectCallback(int result) {}
+
+void GCMSocketStreamTest::ResetInputStream() {
+ DCHECK(socket_.get());
+ socket_input_stream_.reset(new SocketInputStream(socket_.get()));
+}
+
+void GCMSocketStreamTest::ResetOutputStream() {
+ DCHECK(socket_.get());
+ socket_output_stream_.reset(new SocketOutputStream(socket_.get()));
+}
+
+// A read where all data is already available.
+TEST_F(GCMSocketStreamTest, ReadDataSync) {
+ BuildSocket(ReadList(1, net::MockRead(net::SYNCHRONOUS,
+ kReadData,
+ kReadDataSize)),
+ WriteList());
+
+ WaitForData(kReadDataSize);
+ ASSERT_EQ(std::string(kReadData, kReadDataSize),
+ DoInputStreamRead(kReadDataSize));
+}
+
+// A read that comes in two parts.
+TEST_F(GCMSocketStreamTest, ReadPartialDataSync) {
+ size_t first_read_len = kReadDataSize / 2;
+ size_t second_read_len = kReadDataSize - first_read_len;
+ ReadList read_list;
+ read_list.push_back(
+ net::MockRead(net::SYNCHRONOUS,
+ kReadData,
+ first_read_len));
+ read_list.push_back(
+ net::MockRead(net::SYNCHRONOUS,
+ &kReadData[first_read_len],
+ second_read_len));
+ BuildSocket(read_list, WriteList());
+
+ WaitForData(kReadDataSize);
+ ASSERT_EQ(std::string(kReadData, kReadDataSize),
+ DoInputStreamRead(kReadDataSize));
+}
+
+// A read where no data is available at first (IO_PENDING will be returned).
+TEST_F(GCMSocketStreamTest, ReadAsync) {
+ size_t first_read_len = kReadDataSize / 2;
+ size_t second_read_len = kReadDataSize - first_read_len;
+ ReadList read_list;
+ read_list.push_back(
+ net::MockRead(net::SYNCHRONOUS, net::ERR_IO_PENDING));
+ read_list.push_back(
+ net::MockRead(net::ASYNC, kReadData, first_read_len));
+ read_list.push_back(
+ net::MockRead(net::ASYNC, &kReadData[first_read_len], second_read_len));
+ BuildSocket(read_list, WriteList());
+
+ base::MessageLoop::current()->PostTask(
+ FROM_HERE,
+ base::Bind(&net::DelayedSocketData::ForceNextRead,
+ base::Unretained(data_provider())));
+ WaitForData(kReadDataSize);
+ ASSERT_EQ(std::string(kReadData, kReadDataSize),
+ DoInputStreamRead(kReadDataSize));
+}
+
+// Simulate two packets arriving at once. Read them in two separate calls.
+TEST_F(GCMSocketStreamTest, TwoReadsAtOnce) {
+ std::string long_data = std::string(kReadData, kReadDataSize) +
+ std::string(kReadData2, kReadData2Size);
+ BuildSocket(ReadList(1, net::MockRead(net::SYNCHRONOUS,
+ long_data.c_str(),
+ long_data.size())),
+ WriteList());
+
+ WaitForData(kReadDataSize);
+ ASSERT_EQ(std::string(kReadData, kReadDataSize),
+ DoInputStreamRead(kReadDataSize));
+
+ WaitForData(kReadData2Size);
+ ASSERT_EQ(std::string(kReadData2, kReadData2Size),
+ DoInputStreamRead(kReadData2Size));
+}
+
+// Simulate two packets arriving at once. Read them in two calls separated
+// by a Rebuild.
+TEST_F(GCMSocketStreamTest, TwoReadsAtOnceWithRebuild) {
+ std::string long_data = std::string(kReadData, kReadDataSize) +
+ std::string(kReadData2, kReadData2Size);
+ BuildSocket(ReadList(1, net::MockRead(net::SYNCHRONOUS,
+ long_data.c_str(),
+ long_data.size())),
+ WriteList());
+
+ WaitForData(kReadDataSize);
+ ASSERT_EQ(std::string(kReadData, kReadDataSize),
+ DoInputStreamRead(kReadDataSize));
+
+ input_stream()->RebuildBuffer();
+ WaitForData(kReadData2Size);
+ ASSERT_EQ(std::string(kReadData2, kReadData2Size),
+ DoInputStreamRead(kReadData2Size));
+}
+
+// Simulate a read that is aborted.
+TEST_F(GCMSocketStreamTest, ReadError) {
+ int result = net::ERR_ABORTED;
+ BuildSocket(ReadList(1, net::MockRead(net::SYNCHRONOUS, result)),
+ WriteList());
+
+ WaitForData(kReadDataSize);
+ ASSERT_EQ(SocketInputStream::CLOSED, input_stream()->GetState());
+ ASSERT_EQ(result, input_stream()->last_error());
+}
+
+// Simulate a read after the connection is closed.
+TEST_F(GCMSocketStreamTest, ReadDisconnected) {
+ BuildSocket(ReadList(), WriteList());
+ socket()->Disconnect();
+ WaitForData(kReadDataSize);
+ ASSERT_EQ(SocketInputStream::CLOSED, input_stream()->GetState());
+ ASSERT_EQ(net::ERR_CONNECTION_CLOSED, input_stream()->last_error());
+}
+
+// Write a full message in one go.
+TEST_F(GCMSocketStreamTest, WriteFull) {
+ BuildSocket(ReadList(),
+ WriteList(1, net::MockWrite(net::SYNCHRONOUS,
+ kWriteData,
+ kWriteDataSize)));
+ ASSERT_EQ(kWriteDataSize,
+ DoOutputStreamWrite(base::StringPiece(kWriteData,
+ kWriteDataSize)));
+}
+
+// Write a message in two go's.
+TEST_F(GCMSocketStreamTest, WritePartial) {
+ WriteList write_list;
+ write_list.push_back(net::MockWrite(net::SYNCHRONOUS,
+ kWriteData,
+ kWriteDataSize / 2));
+ write_list.push_back(net::MockWrite(net::SYNCHRONOUS,
+ kWriteData + kWriteDataSize / 2,
+ kWriteDataSize / 2));
+ BuildSocket(ReadList(), write_list);
+ ASSERT_EQ(kWriteDataSize,
+ DoOutputStreamWrite(base::StringPiece(kWriteData,
+ kWriteDataSize)));
+}
+
+// Write a message completely asynchronously (returns IO_PENDING before
+// finishing the write in two go's).
+TEST_F(GCMSocketStreamTest, WriteNone) {
+ WriteList write_list;
+ write_list.push_back(net::MockWrite(net::SYNCHRONOUS,
+ kWriteData,
+ kWriteDataSize / 2));
+ write_list.push_back(net::MockWrite(net::SYNCHRONOUS,
+ kWriteData + kWriteDataSize / 2,
+ kWriteDataSize / 2));
+ BuildSocket(ReadList(), write_list);
+ ASSERT_EQ(kWriteDataSize,
+ DoOutputStreamWrite(base::StringPiece(kWriteData,
+ kWriteDataSize)));
+}
+
+// Write a message then read a message.
+TEST_F(GCMSocketStreamTest, WriteThenRead) {
+ BuildSocket(ReadList(1, net::MockRead(net::SYNCHRONOUS,
+ kReadData,
+ kReadDataSize)),
+ WriteList(1, net::MockWrite(net::SYNCHRONOUS,
+ kWriteData,
+ kWriteDataSize)));
+
+ ASSERT_EQ(kWriteDataSize,
+ DoOutputStreamWrite(base::StringPiece(kWriteData,
+ kWriteDataSize)));
+
+ WaitForData(kReadDataSize);
+ ASSERT_EQ(std::string(kReadData, kReadDataSize),
+ DoInputStreamRead(kReadDataSize));
+}
+
+// Read a message then write a message.
+TEST_F(GCMSocketStreamTest, ReadThenWrite) {
+ BuildSocket(ReadList(1, net::MockRead(net::SYNCHRONOUS,
+ kReadData,
+ kReadDataSize)),
+ WriteList(1, net::MockWrite(net::SYNCHRONOUS,
+ kWriteData,
+ kWriteDataSize)));
+
+ WaitForData(kReadDataSize);
+ ASSERT_EQ(std::string(kReadData, kReadDataSize),
+ DoInputStreamRead(kReadDataSize));
+
+ ASSERT_EQ(kWriteDataSize,
+ DoOutputStreamWrite(base::StringPiece(kWriteData,
+ kWriteDataSize)));
+}
+
+// Simulate a write that gets aborted.
+TEST_F(GCMSocketStreamTest, WriteError) {
+ int result = net::ERR_ABORTED;
+ BuildSocket(ReadList(),
+ WriteList(1, net::MockWrite(net::SYNCHRONOUS, result)));
+ DoOutputStreamWrite(base::StringPiece(kWriteData, kWriteDataSize));
+ ASSERT_EQ(SocketOutputStream::CLOSED, output_stream()->GetState());
+ ASSERT_EQ(result, output_stream()->last_error());
+}
+
+// Simulate a write after the connection is closed.
+TEST_F(GCMSocketStreamTest, WriteDisconnected) {
+ BuildSocket(ReadList(), WriteList());
+ socket()->Disconnect();
+ DoOutputStreamWrite(base::StringPiece(kWriteData, kWriteDataSize));
+ ASSERT_EQ(SocketOutputStream::CLOSED, output_stream()->GetState());
+ ASSERT_EQ(net::ERR_CONNECTION_CLOSED, output_stream()->last_error());
+}
+
+} // namespace
+} // namespace gcm
diff --git a/google_apis/gcm/gcm.gyp b/google_apis/gcm/gcm.gyp
new file mode 100644
index 0000000..d6d9a31
--- /dev/null
+++ b/google_apis/gcm/gcm.gyp
@@ -0,0 +1,56 @@
+# Copyright 2013 The Chromium Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+{
+ 'variables': {
+ 'chromium_code': 1,
+ },
+
+ 'targets': [
+ # The public GCM shared library target.
+ {
+ 'target_name': 'gcm',
+ 'type': 'shared_library',
+ 'variables': { 'enable_wexit_time_destructors': 1, },
+ 'include_dirs': [
+ '../..',
+ ],
+ 'defines': [
+ 'GCM_IMPLEMENTATION',
+ ],
+ 'export_dependent_settings': [
+ '../../third_party/protobuf/protobuf.gyp:protobuf_lite'
+ ],
+ 'dependencies': [
+ '../../base/base.gyp:base',
+ '../../net/net.gyp:net',
+ '../../third_party/protobuf/protobuf.gyp:protobuf_lite'
+ ],
+ 'sources': [
+ 'base/socket_stream.h',
+ 'base/socket_stream.cc',
+ ],
+ },
+
+ # The main GCM unit tests.
+ {
+ 'target_name': 'gcm_unit_tests',
+ 'type': '<(gtest_target_type)',
+ 'variables': { 'enable_wexit_time_destructors': 1, },
+ 'include_dirs': [
+ '../..',
+ ],
+ 'dependencies': [
+ '../../base/base.gyp:run_all_unittests',
+ '../../base/base.gyp:base',
+ '../../net/net.gyp:net_test_support',
+ '../../testing/gtest.gyp:gtest',
+ 'gcm'
+ ],
+ 'sources': [
+ 'base/socket_stream_unittest.cc',
+ ]
+ },
+ ],
+}