diff options
author | sergeyu@chromium.org <sergeyu@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2011-03-31 00:26:03 +0000 |
---|---|---|
committer | sergeyu@chromium.org <sergeyu@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2011-03-31 00:26:03 +0000 |
commit | f6da0b2581a106cfb2ad22b1d15993a89dd9a75a (patch) | |
tree | ebe1ffc783015bad357a5dfa1e298ef36a6c6061 /jingle | |
parent | 5a1f7e2418011b542e12f324bea55895f4c2cc56 (diff) | |
download | chromium_src-f6da0b2581a106cfb2ad22b1d15993a89dd9a75a.zip chromium_src-f6da0b2581a106cfb2ad22b1d15993a89dd9a75a.tar.gz chromium_src-f6da0b2581a106cfb2ad22b1d15993a89dd9a75a.tar.bz2 |
Moved socket adapters from remoting/jingle_glue to jingle/glue.
BUG=None
TEST=compiles, unittests
Review URL: http://codereview.chromium.org/6776003
git-svn-id: svn://svn.chromium.org/chrome/trunk/src@79929 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'jingle')
-rw-r--r-- | jingle/glue/channel_socket_adapter.cc | 170 | ||||
-rw-r--r-- | jingle/glue/channel_socket_adapter.h | 67 | ||||
-rw-r--r-- | jingle/glue/channel_socket_adapter_unittest.cc | 121 | ||||
-rw-r--r-- | jingle/glue/jingle_glue_mock_objects.cc | 13 | ||||
-rw-r--r-- | jingle/glue/jingle_glue_mock_objects.h | 32 | ||||
-rw-r--r-- | jingle/glue/stream_socket_adapter.cc | 237 | ||||
-rw-r--r-- | jingle/glue/stream_socket_adapter.h | 85 | ||||
-rw-r--r-- | jingle/glue/stream_socket_adapter_unittest.cc | 151 | ||||
-rw-r--r-- | jingle/jingle.gyp | 8 |
9 files changed, 884 insertions, 0 deletions
diff --git a/jingle/glue/channel_socket_adapter.cc b/jingle/glue/channel_socket_adapter.cc new file mode 100644 index 0000000..b6866ca --- /dev/null +++ b/jingle/glue/channel_socket_adapter.cc @@ -0,0 +1,170 @@ +// Copyright (c) 2010 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "jingle/glue/channel_socket_adapter.h" + +#include <limits> + +#include "base/logging.h" +#include "base/message_loop.h" +#include "net/base/io_buffer.h" +#include "net/base/net_errors.h" +#include "third_party/libjingle/source/talk/p2p/base/transportchannel.h" + +namespace jingle_glue { + +TransportChannelSocketAdapter::TransportChannelSocketAdapter( + cricket::TransportChannel* channel) + : channel_(channel), + read_pending_(false), + write_pending_(false), + closed_error_code_(net::OK) { + DCHECK_EQ(MessageLoop::TYPE_IO, MessageLoop::current()->type()); + DCHECK(channel_); + + channel_->SignalReadPacket.connect( + this, &TransportChannelSocketAdapter::OnNewPacket); + channel_->SignalWritableState.connect( + this, &TransportChannelSocketAdapter::OnWritableState); + channel_->SignalDestroyed.connect( + this, &TransportChannelSocketAdapter::OnChannelDestroyed); +} + +TransportChannelSocketAdapter::~TransportChannelSocketAdapter() { +} + +int TransportChannelSocketAdapter::Read( + net::IOBuffer* buf, int buffer_size, net::CompletionCallback* callback) { + DCHECK_EQ(MessageLoop::TYPE_IO, MessageLoop::current()->type()); + DCHECK(buf); + CHECK(!read_pending_); + + if (!channel_) { + DCHECK(closed_error_code_ != net::OK); + return closed_error_code_; + } + + read_callback_ = callback; + read_buffer_ = buf; + read_buffer_size_ = buffer_size; + read_pending_ = true; + + return net::ERR_IO_PENDING; +} + +int TransportChannelSocketAdapter::Write( + net::IOBuffer* buffer, int buffer_size, net::CompletionCallback* callback) { + DCHECK_EQ(MessageLoop::TYPE_IO, MessageLoop::current()->type()); + DCHECK(buffer); + CHECK(!write_pending_); + + if (!channel_) { + DCHECK(closed_error_code_ != net::OK); + return closed_error_code_; + } + + int result = channel_->SendPacket(buffer->data(), buffer_size); + if (result < 0) { + result = net::MapSystemError(channel_->GetError()); + if (result == net::ERR_IO_PENDING) { + write_pending_ = true; + write_callback_ = callback; + write_buffer_ = buffer; + write_buffer_size_ = buffer_size; + } + } + return result; +} + +bool TransportChannelSocketAdapter::SetReceiveBufferSize(int32 size) { + NOTIMPLEMENTED(); + return false; +} + +bool TransportChannelSocketAdapter::SetSendBufferSize(int32 size) { + NOTIMPLEMENTED(); + return false; +} + +void TransportChannelSocketAdapter::Close(int error_code) { + DCHECK_EQ(MessageLoop::TYPE_IO, MessageLoop::current()->type()); + + if (!channel_) // Already closed. + return; + + DCHECK(error_code != net::OK); + closed_error_code_ = error_code; + channel_->SignalReadPacket.disconnect(this); + channel_->SignalDestroyed.disconnect(this); + channel_ = NULL; + + if (read_pending_) { + net::CompletionCallback* callback = read_callback_; + read_pending_ = false; + read_buffer_ = NULL; + callback->Run(error_code); + } + + if (write_pending_) { + net::CompletionCallback* callback = write_callback_; + write_pending_ = false; + write_buffer_ = NULL; + callback->Run(error_code); + } +} + +void TransportChannelSocketAdapter::OnNewPacket( + cricket::TransportChannel* channel, const char* data, size_t data_size) { + DCHECK_EQ(MessageLoop::TYPE_IO, MessageLoop::current()->type()); + DCHECK_EQ(channel, channel_); + if (read_pending_) { + DCHECK(read_buffer_); + CHECK_LT(data_size, static_cast<size_t>(std::numeric_limits<int>::max())); + + if (read_buffer_size_ < static_cast<int>(data_size)) { + LOG(WARNING) << "Data buffer is smaller than the received packet. " + << "Dropping the data that doesn't fit."; + data_size = read_buffer_size_; + } + + memcpy(read_buffer_->data(), data, data_size); + + net::CompletionCallback* callback = read_callback_; + read_pending_ = false; + read_buffer_ = NULL; + + callback->Run(data_size); + } else { + LOG(WARNING) + << "Data was received without a callback. Dropping the packet."; + } +} + +void TransportChannelSocketAdapter::OnWritableState( + cricket::TransportChannel* channel) { + DCHECK_EQ(MessageLoop::TYPE_IO, MessageLoop::current()->type()); + // Try to send the packet if there is a pending write. + if (write_pending_) { + int result = channel_->SendPacket(write_buffer_->data(), + write_buffer_size_); + if (result < 0) + result = net::MapSystemError(channel_->GetError()); + + if (result != net::ERR_IO_PENDING) { + net::CompletionCallback* callback = write_callback_; + write_pending_ = false; + write_buffer_ = NULL; + callback->Run(result); + } + } +} + +void TransportChannelSocketAdapter::OnChannelDestroyed( + cricket::TransportChannel* channel) { + DCHECK_EQ(MessageLoop::TYPE_IO, MessageLoop::current()->type()); + DCHECK_EQ(channel, channel_); + Close(net::ERR_CONNECTION_ABORTED); +} + +} // namespace jingle_glue diff --git a/jingle/glue/channel_socket_adapter.h b/jingle/glue/channel_socket_adapter.h new file mode 100644 index 0000000..1df852d --- /dev/null +++ b/jingle/glue/channel_socket_adapter.h @@ -0,0 +1,67 @@ +// Copyright (c) 2010 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef JINGLE_GLUE_CHANNEL_SOCKET_ADAPTER_H_ +#define JINGLE_GLUE_CHANNEL_SOCKET_ADAPTER_H_ + +#include "net/socket/socket.h" +#include "third_party/libjingle/source/talk/base/socketaddress.h" +#include "third_party/libjingle/source/talk/base/sigslot.h" + +namespace cricket { +class TransportChannel; +} // namespace cricket + +namespace jingle_glue { + +// TransportChannelSocketAdapter implements net::Socket interface on +// top of libjingle's TransportChannel. It is used by +// JingleChromotocolConnection to provide net::Socket interface for channels. +class TransportChannelSocketAdapter : public net::Socket, + public sigslot::has_slots<> { + public: + // TransportChannel object is always owned by the corresponding session. + explicit TransportChannelSocketAdapter(cricket::TransportChannel* channel); + virtual ~TransportChannelSocketAdapter(); + + // Closes the stream. |error_code| specifies error code that will + // be returned by Read() and Write() after the stream is closed. + // Must be called before the session and the channel are destroyed. + void Close(int error_code); + + // Socket interface. + virtual int Read(net::IOBuffer* buf, int buf_len, + net::CompletionCallback* callback); + virtual int Write(net::IOBuffer* buf, int buf_len, + net::CompletionCallback* callback); + + virtual bool SetReceiveBufferSize(int32 size); + virtual bool SetSendBufferSize(int32 size); + + private: + void OnNewPacket(cricket::TransportChannel* channel, + const char* data, size_t data_size); + void OnWritableState(cricket::TransportChannel* channel); + void OnChannelDestroyed(cricket::TransportChannel* channel); + + cricket::TransportChannel* channel_; + + bool read_pending_; + net::CompletionCallback* read_callback_; // Not owned. + scoped_refptr<net::IOBuffer> read_buffer_; + int read_buffer_size_; + + bool write_pending_; + net::CompletionCallback* write_callback_; // Not owned. + scoped_refptr<net::IOBuffer> write_buffer_; + int write_buffer_size_; + + int closed_error_code_; + + DISALLOW_COPY_AND_ASSIGN(TransportChannelSocketAdapter); +}; + +} // namespace jingle_glue + +#endif // JINGLE_GLUE_CHANNEL_SOCKET_ADAPTER_H_ diff --git a/jingle/glue/channel_socket_adapter_unittest.cc b/jingle/glue/channel_socket_adapter_unittest.cc new file mode 100644 index 0000000..60e60b6 --- /dev/null +++ b/jingle/glue/channel_socket_adapter_unittest.cc @@ -0,0 +1,121 @@ +// Copyright (c) 2011 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "base/memory/ref_counted.h" +#include "base/memory/scoped_ptr.h" +#include "base/message_loop.h" +#include "jingle/glue/channel_socket_adapter.h" +#include "net/base/io_buffer.h" +#include "net/base/net_errors.h" +#include "net/socket/socket.h" +#include "testing/gmock/include/gmock/gmock.h" +#include "testing/gtest/include/gtest/gtest.h" +#include "third_party/libjingle/source/talk/p2p/base/transportchannel.h" + +using net::IOBuffer; + +using testing::_; +using testing::Return; + +namespace jingle_glue { + +namespace { +const int kBufferSize = 4096; +const char kTestData[] = "data"; +const int kTestDataSize = 4; +const int kTestError = -32123; +} // namespace + +class MockTransportChannel : public cricket::TransportChannel { + public: + MockTransportChannel() + : cricket::TransportChannel("", "") { + } + + MOCK_METHOD2(SendPacket, int(const char *data, size_t len)); + MOCK_METHOD2(SetOption, int(talk_base::Socket::Option opt, int value)); + MOCK_METHOD0(GetError, int()); +}; + +class TransportChannelSocketAdapterTest : public testing::Test { + public: + TransportChannelSocketAdapterTest() + : ALLOW_THIS_IN_INITIALIZER_LIST( + callback_(this, &TransportChannelSocketAdapterTest::Callback)), + callback_result_(0) { + } + + protected: + virtual void SetUp() { + target_.reset(new TransportChannelSocketAdapter(&channel_)); + } + + void Callback(int result) { + callback_result_ = result; + } + + MockTransportChannel channel_; + scoped_ptr<TransportChannelSocketAdapter> target_; + net::CompletionCallbackImpl<TransportChannelSocketAdapterTest> callback_; + int callback_result_; + MessageLoopForIO message_loop_; +}; + +// Verify that Read() returns net::ERR_IO_PENDING. +TEST_F(TransportChannelSocketAdapterTest, Read) { + scoped_refptr<IOBuffer> buffer(new IOBuffer(kBufferSize)); + + int result = target_->Read(buffer, kBufferSize, &callback_); + ASSERT_EQ(net::ERR_IO_PENDING, result); + + channel_.SignalReadPacket(&channel_, kTestData, kTestDataSize); + EXPECT_EQ(kTestDataSize, callback_result_); +} + +// Verify that Read() after Close() returns error. +TEST_F(TransportChannelSocketAdapterTest, ReadClose) { + scoped_refptr<IOBuffer> buffer(new IOBuffer(kBufferSize)); + + int result = target_->Read(buffer, kBufferSize, &callback_); + ASSERT_EQ(net::ERR_IO_PENDING, result); + + target_->Close(kTestError); + EXPECT_EQ(kTestError, callback_result_); + + // All Read() calls after Close() should return the error. + EXPECT_EQ(kTestError, target_->Read(buffer, kBufferSize, &callback_)); +} + +// Verify that Write sends the packet and returns correct result. +TEST_F(TransportChannelSocketAdapterTest, Write) { + scoped_refptr<IOBuffer> buffer(new IOBuffer(kTestDataSize)); + + EXPECT_CALL(channel_, SendPacket(buffer->data(), kTestDataSize)) + .WillOnce(Return(kTestDataSize)); + + int result = target_->Write(buffer, kTestDataSize, &callback_); + EXPECT_EQ(kTestDataSize, result); +} + +// Verify that the message is still send if Write() is called while +// socket is not open yet, and that the callback is called. +TEST_F(TransportChannelSocketAdapterTest, WritePending) { + scoped_refptr<IOBuffer> buffer(new IOBuffer(kTestDataSize)); + + EXPECT_CALL(channel_, SendPacket(buffer->data(), kTestDataSize)) + .Times(2) + .WillOnce(Return(SOCKET_ERROR)) + .WillOnce(Return(kTestDataSize)); + + EXPECT_CALL(channel_, GetError()) + .WillOnce(Return(EWOULDBLOCK)); + + int result = target_->Write(buffer, kTestDataSize, &callback_); + ASSERT_EQ(net::ERR_IO_PENDING, result); + + channel_.SignalWritableState(&channel_); + EXPECT_EQ(kTestDataSize, callback_result_); +} + +} // namespace jingle_glue diff --git a/jingle/glue/jingle_glue_mock_objects.cc b/jingle/glue/jingle_glue_mock_objects.cc new file mode 100644 index 0000000..6cd48ce --- /dev/null +++ b/jingle/glue/jingle_glue_mock_objects.cc @@ -0,0 +1,13 @@ +// Copyright (c) 2011 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "jingle/glue/jingle_glue_mock_objects.h" + +namespace jingle_glue { + +MockStream::MockStream() {} + +MockStream::~MockStream() {} + +} // namespace jingle_glue diff --git a/jingle/glue/jingle_glue_mock_objects.h b/jingle/glue/jingle_glue_mock_objects.h new file mode 100644 index 0000000..e2cd704 --- /dev/null +++ b/jingle/glue/jingle_glue_mock_objects.h @@ -0,0 +1,32 @@ +// Copyright (c) 2011 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef JINGLE_GLUE_JINGLE_GLUE_MOCK_OBJECTS_H_ +#define JINGLE_GLUE_JINGLE_GLUE_MOCK_OBJECTS_H_ + +#include "testing/gmock/include/gmock/gmock.h" +#include "third_party/libjingle/source/talk/base/stream.h" + +namespace jingle_glue { + +class MockStream : public talk_base::StreamInterface { + public: + MockStream(); + virtual ~MockStream(); + + MOCK_CONST_METHOD0(GetState, talk_base::StreamState()); + + MOCK_METHOD4(Read, talk_base::StreamResult(void*, size_t, size_t*, int*)); + MOCK_METHOD4(Write, talk_base::StreamResult(const void*, size_t, + size_t*, int*)); + MOCK_CONST_METHOD1(GetAvailable, bool(size_t*)); + MOCK_METHOD0(Close, void()); + + MOCK_METHOD3(PostEvent, void(talk_base::Thread*, int, int)); + MOCK_METHOD2(PostEvent, void(int, int)); +}; + +} // namespace jingle_glue + +#endif // JINGLE_GLUE_JINGLE_GLUE_MOCK_OBJECTS_H_ diff --git a/jingle/glue/stream_socket_adapter.cc b/jingle/glue/stream_socket_adapter.cc new file mode 100644 index 0000000..0cf45fb --- /dev/null +++ b/jingle/glue/stream_socket_adapter.cc @@ -0,0 +1,237 @@ +// Copyright (c) 2010 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "jingle/glue/stream_socket_adapter.h" + +#include "base/logging.h" +#include "base/message_loop.h" +#include "net/base/address_list.h" +#include "net/base/io_buffer.h" +#include "net/base/net_errors.h" +#include "third_party/libjingle/source/talk/base/stream.h" + +namespace jingle_glue { + +StreamSocketAdapter::StreamSocketAdapter(talk_base::StreamInterface* stream) + : stream_(stream), + read_pending_(false), + write_pending_(false), + closed_error_code_(net::OK) { + DCHECK_EQ(MessageLoop::TYPE_IO, MessageLoop::current()->type()); + DCHECK(stream); + stream_->SignalEvent.connect(this, &StreamSocketAdapter::OnStreamEvent); +} + +StreamSocketAdapter::~StreamSocketAdapter() { +} + +int StreamSocketAdapter::Connect(net::CompletionCallback* callback) { + return net::OK; +} + +void StreamSocketAdapter::Disconnect() { +} + +bool StreamSocketAdapter::IsConnected() const { + return true; +} + +bool StreamSocketAdapter::IsConnectedAndIdle() const { + return true; +} + +int StreamSocketAdapter::GetPeerAddress(net::AddressList* address) const { + // We actually don't know the peer address. Returning so the upper layers + // won't complain. + net::IPAddressNumber ip_address(4); + *address = net::AddressList(ip_address, 0, false); + return net::OK; +} + +const net::BoundNetLog& StreamSocketAdapter::NetLog() const { + return net_log_; +} + +void StreamSocketAdapter::SetSubresourceSpeculation() { +} + +void StreamSocketAdapter::SetOmniboxSpeculation() { +} + +bool StreamSocketAdapter::WasEverUsed() const { + return true; +} + +bool StreamSocketAdapter::UsingTCPFastOpen() const { + return false; +} + +int StreamSocketAdapter::Read( + net::IOBuffer* buffer, int buffer_size, net::CompletionCallback* callback) { + DCHECK_EQ(MessageLoop::TYPE_IO, MessageLoop::current()->type()); + DCHECK(buffer); + CHECK(!read_pending_); + + if (!stream_.get()) { + DCHECK(closed_error_code_ != net::OK); + return closed_error_code_; + } + + int result = ReadStream(buffer, buffer_size); + if (result == net::ERR_SOCKET_NOT_CONNECTED && + stream_->GetState() == talk_base::SS_OPENING) + result = net::ERR_IO_PENDING; + if (result == net::ERR_IO_PENDING) { + read_pending_ = true; + read_callback_ = callback; + read_buffer_ = buffer; + read_buffer_size_ = buffer_size; + } + return result; +} + +int StreamSocketAdapter::Write( + net::IOBuffer* buffer, int buffer_size, net::CompletionCallback* callback) { + DCHECK_EQ(MessageLoop::TYPE_IO, MessageLoop::current()->type()); + DCHECK(buffer); + CHECK(!write_pending_); + + if (!stream_.get()) { + DCHECK(closed_error_code_ != net::OK); + return closed_error_code_; + } + + int result = WriteStream(buffer, buffer_size); + if (result == net::ERR_SOCKET_NOT_CONNECTED && + stream_->GetState() == talk_base::SS_OPENING) + result = net::ERR_IO_PENDING; + + if (result == net::ERR_IO_PENDING) { + write_pending_ = true; + write_callback_ = callback; + write_buffer_ = buffer; + write_buffer_size_ = buffer_size; + } + return result; +} + +bool StreamSocketAdapter::SetReceiveBufferSize(int32 size) { + NOTIMPLEMENTED(); + return false; +} + +bool StreamSocketAdapter::SetSendBufferSize(int32 size) { + NOTIMPLEMENTED(); + return false; +} + +void StreamSocketAdapter::Close(int error_code) { + DCHECK_EQ(MessageLoop::TYPE_IO, MessageLoop::current()->type()); + + if (!stream_.get()) // Already closed. + return; + + DCHECK(error_code != net::OK); + closed_error_code_ = error_code; + stream_->SignalEvent.disconnect(this); + stream_->Close(); + stream_.reset(NULL); + + if (read_pending_) { + net::CompletionCallback* callback = read_callback_; + read_pending_ = false; + read_buffer_ = NULL; + callback->Run(error_code); + } + + if (write_pending_) { + net::CompletionCallback* callback = write_callback_; + write_pending_ = false; + write_buffer_ = NULL; + callback->Run(error_code); + } +} + +void StreamSocketAdapter::OnStreamEvent( + talk_base::StreamInterface* stream, int events, int error) { + DCHECK_EQ(MessageLoop::TYPE_IO, MessageLoop::current()->type()); + + if (events & talk_base::SE_WRITE) + DoWrite(); + + if (events & talk_base::SE_READ) + DoRead(); +} + +void StreamSocketAdapter::DoWrite() { + // Write if there is a pending read. + if (write_buffer_) { + int result = WriteStream(write_buffer_, write_buffer_size_); + if (result != net::ERR_IO_PENDING) { + net::CompletionCallback* callback = write_callback_; + write_pending_ = false; + write_buffer_ = NULL; + callback->Run(result); + } + } +} + +void StreamSocketAdapter::DoRead() { + // Read if there is a pending read. + if (read_pending_) { + int result = ReadStream(read_buffer_, read_buffer_size_); + if (result != net::ERR_IO_PENDING) { + net::CompletionCallback* callback = read_callback_;\ + read_pending_ = false; + read_buffer_ = NULL; + callback->Run(result); + } + } +} + +int StreamSocketAdapter::ReadStream(net::IOBuffer* buffer, int buffer_size) { + size_t bytes_read; + int error; + talk_base::StreamResult result = stream_->Read( + buffer->data(), buffer_size, &bytes_read, &error); + switch (result) { + case talk_base::SR_SUCCESS: + return bytes_read; + + case talk_base::SR_BLOCK: + return net::ERR_IO_PENDING; + + case talk_base::SR_EOS: + return net::ERR_CONNECTION_CLOSED; + + case talk_base::SR_ERROR: + return net::MapSystemError(error); + } + NOTREACHED(); + return net::ERR_FAILED; +} + +int StreamSocketAdapter::WriteStream(net::IOBuffer* buffer, int buffer_size) { + size_t bytes_written; + int error; + talk_base::StreamResult result = stream_->Write( + buffer->data(), buffer_size, &bytes_written, &error); + switch (result) { + case talk_base::SR_SUCCESS: + return bytes_written; + + case talk_base::SR_BLOCK: + return net::ERR_IO_PENDING; + + case talk_base::SR_EOS: + return net::ERR_CONNECTION_CLOSED; + + case talk_base::SR_ERROR: + return net::MapSystemError(error); + } + NOTREACHED(); + return net::ERR_FAILED; +} + +} // namespace jingle_glue diff --git a/jingle/glue/stream_socket_adapter.h b/jingle/glue/stream_socket_adapter.h new file mode 100644 index 0000000..aa5fb5d --- /dev/null +++ b/jingle/glue/stream_socket_adapter.h @@ -0,0 +1,85 @@ +// Copyright (c) 2011 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef JINGLE_GLUE_STREAM_SOCKET_ADAPTER_H_ +#define JINGLE_GLUE_STREAM_SOCKET_ADAPTER_H_ + +#include "base/memory/scoped_ptr.h" +#include "net/base/net_log.h" +#include "net/socket/client_socket.h" +#include "third_party/libjingle/source/talk/base/sigslot.h" + +namespace talk_base { +class StreamInterface; +} // namespace talk_base + +namespace jingle_glue { + +// StreamSocketAdapter implements net::Socket interface on top of +// libjingle's StreamInterface. It is used by JingleChromotocolConnection +// to provide net::Socket interface for channels. +class StreamSocketAdapter : public net::ClientSocket, + public sigslot::has_slots<> { + public: + // Ownership of the stream is passed to the adapter. + explicit StreamSocketAdapter(talk_base::StreamInterface* stream); + virtual ~StreamSocketAdapter(); + + // ClientSocket interface. + virtual int Connect(net::CompletionCallback* callback); + virtual void Disconnect(); + virtual bool IsConnected() const; + virtual bool IsConnectedAndIdle() const; + virtual int GetPeerAddress(net::AddressList* address) const; + virtual const net::BoundNetLog& NetLog() const; + virtual void SetSubresourceSpeculation(); + virtual void SetOmniboxSpeculation(); + virtual bool WasEverUsed() const; + virtual bool UsingTCPFastOpen() const; + + // Closes the stream. |error_code| specifies error code that will + // be returned by Read() and Write() after the stream is closed. + void Close(int error_code); + + // Socket interface. + virtual int Read(net::IOBuffer* buffer, int buffer_size, + net::CompletionCallback* callback); + virtual int Write(net::IOBuffer* buffer, int buffer_size, + net::CompletionCallback* callback); + + virtual bool SetReceiveBufferSize(int32 size); + virtual bool SetSendBufferSize(int32 size); + + private: + void OnStreamEvent(talk_base::StreamInterface* stream, + int events, int error); + + void DoWrite(); + void DoRead(); + + int ReadStream(net::IOBuffer* buffer, int buffer_size); + int WriteStream(net::IOBuffer* buffer, int buffer_size); + + scoped_ptr<talk_base::StreamInterface> stream_; + + bool read_pending_; + net::CompletionCallback* read_callback_; + scoped_refptr<net::IOBuffer> read_buffer_; + int read_buffer_size_; + + bool write_pending_; + net::CompletionCallback* write_callback_; + scoped_refptr<net::IOBuffer> write_buffer_; + int write_buffer_size_; + + int closed_error_code_; + + net::BoundNetLog net_log_; + + DISALLOW_COPY_AND_ASSIGN(StreamSocketAdapter); +}; + +} // namespace jingle_glue + +#endif // JINGLE_GLUE_STREAM_SOCKET_ADAPTER_H_ diff --git a/jingle/glue/stream_socket_adapter_unittest.cc b/jingle/glue/stream_socket_adapter_unittest.cc new file mode 100644 index 0000000..bac43e5 --- /dev/null +++ b/jingle/glue/stream_socket_adapter_unittest.cc @@ -0,0 +1,151 @@ +// Copyright (c) 2011 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "base/memory/ref_counted.h" +#include "base/memory/scoped_ptr.h" +#include "base/message_loop.h" +#include "jingle/glue/stream_socket_adapter.h" +#include "jingle/glue/jingle_glue_mock_objects.h" +#include "net/base/io_buffer.h" +#include "net/base/net_errors.h" +#include "net/socket/socket.h" +#include "testing/gmock/include/gmock/gmock.h" +#include "testing/gtest/include/gtest/gtest.h" +#include "third_party/libjingle/source/talk/p2p/base/transportchannel.h" + +using net::IOBuffer; + +using testing::_; +using testing::Return; +using testing::SetArgumentPointee; + +namespace jingle_glue { + +namespace { +const int kBufferSize = 4096; +const int kTestDataSize = 4; +const int kTestError = -32123; +} // namespace + +class StreamSocketAdapterTest : public testing::Test { + public: + StreamSocketAdapterTest() + : ALLOW_THIS_IN_INITIALIZER_LIST( + callback_(this, &StreamSocketAdapterTest::Callback)), + callback_result_(0) { + stream_ = new MockStream(); + target_.reset(new StreamSocketAdapter(stream_)); + } + + protected: + void Callback(int result) { + callback_result_ = result; + } + + // |stream_| must be allocated on the heap, because StreamSocketAdapter + // owns the object and it will free it in the end. + MockStream* stream_; + scoped_ptr<StreamSocketAdapter> target_; + net::CompletionCallbackImpl<StreamSocketAdapterTest> callback_; + int callback_result_; + MessageLoopForIO message_loop_; +}; + +// Verify that Read() calls Read() in stream. +TEST_F(StreamSocketAdapterTest, Read) { + scoped_refptr<IOBuffer> buffer(new IOBuffer(kBufferSize)); + + EXPECT_CALL(*stream_, Read(buffer->data(), kBufferSize, _, _)) + .WillOnce(DoAll(SetArgumentPointee<2>(kTestDataSize), + Return(talk_base::SR_SUCCESS))); + + int result = target_->Read(buffer, kBufferSize, &callback_); + EXPECT_EQ(kTestDataSize, result); + EXPECT_EQ(0, callback_result_); +} + +// Verify that read callback is called for pending reads. +TEST_F(StreamSocketAdapterTest, ReadPending) { + scoped_refptr<IOBuffer> buffer(new IOBuffer(kBufferSize)); + + EXPECT_CALL(*stream_, Read(buffer->data(), kBufferSize, _, _)) + .Times(2) + .WillOnce(Return(talk_base::SR_BLOCK)) + .WillOnce(DoAll(SetArgumentPointee<2>(kTestDataSize), + Return(talk_base::SR_SUCCESS))); + + int result = target_->Read(buffer, kBufferSize, &callback_); + ASSERT_EQ(net::ERR_IO_PENDING, result); + + stream_->SignalEvent(stream_, talk_base::SE_READ, 0); + EXPECT_EQ(kTestDataSize, callback_result_); +} + +// Verify that Read() returns error after Close(). +TEST_F(StreamSocketAdapterTest, ReadClose) { + scoped_refptr<IOBuffer> buffer(new IOBuffer(kBufferSize)); + + EXPECT_CALL(*stream_, Read(buffer->data(), kBufferSize, _, _)) + .WillOnce(Return(talk_base::SR_BLOCK)); + + int result = target_->Read(buffer, kBufferSize, &callback_); + ASSERT_EQ(net::ERR_IO_PENDING, result); + + EXPECT_CALL(*stream_, Close()); + target_->Close(kTestError); + EXPECT_EQ(kTestError, callback_result_); + + // All Read() calls after Close() should return the error. + EXPECT_EQ(kTestError, target_->Read(buffer, kBufferSize, &callback_)); +} + +// Verify that Write() calls stream's Write() and returns result. +TEST_F(StreamSocketAdapterTest, Write) { + scoped_refptr<IOBuffer> buffer(new IOBuffer(kTestDataSize)); + + EXPECT_CALL(*stream_, Write(buffer->data(), kTestDataSize, _, _)) + .WillOnce(DoAll(SetArgumentPointee<2>(kTestDataSize), + Return(talk_base::SR_SUCCESS))); + + int result = target_->Write(buffer, kTestDataSize, &callback_); + EXPECT_EQ(kTestDataSize, result); + EXPECT_EQ(0, callback_result_); +} + +// Verify that write callback is called for pending writes. +TEST_F(StreamSocketAdapterTest, WritePending) { + scoped_refptr<IOBuffer> buffer(new IOBuffer(kTestDataSize)); + + EXPECT_CALL(*stream_, Write(buffer->data(), kTestDataSize, _, _)) + .Times(2) + .WillOnce(Return(talk_base::SR_BLOCK)) + .WillOnce(DoAll(SetArgumentPointee<2>(kTestDataSize), + Return(talk_base::SR_SUCCESS))); + + int result = target_->Write(buffer, kTestDataSize, &callback_); + ASSERT_EQ(net::ERR_IO_PENDING, result); + + stream_->SignalEvent(stream_, talk_base::SE_WRITE, 0); + EXPECT_EQ(kTestDataSize, callback_result_); +} + +// Verify that Write() returns error after Close(). +TEST_F(StreamSocketAdapterTest, WriteClose) { + scoped_refptr<IOBuffer> buffer(new IOBuffer(kTestDataSize)); + + EXPECT_CALL(*stream_, Write(buffer->data(), kTestDataSize, _, _)) + .WillOnce(Return(talk_base::SR_BLOCK)); + + int result = target_->Write(buffer, kTestDataSize, &callback_); + ASSERT_EQ(net::ERR_IO_PENDING, result); + + EXPECT_CALL(*stream_, Close()); + target_->Close(kTestError); + EXPECT_EQ(kTestError, callback_result_); + + // All Write() calls after Close() should return the error. + EXPECT_EQ(kTestError, target_->Write(buffer, kTestError, &callback_)); +} + +} // namespace jingle_glue diff --git a/jingle/jingle.gyp b/jingle/jingle.gyp index 1a50938..f7f7f85 100644 --- a/jingle/jingle.gyp +++ b/jingle/jingle.gyp @@ -12,6 +12,10 @@ 'target_name': 'jingle_glue', 'type': '<(library)', 'sources': [ + 'glue/channel_socket_adapter.cc', + 'glue/channel_socket_adapter.h', + 'glue/stream_socket_adapter.cc', + 'glue/stream_socket_adapter.h', 'glue/thread_wrapper.cc', 'glue/thread_wrapper.h', ], @@ -122,6 +126,10 @@ 'target_name': 'jingle_unittests', 'type': 'executable', 'sources': [ + 'glue/channel_socket_adapter_unittest.cc', + 'glue/jingle_glue_mock_objects.cc', + 'glue/jingle_glue_mock_objects.h', + 'glue/stream_socket_adapter_unittest.cc', 'glue/thread_wrapper_unittest.cc', 'notifier/base/chrome_async_socket_unittest.cc', 'notifier/base/fake_ssl_client_socket_unittest.cc', |