summaryrefslogtreecommitdiffstats
path: root/net/flip/flip_framer.cc
diff options
context:
space:
mode:
Diffstat (limited to 'net/flip/flip_framer.cc')
-rw-r--r--net/flip/flip_framer.cc732
1 files changed, 732 insertions, 0 deletions
diff --git a/net/flip/flip_framer.cc b/net/flip/flip_framer.cc
new file mode 100644
index 0000000..ec183cc
--- /dev/null
+++ b/net/flip/flip_framer.cc
@@ -0,0 +1,732 @@
+// Copyright (c) 2009 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "base/scoped_ptr.h"
+#include "base/stats_counters.h"
+
+#include "flip_framer.h" // cross-google3 directory naming.
+#include "flip_frame_builder.h"
+#include "flip_bitmasks.h"
+
+#ifdef WIN32
+#include "third_party/zlib/zlib.h"
+#else
+#include "third_party/zlib/v1_2_3/zlib.h"
+#endif
+
+namespace flip {
+
+// The initial size of the control frame buffer; this is used internally
+// as we we parse though control frames.
+static const int kControlFrameBufferInitialSize = 32 * 1024;
+// The maximum size of the control frame buffer that we support.
+// TODO(mbelshe): We should make this stream-based so there are no limits.
+static const int kControlFrameBufferMaxSize = 64 * 1024;
+
+// This implementation of Flip is version 1.
+static const int kFlipProtocolVersion = 1;
+
+// By default is compression on or off.
+bool FlipFramer::compression_default_ = true;
+
+#ifdef DEBUG_FLIP_STATE_CHANGES
+#define CHANGE_STATE(newstate) \
+{ \
+ do { \
+ LOG(INFO) << "Changing state from: " \
+ << StateToString(state_) \
+ << " to " << StateToString(newstate) << "\n"; \
+ state_ = newstate; \
+ } while (false); \
+}
+#else
+#define CHANGE_STATE(newstate) (state_ = newstate)
+#endif
+
+FlipFramer::FlipFramer()
+ : state_(FLIP_RESET),
+ error_code_(FLIP_NO_ERROR),
+ remaining_payload_(0),
+ remaining_control_payload_(0),
+ current_frame_buffer_(NULL),
+ current_frame_len_(0),
+ current_frame_capacity_(0),
+ enable_compression_(compression_default_),
+ visitor_(NULL) {
+}
+
+FlipFramer::~FlipFramer() {
+ if (compressor_.get()) {
+ deflateEnd(compressor_.get());
+ }
+ delete [] current_frame_buffer_;
+}
+
+void FlipFramer::Reset() {
+ state_ = FLIP_RESET;
+ error_code_ = FLIP_NO_ERROR;
+ remaining_payload_ = 0;
+ remaining_control_payload_ = 0;
+ current_frame_len_ = 0;
+ if (current_frame_capacity_ != kControlFrameBufferInitialSize) {
+ delete [] current_frame_buffer_;
+ current_frame_buffer_ = 0;
+ current_frame_capacity_ = 0;
+ ExpandControlFrameBuffer(kControlFrameBufferInitialSize);
+ }
+}
+
+const char* FlipFramer::StateToString(int state) {
+ switch (state) {
+ case FLIP_ERROR:
+ return "ERROR";
+ case FLIP_DONE:
+ return "DONE";
+ case FLIP_AUTO_RESET:
+ return "AUTO_RESET";
+ case FLIP_RESET:
+ return "RESET";
+ case FLIP_READING_COMMON_HEADER:
+ return "READING_COMMON_HEADER";
+ case FLIP_INTERPRET_CONTROL_FRAME_COMMON_HEADER:
+ return "INTERPRET_CONTROL_FRAME_COMMON_HEADER";
+ case FLIP_CONTROL_FRAME_PAYLOAD:
+ return "CONTROL_FRAME_PAYLOAD";
+ case FLIP_IGNORE_REMAINING_PAYLOAD:
+ return "IGNORE_REMAINING_PAYLOAD";
+ case FLIP_FORWARD_STREAM_FRAME:
+ return "FORWARD_STREAM_FRAME";
+ }
+ return "UNKNOWN_STATE";
+}
+
+uint32 FlipFramer::BytesSafeToRead() const {
+ switch (state_) {
+ case FLIP_ERROR:
+ case FLIP_DONE:
+ case FLIP_AUTO_RESET:
+ case FLIP_RESET:
+ return 0;
+ case FLIP_READING_COMMON_HEADER:
+ DCHECK(current_frame_len_ < sizeof(FlipFrame));
+ return sizeof(FlipFrame) - current_frame_len_;
+ case FLIP_INTERPRET_CONTROL_FRAME_COMMON_HEADER:
+ return 0;
+ case FLIP_CONTROL_FRAME_PAYLOAD:
+ case FLIP_IGNORE_REMAINING_PAYLOAD:
+ case FLIP_FORWARD_STREAM_FRAME:
+ return remaining_payload_;
+ }
+ // We should never get to here.
+ return 0;
+}
+
+void FlipFramer::set_error(FlipError error) {
+ DCHECK(false);
+ DCHECK(visitor_);
+ error_code_ = error;
+ visitor_->OnError(this);
+}
+
+const char* FlipFramer::ErrorCodeToString(int error_code) {
+ switch (error_code) {
+ case FLIP_NO_ERROR:
+ return "NO_ERROR";
+ case FLIP_UNKNOWN_CONTROL_TYPE:
+ return "UNKNOWN_CONTROL_TYPE";
+ case FLIP_INVALID_CONTROL_FRAME:
+ return "INVALID_CONTROL_FRAME";
+ case FLIP_CONTROL_PAYLOAD_TOO_LARGE:
+ return "CONTROL_PAYLOAD_TOO_LARGE";
+ case FLIP_ZLIB_INIT_FAILURE:
+ return "ZLIB_INIT_FAILURE";
+ case FLIP_UNSUPPORTED_VERSION:
+ return "UNSUPPORTED_VERSION";
+ case FLIP_DECOMPRESS_FAILURE:
+ return "DECOMPRESS_FAILURE";
+ }
+ return "UNKNOWN_STATE";
+}
+
+uint32 FlipFramer::ProcessInput(const char* data, uint32 len) {
+ DCHECK(visitor_);
+ DCHECK(data);
+
+ uint32 original_len = len;
+ while (len != 0) {
+ FlipControlFrame* current_control_frame =
+ reinterpret_cast<FlipControlFrame*>(current_frame_buffer_);
+ FlipDataFrame* current_data_frame =
+ reinterpret_cast<FlipDataFrame*>(current_frame_buffer_);
+
+ switch (state_) {
+ case FLIP_ERROR:
+ case FLIP_DONE:
+ goto bottom;
+
+ case FLIP_AUTO_RESET:
+ case FLIP_RESET:
+ Reset();
+ CHANGE_STATE(FLIP_READING_COMMON_HEADER);
+ continue;
+
+ case FLIP_READING_COMMON_HEADER: {
+ int bytes_read = ProcessCommonHeader(data, len);
+ len -= bytes_read;
+ data += bytes_read;
+ continue;
+ }
+
+ // Arguably, this case is not necessary, as no bytes are consumed here.
+ // I felt it was a nice partitioning, however (which probably indicates
+ // that it should be refactored into its own function!)
+ case FLIP_INTERPRET_CONTROL_FRAME_COMMON_HEADER:
+ DCHECK(error_code_ == 0);
+ DCHECK(current_frame_len_ >= sizeof(FlipFrame));
+ // Do some sanity checking on the control frame sizes.
+ switch (current_control_frame->type()) {
+ case SYN_STREAM:
+ // NOTE: sizeof(FlipSynStreamControlFrame) is not accurate.
+ if (current_control_frame->length() <
+ sizeof(FlipSynStreamControlFrame) - sizeof(FlipControlFrame))
+ set_error(FLIP_INVALID_CONTROL_FRAME);
+ break;
+ case SYN_REPLY:
+ if (current_control_frame->length() <
+ sizeof(FlipSynReplyControlFrame) - sizeof(FlipControlFrame))
+ set_error(FLIP_INVALID_CONTROL_FRAME);
+ break;
+ case FIN_STREAM:
+ if (current_control_frame->length() !=
+ sizeof(FlipFinStreamControlFrame) - sizeof(FlipFrame))
+ set_error(FLIP_INVALID_CONTROL_FRAME);
+ break;
+ case NOOP:
+ // NOP. Swallow it.
+ CHANGE_STATE(FLIP_AUTO_RESET);
+ continue;
+ default:
+ set_error(FLIP_UNKNOWN_CONTROL_TYPE);
+ break;
+ }
+
+ // We only support version 1 of this protocol.
+ if (current_control_frame->version() != kFlipProtocolVersion)
+ set_error(FLIP_UNSUPPORTED_VERSION);
+
+ if (error_code_) {
+ CHANGE_STATE(FLIP_ERROR);
+ goto bottom;
+ }
+
+ remaining_control_payload_ = current_control_frame->length();
+ if (remaining_control_payload_ > kControlFrameBufferMaxSize) {
+ set_error(FLIP_CONTROL_PAYLOAD_TOO_LARGE);
+ CHANGE_STATE(FLIP_ERROR);
+ goto bottom;
+ }
+ ExpandControlFrameBuffer(remaining_control_payload_);
+ CHANGE_STATE(FLIP_CONTROL_FRAME_PAYLOAD);
+ continue;
+
+ case FLIP_CONTROL_FRAME_PAYLOAD: {
+ int bytes_read = ProcessControlFramePayload(data, len);
+ len -= bytes_read;
+ data += bytes_read;
+ }
+ // intentional fallthrough
+ case FLIP_IGNORE_REMAINING_PAYLOAD:
+ // control frame has too-large payload
+ // intentional fallthrough
+ case FLIP_FORWARD_STREAM_FRAME:
+ if (remaining_payload_) {
+ uint32 amount_to_forward = std::min(remaining_payload_, len);
+ if (amount_to_forward && state_ != FLIP_IGNORE_REMAINING_PAYLOAD) {
+ const FlipDataFrame* data_frame =
+ reinterpret_cast<const FlipDataFrame*>(current_data_frame);
+ if (data_frame->flags() & DATA_FLAG_COMPRESSED) {
+ // TODO(mbelshe): Assert that the decompressor is init'ed.
+ if (!InitializeDecompressor())
+ return NULL;
+
+ int decompressed_max_size = amount_to_forward * 100;
+ scoped_array<char> decompressed(new char[decompressed_max_size]);
+ decompressor_->next_in = reinterpret_cast<Bytef*>(
+ const_cast<char*>(data));
+ decompressor_->avail_in = amount_to_forward;
+ decompressor_->next_out =
+ reinterpret_cast<Bytef*>(decompressed.get());
+ decompressor_->avail_out = decompressed_max_size;
+
+ int rv = inflate(decompressor_.get(), Z_SYNC_FLUSH);
+ if (rv != Z_OK) {
+ set_error(FLIP_DECOMPRESS_FAILURE);
+ goto bottom;
+ }
+ int decompressed_size = decompressed_max_size -
+ decompressor_->avail_out;
+ visitor_->OnStreamFrameData(current_data_frame->stream_id(),
+ decompressed.get(),
+ decompressed_size);
+ amount_to_forward -= decompressor_->avail_in;
+ } else {
+ // The data frame was not compressed
+ visitor_->OnStreamFrameData(current_data_frame->stream_id(),
+ data, amount_to_forward);
+ }
+ }
+ data += amount_to_forward;
+ len -= amount_to_forward;
+ remaining_payload_ -= amount_to_forward;
+ } else {
+ CHANGE_STATE(FLIP_AUTO_RESET);
+ }
+ continue;
+ default:
+ break;
+ }
+ }
+ bottom:
+ return original_len - len;
+}
+
+uint32 FlipFramer::ProcessCommonHeader(const char* data, uint32 len) {
+ // This should only be called when we're in the FLIP_READING_COMMON_HEADER
+ // state.
+ DCHECK(state_ == FLIP_READING_COMMON_HEADER);
+
+ int original_len = len;
+ FlipDataFrame* current_frame =
+ reinterpret_cast<FlipDataFrame*>(current_frame_buffer_);
+
+ do {
+ if (current_frame_len_ < sizeof(FlipFrame)) {
+ uint32 bytes_desired = sizeof(FlipFrame) - current_frame_len_;
+ uint32 bytes_to_append = std::min(bytes_desired, len);
+ char* header_buffer = current_frame_buffer_;
+ memcpy(&header_buffer[current_frame_len_], data, bytes_to_append);
+ current_frame_len_ += bytes_to_append;
+ data += bytes_to_append;
+ len -= bytes_to_append;
+ // Check for an empty data packet.
+ if (current_frame_len_ == sizeof(FlipFrame) &&
+ !current_frame->is_control_frame() &&
+ current_frame->length() == 0) {
+ visitor_->OnStreamFrameData(current_frame->stream_id(), NULL, 0);
+ CHANGE_STATE(FLIP_RESET);
+ }
+ break;
+ }
+ remaining_payload_ = current_frame->length();
+ // if we're here, then we have the common header all received.
+ if (!current_frame->is_control_frame())
+ CHANGE_STATE(FLIP_FORWARD_STREAM_FRAME);
+ else
+ CHANGE_STATE(FLIP_INTERPRET_CONTROL_FRAME_COMMON_HEADER);
+ } while (false);
+
+ return original_len - len;
+}
+
+uint32 FlipFramer::ProcessControlFramePayload(const char* data, uint32 len) {
+ int original_len = len;
+ do {
+ if (remaining_control_payload_) {
+ uint32 amount_to_consume = std::min(remaining_control_payload_, len);
+ memcpy(&current_frame_buffer_[current_frame_len_], data,
+ amount_to_consume);
+ current_frame_len_ += amount_to_consume;
+ data += amount_to_consume;
+ len -= amount_to_consume;
+ remaining_control_payload_ -= amount_to_consume;
+ remaining_payload_ -= amount_to_consume;
+ if (remaining_control_payload_)
+ break;
+ }
+ FlipControlFrame* control_frame =
+ reinterpret_cast<FlipControlFrame*>(current_frame_buffer_);
+ visitor_->OnControl(control_frame);
+ CHANGE_STATE(FLIP_IGNORE_REMAINING_PAYLOAD);
+ } while (false);
+ return original_len - len;
+}
+
+void FlipFramer::ExpandControlFrameBuffer(int size) {
+ DCHECK(size < kControlFrameBufferMaxSize);
+ if (size < current_frame_capacity_)
+ return;
+
+ int alloc_size = size + sizeof(FlipFrame);
+ char* new_buffer = new char[alloc_size];
+ memcpy(new_buffer, current_frame_buffer_, current_frame_len_);
+ current_frame_capacity_ = alloc_size;
+ current_frame_buffer_ = new_buffer;
+}
+
+bool FlipFramer::ParseHeaderBlock(const FlipFrame* frame,
+ FlipHeaderBlock* block) {
+ uint32 type = reinterpret_cast<const FlipControlFrame*>(frame)->type();
+ if (type != SYN_STREAM && type != SYN_REPLY)
+ return false;
+
+ // Find the header data within the control frame.
+ scoped_array<FlipSynStreamControlFrame> control_frame(
+ reinterpret_cast<FlipSynStreamControlFrame*>(DecompressFrame(frame)));
+ if (!control_frame.get())
+ return false;
+ const char *header_data = control_frame.get()->header_block();
+ int header_length = control_frame.get()->header_block_len();
+
+ FlipFrameBuilder builder(header_data, header_length);
+ void* iter = NULL;
+ uint16 num_headers;
+ if (builder.ReadUInt16(&iter, &num_headers)) {
+ VLOG(2) << "found num_headers: " << num_headers;
+ for (int index = 0; index < num_headers; ++index) {
+ std::string name;
+ std::string value;
+ if (!builder.ReadString(&iter, &name)) {
+ VLOG(1) << "couldn't read string (key)!";
+ break;
+ }
+ if (!builder.ReadString(&iter, &value)) {
+ VLOG(1) << "couldn't read string (value)!";
+ break;
+ }
+ if (block->empty()) {
+ (*block)[name] = value;
+ } else {
+ FlipHeaderBlock::iterator last = --block->end();
+ if (block->key_comp()(last->first, name)) {
+ block->insert(block->end(),
+ std::pair<std::string, std::string>(name, value));
+ } else {
+ return false;
+ }
+ }
+ }
+ return true;
+ } else {
+ VLOG(2) << "didn't find headers";
+ }
+ return false;
+}
+
+FlipSynStreamControlFrame* FlipFramer::CreateSynStream(
+ int stream_id,
+ int priority,
+ bool compress,
+ FlipHeaderBlock* headers) {
+ FlipFrameBuilder frame;
+
+ frame.WriteUInt16(kControlFlagMask | kFlipProtocolVersion);
+ frame.WriteUInt16(SYN_STREAM);
+ frame.WriteUInt32(0); // Placeholder for the length.
+ frame.WriteUInt32(stream_id);
+ frame.WriteUInt16(ntohs(priority) << 6); // Priority.
+
+ frame.WriteUInt16(headers->size()); // Number of headers.
+ FlipHeaderBlock::iterator it;
+ for (it = headers->begin(); it != headers->end(); ++it) {
+ frame.WriteString(it->first);
+ frame.WriteString(it->second);
+ }
+ // write the length
+ frame.WriteUInt32ToOffset(4, frame.length() - sizeof(FlipFrame));
+ if (compress) {
+ FlipSynStreamControlFrame* new_frame =
+ reinterpret_cast<FlipSynStreamControlFrame*>(
+ CompressFrame(frame.data()));
+ return new_frame;
+ }
+
+ return reinterpret_cast<FlipSynStreamControlFrame*>(frame.take());
+}
+
+/* static */
+FlipFinStreamControlFrame* FlipFramer::CreateFinStream(int stream_id,
+ int status) {
+ FlipFrameBuilder frame;
+ frame.WriteUInt16(kControlFlagMask | kFlipProtocolVersion);
+ frame.WriteUInt16(FIN_STREAM);
+ frame.WriteUInt32(8);
+ frame.WriteUInt32(stream_id);
+ frame.WriteUInt32(status);
+ return reinterpret_cast<FlipFinStreamControlFrame*>(frame.take());
+}
+
+FlipSynReplyControlFrame* FlipFramer::CreateSynReply(int stream_id,
+ bool compressed, FlipHeaderBlock* headers) {
+
+ FlipFrameBuilder frame;
+
+ frame.WriteUInt16(kControlFlagMask | kFlipProtocolVersion);
+ frame.WriteUInt16(SYN_REPLY);
+ frame.WriteUInt32(0); // Placeholder for the length.
+ frame.WriteUInt32(stream_id);
+ frame.WriteUInt16(0); // Priority.
+
+ frame.WriteUInt16(headers->size()); // Number of headers.
+ FlipHeaderBlock::iterator it;
+ for (it = headers->begin(); it != headers->end(); ++it) {
+ // TODO(mbelshe): Headers need to be sorted.
+ frame.WriteString(it->first);
+ frame.WriteString(it->second);
+ }
+ // write the length
+ frame.WriteUInt32ToOffset(4, frame.length() - sizeof(FlipFrame));
+ if (compressed)
+ return reinterpret_cast<FlipSynReplyControlFrame*>(
+ CompressFrame(frame.data()));
+ return reinterpret_cast<FlipSynReplyControlFrame*>(frame.take());
+}
+
+FlipDataFrame* FlipFramer::CreateDataFrame(int stream_id,
+ const char* data,
+ int len, bool compressed) {
+ FlipFrameBuilder frame;
+
+ frame.WriteUInt32(stream_id);
+
+ frame.WriteUInt32(len);
+ frame.WriteBytes(data, len);
+ if (compressed)
+ return reinterpret_cast<FlipDataFrame*>(CompressFrame(frame.data()));
+ return reinterpret_cast<FlipDataFrame*>(frame.take());
+}
+
+static const int kCompressorLevel = Z_DEFAULT_COMPRESSION;
+// This is just a hacked dictionary to use for shrinking HTTP-like headers.
+// TODO(mbelshe): Use a scientific methodology for computing the dictionary.
+static const char dictionary[] =
+ "optionsgetheadpostputdeletetraceacceptaccept-charsetaccept-encodingaccept-"
+ "languageauthorizationexpectfromhostif-modified-sinceif-matchif-none-matchi"
+ "f-rangeif-unmodifiedsincemax-forwardsproxy-authorizationrangerefererteuser"
+ "-agent10010120020120220320420520630030130230330430530630740040140240340440"
+ "5406407408409410411412413414415416417500501502503504505accept-rangesageeta"
+ "glocationproxy-authenticatepublicretry-afterservervarywarningwww-authentic"
+ "ateallowcontent-basecontent-encodingcache-controlconnectiondatetrailertran"
+ "sfer-encodingupgradeviawarningcontent-languagecontent-lengthcontent-locati"
+ "oncontent-md5content-rangecontent-typeetagexpireslast-modifiedset-cookieMo"
+ "ndayTuesdayWednesdayThursdayFridaySaturdaySundayJanFebMarAprMayJunJulAugSe"
+ "pOctNovDecchunkedtext/htmlimage/pngimage/jpgimage/gifapplication/xmlapplic"
+ "ation/xhtmltext/plainpublicmax-agecharset=iso-8859-1utf-8gzipdeflateHTTP/1"
+ ".1statusversionurl";
+static uLong dictionary_id = 0;
+
+bool FlipFramer::InitializeCompressor() {
+ if (compressor_.get())
+ return true; // Already initialized.
+
+ compressor_.reset(new z_stream);
+ memset(compressor_.get(), 0, sizeof(z_stream));
+
+ int success = deflateInit(compressor_.get(), kCompressorLevel);
+ if (success == Z_OK)
+ success = deflateSetDictionary(compressor_.get(),
+ reinterpret_cast<const Bytef*>(dictionary),
+ sizeof(dictionary));
+ if (success != Z_OK)
+ compressor_.reset(NULL);
+ return success == Z_OK;
+}
+
+bool FlipFramer::InitializeDecompressor() {
+ if (decompressor_.get())
+ return true; // Already initialized.
+
+ decompressor_.reset(new z_stream);
+ memset(decompressor_.get(), 0, sizeof(z_stream));
+
+ // Compute the id of our dictionary so that we know we're using the
+ // right one when asked for it.
+ if (dictionary_id == 0) {
+ dictionary_id = adler32(0L, Z_NULL, 0);
+ dictionary_id = adler32(dictionary_id,
+ reinterpret_cast<const Bytef*>(dictionary),
+ sizeof(dictionary));
+ }
+
+ int success = inflateInit(decompressor_.get());
+ if (success != Z_OK)
+ decompressor_.reset(NULL);
+ return success == Z_OK;
+}
+
+bool FlipFramer::GetFrameBoundaries(const FlipFrame* frame,
+ int* payload_length,
+ int* header_length,
+ const unsigned char** payload) const {
+ if (frame->is_control_frame()) {
+ const FlipControlFrame* control_frame =
+ reinterpret_cast<const FlipControlFrame*>(frame);
+ switch (control_frame->type()) {
+ case SYN_STREAM:
+ case SYN_REPLY:
+ {
+ const FlipSynStreamControlFrame *syn_frame =
+ reinterpret_cast<const FlipSynStreamControlFrame*>(frame);
+ *payload_length = syn_frame->header_block_len();
+ *header_length = sizeof(FlipFrame) + syn_frame->length() -
+ syn_frame->header_block_len();
+ *payload = reinterpret_cast<const unsigned char*>(frame) +
+ *header_length;
+ }
+ break;
+ default:
+ // TODO(mbelshe): set an error?
+ return false; // We can't compress this frame!
+ }
+ } else {
+ *header_length = sizeof(FlipFrame);
+ *payload_length = frame->length();
+ *payload = reinterpret_cast<const unsigned char*>(frame) +
+ sizeof(FlipFrame);
+ }
+ DCHECK(static_cast<size_t>(*header_length) <=
+ sizeof(FlipFrame) + *payload_length);
+ return true;
+}
+
+
+FlipFrame* FlipFramer::CompressFrame(const FlipFrame* frame) {
+ int payload_length;
+ int header_length;
+ const unsigned char* payload;
+
+ static StatsCounter pre_compress_bytes("flip.PreCompressSize");
+ static StatsCounter post_compress_bytes("flip.PostCompressSize");
+
+ if (!enable_compression_)
+ return DuplicateFrame(frame);
+
+ if (!GetFrameBoundaries(frame, &payload_length, &header_length, &payload))
+ return NULL;
+
+ if (!InitializeCompressor())
+ return NULL;
+
+ // TODO(mbelshe): Should we have a zlib header like what http servers do?
+
+ // Create an output frame.
+ int compressed_max_size = deflateBound(compressor_.get(), payload_length);
+ int new_frame_size = header_length + compressed_max_size;
+ FlipFrame* new_frame =
+ reinterpret_cast<FlipFrame*>(new char[new_frame_size]);
+ memcpy(new_frame, frame, header_length);
+
+ compressor_->next_in = const_cast<Bytef*>(payload);
+ compressor_->avail_in = payload_length;
+ compressor_->next_out = reinterpret_cast<Bytef*>(new_frame) + header_length;
+ compressor_->avail_out = compressed_max_size;
+
+ // Data packets have a 'compressed flag
+ if (!new_frame->is_control_frame()) {
+ FlipDataFrame* data_frame = reinterpret_cast<FlipDataFrame*>(new_frame);
+ data_frame->set_flags(data_frame->flags() | DATA_FLAG_COMPRESSED);
+ }
+
+ int rv = deflate(compressor_.get(), Z_SYNC_FLUSH);
+ if (rv != Z_OK) { // How can we know that it compressed everything?
+ // This shouldn't happen, right?
+ free(new_frame);
+ return NULL;
+ }
+
+ int compressed_size = compressed_max_size - compressor_->avail_out;
+ new_frame->set_length(header_length + compressed_size - sizeof(FlipFrame));
+
+ pre_compress_bytes.Add(payload_length);
+ post_compress_bytes.Add(new_frame->length());
+
+ return new_frame;
+}
+
+FlipFrame* FlipFramer::DecompressFrame(const FlipFrame* frame) {
+ int payload_length;
+ int header_length;
+ const unsigned char* payload;
+
+ static StatsCounter pre_decompress_bytes("flip.PreDeCompressSize");
+ static StatsCounter post_decompress_bytes("flip.PostDeCompressSize");
+
+ if (!enable_compression_)
+ return DuplicateFrame(frame);
+
+ if (!GetFrameBoundaries(frame, &payload_length, &header_length, &payload))
+ return NULL;
+
+ if (!frame->is_control_frame()) {
+ const FlipDataFrame* data_frame =
+ reinterpret_cast<const FlipDataFrame*>(frame);
+ if (!data_frame->flags() & DATA_FLAG_COMPRESSED)
+ return DuplicateFrame(frame);
+ }
+
+ if (!InitializeDecompressor())
+ return NULL;
+
+ // TODO(mbelshe): Should we have a zlib header like what http servers do?
+
+ // Create an output frame. Assume it does not need to be longer than
+ // the input data.
+ int decompressed_max_size = kControlFrameBufferInitialSize;
+ int new_frame_size = header_length + decompressed_max_size;
+ FlipFrame* new_frame =
+ reinterpret_cast<FlipFrame*>(new char[new_frame_size]);
+ memcpy(new_frame, frame, header_length);
+
+ decompressor_->next_in = const_cast<Bytef*>(payload);
+ decompressor_->avail_in = payload_length;
+ decompressor_->next_out = reinterpret_cast<Bytef*>(new_frame) +
+ header_length;
+ decompressor_->avail_out = decompressed_max_size;
+
+ int rv = inflate(decompressor_.get(), Z_SYNC_FLUSH);
+ if (rv == Z_NEED_DICT) {
+ // Need to try again with the right dictionary.
+ if (decompressor_->adler == dictionary_id) {
+ rv = inflateSetDictionary(decompressor_.get(), (const Bytef*)dictionary,
+ sizeof(dictionary));
+ if (rv == Z_OK)
+ rv = inflate(decompressor_.get(), Z_SYNC_FLUSH);
+ }
+ }
+ if (rv != Z_OK) { // How can we know that it decompressed everything?
+ free(new_frame);
+ return NULL;
+ }
+
+ // Unset the compressed flag for data frames.
+ if (!new_frame->is_control_frame()) {
+ FlipDataFrame* data_frame = reinterpret_cast<FlipDataFrame*>(new_frame);
+ data_frame->set_flags(data_frame->flags() & ~DATA_FLAG_COMPRESSED);
+ }
+
+ int decompressed_size = decompressed_max_size - decompressor_->avail_out;
+ new_frame->set_length(header_length + decompressed_size - sizeof(FlipFrame));
+
+ pre_decompress_bytes.Add(frame->length());
+ post_decompress_bytes.Add(new_frame->length());
+
+ return new_frame;
+}
+
+FlipFrame* FlipFramer::DuplicateFrame(const FlipFrame* frame) {
+ int size = sizeof(FlipFrame) + frame->length();
+ char* new_frame = new char[size];
+ memcpy(new_frame, frame, size);
+ return reinterpret_cast<FlipFrame*>(new_frame);
+}
+
+void FlipFramer::set_enable_compression(bool value) {
+ enable_compression_ = value;
+}
+
+void FlipFramer::set_enable_compression_default(bool value) {
+ compression_default_ = value;
+}
+
+} // namespace flip
+