summaryrefslogtreecommitdiffstats
path: root/jingle
diff options
context:
space:
mode:
authorsergeyu@chromium.org <sergeyu@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2011-04-22 01:30:51 +0000
committersergeyu@chromium.org <sergeyu@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2011-04-22 01:30:51 +0000
commitdb6609b8a05c594585ef6e04cb3f0e995bc3ce7c (patch)
tree94f518ae8ec9ade935dc81f20be192b60392a160 /jingle
parent05a7794358d8de30baa93a784b7420f5f962d236 (diff)
downloadchromium_src-db6609b8a05c594585ef6e04cb3f0e995bc3ce7c.zip
chromium_src-db6609b8a05c594585ef6e04cb3f0e995bc3ce7c.tar.gz
chromium_src-db6609b8a05c594585ef6e04cb3f0e995bc3ce7c.tar.bz2
Implement PseudoTCP adapter.
The new PseudoTcpAdapter will replace PseudoTcpChannel in remoting/protocol and will also be used for Pepper P2P APIs. BUG=None TEST=Unittests Review URL: http://codereview.chromium.org/6879119 git-svn-id: svn://svn.chromium.org/chrome/trunk/src@82606 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'jingle')
-rw-r--r--jingle/glue/pseudotcp_adapter.cc343
-rw-r--r--jingle/glue/pseudotcp_adapter.h95
-rw-r--r--jingle/glue/pseudotcp_adapter_unittest.cc312
-rw-r--r--jingle/jingle.gyp5
4 files changed, 755 insertions, 0 deletions
diff --git a/jingle/glue/pseudotcp_adapter.cc b/jingle/glue/pseudotcp_adapter.cc
new file mode 100644
index 0000000..61a50a9
--- /dev/null
+++ b/jingle/glue/pseudotcp_adapter.cc
@@ -0,0 +1,343 @@
+// Copyright (c) 2011 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "jingle/glue/pseudotcp_adapter.h"
+
+#include "base/logging.h"
+#include "base/time.h"
+#include "net/base/address_list.h"
+#include "net/base/completion_callback.h"
+#include "net/base/io_buffer.h"
+#include "net/base/net_errors.h"
+#include "net/base/net_util.h"
+
+using cricket::PseudoTcp;
+
+namespace {
+const int kReadBufferSize = 65536; // Maximum size of a packet.
+const uint16 kDefaultMtu = 1280;
+} // namespace
+
+namespace jingle_glue {
+
+PseudoTcpAdapter::PseudoTcpAdapter(net::Socket* socket)
+ : socket_(socket),
+ ALLOW_THIS_IN_INITIALIZER_LIST(pseudotcp_(this, 0)),
+ connect_callback_(NULL),
+ read_callback_(NULL),
+ write_callback_(NULL),
+ socket_write_pending_(false),
+ ALLOW_THIS_IN_INITIALIZER_LIST(
+ socket_read_callback_(this, &PseudoTcpAdapter::OnRead)),
+ ALLOW_THIS_IN_INITIALIZER_LIST(
+ socket_write_callback_(this, &PseudoTcpAdapter::OnWritten)) {
+ pseudotcp_.NotifyMTU(kDefaultMtu);
+}
+
+PseudoTcpAdapter::~PseudoTcpAdapter() {
+}
+
+int PseudoTcpAdapter::Read(net::IOBuffer* buffer, int buffer_size,
+ net::CompletionCallback* callback) {
+ DCHECK(CalledOnValidThread());
+
+ // Verify that there is no other pending read.
+ DCHECK(read_callback_ == NULL);
+
+ PseudoTcp::TcpState state = pseudotcp_.State();
+ int result;
+ if (state == PseudoTcp::TCP_SYN_SENT ||
+ state == PseudoTcp::TCP_SYN_RECEIVED) {
+ result = net::ERR_IO_PENDING;
+ } else {
+ result = pseudotcp_.Recv(buffer->data(), buffer_size);
+ if (result < 0) {
+ result = net::MapSystemError(pseudotcp_.GetError());
+ DCHECK(result < 0);
+ }
+ }
+
+ if (result == net::ERR_IO_PENDING) {
+ read_buffer_ = buffer;
+ read_buffer_size_ = buffer_size;
+ read_callback_ = callback;
+ }
+
+ AdjustClock();
+
+ return result;
+}
+
+int PseudoTcpAdapter::Write(net::IOBuffer* buffer, int buffer_size,
+ net::CompletionCallback* callback) {
+ DCHECK(CalledOnValidThread());
+
+ // Verify that there is no other pending write.
+ DCHECK(write_callback_ == NULL);
+
+ PseudoTcp::TcpState state = pseudotcp_.State();
+ int result;
+ if (state == PseudoTcp::TCP_SYN_SENT ||
+ state == PseudoTcp::TCP_SYN_RECEIVED) {
+ result = net::ERR_IO_PENDING;
+ } else {
+ result = pseudotcp_.Send(buffer->data(), buffer_size);
+ if (result < 0) {
+ result = net::MapSystemError(pseudotcp_.GetError());
+ DCHECK(result < 0);
+ }
+ }
+
+ if (result == net::ERR_IO_PENDING) {
+ write_buffer_ = buffer;
+ write_buffer_size_ = buffer_size;
+ write_callback_ = callback;
+ }
+
+ AdjustClock();
+
+ return result;
+}
+
+bool PseudoTcpAdapter::SetReceiveBufferSize(int32 size) {
+ DCHECK(CalledOnValidThread());
+ // TODO(sergeyu): Implement support for adjustable buffer size and
+ // used it here.
+ return false;
+}
+
+bool PseudoTcpAdapter::SetSendBufferSize(int32 size) {
+ DCHECK(CalledOnValidThread());
+ // TODO(sergeyu): Implement support for adjustable buffer size and
+ // used it here.
+ return false;
+}
+
+int PseudoTcpAdapter::Connect(net::CompletionCallback* callback) {
+ DCHECK(CalledOnValidThread());
+
+ // Start reading from the socket.
+ DoReadFromSocket();
+
+ int result = pseudotcp_.Connect();
+ if (result < 0)
+ return net::ERR_FAILED;
+
+ AdjustClock();
+
+ connect_callback_ = callback;
+ return net::ERR_IO_PENDING;
+}
+
+void PseudoTcpAdapter::Disconnect() {
+ DCHECK(CalledOnValidThread());
+ pseudotcp_.Close(false);
+}
+
+bool PseudoTcpAdapter::IsConnected() const {
+ DCHECK(CalledOnValidThread());
+ return pseudotcp_.State() == PseudoTcp::TCP_ESTABLISHED;
+}
+
+bool PseudoTcpAdapter::IsConnectedAndIdle() const {
+ DCHECK(CalledOnValidThread());
+ NOTIMPLEMENTED();
+ return false;
+}
+
+int PseudoTcpAdapter::GetPeerAddress(net::AddressList* address) const {
+ DCHECK(CalledOnValidThread());
+
+ // We actually don't know the peer address. Returning so the upper layers
+ // won't complain.
+ net::IPAddressNumber ip_address(4);
+ *address = net::AddressList(ip_address, 0, false);
+ return net::OK;
+}
+
+int PseudoTcpAdapter::GetLocalAddress(net::IPEndPoint* address) const {
+ DCHECK(CalledOnValidThread());
+ NOTIMPLEMENTED();
+ return net::ERR_FAILED;
+}
+
+const net::BoundNetLog& PseudoTcpAdapter::NetLog() const {
+ DCHECK(CalledOnValidThread());
+ return net_log_;
+}
+
+void PseudoTcpAdapter::SetSubresourceSpeculation() {
+ DCHECK(CalledOnValidThread());
+ NOTIMPLEMENTED();
+}
+
+void PseudoTcpAdapter::SetOmniboxSpeculation() {
+ DCHECK(CalledOnValidThread());
+ NOTIMPLEMENTED();
+}
+
+bool PseudoTcpAdapter::WasEverUsed() const {
+ DCHECK(CalledOnValidThread());
+ NOTIMPLEMENTED();
+ return true;
+}
+
+bool PseudoTcpAdapter::UsingTCPFastOpen() const {
+ DCHECK(CalledOnValidThread());
+ return false;
+}
+
+void PseudoTcpAdapter::OnTcpOpen(PseudoTcp* tcp) {
+ DCHECK(CalledOnValidThread());
+ DCHECK(connect_callback_);
+ connect_callback_->Run(net::OK);
+ connect_callback_ = NULL;
+
+ OnTcpReadable(tcp);
+ OnTcpWriteable(tcp);
+}
+
+void PseudoTcpAdapter::OnTcpReadable(PseudoTcp* tcp) {
+ DCHECK(CalledOnValidThread());
+
+ if (!read_buffer_)
+ return;
+
+ // Try to send the data we have pending.
+ int result = pseudotcp_.Recv(read_buffer_->data(), read_buffer_size_);
+ if (result < 0) {
+ result = net::MapSystemError(pseudotcp_.GetError());
+ DCHECK(result < 0);
+ if (result == net::ERR_IO_PENDING)
+ return;
+ }
+
+ AdjustClock();
+
+ net::CompletionCallback* cb = read_callback_;
+ read_callback_ = NULL;
+ read_buffer_ = NULL;
+ cb->Run(result);
+}
+
+void PseudoTcpAdapter::OnTcpWriteable(PseudoTcp* tcp) {
+ DCHECK(CalledOnValidThread());
+
+ if (!write_buffer_)
+ return;
+
+ // Try to send the data we have pending.
+ int result = pseudotcp_.Send(write_buffer_->data(), write_buffer_size_);
+ if (result < 0) {
+ result = net::MapSystemError(pseudotcp_.GetError());
+ DCHECK(result < 0);
+ if (result == net::ERR_IO_PENDING)
+ return;
+ }
+
+ AdjustClock();
+
+ net::CompletionCallback* cb = write_callback_;
+ write_callback_ = NULL;
+ write_buffer_ = NULL;
+ cb->Run(result);
+}
+
+void PseudoTcpAdapter::OnTcpClosed(PseudoTcp* tcp, uint32 error) {
+ DCHECK(CalledOnValidThread());
+
+ if (connect_callback_) {
+ connect_callback_->Run(net::MapSystemError(error));
+ connect_callback_ = NULL;
+ }
+
+ if (read_callback_) {
+ read_callback_->Run(net::MapSystemError(error));
+ read_callback_ = NULL;
+ }
+
+ if (write_callback_) {
+ write_callback_->Run(net::MapSystemError(error));
+ write_callback_ = NULL;
+ }
+}
+
+cricket::IPseudoTcpNotify::WriteResult PseudoTcpAdapter::TcpWritePacket(
+ PseudoTcp* tcp,
+ const char* buffer,
+ size_t len) {
+ DCHECK(CalledOnValidThread());
+
+ if (socket_write_pending_)
+ return IPseudoTcpNotify::WR_SUCCESS;
+
+ scoped_refptr<net::IOBuffer> write_buffer = new net::IOBuffer(len);
+ memcpy(write_buffer->data(), buffer, len);
+
+ int result = socket_->Write(write_buffer, len, &socket_write_callback_);
+ if (result == net::ERR_IO_PENDING) {
+ socket_write_pending_ = true;
+ return IPseudoTcpNotify::WR_SUCCESS;
+ } if (result == net::ERR_MSG_TOO_BIG) {
+ return IPseudoTcpNotify::WR_TOO_LARGE;
+ } else if (result < 0) {
+ return IPseudoTcpNotify::WR_FAIL;
+ } else {
+ return IPseudoTcpNotify::WR_SUCCESS;
+ }
+}
+
+void PseudoTcpAdapter::DoReadFromSocket() {
+ if (!socket_read_buffer_) {
+ socket_read_buffer_ = new net::IOBuffer(kReadBufferSize);
+ }
+
+ while (true) {
+ int result = socket_->Read(socket_read_buffer_, kReadBufferSize,
+ &socket_read_callback_);
+ if (result == net::ERR_IO_PENDING)
+ break;
+
+ HandleReadResults(result);
+ }
+}
+
+void PseudoTcpAdapter::HandleReadResults(int result) {
+ if (result <= 0) {
+ LOG(ERROR) << "Read returned " << result;
+ return;
+ }
+
+ pseudotcp_.NotifyPacket(socket_read_buffer_->data(), result);
+ AdjustClock();
+}
+
+void PseudoTcpAdapter::OnRead(int result) {
+ HandleReadResults(result);
+ if (result >= 0)
+ DoReadFromSocket();
+}
+
+void PseudoTcpAdapter::OnWritten(int result) {
+ socket_write_pending_ = false;
+ if (result < 0) {
+ LOG(WARNING) << "Write failed. Error code: " << result;
+ }
+}
+
+void PseudoTcpAdapter::AdjustClock() {
+ long timeout = 0;
+ if (pseudotcp_.GetNextClock(PseudoTcp::Now(), timeout)) {
+ timer_.Stop();
+ timer_.Start(base::TimeDelta::FromMilliseconds(std::max(timeout, 0L)), this,
+ &PseudoTcpAdapter::HandleTcpClock);
+ }
+}
+
+void PseudoTcpAdapter::HandleTcpClock() {
+ pseudotcp_.NotifyClock(PseudoTcp::Now());
+ AdjustClock();
+}
+
+} // namespace jingle_glue
diff --git a/jingle/glue/pseudotcp_adapter.h b/jingle/glue/pseudotcp_adapter.h
new file mode 100644
index 0000000..0001991
--- /dev/null
+++ b/jingle/glue/pseudotcp_adapter.h
@@ -0,0 +1,95 @@
+// Copyright (c) 2011 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef JINGLE_GLUE_PSEUDOTCP_ADAPTER_H_
+#define JINGLE_GLUE_PSEUDOTCP_ADAPTER_H_
+
+#include "base/basictypes.h"
+#include "base/compiler_specific.h"
+#include "base/memory/ref_counted.h"
+#include "base/memory/scoped_ptr.h"
+#include "base/timer.h"
+#include "base/threading/non_thread_safe.h"
+#include "net/base/net_log.h"
+#include "net/socket/client_socket.h"
+#include "third_party/libjingle/source/talk/p2p/base/pseudotcp.h"
+
+namespace jingle_glue {
+
+class PseudoTcpAdapter : public net::ClientSocket,
+ public cricket::IPseudoTcpNotify,
+ public base::NonThreadSafe {
+ public:
+ // Creates adapter for the specified |socket|. |socket| is assumed
+ // to be already connected. Takes ownership of |socket|.
+ PseudoTcpAdapter(net::Socket* socket);
+ virtual ~PseudoTcpAdapter();
+
+ // net::Socket implementation.
+ virtual int Read(net::IOBuffer* buffer, int buffer_size,
+ net::CompletionCallback* callback) OVERRIDE;
+ virtual int Write(net::IOBuffer* buffer, int buffer_size,
+ net::CompletionCallback* callback) OVERRIDE;
+ virtual bool SetReceiveBufferSize(int32 size) OVERRIDE;
+ virtual bool SetSendBufferSize(int32 size) OVERRIDE;
+
+ // net::ClientSocket implementation.
+ virtual int Connect(net::CompletionCallback* callback) OVERRIDE;
+ virtual void Disconnect() OVERRIDE;
+ virtual bool IsConnected() const OVERRIDE;
+ virtual bool IsConnectedAndIdle() const OVERRIDE;
+ virtual int GetPeerAddress(net::AddressList* address) const OVERRIDE;
+ virtual int GetLocalAddress(net::IPEndPoint* address) const OVERRIDE;
+ virtual const net::BoundNetLog& NetLog() const OVERRIDE;
+ virtual void SetSubresourceSpeculation() OVERRIDE;
+ virtual void SetOmniboxSpeculation() OVERRIDE;
+ virtual bool WasEverUsed() const OVERRIDE;
+ virtual bool UsingTCPFastOpen() const OVERRIDE;
+
+ // cricket::IPseudoTcpNotify implementation.
+ virtual void OnTcpOpen(cricket::PseudoTcp* tcp) OVERRIDE;
+ virtual void OnTcpReadable(cricket::PseudoTcp* tcp) OVERRIDE;
+ virtual void OnTcpWriteable(cricket::PseudoTcp* tcp) OVERRIDE;
+ virtual void OnTcpClosed(cricket::PseudoTcp* tcp, uint32 error) OVERRIDE;
+ virtual WriteResult TcpWritePacket(cricket::PseudoTcp* tcp,
+ const char* buffer, size_t len) OVERRIDE;
+
+ private:
+ void DoReadFromSocket();
+ void HandleReadResults(int result);
+
+ // Callback functions for Read() and Write() in |socket_|
+ void OnRead(int result);
+ void OnWritten(int result);
+
+ void AdjustClock();
+ void HandleTcpClock();
+
+ scoped_ptr<net::Socket> socket_;
+ cricket::PseudoTcp pseudotcp_;
+
+ net::CompletionCallback* connect_callback_;
+ scoped_refptr<net::IOBuffer> read_buffer_;
+ int read_buffer_size_;
+ net::CompletionCallback* read_callback_;
+ scoped_refptr<net::IOBuffer> write_buffer_;
+ int write_buffer_size_;
+ net::CompletionCallback* write_callback_;
+
+ bool socket_write_pending_;
+ scoped_refptr<net::IOBuffer> socket_read_buffer_;
+
+ net::CompletionCallbackImpl<PseudoTcpAdapter> socket_read_callback_;
+ net::CompletionCallbackImpl<PseudoTcpAdapter> socket_write_callback_;
+
+ base::OneShotTimer<PseudoTcpAdapter> timer_;
+
+ net::BoundNetLog net_log_;
+
+ DISALLOW_COPY_AND_ASSIGN(PseudoTcpAdapter);
+};
+
+} // namespace jingle_glue
+
+#endif // JINGLE_GLUE_STREAM_SOCKET_ADAPTER_H_
diff --git a/jingle/glue/pseudotcp_adapter_unittest.cc b/jingle/glue/pseudotcp_adapter_unittest.cc
new file mode 100644
index 0000000..977c0ac
--- /dev/null
+++ b/jingle/glue/pseudotcp_adapter_unittest.cc
@@ -0,0 +1,312 @@
+// Copyright (c) 2011 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "jingle/glue/pseudotcp_adapter.h"
+
+#include <vector>
+
+#include "net/base/io_buffer.h"
+#include "net/base/net_errors.h"
+#include "net/base/test_completion_callback.h"
+#include "net/udp/udp_socket.h"
+#include "testing/gmock/include/gmock/gmock.h"
+#include "testing/gtest/include/gtest/gtest.h"
+
+
+namespace jingle_glue {
+namespace {
+class FakeSocket;
+} // namespace
+} // namespace jingle_glue
+
+DISABLE_RUNNABLE_METHOD_REFCOUNT(jingle_glue::FakeSocket);
+
+namespace jingle_glue {
+
+namespace {
+
+// The range is chosen arbitrarily. It must be big enough so that we
+// always have at least two UDP ports available.
+const int kMinPort = 32000;
+const int kMaxPort = 33000;
+
+const int kMessageSize = 1024;
+const int kMessages = 100;
+const int kTestDataSize = kMessages * kMessageSize;
+
+class FakeSocket : public net::Socket {
+ public:
+ FakeSocket()
+ : read_callback_(NULL),
+ loss_rate_(0.0) {
+ }
+ virtual ~FakeSocket() { }
+
+ void AppendInputPacket(const std::vector<char>& data) {
+ if ((static_cast<double>(rand()) / RAND_MAX) < loss_rate_)
+ return; // Lose the packet.
+
+ if (read_callback_) {
+ int size = std::min(read_buffer_size_, static_cast<int>(data.size()));
+ memcpy(read_buffer_->data(), &data[0], data.size());
+ net::CompletionCallback* cb = read_callback_;
+ read_callback_ = NULL;
+ read_buffer_ = NULL;
+ cb->Run(size);
+ } else {
+ incoming_packets_.push_back(data);
+ }
+ }
+
+ void Connect(FakeSocket* peer_socket) {
+ peer_socket_ = peer_socket;
+ }
+
+ void set_loss_rate(double value) { loss_rate_ = value; };
+
+ // net::Socket interface.
+ virtual int Read(net::IOBuffer* buf, int buf_len,
+ net::CompletionCallback* callback) {
+ CHECK(!read_callback_);
+ CHECK(buf);
+
+ if (incoming_packets_.size() > 0) {
+ scoped_refptr<net::IOBuffer> buffer(buf);
+ int size = std::min(
+ static_cast<int>(incoming_packets_.front().size()), buf_len);
+ memcpy(buffer->data(), &*incoming_packets_.front().begin(), size);
+ incoming_packets_.pop_front();
+ return size;
+ } else {
+ read_callback_ = callback;
+ read_buffer_ = buf;
+ read_buffer_size_ = buf_len;
+ return net::ERR_IO_PENDING;
+ }
+ }
+
+ virtual int Write(net::IOBuffer* buf, int buf_len,
+ net::CompletionCallback* callback) OVERRIDE {
+ DCHECK(buf);
+ if (peer_socket_) {
+ MessageLoop::current()->PostTask(FROM_HERE, NewRunnableMethod(
+ peer_socket_, &FakeSocket::AppendInputPacket,
+ std::vector<char>(buf->data(), buf->data() + buf_len)));
+ }
+
+ return buf_len;
+ }
+
+ virtual bool SetReceiveBufferSize(int32 size) OVERRIDE {
+ NOTIMPLEMENTED();
+ return false;
+ }
+ virtual bool SetSendBufferSize(int32 size) OVERRIDE {
+ NOTIMPLEMENTED();
+ return false;
+ }
+
+ private:
+ scoped_refptr<net::IOBuffer> read_buffer_;
+ int read_buffer_size_;
+ net::CompletionCallback* read_callback_;
+
+ std::deque<std::vector<char> > incoming_packets_;
+
+ FakeSocket* peer_socket_;
+ double loss_rate_;
+};
+
+class TCPChannelTester : public base::RefCountedThreadSafe<TCPChannelTester> {
+ public:
+ TCPChannelTester(MessageLoop* message_loop,
+ net::Socket* client_socket,
+ net::Socket* host_socket)
+ : message_loop_(message_loop),
+ host_socket_(host_socket),
+ client_socket_(client_socket),
+ done_(false),
+ ALLOW_THIS_IN_INITIALIZER_LIST(
+ write_cb_(this, &TCPChannelTester::OnWritten)),
+ ALLOW_THIS_IN_INITIALIZER_LIST(
+ read_cb_(this, &TCPChannelTester::OnRead)),
+ write_errors_(0),
+ read_errors_(0) {
+ }
+
+ virtual ~TCPChannelTester() { }
+
+ void Start() {
+ message_loop_->PostTask(
+ FROM_HERE, NewRunnableMethod(this, &TCPChannelTester::DoStart));
+ }
+
+ void CheckResults() {
+ EXPECT_EQ(0, write_errors_);
+ EXPECT_EQ(0, read_errors_);
+
+ ASSERT_EQ(kTestDataSize + kMessageSize, input_buffer_->capacity());
+
+ output_buffer_->SetOffset(0);
+ ASSERT_EQ(kTestDataSize, output_buffer_->size());
+
+ EXPECT_EQ(0, memcmp(output_buffer_->data(),
+ input_buffer_->StartOfBuffer(), kTestDataSize));
+ }
+
+ protected:
+ void Done() {
+ done_ = true;
+ message_loop_->PostTask(FROM_HERE, new MessageLoop::QuitTask());
+ }
+
+ void DoStart() {
+ InitBuffers();
+ DoRead();
+ DoWrite();
+ }
+
+ void InitBuffers() {
+ output_buffer_ = new net::DrainableIOBuffer(
+ new net::IOBuffer(kTestDataSize), kTestDataSize);
+ memset(output_buffer_->data(), 123, kTestDataSize);
+
+ input_buffer_ = new net::GrowableIOBuffer();
+ // Always keep kMessageSize bytes available at the end of the input buffer.
+ input_buffer_->SetCapacity(kMessageSize);
+ }
+
+ void DoWrite() {
+ int result = 1;
+ while (result > 0) {
+ if (output_buffer_->BytesRemaining() == 0)
+ break;
+
+ int bytes_to_write = std::min(output_buffer_->BytesRemaining(),
+ kMessageSize);
+ result = client_socket_->Write(output_buffer_, bytes_to_write,
+ &write_cb_);
+ HandleWriteResult(result);
+ }
+ }
+
+ void OnWritten(int result) {
+ HandleWriteResult(result);
+ DoWrite();
+ }
+
+ void HandleWriteResult(int result) {
+ if (result <= 0 && result != net::ERR_IO_PENDING) {
+ LOG(ERROR) << "Received error " << result << " when trying to write";
+ write_errors_++;
+ Done();
+ } else if (result > 0) {
+ output_buffer_->DidConsume(result);
+ }
+ }
+
+ void DoRead() {
+ int result = 1;
+ while (result > 0) {
+ input_buffer_->set_offset(input_buffer_->capacity() - kMessageSize);
+
+ result = host_socket_->Read(input_buffer_, kMessageSize, &read_cb_);
+ HandleReadResult(result);
+ };
+ }
+
+ void OnRead(int result) {
+ HandleReadResult(result);
+ DoRead();
+ }
+
+ void HandleReadResult(int result) {
+ if (result <= 0 && result != net::ERR_IO_PENDING) {
+ if (!done_) {
+ LOG(ERROR) << "Received error " << result << " when trying to read";
+ read_errors_++;
+ Done();
+ }
+ } else if (result > 0) {
+ // Allocate memory for the next read.
+ input_buffer_->SetCapacity(input_buffer_->capacity() + result);
+ if (input_buffer_->capacity() == kTestDataSize + kMessageSize)
+ Done();
+ }
+ }
+
+ private:
+ MessageLoop* message_loop_;
+ net::Socket* host_socket_;
+ net::Socket* client_socket_;
+ bool done_;
+
+ scoped_refptr<net::DrainableIOBuffer> output_buffer_;
+ scoped_refptr<net::GrowableIOBuffer> input_buffer_;
+
+ net::CompletionCallbackImpl<TCPChannelTester> write_cb_;
+ net::CompletionCallbackImpl<TCPChannelTester> read_cb_;
+ int write_errors_;
+ int read_errors_;
+};
+
+class P2PTransportImplTest : public testing::Test {
+ protected:
+ virtual void SetUp() OVERRIDE {
+ host_socket_ = new FakeSocket();
+ client_socket_ = new FakeSocket();
+
+ host_socket_->Connect(client_socket_);
+ client_socket_->Connect(host_socket_);
+
+ host_pseudotcp_.reset(new PseudoTcpAdapter(host_socket_));
+ client_pseudotcp_.reset(new PseudoTcpAdapter(client_socket_));
+ }
+
+ FakeSocket* host_socket_;
+ FakeSocket* client_socket_;
+
+ scoped_ptr<PseudoTcpAdapter> host_pseudotcp_;
+ scoped_ptr<PseudoTcpAdapter> client_pseudotcp_;
+ MessageLoop message_loop_;
+};
+
+TEST_F(P2PTransportImplTest, TestDataTransfer) {
+ TestCompletionCallback host_connect_cb;
+ TestCompletionCallback client_connect_cb;
+
+ host_pseudotcp_->Connect(&host_connect_cb);
+ client_pseudotcp_->Connect(&client_connect_cb);
+
+ scoped_refptr<TCPChannelTester> tester =
+ new TCPChannelTester(&message_loop_, host_pseudotcp_.get(),
+ client_pseudotcp_.get());
+
+ tester->Start();
+ message_loop_.Run();
+ tester->CheckResults();
+}
+
+TEST_F(P2PTransportImplTest, TestLossyChannel) {
+ host_socket_->set_loss_rate(0.1);
+ client_socket_->set_loss_rate(0.1);
+
+ TestCompletionCallback host_connect_cb;
+ TestCompletionCallback client_connect_cb;
+
+ host_pseudotcp_->Connect(&host_connect_cb);
+ client_pseudotcp_->Connect(&client_connect_cb);
+
+ scoped_refptr<TCPChannelTester> tester =
+ new TCPChannelTester(&message_loop_, host_pseudotcp_.get(),
+ client_pseudotcp_.get());
+
+ tester->Start();
+ message_loop_.Run();
+ tester->CheckResults();
+}
+
+} // namespace
+
+} // namespace jingle_glue
diff --git a/jingle/jingle.gyp b/jingle/jingle.gyp
index fc600f5..09cbc51 100644
--- a/jingle/jingle.gyp
+++ b/jingle/jingle.gyp
@@ -14,6 +14,8 @@
'sources': [
'glue/channel_socket_adapter.cc',
'glue/channel_socket_adapter.h',
+ 'glue/pseudotcp_adapter.cc',
+ 'glue/pseudotcp_adapter.h',
'glue/stream_socket_adapter.cc',
'glue/stream_socket_adapter.h',
'glue/thread_wrapper.cc',
@@ -24,9 +26,11 @@
'dependencies': [
'../base/base.gyp:base',
'../third_party/libjingle/libjingle.gyp:libjingle',
+ '../third_party/libjingle/libjingle.gyp:libjingle_p2p',
],
'export_dependent_settings': [
'../third_party/libjingle/libjingle.gyp:libjingle',
+ '../third_party/libjingle/libjingle.gyp:libjingle_p2p',
],
},
# A library for sending and receiving peer-issued notifications.
@@ -149,6 +153,7 @@
'glue/channel_socket_adapter_unittest.cc',
'glue/jingle_glue_mock_objects.cc',
'glue/jingle_glue_mock_objects.h',
+ 'glue/pseudotcp_adapter_unittest.cc',
'glue/stream_socket_adapter_unittest.cc',
'glue/thread_wrapper_unittest.cc',
'notifier/base/chrome_async_socket_unittest.cc',