summaryrefslogtreecommitdiffstats
path: root/net/flip
diff options
context:
space:
mode:
Diffstat (limited to 'net/flip')
-rw-r--r--net/flip/flip_bitmasks.h16
-rw-r--r--net/flip/flip_frame_builder.cc184
-rw-r--r--net/flip/flip_frame_builder.h184
-rw-r--r--net/flip/flip_framer.cc732
-rw-r--r--net/flip/flip_framer.h228
-rw-r--r--net/flip/flip_framer_test.cc251
-rw-r--r--net/flip/flip_network_transaction.cc414
-rw-r--r--net/flip/flip_network_transaction.h142
-rw-r--r--net/flip/flip_network_transaction_unittest.cc190
-rw-r--r--net/flip/flip_protocol.h203
-rw-r--r--net/flip/flip_session.cc762
-rw-r--r--net/flip/flip_session.h216
-rw-r--r--net/flip/flip_session_pool.cc99
-rw-r--r--net/flip/flip_session_pool.h57
-rw-r--r--net/flip/flip_session_unittest.cc55
-rw-r--r--net/flip/flip_transaction_factory.h36
16 files changed, 3769 insertions, 0 deletions
diff --git a/net/flip/flip_bitmasks.h b/net/flip/flip_bitmasks.h
new file mode 100644
index 0000000..8f139e0
--- /dev/null
+++ b/net/flip/flip_bitmasks.h
@@ -0,0 +1,16 @@
+// Copyright (c) 2009 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef NET_FLIP_FLIP_BITMASKS_H_
+#define NET_FLIP_FLIP_BITMASKS_H_
+
+namespace flip {
+
+const int kStreamIdMask = 0x7fffffff; // StreamId mask from the FlipHeader
+const int kControlFlagMask = 0x8000; // Control flag mask from the FlipHeader
+const int kPriorityMask = 0xc0; // Priority mask from the SYN_FRAME
+} // flip
+
+#endif // NET_FLIP_FLIP_BITMASKS_H_
+
diff --git a/net/flip/flip_frame_builder.cc b/net/flip/flip_frame_builder.cc
new file mode 100644
index 0000000..28d156c
--- /dev/null
+++ b/net/flip/flip_frame_builder.cc
@@ -0,0 +1,184 @@
+// Copyright (c) 2009 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include <limits>
+
+#include "flip_frame_builder.h" // cross-google3 directory naming.
+#include "flip_protocol.h"
+
+namespace flip {
+
+ // We mark a read only FlipFrameBuilder with a special capacity_.
+static const size_t kCapacityReadOnly = std::numeric_limits<size_t>::max();
+
+FlipFrameBuilder::FlipFrameBuilder()
+ : buffer_(NULL),
+ capacity_(0),
+ length_(0),
+ variable_buffer_offset_(0) {
+ Resize(kInitialPayload);
+}
+
+FlipFrameBuilder::FlipFrameBuilder(const char* data, int data_len)
+ : buffer_(const_cast<char*>(data)),
+ capacity_(kCapacityReadOnly),
+ length_(data_len),
+ variable_buffer_offset_(0) {
+}
+
+FlipFrameBuilder::~FlipFrameBuilder() {
+ if (capacity_ != kCapacityReadOnly)
+ delete[] buffer_;
+}
+
+bool FlipFrameBuilder::ReadUInt16(void** iter, uint16* result) const {
+ DCHECK(iter);
+ if (!*iter)
+ *iter = const_cast<char*>(buffer_);
+
+ if (!IteratorHasRoomFor(*iter, sizeof(*result)))
+ return false;
+
+ *result = ntohs(*(reinterpret_cast<uint16*>(*iter)));
+
+ UpdateIter(iter, sizeof(*result));
+ return true;
+}
+
+bool FlipFrameBuilder::ReadUInt32(void** iter, uint32* result) const {
+ DCHECK(iter);
+ if (!*iter)
+ *iter = const_cast<char*>(buffer_);
+
+ if (!IteratorHasRoomFor(*iter, sizeof(*result)))
+ return false;
+
+ *result = ntohl(*(reinterpret_cast<uint32*>(*iter)));
+
+ UpdateIter(iter, sizeof(*result));
+ return true;
+}
+
+bool FlipFrameBuilder::ReadString(void** iter, std::string* result) const {
+ DCHECK(iter);
+
+ uint16 len;
+ if (!ReadUInt16(iter, &len)) {
+ VLOG(1) << "Unable to read length";
+ return false;
+ }
+ if (!IteratorHasRoomFor(*iter, len)) {
+ VLOG(1) << "!IteratorHasRoomFor";
+ return false;
+ }
+
+ char* chars = reinterpret_cast<char*>(*iter);
+ result->assign(chars, len);
+
+ UpdateIter(iter, len);
+ return true;
+}
+
+bool FlipFrameBuilder::ReadBytes(void** iter, const char** data,
+ uint16 length) const {
+ DCHECK(iter);
+ DCHECK(data);
+
+ if (!IteratorHasRoomFor(*iter, length))
+ return false;
+
+ *data = reinterpret_cast<const char*>(*iter);
+
+ UpdateIter(iter, length);
+ return true;
+}
+
+bool FlipFrameBuilder::ReadData(void** iter, const char** data,
+ uint16* length) const {
+ DCHECK(iter);
+ DCHECK(data);
+ DCHECK(length);
+
+ if (!ReadUInt16(iter, length))
+ return false;
+
+ return ReadBytes(iter, data, *length);
+}
+
+char* FlipFrameBuilder::BeginWrite(size_t length) {
+ size_t offset = length_;
+ size_t needed_size = length_ + length;
+ if (needed_size > capacity_ && !Resize(std::max(capacity_ * 2, needed_size)))
+ return NULL;
+
+#ifdef ARCH_CPU_64_BITS
+ DCHECK_LE(length, std::numeric_limits<uint32>::max());
+#endif
+
+ return buffer_ + offset;
+}
+
+void FlipFrameBuilder::EndWrite(char* dest, int length) {
+}
+
+bool FlipFrameBuilder::WriteBytes(const void* data, uint16 data_len) {
+ DCHECK(capacity_ != kCapacityReadOnly);
+
+ char* dest = BeginWrite(data_len);
+ if (!dest)
+ return false;
+
+ memcpy(dest, data, data_len);
+
+ EndWrite(dest, data_len);
+ length_ += data_len;
+ return true;
+}
+
+bool FlipFrameBuilder::WriteString(const std::string& value) {
+ if (value.size() > 0xffff)
+ return false;
+
+ if (!WriteUInt16(static_cast<int>(value.size())))
+ return false;
+
+ return WriteBytes(value.data(), static_cast<uint16>(value.size()));
+}
+
+char* FlipFrameBuilder::BeginWriteData(uint16 length) {
+ DCHECK_EQ(variable_buffer_offset_, 0U) <<
+ "There can only be one variable buffer in a FlipFrameBuilder";
+
+ if (!WriteUInt16(length))
+ return false;
+
+ char *data_ptr = BeginWrite(length);
+ if (!data_ptr)
+ return NULL;
+
+ variable_buffer_offset_ = data_ptr - buffer_ - sizeof(int);
+
+ // EndWrite doesn't necessarily have to be called after the write operation,
+ // so we call it here to pad out what the caller will eventually write.
+ EndWrite(data_ptr, length);
+ return data_ptr;
+}
+
+bool FlipFrameBuilder::Resize(size_t new_capacity) {
+ if (new_capacity < capacity_)
+ return true;
+
+ char* p = new char[new_capacity];
+ if (buffer_) {
+ memcpy(p, buffer_, capacity_);
+ delete[] buffer_;
+ }
+ if (!p && new_capacity > 0)
+ return false;
+ buffer_ = p;
+ capacity_ = new_capacity;
+ return true;
+}
+
+} // namespace flip
diff --git a/net/flip/flip_frame_builder.h b/net/flip/flip_frame_builder.h
new file mode 100644
index 0000000..809f451
--- /dev/null
+++ b/net/flip/flip_frame_builder.h
@@ -0,0 +1,184 @@
+// Copyright (c) 2009 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef NET_FLIP_FRAME_BUILDER_H_
+#define NET_FLIP_FRAME_BUILDER_H_
+
+#ifdef WIN32
+#include <winsock2.h> // for htonl() functions
+#else
+#include <arpa/inet.h>
+#endif
+
+#include <string>
+
+#include "base/logging.h"
+#include "flip_protocol.h" // cross-google3 directory naming.
+
+#ifdef WIN32
+#undef VLOG
+#define VLOG(x) LOG_IF(INFO, false)
+#endif // WIN32
+
+namespace flip {
+
+// This class provides facilities for basic binary value packing and unpacking
+// into Flip frames. Note: this is similar to Chrome's pickle class, but is
+// simplified to work in both the client and server, and without excess
+// padding.
+//
+// The FlipFrameBuilder supports appending primitive values (int, string, etc)
+// to a frame instance. The FlipFrameBuilder grows its internal memory buffer
+// dynamically to hold the sequence of primitive values. The internal memory
+// buffer is exposed as the "data" of the FlipFrameBuilder.
+//
+// When reading from a FlipFrameBuilder the consumer must know what value types
+// to read and in what order to read them as the FlipFrameBuilder does not keep
+// track of the type of data written to it.
+class FlipFrameBuilder {
+ public:
+ FlipFrameBuilder();
+ ~FlipFrameBuilder();
+
+ // Initializes a FlipFrameBuilder from a const block of data. The data is
+ // not copied; instead the data is merely referenced by this
+ // FlipFrameBuilder. Only const methods should be used when initialized
+ // this way.
+ FlipFrameBuilder(const char* data, int data_len);
+
+ // Returns the size of the FlipFrameBuilder's data.
+ int length() const { return length_; }
+
+ // Returns the data for this FlipFrameBuilder.
+ const FlipFrame* data() const {
+ return reinterpret_cast<FlipFrame*>(buffer_);
+ }
+
+ // Takes the buffer from the FlipFrameBuilder.
+ FlipFrame* take() {
+ FlipFrame* rv = reinterpret_cast<FlipFrame*>(buffer_);
+ buffer_ = NULL;
+ capacity_ = 0;
+ length_ = 0;
+ return rv;
+ }
+
+ // Methods for reading the payload of the FlipFrameBuilder. To read from the
+ // start of the FlipFrameBuilder, initialize *iter to NULL. If successful,
+ // these methods return true. Otherwise, false is returned to indicate that
+ // the result could not be extracted.
+ bool ReadUInt16(void** iter, uint16* result) const;
+ bool ReadUInt32(void** iter, uint32* result) const;
+ bool ReadString(void** iter, std::string* result) const;
+ bool ReadBytes(void** iter, const char** data, uint16 length) const;
+ bool ReadData(void** iter, const char** data, uint16* length) const;
+
+ // Methods for adding to the payload. These values are appended to the end
+ // of the FlipFrameBuilder payload. When reading values, you must read them
+ // in the order they were added. Note - binary integers are converted from
+ // host to network form.
+ bool WriteUInt16(uint16 value) {
+ value = htons(value);
+ return WriteBytes(&value, sizeof(value));
+ }
+ bool WriteUInt32(uint32 value) {
+ value = htonl(value);
+ return WriteBytes(&value, sizeof(value));
+ }
+ bool WriteString(const std::string& value);
+ bool WriteBytes(const void* data, uint16 data_len);
+
+ // Write an integer to a particular offset in the data buffer.
+ bool WriteUInt32ToOffset(int offset, uint32 value) {
+ if (offset + sizeof(value) > length_)
+ return false;
+ value = htonl(value);
+ char *ptr = buffer_ + offset;
+ memcpy(ptr, &value, sizeof(value));
+ return true;
+ }
+
+ // Allows the caller to write data directly into the FlipFrameBuilder.
+ // This saves a copy when the data is not already available in a buffer.
+ // The caller must not write more than the length it declares it will.
+ // Use ReadData to get the data.
+ // Returns NULL on failure.
+ //
+ // The returned pointer will only be valid until the next write operation
+ // on this FlipFrameBuilder.
+ char* BeginWriteData(uint16 length);
+
+ // Returns true if the given iterator could point to data with the given
+ // length. If there is no room for the given data before the end of the
+ // payload, returns false.
+ bool IteratorHasRoomFor(const void* iter, int len) const {
+ const char* end_of_region = reinterpret_cast<const char*>(iter) + len;
+ VLOG(1) << "len: " << len;
+ if (len < 0) {
+ VLOG(1) << "Len < 0";
+ return false;
+ } else if (iter < buffer_) {
+ VLOG(1) << "iter < buffer_";
+ return false;
+ } else if (iter > end_of_payload()) {
+ VLOG(1) << "iter > end_of_payload())";
+ return false;
+ } else if (iter > end_of_region) {
+ VLOG(1) << "iter > end_of_region)";
+ return false;
+ } else if (end_of_region > end_of_payload()) {
+ VLOG(1) << "end_of_region > end_of_payload()";
+ VLOG(1) << "end_of_region - end_of_payload(): "
+ << (end_of_region - end_of_payload());
+
+ return false;
+ }
+
+ // Watch out for overflow in pointer calculation, which wraps.
+ return (iter <= end_of_region) && (end_of_region <= end_of_payload());
+ }
+
+ protected:
+ size_t capacity() const {
+ return capacity_;
+ }
+
+ const char* end_of_payload() const { return buffer_ + length_; }
+
+ // Resizes the buffer for use when writing the specified amount of data. The
+ // location that the data should be written at is returned, or NULL if there
+ // was an error. Call EndWrite with the returned offset and the given length
+ // to pad out for the next write.
+ char* BeginWrite(size_t length);
+
+ // Completes the write operation by padding the data with NULL bytes until it
+ // is padded. Should be paired with BeginWrite, but it does not necessarily
+ // have to be called after the data is written.
+ void EndWrite(char* dest, int length);
+
+ // Resize the capacity, note that the input value should include the size of
+ // the header: new_capacity = sizeof(Header) + desired_payload_capacity.
+ // A new failure will cause a Resize failure... and caller should check
+ // the return result for true (i.e., successful resizing).
+ bool Resize(size_t new_capacity);
+
+ // Moves the iterator by the given number of bytes.
+ static void UpdateIter(void** iter, int bytes) {
+ *iter = static_cast<char*>(*iter) + bytes;
+ }
+
+ // Initial size of the payload.
+ static const int kInitialPayload = 1024;
+
+ private:
+ char* buffer_;
+ size_t capacity_; // Allocation size of payload (or -1 if buffer is const).
+ size_t length_; // current length of the buffer
+ size_t variable_buffer_offset_; // IF non-zero, then offset to a buffer.
+};
+
+} // namespace flip
+
+#endif // NET_FLIP_FRAME_BUILDER_H_
+
diff --git a/net/flip/flip_framer.cc b/net/flip/flip_framer.cc
new file mode 100644
index 0000000..ec183cc
--- /dev/null
+++ b/net/flip/flip_framer.cc
@@ -0,0 +1,732 @@
+// Copyright (c) 2009 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "base/scoped_ptr.h"
+#include "base/stats_counters.h"
+
+#include "flip_framer.h" // cross-google3 directory naming.
+#include "flip_frame_builder.h"
+#include "flip_bitmasks.h"
+
+#ifdef WIN32
+#include "third_party/zlib/zlib.h"
+#else
+#include "third_party/zlib/v1_2_3/zlib.h"
+#endif
+
+namespace flip {
+
+// The initial size of the control frame buffer; this is used internally
+// as we we parse though control frames.
+static const int kControlFrameBufferInitialSize = 32 * 1024;
+// The maximum size of the control frame buffer that we support.
+// TODO(mbelshe): We should make this stream-based so there are no limits.
+static const int kControlFrameBufferMaxSize = 64 * 1024;
+
+// This implementation of Flip is version 1.
+static const int kFlipProtocolVersion = 1;
+
+// By default is compression on or off.
+bool FlipFramer::compression_default_ = true;
+
+#ifdef DEBUG_FLIP_STATE_CHANGES
+#define CHANGE_STATE(newstate) \
+{ \
+ do { \
+ LOG(INFO) << "Changing state from: " \
+ << StateToString(state_) \
+ << " to " << StateToString(newstate) << "\n"; \
+ state_ = newstate; \
+ } while (false); \
+}
+#else
+#define CHANGE_STATE(newstate) (state_ = newstate)
+#endif
+
+FlipFramer::FlipFramer()
+ : state_(FLIP_RESET),
+ error_code_(FLIP_NO_ERROR),
+ remaining_payload_(0),
+ remaining_control_payload_(0),
+ current_frame_buffer_(NULL),
+ current_frame_len_(0),
+ current_frame_capacity_(0),
+ enable_compression_(compression_default_),
+ visitor_(NULL) {
+}
+
+FlipFramer::~FlipFramer() {
+ if (compressor_.get()) {
+ deflateEnd(compressor_.get());
+ }
+ delete [] current_frame_buffer_;
+}
+
+void FlipFramer::Reset() {
+ state_ = FLIP_RESET;
+ error_code_ = FLIP_NO_ERROR;
+ remaining_payload_ = 0;
+ remaining_control_payload_ = 0;
+ current_frame_len_ = 0;
+ if (current_frame_capacity_ != kControlFrameBufferInitialSize) {
+ delete [] current_frame_buffer_;
+ current_frame_buffer_ = 0;
+ current_frame_capacity_ = 0;
+ ExpandControlFrameBuffer(kControlFrameBufferInitialSize);
+ }
+}
+
+const char* FlipFramer::StateToString(int state) {
+ switch (state) {
+ case FLIP_ERROR:
+ return "ERROR";
+ case FLIP_DONE:
+ return "DONE";
+ case FLIP_AUTO_RESET:
+ return "AUTO_RESET";
+ case FLIP_RESET:
+ return "RESET";
+ case FLIP_READING_COMMON_HEADER:
+ return "READING_COMMON_HEADER";
+ case FLIP_INTERPRET_CONTROL_FRAME_COMMON_HEADER:
+ return "INTERPRET_CONTROL_FRAME_COMMON_HEADER";
+ case FLIP_CONTROL_FRAME_PAYLOAD:
+ return "CONTROL_FRAME_PAYLOAD";
+ case FLIP_IGNORE_REMAINING_PAYLOAD:
+ return "IGNORE_REMAINING_PAYLOAD";
+ case FLIP_FORWARD_STREAM_FRAME:
+ return "FORWARD_STREAM_FRAME";
+ }
+ return "UNKNOWN_STATE";
+}
+
+uint32 FlipFramer::BytesSafeToRead() const {
+ switch (state_) {
+ case FLIP_ERROR:
+ case FLIP_DONE:
+ case FLIP_AUTO_RESET:
+ case FLIP_RESET:
+ return 0;
+ case FLIP_READING_COMMON_HEADER:
+ DCHECK(current_frame_len_ < sizeof(FlipFrame));
+ return sizeof(FlipFrame) - current_frame_len_;
+ case FLIP_INTERPRET_CONTROL_FRAME_COMMON_HEADER:
+ return 0;
+ case FLIP_CONTROL_FRAME_PAYLOAD:
+ case FLIP_IGNORE_REMAINING_PAYLOAD:
+ case FLIP_FORWARD_STREAM_FRAME:
+ return remaining_payload_;
+ }
+ // We should never get to here.
+ return 0;
+}
+
+void FlipFramer::set_error(FlipError error) {
+ DCHECK(false);
+ DCHECK(visitor_);
+ error_code_ = error;
+ visitor_->OnError(this);
+}
+
+const char* FlipFramer::ErrorCodeToString(int error_code) {
+ switch (error_code) {
+ case FLIP_NO_ERROR:
+ return "NO_ERROR";
+ case FLIP_UNKNOWN_CONTROL_TYPE:
+ return "UNKNOWN_CONTROL_TYPE";
+ case FLIP_INVALID_CONTROL_FRAME:
+ return "INVALID_CONTROL_FRAME";
+ case FLIP_CONTROL_PAYLOAD_TOO_LARGE:
+ return "CONTROL_PAYLOAD_TOO_LARGE";
+ case FLIP_ZLIB_INIT_FAILURE:
+ return "ZLIB_INIT_FAILURE";
+ case FLIP_UNSUPPORTED_VERSION:
+ return "UNSUPPORTED_VERSION";
+ case FLIP_DECOMPRESS_FAILURE:
+ return "DECOMPRESS_FAILURE";
+ }
+ return "UNKNOWN_STATE";
+}
+
+uint32 FlipFramer::ProcessInput(const char* data, uint32 len) {
+ DCHECK(visitor_);
+ DCHECK(data);
+
+ uint32 original_len = len;
+ while (len != 0) {
+ FlipControlFrame* current_control_frame =
+ reinterpret_cast<FlipControlFrame*>(current_frame_buffer_);
+ FlipDataFrame* current_data_frame =
+ reinterpret_cast<FlipDataFrame*>(current_frame_buffer_);
+
+ switch (state_) {
+ case FLIP_ERROR:
+ case FLIP_DONE:
+ goto bottom;
+
+ case FLIP_AUTO_RESET:
+ case FLIP_RESET:
+ Reset();
+ CHANGE_STATE(FLIP_READING_COMMON_HEADER);
+ continue;
+
+ case FLIP_READING_COMMON_HEADER: {
+ int bytes_read = ProcessCommonHeader(data, len);
+ len -= bytes_read;
+ data += bytes_read;
+ continue;
+ }
+
+ // Arguably, this case is not necessary, as no bytes are consumed here.
+ // I felt it was a nice partitioning, however (which probably indicates
+ // that it should be refactored into its own function!)
+ case FLIP_INTERPRET_CONTROL_FRAME_COMMON_HEADER:
+ DCHECK(error_code_ == 0);
+ DCHECK(current_frame_len_ >= sizeof(FlipFrame));
+ // Do some sanity checking on the control frame sizes.
+ switch (current_control_frame->type()) {
+ case SYN_STREAM:
+ // NOTE: sizeof(FlipSynStreamControlFrame) is not accurate.
+ if (current_control_frame->length() <
+ sizeof(FlipSynStreamControlFrame) - sizeof(FlipControlFrame))
+ set_error(FLIP_INVALID_CONTROL_FRAME);
+ break;
+ case SYN_REPLY:
+ if (current_control_frame->length() <
+ sizeof(FlipSynReplyControlFrame) - sizeof(FlipControlFrame))
+ set_error(FLIP_INVALID_CONTROL_FRAME);
+ break;
+ case FIN_STREAM:
+ if (current_control_frame->length() !=
+ sizeof(FlipFinStreamControlFrame) - sizeof(FlipFrame))
+ set_error(FLIP_INVALID_CONTROL_FRAME);
+ break;
+ case NOOP:
+ // NOP. Swallow it.
+ CHANGE_STATE(FLIP_AUTO_RESET);
+ continue;
+ default:
+ set_error(FLIP_UNKNOWN_CONTROL_TYPE);
+ break;
+ }
+
+ // We only support version 1 of this protocol.
+ if (current_control_frame->version() != kFlipProtocolVersion)
+ set_error(FLIP_UNSUPPORTED_VERSION);
+
+ if (error_code_) {
+ CHANGE_STATE(FLIP_ERROR);
+ goto bottom;
+ }
+
+ remaining_control_payload_ = current_control_frame->length();
+ if (remaining_control_payload_ > kControlFrameBufferMaxSize) {
+ set_error(FLIP_CONTROL_PAYLOAD_TOO_LARGE);
+ CHANGE_STATE(FLIP_ERROR);
+ goto bottom;
+ }
+ ExpandControlFrameBuffer(remaining_control_payload_);
+ CHANGE_STATE(FLIP_CONTROL_FRAME_PAYLOAD);
+ continue;
+
+ case FLIP_CONTROL_FRAME_PAYLOAD: {
+ int bytes_read = ProcessControlFramePayload(data, len);
+ len -= bytes_read;
+ data += bytes_read;
+ }
+ // intentional fallthrough
+ case FLIP_IGNORE_REMAINING_PAYLOAD:
+ // control frame has too-large payload
+ // intentional fallthrough
+ case FLIP_FORWARD_STREAM_FRAME:
+ if (remaining_payload_) {
+ uint32 amount_to_forward = std::min(remaining_payload_, len);
+ if (amount_to_forward && state_ != FLIP_IGNORE_REMAINING_PAYLOAD) {
+ const FlipDataFrame* data_frame =
+ reinterpret_cast<const FlipDataFrame*>(current_data_frame);
+ if (data_frame->flags() & DATA_FLAG_COMPRESSED) {
+ // TODO(mbelshe): Assert that the decompressor is init'ed.
+ if (!InitializeDecompressor())
+ return NULL;
+
+ int decompressed_max_size = amount_to_forward * 100;
+ scoped_array<char> decompressed(new char[decompressed_max_size]);
+ decompressor_->next_in = reinterpret_cast<Bytef*>(
+ const_cast<char*>(data));
+ decompressor_->avail_in = amount_to_forward;
+ decompressor_->next_out =
+ reinterpret_cast<Bytef*>(decompressed.get());
+ decompressor_->avail_out = decompressed_max_size;
+
+ int rv = inflate(decompressor_.get(), Z_SYNC_FLUSH);
+ if (rv != Z_OK) {
+ set_error(FLIP_DECOMPRESS_FAILURE);
+ goto bottom;
+ }
+ int decompressed_size = decompressed_max_size -
+ decompressor_->avail_out;
+ visitor_->OnStreamFrameData(current_data_frame->stream_id(),
+ decompressed.get(),
+ decompressed_size);
+ amount_to_forward -= decompressor_->avail_in;
+ } else {
+ // The data frame was not compressed
+ visitor_->OnStreamFrameData(current_data_frame->stream_id(),
+ data, amount_to_forward);
+ }
+ }
+ data += amount_to_forward;
+ len -= amount_to_forward;
+ remaining_payload_ -= amount_to_forward;
+ } else {
+ CHANGE_STATE(FLIP_AUTO_RESET);
+ }
+ continue;
+ default:
+ break;
+ }
+ }
+ bottom:
+ return original_len - len;
+}
+
+uint32 FlipFramer::ProcessCommonHeader(const char* data, uint32 len) {
+ // This should only be called when we're in the FLIP_READING_COMMON_HEADER
+ // state.
+ DCHECK(state_ == FLIP_READING_COMMON_HEADER);
+
+ int original_len = len;
+ FlipDataFrame* current_frame =
+ reinterpret_cast<FlipDataFrame*>(current_frame_buffer_);
+
+ do {
+ if (current_frame_len_ < sizeof(FlipFrame)) {
+ uint32 bytes_desired = sizeof(FlipFrame) - current_frame_len_;
+ uint32 bytes_to_append = std::min(bytes_desired, len);
+ char* header_buffer = current_frame_buffer_;
+ memcpy(&header_buffer[current_frame_len_], data, bytes_to_append);
+ current_frame_len_ += bytes_to_append;
+ data += bytes_to_append;
+ len -= bytes_to_append;
+ // Check for an empty data packet.
+ if (current_frame_len_ == sizeof(FlipFrame) &&
+ !current_frame->is_control_frame() &&
+ current_frame->length() == 0) {
+ visitor_->OnStreamFrameData(current_frame->stream_id(), NULL, 0);
+ CHANGE_STATE(FLIP_RESET);
+ }
+ break;
+ }
+ remaining_payload_ = current_frame->length();
+ // if we're here, then we have the common header all received.
+ if (!current_frame->is_control_frame())
+ CHANGE_STATE(FLIP_FORWARD_STREAM_FRAME);
+ else
+ CHANGE_STATE(FLIP_INTERPRET_CONTROL_FRAME_COMMON_HEADER);
+ } while (false);
+
+ return original_len - len;
+}
+
+uint32 FlipFramer::ProcessControlFramePayload(const char* data, uint32 len) {
+ int original_len = len;
+ do {
+ if (remaining_control_payload_) {
+ uint32 amount_to_consume = std::min(remaining_control_payload_, len);
+ memcpy(&current_frame_buffer_[current_frame_len_], data,
+ amount_to_consume);
+ current_frame_len_ += amount_to_consume;
+ data += amount_to_consume;
+ len -= amount_to_consume;
+ remaining_control_payload_ -= amount_to_consume;
+ remaining_payload_ -= amount_to_consume;
+ if (remaining_control_payload_)
+ break;
+ }
+ FlipControlFrame* control_frame =
+ reinterpret_cast<FlipControlFrame*>(current_frame_buffer_);
+ visitor_->OnControl(control_frame);
+ CHANGE_STATE(FLIP_IGNORE_REMAINING_PAYLOAD);
+ } while (false);
+ return original_len - len;
+}
+
+void FlipFramer::ExpandControlFrameBuffer(int size) {
+ DCHECK(size < kControlFrameBufferMaxSize);
+ if (size < current_frame_capacity_)
+ return;
+
+ int alloc_size = size + sizeof(FlipFrame);
+ char* new_buffer = new char[alloc_size];
+ memcpy(new_buffer, current_frame_buffer_, current_frame_len_);
+ current_frame_capacity_ = alloc_size;
+ current_frame_buffer_ = new_buffer;
+}
+
+bool FlipFramer::ParseHeaderBlock(const FlipFrame* frame,
+ FlipHeaderBlock* block) {
+ uint32 type = reinterpret_cast<const FlipControlFrame*>(frame)->type();
+ if (type != SYN_STREAM && type != SYN_REPLY)
+ return false;
+
+ // Find the header data within the control frame.
+ scoped_array<FlipSynStreamControlFrame> control_frame(
+ reinterpret_cast<FlipSynStreamControlFrame*>(DecompressFrame(frame)));
+ if (!control_frame.get())
+ return false;
+ const char *header_data = control_frame.get()->header_block();
+ int header_length = control_frame.get()->header_block_len();
+
+ FlipFrameBuilder builder(header_data, header_length);
+ void* iter = NULL;
+ uint16 num_headers;
+ if (builder.ReadUInt16(&iter, &num_headers)) {
+ VLOG(2) << "found num_headers: " << num_headers;
+ for (int index = 0; index < num_headers; ++index) {
+ std::string name;
+ std::string value;
+ if (!builder.ReadString(&iter, &name)) {
+ VLOG(1) << "couldn't read string (key)!";
+ break;
+ }
+ if (!builder.ReadString(&iter, &value)) {
+ VLOG(1) << "couldn't read string (value)!";
+ break;
+ }
+ if (block->empty()) {
+ (*block)[name] = value;
+ } else {
+ FlipHeaderBlock::iterator last = --block->end();
+ if (block->key_comp()(last->first, name)) {
+ block->insert(block->end(),
+ std::pair<std::string, std::string>(name, value));
+ } else {
+ return false;
+ }
+ }
+ }
+ return true;
+ } else {
+ VLOG(2) << "didn't find headers";
+ }
+ return false;
+}
+
+FlipSynStreamControlFrame* FlipFramer::CreateSynStream(
+ int stream_id,
+ int priority,
+ bool compress,
+ FlipHeaderBlock* headers) {
+ FlipFrameBuilder frame;
+
+ frame.WriteUInt16(kControlFlagMask | kFlipProtocolVersion);
+ frame.WriteUInt16(SYN_STREAM);
+ frame.WriteUInt32(0); // Placeholder for the length.
+ frame.WriteUInt32(stream_id);
+ frame.WriteUInt16(ntohs(priority) << 6); // Priority.
+
+ frame.WriteUInt16(headers->size()); // Number of headers.
+ FlipHeaderBlock::iterator it;
+ for (it = headers->begin(); it != headers->end(); ++it) {
+ frame.WriteString(it->first);
+ frame.WriteString(it->second);
+ }
+ // write the length
+ frame.WriteUInt32ToOffset(4, frame.length() - sizeof(FlipFrame));
+ if (compress) {
+ FlipSynStreamControlFrame* new_frame =
+ reinterpret_cast<FlipSynStreamControlFrame*>(
+ CompressFrame(frame.data()));
+ return new_frame;
+ }
+
+ return reinterpret_cast<FlipSynStreamControlFrame*>(frame.take());
+}
+
+/* static */
+FlipFinStreamControlFrame* FlipFramer::CreateFinStream(int stream_id,
+ int status) {
+ FlipFrameBuilder frame;
+ frame.WriteUInt16(kControlFlagMask | kFlipProtocolVersion);
+ frame.WriteUInt16(FIN_STREAM);
+ frame.WriteUInt32(8);
+ frame.WriteUInt32(stream_id);
+ frame.WriteUInt32(status);
+ return reinterpret_cast<FlipFinStreamControlFrame*>(frame.take());
+}
+
+FlipSynReplyControlFrame* FlipFramer::CreateSynReply(int stream_id,
+ bool compressed, FlipHeaderBlock* headers) {
+
+ FlipFrameBuilder frame;
+
+ frame.WriteUInt16(kControlFlagMask | kFlipProtocolVersion);
+ frame.WriteUInt16(SYN_REPLY);
+ frame.WriteUInt32(0); // Placeholder for the length.
+ frame.WriteUInt32(stream_id);
+ frame.WriteUInt16(0); // Priority.
+
+ frame.WriteUInt16(headers->size()); // Number of headers.
+ FlipHeaderBlock::iterator it;
+ for (it = headers->begin(); it != headers->end(); ++it) {
+ // TODO(mbelshe): Headers need to be sorted.
+ frame.WriteString(it->first);
+ frame.WriteString(it->second);
+ }
+ // write the length
+ frame.WriteUInt32ToOffset(4, frame.length() - sizeof(FlipFrame));
+ if (compressed)
+ return reinterpret_cast<FlipSynReplyControlFrame*>(
+ CompressFrame(frame.data()));
+ return reinterpret_cast<FlipSynReplyControlFrame*>(frame.take());
+}
+
+FlipDataFrame* FlipFramer::CreateDataFrame(int stream_id,
+ const char* data,
+ int len, bool compressed) {
+ FlipFrameBuilder frame;
+
+ frame.WriteUInt32(stream_id);
+
+ frame.WriteUInt32(len);
+ frame.WriteBytes(data, len);
+ if (compressed)
+ return reinterpret_cast<FlipDataFrame*>(CompressFrame(frame.data()));
+ return reinterpret_cast<FlipDataFrame*>(frame.take());
+}
+
+static const int kCompressorLevel = Z_DEFAULT_COMPRESSION;
+// This is just a hacked dictionary to use for shrinking HTTP-like headers.
+// TODO(mbelshe): Use a scientific methodology for computing the dictionary.
+static const char dictionary[] =
+ "optionsgetheadpostputdeletetraceacceptaccept-charsetaccept-encodingaccept-"
+ "languageauthorizationexpectfromhostif-modified-sinceif-matchif-none-matchi"
+ "f-rangeif-unmodifiedsincemax-forwardsproxy-authorizationrangerefererteuser"
+ "-agent10010120020120220320420520630030130230330430530630740040140240340440"
+ "5406407408409410411412413414415416417500501502503504505accept-rangesageeta"
+ "glocationproxy-authenticatepublicretry-afterservervarywarningwww-authentic"
+ "ateallowcontent-basecontent-encodingcache-controlconnectiondatetrailertran"
+ "sfer-encodingupgradeviawarningcontent-languagecontent-lengthcontent-locati"
+ "oncontent-md5content-rangecontent-typeetagexpireslast-modifiedset-cookieMo"
+ "ndayTuesdayWednesdayThursdayFridaySaturdaySundayJanFebMarAprMayJunJulAugSe"
+ "pOctNovDecchunkedtext/htmlimage/pngimage/jpgimage/gifapplication/xmlapplic"
+ "ation/xhtmltext/plainpublicmax-agecharset=iso-8859-1utf-8gzipdeflateHTTP/1"
+ ".1statusversionurl";
+static uLong dictionary_id = 0;
+
+bool FlipFramer::InitializeCompressor() {
+ if (compressor_.get())
+ return true; // Already initialized.
+
+ compressor_.reset(new z_stream);
+ memset(compressor_.get(), 0, sizeof(z_stream));
+
+ int success = deflateInit(compressor_.get(), kCompressorLevel);
+ if (success == Z_OK)
+ success = deflateSetDictionary(compressor_.get(),
+ reinterpret_cast<const Bytef*>(dictionary),
+ sizeof(dictionary));
+ if (success != Z_OK)
+ compressor_.reset(NULL);
+ return success == Z_OK;
+}
+
+bool FlipFramer::InitializeDecompressor() {
+ if (decompressor_.get())
+ return true; // Already initialized.
+
+ decompressor_.reset(new z_stream);
+ memset(decompressor_.get(), 0, sizeof(z_stream));
+
+ // Compute the id of our dictionary so that we know we're using the
+ // right one when asked for it.
+ if (dictionary_id == 0) {
+ dictionary_id = adler32(0L, Z_NULL, 0);
+ dictionary_id = adler32(dictionary_id,
+ reinterpret_cast<const Bytef*>(dictionary),
+ sizeof(dictionary));
+ }
+
+ int success = inflateInit(decompressor_.get());
+ if (success != Z_OK)
+ decompressor_.reset(NULL);
+ return success == Z_OK;
+}
+
+bool FlipFramer::GetFrameBoundaries(const FlipFrame* frame,
+ int* payload_length,
+ int* header_length,
+ const unsigned char** payload) const {
+ if (frame->is_control_frame()) {
+ const FlipControlFrame* control_frame =
+ reinterpret_cast<const FlipControlFrame*>(frame);
+ switch (control_frame->type()) {
+ case SYN_STREAM:
+ case SYN_REPLY:
+ {
+ const FlipSynStreamControlFrame *syn_frame =
+ reinterpret_cast<const FlipSynStreamControlFrame*>(frame);
+ *payload_length = syn_frame->header_block_len();
+ *header_length = sizeof(FlipFrame) + syn_frame->length() -
+ syn_frame->header_block_len();
+ *payload = reinterpret_cast<const unsigned char*>(frame) +
+ *header_length;
+ }
+ break;
+ default:
+ // TODO(mbelshe): set an error?
+ return false; // We can't compress this frame!
+ }
+ } else {
+ *header_length = sizeof(FlipFrame);
+ *payload_length = frame->length();
+ *payload = reinterpret_cast<const unsigned char*>(frame) +
+ sizeof(FlipFrame);
+ }
+ DCHECK(static_cast<size_t>(*header_length) <=
+ sizeof(FlipFrame) + *payload_length);
+ return true;
+}
+
+
+FlipFrame* FlipFramer::CompressFrame(const FlipFrame* frame) {
+ int payload_length;
+ int header_length;
+ const unsigned char* payload;
+
+ static StatsCounter pre_compress_bytes("flip.PreCompressSize");
+ static StatsCounter post_compress_bytes("flip.PostCompressSize");
+
+ if (!enable_compression_)
+ return DuplicateFrame(frame);
+
+ if (!GetFrameBoundaries(frame, &payload_length, &header_length, &payload))
+ return NULL;
+
+ if (!InitializeCompressor())
+ return NULL;
+
+ // TODO(mbelshe): Should we have a zlib header like what http servers do?
+
+ // Create an output frame.
+ int compressed_max_size = deflateBound(compressor_.get(), payload_length);
+ int new_frame_size = header_length + compressed_max_size;
+ FlipFrame* new_frame =
+ reinterpret_cast<FlipFrame*>(new char[new_frame_size]);
+ memcpy(new_frame, frame, header_length);
+
+ compressor_->next_in = const_cast<Bytef*>(payload);
+ compressor_->avail_in = payload_length;
+ compressor_->next_out = reinterpret_cast<Bytef*>(new_frame) + header_length;
+ compressor_->avail_out = compressed_max_size;
+
+ // Data packets have a 'compressed flag
+ if (!new_frame->is_control_frame()) {
+ FlipDataFrame* data_frame = reinterpret_cast<FlipDataFrame*>(new_frame);
+ data_frame->set_flags(data_frame->flags() | DATA_FLAG_COMPRESSED);
+ }
+
+ int rv = deflate(compressor_.get(), Z_SYNC_FLUSH);
+ if (rv != Z_OK) { // How can we know that it compressed everything?
+ // This shouldn't happen, right?
+ free(new_frame);
+ return NULL;
+ }
+
+ int compressed_size = compressed_max_size - compressor_->avail_out;
+ new_frame->set_length(header_length + compressed_size - sizeof(FlipFrame));
+
+ pre_compress_bytes.Add(payload_length);
+ post_compress_bytes.Add(new_frame->length());
+
+ return new_frame;
+}
+
+FlipFrame* FlipFramer::DecompressFrame(const FlipFrame* frame) {
+ int payload_length;
+ int header_length;
+ const unsigned char* payload;
+
+ static StatsCounter pre_decompress_bytes("flip.PreDeCompressSize");
+ static StatsCounter post_decompress_bytes("flip.PostDeCompressSize");
+
+ if (!enable_compression_)
+ return DuplicateFrame(frame);
+
+ if (!GetFrameBoundaries(frame, &payload_length, &header_length, &payload))
+ return NULL;
+
+ if (!frame->is_control_frame()) {
+ const FlipDataFrame* data_frame =
+ reinterpret_cast<const FlipDataFrame*>(frame);
+ if (!data_frame->flags() & DATA_FLAG_COMPRESSED)
+ return DuplicateFrame(frame);
+ }
+
+ if (!InitializeDecompressor())
+ return NULL;
+
+ // TODO(mbelshe): Should we have a zlib header like what http servers do?
+
+ // Create an output frame. Assume it does not need to be longer than
+ // the input data.
+ int decompressed_max_size = kControlFrameBufferInitialSize;
+ int new_frame_size = header_length + decompressed_max_size;
+ FlipFrame* new_frame =
+ reinterpret_cast<FlipFrame*>(new char[new_frame_size]);
+ memcpy(new_frame, frame, header_length);
+
+ decompressor_->next_in = const_cast<Bytef*>(payload);
+ decompressor_->avail_in = payload_length;
+ decompressor_->next_out = reinterpret_cast<Bytef*>(new_frame) +
+ header_length;
+ decompressor_->avail_out = decompressed_max_size;
+
+ int rv = inflate(decompressor_.get(), Z_SYNC_FLUSH);
+ if (rv == Z_NEED_DICT) {
+ // Need to try again with the right dictionary.
+ if (decompressor_->adler == dictionary_id) {
+ rv = inflateSetDictionary(decompressor_.get(), (const Bytef*)dictionary,
+ sizeof(dictionary));
+ if (rv == Z_OK)
+ rv = inflate(decompressor_.get(), Z_SYNC_FLUSH);
+ }
+ }
+ if (rv != Z_OK) { // How can we know that it decompressed everything?
+ free(new_frame);
+ return NULL;
+ }
+
+ // Unset the compressed flag for data frames.
+ if (!new_frame->is_control_frame()) {
+ FlipDataFrame* data_frame = reinterpret_cast<FlipDataFrame*>(new_frame);
+ data_frame->set_flags(data_frame->flags() & ~DATA_FLAG_COMPRESSED);
+ }
+
+ int decompressed_size = decompressed_max_size - decompressor_->avail_out;
+ new_frame->set_length(header_length + decompressed_size - sizeof(FlipFrame));
+
+ pre_decompress_bytes.Add(frame->length());
+ post_decompress_bytes.Add(new_frame->length());
+
+ return new_frame;
+}
+
+FlipFrame* FlipFramer::DuplicateFrame(const FlipFrame* frame) {
+ int size = sizeof(FlipFrame) + frame->length();
+ char* new_frame = new char[size];
+ memcpy(new_frame, frame, size);
+ return reinterpret_cast<FlipFrame*>(new_frame);
+}
+
+void FlipFramer::set_enable_compression(bool value) {
+ enable_compression_ = value;
+}
+
+void FlipFramer::set_enable_compression_default(bool value) {
+ compression_default_ = value;
+}
+
+} // namespace flip
+
diff --git a/net/flip/flip_framer.h b/net/flip/flip_framer.h
new file mode 100644
index 0000000..e50cc83
--- /dev/null
+++ b/net/flip/flip_framer.h
@@ -0,0 +1,228 @@
+// Copyright (c) 2009 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef NET_FLIP_FLIP_FRAMER_H_
+#define NET_FLIP_FLIP_FRAMER_H_
+
+#ifdef _WIN32
+#include <winsock2.h>
+#else
+#include <arpa/inet.h>
+#endif
+#include <map>
+#include <string>
+
+#include "base/basictypes.h"
+#include "base/logging.h"
+#include "base/scoped_ptr.h"
+#include "testing/gtest/include/gtest/gtest_prod.h"
+#include "flip_protocol.h" // cross-google3 directory naming.
+
+typedef struct z_stream_s z_stream; // Forward declaration for zlib.
+
+namespace net {
+class FlipNetworkTransactionTest;
+}
+
+namespace flip {
+
+class FlipFramer;
+class FlipFramerTest;
+
+// A datastructure for holding a set of headers from either a
+// SYN_STREAM or SYN_REPLY frame.
+typedef std::map<std::string, std::string> FlipHeaderBlock;
+
+// FlipFramerVisitorInterface is a set of callbacks for the FlipFramer.
+// Implement this interface to receive event callbacks as frames are
+// decoded from the framer.
+class FlipFramerVisitorInterface {
+ public:
+ virtual ~FlipFramerVisitorInterface() {}
+
+ // Called if an error is detected in the FlipFrame protocol.
+ virtual void OnError(FlipFramer* framer) = 0;
+
+ // Called when a Control Frame is received.
+ virtual void OnControl(const FlipControlFrame* frame) = 0;
+
+ // Called when data is received.
+ virtual void OnStreamFrameData(uint32 sream_id,
+ const char* data,
+ uint32 len) = 0;
+
+ // TODO(fenix): Implement me!
+ virtual void OnLameDuck() = 0;
+};
+
+class FlipFramer {
+ public:
+ // Flip states.
+ // TODO(mbelshe): Can we move these into the implementation
+ // and avoid exposing through the header. (Needed for test)
+ enum FlipState {
+ FLIP_ERROR,
+ FLIP_DONE,
+ FLIP_RESET,
+ FLIP_AUTO_RESET,
+ FLIP_READING_COMMON_HEADER,
+ FLIP_INTERPRET_CONTROL_FRAME_COMMON_HEADER,
+ FLIP_CONTROL_FRAME_PAYLOAD,
+ FLIP_IGNORE_REMAINING_PAYLOAD,
+ FLIP_FORWARD_STREAM_FRAME
+ };
+
+ // Flip error codes.
+ enum FlipError {
+ FLIP_NO_ERROR,
+ FLIP_UNKNOWN_CONTROL_TYPE, // Control frame is an unknown type.
+ FLIP_INVALID_CONTROL_FRAME, // Control frame is mal-formatted.
+ FLIP_CONTROL_PAYLOAD_TOO_LARGE, // Control frame payload was too large.
+ FLIP_ZLIB_INIT_FAILURE, // The Zlib library could not initialize.
+ FLIP_UNSUPPORTED_VERSION, // Control frame has unsupported version.
+ FLIP_DECOMPRESS_FAILURE, // There was an error decompressing.
+ };
+
+ // Create a new Framer.
+ FlipFramer();
+ virtual ~FlipFramer();
+
+ // Set callbacks to be called from the framer. A visitor must be set, or
+ // else the framer will likely crash. It is acceptable for the visitor
+ // to do nothing. If this is called multiple times, only the last visitor
+ // will be used.
+ void set_visitor(FlipFramerVisitorInterface* visitor) {
+ visitor_ = visitor;
+ }
+
+ // Pass data into the framer for parsing.
+ // Returns the number of bytes consumed. It is safe to pass more bytes in
+ // than may be consumed.
+ uint32 ProcessInput(const char* data, uint32 len);
+
+ // Resets the framer state after a frame has been successfully decoded.
+ // TODO(mbelshe): can we make this private?
+ void Reset();
+
+ // Check the state of the framer.
+ FlipError error_code() const { return error_code_; }
+ FlipState state() const { return state_; }
+
+ bool MessageFullyRead() {
+ return state_ == FLIP_DONE || state_ == FLIP_AUTO_RESET;
+ }
+ bool HasError() { return state_ == FLIP_ERROR; }
+
+ // Further parsing utilities.
+ // Given a control frame, parse out a FlipHeaderBlock. Only
+ // valid for SYN_STREAM and SYN_REPLY frames.
+ // Returns true if successfully parsed, false otherwise.
+ bool ParseHeaderBlock(const FlipFrame* frame, FlipHeaderBlock* block);
+
+ // Frame creation utilities
+ // Create a FlipSynStreamControlFrame. The resulting frame will be
+ // compressed if |compressed| is true.
+ FlipSynStreamControlFrame* CreateSynStream(int stream_id, int priority,
+ bool compress,
+ FlipHeaderBlock* headers);
+ static FlipFinStreamControlFrame* CreateFinStream(int stream_id, int status);
+
+ // Create a FlipSynReplyControlFrame.The resulting frame will be
+ // compressed if |compressed| is true.
+ FlipSynReplyControlFrame* CreateSynReply(int stream_id,
+ bool compress,
+ FlipHeaderBlock* headers);
+
+ // Create a FlipDataFrame. The resulting frame will be
+ // compressed if |compressed| is true.
+ FlipDataFrame* CreateDataFrame(int stream_id, const char* data,
+ int len, bool compressed);
+
+ // NOTES about frame compression.
+ // We want flip to compress headers across the entire session. As long as
+ // the session is over TCP, frames are sent serially. The client & server
+ // can each compress frames in the same order and then compress them in that
+ // order, and the remote can do the reverse. However, we ultimately want
+ // the creation of frames to be less sensitive to order so that they can be
+ // placed over a UDP based protocol and yet still benefit from some
+ // compression. We don't know of any good compression protocol which does
+ // not build its state in a serial (stream based) manner.... For now, we're
+ // using zlib anyway.
+
+ // Compresses a FlipFrame.
+ // On success, returns a new FlipFrame with the payload compressed.
+ // Compression state is maintained as part of the FlipFramer.
+ // Returned frame must be freed with free().
+ // On failure, returns NULL.
+ FlipFrame* CompressFrame(const FlipFrame* frame);
+
+ // Decompresses a FlipFrame.
+ // On success, returns a new FlipFrame with the payload decompressed.
+ // Compression state is maintained as part of the FlipFramer.
+ // Returned frame must be freed with free().
+ // On failure, returns NULL.
+ FlipFrame* DecompressFrame(const FlipFrame* frame);
+
+ // Create a copy of a frame.
+ FlipFrame* DuplicateFrame(const FlipFrame* frame);
+
+ // For debugging.
+ static const char* StateToString(int state);
+ static const char* ErrorCodeToString(int error_code);
+
+ protected:
+ FRIEND_TEST(FlipFramerTest, Basic);
+ FRIEND_TEST(FlipFramerTest, HeaderBlockBarfsOnOutOfOrderHeaders);
+ friend class FlipNetworkTransactionTest;
+
+ // For ease of testing we can tweak compression on/off.
+ void set_enable_compression(bool value);
+ static void set_enable_compression_default(bool value);
+
+ private:
+ // Internal breakout from ProcessInput. Returns the number of bytes
+ // consumed from the data.
+ uint32 ProcessCommonHeader(const char* data, uint32 len);
+ uint32 ProcessControlFramePayload(const char* data, uint32 len);
+
+ // Initialize the ZLib state.
+ bool InitializeCompressor();
+ bool InitializeDecompressor();
+
+ // Not used (yet)
+ uint32 BytesSafeToRead() const;
+
+ // Set the error code.
+ void set_error(FlipError error);
+
+ // Expands the control frame buffer to accomodate a particular payload size.
+ void ExpandControlFrameBuffer(int size);
+
+ // Given a frame, breakdown the variable payload length, the static header
+ // header length, and variable payload pointer.
+ bool GetFrameBoundaries(const FlipFrame* frame, int* payload_length,
+ int* header_length,
+ const unsigned char** payload) const;
+
+ FlipState state_;
+ FlipError error_code_;
+ uint32 remaining_payload_;
+ uint32 remaining_control_payload_;
+
+ char* current_frame_buffer_;
+ int current_frame_len_; // Number of bytes read into the current_frame_.
+ int current_frame_capacity_;
+
+ bool enable_compression_;
+ scoped_ptr<z_stream> compressor_;
+ scoped_ptr<z_stream> decompressor_;
+ FlipFramerVisitorInterface* visitor_;
+
+ static bool compression_default_;
+};
+
+} // namespace flip
+
+#endif // NET_FLIP_FLIP_FRAMER_H_
+
diff --git a/net/flip/flip_framer_test.cc b/net/flip/flip_framer_test.cc
new file mode 100644
index 0000000..a786bc2
--- /dev/null
+++ b/net/flip/flip_framer_test.cc
@@ -0,0 +1,251 @@
+// Copyright (c) 2009 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include <iostream>
+#include "base/scoped_ptr.h"
+
+#include "flip_framer.h" // cross-google3 directory naming.
+#include "flip_protocol.h"
+#include "flip_frame_builder.h"
+#ifdef _WIN32
+#include "testing/platform_test.h"
+#else
+#include "testing/base/public/gunit.h"
+
+#define PlatformTest ::testing::Test
+#endif
+
+namespace flip {
+
+class FlipFramerTest : public PlatformTest {
+ public:
+ virtual void TearDown() {}
+};
+
+class TestFlipVisitor : public FlipFramerVisitorInterface {
+ public:
+ explicit TestFlipVisitor(FlipFramer* framer)
+ : framer_(framer),
+ error_count_(0),
+ syn_frame_count_(0),
+ syn_reply_frame_count_(0),
+ data_frame_count_(0),
+ fin_frame_count_(0) {
+ }
+
+ void OnError(FlipFramer* f) {
+ error_count_++;
+ }
+ void OnStreamFrameData(FlipStreamId stream_id,
+ const char* data,
+ uint32 len) {
+ data_frame_count_++;
+#ifdef TEST_LOGGING
+ std::cerr << "OnStreamFrameData(" << stream_id << ", \"";
+ if (len > 0) {
+ for (uint32 i = 0 ; i < len; ++i) {
+ std::cerr << std::hex << (0xFF & (unsigned int)data[i]) << std::dec;
+ }
+ }
+ std::cerr << "\", " << len << ")\n";
+#endif // TEST_LOGGING
+ }
+
+ void OnControl(const FlipControlFrame* frame) {
+ FlipHeaderBlock headers;
+ bool parsed_headers = false;
+ switch (frame->type()) {
+ case SYN_STREAM:
+ parsed_headers = framer_->ParseHeaderBlock(
+ reinterpret_cast<const FlipFrame*>(frame), &headers);
+ DCHECK(parsed_headers);
+ syn_frame_count_++;
+ VLOG(2) << "OnSyn(" << frame->stream_id() << ")\n";
+ break;
+ case SYN_REPLY:
+ parsed_headers = framer_->ParseHeaderBlock(
+ reinterpret_cast<const FlipFrame*>(frame), &headers);
+ DCHECK(parsed_headers);
+ syn_reply_frame_count_++;
+ VLOG(2) << "OnSynReply(" << frame->stream_id() << ")\n";
+ break;
+ case FIN_STREAM:
+ fin_frame_count_++;
+ VLOG(2) << "OnFin(" << frame->stream_id() << ")\n";
+ break;
+ default:
+ DCHECK(false); // Error!
+ }
+ }
+
+ void OnLameDuck() {
+ }
+
+ FlipFramer* framer_;
+ // Counters from the visitor callbacks.
+ int error_count_;
+ int syn_frame_count_;
+ int syn_reply_frame_count_;
+ int data_frame_count_;
+ int fin_frame_count_;
+};
+
+// Test our protocol constants
+TEST_F(FlipFramerTest, ProtocolConstants) {
+ EXPECT_EQ(8, sizeof(FlipFrame));
+ EXPECT_EQ(8, sizeof(FlipDataFrame));
+ EXPECT_EQ(12, sizeof(FlipControlFrame));
+ EXPECT_EQ(16, sizeof(FlipSynStreamControlFrame));
+ EXPECT_EQ(16, sizeof(FlipSynReplyControlFrame));
+ EXPECT_EQ(16, sizeof(FlipFinStreamControlFrame));
+ EXPECT_EQ(1, SYN_STREAM);
+ EXPECT_EQ(2, SYN_REPLY);
+ EXPECT_EQ(3, FIN_STREAM);
+}
+
+// Test that we can encode and decode a FlipHeaderBlock.
+TEST_F(FlipFramerTest, HeaderBlock) {
+ FlipHeaderBlock headers;
+ headers["alpha"] = "beta";
+ headers["gamma"] = "charlie";
+ FlipFramer framer;
+
+ // Encode the header block into a SynStream frame.
+ scoped_ptr<FlipSynStreamControlFrame> frame(
+ framer.CreateSynStream(1, 1, true, &headers));
+ EXPECT_TRUE(frame.get() != NULL);
+
+ FlipHeaderBlock new_headers;
+ FlipFrame* control_frame = reinterpret_cast<FlipFrame*>(frame.get());
+ framer.ParseHeaderBlock(control_frame, &new_headers);
+
+ EXPECT_EQ(headers.size(), new_headers.size());
+ EXPECT_EQ(headers["alpha"], new_headers["alpha"]);
+ EXPECT_EQ(headers["gamma"], new_headers["gamma"]);
+}
+
+TEST_F(FlipFramerTest, HeaderBlockBarfsOnOutOfOrderHeaders) {
+ FlipFrameBuilder frame;
+
+ frame.WriteUInt16(kControlFlagMask | 1);
+ frame.WriteUInt16(SYN_STREAM);
+ frame.WriteUInt32(0); // Placeholder for the length.
+ frame.WriteUInt32(3); // stream_id
+ frame.WriteUInt16(0); // Priority.
+
+ frame.WriteUInt16(2); // Number of headers.
+ FlipHeaderBlock::iterator it;
+ frame.WriteString("gamma");
+ frame.WriteString("gamma");
+ frame.WriteString("alpha");
+ frame.WriteString("alpha");
+ // write the length
+ frame.WriteUInt32ToOffset(4, frame.length() - sizeof(FlipFrame));
+
+ FlipHeaderBlock new_headers;
+ const FlipFrame* control_frame = frame.data();
+ FlipFramer framer;
+ framer.set_enable_compression(false);
+ EXPECT_FALSE(framer.ParseHeaderBlock(control_frame, &new_headers));
+}
+
+TEST_F(FlipFramerTest, BasicCompression) {
+ FlipHeaderBlock headers;
+ headers["server"] = "FlipServer 1.0";
+ headers["date"] = "Mon 12 Jan 2009 12:12:12 PST";
+ headers["status"] = "200";
+ headers["version"] = "HTTP/1.1";
+ headers["content-type"] = "text/html";
+ headers["content-length"] = "12";
+
+ FlipFramer framer;
+ scoped_ptr<FlipSynStreamControlFrame>
+ frame1(framer.CreateSynStream(1, 1, true, &headers));
+ scoped_ptr<FlipSynStreamControlFrame>
+ frame2(framer.CreateSynStream(1, 1, true, &headers));
+
+ // Expect the second frame to be more compact than the first.
+ EXPECT_LE(frame2->length(), frame1->length());
+
+ // Decompress the first frame
+ scoped_ptr<FlipFrame> frame3(
+ framer.DecompressFrame(reinterpret_cast<FlipFrame*>(frame1.get())));
+
+ // Decompress the second frame
+ scoped_ptr<FlipFrame> frame4(
+ framer.DecompressFrame(reinterpret_cast<FlipFrame*>(frame2.get())));
+
+ // Expect frames 3 & 4 to be the same.
+ EXPECT_EQ(0,
+ memcmp(frame3.get(), frame4.get(),
+ sizeof(FlipFrame) + frame3->length()));
+}
+
+TEST_F(FlipFramerTest, Basic) {
+ const unsigned char input[] = {
+ 0x80, 0x01, 0x00, 0x01, // SYN Stream #1
+ 0x00, 0x00, 0x00, 0x10,
+ 0x00, 0x00, 0x00, 0x01,
+ 0x00, 0x00, 0x00, 0x01,
+ 0x00, 0x02, 'h', 'h',
+ 0x00, 0x02, 'v', 'v',
+
+ 0x00, 0x00, 0x00, 0x01, // DATA on Stream #1
+ 0x00, 0x00, 0x00, 0x0c,
+ 0xde, 0xad, 0xbe, 0xef,
+ 0xde, 0xad, 0xbe, 0xef,
+ 0xde, 0xad, 0xbe, 0xef,
+
+ 0x80, 0x01, 0x00, 0x01, // SYN Stream #3
+ 0x00, 0x00, 0x00, 0x08,
+ 0x00, 0x00, 0x00, 0x03,
+ 0x00, 0x00, 0x00, 0x00,
+
+ 0x00, 0x00, 0x00, 0x03, // DATA on Stream #3
+ 0x00, 0x00, 0x00, 0x08,
+ 0xde, 0xad, 0xbe, 0xef,
+ 0xde, 0xad, 0xbe, 0xef,
+
+ 0x00, 0x00, 0x00, 0x01, // DATA on Stream #1
+ 0x00, 0x00, 0x00, 0x04,
+ 0xde, 0xad, 0xbe, 0xef,
+
+ 0x80, 0x01, 0x00, 0x03, // FIN on Stream #1
+ 0x00, 0x00, 0x00, 0x08,
+ 0x00, 0x00, 0x00, 0x01,
+ 0x00, 0x00, 0x00, 0x00,
+
+ 0x00, 0x00, 0x00, 0x03, // DATA on Stream #3
+ 0x00, 0x00, 0x00, 0x00,
+
+ 0x80, 0x01, 0x00, 0x03, // FIN on Stream #3
+ 0x00, 0x00, 0x00, 0x08,
+ 0x00, 0x00, 0x00, 0x03,
+ 0x00, 0x00, 0x00, 0x00,
+ };
+
+ FlipFramer framer;
+ framer.set_enable_compression(false);
+ TestFlipVisitor visitor(&framer);
+ framer.set_visitor(&visitor);
+ size_t input_remaining = sizeof(input);
+ const char* input_ptr = reinterpret_cast<const char*>(input);
+ while (input_remaining > 0 &&
+ framer.error_code() == FlipFramer::FLIP_NO_ERROR) {
+ size_t bytes_processed = framer.ProcessInput(input_ptr, sizeof(input));
+ input_remaining -= bytes_processed;
+ input_ptr += bytes_processed;
+ if (framer.state() == FlipFramer::FLIP_DONE)
+ framer.Reset();
+ }
+ EXPECT_EQ(0, visitor.error_count_);
+ EXPECT_EQ(2, visitor.syn_frame_count_);
+ EXPECT_EQ(0, visitor.syn_reply_frame_count_);
+ EXPECT_EQ(4, visitor.data_frame_count_);
+ EXPECT_EQ(2, visitor.fin_frame_count_);
+}
+
+} // namespace flip
+
+
diff --git a/net/flip/flip_network_transaction.cc b/net/flip/flip_network_transaction.cc
new file mode 100644
index 0000000..88b931d
--- /dev/null
+++ b/net/flip/flip_network_transaction.cc
@@ -0,0 +1,414 @@
+// Copyright (c) 2009 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "net/flip/flip_network_transaction.h"
+
+#include "base/scoped_ptr.h"
+#include "base/compiler_specific.h"
+#include "base/field_trial.h"
+#include "base/string_util.h"
+#include "base/trace_event.h"
+#include "build/build_config.h"
+#include "net/base/connection_type_histograms.h"
+#include "net/base/host_resolver.h"
+#include "net/base/io_buffer.h"
+#include "net/base/load_flags.h"
+#include "net/base/net_errors.h"
+#include "net/base/net_util.h"
+#include "net/base/upload_data_stream.h"
+#include "net/http/http_auth.h"
+#include "net/http/http_auth_handler.h"
+#include "net/http/http_basic_stream.h"
+#include "net/http/http_chunked_decoder.h"
+#include "net/http/http_network_session.h"
+#include "net/http/http_request_info.h"
+#include "net/http/http_response_headers.h"
+#include "net/http/http_util.h"
+#include "net/socket/client_socket_factory.h"
+#include "net/socket/ssl_client_socket.h"
+
+using base::Time;
+
+namespace net {
+
+//-----------------------------------------------------------------------------
+
+FlipNetworkTransaction::FlipNetworkTransaction(HttpNetworkSession* session)
+ : flip_request_id_(0),
+ user_callback_(0),
+ user_buffer_bytes_remaining_(0),
+ session_(session),
+ request_(NULL),
+ response_complete_(false),
+ response_status_(net::OK),
+ next_state_(STATE_NONE) {
+}
+
+FlipNetworkTransaction::~FlipNetworkTransaction() {
+ LOG(INFO) << "FlipNetworkTransaction dead. " << this;
+ if (flip_ && flip_request_id_)
+ flip_->CancelStream(flip_request_id_);
+}
+
+const HttpRequestInfo* FlipNetworkTransaction::request() {
+ return request_;
+}
+
+const UploadDataStream* FlipNetworkTransaction::data() {
+ return request_body_stream_.get();
+}
+
+void FlipNetworkTransaction::OnRequestSent(int status) {
+ if (status == OK)
+ next_state_ = STATE_SEND_REQUEST_COMPLETE;
+ else
+ next_state_ = STATE_NONE;
+
+ int rv = DoLoop(status);
+}
+
+void FlipNetworkTransaction::OnResponseReceived(HttpResponseInfo* response) {
+ next_state_ = STATE_READ_HEADERS_COMPLETE;
+
+ response_ = *response; // TODO(mbelshe): avoid copy.
+
+ int rv = DoLoop(net::OK);
+}
+
+void FlipNetworkTransaction::OnDataReceived(const char* buffer, int bytes) {
+ // TODO(mbelshe): if data is received before a syn reply, this will crash.
+
+ DCHECK(bytes >= 0);
+ next_state_ = STATE_READ_BODY;
+
+ if (bytes > 0) {
+ DCHECK(buffer);
+
+ // TODO(mbelshe): If read is pending, we should copy the data straight into
+ // the read buffer here. For now, we'll queue it always.
+
+ IOBufferWithSize* io_buffer = new IOBufferWithSize(bytes);
+ memcpy(io_buffer->data(), buffer, bytes);
+
+ response_body_.push_back(io_buffer);
+ }
+ int rv = DoLoop(net::OK);
+}
+
+void FlipNetworkTransaction::OnClose(int status) {
+ next_state_ = STATE_READ_BODY_COMPLETE;
+ response_complete_ = true;
+ response_status_ = status;
+ flip_request_id_ = 0; // TODO(mbelshe) - do we need this?
+ int rv = DoLoop(status);
+}
+
+void FlipNetworkTransaction::OnCancel() {
+ next_state_ = STATE_NONE;
+ response_complete_ = true;
+ response_status_ = net::ERR_ABORTED;
+ flip_request_id_ = 0; // TODO(mbelshe) - do we need this?
+ // Clear any data in our buffer.
+ while (response_body_.size())
+ response_body_.pop_front();
+}
+
+int FlipNetworkTransaction::Start(const HttpRequestInfo* request_info,
+ CompletionCallback* callback,
+ LoadLog* load_log) {
+ request_ = request_info;
+ start_time_ = base::Time::Now();
+
+ next_state_ = STATE_INIT_CONNECTION;
+ int rv = DoLoop(OK);
+ if (rv == ERR_IO_PENDING)
+ user_callback_ = callback;
+ return rv;
+}
+
+int FlipNetworkTransaction::RestartIgnoringLastError(
+ CompletionCallback* callback) {
+ // TODO(mbelshe): implement me.
+ NOTIMPLEMENTED();
+ return ERR_NOT_IMPLEMENTED;
+}
+
+int FlipNetworkTransaction::RestartWithCertificate(
+ X509Certificate* client_cert, CompletionCallback* callback) {
+ // TODO(mbelshe): implement me.
+ NOTIMPLEMENTED();
+ return ERR_NOT_IMPLEMENTED;
+}
+
+int FlipNetworkTransaction::RestartWithAuth(
+ const std::wstring& username,
+ const std::wstring& password,
+ CompletionCallback* callback) {
+ // TODO(mbelshe): implement me.
+ NOTIMPLEMENTED();
+ return 0;
+}
+
+int FlipNetworkTransaction::Read(IOBuffer* buf, int buf_len,
+ CompletionCallback* callback) {
+ DCHECK(buf);
+ DCHECK(buf_len > 0);
+ DCHECK(callback);
+ DCHECK(flip_.get());
+
+ // If we have data buffered, complete the IO immediately.
+ if (response_body_.size()) {
+ int bytes_read = 0;
+ while (response_body_.size() && buf_len > 0) {
+ scoped_refptr<IOBufferWithSize> data = response_body_.front();
+ int bytes_to_copy = std::min(buf_len, data->size());
+ memcpy(&(buf->data()[bytes_read]), data->data(), bytes_to_copy);
+ buf_len -= bytes_to_copy;
+ if (bytes_to_copy == data->size()) {
+ response_body_.pop_front();
+ } else {
+ int bytes_remaining = data->size() - bytes_to_copy;
+ IOBufferWithSize* new_buffer = new IOBufferWithSize(bytes_remaining);
+ memcpy(new_buffer->data(), &(data->data()[bytes_to_copy]),
+ bytes_remaining);
+ response_body_.pop_front();
+ response_body_.push_front(new_buffer);
+ }
+ bytes_read += bytes_to_copy;
+ }
+ return bytes_read;
+ }
+
+ if (response_complete_)
+ return 0; // We already finished reading this stream.
+
+ user_callback_ = callback;
+ user_buffer_ = buf;
+ user_buffer_bytes_remaining_ = buf_len;
+ return net::ERR_IO_PENDING;
+}
+
+const HttpResponseInfo* FlipNetworkTransaction::GetResponseInfo() const {
+ return (response_.headers || response_.ssl_info.cert) ? &response_ : NULL;
+}
+
+LoadState FlipNetworkTransaction::GetLoadState() const {
+ switch (next_state_) {
+ case STATE_INIT_CONNECTION_COMPLETE:
+ if (flip_.get())
+ return flip_->GetLoadState();
+ return LOAD_STATE_CONNECTING;
+ case STATE_SEND_REQUEST_COMPLETE:
+ return LOAD_STATE_SENDING_REQUEST;
+ case STATE_READ_HEADERS_COMPLETE:
+ return LOAD_STATE_WAITING_FOR_RESPONSE;
+ case STATE_READ_BODY_COMPLETE:
+ return LOAD_STATE_READING_RESPONSE;
+ default:
+ return LOAD_STATE_IDLE;
+ }
+}
+
+uint64 FlipNetworkTransaction::GetUploadProgress() const {
+ if (!request_body_stream_.get())
+ return 0;
+
+ return request_body_stream_->position();
+}
+
+
+void FlipNetworkTransaction::DoHttpTransactionCallback(int rv) {
+ DCHECK(rv != ERR_IO_PENDING);
+
+ // Because IO is asynchronous from the caller, we could be completing an
+ // IO even though the user hasn't issued a Read() yet.
+ if (!user_callback_)
+ return;
+
+ // Since Run may result in Read being called, clear user_callback_ up front.
+ CompletionCallback* c = user_callback_;
+ user_callback_ = NULL;
+ c->Run(rv);
+}
+
+int FlipNetworkTransaction::DoLoop(int result) {
+ DCHECK(next_state_ != STATE_NONE);
+ DCHECK(request_);
+
+ if (!request_)
+ return 0;
+
+ int rv = result;
+ do {
+ State state = next_state_;
+ next_state_ = STATE_NONE;
+ switch (state) {
+ case STATE_INIT_CONNECTION:
+ DCHECK_EQ(OK, rv);
+ rv = DoInitConnection();
+ break;
+ case STATE_INIT_CONNECTION_COMPLETE:
+ rv = DoInitConnectionComplete(rv);
+ break;
+ case STATE_SEND_REQUEST:
+ DCHECK_EQ(OK, rv);
+ rv = DoSendRequest();
+ break;
+ case STATE_SEND_REQUEST_COMPLETE:
+ rv = DoSendRequestComplete(rv);
+ break;
+ case STATE_READ_HEADERS:
+ DCHECK_EQ(OK, rv);
+ rv = DoReadHeaders();
+ break;
+ case STATE_READ_HEADERS_COMPLETE:
+ // DoReadHeadersComplete only returns OK becaue the transaction
+ // could be destroyed as part of the call.
+ rv = DoReadHeadersComplete(rv);
+ DCHECK(rv == net::OK);
+ return rv;
+ break;
+ case STATE_READ_BODY:
+ DCHECK_EQ(OK, rv);
+ rv = DoReadBody();
+ // DoReadBody only returns OK becaue the transaction could be
+ // destroyed as part of the call.
+ DCHECK(rv == net::OK);
+ return rv;
+ break;
+ case STATE_READ_BODY_COMPLETE:
+ // We return here because the Transaction could be destroyed after this
+ // call.
+ rv = DoReadBodyComplete(rv);
+ DCHECK(rv == net::OK);
+ return rv;
+ break;
+ case STATE_NONE:
+ rv = ERR_FAILED;
+ break;
+ default:
+ NOTREACHED() << "bad state";
+ rv = ERR_FAILED;
+ break;
+ }
+ } while (rv != ERR_IO_PENDING && next_state_ != STATE_NONE);
+
+ return rv;
+}
+
+int FlipNetworkTransaction::DoInitConnection() {
+ next_state_ = STATE_INIT_CONNECTION_COMPLETE;
+
+ std::string host = request_->url.HostNoBrackets();
+ int port = request_->url.EffectiveIntPort();
+
+ std::string connection_group = "flip.";
+ connection_group.append(host);
+
+ HostResolver::RequestInfo resolve_info(host, port);
+
+// TODO(mbelshe): Cleanup these testing tricks.
+// If we want to use multiple connections, grab the flip session
+// up front using the original domain name.
+#undef USE_MULTIPLE_CONNECTIONS
+#define DIVERT_URLS_TO_TEST_SERVER
+#if defined(USE_MULTIPLE_CONNECTIONS) || !defined(DIVERT_URLS_TO_TEST_SERVER)
+ flip_ = FlipSession::GetFlipSession(resolve_info, session_);
+#endif
+
+// Use this to divert URLs to a test server.
+#ifdef DIVERT_URLS_TO_TEST_SERVER
+ // Fake out this session to go to our test server.
+ host = "servername";
+ port = 443;
+ resolve_info = HostResolver::RequestInfo(host, port);
+#ifndef USE_MULTIPLE_CONNECTIONS
+ flip_ = FlipSession::GetFlipSession(resolve_info, session_);
+#endif // USE_MULTIPLE_CONNECTIONS
+
+#endif // DIVERT_URLS_TO_TEST_SERVER
+
+ int rv = flip_->Connect(connection_group, resolve_info, request_->priority);
+ DCHECK(rv == net::OK); // The API says it will always return OK.
+ return net::OK;
+}
+
+int FlipNetworkTransaction::DoInitConnectionComplete(int result) {
+ if (result < 0)
+ return result;
+
+ next_state_ = STATE_SEND_REQUEST;
+ return net::OK;
+}
+
+int FlipNetworkTransaction::DoSendRequest() {
+ // TODO(mbelshe): rethink this UploadDataStream wrapper.
+ if (request_->upload_data)
+ request_body_stream_.reset(new UploadDataStream(request_->upload_data));
+
+ flip_request_id_ = flip_->CreateStream(this);
+
+ if (response_complete_)
+ return net::OK;
+
+ // The FlipSession will always call us back when the send is complete.
+ return net::ERR_IO_PENDING;
+}
+
+int FlipNetworkTransaction::DoSendRequestComplete(int result) {
+ if (result < 0)
+ return result;
+
+ next_state_ = STATE_READ_HEADERS;
+ return net::OK;
+}
+
+int FlipNetworkTransaction::DoReadHeaders() {
+ // The FlipSession will always call us back when the headers have been
+ // received.
+ return net::ERR_IO_PENDING;
+}
+
+int FlipNetworkTransaction::DoReadHeadersComplete(int result) {
+ // Notify the user that the headers are ready.
+ DoHttpTransactionCallback(result);
+ return net::OK;
+}
+
+int FlipNetworkTransaction::DoReadBody() {
+ // The caller has not issued a read request yet.
+ if (!user_callback_)
+ return net::OK;
+
+ int bytes_read = 0;
+ while (response_body_.size() && user_buffer_bytes_remaining_ > 0) {
+ scoped_refptr<IOBufferWithSize> data = response_body_.front();
+ int bytes_to_copy = std::min(user_buffer_bytes_remaining_, data->size());
+ memcpy(&(user_buffer_->data()[bytes_read]), data->data(), bytes_to_copy);
+ user_buffer_bytes_remaining_ -= bytes_to_copy;
+ if (bytes_to_copy == data->size()) {
+ response_body_.pop_front();
+ } else {
+ int bytes_remaining = data->size() - bytes_to_copy;
+ IOBufferWithSize* new_buffer = new IOBufferWithSize(bytes_remaining);
+ memcpy(new_buffer->data(), &(data->data()[bytes_to_copy]),
+ bytes_remaining);
+ response_body_.pop_front();
+ response_body_.push_front(new_buffer);
+ }
+ bytes_read += bytes_to_copy;
+ }
+
+ DoHttpTransactionCallback(bytes_read);
+ return net::OK;
+}
+
+int FlipNetworkTransaction::DoReadBodyComplete(int result) {
+ // TODO(mbelshe): record success or failure of the transaction?
+ if (user_callback_)
+ DoHttpTransactionCallback(result);
+ return OK;
+}
+
+} // namespace net
diff --git a/net/flip/flip_network_transaction.h b/net/flip/flip_network_transaction.h
new file mode 100644
index 0000000..f4b0292
--- /dev/null
+++ b/net/flip/flip_network_transaction.h
@@ -0,0 +1,142 @@
+// Copyright (c) 2009 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef NET_FLIP_NETWORK_TRANSACTION_H_
+#define NET_FLIP_NETWORK_TRANSACTION_H_
+
+#include <string>
+#include <deque>
+
+#include "base/ref_counted.h"
+#include "base/scoped_ptr.h"
+#include "base/time.h"
+#include "net/base/address_list.h"
+#include "net/base/host_resolver.h"
+#include "net/base/io_buffer.h"
+#include "net/base/load_flags.h"
+#include "net/base/load_states.h"
+#include "net/base/ssl_config_service.h"
+#include "net/flip/flip_session.h"
+#include "net/http/http_auth.h"
+#include "net/http/http_auth_handler.h"
+#include "net/http/http_response_info.h"
+#include "net/http/http_transaction.h"
+#include "net/proxy/proxy_service.h"
+#include "net/socket/client_socket_handle.h"
+#include "net/socket/client_socket_pool.h"
+#include "testing/gtest/include/gtest/gtest_prod.h"
+
+namespace net {
+
+class ClientSocketFactory;
+class HttpNetworkSession;
+class UploadDataStream;
+
+// A FlipNetworkTransaction can be used to fetch HTTP conent.
+// The FlipDelegate is the consumer of events from the FlipSession.
+class FlipNetworkTransaction : public HttpTransaction, public FlipDelegate {
+ public:
+ explicit FlipNetworkTransaction(HttpNetworkSession* session);
+ virtual ~FlipNetworkTransaction();
+
+ // FlipDelegate methods:
+ virtual const HttpRequestInfo* request();
+ virtual const UploadDataStream* data();
+ virtual void OnRequestSent(int status);
+ virtual void OnResponseReceived(HttpResponseInfo* response);
+ virtual void OnDataReceived(const char* buffer, int bytes);
+ virtual void OnClose(int status);
+ virtual void OnCancel();
+
+ // HttpTransaction methods:
+ virtual int Start(const HttpRequestInfo* request_info,
+ CompletionCallback* callback,
+ LoadLog* load_log);
+ virtual int RestartIgnoringLastError(CompletionCallback* callback);
+ virtual int RestartWithCertificate(X509Certificate* client_cert,
+ CompletionCallback* callback);
+ virtual int RestartWithAuth(const std::wstring& username,
+ const std::wstring& password,
+ CompletionCallback* callback);
+ virtual bool IsReadyToRestartForAuth() { return false; }
+ virtual int Read(IOBuffer* buf, int buf_len, CompletionCallback* callback);
+ virtual const HttpResponseInfo* GetResponseInfo() const;
+ virtual LoadState GetLoadState() const;
+ virtual uint64 GetUploadProgress() const;
+
+ protected:
+ friend class FlipNetworkTransactionTest;
+
+ // Provide access to the session for testing.
+ FlipSession* GetFlipSession() { return flip_.get(); }
+
+ private:
+ enum State {
+ STATE_INIT_CONNECTION,
+ STATE_INIT_CONNECTION_COMPLETE,
+ STATE_SEND_REQUEST,
+ STATE_SEND_REQUEST_COMPLETE,
+ STATE_READ_HEADERS,
+ STATE_READ_HEADERS_COMPLETE,
+ STATE_READ_BODY,
+ STATE_READ_BODY_COMPLETE,
+ STATE_NONE
+ };
+
+ // Used to callback an HttpTransaction call.
+ void DoHttpTransactionCallback(int result);
+
+ // Runs the state transition loop.
+ int DoLoop(int result);
+
+ // Each of these methods corresponds to a State value. Those with an input
+ // argument receive the result from the previous state. If a method returns
+ // ERR_IO_PENDING, then the result from OnIOComplete will be passed to the
+ // next state method as the result arg.
+ int DoInitConnection();
+ int DoInitConnectionComplete(int result);
+ int DoSendRequest();
+ int DoSendRequestComplete(int result);
+ int DoReadHeaders();
+ int DoReadHeadersComplete(int result);
+ int DoReadBody();
+ int DoReadBodyComplete(int result);
+
+ // The Flip request id for this request.
+ int flip_request_id_;
+
+ // The Flip session servicing this request. If NULL, the request has not
+ // started.
+ scoped_refptr<FlipSession> flip_;
+
+ CompletionCallback* user_callback_;
+ scoped_refptr<IOBuffer> user_buffer_;
+ int user_buffer_bytes_remaining_;
+
+ scoped_refptr<HttpNetworkSession> session_;
+
+ const HttpRequestInfo* request_;
+ HttpResponseInfo response_;
+
+ scoped_ptr<UploadDataStream> request_body_stream_;
+
+ // We buffer the response body as it arrives asynchronously from the stream.
+ // TODO(mbelshe): is this infinite buffering?
+ std::deque<scoped_refptr<IOBufferWithSize> > response_body_;
+ bool response_complete_;
+ // Since we buffer the response, we also buffer the response status.
+ // Not valid until response_complete_ is true.
+ int response_status_;
+
+ // The time the Start method was called.
+ base::Time start_time_;
+
+ // The next state in the state machine.
+ State next_state_;
+};
+
+} // namespace net
+
+#endif // NET_HTTP_NETWORK_TRANSACTION_H_
+
diff --git a/net/flip/flip_network_transaction_unittest.cc b/net/flip/flip_network_transaction_unittest.cc
new file mode 100644
index 0000000..29db36c
--- /dev/null
+++ b/net/flip/flip_network_transaction_unittest.cc
@@ -0,0 +1,190 @@
+// Copyright (c) 2009 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include <math.h> // ceil
+
+#include "base/compiler_specific.h"
+#include "net/base/completion_callback.h"
+#include "net/base/mock_host_resolver.h"
+#include "net/base/ssl_config_service_defaults.h"
+#include "net/base/ssl_info.h"
+#include "net/base/test_completion_callback.h"
+#include "net/base/upload_data.h"
+#include "net/flip/flip_network_transaction.h"
+#include "net/flip/flip_protocol.h"
+#include "net/http/http_auth_handler_ntlm.h"
+#include "net/http/http_network_session.h"
+#include "net/http/http_network_transaction.h"
+#include "net/http/http_transaction_unittest.h"
+#include "net/proxy/proxy_config_service_fixed.h"
+#include "net/socket/client_socket_factory.h"
+#include "net/socket/socket_test_util.h"
+#include "net/socket/ssl_client_socket.h"
+#include "testing/gtest/include/gtest/gtest.h"
+#include "testing/platform_test.h"
+
+//-----------------------------------------------------------------------------
+
+namespace flip {
+
+using namespace net;
+
+// Create a proxy service which fails on all requests (falls back to direct).
+ProxyService* CreateNullProxyService() {
+ return ProxyService::CreateNull();
+}
+
+// Helper to manage the lifetimes of the dependencies for a
+// HttpNetworkTransaction.
+class SessionDependencies {
+ public:
+ // Default set of dependencies -- "null" proxy service.
+ SessionDependencies()
+ : host_resolver(new MockHostResolver),
+ proxy_service(CreateNullProxyService()),
+ ssl_config_service(new SSLConfigServiceDefaults) {}
+
+ // Custom proxy service dependency.
+ explicit SessionDependencies(ProxyService* proxy_service)
+ : host_resolver(new MockHostResolver),
+ proxy_service(proxy_service),
+ ssl_config_service(new SSLConfigServiceDefaults) {}
+
+ scoped_refptr<MockHostResolverBase> host_resolver;
+ scoped_refptr<ProxyService> proxy_service;
+ scoped_refptr<SSLConfigService> ssl_config_service;
+ MockClientSocketFactory socket_factory;
+};
+
+ProxyService* CreateFixedProxyService(const std::string& proxy) {
+ net::ProxyConfig proxy_config;
+ proxy_config.proxy_rules.ParseFromString(proxy);
+ return ProxyService::CreateFixed(proxy_config);
+}
+
+
+HttpNetworkSession* CreateSession(SessionDependencies* session_deps) {
+ return new HttpNetworkSession(session_deps->host_resolver,
+ session_deps->proxy_service,
+ &session_deps->socket_factory,
+ session_deps->ssl_config_service);
+}
+
+class FlipNetworkTransactionTest : public PlatformTest {
+ public:
+ virtual void SetUp() {
+ // Disable compression on this test.
+ FlipFramer::set_enable_compression_default(false);
+ }
+
+ virtual void TearDown() {
+ // Empty the current queue.
+ MessageLoop::current()->RunAllPending();
+ PlatformTest::TearDown();
+ }
+
+ protected:
+ void KeepAliveConnectionResendRequestTest(const MockRead& read_failure);
+
+ struct SimpleGetHelperResult {
+ int rv;
+ std::string status_line;
+ std::string response_data;
+ };
+
+ SimpleGetHelperResult SimpleGetHelper(MockRead data_reads[]) {
+ SimpleGetHelperResult out;
+
+ SessionDependencies session_deps;
+ scoped_ptr<FlipNetworkTransaction> trans(
+ new FlipNetworkTransaction(
+ CreateSession(&session_deps)));
+
+ HttpRequestInfo request;
+ request.method = "GET";
+ request.url = GURL("http://www.google.com/");
+ request.load_flags = 0;
+
+ StaticMockSocket data(data_reads, NULL);
+ session_deps.socket_factory.AddMockSocket(&data);
+
+ TestCompletionCallback callback;
+
+ int rv = trans->Start(&request, &callback, NULL);
+ EXPECT_EQ(ERR_IO_PENDING, rv);
+
+ out.rv = callback.WaitForResult();
+ if (out.rv != OK)
+ return out;
+
+ const HttpResponseInfo* response = trans->GetResponseInfo();
+ EXPECT_TRUE(response != NULL);
+
+ EXPECT_TRUE(response->headers != NULL);
+ out.status_line = response->headers->GetStatusLine();
+
+ rv = ReadTransaction(trans.get(), &out.response_data);
+ EXPECT_EQ(OK, rv);
+
+ return out;
+ }
+
+ void ConnectStatusHelperWithExpectedStatus(const MockRead& status,
+ int expected_status);
+
+ void ConnectStatusHelper(const MockRead& status);
+};
+
+//-----------------------------------------------------------------------------
+
+// Verify FlipNetworkTransaction constructor.
+TEST_F(FlipNetworkTransactionTest, Constructor) {
+ SessionDependencies session_deps;
+ scoped_refptr<net::HttpNetworkSession> session =
+ CreateSession(&session_deps);
+ scoped_ptr<HttpTransaction> trans(new FlipNetworkTransaction(session));
+}
+
+TEST_F(FlipNetworkTransactionTest, Connect) {
+ unsigned char syn_reply[] = {
+ 0x80, 0x01, 0x00, 0x02, // header
+ 0x00, 0x00, 0x00, 0x45,
+ 0x00, 0x00, 0x00, 0x01,
+ 0x00, 0x00, 0x00, 0x04, // 4 headers
+ 0x00, 0x05, 'h', 'e', 'l', 'l', 'o', // "hello"
+ 0x00, 0x03, 'b', 'y', 'e', // "bye"
+ 0x00, 0x06, 's', 't', 'a', 't', 'u', 's', // "status"
+ 0x00, 0x03, '2', '0', '0', // "200"
+ 0x00, 0x03, 'u', 'r', 'l', // "url"
+ 0x00, 0x0a, '/', 'i', 'n', 'd', 'e', 'x', '.', 'p', 'h', 'p', // "HTTP/1.1"
+ 0x00, 0x07, 'v', 'e', 'r', 's', 'i', 'o', 'n', // "version"
+ 0x00, 0x08, 'H', 'T', 'T', 'P', '/', '1', '.', '1', // "HTTP/1.1"
+ };
+ unsigned char body_frame[] = {
+ 0x00, 0x00, 0x00, 0x01, // header
+ 0x00, 0x00, 0x00, 0x06,
+ 'h', 'e', 'l', 'l', 'o', '!', // "hello"
+ };
+ unsigned char fin_frame[] = {
+ 0x80, 0x01, 0x00, 0x03, // header
+ 0x00, 0x00, 0x00, 0x08,
+ 0x00, 0x00, 0x00, 0x01,
+ 0x00, 0x00, 0x00, 0x00,
+ };
+
+ MockRead data_reads[] = {
+ MockRead(true, reinterpret_cast<char*>(syn_reply), sizeof(syn_reply)),
+ MockRead(true, reinterpret_cast<char*>(body_frame), sizeof(body_frame)),
+ MockRead(true, reinterpret_cast<char*>(fin_frame), sizeof(fin_frame)),
+ MockRead(true, 0, 0), // EOF
+ };
+
+ SimpleGetHelperResult out = SimpleGetHelper(data_reads);
+ EXPECT_EQ(OK, out.rv);
+ EXPECT_EQ("HTTP/1.1 200 OK", out.status_line);
+ EXPECT_EQ("hello!", out.response_data);
+}
+
+} // namespace net
+
diff --git a/net/flip/flip_protocol.h b/net/flip/flip_protocol.h
new file mode 100644
index 0000000..93c8a5a
--- /dev/null
+++ b/net/flip/flip_protocol.h
@@ -0,0 +1,203 @@
+// Copyright (c) 2009 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+// This file contains some protocol structures for use with Flip.
+
+#ifndef NET_FLIP_FLIP_PROTOCOL_H_
+#define NET_FLIP_FLIP_PROTOCOL_H_
+
+#ifdef WIN32
+#include <winsock2.h>
+#else
+#include <arpa/inet.h>
+#endif
+#include "base/basictypes.h"
+#include "base/logging.h"
+#include "flip_bitmasks.h" // cross-google3 directory naming.
+
+
+// Stream Frame Format
+// +-------------------------------+
+// |0| Stream-ID (31bits) |
+// +-------------------------------+
+// |flags (16)| Length (16bits) |
+// +-------------------------------+
+// | Data |
+// +-------------------------------+
+//
+
+// Control Frame Format
+// +-------------------------------+
+// |1| Version(15bits)|Type(16bits)|
+// +-------------------------------+
+// | Length (32bits) |
+// +-------------------------------+
+// | Data |
+// +-------------------------------+
+//
+
+
+// Control Frame: SYN_STREAM
+// +----------------------------------+
+// |1|000000000000001|0000000000000001|
+// +----------------------------------+
+// | Length (32bits) | >= 8
+// +----------------------------------+
+// |X| Stream-ID(31bits) |
+// +----------------------------------+
+// |Pri| unused | Length (16bits)|
+// +----------------------------------+
+//
+// Control Frame: SYN_REPLY
+// +----------------------------------+
+// |1|000000000000001|0000000000000010|
+// +----------------------------------+
+// | Length (32bits) | >= 8
+// +----------------------------------+
+// |X| Stream-ID(31bits) |
+// +----------------------------------+
+// | unused (16 bits)| Length (16bits)|
+// +----------------------------------+
+//
+// Control Frame: FIN_STREAM
+// +----------------------------------+
+// |1|000000000000001|0000000000000011|
+// +----------------------------------+
+// | Length (32bits) | >= 4
+// +----------------------------------+
+// |X| Stream-ID(31bits) |
+// +----------------------------------+
+// | Status (32 bits) |
+// +----------------------------------+
+//
+// Control Frame: SetMaxStreams
+// +----------------------------------+
+// |1|000000000000001|0000000000000100|
+// +----------------------------------+
+// | Length (32bits) | >= 4
+// +----------------------------------+
+// |X| Stream-ID(31bits) |
+// +----------------------------------+
+
+// TODO(fenix): add ChangePriority support.
+
+namespace flip {
+
+// Note: all protocol data structures are on-the-wire format. That means that
+// data is stored in network-normalized order. Readers must use the
+// accessors provided or call ntohX() functions.
+
+// Types of Flip Control Frames.
+typedef enum {
+ SYN_STREAM = 1,
+ SYN_REPLY,
+ FIN_STREAM,
+ NOOP
+} FlipControlType;
+
+// Flags on data packets
+typedef enum {
+ DATA_FLAG_COMPRESSED = 1
+} FlipDataFlags;
+
+// A FLIP stream id is a 31 bit entity.
+typedef uint32 FlipStreamId;
+
+// FLIP Priorities. (there are only 2 bits)
+#define FLIP_PRIORITY_LOWEST 3
+#define FLIP_PRIORITY_HIGHEST 0
+
+// All Flip Frame types derive from the FlipFrame struct.
+typedef struct {
+ uint32 length() const { return ntohs(data_.length_); }
+ void set_length(uint16 length) { data_.length_ = htons(length); }
+ bool is_control_frame() const {
+ return (ntohs(control_.version_) & kControlFlagMask) == kControlFlagMask;
+ }
+
+ protected:
+ union {
+ struct {
+ uint16 version_;
+ uint16 type_;
+ uint32 length_;
+ } control_;
+ struct {
+ FlipStreamId stream_id_;
+ uint16 flags_;
+ uint16 length_;
+ } data_;
+ };
+} FlipFrame;
+
+// A Data Frame.
+typedef struct : public FlipFrame {
+ FlipStreamId stream_id() const {
+ return ntohl(data_.stream_id_) & kStreamIdMask;
+ }
+ void set_stream_id(FlipStreamId id) { data_.stream_id_ = htonl(id); }
+ uint16 flags() const { return ntohs(data_.flags_); }
+ void set_flags(uint16 flags) { data_.flags_ = htons(flags); }
+} FlipDataFrame;
+
+// A Control Frame.
+typedef struct : public FlipFrame {
+ uint16 version() const {
+ const int kVersionMask = 0x7fff;
+ return ntohs(control_.version_) & kVersionMask;
+ }
+ FlipControlType type() const {
+ uint16 type = ntohs(control_.type_);
+ DCHECK(type >= SYN_STREAM && type <= NOOP);
+ return static_cast<FlipControlType>(type);
+ }
+ FlipStreamId stream_id() const { return ntohl(stream_id_) & kStreamIdMask; }
+
+ private:
+ FlipStreamId stream_id_;
+} FlipControlFrame;
+
+// A SYN_STREAM frame.
+typedef struct FlipSynStreamControlFrame : public FlipControlFrame {
+ uint8 priority() const { return (priority_ & kPriorityMask) >> 6; }
+ int header_block_len() const { return length() - kHeaderBlockOffset; }
+ const char* header_block() const {
+ return reinterpret_cast<const char*>(this) +
+ sizeof(FlipFrame) + kHeaderBlockOffset;
+ }
+ private:
+ // Offset from the end of the FlipControlFrame to the FlipHeaderBlock.
+ static const int kHeaderBlockOffset = 6;
+ uint8 priority_;
+ uint8 unused_;
+ // Variable FlipHeaderBlock here.
+} FlipSynStreamControlFrame;
+
+// A SYN_REPLY frame.
+typedef struct FlipSynReplyControlFrame : public FlipControlFrame {
+ int header_block_len() const { return length() - kHeaderBlockOffset; }
+ const char* header_block() const {
+ return reinterpret_cast<const char*>(this) +
+ sizeof(FlipFrame) + kHeaderBlockOffset;
+ }
+
+ private:
+ // Offset from the end of the FlipControlFrame to the FlipHeaderBlock.
+ static const int kHeaderBlockOffset = 6;
+ uint16 unused_;
+ // Variable FlipHeaderBlock here.
+} FlipSynReplyControlFrame;
+
+// A FIN_STREAM frame.
+typedef struct FlipFinStreamControlFrame : public FlipControlFrame {
+ uint32 status() const { return ntohl(status_); }
+ void set_status(int id) { status_ = htonl(id); }
+ private:
+ uint32 status_;
+} FlipFinStreamControlFrame;
+
+} // namespace flip
+
+#endif // NET_FLIP_FLIP_PROTOCOL_H_
+
diff --git a/net/flip/flip_session.cc b/net/flip/flip_session.cc
new file mode 100644
index 0000000..3d63ef9
--- /dev/null
+++ b/net/flip/flip_session.cc
@@ -0,0 +1,762 @@
+// Copyright (c) 2009 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "net/flip/flip_session.h"
+
+#include "base/basictypes.h"
+#include "base/logging.h"
+#include "base/message_loop.h"
+#include "base/rand_util.h"
+#include "base/stats_counters.h"
+#include "base/string_util.h"
+#include "net/base/load_flags.h"
+#include "net/flip/flip_frame_builder.h"
+#include "net/flip/flip_protocol.h"
+#include "net/http/http_network_session.h"
+#include "net/http/http_request_info.h"
+#include "net/http/http_response_headers.h"
+#include "net/http/http_response_info.h"
+#include "net/tools/dump_cache/url_to_filename_encoder.h"
+
+namespace net {
+
+// static
+scoped_ptr<FlipSessionPool> FlipSession::session_pool_;
+// static
+bool FlipSession::disable_compression_ = false;
+// static
+int PrioritizedIOBuffer::order_ = 0;
+
+// The FlipStreamImpl is an internal class representing an active FlipStream.
+class FlipStreamImpl {
+ public:
+ // A FlipStreamImpl represents an active FlipStream.
+ // If |delegate| is NULL, then we're receiving pushed data from the server.
+ FlipStreamImpl(flip::FlipStreamId stream_id, FlipDelegate* delegate)
+ : stream_id_(stream_id),
+ delegate_(delegate),
+ response_(NULL),
+ data_complete_(false) {}
+ virtual ~FlipStreamImpl() {
+ LOG(INFO) << "Deleting FlipStreamImpl for stream " << stream_id_;
+ };
+
+ flip::FlipStreamId stream_id() const { return stream_id_; }
+ FlipDelegate* delegate() const { return delegate_; }
+
+ // For pushed streams, we track a path to identify them.
+ std::string path() const { return path_; }
+ void set_path(const std::string& path) { path_ = path; }
+
+ // Attach a delegate to a previously pushed data stream.
+ // Returns true if the caller should Deactivate and delete the FlipStreamImpl
+ // after this call.
+ bool AttachDelegate(FlipDelegate* delegate);
+
+ // Called when a SYN_REPLY is received on the stream.
+ void OnReply(const flip::FlipHeaderBlock* headers);
+
+ // Called when data is received on the stream.
+ // Returns true if the caller should Deactivate and delete the FlipStreamImpl
+ // after this call.
+ bool OnData(const char* data, int length);
+
+ // Called if the stream is prematurely aborted.
+ // The caller should Deactivate and delete the FlipStreamImpl after this
+ // call.
+ void OnAbort();
+
+ private:
+ flip::FlipStreamId stream_id_;
+ std::string path_;
+ FlipDelegate* delegate_;
+ scoped_ptr<HttpResponseInfo> response_;
+ std::list<scoped_refptr<IOBufferWithSize>> response_body_;
+ bool data_complete_;
+};
+
+FlipSession* FlipSession::GetFlipSession(
+ const net::HostResolver::RequestInfo& info,
+ HttpNetworkSession* session) {
+ if (!session_pool_.get())
+ session_pool_.reset(new FlipSessionPool());
+ return session_pool_->Get(info, session);
+}
+
+FlipSession::FlipSession(std::string host, HttpNetworkSession* session)
+ : ALLOW_THIS_IN_INITIALIZER_LIST(
+ connect_callback_(this, &FlipSession::OnSocketConnect)),
+ ALLOW_THIS_IN_INITIALIZER_LIST(
+ read_callback_(this, &FlipSession::OnReadComplete)),
+ ALLOW_THIS_IN_INITIALIZER_LIST(
+ write_callback_(this, &FlipSession::OnWriteComplete)),
+ domain_(host),
+ session_(session),
+ connection_started_(false),
+ delayed_write_pending_(false),
+ write_pending_(false),
+ read_buffer_(new net::MovableIOBuffer(kReadBufferSize)),
+ read_buffer_bytes_read_(0),
+ read_pending_(false),
+ stream_hi_water_mark_(0) {
+ // Always start at 1 for the first stream id.
+ // TODO(mbelshe): consider randomization.
+ stream_hi_water_mark_ = 1;
+
+ flip_framer_.set_visitor(this);
+}
+
+FlipSession::~FlipSession() {
+ if (connection_.is_initialized()) {
+ // With Flip we can't recycle sockets.
+ connection_.socket()->Disconnect();
+ }
+ if (session_pool_.get())
+ session_pool_->Remove(this);
+
+ // TODO(mbelshe) - clear out streams (active and pushed) here?
+}
+
+net::Error FlipSession::Connect(const std::string& group_name,
+ const HostResolver::RequestInfo& host,
+ int priority) {
+ DCHECK(priority >= 0 && priority < 4);
+
+ // If the connect process is started, let the caller continue.
+ if (connection_started_)
+ return net::OK;
+
+ connection_started_ = true;
+
+ static StatsCounter flip_sessions("flip.sessions");
+ flip_sessions.Increment();
+
+ int rv = connection_.Init(group_name, host, priority, &connect_callback_,
+ session_->tcp_socket_pool(), NULL);
+
+ // If the connect is pending, we still return ok. The APIs enqueue
+ // work until after the connect completes asynchronously later.
+ if (rv == net::ERR_IO_PENDING)
+ return net::OK;
+ return static_cast<net::Error>(rv);
+}
+
+
+// Create a FlipHeaderBlock for a Flip SYN_STREAM Frame from
+// a HttpRequestInfo block.
+void CreateFlipHeadersFromHttpRequest(
+ const HttpRequestInfo* info, flip::FlipHeaderBlock* headers) {
+ static const std::string kHttpProtocolVersion("HTTP/1.1");
+
+ HttpUtil::HeadersIterator it(info->extra_headers.begin(),
+ info->extra_headers.end(),
+ "\r\n");
+ while (it.GetNext()) {
+ std::string name = StringToLowerASCII(it.name());
+ (*headers)[name] = it.values();
+ }
+
+#define REWRITE_URLS
+#ifdef REWRITE_URLS
+ // TODO(mbelshe): Figure out how to remove this. This is just for hooking
+ // up to a test server.
+ // For testing content on our test server, we modify the URL.
+ GURL url = info->url;
+ FilePath path = UrlToFilenameEncoder::Encode(url.spec(), FilePath());
+ std::string hack_url = "/" + WideToASCII(path.value());
+
+ // switch backslashes. HACK
+ std::string::size_type pos(0);
+ while ((pos = hack_url.find('\\', pos)) != std::string::npos) {
+ hack_url.replace(pos, 1, "/");
+ pos += 1;
+ }
+#endif
+
+ (*headers)["method"] = info->method;
+ // (*headers)["url"] = info->url.PathForRequest();
+ (*headers)["url"] = hack_url;
+ (*headers)["version"] = kHttpProtocolVersion;
+ if (info->user_agent.length())
+ (*headers)["user-agent"] = info->user_agent;
+ if (!info->referrer.is_empty())
+ (*headers)["referer"] = info->referrer.spec();
+
+ // Honor load flags that impact proxy caches.
+ if (info->load_flags & LOAD_BYPASS_CACHE) {
+ (*headers)["pragma"] = "no-cache";
+ (*headers)["cache-control"] = "no-cache";
+ } else if (info->load_flags & LOAD_VALIDATE_CACHE) {
+ (*headers)["cache-control"] = "max-age=0";
+ }
+}
+
+int FlipSession::CreateStream(FlipDelegate* delegate) {
+ flip::FlipStreamId stream_id = GetNewStreamId();
+
+ GURL url = delegate->request()->url;
+ std::string path = url.PathForRequest();
+
+ FlipStreamImpl* stream = NULL;
+
+ // Check if we have a push stream for this path.
+ if (delegate->request()->method == "GET") {
+ stream = GetPushStream(path);
+ if (stream) {
+ if (stream->AttachDelegate(delegate)) {
+ DeactivateStream(stream->stream_id());
+ delete stream;
+ return 0;
+ }
+ return stream->stream_id();
+ }
+ }
+
+ // If we still don't have a stream, activate one now.
+ stream = ActivateStream(stream_id, delegate);
+ if (!stream)
+ return net::ERR_FAILED;
+
+ LOG(INFO) << "FlipStream: Creating stream " << stream_id << " for "
+ << delegate->request()->url;
+
+ // TODO(mbelshe): Optimize memory allocations
+ int priority = delegate->request()->priority;
+
+ // Hack for the priorities
+ // TODO(mbelshe): These need to be plumbed through the Http Network Stack.
+ if (path.find(".css") != path.npos) {
+ priority = 1;
+ } else if (path.find(".html") != path.npos) {
+ priority = 0;
+ } else if (path.find(".js") != path.npos) {
+ priority = 1;
+ } else {
+ priority = 3;
+ }
+
+ DCHECK(priority >= FLIP_PRIORITY_HIGHEST &&
+ priority <= FLIP_PRIORITY_LOWEST);
+
+ // Convert from HttpRequestHeaders to Flip Headers.
+ flip::FlipHeaderBlock headers;
+ CreateFlipHeadersFromHttpRequest(delegate->request(), &headers);
+
+ // Create a SYN_STREAM packet and add to the output queue.
+ flip::FlipSynStreamControlFrame* syn_frame =
+ flip_framer_.CreateSynStream(stream_id, priority, false, &headers);
+ int length = sizeof(flip::FlipFrame) + syn_frame->length();
+ IOBufferWithSize* buffer =
+ new IOBufferWithSize(length);
+ memcpy(buffer->data(), syn_frame, length);
+ delete syn_frame;
+ queue_.push(PrioritizedIOBuffer(buffer, priority));
+
+ static StatsCounter flip_requests("flip.requests");
+ flip_requests.Increment();
+
+ LOG(INFO) << "FETCHING: " << delegate->request()->url.spec();
+
+
+ // TODO(mbelshe): Implement POST Data here
+
+ // Schedule to write to the socket after we've made it back
+ // to the message loop so that we can aggregate multiple
+ // requests.
+ // TODO(mbelshe): Should we do the "first" request immediately?
+ // maybe we should only 'do later' for subsequent
+ // requests.
+ WriteSocketLater();
+
+ return stream_id;
+}
+
+bool FlipSession::CancelStream(int id) {
+ LOG(INFO) << "Cancelling stream " << id;
+ if (!IsStreamActive(id))
+ return false;
+ DeactivateStream(id);
+ return true;
+}
+
+bool FlipSession::IsStreamActive(int id) {
+ return active_streams_.find(id) != active_streams_.end();
+}
+
+LoadState FlipSession::GetLoadState() const {
+ // TODO(mbelshe): needs more work
+ return LOAD_STATE_CONNECTING;
+}
+
+void FlipSession::OnSocketConnect(int result) {
+ LOG(INFO) << "Flip socket connected (result=" << result << ")";
+
+ if (result != net::OK) {
+ net::Error err = static_cast<net::Error>(result);
+ CloseAllStreams(err);
+ this->Release();
+ return;
+ }
+
+ // Adjust socket buffer sizes.
+ // FLIP uses one socket, and we want a really big buffer.
+ // This greatly helps on links with packet loss - we can even
+ // outperform Vista's dynamic window sizing algorithm.
+ // TODO(mbelshe): more study.
+ const int kSocketBufferSize = 512 * 1024;
+ connection_.socket()->SetReceiveBufferSize(kSocketBufferSize);
+ connection_.socket()->SetSendBufferSize(kSocketBufferSize);
+
+ // After we've connected, send any data to the server, and then issue
+ // our read.
+ WriteSocketLater();
+ ReadSocket();
+}
+
+void FlipSession::OnReadComplete(int bytes_read) {
+ // Parse a frame. For now this code requires that the frame fit into our
+ // buffer (32KB).
+ // TODO(mbelshe): support arbitrarily large frames!
+
+ LOG(INFO) << "Flip socket read: " << bytes_read << " bytes";
+
+ read_pending_ = false;
+
+ if (bytes_read <= 0) {
+ // Session is tearing down.
+ net::Error err = static_cast<net::Error>(bytes_read);
+ CloseAllStreams(err);
+ this->Release();
+ return;
+ }
+
+ char *data = read_buffer_->data();
+ while (bytes_read &&
+ flip_framer_.error_code() == flip::FlipFramer::FLIP_NO_ERROR) {
+ uint32 bytes_processed = flip_framer_.ProcessInput(data, bytes_read);
+ bytes_read -= bytes_processed;
+ data += bytes_processed;
+ if (flip_framer_.state() == flip::FlipFramer::FLIP_DONE)
+ flip_framer_.Reset();
+ }
+ // NOTE(mbelshe): Could cause circular callbacks. (when ReadSocket
+ // completes synchronously, calling OnReadComplete, etc). Should
+ // probably return to the message loop.
+ ReadSocket();
+}
+
+void FlipSession::OnWriteComplete(int result) {
+ DCHECK(write_pending_);
+ write_pending_ = false;
+
+ LOG(INFO) << "Flip write complete (result=" << result << ")";
+
+ // Cleanup the write which just completed.
+ in_flight_write_.release();
+
+ // Write more data. We're already in a continuation, so we can
+ // go ahead and write it immediately (without going back to the
+ // message loop).
+ WriteSocketLater();
+}
+
+void FlipSession::ReadSocket() {
+ if (read_pending_)
+ return;
+
+ size_t max_bytes = kReadBufferSize;
+ read_buffer_->set_data(read_buffer_bytes_read_);
+ max_bytes -= read_buffer_bytes_read_;
+ int bytes_read = connection_.socket()->Read(read_buffer_.get(), max_bytes,
+ &read_callback_);
+ switch (bytes_read) {
+ case 0:
+ // Socket is closed!
+ // TODO(mbelshe): Need to abort any active streams here.
+ DCHECK(!active_streams_.size());
+ return;
+ case net::ERR_IO_PENDING:
+ // Waiting for data. Nothing to do now.
+ read_pending_ = true;
+ return;
+ default:
+ // Data was read, process it.
+ // TODO(mbelshe): check that we can't get a recursive stack?
+ OnReadComplete(bytes_read);
+ break;
+ }
+}
+
+void FlipSession::WriteSocketLater() {
+ if (delayed_write_pending_)
+ return;
+
+ delayed_write_pending_ = true;
+ MessageLoop::current()->PostTask(FROM_HERE, NewRunnableMethod(
+ this, &net::FlipSession::WriteSocket));
+}
+
+void FlipSession::WriteSocket() {
+ // This function should only be called via WriteSocketLater.
+ DCHECK(delayed_write_pending_);
+ delayed_write_pending_ = false;
+
+ // If the socket isn't connected yet, just wait; we'll get called
+ // again when the socket connection completes.
+ if (!connection_.is_initialized())
+ return;
+
+ if (write_pending_) // Another write is in progress still.
+ return;
+
+ while (queue_.size()) {
+ const int kMaxSegmentSize = 1430;
+ const int kMaxPayload = 4 * kMaxSegmentSize;
+ int max_size = std::max(kMaxPayload, queue_.top().size());
+
+ int bytes = 0;
+ // If we have multiple IOs to do, accumulate up to 4 MSS's worth of data
+ // and send them in batch.
+ IOBufferWithSize* buffer = new IOBufferWithSize(max_size);
+ while (queue_.size() && bytes < max_size) {
+ PrioritizedIOBuffer next_buffer = queue_.top();
+
+ // Now that we're outputting the frame, we can finally compress it.
+ flip::FlipFrame* uncompressed_frame =
+ reinterpret_cast<flip::FlipFrame*>(next_buffer.buffer()->data());
+ scoped_array<flip::FlipFrame> compressed_frame(
+ flip_framer_.CompressFrame(uncompressed_frame));
+ int size = compressed_frame.get()->length() + sizeof(flip::FlipFrame);
+ if (bytes + size > kMaxPayload)
+ break;
+ memcpy(buffer->data() + bytes, compressed_frame.get(), size);
+ bytes += size;
+ queue_.pop();
+ next_buffer.release();
+ }
+ DCHECK(bytes > 0);
+ in_flight_write_ = PrioritizedIOBuffer(buffer, 0);
+
+ int rv = connection_.socket()->Write(in_flight_write_.buffer(),
+ bytes, &write_callback_);
+ if (rv == net::ERR_IO_PENDING) {
+ write_pending_ = true;
+ break;
+ }
+ LOG(INFO) << "FLIPSession wrote " << rv << " bytes.";
+ in_flight_write_.release();
+ }
+}
+
+void FlipSession::CloseAllStreams(net::Error code) {
+ LOG(INFO) << "Closing all FLIP Streams";
+
+ static StatsCounter abandoned_streams("flip.abandoned_streams");
+ static StatsCounter abandoned_push_streams("flip.abandoned_push_streams");
+
+ if (active_streams_.size()) {
+ abandoned_streams.Add(active_streams_.size());
+
+ // Create a copy of the list, since aborting streams can invalidate
+ // our list.
+ FlipStreamImpl** list = new FlipStreamImpl*[active_streams_.size()];
+ ActiveStreamMap::const_iterator it;
+ int index = 0;
+ for (it = active_streams_.begin(); it != active_streams_.end(); ++it)
+ list[index++] = it->second;
+
+ // Issue the aborts.
+ for (--index; index >= 0; index--) {
+ list[index]->OnAbort();
+ delete list[index];
+ }
+
+ // Clear out anything pending.
+ active_streams_.clear();
+
+ delete list;
+ }
+
+ if (pushed_streams_.size()) {
+ abandoned_push_streams.Add(pushed_streams_.size());
+ pushed_streams_.clear();
+ }
+}
+
+int FlipSession::GetNewStreamId() {
+ int id = stream_hi_water_mark_;
+ stream_hi_water_mark_ += 2;
+ if (stream_hi_water_mark_ > 0x7fff)
+ stream_hi_water_mark_ = 1;
+ return id;
+}
+
+FlipStreamImpl* FlipSession::ActivateStream(int id, FlipDelegate* delegate) {
+ DCHECK(!IsStreamActive(id));
+
+ FlipStreamImpl* stream = new FlipStreamImpl(id, delegate);
+ active_streams_[id] = stream;
+ return stream;
+}
+
+void FlipSession::DeactivateStream(int id) {
+ DCHECK(IsStreamActive(id));
+
+ // Verify it is not on the pushed_streams_ list.
+ ActiveStreamList::iterator it;
+ for (it = pushed_streams_.begin(); it != pushed_streams_.end(); ++it) {
+ FlipStreamImpl* impl = *it;
+ if (id == impl->stream_id()) {
+ pushed_streams_.erase(it);
+ break;
+ }
+ }
+
+ active_streams_.erase(id);
+}
+
+FlipStreamImpl* FlipSession::GetPushStream(std::string path) {
+ static StatsCounter used_push_streams("flip.claimed_push_streams");
+
+ LOG(INFO) << "Looking for push stream: " << path;
+
+ // We just walk a linear list here.
+ ActiveStreamList::iterator it;
+ for (it = pushed_streams_.begin(); it != pushed_streams_.end(); ++it) {
+ FlipStreamImpl* impl = *it;
+ if (path == impl->path()) {
+ pushed_streams_.erase(it);
+ used_push_streams.Increment();
+ LOG(INFO) << "Push Stream Claim for: " << path;
+ return impl;
+ }
+ }
+ return NULL;
+}
+
+void FlipSession::OnError(flip::FlipFramer* framer) {
+ LOG(ERROR) << "FlipSession error!";
+ CloseAllStreams(net::ERR_UNEXPECTED);
+ this->Release();
+}
+
+void FlipSession::OnStreamFrameData(flip::FlipStreamId stream_id,
+ const char* data,
+ uint32 len) {
+ LOG(INFO) << "Flip data for stream " << stream_id << ", " << len << " bytes";
+ bool valid_stream = IsStreamActive(stream_id);
+ if (!valid_stream) {
+ LOG(WARNING) << "Received data frame for invalid stream" << stream_id;
+ return;
+ }
+ if (!len)
+ return; // This was just an empty data packet.
+ FlipStreamImpl* stream = active_streams_[stream_id];
+ if (stream->OnData(data, len)) {
+ DeactivateStream(stream->stream_id());
+ delete stream;
+ }
+}
+
+void FlipSession::OnSyn(const flip::FlipSynStreamControlFrame* frame,
+ const flip::FlipHeaderBlock* headers) {
+ flip::FlipStreamId stream_id = frame->stream_id();
+
+ // Server-initiated streams should have odd sequence numbers.
+ if ((stream_id & 0x1) != 0) {
+ LOG(WARNING) << "Received invalid OnSyn stream id " << stream_id;
+ return;
+ }
+
+ if (IsStreamActive(stream_id)) {
+ LOG(WARNING) << "Received OnSyn for active stream " << stream_id;
+ return;
+ }
+
+ FlipStreamImpl* stream = ActivateStream(stream_id, NULL);
+ stream->OnReply(headers);
+
+ // Verify that the response had a URL for us.
+ DCHECK(stream->path().length() != 0);
+ if (stream->path().length() == 0) {
+ LOG(WARNING) << "Pushed stream did not contain a path.";
+ DeactivateStream(stream_id);
+ delete stream;
+ return;
+ }
+ pushed_streams_.push_back(stream);
+
+ LOG(INFO) << "Got pushed stream for " << stream->path();
+
+ static StatsCounter push_requests("flip.pushed_streams");
+ push_requests.Increment();
+}
+
+void FlipSession::OnSynReply(const flip::FlipSynReplyControlFrame* frame,
+ const flip::FlipHeaderBlock* headers) {
+ DCHECK(headers);
+ flip::FlipStreamId stream_id = frame->stream_id();
+ bool valid_stream = IsStreamActive(stream_id);
+ if (!valid_stream) {
+ LOG(WARNING) << "Received SYN_REPLY for invalid stream" << stream_id;
+ return;
+ }
+
+ FlipStreamImpl* stream = active_streams_[stream_id];
+ stream->OnReply(headers);
+}
+
+void FlipSession::OnControl(const flip::FlipControlFrame* frame) {
+ flip::FlipHeaderBlock headers;
+ bool parsed_headers = false;
+ uint32 type = frame->type();
+ if (type == flip::SYN_STREAM || type == flip::SYN_REPLY) {
+ if (!flip_framer_.ParseHeaderBlock(
+ reinterpret_cast<const flip::FlipFrame*>(frame), &headers)) {
+ LOG(WARNING) << "Could not parse Flip Control Frame Header";
+ return;
+ }
+ }
+
+ switch (type) {
+ case flip::SYN_STREAM:
+ LOG(INFO) << "Flip SynStream for stream " << frame->stream_id();
+ OnSyn(reinterpret_cast<const flip::FlipSynStreamControlFrame*>(frame),
+ &headers);
+ break;
+ case flip::SYN_REPLY:
+ LOG(INFO) << "Flip SynReply for stream " << frame->stream_id();
+ OnSynReply(
+ reinterpret_cast<const flip::FlipSynReplyControlFrame*>(frame),
+ &headers);
+ break;
+ case flip::FIN_STREAM:
+ LOG(INFO) << "Flip Fin for stream " << frame->stream_id();
+ OnFin(reinterpret_cast<const flip::FlipFinStreamControlFrame*>(frame));
+ break;
+ default:
+ DCHECK(false); // Error!
+ }
+}
+
+void FlipSession::OnFin(const flip::FlipFinStreamControlFrame* frame) {
+ flip::FlipStreamId stream_id = frame->stream_id();
+ bool valid_stream = IsStreamActive(stream_id);
+ if (!valid_stream) {
+ LOG(WARNING) << "Received FIN for invalid stream" << stream_id;
+ return;
+ }
+ FlipStreamImpl* stream = active_streams_[stream_id];
+ // TODO(mbelshe) - status codes are HTTP codes?
+ // Shouldn't it be zero/nonzero?
+ // For now Roberto is sending 200, so use that as "ok".
+ bool cleanup_stream = false;
+ if (frame->status() == 200 || frame->status() == 0) {
+ cleanup_stream = stream->OnData(NULL, 0);
+ } else {
+ stream->OnAbort();
+ cleanup_stream = true;
+ }
+
+ if (cleanup_stream) {
+ DeactivateStream(stream_id);
+ delete stream;
+ }
+}
+
+void FlipSession::OnLameDuck() {
+ NOTIMPLEMENTED();
+}
+
+bool FlipStreamImpl::AttachDelegate(FlipDelegate* delegate) {
+ DCHECK(delegate_ == NULL); // Don't attach if already attached.
+ DCHECK(path_.length() > 0); // Path needs to be set for push streams.
+ delegate_ = delegate;
+
+ // If there is pending data, send it up here.
+
+ // Check for the OnReply, and pass it up.
+ if (response_.get())
+ delegate_->OnResponseReceived(response_.get());
+
+ // Pass data up
+ while (response_body_.size()) {
+ scoped_refptr<IOBufferWithSize> buffer = response_body_.front();
+ response_body_.pop_front();
+ delegate_->OnDataReceived(buffer->data(), buffer->size());
+ }
+
+ // Finally send up the end-of-stream.
+ if (data_complete_) {
+ delegate_->OnClose(net::OK);
+ return true; // tell the caller to shut us down
+ }
+ return false;
+}
+
+void FlipStreamImpl::OnReply(const flip::FlipHeaderBlock* headers) {
+ DCHECK(headers);
+ DCHECK(headers->find("version") != headers->end());
+ DCHECK(headers->find("status") != headers->end());
+
+ // TODO(mbelshe): if no version or status is found, we need to error
+ // out the stream.
+
+ // Server initiated streams must send a URL to us in the headers.
+ if (headers->find("path") != headers->end())
+ path_ = headers->find("path")->second;
+
+ // TODO(mbelshe): For now we convert from our nice hash map back
+ // to a string of headers; this is because the HttpResponseInfo
+ // is a bit rigid for its http (non-flip) design.
+ std::string raw_headers(headers->find("version")->second);
+ raw_headers.append(" ", 1);
+ raw_headers.append(headers->find("status")->second);
+ raw_headers.append("\0", 1);
+ flip::FlipHeaderBlock::const_iterator it;
+ for (it = headers->begin(); it != headers->end(); ++it) {
+ raw_headers.append(it->first);
+ raw_headers.append(":", 1);
+ raw_headers.append(it->second);
+ raw_headers.append("\0", 1);
+ }
+
+ LOG(INFO) << "FlipStream: SynReply received for " << stream_id_;
+
+ DCHECK(response_ == NULL);
+ response_.reset(new HttpResponseInfo());
+ response_->headers = new HttpResponseHeaders(raw_headers);
+ if (delegate_)
+ delegate_->OnResponseReceived(response_.get());
+}
+
+bool FlipStreamImpl::OnData(const char* data, int length) {
+ LOG(INFO) << "FlipStream: Data (" << length << " bytes) received for "
+ << stream_id_;
+ if (length) {
+ if (delegate_) {
+ delegate_->OnDataReceived(data, length);
+ } else {
+ // Save the data for use later
+ IOBufferWithSize* io_buffer = new IOBufferWithSize(length);
+ memcpy(io_buffer->data(), data, length);
+ response_body_.push_back(io_buffer);
+ }
+ } else {
+ data_complete_ = true;
+ if (delegate_) {
+ delegate_->OnClose(net::OK);
+ return true; // Tell the caller to clean us up.
+ }
+ }
+ return false;
+}
+
+void FlipStreamImpl::OnAbort() {
+ if (delegate_)
+ delegate_->OnClose(net::ERR_ABORTED);
+}
+
+} // namespace net
+
diff --git a/net/flip/flip_session.h b/net/flip/flip_session.h
new file mode 100644
index 0000000..b608ddb
--- /dev/null
+++ b/net/flip/flip_session.h
@@ -0,0 +1,216 @@
+// Copyright (c) 2009 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef NET_FLIP_FLIP_SESSION_H_
+#define NET_FLIP_FLIP_SESSION_H_
+
+#include <deque>
+#include <list>
+#include <map>
+#include <queue>
+#include <string>
+
+#include "base/ref_counted.h"
+#include "net/base/io_buffer.h"
+#include "net/base/load_states.h"
+#include "net/base/net_errors.h"
+#include "net/base/upload_data_stream.h"
+#include "net/flip/flip_framer.h"
+#include "net/flip/flip_protocol.h"
+#include "net/flip/flip_session_pool.h"
+#include "net/socket/client_socket.h"
+#include "net/socket/client_socket_handle.h"
+#include "testing/platform_test.h"
+
+namespace net {
+
+class FlipStreamImpl;
+class HttpNetworkSession;
+class HttpRequestInfo;
+class HttpResponseInfo;
+
+// A callback interface for HTTP content retrieved from the Flip stream.
+class FlipDelegate {
+ public:
+ virtual ~FlipDelegate() {}
+ virtual const HttpRequestInfo* request() = 0;
+ virtual const UploadDataStream* data() = 0;
+
+ virtual void OnRequestSent(int status) = 0;
+ virtual void OnResponseReceived(HttpResponseInfo* response) = 0;
+ virtual void OnDataReceived(const char* buffer, int bytes) = 0;
+ virtual void OnClose(int status) = 0;
+ virtual void OnCancel() = 0;
+};
+
+class PrioritizedIOBuffer {
+ public:
+ PrioritizedIOBuffer() : buffer_(0), priority_(0) {}
+ PrioritizedIOBuffer(IOBufferWithSize* buffer, int priority)
+ : buffer_(buffer),
+ priority_(priority),
+ position_(++order_) {
+ }
+
+ IOBuffer* buffer() const { return buffer_; }
+
+ int size() const { return buffer_->size(); }
+
+ void release() { buffer_ = NULL; }
+
+ int priority() { return priority_; }
+
+ // Supports sorting.
+ bool operator<(const PrioritizedIOBuffer& other) const {
+ if (priority_ != other.priority_)
+ return priority_ > other.priority_;
+ return position_ >= other.position_;
+ }
+
+ private:
+ scoped_refptr<IOBufferWithSize> buffer_;
+ int priority_;
+ int position_;
+ static int order_; // Maintains a FIFO order for equal priorities.
+};
+
+class FlipSession : public base::RefCounted<FlipSession>,
+ public flip::FlipFramerVisitorInterface {
+ public:
+ // Factory for finding open sessions.
+ // TODO(mbelshe): Break this out into a connection pool class?
+ static FlipSession* GetFlipSession(const HostResolver::RequestInfo&,
+ HttpNetworkSession* session);
+ virtual ~FlipSession();
+
+ // Get the domain for this FlipSession.
+ std::string domain() { return domain_; }
+
+ // Connect the FLIP Socket.
+ // Returns net::Error::OK on success.
+ // Note that this call does not wait for the connect to complete. Callers can
+ // immediately start using the FlipSession while it connects.
+ net::Error Connect(const std::string& group_name,
+ const HostResolver::RequestInfo& host, int priority);
+
+ // Create a new stream.
+ // FlipDelegate must remain valid until the stream is either cancelled by the
+ // creator via CancelStream or the FlipDelegate OnClose or OnCancel callbacks
+ // have been made.
+ // Once the stream is created, the delegate should wait for a callback.
+ int CreateStream(FlipDelegate* delegate);
+
+ // Cancel a stream.
+ bool CancelStream(int id);
+
+ // Check if a stream is active.
+ bool IsStreamActive(int id);
+
+ // The LoadState is used for informing the user of the current network
+ // status, such as "resolving host", "connecting", etc.
+ LoadState GetLoadState() const;
+ protected:
+ friend class FlipNetworkTransactionTest;
+ friend class FlipSessionPool;
+
+ // Provide access to the framer for testing.
+ flip::FlipFramer* GetFramer() { return &flip_framer_; }
+
+ // Create a new FlipSession.
+ // |host| is the hostname that this session connects to.
+ FlipSession(std::string host, HttpNetworkSession* session);
+
+ // Closes all open streams. Used as part of shutdown.
+ void CloseAllStreams(net::Error code);
+
+ private:
+ // FlipFramerVisitorInterface
+ virtual void OnError(flip::FlipFramer*);
+ virtual void OnStreamFrameData(flip::FlipStreamId stream_id,
+ const char* data,
+ uint32 len);
+ virtual void OnControl(const flip::FlipControlFrame* frame);
+ virtual void OnLameDuck();
+
+ // Control frame handlers.
+ void OnSyn(const flip::FlipSynStreamControlFrame* frame,
+ const flip::FlipHeaderBlock* headers);
+ void OnSynReply(const flip::FlipSynReplyControlFrame* frame,
+ const flip::FlipHeaderBlock* headers);
+ void OnFin(const flip::FlipFinStreamControlFrame* frame);
+
+ // IO Callbacks
+ void OnSocketConnect(int result);
+ void OnReadComplete(int result);
+ void OnWriteComplete(int result);
+
+ // Start reading from the socket.
+ void ReadSocket();
+
+ // Write current data to the socket.
+ void WriteSocketLater();
+ void WriteSocket();
+
+ // Get a new stream id.
+ int GetNewStreamId();
+
+ // Track active streams in the active stream list.
+ FlipStreamImpl* ActivateStream(int id, FlipDelegate* delegate);
+ void DeactivateStream(int id);
+
+ // Check if we have a pending pushed-stream for this url
+ // Returns the stream if found (and returns it from the pending
+ // list), returns NULL otherwise.
+ FlipStreamImpl* GetPushStream(std::string url);
+
+ // Callbacks for the Flip session.
+ CompletionCallbackImpl<FlipSession> connect_callback_;
+ CompletionCallbackImpl<FlipSession> read_callback_;
+ CompletionCallbackImpl<FlipSession> write_callback_;
+
+ // The domain this session is connected to.
+ std::string domain_;
+
+ scoped_refptr<HttpNetworkSession> session_;
+
+ // The socket handle for this session.
+ ClientSocketHandle connection_;
+ bool connection_started_;
+
+ // The read buffer used to read data from the socket.
+ enum { kReadBufferSize = (4 * 1024) };
+ scoped_refptr<net::MovableIOBuffer> read_buffer_;
+ int read_buffer_bytes_read_; // bytes left in the buffer from prior read.
+ bool read_pending_;
+
+ int stream_hi_water_mark_; // The next stream id to use.
+
+ typedef std::map<int, FlipStreamImpl*> ActiveStreamMap;
+ typedef std::list<FlipStreamImpl*> ActiveStreamList;
+ ActiveStreamMap active_streams_;
+
+ ActiveStreamList pushed_streams_;
+
+ // As we gather data to be sent, we put it into the output queue.
+ typedef std::priority_queue<PrioritizedIOBuffer> OutputQueue;
+ OutputQueue queue_;
+
+ // TODO(mbelshe): this is ugly!!
+ // The packet we are currently sending.
+ PrioritizedIOBuffer in_flight_write_;
+ bool delayed_write_pending_;
+ bool write_pending_;
+
+ // Flip Frame state.
+ flip::FlipFramer flip_framer_;
+
+ // This is our weak session pool - one session per domain.
+ static scoped_ptr<FlipSessionPool> session_pool_;
+ static bool disable_compression_;
+};
+
+} // namespace net
+
+#endif // NET_FLIP_FLIP_SESSION_H_
+
diff --git a/net/flip/flip_session_pool.cc b/net/flip/flip_session_pool.cc
new file mode 100644
index 0000000..2aef13a
--- /dev/null
+++ b/net/flip/flip_session_pool.cc
@@ -0,0 +1,99 @@
+// Copyright (c) 2009 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "net/flip/flip_session_pool.h"
+
+#include "base/logging.h"
+#include "net/flip/flip_session.h"
+
+namespace net {
+
+// The maximum number of sessions to open to a single domain.
+const int kMaxSessionsPerDomain = 1;
+
+scoped_ptr<FlipSessionPool::FlipSessionsMap> FlipSessionPool::sessions_;
+
+FlipSession* FlipSessionPool::Get(const HostResolver::RequestInfo& info,
+ HttpNetworkSession* session) {
+ if (!sessions_.get())
+ sessions_.reset(new FlipSessionsMap());
+
+ const std::string domain = info.hostname();
+ FlipSession* flip_session = NULL;
+ FlipSessionList* list = GetSessionList(domain);
+ if (list) {
+ if (list->size() >= kMaxSessionsPerDomain) {
+ flip_session = list->front();
+ list->pop_front();
+ }
+ } else {
+ list = AddSessionList(domain);
+ }
+
+ DCHECK(list);
+ if (!flip_session) {
+ flip_session = new FlipSession(domain, session);
+ flip_session->AddRef(); // Keep it in the cache.
+ }
+
+ DCHECK(flip_session);
+ list->push_back(flip_session);
+ DCHECK(list->size() <= kMaxSessionsPerDomain);
+ return flip_session;
+}
+
+void FlipSessionPool::Remove(FlipSession* session) {
+ std::string domain = session->domain();
+ FlipSessionList* list = GetSessionList(domain);
+ if (list == NULL)
+ return;
+ list->remove(session);
+ if (!list->size())
+ RemoveSessionList(domain);
+}
+
+FlipSessionPool::FlipSessionList*
+ FlipSessionPool::AddSessionList(std::string domain) {
+ DCHECK(sessions_->find(domain) == sessions_->end());
+ return (*sessions_)[domain] = new FlipSessionList();
+}
+
+// static
+FlipSessionPool::FlipSessionList*
+ FlipSessionPool::GetSessionList(std::string domain) {
+ FlipSessionsMap::iterator it = sessions_->find(domain);
+ if (it == sessions_->end())
+ return NULL;
+ return it->second;
+}
+
+// static
+void FlipSessionPool::RemoveSessionList(std::string domain) {
+ FlipSessionList* list = GetSessionList(domain);
+ if (list) {
+ delete list;
+ sessions_->erase(domain);
+ } else {
+ DCHECK(false) << "removing orphaned session list";
+ }
+}
+
+// static
+void FlipSessionPool::CloseAllSessions() {
+ while (sessions_->size()) {
+ FlipSessionList* list = sessions_->begin()->second;
+ DCHECK(list);
+ sessions_->erase(sessions_->begin()->first);
+ while (list->size()) {
+ FlipSession* session = list->front();
+ list->pop_front();
+ session->CloseAllStreams(net::OK);
+ session->Release();
+ }
+ delete list;
+ }
+}
+
+} // namespace net
+
diff --git a/net/flip/flip_session_pool.h b/net/flip/flip_session_pool.h
new file mode 100644
index 0000000..9562413
--- /dev/null
+++ b/net/flip/flip_session_pool.h
@@ -0,0 +1,57 @@
+// Copyright (c) 2009 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef NET_FLIP_FLIP_SESSION_POOL_H_
+#define NET_FLIP_FLIP_SESSION_POOL_H_
+
+#include <map>
+#include <list>
+#include <string>
+
+#include "base/ref_counted.h"
+#include "base/scoped_ptr.h"
+#include "net/base/host_resolver.h"
+
+namespace net {
+
+class FlipSession;
+class HttpNetworkSession;
+
+// This is a very simple pool for open FlipSessions.
+// TODO(mbelshe): Make this production ready.
+class FlipSessionPool {
+ public:
+ FlipSessionPool() {}
+ virtual ~FlipSessionPool() {}
+
+ // Factory for finding open sessions.
+ FlipSession* Get(const HostResolver::RequestInfo& info,
+ HttpNetworkSession* session);
+
+ // Close all Flip Sessions; used for debugging.
+ static void CloseAllSessions();
+
+ protected:
+ friend class FlipSession;
+
+ // Return a FlipSession to the pool.
+ void Remove(FlipSession* session);
+
+ private:
+ typedef std::list<FlipSession*> FlipSessionList;
+ typedef std::map<std::string, FlipSessionList*> FlipSessionsMap;
+
+ // Helper functions for manipulating the lists.
+ FlipSessionList* AddSessionList(std::string domain);
+ static FlipSessionList* GetSessionList(std::string domain);
+ static void RemoveSessionList(std::string domain);
+
+ // This is our weak session pool - one session per domain.
+ static scoped_ptr<FlipSessionsMap> sessions_;
+};
+
+} // namespace net
+
+#endif // NET_FLIP_FLIP_SESSION_POOL_H_
+
diff --git a/net/flip/flip_session_unittest.cc b/net/flip/flip_session_unittest.cc
new file mode 100644
index 0000000..016a197
--- /dev/null
+++ b/net/flip/flip_session_unittest.cc
@@ -0,0 +1,55 @@
+// Copyright (c) 2009 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "net/base/test_completion_callback.h"
+#include "net/socket/socket_test_util.h"
+#include "testing/platform_test.h"
+
+#include "net/flip/flip_session.h"
+
+namespace net {
+
+class FlipSessionTest : public PlatformTest {
+ public:
+};
+
+// Test the PrioritizedIOBuffer class.
+TEST_F(FlipSessionTest, PrioritizedIOBuffer) {
+ std::priority_queue<PrioritizedIOBuffer> queue_;
+ const int kQueueSize = 100;
+
+ // Insert 100 items; pri 100 to 1.
+ for (int index = 0; index < kQueueSize; ++index) {
+ PrioritizedIOBuffer buffer(NULL, kQueueSize - index);
+ queue_.push(buffer);
+ }
+
+ // Insert several priority 0 items last.
+ const int kNumDuplicates = 12;
+ IOBufferWithSize* buffers[kNumDuplicates];
+ for (int index = 0; index < kNumDuplicates; ++index) {
+ buffers[index] = new IOBufferWithSize(index+1);
+ queue_.push(PrioritizedIOBuffer(buffers[index], 0));
+ }
+
+ EXPECT_EQ(kQueueSize + kNumDuplicates, queue_.size());
+
+ // Verify the P0 items come out in FIFO order.
+ for (int index = 0; index < kNumDuplicates; ++index) {
+ PrioritizedIOBuffer buffer = queue_.top();
+ EXPECT_EQ(0, buffer.priority());
+ EXPECT_EQ(index + 1, buffer.size());
+ queue_.pop();
+ }
+
+ int priority = 1;
+ while (queue_.size()) {
+ PrioritizedIOBuffer buffer = queue_.top();
+ EXPECT_EQ(priority++, buffer.priority());
+ queue_.pop();
+ }
+}
+
+} // namespace net
+
diff --git a/net/flip/flip_transaction_factory.h b/net/flip/flip_transaction_factory.h
new file mode 100644
index 0000000..ce50686
--- /dev/null
+++ b/net/flip/flip_transaction_factory.h
@@ -0,0 +1,36 @@
+// Copyright (c) 2009 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef NET_FLIP_FLIP_TRANSACTION_FACTORY_H__
+#define NET_FLIP_FLIP_TRANSACTION_FACTORY_H__
+
+#include "net/flip/flip_network_transaction.h"
+#include "net/http/http_transaction_factory.h"
+
+namespace net {
+
+class FlipTransactionFactory : public HttpTransactionFactory {
+ public:
+ explicit FlipTransactionFactory(HttpNetworkSession* session)
+ : session_(session) {
+ }
+ virtual ~FlipTransactionFactory() {}
+
+ // HttpTransactionFactory Interface.
+ virtual HttpTransaction* CreateTransaction() {
+ return new FlipNetworkTransaction(session_);
+ }
+ virtual HttpCache* GetCache() {
+ return NULL;
+ }
+ virtual void Suspend(bool suspend) {
+ }
+
+ private:
+ scoped_refptr<HttpNetworkSession> session_;
+};
+
+} // namespace net
+
+#endif // NET_FLIP_FLIP_TRANSACTION_FACTORY_H__