summaryrefslogtreecommitdiffstats
path: root/jingle
diff options
context:
space:
mode:
authorsergeyu@chromium.org <sergeyu@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2011-03-31 00:26:03 +0000
committersergeyu@chromium.org <sergeyu@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2011-03-31 00:26:03 +0000
commitf6da0b2581a106cfb2ad22b1d15993a89dd9a75a (patch)
treeebe1ffc783015bad357a5dfa1e298ef36a6c6061 /jingle
parent5a1f7e2418011b542e12f324bea55895f4c2cc56 (diff)
downloadchromium_src-f6da0b2581a106cfb2ad22b1d15993a89dd9a75a.zip
chromium_src-f6da0b2581a106cfb2ad22b1d15993a89dd9a75a.tar.gz
chromium_src-f6da0b2581a106cfb2ad22b1d15993a89dd9a75a.tar.bz2
Moved socket adapters from remoting/jingle_glue to jingle/glue.
BUG=None TEST=compiles, unittests Review URL: http://codereview.chromium.org/6776003 git-svn-id: svn://svn.chromium.org/chrome/trunk/src@79929 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'jingle')
-rw-r--r--jingle/glue/channel_socket_adapter.cc170
-rw-r--r--jingle/glue/channel_socket_adapter.h67
-rw-r--r--jingle/glue/channel_socket_adapter_unittest.cc121
-rw-r--r--jingle/glue/jingle_glue_mock_objects.cc13
-rw-r--r--jingle/glue/jingle_glue_mock_objects.h32
-rw-r--r--jingle/glue/stream_socket_adapter.cc237
-rw-r--r--jingle/glue/stream_socket_adapter.h85
-rw-r--r--jingle/glue/stream_socket_adapter_unittest.cc151
-rw-r--r--jingle/jingle.gyp8
9 files changed, 884 insertions, 0 deletions
diff --git a/jingle/glue/channel_socket_adapter.cc b/jingle/glue/channel_socket_adapter.cc
new file mode 100644
index 0000000..b6866ca
--- /dev/null
+++ b/jingle/glue/channel_socket_adapter.cc
@@ -0,0 +1,170 @@
+// Copyright (c) 2010 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "jingle/glue/channel_socket_adapter.h"
+
+#include <limits>
+
+#include "base/logging.h"
+#include "base/message_loop.h"
+#include "net/base/io_buffer.h"
+#include "net/base/net_errors.h"
+#include "third_party/libjingle/source/talk/p2p/base/transportchannel.h"
+
+namespace jingle_glue {
+
+TransportChannelSocketAdapter::TransportChannelSocketAdapter(
+ cricket::TransportChannel* channel)
+ : channel_(channel),
+ read_pending_(false),
+ write_pending_(false),
+ closed_error_code_(net::OK) {
+ DCHECK_EQ(MessageLoop::TYPE_IO, MessageLoop::current()->type());
+ DCHECK(channel_);
+
+ channel_->SignalReadPacket.connect(
+ this, &TransportChannelSocketAdapter::OnNewPacket);
+ channel_->SignalWritableState.connect(
+ this, &TransportChannelSocketAdapter::OnWritableState);
+ channel_->SignalDestroyed.connect(
+ this, &TransportChannelSocketAdapter::OnChannelDestroyed);
+}
+
+TransportChannelSocketAdapter::~TransportChannelSocketAdapter() {
+}
+
+int TransportChannelSocketAdapter::Read(
+ net::IOBuffer* buf, int buffer_size, net::CompletionCallback* callback) {
+ DCHECK_EQ(MessageLoop::TYPE_IO, MessageLoop::current()->type());
+ DCHECK(buf);
+ CHECK(!read_pending_);
+
+ if (!channel_) {
+ DCHECK(closed_error_code_ != net::OK);
+ return closed_error_code_;
+ }
+
+ read_callback_ = callback;
+ read_buffer_ = buf;
+ read_buffer_size_ = buffer_size;
+ read_pending_ = true;
+
+ return net::ERR_IO_PENDING;
+}
+
+int TransportChannelSocketAdapter::Write(
+ net::IOBuffer* buffer, int buffer_size, net::CompletionCallback* callback) {
+ DCHECK_EQ(MessageLoop::TYPE_IO, MessageLoop::current()->type());
+ DCHECK(buffer);
+ CHECK(!write_pending_);
+
+ if (!channel_) {
+ DCHECK(closed_error_code_ != net::OK);
+ return closed_error_code_;
+ }
+
+ int result = channel_->SendPacket(buffer->data(), buffer_size);
+ if (result < 0) {
+ result = net::MapSystemError(channel_->GetError());
+ if (result == net::ERR_IO_PENDING) {
+ write_pending_ = true;
+ write_callback_ = callback;
+ write_buffer_ = buffer;
+ write_buffer_size_ = buffer_size;
+ }
+ }
+ return result;
+}
+
+bool TransportChannelSocketAdapter::SetReceiveBufferSize(int32 size) {
+ NOTIMPLEMENTED();
+ return false;
+}
+
+bool TransportChannelSocketAdapter::SetSendBufferSize(int32 size) {
+ NOTIMPLEMENTED();
+ return false;
+}
+
+void TransportChannelSocketAdapter::Close(int error_code) {
+ DCHECK_EQ(MessageLoop::TYPE_IO, MessageLoop::current()->type());
+
+ if (!channel_) // Already closed.
+ return;
+
+ DCHECK(error_code != net::OK);
+ closed_error_code_ = error_code;
+ channel_->SignalReadPacket.disconnect(this);
+ channel_->SignalDestroyed.disconnect(this);
+ channel_ = NULL;
+
+ if (read_pending_) {
+ net::CompletionCallback* callback = read_callback_;
+ read_pending_ = false;
+ read_buffer_ = NULL;
+ callback->Run(error_code);
+ }
+
+ if (write_pending_) {
+ net::CompletionCallback* callback = write_callback_;
+ write_pending_ = false;
+ write_buffer_ = NULL;
+ callback->Run(error_code);
+ }
+}
+
+void TransportChannelSocketAdapter::OnNewPacket(
+ cricket::TransportChannel* channel, const char* data, size_t data_size) {
+ DCHECK_EQ(MessageLoop::TYPE_IO, MessageLoop::current()->type());
+ DCHECK_EQ(channel, channel_);
+ if (read_pending_) {
+ DCHECK(read_buffer_);
+ CHECK_LT(data_size, static_cast<size_t>(std::numeric_limits<int>::max()));
+
+ if (read_buffer_size_ < static_cast<int>(data_size)) {
+ LOG(WARNING) << "Data buffer is smaller than the received packet. "
+ << "Dropping the data that doesn't fit.";
+ data_size = read_buffer_size_;
+ }
+
+ memcpy(read_buffer_->data(), data, data_size);
+
+ net::CompletionCallback* callback = read_callback_;
+ read_pending_ = false;
+ read_buffer_ = NULL;
+
+ callback->Run(data_size);
+ } else {
+ LOG(WARNING)
+ << "Data was received without a callback. Dropping the packet.";
+ }
+}
+
+void TransportChannelSocketAdapter::OnWritableState(
+ cricket::TransportChannel* channel) {
+ DCHECK_EQ(MessageLoop::TYPE_IO, MessageLoop::current()->type());
+ // Try to send the packet if there is a pending write.
+ if (write_pending_) {
+ int result = channel_->SendPacket(write_buffer_->data(),
+ write_buffer_size_);
+ if (result < 0)
+ result = net::MapSystemError(channel_->GetError());
+
+ if (result != net::ERR_IO_PENDING) {
+ net::CompletionCallback* callback = write_callback_;
+ write_pending_ = false;
+ write_buffer_ = NULL;
+ callback->Run(result);
+ }
+ }
+}
+
+void TransportChannelSocketAdapter::OnChannelDestroyed(
+ cricket::TransportChannel* channel) {
+ DCHECK_EQ(MessageLoop::TYPE_IO, MessageLoop::current()->type());
+ DCHECK_EQ(channel, channel_);
+ Close(net::ERR_CONNECTION_ABORTED);
+}
+
+} // namespace jingle_glue
diff --git a/jingle/glue/channel_socket_adapter.h b/jingle/glue/channel_socket_adapter.h
new file mode 100644
index 0000000..1df852d
--- /dev/null
+++ b/jingle/glue/channel_socket_adapter.h
@@ -0,0 +1,67 @@
+// Copyright (c) 2010 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef JINGLE_GLUE_CHANNEL_SOCKET_ADAPTER_H_
+#define JINGLE_GLUE_CHANNEL_SOCKET_ADAPTER_H_
+
+#include "net/socket/socket.h"
+#include "third_party/libjingle/source/talk/base/socketaddress.h"
+#include "third_party/libjingle/source/talk/base/sigslot.h"
+
+namespace cricket {
+class TransportChannel;
+} // namespace cricket
+
+namespace jingle_glue {
+
+// TransportChannelSocketAdapter implements net::Socket interface on
+// top of libjingle's TransportChannel. It is used by
+// JingleChromotocolConnection to provide net::Socket interface for channels.
+class TransportChannelSocketAdapter : public net::Socket,
+ public sigslot::has_slots<> {
+ public:
+ // TransportChannel object is always owned by the corresponding session.
+ explicit TransportChannelSocketAdapter(cricket::TransportChannel* channel);
+ virtual ~TransportChannelSocketAdapter();
+
+ // Closes the stream. |error_code| specifies error code that will
+ // be returned by Read() and Write() after the stream is closed.
+ // Must be called before the session and the channel are destroyed.
+ void Close(int error_code);
+
+ // Socket interface.
+ virtual int Read(net::IOBuffer* buf, int buf_len,
+ net::CompletionCallback* callback);
+ virtual int Write(net::IOBuffer* buf, int buf_len,
+ net::CompletionCallback* callback);
+
+ virtual bool SetReceiveBufferSize(int32 size);
+ virtual bool SetSendBufferSize(int32 size);
+
+ private:
+ void OnNewPacket(cricket::TransportChannel* channel,
+ const char* data, size_t data_size);
+ void OnWritableState(cricket::TransportChannel* channel);
+ void OnChannelDestroyed(cricket::TransportChannel* channel);
+
+ cricket::TransportChannel* channel_;
+
+ bool read_pending_;
+ net::CompletionCallback* read_callback_; // Not owned.
+ scoped_refptr<net::IOBuffer> read_buffer_;
+ int read_buffer_size_;
+
+ bool write_pending_;
+ net::CompletionCallback* write_callback_; // Not owned.
+ scoped_refptr<net::IOBuffer> write_buffer_;
+ int write_buffer_size_;
+
+ int closed_error_code_;
+
+ DISALLOW_COPY_AND_ASSIGN(TransportChannelSocketAdapter);
+};
+
+} // namespace jingle_glue
+
+#endif // JINGLE_GLUE_CHANNEL_SOCKET_ADAPTER_H_
diff --git a/jingle/glue/channel_socket_adapter_unittest.cc b/jingle/glue/channel_socket_adapter_unittest.cc
new file mode 100644
index 0000000..60e60b6
--- /dev/null
+++ b/jingle/glue/channel_socket_adapter_unittest.cc
@@ -0,0 +1,121 @@
+// Copyright (c) 2011 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "base/memory/ref_counted.h"
+#include "base/memory/scoped_ptr.h"
+#include "base/message_loop.h"
+#include "jingle/glue/channel_socket_adapter.h"
+#include "net/base/io_buffer.h"
+#include "net/base/net_errors.h"
+#include "net/socket/socket.h"
+#include "testing/gmock/include/gmock/gmock.h"
+#include "testing/gtest/include/gtest/gtest.h"
+#include "third_party/libjingle/source/talk/p2p/base/transportchannel.h"
+
+using net::IOBuffer;
+
+using testing::_;
+using testing::Return;
+
+namespace jingle_glue {
+
+namespace {
+const int kBufferSize = 4096;
+const char kTestData[] = "data";
+const int kTestDataSize = 4;
+const int kTestError = -32123;
+} // namespace
+
+class MockTransportChannel : public cricket::TransportChannel {
+ public:
+ MockTransportChannel()
+ : cricket::TransportChannel("", "") {
+ }
+
+ MOCK_METHOD2(SendPacket, int(const char *data, size_t len));
+ MOCK_METHOD2(SetOption, int(talk_base::Socket::Option opt, int value));
+ MOCK_METHOD0(GetError, int());
+};
+
+class TransportChannelSocketAdapterTest : public testing::Test {
+ public:
+ TransportChannelSocketAdapterTest()
+ : ALLOW_THIS_IN_INITIALIZER_LIST(
+ callback_(this, &TransportChannelSocketAdapterTest::Callback)),
+ callback_result_(0) {
+ }
+
+ protected:
+ virtual void SetUp() {
+ target_.reset(new TransportChannelSocketAdapter(&channel_));
+ }
+
+ void Callback(int result) {
+ callback_result_ = result;
+ }
+
+ MockTransportChannel channel_;
+ scoped_ptr<TransportChannelSocketAdapter> target_;
+ net::CompletionCallbackImpl<TransportChannelSocketAdapterTest> callback_;
+ int callback_result_;
+ MessageLoopForIO message_loop_;
+};
+
+// Verify that Read() returns net::ERR_IO_PENDING.
+TEST_F(TransportChannelSocketAdapterTest, Read) {
+ scoped_refptr<IOBuffer> buffer(new IOBuffer(kBufferSize));
+
+ int result = target_->Read(buffer, kBufferSize, &callback_);
+ ASSERT_EQ(net::ERR_IO_PENDING, result);
+
+ channel_.SignalReadPacket(&channel_, kTestData, kTestDataSize);
+ EXPECT_EQ(kTestDataSize, callback_result_);
+}
+
+// Verify that Read() after Close() returns error.
+TEST_F(TransportChannelSocketAdapterTest, ReadClose) {
+ scoped_refptr<IOBuffer> buffer(new IOBuffer(kBufferSize));
+
+ int result = target_->Read(buffer, kBufferSize, &callback_);
+ ASSERT_EQ(net::ERR_IO_PENDING, result);
+
+ target_->Close(kTestError);
+ EXPECT_EQ(kTestError, callback_result_);
+
+ // All Read() calls after Close() should return the error.
+ EXPECT_EQ(kTestError, target_->Read(buffer, kBufferSize, &callback_));
+}
+
+// Verify that Write sends the packet and returns correct result.
+TEST_F(TransportChannelSocketAdapterTest, Write) {
+ scoped_refptr<IOBuffer> buffer(new IOBuffer(kTestDataSize));
+
+ EXPECT_CALL(channel_, SendPacket(buffer->data(), kTestDataSize))
+ .WillOnce(Return(kTestDataSize));
+
+ int result = target_->Write(buffer, kTestDataSize, &callback_);
+ EXPECT_EQ(kTestDataSize, result);
+}
+
+// Verify that the message is still send if Write() is called while
+// socket is not open yet, and that the callback is called.
+TEST_F(TransportChannelSocketAdapterTest, WritePending) {
+ scoped_refptr<IOBuffer> buffer(new IOBuffer(kTestDataSize));
+
+ EXPECT_CALL(channel_, SendPacket(buffer->data(), kTestDataSize))
+ .Times(2)
+ .WillOnce(Return(SOCKET_ERROR))
+ .WillOnce(Return(kTestDataSize));
+
+ EXPECT_CALL(channel_, GetError())
+ .WillOnce(Return(EWOULDBLOCK));
+
+ int result = target_->Write(buffer, kTestDataSize, &callback_);
+ ASSERT_EQ(net::ERR_IO_PENDING, result);
+
+ channel_.SignalWritableState(&channel_);
+ EXPECT_EQ(kTestDataSize, callback_result_);
+}
+
+} // namespace jingle_glue
diff --git a/jingle/glue/jingle_glue_mock_objects.cc b/jingle/glue/jingle_glue_mock_objects.cc
new file mode 100644
index 0000000..6cd48ce
--- /dev/null
+++ b/jingle/glue/jingle_glue_mock_objects.cc
@@ -0,0 +1,13 @@
+// Copyright (c) 2011 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "jingle/glue/jingle_glue_mock_objects.h"
+
+namespace jingle_glue {
+
+MockStream::MockStream() {}
+
+MockStream::~MockStream() {}
+
+} // namespace jingle_glue
diff --git a/jingle/glue/jingle_glue_mock_objects.h b/jingle/glue/jingle_glue_mock_objects.h
new file mode 100644
index 0000000..e2cd704
--- /dev/null
+++ b/jingle/glue/jingle_glue_mock_objects.h
@@ -0,0 +1,32 @@
+// Copyright (c) 2011 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef JINGLE_GLUE_JINGLE_GLUE_MOCK_OBJECTS_H_
+#define JINGLE_GLUE_JINGLE_GLUE_MOCK_OBJECTS_H_
+
+#include "testing/gmock/include/gmock/gmock.h"
+#include "third_party/libjingle/source/talk/base/stream.h"
+
+namespace jingle_glue {
+
+class MockStream : public talk_base::StreamInterface {
+ public:
+ MockStream();
+ virtual ~MockStream();
+
+ MOCK_CONST_METHOD0(GetState, talk_base::StreamState());
+
+ MOCK_METHOD4(Read, talk_base::StreamResult(void*, size_t, size_t*, int*));
+ MOCK_METHOD4(Write, talk_base::StreamResult(const void*, size_t,
+ size_t*, int*));
+ MOCK_CONST_METHOD1(GetAvailable, bool(size_t*));
+ MOCK_METHOD0(Close, void());
+
+ MOCK_METHOD3(PostEvent, void(talk_base::Thread*, int, int));
+ MOCK_METHOD2(PostEvent, void(int, int));
+};
+
+} // namespace jingle_glue
+
+#endif // JINGLE_GLUE_JINGLE_GLUE_MOCK_OBJECTS_H_
diff --git a/jingle/glue/stream_socket_adapter.cc b/jingle/glue/stream_socket_adapter.cc
new file mode 100644
index 0000000..0cf45fb
--- /dev/null
+++ b/jingle/glue/stream_socket_adapter.cc
@@ -0,0 +1,237 @@
+// Copyright (c) 2010 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "jingle/glue/stream_socket_adapter.h"
+
+#include "base/logging.h"
+#include "base/message_loop.h"
+#include "net/base/address_list.h"
+#include "net/base/io_buffer.h"
+#include "net/base/net_errors.h"
+#include "third_party/libjingle/source/talk/base/stream.h"
+
+namespace jingle_glue {
+
+StreamSocketAdapter::StreamSocketAdapter(talk_base::StreamInterface* stream)
+ : stream_(stream),
+ read_pending_(false),
+ write_pending_(false),
+ closed_error_code_(net::OK) {
+ DCHECK_EQ(MessageLoop::TYPE_IO, MessageLoop::current()->type());
+ DCHECK(stream);
+ stream_->SignalEvent.connect(this, &StreamSocketAdapter::OnStreamEvent);
+}
+
+StreamSocketAdapter::~StreamSocketAdapter() {
+}
+
+int StreamSocketAdapter::Connect(net::CompletionCallback* callback) {
+ return net::OK;
+}
+
+void StreamSocketAdapter::Disconnect() {
+}
+
+bool StreamSocketAdapter::IsConnected() const {
+ return true;
+}
+
+bool StreamSocketAdapter::IsConnectedAndIdle() const {
+ return true;
+}
+
+int StreamSocketAdapter::GetPeerAddress(net::AddressList* address) const {
+ // We actually don't know the peer address. Returning so the upper layers
+ // won't complain.
+ net::IPAddressNumber ip_address(4);
+ *address = net::AddressList(ip_address, 0, false);
+ return net::OK;
+}
+
+const net::BoundNetLog& StreamSocketAdapter::NetLog() const {
+ return net_log_;
+}
+
+void StreamSocketAdapter::SetSubresourceSpeculation() {
+}
+
+void StreamSocketAdapter::SetOmniboxSpeculation() {
+}
+
+bool StreamSocketAdapter::WasEverUsed() const {
+ return true;
+}
+
+bool StreamSocketAdapter::UsingTCPFastOpen() const {
+ return false;
+}
+
+int StreamSocketAdapter::Read(
+ net::IOBuffer* buffer, int buffer_size, net::CompletionCallback* callback) {
+ DCHECK_EQ(MessageLoop::TYPE_IO, MessageLoop::current()->type());
+ DCHECK(buffer);
+ CHECK(!read_pending_);
+
+ if (!stream_.get()) {
+ DCHECK(closed_error_code_ != net::OK);
+ return closed_error_code_;
+ }
+
+ int result = ReadStream(buffer, buffer_size);
+ if (result == net::ERR_SOCKET_NOT_CONNECTED &&
+ stream_->GetState() == talk_base::SS_OPENING)
+ result = net::ERR_IO_PENDING;
+ if (result == net::ERR_IO_PENDING) {
+ read_pending_ = true;
+ read_callback_ = callback;
+ read_buffer_ = buffer;
+ read_buffer_size_ = buffer_size;
+ }
+ return result;
+}
+
+int StreamSocketAdapter::Write(
+ net::IOBuffer* buffer, int buffer_size, net::CompletionCallback* callback) {
+ DCHECK_EQ(MessageLoop::TYPE_IO, MessageLoop::current()->type());
+ DCHECK(buffer);
+ CHECK(!write_pending_);
+
+ if (!stream_.get()) {
+ DCHECK(closed_error_code_ != net::OK);
+ return closed_error_code_;
+ }
+
+ int result = WriteStream(buffer, buffer_size);
+ if (result == net::ERR_SOCKET_NOT_CONNECTED &&
+ stream_->GetState() == talk_base::SS_OPENING)
+ result = net::ERR_IO_PENDING;
+
+ if (result == net::ERR_IO_PENDING) {
+ write_pending_ = true;
+ write_callback_ = callback;
+ write_buffer_ = buffer;
+ write_buffer_size_ = buffer_size;
+ }
+ return result;
+}
+
+bool StreamSocketAdapter::SetReceiveBufferSize(int32 size) {
+ NOTIMPLEMENTED();
+ return false;
+}
+
+bool StreamSocketAdapter::SetSendBufferSize(int32 size) {
+ NOTIMPLEMENTED();
+ return false;
+}
+
+void StreamSocketAdapter::Close(int error_code) {
+ DCHECK_EQ(MessageLoop::TYPE_IO, MessageLoop::current()->type());
+
+ if (!stream_.get()) // Already closed.
+ return;
+
+ DCHECK(error_code != net::OK);
+ closed_error_code_ = error_code;
+ stream_->SignalEvent.disconnect(this);
+ stream_->Close();
+ stream_.reset(NULL);
+
+ if (read_pending_) {
+ net::CompletionCallback* callback = read_callback_;
+ read_pending_ = false;
+ read_buffer_ = NULL;
+ callback->Run(error_code);
+ }
+
+ if (write_pending_) {
+ net::CompletionCallback* callback = write_callback_;
+ write_pending_ = false;
+ write_buffer_ = NULL;
+ callback->Run(error_code);
+ }
+}
+
+void StreamSocketAdapter::OnStreamEvent(
+ talk_base::StreamInterface* stream, int events, int error) {
+ DCHECK_EQ(MessageLoop::TYPE_IO, MessageLoop::current()->type());
+
+ if (events & talk_base::SE_WRITE)
+ DoWrite();
+
+ if (events & talk_base::SE_READ)
+ DoRead();
+}
+
+void StreamSocketAdapter::DoWrite() {
+ // Write if there is a pending read.
+ if (write_buffer_) {
+ int result = WriteStream(write_buffer_, write_buffer_size_);
+ if (result != net::ERR_IO_PENDING) {
+ net::CompletionCallback* callback = write_callback_;
+ write_pending_ = false;
+ write_buffer_ = NULL;
+ callback->Run(result);
+ }
+ }
+}
+
+void StreamSocketAdapter::DoRead() {
+ // Read if there is a pending read.
+ if (read_pending_) {
+ int result = ReadStream(read_buffer_, read_buffer_size_);
+ if (result != net::ERR_IO_PENDING) {
+ net::CompletionCallback* callback = read_callback_;\
+ read_pending_ = false;
+ read_buffer_ = NULL;
+ callback->Run(result);
+ }
+ }
+}
+
+int StreamSocketAdapter::ReadStream(net::IOBuffer* buffer, int buffer_size) {
+ size_t bytes_read;
+ int error;
+ talk_base::StreamResult result = stream_->Read(
+ buffer->data(), buffer_size, &bytes_read, &error);
+ switch (result) {
+ case talk_base::SR_SUCCESS:
+ return bytes_read;
+
+ case talk_base::SR_BLOCK:
+ return net::ERR_IO_PENDING;
+
+ case talk_base::SR_EOS:
+ return net::ERR_CONNECTION_CLOSED;
+
+ case talk_base::SR_ERROR:
+ return net::MapSystemError(error);
+ }
+ NOTREACHED();
+ return net::ERR_FAILED;
+}
+
+int StreamSocketAdapter::WriteStream(net::IOBuffer* buffer, int buffer_size) {
+ size_t bytes_written;
+ int error;
+ talk_base::StreamResult result = stream_->Write(
+ buffer->data(), buffer_size, &bytes_written, &error);
+ switch (result) {
+ case talk_base::SR_SUCCESS:
+ return bytes_written;
+
+ case talk_base::SR_BLOCK:
+ return net::ERR_IO_PENDING;
+
+ case talk_base::SR_EOS:
+ return net::ERR_CONNECTION_CLOSED;
+
+ case talk_base::SR_ERROR:
+ return net::MapSystemError(error);
+ }
+ NOTREACHED();
+ return net::ERR_FAILED;
+}
+
+} // namespace jingle_glue
diff --git a/jingle/glue/stream_socket_adapter.h b/jingle/glue/stream_socket_adapter.h
new file mode 100644
index 0000000..aa5fb5d
--- /dev/null
+++ b/jingle/glue/stream_socket_adapter.h
@@ -0,0 +1,85 @@
+// Copyright (c) 2011 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef JINGLE_GLUE_STREAM_SOCKET_ADAPTER_H_
+#define JINGLE_GLUE_STREAM_SOCKET_ADAPTER_H_
+
+#include "base/memory/scoped_ptr.h"
+#include "net/base/net_log.h"
+#include "net/socket/client_socket.h"
+#include "third_party/libjingle/source/talk/base/sigslot.h"
+
+namespace talk_base {
+class StreamInterface;
+} // namespace talk_base
+
+namespace jingle_glue {
+
+// StreamSocketAdapter implements net::Socket interface on top of
+// libjingle's StreamInterface. It is used by JingleChromotocolConnection
+// to provide net::Socket interface for channels.
+class StreamSocketAdapter : public net::ClientSocket,
+ public sigslot::has_slots<> {
+ public:
+ // Ownership of the stream is passed to the adapter.
+ explicit StreamSocketAdapter(talk_base::StreamInterface* stream);
+ virtual ~StreamSocketAdapter();
+
+ // ClientSocket interface.
+ virtual int Connect(net::CompletionCallback* callback);
+ virtual void Disconnect();
+ virtual bool IsConnected() const;
+ virtual bool IsConnectedAndIdle() const;
+ virtual int GetPeerAddress(net::AddressList* address) const;
+ virtual const net::BoundNetLog& NetLog() const;
+ virtual void SetSubresourceSpeculation();
+ virtual void SetOmniboxSpeculation();
+ virtual bool WasEverUsed() const;
+ virtual bool UsingTCPFastOpen() const;
+
+ // Closes the stream. |error_code| specifies error code that will
+ // be returned by Read() and Write() after the stream is closed.
+ void Close(int error_code);
+
+ // Socket interface.
+ virtual int Read(net::IOBuffer* buffer, int buffer_size,
+ net::CompletionCallback* callback);
+ virtual int Write(net::IOBuffer* buffer, int buffer_size,
+ net::CompletionCallback* callback);
+
+ virtual bool SetReceiveBufferSize(int32 size);
+ virtual bool SetSendBufferSize(int32 size);
+
+ private:
+ void OnStreamEvent(talk_base::StreamInterface* stream,
+ int events, int error);
+
+ void DoWrite();
+ void DoRead();
+
+ int ReadStream(net::IOBuffer* buffer, int buffer_size);
+ int WriteStream(net::IOBuffer* buffer, int buffer_size);
+
+ scoped_ptr<talk_base::StreamInterface> stream_;
+
+ bool read_pending_;
+ net::CompletionCallback* read_callback_;
+ scoped_refptr<net::IOBuffer> read_buffer_;
+ int read_buffer_size_;
+
+ bool write_pending_;
+ net::CompletionCallback* write_callback_;
+ scoped_refptr<net::IOBuffer> write_buffer_;
+ int write_buffer_size_;
+
+ int closed_error_code_;
+
+ net::BoundNetLog net_log_;
+
+ DISALLOW_COPY_AND_ASSIGN(StreamSocketAdapter);
+};
+
+} // namespace jingle_glue
+
+#endif // JINGLE_GLUE_STREAM_SOCKET_ADAPTER_H_
diff --git a/jingle/glue/stream_socket_adapter_unittest.cc b/jingle/glue/stream_socket_adapter_unittest.cc
new file mode 100644
index 0000000..bac43e5
--- /dev/null
+++ b/jingle/glue/stream_socket_adapter_unittest.cc
@@ -0,0 +1,151 @@
+// Copyright (c) 2011 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "base/memory/ref_counted.h"
+#include "base/memory/scoped_ptr.h"
+#include "base/message_loop.h"
+#include "jingle/glue/stream_socket_adapter.h"
+#include "jingle/glue/jingle_glue_mock_objects.h"
+#include "net/base/io_buffer.h"
+#include "net/base/net_errors.h"
+#include "net/socket/socket.h"
+#include "testing/gmock/include/gmock/gmock.h"
+#include "testing/gtest/include/gtest/gtest.h"
+#include "third_party/libjingle/source/talk/p2p/base/transportchannel.h"
+
+using net::IOBuffer;
+
+using testing::_;
+using testing::Return;
+using testing::SetArgumentPointee;
+
+namespace jingle_glue {
+
+namespace {
+const int kBufferSize = 4096;
+const int kTestDataSize = 4;
+const int kTestError = -32123;
+} // namespace
+
+class StreamSocketAdapterTest : public testing::Test {
+ public:
+ StreamSocketAdapterTest()
+ : ALLOW_THIS_IN_INITIALIZER_LIST(
+ callback_(this, &StreamSocketAdapterTest::Callback)),
+ callback_result_(0) {
+ stream_ = new MockStream();
+ target_.reset(new StreamSocketAdapter(stream_));
+ }
+
+ protected:
+ void Callback(int result) {
+ callback_result_ = result;
+ }
+
+ // |stream_| must be allocated on the heap, because StreamSocketAdapter
+ // owns the object and it will free it in the end.
+ MockStream* stream_;
+ scoped_ptr<StreamSocketAdapter> target_;
+ net::CompletionCallbackImpl<StreamSocketAdapterTest> callback_;
+ int callback_result_;
+ MessageLoopForIO message_loop_;
+};
+
+// Verify that Read() calls Read() in stream.
+TEST_F(StreamSocketAdapterTest, Read) {
+ scoped_refptr<IOBuffer> buffer(new IOBuffer(kBufferSize));
+
+ EXPECT_CALL(*stream_, Read(buffer->data(), kBufferSize, _, _))
+ .WillOnce(DoAll(SetArgumentPointee<2>(kTestDataSize),
+ Return(talk_base::SR_SUCCESS)));
+
+ int result = target_->Read(buffer, kBufferSize, &callback_);
+ EXPECT_EQ(kTestDataSize, result);
+ EXPECT_EQ(0, callback_result_);
+}
+
+// Verify that read callback is called for pending reads.
+TEST_F(StreamSocketAdapterTest, ReadPending) {
+ scoped_refptr<IOBuffer> buffer(new IOBuffer(kBufferSize));
+
+ EXPECT_CALL(*stream_, Read(buffer->data(), kBufferSize, _, _))
+ .Times(2)
+ .WillOnce(Return(talk_base::SR_BLOCK))
+ .WillOnce(DoAll(SetArgumentPointee<2>(kTestDataSize),
+ Return(talk_base::SR_SUCCESS)));
+
+ int result = target_->Read(buffer, kBufferSize, &callback_);
+ ASSERT_EQ(net::ERR_IO_PENDING, result);
+
+ stream_->SignalEvent(stream_, talk_base::SE_READ, 0);
+ EXPECT_EQ(kTestDataSize, callback_result_);
+}
+
+// Verify that Read() returns error after Close().
+TEST_F(StreamSocketAdapterTest, ReadClose) {
+ scoped_refptr<IOBuffer> buffer(new IOBuffer(kBufferSize));
+
+ EXPECT_CALL(*stream_, Read(buffer->data(), kBufferSize, _, _))
+ .WillOnce(Return(talk_base::SR_BLOCK));
+
+ int result = target_->Read(buffer, kBufferSize, &callback_);
+ ASSERT_EQ(net::ERR_IO_PENDING, result);
+
+ EXPECT_CALL(*stream_, Close());
+ target_->Close(kTestError);
+ EXPECT_EQ(kTestError, callback_result_);
+
+ // All Read() calls after Close() should return the error.
+ EXPECT_EQ(kTestError, target_->Read(buffer, kBufferSize, &callback_));
+}
+
+// Verify that Write() calls stream's Write() and returns result.
+TEST_F(StreamSocketAdapterTest, Write) {
+ scoped_refptr<IOBuffer> buffer(new IOBuffer(kTestDataSize));
+
+ EXPECT_CALL(*stream_, Write(buffer->data(), kTestDataSize, _, _))
+ .WillOnce(DoAll(SetArgumentPointee<2>(kTestDataSize),
+ Return(talk_base::SR_SUCCESS)));
+
+ int result = target_->Write(buffer, kTestDataSize, &callback_);
+ EXPECT_EQ(kTestDataSize, result);
+ EXPECT_EQ(0, callback_result_);
+}
+
+// Verify that write callback is called for pending writes.
+TEST_F(StreamSocketAdapterTest, WritePending) {
+ scoped_refptr<IOBuffer> buffer(new IOBuffer(kTestDataSize));
+
+ EXPECT_CALL(*stream_, Write(buffer->data(), kTestDataSize, _, _))
+ .Times(2)
+ .WillOnce(Return(talk_base::SR_BLOCK))
+ .WillOnce(DoAll(SetArgumentPointee<2>(kTestDataSize),
+ Return(talk_base::SR_SUCCESS)));
+
+ int result = target_->Write(buffer, kTestDataSize, &callback_);
+ ASSERT_EQ(net::ERR_IO_PENDING, result);
+
+ stream_->SignalEvent(stream_, talk_base::SE_WRITE, 0);
+ EXPECT_EQ(kTestDataSize, callback_result_);
+}
+
+// Verify that Write() returns error after Close().
+TEST_F(StreamSocketAdapterTest, WriteClose) {
+ scoped_refptr<IOBuffer> buffer(new IOBuffer(kTestDataSize));
+
+ EXPECT_CALL(*stream_, Write(buffer->data(), kTestDataSize, _, _))
+ .WillOnce(Return(talk_base::SR_BLOCK));
+
+ int result = target_->Write(buffer, kTestDataSize, &callback_);
+ ASSERT_EQ(net::ERR_IO_PENDING, result);
+
+ EXPECT_CALL(*stream_, Close());
+ target_->Close(kTestError);
+ EXPECT_EQ(kTestError, callback_result_);
+
+ // All Write() calls after Close() should return the error.
+ EXPECT_EQ(kTestError, target_->Write(buffer, kTestError, &callback_));
+}
+
+} // namespace jingle_glue
diff --git a/jingle/jingle.gyp b/jingle/jingle.gyp
index 1a50938..f7f7f85 100644
--- a/jingle/jingle.gyp
+++ b/jingle/jingle.gyp
@@ -12,6 +12,10 @@
'target_name': 'jingle_glue',
'type': '<(library)',
'sources': [
+ 'glue/channel_socket_adapter.cc',
+ 'glue/channel_socket_adapter.h',
+ 'glue/stream_socket_adapter.cc',
+ 'glue/stream_socket_adapter.h',
'glue/thread_wrapper.cc',
'glue/thread_wrapper.h',
],
@@ -122,6 +126,10 @@
'target_name': 'jingle_unittests',
'type': 'executable',
'sources': [
+ 'glue/channel_socket_adapter_unittest.cc',
+ 'glue/jingle_glue_mock_objects.cc',
+ 'glue/jingle_glue_mock_objects.h',
+ 'glue/stream_socket_adapter_unittest.cc',
'glue/thread_wrapper_unittest.cc',
'notifier/base/chrome_async_socket_unittest.cc',
'notifier/base/fake_ssl_client_socket_unittest.cc',