diff options
Diffstat (limited to 'net/flip/flip_framer.cc')
-rw-r--r-- | net/flip/flip_framer.cc | 732 |
1 files changed, 732 insertions, 0 deletions
diff --git a/net/flip/flip_framer.cc b/net/flip/flip_framer.cc new file mode 100644 index 0000000..ec183cc --- /dev/null +++ b/net/flip/flip_framer.cc @@ -0,0 +1,732 @@ +// Copyright (c) 2009 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "base/scoped_ptr.h" +#include "base/stats_counters.h" + +#include "flip_framer.h" // cross-google3 directory naming. +#include "flip_frame_builder.h" +#include "flip_bitmasks.h" + +#ifdef WIN32 +#include "third_party/zlib/zlib.h" +#else +#include "third_party/zlib/v1_2_3/zlib.h" +#endif + +namespace flip { + +// The initial size of the control frame buffer; this is used internally +// as we we parse though control frames. +static const int kControlFrameBufferInitialSize = 32 * 1024; +// The maximum size of the control frame buffer that we support. +// TODO(mbelshe): We should make this stream-based so there are no limits. +static const int kControlFrameBufferMaxSize = 64 * 1024; + +// This implementation of Flip is version 1. +static const int kFlipProtocolVersion = 1; + +// By default is compression on or off. +bool FlipFramer::compression_default_ = true; + +#ifdef DEBUG_FLIP_STATE_CHANGES +#define CHANGE_STATE(newstate) \ +{ \ + do { \ + LOG(INFO) << "Changing state from: " \ + << StateToString(state_) \ + << " to " << StateToString(newstate) << "\n"; \ + state_ = newstate; \ + } while (false); \ +} +#else +#define CHANGE_STATE(newstate) (state_ = newstate) +#endif + +FlipFramer::FlipFramer() + : state_(FLIP_RESET), + error_code_(FLIP_NO_ERROR), + remaining_payload_(0), + remaining_control_payload_(0), + current_frame_buffer_(NULL), + current_frame_len_(0), + current_frame_capacity_(0), + enable_compression_(compression_default_), + visitor_(NULL) { +} + +FlipFramer::~FlipFramer() { + if (compressor_.get()) { + deflateEnd(compressor_.get()); + } + delete [] current_frame_buffer_; +} + +void FlipFramer::Reset() { + state_ = FLIP_RESET; + error_code_ = FLIP_NO_ERROR; + remaining_payload_ = 0; + remaining_control_payload_ = 0; + current_frame_len_ = 0; + if (current_frame_capacity_ != kControlFrameBufferInitialSize) { + delete [] current_frame_buffer_; + current_frame_buffer_ = 0; + current_frame_capacity_ = 0; + ExpandControlFrameBuffer(kControlFrameBufferInitialSize); + } +} + +const char* FlipFramer::StateToString(int state) { + switch (state) { + case FLIP_ERROR: + return "ERROR"; + case FLIP_DONE: + return "DONE"; + case FLIP_AUTO_RESET: + return "AUTO_RESET"; + case FLIP_RESET: + return "RESET"; + case FLIP_READING_COMMON_HEADER: + return "READING_COMMON_HEADER"; + case FLIP_INTERPRET_CONTROL_FRAME_COMMON_HEADER: + return "INTERPRET_CONTROL_FRAME_COMMON_HEADER"; + case FLIP_CONTROL_FRAME_PAYLOAD: + return "CONTROL_FRAME_PAYLOAD"; + case FLIP_IGNORE_REMAINING_PAYLOAD: + return "IGNORE_REMAINING_PAYLOAD"; + case FLIP_FORWARD_STREAM_FRAME: + return "FORWARD_STREAM_FRAME"; + } + return "UNKNOWN_STATE"; +} + +uint32 FlipFramer::BytesSafeToRead() const { + switch (state_) { + case FLIP_ERROR: + case FLIP_DONE: + case FLIP_AUTO_RESET: + case FLIP_RESET: + return 0; + case FLIP_READING_COMMON_HEADER: + DCHECK(current_frame_len_ < sizeof(FlipFrame)); + return sizeof(FlipFrame) - current_frame_len_; + case FLIP_INTERPRET_CONTROL_FRAME_COMMON_HEADER: + return 0; + case FLIP_CONTROL_FRAME_PAYLOAD: + case FLIP_IGNORE_REMAINING_PAYLOAD: + case FLIP_FORWARD_STREAM_FRAME: + return remaining_payload_; + } + // We should never get to here. + return 0; +} + +void FlipFramer::set_error(FlipError error) { + DCHECK(false); + DCHECK(visitor_); + error_code_ = error; + visitor_->OnError(this); +} + +const char* FlipFramer::ErrorCodeToString(int error_code) { + switch (error_code) { + case FLIP_NO_ERROR: + return "NO_ERROR"; + case FLIP_UNKNOWN_CONTROL_TYPE: + return "UNKNOWN_CONTROL_TYPE"; + case FLIP_INVALID_CONTROL_FRAME: + return "INVALID_CONTROL_FRAME"; + case FLIP_CONTROL_PAYLOAD_TOO_LARGE: + return "CONTROL_PAYLOAD_TOO_LARGE"; + case FLIP_ZLIB_INIT_FAILURE: + return "ZLIB_INIT_FAILURE"; + case FLIP_UNSUPPORTED_VERSION: + return "UNSUPPORTED_VERSION"; + case FLIP_DECOMPRESS_FAILURE: + return "DECOMPRESS_FAILURE"; + } + return "UNKNOWN_STATE"; +} + +uint32 FlipFramer::ProcessInput(const char* data, uint32 len) { + DCHECK(visitor_); + DCHECK(data); + + uint32 original_len = len; + while (len != 0) { + FlipControlFrame* current_control_frame = + reinterpret_cast<FlipControlFrame*>(current_frame_buffer_); + FlipDataFrame* current_data_frame = + reinterpret_cast<FlipDataFrame*>(current_frame_buffer_); + + switch (state_) { + case FLIP_ERROR: + case FLIP_DONE: + goto bottom; + + case FLIP_AUTO_RESET: + case FLIP_RESET: + Reset(); + CHANGE_STATE(FLIP_READING_COMMON_HEADER); + continue; + + case FLIP_READING_COMMON_HEADER: { + int bytes_read = ProcessCommonHeader(data, len); + len -= bytes_read; + data += bytes_read; + continue; + } + + // Arguably, this case is not necessary, as no bytes are consumed here. + // I felt it was a nice partitioning, however (which probably indicates + // that it should be refactored into its own function!) + case FLIP_INTERPRET_CONTROL_FRAME_COMMON_HEADER: + DCHECK(error_code_ == 0); + DCHECK(current_frame_len_ >= sizeof(FlipFrame)); + // Do some sanity checking on the control frame sizes. + switch (current_control_frame->type()) { + case SYN_STREAM: + // NOTE: sizeof(FlipSynStreamControlFrame) is not accurate. + if (current_control_frame->length() < + sizeof(FlipSynStreamControlFrame) - sizeof(FlipControlFrame)) + set_error(FLIP_INVALID_CONTROL_FRAME); + break; + case SYN_REPLY: + if (current_control_frame->length() < + sizeof(FlipSynReplyControlFrame) - sizeof(FlipControlFrame)) + set_error(FLIP_INVALID_CONTROL_FRAME); + break; + case FIN_STREAM: + if (current_control_frame->length() != + sizeof(FlipFinStreamControlFrame) - sizeof(FlipFrame)) + set_error(FLIP_INVALID_CONTROL_FRAME); + break; + case NOOP: + // NOP. Swallow it. + CHANGE_STATE(FLIP_AUTO_RESET); + continue; + default: + set_error(FLIP_UNKNOWN_CONTROL_TYPE); + break; + } + + // We only support version 1 of this protocol. + if (current_control_frame->version() != kFlipProtocolVersion) + set_error(FLIP_UNSUPPORTED_VERSION); + + if (error_code_) { + CHANGE_STATE(FLIP_ERROR); + goto bottom; + } + + remaining_control_payload_ = current_control_frame->length(); + if (remaining_control_payload_ > kControlFrameBufferMaxSize) { + set_error(FLIP_CONTROL_PAYLOAD_TOO_LARGE); + CHANGE_STATE(FLIP_ERROR); + goto bottom; + } + ExpandControlFrameBuffer(remaining_control_payload_); + CHANGE_STATE(FLIP_CONTROL_FRAME_PAYLOAD); + continue; + + case FLIP_CONTROL_FRAME_PAYLOAD: { + int bytes_read = ProcessControlFramePayload(data, len); + len -= bytes_read; + data += bytes_read; + } + // intentional fallthrough + case FLIP_IGNORE_REMAINING_PAYLOAD: + // control frame has too-large payload + // intentional fallthrough + case FLIP_FORWARD_STREAM_FRAME: + if (remaining_payload_) { + uint32 amount_to_forward = std::min(remaining_payload_, len); + if (amount_to_forward && state_ != FLIP_IGNORE_REMAINING_PAYLOAD) { + const FlipDataFrame* data_frame = + reinterpret_cast<const FlipDataFrame*>(current_data_frame); + if (data_frame->flags() & DATA_FLAG_COMPRESSED) { + // TODO(mbelshe): Assert that the decompressor is init'ed. + if (!InitializeDecompressor()) + return NULL; + + int decompressed_max_size = amount_to_forward * 100; + scoped_array<char> decompressed(new char[decompressed_max_size]); + decompressor_->next_in = reinterpret_cast<Bytef*>( + const_cast<char*>(data)); + decompressor_->avail_in = amount_to_forward; + decompressor_->next_out = + reinterpret_cast<Bytef*>(decompressed.get()); + decompressor_->avail_out = decompressed_max_size; + + int rv = inflate(decompressor_.get(), Z_SYNC_FLUSH); + if (rv != Z_OK) { + set_error(FLIP_DECOMPRESS_FAILURE); + goto bottom; + } + int decompressed_size = decompressed_max_size - + decompressor_->avail_out; + visitor_->OnStreamFrameData(current_data_frame->stream_id(), + decompressed.get(), + decompressed_size); + amount_to_forward -= decompressor_->avail_in; + } else { + // The data frame was not compressed + visitor_->OnStreamFrameData(current_data_frame->stream_id(), + data, amount_to_forward); + } + } + data += amount_to_forward; + len -= amount_to_forward; + remaining_payload_ -= amount_to_forward; + } else { + CHANGE_STATE(FLIP_AUTO_RESET); + } + continue; + default: + break; + } + } + bottom: + return original_len - len; +} + +uint32 FlipFramer::ProcessCommonHeader(const char* data, uint32 len) { + // This should only be called when we're in the FLIP_READING_COMMON_HEADER + // state. + DCHECK(state_ == FLIP_READING_COMMON_HEADER); + + int original_len = len; + FlipDataFrame* current_frame = + reinterpret_cast<FlipDataFrame*>(current_frame_buffer_); + + do { + if (current_frame_len_ < sizeof(FlipFrame)) { + uint32 bytes_desired = sizeof(FlipFrame) - current_frame_len_; + uint32 bytes_to_append = std::min(bytes_desired, len); + char* header_buffer = current_frame_buffer_; + memcpy(&header_buffer[current_frame_len_], data, bytes_to_append); + current_frame_len_ += bytes_to_append; + data += bytes_to_append; + len -= bytes_to_append; + // Check for an empty data packet. + if (current_frame_len_ == sizeof(FlipFrame) && + !current_frame->is_control_frame() && + current_frame->length() == 0) { + visitor_->OnStreamFrameData(current_frame->stream_id(), NULL, 0); + CHANGE_STATE(FLIP_RESET); + } + break; + } + remaining_payload_ = current_frame->length(); + // if we're here, then we have the common header all received. + if (!current_frame->is_control_frame()) + CHANGE_STATE(FLIP_FORWARD_STREAM_FRAME); + else + CHANGE_STATE(FLIP_INTERPRET_CONTROL_FRAME_COMMON_HEADER); + } while (false); + + return original_len - len; +} + +uint32 FlipFramer::ProcessControlFramePayload(const char* data, uint32 len) { + int original_len = len; + do { + if (remaining_control_payload_) { + uint32 amount_to_consume = std::min(remaining_control_payload_, len); + memcpy(¤t_frame_buffer_[current_frame_len_], data, + amount_to_consume); + current_frame_len_ += amount_to_consume; + data += amount_to_consume; + len -= amount_to_consume; + remaining_control_payload_ -= amount_to_consume; + remaining_payload_ -= amount_to_consume; + if (remaining_control_payload_) + break; + } + FlipControlFrame* control_frame = + reinterpret_cast<FlipControlFrame*>(current_frame_buffer_); + visitor_->OnControl(control_frame); + CHANGE_STATE(FLIP_IGNORE_REMAINING_PAYLOAD); + } while (false); + return original_len - len; +} + +void FlipFramer::ExpandControlFrameBuffer(int size) { + DCHECK(size < kControlFrameBufferMaxSize); + if (size < current_frame_capacity_) + return; + + int alloc_size = size + sizeof(FlipFrame); + char* new_buffer = new char[alloc_size]; + memcpy(new_buffer, current_frame_buffer_, current_frame_len_); + current_frame_capacity_ = alloc_size; + current_frame_buffer_ = new_buffer; +} + +bool FlipFramer::ParseHeaderBlock(const FlipFrame* frame, + FlipHeaderBlock* block) { + uint32 type = reinterpret_cast<const FlipControlFrame*>(frame)->type(); + if (type != SYN_STREAM && type != SYN_REPLY) + return false; + + // Find the header data within the control frame. + scoped_array<FlipSynStreamControlFrame> control_frame( + reinterpret_cast<FlipSynStreamControlFrame*>(DecompressFrame(frame))); + if (!control_frame.get()) + return false; + const char *header_data = control_frame.get()->header_block(); + int header_length = control_frame.get()->header_block_len(); + + FlipFrameBuilder builder(header_data, header_length); + void* iter = NULL; + uint16 num_headers; + if (builder.ReadUInt16(&iter, &num_headers)) { + VLOG(2) << "found num_headers: " << num_headers; + for (int index = 0; index < num_headers; ++index) { + std::string name; + std::string value; + if (!builder.ReadString(&iter, &name)) { + VLOG(1) << "couldn't read string (key)!"; + break; + } + if (!builder.ReadString(&iter, &value)) { + VLOG(1) << "couldn't read string (value)!"; + break; + } + if (block->empty()) { + (*block)[name] = value; + } else { + FlipHeaderBlock::iterator last = --block->end(); + if (block->key_comp()(last->first, name)) { + block->insert(block->end(), + std::pair<std::string, std::string>(name, value)); + } else { + return false; + } + } + } + return true; + } else { + VLOG(2) << "didn't find headers"; + } + return false; +} + +FlipSynStreamControlFrame* FlipFramer::CreateSynStream( + int stream_id, + int priority, + bool compress, + FlipHeaderBlock* headers) { + FlipFrameBuilder frame; + + frame.WriteUInt16(kControlFlagMask | kFlipProtocolVersion); + frame.WriteUInt16(SYN_STREAM); + frame.WriteUInt32(0); // Placeholder for the length. + frame.WriteUInt32(stream_id); + frame.WriteUInt16(ntohs(priority) << 6); // Priority. + + frame.WriteUInt16(headers->size()); // Number of headers. + FlipHeaderBlock::iterator it; + for (it = headers->begin(); it != headers->end(); ++it) { + frame.WriteString(it->first); + frame.WriteString(it->second); + } + // write the length + frame.WriteUInt32ToOffset(4, frame.length() - sizeof(FlipFrame)); + if (compress) { + FlipSynStreamControlFrame* new_frame = + reinterpret_cast<FlipSynStreamControlFrame*>( + CompressFrame(frame.data())); + return new_frame; + } + + return reinterpret_cast<FlipSynStreamControlFrame*>(frame.take()); +} + +/* static */ +FlipFinStreamControlFrame* FlipFramer::CreateFinStream(int stream_id, + int status) { + FlipFrameBuilder frame; + frame.WriteUInt16(kControlFlagMask | kFlipProtocolVersion); + frame.WriteUInt16(FIN_STREAM); + frame.WriteUInt32(8); + frame.WriteUInt32(stream_id); + frame.WriteUInt32(status); + return reinterpret_cast<FlipFinStreamControlFrame*>(frame.take()); +} + +FlipSynReplyControlFrame* FlipFramer::CreateSynReply(int stream_id, + bool compressed, FlipHeaderBlock* headers) { + + FlipFrameBuilder frame; + + frame.WriteUInt16(kControlFlagMask | kFlipProtocolVersion); + frame.WriteUInt16(SYN_REPLY); + frame.WriteUInt32(0); // Placeholder for the length. + frame.WriteUInt32(stream_id); + frame.WriteUInt16(0); // Priority. + + frame.WriteUInt16(headers->size()); // Number of headers. + FlipHeaderBlock::iterator it; + for (it = headers->begin(); it != headers->end(); ++it) { + // TODO(mbelshe): Headers need to be sorted. + frame.WriteString(it->first); + frame.WriteString(it->second); + } + // write the length + frame.WriteUInt32ToOffset(4, frame.length() - sizeof(FlipFrame)); + if (compressed) + return reinterpret_cast<FlipSynReplyControlFrame*>( + CompressFrame(frame.data())); + return reinterpret_cast<FlipSynReplyControlFrame*>(frame.take()); +} + +FlipDataFrame* FlipFramer::CreateDataFrame(int stream_id, + const char* data, + int len, bool compressed) { + FlipFrameBuilder frame; + + frame.WriteUInt32(stream_id); + + frame.WriteUInt32(len); + frame.WriteBytes(data, len); + if (compressed) + return reinterpret_cast<FlipDataFrame*>(CompressFrame(frame.data())); + return reinterpret_cast<FlipDataFrame*>(frame.take()); +} + +static const int kCompressorLevel = Z_DEFAULT_COMPRESSION; +// This is just a hacked dictionary to use for shrinking HTTP-like headers. +// TODO(mbelshe): Use a scientific methodology for computing the dictionary. +static const char dictionary[] = + "optionsgetheadpostputdeletetraceacceptaccept-charsetaccept-encodingaccept-" + "languageauthorizationexpectfromhostif-modified-sinceif-matchif-none-matchi" + "f-rangeif-unmodifiedsincemax-forwardsproxy-authorizationrangerefererteuser" + "-agent10010120020120220320420520630030130230330430530630740040140240340440" + "5406407408409410411412413414415416417500501502503504505accept-rangesageeta" + "glocationproxy-authenticatepublicretry-afterservervarywarningwww-authentic" + "ateallowcontent-basecontent-encodingcache-controlconnectiondatetrailertran" + "sfer-encodingupgradeviawarningcontent-languagecontent-lengthcontent-locati" + "oncontent-md5content-rangecontent-typeetagexpireslast-modifiedset-cookieMo" + "ndayTuesdayWednesdayThursdayFridaySaturdaySundayJanFebMarAprMayJunJulAugSe" + "pOctNovDecchunkedtext/htmlimage/pngimage/jpgimage/gifapplication/xmlapplic" + "ation/xhtmltext/plainpublicmax-agecharset=iso-8859-1utf-8gzipdeflateHTTP/1" + ".1statusversionurl"; +static uLong dictionary_id = 0; + +bool FlipFramer::InitializeCompressor() { + if (compressor_.get()) + return true; // Already initialized. + + compressor_.reset(new z_stream); + memset(compressor_.get(), 0, sizeof(z_stream)); + + int success = deflateInit(compressor_.get(), kCompressorLevel); + if (success == Z_OK) + success = deflateSetDictionary(compressor_.get(), + reinterpret_cast<const Bytef*>(dictionary), + sizeof(dictionary)); + if (success != Z_OK) + compressor_.reset(NULL); + return success == Z_OK; +} + +bool FlipFramer::InitializeDecompressor() { + if (decompressor_.get()) + return true; // Already initialized. + + decompressor_.reset(new z_stream); + memset(decompressor_.get(), 0, sizeof(z_stream)); + + // Compute the id of our dictionary so that we know we're using the + // right one when asked for it. + if (dictionary_id == 0) { + dictionary_id = adler32(0L, Z_NULL, 0); + dictionary_id = adler32(dictionary_id, + reinterpret_cast<const Bytef*>(dictionary), + sizeof(dictionary)); + } + + int success = inflateInit(decompressor_.get()); + if (success != Z_OK) + decompressor_.reset(NULL); + return success == Z_OK; +} + +bool FlipFramer::GetFrameBoundaries(const FlipFrame* frame, + int* payload_length, + int* header_length, + const unsigned char** payload) const { + if (frame->is_control_frame()) { + const FlipControlFrame* control_frame = + reinterpret_cast<const FlipControlFrame*>(frame); + switch (control_frame->type()) { + case SYN_STREAM: + case SYN_REPLY: + { + const FlipSynStreamControlFrame *syn_frame = + reinterpret_cast<const FlipSynStreamControlFrame*>(frame); + *payload_length = syn_frame->header_block_len(); + *header_length = sizeof(FlipFrame) + syn_frame->length() - + syn_frame->header_block_len(); + *payload = reinterpret_cast<const unsigned char*>(frame) + + *header_length; + } + break; + default: + // TODO(mbelshe): set an error? + return false; // We can't compress this frame! + } + } else { + *header_length = sizeof(FlipFrame); + *payload_length = frame->length(); + *payload = reinterpret_cast<const unsigned char*>(frame) + + sizeof(FlipFrame); + } + DCHECK(static_cast<size_t>(*header_length) <= + sizeof(FlipFrame) + *payload_length); + return true; +} + + +FlipFrame* FlipFramer::CompressFrame(const FlipFrame* frame) { + int payload_length; + int header_length; + const unsigned char* payload; + + static StatsCounter pre_compress_bytes("flip.PreCompressSize"); + static StatsCounter post_compress_bytes("flip.PostCompressSize"); + + if (!enable_compression_) + return DuplicateFrame(frame); + + if (!GetFrameBoundaries(frame, &payload_length, &header_length, &payload)) + return NULL; + + if (!InitializeCompressor()) + return NULL; + + // TODO(mbelshe): Should we have a zlib header like what http servers do? + + // Create an output frame. + int compressed_max_size = deflateBound(compressor_.get(), payload_length); + int new_frame_size = header_length + compressed_max_size; + FlipFrame* new_frame = + reinterpret_cast<FlipFrame*>(new char[new_frame_size]); + memcpy(new_frame, frame, header_length); + + compressor_->next_in = const_cast<Bytef*>(payload); + compressor_->avail_in = payload_length; + compressor_->next_out = reinterpret_cast<Bytef*>(new_frame) + header_length; + compressor_->avail_out = compressed_max_size; + + // Data packets have a 'compressed flag + if (!new_frame->is_control_frame()) { + FlipDataFrame* data_frame = reinterpret_cast<FlipDataFrame*>(new_frame); + data_frame->set_flags(data_frame->flags() | DATA_FLAG_COMPRESSED); + } + + int rv = deflate(compressor_.get(), Z_SYNC_FLUSH); + if (rv != Z_OK) { // How can we know that it compressed everything? + // This shouldn't happen, right? + free(new_frame); + return NULL; + } + + int compressed_size = compressed_max_size - compressor_->avail_out; + new_frame->set_length(header_length + compressed_size - sizeof(FlipFrame)); + + pre_compress_bytes.Add(payload_length); + post_compress_bytes.Add(new_frame->length()); + + return new_frame; +} + +FlipFrame* FlipFramer::DecompressFrame(const FlipFrame* frame) { + int payload_length; + int header_length; + const unsigned char* payload; + + static StatsCounter pre_decompress_bytes("flip.PreDeCompressSize"); + static StatsCounter post_decompress_bytes("flip.PostDeCompressSize"); + + if (!enable_compression_) + return DuplicateFrame(frame); + + if (!GetFrameBoundaries(frame, &payload_length, &header_length, &payload)) + return NULL; + + if (!frame->is_control_frame()) { + const FlipDataFrame* data_frame = + reinterpret_cast<const FlipDataFrame*>(frame); + if (!data_frame->flags() & DATA_FLAG_COMPRESSED) + return DuplicateFrame(frame); + } + + if (!InitializeDecompressor()) + return NULL; + + // TODO(mbelshe): Should we have a zlib header like what http servers do? + + // Create an output frame. Assume it does not need to be longer than + // the input data. + int decompressed_max_size = kControlFrameBufferInitialSize; + int new_frame_size = header_length + decompressed_max_size; + FlipFrame* new_frame = + reinterpret_cast<FlipFrame*>(new char[new_frame_size]); + memcpy(new_frame, frame, header_length); + + decompressor_->next_in = const_cast<Bytef*>(payload); + decompressor_->avail_in = payload_length; + decompressor_->next_out = reinterpret_cast<Bytef*>(new_frame) + + header_length; + decompressor_->avail_out = decompressed_max_size; + + int rv = inflate(decompressor_.get(), Z_SYNC_FLUSH); + if (rv == Z_NEED_DICT) { + // Need to try again with the right dictionary. + if (decompressor_->adler == dictionary_id) { + rv = inflateSetDictionary(decompressor_.get(), (const Bytef*)dictionary, + sizeof(dictionary)); + if (rv == Z_OK) + rv = inflate(decompressor_.get(), Z_SYNC_FLUSH); + } + } + if (rv != Z_OK) { // How can we know that it decompressed everything? + free(new_frame); + return NULL; + } + + // Unset the compressed flag for data frames. + if (!new_frame->is_control_frame()) { + FlipDataFrame* data_frame = reinterpret_cast<FlipDataFrame*>(new_frame); + data_frame->set_flags(data_frame->flags() & ~DATA_FLAG_COMPRESSED); + } + + int decompressed_size = decompressed_max_size - decompressor_->avail_out; + new_frame->set_length(header_length + decompressed_size - sizeof(FlipFrame)); + + pre_decompress_bytes.Add(frame->length()); + post_decompress_bytes.Add(new_frame->length()); + + return new_frame; +} + +FlipFrame* FlipFramer::DuplicateFrame(const FlipFrame* frame) { + int size = sizeof(FlipFrame) + frame->length(); + char* new_frame = new char[size]; + memcpy(new_frame, frame, size); + return reinterpret_cast<FlipFrame*>(new_frame); +} + +void FlipFramer::set_enable_compression(bool value) { + enable_compression_ = value; +} + +void FlipFramer::set_enable_compression_default(bool value) { + compression_default_ = value; +} + +} // namespace flip + |