summaryrefslogtreecommitdiffstats
path: root/google_apis
diff options
context:
space:
mode:
authorzea@chromium.org <zea@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2013-10-22 00:50:15 +0000
committerzea@chromium.org <zea@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2013-10-22 00:50:15 +0000
commitafaa03ce00f0fa13ae78569c97ecc82bab66d157 (patch)
tree816e5542efd41e4397602830fabd19020ecf7c63 /google_apis
parentf870007e66d9d212124cd0bff85e84cc97551854 (diff)
downloadchromium_src-afaa03ce00f0fa13ae78569c97ecc82bab66d157.zip
chromium_src-afaa03ce00f0fa13ae78569c97ecc82bab66d157.tar.gz
chromium_src-afaa03ce00f0fa13ae78569c97ecc82bab66d157.tar.bz2
[GCM] Unrevert initial GCM patch
Includes build fix for static libraries and ensure that gcm_unit_tests is run on relevant builders. Original codereview: https://codereview.chromium.org/23684017/ BUG=284553 TBR=darin@chromium.org Review URL: https://codereview.chromium.org/32093003 git-svn-id: svn://svn.chromium.org/chrome/trunk/src@229982 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'google_apis')
-rw-r--r--google_apis/gcm/DEPS12
-rw-r--r--google_apis/gcm/OWNERS3
-rw-r--r--google_apis/gcm/base/gcm_export.h29
-rw-r--r--google_apis/gcm/base/socket_stream.cc332
-rw-r--r--google_apis/gcm/base/socket_stream.h205
-rw-r--r--google_apis/gcm/base/socket_stream_unittest.cc406
-rw-r--r--google_apis/gcm/gcm.gyp56
7 files changed, 1043 insertions, 0 deletions
diff --git a/google_apis/gcm/DEPS b/google_apis/gcm/DEPS
new file mode 100644
index 0000000..757c0da
--- /dev/null
+++ b/google_apis/gcm/DEPS
@@ -0,0 +1,12 @@
+include_rules = [
+ # Repeat these from the top-level DEPS file so one can just run
+ #
+ # checkdeps.py google_apis/gcm
+ #
+ # to test.
+ "+base",
+ "+testing",
+
+ "+google", # For third_party/protobuf/src.
+ "+net",
+]
diff --git a/google_apis/gcm/OWNERS b/google_apis/gcm/OWNERS
new file mode 100644
index 0000000..493c918
--- /dev/null
+++ b/google_apis/gcm/OWNERS
@@ -0,0 +1,3 @@
+zea@chromium.org
+dimich@chromium.org
+tim@chromium.org
diff --git a/google_apis/gcm/base/gcm_export.h b/google_apis/gcm/base/gcm_export.h
new file mode 100644
index 0000000..b66eb8e
--- /dev/null
+++ b/google_apis/gcm/base/gcm_export.h
@@ -0,0 +1,29 @@
+// Copyright 2013 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef GOOGLE_APIS_GCM_GCM_EXPORT_H_
+#define GOOGLE_APIS_GCM_GCM_EXPORT_H_
+
+#if defined(COMPONENT_BUILD)
+#if defined(WIN32)
+
+#if defined(GCM_IMPLEMENTATION)
+#define GCM_EXPORT __declspec(dllexport)
+#else
+#define GCM_EXPORT __declspec(dllimport)
+#endif // defined(GCM_IMPLEMENTATION)
+
+#else // defined(WIN32)
+#if defined(GCM_IMPLEMENTATION)
+#define GCM_EXPORT __attribute__((visibility("default")))
+#else
+#define GCM_EXPORT
+#endif // defined(GCM_IMPLEMENTATION)
+#endif
+
+#else // defined(COMPONENT_BUILD)
+#define GCM_EXPORT
+#endif
+
+#endif // GOOGLE_APIS_GCM_GCM_EXPORT_H_
diff --git a/google_apis/gcm/base/socket_stream.cc b/google_apis/gcm/base/socket_stream.cc
new file mode 100644
index 0000000..1a0b29d
--- /dev/null
+++ b/google_apis/gcm/base/socket_stream.cc
@@ -0,0 +1,332 @@
+// Copyright 2013 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "google_apis/gcm/base/socket_stream.h"
+
+#include "base/bind.h"
+#include "base/callback.h"
+#include "net/base/io_buffer.h"
+#include "net/socket/stream_socket.h"
+
+namespace gcm {
+
+namespace {
+
+// TODO(zea): consider having dynamically-sized buffers if this becomes too
+// expensive.
+const uint32 kDefaultBufferSize = 8*1024;
+
+} // namespace
+
+SocketInputStream::SocketInputStream(net::StreamSocket* socket)
+ : socket_(socket),
+ io_buffer_(new net::IOBuffer(kDefaultBufferSize)),
+ read_buffer_(new net::DrainableIOBuffer(io_buffer_.get(),
+ kDefaultBufferSize)),
+ next_pos_(0),
+ last_error_(net::OK),
+ weak_ptr_factory_(this) {
+ DCHECK(socket->IsConnected());
+}
+
+SocketInputStream::~SocketInputStream() {
+}
+
+bool SocketInputStream::Next(const void** data, int* size) {
+ if (GetState() != EMPTY && GetState() != READY) {
+ NOTREACHED() << "Invalid input stream read attempt.";
+ return false;
+ }
+
+ if (GetState() == EMPTY) {
+ DVLOG(1) << "No unread data remaining, ending read.";
+ return false;
+ }
+
+ DCHECK_EQ(GetState(), READY)
+ << " Input stream must have pending data before reading.";
+ DCHECK_LT(next_pos_, read_buffer_->BytesConsumed());
+ *data = io_buffer_->data() + next_pos_;
+ *size = UnreadByteCount();
+ next_pos_ = read_buffer_->BytesConsumed();
+ DVLOG(1) << "Consuming " << *size << " bytes in input buffer.";
+ return true;
+}
+
+void SocketInputStream::BackUp(int count) {
+ DCHECK(GetState() == READY || GetState() == EMPTY);
+ DCHECK_GT(count, 0);
+ DCHECK_LE(count, next_pos_);
+
+ next_pos_ -= count;
+ DVLOG(1) << "Backing up " << count << " bytes in input buffer. "
+ << "Current position now at " << next_pos_
+ << " of " << read_buffer_->BytesConsumed();
+}
+
+bool SocketInputStream::Skip(int count) {
+ NOTIMPLEMENTED();
+ return false;
+}
+
+int64 SocketInputStream::ByteCount() const {
+ DCHECK_NE(GetState(), CLOSED);
+ DCHECK_NE(GetState(), READING);
+ return next_pos_;
+}
+
+size_t SocketInputStream::UnreadByteCount() const {
+ DCHECK_NE(GetState(), CLOSED);
+ DCHECK_NE(GetState(), READING);
+ return read_buffer_->BytesConsumed() - next_pos_;
+}
+
+net::Error SocketInputStream::Refresh(const base::Closure& callback,
+ int byte_limit) {
+ DCHECK_NE(GetState(), CLOSED);
+ DCHECK_NE(GetState(), READING);
+ DCHECK_GT(byte_limit, 0);
+
+ if (byte_limit > read_buffer_->BytesRemaining()) {
+ NOTREACHED() << "Out of buffer space, closing input stream.";
+ CloseStream(net::ERR_UNEXPECTED, base::Closure());
+ return net::OK;
+ }
+
+ if (!socket_->IsConnected()) {
+ LOG(ERROR) << "Socket was disconnected, closing input stream";
+ CloseStream(net::ERR_CONNECTION_CLOSED, base::Closure());
+ return net::OK;
+ }
+
+ DVLOG(1) << "Refreshing input stream, limit of " << byte_limit << " bytes.";
+ int result = socket_->Read(
+ read_buffer_,
+ byte_limit,
+ base::Bind(&SocketInputStream::RefreshCompletionCallback,
+ weak_ptr_factory_.GetWeakPtr(),
+ callback));
+ DVLOG(1) << "Read returned " << result;
+ if (result == net::ERR_IO_PENDING) {
+ last_error_ = net::ERR_IO_PENDING;
+ return net::ERR_IO_PENDING;
+ }
+
+ RefreshCompletionCallback(base::Closure(), result);
+ return net::OK;
+}
+
+void SocketInputStream::RebuildBuffer() {
+ DVLOG(1) << "Rebuilding input stream, consumed "
+ << next_pos_ << " bytes.";
+ DCHECK_NE(GetState(), READING);
+ DCHECK_NE(GetState(), CLOSED);
+
+ int unread_data_size = 0;
+ const void* unread_data_ptr = NULL;
+ Next(&unread_data_ptr, &unread_data_size);
+ ResetInternal();
+
+ if (unread_data_ptr != io_buffer_->data()) {
+ DVLOG(1) << "Have " << unread_data_size
+ << " unread bytes remaining, shifting.";
+ // Move any remaining unread data to the start of the buffer;
+ std::memmove(io_buffer_->data(), unread_data_ptr, unread_data_size);
+ } else {
+ DVLOG(1) << "Have " << unread_data_size << " unread bytes remaining.";
+ }
+ read_buffer_->DidConsume(unread_data_size);
+}
+
+net::Error SocketInputStream::last_error() const {
+ return last_error_;
+}
+
+SocketInputStream::State SocketInputStream::GetState() const {
+ if (last_error_ < net::ERR_IO_PENDING)
+ return CLOSED;
+
+ if (last_error_ == net::ERR_IO_PENDING)
+ return READING;
+
+ DCHECK_EQ(last_error_, net::OK);
+ if (read_buffer_->BytesConsumed() == next_pos_)
+ return EMPTY;
+
+ return READY;
+}
+
+void SocketInputStream::RefreshCompletionCallback(
+ const base::Closure& callback, int result) {
+ // If an error occurred before the completion callback could complete, ignore
+ // the result.
+ if (GetState() == CLOSED)
+ return;
+
+ // Result == 0 implies EOF, which is treated as an error.
+ if (result == 0)
+ result = net::ERR_CONNECTION_CLOSED;
+
+ DCHECK_NE(result, net::ERR_IO_PENDING);
+
+ if (result < net::OK) {
+ DVLOG(1) << "Failed to refresh socket: " << result;
+ CloseStream(static_cast<net::Error>(result), callback);
+ return;
+ }
+
+ DCHECK_GT(result, 0);
+ last_error_ = net::OK;
+ read_buffer_->DidConsume(result);
+
+ DVLOG(1) << "Refresh complete with " << result << " new bytes. "
+ << "Current position " << next_pos_
+ << " of " << read_buffer_->BytesConsumed() << ".";
+
+ if (!callback.is_null())
+ callback.Run();
+}
+
+void SocketInputStream::ResetInternal() {
+ read_buffer_->SetOffset(0);
+ next_pos_ = 0;
+ last_error_ = net::OK;
+ weak_ptr_factory_.InvalidateWeakPtrs(); // Invalidate any callbacks.
+}
+
+void SocketInputStream::CloseStream(net::Error error,
+ const base::Closure& callback) {
+ DCHECK_LT(error, net::ERR_IO_PENDING);
+ ResetInternal();
+ last_error_ = error;
+ LOG(ERROR) << "Closing stream with result " << error;
+ if (!callback.is_null())
+ callback.Run();
+}
+
+SocketOutputStream::SocketOutputStream(net::StreamSocket* socket)
+ : socket_(socket),
+ io_buffer_(new net::IOBuffer(kDefaultBufferSize)),
+ write_buffer_(new net::DrainableIOBuffer(io_buffer_.get(),
+ kDefaultBufferSize)),
+ next_pos_(0),
+ last_error_(net::OK),
+ weak_ptr_factory_(this) {
+ DCHECK(socket->IsConnected());
+}
+
+SocketOutputStream::~SocketOutputStream() {
+}
+
+bool SocketOutputStream::Next(void** data, int* size) {
+ DCHECK_NE(GetState(), CLOSED);
+ DCHECK_NE(GetState(), FLUSHING);
+ if (next_pos_ == write_buffer_->size())
+ return false;
+
+ *data = write_buffer_->data() + next_pos_;
+ *size = write_buffer_->size() - next_pos_;
+ next_pos_ = write_buffer_->size();
+ return true;
+}
+
+void SocketOutputStream::BackUp(int count) {
+ DCHECK_GE(count, 0);
+ if (count > next_pos_)
+ next_pos_ = 0;
+ next_pos_ -= count;
+ DVLOG(1) << "Backing up " << count << " bytes in output buffer. "
+ << next_pos_ << " bytes used.";
+}
+
+int64 SocketOutputStream::ByteCount() const {
+ DCHECK_NE(GetState(), CLOSED);
+ DCHECK_NE(GetState(), FLUSHING);
+ return next_pos_;
+}
+
+net::Error SocketOutputStream::Flush(const base::Closure& callback) {
+ DCHECK_EQ(GetState(), READY);
+
+ if (!socket_->IsConnected()) {
+ LOG(ERROR) << "Socket was disconnected, closing output stream";
+ last_error_ = net::ERR_CONNECTION_CLOSED;
+ return net::OK;
+ }
+
+ DVLOG(1) << "Flushing " << next_pos_ << " bytes into socket.";
+ int result = socket_->Write(
+ write_buffer_,
+ next_pos_,
+ base::Bind(&SocketOutputStream::FlushCompletionCallback,
+ weak_ptr_factory_.GetWeakPtr(),
+ callback));
+ DVLOG(1) << "Write returned " << result;
+ if (result == net::ERR_IO_PENDING) {
+ last_error_ = net::ERR_IO_PENDING;
+ return net::ERR_IO_PENDING;
+ }
+
+ FlushCompletionCallback(base::Closure(), result);
+ return net::OK;
+}
+
+SocketOutputStream::State SocketOutputStream::GetState() const{
+ if (last_error_ < net::ERR_IO_PENDING)
+ return CLOSED;
+
+ if (last_error_ == net::ERR_IO_PENDING)
+ return FLUSHING;
+
+ DCHECK_EQ(last_error_, net::OK);
+ if (next_pos_ == 0)
+ return EMPTY;
+
+ return READY;
+}
+
+net::Error SocketOutputStream::last_error() const {
+ return last_error_;
+}
+
+void SocketOutputStream::FlushCompletionCallback(
+ const base::Closure& callback, int result) {
+ // If an error occurred before the completion callback could complete, ignore
+ // the result.
+ if (GetState() == CLOSED)
+ return;
+
+ // Result == 0 implies EOF, which is treated as an error.
+ if (result == 0)
+ result = net::ERR_CONNECTION_CLOSED;
+
+ DCHECK_NE(result, net::ERR_IO_PENDING);
+
+ if (result < net::OK) {
+ LOG(ERROR) << "Failed to flush socket.";
+ last_error_ = static_cast<net::Error>(result);
+ if (!callback.is_null())
+ callback.Run();
+ return;
+ }
+
+ DCHECK_GT(result, net::OK);
+ last_error_ = net::OK;
+
+ if (write_buffer_->BytesConsumed() + result < next_pos_) {
+ DVLOG(1) << "Partial flush complete. Retrying.";
+ // Only a partial write was completed. Flush again to finish the write.
+ write_buffer_->DidConsume(result);
+ Flush(callback);
+ return;
+ }
+
+ DVLOG(1) << "Socket flush complete.";
+ write_buffer_->SetOffset(0);
+ next_pos_ = 0;
+ if (!callback.is_null())
+ callback.Run();
+}
+
+} // namespace gcm
diff --git a/google_apis/gcm/base/socket_stream.h b/google_apis/gcm/base/socket_stream.h
new file mode 100644
index 0000000..a458420
--- /dev/null
+++ b/google_apis/gcm/base/socket_stream.h
@@ -0,0 +1,205 @@
+// Copyright 2013 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+//
+// Protobuf ZeroCopy[Input/Output]Stream implementations capable of using a
+// net::StreamSocket. Built to work with Protobuf CodedStreams.
+
+#ifndef GOOGLE_APIS_GCM_BASE_SOCKET_STREAM_H_
+#define GOOGLE_APIS_GCM_BASE_SOCKET_STREAM_H_
+
+#include "base/basictypes.h"
+#include "base/callback_forward.h"
+#include "base/compiler_specific.h"
+#include "base/memory/ref_counted.h"
+#include "base/memory/weak_ptr.h"
+#include "google/protobuf/io/zero_copy_stream.h"
+#include "google_apis/gcm/base/gcm_export.h"
+#include "net/base/net_errors.h"
+
+namespace net {
+class DrainableIOBuffer;
+class IOBuffer;
+class StreamSocket;
+} // namespace net
+
+namespace gcm {
+
+// A helper class for interacting with a net::StreamSocket that is receiving
+// protobuf encoded messages. A SocketInputStream does not take ownership of
+// the socket itself, and it is expected that the life of the input stream
+// should match the life of the socket itself (while the socket remains
+// connected). If an error is encounters, the input stream will store the error
+// in |last_error_|, and GetState() will be set to CLOSED.
+// Typical usage:
+// 1. Check the GetState() of the input stream before using it. If CLOSED, the
+// input stream must be rebuilt (and the socket likely needs to be
+// reconnected as an error was encountered).
+// 2. If GetState() is EMPTY, call Refresh(..), passing the maximum byte size
+// for a message, and wait until completion. It is invalid to attempt to
+// Refresh an input stream or read data from the stream while a Refresh is
+// pending.
+// 3. Check GetState() again to ensure the Refresh was successful.
+// 4. Use a CodedInputStream to read from the ZeroCopyInputStream interface of
+// the SocketInputStream. Next(..) will return true until there is no data
+// remaining.
+// 5. Call RebuildBuffer when done reading, to shift any unread data to the
+// start of the buffer.
+// 6. Repeat as necessary.
+class GCM_EXPORT SocketInputStream
+ : public google::protobuf::io::ZeroCopyInputStream {
+ public:
+ enum State {
+ // No valid data to read. This means the buffer is either empty or all data
+ // in the buffer has already been consumed.
+ EMPTY,
+ // Valid data to read.
+ READY,
+ // In the process of reading new data from the socket.
+ READING,
+ // An permanent error occurred and the stream is now closed.
+ CLOSED,
+ };
+
+ // |socket| should already be connected.
+ explicit SocketInputStream(net::StreamSocket* socket);
+ virtual ~SocketInputStream();
+
+ // ZeroCopyInputStream implementation.
+ virtual bool Next(const void** data, int* size) OVERRIDE;
+ virtual void BackUp(int count) OVERRIDE;
+ virtual bool Skip(int count) OVERRIDE; // Not implemented.
+ virtual int64 ByteCount() const OVERRIDE;
+
+ // The remaining amount of valid data available to be read.
+ size_t UnreadByteCount() const;
+
+ // Reads from the socket, appending a max of |byte_limit| bytes onto the read
+ // buffer. net::ERR_IO_PENDING is returned if the refresh can't complete
+ // synchronously, in which case the callback is invoked upon completion. If
+ // the refresh can complete synchronously, even in case of an error, returns
+ // net::OK without invoking callback.
+ // Note: GetState() (and possibly last_error()) should be checked upon
+ // completion to determine whether the Refresh encountered an error.
+ net::Error Refresh(const base::Closure& callback, int byte_limit);
+
+ // Rebuilds the buffer state by copying over any unread data to the beginning
+ // of the buffer and resetting the buffer read/write positions.
+ // Note: it is not valid to call Rebuild() if GetState() == CLOSED. The stream
+ // must be recreated from scratch in such a scenario.
+ void RebuildBuffer();
+
+ // Returns the last fatal error encountered. Only valid if GetState() ==
+ // CLOSED.
+ net::Error last_error() const;
+
+ // Returns the current state.
+ State GetState() const;
+
+ private:
+ // Clears the local state.
+ void ResetInternal();
+
+ // Callback for Socket::Read calls.
+ void RefreshCompletionCallback(const base::Closure& callback, int result);
+
+ // Permanently closes the stream.
+ void CloseStream(net::Error error, const base::Closure& callback);
+
+ // Internal net components.
+ net::StreamSocket* const socket_;
+ const scoped_refptr<net::IOBuffer> io_buffer_;
+ // IOBuffer implementation that wraps the data within |io_buffer_| that hasn't
+ // been written to yet by Socket::Read calls.
+ const scoped_refptr<net::DrainableIOBuffer> read_buffer_;
+
+ // Starting position of the data within |io_buffer_| to consume on subsequent
+ // Next(..) call. 0 <= next_pos_ <= read_buffer_.BytesConsumed()
+ // Note: next_pos == read_buffer_.BytesConsumed() implies GetState() == EMPTY.
+ int next_pos_;
+
+ // If < net::ERR_IO_PENDING, the last net error received.
+ // Note: last_error_ == net::ERR_IO_PENDING implies GetState() == READING.
+ net::Error last_error_;
+
+ base::WeakPtrFactory<SocketInputStream> weak_ptr_factory_;
+
+ DISALLOW_COPY_AND_ASSIGN(SocketInputStream);
+};
+
+// A helper class for writing to a SocketStream with protobuf encoded data.
+// A SocketOutputStream does not take ownership of the socket itself, and it is
+// expected that the life of the output stream should match the life of the
+// socket itself (while the socket remains connected).
+// Typical usage:
+// 1. Check the GetState() of the output stream before using it. If CLOSED, the
+// output stream must be rebuilt (and the socket likely needs to be
+// reconnected, as an error was encountered).
+// 2. If EMPTY, the output stream can be written via a CodedOutputStream using
+// the ZeroCopyOutputStream interface.
+// 3. Once done writing, GetState() should be READY, so call Flush(..) to write
+// the buffer into the StreamSocket. Wait for the callback to be invoked
+// (it's invalid to write to an output stream while it's flushing).
+// 4. Check the GetState() again to ensure the Flush was successful. GetState()
+// should be EMPTY again.
+// 5. Repeat.
+class GCM_EXPORT SocketOutputStream
+ : public google::protobuf::io::ZeroCopyOutputStream {
+ public:
+ enum State {
+ // No valid data yet.
+ EMPTY,
+ // Ready for flushing (some data is present).
+ READY,
+ // In the process of flushing into the socket.
+ FLUSHING,
+ // A permanent error occurred, and the stream is now closed.
+ CLOSED,
+ };
+
+ // |socket| should already be connected.
+ explicit SocketOutputStream(net::StreamSocket* socket);
+ virtual ~SocketOutputStream();
+
+ // ZeroCopyOutputStream implementation.
+ virtual bool Next(void** data, int* size) OVERRIDE;
+ virtual void BackUp(int count) OVERRIDE;
+ virtual int64 ByteCount() const OVERRIDE;
+
+ // Writes the buffer into the Socket.
+ net::Error Flush(const base::Closure& callback);
+
+ // Returns the last fatal error encountered. Only valid if GetState() ==
+ // CLOSED.
+ net::Error last_error() const;
+
+ // Returns the current state.
+ State GetState() const;
+
+ private:
+ void FlushCompletionCallback(const base::Closure& callback, int result);
+
+ // Internal net components.
+ net::StreamSocket* const socket_;
+ const scoped_refptr<net::IOBuffer> io_buffer_;
+ // IOBuffer implementation that wraps the data within |io_buffer_| that hasn't
+ // been written to the socket yet.
+ const scoped_refptr<net::DrainableIOBuffer> write_buffer_;
+
+ // Starting position of the data within |io_buffer_| to consume on subsequent
+ // Next(..) call. 0 <= write_buffer_.BytesConsumed() <= next_pos_
+ // Note: next_pos == 0 implies GetState() == EMPTY.
+ int next_pos_;
+
+ // If < net::ERR_IO_PENDING, the last net error received.
+ // Note: last_error_ == net::ERR_IO_PENDING implies GetState() == FLUSHING.
+ net::Error last_error_;
+
+ base::WeakPtrFactory<SocketOutputStream> weak_ptr_factory_;
+
+ DISALLOW_COPY_AND_ASSIGN(SocketOutputStream);
+};
+
+} // namespace gcm
+
+#endif // GOOGLE_APIS_GCM_BASE_SOCKET_STREAM_H_
diff --git a/google_apis/gcm/base/socket_stream_unittest.cc b/google_apis/gcm/base/socket_stream_unittest.cc
new file mode 100644
index 0000000..d7ba679
--- /dev/null
+++ b/google_apis/gcm/base/socket_stream_unittest.cc
@@ -0,0 +1,406 @@
+// Copyright 2013 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "google_apis/gcm/base/socket_stream.h"
+
+#include "base/basictypes.h"
+#include "base/bind.h"
+#include "base/memory/scoped_ptr.h"
+#include "base/run_loop.h"
+#include "base/stl_util.h"
+#include "base/strings/string_piece.h"
+#include "net/socket/socket_test_util.h"
+#include "testing/gtest/include/gtest/gtest.h"
+
+namespace gcm {
+namespace {
+
+typedef std::vector<net::MockRead> ReadList;
+typedef std::vector<net::MockWrite> WriteList;
+
+const char kReadData[] = "read_data";
+const uint64 kReadDataSize = arraysize(kReadData) - 1;
+const char kReadData2[] = "read_alternate_data";
+const uint64 kReadData2Size = arraysize(kReadData2) - 1;
+const char kWriteData[] = "write_data";
+const uint64 kWriteDataSize = arraysize(kWriteData) - 1;
+
+class GCMSocketStreamTest : public testing::Test {
+ public:
+ GCMSocketStreamTest();
+ virtual ~GCMSocketStreamTest();
+
+ // Build a socket with the expected reads and writes.
+ void BuildSocket(const ReadList& read_list, const WriteList& write_list);
+
+ // Pump the message loop until idle.
+ void PumpLoop();
+
+ // Simulates a google::protobuf::io::CodedInputStream read.
+ base::StringPiece DoInputStreamRead(uint64 bytes);
+ // Simulates a google::protobuf::io::CodedOutputStream write.
+ uint64 DoOutputStreamWrite(const base::StringPiece& write_src);
+
+ // Synchronous Refresh wrapper.
+ void WaitForData(size_t msg_size);
+
+ base::MessageLoop* message_loop() { return &message_loop_; };
+ net::DelayedSocketData* data_provider() { return data_provider_.get(); }
+ SocketInputStream* input_stream() { return socket_input_stream_.get(); }
+ SocketOutputStream* output_stream() { return socket_output_stream_.get(); }
+ net::StreamSocket* socket() { return socket_.get(); }
+
+ private:
+ void OpenConnection();
+ void ResetInputStream();
+ void ResetOutputStream();
+
+ void ConnectCallback(int result);
+
+ // SocketStreams and their data providers.
+ ReadList mock_reads_;
+ WriteList mock_writes_;
+ scoped_ptr<net::DelayedSocketData> data_provider_;
+ scoped_ptr<SocketInputStream> socket_input_stream_;
+ scoped_ptr<SocketOutputStream> socket_output_stream_;
+
+ // net:: components.
+ scoped_ptr<net::StreamSocket> socket_;
+ net::MockClientSocketFactory socket_factory_;
+ net::AddressList address_list_;
+
+ base::MessageLoopForIO message_loop_;
+};
+
+GCMSocketStreamTest::GCMSocketStreamTest() {
+ net::IPAddressNumber ip_number;
+ net::ParseIPLiteralToNumber("127.0.0.1", &ip_number);
+ address_list_ = net::AddressList::CreateFromIPAddress(ip_number, 5228);
+}
+
+GCMSocketStreamTest::~GCMSocketStreamTest() {}
+
+void GCMSocketStreamTest::BuildSocket(const ReadList& read_list,
+ const WriteList& write_list) {
+ mock_reads_ = read_list;
+ mock_writes_ = write_list;
+ data_provider_.reset(
+ new net::DelayedSocketData(
+ 0,
+ vector_as_array(&mock_reads_), mock_reads_.size(),
+ vector_as_array(&mock_writes_), mock_writes_.size()));
+ socket_factory_.AddSocketDataProvider(data_provider_.get());
+ OpenConnection();
+ ResetInputStream();
+ ResetOutputStream();
+}
+
+void GCMSocketStreamTest::PumpLoop() {
+ base::RunLoop run_loop;
+ run_loop.RunUntilIdle();
+}
+
+base::StringPiece GCMSocketStreamTest::DoInputStreamRead(uint64 bytes) {
+ uint64 total_bytes_read = 0;
+ const void* initial_buffer = NULL;
+ const void* buffer = NULL;
+ int size = 0;
+
+ do {
+ DCHECK(socket_input_stream_->GetState() == SocketInputStream::EMPTY ||
+ socket_input_stream_->GetState() == SocketInputStream::READY);
+ if (!socket_input_stream_->Next(&buffer, &size))
+ break;
+ total_bytes_read += size;
+ if (initial_buffer) { // Verify the buffer doesn't skip data.
+ EXPECT_EQ(static_cast<const uint8*>(initial_buffer) + total_bytes_read,
+ static_cast<const uint8*>(buffer) + size);
+ } else {
+ initial_buffer = buffer;
+ }
+ } while (total_bytes_read < bytes);
+
+ if (total_bytes_read > bytes) {
+ socket_input_stream_->BackUp(total_bytes_read - bytes);
+ total_bytes_read = bytes;
+ }
+
+ return base::StringPiece(static_cast<const char*>(initial_buffer),
+ total_bytes_read);
+}
+
+uint64 GCMSocketStreamTest::DoOutputStreamWrite(
+ const base::StringPiece& write_src) {
+ DCHECK_EQ(socket_output_stream_->GetState(), SocketOutputStream::EMPTY);
+ uint64 total_bytes_written = 0;
+ void* buffer = NULL;
+ int size = 0;
+ size_t bytes = write_src.size();
+
+ do {
+ if (!socket_output_stream_->Next(&buffer, &size))
+ break;
+ uint64 bytes_to_write = (static_cast<uint64>(size) < bytes ? size : bytes);
+ memcpy(buffer,
+ write_src.data() + total_bytes_written,
+ bytes_to_write);
+ if (bytes_to_write < static_cast<uint64>(size))
+ socket_output_stream_->BackUp(size - bytes_to_write);
+ total_bytes_written += bytes_to_write;
+ } while (total_bytes_written < bytes);
+
+ base::RunLoop run_loop;
+ if (socket_output_stream_->Flush(run_loop.QuitClosure()) ==
+ net::ERR_IO_PENDING) {
+ run_loop.Run();
+ }
+
+ return total_bytes_written;
+}
+
+void GCMSocketStreamTest::WaitForData(size_t msg_size) {
+ while (input_stream()->UnreadByteCount() < msg_size) {
+ base::RunLoop run_loop;
+ if (input_stream()->Refresh(run_loop.QuitClosure(),
+ msg_size - input_stream()->UnreadByteCount()) ==
+ net::ERR_IO_PENDING) {
+ run_loop.Run();
+ }
+ if (input_stream()->GetState() == SocketInputStream::CLOSED)
+ return;
+ }
+}
+
+void GCMSocketStreamTest::OpenConnection() {
+ socket_ = socket_factory_.CreateTransportClientSocket(
+ address_list_, NULL, net::NetLog::Source());
+ socket_->Connect(
+ base::Bind(&GCMSocketStreamTest::ConnectCallback,
+ base::Unretained(this)));
+ PumpLoop();
+}
+
+void GCMSocketStreamTest::ConnectCallback(int result) {}
+
+void GCMSocketStreamTest::ResetInputStream() {
+ DCHECK(socket_.get());
+ socket_input_stream_.reset(new SocketInputStream(socket_.get()));
+}
+
+void GCMSocketStreamTest::ResetOutputStream() {
+ DCHECK(socket_.get());
+ socket_output_stream_.reset(new SocketOutputStream(socket_.get()));
+}
+
+// A read where all data is already available.
+TEST_F(GCMSocketStreamTest, ReadDataSync) {
+ BuildSocket(ReadList(1, net::MockRead(net::SYNCHRONOUS,
+ kReadData,
+ kReadDataSize)),
+ WriteList());
+
+ WaitForData(kReadDataSize);
+ ASSERT_EQ(std::string(kReadData, kReadDataSize),
+ DoInputStreamRead(kReadDataSize));
+}
+
+// A read that comes in two parts.
+TEST_F(GCMSocketStreamTest, ReadPartialDataSync) {
+ size_t first_read_len = kReadDataSize / 2;
+ size_t second_read_len = kReadDataSize - first_read_len;
+ ReadList read_list;
+ read_list.push_back(
+ net::MockRead(net::SYNCHRONOUS,
+ kReadData,
+ first_read_len));
+ read_list.push_back(
+ net::MockRead(net::SYNCHRONOUS,
+ &kReadData[first_read_len],
+ second_read_len));
+ BuildSocket(read_list, WriteList());
+
+ WaitForData(kReadDataSize);
+ ASSERT_EQ(std::string(kReadData, kReadDataSize),
+ DoInputStreamRead(kReadDataSize));
+}
+
+// A read where no data is available at first (IO_PENDING will be returned).
+TEST_F(GCMSocketStreamTest, ReadAsync) {
+ size_t first_read_len = kReadDataSize / 2;
+ size_t second_read_len = kReadDataSize - first_read_len;
+ ReadList read_list;
+ read_list.push_back(
+ net::MockRead(net::SYNCHRONOUS, net::ERR_IO_PENDING));
+ read_list.push_back(
+ net::MockRead(net::ASYNC, kReadData, first_read_len));
+ read_list.push_back(
+ net::MockRead(net::ASYNC, &kReadData[first_read_len], second_read_len));
+ BuildSocket(read_list, WriteList());
+
+ base::MessageLoop::current()->PostTask(
+ FROM_HERE,
+ base::Bind(&net::DelayedSocketData::ForceNextRead,
+ base::Unretained(data_provider())));
+ WaitForData(kReadDataSize);
+ ASSERT_EQ(std::string(kReadData, kReadDataSize),
+ DoInputStreamRead(kReadDataSize));
+}
+
+// Simulate two packets arriving at once. Read them in two separate calls.
+TEST_F(GCMSocketStreamTest, TwoReadsAtOnce) {
+ std::string long_data = std::string(kReadData, kReadDataSize) +
+ std::string(kReadData2, kReadData2Size);
+ BuildSocket(ReadList(1, net::MockRead(net::SYNCHRONOUS,
+ long_data.c_str(),
+ long_data.size())),
+ WriteList());
+
+ WaitForData(kReadDataSize);
+ ASSERT_EQ(std::string(kReadData, kReadDataSize),
+ DoInputStreamRead(kReadDataSize));
+
+ WaitForData(kReadData2Size);
+ ASSERT_EQ(std::string(kReadData2, kReadData2Size),
+ DoInputStreamRead(kReadData2Size));
+}
+
+// Simulate two packets arriving at once. Read them in two calls separated
+// by a Rebuild.
+TEST_F(GCMSocketStreamTest, TwoReadsAtOnceWithRebuild) {
+ std::string long_data = std::string(kReadData, kReadDataSize) +
+ std::string(kReadData2, kReadData2Size);
+ BuildSocket(ReadList(1, net::MockRead(net::SYNCHRONOUS,
+ long_data.c_str(),
+ long_data.size())),
+ WriteList());
+
+ WaitForData(kReadDataSize);
+ ASSERT_EQ(std::string(kReadData, kReadDataSize),
+ DoInputStreamRead(kReadDataSize));
+
+ input_stream()->RebuildBuffer();
+ WaitForData(kReadData2Size);
+ ASSERT_EQ(std::string(kReadData2, kReadData2Size),
+ DoInputStreamRead(kReadData2Size));
+}
+
+// Simulate a read that is aborted.
+TEST_F(GCMSocketStreamTest, ReadError) {
+ int result = net::ERR_ABORTED;
+ BuildSocket(ReadList(1, net::MockRead(net::SYNCHRONOUS, result)),
+ WriteList());
+
+ WaitForData(kReadDataSize);
+ ASSERT_EQ(SocketInputStream::CLOSED, input_stream()->GetState());
+ ASSERT_EQ(result, input_stream()->last_error());
+}
+
+// Simulate a read after the connection is closed.
+TEST_F(GCMSocketStreamTest, ReadDisconnected) {
+ BuildSocket(ReadList(), WriteList());
+ socket()->Disconnect();
+ WaitForData(kReadDataSize);
+ ASSERT_EQ(SocketInputStream::CLOSED, input_stream()->GetState());
+ ASSERT_EQ(net::ERR_CONNECTION_CLOSED, input_stream()->last_error());
+}
+
+// Write a full message in one go.
+TEST_F(GCMSocketStreamTest, WriteFull) {
+ BuildSocket(ReadList(),
+ WriteList(1, net::MockWrite(net::SYNCHRONOUS,
+ kWriteData,
+ kWriteDataSize)));
+ ASSERT_EQ(kWriteDataSize,
+ DoOutputStreamWrite(base::StringPiece(kWriteData,
+ kWriteDataSize)));
+}
+
+// Write a message in two go's.
+TEST_F(GCMSocketStreamTest, WritePartial) {
+ WriteList write_list;
+ write_list.push_back(net::MockWrite(net::SYNCHRONOUS,
+ kWriteData,
+ kWriteDataSize / 2));
+ write_list.push_back(net::MockWrite(net::SYNCHRONOUS,
+ kWriteData + kWriteDataSize / 2,
+ kWriteDataSize / 2));
+ BuildSocket(ReadList(), write_list);
+ ASSERT_EQ(kWriteDataSize,
+ DoOutputStreamWrite(base::StringPiece(kWriteData,
+ kWriteDataSize)));
+}
+
+// Write a message completely asynchronously (returns IO_PENDING before
+// finishing the write in two go's).
+TEST_F(GCMSocketStreamTest, WriteNone) {
+ WriteList write_list;
+ write_list.push_back(net::MockWrite(net::SYNCHRONOUS,
+ kWriteData,
+ kWriteDataSize / 2));
+ write_list.push_back(net::MockWrite(net::SYNCHRONOUS,
+ kWriteData + kWriteDataSize / 2,
+ kWriteDataSize / 2));
+ BuildSocket(ReadList(), write_list);
+ ASSERT_EQ(kWriteDataSize,
+ DoOutputStreamWrite(base::StringPiece(kWriteData,
+ kWriteDataSize)));
+}
+
+// Write a message then read a message.
+TEST_F(GCMSocketStreamTest, WriteThenRead) {
+ BuildSocket(ReadList(1, net::MockRead(net::SYNCHRONOUS,
+ kReadData,
+ kReadDataSize)),
+ WriteList(1, net::MockWrite(net::SYNCHRONOUS,
+ kWriteData,
+ kWriteDataSize)));
+
+ ASSERT_EQ(kWriteDataSize,
+ DoOutputStreamWrite(base::StringPiece(kWriteData,
+ kWriteDataSize)));
+
+ WaitForData(kReadDataSize);
+ ASSERT_EQ(std::string(kReadData, kReadDataSize),
+ DoInputStreamRead(kReadDataSize));
+}
+
+// Read a message then write a message.
+TEST_F(GCMSocketStreamTest, ReadThenWrite) {
+ BuildSocket(ReadList(1, net::MockRead(net::SYNCHRONOUS,
+ kReadData,
+ kReadDataSize)),
+ WriteList(1, net::MockWrite(net::SYNCHRONOUS,
+ kWriteData,
+ kWriteDataSize)));
+
+ WaitForData(kReadDataSize);
+ ASSERT_EQ(std::string(kReadData, kReadDataSize),
+ DoInputStreamRead(kReadDataSize));
+
+ ASSERT_EQ(kWriteDataSize,
+ DoOutputStreamWrite(base::StringPiece(kWriteData,
+ kWriteDataSize)));
+}
+
+// Simulate a write that gets aborted.
+TEST_F(GCMSocketStreamTest, WriteError) {
+ int result = net::ERR_ABORTED;
+ BuildSocket(ReadList(),
+ WriteList(1, net::MockWrite(net::SYNCHRONOUS, result)));
+ DoOutputStreamWrite(base::StringPiece(kWriteData, kWriteDataSize));
+ ASSERT_EQ(SocketOutputStream::CLOSED, output_stream()->GetState());
+ ASSERT_EQ(result, output_stream()->last_error());
+}
+
+// Simulate a write after the connection is closed.
+TEST_F(GCMSocketStreamTest, WriteDisconnected) {
+ BuildSocket(ReadList(), WriteList());
+ socket()->Disconnect();
+ DoOutputStreamWrite(base::StringPiece(kWriteData, kWriteDataSize));
+ ASSERT_EQ(SocketOutputStream::CLOSED, output_stream()->GetState());
+ ASSERT_EQ(net::ERR_CONNECTION_CLOSED, output_stream()->last_error());
+}
+
+} // namespace
+} // namespace gcm
diff --git a/google_apis/gcm/gcm.gyp b/google_apis/gcm/gcm.gyp
new file mode 100644
index 0000000..c1ab33b
--- /dev/null
+++ b/google_apis/gcm/gcm.gyp
@@ -0,0 +1,56 @@
+# Copyright 2013 The Chromium Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+{
+ 'variables': {
+ 'chromium_code': 1,
+ },
+
+ 'targets': [
+ # The public GCM target.
+ {
+ 'target_name': 'gcm',
+ 'type': '<(component)',
+ 'variables': { 'enable_wexit_time_destructors': 1, },
+ 'include_dirs': [
+ '../..',
+ ],
+ 'defines': [
+ 'GCM_IMPLEMENTATION',
+ ],
+ 'export_dependent_settings': [
+ '../../third_party/protobuf/protobuf.gyp:protobuf_lite'
+ ],
+ 'dependencies': [
+ '../../base/base.gyp:base',
+ '../../net/net.gyp:net',
+ '../../third_party/protobuf/protobuf.gyp:protobuf_lite'
+ ],
+ 'sources': [
+ 'base/socket_stream.h',
+ 'base/socket_stream.cc',
+ ],
+ },
+
+ # The main GCM unit tests.
+ {
+ 'target_name': 'gcm_unit_tests',
+ 'type': '<(gtest_target_type)',
+ 'variables': { 'enable_wexit_time_destructors': 1, },
+ 'include_dirs': [
+ '../..',
+ ],
+ 'dependencies': [
+ '../../base/base.gyp:run_all_unittests',
+ '../../base/base.gyp:base',
+ '../../net/net.gyp:net_test_support',
+ '../../testing/gtest.gyp:gtest',
+ 'gcm'
+ ],
+ 'sources': [
+ 'base/socket_stream_unittest.cc',
+ ]
+ },
+ ],
+}