diff options
-rw-r--r-- | net/flip/flip_bitmasks.h | 16 | ||||
-rw-r--r-- | net/flip/flip_frame_builder.cc | 184 | ||||
-rw-r--r-- | net/flip/flip_frame_builder.h | 184 | ||||
-rw-r--r-- | net/flip/flip_framer.cc | 732 | ||||
-rw-r--r-- | net/flip/flip_framer.h | 228 | ||||
-rw-r--r-- | net/flip/flip_framer_test.cc | 251 | ||||
-rw-r--r-- | net/flip/flip_network_transaction.cc | 414 | ||||
-rw-r--r-- | net/flip/flip_network_transaction.h | 142 | ||||
-rw-r--r-- | net/flip/flip_network_transaction_unittest.cc | 190 | ||||
-rw-r--r-- | net/flip/flip_protocol.h | 203 | ||||
-rw-r--r-- | net/flip/flip_session.cc | 762 | ||||
-rw-r--r-- | net/flip/flip_session.h | 216 | ||||
-rw-r--r-- | net/flip/flip_session_pool.cc | 99 | ||||
-rw-r--r-- | net/flip/flip_session_pool.h | 57 | ||||
-rw-r--r-- | net/flip/flip_session_unittest.cc | 55 | ||||
-rw-r--r-- | net/flip/flip_transaction_factory.h | 36 |
16 files changed, 3769 insertions, 0 deletions
diff --git a/net/flip/flip_bitmasks.h b/net/flip/flip_bitmasks.h new file mode 100644 index 0000000..8f139e0 --- /dev/null +++ b/net/flip/flip_bitmasks.h @@ -0,0 +1,16 @@ +// Copyright (c) 2009 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef NET_FLIP_FLIP_BITMASKS_H_ +#define NET_FLIP_FLIP_BITMASKS_H_ + +namespace flip { + +const int kStreamIdMask = 0x7fffffff; // StreamId mask from the FlipHeader +const int kControlFlagMask = 0x8000; // Control flag mask from the FlipHeader +const int kPriorityMask = 0xc0; // Priority mask from the SYN_FRAME +} // flip + +#endif // NET_FLIP_FLIP_BITMASKS_H_ + diff --git a/net/flip/flip_frame_builder.cc b/net/flip/flip_frame_builder.cc new file mode 100644 index 0000000..28d156c --- /dev/null +++ b/net/flip/flip_frame_builder.cc @@ -0,0 +1,184 @@ +// Copyright (c) 2009 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include <limits> + +#include "flip_frame_builder.h" // cross-google3 directory naming. +#include "flip_protocol.h" + +namespace flip { + + // We mark a read only FlipFrameBuilder with a special capacity_. +static const size_t kCapacityReadOnly = std::numeric_limits<size_t>::max(); + +FlipFrameBuilder::FlipFrameBuilder() + : buffer_(NULL), + capacity_(0), + length_(0), + variable_buffer_offset_(0) { + Resize(kInitialPayload); +} + +FlipFrameBuilder::FlipFrameBuilder(const char* data, int data_len) + : buffer_(const_cast<char*>(data)), + capacity_(kCapacityReadOnly), + length_(data_len), + variable_buffer_offset_(0) { +} + +FlipFrameBuilder::~FlipFrameBuilder() { + if (capacity_ != kCapacityReadOnly) + delete[] buffer_; +} + +bool FlipFrameBuilder::ReadUInt16(void** iter, uint16* result) const { + DCHECK(iter); + if (!*iter) + *iter = const_cast<char*>(buffer_); + + if (!IteratorHasRoomFor(*iter, sizeof(*result))) + return false; + + *result = ntohs(*(reinterpret_cast<uint16*>(*iter))); + + UpdateIter(iter, sizeof(*result)); + return true; +} + +bool FlipFrameBuilder::ReadUInt32(void** iter, uint32* result) const { + DCHECK(iter); + if (!*iter) + *iter = const_cast<char*>(buffer_); + + if (!IteratorHasRoomFor(*iter, sizeof(*result))) + return false; + + *result = ntohl(*(reinterpret_cast<uint32*>(*iter))); + + UpdateIter(iter, sizeof(*result)); + return true; +} + +bool FlipFrameBuilder::ReadString(void** iter, std::string* result) const { + DCHECK(iter); + + uint16 len; + if (!ReadUInt16(iter, &len)) { + VLOG(1) << "Unable to read length"; + return false; + } + if (!IteratorHasRoomFor(*iter, len)) { + VLOG(1) << "!IteratorHasRoomFor"; + return false; + } + + char* chars = reinterpret_cast<char*>(*iter); + result->assign(chars, len); + + UpdateIter(iter, len); + return true; +} + +bool FlipFrameBuilder::ReadBytes(void** iter, const char** data, + uint16 length) const { + DCHECK(iter); + DCHECK(data); + + if (!IteratorHasRoomFor(*iter, length)) + return false; + + *data = reinterpret_cast<const char*>(*iter); + + UpdateIter(iter, length); + return true; +} + +bool FlipFrameBuilder::ReadData(void** iter, const char** data, + uint16* length) const { + DCHECK(iter); + DCHECK(data); + DCHECK(length); + + if (!ReadUInt16(iter, length)) + return false; + + return ReadBytes(iter, data, *length); +} + +char* FlipFrameBuilder::BeginWrite(size_t length) { + size_t offset = length_; + size_t needed_size = length_ + length; + if (needed_size > capacity_ && !Resize(std::max(capacity_ * 2, needed_size))) + return NULL; + +#ifdef ARCH_CPU_64_BITS + DCHECK_LE(length, std::numeric_limits<uint32>::max()); +#endif + + return buffer_ + offset; +} + +void FlipFrameBuilder::EndWrite(char* dest, int length) { +} + +bool FlipFrameBuilder::WriteBytes(const void* data, uint16 data_len) { + DCHECK(capacity_ != kCapacityReadOnly); + + char* dest = BeginWrite(data_len); + if (!dest) + return false; + + memcpy(dest, data, data_len); + + EndWrite(dest, data_len); + length_ += data_len; + return true; +} + +bool FlipFrameBuilder::WriteString(const std::string& value) { + if (value.size() > 0xffff) + return false; + + if (!WriteUInt16(static_cast<int>(value.size()))) + return false; + + return WriteBytes(value.data(), static_cast<uint16>(value.size())); +} + +char* FlipFrameBuilder::BeginWriteData(uint16 length) { + DCHECK_EQ(variable_buffer_offset_, 0U) << + "There can only be one variable buffer in a FlipFrameBuilder"; + + if (!WriteUInt16(length)) + return false; + + char *data_ptr = BeginWrite(length); + if (!data_ptr) + return NULL; + + variable_buffer_offset_ = data_ptr - buffer_ - sizeof(int); + + // EndWrite doesn't necessarily have to be called after the write operation, + // so we call it here to pad out what the caller will eventually write. + EndWrite(data_ptr, length); + return data_ptr; +} + +bool FlipFrameBuilder::Resize(size_t new_capacity) { + if (new_capacity < capacity_) + return true; + + char* p = new char[new_capacity]; + if (buffer_) { + memcpy(p, buffer_, capacity_); + delete[] buffer_; + } + if (!p && new_capacity > 0) + return false; + buffer_ = p; + capacity_ = new_capacity; + return true; +} + +} // namespace flip diff --git a/net/flip/flip_frame_builder.h b/net/flip/flip_frame_builder.h new file mode 100644 index 0000000..809f451 --- /dev/null +++ b/net/flip/flip_frame_builder.h @@ -0,0 +1,184 @@ +// Copyright (c) 2009 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef NET_FLIP_FRAME_BUILDER_H_ +#define NET_FLIP_FRAME_BUILDER_H_ + +#ifdef WIN32 +#include <winsock2.h> // for htonl() functions +#else +#include <arpa/inet.h> +#endif + +#include <string> + +#include "base/logging.h" +#include "flip_protocol.h" // cross-google3 directory naming. + +#ifdef WIN32 +#undef VLOG +#define VLOG(x) LOG_IF(INFO, false) +#endif // WIN32 + +namespace flip { + +// This class provides facilities for basic binary value packing and unpacking +// into Flip frames. Note: this is similar to Chrome's pickle class, but is +// simplified to work in both the client and server, and without excess +// padding. +// +// The FlipFrameBuilder supports appending primitive values (int, string, etc) +// to a frame instance. The FlipFrameBuilder grows its internal memory buffer +// dynamically to hold the sequence of primitive values. The internal memory +// buffer is exposed as the "data" of the FlipFrameBuilder. +// +// When reading from a FlipFrameBuilder the consumer must know what value types +// to read and in what order to read them as the FlipFrameBuilder does not keep +// track of the type of data written to it. +class FlipFrameBuilder { + public: + FlipFrameBuilder(); + ~FlipFrameBuilder(); + + // Initializes a FlipFrameBuilder from a const block of data. The data is + // not copied; instead the data is merely referenced by this + // FlipFrameBuilder. Only const methods should be used when initialized + // this way. + FlipFrameBuilder(const char* data, int data_len); + + // Returns the size of the FlipFrameBuilder's data. + int length() const { return length_; } + + // Returns the data for this FlipFrameBuilder. + const FlipFrame* data() const { + return reinterpret_cast<FlipFrame*>(buffer_); + } + + // Takes the buffer from the FlipFrameBuilder. + FlipFrame* take() { + FlipFrame* rv = reinterpret_cast<FlipFrame*>(buffer_); + buffer_ = NULL; + capacity_ = 0; + length_ = 0; + return rv; + } + + // Methods for reading the payload of the FlipFrameBuilder. To read from the + // start of the FlipFrameBuilder, initialize *iter to NULL. If successful, + // these methods return true. Otherwise, false is returned to indicate that + // the result could not be extracted. + bool ReadUInt16(void** iter, uint16* result) const; + bool ReadUInt32(void** iter, uint32* result) const; + bool ReadString(void** iter, std::string* result) const; + bool ReadBytes(void** iter, const char** data, uint16 length) const; + bool ReadData(void** iter, const char** data, uint16* length) const; + + // Methods for adding to the payload. These values are appended to the end + // of the FlipFrameBuilder payload. When reading values, you must read them + // in the order they were added. Note - binary integers are converted from + // host to network form. + bool WriteUInt16(uint16 value) { + value = htons(value); + return WriteBytes(&value, sizeof(value)); + } + bool WriteUInt32(uint32 value) { + value = htonl(value); + return WriteBytes(&value, sizeof(value)); + } + bool WriteString(const std::string& value); + bool WriteBytes(const void* data, uint16 data_len); + + // Write an integer to a particular offset in the data buffer. + bool WriteUInt32ToOffset(int offset, uint32 value) { + if (offset + sizeof(value) > length_) + return false; + value = htonl(value); + char *ptr = buffer_ + offset; + memcpy(ptr, &value, sizeof(value)); + return true; + } + + // Allows the caller to write data directly into the FlipFrameBuilder. + // This saves a copy when the data is not already available in a buffer. + // The caller must not write more than the length it declares it will. + // Use ReadData to get the data. + // Returns NULL on failure. + // + // The returned pointer will only be valid until the next write operation + // on this FlipFrameBuilder. + char* BeginWriteData(uint16 length); + + // Returns true if the given iterator could point to data with the given + // length. If there is no room for the given data before the end of the + // payload, returns false. + bool IteratorHasRoomFor(const void* iter, int len) const { + const char* end_of_region = reinterpret_cast<const char*>(iter) + len; + VLOG(1) << "len: " << len; + if (len < 0) { + VLOG(1) << "Len < 0"; + return false; + } else if (iter < buffer_) { + VLOG(1) << "iter < buffer_"; + return false; + } else if (iter > end_of_payload()) { + VLOG(1) << "iter > end_of_payload())"; + return false; + } else if (iter > end_of_region) { + VLOG(1) << "iter > end_of_region)"; + return false; + } else if (end_of_region > end_of_payload()) { + VLOG(1) << "end_of_region > end_of_payload()"; + VLOG(1) << "end_of_region - end_of_payload(): " + << (end_of_region - end_of_payload()); + + return false; + } + + // Watch out for overflow in pointer calculation, which wraps. + return (iter <= end_of_region) && (end_of_region <= end_of_payload()); + } + + protected: + size_t capacity() const { + return capacity_; + } + + const char* end_of_payload() const { return buffer_ + length_; } + + // Resizes the buffer for use when writing the specified amount of data. The + // location that the data should be written at is returned, or NULL if there + // was an error. Call EndWrite with the returned offset and the given length + // to pad out for the next write. + char* BeginWrite(size_t length); + + // Completes the write operation by padding the data with NULL bytes until it + // is padded. Should be paired with BeginWrite, but it does not necessarily + // have to be called after the data is written. + void EndWrite(char* dest, int length); + + // Resize the capacity, note that the input value should include the size of + // the header: new_capacity = sizeof(Header) + desired_payload_capacity. + // A new failure will cause a Resize failure... and caller should check + // the return result for true (i.e., successful resizing). + bool Resize(size_t new_capacity); + + // Moves the iterator by the given number of bytes. + static void UpdateIter(void** iter, int bytes) { + *iter = static_cast<char*>(*iter) + bytes; + } + + // Initial size of the payload. + static const int kInitialPayload = 1024; + + private: + char* buffer_; + size_t capacity_; // Allocation size of payload (or -1 if buffer is const). + size_t length_; // current length of the buffer + size_t variable_buffer_offset_; // IF non-zero, then offset to a buffer. +}; + +} // namespace flip + +#endif // NET_FLIP_FRAME_BUILDER_H_ + diff --git a/net/flip/flip_framer.cc b/net/flip/flip_framer.cc new file mode 100644 index 0000000..ec183cc --- /dev/null +++ b/net/flip/flip_framer.cc @@ -0,0 +1,732 @@ +// Copyright (c) 2009 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "base/scoped_ptr.h" +#include "base/stats_counters.h" + +#include "flip_framer.h" // cross-google3 directory naming. +#include "flip_frame_builder.h" +#include "flip_bitmasks.h" + +#ifdef WIN32 +#include "third_party/zlib/zlib.h" +#else +#include "third_party/zlib/v1_2_3/zlib.h" +#endif + +namespace flip { + +// The initial size of the control frame buffer; this is used internally +// as we we parse though control frames. +static const int kControlFrameBufferInitialSize = 32 * 1024; +// The maximum size of the control frame buffer that we support. +// TODO(mbelshe): We should make this stream-based so there are no limits. +static const int kControlFrameBufferMaxSize = 64 * 1024; + +// This implementation of Flip is version 1. +static const int kFlipProtocolVersion = 1; + +// By default is compression on or off. +bool FlipFramer::compression_default_ = true; + +#ifdef DEBUG_FLIP_STATE_CHANGES +#define CHANGE_STATE(newstate) \ +{ \ + do { \ + LOG(INFO) << "Changing state from: " \ + << StateToString(state_) \ + << " to " << StateToString(newstate) << "\n"; \ + state_ = newstate; \ + } while (false); \ +} +#else +#define CHANGE_STATE(newstate) (state_ = newstate) +#endif + +FlipFramer::FlipFramer() + : state_(FLIP_RESET), + error_code_(FLIP_NO_ERROR), + remaining_payload_(0), + remaining_control_payload_(0), + current_frame_buffer_(NULL), + current_frame_len_(0), + current_frame_capacity_(0), + enable_compression_(compression_default_), + visitor_(NULL) { +} + +FlipFramer::~FlipFramer() { + if (compressor_.get()) { + deflateEnd(compressor_.get()); + } + delete [] current_frame_buffer_; +} + +void FlipFramer::Reset() { + state_ = FLIP_RESET; + error_code_ = FLIP_NO_ERROR; + remaining_payload_ = 0; + remaining_control_payload_ = 0; + current_frame_len_ = 0; + if (current_frame_capacity_ != kControlFrameBufferInitialSize) { + delete [] current_frame_buffer_; + current_frame_buffer_ = 0; + current_frame_capacity_ = 0; + ExpandControlFrameBuffer(kControlFrameBufferInitialSize); + } +} + +const char* FlipFramer::StateToString(int state) { + switch (state) { + case FLIP_ERROR: + return "ERROR"; + case FLIP_DONE: + return "DONE"; + case FLIP_AUTO_RESET: + return "AUTO_RESET"; + case FLIP_RESET: + return "RESET"; + case FLIP_READING_COMMON_HEADER: + return "READING_COMMON_HEADER"; + case FLIP_INTERPRET_CONTROL_FRAME_COMMON_HEADER: + return "INTERPRET_CONTROL_FRAME_COMMON_HEADER"; + case FLIP_CONTROL_FRAME_PAYLOAD: + return "CONTROL_FRAME_PAYLOAD"; + case FLIP_IGNORE_REMAINING_PAYLOAD: + return "IGNORE_REMAINING_PAYLOAD"; + case FLIP_FORWARD_STREAM_FRAME: + return "FORWARD_STREAM_FRAME"; + } + return "UNKNOWN_STATE"; +} + +uint32 FlipFramer::BytesSafeToRead() const { + switch (state_) { + case FLIP_ERROR: + case FLIP_DONE: + case FLIP_AUTO_RESET: + case FLIP_RESET: + return 0; + case FLIP_READING_COMMON_HEADER: + DCHECK(current_frame_len_ < sizeof(FlipFrame)); + return sizeof(FlipFrame) - current_frame_len_; + case FLIP_INTERPRET_CONTROL_FRAME_COMMON_HEADER: + return 0; + case FLIP_CONTROL_FRAME_PAYLOAD: + case FLIP_IGNORE_REMAINING_PAYLOAD: + case FLIP_FORWARD_STREAM_FRAME: + return remaining_payload_; + } + // We should never get to here. + return 0; +} + +void FlipFramer::set_error(FlipError error) { + DCHECK(false); + DCHECK(visitor_); + error_code_ = error; + visitor_->OnError(this); +} + +const char* FlipFramer::ErrorCodeToString(int error_code) { + switch (error_code) { + case FLIP_NO_ERROR: + return "NO_ERROR"; + case FLIP_UNKNOWN_CONTROL_TYPE: + return "UNKNOWN_CONTROL_TYPE"; + case FLIP_INVALID_CONTROL_FRAME: + return "INVALID_CONTROL_FRAME"; + case FLIP_CONTROL_PAYLOAD_TOO_LARGE: + return "CONTROL_PAYLOAD_TOO_LARGE"; + case FLIP_ZLIB_INIT_FAILURE: + return "ZLIB_INIT_FAILURE"; + case FLIP_UNSUPPORTED_VERSION: + return "UNSUPPORTED_VERSION"; + case FLIP_DECOMPRESS_FAILURE: + return "DECOMPRESS_FAILURE"; + } + return "UNKNOWN_STATE"; +} + +uint32 FlipFramer::ProcessInput(const char* data, uint32 len) { + DCHECK(visitor_); + DCHECK(data); + + uint32 original_len = len; + while (len != 0) { + FlipControlFrame* current_control_frame = + reinterpret_cast<FlipControlFrame*>(current_frame_buffer_); + FlipDataFrame* current_data_frame = + reinterpret_cast<FlipDataFrame*>(current_frame_buffer_); + + switch (state_) { + case FLIP_ERROR: + case FLIP_DONE: + goto bottom; + + case FLIP_AUTO_RESET: + case FLIP_RESET: + Reset(); + CHANGE_STATE(FLIP_READING_COMMON_HEADER); + continue; + + case FLIP_READING_COMMON_HEADER: { + int bytes_read = ProcessCommonHeader(data, len); + len -= bytes_read; + data += bytes_read; + continue; + } + + // Arguably, this case is not necessary, as no bytes are consumed here. + // I felt it was a nice partitioning, however (which probably indicates + // that it should be refactored into its own function!) + case FLIP_INTERPRET_CONTROL_FRAME_COMMON_HEADER: + DCHECK(error_code_ == 0); + DCHECK(current_frame_len_ >= sizeof(FlipFrame)); + // Do some sanity checking on the control frame sizes. + switch (current_control_frame->type()) { + case SYN_STREAM: + // NOTE: sizeof(FlipSynStreamControlFrame) is not accurate. + if (current_control_frame->length() < + sizeof(FlipSynStreamControlFrame) - sizeof(FlipControlFrame)) + set_error(FLIP_INVALID_CONTROL_FRAME); + break; + case SYN_REPLY: + if (current_control_frame->length() < + sizeof(FlipSynReplyControlFrame) - sizeof(FlipControlFrame)) + set_error(FLIP_INVALID_CONTROL_FRAME); + break; + case FIN_STREAM: + if (current_control_frame->length() != + sizeof(FlipFinStreamControlFrame) - sizeof(FlipFrame)) + set_error(FLIP_INVALID_CONTROL_FRAME); + break; + case NOOP: + // NOP. Swallow it. + CHANGE_STATE(FLIP_AUTO_RESET); + continue; + default: + set_error(FLIP_UNKNOWN_CONTROL_TYPE); + break; + } + + // We only support version 1 of this protocol. + if (current_control_frame->version() != kFlipProtocolVersion) + set_error(FLIP_UNSUPPORTED_VERSION); + + if (error_code_) { + CHANGE_STATE(FLIP_ERROR); + goto bottom; + } + + remaining_control_payload_ = current_control_frame->length(); + if (remaining_control_payload_ > kControlFrameBufferMaxSize) { + set_error(FLIP_CONTROL_PAYLOAD_TOO_LARGE); + CHANGE_STATE(FLIP_ERROR); + goto bottom; + } + ExpandControlFrameBuffer(remaining_control_payload_); + CHANGE_STATE(FLIP_CONTROL_FRAME_PAYLOAD); + continue; + + case FLIP_CONTROL_FRAME_PAYLOAD: { + int bytes_read = ProcessControlFramePayload(data, len); + len -= bytes_read; + data += bytes_read; + } + // intentional fallthrough + case FLIP_IGNORE_REMAINING_PAYLOAD: + // control frame has too-large payload + // intentional fallthrough + case FLIP_FORWARD_STREAM_FRAME: + if (remaining_payload_) { + uint32 amount_to_forward = std::min(remaining_payload_, len); + if (amount_to_forward && state_ != FLIP_IGNORE_REMAINING_PAYLOAD) { + const FlipDataFrame* data_frame = + reinterpret_cast<const FlipDataFrame*>(current_data_frame); + if (data_frame->flags() & DATA_FLAG_COMPRESSED) { + // TODO(mbelshe): Assert that the decompressor is init'ed. + if (!InitializeDecompressor()) + return NULL; + + int decompressed_max_size = amount_to_forward * 100; + scoped_array<char> decompressed(new char[decompressed_max_size]); + decompressor_->next_in = reinterpret_cast<Bytef*>( + const_cast<char*>(data)); + decompressor_->avail_in = amount_to_forward; + decompressor_->next_out = + reinterpret_cast<Bytef*>(decompressed.get()); + decompressor_->avail_out = decompressed_max_size; + + int rv = inflate(decompressor_.get(), Z_SYNC_FLUSH); + if (rv != Z_OK) { + set_error(FLIP_DECOMPRESS_FAILURE); + goto bottom; + } + int decompressed_size = decompressed_max_size - + decompressor_->avail_out; + visitor_->OnStreamFrameData(current_data_frame->stream_id(), + decompressed.get(), + decompressed_size); + amount_to_forward -= decompressor_->avail_in; + } else { + // The data frame was not compressed + visitor_->OnStreamFrameData(current_data_frame->stream_id(), + data, amount_to_forward); + } + } + data += amount_to_forward; + len -= amount_to_forward; + remaining_payload_ -= amount_to_forward; + } else { + CHANGE_STATE(FLIP_AUTO_RESET); + } + continue; + default: + break; + } + } + bottom: + return original_len - len; +} + +uint32 FlipFramer::ProcessCommonHeader(const char* data, uint32 len) { + // This should only be called when we're in the FLIP_READING_COMMON_HEADER + // state. + DCHECK(state_ == FLIP_READING_COMMON_HEADER); + + int original_len = len; + FlipDataFrame* current_frame = + reinterpret_cast<FlipDataFrame*>(current_frame_buffer_); + + do { + if (current_frame_len_ < sizeof(FlipFrame)) { + uint32 bytes_desired = sizeof(FlipFrame) - current_frame_len_; + uint32 bytes_to_append = std::min(bytes_desired, len); + char* header_buffer = current_frame_buffer_; + memcpy(&header_buffer[current_frame_len_], data, bytes_to_append); + current_frame_len_ += bytes_to_append; + data += bytes_to_append; + len -= bytes_to_append; + // Check for an empty data packet. + if (current_frame_len_ == sizeof(FlipFrame) && + !current_frame->is_control_frame() && + current_frame->length() == 0) { + visitor_->OnStreamFrameData(current_frame->stream_id(), NULL, 0); + CHANGE_STATE(FLIP_RESET); + } + break; + } + remaining_payload_ = current_frame->length(); + // if we're here, then we have the common header all received. + if (!current_frame->is_control_frame()) + CHANGE_STATE(FLIP_FORWARD_STREAM_FRAME); + else + CHANGE_STATE(FLIP_INTERPRET_CONTROL_FRAME_COMMON_HEADER); + } while (false); + + return original_len - len; +} + +uint32 FlipFramer::ProcessControlFramePayload(const char* data, uint32 len) { + int original_len = len; + do { + if (remaining_control_payload_) { + uint32 amount_to_consume = std::min(remaining_control_payload_, len); + memcpy(¤t_frame_buffer_[current_frame_len_], data, + amount_to_consume); + current_frame_len_ += amount_to_consume; + data += amount_to_consume; + len -= amount_to_consume; + remaining_control_payload_ -= amount_to_consume; + remaining_payload_ -= amount_to_consume; + if (remaining_control_payload_) + break; + } + FlipControlFrame* control_frame = + reinterpret_cast<FlipControlFrame*>(current_frame_buffer_); + visitor_->OnControl(control_frame); + CHANGE_STATE(FLIP_IGNORE_REMAINING_PAYLOAD); + } while (false); + return original_len - len; +} + +void FlipFramer::ExpandControlFrameBuffer(int size) { + DCHECK(size < kControlFrameBufferMaxSize); + if (size < current_frame_capacity_) + return; + + int alloc_size = size + sizeof(FlipFrame); + char* new_buffer = new char[alloc_size]; + memcpy(new_buffer, current_frame_buffer_, current_frame_len_); + current_frame_capacity_ = alloc_size; + current_frame_buffer_ = new_buffer; +} + +bool FlipFramer::ParseHeaderBlock(const FlipFrame* frame, + FlipHeaderBlock* block) { + uint32 type = reinterpret_cast<const FlipControlFrame*>(frame)->type(); + if (type != SYN_STREAM && type != SYN_REPLY) + return false; + + // Find the header data within the control frame. + scoped_array<FlipSynStreamControlFrame> control_frame( + reinterpret_cast<FlipSynStreamControlFrame*>(DecompressFrame(frame))); + if (!control_frame.get()) + return false; + const char *header_data = control_frame.get()->header_block(); + int header_length = control_frame.get()->header_block_len(); + + FlipFrameBuilder builder(header_data, header_length); + void* iter = NULL; + uint16 num_headers; + if (builder.ReadUInt16(&iter, &num_headers)) { + VLOG(2) << "found num_headers: " << num_headers; + for (int index = 0; index < num_headers; ++index) { + std::string name; + std::string value; + if (!builder.ReadString(&iter, &name)) { + VLOG(1) << "couldn't read string (key)!"; + break; + } + if (!builder.ReadString(&iter, &value)) { + VLOG(1) << "couldn't read string (value)!"; + break; + } + if (block->empty()) { + (*block)[name] = value; + } else { + FlipHeaderBlock::iterator last = --block->end(); + if (block->key_comp()(last->first, name)) { + block->insert(block->end(), + std::pair<std::string, std::string>(name, value)); + } else { + return false; + } + } + } + return true; + } else { + VLOG(2) << "didn't find headers"; + } + return false; +} + +FlipSynStreamControlFrame* FlipFramer::CreateSynStream( + int stream_id, + int priority, + bool compress, + FlipHeaderBlock* headers) { + FlipFrameBuilder frame; + + frame.WriteUInt16(kControlFlagMask | kFlipProtocolVersion); + frame.WriteUInt16(SYN_STREAM); + frame.WriteUInt32(0); // Placeholder for the length. + frame.WriteUInt32(stream_id); + frame.WriteUInt16(ntohs(priority) << 6); // Priority. + + frame.WriteUInt16(headers->size()); // Number of headers. + FlipHeaderBlock::iterator it; + for (it = headers->begin(); it != headers->end(); ++it) { + frame.WriteString(it->first); + frame.WriteString(it->second); + } + // write the length + frame.WriteUInt32ToOffset(4, frame.length() - sizeof(FlipFrame)); + if (compress) { + FlipSynStreamControlFrame* new_frame = + reinterpret_cast<FlipSynStreamControlFrame*>( + CompressFrame(frame.data())); + return new_frame; + } + + return reinterpret_cast<FlipSynStreamControlFrame*>(frame.take()); +} + +/* static */ +FlipFinStreamControlFrame* FlipFramer::CreateFinStream(int stream_id, + int status) { + FlipFrameBuilder frame; + frame.WriteUInt16(kControlFlagMask | kFlipProtocolVersion); + frame.WriteUInt16(FIN_STREAM); + frame.WriteUInt32(8); + frame.WriteUInt32(stream_id); + frame.WriteUInt32(status); + return reinterpret_cast<FlipFinStreamControlFrame*>(frame.take()); +} + +FlipSynReplyControlFrame* FlipFramer::CreateSynReply(int stream_id, + bool compressed, FlipHeaderBlock* headers) { + + FlipFrameBuilder frame; + + frame.WriteUInt16(kControlFlagMask | kFlipProtocolVersion); + frame.WriteUInt16(SYN_REPLY); + frame.WriteUInt32(0); // Placeholder for the length. + frame.WriteUInt32(stream_id); + frame.WriteUInt16(0); // Priority. + + frame.WriteUInt16(headers->size()); // Number of headers. + FlipHeaderBlock::iterator it; + for (it = headers->begin(); it != headers->end(); ++it) { + // TODO(mbelshe): Headers need to be sorted. + frame.WriteString(it->first); + frame.WriteString(it->second); + } + // write the length + frame.WriteUInt32ToOffset(4, frame.length() - sizeof(FlipFrame)); + if (compressed) + return reinterpret_cast<FlipSynReplyControlFrame*>( + CompressFrame(frame.data())); + return reinterpret_cast<FlipSynReplyControlFrame*>(frame.take()); +} + +FlipDataFrame* FlipFramer::CreateDataFrame(int stream_id, + const char* data, + int len, bool compressed) { + FlipFrameBuilder frame; + + frame.WriteUInt32(stream_id); + + frame.WriteUInt32(len); + frame.WriteBytes(data, len); + if (compressed) + return reinterpret_cast<FlipDataFrame*>(CompressFrame(frame.data())); + return reinterpret_cast<FlipDataFrame*>(frame.take()); +} + +static const int kCompressorLevel = Z_DEFAULT_COMPRESSION; +// This is just a hacked dictionary to use for shrinking HTTP-like headers. +// TODO(mbelshe): Use a scientific methodology for computing the dictionary. +static const char dictionary[] = + "optionsgetheadpostputdeletetraceacceptaccept-charsetaccept-encodingaccept-" + "languageauthorizationexpectfromhostif-modified-sinceif-matchif-none-matchi" + "f-rangeif-unmodifiedsincemax-forwardsproxy-authorizationrangerefererteuser" + "-agent10010120020120220320420520630030130230330430530630740040140240340440" + "5406407408409410411412413414415416417500501502503504505accept-rangesageeta" + "glocationproxy-authenticatepublicretry-afterservervarywarningwww-authentic" + "ateallowcontent-basecontent-encodingcache-controlconnectiondatetrailertran" + "sfer-encodingupgradeviawarningcontent-languagecontent-lengthcontent-locati" + "oncontent-md5content-rangecontent-typeetagexpireslast-modifiedset-cookieMo" + "ndayTuesdayWednesdayThursdayFridaySaturdaySundayJanFebMarAprMayJunJulAugSe" + "pOctNovDecchunkedtext/htmlimage/pngimage/jpgimage/gifapplication/xmlapplic" + "ation/xhtmltext/plainpublicmax-agecharset=iso-8859-1utf-8gzipdeflateHTTP/1" + ".1statusversionurl"; +static uLong dictionary_id = 0; + +bool FlipFramer::InitializeCompressor() { + if (compressor_.get()) + return true; // Already initialized. + + compressor_.reset(new z_stream); + memset(compressor_.get(), 0, sizeof(z_stream)); + + int success = deflateInit(compressor_.get(), kCompressorLevel); + if (success == Z_OK) + success = deflateSetDictionary(compressor_.get(), + reinterpret_cast<const Bytef*>(dictionary), + sizeof(dictionary)); + if (success != Z_OK) + compressor_.reset(NULL); + return success == Z_OK; +} + +bool FlipFramer::InitializeDecompressor() { + if (decompressor_.get()) + return true; // Already initialized. + + decompressor_.reset(new z_stream); + memset(decompressor_.get(), 0, sizeof(z_stream)); + + // Compute the id of our dictionary so that we know we're using the + // right one when asked for it. + if (dictionary_id == 0) { + dictionary_id = adler32(0L, Z_NULL, 0); + dictionary_id = adler32(dictionary_id, + reinterpret_cast<const Bytef*>(dictionary), + sizeof(dictionary)); + } + + int success = inflateInit(decompressor_.get()); + if (success != Z_OK) + decompressor_.reset(NULL); + return success == Z_OK; +} + +bool FlipFramer::GetFrameBoundaries(const FlipFrame* frame, + int* payload_length, + int* header_length, + const unsigned char** payload) const { + if (frame->is_control_frame()) { + const FlipControlFrame* control_frame = + reinterpret_cast<const FlipControlFrame*>(frame); + switch (control_frame->type()) { + case SYN_STREAM: + case SYN_REPLY: + { + const FlipSynStreamControlFrame *syn_frame = + reinterpret_cast<const FlipSynStreamControlFrame*>(frame); + *payload_length = syn_frame->header_block_len(); + *header_length = sizeof(FlipFrame) + syn_frame->length() - + syn_frame->header_block_len(); + *payload = reinterpret_cast<const unsigned char*>(frame) + + *header_length; + } + break; + default: + // TODO(mbelshe): set an error? + return false; // We can't compress this frame! + } + } else { + *header_length = sizeof(FlipFrame); + *payload_length = frame->length(); + *payload = reinterpret_cast<const unsigned char*>(frame) + + sizeof(FlipFrame); + } + DCHECK(static_cast<size_t>(*header_length) <= + sizeof(FlipFrame) + *payload_length); + return true; +} + + +FlipFrame* FlipFramer::CompressFrame(const FlipFrame* frame) { + int payload_length; + int header_length; + const unsigned char* payload; + + static StatsCounter pre_compress_bytes("flip.PreCompressSize"); + static StatsCounter post_compress_bytes("flip.PostCompressSize"); + + if (!enable_compression_) + return DuplicateFrame(frame); + + if (!GetFrameBoundaries(frame, &payload_length, &header_length, &payload)) + return NULL; + + if (!InitializeCompressor()) + return NULL; + + // TODO(mbelshe): Should we have a zlib header like what http servers do? + + // Create an output frame. + int compressed_max_size = deflateBound(compressor_.get(), payload_length); + int new_frame_size = header_length + compressed_max_size; + FlipFrame* new_frame = + reinterpret_cast<FlipFrame*>(new char[new_frame_size]); + memcpy(new_frame, frame, header_length); + + compressor_->next_in = const_cast<Bytef*>(payload); + compressor_->avail_in = payload_length; + compressor_->next_out = reinterpret_cast<Bytef*>(new_frame) + header_length; + compressor_->avail_out = compressed_max_size; + + // Data packets have a 'compressed flag + if (!new_frame->is_control_frame()) { + FlipDataFrame* data_frame = reinterpret_cast<FlipDataFrame*>(new_frame); + data_frame->set_flags(data_frame->flags() | DATA_FLAG_COMPRESSED); + } + + int rv = deflate(compressor_.get(), Z_SYNC_FLUSH); + if (rv != Z_OK) { // How can we know that it compressed everything? + // This shouldn't happen, right? + free(new_frame); + return NULL; + } + + int compressed_size = compressed_max_size - compressor_->avail_out; + new_frame->set_length(header_length + compressed_size - sizeof(FlipFrame)); + + pre_compress_bytes.Add(payload_length); + post_compress_bytes.Add(new_frame->length()); + + return new_frame; +} + +FlipFrame* FlipFramer::DecompressFrame(const FlipFrame* frame) { + int payload_length; + int header_length; + const unsigned char* payload; + + static StatsCounter pre_decompress_bytes("flip.PreDeCompressSize"); + static StatsCounter post_decompress_bytes("flip.PostDeCompressSize"); + + if (!enable_compression_) + return DuplicateFrame(frame); + + if (!GetFrameBoundaries(frame, &payload_length, &header_length, &payload)) + return NULL; + + if (!frame->is_control_frame()) { + const FlipDataFrame* data_frame = + reinterpret_cast<const FlipDataFrame*>(frame); + if (!data_frame->flags() & DATA_FLAG_COMPRESSED) + return DuplicateFrame(frame); + } + + if (!InitializeDecompressor()) + return NULL; + + // TODO(mbelshe): Should we have a zlib header like what http servers do? + + // Create an output frame. Assume it does not need to be longer than + // the input data. + int decompressed_max_size = kControlFrameBufferInitialSize; + int new_frame_size = header_length + decompressed_max_size; + FlipFrame* new_frame = + reinterpret_cast<FlipFrame*>(new char[new_frame_size]); + memcpy(new_frame, frame, header_length); + + decompressor_->next_in = const_cast<Bytef*>(payload); + decompressor_->avail_in = payload_length; + decompressor_->next_out = reinterpret_cast<Bytef*>(new_frame) + + header_length; + decompressor_->avail_out = decompressed_max_size; + + int rv = inflate(decompressor_.get(), Z_SYNC_FLUSH); + if (rv == Z_NEED_DICT) { + // Need to try again with the right dictionary. + if (decompressor_->adler == dictionary_id) { + rv = inflateSetDictionary(decompressor_.get(), (const Bytef*)dictionary, + sizeof(dictionary)); + if (rv == Z_OK) + rv = inflate(decompressor_.get(), Z_SYNC_FLUSH); + } + } + if (rv != Z_OK) { // How can we know that it decompressed everything? + free(new_frame); + return NULL; + } + + // Unset the compressed flag for data frames. + if (!new_frame->is_control_frame()) { + FlipDataFrame* data_frame = reinterpret_cast<FlipDataFrame*>(new_frame); + data_frame->set_flags(data_frame->flags() & ~DATA_FLAG_COMPRESSED); + } + + int decompressed_size = decompressed_max_size - decompressor_->avail_out; + new_frame->set_length(header_length + decompressed_size - sizeof(FlipFrame)); + + pre_decompress_bytes.Add(frame->length()); + post_decompress_bytes.Add(new_frame->length()); + + return new_frame; +} + +FlipFrame* FlipFramer::DuplicateFrame(const FlipFrame* frame) { + int size = sizeof(FlipFrame) + frame->length(); + char* new_frame = new char[size]; + memcpy(new_frame, frame, size); + return reinterpret_cast<FlipFrame*>(new_frame); +} + +void FlipFramer::set_enable_compression(bool value) { + enable_compression_ = value; +} + +void FlipFramer::set_enable_compression_default(bool value) { + compression_default_ = value; +} + +} // namespace flip + diff --git a/net/flip/flip_framer.h b/net/flip/flip_framer.h new file mode 100644 index 0000000..e50cc83 --- /dev/null +++ b/net/flip/flip_framer.h @@ -0,0 +1,228 @@ +// Copyright (c) 2009 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef NET_FLIP_FLIP_FRAMER_H_ +#define NET_FLIP_FLIP_FRAMER_H_ + +#ifdef _WIN32 +#include <winsock2.h> +#else +#include <arpa/inet.h> +#endif +#include <map> +#include <string> + +#include "base/basictypes.h" +#include "base/logging.h" +#include "base/scoped_ptr.h" +#include "testing/gtest/include/gtest/gtest_prod.h" +#include "flip_protocol.h" // cross-google3 directory naming. + +typedef struct z_stream_s z_stream; // Forward declaration for zlib. + +namespace net { +class FlipNetworkTransactionTest; +} + +namespace flip { + +class FlipFramer; +class FlipFramerTest; + +// A datastructure for holding a set of headers from either a +// SYN_STREAM or SYN_REPLY frame. +typedef std::map<std::string, std::string> FlipHeaderBlock; + +// FlipFramerVisitorInterface is a set of callbacks for the FlipFramer. +// Implement this interface to receive event callbacks as frames are +// decoded from the framer. +class FlipFramerVisitorInterface { + public: + virtual ~FlipFramerVisitorInterface() {} + + // Called if an error is detected in the FlipFrame protocol. + virtual void OnError(FlipFramer* framer) = 0; + + // Called when a Control Frame is received. + virtual void OnControl(const FlipControlFrame* frame) = 0; + + // Called when data is received. + virtual void OnStreamFrameData(uint32 sream_id, + const char* data, + uint32 len) = 0; + + // TODO(fenix): Implement me! + virtual void OnLameDuck() = 0; +}; + +class FlipFramer { + public: + // Flip states. + // TODO(mbelshe): Can we move these into the implementation + // and avoid exposing through the header. (Needed for test) + enum FlipState { + FLIP_ERROR, + FLIP_DONE, + FLIP_RESET, + FLIP_AUTO_RESET, + FLIP_READING_COMMON_HEADER, + FLIP_INTERPRET_CONTROL_FRAME_COMMON_HEADER, + FLIP_CONTROL_FRAME_PAYLOAD, + FLIP_IGNORE_REMAINING_PAYLOAD, + FLIP_FORWARD_STREAM_FRAME + }; + + // Flip error codes. + enum FlipError { + FLIP_NO_ERROR, + FLIP_UNKNOWN_CONTROL_TYPE, // Control frame is an unknown type. + FLIP_INVALID_CONTROL_FRAME, // Control frame is mal-formatted. + FLIP_CONTROL_PAYLOAD_TOO_LARGE, // Control frame payload was too large. + FLIP_ZLIB_INIT_FAILURE, // The Zlib library could not initialize. + FLIP_UNSUPPORTED_VERSION, // Control frame has unsupported version. + FLIP_DECOMPRESS_FAILURE, // There was an error decompressing. + }; + + // Create a new Framer. + FlipFramer(); + virtual ~FlipFramer(); + + // Set callbacks to be called from the framer. A visitor must be set, or + // else the framer will likely crash. It is acceptable for the visitor + // to do nothing. If this is called multiple times, only the last visitor + // will be used. + void set_visitor(FlipFramerVisitorInterface* visitor) { + visitor_ = visitor; + } + + // Pass data into the framer for parsing. + // Returns the number of bytes consumed. It is safe to pass more bytes in + // than may be consumed. + uint32 ProcessInput(const char* data, uint32 len); + + // Resets the framer state after a frame has been successfully decoded. + // TODO(mbelshe): can we make this private? + void Reset(); + + // Check the state of the framer. + FlipError error_code() const { return error_code_; } + FlipState state() const { return state_; } + + bool MessageFullyRead() { + return state_ == FLIP_DONE || state_ == FLIP_AUTO_RESET; + } + bool HasError() { return state_ == FLIP_ERROR; } + + // Further parsing utilities. + // Given a control frame, parse out a FlipHeaderBlock. Only + // valid for SYN_STREAM and SYN_REPLY frames. + // Returns true if successfully parsed, false otherwise. + bool ParseHeaderBlock(const FlipFrame* frame, FlipHeaderBlock* block); + + // Frame creation utilities + // Create a FlipSynStreamControlFrame. The resulting frame will be + // compressed if |compressed| is true. + FlipSynStreamControlFrame* CreateSynStream(int stream_id, int priority, + bool compress, + FlipHeaderBlock* headers); + static FlipFinStreamControlFrame* CreateFinStream(int stream_id, int status); + + // Create a FlipSynReplyControlFrame.The resulting frame will be + // compressed if |compressed| is true. + FlipSynReplyControlFrame* CreateSynReply(int stream_id, + bool compress, + FlipHeaderBlock* headers); + + // Create a FlipDataFrame. The resulting frame will be + // compressed if |compressed| is true. + FlipDataFrame* CreateDataFrame(int stream_id, const char* data, + int len, bool compressed); + + // NOTES about frame compression. + // We want flip to compress headers across the entire session. As long as + // the session is over TCP, frames are sent serially. The client & server + // can each compress frames in the same order and then compress them in that + // order, and the remote can do the reverse. However, we ultimately want + // the creation of frames to be less sensitive to order so that they can be + // placed over a UDP based protocol and yet still benefit from some + // compression. We don't know of any good compression protocol which does + // not build its state in a serial (stream based) manner.... For now, we're + // using zlib anyway. + + // Compresses a FlipFrame. + // On success, returns a new FlipFrame with the payload compressed. + // Compression state is maintained as part of the FlipFramer. + // Returned frame must be freed with free(). + // On failure, returns NULL. + FlipFrame* CompressFrame(const FlipFrame* frame); + + // Decompresses a FlipFrame. + // On success, returns a new FlipFrame with the payload decompressed. + // Compression state is maintained as part of the FlipFramer. + // Returned frame must be freed with free(). + // On failure, returns NULL. + FlipFrame* DecompressFrame(const FlipFrame* frame); + + // Create a copy of a frame. + FlipFrame* DuplicateFrame(const FlipFrame* frame); + + // For debugging. + static const char* StateToString(int state); + static const char* ErrorCodeToString(int error_code); + + protected: + FRIEND_TEST(FlipFramerTest, Basic); + FRIEND_TEST(FlipFramerTest, HeaderBlockBarfsOnOutOfOrderHeaders); + friend class FlipNetworkTransactionTest; + + // For ease of testing we can tweak compression on/off. + void set_enable_compression(bool value); + static void set_enable_compression_default(bool value); + + private: + // Internal breakout from ProcessInput. Returns the number of bytes + // consumed from the data. + uint32 ProcessCommonHeader(const char* data, uint32 len); + uint32 ProcessControlFramePayload(const char* data, uint32 len); + + // Initialize the ZLib state. + bool InitializeCompressor(); + bool InitializeDecompressor(); + + // Not used (yet) + uint32 BytesSafeToRead() const; + + // Set the error code. + void set_error(FlipError error); + + // Expands the control frame buffer to accomodate a particular payload size. + void ExpandControlFrameBuffer(int size); + + // Given a frame, breakdown the variable payload length, the static header + // header length, and variable payload pointer. + bool GetFrameBoundaries(const FlipFrame* frame, int* payload_length, + int* header_length, + const unsigned char** payload) const; + + FlipState state_; + FlipError error_code_; + uint32 remaining_payload_; + uint32 remaining_control_payload_; + + char* current_frame_buffer_; + int current_frame_len_; // Number of bytes read into the current_frame_. + int current_frame_capacity_; + + bool enable_compression_; + scoped_ptr<z_stream> compressor_; + scoped_ptr<z_stream> decompressor_; + FlipFramerVisitorInterface* visitor_; + + static bool compression_default_; +}; + +} // namespace flip + +#endif // NET_FLIP_FLIP_FRAMER_H_ + diff --git a/net/flip/flip_framer_test.cc b/net/flip/flip_framer_test.cc new file mode 100644 index 0000000..a786bc2 --- /dev/null +++ b/net/flip/flip_framer_test.cc @@ -0,0 +1,251 @@ +// Copyright (c) 2009 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include <iostream> +#include "base/scoped_ptr.h" + +#include "flip_framer.h" // cross-google3 directory naming. +#include "flip_protocol.h" +#include "flip_frame_builder.h" +#ifdef _WIN32 +#include "testing/platform_test.h" +#else +#include "testing/base/public/gunit.h" + +#define PlatformTest ::testing::Test +#endif + +namespace flip { + +class FlipFramerTest : public PlatformTest { + public: + virtual void TearDown() {} +}; + +class TestFlipVisitor : public FlipFramerVisitorInterface { + public: + explicit TestFlipVisitor(FlipFramer* framer) + : framer_(framer), + error_count_(0), + syn_frame_count_(0), + syn_reply_frame_count_(0), + data_frame_count_(0), + fin_frame_count_(0) { + } + + void OnError(FlipFramer* f) { + error_count_++; + } + void OnStreamFrameData(FlipStreamId stream_id, + const char* data, + uint32 len) { + data_frame_count_++; +#ifdef TEST_LOGGING + std::cerr << "OnStreamFrameData(" << stream_id << ", \""; + if (len > 0) { + for (uint32 i = 0 ; i < len; ++i) { + std::cerr << std::hex << (0xFF & (unsigned int)data[i]) << std::dec; + } + } + std::cerr << "\", " << len << ")\n"; +#endif // TEST_LOGGING + } + + void OnControl(const FlipControlFrame* frame) { + FlipHeaderBlock headers; + bool parsed_headers = false; + switch (frame->type()) { + case SYN_STREAM: + parsed_headers = framer_->ParseHeaderBlock( + reinterpret_cast<const FlipFrame*>(frame), &headers); + DCHECK(parsed_headers); + syn_frame_count_++; + VLOG(2) << "OnSyn(" << frame->stream_id() << ")\n"; + break; + case SYN_REPLY: + parsed_headers = framer_->ParseHeaderBlock( + reinterpret_cast<const FlipFrame*>(frame), &headers); + DCHECK(parsed_headers); + syn_reply_frame_count_++; + VLOG(2) << "OnSynReply(" << frame->stream_id() << ")\n"; + break; + case FIN_STREAM: + fin_frame_count_++; + VLOG(2) << "OnFin(" << frame->stream_id() << ")\n"; + break; + default: + DCHECK(false); // Error! + } + } + + void OnLameDuck() { + } + + FlipFramer* framer_; + // Counters from the visitor callbacks. + int error_count_; + int syn_frame_count_; + int syn_reply_frame_count_; + int data_frame_count_; + int fin_frame_count_; +}; + +// Test our protocol constants +TEST_F(FlipFramerTest, ProtocolConstants) { + EXPECT_EQ(8, sizeof(FlipFrame)); + EXPECT_EQ(8, sizeof(FlipDataFrame)); + EXPECT_EQ(12, sizeof(FlipControlFrame)); + EXPECT_EQ(16, sizeof(FlipSynStreamControlFrame)); + EXPECT_EQ(16, sizeof(FlipSynReplyControlFrame)); + EXPECT_EQ(16, sizeof(FlipFinStreamControlFrame)); + EXPECT_EQ(1, SYN_STREAM); + EXPECT_EQ(2, SYN_REPLY); + EXPECT_EQ(3, FIN_STREAM); +} + +// Test that we can encode and decode a FlipHeaderBlock. +TEST_F(FlipFramerTest, HeaderBlock) { + FlipHeaderBlock headers; + headers["alpha"] = "beta"; + headers["gamma"] = "charlie"; + FlipFramer framer; + + // Encode the header block into a SynStream frame. + scoped_ptr<FlipSynStreamControlFrame> frame( + framer.CreateSynStream(1, 1, true, &headers)); + EXPECT_TRUE(frame.get() != NULL); + + FlipHeaderBlock new_headers; + FlipFrame* control_frame = reinterpret_cast<FlipFrame*>(frame.get()); + framer.ParseHeaderBlock(control_frame, &new_headers); + + EXPECT_EQ(headers.size(), new_headers.size()); + EXPECT_EQ(headers["alpha"], new_headers["alpha"]); + EXPECT_EQ(headers["gamma"], new_headers["gamma"]); +} + +TEST_F(FlipFramerTest, HeaderBlockBarfsOnOutOfOrderHeaders) { + FlipFrameBuilder frame; + + frame.WriteUInt16(kControlFlagMask | 1); + frame.WriteUInt16(SYN_STREAM); + frame.WriteUInt32(0); // Placeholder for the length. + frame.WriteUInt32(3); // stream_id + frame.WriteUInt16(0); // Priority. + + frame.WriteUInt16(2); // Number of headers. + FlipHeaderBlock::iterator it; + frame.WriteString("gamma"); + frame.WriteString("gamma"); + frame.WriteString("alpha"); + frame.WriteString("alpha"); + // write the length + frame.WriteUInt32ToOffset(4, frame.length() - sizeof(FlipFrame)); + + FlipHeaderBlock new_headers; + const FlipFrame* control_frame = frame.data(); + FlipFramer framer; + framer.set_enable_compression(false); + EXPECT_FALSE(framer.ParseHeaderBlock(control_frame, &new_headers)); +} + +TEST_F(FlipFramerTest, BasicCompression) { + FlipHeaderBlock headers; + headers["server"] = "FlipServer 1.0"; + headers["date"] = "Mon 12 Jan 2009 12:12:12 PST"; + headers["status"] = "200"; + headers["version"] = "HTTP/1.1"; + headers["content-type"] = "text/html"; + headers["content-length"] = "12"; + + FlipFramer framer; + scoped_ptr<FlipSynStreamControlFrame> + frame1(framer.CreateSynStream(1, 1, true, &headers)); + scoped_ptr<FlipSynStreamControlFrame> + frame2(framer.CreateSynStream(1, 1, true, &headers)); + + // Expect the second frame to be more compact than the first. + EXPECT_LE(frame2->length(), frame1->length()); + + // Decompress the first frame + scoped_ptr<FlipFrame> frame3( + framer.DecompressFrame(reinterpret_cast<FlipFrame*>(frame1.get()))); + + // Decompress the second frame + scoped_ptr<FlipFrame> frame4( + framer.DecompressFrame(reinterpret_cast<FlipFrame*>(frame2.get()))); + + // Expect frames 3 & 4 to be the same. + EXPECT_EQ(0, + memcmp(frame3.get(), frame4.get(), + sizeof(FlipFrame) + frame3->length())); +} + +TEST_F(FlipFramerTest, Basic) { + const unsigned char input[] = { + 0x80, 0x01, 0x00, 0x01, // SYN Stream #1 + 0x00, 0x00, 0x00, 0x10, + 0x00, 0x00, 0x00, 0x01, + 0x00, 0x00, 0x00, 0x01, + 0x00, 0x02, 'h', 'h', + 0x00, 0x02, 'v', 'v', + + 0x00, 0x00, 0x00, 0x01, // DATA on Stream #1 + 0x00, 0x00, 0x00, 0x0c, + 0xde, 0xad, 0xbe, 0xef, + 0xde, 0xad, 0xbe, 0xef, + 0xde, 0xad, 0xbe, 0xef, + + 0x80, 0x01, 0x00, 0x01, // SYN Stream #3 + 0x00, 0x00, 0x00, 0x08, + 0x00, 0x00, 0x00, 0x03, + 0x00, 0x00, 0x00, 0x00, + + 0x00, 0x00, 0x00, 0x03, // DATA on Stream #3 + 0x00, 0x00, 0x00, 0x08, + 0xde, 0xad, 0xbe, 0xef, + 0xde, 0xad, 0xbe, 0xef, + + 0x00, 0x00, 0x00, 0x01, // DATA on Stream #1 + 0x00, 0x00, 0x00, 0x04, + 0xde, 0xad, 0xbe, 0xef, + + 0x80, 0x01, 0x00, 0x03, // FIN on Stream #1 + 0x00, 0x00, 0x00, 0x08, + 0x00, 0x00, 0x00, 0x01, + 0x00, 0x00, 0x00, 0x00, + + 0x00, 0x00, 0x00, 0x03, // DATA on Stream #3 + 0x00, 0x00, 0x00, 0x00, + + 0x80, 0x01, 0x00, 0x03, // FIN on Stream #3 + 0x00, 0x00, 0x00, 0x08, + 0x00, 0x00, 0x00, 0x03, + 0x00, 0x00, 0x00, 0x00, + }; + + FlipFramer framer; + framer.set_enable_compression(false); + TestFlipVisitor visitor(&framer); + framer.set_visitor(&visitor); + size_t input_remaining = sizeof(input); + const char* input_ptr = reinterpret_cast<const char*>(input); + while (input_remaining > 0 && + framer.error_code() == FlipFramer::FLIP_NO_ERROR) { + size_t bytes_processed = framer.ProcessInput(input_ptr, sizeof(input)); + input_remaining -= bytes_processed; + input_ptr += bytes_processed; + if (framer.state() == FlipFramer::FLIP_DONE) + framer.Reset(); + } + EXPECT_EQ(0, visitor.error_count_); + EXPECT_EQ(2, visitor.syn_frame_count_); + EXPECT_EQ(0, visitor.syn_reply_frame_count_); + EXPECT_EQ(4, visitor.data_frame_count_); + EXPECT_EQ(2, visitor.fin_frame_count_); +} + +} // namespace flip + + diff --git a/net/flip/flip_network_transaction.cc b/net/flip/flip_network_transaction.cc new file mode 100644 index 0000000..88b931d --- /dev/null +++ b/net/flip/flip_network_transaction.cc @@ -0,0 +1,414 @@ +// Copyright (c) 2009 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "net/flip/flip_network_transaction.h" + +#include "base/scoped_ptr.h" +#include "base/compiler_specific.h" +#include "base/field_trial.h" +#include "base/string_util.h" +#include "base/trace_event.h" +#include "build/build_config.h" +#include "net/base/connection_type_histograms.h" +#include "net/base/host_resolver.h" +#include "net/base/io_buffer.h" +#include "net/base/load_flags.h" +#include "net/base/net_errors.h" +#include "net/base/net_util.h" +#include "net/base/upload_data_stream.h" +#include "net/http/http_auth.h" +#include "net/http/http_auth_handler.h" +#include "net/http/http_basic_stream.h" +#include "net/http/http_chunked_decoder.h" +#include "net/http/http_network_session.h" +#include "net/http/http_request_info.h" +#include "net/http/http_response_headers.h" +#include "net/http/http_util.h" +#include "net/socket/client_socket_factory.h" +#include "net/socket/ssl_client_socket.h" + +using base::Time; + +namespace net { + +//----------------------------------------------------------------------------- + +FlipNetworkTransaction::FlipNetworkTransaction(HttpNetworkSession* session) + : flip_request_id_(0), + user_callback_(0), + user_buffer_bytes_remaining_(0), + session_(session), + request_(NULL), + response_complete_(false), + response_status_(net::OK), + next_state_(STATE_NONE) { +} + +FlipNetworkTransaction::~FlipNetworkTransaction() { + LOG(INFO) << "FlipNetworkTransaction dead. " << this; + if (flip_ && flip_request_id_) + flip_->CancelStream(flip_request_id_); +} + +const HttpRequestInfo* FlipNetworkTransaction::request() { + return request_; +} + +const UploadDataStream* FlipNetworkTransaction::data() { + return request_body_stream_.get(); +} + +void FlipNetworkTransaction::OnRequestSent(int status) { + if (status == OK) + next_state_ = STATE_SEND_REQUEST_COMPLETE; + else + next_state_ = STATE_NONE; + + int rv = DoLoop(status); +} + +void FlipNetworkTransaction::OnResponseReceived(HttpResponseInfo* response) { + next_state_ = STATE_READ_HEADERS_COMPLETE; + + response_ = *response; // TODO(mbelshe): avoid copy. + + int rv = DoLoop(net::OK); +} + +void FlipNetworkTransaction::OnDataReceived(const char* buffer, int bytes) { + // TODO(mbelshe): if data is received before a syn reply, this will crash. + + DCHECK(bytes >= 0); + next_state_ = STATE_READ_BODY; + + if (bytes > 0) { + DCHECK(buffer); + + // TODO(mbelshe): If read is pending, we should copy the data straight into + // the read buffer here. For now, we'll queue it always. + + IOBufferWithSize* io_buffer = new IOBufferWithSize(bytes); + memcpy(io_buffer->data(), buffer, bytes); + + response_body_.push_back(io_buffer); + } + int rv = DoLoop(net::OK); +} + +void FlipNetworkTransaction::OnClose(int status) { + next_state_ = STATE_READ_BODY_COMPLETE; + response_complete_ = true; + response_status_ = status; + flip_request_id_ = 0; // TODO(mbelshe) - do we need this? + int rv = DoLoop(status); +} + +void FlipNetworkTransaction::OnCancel() { + next_state_ = STATE_NONE; + response_complete_ = true; + response_status_ = net::ERR_ABORTED; + flip_request_id_ = 0; // TODO(mbelshe) - do we need this? + // Clear any data in our buffer. + while (response_body_.size()) + response_body_.pop_front(); +} + +int FlipNetworkTransaction::Start(const HttpRequestInfo* request_info, + CompletionCallback* callback, + LoadLog* load_log) { + request_ = request_info; + start_time_ = base::Time::Now(); + + next_state_ = STATE_INIT_CONNECTION; + int rv = DoLoop(OK); + if (rv == ERR_IO_PENDING) + user_callback_ = callback; + return rv; +} + +int FlipNetworkTransaction::RestartIgnoringLastError( + CompletionCallback* callback) { + // TODO(mbelshe): implement me. + NOTIMPLEMENTED(); + return ERR_NOT_IMPLEMENTED; +} + +int FlipNetworkTransaction::RestartWithCertificate( + X509Certificate* client_cert, CompletionCallback* callback) { + // TODO(mbelshe): implement me. + NOTIMPLEMENTED(); + return ERR_NOT_IMPLEMENTED; +} + +int FlipNetworkTransaction::RestartWithAuth( + const std::wstring& username, + const std::wstring& password, + CompletionCallback* callback) { + // TODO(mbelshe): implement me. + NOTIMPLEMENTED(); + return 0; +} + +int FlipNetworkTransaction::Read(IOBuffer* buf, int buf_len, + CompletionCallback* callback) { + DCHECK(buf); + DCHECK(buf_len > 0); + DCHECK(callback); + DCHECK(flip_.get()); + + // If we have data buffered, complete the IO immediately. + if (response_body_.size()) { + int bytes_read = 0; + while (response_body_.size() && buf_len > 0) { + scoped_refptr<IOBufferWithSize> data = response_body_.front(); + int bytes_to_copy = std::min(buf_len, data->size()); + memcpy(&(buf->data()[bytes_read]), data->data(), bytes_to_copy); + buf_len -= bytes_to_copy; + if (bytes_to_copy == data->size()) { + response_body_.pop_front(); + } else { + int bytes_remaining = data->size() - bytes_to_copy; + IOBufferWithSize* new_buffer = new IOBufferWithSize(bytes_remaining); + memcpy(new_buffer->data(), &(data->data()[bytes_to_copy]), + bytes_remaining); + response_body_.pop_front(); + response_body_.push_front(new_buffer); + } + bytes_read += bytes_to_copy; + } + return bytes_read; + } + + if (response_complete_) + return 0; // We already finished reading this stream. + + user_callback_ = callback; + user_buffer_ = buf; + user_buffer_bytes_remaining_ = buf_len; + return net::ERR_IO_PENDING; +} + +const HttpResponseInfo* FlipNetworkTransaction::GetResponseInfo() const { + return (response_.headers || response_.ssl_info.cert) ? &response_ : NULL; +} + +LoadState FlipNetworkTransaction::GetLoadState() const { + switch (next_state_) { + case STATE_INIT_CONNECTION_COMPLETE: + if (flip_.get()) + return flip_->GetLoadState(); + return LOAD_STATE_CONNECTING; + case STATE_SEND_REQUEST_COMPLETE: + return LOAD_STATE_SENDING_REQUEST; + case STATE_READ_HEADERS_COMPLETE: + return LOAD_STATE_WAITING_FOR_RESPONSE; + case STATE_READ_BODY_COMPLETE: + return LOAD_STATE_READING_RESPONSE; + default: + return LOAD_STATE_IDLE; + } +} + +uint64 FlipNetworkTransaction::GetUploadProgress() const { + if (!request_body_stream_.get()) + return 0; + + return request_body_stream_->position(); +} + + +void FlipNetworkTransaction::DoHttpTransactionCallback(int rv) { + DCHECK(rv != ERR_IO_PENDING); + + // Because IO is asynchronous from the caller, we could be completing an + // IO even though the user hasn't issued a Read() yet. + if (!user_callback_) + return; + + // Since Run may result in Read being called, clear user_callback_ up front. + CompletionCallback* c = user_callback_; + user_callback_ = NULL; + c->Run(rv); +} + +int FlipNetworkTransaction::DoLoop(int result) { + DCHECK(next_state_ != STATE_NONE); + DCHECK(request_); + + if (!request_) + return 0; + + int rv = result; + do { + State state = next_state_; + next_state_ = STATE_NONE; + switch (state) { + case STATE_INIT_CONNECTION: + DCHECK_EQ(OK, rv); + rv = DoInitConnection(); + break; + case STATE_INIT_CONNECTION_COMPLETE: + rv = DoInitConnectionComplete(rv); + break; + case STATE_SEND_REQUEST: + DCHECK_EQ(OK, rv); + rv = DoSendRequest(); + break; + case STATE_SEND_REQUEST_COMPLETE: + rv = DoSendRequestComplete(rv); + break; + case STATE_READ_HEADERS: + DCHECK_EQ(OK, rv); + rv = DoReadHeaders(); + break; + case STATE_READ_HEADERS_COMPLETE: + // DoReadHeadersComplete only returns OK becaue the transaction + // could be destroyed as part of the call. + rv = DoReadHeadersComplete(rv); + DCHECK(rv == net::OK); + return rv; + break; + case STATE_READ_BODY: + DCHECK_EQ(OK, rv); + rv = DoReadBody(); + // DoReadBody only returns OK becaue the transaction could be + // destroyed as part of the call. + DCHECK(rv == net::OK); + return rv; + break; + case STATE_READ_BODY_COMPLETE: + // We return here because the Transaction could be destroyed after this + // call. + rv = DoReadBodyComplete(rv); + DCHECK(rv == net::OK); + return rv; + break; + case STATE_NONE: + rv = ERR_FAILED; + break; + default: + NOTREACHED() << "bad state"; + rv = ERR_FAILED; + break; + } + } while (rv != ERR_IO_PENDING && next_state_ != STATE_NONE); + + return rv; +} + +int FlipNetworkTransaction::DoInitConnection() { + next_state_ = STATE_INIT_CONNECTION_COMPLETE; + + std::string host = request_->url.HostNoBrackets(); + int port = request_->url.EffectiveIntPort(); + + std::string connection_group = "flip."; + connection_group.append(host); + + HostResolver::RequestInfo resolve_info(host, port); + +// TODO(mbelshe): Cleanup these testing tricks. +// If we want to use multiple connections, grab the flip session +// up front using the original domain name. +#undef USE_MULTIPLE_CONNECTIONS +#define DIVERT_URLS_TO_TEST_SERVER +#if defined(USE_MULTIPLE_CONNECTIONS) || !defined(DIVERT_URLS_TO_TEST_SERVER) + flip_ = FlipSession::GetFlipSession(resolve_info, session_); +#endif + +// Use this to divert URLs to a test server. +#ifdef DIVERT_URLS_TO_TEST_SERVER + // Fake out this session to go to our test server. + host = "servername"; + port = 443; + resolve_info = HostResolver::RequestInfo(host, port); +#ifndef USE_MULTIPLE_CONNECTIONS + flip_ = FlipSession::GetFlipSession(resolve_info, session_); +#endif // USE_MULTIPLE_CONNECTIONS + +#endif // DIVERT_URLS_TO_TEST_SERVER + + int rv = flip_->Connect(connection_group, resolve_info, request_->priority); + DCHECK(rv == net::OK); // The API says it will always return OK. + return net::OK; +} + +int FlipNetworkTransaction::DoInitConnectionComplete(int result) { + if (result < 0) + return result; + + next_state_ = STATE_SEND_REQUEST; + return net::OK; +} + +int FlipNetworkTransaction::DoSendRequest() { + // TODO(mbelshe): rethink this UploadDataStream wrapper. + if (request_->upload_data) + request_body_stream_.reset(new UploadDataStream(request_->upload_data)); + + flip_request_id_ = flip_->CreateStream(this); + + if (response_complete_) + return net::OK; + + // The FlipSession will always call us back when the send is complete. + return net::ERR_IO_PENDING; +} + +int FlipNetworkTransaction::DoSendRequestComplete(int result) { + if (result < 0) + return result; + + next_state_ = STATE_READ_HEADERS; + return net::OK; +} + +int FlipNetworkTransaction::DoReadHeaders() { + // The FlipSession will always call us back when the headers have been + // received. + return net::ERR_IO_PENDING; +} + +int FlipNetworkTransaction::DoReadHeadersComplete(int result) { + // Notify the user that the headers are ready. + DoHttpTransactionCallback(result); + return net::OK; +} + +int FlipNetworkTransaction::DoReadBody() { + // The caller has not issued a read request yet. + if (!user_callback_) + return net::OK; + + int bytes_read = 0; + while (response_body_.size() && user_buffer_bytes_remaining_ > 0) { + scoped_refptr<IOBufferWithSize> data = response_body_.front(); + int bytes_to_copy = std::min(user_buffer_bytes_remaining_, data->size()); + memcpy(&(user_buffer_->data()[bytes_read]), data->data(), bytes_to_copy); + user_buffer_bytes_remaining_ -= bytes_to_copy; + if (bytes_to_copy == data->size()) { + response_body_.pop_front(); + } else { + int bytes_remaining = data->size() - bytes_to_copy; + IOBufferWithSize* new_buffer = new IOBufferWithSize(bytes_remaining); + memcpy(new_buffer->data(), &(data->data()[bytes_to_copy]), + bytes_remaining); + response_body_.pop_front(); + response_body_.push_front(new_buffer); + } + bytes_read += bytes_to_copy; + } + + DoHttpTransactionCallback(bytes_read); + return net::OK; +} + +int FlipNetworkTransaction::DoReadBodyComplete(int result) { + // TODO(mbelshe): record success or failure of the transaction? + if (user_callback_) + DoHttpTransactionCallback(result); + return OK; +} + +} // namespace net diff --git a/net/flip/flip_network_transaction.h b/net/flip/flip_network_transaction.h new file mode 100644 index 0000000..f4b0292 --- /dev/null +++ b/net/flip/flip_network_transaction.h @@ -0,0 +1,142 @@ +// Copyright (c) 2009 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef NET_FLIP_NETWORK_TRANSACTION_H_ +#define NET_FLIP_NETWORK_TRANSACTION_H_ + +#include <string> +#include <deque> + +#include "base/ref_counted.h" +#include "base/scoped_ptr.h" +#include "base/time.h" +#include "net/base/address_list.h" +#include "net/base/host_resolver.h" +#include "net/base/io_buffer.h" +#include "net/base/load_flags.h" +#include "net/base/load_states.h" +#include "net/base/ssl_config_service.h" +#include "net/flip/flip_session.h" +#include "net/http/http_auth.h" +#include "net/http/http_auth_handler.h" +#include "net/http/http_response_info.h" +#include "net/http/http_transaction.h" +#include "net/proxy/proxy_service.h" +#include "net/socket/client_socket_handle.h" +#include "net/socket/client_socket_pool.h" +#include "testing/gtest/include/gtest/gtest_prod.h" + +namespace net { + +class ClientSocketFactory; +class HttpNetworkSession; +class UploadDataStream; + +// A FlipNetworkTransaction can be used to fetch HTTP conent. +// The FlipDelegate is the consumer of events from the FlipSession. +class FlipNetworkTransaction : public HttpTransaction, public FlipDelegate { + public: + explicit FlipNetworkTransaction(HttpNetworkSession* session); + virtual ~FlipNetworkTransaction(); + + // FlipDelegate methods: + virtual const HttpRequestInfo* request(); + virtual const UploadDataStream* data(); + virtual void OnRequestSent(int status); + virtual void OnResponseReceived(HttpResponseInfo* response); + virtual void OnDataReceived(const char* buffer, int bytes); + virtual void OnClose(int status); + virtual void OnCancel(); + + // HttpTransaction methods: + virtual int Start(const HttpRequestInfo* request_info, + CompletionCallback* callback, + LoadLog* load_log); + virtual int RestartIgnoringLastError(CompletionCallback* callback); + virtual int RestartWithCertificate(X509Certificate* client_cert, + CompletionCallback* callback); + virtual int RestartWithAuth(const std::wstring& username, + const std::wstring& password, + CompletionCallback* callback); + virtual bool IsReadyToRestartForAuth() { return false; } + virtual int Read(IOBuffer* buf, int buf_len, CompletionCallback* callback); + virtual const HttpResponseInfo* GetResponseInfo() const; + virtual LoadState GetLoadState() const; + virtual uint64 GetUploadProgress() const; + + protected: + friend class FlipNetworkTransactionTest; + + // Provide access to the session for testing. + FlipSession* GetFlipSession() { return flip_.get(); } + + private: + enum State { + STATE_INIT_CONNECTION, + STATE_INIT_CONNECTION_COMPLETE, + STATE_SEND_REQUEST, + STATE_SEND_REQUEST_COMPLETE, + STATE_READ_HEADERS, + STATE_READ_HEADERS_COMPLETE, + STATE_READ_BODY, + STATE_READ_BODY_COMPLETE, + STATE_NONE + }; + + // Used to callback an HttpTransaction call. + void DoHttpTransactionCallback(int result); + + // Runs the state transition loop. + int DoLoop(int result); + + // Each of these methods corresponds to a State value. Those with an input + // argument receive the result from the previous state. If a method returns + // ERR_IO_PENDING, then the result from OnIOComplete will be passed to the + // next state method as the result arg. + int DoInitConnection(); + int DoInitConnectionComplete(int result); + int DoSendRequest(); + int DoSendRequestComplete(int result); + int DoReadHeaders(); + int DoReadHeadersComplete(int result); + int DoReadBody(); + int DoReadBodyComplete(int result); + + // The Flip request id for this request. + int flip_request_id_; + + // The Flip session servicing this request. If NULL, the request has not + // started. + scoped_refptr<FlipSession> flip_; + + CompletionCallback* user_callback_; + scoped_refptr<IOBuffer> user_buffer_; + int user_buffer_bytes_remaining_; + + scoped_refptr<HttpNetworkSession> session_; + + const HttpRequestInfo* request_; + HttpResponseInfo response_; + + scoped_ptr<UploadDataStream> request_body_stream_; + + // We buffer the response body as it arrives asynchronously from the stream. + // TODO(mbelshe): is this infinite buffering? + std::deque<scoped_refptr<IOBufferWithSize> > response_body_; + bool response_complete_; + // Since we buffer the response, we also buffer the response status. + // Not valid until response_complete_ is true. + int response_status_; + + // The time the Start method was called. + base::Time start_time_; + + // The next state in the state machine. + State next_state_; +}; + +} // namespace net + +#endif // NET_HTTP_NETWORK_TRANSACTION_H_ + diff --git a/net/flip/flip_network_transaction_unittest.cc b/net/flip/flip_network_transaction_unittest.cc new file mode 100644 index 0000000..29db36c --- /dev/null +++ b/net/flip/flip_network_transaction_unittest.cc @@ -0,0 +1,190 @@ +// Copyright (c) 2009 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include <math.h> // ceil + +#include "base/compiler_specific.h" +#include "net/base/completion_callback.h" +#include "net/base/mock_host_resolver.h" +#include "net/base/ssl_config_service_defaults.h" +#include "net/base/ssl_info.h" +#include "net/base/test_completion_callback.h" +#include "net/base/upload_data.h" +#include "net/flip/flip_network_transaction.h" +#include "net/flip/flip_protocol.h" +#include "net/http/http_auth_handler_ntlm.h" +#include "net/http/http_network_session.h" +#include "net/http/http_network_transaction.h" +#include "net/http/http_transaction_unittest.h" +#include "net/proxy/proxy_config_service_fixed.h" +#include "net/socket/client_socket_factory.h" +#include "net/socket/socket_test_util.h" +#include "net/socket/ssl_client_socket.h" +#include "testing/gtest/include/gtest/gtest.h" +#include "testing/platform_test.h" + +//----------------------------------------------------------------------------- + +namespace flip { + +using namespace net; + +// Create a proxy service which fails on all requests (falls back to direct). +ProxyService* CreateNullProxyService() { + return ProxyService::CreateNull(); +} + +// Helper to manage the lifetimes of the dependencies for a +// HttpNetworkTransaction. +class SessionDependencies { + public: + // Default set of dependencies -- "null" proxy service. + SessionDependencies() + : host_resolver(new MockHostResolver), + proxy_service(CreateNullProxyService()), + ssl_config_service(new SSLConfigServiceDefaults) {} + + // Custom proxy service dependency. + explicit SessionDependencies(ProxyService* proxy_service) + : host_resolver(new MockHostResolver), + proxy_service(proxy_service), + ssl_config_service(new SSLConfigServiceDefaults) {} + + scoped_refptr<MockHostResolverBase> host_resolver; + scoped_refptr<ProxyService> proxy_service; + scoped_refptr<SSLConfigService> ssl_config_service; + MockClientSocketFactory socket_factory; +}; + +ProxyService* CreateFixedProxyService(const std::string& proxy) { + net::ProxyConfig proxy_config; + proxy_config.proxy_rules.ParseFromString(proxy); + return ProxyService::CreateFixed(proxy_config); +} + + +HttpNetworkSession* CreateSession(SessionDependencies* session_deps) { + return new HttpNetworkSession(session_deps->host_resolver, + session_deps->proxy_service, + &session_deps->socket_factory, + session_deps->ssl_config_service); +} + +class FlipNetworkTransactionTest : public PlatformTest { + public: + virtual void SetUp() { + // Disable compression on this test. + FlipFramer::set_enable_compression_default(false); + } + + virtual void TearDown() { + // Empty the current queue. + MessageLoop::current()->RunAllPending(); + PlatformTest::TearDown(); + } + + protected: + void KeepAliveConnectionResendRequestTest(const MockRead& read_failure); + + struct SimpleGetHelperResult { + int rv; + std::string status_line; + std::string response_data; + }; + + SimpleGetHelperResult SimpleGetHelper(MockRead data_reads[]) { + SimpleGetHelperResult out; + + SessionDependencies session_deps; + scoped_ptr<FlipNetworkTransaction> trans( + new FlipNetworkTransaction( + CreateSession(&session_deps))); + + HttpRequestInfo request; + request.method = "GET"; + request.url = GURL("http://www.google.com/"); + request.load_flags = 0; + + StaticMockSocket data(data_reads, NULL); + session_deps.socket_factory.AddMockSocket(&data); + + TestCompletionCallback callback; + + int rv = trans->Start(&request, &callback, NULL); + EXPECT_EQ(ERR_IO_PENDING, rv); + + out.rv = callback.WaitForResult(); + if (out.rv != OK) + return out; + + const HttpResponseInfo* response = trans->GetResponseInfo(); + EXPECT_TRUE(response != NULL); + + EXPECT_TRUE(response->headers != NULL); + out.status_line = response->headers->GetStatusLine(); + + rv = ReadTransaction(trans.get(), &out.response_data); + EXPECT_EQ(OK, rv); + + return out; + } + + void ConnectStatusHelperWithExpectedStatus(const MockRead& status, + int expected_status); + + void ConnectStatusHelper(const MockRead& status); +}; + +//----------------------------------------------------------------------------- + +// Verify FlipNetworkTransaction constructor. +TEST_F(FlipNetworkTransactionTest, Constructor) { + SessionDependencies session_deps; + scoped_refptr<net::HttpNetworkSession> session = + CreateSession(&session_deps); + scoped_ptr<HttpTransaction> trans(new FlipNetworkTransaction(session)); +} + +TEST_F(FlipNetworkTransactionTest, Connect) { + unsigned char syn_reply[] = { + 0x80, 0x01, 0x00, 0x02, // header + 0x00, 0x00, 0x00, 0x45, + 0x00, 0x00, 0x00, 0x01, + 0x00, 0x00, 0x00, 0x04, // 4 headers + 0x00, 0x05, 'h', 'e', 'l', 'l', 'o', // "hello" + 0x00, 0x03, 'b', 'y', 'e', // "bye" + 0x00, 0x06, 's', 't', 'a', 't', 'u', 's', // "status" + 0x00, 0x03, '2', '0', '0', // "200" + 0x00, 0x03, 'u', 'r', 'l', // "url" + 0x00, 0x0a, '/', 'i', 'n', 'd', 'e', 'x', '.', 'p', 'h', 'p', // "HTTP/1.1" + 0x00, 0x07, 'v', 'e', 'r', 's', 'i', 'o', 'n', // "version" + 0x00, 0x08, 'H', 'T', 'T', 'P', '/', '1', '.', '1', // "HTTP/1.1" + }; + unsigned char body_frame[] = { + 0x00, 0x00, 0x00, 0x01, // header + 0x00, 0x00, 0x00, 0x06, + 'h', 'e', 'l', 'l', 'o', '!', // "hello" + }; + unsigned char fin_frame[] = { + 0x80, 0x01, 0x00, 0x03, // header + 0x00, 0x00, 0x00, 0x08, + 0x00, 0x00, 0x00, 0x01, + 0x00, 0x00, 0x00, 0x00, + }; + + MockRead data_reads[] = { + MockRead(true, reinterpret_cast<char*>(syn_reply), sizeof(syn_reply)), + MockRead(true, reinterpret_cast<char*>(body_frame), sizeof(body_frame)), + MockRead(true, reinterpret_cast<char*>(fin_frame), sizeof(fin_frame)), + MockRead(true, 0, 0), // EOF + }; + + SimpleGetHelperResult out = SimpleGetHelper(data_reads); + EXPECT_EQ(OK, out.rv); + EXPECT_EQ("HTTP/1.1 200 OK", out.status_line); + EXPECT_EQ("hello!", out.response_data); +} + +} // namespace net + diff --git a/net/flip/flip_protocol.h b/net/flip/flip_protocol.h new file mode 100644 index 0000000..93c8a5a --- /dev/null +++ b/net/flip/flip_protocol.h @@ -0,0 +1,203 @@ +// Copyright (c) 2009 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +// This file contains some protocol structures for use with Flip. + +#ifndef NET_FLIP_FLIP_PROTOCOL_H_ +#define NET_FLIP_FLIP_PROTOCOL_H_ + +#ifdef WIN32 +#include <winsock2.h> +#else +#include <arpa/inet.h> +#endif +#include "base/basictypes.h" +#include "base/logging.h" +#include "flip_bitmasks.h" // cross-google3 directory naming. + + +// Stream Frame Format +// +-------------------------------+ +// |0| Stream-ID (31bits) | +// +-------------------------------+ +// |flags (16)| Length (16bits) | +// +-------------------------------+ +// | Data | +// +-------------------------------+ +// + +// Control Frame Format +// +-------------------------------+ +// |1| Version(15bits)|Type(16bits)| +// +-------------------------------+ +// | Length (32bits) | +// +-------------------------------+ +// | Data | +// +-------------------------------+ +// + + +// Control Frame: SYN_STREAM +// +----------------------------------+ +// |1|000000000000001|0000000000000001| +// +----------------------------------+ +// | Length (32bits) | >= 8 +// +----------------------------------+ +// |X| Stream-ID(31bits) | +// +----------------------------------+ +// |Pri| unused | Length (16bits)| +// +----------------------------------+ +// +// Control Frame: SYN_REPLY +// +----------------------------------+ +// |1|000000000000001|0000000000000010| +// +----------------------------------+ +// | Length (32bits) | >= 8 +// +----------------------------------+ +// |X| Stream-ID(31bits) | +// +----------------------------------+ +// | unused (16 bits)| Length (16bits)| +// +----------------------------------+ +// +// Control Frame: FIN_STREAM +// +----------------------------------+ +// |1|000000000000001|0000000000000011| +// +----------------------------------+ +// | Length (32bits) | >= 4 +// +----------------------------------+ +// |X| Stream-ID(31bits) | +// +----------------------------------+ +// | Status (32 bits) | +// +----------------------------------+ +// +// Control Frame: SetMaxStreams +// +----------------------------------+ +// |1|000000000000001|0000000000000100| +// +----------------------------------+ +// | Length (32bits) | >= 4 +// +----------------------------------+ +// |X| Stream-ID(31bits) | +// +----------------------------------+ + +// TODO(fenix): add ChangePriority support. + +namespace flip { + +// Note: all protocol data structures are on-the-wire format. That means that +// data is stored in network-normalized order. Readers must use the +// accessors provided or call ntohX() functions. + +// Types of Flip Control Frames. +typedef enum { + SYN_STREAM = 1, + SYN_REPLY, + FIN_STREAM, + NOOP +} FlipControlType; + +// Flags on data packets +typedef enum { + DATA_FLAG_COMPRESSED = 1 +} FlipDataFlags; + +// A FLIP stream id is a 31 bit entity. +typedef uint32 FlipStreamId; + +// FLIP Priorities. (there are only 2 bits) +#define FLIP_PRIORITY_LOWEST 3 +#define FLIP_PRIORITY_HIGHEST 0 + +// All Flip Frame types derive from the FlipFrame struct. +typedef struct { + uint32 length() const { return ntohs(data_.length_); } + void set_length(uint16 length) { data_.length_ = htons(length); } + bool is_control_frame() const { + return (ntohs(control_.version_) & kControlFlagMask) == kControlFlagMask; + } + + protected: + union { + struct { + uint16 version_; + uint16 type_; + uint32 length_; + } control_; + struct { + FlipStreamId stream_id_; + uint16 flags_; + uint16 length_; + } data_; + }; +} FlipFrame; + +// A Data Frame. +typedef struct : public FlipFrame { + FlipStreamId stream_id() const { + return ntohl(data_.stream_id_) & kStreamIdMask; + } + void set_stream_id(FlipStreamId id) { data_.stream_id_ = htonl(id); } + uint16 flags() const { return ntohs(data_.flags_); } + void set_flags(uint16 flags) { data_.flags_ = htons(flags); } +} FlipDataFrame; + +// A Control Frame. +typedef struct : public FlipFrame { + uint16 version() const { + const int kVersionMask = 0x7fff; + return ntohs(control_.version_) & kVersionMask; + } + FlipControlType type() const { + uint16 type = ntohs(control_.type_); + DCHECK(type >= SYN_STREAM && type <= NOOP); + return static_cast<FlipControlType>(type); + } + FlipStreamId stream_id() const { return ntohl(stream_id_) & kStreamIdMask; } + + private: + FlipStreamId stream_id_; +} FlipControlFrame; + +// A SYN_STREAM frame. +typedef struct FlipSynStreamControlFrame : public FlipControlFrame { + uint8 priority() const { return (priority_ & kPriorityMask) >> 6; } + int header_block_len() const { return length() - kHeaderBlockOffset; } + const char* header_block() const { + return reinterpret_cast<const char*>(this) + + sizeof(FlipFrame) + kHeaderBlockOffset; + } + private: + // Offset from the end of the FlipControlFrame to the FlipHeaderBlock. + static const int kHeaderBlockOffset = 6; + uint8 priority_; + uint8 unused_; + // Variable FlipHeaderBlock here. +} FlipSynStreamControlFrame; + +// A SYN_REPLY frame. +typedef struct FlipSynReplyControlFrame : public FlipControlFrame { + int header_block_len() const { return length() - kHeaderBlockOffset; } + const char* header_block() const { + return reinterpret_cast<const char*>(this) + + sizeof(FlipFrame) + kHeaderBlockOffset; + } + + private: + // Offset from the end of the FlipControlFrame to the FlipHeaderBlock. + static const int kHeaderBlockOffset = 6; + uint16 unused_; + // Variable FlipHeaderBlock here. +} FlipSynReplyControlFrame; + +// A FIN_STREAM frame. +typedef struct FlipFinStreamControlFrame : public FlipControlFrame { + uint32 status() const { return ntohl(status_); } + void set_status(int id) { status_ = htonl(id); } + private: + uint32 status_; +} FlipFinStreamControlFrame; + +} // namespace flip + +#endif // NET_FLIP_FLIP_PROTOCOL_H_ + diff --git a/net/flip/flip_session.cc b/net/flip/flip_session.cc new file mode 100644 index 0000000..3d63ef9 --- /dev/null +++ b/net/flip/flip_session.cc @@ -0,0 +1,762 @@ +// Copyright (c) 2009 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "net/flip/flip_session.h" + +#include "base/basictypes.h" +#include "base/logging.h" +#include "base/message_loop.h" +#include "base/rand_util.h" +#include "base/stats_counters.h" +#include "base/string_util.h" +#include "net/base/load_flags.h" +#include "net/flip/flip_frame_builder.h" +#include "net/flip/flip_protocol.h" +#include "net/http/http_network_session.h" +#include "net/http/http_request_info.h" +#include "net/http/http_response_headers.h" +#include "net/http/http_response_info.h" +#include "net/tools/dump_cache/url_to_filename_encoder.h" + +namespace net { + +// static +scoped_ptr<FlipSessionPool> FlipSession::session_pool_; +// static +bool FlipSession::disable_compression_ = false; +// static +int PrioritizedIOBuffer::order_ = 0; + +// The FlipStreamImpl is an internal class representing an active FlipStream. +class FlipStreamImpl { + public: + // A FlipStreamImpl represents an active FlipStream. + // If |delegate| is NULL, then we're receiving pushed data from the server. + FlipStreamImpl(flip::FlipStreamId stream_id, FlipDelegate* delegate) + : stream_id_(stream_id), + delegate_(delegate), + response_(NULL), + data_complete_(false) {} + virtual ~FlipStreamImpl() { + LOG(INFO) << "Deleting FlipStreamImpl for stream " << stream_id_; + }; + + flip::FlipStreamId stream_id() const { return stream_id_; } + FlipDelegate* delegate() const { return delegate_; } + + // For pushed streams, we track a path to identify them. + std::string path() const { return path_; } + void set_path(const std::string& path) { path_ = path; } + + // Attach a delegate to a previously pushed data stream. + // Returns true if the caller should Deactivate and delete the FlipStreamImpl + // after this call. + bool AttachDelegate(FlipDelegate* delegate); + + // Called when a SYN_REPLY is received on the stream. + void OnReply(const flip::FlipHeaderBlock* headers); + + // Called when data is received on the stream. + // Returns true if the caller should Deactivate and delete the FlipStreamImpl + // after this call. + bool OnData(const char* data, int length); + + // Called if the stream is prematurely aborted. + // The caller should Deactivate and delete the FlipStreamImpl after this + // call. + void OnAbort(); + + private: + flip::FlipStreamId stream_id_; + std::string path_; + FlipDelegate* delegate_; + scoped_ptr<HttpResponseInfo> response_; + std::list<scoped_refptr<IOBufferWithSize>> response_body_; + bool data_complete_; +}; + +FlipSession* FlipSession::GetFlipSession( + const net::HostResolver::RequestInfo& info, + HttpNetworkSession* session) { + if (!session_pool_.get()) + session_pool_.reset(new FlipSessionPool()); + return session_pool_->Get(info, session); +} + +FlipSession::FlipSession(std::string host, HttpNetworkSession* session) + : ALLOW_THIS_IN_INITIALIZER_LIST( + connect_callback_(this, &FlipSession::OnSocketConnect)), + ALLOW_THIS_IN_INITIALIZER_LIST( + read_callback_(this, &FlipSession::OnReadComplete)), + ALLOW_THIS_IN_INITIALIZER_LIST( + write_callback_(this, &FlipSession::OnWriteComplete)), + domain_(host), + session_(session), + connection_started_(false), + delayed_write_pending_(false), + write_pending_(false), + read_buffer_(new net::MovableIOBuffer(kReadBufferSize)), + read_buffer_bytes_read_(0), + read_pending_(false), + stream_hi_water_mark_(0) { + // Always start at 1 for the first stream id. + // TODO(mbelshe): consider randomization. + stream_hi_water_mark_ = 1; + + flip_framer_.set_visitor(this); +} + +FlipSession::~FlipSession() { + if (connection_.is_initialized()) { + // With Flip we can't recycle sockets. + connection_.socket()->Disconnect(); + } + if (session_pool_.get()) + session_pool_->Remove(this); + + // TODO(mbelshe) - clear out streams (active and pushed) here? +} + +net::Error FlipSession::Connect(const std::string& group_name, + const HostResolver::RequestInfo& host, + int priority) { + DCHECK(priority >= 0 && priority < 4); + + // If the connect process is started, let the caller continue. + if (connection_started_) + return net::OK; + + connection_started_ = true; + + static StatsCounter flip_sessions("flip.sessions"); + flip_sessions.Increment(); + + int rv = connection_.Init(group_name, host, priority, &connect_callback_, + session_->tcp_socket_pool(), NULL); + + // If the connect is pending, we still return ok. The APIs enqueue + // work until after the connect completes asynchronously later. + if (rv == net::ERR_IO_PENDING) + return net::OK; + return static_cast<net::Error>(rv); +} + + +// Create a FlipHeaderBlock for a Flip SYN_STREAM Frame from +// a HttpRequestInfo block. +void CreateFlipHeadersFromHttpRequest( + const HttpRequestInfo* info, flip::FlipHeaderBlock* headers) { + static const std::string kHttpProtocolVersion("HTTP/1.1"); + + HttpUtil::HeadersIterator it(info->extra_headers.begin(), + info->extra_headers.end(), + "\r\n"); + while (it.GetNext()) { + std::string name = StringToLowerASCII(it.name()); + (*headers)[name] = it.values(); + } + +#define REWRITE_URLS +#ifdef REWRITE_URLS + // TODO(mbelshe): Figure out how to remove this. This is just for hooking + // up to a test server. + // For testing content on our test server, we modify the URL. + GURL url = info->url; + FilePath path = UrlToFilenameEncoder::Encode(url.spec(), FilePath()); + std::string hack_url = "/" + WideToASCII(path.value()); + + // switch backslashes. HACK + std::string::size_type pos(0); + while ((pos = hack_url.find('\\', pos)) != std::string::npos) { + hack_url.replace(pos, 1, "/"); + pos += 1; + } +#endif + + (*headers)["method"] = info->method; + // (*headers)["url"] = info->url.PathForRequest(); + (*headers)["url"] = hack_url; + (*headers)["version"] = kHttpProtocolVersion; + if (info->user_agent.length()) + (*headers)["user-agent"] = info->user_agent; + if (!info->referrer.is_empty()) + (*headers)["referer"] = info->referrer.spec(); + + // Honor load flags that impact proxy caches. + if (info->load_flags & LOAD_BYPASS_CACHE) { + (*headers)["pragma"] = "no-cache"; + (*headers)["cache-control"] = "no-cache"; + } else if (info->load_flags & LOAD_VALIDATE_CACHE) { + (*headers)["cache-control"] = "max-age=0"; + } +} + +int FlipSession::CreateStream(FlipDelegate* delegate) { + flip::FlipStreamId stream_id = GetNewStreamId(); + + GURL url = delegate->request()->url; + std::string path = url.PathForRequest(); + + FlipStreamImpl* stream = NULL; + + // Check if we have a push stream for this path. + if (delegate->request()->method == "GET") { + stream = GetPushStream(path); + if (stream) { + if (stream->AttachDelegate(delegate)) { + DeactivateStream(stream->stream_id()); + delete stream; + return 0; + } + return stream->stream_id(); + } + } + + // If we still don't have a stream, activate one now. + stream = ActivateStream(stream_id, delegate); + if (!stream) + return net::ERR_FAILED; + + LOG(INFO) << "FlipStream: Creating stream " << stream_id << " for " + << delegate->request()->url; + + // TODO(mbelshe): Optimize memory allocations + int priority = delegate->request()->priority; + + // Hack for the priorities + // TODO(mbelshe): These need to be plumbed through the Http Network Stack. + if (path.find(".css") != path.npos) { + priority = 1; + } else if (path.find(".html") != path.npos) { + priority = 0; + } else if (path.find(".js") != path.npos) { + priority = 1; + } else { + priority = 3; + } + + DCHECK(priority >= FLIP_PRIORITY_HIGHEST && + priority <= FLIP_PRIORITY_LOWEST); + + // Convert from HttpRequestHeaders to Flip Headers. + flip::FlipHeaderBlock headers; + CreateFlipHeadersFromHttpRequest(delegate->request(), &headers); + + // Create a SYN_STREAM packet and add to the output queue. + flip::FlipSynStreamControlFrame* syn_frame = + flip_framer_.CreateSynStream(stream_id, priority, false, &headers); + int length = sizeof(flip::FlipFrame) + syn_frame->length(); + IOBufferWithSize* buffer = + new IOBufferWithSize(length); + memcpy(buffer->data(), syn_frame, length); + delete syn_frame; + queue_.push(PrioritizedIOBuffer(buffer, priority)); + + static StatsCounter flip_requests("flip.requests"); + flip_requests.Increment(); + + LOG(INFO) << "FETCHING: " << delegate->request()->url.spec(); + + + // TODO(mbelshe): Implement POST Data here + + // Schedule to write to the socket after we've made it back + // to the message loop so that we can aggregate multiple + // requests. + // TODO(mbelshe): Should we do the "first" request immediately? + // maybe we should only 'do later' for subsequent + // requests. + WriteSocketLater(); + + return stream_id; +} + +bool FlipSession::CancelStream(int id) { + LOG(INFO) << "Cancelling stream " << id; + if (!IsStreamActive(id)) + return false; + DeactivateStream(id); + return true; +} + +bool FlipSession::IsStreamActive(int id) { + return active_streams_.find(id) != active_streams_.end(); +} + +LoadState FlipSession::GetLoadState() const { + // TODO(mbelshe): needs more work + return LOAD_STATE_CONNECTING; +} + +void FlipSession::OnSocketConnect(int result) { + LOG(INFO) << "Flip socket connected (result=" << result << ")"; + + if (result != net::OK) { + net::Error err = static_cast<net::Error>(result); + CloseAllStreams(err); + this->Release(); + return; + } + + // Adjust socket buffer sizes. + // FLIP uses one socket, and we want a really big buffer. + // This greatly helps on links with packet loss - we can even + // outperform Vista's dynamic window sizing algorithm. + // TODO(mbelshe): more study. + const int kSocketBufferSize = 512 * 1024; + connection_.socket()->SetReceiveBufferSize(kSocketBufferSize); + connection_.socket()->SetSendBufferSize(kSocketBufferSize); + + // After we've connected, send any data to the server, and then issue + // our read. + WriteSocketLater(); + ReadSocket(); +} + +void FlipSession::OnReadComplete(int bytes_read) { + // Parse a frame. For now this code requires that the frame fit into our + // buffer (32KB). + // TODO(mbelshe): support arbitrarily large frames! + + LOG(INFO) << "Flip socket read: " << bytes_read << " bytes"; + + read_pending_ = false; + + if (bytes_read <= 0) { + // Session is tearing down. + net::Error err = static_cast<net::Error>(bytes_read); + CloseAllStreams(err); + this->Release(); + return; + } + + char *data = read_buffer_->data(); + while (bytes_read && + flip_framer_.error_code() == flip::FlipFramer::FLIP_NO_ERROR) { + uint32 bytes_processed = flip_framer_.ProcessInput(data, bytes_read); + bytes_read -= bytes_processed; + data += bytes_processed; + if (flip_framer_.state() == flip::FlipFramer::FLIP_DONE) + flip_framer_.Reset(); + } + // NOTE(mbelshe): Could cause circular callbacks. (when ReadSocket + // completes synchronously, calling OnReadComplete, etc). Should + // probably return to the message loop. + ReadSocket(); +} + +void FlipSession::OnWriteComplete(int result) { + DCHECK(write_pending_); + write_pending_ = false; + + LOG(INFO) << "Flip write complete (result=" << result << ")"; + + // Cleanup the write which just completed. + in_flight_write_.release(); + + // Write more data. We're already in a continuation, so we can + // go ahead and write it immediately (without going back to the + // message loop). + WriteSocketLater(); +} + +void FlipSession::ReadSocket() { + if (read_pending_) + return; + + size_t max_bytes = kReadBufferSize; + read_buffer_->set_data(read_buffer_bytes_read_); + max_bytes -= read_buffer_bytes_read_; + int bytes_read = connection_.socket()->Read(read_buffer_.get(), max_bytes, + &read_callback_); + switch (bytes_read) { + case 0: + // Socket is closed! + // TODO(mbelshe): Need to abort any active streams here. + DCHECK(!active_streams_.size()); + return; + case net::ERR_IO_PENDING: + // Waiting for data. Nothing to do now. + read_pending_ = true; + return; + default: + // Data was read, process it. + // TODO(mbelshe): check that we can't get a recursive stack? + OnReadComplete(bytes_read); + break; + } +} + +void FlipSession::WriteSocketLater() { + if (delayed_write_pending_) + return; + + delayed_write_pending_ = true; + MessageLoop::current()->PostTask(FROM_HERE, NewRunnableMethod( + this, &net::FlipSession::WriteSocket)); +} + +void FlipSession::WriteSocket() { + // This function should only be called via WriteSocketLater. + DCHECK(delayed_write_pending_); + delayed_write_pending_ = false; + + // If the socket isn't connected yet, just wait; we'll get called + // again when the socket connection completes. + if (!connection_.is_initialized()) + return; + + if (write_pending_) // Another write is in progress still. + return; + + while (queue_.size()) { + const int kMaxSegmentSize = 1430; + const int kMaxPayload = 4 * kMaxSegmentSize; + int max_size = std::max(kMaxPayload, queue_.top().size()); + + int bytes = 0; + // If we have multiple IOs to do, accumulate up to 4 MSS's worth of data + // and send them in batch. + IOBufferWithSize* buffer = new IOBufferWithSize(max_size); + while (queue_.size() && bytes < max_size) { + PrioritizedIOBuffer next_buffer = queue_.top(); + + // Now that we're outputting the frame, we can finally compress it. + flip::FlipFrame* uncompressed_frame = + reinterpret_cast<flip::FlipFrame*>(next_buffer.buffer()->data()); + scoped_array<flip::FlipFrame> compressed_frame( + flip_framer_.CompressFrame(uncompressed_frame)); + int size = compressed_frame.get()->length() + sizeof(flip::FlipFrame); + if (bytes + size > kMaxPayload) + break; + memcpy(buffer->data() + bytes, compressed_frame.get(), size); + bytes += size; + queue_.pop(); + next_buffer.release(); + } + DCHECK(bytes > 0); + in_flight_write_ = PrioritizedIOBuffer(buffer, 0); + + int rv = connection_.socket()->Write(in_flight_write_.buffer(), + bytes, &write_callback_); + if (rv == net::ERR_IO_PENDING) { + write_pending_ = true; + break; + } + LOG(INFO) << "FLIPSession wrote " << rv << " bytes."; + in_flight_write_.release(); + } +} + +void FlipSession::CloseAllStreams(net::Error code) { + LOG(INFO) << "Closing all FLIP Streams"; + + static StatsCounter abandoned_streams("flip.abandoned_streams"); + static StatsCounter abandoned_push_streams("flip.abandoned_push_streams"); + + if (active_streams_.size()) { + abandoned_streams.Add(active_streams_.size()); + + // Create a copy of the list, since aborting streams can invalidate + // our list. + FlipStreamImpl** list = new FlipStreamImpl*[active_streams_.size()]; + ActiveStreamMap::const_iterator it; + int index = 0; + for (it = active_streams_.begin(); it != active_streams_.end(); ++it) + list[index++] = it->second; + + // Issue the aborts. + for (--index; index >= 0; index--) { + list[index]->OnAbort(); + delete list[index]; + } + + // Clear out anything pending. + active_streams_.clear(); + + delete list; + } + + if (pushed_streams_.size()) { + abandoned_push_streams.Add(pushed_streams_.size()); + pushed_streams_.clear(); + } +} + +int FlipSession::GetNewStreamId() { + int id = stream_hi_water_mark_; + stream_hi_water_mark_ += 2; + if (stream_hi_water_mark_ > 0x7fff) + stream_hi_water_mark_ = 1; + return id; +} + +FlipStreamImpl* FlipSession::ActivateStream(int id, FlipDelegate* delegate) { + DCHECK(!IsStreamActive(id)); + + FlipStreamImpl* stream = new FlipStreamImpl(id, delegate); + active_streams_[id] = stream; + return stream; +} + +void FlipSession::DeactivateStream(int id) { + DCHECK(IsStreamActive(id)); + + // Verify it is not on the pushed_streams_ list. + ActiveStreamList::iterator it; + for (it = pushed_streams_.begin(); it != pushed_streams_.end(); ++it) { + FlipStreamImpl* impl = *it; + if (id == impl->stream_id()) { + pushed_streams_.erase(it); + break; + } + } + + active_streams_.erase(id); +} + +FlipStreamImpl* FlipSession::GetPushStream(std::string path) { + static StatsCounter used_push_streams("flip.claimed_push_streams"); + + LOG(INFO) << "Looking for push stream: " << path; + + // We just walk a linear list here. + ActiveStreamList::iterator it; + for (it = pushed_streams_.begin(); it != pushed_streams_.end(); ++it) { + FlipStreamImpl* impl = *it; + if (path == impl->path()) { + pushed_streams_.erase(it); + used_push_streams.Increment(); + LOG(INFO) << "Push Stream Claim for: " << path; + return impl; + } + } + return NULL; +} + +void FlipSession::OnError(flip::FlipFramer* framer) { + LOG(ERROR) << "FlipSession error!"; + CloseAllStreams(net::ERR_UNEXPECTED); + this->Release(); +} + +void FlipSession::OnStreamFrameData(flip::FlipStreamId stream_id, + const char* data, + uint32 len) { + LOG(INFO) << "Flip data for stream " << stream_id << ", " << len << " bytes"; + bool valid_stream = IsStreamActive(stream_id); + if (!valid_stream) { + LOG(WARNING) << "Received data frame for invalid stream" << stream_id; + return; + } + if (!len) + return; // This was just an empty data packet. + FlipStreamImpl* stream = active_streams_[stream_id]; + if (stream->OnData(data, len)) { + DeactivateStream(stream->stream_id()); + delete stream; + } +} + +void FlipSession::OnSyn(const flip::FlipSynStreamControlFrame* frame, + const flip::FlipHeaderBlock* headers) { + flip::FlipStreamId stream_id = frame->stream_id(); + + // Server-initiated streams should have odd sequence numbers. + if ((stream_id & 0x1) != 0) { + LOG(WARNING) << "Received invalid OnSyn stream id " << stream_id; + return; + } + + if (IsStreamActive(stream_id)) { + LOG(WARNING) << "Received OnSyn for active stream " << stream_id; + return; + } + + FlipStreamImpl* stream = ActivateStream(stream_id, NULL); + stream->OnReply(headers); + + // Verify that the response had a URL for us. + DCHECK(stream->path().length() != 0); + if (stream->path().length() == 0) { + LOG(WARNING) << "Pushed stream did not contain a path."; + DeactivateStream(stream_id); + delete stream; + return; + } + pushed_streams_.push_back(stream); + + LOG(INFO) << "Got pushed stream for " << stream->path(); + + static StatsCounter push_requests("flip.pushed_streams"); + push_requests.Increment(); +} + +void FlipSession::OnSynReply(const flip::FlipSynReplyControlFrame* frame, + const flip::FlipHeaderBlock* headers) { + DCHECK(headers); + flip::FlipStreamId stream_id = frame->stream_id(); + bool valid_stream = IsStreamActive(stream_id); + if (!valid_stream) { + LOG(WARNING) << "Received SYN_REPLY for invalid stream" << stream_id; + return; + } + + FlipStreamImpl* stream = active_streams_[stream_id]; + stream->OnReply(headers); +} + +void FlipSession::OnControl(const flip::FlipControlFrame* frame) { + flip::FlipHeaderBlock headers; + bool parsed_headers = false; + uint32 type = frame->type(); + if (type == flip::SYN_STREAM || type == flip::SYN_REPLY) { + if (!flip_framer_.ParseHeaderBlock( + reinterpret_cast<const flip::FlipFrame*>(frame), &headers)) { + LOG(WARNING) << "Could not parse Flip Control Frame Header"; + return; + } + } + + switch (type) { + case flip::SYN_STREAM: + LOG(INFO) << "Flip SynStream for stream " << frame->stream_id(); + OnSyn(reinterpret_cast<const flip::FlipSynStreamControlFrame*>(frame), + &headers); + break; + case flip::SYN_REPLY: + LOG(INFO) << "Flip SynReply for stream " << frame->stream_id(); + OnSynReply( + reinterpret_cast<const flip::FlipSynReplyControlFrame*>(frame), + &headers); + break; + case flip::FIN_STREAM: + LOG(INFO) << "Flip Fin for stream " << frame->stream_id(); + OnFin(reinterpret_cast<const flip::FlipFinStreamControlFrame*>(frame)); + break; + default: + DCHECK(false); // Error! + } +} + +void FlipSession::OnFin(const flip::FlipFinStreamControlFrame* frame) { + flip::FlipStreamId stream_id = frame->stream_id(); + bool valid_stream = IsStreamActive(stream_id); + if (!valid_stream) { + LOG(WARNING) << "Received FIN for invalid stream" << stream_id; + return; + } + FlipStreamImpl* stream = active_streams_[stream_id]; + // TODO(mbelshe) - status codes are HTTP codes? + // Shouldn't it be zero/nonzero? + // For now Roberto is sending 200, so use that as "ok". + bool cleanup_stream = false; + if (frame->status() == 200 || frame->status() == 0) { + cleanup_stream = stream->OnData(NULL, 0); + } else { + stream->OnAbort(); + cleanup_stream = true; + } + + if (cleanup_stream) { + DeactivateStream(stream_id); + delete stream; + } +} + +void FlipSession::OnLameDuck() { + NOTIMPLEMENTED(); +} + +bool FlipStreamImpl::AttachDelegate(FlipDelegate* delegate) { + DCHECK(delegate_ == NULL); // Don't attach if already attached. + DCHECK(path_.length() > 0); // Path needs to be set for push streams. + delegate_ = delegate; + + // If there is pending data, send it up here. + + // Check for the OnReply, and pass it up. + if (response_.get()) + delegate_->OnResponseReceived(response_.get()); + + // Pass data up + while (response_body_.size()) { + scoped_refptr<IOBufferWithSize> buffer = response_body_.front(); + response_body_.pop_front(); + delegate_->OnDataReceived(buffer->data(), buffer->size()); + } + + // Finally send up the end-of-stream. + if (data_complete_) { + delegate_->OnClose(net::OK); + return true; // tell the caller to shut us down + } + return false; +} + +void FlipStreamImpl::OnReply(const flip::FlipHeaderBlock* headers) { + DCHECK(headers); + DCHECK(headers->find("version") != headers->end()); + DCHECK(headers->find("status") != headers->end()); + + // TODO(mbelshe): if no version or status is found, we need to error + // out the stream. + + // Server initiated streams must send a URL to us in the headers. + if (headers->find("path") != headers->end()) + path_ = headers->find("path")->second; + + // TODO(mbelshe): For now we convert from our nice hash map back + // to a string of headers; this is because the HttpResponseInfo + // is a bit rigid for its http (non-flip) design. + std::string raw_headers(headers->find("version")->second); + raw_headers.append(" ", 1); + raw_headers.append(headers->find("status")->second); + raw_headers.append("\0", 1); + flip::FlipHeaderBlock::const_iterator it; + for (it = headers->begin(); it != headers->end(); ++it) { + raw_headers.append(it->first); + raw_headers.append(":", 1); + raw_headers.append(it->second); + raw_headers.append("\0", 1); + } + + LOG(INFO) << "FlipStream: SynReply received for " << stream_id_; + + DCHECK(response_ == NULL); + response_.reset(new HttpResponseInfo()); + response_->headers = new HttpResponseHeaders(raw_headers); + if (delegate_) + delegate_->OnResponseReceived(response_.get()); +} + +bool FlipStreamImpl::OnData(const char* data, int length) { + LOG(INFO) << "FlipStream: Data (" << length << " bytes) received for " + << stream_id_; + if (length) { + if (delegate_) { + delegate_->OnDataReceived(data, length); + } else { + // Save the data for use later + IOBufferWithSize* io_buffer = new IOBufferWithSize(length); + memcpy(io_buffer->data(), data, length); + response_body_.push_back(io_buffer); + } + } else { + data_complete_ = true; + if (delegate_) { + delegate_->OnClose(net::OK); + return true; // Tell the caller to clean us up. + } + } + return false; +} + +void FlipStreamImpl::OnAbort() { + if (delegate_) + delegate_->OnClose(net::ERR_ABORTED); +} + +} // namespace net + diff --git a/net/flip/flip_session.h b/net/flip/flip_session.h new file mode 100644 index 0000000..b608ddb --- /dev/null +++ b/net/flip/flip_session.h @@ -0,0 +1,216 @@ +// Copyright (c) 2009 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef NET_FLIP_FLIP_SESSION_H_ +#define NET_FLIP_FLIP_SESSION_H_ + +#include <deque> +#include <list> +#include <map> +#include <queue> +#include <string> + +#include "base/ref_counted.h" +#include "net/base/io_buffer.h" +#include "net/base/load_states.h" +#include "net/base/net_errors.h" +#include "net/base/upload_data_stream.h" +#include "net/flip/flip_framer.h" +#include "net/flip/flip_protocol.h" +#include "net/flip/flip_session_pool.h" +#include "net/socket/client_socket.h" +#include "net/socket/client_socket_handle.h" +#include "testing/platform_test.h" + +namespace net { + +class FlipStreamImpl; +class HttpNetworkSession; +class HttpRequestInfo; +class HttpResponseInfo; + +// A callback interface for HTTP content retrieved from the Flip stream. +class FlipDelegate { + public: + virtual ~FlipDelegate() {} + virtual const HttpRequestInfo* request() = 0; + virtual const UploadDataStream* data() = 0; + + virtual void OnRequestSent(int status) = 0; + virtual void OnResponseReceived(HttpResponseInfo* response) = 0; + virtual void OnDataReceived(const char* buffer, int bytes) = 0; + virtual void OnClose(int status) = 0; + virtual void OnCancel() = 0; +}; + +class PrioritizedIOBuffer { + public: + PrioritizedIOBuffer() : buffer_(0), priority_(0) {} + PrioritizedIOBuffer(IOBufferWithSize* buffer, int priority) + : buffer_(buffer), + priority_(priority), + position_(++order_) { + } + + IOBuffer* buffer() const { return buffer_; } + + int size() const { return buffer_->size(); } + + void release() { buffer_ = NULL; } + + int priority() { return priority_; } + + // Supports sorting. + bool operator<(const PrioritizedIOBuffer& other) const { + if (priority_ != other.priority_) + return priority_ > other.priority_; + return position_ >= other.position_; + } + + private: + scoped_refptr<IOBufferWithSize> buffer_; + int priority_; + int position_; + static int order_; // Maintains a FIFO order for equal priorities. +}; + +class FlipSession : public base::RefCounted<FlipSession>, + public flip::FlipFramerVisitorInterface { + public: + // Factory for finding open sessions. + // TODO(mbelshe): Break this out into a connection pool class? + static FlipSession* GetFlipSession(const HostResolver::RequestInfo&, + HttpNetworkSession* session); + virtual ~FlipSession(); + + // Get the domain for this FlipSession. + std::string domain() { return domain_; } + + // Connect the FLIP Socket. + // Returns net::Error::OK on success. + // Note that this call does not wait for the connect to complete. Callers can + // immediately start using the FlipSession while it connects. + net::Error Connect(const std::string& group_name, + const HostResolver::RequestInfo& host, int priority); + + // Create a new stream. + // FlipDelegate must remain valid until the stream is either cancelled by the + // creator via CancelStream or the FlipDelegate OnClose or OnCancel callbacks + // have been made. + // Once the stream is created, the delegate should wait for a callback. + int CreateStream(FlipDelegate* delegate); + + // Cancel a stream. + bool CancelStream(int id); + + // Check if a stream is active. + bool IsStreamActive(int id); + + // The LoadState is used for informing the user of the current network + // status, such as "resolving host", "connecting", etc. + LoadState GetLoadState() const; + protected: + friend class FlipNetworkTransactionTest; + friend class FlipSessionPool; + + // Provide access to the framer for testing. + flip::FlipFramer* GetFramer() { return &flip_framer_; } + + // Create a new FlipSession. + // |host| is the hostname that this session connects to. + FlipSession(std::string host, HttpNetworkSession* session); + + // Closes all open streams. Used as part of shutdown. + void CloseAllStreams(net::Error code); + + private: + // FlipFramerVisitorInterface + virtual void OnError(flip::FlipFramer*); + virtual void OnStreamFrameData(flip::FlipStreamId stream_id, + const char* data, + uint32 len); + virtual void OnControl(const flip::FlipControlFrame* frame); + virtual void OnLameDuck(); + + // Control frame handlers. + void OnSyn(const flip::FlipSynStreamControlFrame* frame, + const flip::FlipHeaderBlock* headers); + void OnSynReply(const flip::FlipSynReplyControlFrame* frame, + const flip::FlipHeaderBlock* headers); + void OnFin(const flip::FlipFinStreamControlFrame* frame); + + // IO Callbacks + void OnSocketConnect(int result); + void OnReadComplete(int result); + void OnWriteComplete(int result); + + // Start reading from the socket. + void ReadSocket(); + + // Write current data to the socket. + void WriteSocketLater(); + void WriteSocket(); + + // Get a new stream id. + int GetNewStreamId(); + + // Track active streams in the active stream list. + FlipStreamImpl* ActivateStream(int id, FlipDelegate* delegate); + void DeactivateStream(int id); + + // Check if we have a pending pushed-stream for this url + // Returns the stream if found (and returns it from the pending + // list), returns NULL otherwise. + FlipStreamImpl* GetPushStream(std::string url); + + // Callbacks for the Flip session. + CompletionCallbackImpl<FlipSession> connect_callback_; + CompletionCallbackImpl<FlipSession> read_callback_; + CompletionCallbackImpl<FlipSession> write_callback_; + + // The domain this session is connected to. + std::string domain_; + + scoped_refptr<HttpNetworkSession> session_; + + // The socket handle for this session. + ClientSocketHandle connection_; + bool connection_started_; + + // The read buffer used to read data from the socket. + enum { kReadBufferSize = (4 * 1024) }; + scoped_refptr<net::MovableIOBuffer> read_buffer_; + int read_buffer_bytes_read_; // bytes left in the buffer from prior read. + bool read_pending_; + + int stream_hi_water_mark_; // The next stream id to use. + + typedef std::map<int, FlipStreamImpl*> ActiveStreamMap; + typedef std::list<FlipStreamImpl*> ActiveStreamList; + ActiveStreamMap active_streams_; + + ActiveStreamList pushed_streams_; + + // As we gather data to be sent, we put it into the output queue. + typedef std::priority_queue<PrioritizedIOBuffer> OutputQueue; + OutputQueue queue_; + + // TODO(mbelshe): this is ugly!! + // The packet we are currently sending. + PrioritizedIOBuffer in_flight_write_; + bool delayed_write_pending_; + bool write_pending_; + + // Flip Frame state. + flip::FlipFramer flip_framer_; + + // This is our weak session pool - one session per domain. + static scoped_ptr<FlipSessionPool> session_pool_; + static bool disable_compression_; +}; + +} // namespace net + +#endif // NET_FLIP_FLIP_SESSION_H_ + diff --git a/net/flip/flip_session_pool.cc b/net/flip/flip_session_pool.cc new file mode 100644 index 0000000..2aef13a --- /dev/null +++ b/net/flip/flip_session_pool.cc @@ -0,0 +1,99 @@ +// Copyright (c) 2009 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "net/flip/flip_session_pool.h" + +#include "base/logging.h" +#include "net/flip/flip_session.h" + +namespace net { + +// The maximum number of sessions to open to a single domain. +const int kMaxSessionsPerDomain = 1; + +scoped_ptr<FlipSessionPool::FlipSessionsMap> FlipSessionPool::sessions_; + +FlipSession* FlipSessionPool::Get(const HostResolver::RequestInfo& info, + HttpNetworkSession* session) { + if (!sessions_.get()) + sessions_.reset(new FlipSessionsMap()); + + const std::string domain = info.hostname(); + FlipSession* flip_session = NULL; + FlipSessionList* list = GetSessionList(domain); + if (list) { + if (list->size() >= kMaxSessionsPerDomain) { + flip_session = list->front(); + list->pop_front(); + } + } else { + list = AddSessionList(domain); + } + + DCHECK(list); + if (!flip_session) { + flip_session = new FlipSession(domain, session); + flip_session->AddRef(); // Keep it in the cache. + } + + DCHECK(flip_session); + list->push_back(flip_session); + DCHECK(list->size() <= kMaxSessionsPerDomain); + return flip_session; +} + +void FlipSessionPool::Remove(FlipSession* session) { + std::string domain = session->domain(); + FlipSessionList* list = GetSessionList(domain); + if (list == NULL) + return; + list->remove(session); + if (!list->size()) + RemoveSessionList(domain); +} + +FlipSessionPool::FlipSessionList* + FlipSessionPool::AddSessionList(std::string domain) { + DCHECK(sessions_->find(domain) == sessions_->end()); + return (*sessions_)[domain] = new FlipSessionList(); +} + +// static +FlipSessionPool::FlipSessionList* + FlipSessionPool::GetSessionList(std::string domain) { + FlipSessionsMap::iterator it = sessions_->find(domain); + if (it == sessions_->end()) + return NULL; + return it->second; +} + +// static +void FlipSessionPool::RemoveSessionList(std::string domain) { + FlipSessionList* list = GetSessionList(domain); + if (list) { + delete list; + sessions_->erase(domain); + } else { + DCHECK(false) << "removing orphaned session list"; + } +} + +// static +void FlipSessionPool::CloseAllSessions() { + while (sessions_->size()) { + FlipSessionList* list = sessions_->begin()->second; + DCHECK(list); + sessions_->erase(sessions_->begin()->first); + while (list->size()) { + FlipSession* session = list->front(); + list->pop_front(); + session->CloseAllStreams(net::OK); + session->Release(); + } + delete list; + } +} + +} // namespace net + diff --git a/net/flip/flip_session_pool.h b/net/flip/flip_session_pool.h new file mode 100644 index 0000000..9562413 --- /dev/null +++ b/net/flip/flip_session_pool.h @@ -0,0 +1,57 @@ +// Copyright (c) 2009 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef NET_FLIP_FLIP_SESSION_POOL_H_ +#define NET_FLIP_FLIP_SESSION_POOL_H_ + +#include <map> +#include <list> +#include <string> + +#include "base/ref_counted.h" +#include "base/scoped_ptr.h" +#include "net/base/host_resolver.h" + +namespace net { + +class FlipSession; +class HttpNetworkSession; + +// This is a very simple pool for open FlipSessions. +// TODO(mbelshe): Make this production ready. +class FlipSessionPool { + public: + FlipSessionPool() {} + virtual ~FlipSessionPool() {} + + // Factory for finding open sessions. + FlipSession* Get(const HostResolver::RequestInfo& info, + HttpNetworkSession* session); + + // Close all Flip Sessions; used for debugging. + static void CloseAllSessions(); + + protected: + friend class FlipSession; + + // Return a FlipSession to the pool. + void Remove(FlipSession* session); + + private: + typedef std::list<FlipSession*> FlipSessionList; + typedef std::map<std::string, FlipSessionList*> FlipSessionsMap; + + // Helper functions for manipulating the lists. + FlipSessionList* AddSessionList(std::string domain); + static FlipSessionList* GetSessionList(std::string domain); + static void RemoveSessionList(std::string domain); + + // This is our weak session pool - one session per domain. + static scoped_ptr<FlipSessionsMap> sessions_; +}; + +} // namespace net + +#endif // NET_FLIP_FLIP_SESSION_POOL_H_ + diff --git a/net/flip/flip_session_unittest.cc b/net/flip/flip_session_unittest.cc new file mode 100644 index 0000000..016a197 --- /dev/null +++ b/net/flip/flip_session_unittest.cc @@ -0,0 +1,55 @@ +// Copyright (c) 2009 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "net/base/test_completion_callback.h" +#include "net/socket/socket_test_util.h" +#include "testing/platform_test.h" + +#include "net/flip/flip_session.h" + +namespace net { + +class FlipSessionTest : public PlatformTest { + public: +}; + +// Test the PrioritizedIOBuffer class. +TEST_F(FlipSessionTest, PrioritizedIOBuffer) { + std::priority_queue<PrioritizedIOBuffer> queue_; + const int kQueueSize = 100; + + // Insert 100 items; pri 100 to 1. + for (int index = 0; index < kQueueSize; ++index) { + PrioritizedIOBuffer buffer(NULL, kQueueSize - index); + queue_.push(buffer); + } + + // Insert several priority 0 items last. + const int kNumDuplicates = 12; + IOBufferWithSize* buffers[kNumDuplicates]; + for (int index = 0; index < kNumDuplicates; ++index) { + buffers[index] = new IOBufferWithSize(index+1); + queue_.push(PrioritizedIOBuffer(buffers[index], 0)); + } + + EXPECT_EQ(kQueueSize + kNumDuplicates, queue_.size()); + + // Verify the P0 items come out in FIFO order. + for (int index = 0; index < kNumDuplicates; ++index) { + PrioritizedIOBuffer buffer = queue_.top(); + EXPECT_EQ(0, buffer.priority()); + EXPECT_EQ(index + 1, buffer.size()); + queue_.pop(); + } + + int priority = 1; + while (queue_.size()) { + PrioritizedIOBuffer buffer = queue_.top(); + EXPECT_EQ(priority++, buffer.priority()); + queue_.pop(); + } +} + +} // namespace net + diff --git a/net/flip/flip_transaction_factory.h b/net/flip/flip_transaction_factory.h new file mode 100644 index 0000000..ce50686 --- /dev/null +++ b/net/flip/flip_transaction_factory.h @@ -0,0 +1,36 @@ +// Copyright (c) 2009 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef NET_FLIP_FLIP_TRANSACTION_FACTORY_H__ +#define NET_FLIP_FLIP_TRANSACTION_FACTORY_H__ + +#include "net/flip/flip_network_transaction.h" +#include "net/http/http_transaction_factory.h" + +namespace net { + +class FlipTransactionFactory : public HttpTransactionFactory { + public: + explicit FlipTransactionFactory(HttpNetworkSession* session) + : session_(session) { + } + virtual ~FlipTransactionFactory() {} + + // HttpTransactionFactory Interface. + virtual HttpTransaction* CreateTransaction() { + return new FlipNetworkTransaction(session_); + } + virtual HttpCache* GetCache() { + return NULL; + } + virtual void Suspend(bool suspend) { + } + + private: + scoped_refptr<HttpNetworkSession> session_; +}; + +} // namespace net + +#endif // NET_FLIP_FLIP_TRANSACTION_FACTORY_H__ |