diff options
author | zea@chromium.org <zea@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2013-10-22 00:50:15 +0000 |
---|---|---|
committer | zea@chromium.org <zea@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2013-10-22 00:50:15 +0000 |
commit | afaa03ce00f0fa13ae78569c97ecc82bab66d157 (patch) | |
tree | 816e5542efd41e4397602830fabd19020ecf7c63 /google_apis/gcm/base | |
parent | f870007e66d9d212124cd0bff85e84cc97551854 (diff) | |
download | chromium_src-afaa03ce00f0fa13ae78569c97ecc82bab66d157.zip chromium_src-afaa03ce00f0fa13ae78569c97ecc82bab66d157.tar.gz chromium_src-afaa03ce00f0fa13ae78569c97ecc82bab66d157.tar.bz2 |
[GCM] Unrevert initial GCM patch
Includes build fix for static libraries and ensure that gcm_unit_tests
is run on relevant builders.
Original codereview: https://codereview.chromium.org/23684017/
BUG=284553
TBR=darin@chromium.org
Review URL: https://codereview.chromium.org/32093003
git-svn-id: svn://svn.chromium.org/chrome/trunk/src@229982 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'google_apis/gcm/base')
-rw-r--r-- | google_apis/gcm/base/gcm_export.h | 29 | ||||
-rw-r--r-- | google_apis/gcm/base/socket_stream.cc | 332 | ||||
-rw-r--r-- | google_apis/gcm/base/socket_stream.h | 205 | ||||
-rw-r--r-- | google_apis/gcm/base/socket_stream_unittest.cc | 406 |
4 files changed, 972 insertions, 0 deletions
diff --git a/google_apis/gcm/base/gcm_export.h b/google_apis/gcm/base/gcm_export.h new file mode 100644 index 0000000..b66eb8e --- /dev/null +++ b/google_apis/gcm/base/gcm_export.h @@ -0,0 +1,29 @@ +// Copyright 2013 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef GOOGLE_APIS_GCM_GCM_EXPORT_H_ +#define GOOGLE_APIS_GCM_GCM_EXPORT_H_ + +#if defined(COMPONENT_BUILD) +#if defined(WIN32) + +#if defined(GCM_IMPLEMENTATION) +#define GCM_EXPORT __declspec(dllexport) +#else +#define GCM_EXPORT __declspec(dllimport) +#endif // defined(GCM_IMPLEMENTATION) + +#else // defined(WIN32) +#if defined(GCM_IMPLEMENTATION) +#define GCM_EXPORT __attribute__((visibility("default"))) +#else +#define GCM_EXPORT +#endif // defined(GCM_IMPLEMENTATION) +#endif + +#else // defined(COMPONENT_BUILD) +#define GCM_EXPORT +#endif + +#endif // GOOGLE_APIS_GCM_GCM_EXPORT_H_ diff --git a/google_apis/gcm/base/socket_stream.cc b/google_apis/gcm/base/socket_stream.cc new file mode 100644 index 0000000..1a0b29d --- /dev/null +++ b/google_apis/gcm/base/socket_stream.cc @@ -0,0 +1,332 @@ +// Copyright 2013 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "google_apis/gcm/base/socket_stream.h" + +#include "base/bind.h" +#include "base/callback.h" +#include "net/base/io_buffer.h" +#include "net/socket/stream_socket.h" + +namespace gcm { + +namespace { + +// TODO(zea): consider having dynamically-sized buffers if this becomes too +// expensive. +const uint32 kDefaultBufferSize = 8*1024; + +} // namespace + +SocketInputStream::SocketInputStream(net::StreamSocket* socket) + : socket_(socket), + io_buffer_(new net::IOBuffer(kDefaultBufferSize)), + read_buffer_(new net::DrainableIOBuffer(io_buffer_.get(), + kDefaultBufferSize)), + next_pos_(0), + last_error_(net::OK), + weak_ptr_factory_(this) { + DCHECK(socket->IsConnected()); +} + +SocketInputStream::~SocketInputStream() { +} + +bool SocketInputStream::Next(const void** data, int* size) { + if (GetState() != EMPTY && GetState() != READY) { + NOTREACHED() << "Invalid input stream read attempt."; + return false; + } + + if (GetState() == EMPTY) { + DVLOG(1) << "No unread data remaining, ending read."; + return false; + } + + DCHECK_EQ(GetState(), READY) + << " Input stream must have pending data before reading."; + DCHECK_LT(next_pos_, read_buffer_->BytesConsumed()); + *data = io_buffer_->data() + next_pos_; + *size = UnreadByteCount(); + next_pos_ = read_buffer_->BytesConsumed(); + DVLOG(1) << "Consuming " << *size << " bytes in input buffer."; + return true; +} + +void SocketInputStream::BackUp(int count) { + DCHECK(GetState() == READY || GetState() == EMPTY); + DCHECK_GT(count, 0); + DCHECK_LE(count, next_pos_); + + next_pos_ -= count; + DVLOG(1) << "Backing up " << count << " bytes in input buffer. " + << "Current position now at " << next_pos_ + << " of " << read_buffer_->BytesConsumed(); +} + +bool SocketInputStream::Skip(int count) { + NOTIMPLEMENTED(); + return false; +} + +int64 SocketInputStream::ByteCount() const { + DCHECK_NE(GetState(), CLOSED); + DCHECK_NE(GetState(), READING); + return next_pos_; +} + +size_t SocketInputStream::UnreadByteCount() const { + DCHECK_NE(GetState(), CLOSED); + DCHECK_NE(GetState(), READING); + return read_buffer_->BytesConsumed() - next_pos_; +} + +net::Error SocketInputStream::Refresh(const base::Closure& callback, + int byte_limit) { + DCHECK_NE(GetState(), CLOSED); + DCHECK_NE(GetState(), READING); + DCHECK_GT(byte_limit, 0); + + if (byte_limit > read_buffer_->BytesRemaining()) { + NOTREACHED() << "Out of buffer space, closing input stream."; + CloseStream(net::ERR_UNEXPECTED, base::Closure()); + return net::OK; + } + + if (!socket_->IsConnected()) { + LOG(ERROR) << "Socket was disconnected, closing input stream"; + CloseStream(net::ERR_CONNECTION_CLOSED, base::Closure()); + return net::OK; + } + + DVLOG(1) << "Refreshing input stream, limit of " << byte_limit << " bytes."; + int result = socket_->Read( + read_buffer_, + byte_limit, + base::Bind(&SocketInputStream::RefreshCompletionCallback, + weak_ptr_factory_.GetWeakPtr(), + callback)); + DVLOG(1) << "Read returned " << result; + if (result == net::ERR_IO_PENDING) { + last_error_ = net::ERR_IO_PENDING; + return net::ERR_IO_PENDING; + } + + RefreshCompletionCallback(base::Closure(), result); + return net::OK; +} + +void SocketInputStream::RebuildBuffer() { + DVLOG(1) << "Rebuilding input stream, consumed " + << next_pos_ << " bytes."; + DCHECK_NE(GetState(), READING); + DCHECK_NE(GetState(), CLOSED); + + int unread_data_size = 0; + const void* unread_data_ptr = NULL; + Next(&unread_data_ptr, &unread_data_size); + ResetInternal(); + + if (unread_data_ptr != io_buffer_->data()) { + DVLOG(1) << "Have " << unread_data_size + << " unread bytes remaining, shifting."; + // Move any remaining unread data to the start of the buffer; + std::memmove(io_buffer_->data(), unread_data_ptr, unread_data_size); + } else { + DVLOG(1) << "Have " << unread_data_size << " unread bytes remaining."; + } + read_buffer_->DidConsume(unread_data_size); +} + +net::Error SocketInputStream::last_error() const { + return last_error_; +} + +SocketInputStream::State SocketInputStream::GetState() const { + if (last_error_ < net::ERR_IO_PENDING) + return CLOSED; + + if (last_error_ == net::ERR_IO_PENDING) + return READING; + + DCHECK_EQ(last_error_, net::OK); + if (read_buffer_->BytesConsumed() == next_pos_) + return EMPTY; + + return READY; +} + +void SocketInputStream::RefreshCompletionCallback( + const base::Closure& callback, int result) { + // If an error occurred before the completion callback could complete, ignore + // the result. + if (GetState() == CLOSED) + return; + + // Result == 0 implies EOF, which is treated as an error. + if (result == 0) + result = net::ERR_CONNECTION_CLOSED; + + DCHECK_NE(result, net::ERR_IO_PENDING); + + if (result < net::OK) { + DVLOG(1) << "Failed to refresh socket: " << result; + CloseStream(static_cast<net::Error>(result), callback); + return; + } + + DCHECK_GT(result, 0); + last_error_ = net::OK; + read_buffer_->DidConsume(result); + + DVLOG(1) << "Refresh complete with " << result << " new bytes. " + << "Current position " << next_pos_ + << " of " << read_buffer_->BytesConsumed() << "."; + + if (!callback.is_null()) + callback.Run(); +} + +void SocketInputStream::ResetInternal() { + read_buffer_->SetOffset(0); + next_pos_ = 0; + last_error_ = net::OK; + weak_ptr_factory_.InvalidateWeakPtrs(); // Invalidate any callbacks. +} + +void SocketInputStream::CloseStream(net::Error error, + const base::Closure& callback) { + DCHECK_LT(error, net::ERR_IO_PENDING); + ResetInternal(); + last_error_ = error; + LOG(ERROR) << "Closing stream with result " << error; + if (!callback.is_null()) + callback.Run(); +} + +SocketOutputStream::SocketOutputStream(net::StreamSocket* socket) + : socket_(socket), + io_buffer_(new net::IOBuffer(kDefaultBufferSize)), + write_buffer_(new net::DrainableIOBuffer(io_buffer_.get(), + kDefaultBufferSize)), + next_pos_(0), + last_error_(net::OK), + weak_ptr_factory_(this) { + DCHECK(socket->IsConnected()); +} + +SocketOutputStream::~SocketOutputStream() { +} + +bool SocketOutputStream::Next(void** data, int* size) { + DCHECK_NE(GetState(), CLOSED); + DCHECK_NE(GetState(), FLUSHING); + if (next_pos_ == write_buffer_->size()) + return false; + + *data = write_buffer_->data() + next_pos_; + *size = write_buffer_->size() - next_pos_; + next_pos_ = write_buffer_->size(); + return true; +} + +void SocketOutputStream::BackUp(int count) { + DCHECK_GE(count, 0); + if (count > next_pos_) + next_pos_ = 0; + next_pos_ -= count; + DVLOG(1) << "Backing up " << count << " bytes in output buffer. " + << next_pos_ << " bytes used."; +} + +int64 SocketOutputStream::ByteCount() const { + DCHECK_NE(GetState(), CLOSED); + DCHECK_NE(GetState(), FLUSHING); + return next_pos_; +} + +net::Error SocketOutputStream::Flush(const base::Closure& callback) { + DCHECK_EQ(GetState(), READY); + + if (!socket_->IsConnected()) { + LOG(ERROR) << "Socket was disconnected, closing output stream"; + last_error_ = net::ERR_CONNECTION_CLOSED; + return net::OK; + } + + DVLOG(1) << "Flushing " << next_pos_ << " bytes into socket."; + int result = socket_->Write( + write_buffer_, + next_pos_, + base::Bind(&SocketOutputStream::FlushCompletionCallback, + weak_ptr_factory_.GetWeakPtr(), + callback)); + DVLOG(1) << "Write returned " << result; + if (result == net::ERR_IO_PENDING) { + last_error_ = net::ERR_IO_PENDING; + return net::ERR_IO_PENDING; + } + + FlushCompletionCallback(base::Closure(), result); + return net::OK; +} + +SocketOutputStream::State SocketOutputStream::GetState() const{ + if (last_error_ < net::ERR_IO_PENDING) + return CLOSED; + + if (last_error_ == net::ERR_IO_PENDING) + return FLUSHING; + + DCHECK_EQ(last_error_, net::OK); + if (next_pos_ == 0) + return EMPTY; + + return READY; +} + +net::Error SocketOutputStream::last_error() const { + return last_error_; +} + +void SocketOutputStream::FlushCompletionCallback( + const base::Closure& callback, int result) { + // If an error occurred before the completion callback could complete, ignore + // the result. + if (GetState() == CLOSED) + return; + + // Result == 0 implies EOF, which is treated as an error. + if (result == 0) + result = net::ERR_CONNECTION_CLOSED; + + DCHECK_NE(result, net::ERR_IO_PENDING); + + if (result < net::OK) { + LOG(ERROR) << "Failed to flush socket."; + last_error_ = static_cast<net::Error>(result); + if (!callback.is_null()) + callback.Run(); + return; + } + + DCHECK_GT(result, net::OK); + last_error_ = net::OK; + + if (write_buffer_->BytesConsumed() + result < next_pos_) { + DVLOG(1) << "Partial flush complete. Retrying."; + // Only a partial write was completed. Flush again to finish the write. + write_buffer_->DidConsume(result); + Flush(callback); + return; + } + + DVLOG(1) << "Socket flush complete."; + write_buffer_->SetOffset(0); + next_pos_ = 0; + if (!callback.is_null()) + callback.Run(); +} + +} // namespace gcm diff --git a/google_apis/gcm/base/socket_stream.h b/google_apis/gcm/base/socket_stream.h new file mode 100644 index 0000000..a458420 --- /dev/null +++ b/google_apis/gcm/base/socket_stream.h @@ -0,0 +1,205 @@ +// Copyright 2013 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. +// +// Protobuf ZeroCopy[Input/Output]Stream implementations capable of using a +// net::StreamSocket. Built to work with Protobuf CodedStreams. + +#ifndef GOOGLE_APIS_GCM_BASE_SOCKET_STREAM_H_ +#define GOOGLE_APIS_GCM_BASE_SOCKET_STREAM_H_ + +#include "base/basictypes.h" +#include "base/callback_forward.h" +#include "base/compiler_specific.h" +#include "base/memory/ref_counted.h" +#include "base/memory/weak_ptr.h" +#include "google/protobuf/io/zero_copy_stream.h" +#include "google_apis/gcm/base/gcm_export.h" +#include "net/base/net_errors.h" + +namespace net { +class DrainableIOBuffer; +class IOBuffer; +class StreamSocket; +} // namespace net + +namespace gcm { + +// A helper class for interacting with a net::StreamSocket that is receiving +// protobuf encoded messages. A SocketInputStream does not take ownership of +// the socket itself, and it is expected that the life of the input stream +// should match the life of the socket itself (while the socket remains +// connected). If an error is encounters, the input stream will store the error +// in |last_error_|, and GetState() will be set to CLOSED. +// Typical usage: +// 1. Check the GetState() of the input stream before using it. If CLOSED, the +// input stream must be rebuilt (and the socket likely needs to be +// reconnected as an error was encountered). +// 2. If GetState() is EMPTY, call Refresh(..), passing the maximum byte size +// for a message, and wait until completion. It is invalid to attempt to +// Refresh an input stream or read data from the stream while a Refresh is +// pending. +// 3. Check GetState() again to ensure the Refresh was successful. +// 4. Use a CodedInputStream to read from the ZeroCopyInputStream interface of +// the SocketInputStream. Next(..) will return true until there is no data +// remaining. +// 5. Call RebuildBuffer when done reading, to shift any unread data to the +// start of the buffer. +// 6. Repeat as necessary. +class GCM_EXPORT SocketInputStream + : public google::protobuf::io::ZeroCopyInputStream { + public: + enum State { + // No valid data to read. This means the buffer is either empty or all data + // in the buffer has already been consumed. + EMPTY, + // Valid data to read. + READY, + // In the process of reading new data from the socket. + READING, + // An permanent error occurred and the stream is now closed. + CLOSED, + }; + + // |socket| should already be connected. + explicit SocketInputStream(net::StreamSocket* socket); + virtual ~SocketInputStream(); + + // ZeroCopyInputStream implementation. + virtual bool Next(const void** data, int* size) OVERRIDE; + virtual void BackUp(int count) OVERRIDE; + virtual bool Skip(int count) OVERRIDE; // Not implemented. + virtual int64 ByteCount() const OVERRIDE; + + // The remaining amount of valid data available to be read. + size_t UnreadByteCount() const; + + // Reads from the socket, appending a max of |byte_limit| bytes onto the read + // buffer. net::ERR_IO_PENDING is returned if the refresh can't complete + // synchronously, in which case the callback is invoked upon completion. If + // the refresh can complete synchronously, even in case of an error, returns + // net::OK without invoking callback. + // Note: GetState() (and possibly last_error()) should be checked upon + // completion to determine whether the Refresh encountered an error. + net::Error Refresh(const base::Closure& callback, int byte_limit); + + // Rebuilds the buffer state by copying over any unread data to the beginning + // of the buffer and resetting the buffer read/write positions. + // Note: it is not valid to call Rebuild() if GetState() == CLOSED. The stream + // must be recreated from scratch in such a scenario. + void RebuildBuffer(); + + // Returns the last fatal error encountered. Only valid if GetState() == + // CLOSED. + net::Error last_error() const; + + // Returns the current state. + State GetState() const; + + private: + // Clears the local state. + void ResetInternal(); + + // Callback for Socket::Read calls. + void RefreshCompletionCallback(const base::Closure& callback, int result); + + // Permanently closes the stream. + void CloseStream(net::Error error, const base::Closure& callback); + + // Internal net components. + net::StreamSocket* const socket_; + const scoped_refptr<net::IOBuffer> io_buffer_; + // IOBuffer implementation that wraps the data within |io_buffer_| that hasn't + // been written to yet by Socket::Read calls. + const scoped_refptr<net::DrainableIOBuffer> read_buffer_; + + // Starting position of the data within |io_buffer_| to consume on subsequent + // Next(..) call. 0 <= next_pos_ <= read_buffer_.BytesConsumed() + // Note: next_pos == read_buffer_.BytesConsumed() implies GetState() == EMPTY. + int next_pos_; + + // If < net::ERR_IO_PENDING, the last net error received. + // Note: last_error_ == net::ERR_IO_PENDING implies GetState() == READING. + net::Error last_error_; + + base::WeakPtrFactory<SocketInputStream> weak_ptr_factory_; + + DISALLOW_COPY_AND_ASSIGN(SocketInputStream); +}; + +// A helper class for writing to a SocketStream with protobuf encoded data. +// A SocketOutputStream does not take ownership of the socket itself, and it is +// expected that the life of the output stream should match the life of the +// socket itself (while the socket remains connected). +// Typical usage: +// 1. Check the GetState() of the output stream before using it. If CLOSED, the +// output stream must be rebuilt (and the socket likely needs to be +// reconnected, as an error was encountered). +// 2. If EMPTY, the output stream can be written via a CodedOutputStream using +// the ZeroCopyOutputStream interface. +// 3. Once done writing, GetState() should be READY, so call Flush(..) to write +// the buffer into the StreamSocket. Wait for the callback to be invoked +// (it's invalid to write to an output stream while it's flushing). +// 4. Check the GetState() again to ensure the Flush was successful. GetState() +// should be EMPTY again. +// 5. Repeat. +class GCM_EXPORT SocketOutputStream + : public google::protobuf::io::ZeroCopyOutputStream { + public: + enum State { + // No valid data yet. + EMPTY, + // Ready for flushing (some data is present). + READY, + // In the process of flushing into the socket. + FLUSHING, + // A permanent error occurred, and the stream is now closed. + CLOSED, + }; + + // |socket| should already be connected. + explicit SocketOutputStream(net::StreamSocket* socket); + virtual ~SocketOutputStream(); + + // ZeroCopyOutputStream implementation. + virtual bool Next(void** data, int* size) OVERRIDE; + virtual void BackUp(int count) OVERRIDE; + virtual int64 ByteCount() const OVERRIDE; + + // Writes the buffer into the Socket. + net::Error Flush(const base::Closure& callback); + + // Returns the last fatal error encountered. Only valid if GetState() == + // CLOSED. + net::Error last_error() const; + + // Returns the current state. + State GetState() const; + + private: + void FlushCompletionCallback(const base::Closure& callback, int result); + + // Internal net components. + net::StreamSocket* const socket_; + const scoped_refptr<net::IOBuffer> io_buffer_; + // IOBuffer implementation that wraps the data within |io_buffer_| that hasn't + // been written to the socket yet. + const scoped_refptr<net::DrainableIOBuffer> write_buffer_; + + // Starting position of the data within |io_buffer_| to consume on subsequent + // Next(..) call. 0 <= write_buffer_.BytesConsumed() <= next_pos_ + // Note: next_pos == 0 implies GetState() == EMPTY. + int next_pos_; + + // If < net::ERR_IO_PENDING, the last net error received. + // Note: last_error_ == net::ERR_IO_PENDING implies GetState() == FLUSHING. + net::Error last_error_; + + base::WeakPtrFactory<SocketOutputStream> weak_ptr_factory_; + + DISALLOW_COPY_AND_ASSIGN(SocketOutputStream); +}; + +} // namespace gcm + +#endif // GOOGLE_APIS_GCM_BASE_SOCKET_STREAM_H_ diff --git a/google_apis/gcm/base/socket_stream_unittest.cc b/google_apis/gcm/base/socket_stream_unittest.cc new file mode 100644 index 0000000..d7ba679 --- /dev/null +++ b/google_apis/gcm/base/socket_stream_unittest.cc @@ -0,0 +1,406 @@ +// Copyright 2013 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "google_apis/gcm/base/socket_stream.h" + +#include "base/basictypes.h" +#include "base/bind.h" +#include "base/memory/scoped_ptr.h" +#include "base/run_loop.h" +#include "base/stl_util.h" +#include "base/strings/string_piece.h" +#include "net/socket/socket_test_util.h" +#include "testing/gtest/include/gtest/gtest.h" + +namespace gcm { +namespace { + +typedef std::vector<net::MockRead> ReadList; +typedef std::vector<net::MockWrite> WriteList; + +const char kReadData[] = "read_data"; +const uint64 kReadDataSize = arraysize(kReadData) - 1; +const char kReadData2[] = "read_alternate_data"; +const uint64 kReadData2Size = arraysize(kReadData2) - 1; +const char kWriteData[] = "write_data"; +const uint64 kWriteDataSize = arraysize(kWriteData) - 1; + +class GCMSocketStreamTest : public testing::Test { + public: + GCMSocketStreamTest(); + virtual ~GCMSocketStreamTest(); + + // Build a socket with the expected reads and writes. + void BuildSocket(const ReadList& read_list, const WriteList& write_list); + + // Pump the message loop until idle. + void PumpLoop(); + + // Simulates a google::protobuf::io::CodedInputStream read. + base::StringPiece DoInputStreamRead(uint64 bytes); + // Simulates a google::protobuf::io::CodedOutputStream write. + uint64 DoOutputStreamWrite(const base::StringPiece& write_src); + + // Synchronous Refresh wrapper. + void WaitForData(size_t msg_size); + + base::MessageLoop* message_loop() { return &message_loop_; }; + net::DelayedSocketData* data_provider() { return data_provider_.get(); } + SocketInputStream* input_stream() { return socket_input_stream_.get(); } + SocketOutputStream* output_stream() { return socket_output_stream_.get(); } + net::StreamSocket* socket() { return socket_.get(); } + + private: + void OpenConnection(); + void ResetInputStream(); + void ResetOutputStream(); + + void ConnectCallback(int result); + + // SocketStreams and their data providers. + ReadList mock_reads_; + WriteList mock_writes_; + scoped_ptr<net::DelayedSocketData> data_provider_; + scoped_ptr<SocketInputStream> socket_input_stream_; + scoped_ptr<SocketOutputStream> socket_output_stream_; + + // net:: components. + scoped_ptr<net::StreamSocket> socket_; + net::MockClientSocketFactory socket_factory_; + net::AddressList address_list_; + + base::MessageLoopForIO message_loop_; +}; + +GCMSocketStreamTest::GCMSocketStreamTest() { + net::IPAddressNumber ip_number; + net::ParseIPLiteralToNumber("127.0.0.1", &ip_number); + address_list_ = net::AddressList::CreateFromIPAddress(ip_number, 5228); +} + +GCMSocketStreamTest::~GCMSocketStreamTest() {} + +void GCMSocketStreamTest::BuildSocket(const ReadList& read_list, + const WriteList& write_list) { + mock_reads_ = read_list; + mock_writes_ = write_list; + data_provider_.reset( + new net::DelayedSocketData( + 0, + vector_as_array(&mock_reads_), mock_reads_.size(), + vector_as_array(&mock_writes_), mock_writes_.size())); + socket_factory_.AddSocketDataProvider(data_provider_.get()); + OpenConnection(); + ResetInputStream(); + ResetOutputStream(); +} + +void GCMSocketStreamTest::PumpLoop() { + base::RunLoop run_loop; + run_loop.RunUntilIdle(); +} + +base::StringPiece GCMSocketStreamTest::DoInputStreamRead(uint64 bytes) { + uint64 total_bytes_read = 0; + const void* initial_buffer = NULL; + const void* buffer = NULL; + int size = 0; + + do { + DCHECK(socket_input_stream_->GetState() == SocketInputStream::EMPTY || + socket_input_stream_->GetState() == SocketInputStream::READY); + if (!socket_input_stream_->Next(&buffer, &size)) + break; + total_bytes_read += size; + if (initial_buffer) { // Verify the buffer doesn't skip data. + EXPECT_EQ(static_cast<const uint8*>(initial_buffer) + total_bytes_read, + static_cast<const uint8*>(buffer) + size); + } else { + initial_buffer = buffer; + } + } while (total_bytes_read < bytes); + + if (total_bytes_read > bytes) { + socket_input_stream_->BackUp(total_bytes_read - bytes); + total_bytes_read = bytes; + } + + return base::StringPiece(static_cast<const char*>(initial_buffer), + total_bytes_read); +} + +uint64 GCMSocketStreamTest::DoOutputStreamWrite( + const base::StringPiece& write_src) { + DCHECK_EQ(socket_output_stream_->GetState(), SocketOutputStream::EMPTY); + uint64 total_bytes_written = 0; + void* buffer = NULL; + int size = 0; + size_t bytes = write_src.size(); + + do { + if (!socket_output_stream_->Next(&buffer, &size)) + break; + uint64 bytes_to_write = (static_cast<uint64>(size) < bytes ? size : bytes); + memcpy(buffer, + write_src.data() + total_bytes_written, + bytes_to_write); + if (bytes_to_write < static_cast<uint64>(size)) + socket_output_stream_->BackUp(size - bytes_to_write); + total_bytes_written += bytes_to_write; + } while (total_bytes_written < bytes); + + base::RunLoop run_loop; + if (socket_output_stream_->Flush(run_loop.QuitClosure()) == + net::ERR_IO_PENDING) { + run_loop.Run(); + } + + return total_bytes_written; +} + +void GCMSocketStreamTest::WaitForData(size_t msg_size) { + while (input_stream()->UnreadByteCount() < msg_size) { + base::RunLoop run_loop; + if (input_stream()->Refresh(run_loop.QuitClosure(), + msg_size - input_stream()->UnreadByteCount()) == + net::ERR_IO_PENDING) { + run_loop.Run(); + } + if (input_stream()->GetState() == SocketInputStream::CLOSED) + return; + } +} + +void GCMSocketStreamTest::OpenConnection() { + socket_ = socket_factory_.CreateTransportClientSocket( + address_list_, NULL, net::NetLog::Source()); + socket_->Connect( + base::Bind(&GCMSocketStreamTest::ConnectCallback, + base::Unretained(this))); + PumpLoop(); +} + +void GCMSocketStreamTest::ConnectCallback(int result) {} + +void GCMSocketStreamTest::ResetInputStream() { + DCHECK(socket_.get()); + socket_input_stream_.reset(new SocketInputStream(socket_.get())); +} + +void GCMSocketStreamTest::ResetOutputStream() { + DCHECK(socket_.get()); + socket_output_stream_.reset(new SocketOutputStream(socket_.get())); +} + +// A read where all data is already available. +TEST_F(GCMSocketStreamTest, ReadDataSync) { + BuildSocket(ReadList(1, net::MockRead(net::SYNCHRONOUS, + kReadData, + kReadDataSize)), + WriteList()); + + WaitForData(kReadDataSize); + ASSERT_EQ(std::string(kReadData, kReadDataSize), + DoInputStreamRead(kReadDataSize)); +} + +// A read that comes in two parts. +TEST_F(GCMSocketStreamTest, ReadPartialDataSync) { + size_t first_read_len = kReadDataSize / 2; + size_t second_read_len = kReadDataSize - first_read_len; + ReadList read_list; + read_list.push_back( + net::MockRead(net::SYNCHRONOUS, + kReadData, + first_read_len)); + read_list.push_back( + net::MockRead(net::SYNCHRONOUS, + &kReadData[first_read_len], + second_read_len)); + BuildSocket(read_list, WriteList()); + + WaitForData(kReadDataSize); + ASSERT_EQ(std::string(kReadData, kReadDataSize), + DoInputStreamRead(kReadDataSize)); +} + +// A read where no data is available at first (IO_PENDING will be returned). +TEST_F(GCMSocketStreamTest, ReadAsync) { + size_t first_read_len = kReadDataSize / 2; + size_t second_read_len = kReadDataSize - first_read_len; + ReadList read_list; + read_list.push_back( + net::MockRead(net::SYNCHRONOUS, net::ERR_IO_PENDING)); + read_list.push_back( + net::MockRead(net::ASYNC, kReadData, first_read_len)); + read_list.push_back( + net::MockRead(net::ASYNC, &kReadData[first_read_len], second_read_len)); + BuildSocket(read_list, WriteList()); + + base::MessageLoop::current()->PostTask( + FROM_HERE, + base::Bind(&net::DelayedSocketData::ForceNextRead, + base::Unretained(data_provider()))); + WaitForData(kReadDataSize); + ASSERT_EQ(std::string(kReadData, kReadDataSize), + DoInputStreamRead(kReadDataSize)); +} + +// Simulate two packets arriving at once. Read them in two separate calls. +TEST_F(GCMSocketStreamTest, TwoReadsAtOnce) { + std::string long_data = std::string(kReadData, kReadDataSize) + + std::string(kReadData2, kReadData2Size); + BuildSocket(ReadList(1, net::MockRead(net::SYNCHRONOUS, + long_data.c_str(), + long_data.size())), + WriteList()); + + WaitForData(kReadDataSize); + ASSERT_EQ(std::string(kReadData, kReadDataSize), + DoInputStreamRead(kReadDataSize)); + + WaitForData(kReadData2Size); + ASSERT_EQ(std::string(kReadData2, kReadData2Size), + DoInputStreamRead(kReadData2Size)); +} + +// Simulate two packets arriving at once. Read them in two calls separated +// by a Rebuild. +TEST_F(GCMSocketStreamTest, TwoReadsAtOnceWithRebuild) { + std::string long_data = std::string(kReadData, kReadDataSize) + + std::string(kReadData2, kReadData2Size); + BuildSocket(ReadList(1, net::MockRead(net::SYNCHRONOUS, + long_data.c_str(), + long_data.size())), + WriteList()); + + WaitForData(kReadDataSize); + ASSERT_EQ(std::string(kReadData, kReadDataSize), + DoInputStreamRead(kReadDataSize)); + + input_stream()->RebuildBuffer(); + WaitForData(kReadData2Size); + ASSERT_EQ(std::string(kReadData2, kReadData2Size), + DoInputStreamRead(kReadData2Size)); +} + +// Simulate a read that is aborted. +TEST_F(GCMSocketStreamTest, ReadError) { + int result = net::ERR_ABORTED; + BuildSocket(ReadList(1, net::MockRead(net::SYNCHRONOUS, result)), + WriteList()); + + WaitForData(kReadDataSize); + ASSERT_EQ(SocketInputStream::CLOSED, input_stream()->GetState()); + ASSERT_EQ(result, input_stream()->last_error()); +} + +// Simulate a read after the connection is closed. +TEST_F(GCMSocketStreamTest, ReadDisconnected) { + BuildSocket(ReadList(), WriteList()); + socket()->Disconnect(); + WaitForData(kReadDataSize); + ASSERT_EQ(SocketInputStream::CLOSED, input_stream()->GetState()); + ASSERT_EQ(net::ERR_CONNECTION_CLOSED, input_stream()->last_error()); +} + +// Write a full message in one go. +TEST_F(GCMSocketStreamTest, WriteFull) { + BuildSocket(ReadList(), + WriteList(1, net::MockWrite(net::SYNCHRONOUS, + kWriteData, + kWriteDataSize))); + ASSERT_EQ(kWriteDataSize, + DoOutputStreamWrite(base::StringPiece(kWriteData, + kWriteDataSize))); +} + +// Write a message in two go's. +TEST_F(GCMSocketStreamTest, WritePartial) { + WriteList write_list; + write_list.push_back(net::MockWrite(net::SYNCHRONOUS, + kWriteData, + kWriteDataSize / 2)); + write_list.push_back(net::MockWrite(net::SYNCHRONOUS, + kWriteData + kWriteDataSize / 2, + kWriteDataSize / 2)); + BuildSocket(ReadList(), write_list); + ASSERT_EQ(kWriteDataSize, + DoOutputStreamWrite(base::StringPiece(kWriteData, + kWriteDataSize))); +} + +// Write a message completely asynchronously (returns IO_PENDING before +// finishing the write in two go's). +TEST_F(GCMSocketStreamTest, WriteNone) { + WriteList write_list; + write_list.push_back(net::MockWrite(net::SYNCHRONOUS, + kWriteData, + kWriteDataSize / 2)); + write_list.push_back(net::MockWrite(net::SYNCHRONOUS, + kWriteData + kWriteDataSize / 2, + kWriteDataSize / 2)); + BuildSocket(ReadList(), write_list); + ASSERT_EQ(kWriteDataSize, + DoOutputStreamWrite(base::StringPiece(kWriteData, + kWriteDataSize))); +} + +// Write a message then read a message. +TEST_F(GCMSocketStreamTest, WriteThenRead) { + BuildSocket(ReadList(1, net::MockRead(net::SYNCHRONOUS, + kReadData, + kReadDataSize)), + WriteList(1, net::MockWrite(net::SYNCHRONOUS, + kWriteData, + kWriteDataSize))); + + ASSERT_EQ(kWriteDataSize, + DoOutputStreamWrite(base::StringPiece(kWriteData, + kWriteDataSize))); + + WaitForData(kReadDataSize); + ASSERT_EQ(std::string(kReadData, kReadDataSize), + DoInputStreamRead(kReadDataSize)); +} + +// Read a message then write a message. +TEST_F(GCMSocketStreamTest, ReadThenWrite) { + BuildSocket(ReadList(1, net::MockRead(net::SYNCHRONOUS, + kReadData, + kReadDataSize)), + WriteList(1, net::MockWrite(net::SYNCHRONOUS, + kWriteData, + kWriteDataSize))); + + WaitForData(kReadDataSize); + ASSERT_EQ(std::string(kReadData, kReadDataSize), + DoInputStreamRead(kReadDataSize)); + + ASSERT_EQ(kWriteDataSize, + DoOutputStreamWrite(base::StringPiece(kWriteData, + kWriteDataSize))); +} + +// Simulate a write that gets aborted. +TEST_F(GCMSocketStreamTest, WriteError) { + int result = net::ERR_ABORTED; + BuildSocket(ReadList(), + WriteList(1, net::MockWrite(net::SYNCHRONOUS, result))); + DoOutputStreamWrite(base::StringPiece(kWriteData, kWriteDataSize)); + ASSERT_EQ(SocketOutputStream::CLOSED, output_stream()->GetState()); + ASSERT_EQ(result, output_stream()->last_error()); +} + +// Simulate a write after the connection is closed. +TEST_F(GCMSocketStreamTest, WriteDisconnected) { + BuildSocket(ReadList(), WriteList()); + socket()->Disconnect(); + DoOutputStreamWrite(base::StringPiece(kWriteData, kWriteDataSize)); + ASSERT_EQ(SocketOutputStream::CLOSED, output_stream()->GetState()); + ASSERT_EQ(net::ERR_CONNECTION_CLOSED, output_stream()->last_error()); +} + +} // namespace +} // namespace gcm |