diff options
author | mbelshe@google.com <mbelshe@google.com@0039d316-1c4b-4281-b951-d872f2087c98> | 2011-05-18 10:36:56 +0000 |
---|---|---|
committer | mbelshe@google.com <mbelshe@google.com@0039d316-1c4b-4281-b951-d872f2087c98> | 2011-05-18 10:36:56 +0000 |
commit | b690e18ca906d699e05cddae14250ab40439fee3 (patch) | |
tree | d34539a899c988f422f1235c7b52c2d1eb81d7ba /net/curvecp | |
parent | 51c0220843bfc1939d2f1525f0fc0192b4926369 (diff) | |
download | chromium_src-b690e18ca906d699e05cddae14250ab40439fee3.zip chromium_src-b690e18ca906d699e05cddae14250ab40439fee3.tar.gz chromium_src-b690e18ca906d699e05cddae14250ab40439fee3.tar.bz2 |
An initial curvecp implementation. This implementation is not complete,
but is good enough to start collaboration.
BUG=none
TEST=curvecp_unittests
Review URL: http://codereview.chromium.org/7037022
git-svn-id: svn://svn.chromium.org/chrome/trunk/src@85753 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'net/curvecp')
32 files changed, 3694 insertions, 0 deletions
diff --git a/net/curvecp/README b/net/curvecp/README new file mode 100644 index 0000000..edf04b52 --- /dev/null +++ b/net/curvecp/README @@ -0,0 +1,4 @@ +CurveCP: An implementation of http://curvecp.org/ + +This work is an attempt to build a secure, reliable transport protocol over UDP. +This work is a research experiment. diff --git a/net/curvecp/circular_buffer.cc b/net/curvecp/circular_buffer.cc new file mode 100644 index 0000000..9eca179 --- /dev/null +++ b/net/curvecp/circular_buffer.cc @@ -0,0 +1,77 @@ +// Copyright (c) 2011 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "net/curvecp/circular_buffer.h" + +#include <algorithm> +#include "base/logging.h" + +namespace net { + +CircularBuffer::CircularBuffer(int capacity) + : capacity_(capacity), + head_(0), + length_(0) { + buffer_ = new char[capacity_]; +} + +CircularBuffer::~CircularBuffer() { + delete [] buffer_; +} + +int CircularBuffer::write(const char* data, int length) { + int bytes_written = 0; + + // We can't write more bytes than we have space for. + if (length > capacity_ - length_) + length = capacity_ - length_; + + // Check for space at end of buffer. + if (head_ + length_ < capacity_) { + bytes_written = std::min(capacity_ - head_ - length_, length); + int end_pos = head_ + length_; + + memcpy(buffer_ + end_pos, data, bytes_written); + length_ += bytes_written; + length -= bytes_written; + } + + if (!length) + return bytes_written; + + DCHECK_LT(length_, capacity_); // Buffer still has space. + DCHECK_GE(head_ + length_, capacity_); // Space is at the beginning. + + int start_pos = (head_ + length_) % capacity_; + memcpy(&buffer_[start_pos], &data[bytes_written], length); + bytes_written += length; + length_ += length; + return bytes_written; +} + +int CircularBuffer::read(char* data, int length) { + int bytes_read = 0; + + // We can't read more bytes than are in the buffer. + if (length > length_) + length = length_; + + while (length) { + DCHECK_LE(length, length_); + int span_end = std::min(capacity_, head_ + length); + int len = span_end - head_; + DCHECK_LE(len, length_); + memcpy(&data[bytes_read], &buffer_[head_], len); + bytes_read += len; + length -= len; + length_ -= len; + head_ += len; + DCHECK_LE(head_, capacity_); + if (head_ == capacity_) + head_ = 0; + } + return bytes_read; +} + +} // namespace net diff --git a/net/curvecp/circular_buffer.h b/net/curvecp/circular_buffer.h new file mode 100644 index 0000000..1c257f9 --- /dev/null +++ b/net/curvecp/circular_buffer.h @@ -0,0 +1,40 @@ +// Copyright (c) 2011 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef NET_CURVECP_CIRCULAR_BUFFER_H_ +#define NET_CURVECP_CIRCULAR_BUFFER_H_ +#pragma once + +namespace net { + +// A circular buffer is a fixed sized buffer which can be read or written +class CircularBuffer { + public: + // Create a CircularBuffer with maximum size |capacity|. + explicit CircularBuffer(int capacity); + ~CircularBuffer(); + + int length() const { return length_; } + + // Writes data into the circular buffer. + // |data| is the bytes to write. + // |length| is the number of bytes to write. + // Returns the number of bytes written, which may be less than |length| or + // 0 if no space is available in the buffer. + int write(const char* data, int length); + + // Reads up to |length| bytes from the buffer into |data|. + // Returns the number of bytes read, or 0 if no data is available. + int read(char* data, int length); + + private: + int capacity_; + int head_; + int length_; + char* buffer_; +}; + +} // namespace net + +#endif // NET_CURVECP_CIRCULAR_BUFFER_H_ diff --git a/net/curvecp/client_packetizer.cc b/net/curvecp/client_packetizer.cc new file mode 100644 index 0000000..54efddd --- /dev/null +++ b/net/curvecp/client_packetizer.cc @@ -0,0 +1,385 @@ +// Copyright (c) 2011 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "net/curvecp/client_packetizer.h" + +#include "net/base/io_buffer.h" +#include "net/base/net_errors.h" +#include "net/base/sys_addrinfo.h" +#include "net/curvecp/protocol.h" +#include "net/udp/udp_client_socket.h" + +namespace { + +const int kMaxHelloAttempts = 8; +const int kHelloTimeoutMs[kMaxHelloAttempts] = { + 1000, // 1 second, with 1.5x increase for each retry. + 1500, + 2250, + 3375, + 5063, + 7594, + 11391, + 17086, +}; + +} // namespace + +namespace net { + +ClientPacketizer::ClientPacketizer() + : Packetizer(), + next_state_(NONE), + listener_(NULL), + user_callback_(NULL), + current_address_(NULL), + hello_attempts_(0), + initiate_sent_(false), + ALLOW_THIS_IN_INITIALIZER_LIST( + io_callback_(this, &ClientPacketizer::OnIOComplete)), + ALLOW_THIS_IN_INITIALIZER_LIST(timers_factory_(this)) { + // TODO(mbelshe): Initialize our keys and such properly. + // for now we use random values to keep them unique. + for (int i = 0; i < 32; ++i) + shortterm_public_key_[i] = rand() % 26 + 'a'; +} + +ClientPacketizer::~ClientPacketizer() { +} + +int ClientPacketizer::Connect(const AddressList& server, + Packetizer::Listener* listener, + CompletionCallback* callback) { + DCHECK(!user_callback_); + DCHECK(!socket_.get()); + DCHECK(!listener_); + + listener_ = listener; + + addresses_ = server; + + user_callback_ = callback; + next_state_ = LOOKUP_COOKIE; + + return DoLoop(OK); +} + +int ClientPacketizer::SendMessage(ConnectionKey key, + const char* data, + size_t length, + CompletionCallback* callback) { + // We can't send messages smaller than 16 bytes. + if (length < 16) + return ERR_UNEXPECTED; + + if (!initiate_sent_) { + const size_t kMaxMessageInInitiatePacket = + kMaxPacketLength - sizeof(InitiatePacket); + + if (length > kMaxMessageInInitiatePacket) { + NOTREACHED(); + return ERR_UNEXPECTED; + } + + initiate_sent_ = true; + + // Bundle this message into the Initiate Packet. + scoped_refptr<IOBuffer> buffer(new IOBuffer(kMaxPacketLength)); + InitiatePacket* packet = reinterpret_cast<InitiatePacket*>(buffer->data()); + memset(packet, 0, sizeof(InitiatePacket)); + memcpy(packet->id, "QvnQ5XlI", 8); + memcpy(packet->client_shortterm_public_key, shortterm_public_key_, + sizeof(shortterm_public_key_)); + // TODO(mbelshe): Fill in rest of Initiate fields here + // TODO(mbelshe): Fill in rest of message + // + // TODO(mbelshe) - this is just broken to make it work with cleartext + memcpy(&buffer->data()[sizeof(InitiatePacket)], data, length); + int packet_length = sizeof(InitiatePacket) + length; + int rv = socket_->Write(buffer, packet_length, &io_callback_); + if (rv <= 0) + return rv; + CHECK_EQ(packet_length, rv); // We must send all data. + return length; + } + + if (length > static_cast<size_t>(kMaxMessageLength)) { + NOTREACHED(); + return ERR_UNEXPECTED; + } + + // Bundle this message into the Initiate Packet. + scoped_refptr<IOBuffer> buffer(new IOBuffer(kMaxPacketLength)); + ClientMessagePacket* packet = + reinterpret_cast<ClientMessagePacket*>(buffer->data()); + memset(packet, 0, sizeof(ClientMessagePacket)); + memcpy(packet->id, "QvnQ5XlM", 8); + memcpy(packet->client_shortterm_public_key, shortterm_public_key_, + sizeof(shortterm_public_key_)); + // TODO(mbelshe): Fill in rest of Initiate fields here + // TODO(mbelshe): Fill in rest of message + memcpy(&buffer->data()[sizeof(ClientMessagePacket)], data, length); + int packet_length = sizeof(ClientMessagePacket) + length; + int rv = socket_->Write(buffer, packet_length, &io_callback_); + if (rv <= 0) + return rv; + CHECK_EQ(packet_length, rv); // We must send all data. + return length; +} + +void ClientPacketizer::Close(ConnectionKey key) { + // TODO(mbelshe): implement me + NOTIMPLEMENTED(); +} + +int ClientPacketizer::GetPeerAddress(IPEndPoint* endpoint) const { + return socket_->GetPeerAddress(endpoint); +} + +int ClientPacketizer::max_message_payload() const { + if (!initiate_sent_) + return kMaxPacketLength - sizeof(InitiatePacket) - sizeof(Message); + return kMaxMessageLength - sizeof(Message); +} + +int ClientPacketizer::DoLoop(int result) { + DCHECK(next_state_ != NONE); + int rv = result; + do { + switch (next_state_) { + case LOOKUP_COOKIE: + rv = DoLookupCookie(); + break; + case LOOKUP_COOKIE_COMPLETE: + rv = DoLookupCookieComplete(rv); + break; + case SENDING_HELLO: + rv = DoSendingHello(); + break; + case SENDING_HELLO_COMPLETE: + rv = DoSendingHelloComplete(rv); + break; + case WAITING_COOKIE: + rv = DoWaitingCookie(); + break; + case WAITING_COOKIE_COMPLETE: + rv = DoWaitingCookieComplete(rv); + break; + case CONNECTED: + rv = DoConnected(rv); + break; + default: + NOTREACHED(); + break; + } + } while (rv > ERR_IO_PENDING && next_state_ != CONNECTED); + + return rv; +} + +int ClientPacketizer::DoLookupCookie() { + // Eventually, we'll use this state to see if we have persisted the cookie + // in the disk cache. For now, we don't do any persistence, even in memory. + next_state_ = LOOKUP_COOKIE_COMPLETE; + return OK; +} + +int ClientPacketizer::DoLookupCookieComplete(int rv) { + // TODO(mbelshe): If we got a cookie, goto state WAITING_COOKIE_COMPLETE + next_state_ = SENDING_HELLO; + return rv; +} + +int ClientPacketizer::DoSendingHello() { + next_state_ = SENDING_HELLO_COMPLETE; + + if (hello_attempts_ == kMaxHelloAttempts) + return ERR_TIMED_OUT; + + // Connect to the next socket + int rv = ConnectNextAddress(); + if (rv < 0) { + LOG(ERROR) << "Could not get next address!"; + return rv; + } + + // Construct Hello Packet + scoped_refptr<IOBuffer> buffer(new IOBuffer(sizeof(struct HelloPacket))); + struct HelloPacket* data = + reinterpret_cast<struct HelloPacket*>(buffer->data()); + memset(data, 0, sizeof(struct HelloPacket)); + memcpy(data->id, "QvnQ5XlH", 8); + memcpy(data->client_shortterm_public_key, shortterm_public_key_, + sizeof(shortterm_public_key_)); + // TODO(mbelshe): populate all other fields of the HelloPacket. + + return socket_->Write(buffer, sizeof(struct HelloPacket), &io_callback_); +} + +int ClientPacketizer::DoSendingHelloComplete(int rv) { + next_state_ = NONE; + + if (rv < 0) + return rv; + + // Writing to UDP should not result in a partial datagram. + if (rv != sizeof(struct HelloPacket)) + return ERR_FAILED; + + next_state_ = WAITING_COOKIE; + return OK; +} + +int ClientPacketizer::DoWaitingCookie() { + next_state_ = WAITING_COOKIE_COMPLETE; + + StartHelloTimer(kHelloTimeoutMs[hello_attempts_++]); + + read_buffer_ = new IOBuffer(kMaxPacketLength); + return socket_->Read(read_buffer_, kMaxPacketLength, &io_callback_); +} + +int ClientPacketizer::DoWaitingCookieComplete(int rv) { + // TODO(mbelshe): Add Histogram for hello_attempts_. + RevokeHelloTimer(); + + // TODO(mbelshe): Validate the cookie here + if (rv < 0) + return rv; + + if (rv == 0) { // Does this happen? + NOTREACHED(); + return ERR_FAILED; + } + + if (rv != sizeof(struct CookiePacket)) + return ERR_FAILED; // TODO(mbelshe): more specific error message. + + // TODO(mbelshe): verify contents of Cookie + + listener_->OnConnection(shortterm_public_key_); + + next_state_ = CONNECTED; + + // Start reading for messages + rv = ReadPackets(); + if (rv == ERR_IO_PENDING) + rv = OK; + return rv; +} + +int ClientPacketizer::DoConnected(int rv) { + DCHECK(next_state_ == CONNECTED); + if (rv > 0) + ProcessRead(rv); + return ReadPackets(); +} + +void ClientPacketizer::DoCallback(int result) { + DCHECK_NE(result, ERR_IO_PENDING); + DCHECK(user_callback_); + + CompletionCallback* callback = user_callback_; + user_callback_ = NULL; + callback->Run(result); +} + +int ClientPacketizer::ConnectNextAddress() { + // TODO(mbelshe): plumb Netlog information + + DCHECK(addresses_.head()); + + socket_.reset(new UDPClientSocket(NULL, NetLog::Source())); + + // Rotate to next address in the list. + if (current_address_) + current_address_ = current_address_->ai_next; + if (!current_address_) + current_address_ = addresses_.head(); + + IPEndPoint endpoint; + if (!endpoint.FromSockAddr(current_address_->ai_addr, + current_address_->ai_addrlen)) + return ERR_FAILED; + + int rv = socket_->Connect(endpoint); + DCHECK_NE(ERR_IO_PENDING, rv); + + return rv; +} + +void ClientPacketizer::StartHelloTimer(int milliseconds) { + MessageLoop::current()->PostDelayedTask( + FROM_HERE, + timers_factory_.NewRunnableMethod(&ClientPacketizer::OnHelloTimeout), + milliseconds); +} + +void ClientPacketizer::RevokeHelloTimer() { + timers_factory_.RevokeAll(); +} + +void ClientPacketizer::OnHelloTimeout() { + DCHECK_EQ(WAITING_COOKIE_COMPLETE, next_state_); + next_state_ = SENDING_HELLO; + DLOG(INFO) << "HelloTimeout #" << hello_attempts_; + int rv = DoLoop(OK); + if (rv != ERR_IO_PENDING) + DoCallback(rv); +} + +void ClientPacketizer::ProcessRead(int result) { + DCHECK_GT(result, 0); + DCHECK(listener_); + + // The smallest packet we can receive is a ServerMessagePacket. + if (result < static_cast<int>(sizeof(ServerMessagePacket)) || + result > kMaxPacketLength) + return; + + // Packets are always 16 byte padded. + if (result & 15) + return; + + Packet *packet = reinterpret_cast<Packet*>(read_buffer_->data()); + + // The only type of packet we should receive at this point is a Message + // packet. + // TODO(mbelshe): what happens if the server sends a new Cookie packet? + if (memcmp(packet->id, "RL3aNMXM", 8)) + return; + + uchar* msg = reinterpret_cast<uchar*>(packet); + int length = result - sizeof(ServerMessagePacket); + listener_->OnMessage(this, + shortterm_public_key_, + &msg[sizeof(ServerMessagePacket)], + length); +} + +int ClientPacketizer::ReadPackets() { + DCHECK(socket_.get()); + + int rv; + while (true) { + rv = socket_->Read(read_buffer_, + kMaxPacketLength, + &io_callback_); + if (rv <= 0) { + if (rv != ERR_IO_PENDING) + LOG(ERROR) << "Error reading socket:" << rv; + return rv; + } + ProcessRead(rv); + } + return rv; +} + +void ClientPacketizer::OnIOComplete(int result) { + int rv = DoLoop(result); + if (rv != ERR_IO_PENDING) + DoCallback(rv); +} + +} // namespace net diff --git a/net/curvecp/client_packetizer.h b/net/curvecp/client_packetizer.h new file mode 100644 index 0000000..b5ca5dd --- /dev/null +++ b/net/curvecp/client_packetizer.h @@ -0,0 +1,102 @@ +// Copyright (c) 2011 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef NET_CURVECP_CLIENT_PACKETIZER_H_ +#define NET_CURVECP_CLIENT_PACKETIZER_H_ +#pragma once + +#include "base/scoped_ptr.h" +#include "base/task.h" +#include "net/base/address_list.h" +#include "net/base/completion_callback.h" +#include "net/curvecp/packetizer.h" +#include "net/curvecp/protocol.h" + +namespace net { + +class AddressList; +class IOBuffer; +class IPEndPoint; +class UDPClientSocket; + +class ClientPacketizer : public Packetizer { + public: + ClientPacketizer(); + virtual ~ClientPacketizer(); + + int Connect(const AddressList& server, + Packetizer::Listener* listener, + CompletionCallback* callback); + + // Packetizer methods + virtual int SendMessage(ConnectionKey key, + const char* data, + size_t length, + CompletionCallback* callback); + virtual void Close(ConnectionKey key); + virtual int GetPeerAddress(IPEndPoint* endpoint) const; + virtual int max_message_payload() const; + + private: + enum StateType { + NONE, // The initial state, before connect. + LOOKUP_COOKIE, // Looking up a cookie in the disk cache. + LOOKUP_COOKIE_COMPLETE, // The disk cache lookup is complete. + SENDING_HELLO, // Sending a Hello packet. + SENDING_HELLO_COMPLETE, // Hello packet has been sent. + WAITING_COOKIE, // Waiting for a Cookie packet. + WAITING_COOKIE_COMPLETE, // The Cookie packet has arrived. + CONNECTED, // Connected + }; + + int DoLoop(int result); + int DoLookupCookie(); + int DoLookupCookieComplete(int result); + int DoSendingHello(); + int DoSendingHelloComplete(int result); + int DoWaitingCookie(); + int DoWaitingCookieComplete(int result); + int DoConnected(int result); + + void DoCallback(int result); + + // Connect to the next address in our list. + int ConnectNextAddress(); + + // We set a timeout for responses to the Hello message. + void StartHelloTimer(int milliseconds); + void RevokeHelloTimer(); + void OnHelloTimeout(); // Called when the Hello Timer fires. + + // Process the result of a Read operation. + void ProcessRead(int bytes_read); + + // Read packets until an error occurs. + int ReadPackets(); + + // Callback when an internal IO is completed. + void OnIOComplete(int result); + + StateType next_state_; + scoped_ptr<UDPClientSocket> socket_; + Packetizer::Listener* listener_; + CompletionCallback* user_callback_; + AddressList addresses_; + const struct addrinfo* current_address_; + int hello_attempts_; // Number of attempts to send a Hello Packet. + bool initiate_sent_; // Indicates whether the Initialte Packet was sent. + + scoped_refptr<IOBuffer> read_buffer_; // Buffer for interal reads. + + uchar shortterm_public_key_[32]; + + CompletionCallbackImpl<ClientPacketizer> io_callback_; + ScopedRunnableMethodFactory<ClientPacketizer> timers_factory_; + + DISALLOW_COPY_AND_ASSIGN(ClientPacketizer); +}; + +} // namespace net + +#endif // NET_CURVECP_CLIENT_PACKETIZER_H_ diff --git a/net/curvecp/connection_key.cc b/net/curvecp/connection_key.cc new file mode 100644 index 0000000..566cf57 --- /dev/null +++ b/net/curvecp/connection_key.cc @@ -0,0 +1,42 @@ +// Copyright (c) 2011 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "net/curvecp/connection_key.h" + +#include <string.h> + +namespace net { + +ConnectionKey::ConnectionKey() { + memset(key_, 0, sizeof(key_)); +} + +ConnectionKey::ConnectionKey(unsigned char bytes[]) { + memcpy(key_, bytes, sizeof(key_)); +} + +ConnectionKey::ConnectionKey(const ConnectionKey& other) { + memcpy(key_, other.key_, sizeof(key_)); +} + +ConnectionKey& ConnectionKey::operator=(const ConnectionKey& other) { + memcpy(key_, other.key_, sizeof(key_)); + return *this; +} + +bool ConnectionKey::operator==(const ConnectionKey& other) const { + return memcmp(key_, other.key_, sizeof(key_)) == 0; +} + +bool ConnectionKey::operator<(const ConnectionKey& other) const { + return memcmp(key_, other.key_, sizeof(key_)) < 0; +} + +std::string ConnectionKey::ToString() const { + // TODO(mbelshe): make this a nice hex formatter + return std::string("key") + + std::string(reinterpret_cast<const char*>(key_), 32); +} + +} // namespace net diff --git a/net/curvecp/connection_key.h b/net/curvecp/connection_key.h new file mode 100644 index 0000000..8fe94ad --- /dev/null +++ b/net/curvecp/connection_key.h @@ -0,0 +1,33 @@ +// Copyright (c) 2011 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef NET_CURVECP_CONNECTION_KEY_H_ +#define NET_CURVECP_CONNECTION_KEY_H_ +#pragma once + +#include <string> + +namespace net { + +// A ConnectionKey uniquely identifies a connection. +// It represents the client's short-term public key and is 32 bytes. +class ConnectionKey { + public: + ConnectionKey(); + ConnectionKey(unsigned char bytes[]); + ConnectionKey(const ConnectionKey& key); + + ConnectionKey& operator=(const ConnectionKey& other); + bool operator==(const ConnectionKey& other) const; + bool operator<(const ConnectionKey& other) const; + + std::string ToString() const; + + private: + unsigned char key_[32]; +}; + +} // namespace net + +#endif // NET_CURVECP_CONNECTION_KEY_H_ diff --git a/net/curvecp/curvecp_client_socket.cc b/net/curvecp/curvecp_client_socket.cc new file mode 100644 index 0000000..f28e839 --- /dev/null +++ b/net/curvecp/curvecp_client_socket.cc @@ -0,0 +1,111 @@ +// Copyright (c) 2011 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "net/base/ip_endpoint.h" +#include "net/base/net_errors.h" +#include "net/base/sys_addrinfo.h" +#include "net/curvecp/curvecp_client_socket.h" +#include "net/curvecp/messenger.h" + +namespace net { + +CurveCPClientSocket::CurveCPClientSocket(const AddressList& addresses, + net::NetLog* net_log, + const net::NetLog::Source& source) + : addresses_(addresses), + net_log_(BoundNetLog::Make(net_log, NetLog::SOURCE_SOCKET)), + messenger_(&packetizer_) { +} + +CurveCPClientSocket::~CurveCPClientSocket() { +} + +int CurveCPClientSocket::Connect(CompletionCallback* callback) { + return packetizer_.Connect(addresses_, &messenger_, callback); +} + +void CurveCPClientSocket::Disconnect() { + // TODO(mbelshe): DCHECK that we're connected. + // Record the ConnectionKey so that we can disconnect it properly. + // Do we need a close() on the messenger? + // packetizer_.Close(); +} + +bool CurveCPClientSocket::IsConnected() const { + // TODO(mbelshe): return packetizer_.IsConnected(); + return false; +} + +bool CurveCPClientSocket::IsConnectedAndIdle() const { + // TODO(mbelshe): return packetizer_.IsConnectedAndIdle(); + return false; +} + +int CurveCPClientSocket::GetPeerAddress(AddressList* address) const { + IPEndPoint endpoint; + int rv = packetizer_.GetPeerAddress(&endpoint); + if (rv < 0) + return rv; + struct sockaddr_storage sockaddr; + size_t sockaddr_length = sizeof(sockaddr); + bool success = endpoint.ToSockAddr( + reinterpret_cast<struct sockaddr*>(&sockaddr), &sockaddr_length); + if (!success) + return ERR_FAILED; + struct addrinfo ai; + memset(&ai, 0, sizeof(ai)); + memcpy(&ai.ai_addr, &sockaddr, sockaddr_length); + ai.ai_addrlen = sockaddr_length; + *address = AddressList::CreateByCopying(&ai); + return OK; +} + +int CurveCPClientSocket::GetLocalAddress(IPEndPoint* address) const { + NOTIMPLEMENTED(); + return ERR_FAILED; +} + +const BoundNetLog& CurveCPClientSocket::NetLog() const { + return net_log_; +} + +void CurveCPClientSocket::SetSubresourceSpeculation() { + // This is ridiculous. +} + +void CurveCPClientSocket::SetOmniboxSpeculation() { + // This is ridiculous. +} + +bool CurveCPClientSocket::WasEverUsed() const { + // This is ridiculous. + return true; +} + +bool CurveCPClientSocket::UsingTCPFastOpen() const { + // This is ridiculous. + return false; +} + +int CurveCPClientSocket::Read(IOBuffer* buf, + int buf_len, + CompletionCallback* callback) { + return messenger_.Read(buf, buf_len, callback); +} + +int CurveCPClientSocket::Write(IOBuffer* buf, + int buf_len, + CompletionCallback* callback) { + return messenger_.Write(buf, buf_len, callback); +} + +bool CurveCPClientSocket::SetReceiveBufferSize(int32 size) { + return true; +} + +bool CurveCPClientSocket::SetSendBufferSize(int32 size) { + return true; +} + +} // namespace net diff --git a/net/curvecp/curvecp_client_socket.h b/net/curvecp/curvecp_client_socket.h new file mode 100644 index 0000000..213b13d --- /dev/null +++ b/net/curvecp/curvecp_client_socket.h @@ -0,0 +1,57 @@ +// Copyright (c) 2011 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef NET_CURVECP_CURVECP_CLIENT_SOCKET_H_ +#define NET_CURVECP_CURVECP_CLIENT_SOCKET_H_ +#pragma once + +#include "net/base/completion_callback.h" +#include "net/curvecp/client_packetizer.h" +#include "net/curvecp/messenger.h" +#include "net/socket/stream_socket.h" + +namespace net { + +// A client socket that uses CurveCP as the transport layer. +class CurveCPClientSocket : public StreamSocket { + public: + // The IP address(es) and port number to connect to. The CurveCP socket will + // try each IP address in the list until it succeeds in establishing a + // connection. + CurveCPClientSocket(const AddressList& addresses, + net::NetLog* net_log, + const net::NetLog::Source& source); + virtual ~CurveCPClientSocket(); + + // ClientSocket methods: + virtual int Connect(CompletionCallback* callback); + virtual void Disconnect(); + virtual bool IsConnected() const; + virtual bool IsConnectedAndIdle() const; + virtual int GetPeerAddress(AddressList* address) const; + virtual int GetLocalAddress(IPEndPoint* address) const; + virtual const BoundNetLog& NetLog() const; + virtual void SetSubresourceSpeculation(); + virtual void SetOmniboxSpeculation(); + virtual bool WasEverUsed() const; + virtual bool UsingTCPFastOpen() const; + + // Socket methods: + virtual int Read(IOBuffer* buf, int buf_len, CompletionCallback* callback); + virtual int Write(IOBuffer* buf, int buf_len, CompletionCallback* callback); + virtual bool SetReceiveBufferSize(int32 size); + virtual bool SetSendBufferSize(int32 size); + + private: + AddressList addresses_; + BoundNetLog net_log_; + Messenger messenger_; + ClientPacketizer packetizer_; + + DISALLOW_COPY_AND_ASSIGN(CurveCPClientSocket); +}; + +} // namespace net + +#endif // NET_CURVECP_CURVECP_CLIENT_SOCKET_H_ diff --git a/net/curvecp/curvecp_server_socket.cc b/net/curvecp/curvecp_server_socket.cc new file mode 100644 index 0000000..32d3786 --- /dev/null +++ b/net/curvecp/curvecp_server_socket.cc @@ -0,0 +1,82 @@ +// Copyright (c) 2011 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "net/base/ip_endpoint.h" +#include "net/base/net_errors.h" +#include "net/base/sys_addrinfo.h" +#include "net/curvecp/curvecp_server_socket.h" +#include "net/curvecp/messenger.h" + +namespace net { + +CurveCPServerSocket::CurveCPServerSocket(net::NetLog* net_log, + const net::NetLog::Source& source) + : net_log_(BoundNetLog::Make(net_log, NetLog::SOURCE_SOCKET)), + packetizer_(new ServerPacketizer()), + ALLOW_THIS_IN_INITIALIZER_LIST(messenger_(packetizer_.get(), this)), + acceptor_(NULL), + is_child_socket_(false) { +} + +// Constructor for an accepted socket. +CurveCPServerSocket::CurveCPServerSocket(const ConnectionKey& key, + ServerPacketizer* packetizer, + net::NetLog* net_log, + const net::NetLog::Source& source) + : net_log_(BoundNetLog::Make(net_log, NetLog::SOURCE_SOCKET)), + packetizer_(packetizer), + ALLOW_THIS_IN_INITIALIZER_LIST(messenger_(key, packetizer_.get(), this)), + acceptor_(NULL), + is_child_socket_(true), + key_(key) { +} + +CurveCPServerSocket::~CurveCPServerSocket() { +} + +int CurveCPServerSocket::Listen(const IPEndPoint& endpoint, + Acceptor* acceptor) { + acceptor_ = acceptor; + return packetizer_->Listen(endpoint, &messenger_); +} + +void CurveCPServerSocket::Close() { + if (acceptor_) + return; // The listener socket is not closable. + packetizer_->Close(key_); +} + +int CurveCPServerSocket::Read(IOBuffer* buf, + int buf_len, + CompletionCallback* callback) { + return messenger_.Read(buf, buf_len, callback); +} + +int CurveCPServerSocket::Write(IOBuffer* buf, + int buf_len, + CompletionCallback* callback) { + return messenger_.Write(buf, buf_len, callback); +} + +bool CurveCPServerSocket::SetReceiveBufferSize(int32 size) { + return true; +} + +bool CurveCPServerSocket::SetSendBufferSize(int32 size) { + return true; +} + +void CurveCPServerSocket::OnAccept(ConnectionKey key) { + DCHECK(acceptor_); + + CurveCPServerSocket* new_socket = + new CurveCPServerSocket(key, + packetizer_, + net_log_.net_log(), + NetLog::Source()); + packetizer_->Open(key, new_socket->messenger()); + acceptor_->OnAccept(new_socket); +} + +} // namespace net diff --git a/net/curvecp/curvecp_server_socket.h b/net/curvecp/curvecp_server_socket.h new file mode 100644 index 0000000..672a574 --- /dev/null +++ b/net/curvecp/curvecp_server_socket.h @@ -0,0 +1,63 @@ +// Copyright (c) 2011 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef NET_CURVECP_CURVECP_SERVER_SOCKET_H_ +#define NET_CURVECP_CURVECP_SERVER_SOCKET_H_ +#pragma once + +#include "net/base/completion_callback.h" +#include "net/curvecp/connection_key.h" +#include "net/curvecp/protocol.h" +#include "net/curvecp/server_messenger.h" +#include "net/curvecp/server_packetizer.h" +#include "net/socket/stream_socket.h" + +namespace net { + +// A server socket that uses CurveCP as the transport layer. +class CurveCPServerSocket : public Socket, + public ServerMessenger::Acceptor { + public: + class Acceptor { + public: + virtual ~Acceptor() {} + virtual void OnAccept(CurveCPServerSocket* new_socket) = 0; + }; + + CurveCPServerSocket(net::NetLog* net_log, + const net::NetLog::Source& source); + virtual ~CurveCPServerSocket(); + + int Listen(const IPEndPoint& endpoint, Acceptor* acceptor); + void Close(); + + // Socket methods: + virtual int Read(IOBuffer* buf, int buf_len, CompletionCallback* callback); + virtual int Write(IOBuffer* buf, int buf_len, CompletionCallback* callback); + virtual bool SetReceiveBufferSize(int32 size); + virtual bool SetSendBufferSize(int32 size); + + // ServerMessenger::Acceptor methods: + virtual void OnAccept(ConnectionKey key); + + private: + CurveCPServerSocket(const ConnectionKey& key, + ServerPacketizer* packetizer_, + net::NetLog* net_log, + const net::NetLog::Source& source); + ServerMessenger* messenger() { return &messenger_; } + + BoundNetLog net_log_; + scoped_refptr<ServerPacketizer> packetizer_; + ServerMessenger messenger_; + Acceptor* acceptor_; + bool is_child_socket_; // Was this socket accepted from another. + ConnectionKey key_; + + DISALLOW_COPY_AND_ASSIGN(CurveCPServerSocket); +}; + +} // namespace net + +#endif // NET_CURVECP_CURVECP_SERVER_SOCKET_H_ diff --git a/net/curvecp/curvecp_transfer_unittest.cc b/net/curvecp/curvecp_transfer_unittest.cc new file mode 100644 index 0000000..e433118 --- /dev/null +++ b/net/curvecp/curvecp_transfer_unittest.cc @@ -0,0 +1,313 @@ +// Copyright (c) 2011 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include <poll.h> + +#include "base/basictypes.h" +#include "base/message_loop.h" +#include "base/metrics/histogram.h" +#include "net/base/net_test_suite.h" +#include "net/base/test_completion_callback.h" +#include "net/curvecp/circular_buffer.h" +#include "net/curvecp/received_block_list.h" +#include "net/curvecp/rtt_and_send_rate_calculator.h" +#include "net/curvecp/test_client.h" +#include "net/curvecp/test_server.h" +#include "testing/gtest/include/gtest/gtest.h" +#include "testing/platform_test.h" + +namespace net { + +class CurveCPTransferTest : public PlatformTest { + public: + CurveCPTransferTest() {} +}; + +void RunEchoTest(int bytes) { + TestServer server; + TestClient client; + + EXPECT_TRUE(server.Start(1234)); + HostPortPair server_address("localhost", 1234); + TestCompletionCallback cb; + EXPECT_TRUE(client.Start(server_address, bytes, &cb)); + + int rv = cb.WaitForResult(); + EXPECT_EQ(0, rv); + EXPECT_EQ(0, server.error_count()); + EXPECT_EQ(0, client.error_count()); +} + +/* +TEST_F(CurveCPTransferTest, Echo_50B_Of_Data) { + RunEchoTest(50); +} + +TEST_F(CurveCPTransferTest, Echo_1KB_Of_Data) { + RunEchoTest(1024); +} + +TEST_F(CurveCPTransferTest, Echo_4KB_Of_Data) { + RunEchoTest(4096); +} +*/ + +// XXXMB +TEST_F(CurveCPTransferTest, Echo_1MB_Of_Data) { + RunEchoTest(1024*1024); +} + +// TODO(mbelshe): Do something meaningful with this test. +TEST_F(CurveCPTransferTest, RTTIntial) { + RttAndSendRateCalculator rate; + printf("initial %d (%d, %d/%d)\n", rate.send_rate(), + rate.rtt_average(), + rate.rtt_lo(), + rate.rtt_hi()); + + for (int i = 0; i < 10; ++i) { + rate.OnMessage(100*1000); + printf("%d: %d (%d, %d/%d)\n", i, rate.send_rate(), + rate.rtt_average(), + rate.rtt_lo(), + rate.rtt_hi()); + usleep(200000); + } + + rate.OnTimeout(); + rate.OnTimeout(); + rate.OnTimeout(); + rate.OnTimeout(); + + // Get worse + for (int i = 0; i < 10; ++i) { + rate.OnMessage((500 + i)*1000); + printf("%d: %d (%d, %d/%d)\n", i, rate.send_rate(), + rate.rtt_average(), + rate.rtt_lo(), + rate.rtt_hi()); + usleep(200000); + } + + // Get better + for (int i = 0; i < 10; ++i) { + rate.OnMessage((100 - i)*1000); + printf("%d: %d (%d, %d/%d)\n", i, rate.send_rate(), + rate.rtt_average(), + rate.rtt_lo(), + rate.rtt_hi()); + usleep(200000); + } +} + +TEST_F(CurveCPTransferTest, CircularBufferFillAndDrain) { + CircularBuffer buffer(10); + for (int i = 0; i < 30; ++i) { + EXPECT_EQ(0, buffer.length()); + EXPECT_EQ(3, buffer.write("abc", 3)); + EXPECT_EQ(3, buffer.length()); + char read_data[3]; + EXPECT_EQ(3, buffer.read(read_data, 3)); + EXPECT_EQ(0, memcmp(read_data, "abc", 3)); + } +} + +TEST_F(CurveCPTransferTest, CircularBufferTooLargeFill) { + CircularBuffer buffer(10); + EXPECT_EQ(10, buffer.write("abcdefghijklm", 13)); + EXPECT_EQ(0, buffer.write("a", 1)); + char read_data[10]; + EXPECT_EQ(10, buffer.read(read_data, 10)); + EXPECT_EQ(0, memcmp(read_data, "abcdefghij", 10)); + EXPECT_EQ(1, buffer.write("a", 1)); +} + +TEST_F(CurveCPTransferTest, CircularBufferEdgeCases) { + CircularBuffer buffer(10); + char read_data[10]; + EXPECT_EQ(9, buffer.write("abcdefghi", 9)); + EXPECT_EQ(9, buffer.read(read_data, 10)); + EXPECT_EQ(0, memcmp(read_data, "abcdefghi", 9)); + EXPECT_EQ(1, buffer.write("a", 1)); + EXPECT_EQ(1, buffer.write("b", 1)); + EXPECT_EQ(2, buffer.read(read_data, 10)); + EXPECT_EQ(0, memcmp(read_data, "ab", 2)); +} + +TEST_F(CurveCPTransferTest, CircularBufferOneWriteTwoReads) { + CircularBuffer buffer(1000); + char read_data[10]; + EXPECT_EQ(13, buffer.write("abcdefghijklm", 13)); + EXPECT_EQ(10, buffer.read(read_data, 10)); + EXPECT_EQ(0, memcmp(read_data, "abcdefghij", 10)); + EXPECT_EQ(3, buffer.read(read_data, 10)); + EXPECT_EQ(0, memcmp(read_data, "klm", 3)); +} + +TEST_F(CurveCPTransferTest, CircularBufferTwoWritesOneRead) { + CircularBuffer buffer(1000); + char read_data[10]; + EXPECT_EQ(6, buffer.write("abcdef", 6)); + EXPECT_EQ(4, buffer.write("ghij", 4)); + EXPECT_EQ(10, buffer.read(read_data, 10)); + EXPECT_EQ(0, memcmp(read_data, "abcdefghij", 10)); +} + +TEST_F(CurveCPTransferTest, CircularBufferFillOnSpill) { + CircularBuffer buffer(10); + char read_data[10]; + EXPECT_EQ(10, buffer.write("abcdefghij", 10)); + EXPECT_EQ(3, buffer.read(read_data, 3)); + EXPECT_EQ(0, memcmp(read_data, "abc", 3)); + // We now have a hole at the head of the circular buffer. + EXPECT_EQ(1, buffer.write("x", 1)); + EXPECT_EQ(1, buffer.write("y", 1)); + EXPECT_EQ(1, buffer.write("z", 1)); + EXPECT_EQ(0, buffer.write("q", 1)); // Overflow + EXPECT_EQ(10, buffer.read(read_data, 10)); + EXPECT_EQ(0, memcmp(read_data, "defghijxyz", 10)); +} + +TEST_F(CurveCPTransferTest, ReceivedBlockList) { + scoped_refptr<IOBufferWithSize> buffer(new IOBufferWithSize(16)); + memcpy(buffer->data(), "abcdefghijklmnop", 16); + scoped_refptr<IOBufferWithSize> read_buffer(new IOBufferWithSize(100)); + + ReceivedBlockList list; + EXPECT_EQ(0, list.current_position()); + list.AddBlock(0, buffer, 16); + EXPECT_EQ(0, list.current_position()); + list.AddBlock(0, buffer, 16); + EXPECT_EQ(16, list.ReadBytes(read_buffer.get(), read_buffer->size())); + EXPECT_EQ(0, memcmp(buffer->data(), read_buffer->data(), 16)); + EXPECT_EQ(16, list.current_position()); +} + +TEST_F(CurveCPTransferTest, ReceivedBlockListCoalesce) { + scoped_refptr<IOBufferWithSize> buffer(new IOBufferWithSize(8)); + memcpy(buffer->data(), "abcdefgh", 8); + scoped_refptr<IOBufferWithSize> read_buffer(new IOBufferWithSize(100)); + + ReceivedBlockList list; + EXPECT_EQ(0, list.current_position()); + list.AddBlock(0, buffer, 8); + EXPECT_EQ(0, list.current_position()); + list.AddBlock(8, buffer, 8); + EXPECT_EQ(16, list.ReadBytes(read_buffer.get(), read_buffer->size())); + EXPECT_EQ(0, memcmp(buffer->data(), read_buffer->data(), 8)); + EXPECT_EQ(0, memcmp(buffer->data(), read_buffer->data() + 8, 8)); + EXPECT_EQ(16, list.current_position()); +} + +TEST_F(CurveCPTransferTest, ReceivedBlockListGap) { + scoped_refptr<IOBufferWithSize> buffer(new IOBufferWithSize(8)); + memcpy(buffer->data(), "abcdefgh", 8); + scoped_refptr<IOBufferWithSize> read_buffer(new IOBufferWithSize(100)); + + ReceivedBlockList list; + EXPECT_EQ(0, list.current_position()); + list.AddBlock(0, buffer, 8); + EXPECT_EQ(0, list.current_position()); + + // Leaves a gap + list.AddBlock(16, buffer, 8); + EXPECT_EQ(8, list.ReadBytes(read_buffer.get(), read_buffer->size())); + EXPECT_EQ(0, list.ReadBytes(read_buffer.get(), read_buffer->size())); + EXPECT_EQ(8, list.current_position()); + + // Fill the gap + list.AddBlock(8, buffer, 8); + EXPECT_EQ(16, list.ReadBytes(read_buffer.get(), read_buffer->size())); + EXPECT_EQ(0, memcmp(buffer->data(), read_buffer->data(), 8)); + EXPECT_EQ(0, memcmp(buffer->data(), read_buffer->data() + 8, 8)); + EXPECT_EQ(24, list.current_position()); +} + +TEST_F(CurveCPTransferTest, ReceivedBlockListPartialRead) { + scoped_refptr<IOBufferWithSize> buffer(new IOBufferWithSize(8)); + memcpy(buffer->data(), "abcdefgh", 8); + scoped_refptr<IOBufferWithSize> read_buffer(new IOBufferWithSize(100)); + + ReceivedBlockList list; + EXPECT_EQ(0, list.current_position()); + list.AddBlock(0, buffer, 8); + EXPECT_EQ(0, list.current_position()); + list.AddBlock(8, buffer, 8); + EXPECT_EQ(3, list.ReadBytes(read_buffer.get(), 3)); + EXPECT_EQ(0, memcmp("abc", read_buffer->data(), 3)); + EXPECT_EQ(3, list.ReadBytes(read_buffer.get(), 3)); + EXPECT_EQ(0, memcmp("def", read_buffer->data(), 3)); + EXPECT_EQ(3, list.ReadBytes(read_buffer.get(), 3)); + EXPECT_EQ(0, memcmp("gha", read_buffer->data(), 3)); + EXPECT_EQ(3, list.ReadBytes(read_buffer.get(), 3)); + EXPECT_EQ(0, memcmp("bcd", read_buffer->data(), 3)); + EXPECT_EQ(3, list.ReadBytes(read_buffer.get(), 3)); + EXPECT_EQ(0, memcmp("efg", read_buffer->data(), 3)); + EXPECT_EQ(1, list.ReadBytes(read_buffer.get(), 3)); + EXPECT_EQ(0, memcmp("h", read_buffer->data(), 1)); + EXPECT_EQ(16, list.current_position()); +} + +/* + +class PollTimerTester { + public: + PollTimerTester() { + Test(); + } + + void Test() { + struct pollfd fds[3]; + memset(fds, 0, sizeof(struct pollfd) * 3); + + while(true) { + (void)poll(fds, 1, 2); + base::TimeTicks now(base::TimeTicks::Now()); + if (!last_.is_null()) { + LOG(ERROR) << "Time was " << (now - last_).InMicroseconds(); + LOG(ERROR) << "Time was " << (now - last_).InMilliseconds(); + } + last_ = now; + } + } + + base::TimeTicks last_; +}; + +class TimerTester { + public: + TimerTester() { + timer_.Start(base::TimeDelta(), this, &TimerTester::OnTimer); + } + void OnTimer() { + base::TimeTicks now(base::TimeTicks::Now()); + if (!last_.is_null()) { + LOG(ERROR) << "Time was " << (now - last_).InMicroseconds(); + LOG(ERROR) << "Time was " << (now - last_).InMilliseconds(); + } + last_ = now; + //timer_.Start(base::TimeDelta(), this, &TimerTester::OnTimer); + timer_.Start(base::TimeDelta::FromMicroseconds(150), + this, + &TimerTester::OnTimer); + } + + base::TimeTicks last_; + base::OneShotTimer<TimerTester> timer_; +}; + +TEST_F(CurveCPTransferTest, MinTimer) { + PollTimerTester tester; + MessageLoop::current()->Run(); +} + +*/ + +} // namespace net + +int main(int argc, char**argv) { + base::StatisticsRecorder recorder; + NetTestSuite test_suite(argc, argv); + return test_suite.Run(); +} diff --git a/net/curvecp/messenger.cc b/net/curvecp/messenger.cc new file mode 100644 index 0000000..466cef5 --- /dev/null +++ b/net/curvecp/messenger.cc @@ -0,0 +1,372 @@ +// Copyright (c) 2011 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "net/curvecp/messenger.h" + +#include "base/logging.h" +#include "base/message_loop.h" +#include "net/base/io_buffer.h" +#include "net/base/net_errors.h" +#include "net/curvecp/protocol.h" + +// Basic protocol design: +// +// OnTimeout: Called when the timeout timer pops. +// - call SendMessage() +// - call RecvMessage() +// +// OnSendTimer: Called when the send-timer pops. +// - call SendMessage() +// - call RecvMessage() +// +// OnMessage: Called when a message arrived from the packet layer +// - add the message to the receive queue +// - call RecvMessage() +// +// Write: Called by application to write data to remote +// - add the data to the send_buffer +// - call SendMessage() +// +// SendMessage: Called to Send a message to the remote +// - msg = first message to retransmit where retransmit timer popped +// - if msg == NULL +// - msg = create a new message from the send buffer +// - if msg != NULL +// - send message to the packet layer +// - setTimer(OnSendTimer, send_rate); +// +// RecvMessage: Called to process a Received message from the remote +// - calculate RTT +// - calculate Send Rate +// - acknowledge data from received message +// - resetTimeout +// - timeout = now + rtt_timeout +// - if currrent_timeout > timeout +// setTimer(OnTimeout, timeout) + +namespace net { + +// Maximum number of write blocks. +static const size_t kMaxWriteQueueMessages = 128; + +// Size of the send buffer. +static const size_t kSendBufferSize = (128 * 1024); +// Size of the receive buffer. +static const size_t kReceiveBufferSize = (128 * 1024); + +Messenger::Messenger(Packetizer* packetizer) + : packetizer_(packetizer), + send_buffer_(kSendBufferSize), + send_complete_callback_(NULL), + receive_complete_callback_(NULL), + pending_receive_length_(0), + send_message_in_progress_(false), + ALLOW_THIS_IN_INITIALIZER_LIST( + send_message_callback_(this, &Messenger::OnSendMessageComplete)), + ALLOW_THIS_IN_INITIALIZER_LIST(factory_(this)) { +} + +Messenger::~Messenger() { +} + +int Messenger::Read(IOBuffer* buf, int buf_len, CompletionCallback* callback) { + DCHECK(CalledOnValidThread()); + DCHECK(!receive_complete_callback_); + + if (!received_list_.bytes_available()) { + receive_complete_callback_ = callback; + pending_receive_ = buf; + pending_receive_length_ = buf_len; + return ERR_IO_PENDING; + } + + int bytes_read = InternalRead(buf, buf_len); + DCHECK_LT(0, bytes_read); + return bytes_read; +} + +int Messenger::Write(IOBuffer* buf, int buf_len, CompletionCallback* callback) { + DCHECK(CalledOnValidThread()); + DCHECK(!pending_send_.get()); // Already a write pending! + DCHECK(!send_complete_callback_); + DCHECK_LT(0, buf_len); + + int len = send_buffer_.write(buf->data(), buf_len); + if (!send_timer_.IsRunning()) + send_timer_.Start(base::TimeDelta(), this, &Messenger::OnSendTimer); + if (len) + return len; + + // We couldn't add data to the send buffer, so block the application. + pending_send_ = buf; + pending_send_length_ = buf_len; + send_complete_callback_ = callback; + return ERR_IO_PENDING; +} + +void Messenger::OnConnection(ConnectionKey key) { + LOG(ERROR) << "Client Connect: " << key.ToString(); + key_ = key; +} + +void Messenger::OnClose(Packetizer* packetizer, ConnectionKey key) { + LOG(ERROR) << "Got Close!"; +} + +void Messenger::OnMessage(Packetizer* packetizer, + ConnectionKey key, + unsigned char* msg, + size_t length) { + DCHECK(key == key_); + + // Do basic message sanity checking. + if (length < sizeof(Message)) + return; + if (length > static_cast<size_t>(kMaxMessageLength)) + return; + + // Add message to received queue. + scoped_refptr<IOBufferWithSize> buffer(new IOBufferWithSize(length)); + memcpy(buffer->data(), msg, length); + read_queue_.push_back(buffer); + + // Process a single received message + RecvMessage(); +} + +int Messenger::InternalRead(IOBuffer* buffer, int buffer_length) { + return received_list_.ReadBytes(buffer, buffer_length); +} + +IOBufferWithSize* Messenger::CreateBufferFromSendQueue() { + DCHECK_LT(0, send_buffer_.length()); + + int length = std::min(packetizer_->max_message_payload(), + send_buffer_.length()); + IOBufferWithSize* rv = new IOBufferWithSize(length); + int bytes = send_buffer_.read(rv->data(), length); + DCHECK_EQ(bytes, length); + + // We consumed data, check to see if someone is waiting to write more data. + if (send_complete_callback_) { + DCHECK(pending_send_.get()); + + int len = send_buffer_.write(pending_send_->data(), pending_send_length_); + if (len) { + pending_send_ = NULL; + CompletionCallback* callback = send_complete_callback_; + send_complete_callback_ = NULL; + callback->Run(len); + } + } + + return rv; +} + +void Messenger::OnSendMessageComplete(int result) { + DCHECK_NE(ERR_IO_PENDING, result); + + send_message_in_progress_ = false; + + if (result <= 0) { + // TODO(mbelshe): Handle error. + NOTIMPLEMENTED(); + } + + // If the send timer fired while we were waiting for this send to complete, + // we need to manually run the timer now. + if (!send_timer_.IsRunning()) + OnSendTimer(); + + if (!send_timeout_timer_.IsRunning()) { + LOG(ERROR) << "RttTimeout is " << rtt_.rtt_timeout(); + base::TimeDelta delay = + base::TimeDelta::FromMicroseconds(rtt_.rtt_timeout()); + send_timeout_timer_.Start(delay, this, &Messenger::OnTimeout); + } +} + +void Messenger::OnTimeout() { + LOG(ERROR) << "OnTimeout fired"; + int64 position = sent_list_.FindPositionOfOldestSentBlock(); + // XXXMB - need to verify that we really need to retransmit... + if (position >= 0) { + rtt_.OnTimeout(); // adjust our send rate. + LOG(ERROR) << "OnTimeout retransmitting: " << position; + SendMessage(position); + } else { + DCHECK_EQ(0u, sent_list_.size()); + } + RecvMessage(); + received_list_.LogBlockList(); +} + +void Messenger::OnSendTimer() { + LOG(ERROR) << "OnSendTimer!"; + DCHECK(!send_timer_.IsRunning()); + + // If the send buffer is empty, then we don't need to keep firing. + if (!send_buffer_.length()) { + LOG(ERROR) << "OnSendTimer: send_buffer empty"; + return; + } + + // Set the next send timer. + LOG(ERROR) << "SendRate is: " << rtt_.send_rate() << "us"; + send_timer_.Start(base::TimeDelta::FromMicroseconds(rtt_.send_rate()), + this, + &Messenger::OnSendTimer); + + // Create a block from the send_buffer. + if (!sent_list_.is_full()) { + scoped_refptr<IOBufferWithSize> buffer = CreateBufferFromSendQueue(); + int64 position = sent_list_.CreateBlock(buffer.get()); + DCHECK_LE(0, position); + SendMessage(position); + } + + RecvMessage(); // Try to process an incoming message +} + +void Messenger::SendMessage(int64 position) { + LOG(ERROR) << "SendMessage (position=" << position << ")"; + if (send_message_in_progress_) + return; // We're still waiting for the last write to complete. + + IOBufferWithSize* data = sent_list_.FindBlockByPosition(position); + DCHECK(data != NULL); + size_t message_size = sizeof(Message) + data->size(); + size_t padded_size = (message_size + 15) & 0xfffffff0; + + scoped_refptr<IOBufferWithSize> message(new IOBufferWithSize(padded_size)); + Message* header = reinterpret_cast<Message*>(message->data()); + memset(header, 0, sizeof(Message)); + uint64 id = sent_list_.GetNewMessageId(); + uint32_pack(header->message_id, id); + // TODO(mbelshe): Needs to carry EOF flags + uint16_pack(header->size.val, data->size()); + uint64_pack(header->position, position); + // TODO(mbelshe): Fill in rest of the header fields. + // needs to have the block-position. He tags each chunk with an + // absolute offset into the data stream. + // Copy the contents of the message into the Message frame. + memcpy(message->data() + sizeof(Message), data->data(), data->size()); + + sent_list_.MarkBlockSent(position, id); + + int rv = packetizer_->SendMessage(key_, + message->data(), + padded_size, + &send_message_callback_); + if (rv == ERR_IO_PENDING) { + send_message_in_progress_ = true; + return; + } + + // UDP must write all or none. + DCHECK_EQ(padded_size, static_cast<size_t>(rv)); + OnSendMessageComplete(rv); +} + +void Messenger::RecvMessage() { + if (!read_queue_.size()) + return; + + scoped_refptr<IOBufferWithSize> message(read_queue_.front()); + read_queue_.pop_front(); + + Message* header = reinterpret_cast<Message*>(message->data()); + uint16 body_length = uint16_unpack(header->size.val); + if (body_length < 0) + return; + if (body_length > kMaxMessageLength) + return; + if (body_length > message->size()) + return; + + uint32 message_id = uint32_unpack(header->message_id); + if (message_id) { + LOG(ERROR) << "RecvMessage Message id: " << message_id + << ", " << body_length << " bytes"; + } else { + LOG(ERROR) << "RecvMessage ACK "; + } + + // See if this message has information for recomputing RTT. + uint32 response_to_msg = uint32_unpack(header->last_message_received); + base::TimeTicks last_sent_time = sent_list_.FindLastSendTime(response_to_msg); + if (!last_sent_time.is_null()) { + int rtt = (base::TimeTicks::Now() - last_sent_time).InMicroseconds(); + DCHECK_LE(0, rtt); + LOG(ERROR) << "RTT was: " << rtt << "us"; + rtt_.OnMessage(rtt); + } + + // Handle acknowledgements + uint64 start_byte = 0; + uint64 stop_byte = uint64_unpack(header->acknowledge_1); + sent_list_.AcknowledgeBlocks(start_byte, stop_byte); + + start_byte = stop_byte + uint16_unpack(header->gap_1); + stop_byte = start_byte + uint16_unpack(header->acknowledge_2); + sent_list_.AcknowledgeBlocks(start_byte, stop_byte); + + start_byte = stop_byte + uint16_unpack(header->gap_2); + stop_byte = start_byte + uint16_unpack(header->acknowledge_3); + sent_list_.AcknowledgeBlocks(start_byte, stop_byte); + + start_byte = stop_byte + uint16_unpack(header->gap_3); + stop_byte = start_byte + uint16_unpack(header->acknowledge_4); + sent_list_.AcknowledgeBlocks(start_byte, stop_byte); + + start_byte = stop_byte + uint16_unpack(header->gap_4); + stop_byte = start_byte + uint16_unpack(header->acknowledge_5); + sent_list_.AcknowledgeBlocks(start_byte, stop_byte); + + start_byte = stop_byte + uint16_unpack(header->gap_5); + stop_byte = start_byte + uint16_unpack(header->acknowledge_6); + sent_list_.AcknowledgeBlocks(start_byte, stop_byte); + + if (!header->is_ack()) { + // Add to our received block list + uint64 position = uint64_unpack(header->position); + scoped_refptr<IOBuffer> buffer(new IOBuffer(body_length)); + memcpy(buffer->data(), message->data() + sizeof(Message), body_length); + received_list_.AddBlock(position, buffer, body_length); + + SendAck(message_id); + } + + // If we have data available, and a read is pending, notify the callback. + if (received_list_.bytes_available() && receive_complete_callback_) { + // Pass the data up to the caller. + int bytes_read = InternalRead(pending_receive_, pending_receive_length_); + CompletionCallback* callback = receive_complete_callback_; + receive_complete_callback_ = NULL; + callback->Run(bytes_read); + } +} + +void Messenger::SendAck(uint32 last_message_received) { + scoped_refptr<IOBuffer> buffer(new IOBuffer(sizeof(Message))); + memset(buffer->data(), 0, sizeof(Message)); + Message* message = reinterpret_cast<Message*>(buffer->data()); + uint32_pack(message->last_message_received, last_message_received); + uint64_pack(message->acknowledge_1, received_list_.bytes_received()); + LOG(ERROR) << "SendAck " << received_list_.bytes_received(); + // TODO(mbelshe): fill in remainder of selective acknowledgements + + // TODO(mbelshe): Fix this - it is totally possible to have a send message + // in progress here... + DCHECK(!send_message_in_progress_); + + int rv = packetizer_->SendMessage(key_, + buffer->data(), + sizeof(Message), + &send_message_callback_); + // TODO(mbelshe): Fix me! Deal with the error cases + DCHECK(rv == sizeof(Message)); +} + +} // namespace net diff --git a/net/curvecp/messenger.h b/net/curvecp/messenger.h new file mode 100644 index 0000000..7bd1bb8 --- /dev/null +++ b/net/curvecp/messenger.h @@ -0,0 +1,104 @@ +// Copyright (c) 2011 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef NET_CURVECP_MESSENGER_H_ +#define NET_CURVECP_MESSENGER_H_ +#pragma once + +#include <deque> +#include <list> + +#include "base/basictypes.h" +#include "base/task.h" +#include "base/threading/non_thread_safe.h" +#include "base/timer.h" +#include "net/base/completion_callback.h" +#include "net/curvecp/circular_buffer.h" +#include "net/curvecp/packetizer.h" +#include "net/curvecp/received_block_list.h" +#include "net/curvecp/rtt_and_send_rate_calculator.h" +#include "net/curvecp/sent_block_list.h" +#include "net/socket/socket.h" + +namespace net { + +class DrainableIOBuffer; +class IOBufferWithSize; +class Packetizer; + +// The messenger provides the reliable CurveCP transport. +class Messenger : public base::NonThreadSafe, + public Packetizer::Listener { + public: + explicit Messenger(Packetizer* packetizer); + virtual ~Messenger(); + + int Read(IOBuffer* buf, int buf_len, CompletionCallback* callback); + int Write(IOBuffer* buf, int buf_len, CompletionCallback* callback); + + // Packetizer::Listener methods: + virtual void OnConnection(ConnectionKey key); + virtual void OnClose(Packetizer* packetizer, ConnectionKey key); + virtual void OnMessage(Packetizer* packetizer, + ConnectionKey key, + unsigned char* msg, + size_t length); + + protected: + ConnectionKey key_; + + private: + // Handle reading data from the read queue. + int InternalRead(IOBuffer* buffer, int buffer_length); + + // Extracts data from send queue to create a new buffer of data to send. + IOBufferWithSize* CreateBufferFromSendQueue(); + + // Continuation function after a SendMessage() call was blocked. + void OnSendMessageComplete(int result); + + // Protocol handling routines + void OnTimeout(); + void OnSendTimer(); + void SendMessage(int64 position); + void RecvMessage(); + void SendAck(uint32 last_message_received); + + RttAndSendRateCalculator rtt_; + Packetizer* packetizer_; + + // The send_buffer is a list of pending data to pack into messages and send + // to the remote. + CircularBuffer send_buffer_; + CompletionCallback* send_complete_callback_; + scoped_refptr<IOBuffer> pending_send_; + int pending_send_length_; + + // The read_buffer is a list of pending data which has been unpacked from + // messages and is awaiting delivery to the application. + CompletionCallback* receive_complete_callback_; + scoped_refptr<IOBuffer> pending_receive_; + int pending_receive_length_; + + // The list of received but unprocessed messages. + std::list<scoped_refptr<IOBufferWithSize> > read_queue_; + + ReceivedBlockList received_list_; + SentBlockList sent_list_; + bool send_message_in_progress_; + + // A timer to fire when a timeout has occurred. + base::OneShotTimer<Messenger> send_timeout_timer_; + // A timer to fire when we can send data. + base::OneShotTimer<Messenger> send_timer_; + + CompletionCallbackImpl<Messenger> send_message_callback_; + + ScopedRunnableMethodFactory<Messenger> factory_; + DISALLOW_COPY_AND_ASSIGN(Messenger); +}; + +} // namespace net + +#endif // NET_CURVECP_MESSENGER_H_ diff --git a/net/curvecp/packetizer.h b/net/curvecp/packetizer.h new file mode 100644 index 0000000..30d6c29 --- /dev/null +++ b/net/curvecp/packetizer.h @@ -0,0 +1,51 @@ +// Copyright (c) 2011 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef NET_CURVECP_PACKETIZER_H_ +#define NET_CURVECP_PACKETIZER_H_ +#pragma once + +#include "base/basictypes.h" +#include "base/scoped_ptr.h" +#include "net/base/completion_callback.h" +#include "net/curvecp/connection_key.h" + +namespace net { + +class IPEndPoint; + +class Packetizer { + public: + class Listener { + public: + virtual ~Listener() {} + // Callback for new connections. + virtual void OnConnection(ConnectionKey key) = 0; + virtual void OnMessage(Packetizer* packetizer, + ConnectionKey key, + unsigned char* msg, + size_t length) = 0; + }; + + virtual ~Packetizer() {} + + // Send a message on a connection. + virtual int SendMessage(ConnectionKey key, + const char* data, + size_t length, + CompletionCallback* callback) = 0; + + // Close an existing connection. + virtual void Close(ConnectionKey key) = 0; + + virtual int GetPeerAddress(IPEndPoint* addresses) const = 0; + + // Returns the current maximum message size which can be fit into the next + // message payload to be sent on the packetizer. + virtual int max_message_payload() const = 0; +}; + +} // namespace net + +#endif // NET_CURVECP_PACKETIZER_H_ diff --git a/net/curvecp/protocol.cc b/net/curvecp/protocol.cc new file mode 100644 index 0000000..69f9b60 --- /dev/null +++ b/net/curvecp/protocol.cc @@ -0,0 +1,88 @@ +// Copyright (c) 2011 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "net/curvecp/protocol.h" + +#include "base/basictypes.h" + +namespace net { + +void uint16_pack(uchar* dest, uint16 val) { + *dest++ = val; + val >>= 8; + *dest++ = val; + val >>= 8; +} + +uint16 uint16_unpack(uchar* src) { + uint16 result; + result = src[1]; + result <<= 8; + result |= src[0]; + return result; +} + +void uint32_pack(uchar* dest, uint32 val) { + *dest++ = val; + val >>= 8; + *dest++ = val; + val >>= 8; + *dest++ = val; + val >>= 8; + *dest++ = val; + val >>= 8; +} + +uint32 uint32_unpack(uchar* src) { + uint32 result; + result = src[3]; + result <<= 8; + result |= src[2]; + result <<= 8; + result |= src[1]; + result <<= 8; + result |= src[0]; + return result; +} + +void uint64_pack(uchar* dest, uint64 val) { + *dest++ = val; + val >>= 8; + *dest++ = val; + val >>= 8; + *dest++ = val; + val >>= 8; + *dest++ = val; + val >>= 8; + *dest++ = val; + val >>= 8; + *dest++ = val; + val >>= 8; + *dest++ = val; + val >>= 8; + *dest++ = val; + val >>= 8; +} + +uint64 uint64_unpack(uchar* src) { + uint64 result; + result = src[7]; + result <<= 8; + result |= src[6]; + result <<= 8; + result |= src[5]; + result <<= 8; + result |= src[4]; + result <<= 8; + result |= src[3]; + result <<= 8; + result |= src[2]; + result <<= 8; + result |= src[1]; + result <<= 8; + result |= src[0]; + return result; +} + +} // namespace net diff --git a/net/curvecp/protocol.h b/net/curvecp/protocol.h new file mode 100644 index 0000000..07eea49 --- /dev/null +++ b/net/curvecp/protocol.h @@ -0,0 +1,158 @@ +// Copyright (c) 2011 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef NET_CURVECP_PROTOCOL_H_ +#define NET_CURVECP_PROTOCOL_H_ +#pragma once + +// Mike's thoughts on the protocol: +// 1) the packet layer probably should have a "close" mechanism. although +// some clients won't use it, it is nice to have. +// +// 2) when do we re "initiate"? maybe connections should have an inferred +// lifetime, and client should always re-initiate after 1min? +// +// 3) An initiate packet can cary 640B of data. But how does the app layer +// know that only 640B of data is available there? This is a bit awkward? + +#include "base/basictypes.h" + +namespace net { + +typedef unsigned char uchar; + +// Messages can range in size from 16 to 1088 bytes. +const int kMinMessageLength = 16; +const int kMaxMessageLength = 1088; + +// Maximum packet size. +const int kMaxPacketLength = kMaxMessageLength + 96; + + + +// Packet layer. + +// All Packets start with an 8 byte identifier. +struct Packet { + char id[8]; +}; + +// A HelloPacket is sent from the client to the server to establish a secure +// cookie to use when communicating with a server. +struct HelloPacket : Packet { + uchar server_extension[16]; + uchar client_extension[16]; + uchar client_shortterm_public_key[32]; + uchar zero[64]; + uchar nonce[8]; + uchar box[80]; +}; + +// A CookiePacket is sent from the server to the client in response to a +// HelloPacket. +struct CookiePacket : Packet { + uchar client_extension[16]; + uchar server_extension[16]; + uchar nonce[16]; + uchar box[144]; +}; + +// An InitiatePacket is sent from the client to the server to initiate +// the connection once a cookie has been exchanged. +struct InitiatePacket : Packet { + uchar server_extension[16]; + uchar client_extension[16]; + uchar client_shortterm_public_key[32]; + uchar server_cookie[96]; + uchar initiate_nonce[8]; + + uchar client_longterm_public_key[32]; + uchar nonce[16]; + uchar box[48]; + uchar server_name[256]; + + // A message is carried here. +}; + +// A ClientMessagePacket contains a Message from the client to the server. +struct ClientMessagePacket : Packet { + uchar server_extension[16]; + uchar client_extension[16]; + uchar client_shortterm_public_key[32]; + uchar nonce[8]; + + // A message is carried here of at least 16 bytes. +}; + +// A ServerMessagePacket contains a Message from the server to the client. +struct ServerMessagePacket : Packet { + uchar client_extension[16]; + uchar server_extension[16]; + uchar nonce[8]; + + // A message is carried here of at least 16 bytes. +}; + + + + +// Messaging layer. + +struct Message { + // If message_id is all zero, this Message is a pure ACK message. + uchar message_id[4]; + // For regular messages going back and forth, last_message_received will be + // zero. For an ACK, it will be filled in and can be used to compute RTT. + uchar last_message_received[4]; + uchar acknowledge_1[8]; // bytes acknowledged in the first range. + uchar gap_1[4]; // gap between the first range and the second range. + uchar acknowledge_2[2]; // bytes acknowledged in the second range. + uchar gap_2[2]; // gap between the second range and the third range. + uchar acknowledge_3[2]; // bytes acknowledged in the third range. + uchar gap_3[2]; // gap between the third range and the fourth range. + uchar acknowledge_4[2]; // bytes acknowledged in the fourth range. + uchar gap_4[2]; // gap between the fourth range and the fifth range. + uchar acknowledge_5[2]; // bytes acknowledged in the fifth range. + uchar gap_5[2]; // gap between the fifth range and the sixth range. + uchar acknowledge_6[2]; // bytes acknowledged in the sixth range. + union { + struct { + unsigned short unused:4; + unsigned short fail:1; + unsigned short success:1; + unsigned short length:10; + } bits; + uchar val[2]; + } size; + uchar position[8]; + uchar padding[16]; + + bool is_ack() { return *(reinterpret_cast<int32*>(message_id)) == 0; } +}; + +// Connection state +// TODO(mbelshe) move this to the messenger. +struct ConnectionState { + uchar client_shortterm_public_key[32]; + + // Fields we'll need going forward: + // + // uchar secret_key_client_short_server_short[32]; + // crypto_uint64 received_nonce; + // crypto_uint64 sent_nonce; + // uchar client_extension[16]; + // uchar server_extension[16]; +}; + +void uint16_pack(uchar* dest, uint16 val); +uint16 uint16_unpack(uchar* src); +void uint32_pack(uchar* dest, uint32 val); +uint32 uint32_unpack(uchar* src); +void uint64_pack(uchar* dest, uint64 val); +uint64 uint64_unpack(uchar* src); + + +} // namespace net + +#endif // NET_CURVECP_PROTOCOL_H_ diff --git a/net/curvecp/received_block_list.cc b/net/curvecp/received_block_list.cc new file mode 100644 index 0000000..b7b6059 --- /dev/null +++ b/net/curvecp/received_block_list.cc @@ -0,0 +1,96 @@ +// Copyright (c) 2011 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "net/curvecp/received_block_list.h" + +#include "net/base/io_buffer.h" + +namespace net { + +ReceivedBlockList::ReceivedBlockList() + : current_position_(0), + bytes_received_(0) { +} + +ReceivedBlockList::~ReceivedBlockList() { +} + +int ReceivedBlockList::bytes_available() const { + int bytes_available = 0; + BlockList::const_iterator it = list_.begin(); + while (it != list_.end()) { + if (it->position != current_position_ + bytes_available) + break; + bytes_available += it->data->size(); + it++; + } + return bytes_available; +} + +void ReceivedBlockList::AddBlock(int64 position, IOBuffer* data, int length) { + if (position < bytes_received_) { + LOG(ERROR) << "Unnecessary retransmit of " << position; + return; + } + + BlockList::iterator it = list_.begin(); + while (it != list_.end()) { + if (it->position == position) { + // Duplicate block. Make sure they are the same length. + DCHECK_EQ(it->data->size(), length); + return; + } + if (it->position > position) { + // The block must not overlap with the next block. + DCHECK_LE(position + length, it->position); + break; + } + it++; + } + Block new_block; + new_block.position = position; + new_block.data = new DrainableIOBuffer(data, length); + list_.insert(it, new_block); + + ComputeBytesReceived(); +} + +int ReceivedBlockList::ReadBytes(IOBuffer* buffer, int size) { + int bytes_read = 0; + + LogBlockList(); + + while (size > 0 && list_.size()) { + BlockList::iterator it = list_.begin(); + if (it->position != current_position_) + break; + int bytes = std::min(size, it->data->BytesRemaining()); + memcpy(buffer->data() + bytes_read, it->data->data(), bytes); + it->data->DidConsume(bytes); + size -= bytes; + bytes_read += bytes; + if (!it->data->BytesRemaining()) { + current_position_ += it->data->size(); + list_.erase(it); + } + } + return bytes_read; +} + +void ReceivedBlockList::ComputeBytesReceived() { + bytes_received_ = current_position_ + bytes_available(); +} + +void ReceivedBlockList::LogBlockList() const { + LOG(INFO) << "Received Blocks: " << list_.size(); + std::string msg; + std::ostringstream stream(msg); + for (size_t index = 0; index < list_.size(); ++index) + stream << "(" << list_[index].position + << "," << list_[index].data->size() << ")"; + LOG(INFO) << stream.str(); +} + + +} // namespace net diff --git a/net/curvecp/received_block_list.h b/net/curvecp/received_block_list.h new file mode 100644 index 0000000..4c05ecf --- /dev/null +++ b/net/curvecp/received_block_list.h @@ -0,0 +1,70 @@ +// Copyright (c) 2011 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. +#ifndef NET_CURVECP_RECEIVED_BLOCK_LIST_H_ +#define NET_CURVECP_RECEIVED_BLOCK_LIST_H_ +#pragma once + +#include <deque> + +#include "base/memory/ref_counted.h" + +namespace net { + +class DrainableIOBuffer; +class IOBuffer; +class IOBufferWithSize; + +// A ReceivedBlockList manages everything needed to track a set of of incoming +// blocks. It can coalesce continuous blocks and locate holes in the received +// data. +// Note: this list does not expect to receive overlapping blocks such as: +// Block 1: bytes 10-20 +// Block 2: bytes 15-25 +class ReceivedBlockList { + public: + ReceivedBlockList(); + ~ReceivedBlockList(); + + // Returns the total number of bytes received in order. + int bytes_received() const { return bytes_received_; } + + // Returns the number of currently available bytes to read. + int bytes_available() const; + + // Adds an incoming block into the list of received blocks. + void AddBlock(int64 position, IOBuffer* data, int length); + + // Reads data from the buffer starting at |current_position()|. + // |buffer| is the buffer to fill. + // |size| is the maximum number of bytes to be filled into |buffer|. + // Returns the number of bytes read. + int ReadBytes(IOBuffer* buffer, int size); + + // Gets the current position of the received data in the stream. + // This is for testing only. + int current_position() const { return current_position_; } + + // Debugging: Writes the entire blocklist to the log. + void LogBlockList() const; + + private: + typedef struct { + int64 position; // Position of this block. + scoped_refptr<DrainableIOBuffer> data; // The data. + } Block; + typedef std::deque<Block> BlockList; + + // Recomputes |bytes_received_|. + void ComputeBytesReceived(); + + int64 current_position_; + int64 bytes_received_; + BlockList list_; + + DISALLOW_COPY_AND_ASSIGN(ReceivedBlockList); +}; + +} // namespace net + +#endif // NET_CURVECP_RECEIVED_BLOCK_LIST_H_ diff --git a/net/curvecp/rtt_and_send_rate_calculator.cc b/net/curvecp/rtt_and_send_rate_calculator.cc new file mode 100644 index 0000000..da2be78 --- /dev/null +++ b/net/curvecp/rtt_and_send_rate_calculator.cc @@ -0,0 +1,172 @@ +// Copyright (c) 2011 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include <stdlib.h> + +#include "net/curvecp/rtt_and_send_rate_calculator.h" + +namespace net { + +RttAndSendRateCalculator::RttAndSendRateCalculator() + : rtt_latest_(0), + rtt_average_(0), + rtt_deviation_(0), + rtt_lowwater_(0), + rtt_highwater_(0), + rtt_timeout_(base::Time::kMicrosecondsPerSecond), + send_rate_(1000000), + rtt_seenrecenthigh_(0), + rtt_seenrecentlow_(0), + rtt_seenolderhigh_(0), + rtt_seenolderlow_(0), + rtt_phase_(false) { +} + +void RttAndSendRateCalculator::OnMessage(int32 rtt_us) { + UpdateRTT(rtt_us); + AdjustSendRate(); +} + +void RttAndSendRateCalculator::OnTimeout() { + base::TimeDelta time_since_last_loss = last_sample_time_ - last_loss_time_; + if (time_since_last_loss.InMilliseconds() < 4 * rtt_timeout_) + return; + send_rate_ *= 2; + last_loss_time_ = last_sample_time_; + last_edge_time_ = last_sample_time_; +} + +// Updates RTT +void RttAndSendRateCalculator::UpdateRTT(int32 rtt_us) { + rtt_latest_ = rtt_us; + last_sample_time_ = base::TimeTicks::Now(); + + // Initialize for the first sample. + if (!rtt_average_) { + rtt_average_ = rtt_latest_; + rtt_deviation_ = rtt_latest_; + rtt_highwater_ = rtt_latest_; + rtt_lowwater_ = rtt_latest_; + } + + // Jacobson's retransmission timeout calculation. + int32 rtt_delta = rtt_latest_ - rtt_average_; + rtt_average_ += rtt_delta / 8; + if (rtt_delta < 0) + rtt_delta = -rtt_delta; + rtt_delta -= rtt_deviation_; + rtt_deviation_ += rtt_delta / 4; + rtt_timeout_ = rtt_average_ + (4 * rtt_deviation_); + + // Adjust for delayed acks with anti-spiking. + // TODO(mbelshe): this seems high. + rtt_timeout_ += 8 * send_rate_; + + // Recognize the top and bottom of the congestion cycle. + rtt_delta = rtt_latest_ - rtt_highwater_; + rtt_highwater_ += rtt_delta / 1024; + + rtt_delta = rtt_latest_ - rtt_lowwater_; + if (rtt_delta > 0) + rtt_lowwater_ += rtt_delta / 8192; + else + rtt_lowwater_ += rtt_delta / 256; +} + +void RttAndSendRateCalculator::AdjustSendRate() { + last_sample_time_ = base::TimeTicks::Now(); + + base::TimeDelta time = last_sample_time_ - last_send_adjust_time_; + + // We only adjust the send rate approximately every 16 samples. + if (time.InMicroseconds() < 16 * send_rate_) + return; + + if (rtt_average_ > + rtt_highwater_ + (5 * base::Time::kMicrosecondsPerMillisecond)) + rtt_seenrecenthigh_ = true; + else if (rtt_average_ < rtt_lowwater_) + rtt_seenrecentlow_ = true; + + last_send_adjust_time_ = last_sample_time_; + + // If too much time has elapsed, re-initialize the send_rate. + if (time.InMicroseconds() > 10 * base::Time::kMicrosecondsPerSecond) { + send_rate_ = base::Time::kMicrosecondsPerSecond; // restart. + send_rate_ += randommod(send_rate_ / 8); + } + + // TODO(mbelshe): Why is 128us a lower bound? + if (send_rate_ >= 128) { + // Additive increase: adjust 1/N by a constant c. + // rtt-fair additive increase: adjust 1/N by a constant c every nanosec. + // approximation: adjust 1/N by cN every N nanoseconds. + + // TODO(mbelshe): he used c == 2^-51. for nanosecs. + // I use c == 2^31, for microsecs. + + // i.e. N <- 1/(1/N + cN) = N/(1 + cN^2) every N nanosec. + if (false && send_rate_ < 16384) { + // N/(1+cN^2) approx N - cN^3 + // TODO(mbelshe): note that he is using (cN)^3 here, not what he wrote. + int32 msec = send_rate_ / 128; + send_rate_ -= msec * msec * msec; + } else { + double d = send_rate_; + send_rate_ = d / (1 + d * d / 2147483648.0); // 2^31 + } + } + + if (rtt_phase_ == false) { + if (rtt_seenolderhigh_) { + rtt_phase_ = true; + last_edge_time_ = last_sample_time_; + send_rate_ += randommod(send_rate_ / 4); + } + } else { + if (rtt_seenolderlow_) { + rtt_phase_ = false; + } + } + + rtt_seenolderhigh_ = rtt_seenrecenthigh_; + rtt_seenolderlow_ = rtt_seenrecentlow_; + rtt_seenrecenthigh_ = false; + rtt_seenrecentlow_ = false; + + AttemptToDoubleSendRate(); +} + +void RttAndSendRateCalculator::AttemptToDoubleSendRate() { + base::TimeDelta time_since_edge = last_sample_time_ - last_edge_time_; + base::TimeDelta time_since_doubling = + last_sample_time_ - last_doubling_time_; + + + int32 threshold = 0; + if (time_since_edge.InMicroseconds() < + base::Time::kMicrosecondsPerSecond * 60) { + threshold = (4 * send_rate_) + + (64 * rtt_timeout_) + + (5 * base::Time::kMicrosecondsPerSecond); + } else { + threshold = (4 * send_rate_) + + (2 * rtt_timeout_); + } + if (time_since_doubling.InMicroseconds() < threshold) + return; + + if (send_rate_ < 64) + return; + + send_rate_ /= 2; + last_doubling_time_ = last_sample_time_; +} + +// returns a random number from 0 to val-1. +int32 RttAndSendRateCalculator::randommod(int32 val) { + return rand() % val; +} + +} // namespace net diff --git a/net/curvecp/rtt_and_send_rate_calculator.h b/net/curvecp/rtt_and_send_rate_calculator.h new file mode 100644 index 0000000..ad7aca4 --- /dev/null +++ b/net/curvecp/rtt_and_send_rate_calculator.h @@ -0,0 +1,72 @@ +// Copyright (c) 2011 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef NET_CURVECP_RTT_AND_SEND_RATE_CALCULATOR_H_ +#define NET_CURVECP_RTT_AND_SEND_RATE_CALCULATOR_H_ +#pragma once + +#include "base/basictypes.h" +#include "base/time.h" + +namespace net { + +// Calculator for tracking the current RTT value. +// Measurements are in microseconds. +class RttAndSendRateCalculator { + public: + RttAndSendRateCalculator(); + + // Returns the current, average RTT, in microseconds. + int32 rtt_average() const { return rtt_average_; } + + // Returns the current, RTT timeout, in microseconds. + int32 rtt_timeout() const { return rtt_timeout_; } + + // The current send rate. + // Measurement is in microseconds between sends. + int32 send_rate() const { return send_rate_; } + + int32 rtt_hi() const { return rtt_highwater_; } + int32 rtt_lo() const { return rtt_lowwater_; } + + // Called when a new RTT sample is measured. + void OnMessage(int32 rtt_us); + + // Adjusts the sendrate when a timeout condition has occurred. + void OnTimeout(); + + private: + // Updates RTT + void UpdateRTT(int32 rtt_us); + + // Adjusts the send rate when a message is received. + void AdjustSendRate(); + + void AttemptToDoubleSendRate(); + + // returns a random number from 0 to val-1. + int32 randommod(int32 val); + + int32 rtt_latest_; // The most recent RTT + int32 rtt_average_; + int32 rtt_deviation_; + int32 rtt_lowwater_; + int32 rtt_highwater_; + int32 rtt_timeout_; + int32 send_rate_; + base::TimeTicks last_sample_time_; + base::TimeTicks last_send_adjust_time_; + base::TimeTicks last_edge_time_; + base::TimeTicks last_doubling_time_; + base::TimeTicks last_loss_time_; + bool rtt_seenrecenthigh_; + bool rtt_seenrecentlow_; + bool rtt_seenolderhigh_; + bool rtt_seenolderlow_; + bool rtt_phase_; +}; + +} // namespace net + +#endif // NET_CURVECP_RTT_AND_SEND_RATE_CALCULATOR_H_ diff --git a/net/curvecp/sent_block_list.cc b/net/curvecp/sent_block_list.cc new file mode 100644 index 0000000..2eb55dc --- /dev/null +++ b/net/curvecp/sent_block_list.cc @@ -0,0 +1,133 @@ +// Copyright (c) 2011 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "net/curvecp/sent_block_list.h" + +#include "base/logging.h" +#include "net/base/io_buffer.h" + +namespace net { + +static const size_t kMaxBlocks = 128; + +SentBlockList::SentBlockList() + : current_message_id_(1), + send_sequence_number_(0) { +} + +SentBlockList::~SentBlockList() { +} + +int64 SentBlockList::CreateBlock(IOBufferWithSize* buffer) { + DCHECK_LT(0, buffer->size()); + + if (is_full()) + return -1; + + Block new_block; + new_block.position = send_sequence_number_; + new_block.length = buffer->size(); + new_block.transmissions = 0; + new_block.last_message_id = 0; + new_block.data= buffer; + + send_sequence_number_ += buffer->size(); + list_.push_back(new_block); + return new_block.position; +} + +bool SentBlockList::MarkBlockSent(int64 position, int64 message_id) { + int index = FindBlock(position); + if (index < 0) { + NOTREACHED(); + return false; + } + list_[index].last_message_id = message_id; + list_[index].transmissions++; + list_[index].last_sent_time = base::TimeTicks::Now(); + return true; +} + +void SentBlockList::AcknowledgeBlocks(int64 begin_range, int64 end_range) { + if (begin_range == end_range) + return; + + // TODO(mbelshe): use a better data structure. + LOG(ERROR) << "ACK of: " << begin_range << " to " << end_range; + + BlockList::iterator it = list_.begin(); + while (it != list_.end()) { + int64 position = it->position; + int64 length = it->length; + if (position >= begin_range && (position + length) <= end_range) { + list_.erase(it); + it = list_.begin(); // iterator was invalidated, so go to start of list. + continue; + } + + // Verify we didn't have a partial block acknowledgement. + CHECK(position < begin_range || position >= end_range); + CHECK((position + length) < begin_range || (position + length) > end_range); + it++; + } +} + +int64 SentBlockList::GetNewMessageId() { + return current_message_id_++; +} + +IOBufferWithSize* SentBlockList::FindBlockByPosition(int64 position) const { + int index = FindBlock(position); + if (index < 0) + return NULL; + return list_[index].data; +} + +base::TimeTicks SentBlockList::FindLastSendTime(int64 last_message_id) const { + for (size_t index = 0; index < list_.size(); ++index) + if (list_[index].last_message_id == last_message_id) + return list_[index].last_sent_time; + return base::TimeTicks(); +} + +int SentBlockList::FindBlock(int64 position) const { + for (size_t index = 0; index < list_.size(); ++index) + if (list_[index].position == position) + return index; + return -1; +} + +int64 SentBlockList::FindPositionOfOldestSentBlock() const { + base::TimeTicks oldest_time; + int64 position = -1; + + LogBlockList(); + + // Walks the entire list. + for (size_t index = 0; index < list_.size(); ++index) { + base::TimeTicks last_sent_time = list_[index].last_sent_time; + if (!last_sent_time.is_null()) { + if (last_sent_time < oldest_time || oldest_time.is_null()) { + oldest_time = last_sent_time; + position = list_[index].position; + } + } + } + return position; +} +bool SentBlockList::is_full() const { + return list_.size() == kMaxBlocks; +} + +void SentBlockList::LogBlockList() const { + LOG(INFO) << "Sent Blocks: " << list_.size(); + std::string msg; + std::ostringstream stream(msg); + for (size_t index = 0; index < list_.size(); ++index) + stream << "(" << list_[index].position + << "," << list_[index].length << ")"; + LOG(INFO) << stream.str(); +} + +} // namespace net diff --git a/net/curvecp/sent_block_list.h b/net/curvecp/sent_block_list.h new file mode 100644 index 0000000..c63e8bfd --- /dev/null +++ b/net/curvecp/sent_block_list.h @@ -0,0 +1,95 @@ +// Copyright (c) 2011 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef NET_CURVECP_SENT_BLOCK_LIST_H_ +#define NET_CURVECP_SENT_BLOCK_LIST_H_ +#pragma once + +#include <vector> + +#include "base/memory/ref_counted.h" +#include "base/time.h" + +namespace net { + +class IOBufferWithSize; + +// A SentBlockList manages everything needed to track a set of of blocks +// which have been sent. It can assign new ids to new blocks, find pending +// blocks by id, or discard blocks after they have been acknowledged by the +// remote. +class SentBlockList { + public: + SentBlockList(); + ~SentBlockList(); + + // Creates a new block to be tracked. + // On success, returns the unique position of this buffer within the stream, + // which can be used to lookup this block later. + // Returns -1 if the blocklist is already full. + int64 CreateBlock(IOBufferWithSize* buffer); + + // Tracks that a block has been sent. + // |position| is the identifier of the block being sent. + // |message_id| is the id of the message sent containing the block. + // Returns false if no block for |position| exists. + bool MarkBlockSent(int64 position, int64 message_id); + + // Acknowledges ranges from |begin_range| to |end_range|. + // Partial acknowledgements (e.g. where the ack range spans a partial + // block) are not supported. + // Removes blocks covered by these ranges from tracking. + void AcknowledgeBlocks(int64 begin_range, int64 end_range); + + // Returns a new, unique message id. + int64 GetNewMessageId(); + + // Find a block based on its |position|. + // Returns the block, if found, or NULL otherwise. + IOBufferWithSize* FindBlockByPosition(int64 position) const; + + // Find the last send time of a block, based the id of the last + // message to send it. + base::TimeTicks FindLastSendTime(int64 last_message_id) const; + + // Returns the position of the oldest sent block. + int64 FindPositionOfOldestSentBlock() const; + + // Returns the number of blocks in the list. + size_t size() const { return list_.size(); } + + // Returns true if the list is full and cannot take new blocks. + bool is_full() const; + + private: + // An UnackedBlock is a block of application data which has been sent in a + // message, but not acknowledged by the other side yet. We track this + // because we may need to retransmit. + typedef struct { + int64 position; // Offset of this block in the stream. + int64 length; // Number of bytes in this block. + int64 transmissions; // Count of transmissions of this block. + int64 last_message_id; // ID of last message sending this block. + base::TimeTicks last_sent_time; // Time of last message sending this block. + scoped_refptr<IOBufferWithSize> data; + } Block; + + typedef std::vector<Block> BlockList; + + // Finds a block by position and returns it index within |list_|. + int FindBlock(int64 position) const; + + // Debugging: Writes the entire blocklist to the log. + void LogBlockList() const; + + int64 current_message_id_; // The current message id. + int64 send_sequence_number_; // The sending sequence number. + BlockList list_; + + DISALLOW_COPY_AND_ASSIGN(SentBlockList); +}; + +} // namespace net + +#endif // NET_CURVECP_SENT_BLOCK_LIST_H_ diff --git a/net/curvecp/server_messenger.cc b/net/curvecp/server_messenger.cc new file mode 100644 index 0000000..53bad71 --- /dev/null +++ b/net/curvecp/server_messenger.cc @@ -0,0 +1,32 @@ +// Copyright (c) 2011 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "net/curvecp/server_messenger.h" + +namespace net { + +ServerMessenger::ServerMessenger(Packetizer* packetizer, + Acceptor* acceptor) + : Messenger(packetizer), + acceptor_(acceptor) { + DCHECK(acceptor_); +} + +ServerMessenger::ServerMessenger(ConnectionKey key, + Packetizer* packetizer, + Acceptor* acceptor) + : Messenger(packetizer), + acceptor_(acceptor) { + DCHECK(acceptor_); + key_ = key; +} + +ServerMessenger::~ServerMessenger() { +} + +void ServerMessenger::OnConnection(ConnectionKey key) { + acceptor_->OnAccept(key); +} + +} // namespace net diff --git a/net/curvecp/server_messenger.h b/net/curvecp/server_messenger.h new file mode 100644 index 0000000..d7fb95a --- /dev/null +++ b/net/curvecp/server_messenger.h @@ -0,0 +1,45 @@ +// Copyright (c) 2011 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef NET_CURVECP_SERVER_MESSENGER_H_ +#define NET_CURVECP_SERVER_MESSENGER_H_ +#pragma once + +#include <list> + +#include "base/basictypes.h" +#include "base/task.h" +#include "net/curvecp/messenger.h" +#include "net/curvecp/packetizer.h" +#include "net/socket/socket.h" + +namespace net { + +// A specialized messenger for listening on a socket, accepting new sockets, +// and dispatching io to secondary +class ServerMessenger : public Messenger { + public: + class Acceptor { + public: + virtual ~Acceptor() {} + virtual void OnAccept(ConnectionKey key) = 0; + }; + + ServerMessenger(Packetizer* packetizer, Acceptor* acceptor); + ServerMessenger(ConnectionKey key, + Packetizer* packetizer, + Acceptor* acceptor); + virtual ~ServerMessenger(); + + // Override OnConnection to track incoming connections. + virtual void OnConnection(ConnectionKey key); + + private: + Acceptor* acceptor_; + DISALLOW_COPY_AND_ASSIGN(ServerMessenger); +}; + +} // namespace net + +#endif // NET_CURVECP_SERVER_MESSENGER_H_ diff --git a/net/curvecp/server_packetizer.cc b/net/curvecp/server_packetizer.cc new file mode 100644 index 0000000..4715a3b --- /dev/null +++ b/net/curvecp/server_packetizer.cc @@ -0,0 +1,246 @@ +// Copyright (c) 2011 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "net/curvecp/server_packetizer.h" + +#include "net/base/io_buffer.h" +#include "net/base/net_errors.h" +#include "net/curvecp/protocol.h" +#include "net/udp/udp_server_socket.h" + +namespace net { + +ServerPacketizer::ServerPacketizer() + : Packetizer(), + state_(NONE), + listener_(NULL), + read_buffer_(new IOBuffer(kMaxPacketLength)), + ALLOW_THIS_IN_INITIALIZER_LIST( + read_callback_(this, &ServerPacketizer::OnReadComplete)), + ALLOW_THIS_IN_INITIALIZER_LIST( + write_callback_(this, &ServerPacketizer::OnWriteComplete)) { +} + +ServerPacketizer::~ServerPacketizer() { +} + +int ServerPacketizer::Listen(const IPEndPoint& endpoint, + Packetizer::Listener* listener) { + DCHECK(!listener_); + listener_ = listener; + socket_.reset(new UDPServerSocket(NULL, NetLog::Source())); + int rv = socket_->Listen(endpoint); + if (rv != OK) + return rv; + + return ReadPackets(); +} + +bool ServerPacketizer::Open(ConnectionKey key, Packetizer::Listener* listener) { + DCHECK(listener_map_.find(key) == listener_map_.end()); + listener_map_[key] = listener; + return true; +} + +int ServerPacketizer::SendMessage(ConnectionKey key, + const char* data, + size_t length, + CompletionCallback* callback) { + DCHECK(socket_.get()); + DCHECK_LT(0u, length); + DCHECK_GT(kMaxPacketLength - sizeof(ServerMessagePacket), length); + + ConnectionMap::const_iterator it = connection_map_.find(key); + if (it == connection_map_.end()) { + LOG(ERROR) << "Unknown connection key"; + return ERR_FAILED; // No route to the client! + } + IPEndPoint endpoint = it->second; + + // Build up a message packet. + scoped_refptr<IOBuffer> buffer(new IOBuffer(kMaxPacketLength)); + ServerMessagePacket* packet = + reinterpret_cast<ServerMessagePacket*>(buffer->data()); + memset(packet, 0, sizeof(ServerMessagePacket)); + memcpy(packet->id, "RL3aNMXM", 8); + memcpy(&buffer->data()[sizeof(ServerMessagePacket)], data, length); + int packet_length = sizeof(ServerMessagePacket) + length; + int rv = socket_->SendTo(buffer, packet_length, endpoint, callback); + if (rv <= 0) + return rv; + CHECK_EQ(packet_length, rv); + return length; // The number of message bytes written. +} + +void ServerPacketizer::Close(ConnectionKey key) { + ListenerMap::iterator it = listener_map_.find(key); + DCHECK(it != listener_map_.end()); + listener_map_.erase(it); + socket_->Close(); + socket_.reset(NULL); +} + +int ServerPacketizer::GetPeerAddress(IPEndPoint* endpoint) const { + return socket_->GetPeerAddress(endpoint); +} + +int ServerPacketizer::max_message_payload() const { + return kMaxMessageLength - sizeof(Message); +} + +void ServerPacketizer::ProcessRead(int result) { + DCHECK_GT(result, 0); + + // The smallest packet we can receive is a ClientMessagePacket. + if (result < static_cast<int>(sizeof(ClientMessagePacket)) || + result > kMaxPacketLength) + return; + + // Packets are always 16 byte padded. + if (result & 15) + return; + + Packet *packet = reinterpret_cast<Packet*>(read_buffer_->data()); + if (memcmp(packet, "QvnQ5Xl", 7)) + return; + + switch (packet->id[7]) { + case 'H': + HandleHelloPacket(packet, result); + break; + case 'I': + HandleInitiatePacket(packet, result); + break; + case 'M': + HandleClientMessagePacket(packet, result); + break; + } +} + +void ServerPacketizer::HandleHelloPacket(Packet* packet, int length) { + if (length != sizeof(HelloPacket)) + return; + + LOG(ERROR) << "Received Hello Packet"; + + HelloPacket* hello_packet = reinterpret_cast<HelloPacket*>(packet); + + // Handle HelloPacket + scoped_refptr<IOBuffer> buffer(new IOBuffer(sizeof(struct CookiePacket))); + struct CookiePacket* data = + reinterpret_cast<struct CookiePacket*>(buffer->data()); + memset(data, 0, sizeof(struct CookiePacket)); + memcpy(data->id, "RL3aNMXK", 8); + memcpy(data->client_extension, hello_packet->client_extension, 16); + // TODO(mbelshe) Fill in the rest of the CookiePacket fields. + + // XXXMB - Can't have two pending writes at the same time... + int rv = socket_->SendTo(buffer, sizeof(struct CookiePacket), recv_address_, + &write_callback_); + DCHECK(rv == ERR_IO_PENDING || rv == sizeof(struct CookiePacket)); +} + +void ServerPacketizer::HandleInitiatePacket(Packet* packet, int length) { + // Handle InitiatePacket + LOG(ERROR) << "Received Initiate Packet"; + + InitiatePacket* initiate_packet = reinterpret_cast<InitiatePacket*>(packet); + + // We have an active connection. + AddConnection(initiate_packet->client_shortterm_public_key, recv_address_); + + listener_->OnConnection(initiate_packet->client_shortterm_public_key); + + // The initiate packet can carry a message. + int message_length = length - sizeof(InitiatePacket); + DCHECK_LT(0, message_length); + if (message_length) { + uchar* data = reinterpret_cast<uchar*>(packet); + HandleMessage(initiate_packet->client_shortterm_public_key, + &data[sizeof(InitiatePacket)], + message_length); + } +} + +void ServerPacketizer::HandleClientMessagePacket(Packet* packet, int length) { + // Handle Message + if (length < 16) + return; + + const int kMaxMessagePacketLength = + kMaxMessageLength + sizeof(ClientMessagePacket); + if (length > static_cast<int>(kMaxMessagePacketLength)) + return; + + ClientMessagePacket* message_packet = + reinterpret_cast<ClientMessagePacket*>(packet); + + int message_length = length - sizeof(ClientMessagePacket); + DCHECK_LT(0, message_length); + if (message_length) { + uchar* data = reinterpret_cast<uchar*>(packet); + HandleMessage(message_packet->client_shortterm_public_key, + &data[sizeof(ClientMessagePacket)], + message_length); + } +} + +void ServerPacketizer::HandleMessage(ConnectionKey key, + unsigned char* msg, + int length) { + ListenerMap::iterator it = listener_map_.find(key); + if (it == listener_map_.end()) { + // Received a message for a closed connection. + return; + } + + // Decode the message here + + Packetizer::Listener* listener = it->second; + listener->OnMessage(this, key, msg, length); +} + +void ServerPacketizer::AddConnection(ConnectionKey key, + const IPEndPoint& endpoint) { + DCHECK(connection_map_.find(key) == connection_map_.end()); + connection_map_[key] = endpoint; +} + +void ServerPacketizer::RemoveConnection(ConnectionKey key) { + DCHECK(connection_map_.find(key) != connection_map_.end()); + connection_map_.erase(key); +} + +int ServerPacketizer::ReadPackets() { + DCHECK(socket_.get()); + + int rv; + while (true) { + rv = socket_->RecvFrom(read_buffer_, + kMaxPacketLength, + &recv_address_, + &read_callback_); + if (rv <= 0) { + if (rv != ERR_IO_PENDING) + LOG(ERROR) << "Error reading listen socket: " << rv; + return rv; + } + + ProcessRead(rv); + } + + return rv; +} + +void ServerPacketizer::OnReadComplete(int result) { + if (result > 0) + ProcessRead(result); + ReadPackets(); +} + +void ServerPacketizer::OnWriteComplete(int result) { + // TODO(mbelshe): do we need to do anything? +} + +} diff --git a/net/curvecp/server_packetizer.h b/net/curvecp/server_packetizer.h new file mode 100644 index 0000000..2fa60c7 --- /dev/null +++ b/net/curvecp/server_packetizer.h @@ -0,0 +1,97 @@ +// Copyright (c) 2011 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef NET_CURVECP_SERVER_PACKETIZER_H_ +#define NET_CURVECP_SERVER_PACKETIZER_H_ +#pragma once + +#include <map> + +#include "base/memory/ref_counted.h" +#include "net/base/completion_callback.h" +#include "net/base/ip_endpoint.h" +#include "net/curvecp/packetizer.h" +#include "net/curvecp/protocol.h" + +namespace net { + +class IOBuffer; +class IPEndPoint; +class UDPServerSocket; + +class ServerPacketizer : public base::RefCounted<ServerPacketizer>, + public Packetizer { + public: + ServerPacketizer(); + virtual ~ServerPacketizer(); + + // Listen for new connections from the Packetizer. + int Listen(const IPEndPoint& endpoint, Packetizer::Listener* listener); + + // Register a listener for a connection. + // To revoke the registration, call Close(). + bool Open(ConnectionKey key, Packetizer::Listener* listener); + + // Packetizer methods + virtual int SendMessage(ConnectionKey key, + const char* data, + size_t length, + CompletionCallback* callback); + virtual void Close(ConnectionKey key); + virtual int GetPeerAddress(IPEndPoint* endpoint) const; + virtual int max_message_payload() const; + + private: + enum State { + NONE, // The initial state, before listen. + LISTENING, // Listening for packets. + }; + + typedef std::map<ConnectionKey, Packetizer::Listener*> ListenerMap; + typedef std::map<ConnectionKey, IPEndPoint> ConnectionMap; + + // Callbacks when an internal IO is completed. + void OnReadComplete(int result); + void OnWriteComplete(int result); + + // Process the result of a Read operation. + void ProcessRead(int bytes_read); + + // Read packets until an error occurs. + int ReadPackets(); + + // Handlers for recognized packets. + void HandleHelloPacket(Packet* packet, int length); + void HandleInitiatePacket(Packet* packet, int length); + void HandleClientMessagePacket(Packet* packet, int length); + void HandleMessage(ConnectionKey key, unsigned char* message, int length); + + // Manage the list of known "connections". + void AddConnection(ConnectionKey key, const IPEndPoint& endpoint); + // TODO(mbelshe): need to trim out aged/idle connections + void RemoveConnection(ConnectionKey key); + bool IsActiveConnection(ConnectionKey key); + + State state_; + scoped_ptr<UDPServerSocket> socket_; + Packetizer::Listener* listener_; // Accept listener. + + scoped_refptr<IOBuffer> read_buffer_; + IPEndPoint recv_address_; + + // The connection map tracks active client addresses which are known + // to this server. + ConnectionMap connection_map_; + // The listener map tracks active message listeners known to the packetizer. + ListenerMap listener_map_; + + CompletionCallbackImpl<ServerPacketizer> read_callback_; + CompletionCallbackImpl<ServerPacketizer> write_callback_; + + DISALLOW_COPY_AND_ASSIGN(ServerPacketizer); +}; + +} // namespace net + +#endif // NET_CURVECP_SERVER_PACKETIZER_H_ diff --git a/net/curvecp/test_client.cc b/net/curvecp/test_client.cc new file mode 100644 index 0000000..f71f162 --- /dev/null +++ b/net/curvecp/test_client.cc @@ -0,0 +1,178 @@ +// Copyright (c) 2011 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "net/curvecp/test_client.h" + +#include <string> +#include <deque> +#include <vector> + +#include "base/logging.h" +#include "base/message_loop.h" +#include "net/base/address_list.h" +#include "net/base/host_resolver.h" +#include "net/base/io_buffer.h" +#include "net/base/net_errors.h" +#include "net/base/net_log.h" +#include "net/curvecp/curvecp_client_socket.h" + +namespace net { + +TestClient::TestClient() + : socket_(NULL), + errors_(0), + bytes_to_send_(0), + ALLOW_THIS_IN_INITIALIZER_LIST( + connect_callback_(this, &TestClient::OnConnectComplete)), + ALLOW_THIS_IN_INITIALIZER_LIST( + read_callback_(this, &TestClient::OnReadComplete)), + ALLOW_THIS_IN_INITIALIZER_LIST( + write_callback_(this, &TestClient::OnWriteComplete)), + finished_callback_(NULL) { +} + +TestClient::~TestClient() { + if (socket_) { + // TODO(mbelshe): The CurveCPClientSocket has a method called Disconnect. + // The CurveCPServerSocket has a method called Close. + // Unify them into either Close or Disconnect!!! + socket_->Disconnect(); + socket_ = NULL; + } +} + +bool TestClient::Start(const HostPortPair& server_host_port_pair, + int bytes_to_send, + CompletionCallback* callback) { + DCHECK(!socket_); + DCHECK(!finished_callback_); + + finished_callback_ = callback; + bytes_to_read_ = bytes_to_send_ = bytes_to_send; + + scoped_ptr<HostResolver> system_host_resolver( + CreateSystemHostResolver(1, NULL)); + SingleRequestHostResolver host_resolver(system_host_resolver.get()); + HostResolver::RequestInfo request(server_host_port_pair); + AddressList addresses; + int rv = host_resolver.Resolve(request, &addresses, NULL, BoundNetLog()); + if (rv != OK) { + LOG(ERROR) << "Could not resolve host"; + return false; + } + + socket_ = new CurveCPClientSocket(addresses, NULL, NetLog::Source()); + rv = socket_->Connect(&connect_callback_); + if (rv == ERR_IO_PENDING) + return true; + OnConnectComplete(rv); + return rv == OK; +} + +void TestClient::OnConnectComplete(int result) { + LOG(ERROR) << "Connect complete"; + if (result < 0) { + LOG(ERROR) << "Connect failure: " << result; + errors_++; + Finish(result); + return; + } + + DCHECK(bytes_to_send_); // We should have data to send. + ReadData(); + + DCHECK_EQ(bytes_to_send_, bytes_to_read_); + + SendData(); +} + +void TestClient::OnReadComplete(int result) { + if (result <= 0) { + LOG(ERROR) << "Read failure: " << result; + errors_++; + Finish(result); + return; + } + + if (!received_stream_.VerifyBytes(read_buffer_->data(), result)) { + if (!errors_) // Only log the first error. + LOG(ERROR) << "Client Received corrupt receive data!"; + errors_++; + } + + read_buffer_ = NULL; + bytes_to_read_ -= result; + + // Now read more data... + if (bytes_to_read_) + ReadData(); + else + Finish(OK); +} + +void TestClient::OnWriteComplete(int result) { + LOG(ERROR) << "Write complete (remaining = " << bytes_to_send_ << ")"; + if (result <= 0) { + errors_++; + Finish(result); + return; + } + + write_buffer_->DidConsume(result); + bytes_to_send_ -= result; + if (!write_buffer_->BytesRemaining()) + write_buffer_ = NULL; + + if (bytes_to_send_) + SendData(); +} + +void TestClient::ReadData() { + DCHECK(!read_buffer_.get()); + read_buffer_ = new IOBuffer(kMaxMessage); + + int rv; + do { + rv = socket_->Read(read_buffer_, kMaxMessage, &read_callback_); + if (rv == ERR_IO_PENDING) + return; + OnReadComplete(rv); // Complete the read manually + } while (rv > 0); +} + +void TestClient::SendData() { + DCHECK(bytes_to_send_); // We should have data to send. + const int kWriteChunkSize = 777; // 777 is more abusive + + do { + if (!write_buffer_.get()) { + int bytes_to_send = std::min(kWriteChunkSize, bytes_to_send_); + scoped_refptr<IOBuffer> buffer(new IOBuffer(bytes_to_send)); + sent_stream_.GetBytes(buffer->data(), bytes_to_send); + write_buffer_ = new DrainableIOBuffer(buffer, bytes_to_send); + } + + int rv = socket_->Write(write_buffer_, + write_buffer_->BytesRemaining(), + &write_callback_); + if (rv == ERR_IO_PENDING) + return; + + write_buffer_->DidConsume(rv); + bytes_to_send_ -= rv; + if (!write_buffer_->BytesRemaining()) + write_buffer_ = NULL; + } while (bytes_to_send_); +} + +void TestClient::Finish(int result) { + DCHECK(finished_callback_); + + LOG(ERROR) << "TestClient Done!"; + CompletionCallback* callback = finished_callback_; + finished_callback_ = NULL; + callback->Run(result); +} + +} // namespace net diff --git a/net/curvecp/test_client.h b/net/curvecp/test_client.h new file mode 100644 index 0000000..08f496f --- /dev/null +++ b/net/curvecp/test_client.h @@ -0,0 +1,70 @@ +// Copyright (c) 2011 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef NET_CURVECP_TEST_CLIENT_H_ +#define NET_CURVECP_TEST_CLIENT_H_ +#pragma once + +#include "base/task.h" +#include "net/base/completion_callback.h" +#include "net/base/host_port_pair.h" +#include "net/base/io_buffer.h" +#include "net/curvecp/test_data_stream.h" + +namespace net { + +class CurveCPClientSocket; + +// The TestClient connects to a test server, sending a stream of verifiable +// bytes. The TestClient expects to get the same bytes echoed back from the +// TestServer. After sending all bytes and receiving the echoes, the +// TestClient closes itself. +// +// Several hooks are provided for testing edge cases and failures. +class TestClient { + public: + TestClient(); + virtual ~TestClient(); + + // Starts the client, connecting to |server|. + // Client will send |bytes_to_send| bytes from the verifiable stream. + // When the client has received all echoed bytes from the server, or + // when an error occurs causing the client to stop, |callback| will be + // called with a net status code. + // Returns true if successful in starting the client. + bool Start(const HostPortPair& server, + int bytes_to_send, + CompletionCallback* callback); + + // Returns the number of errors this server encountered. + int error_count() { return errors_; } + + private: + static const int kMaxMessage = 1024; + + void OnConnectComplete(int result); + void OnReadComplete(int result); + void OnWriteComplete(int result); + + void ReadData(); + void SendData(); + void Finish(int result); + + CurveCPClientSocket* socket_; + scoped_refptr<IOBuffer> read_buffer_; + scoped_refptr<DrainableIOBuffer> write_buffer_; + int errors_; + int bytes_to_read_; + int bytes_to_send_; + TestDataStream sent_stream_; + TestDataStream received_stream_; + CompletionCallbackImpl<TestClient> connect_callback_; + CompletionCallbackImpl<TestClient> read_callback_; + CompletionCallbackImpl<TestClient> write_callback_; + CompletionCallback* finished_callback_; +}; + +} // namespace net + +#endif // NET_CURVECP_TEST_CLIENT_H_ diff --git a/net/curvecp/test_data_stream.h b/net/curvecp/test_data_stream.h new file mode 100644 index 0000000..d2bbc87 --- /dev/null +++ b/net/curvecp/test_data_stream.h @@ -0,0 +1,88 @@ +// Copyright (c) 2011 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef NET_CURVECP_TEST_DATA_STREAM_H_ +#define NET_CURVECP_TEST_DATA_STREAM_H_ +#pragma once + +#include <string.h> // for memcpy() +#include <algorithm> + +// This is a test class for generating an infinite stream of data which can +// be verified independently to be the correct stream of data. + +namespace net { + +class TestDataStream { + public: + TestDataStream() { + Reset(); + } + + // Fill |buffer| with |length| bytes of data from the stream. + void GetBytes(char* buffer, int length) { + while (length) { + AdvanceIndex(); + int bytes_to_copy = std::min(length, bytes_remaining_); + memcpy(buffer, buffer_ptr_, bytes_to_copy); + buffer += bytes_to_copy; + Consume(bytes_to_copy); + length -= bytes_to_copy; + } + } + + // Verify that |buffer| contains the expected next |length| bytes from the + // stream. Returns true if correct, false otherwise. + bool VerifyBytes(const char *buffer, int length) { + while (length) { + AdvanceIndex(); + int bytes_to_compare = std::min(length, bytes_remaining_); + if (memcmp(buffer, buffer_ptr_, bytes_to_compare)) + return false; + Consume(bytes_to_compare); + length -= bytes_to_compare; + buffer += bytes_to_compare; + } + return true; + } + + void Reset() { + index_ = 0; + bytes_remaining_ = 0; + buffer_ptr_ = buffer_; + } + + private: + // If there is no data spilled over from the previous index, advance the + // index and fill the buffer. + void AdvanceIndex() { + if (bytes_remaining_ == 0) { + // Convert it to ascii, but don't bother to reverse it. + // (e.g. 12345 becomes "54321") + int val = index_++; + do { + buffer_[bytes_remaining_++] = (val % 10) + '0'; + } while ((val /= 10) > 0); + buffer_[bytes_remaining_++] = '.'; + } + } + + // Consume data from the spill buffer. + void Consume(int bytes) { + bytes_remaining_ -= bytes; + if (bytes_remaining_) + buffer_ptr_ += bytes; + else + buffer_ptr_ = buffer_; + } + + int index_; + int bytes_remaining_; + char buffer_[16]; + char* buffer_ptr_; +}; + +} // namespace net + +#endif // NET_CURVECP_TEST_DATA_STREAM_H_ diff --git a/net/curvecp/test_server.cc b/net/curvecp/test_server.cc new file mode 100644 index 0000000..a1ad4b2 --- /dev/null +++ b/net/curvecp/test_server.cc @@ -0,0 +1,144 @@ +// Copyright (c) 2011 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "net/curvecp/test_server.h" + +#include <string> + +#include "base/logging.h" +#include "base/message_loop.h" +#include "net/base/io_buffer.h" +#include "net/base/ip_endpoint.h" +#include "net/base/net_errors.h" +#include "net/base/net_util.h" +#include "net/curvecp/curvecp_server_socket.h" + +namespace net { + +TestServer::TestServer() + : socket_(NULL), + errors_(0) { +} + +TestServer::~TestServer() { + if (socket_) { + socket_->Close(); + socket_ = NULL; + } +} + +bool TestServer::Start(int port) { + IPAddressNumber ip_number; + std::string ip_str("0.0.0.0"); + if (!ParseIPLiteralToNumber(ip_str, &ip_number)) { + LOG(ERROR) << "Bad IP Address"; + return false; + } + IPEndPoint bind_address(ip_number, port); + + DCHECK(!socket_); + socket_ = new CurveCPServerSocket(NULL, NetLog::Source()); + int rv = socket_->Listen(bind_address, this); + if (rv < ERR_IO_PENDING) { + LOG(ERROR) << "Listen on port " << port << " failed: " << rv; + return false; + } + return true; +} + +void TestServer::RunWithParams(const Tuple1<int>& params) { + int status = params.a; + LOG(INFO) << "Callback! " << status; + if (status < 0) + MessageLoop::current()->Quit(); +} + +void TestServer::OnAccept(CurveCPServerSocket* new_socket) { + DCHECK(new_socket); + LOG(ERROR) << "Accepted socket! Starting Echo Server"; + EchoServer* new_server = new EchoServer(); + new_server->Start(new_socket); +} + +EchoServer::EchoServer() + : socket_(NULL), + bytes_received_(0), + ALLOW_THIS_IN_INITIALIZER_LIST( + read_callback_(this, &EchoServer::OnReadComplete)), + ALLOW_THIS_IN_INITIALIZER_LIST( + write_callback_(this, &EchoServer::OnWriteComplete)) { +} + +EchoServer::~EchoServer() { +} + +void EchoServer::Start(CurveCPServerSocket* socket) { + DCHECK(!socket_); + socket_ = socket; + + ReadData(); + // Note: |this| could be deleted here. +} + +void EchoServer::OnReadComplete(int result) { + LOG(INFO) << "Read complete: " << result; + if (result <= 0) { + delete this; + return; + } + + bytes_received_ += result; + LOG(INFO) << "Server received " << result << "(" << bytes_received_ << ")"; + + if (!received_stream_.VerifyBytes(read_buffer_->data(), result)) { + LOG(ERROR) << "Server Received corrupt receive data!"; + delete this; + return; + } + + // Echo the read data back here. + DCHECK(!write_buffer_.get()); + write_buffer_ = new DrainableIOBuffer(read_buffer_, result); + int rv = socket_->Write(write_buffer_, result, &write_callback_); + if (rv == ERR_IO_PENDING) + return; + OnWriteComplete(rv); +} + +void EchoServer::OnWriteComplete(int result) { + if (result <= 0) { + delete this; + return; + } + + write_buffer_->DidConsume(result); + while (write_buffer_->BytesRemaining()) { + int rv = socket_->Write(write_buffer_, + write_buffer_->BytesRemaining(), + &write_callback_); + if (rv == ERR_IO_PENDING) + return; + OnWriteComplete(rv); + } + + // Now we can read more data. + write_buffer_ = NULL; + // read_buffer_ = NULL; + // ReadData(); +} + +void EchoServer::ReadData() { + DCHECK(!read_buffer_.get()); + read_buffer_ = new IOBuffer(kMaxMessage); + + int rv; + do { + rv = socket_->Read(read_buffer_, kMaxMessage, &read_callback_); + if (rv == ERR_IO_PENDING) + return; + OnReadComplete(rv); // Complete the read manually + } while (rv > 0); +} + +} // namespace net diff --git a/net/curvecp/test_server.h b/net/curvecp/test_server.h new file mode 100644 index 0000000..22c44e4 --- /dev/null +++ b/net/curvecp/test_server.h @@ -0,0 +1,74 @@ +// Copyright (c) 2011 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef NET_CURVECP_TEST_SERVER_H_ +#define NET_CURVECP_TEST_SERVER_H_ +#pragma once + +#include "base/task.h" +#include "net/base/completion_callback.h" +#include "net/curvecp/curvecp_server_socket.h" +#include "net/curvecp/test_data_stream.h" + +namespace net { + +class DrainableIOBuffer; +class EchoServer; +class IOBuffer; + +// TestServer is the server which processes the listen socket. +// It will create an EchoServer instance to handle each connection. +class TestServer : public CompletionCallback, + public CurveCPServerSocket::Acceptor { + public: + TestServer(); + virtual ~TestServer(); + + bool Start(int port); + + // CompletionCallback methods: + virtual void RunWithParams(const Tuple1<int>& params); + + // CurveCPServerSocket::Acceptor methods: + virtual void OnAccept(CurveCPServerSocket* new_socket); + + // Returns the number of errors this server encountered. + int error_count() { return errors_; } + + private: + CurveCPServerSocket* socket_; + int errors_; +}; + + +// EchoServer does the actual server work for a connection. +// This object self destructs after finishing its work. +class EchoServer { + public: + EchoServer(); + ~EchoServer(); + + // Start the Echo Server + void Start(CurveCPServerSocket* socket); + + private: + void OnReadComplete(int result); + void OnWriteComplete(int result); + + void ReadData(); + + private: + static const int kMaxMessage = 1024; + CurveCPServerSocket* socket_; + scoped_refptr<IOBuffer> read_buffer_; + scoped_refptr<DrainableIOBuffer> write_buffer_; + TestDataStream received_stream_; + int bytes_received_; + CompletionCallbackImpl<EchoServer> read_callback_; + CompletionCallbackImpl<EchoServer> write_callback_; +}; + +} // namespace net + +#endif // NET_CURVECP_TEST_SERVER_H_ |