summaryrefslogtreecommitdiffstats
path: root/net/curvecp
diff options
context:
space:
mode:
authormbelshe@google.com <mbelshe@google.com@0039d316-1c4b-4281-b951-d872f2087c98>2011-05-18 10:36:56 +0000
committermbelshe@google.com <mbelshe@google.com@0039d316-1c4b-4281-b951-d872f2087c98>2011-05-18 10:36:56 +0000
commitb690e18ca906d699e05cddae14250ab40439fee3 (patch)
treed34539a899c988f422f1235c7b52c2d1eb81d7ba /net/curvecp
parent51c0220843bfc1939d2f1525f0fc0192b4926369 (diff)
downloadchromium_src-b690e18ca906d699e05cddae14250ab40439fee3.zip
chromium_src-b690e18ca906d699e05cddae14250ab40439fee3.tar.gz
chromium_src-b690e18ca906d699e05cddae14250ab40439fee3.tar.bz2
An initial curvecp implementation. This implementation is not complete,
but is good enough to start collaboration. BUG=none TEST=curvecp_unittests Review URL: http://codereview.chromium.org/7037022 git-svn-id: svn://svn.chromium.org/chrome/trunk/src@85753 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'net/curvecp')
-rw-r--r--net/curvecp/README4
-rw-r--r--net/curvecp/circular_buffer.cc77
-rw-r--r--net/curvecp/circular_buffer.h40
-rw-r--r--net/curvecp/client_packetizer.cc385
-rw-r--r--net/curvecp/client_packetizer.h102
-rw-r--r--net/curvecp/connection_key.cc42
-rw-r--r--net/curvecp/connection_key.h33
-rw-r--r--net/curvecp/curvecp_client_socket.cc111
-rw-r--r--net/curvecp/curvecp_client_socket.h57
-rw-r--r--net/curvecp/curvecp_server_socket.cc82
-rw-r--r--net/curvecp/curvecp_server_socket.h63
-rw-r--r--net/curvecp/curvecp_transfer_unittest.cc313
-rw-r--r--net/curvecp/messenger.cc372
-rw-r--r--net/curvecp/messenger.h104
-rw-r--r--net/curvecp/packetizer.h51
-rw-r--r--net/curvecp/protocol.cc88
-rw-r--r--net/curvecp/protocol.h158
-rw-r--r--net/curvecp/received_block_list.cc96
-rw-r--r--net/curvecp/received_block_list.h70
-rw-r--r--net/curvecp/rtt_and_send_rate_calculator.cc172
-rw-r--r--net/curvecp/rtt_and_send_rate_calculator.h72
-rw-r--r--net/curvecp/sent_block_list.cc133
-rw-r--r--net/curvecp/sent_block_list.h95
-rw-r--r--net/curvecp/server_messenger.cc32
-rw-r--r--net/curvecp/server_messenger.h45
-rw-r--r--net/curvecp/server_packetizer.cc246
-rw-r--r--net/curvecp/server_packetizer.h97
-rw-r--r--net/curvecp/test_client.cc178
-rw-r--r--net/curvecp/test_client.h70
-rw-r--r--net/curvecp/test_data_stream.h88
-rw-r--r--net/curvecp/test_server.cc144
-rw-r--r--net/curvecp/test_server.h74
32 files changed, 3694 insertions, 0 deletions
diff --git a/net/curvecp/README b/net/curvecp/README
new file mode 100644
index 0000000..edf04b52
--- /dev/null
+++ b/net/curvecp/README
@@ -0,0 +1,4 @@
+CurveCP: An implementation of http://curvecp.org/
+
+This work is an attempt to build a secure, reliable transport protocol over UDP.
+This work is a research experiment.
diff --git a/net/curvecp/circular_buffer.cc b/net/curvecp/circular_buffer.cc
new file mode 100644
index 0000000..9eca179
--- /dev/null
+++ b/net/curvecp/circular_buffer.cc
@@ -0,0 +1,77 @@
+// Copyright (c) 2011 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "net/curvecp/circular_buffer.h"
+
+#include <algorithm>
+#include "base/logging.h"
+
+namespace net {
+
+CircularBuffer::CircularBuffer(int capacity)
+ : capacity_(capacity),
+ head_(0),
+ length_(0) {
+ buffer_ = new char[capacity_];
+}
+
+CircularBuffer::~CircularBuffer() {
+ delete [] buffer_;
+}
+
+int CircularBuffer::write(const char* data, int length) {
+ int bytes_written = 0;
+
+ // We can't write more bytes than we have space for.
+ if (length > capacity_ - length_)
+ length = capacity_ - length_;
+
+ // Check for space at end of buffer.
+ if (head_ + length_ < capacity_) {
+ bytes_written = std::min(capacity_ - head_ - length_, length);
+ int end_pos = head_ + length_;
+
+ memcpy(buffer_ + end_pos, data, bytes_written);
+ length_ += bytes_written;
+ length -= bytes_written;
+ }
+
+ if (!length)
+ return bytes_written;
+
+ DCHECK_LT(length_, capacity_); // Buffer still has space.
+ DCHECK_GE(head_ + length_, capacity_); // Space is at the beginning.
+
+ int start_pos = (head_ + length_) % capacity_;
+ memcpy(&buffer_[start_pos], &data[bytes_written], length);
+ bytes_written += length;
+ length_ += length;
+ return bytes_written;
+}
+
+int CircularBuffer::read(char* data, int length) {
+ int bytes_read = 0;
+
+ // We can't read more bytes than are in the buffer.
+ if (length > length_)
+ length = length_;
+
+ while (length) {
+ DCHECK_LE(length, length_);
+ int span_end = std::min(capacity_, head_ + length);
+ int len = span_end - head_;
+ DCHECK_LE(len, length_);
+ memcpy(&data[bytes_read], &buffer_[head_], len);
+ bytes_read += len;
+ length -= len;
+ length_ -= len;
+ head_ += len;
+ DCHECK_LE(head_, capacity_);
+ if (head_ == capacity_)
+ head_ = 0;
+ }
+ return bytes_read;
+}
+
+} // namespace net
diff --git a/net/curvecp/circular_buffer.h b/net/curvecp/circular_buffer.h
new file mode 100644
index 0000000..1c257f9
--- /dev/null
+++ b/net/curvecp/circular_buffer.h
@@ -0,0 +1,40 @@
+// Copyright (c) 2011 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef NET_CURVECP_CIRCULAR_BUFFER_H_
+#define NET_CURVECP_CIRCULAR_BUFFER_H_
+#pragma once
+
+namespace net {
+
+// A circular buffer is a fixed sized buffer which can be read or written
+class CircularBuffer {
+ public:
+ // Create a CircularBuffer with maximum size |capacity|.
+ explicit CircularBuffer(int capacity);
+ ~CircularBuffer();
+
+ int length() const { return length_; }
+
+ // Writes data into the circular buffer.
+ // |data| is the bytes to write.
+ // |length| is the number of bytes to write.
+ // Returns the number of bytes written, which may be less than |length| or
+ // 0 if no space is available in the buffer.
+ int write(const char* data, int length);
+
+ // Reads up to |length| bytes from the buffer into |data|.
+ // Returns the number of bytes read, or 0 if no data is available.
+ int read(char* data, int length);
+
+ private:
+ int capacity_;
+ int head_;
+ int length_;
+ char* buffer_;
+};
+
+} // namespace net
+
+#endif // NET_CURVECP_CIRCULAR_BUFFER_H_
diff --git a/net/curvecp/client_packetizer.cc b/net/curvecp/client_packetizer.cc
new file mode 100644
index 0000000..54efddd
--- /dev/null
+++ b/net/curvecp/client_packetizer.cc
@@ -0,0 +1,385 @@
+// Copyright (c) 2011 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "net/curvecp/client_packetizer.h"
+
+#include "net/base/io_buffer.h"
+#include "net/base/net_errors.h"
+#include "net/base/sys_addrinfo.h"
+#include "net/curvecp/protocol.h"
+#include "net/udp/udp_client_socket.h"
+
+namespace {
+
+const int kMaxHelloAttempts = 8;
+const int kHelloTimeoutMs[kMaxHelloAttempts] = {
+ 1000, // 1 second, with 1.5x increase for each retry.
+ 1500,
+ 2250,
+ 3375,
+ 5063,
+ 7594,
+ 11391,
+ 17086,
+};
+
+} // namespace
+
+namespace net {
+
+ClientPacketizer::ClientPacketizer()
+ : Packetizer(),
+ next_state_(NONE),
+ listener_(NULL),
+ user_callback_(NULL),
+ current_address_(NULL),
+ hello_attempts_(0),
+ initiate_sent_(false),
+ ALLOW_THIS_IN_INITIALIZER_LIST(
+ io_callback_(this, &ClientPacketizer::OnIOComplete)),
+ ALLOW_THIS_IN_INITIALIZER_LIST(timers_factory_(this)) {
+ // TODO(mbelshe): Initialize our keys and such properly.
+ // for now we use random values to keep them unique.
+ for (int i = 0; i < 32; ++i)
+ shortterm_public_key_[i] = rand() % 26 + 'a';
+}
+
+ClientPacketizer::~ClientPacketizer() {
+}
+
+int ClientPacketizer::Connect(const AddressList& server,
+ Packetizer::Listener* listener,
+ CompletionCallback* callback) {
+ DCHECK(!user_callback_);
+ DCHECK(!socket_.get());
+ DCHECK(!listener_);
+
+ listener_ = listener;
+
+ addresses_ = server;
+
+ user_callback_ = callback;
+ next_state_ = LOOKUP_COOKIE;
+
+ return DoLoop(OK);
+}
+
+int ClientPacketizer::SendMessage(ConnectionKey key,
+ const char* data,
+ size_t length,
+ CompletionCallback* callback) {
+ // We can't send messages smaller than 16 bytes.
+ if (length < 16)
+ return ERR_UNEXPECTED;
+
+ if (!initiate_sent_) {
+ const size_t kMaxMessageInInitiatePacket =
+ kMaxPacketLength - sizeof(InitiatePacket);
+
+ if (length > kMaxMessageInInitiatePacket) {
+ NOTREACHED();
+ return ERR_UNEXPECTED;
+ }
+
+ initiate_sent_ = true;
+
+ // Bundle this message into the Initiate Packet.
+ scoped_refptr<IOBuffer> buffer(new IOBuffer(kMaxPacketLength));
+ InitiatePacket* packet = reinterpret_cast<InitiatePacket*>(buffer->data());
+ memset(packet, 0, sizeof(InitiatePacket));
+ memcpy(packet->id, "QvnQ5XlI", 8);
+ memcpy(packet->client_shortterm_public_key, shortterm_public_key_,
+ sizeof(shortterm_public_key_));
+ // TODO(mbelshe): Fill in rest of Initiate fields here
+ // TODO(mbelshe): Fill in rest of message
+ //
+ // TODO(mbelshe) - this is just broken to make it work with cleartext
+ memcpy(&buffer->data()[sizeof(InitiatePacket)], data, length);
+ int packet_length = sizeof(InitiatePacket) + length;
+ int rv = socket_->Write(buffer, packet_length, &io_callback_);
+ if (rv <= 0)
+ return rv;
+ CHECK_EQ(packet_length, rv); // We must send all data.
+ return length;
+ }
+
+ if (length > static_cast<size_t>(kMaxMessageLength)) {
+ NOTREACHED();
+ return ERR_UNEXPECTED;
+ }
+
+ // Bundle this message into the Initiate Packet.
+ scoped_refptr<IOBuffer> buffer(new IOBuffer(kMaxPacketLength));
+ ClientMessagePacket* packet =
+ reinterpret_cast<ClientMessagePacket*>(buffer->data());
+ memset(packet, 0, sizeof(ClientMessagePacket));
+ memcpy(packet->id, "QvnQ5XlM", 8);
+ memcpy(packet->client_shortterm_public_key, shortterm_public_key_,
+ sizeof(shortterm_public_key_));
+ // TODO(mbelshe): Fill in rest of Initiate fields here
+ // TODO(mbelshe): Fill in rest of message
+ memcpy(&buffer->data()[sizeof(ClientMessagePacket)], data, length);
+ int packet_length = sizeof(ClientMessagePacket) + length;
+ int rv = socket_->Write(buffer, packet_length, &io_callback_);
+ if (rv <= 0)
+ return rv;
+ CHECK_EQ(packet_length, rv); // We must send all data.
+ return length;
+}
+
+void ClientPacketizer::Close(ConnectionKey key) {
+ // TODO(mbelshe): implement me
+ NOTIMPLEMENTED();
+}
+
+int ClientPacketizer::GetPeerAddress(IPEndPoint* endpoint) const {
+ return socket_->GetPeerAddress(endpoint);
+}
+
+int ClientPacketizer::max_message_payload() const {
+ if (!initiate_sent_)
+ return kMaxPacketLength - sizeof(InitiatePacket) - sizeof(Message);
+ return kMaxMessageLength - sizeof(Message);
+}
+
+int ClientPacketizer::DoLoop(int result) {
+ DCHECK(next_state_ != NONE);
+ int rv = result;
+ do {
+ switch (next_state_) {
+ case LOOKUP_COOKIE:
+ rv = DoLookupCookie();
+ break;
+ case LOOKUP_COOKIE_COMPLETE:
+ rv = DoLookupCookieComplete(rv);
+ break;
+ case SENDING_HELLO:
+ rv = DoSendingHello();
+ break;
+ case SENDING_HELLO_COMPLETE:
+ rv = DoSendingHelloComplete(rv);
+ break;
+ case WAITING_COOKIE:
+ rv = DoWaitingCookie();
+ break;
+ case WAITING_COOKIE_COMPLETE:
+ rv = DoWaitingCookieComplete(rv);
+ break;
+ case CONNECTED:
+ rv = DoConnected(rv);
+ break;
+ default:
+ NOTREACHED();
+ break;
+ }
+ } while (rv > ERR_IO_PENDING && next_state_ != CONNECTED);
+
+ return rv;
+}
+
+int ClientPacketizer::DoLookupCookie() {
+ // Eventually, we'll use this state to see if we have persisted the cookie
+ // in the disk cache. For now, we don't do any persistence, even in memory.
+ next_state_ = LOOKUP_COOKIE_COMPLETE;
+ return OK;
+}
+
+int ClientPacketizer::DoLookupCookieComplete(int rv) {
+ // TODO(mbelshe): If we got a cookie, goto state WAITING_COOKIE_COMPLETE
+ next_state_ = SENDING_HELLO;
+ return rv;
+}
+
+int ClientPacketizer::DoSendingHello() {
+ next_state_ = SENDING_HELLO_COMPLETE;
+
+ if (hello_attempts_ == kMaxHelloAttempts)
+ return ERR_TIMED_OUT;
+
+ // Connect to the next socket
+ int rv = ConnectNextAddress();
+ if (rv < 0) {
+ LOG(ERROR) << "Could not get next address!";
+ return rv;
+ }
+
+ // Construct Hello Packet
+ scoped_refptr<IOBuffer> buffer(new IOBuffer(sizeof(struct HelloPacket)));
+ struct HelloPacket* data =
+ reinterpret_cast<struct HelloPacket*>(buffer->data());
+ memset(data, 0, sizeof(struct HelloPacket));
+ memcpy(data->id, "QvnQ5XlH", 8);
+ memcpy(data->client_shortterm_public_key, shortterm_public_key_,
+ sizeof(shortterm_public_key_));
+ // TODO(mbelshe): populate all other fields of the HelloPacket.
+
+ return socket_->Write(buffer, sizeof(struct HelloPacket), &io_callback_);
+}
+
+int ClientPacketizer::DoSendingHelloComplete(int rv) {
+ next_state_ = NONE;
+
+ if (rv < 0)
+ return rv;
+
+ // Writing to UDP should not result in a partial datagram.
+ if (rv != sizeof(struct HelloPacket))
+ return ERR_FAILED;
+
+ next_state_ = WAITING_COOKIE;
+ return OK;
+}
+
+int ClientPacketizer::DoWaitingCookie() {
+ next_state_ = WAITING_COOKIE_COMPLETE;
+
+ StartHelloTimer(kHelloTimeoutMs[hello_attempts_++]);
+
+ read_buffer_ = new IOBuffer(kMaxPacketLength);
+ return socket_->Read(read_buffer_, kMaxPacketLength, &io_callback_);
+}
+
+int ClientPacketizer::DoWaitingCookieComplete(int rv) {
+ // TODO(mbelshe): Add Histogram for hello_attempts_.
+ RevokeHelloTimer();
+
+ // TODO(mbelshe): Validate the cookie here
+ if (rv < 0)
+ return rv;
+
+ if (rv == 0) { // Does this happen?
+ NOTREACHED();
+ return ERR_FAILED;
+ }
+
+ if (rv != sizeof(struct CookiePacket))
+ return ERR_FAILED; // TODO(mbelshe): more specific error message.
+
+ // TODO(mbelshe): verify contents of Cookie
+
+ listener_->OnConnection(shortterm_public_key_);
+
+ next_state_ = CONNECTED;
+
+ // Start reading for messages
+ rv = ReadPackets();
+ if (rv == ERR_IO_PENDING)
+ rv = OK;
+ return rv;
+}
+
+int ClientPacketizer::DoConnected(int rv) {
+ DCHECK(next_state_ == CONNECTED);
+ if (rv > 0)
+ ProcessRead(rv);
+ return ReadPackets();
+}
+
+void ClientPacketizer::DoCallback(int result) {
+ DCHECK_NE(result, ERR_IO_PENDING);
+ DCHECK(user_callback_);
+
+ CompletionCallback* callback = user_callback_;
+ user_callback_ = NULL;
+ callback->Run(result);
+}
+
+int ClientPacketizer::ConnectNextAddress() {
+ // TODO(mbelshe): plumb Netlog information
+
+ DCHECK(addresses_.head());
+
+ socket_.reset(new UDPClientSocket(NULL, NetLog::Source()));
+
+ // Rotate to next address in the list.
+ if (current_address_)
+ current_address_ = current_address_->ai_next;
+ if (!current_address_)
+ current_address_ = addresses_.head();
+
+ IPEndPoint endpoint;
+ if (!endpoint.FromSockAddr(current_address_->ai_addr,
+ current_address_->ai_addrlen))
+ return ERR_FAILED;
+
+ int rv = socket_->Connect(endpoint);
+ DCHECK_NE(ERR_IO_PENDING, rv);
+
+ return rv;
+}
+
+void ClientPacketizer::StartHelloTimer(int milliseconds) {
+ MessageLoop::current()->PostDelayedTask(
+ FROM_HERE,
+ timers_factory_.NewRunnableMethod(&ClientPacketizer::OnHelloTimeout),
+ milliseconds);
+}
+
+void ClientPacketizer::RevokeHelloTimer() {
+ timers_factory_.RevokeAll();
+}
+
+void ClientPacketizer::OnHelloTimeout() {
+ DCHECK_EQ(WAITING_COOKIE_COMPLETE, next_state_);
+ next_state_ = SENDING_HELLO;
+ DLOG(INFO) << "HelloTimeout #" << hello_attempts_;
+ int rv = DoLoop(OK);
+ if (rv != ERR_IO_PENDING)
+ DoCallback(rv);
+}
+
+void ClientPacketizer::ProcessRead(int result) {
+ DCHECK_GT(result, 0);
+ DCHECK(listener_);
+
+ // The smallest packet we can receive is a ServerMessagePacket.
+ if (result < static_cast<int>(sizeof(ServerMessagePacket)) ||
+ result > kMaxPacketLength)
+ return;
+
+ // Packets are always 16 byte padded.
+ if (result & 15)
+ return;
+
+ Packet *packet = reinterpret_cast<Packet*>(read_buffer_->data());
+
+ // The only type of packet we should receive at this point is a Message
+ // packet.
+ // TODO(mbelshe): what happens if the server sends a new Cookie packet?
+ if (memcmp(packet->id, "RL3aNMXM", 8))
+ return;
+
+ uchar* msg = reinterpret_cast<uchar*>(packet);
+ int length = result - sizeof(ServerMessagePacket);
+ listener_->OnMessage(this,
+ shortterm_public_key_,
+ &msg[sizeof(ServerMessagePacket)],
+ length);
+}
+
+int ClientPacketizer::ReadPackets() {
+ DCHECK(socket_.get());
+
+ int rv;
+ while (true) {
+ rv = socket_->Read(read_buffer_,
+ kMaxPacketLength,
+ &io_callback_);
+ if (rv <= 0) {
+ if (rv != ERR_IO_PENDING)
+ LOG(ERROR) << "Error reading socket:" << rv;
+ return rv;
+ }
+ ProcessRead(rv);
+ }
+ return rv;
+}
+
+void ClientPacketizer::OnIOComplete(int result) {
+ int rv = DoLoop(result);
+ if (rv != ERR_IO_PENDING)
+ DoCallback(rv);
+}
+
+} // namespace net
diff --git a/net/curvecp/client_packetizer.h b/net/curvecp/client_packetizer.h
new file mode 100644
index 0000000..b5ca5dd
--- /dev/null
+++ b/net/curvecp/client_packetizer.h
@@ -0,0 +1,102 @@
+// Copyright (c) 2011 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef NET_CURVECP_CLIENT_PACKETIZER_H_
+#define NET_CURVECP_CLIENT_PACKETIZER_H_
+#pragma once
+
+#include "base/scoped_ptr.h"
+#include "base/task.h"
+#include "net/base/address_list.h"
+#include "net/base/completion_callback.h"
+#include "net/curvecp/packetizer.h"
+#include "net/curvecp/protocol.h"
+
+namespace net {
+
+class AddressList;
+class IOBuffer;
+class IPEndPoint;
+class UDPClientSocket;
+
+class ClientPacketizer : public Packetizer {
+ public:
+ ClientPacketizer();
+ virtual ~ClientPacketizer();
+
+ int Connect(const AddressList& server,
+ Packetizer::Listener* listener,
+ CompletionCallback* callback);
+
+ // Packetizer methods
+ virtual int SendMessage(ConnectionKey key,
+ const char* data,
+ size_t length,
+ CompletionCallback* callback);
+ virtual void Close(ConnectionKey key);
+ virtual int GetPeerAddress(IPEndPoint* endpoint) const;
+ virtual int max_message_payload() const;
+
+ private:
+ enum StateType {
+ NONE, // The initial state, before connect.
+ LOOKUP_COOKIE, // Looking up a cookie in the disk cache.
+ LOOKUP_COOKIE_COMPLETE, // The disk cache lookup is complete.
+ SENDING_HELLO, // Sending a Hello packet.
+ SENDING_HELLO_COMPLETE, // Hello packet has been sent.
+ WAITING_COOKIE, // Waiting for a Cookie packet.
+ WAITING_COOKIE_COMPLETE, // The Cookie packet has arrived.
+ CONNECTED, // Connected
+ };
+
+ int DoLoop(int result);
+ int DoLookupCookie();
+ int DoLookupCookieComplete(int result);
+ int DoSendingHello();
+ int DoSendingHelloComplete(int result);
+ int DoWaitingCookie();
+ int DoWaitingCookieComplete(int result);
+ int DoConnected(int result);
+
+ void DoCallback(int result);
+
+ // Connect to the next address in our list.
+ int ConnectNextAddress();
+
+ // We set a timeout for responses to the Hello message.
+ void StartHelloTimer(int milliseconds);
+ void RevokeHelloTimer();
+ void OnHelloTimeout(); // Called when the Hello Timer fires.
+
+ // Process the result of a Read operation.
+ void ProcessRead(int bytes_read);
+
+ // Read packets until an error occurs.
+ int ReadPackets();
+
+ // Callback when an internal IO is completed.
+ void OnIOComplete(int result);
+
+ StateType next_state_;
+ scoped_ptr<UDPClientSocket> socket_;
+ Packetizer::Listener* listener_;
+ CompletionCallback* user_callback_;
+ AddressList addresses_;
+ const struct addrinfo* current_address_;
+ int hello_attempts_; // Number of attempts to send a Hello Packet.
+ bool initiate_sent_; // Indicates whether the Initialte Packet was sent.
+
+ scoped_refptr<IOBuffer> read_buffer_; // Buffer for interal reads.
+
+ uchar shortterm_public_key_[32];
+
+ CompletionCallbackImpl<ClientPacketizer> io_callback_;
+ ScopedRunnableMethodFactory<ClientPacketizer> timers_factory_;
+
+ DISALLOW_COPY_AND_ASSIGN(ClientPacketizer);
+};
+
+} // namespace net
+
+#endif // NET_CURVECP_CLIENT_PACKETIZER_H_
diff --git a/net/curvecp/connection_key.cc b/net/curvecp/connection_key.cc
new file mode 100644
index 0000000..566cf57
--- /dev/null
+++ b/net/curvecp/connection_key.cc
@@ -0,0 +1,42 @@
+// Copyright (c) 2011 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "net/curvecp/connection_key.h"
+
+#include <string.h>
+
+namespace net {
+
+ConnectionKey::ConnectionKey() {
+ memset(key_, 0, sizeof(key_));
+}
+
+ConnectionKey::ConnectionKey(unsigned char bytes[]) {
+ memcpy(key_, bytes, sizeof(key_));
+}
+
+ConnectionKey::ConnectionKey(const ConnectionKey& other) {
+ memcpy(key_, other.key_, sizeof(key_));
+}
+
+ConnectionKey& ConnectionKey::operator=(const ConnectionKey& other) {
+ memcpy(key_, other.key_, sizeof(key_));
+ return *this;
+}
+
+bool ConnectionKey::operator==(const ConnectionKey& other) const {
+ return memcmp(key_, other.key_, sizeof(key_)) == 0;
+}
+
+bool ConnectionKey::operator<(const ConnectionKey& other) const {
+ return memcmp(key_, other.key_, sizeof(key_)) < 0;
+}
+
+std::string ConnectionKey::ToString() const {
+ // TODO(mbelshe): make this a nice hex formatter
+ return std::string("key") +
+ std::string(reinterpret_cast<const char*>(key_), 32);
+}
+
+} // namespace net
diff --git a/net/curvecp/connection_key.h b/net/curvecp/connection_key.h
new file mode 100644
index 0000000..8fe94ad
--- /dev/null
+++ b/net/curvecp/connection_key.h
@@ -0,0 +1,33 @@
+// Copyright (c) 2011 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef NET_CURVECP_CONNECTION_KEY_H_
+#define NET_CURVECP_CONNECTION_KEY_H_
+#pragma once
+
+#include <string>
+
+namespace net {
+
+// A ConnectionKey uniquely identifies a connection.
+// It represents the client's short-term public key and is 32 bytes.
+class ConnectionKey {
+ public:
+ ConnectionKey();
+ ConnectionKey(unsigned char bytes[]);
+ ConnectionKey(const ConnectionKey& key);
+
+ ConnectionKey& operator=(const ConnectionKey& other);
+ bool operator==(const ConnectionKey& other) const;
+ bool operator<(const ConnectionKey& other) const;
+
+ std::string ToString() const;
+
+ private:
+ unsigned char key_[32];
+};
+
+} // namespace net
+
+#endif // NET_CURVECP_CONNECTION_KEY_H_
diff --git a/net/curvecp/curvecp_client_socket.cc b/net/curvecp/curvecp_client_socket.cc
new file mode 100644
index 0000000..f28e839
--- /dev/null
+++ b/net/curvecp/curvecp_client_socket.cc
@@ -0,0 +1,111 @@
+// Copyright (c) 2011 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "net/base/ip_endpoint.h"
+#include "net/base/net_errors.h"
+#include "net/base/sys_addrinfo.h"
+#include "net/curvecp/curvecp_client_socket.h"
+#include "net/curvecp/messenger.h"
+
+namespace net {
+
+CurveCPClientSocket::CurveCPClientSocket(const AddressList& addresses,
+ net::NetLog* net_log,
+ const net::NetLog::Source& source)
+ : addresses_(addresses),
+ net_log_(BoundNetLog::Make(net_log, NetLog::SOURCE_SOCKET)),
+ messenger_(&packetizer_) {
+}
+
+CurveCPClientSocket::~CurveCPClientSocket() {
+}
+
+int CurveCPClientSocket::Connect(CompletionCallback* callback) {
+ return packetizer_.Connect(addresses_, &messenger_, callback);
+}
+
+void CurveCPClientSocket::Disconnect() {
+ // TODO(mbelshe): DCHECK that we're connected.
+ // Record the ConnectionKey so that we can disconnect it properly.
+ // Do we need a close() on the messenger?
+ // packetizer_.Close();
+}
+
+bool CurveCPClientSocket::IsConnected() const {
+ // TODO(mbelshe): return packetizer_.IsConnected();
+ return false;
+}
+
+bool CurveCPClientSocket::IsConnectedAndIdle() const {
+ // TODO(mbelshe): return packetizer_.IsConnectedAndIdle();
+ return false;
+}
+
+int CurveCPClientSocket::GetPeerAddress(AddressList* address) const {
+ IPEndPoint endpoint;
+ int rv = packetizer_.GetPeerAddress(&endpoint);
+ if (rv < 0)
+ return rv;
+ struct sockaddr_storage sockaddr;
+ size_t sockaddr_length = sizeof(sockaddr);
+ bool success = endpoint.ToSockAddr(
+ reinterpret_cast<struct sockaddr*>(&sockaddr), &sockaddr_length);
+ if (!success)
+ return ERR_FAILED;
+ struct addrinfo ai;
+ memset(&ai, 0, sizeof(ai));
+ memcpy(&ai.ai_addr, &sockaddr, sockaddr_length);
+ ai.ai_addrlen = sockaddr_length;
+ *address = AddressList::CreateByCopying(&ai);
+ return OK;
+}
+
+int CurveCPClientSocket::GetLocalAddress(IPEndPoint* address) const {
+ NOTIMPLEMENTED();
+ return ERR_FAILED;
+}
+
+const BoundNetLog& CurveCPClientSocket::NetLog() const {
+ return net_log_;
+}
+
+void CurveCPClientSocket::SetSubresourceSpeculation() {
+ // This is ridiculous.
+}
+
+void CurveCPClientSocket::SetOmniboxSpeculation() {
+ // This is ridiculous.
+}
+
+bool CurveCPClientSocket::WasEverUsed() const {
+ // This is ridiculous.
+ return true;
+}
+
+bool CurveCPClientSocket::UsingTCPFastOpen() const {
+ // This is ridiculous.
+ return false;
+}
+
+int CurveCPClientSocket::Read(IOBuffer* buf,
+ int buf_len,
+ CompletionCallback* callback) {
+ return messenger_.Read(buf, buf_len, callback);
+}
+
+int CurveCPClientSocket::Write(IOBuffer* buf,
+ int buf_len,
+ CompletionCallback* callback) {
+ return messenger_.Write(buf, buf_len, callback);
+}
+
+bool CurveCPClientSocket::SetReceiveBufferSize(int32 size) {
+ return true;
+}
+
+bool CurveCPClientSocket::SetSendBufferSize(int32 size) {
+ return true;
+}
+
+} // namespace net
diff --git a/net/curvecp/curvecp_client_socket.h b/net/curvecp/curvecp_client_socket.h
new file mode 100644
index 0000000..213b13d
--- /dev/null
+++ b/net/curvecp/curvecp_client_socket.h
@@ -0,0 +1,57 @@
+// Copyright (c) 2011 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef NET_CURVECP_CURVECP_CLIENT_SOCKET_H_
+#define NET_CURVECP_CURVECP_CLIENT_SOCKET_H_
+#pragma once
+
+#include "net/base/completion_callback.h"
+#include "net/curvecp/client_packetizer.h"
+#include "net/curvecp/messenger.h"
+#include "net/socket/stream_socket.h"
+
+namespace net {
+
+// A client socket that uses CurveCP as the transport layer.
+class CurveCPClientSocket : public StreamSocket {
+ public:
+ // The IP address(es) and port number to connect to. The CurveCP socket will
+ // try each IP address in the list until it succeeds in establishing a
+ // connection.
+ CurveCPClientSocket(const AddressList& addresses,
+ net::NetLog* net_log,
+ const net::NetLog::Source& source);
+ virtual ~CurveCPClientSocket();
+
+ // ClientSocket methods:
+ virtual int Connect(CompletionCallback* callback);
+ virtual void Disconnect();
+ virtual bool IsConnected() const;
+ virtual bool IsConnectedAndIdle() const;
+ virtual int GetPeerAddress(AddressList* address) const;
+ virtual int GetLocalAddress(IPEndPoint* address) const;
+ virtual const BoundNetLog& NetLog() const;
+ virtual void SetSubresourceSpeculation();
+ virtual void SetOmniboxSpeculation();
+ virtual bool WasEverUsed() const;
+ virtual bool UsingTCPFastOpen() const;
+
+ // Socket methods:
+ virtual int Read(IOBuffer* buf, int buf_len, CompletionCallback* callback);
+ virtual int Write(IOBuffer* buf, int buf_len, CompletionCallback* callback);
+ virtual bool SetReceiveBufferSize(int32 size);
+ virtual bool SetSendBufferSize(int32 size);
+
+ private:
+ AddressList addresses_;
+ BoundNetLog net_log_;
+ Messenger messenger_;
+ ClientPacketizer packetizer_;
+
+ DISALLOW_COPY_AND_ASSIGN(CurveCPClientSocket);
+};
+
+} // namespace net
+
+#endif // NET_CURVECP_CURVECP_CLIENT_SOCKET_H_
diff --git a/net/curvecp/curvecp_server_socket.cc b/net/curvecp/curvecp_server_socket.cc
new file mode 100644
index 0000000..32d3786
--- /dev/null
+++ b/net/curvecp/curvecp_server_socket.cc
@@ -0,0 +1,82 @@
+// Copyright (c) 2011 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "net/base/ip_endpoint.h"
+#include "net/base/net_errors.h"
+#include "net/base/sys_addrinfo.h"
+#include "net/curvecp/curvecp_server_socket.h"
+#include "net/curvecp/messenger.h"
+
+namespace net {
+
+CurveCPServerSocket::CurveCPServerSocket(net::NetLog* net_log,
+ const net::NetLog::Source& source)
+ : net_log_(BoundNetLog::Make(net_log, NetLog::SOURCE_SOCKET)),
+ packetizer_(new ServerPacketizer()),
+ ALLOW_THIS_IN_INITIALIZER_LIST(messenger_(packetizer_.get(), this)),
+ acceptor_(NULL),
+ is_child_socket_(false) {
+}
+
+// Constructor for an accepted socket.
+CurveCPServerSocket::CurveCPServerSocket(const ConnectionKey& key,
+ ServerPacketizer* packetizer,
+ net::NetLog* net_log,
+ const net::NetLog::Source& source)
+ : net_log_(BoundNetLog::Make(net_log, NetLog::SOURCE_SOCKET)),
+ packetizer_(packetizer),
+ ALLOW_THIS_IN_INITIALIZER_LIST(messenger_(key, packetizer_.get(), this)),
+ acceptor_(NULL),
+ is_child_socket_(true),
+ key_(key) {
+}
+
+CurveCPServerSocket::~CurveCPServerSocket() {
+}
+
+int CurveCPServerSocket::Listen(const IPEndPoint& endpoint,
+ Acceptor* acceptor) {
+ acceptor_ = acceptor;
+ return packetizer_->Listen(endpoint, &messenger_);
+}
+
+void CurveCPServerSocket::Close() {
+ if (acceptor_)
+ return; // The listener socket is not closable.
+ packetizer_->Close(key_);
+}
+
+int CurveCPServerSocket::Read(IOBuffer* buf,
+ int buf_len,
+ CompletionCallback* callback) {
+ return messenger_.Read(buf, buf_len, callback);
+}
+
+int CurveCPServerSocket::Write(IOBuffer* buf,
+ int buf_len,
+ CompletionCallback* callback) {
+ return messenger_.Write(buf, buf_len, callback);
+}
+
+bool CurveCPServerSocket::SetReceiveBufferSize(int32 size) {
+ return true;
+}
+
+bool CurveCPServerSocket::SetSendBufferSize(int32 size) {
+ return true;
+}
+
+void CurveCPServerSocket::OnAccept(ConnectionKey key) {
+ DCHECK(acceptor_);
+
+ CurveCPServerSocket* new_socket =
+ new CurveCPServerSocket(key,
+ packetizer_,
+ net_log_.net_log(),
+ NetLog::Source());
+ packetizer_->Open(key, new_socket->messenger());
+ acceptor_->OnAccept(new_socket);
+}
+
+} // namespace net
diff --git a/net/curvecp/curvecp_server_socket.h b/net/curvecp/curvecp_server_socket.h
new file mode 100644
index 0000000..672a574
--- /dev/null
+++ b/net/curvecp/curvecp_server_socket.h
@@ -0,0 +1,63 @@
+// Copyright (c) 2011 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef NET_CURVECP_CURVECP_SERVER_SOCKET_H_
+#define NET_CURVECP_CURVECP_SERVER_SOCKET_H_
+#pragma once
+
+#include "net/base/completion_callback.h"
+#include "net/curvecp/connection_key.h"
+#include "net/curvecp/protocol.h"
+#include "net/curvecp/server_messenger.h"
+#include "net/curvecp/server_packetizer.h"
+#include "net/socket/stream_socket.h"
+
+namespace net {
+
+// A server socket that uses CurveCP as the transport layer.
+class CurveCPServerSocket : public Socket,
+ public ServerMessenger::Acceptor {
+ public:
+ class Acceptor {
+ public:
+ virtual ~Acceptor() {}
+ virtual void OnAccept(CurveCPServerSocket* new_socket) = 0;
+ };
+
+ CurveCPServerSocket(net::NetLog* net_log,
+ const net::NetLog::Source& source);
+ virtual ~CurveCPServerSocket();
+
+ int Listen(const IPEndPoint& endpoint, Acceptor* acceptor);
+ void Close();
+
+ // Socket methods:
+ virtual int Read(IOBuffer* buf, int buf_len, CompletionCallback* callback);
+ virtual int Write(IOBuffer* buf, int buf_len, CompletionCallback* callback);
+ virtual bool SetReceiveBufferSize(int32 size);
+ virtual bool SetSendBufferSize(int32 size);
+
+ // ServerMessenger::Acceptor methods:
+ virtual void OnAccept(ConnectionKey key);
+
+ private:
+ CurveCPServerSocket(const ConnectionKey& key,
+ ServerPacketizer* packetizer_,
+ net::NetLog* net_log,
+ const net::NetLog::Source& source);
+ ServerMessenger* messenger() { return &messenger_; }
+
+ BoundNetLog net_log_;
+ scoped_refptr<ServerPacketizer> packetizer_;
+ ServerMessenger messenger_;
+ Acceptor* acceptor_;
+ bool is_child_socket_; // Was this socket accepted from another.
+ ConnectionKey key_;
+
+ DISALLOW_COPY_AND_ASSIGN(CurveCPServerSocket);
+};
+
+} // namespace net
+
+#endif // NET_CURVECP_CURVECP_SERVER_SOCKET_H_
diff --git a/net/curvecp/curvecp_transfer_unittest.cc b/net/curvecp/curvecp_transfer_unittest.cc
new file mode 100644
index 0000000..e433118
--- /dev/null
+++ b/net/curvecp/curvecp_transfer_unittest.cc
@@ -0,0 +1,313 @@
+// Copyright (c) 2011 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include <poll.h>
+
+#include "base/basictypes.h"
+#include "base/message_loop.h"
+#include "base/metrics/histogram.h"
+#include "net/base/net_test_suite.h"
+#include "net/base/test_completion_callback.h"
+#include "net/curvecp/circular_buffer.h"
+#include "net/curvecp/received_block_list.h"
+#include "net/curvecp/rtt_and_send_rate_calculator.h"
+#include "net/curvecp/test_client.h"
+#include "net/curvecp/test_server.h"
+#include "testing/gtest/include/gtest/gtest.h"
+#include "testing/platform_test.h"
+
+namespace net {
+
+class CurveCPTransferTest : public PlatformTest {
+ public:
+ CurveCPTransferTest() {}
+};
+
+void RunEchoTest(int bytes) {
+ TestServer server;
+ TestClient client;
+
+ EXPECT_TRUE(server.Start(1234));
+ HostPortPair server_address("localhost", 1234);
+ TestCompletionCallback cb;
+ EXPECT_TRUE(client.Start(server_address, bytes, &cb));
+
+ int rv = cb.WaitForResult();
+ EXPECT_EQ(0, rv);
+ EXPECT_EQ(0, server.error_count());
+ EXPECT_EQ(0, client.error_count());
+}
+
+/*
+TEST_F(CurveCPTransferTest, Echo_50B_Of_Data) {
+ RunEchoTest(50);
+}
+
+TEST_F(CurveCPTransferTest, Echo_1KB_Of_Data) {
+ RunEchoTest(1024);
+}
+
+TEST_F(CurveCPTransferTest, Echo_4KB_Of_Data) {
+ RunEchoTest(4096);
+}
+*/
+
+// XXXMB
+TEST_F(CurveCPTransferTest, Echo_1MB_Of_Data) {
+ RunEchoTest(1024*1024);
+}
+
+// TODO(mbelshe): Do something meaningful with this test.
+TEST_F(CurveCPTransferTest, RTTIntial) {
+ RttAndSendRateCalculator rate;
+ printf("initial %d (%d, %d/%d)\n", rate.send_rate(),
+ rate.rtt_average(),
+ rate.rtt_lo(),
+ rate.rtt_hi());
+
+ for (int i = 0; i < 10; ++i) {
+ rate.OnMessage(100*1000);
+ printf("%d: %d (%d, %d/%d)\n", i, rate.send_rate(),
+ rate.rtt_average(),
+ rate.rtt_lo(),
+ rate.rtt_hi());
+ usleep(200000);
+ }
+
+ rate.OnTimeout();
+ rate.OnTimeout();
+ rate.OnTimeout();
+ rate.OnTimeout();
+
+ // Get worse
+ for (int i = 0; i < 10; ++i) {
+ rate.OnMessage((500 + i)*1000);
+ printf("%d: %d (%d, %d/%d)\n", i, rate.send_rate(),
+ rate.rtt_average(),
+ rate.rtt_lo(),
+ rate.rtt_hi());
+ usleep(200000);
+ }
+
+ // Get better
+ for (int i = 0; i < 10; ++i) {
+ rate.OnMessage((100 - i)*1000);
+ printf("%d: %d (%d, %d/%d)\n", i, rate.send_rate(),
+ rate.rtt_average(),
+ rate.rtt_lo(),
+ rate.rtt_hi());
+ usleep(200000);
+ }
+}
+
+TEST_F(CurveCPTransferTest, CircularBufferFillAndDrain) {
+ CircularBuffer buffer(10);
+ for (int i = 0; i < 30; ++i) {
+ EXPECT_EQ(0, buffer.length());
+ EXPECT_EQ(3, buffer.write("abc", 3));
+ EXPECT_EQ(3, buffer.length());
+ char read_data[3];
+ EXPECT_EQ(3, buffer.read(read_data, 3));
+ EXPECT_EQ(0, memcmp(read_data, "abc", 3));
+ }
+}
+
+TEST_F(CurveCPTransferTest, CircularBufferTooLargeFill) {
+ CircularBuffer buffer(10);
+ EXPECT_EQ(10, buffer.write("abcdefghijklm", 13));
+ EXPECT_EQ(0, buffer.write("a", 1));
+ char read_data[10];
+ EXPECT_EQ(10, buffer.read(read_data, 10));
+ EXPECT_EQ(0, memcmp(read_data, "abcdefghij", 10));
+ EXPECT_EQ(1, buffer.write("a", 1));
+}
+
+TEST_F(CurveCPTransferTest, CircularBufferEdgeCases) {
+ CircularBuffer buffer(10);
+ char read_data[10];
+ EXPECT_EQ(9, buffer.write("abcdefghi", 9));
+ EXPECT_EQ(9, buffer.read(read_data, 10));
+ EXPECT_EQ(0, memcmp(read_data, "abcdefghi", 9));
+ EXPECT_EQ(1, buffer.write("a", 1));
+ EXPECT_EQ(1, buffer.write("b", 1));
+ EXPECT_EQ(2, buffer.read(read_data, 10));
+ EXPECT_EQ(0, memcmp(read_data, "ab", 2));
+}
+
+TEST_F(CurveCPTransferTest, CircularBufferOneWriteTwoReads) {
+ CircularBuffer buffer(1000);
+ char read_data[10];
+ EXPECT_EQ(13, buffer.write("abcdefghijklm", 13));
+ EXPECT_EQ(10, buffer.read(read_data, 10));
+ EXPECT_EQ(0, memcmp(read_data, "abcdefghij", 10));
+ EXPECT_EQ(3, buffer.read(read_data, 10));
+ EXPECT_EQ(0, memcmp(read_data, "klm", 3));
+}
+
+TEST_F(CurveCPTransferTest, CircularBufferTwoWritesOneRead) {
+ CircularBuffer buffer(1000);
+ char read_data[10];
+ EXPECT_EQ(6, buffer.write("abcdef", 6));
+ EXPECT_EQ(4, buffer.write("ghij", 4));
+ EXPECT_EQ(10, buffer.read(read_data, 10));
+ EXPECT_EQ(0, memcmp(read_data, "abcdefghij", 10));
+}
+
+TEST_F(CurveCPTransferTest, CircularBufferFillOnSpill) {
+ CircularBuffer buffer(10);
+ char read_data[10];
+ EXPECT_EQ(10, buffer.write("abcdefghij", 10));
+ EXPECT_EQ(3, buffer.read(read_data, 3));
+ EXPECT_EQ(0, memcmp(read_data, "abc", 3));
+ // We now have a hole at the head of the circular buffer.
+ EXPECT_EQ(1, buffer.write("x", 1));
+ EXPECT_EQ(1, buffer.write("y", 1));
+ EXPECT_EQ(1, buffer.write("z", 1));
+ EXPECT_EQ(0, buffer.write("q", 1)); // Overflow
+ EXPECT_EQ(10, buffer.read(read_data, 10));
+ EXPECT_EQ(0, memcmp(read_data, "defghijxyz", 10));
+}
+
+TEST_F(CurveCPTransferTest, ReceivedBlockList) {
+ scoped_refptr<IOBufferWithSize> buffer(new IOBufferWithSize(16));
+ memcpy(buffer->data(), "abcdefghijklmnop", 16);
+ scoped_refptr<IOBufferWithSize> read_buffer(new IOBufferWithSize(100));
+
+ ReceivedBlockList list;
+ EXPECT_EQ(0, list.current_position());
+ list.AddBlock(0, buffer, 16);
+ EXPECT_EQ(0, list.current_position());
+ list.AddBlock(0, buffer, 16);
+ EXPECT_EQ(16, list.ReadBytes(read_buffer.get(), read_buffer->size()));
+ EXPECT_EQ(0, memcmp(buffer->data(), read_buffer->data(), 16));
+ EXPECT_EQ(16, list.current_position());
+}
+
+TEST_F(CurveCPTransferTest, ReceivedBlockListCoalesce) {
+ scoped_refptr<IOBufferWithSize> buffer(new IOBufferWithSize(8));
+ memcpy(buffer->data(), "abcdefgh", 8);
+ scoped_refptr<IOBufferWithSize> read_buffer(new IOBufferWithSize(100));
+
+ ReceivedBlockList list;
+ EXPECT_EQ(0, list.current_position());
+ list.AddBlock(0, buffer, 8);
+ EXPECT_EQ(0, list.current_position());
+ list.AddBlock(8, buffer, 8);
+ EXPECT_EQ(16, list.ReadBytes(read_buffer.get(), read_buffer->size()));
+ EXPECT_EQ(0, memcmp(buffer->data(), read_buffer->data(), 8));
+ EXPECT_EQ(0, memcmp(buffer->data(), read_buffer->data() + 8, 8));
+ EXPECT_EQ(16, list.current_position());
+}
+
+TEST_F(CurveCPTransferTest, ReceivedBlockListGap) {
+ scoped_refptr<IOBufferWithSize> buffer(new IOBufferWithSize(8));
+ memcpy(buffer->data(), "abcdefgh", 8);
+ scoped_refptr<IOBufferWithSize> read_buffer(new IOBufferWithSize(100));
+
+ ReceivedBlockList list;
+ EXPECT_EQ(0, list.current_position());
+ list.AddBlock(0, buffer, 8);
+ EXPECT_EQ(0, list.current_position());
+
+ // Leaves a gap
+ list.AddBlock(16, buffer, 8);
+ EXPECT_EQ(8, list.ReadBytes(read_buffer.get(), read_buffer->size()));
+ EXPECT_EQ(0, list.ReadBytes(read_buffer.get(), read_buffer->size()));
+ EXPECT_EQ(8, list.current_position());
+
+ // Fill the gap
+ list.AddBlock(8, buffer, 8);
+ EXPECT_EQ(16, list.ReadBytes(read_buffer.get(), read_buffer->size()));
+ EXPECT_EQ(0, memcmp(buffer->data(), read_buffer->data(), 8));
+ EXPECT_EQ(0, memcmp(buffer->data(), read_buffer->data() + 8, 8));
+ EXPECT_EQ(24, list.current_position());
+}
+
+TEST_F(CurveCPTransferTest, ReceivedBlockListPartialRead) {
+ scoped_refptr<IOBufferWithSize> buffer(new IOBufferWithSize(8));
+ memcpy(buffer->data(), "abcdefgh", 8);
+ scoped_refptr<IOBufferWithSize> read_buffer(new IOBufferWithSize(100));
+
+ ReceivedBlockList list;
+ EXPECT_EQ(0, list.current_position());
+ list.AddBlock(0, buffer, 8);
+ EXPECT_EQ(0, list.current_position());
+ list.AddBlock(8, buffer, 8);
+ EXPECT_EQ(3, list.ReadBytes(read_buffer.get(), 3));
+ EXPECT_EQ(0, memcmp("abc", read_buffer->data(), 3));
+ EXPECT_EQ(3, list.ReadBytes(read_buffer.get(), 3));
+ EXPECT_EQ(0, memcmp("def", read_buffer->data(), 3));
+ EXPECT_EQ(3, list.ReadBytes(read_buffer.get(), 3));
+ EXPECT_EQ(0, memcmp("gha", read_buffer->data(), 3));
+ EXPECT_EQ(3, list.ReadBytes(read_buffer.get(), 3));
+ EXPECT_EQ(0, memcmp("bcd", read_buffer->data(), 3));
+ EXPECT_EQ(3, list.ReadBytes(read_buffer.get(), 3));
+ EXPECT_EQ(0, memcmp("efg", read_buffer->data(), 3));
+ EXPECT_EQ(1, list.ReadBytes(read_buffer.get(), 3));
+ EXPECT_EQ(0, memcmp("h", read_buffer->data(), 1));
+ EXPECT_EQ(16, list.current_position());
+}
+
+/*
+
+class PollTimerTester {
+ public:
+ PollTimerTester() {
+ Test();
+ }
+
+ void Test() {
+ struct pollfd fds[3];
+ memset(fds, 0, sizeof(struct pollfd) * 3);
+
+ while(true) {
+ (void)poll(fds, 1, 2);
+ base::TimeTicks now(base::TimeTicks::Now());
+ if (!last_.is_null()) {
+ LOG(ERROR) << "Time was " << (now - last_).InMicroseconds();
+ LOG(ERROR) << "Time was " << (now - last_).InMilliseconds();
+ }
+ last_ = now;
+ }
+ }
+
+ base::TimeTicks last_;
+};
+
+class TimerTester {
+ public:
+ TimerTester() {
+ timer_.Start(base::TimeDelta(), this, &TimerTester::OnTimer);
+ }
+ void OnTimer() {
+ base::TimeTicks now(base::TimeTicks::Now());
+ if (!last_.is_null()) {
+ LOG(ERROR) << "Time was " << (now - last_).InMicroseconds();
+ LOG(ERROR) << "Time was " << (now - last_).InMilliseconds();
+ }
+ last_ = now;
+ //timer_.Start(base::TimeDelta(), this, &TimerTester::OnTimer);
+ timer_.Start(base::TimeDelta::FromMicroseconds(150),
+ this,
+ &TimerTester::OnTimer);
+ }
+
+ base::TimeTicks last_;
+ base::OneShotTimer<TimerTester> timer_;
+};
+
+TEST_F(CurveCPTransferTest, MinTimer) {
+ PollTimerTester tester;
+ MessageLoop::current()->Run();
+}
+
+*/
+
+} // namespace net
+
+int main(int argc, char**argv) {
+ base::StatisticsRecorder recorder;
+ NetTestSuite test_suite(argc, argv);
+ return test_suite.Run();
+}
diff --git a/net/curvecp/messenger.cc b/net/curvecp/messenger.cc
new file mode 100644
index 0000000..466cef5
--- /dev/null
+++ b/net/curvecp/messenger.cc
@@ -0,0 +1,372 @@
+// Copyright (c) 2011 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "net/curvecp/messenger.h"
+
+#include "base/logging.h"
+#include "base/message_loop.h"
+#include "net/base/io_buffer.h"
+#include "net/base/net_errors.h"
+#include "net/curvecp/protocol.h"
+
+// Basic protocol design:
+//
+// OnTimeout: Called when the timeout timer pops.
+// - call SendMessage()
+// - call RecvMessage()
+//
+// OnSendTimer: Called when the send-timer pops.
+// - call SendMessage()
+// - call RecvMessage()
+//
+// OnMessage: Called when a message arrived from the packet layer
+// - add the message to the receive queue
+// - call RecvMessage()
+//
+// Write: Called by application to write data to remote
+// - add the data to the send_buffer
+// - call SendMessage()
+//
+// SendMessage: Called to Send a message to the remote
+// - msg = first message to retransmit where retransmit timer popped
+// - if msg == NULL
+// - msg = create a new message from the send buffer
+// - if msg != NULL
+// - send message to the packet layer
+// - setTimer(OnSendTimer, send_rate);
+//
+// RecvMessage: Called to process a Received message from the remote
+// - calculate RTT
+// - calculate Send Rate
+// - acknowledge data from received message
+// - resetTimeout
+// - timeout = now + rtt_timeout
+// - if currrent_timeout > timeout
+// setTimer(OnTimeout, timeout)
+
+namespace net {
+
+// Maximum number of write blocks.
+static const size_t kMaxWriteQueueMessages = 128;
+
+// Size of the send buffer.
+static const size_t kSendBufferSize = (128 * 1024);
+// Size of the receive buffer.
+static const size_t kReceiveBufferSize = (128 * 1024);
+
+Messenger::Messenger(Packetizer* packetizer)
+ : packetizer_(packetizer),
+ send_buffer_(kSendBufferSize),
+ send_complete_callback_(NULL),
+ receive_complete_callback_(NULL),
+ pending_receive_length_(0),
+ send_message_in_progress_(false),
+ ALLOW_THIS_IN_INITIALIZER_LIST(
+ send_message_callback_(this, &Messenger::OnSendMessageComplete)),
+ ALLOW_THIS_IN_INITIALIZER_LIST(factory_(this)) {
+}
+
+Messenger::~Messenger() {
+}
+
+int Messenger::Read(IOBuffer* buf, int buf_len, CompletionCallback* callback) {
+ DCHECK(CalledOnValidThread());
+ DCHECK(!receive_complete_callback_);
+
+ if (!received_list_.bytes_available()) {
+ receive_complete_callback_ = callback;
+ pending_receive_ = buf;
+ pending_receive_length_ = buf_len;
+ return ERR_IO_PENDING;
+ }
+
+ int bytes_read = InternalRead(buf, buf_len);
+ DCHECK_LT(0, bytes_read);
+ return bytes_read;
+}
+
+int Messenger::Write(IOBuffer* buf, int buf_len, CompletionCallback* callback) {
+ DCHECK(CalledOnValidThread());
+ DCHECK(!pending_send_.get()); // Already a write pending!
+ DCHECK(!send_complete_callback_);
+ DCHECK_LT(0, buf_len);
+
+ int len = send_buffer_.write(buf->data(), buf_len);
+ if (!send_timer_.IsRunning())
+ send_timer_.Start(base::TimeDelta(), this, &Messenger::OnSendTimer);
+ if (len)
+ return len;
+
+ // We couldn't add data to the send buffer, so block the application.
+ pending_send_ = buf;
+ pending_send_length_ = buf_len;
+ send_complete_callback_ = callback;
+ return ERR_IO_PENDING;
+}
+
+void Messenger::OnConnection(ConnectionKey key) {
+ LOG(ERROR) << "Client Connect: " << key.ToString();
+ key_ = key;
+}
+
+void Messenger::OnClose(Packetizer* packetizer, ConnectionKey key) {
+ LOG(ERROR) << "Got Close!";
+}
+
+void Messenger::OnMessage(Packetizer* packetizer,
+ ConnectionKey key,
+ unsigned char* msg,
+ size_t length) {
+ DCHECK(key == key_);
+
+ // Do basic message sanity checking.
+ if (length < sizeof(Message))
+ return;
+ if (length > static_cast<size_t>(kMaxMessageLength))
+ return;
+
+ // Add message to received queue.
+ scoped_refptr<IOBufferWithSize> buffer(new IOBufferWithSize(length));
+ memcpy(buffer->data(), msg, length);
+ read_queue_.push_back(buffer);
+
+ // Process a single received message
+ RecvMessage();
+}
+
+int Messenger::InternalRead(IOBuffer* buffer, int buffer_length) {
+ return received_list_.ReadBytes(buffer, buffer_length);
+}
+
+IOBufferWithSize* Messenger::CreateBufferFromSendQueue() {
+ DCHECK_LT(0, send_buffer_.length());
+
+ int length = std::min(packetizer_->max_message_payload(),
+ send_buffer_.length());
+ IOBufferWithSize* rv = new IOBufferWithSize(length);
+ int bytes = send_buffer_.read(rv->data(), length);
+ DCHECK_EQ(bytes, length);
+
+ // We consumed data, check to see if someone is waiting to write more data.
+ if (send_complete_callback_) {
+ DCHECK(pending_send_.get());
+
+ int len = send_buffer_.write(pending_send_->data(), pending_send_length_);
+ if (len) {
+ pending_send_ = NULL;
+ CompletionCallback* callback = send_complete_callback_;
+ send_complete_callback_ = NULL;
+ callback->Run(len);
+ }
+ }
+
+ return rv;
+}
+
+void Messenger::OnSendMessageComplete(int result) {
+ DCHECK_NE(ERR_IO_PENDING, result);
+
+ send_message_in_progress_ = false;
+
+ if (result <= 0) {
+ // TODO(mbelshe): Handle error.
+ NOTIMPLEMENTED();
+ }
+
+ // If the send timer fired while we were waiting for this send to complete,
+ // we need to manually run the timer now.
+ if (!send_timer_.IsRunning())
+ OnSendTimer();
+
+ if (!send_timeout_timer_.IsRunning()) {
+ LOG(ERROR) << "RttTimeout is " << rtt_.rtt_timeout();
+ base::TimeDelta delay =
+ base::TimeDelta::FromMicroseconds(rtt_.rtt_timeout());
+ send_timeout_timer_.Start(delay, this, &Messenger::OnTimeout);
+ }
+}
+
+void Messenger::OnTimeout() {
+ LOG(ERROR) << "OnTimeout fired";
+ int64 position = sent_list_.FindPositionOfOldestSentBlock();
+ // XXXMB - need to verify that we really need to retransmit...
+ if (position >= 0) {
+ rtt_.OnTimeout(); // adjust our send rate.
+ LOG(ERROR) << "OnTimeout retransmitting: " << position;
+ SendMessage(position);
+ } else {
+ DCHECK_EQ(0u, sent_list_.size());
+ }
+ RecvMessage();
+ received_list_.LogBlockList();
+}
+
+void Messenger::OnSendTimer() {
+ LOG(ERROR) << "OnSendTimer!";
+ DCHECK(!send_timer_.IsRunning());
+
+ // If the send buffer is empty, then we don't need to keep firing.
+ if (!send_buffer_.length()) {
+ LOG(ERROR) << "OnSendTimer: send_buffer empty";
+ return;
+ }
+
+ // Set the next send timer.
+ LOG(ERROR) << "SendRate is: " << rtt_.send_rate() << "us";
+ send_timer_.Start(base::TimeDelta::FromMicroseconds(rtt_.send_rate()),
+ this,
+ &Messenger::OnSendTimer);
+
+ // Create a block from the send_buffer.
+ if (!sent_list_.is_full()) {
+ scoped_refptr<IOBufferWithSize> buffer = CreateBufferFromSendQueue();
+ int64 position = sent_list_.CreateBlock(buffer.get());
+ DCHECK_LE(0, position);
+ SendMessage(position);
+ }
+
+ RecvMessage(); // Try to process an incoming message
+}
+
+void Messenger::SendMessage(int64 position) {
+ LOG(ERROR) << "SendMessage (position=" << position << ")";
+ if (send_message_in_progress_)
+ return; // We're still waiting for the last write to complete.
+
+ IOBufferWithSize* data = sent_list_.FindBlockByPosition(position);
+ DCHECK(data != NULL);
+ size_t message_size = sizeof(Message) + data->size();
+ size_t padded_size = (message_size + 15) & 0xfffffff0;
+
+ scoped_refptr<IOBufferWithSize> message(new IOBufferWithSize(padded_size));
+ Message* header = reinterpret_cast<Message*>(message->data());
+ memset(header, 0, sizeof(Message));
+ uint64 id = sent_list_.GetNewMessageId();
+ uint32_pack(header->message_id, id);
+ // TODO(mbelshe): Needs to carry EOF flags
+ uint16_pack(header->size.val, data->size());
+ uint64_pack(header->position, position);
+ // TODO(mbelshe): Fill in rest of the header fields.
+ // needs to have the block-position. He tags each chunk with an
+ // absolute offset into the data stream.
+ // Copy the contents of the message into the Message frame.
+ memcpy(message->data() + sizeof(Message), data->data(), data->size());
+
+ sent_list_.MarkBlockSent(position, id);
+
+ int rv = packetizer_->SendMessage(key_,
+ message->data(),
+ padded_size,
+ &send_message_callback_);
+ if (rv == ERR_IO_PENDING) {
+ send_message_in_progress_ = true;
+ return;
+ }
+
+ // UDP must write all or none.
+ DCHECK_EQ(padded_size, static_cast<size_t>(rv));
+ OnSendMessageComplete(rv);
+}
+
+void Messenger::RecvMessage() {
+ if (!read_queue_.size())
+ return;
+
+ scoped_refptr<IOBufferWithSize> message(read_queue_.front());
+ read_queue_.pop_front();
+
+ Message* header = reinterpret_cast<Message*>(message->data());
+ uint16 body_length = uint16_unpack(header->size.val);
+ if (body_length < 0)
+ return;
+ if (body_length > kMaxMessageLength)
+ return;
+ if (body_length > message->size())
+ return;
+
+ uint32 message_id = uint32_unpack(header->message_id);
+ if (message_id) {
+ LOG(ERROR) << "RecvMessage Message id: " << message_id
+ << ", " << body_length << " bytes";
+ } else {
+ LOG(ERROR) << "RecvMessage ACK ";
+ }
+
+ // See if this message has information for recomputing RTT.
+ uint32 response_to_msg = uint32_unpack(header->last_message_received);
+ base::TimeTicks last_sent_time = sent_list_.FindLastSendTime(response_to_msg);
+ if (!last_sent_time.is_null()) {
+ int rtt = (base::TimeTicks::Now() - last_sent_time).InMicroseconds();
+ DCHECK_LE(0, rtt);
+ LOG(ERROR) << "RTT was: " << rtt << "us";
+ rtt_.OnMessage(rtt);
+ }
+
+ // Handle acknowledgements
+ uint64 start_byte = 0;
+ uint64 stop_byte = uint64_unpack(header->acknowledge_1);
+ sent_list_.AcknowledgeBlocks(start_byte, stop_byte);
+
+ start_byte = stop_byte + uint16_unpack(header->gap_1);
+ stop_byte = start_byte + uint16_unpack(header->acknowledge_2);
+ sent_list_.AcknowledgeBlocks(start_byte, stop_byte);
+
+ start_byte = stop_byte + uint16_unpack(header->gap_2);
+ stop_byte = start_byte + uint16_unpack(header->acknowledge_3);
+ sent_list_.AcknowledgeBlocks(start_byte, stop_byte);
+
+ start_byte = stop_byte + uint16_unpack(header->gap_3);
+ stop_byte = start_byte + uint16_unpack(header->acknowledge_4);
+ sent_list_.AcknowledgeBlocks(start_byte, stop_byte);
+
+ start_byte = stop_byte + uint16_unpack(header->gap_4);
+ stop_byte = start_byte + uint16_unpack(header->acknowledge_5);
+ sent_list_.AcknowledgeBlocks(start_byte, stop_byte);
+
+ start_byte = stop_byte + uint16_unpack(header->gap_5);
+ stop_byte = start_byte + uint16_unpack(header->acknowledge_6);
+ sent_list_.AcknowledgeBlocks(start_byte, stop_byte);
+
+ if (!header->is_ack()) {
+ // Add to our received block list
+ uint64 position = uint64_unpack(header->position);
+ scoped_refptr<IOBuffer> buffer(new IOBuffer(body_length));
+ memcpy(buffer->data(), message->data() + sizeof(Message), body_length);
+ received_list_.AddBlock(position, buffer, body_length);
+
+ SendAck(message_id);
+ }
+
+ // If we have data available, and a read is pending, notify the callback.
+ if (received_list_.bytes_available() && receive_complete_callback_) {
+ // Pass the data up to the caller.
+ int bytes_read = InternalRead(pending_receive_, pending_receive_length_);
+ CompletionCallback* callback = receive_complete_callback_;
+ receive_complete_callback_ = NULL;
+ callback->Run(bytes_read);
+ }
+}
+
+void Messenger::SendAck(uint32 last_message_received) {
+ scoped_refptr<IOBuffer> buffer(new IOBuffer(sizeof(Message)));
+ memset(buffer->data(), 0, sizeof(Message));
+ Message* message = reinterpret_cast<Message*>(buffer->data());
+ uint32_pack(message->last_message_received, last_message_received);
+ uint64_pack(message->acknowledge_1, received_list_.bytes_received());
+ LOG(ERROR) << "SendAck " << received_list_.bytes_received();
+ // TODO(mbelshe): fill in remainder of selective acknowledgements
+
+ // TODO(mbelshe): Fix this - it is totally possible to have a send message
+ // in progress here...
+ DCHECK(!send_message_in_progress_);
+
+ int rv = packetizer_->SendMessage(key_,
+ buffer->data(),
+ sizeof(Message),
+ &send_message_callback_);
+ // TODO(mbelshe): Fix me! Deal with the error cases
+ DCHECK(rv == sizeof(Message));
+}
+
+} // namespace net
diff --git a/net/curvecp/messenger.h b/net/curvecp/messenger.h
new file mode 100644
index 0000000..7bd1bb8
--- /dev/null
+++ b/net/curvecp/messenger.h
@@ -0,0 +1,104 @@
+// Copyright (c) 2011 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef NET_CURVECP_MESSENGER_H_
+#define NET_CURVECP_MESSENGER_H_
+#pragma once
+
+#include <deque>
+#include <list>
+
+#include "base/basictypes.h"
+#include "base/task.h"
+#include "base/threading/non_thread_safe.h"
+#include "base/timer.h"
+#include "net/base/completion_callback.h"
+#include "net/curvecp/circular_buffer.h"
+#include "net/curvecp/packetizer.h"
+#include "net/curvecp/received_block_list.h"
+#include "net/curvecp/rtt_and_send_rate_calculator.h"
+#include "net/curvecp/sent_block_list.h"
+#include "net/socket/socket.h"
+
+namespace net {
+
+class DrainableIOBuffer;
+class IOBufferWithSize;
+class Packetizer;
+
+// The messenger provides the reliable CurveCP transport.
+class Messenger : public base::NonThreadSafe,
+ public Packetizer::Listener {
+ public:
+ explicit Messenger(Packetizer* packetizer);
+ virtual ~Messenger();
+
+ int Read(IOBuffer* buf, int buf_len, CompletionCallback* callback);
+ int Write(IOBuffer* buf, int buf_len, CompletionCallback* callback);
+
+ // Packetizer::Listener methods:
+ virtual void OnConnection(ConnectionKey key);
+ virtual void OnClose(Packetizer* packetizer, ConnectionKey key);
+ virtual void OnMessage(Packetizer* packetizer,
+ ConnectionKey key,
+ unsigned char* msg,
+ size_t length);
+
+ protected:
+ ConnectionKey key_;
+
+ private:
+ // Handle reading data from the read queue.
+ int InternalRead(IOBuffer* buffer, int buffer_length);
+
+ // Extracts data from send queue to create a new buffer of data to send.
+ IOBufferWithSize* CreateBufferFromSendQueue();
+
+ // Continuation function after a SendMessage() call was blocked.
+ void OnSendMessageComplete(int result);
+
+ // Protocol handling routines
+ void OnTimeout();
+ void OnSendTimer();
+ void SendMessage(int64 position);
+ void RecvMessage();
+ void SendAck(uint32 last_message_received);
+
+ RttAndSendRateCalculator rtt_;
+ Packetizer* packetizer_;
+
+ // The send_buffer is a list of pending data to pack into messages and send
+ // to the remote.
+ CircularBuffer send_buffer_;
+ CompletionCallback* send_complete_callback_;
+ scoped_refptr<IOBuffer> pending_send_;
+ int pending_send_length_;
+
+ // The read_buffer is a list of pending data which has been unpacked from
+ // messages and is awaiting delivery to the application.
+ CompletionCallback* receive_complete_callback_;
+ scoped_refptr<IOBuffer> pending_receive_;
+ int pending_receive_length_;
+
+ // The list of received but unprocessed messages.
+ std::list<scoped_refptr<IOBufferWithSize> > read_queue_;
+
+ ReceivedBlockList received_list_;
+ SentBlockList sent_list_;
+ bool send_message_in_progress_;
+
+ // A timer to fire when a timeout has occurred.
+ base::OneShotTimer<Messenger> send_timeout_timer_;
+ // A timer to fire when we can send data.
+ base::OneShotTimer<Messenger> send_timer_;
+
+ CompletionCallbackImpl<Messenger> send_message_callback_;
+
+ ScopedRunnableMethodFactory<Messenger> factory_;
+ DISALLOW_COPY_AND_ASSIGN(Messenger);
+};
+
+} // namespace net
+
+#endif // NET_CURVECP_MESSENGER_H_
diff --git a/net/curvecp/packetizer.h b/net/curvecp/packetizer.h
new file mode 100644
index 0000000..30d6c29
--- /dev/null
+++ b/net/curvecp/packetizer.h
@@ -0,0 +1,51 @@
+// Copyright (c) 2011 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef NET_CURVECP_PACKETIZER_H_
+#define NET_CURVECP_PACKETIZER_H_
+#pragma once
+
+#include "base/basictypes.h"
+#include "base/scoped_ptr.h"
+#include "net/base/completion_callback.h"
+#include "net/curvecp/connection_key.h"
+
+namespace net {
+
+class IPEndPoint;
+
+class Packetizer {
+ public:
+ class Listener {
+ public:
+ virtual ~Listener() {}
+ // Callback for new connections.
+ virtual void OnConnection(ConnectionKey key) = 0;
+ virtual void OnMessage(Packetizer* packetizer,
+ ConnectionKey key,
+ unsigned char* msg,
+ size_t length) = 0;
+ };
+
+ virtual ~Packetizer() {}
+
+ // Send a message on a connection.
+ virtual int SendMessage(ConnectionKey key,
+ const char* data,
+ size_t length,
+ CompletionCallback* callback) = 0;
+
+ // Close an existing connection.
+ virtual void Close(ConnectionKey key) = 0;
+
+ virtual int GetPeerAddress(IPEndPoint* addresses) const = 0;
+
+ // Returns the current maximum message size which can be fit into the next
+ // message payload to be sent on the packetizer.
+ virtual int max_message_payload() const = 0;
+};
+
+} // namespace net
+
+#endif // NET_CURVECP_PACKETIZER_H_
diff --git a/net/curvecp/protocol.cc b/net/curvecp/protocol.cc
new file mode 100644
index 0000000..69f9b60
--- /dev/null
+++ b/net/curvecp/protocol.cc
@@ -0,0 +1,88 @@
+// Copyright (c) 2011 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "net/curvecp/protocol.h"
+
+#include "base/basictypes.h"
+
+namespace net {
+
+void uint16_pack(uchar* dest, uint16 val) {
+ *dest++ = val;
+ val >>= 8;
+ *dest++ = val;
+ val >>= 8;
+}
+
+uint16 uint16_unpack(uchar* src) {
+ uint16 result;
+ result = src[1];
+ result <<= 8;
+ result |= src[0];
+ return result;
+}
+
+void uint32_pack(uchar* dest, uint32 val) {
+ *dest++ = val;
+ val >>= 8;
+ *dest++ = val;
+ val >>= 8;
+ *dest++ = val;
+ val >>= 8;
+ *dest++ = val;
+ val >>= 8;
+}
+
+uint32 uint32_unpack(uchar* src) {
+ uint32 result;
+ result = src[3];
+ result <<= 8;
+ result |= src[2];
+ result <<= 8;
+ result |= src[1];
+ result <<= 8;
+ result |= src[0];
+ return result;
+}
+
+void uint64_pack(uchar* dest, uint64 val) {
+ *dest++ = val;
+ val >>= 8;
+ *dest++ = val;
+ val >>= 8;
+ *dest++ = val;
+ val >>= 8;
+ *dest++ = val;
+ val >>= 8;
+ *dest++ = val;
+ val >>= 8;
+ *dest++ = val;
+ val >>= 8;
+ *dest++ = val;
+ val >>= 8;
+ *dest++ = val;
+ val >>= 8;
+}
+
+uint64 uint64_unpack(uchar* src) {
+ uint64 result;
+ result = src[7];
+ result <<= 8;
+ result |= src[6];
+ result <<= 8;
+ result |= src[5];
+ result <<= 8;
+ result |= src[4];
+ result <<= 8;
+ result |= src[3];
+ result <<= 8;
+ result |= src[2];
+ result <<= 8;
+ result |= src[1];
+ result <<= 8;
+ result |= src[0];
+ return result;
+}
+
+} // namespace net
diff --git a/net/curvecp/protocol.h b/net/curvecp/protocol.h
new file mode 100644
index 0000000..07eea49
--- /dev/null
+++ b/net/curvecp/protocol.h
@@ -0,0 +1,158 @@
+// Copyright (c) 2011 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef NET_CURVECP_PROTOCOL_H_
+#define NET_CURVECP_PROTOCOL_H_
+#pragma once
+
+// Mike's thoughts on the protocol:
+// 1) the packet layer probably should have a "close" mechanism. although
+// some clients won't use it, it is nice to have.
+//
+// 2) when do we re "initiate"? maybe connections should have an inferred
+// lifetime, and client should always re-initiate after 1min?
+//
+// 3) An initiate packet can cary 640B of data. But how does the app layer
+// know that only 640B of data is available there? This is a bit awkward?
+
+#include "base/basictypes.h"
+
+namespace net {
+
+typedef unsigned char uchar;
+
+// Messages can range in size from 16 to 1088 bytes.
+const int kMinMessageLength = 16;
+const int kMaxMessageLength = 1088;
+
+// Maximum packet size.
+const int kMaxPacketLength = kMaxMessageLength + 96;
+
+
+
+// Packet layer.
+
+// All Packets start with an 8 byte identifier.
+struct Packet {
+ char id[8];
+};
+
+// A HelloPacket is sent from the client to the server to establish a secure
+// cookie to use when communicating with a server.
+struct HelloPacket : Packet {
+ uchar server_extension[16];
+ uchar client_extension[16];
+ uchar client_shortterm_public_key[32];
+ uchar zero[64];
+ uchar nonce[8];
+ uchar box[80];
+};
+
+// A CookiePacket is sent from the server to the client in response to a
+// HelloPacket.
+struct CookiePacket : Packet {
+ uchar client_extension[16];
+ uchar server_extension[16];
+ uchar nonce[16];
+ uchar box[144];
+};
+
+// An InitiatePacket is sent from the client to the server to initiate
+// the connection once a cookie has been exchanged.
+struct InitiatePacket : Packet {
+ uchar server_extension[16];
+ uchar client_extension[16];
+ uchar client_shortterm_public_key[32];
+ uchar server_cookie[96];
+ uchar initiate_nonce[8];
+
+ uchar client_longterm_public_key[32];
+ uchar nonce[16];
+ uchar box[48];
+ uchar server_name[256];
+
+ // A message is carried here.
+};
+
+// A ClientMessagePacket contains a Message from the client to the server.
+struct ClientMessagePacket : Packet {
+ uchar server_extension[16];
+ uchar client_extension[16];
+ uchar client_shortterm_public_key[32];
+ uchar nonce[8];
+
+ // A message is carried here of at least 16 bytes.
+};
+
+// A ServerMessagePacket contains a Message from the server to the client.
+struct ServerMessagePacket : Packet {
+ uchar client_extension[16];
+ uchar server_extension[16];
+ uchar nonce[8];
+
+ // A message is carried here of at least 16 bytes.
+};
+
+
+
+
+// Messaging layer.
+
+struct Message {
+ // If message_id is all zero, this Message is a pure ACK message.
+ uchar message_id[4];
+ // For regular messages going back and forth, last_message_received will be
+ // zero. For an ACK, it will be filled in and can be used to compute RTT.
+ uchar last_message_received[4];
+ uchar acknowledge_1[8]; // bytes acknowledged in the first range.
+ uchar gap_1[4]; // gap between the first range and the second range.
+ uchar acknowledge_2[2]; // bytes acknowledged in the second range.
+ uchar gap_2[2]; // gap between the second range and the third range.
+ uchar acknowledge_3[2]; // bytes acknowledged in the third range.
+ uchar gap_3[2]; // gap between the third range and the fourth range.
+ uchar acknowledge_4[2]; // bytes acknowledged in the fourth range.
+ uchar gap_4[2]; // gap between the fourth range and the fifth range.
+ uchar acknowledge_5[2]; // bytes acknowledged in the fifth range.
+ uchar gap_5[2]; // gap between the fifth range and the sixth range.
+ uchar acknowledge_6[2]; // bytes acknowledged in the sixth range.
+ union {
+ struct {
+ unsigned short unused:4;
+ unsigned short fail:1;
+ unsigned short success:1;
+ unsigned short length:10;
+ } bits;
+ uchar val[2];
+ } size;
+ uchar position[8];
+ uchar padding[16];
+
+ bool is_ack() { return *(reinterpret_cast<int32*>(message_id)) == 0; }
+};
+
+// Connection state
+// TODO(mbelshe) move this to the messenger.
+struct ConnectionState {
+ uchar client_shortterm_public_key[32];
+
+ // Fields we'll need going forward:
+ //
+ // uchar secret_key_client_short_server_short[32];
+ // crypto_uint64 received_nonce;
+ // crypto_uint64 sent_nonce;
+ // uchar client_extension[16];
+ // uchar server_extension[16];
+};
+
+void uint16_pack(uchar* dest, uint16 val);
+uint16 uint16_unpack(uchar* src);
+void uint32_pack(uchar* dest, uint32 val);
+uint32 uint32_unpack(uchar* src);
+void uint64_pack(uchar* dest, uint64 val);
+uint64 uint64_unpack(uchar* src);
+
+
+} // namespace net
+
+#endif // NET_CURVECP_PROTOCOL_H_
diff --git a/net/curvecp/received_block_list.cc b/net/curvecp/received_block_list.cc
new file mode 100644
index 0000000..b7b6059
--- /dev/null
+++ b/net/curvecp/received_block_list.cc
@@ -0,0 +1,96 @@
+// Copyright (c) 2011 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "net/curvecp/received_block_list.h"
+
+#include "net/base/io_buffer.h"
+
+namespace net {
+
+ReceivedBlockList::ReceivedBlockList()
+ : current_position_(0),
+ bytes_received_(0) {
+}
+
+ReceivedBlockList::~ReceivedBlockList() {
+}
+
+int ReceivedBlockList::bytes_available() const {
+ int bytes_available = 0;
+ BlockList::const_iterator it = list_.begin();
+ while (it != list_.end()) {
+ if (it->position != current_position_ + bytes_available)
+ break;
+ bytes_available += it->data->size();
+ it++;
+ }
+ return bytes_available;
+}
+
+void ReceivedBlockList::AddBlock(int64 position, IOBuffer* data, int length) {
+ if (position < bytes_received_) {
+ LOG(ERROR) << "Unnecessary retransmit of " << position;
+ return;
+ }
+
+ BlockList::iterator it = list_.begin();
+ while (it != list_.end()) {
+ if (it->position == position) {
+ // Duplicate block. Make sure they are the same length.
+ DCHECK_EQ(it->data->size(), length);
+ return;
+ }
+ if (it->position > position) {
+ // The block must not overlap with the next block.
+ DCHECK_LE(position + length, it->position);
+ break;
+ }
+ it++;
+ }
+ Block new_block;
+ new_block.position = position;
+ new_block.data = new DrainableIOBuffer(data, length);
+ list_.insert(it, new_block);
+
+ ComputeBytesReceived();
+}
+
+int ReceivedBlockList::ReadBytes(IOBuffer* buffer, int size) {
+ int bytes_read = 0;
+
+ LogBlockList();
+
+ while (size > 0 && list_.size()) {
+ BlockList::iterator it = list_.begin();
+ if (it->position != current_position_)
+ break;
+ int bytes = std::min(size, it->data->BytesRemaining());
+ memcpy(buffer->data() + bytes_read, it->data->data(), bytes);
+ it->data->DidConsume(bytes);
+ size -= bytes;
+ bytes_read += bytes;
+ if (!it->data->BytesRemaining()) {
+ current_position_ += it->data->size();
+ list_.erase(it);
+ }
+ }
+ return bytes_read;
+}
+
+void ReceivedBlockList::ComputeBytesReceived() {
+ bytes_received_ = current_position_ + bytes_available();
+}
+
+void ReceivedBlockList::LogBlockList() const {
+ LOG(INFO) << "Received Blocks: " << list_.size();
+ std::string msg;
+ std::ostringstream stream(msg);
+ for (size_t index = 0; index < list_.size(); ++index)
+ stream << "(" << list_[index].position
+ << "," << list_[index].data->size() << ")";
+ LOG(INFO) << stream.str();
+}
+
+
+} // namespace net
diff --git a/net/curvecp/received_block_list.h b/net/curvecp/received_block_list.h
new file mode 100644
index 0000000..4c05ecf
--- /dev/null
+++ b/net/curvecp/received_block_list.h
@@ -0,0 +1,70 @@
+// Copyright (c) 2011 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+#ifndef NET_CURVECP_RECEIVED_BLOCK_LIST_H_
+#define NET_CURVECP_RECEIVED_BLOCK_LIST_H_
+#pragma once
+
+#include <deque>
+
+#include "base/memory/ref_counted.h"
+
+namespace net {
+
+class DrainableIOBuffer;
+class IOBuffer;
+class IOBufferWithSize;
+
+// A ReceivedBlockList manages everything needed to track a set of of incoming
+// blocks. It can coalesce continuous blocks and locate holes in the received
+// data.
+// Note: this list does not expect to receive overlapping blocks such as:
+// Block 1: bytes 10-20
+// Block 2: bytes 15-25
+class ReceivedBlockList {
+ public:
+ ReceivedBlockList();
+ ~ReceivedBlockList();
+
+ // Returns the total number of bytes received in order.
+ int bytes_received() const { return bytes_received_; }
+
+ // Returns the number of currently available bytes to read.
+ int bytes_available() const;
+
+ // Adds an incoming block into the list of received blocks.
+ void AddBlock(int64 position, IOBuffer* data, int length);
+
+ // Reads data from the buffer starting at |current_position()|.
+ // |buffer| is the buffer to fill.
+ // |size| is the maximum number of bytes to be filled into |buffer|.
+ // Returns the number of bytes read.
+ int ReadBytes(IOBuffer* buffer, int size);
+
+ // Gets the current position of the received data in the stream.
+ // This is for testing only.
+ int current_position() const { return current_position_; }
+
+ // Debugging: Writes the entire blocklist to the log.
+ void LogBlockList() const;
+
+ private:
+ typedef struct {
+ int64 position; // Position of this block.
+ scoped_refptr<DrainableIOBuffer> data; // The data.
+ } Block;
+ typedef std::deque<Block> BlockList;
+
+ // Recomputes |bytes_received_|.
+ void ComputeBytesReceived();
+
+ int64 current_position_;
+ int64 bytes_received_;
+ BlockList list_;
+
+ DISALLOW_COPY_AND_ASSIGN(ReceivedBlockList);
+};
+
+} // namespace net
+
+#endif // NET_CURVECP_RECEIVED_BLOCK_LIST_H_
diff --git a/net/curvecp/rtt_and_send_rate_calculator.cc b/net/curvecp/rtt_and_send_rate_calculator.cc
new file mode 100644
index 0000000..da2be78
--- /dev/null
+++ b/net/curvecp/rtt_and_send_rate_calculator.cc
@@ -0,0 +1,172 @@
+// Copyright (c) 2011 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include <stdlib.h>
+
+#include "net/curvecp/rtt_and_send_rate_calculator.h"
+
+namespace net {
+
+RttAndSendRateCalculator::RttAndSendRateCalculator()
+ : rtt_latest_(0),
+ rtt_average_(0),
+ rtt_deviation_(0),
+ rtt_lowwater_(0),
+ rtt_highwater_(0),
+ rtt_timeout_(base::Time::kMicrosecondsPerSecond),
+ send_rate_(1000000),
+ rtt_seenrecenthigh_(0),
+ rtt_seenrecentlow_(0),
+ rtt_seenolderhigh_(0),
+ rtt_seenolderlow_(0),
+ rtt_phase_(false) {
+}
+
+void RttAndSendRateCalculator::OnMessage(int32 rtt_us) {
+ UpdateRTT(rtt_us);
+ AdjustSendRate();
+}
+
+void RttAndSendRateCalculator::OnTimeout() {
+ base::TimeDelta time_since_last_loss = last_sample_time_ - last_loss_time_;
+ if (time_since_last_loss.InMilliseconds() < 4 * rtt_timeout_)
+ return;
+ send_rate_ *= 2;
+ last_loss_time_ = last_sample_time_;
+ last_edge_time_ = last_sample_time_;
+}
+
+// Updates RTT
+void RttAndSendRateCalculator::UpdateRTT(int32 rtt_us) {
+ rtt_latest_ = rtt_us;
+ last_sample_time_ = base::TimeTicks::Now();
+
+ // Initialize for the first sample.
+ if (!rtt_average_) {
+ rtt_average_ = rtt_latest_;
+ rtt_deviation_ = rtt_latest_;
+ rtt_highwater_ = rtt_latest_;
+ rtt_lowwater_ = rtt_latest_;
+ }
+
+ // Jacobson's retransmission timeout calculation.
+ int32 rtt_delta = rtt_latest_ - rtt_average_;
+ rtt_average_ += rtt_delta / 8;
+ if (rtt_delta < 0)
+ rtt_delta = -rtt_delta;
+ rtt_delta -= rtt_deviation_;
+ rtt_deviation_ += rtt_delta / 4;
+ rtt_timeout_ = rtt_average_ + (4 * rtt_deviation_);
+
+ // Adjust for delayed acks with anti-spiking.
+ // TODO(mbelshe): this seems high.
+ rtt_timeout_ += 8 * send_rate_;
+
+ // Recognize the top and bottom of the congestion cycle.
+ rtt_delta = rtt_latest_ - rtt_highwater_;
+ rtt_highwater_ += rtt_delta / 1024;
+
+ rtt_delta = rtt_latest_ - rtt_lowwater_;
+ if (rtt_delta > 0)
+ rtt_lowwater_ += rtt_delta / 8192;
+ else
+ rtt_lowwater_ += rtt_delta / 256;
+}
+
+void RttAndSendRateCalculator::AdjustSendRate() {
+ last_sample_time_ = base::TimeTicks::Now();
+
+ base::TimeDelta time = last_sample_time_ - last_send_adjust_time_;
+
+ // We only adjust the send rate approximately every 16 samples.
+ if (time.InMicroseconds() < 16 * send_rate_)
+ return;
+
+ if (rtt_average_ >
+ rtt_highwater_ + (5 * base::Time::kMicrosecondsPerMillisecond))
+ rtt_seenrecenthigh_ = true;
+ else if (rtt_average_ < rtt_lowwater_)
+ rtt_seenrecentlow_ = true;
+
+ last_send_adjust_time_ = last_sample_time_;
+
+ // If too much time has elapsed, re-initialize the send_rate.
+ if (time.InMicroseconds() > 10 * base::Time::kMicrosecondsPerSecond) {
+ send_rate_ = base::Time::kMicrosecondsPerSecond; // restart.
+ send_rate_ += randommod(send_rate_ / 8);
+ }
+
+ // TODO(mbelshe): Why is 128us a lower bound?
+ if (send_rate_ >= 128) {
+ // Additive increase: adjust 1/N by a constant c.
+ // rtt-fair additive increase: adjust 1/N by a constant c every nanosec.
+ // approximation: adjust 1/N by cN every N nanoseconds.
+
+ // TODO(mbelshe): he used c == 2^-51. for nanosecs.
+ // I use c == 2^31, for microsecs.
+
+ // i.e. N <- 1/(1/N + cN) = N/(1 + cN^2) every N nanosec.
+ if (false && send_rate_ < 16384) {
+ // N/(1+cN^2) approx N - cN^3
+ // TODO(mbelshe): note that he is using (cN)^3 here, not what he wrote.
+ int32 msec = send_rate_ / 128;
+ send_rate_ -= msec * msec * msec;
+ } else {
+ double d = send_rate_;
+ send_rate_ = d / (1 + d * d / 2147483648.0); // 2^31
+ }
+ }
+
+ if (rtt_phase_ == false) {
+ if (rtt_seenolderhigh_) {
+ rtt_phase_ = true;
+ last_edge_time_ = last_sample_time_;
+ send_rate_ += randommod(send_rate_ / 4);
+ }
+ } else {
+ if (rtt_seenolderlow_) {
+ rtt_phase_ = false;
+ }
+ }
+
+ rtt_seenolderhigh_ = rtt_seenrecenthigh_;
+ rtt_seenolderlow_ = rtt_seenrecentlow_;
+ rtt_seenrecenthigh_ = false;
+ rtt_seenrecentlow_ = false;
+
+ AttemptToDoubleSendRate();
+}
+
+void RttAndSendRateCalculator::AttemptToDoubleSendRate() {
+ base::TimeDelta time_since_edge = last_sample_time_ - last_edge_time_;
+ base::TimeDelta time_since_doubling =
+ last_sample_time_ - last_doubling_time_;
+
+
+ int32 threshold = 0;
+ if (time_since_edge.InMicroseconds() <
+ base::Time::kMicrosecondsPerSecond * 60) {
+ threshold = (4 * send_rate_) +
+ (64 * rtt_timeout_) +
+ (5 * base::Time::kMicrosecondsPerSecond);
+ } else {
+ threshold = (4 * send_rate_) +
+ (2 * rtt_timeout_);
+ }
+ if (time_since_doubling.InMicroseconds() < threshold)
+ return;
+
+ if (send_rate_ < 64)
+ return;
+
+ send_rate_ /= 2;
+ last_doubling_time_ = last_sample_time_;
+}
+
+// returns a random number from 0 to val-1.
+int32 RttAndSendRateCalculator::randommod(int32 val) {
+ return rand() % val;
+}
+
+} // namespace net
diff --git a/net/curvecp/rtt_and_send_rate_calculator.h b/net/curvecp/rtt_and_send_rate_calculator.h
new file mode 100644
index 0000000..ad7aca4
--- /dev/null
+++ b/net/curvecp/rtt_and_send_rate_calculator.h
@@ -0,0 +1,72 @@
+// Copyright (c) 2011 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef NET_CURVECP_RTT_AND_SEND_RATE_CALCULATOR_H_
+#define NET_CURVECP_RTT_AND_SEND_RATE_CALCULATOR_H_
+#pragma once
+
+#include "base/basictypes.h"
+#include "base/time.h"
+
+namespace net {
+
+// Calculator for tracking the current RTT value.
+// Measurements are in microseconds.
+class RttAndSendRateCalculator {
+ public:
+ RttAndSendRateCalculator();
+
+ // Returns the current, average RTT, in microseconds.
+ int32 rtt_average() const { return rtt_average_; }
+
+ // Returns the current, RTT timeout, in microseconds.
+ int32 rtt_timeout() const { return rtt_timeout_; }
+
+ // The current send rate.
+ // Measurement is in microseconds between sends.
+ int32 send_rate() const { return send_rate_; }
+
+ int32 rtt_hi() const { return rtt_highwater_; }
+ int32 rtt_lo() const { return rtt_lowwater_; }
+
+ // Called when a new RTT sample is measured.
+ void OnMessage(int32 rtt_us);
+
+ // Adjusts the sendrate when a timeout condition has occurred.
+ void OnTimeout();
+
+ private:
+ // Updates RTT
+ void UpdateRTT(int32 rtt_us);
+
+ // Adjusts the send rate when a message is received.
+ void AdjustSendRate();
+
+ void AttemptToDoubleSendRate();
+
+ // returns a random number from 0 to val-1.
+ int32 randommod(int32 val);
+
+ int32 rtt_latest_; // The most recent RTT
+ int32 rtt_average_;
+ int32 rtt_deviation_;
+ int32 rtt_lowwater_;
+ int32 rtt_highwater_;
+ int32 rtt_timeout_;
+ int32 send_rate_;
+ base::TimeTicks last_sample_time_;
+ base::TimeTicks last_send_adjust_time_;
+ base::TimeTicks last_edge_time_;
+ base::TimeTicks last_doubling_time_;
+ base::TimeTicks last_loss_time_;
+ bool rtt_seenrecenthigh_;
+ bool rtt_seenrecentlow_;
+ bool rtt_seenolderhigh_;
+ bool rtt_seenolderlow_;
+ bool rtt_phase_;
+};
+
+} // namespace net
+
+#endif // NET_CURVECP_RTT_AND_SEND_RATE_CALCULATOR_H_
diff --git a/net/curvecp/sent_block_list.cc b/net/curvecp/sent_block_list.cc
new file mode 100644
index 0000000..2eb55dc
--- /dev/null
+++ b/net/curvecp/sent_block_list.cc
@@ -0,0 +1,133 @@
+// Copyright (c) 2011 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "net/curvecp/sent_block_list.h"
+
+#include "base/logging.h"
+#include "net/base/io_buffer.h"
+
+namespace net {
+
+static const size_t kMaxBlocks = 128;
+
+SentBlockList::SentBlockList()
+ : current_message_id_(1),
+ send_sequence_number_(0) {
+}
+
+SentBlockList::~SentBlockList() {
+}
+
+int64 SentBlockList::CreateBlock(IOBufferWithSize* buffer) {
+ DCHECK_LT(0, buffer->size());
+
+ if (is_full())
+ return -1;
+
+ Block new_block;
+ new_block.position = send_sequence_number_;
+ new_block.length = buffer->size();
+ new_block.transmissions = 0;
+ new_block.last_message_id = 0;
+ new_block.data= buffer;
+
+ send_sequence_number_ += buffer->size();
+ list_.push_back(new_block);
+ return new_block.position;
+}
+
+bool SentBlockList::MarkBlockSent(int64 position, int64 message_id) {
+ int index = FindBlock(position);
+ if (index < 0) {
+ NOTREACHED();
+ return false;
+ }
+ list_[index].last_message_id = message_id;
+ list_[index].transmissions++;
+ list_[index].last_sent_time = base::TimeTicks::Now();
+ return true;
+}
+
+void SentBlockList::AcknowledgeBlocks(int64 begin_range, int64 end_range) {
+ if (begin_range == end_range)
+ return;
+
+ // TODO(mbelshe): use a better data structure.
+ LOG(ERROR) << "ACK of: " << begin_range << " to " << end_range;
+
+ BlockList::iterator it = list_.begin();
+ while (it != list_.end()) {
+ int64 position = it->position;
+ int64 length = it->length;
+ if (position >= begin_range && (position + length) <= end_range) {
+ list_.erase(it);
+ it = list_.begin(); // iterator was invalidated, so go to start of list.
+ continue;
+ }
+
+ // Verify we didn't have a partial block acknowledgement.
+ CHECK(position < begin_range || position >= end_range);
+ CHECK((position + length) < begin_range || (position + length) > end_range);
+ it++;
+ }
+}
+
+int64 SentBlockList::GetNewMessageId() {
+ return current_message_id_++;
+}
+
+IOBufferWithSize* SentBlockList::FindBlockByPosition(int64 position) const {
+ int index = FindBlock(position);
+ if (index < 0)
+ return NULL;
+ return list_[index].data;
+}
+
+base::TimeTicks SentBlockList::FindLastSendTime(int64 last_message_id) const {
+ for (size_t index = 0; index < list_.size(); ++index)
+ if (list_[index].last_message_id == last_message_id)
+ return list_[index].last_sent_time;
+ return base::TimeTicks();
+}
+
+int SentBlockList::FindBlock(int64 position) const {
+ for (size_t index = 0; index < list_.size(); ++index)
+ if (list_[index].position == position)
+ return index;
+ return -1;
+}
+
+int64 SentBlockList::FindPositionOfOldestSentBlock() const {
+ base::TimeTicks oldest_time;
+ int64 position = -1;
+
+ LogBlockList();
+
+ // Walks the entire list.
+ for (size_t index = 0; index < list_.size(); ++index) {
+ base::TimeTicks last_sent_time = list_[index].last_sent_time;
+ if (!last_sent_time.is_null()) {
+ if (last_sent_time < oldest_time || oldest_time.is_null()) {
+ oldest_time = last_sent_time;
+ position = list_[index].position;
+ }
+ }
+ }
+ return position;
+}
+bool SentBlockList::is_full() const {
+ return list_.size() == kMaxBlocks;
+}
+
+void SentBlockList::LogBlockList() const {
+ LOG(INFO) << "Sent Blocks: " << list_.size();
+ std::string msg;
+ std::ostringstream stream(msg);
+ for (size_t index = 0; index < list_.size(); ++index)
+ stream << "(" << list_[index].position
+ << "," << list_[index].length << ")";
+ LOG(INFO) << stream.str();
+}
+
+} // namespace net
diff --git a/net/curvecp/sent_block_list.h b/net/curvecp/sent_block_list.h
new file mode 100644
index 0000000..c63e8bfd
--- /dev/null
+++ b/net/curvecp/sent_block_list.h
@@ -0,0 +1,95 @@
+// Copyright (c) 2011 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef NET_CURVECP_SENT_BLOCK_LIST_H_
+#define NET_CURVECP_SENT_BLOCK_LIST_H_
+#pragma once
+
+#include <vector>
+
+#include "base/memory/ref_counted.h"
+#include "base/time.h"
+
+namespace net {
+
+class IOBufferWithSize;
+
+// A SentBlockList manages everything needed to track a set of of blocks
+// which have been sent. It can assign new ids to new blocks, find pending
+// blocks by id, or discard blocks after they have been acknowledged by the
+// remote.
+class SentBlockList {
+ public:
+ SentBlockList();
+ ~SentBlockList();
+
+ // Creates a new block to be tracked.
+ // On success, returns the unique position of this buffer within the stream,
+ // which can be used to lookup this block later.
+ // Returns -1 if the blocklist is already full.
+ int64 CreateBlock(IOBufferWithSize* buffer);
+
+ // Tracks that a block has been sent.
+ // |position| is the identifier of the block being sent.
+ // |message_id| is the id of the message sent containing the block.
+ // Returns false if no block for |position| exists.
+ bool MarkBlockSent(int64 position, int64 message_id);
+
+ // Acknowledges ranges from |begin_range| to |end_range|.
+ // Partial acknowledgements (e.g. where the ack range spans a partial
+ // block) are not supported.
+ // Removes blocks covered by these ranges from tracking.
+ void AcknowledgeBlocks(int64 begin_range, int64 end_range);
+
+ // Returns a new, unique message id.
+ int64 GetNewMessageId();
+
+ // Find a block based on its |position|.
+ // Returns the block, if found, or NULL otherwise.
+ IOBufferWithSize* FindBlockByPosition(int64 position) const;
+
+ // Find the last send time of a block, based the id of the last
+ // message to send it.
+ base::TimeTicks FindLastSendTime(int64 last_message_id) const;
+
+ // Returns the position of the oldest sent block.
+ int64 FindPositionOfOldestSentBlock() const;
+
+ // Returns the number of blocks in the list.
+ size_t size() const { return list_.size(); }
+
+ // Returns true if the list is full and cannot take new blocks.
+ bool is_full() const;
+
+ private:
+ // An UnackedBlock is a block of application data which has been sent in a
+ // message, but not acknowledged by the other side yet. We track this
+ // because we may need to retransmit.
+ typedef struct {
+ int64 position; // Offset of this block in the stream.
+ int64 length; // Number of bytes in this block.
+ int64 transmissions; // Count of transmissions of this block.
+ int64 last_message_id; // ID of last message sending this block.
+ base::TimeTicks last_sent_time; // Time of last message sending this block.
+ scoped_refptr<IOBufferWithSize> data;
+ } Block;
+
+ typedef std::vector<Block> BlockList;
+
+ // Finds a block by position and returns it index within |list_|.
+ int FindBlock(int64 position) const;
+
+ // Debugging: Writes the entire blocklist to the log.
+ void LogBlockList() const;
+
+ int64 current_message_id_; // The current message id.
+ int64 send_sequence_number_; // The sending sequence number.
+ BlockList list_;
+
+ DISALLOW_COPY_AND_ASSIGN(SentBlockList);
+};
+
+} // namespace net
+
+#endif // NET_CURVECP_SENT_BLOCK_LIST_H_
diff --git a/net/curvecp/server_messenger.cc b/net/curvecp/server_messenger.cc
new file mode 100644
index 0000000..53bad71
--- /dev/null
+++ b/net/curvecp/server_messenger.cc
@@ -0,0 +1,32 @@
+// Copyright (c) 2011 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "net/curvecp/server_messenger.h"
+
+namespace net {
+
+ServerMessenger::ServerMessenger(Packetizer* packetizer,
+ Acceptor* acceptor)
+ : Messenger(packetizer),
+ acceptor_(acceptor) {
+ DCHECK(acceptor_);
+}
+
+ServerMessenger::ServerMessenger(ConnectionKey key,
+ Packetizer* packetizer,
+ Acceptor* acceptor)
+ : Messenger(packetizer),
+ acceptor_(acceptor) {
+ DCHECK(acceptor_);
+ key_ = key;
+}
+
+ServerMessenger::~ServerMessenger() {
+}
+
+void ServerMessenger::OnConnection(ConnectionKey key) {
+ acceptor_->OnAccept(key);
+}
+
+} // namespace net
diff --git a/net/curvecp/server_messenger.h b/net/curvecp/server_messenger.h
new file mode 100644
index 0000000..d7fb95a
--- /dev/null
+++ b/net/curvecp/server_messenger.h
@@ -0,0 +1,45 @@
+// Copyright (c) 2011 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef NET_CURVECP_SERVER_MESSENGER_H_
+#define NET_CURVECP_SERVER_MESSENGER_H_
+#pragma once
+
+#include <list>
+
+#include "base/basictypes.h"
+#include "base/task.h"
+#include "net/curvecp/messenger.h"
+#include "net/curvecp/packetizer.h"
+#include "net/socket/socket.h"
+
+namespace net {
+
+// A specialized messenger for listening on a socket, accepting new sockets,
+// and dispatching io to secondary
+class ServerMessenger : public Messenger {
+ public:
+ class Acceptor {
+ public:
+ virtual ~Acceptor() {}
+ virtual void OnAccept(ConnectionKey key) = 0;
+ };
+
+ ServerMessenger(Packetizer* packetizer, Acceptor* acceptor);
+ ServerMessenger(ConnectionKey key,
+ Packetizer* packetizer,
+ Acceptor* acceptor);
+ virtual ~ServerMessenger();
+
+ // Override OnConnection to track incoming connections.
+ virtual void OnConnection(ConnectionKey key);
+
+ private:
+ Acceptor* acceptor_;
+ DISALLOW_COPY_AND_ASSIGN(ServerMessenger);
+};
+
+} // namespace net
+
+#endif // NET_CURVECP_SERVER_MESSENGER_H_
diff --git a/net/curvecp/server_packetizer.cc b/net/curvecp/server_packetizer.cc
new file mode 100644
index 0000000..4715a3b
--- /dev/null
+++ b/net/curvecp/server_packetizer.cc
@@ -0,0 +1,246 @@
+// Copyright (c) 2011 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "net/curvecp/server_packetizer.h"
+
+#include "net/base/io_buffer.h"
+#include "net/base/net_errors.h"
+#include "net/curvecp/protocol.h"
+#include "net/udp/udp_server_socket.h"
+
+namespace net {
+
+ServerPacketizer::ServerPacketizer()
+ : Packetizer(),
+ state_(NONE),
+ listener_(NULL),
+ read_buffer_(new IOBuffer(kMaxPacketLength)),
+ ALLOW_THIS_IN_INITIALIZER_LIST(
+ read_callback_(this, &ServerPacketizer::OnReadComplete)),
+ ALLOW_THIS_IN_INITIALIZER_LIST(
+ write_callback_(this, &ServerPacketizer::OnWriteComplete)) {
+}
+
+ServerPacketizer::~ServerPacketizer() {
+}
+
+int ServerPacketizer::Listen(const IPEndPoint& endpoint,
+ Packetizer::Listener* listener) {
+ DCHECK(!listener_);
+ listener_ = listener;
+ socket_.reset(new UDPServerSocket(NULL, NetLog::Source()));
+ int rv = socket_->Listen(endpoint);
+ if (rv != OK)
+ return rv;
+
+ return ReadPackets();
+}
+
+bool ServerPacketizer::Open(ConnectionKey key, Packetizer::Listener* listener) {
+ DCHECK(listener_map_.find(key) == listener_map_.end());
+ listener_map_[key] = listener;
+ return true;
+}
+
+int ServerPacketizer::SendMessage(ConnectionKey key,
+ const char* data,
+ size_t length,
+ CompletionCallback* callback) {
+ DCHECK(socket_.get());
+ DCHECK_LT(0u, length);
+ DCHECK_GT(kMaxPacketLength - sizeof(ServerMessagePacket), length);
+
+ ConnectionMap::const_iterator it = connection_map_.find(key);
+ if (it == connection_map_.end()) {
+ LOG(ERROR) << "Unknown connection key";
+ return ERR_FAILED; // No route to the client!
+ }
+ IPEndPoint endpoint = it->second;
+
+ // Build up a message packet.
+ scoped_refptr<IOBuffer> buffer(new IOBuffer(kMaxPacketLength));
+ ServerMessagePacket* packet =
+ reinterpret_cast<ServerMessagePacket*>(buffer->data());
+ memset(packet, 0, sizeof(ServerMessagePacket));
+ memcpy(packet->id, "RL3aNMXM", 8);
+ memcpy(&buffer->data()[sizeof(ServerMessagePacket)], data, length);
+ int packet_length = sizeof(ServerMessagePacket) + length;
+ int rv = socket_->SendTo(buffer, packet_length, endpoint, callback);
+ if (rv <= 0)
+ return rv;
+ CHECK_EQ(packet_length, rv);
+ return length; // The number of message bytes written.
+}
+
+void ServerPacketizer::Close(ConnectionKey key) {
+ ListenerMap::iterator it = listener_map_.find(key);
+ DCHECK(it != listener_map_.end());
+ listener_map_.erase(it);
+ socket_->Close();
+ socket_.reset(NULL);
+}
+
+int ServerPacketizer::GetPeerAddress(IPEndPoint* endpoint) const {
+ return socket_->GetPeerAddress(endpoint);
+}
+
+int ServerPacketizer::max_message_payload() const {
+ return kMaxMessageLength - sizeof(Message);
+}
+
+void ServerPacketizer::ProcessRead(int result) {
+ DCHECK_GT(result, 0);
+
+ // The smallest packet we can receive is a ClientMessagePacket.
+ if (result < static_cast<int>(sizeof(ClientMessagePacket)) ||
+ result > kMaxPacketLength)
+ return;
+
+ // Packets are always 16 byte padded.
+ if (result & 15)
+ return;
+
+ Packet *packet = reinterpret_cast<Packet*>(read_buffer_->data());
+ if (memcmp(packet, "QvnQ5Xl", 7))
+ return;
+
+ switch (packet->id[7]) {
+ case 'H':
+ HandleHelloPacket(packet, result);
+ break;
+ case 'I':
+ HandleInitiatePacket(packet, result);
+ break;
+ case 'M':
+ HandleClientMessagePacket(packet, result);
+ break;
+ }
+}
+
+void ServerPacketizer::HandleHelloPacket(Packet* packet, int length) {
+ if (length != sizeof(HelloPacket))
+ return;
+
+ LOG(ERROR) << "Received Hello Packet";
+
+ HelloPacket* hello_packet = reinterpret_cast<HelloPacket*>(packet);
+
+ // Handle HelloPacket
+ scoped_refptr<IOBuffer> buffer(new IOBuffer(sizeof(struct CookiePacket)));
+ struct CookiePacket* data =
+ reinterpret_cast<struct CookiePacket*>(buffer->data());
+ memset(data, 0, sizeof(struct CookiePacket));
+ memcpy(data->id, "RL3aNMXK", 8);
+ memcpy(data->client_extension, hello_packet->client_extension, 16);
+ // TODO(mbelshe) Fill in the rest of the CookiePacket fields.
+
+ // XXXMB - Can't have two pending writes at the same time...
+ int rv = socket_->SendTo(buffer, sizeof(struct CookiePacket), recv_address_,
+ &write_callback_);
+ DCHECK(rv == ERR_IO_PENDING || rv == sizeof(struct CookiePacket));
+}
+
+void ServerPacketizer::HandleInitiatePacket(Packet* packet, int length) {
+ // Handle InitiatePacket
+ LOG(ERROR) << "Received Initiate Packet";
+
+ InitiatePacket* initiate_packet = reinterpret_cast<InitiatePacket*>(packet);
+
+ // We have an active connection.
+ AddConnection(initiate_packet->client_shortterm_public_key, recv_address_);
+
+ listener_->OnConnection(initiate_packet->client_shortterm_public_key);
+
+ // The initiate packet can carry a message.
+ int message_length = length - sizeof(InitiatePacket);
+ DCHECK_LT(0, message_length);
+ if (message_length) {
+ uchar* data = reinterpret_cast<uchar*>(packet);
+ HandleMessage(initiate_packet->client_shortterm_public_key,
+ &data[sizeof(InitiatePacket)],
+ message_length);
+ }
+}
+
+void ServerPacketizer::HandleClientMessagePacket(Packet* packet, int length) {
+ // Handle Message
+ if (length < 16)
+ return;
+
+ const int kMaxMessagePacketLength =
+ kMaxMessageLength + sizeof(ClientMessagePacket);
+ if (length > static_cast<int>(kMaxMessagePacketLength))
+ return;
+
+ ClientMessagePacket* message_packet =
+ reinterpret_cast<ClientMessagePacket*>(packet);
+
+ int message_length = length - sizeof(ClientMessagePacket);
+ DCHECK_LT(0, message_length);
+ if (message_length) {
+ uchar* data = reinterpret_cast<uchar*>(packet);
+ HandleMessage(message_packet->client_shortterm_public_key,
+ &data[sizeof(ClientMessagePacket)],
+ message_length);
+ }
+}
+
+void ServerPacketizer::HandleMessage(ConnectionKey key,
+ unsigned char* msg,
+ int length) {
+ ListenerMap::iterator it = listener_map_.find(key);
+ if (it == listener_map_.end()) {
+ // Received a message for a closed connection.
+ return;
+ }
+
+ // Decode the message here
+
+ Packetizer::Listener* listener = it->second;
+ listener->OnMessage(this, key, msg, length);
+}
+
+void ServerPacketizer::AddConnection(ConnectionKey key,
+ const IPEndPoint& endpoint) {
+ DCHECK(connection_map_.find(key) == connection_map_.end());
+ connection_map_[key] = endpoint;
+}
+
+void ServerPacketizer::RemoveConnection(ConnectionKey key) {
+ DCHECK(connection_map_.find(key) != connection_map_.end());
+ connection_map_.erase(key);
+}
+
+int ServerPacketizer::ReadPackets() {
+ DCHECK(socket_.get());
+
+ int rv;
+ while (true) {
+ rv = socket_->RecvFrom(read_buffer_,
+ kMaxPacketLength,
+ &recv_address_,
+ &read_callback_);
+ if (rv <= 0) {
+ if (rv != ERR_IO_PENDING)
+ LOG(ERROR) << "Error reading listen socket: " << rv;
+ return rv;
+ }
+
+ ProcessRead(rv);
+ }
+
+ return rv;
+}
+
+void ServerPacketizer::OnReadComplete(int result) {
+ if (result > 0)
+ ProcessRead(result);
+ ReadPackets();
+}
+
+void ServerPacketizer::OnWriteComplete(int result) {
+ // TODO(mbelshe): do we need to do anything?
+}
+
+}
diff --git a/net/curvecp/server_packetizer.h b/net/curvecp/server_packetizer.h
new file mode 100644
index 0000000..2fa60c7
--- /dev/null
+++ b/net/curvecp/server_packetizer.h
@@ -0,0 +1,97 @@
+// Copyright (c) 2011 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef NET_CURVECP_SERVER_PACKETIZER_H_
+#define NET_CURVECP_SERVER_PACKETIZER_H_
+#pragma once
+
+#include <map>
+
+#include "base/memory/ref_counted.h"
+#include "net/base/completion_callback.h"
+#include "net/base/ip_endpoint.h"
+#include "net/curvecp/packetizer.h"
+#include "net/curvecp/protocol.h"
+
+namespace net {
+
+class IOBuffer;
+class IPEndPoint;
+class UDPServerSocket;
+
+class ServerPacketizer : public base::RefCounted<ServerPacketizer>,
+ public Packetizer {
+ public:
+ ServerPacketizer();
+ virtual ~ServerPacketizer();
+
+ // Listen for new connections from the Packetizer.
+ int Listen(const IPEndPoint& endpoint, Packetizer::Listener* listener);
+
+ // Register a listener for a connection.
+ // To revoke the registration, call Close().
+ bool Open(ConnectionKey key, Packetizer::Listener* listener);
+
+ // Packetizer methods
+ virtual int SendMessage(ConnectionKey key,
+ const char* data,
+ size_t length,
+ CompletionCallback* callback);
+ virtual void Close(ConnectionKey key);
+ virtual int GetPeerAddress(IPEndPoint* endpoint) const;
+ virtual int max_message_payload() const;
+
+ private:
+ enum State {
+ NONE, // The initial state, before listen.
+ LISTENING, // Listening for packets.
+ };
+
+ typedef std::map<ConnectionKey, Packetizer::Listener*> ListenerMap;
+ typedef std::map<ConnectionKey, IPEndPoint> ConnectionMap;
+
+ // Callbacks when an internal IO is completed.
+ void OnReadComplete(int result);
+ void OnWriteComplete(int result);
+
+ // Process the result of a Read operation.
+ void ProcessRead(int bytes_read);
+
+ // Read packets until an error occurs.
+ int ReadPackets();
+
+ // Handlers for recognized packets.
+ void HandleHelloPacket(Packet* packet, int length);
+ void HandleInitiatePacket(Packet* packet, int length);
+ void HandleClientMessagePacket(Packet* packet, int length);
+ void HandleMessage(ConnectionKey key, unsigned char* message, int length);
+
+ // Manage the list of known "connections".
+ void AddConnection(ConnectionKey key, const IPEndPoint& endpoint);
+ // TODO(mbelshe): need to trim out aged/idle connections
+ void RemoveConnection(ConnectionKey key);
+ bool IsActiveConnection(ConnectionKey key);
+
+ State state_;
+ scoped_ptr<UDPServerSocket> socket_;
+ Packetizer::Listener* listener_; // Accept listener.
+
+ scoped_refptr<IOBuffer> read_buffer_;
+ IPEndPoint recv_address_;
+
+ // The connection map tracks active client addresses which are known
+ // to this server.
+ ConnectionMap connection_map_;
+ // The listener map tracks active message listeners known to the packetizer.
+ ListenerMap listener_map_;
+
+ CompletionCallbackImpl<ServerPacketizer> read_callback_;
+ CompletionCallbackImpl<ServerPacketizer> write_callback_;
+
+ DISALLOW_COPY_AND_ASSIGN(ServerPacketizer);
+};
+
+} // namespace net
+
+#endif // NET_CURVECP_SERVER_PACKETIZER_H_
diff --git a/net/curvecp/test_client.cc b/net/curvecp/test_client.cc
new file mode 100644
index 0000000..f71f162
--- /dev/null
+++ b/net/curvecp/test_client.cc
@@ -0,0 +1,178 @@
+// Copyright (c) 2011 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "net/curvecp/test_client.h"
+
+#include <string>
+#include <deque>
+#include <vector>
+
+#include "base/logging.h"
+#include "base/message_loop.h"
+#include "net/base/address_list.h"
+#include "net/base/host_resolver.h"
+#include "net/base/io_buffer.h"
+#include "net/base/net_errors.h"
+#include "net/base/net_log.h"
+#include "net/curvecp/curvecp_client_socket.h"
+
+namespace net {
+
+TestClient::TestClient()
+ : socket_(NULL),
+ errors_(0),
+ bytes_to_send_(0),
+ ALLOW_THIS_IN_INITIALIZER_LIST(
+ connect_callback_(this, &TestClient::OnConnectComplete)),
+ ALLOW_THIS_IN_INITIALIZER_LIST(
+ read_callback_(this, &TestClient::OnReadComplete)),
+ ALLOW_THIS_IN_INITIALIZER_LIST(
+ write_callback_(this, &TestClient::OnWriteComplete)),
+ finished_callback_(NULL) {
+}
+
+TestClient::~TestClient() {
+ if (socket_) {
+ // TODO(mbelshe): The CurveCPClientSocket has a method called Disconnect.
+ // The CurveCPServerSocket has a method called Close.
+ // Unify them into either Close or Disconnect!!!
+ socket_->Disconnect();
+ socket_ = NULL;
+ }
+}
+
+bool TestClient::Start(const HostPortPair& server_host_port_pair,
+ int bytes_to_send,
+ CompletionCallback* callback) {
+ DCHECK(!socket_);
+ DCHECK(!finished_callback_);
+
+ finished_callback_ = callback;
+ bytes_to_read_ = bytes_to_send_ = bytes_to_send;
+
+ scoped_ptr<HostResolver> system_host_resolver(
+ CreateSystemHostResolver(1, NULL));
+ SingleRequestHostResolver host_resolver(system_host_resolver.get());
+ HostResolver::RequestInfo request(server_host_port_pair);
+ AddressList addresses;
+ int rv = host_resolver.Resolve(request, &addresses, NULL, BoundNetLog());
+ if (rv != OK) {
+ LOG(ERROR) << "Could not resolve host";
+ return false;
+ }
+
+ socket_ = new CurveCPClientSocket(addresses, NULL, NetLog::Source());
+ rv = socket_->Connect(&connect_callback_);
+ if (rv == ERR_IO_PENDING)
+ return true;
+ OnConnectComplete(rv);
+ return rv == OK;
+}
+
+void TestClient::OnConnectComplete(int result) {
+ LOG(ERROR) << "Connect complete";
+ if (result < 0) {
+ LOG(ERROR) << "Connect failure: " << result;
+ errors_++;
+ Finish(result);
+ return;
+ }
+
+ DCHECK(bytes_to_send_); // We should have data to send.
+ ReadData();
+
+ DCHECK_EQ(bytes_to_send_, bytes_to_read_);
+
+ SendData();
+}
+
+void TestClient::OnReadComplete(int result) {
+ if (result <= 0) {
+ LOG(ERROR) << "Read failure: " << result;
+ errors_++;
+ Finish(result);
+ return;
+ }
+
+ if (!received_stream_.VerifyBytes(read_buffer_->data(), result)) {
+ if (!errors_) // Only log the first error.
+ LOG(ERROR) << "Client Received corrupt receive data!";
+ errors_++;
+ }
+
+ read_buffer_ = NULL;
+ bytes_to_read_ -= result;
+
+ // Now read more data...
+ if (bytes_to_read_)
+ ReadData();
+ else
+ Finish(OK);
+}
+
+void TestClient::OnWriteComplete(int result) {
+ LOG(ERROR) << "Write complete (remaining = " << bytes_to_send_ << ")";
+ if (result <= 0) {
+ errors_++;
+ Finish(result);
+ return;
+ }
+
+ write_buffer_->DidConsume(result);
+ bytes_to_send_ -= result;
+ if (!write_buffer_->BytesRemaining())
+ write_buffer_ = NULL;
+
+ if (bytes_to_send_)
+ SendData();
+}
+
+void TestClient::ReadData() {
+ DCHECK(!read_buffer_.get());
+ read_buffer_ = new IOBuffer(kMaxMessage);
+
+ int rv;
+ do {
+ rv = socket_->Read(read_buffer_, kMaxMessage, &read_callback_);
+ if (rv == ERR_IO_PENDING)
+ return;
+ OnReadComplete(rv); // Complete the read manually
+ } while (rv > 0);
+}
+
+void TestClient::SendData() {
+ DCHECK(bytes_to_send_); // We should have data to send.
+ const int kWriteChunkSize = 777; // 777 is more abusive
+
+ do {
+ if (!write_buffer_.get()) {
+ int bytes_to_send = std::min(kWriteChunkSize, bytes_to_send_);
+ scoped_refptr<IOBuffer> buffer(new IOBuffer(bytes_to_send));
+ sent_stream_.GetBytes(buffer->data(), bytes_to_send);
+ write_buffer_ = new DrainableIOBuffer(buffer, bytes_to_send);
+ }
+
+ int rv = socket_->Write(write_buffer_,
+ write_buffer_->BytesRemaining(),
+ &write_callback_);
+ if (rv == ERR_IO_PENDING)
+ return;
+
+ write_buffer_->DidConsume(rv);
+ bytes_to_send_ -= rv;
+ if (!write_buffer_->BytesRemaining())
+ write_buffer_ = NULL;
+ } while (bytes_to_send_);
+}
+
+void TestClient::Finish(int result) {
+ DCHECK(finished_callback_);
+
+ LOG(ERROR) << "TestClient Done!";
+ CompletionCallback* callback = finished_callback_;
+ finished_callback_ = NULL;
+ callback->Run(result);
+}
+
+} // namespace net
diff --git a/net/curvecp/test_client.h b/net/curvecp/test_client.h
new file mode 100644
index 0000000..08f496f
--- /dev/null
+++ b/net/curvecp/test_client.h
@@ -0,0 +1,70 @@
+// Copyright (c) 2011 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef NET_CURVECP_TEST_CLIENT_H_
+#define NET_CURVECP_TEST_CLIENT_H_
+#pragma once
+
+#include "base/task.h"
+#include "net/base/completion_callback.h"
+#include "net/base/host_port_pair.h"
+#include "net/base/io_buffer.h"
+#include "net/curvecp/test_data_stream.h"
+
+namespace net {
+
+class CurveCPClientSocket;
+
+// The TestClient connects to a test server, sending a stream of verifiable
+// bytes. The TestClient expects to get the same bytes echoed back from the
+// TestServer. After sending all bytes and receiving the echoes, the
+// TestClient closes itself.
+//
+// Several hooks are provided for testing edge cases and failures.
+class TestClient {
+ public:
+ TestClient();
+ virtual ~TestClient();
+
+ // Starts the client, connecting to |server|.
+ // Client will send |bytes_to_send| bytes from the verifiable stream.
+ // When the client has received all echoed bytes from the server, or
+ // when an error occurs causing the client to stop, |callback| will be
+ // called with a net status code.
+ // Returns true if successful in starting the client.
+ bool Start(const HostPortPair& server,
+ int bytes_to_send,
+ CompletionCallback* callback);
+
+ // Returns the number of errors this server encountered.
+ int error_count() { return errors_; }
+
+ private:
+ static const int kMaxMessage = 1024;
+
+ void OnConnectComplete(int result);
+ void OnReadComplete(int result);
+ void OnWriteComplete(int result);
+
+ void ReadData();
+ void SendData();
+ void Finish(int result);
+
+ CurveCPClientSocket* socket_;
+ scoped_refptr<IOBuffer> read_buffer_;
+ scoped_refptr<DrainableIOBuffer> write_buffer_;
+ int errors_;
+ int bytes_to_read_;
+ int bytes_to_send_;
+ TestDataStream sent_stream_;
+ TestDataStream received_stream_;
+ CompletionCallbackImpl<TestClient> connect_callback_;
+ CompletionCallbackImpl<TestClient> read_callback_;
+ CompletionCallbackImpl<TestClient> write_callback_;
+ CompletionCallback* finished_callback_;
+};
+
+} // namespace net
+
+#endif // NET_CURVECP_TEST_CLIENT_H_
diff --git a/net/curvecp/test_data_stream.h b/net/curvecp/test_data_stream.h
new file mode 100644
index 0000000..d2bbc87
--- /dev/null
+++ b/net/curvecp/test_data_stream.h
@@ -0,0 +1,88 @@
+// Copyright (c) 2011 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef NET_CURVECP_TEST_DATA_STREAM_H_
+#define NET_CURVECP_TEST_DATA_STREAM_H_
+#pragma once
+
+#include <string.h> // for memcpy()
+#include <algorithm>
+
+// This is a test class for generating an infinite stream of data which can
+// be verified independently to be the correct stream of data.
+
+namespace net {
+
+class TestDataStream {
+ public:
+ TestDataStream() {
+ Reset();
+ }
+
+ // Fill |buffer| with |length| bytes of data from the stream.
+ void GetBytes(char* buffer, int length) {
+ while (length) {
+ AdvanceIndex();
+ int bytes_to_copy = std::min(length, bytes_remaining_);
+ memcpy(buffer, buffer_ptr_, bytes_to_copy);
+ buffer += bytes_to_copy;
+ Consume(bytes_to_copy);
+ length -= bytes_to_copy;
+ }
+ }
+
+ // Verify that |buffer| contains the expected next |length| bytes from the
+ // stream. Returns true if correct, false otherwise.
+ bool VerifyBytes(const char *buffer, int length) {
+ while (length) {
+ AdvanceIndex();
+ int bytes_to_compare = std::min(length, bytes_remaining_);
+ if (memcmp(buffer, buffer_ptr_, bytes_to_compare))
+ return false;
+ Consume(bytes_to_compare);
+ length -= bytes_to_compare;
+ buffer += bytes_to_compare;
+ }
+ return true;
+ }
+
+ void Reset() {
+ index_ = 0;
+ bytes_remaining_ = 0;
+ buffer_ptr_ = buffer_;
+ }
+
+ private:
+ // If there is no data spilled over from the previous index, advance the
+ // index and fill the buffer.
+ void AdvanceIndex() {
+ if (bytes_remaining_ == 0) {
+ // Convert it to ascii, but don't bother to reverse it.
+ // (e.g. 12345 becomes "54321")
+ int val = index_++;
+ do {
+ buffer_[bytes_remaining_++] = (val % 10) + '0';
+ } while ((val /= 10) > 0);
+ buffer_[bytes_remaining_++] = '.';
+ }
+ }
+
+ // Consume data from the spill buffer.
+ void Consume(int bytes) {
+ bytes_remaining_ -= bytes;
+ if (bytes_remaining_)
+ buffer_ptr_ += bytes;
+ else
+ buffer_ptr_ = buffer_;
+ }
+
+ int index_;
+ int bytes_remaining_;
+ char buffer_[16];
+ char* buffer_ptr_;
+};
+
+} // namespace net
+
+#endif // NET_CURVECP_TEST_DATA_STREAM_H_
diff --git a/net/curvecp/test_server.cc b/net/curvecp/test_server.cc
new file mode 100644
index 0000000..a1ad4b2
--- /dev/null
+++ b/net/curvecp/test_server.cc
@@ -0,0 +1,144 @@
+// Copyright (c) 2011 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "net/curvecp/test_server.h"
+
+#include <string>
+
+#include "base/logging.h"
+#include "base/message_loop.h"
+#include "net/base/io_buffer.h"
+#include "net/base/ip_endpoint.h"
+#include "net/base/net_errors.h"
+#include "net/base/net_util.h"
+#include "net/curvecp/curvecp_server_socket.h"
+
+namespace net {
+
+TestServer::TestServer()
+ : socket_(NULL),
+ errors_(0) {
+}
+
+TestServer::~TestServer() {
+ if (socket_) {
+ socket_->Close();
+ socket_ = NULL;
+ }
+}
+
+bool TestServer::Start(int port) {
+ IPAddressNumber ip_number;
+ std::string ip_str("0.0.0.0");
+ if (!ParseIPLiteralToNumber(ip_str, &ip_number)) {
+ LOG(ERROR) << "Bad IP Address";
+ return false;
+ }
+ IPEndPoint bind_address(ip_number, port);
+
+ DCHECK(!socket_);
+ socket_ = new CurveCPServerSocket(NULL, NetLog::Source());
+ int rv = socket_->Listen(bind_address, this);
+ if (rv < ERR_IO_PENDING) {
+ LOG(ERROR) << "Listen on port " << port << " failed: " << rv;
+ return false;
+ }
+ return true;
+}
+
+void TestServer::RunWithParams(const Tuple1<int>& params) {
+ int status = params.a;
+ LOG(INFO) << "Callback! " << status;
+ if (status < 0)
+ MessageLoop::current()->Quit();
+}
+
+void TestServer::OnAccept(CurveCPServerSocket* new_socket) {
+ DCHECK(new_socket);
+ LOG(ERROR) << "Accepted socket! Starting Echo Server";
+ EchoServer* new_server = new EchoServer();
+ new_server->Start(new_socket);
+}
+
+EchoServer::EchoServer()
+ : socket_(NULL),
+ bytes_received_(0),
+ ALLOW_THIS_IN_INITIALIZER_LIST(
+ read_callback_(this, &EchoServer::OnReadComplete)),
+ ALLOW_THIS_IN_INITIALIZER_LIST(
+ write_callback_(this, &EchoServer::OnWriteComplete)) {
+}
+
+EchoServer::~EchoServer() {
+}
+
+void EchoServer::Start(CurveCPServerSocket* socket) {
+ DCHECK(!socket_);
+ socket_ = socket;
+
+ ReadData();
+ // Note: |this| could be deleted here.
+}
+
+void EchoServer::OnReadComplete(int result) {
+ LOG(INFO) << "Read complete: " << result;
+ if (result <= 0) {
+ delete this;
+ return;
+ }
+
+ bytes_received_ += result;
+ LOG(INFO) << "Server received " << result << "(" << bytes_received_ << ")";
+
+ if (!received_stream_.VerifyBytes(read_buffer_->data(), result)) {
+ LOG(ERROR) << "Server Received corrupt receive data!";
+ delete this;
+ return;
+ }
+
+ // Echo the read data back here.
+ DCHECK(!write_buffer_.get());
+ write_buffer_ = new DrainableIOBuffer(read_buffer_, result);
+ int rv = socket_->Write(write_buffer_, result, &write_callback_);
+ if (rv == ERR_IO_PENDING)
+ return;
+ OnWriteComplete(rv);
+}
+
+void EchoServer::OnWriteComplete(int result) {
+ if (result <= 0) {
+ delete this;
+ return;
+ }
+
+ write_buffer_->DidConsume(result);
+ while (write_buffer_->BytesRemaining()) {
+ int rv = socket_->Write(write_buffer_,
+ write_buffer_->BytesRemaining(),
+ &write_callback_);
+ if (rv == ERR_IO_PENDING)
+ return;
+ OnWriteComplete(rv);
+ }
+
+ // Now we can read more data.
+ write_buffer_ = NULL;
+ // read_buffer_ = NULL;
+ // ReadData();
+}
+
+void EchoServer::ReadData() {
+ DCHECK(!read_buffer_.get());
+ read_buffer_ = new IOBuffer(kMaxMessage);
+
+ int rv;
+ do {
+ rv = socket_->Read(read_buffer_, kMaxMessage, &read_callback_);
+ if (rv == ERR_IO_PENDING)
+ return;
+ OnReadComplete(rv); // Complete the read manually
+ } while (rv > 0);
+}
+
+} // namespace net
diff --git a/net/curvecp/test_server.h b/net/curvecp/test_server.h
new file mode 100644
index 0000000..22c44e4
--- /dev/null
+++ b/net/curvecp/test_server.h
@@ -0,0 +1,74 @@
+// Copyright (c) 2011 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef NET_CURVECP_TEST_SERVER_H_
+#define NET_CURVECP_TEST_SERVER_H_
+#pragma once
+
+#include "base/task.h"
+#include "net/base/completion_callback.h"
+#include "net/curvecp/curvecp_server_socket.h"
+#include "net/curvecp/test_data_stream.h"
+
+namespace net {
+
+class DrainableIOBuffer;
+class EchoServer;
+class IOBuffer;
+
+// TestServer is the server which processes the listen socket.
+// It will create an EchoServer instance to handle each connection.
+class TestServer : public CompletionCallback,
+ public CurveCPServerSocket::Acceptor {
+ public:
+ TestServer();
+ virtual ~TestServer();
+
+ bool Start(int port);
+
+ // CompletionCallback methods:
+ virtual void RunWithParams(const Tuple1<int>& params);
+
+ // CurveCPServerSocket::Acceptor methods:
+ virtual void OnAccept(CurveCPServerSocket* new_socket);
+
+ // Returns the number of errors this server encountered.
+ int error_count() { return errors_; }
+
+ private:
+ CurveCPServerSocket* socket_;
+ int errors_;
+};
+
+
+// EchoServer does the actual server work for a connection.
+// This object self destructs after finishing its work.
+class EchoServer {
+ public:
+ EchoServer();
+ ~EchoServer();
+
+ // Start the Echo Server
+ void Start(CurveCPServerSocket* socket);
+
+ private:
+ void OnReadComplete(int result);
+ void OnWriteComplete(int result);
+
+ void ReadData();
+
+ private:
+ static const int kMaxMessage = 1024;
+ CurveCPServerSocket* socket_;
+ scoped_refptr<IOBuffer> read_buffer_;
+ scoped_refptr<DrainableIOBuffer> write_buffer_;
+ TestDataStream received_stream_;
+ int bytes_received_;
+ CompletionCallbackImpl<EchoServer> read_callback_;
+ CompletionCallbackImpl<EchoServer> write_callback_;
+};
+
+} // namespace net
+
+#endif // NET_CURVECP_TEST_SERVER_H_