// Copyright (c) 2009 The Chromium Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. #include "base/scoped_ptr.h" #include "base/stats_counters.h" #include "flip_framer.h" // cross-google3 directory naming. #include "flip_frame_builder.h" #include "flip_bitmasks.h" #ifdef WIN32 #include "third_party/zlib/zlib.h" #else #include "third_party/zlib/v1_2_3/zlib.h" #endif namespace flip { // The initial size of the control frame buffer; this is used internally // as we we parse though control frames. static const int kControlFrameBufferInitialSize = 32 * 1024; // The maximum size of the control frame buffer that we support. // TODO(mbelshe): We should make this stream-based so there are no limits. static const int kControlFrameBufferMaxSize = 64 * 1024; // This implementation of Flip is version 1. static const int kFlipProtocolVersion = 1; // By default is compression on or off. bool FlipFramer::compression_default_ = true; #ifdef DEBUG_FLIP_STATE_CHANGES #define CHANGE_STATE(newstate) \ { \ do { \ LOG(INFO) << "Changing state from: " \ << StateToString(state_) \ << " to " << StateToString(newstate) << "\n"; \ state_ = newstate; \ } while (false); \ } #else #define CHANGE_STATE(newstate) (state_ = newstate) #endif FlipFramer::FlipFramer() : state_(FLIP_RESET), error_code_(FLIP_NO_ERROR), remaining_payload_(0), remaining_control_payload_(0), current_frame_buffer_(NULL), current_frame_len_(0), current_frame_capacity_(0), enable_compression_(compression_default_), visitor_(NULL) { } FlipFramer::~FlipFramer() { if (compressor_.get()) { deflateEnd(compressor_.get()); } if (decompressor_.get()) { inflateEnd(decompressor_.get()); } delete [] current_frame_buffer_; } void FlipFramer::Reset() { state_ = FLIP_RESET; error_code_ = FLIP_NO_ERROR; remaining_payload_ = 0; remaining_control_payload_ = 0; current_frame_len_ = 0; if (current_frame_capacity_ != kControlFrameBufferInitialSize) { delete [] current_frame_buffer_; current_frame_buffer_ = 0; current_frame_capacity_ = 0; ExpandControlFrameBuffer(kControlFrameBufferInitialSize); } } const char* FlipFramer::StateToString(int state) { switch (state) { case FLIP_ERROR: return "ERROR"; case FLIP_DONE: return "DONE"; case FLIP_AUTO_RESET: return "AUTO_RESET"; case FLIP_RESET: return "RESET"; case FLIP_READING_COMMON_HEADER: return "READING_COMMON_HEADER"; case FLIP_INTERPRET_CONTROL_FRAME_COMMON_HEADER: return "INTERPRET_CONTROL_FRAME_COMMON_HEADER"; case FLIP_CONTROL_FRAME_PAYLOAD: return "CONTROL_FRAME_PAYLOAD"; case FLIP_IGNORE_REMAINING_PAYLOAD: return "IGNORE_REMAINING_PAYLOAD"; case FLIP_FORWARD_STREAM_FRAME: return "FORWARD_STREAM_FRAME"; } return "UNKNOWN_STATE"; } uint32 FlipFramer::BytesSafeToRead() const { switch (state_) { case FLIP_ERROR: case FLIP_DONE: case FLIP_AUTO_RESET: case FLIP_RESET: return 0; case FLIP_READING_COMMON_HEADER: DCHECK(current_frame_len_ < sizeof(FlipFrame)); return sizeof(FlipFrame) - current_frame_len_; case FLIP_INTERPRET_CONTROL_FRAME_COMMON_HEADER: return 0; case FLIP_CONTROL_FRAME_PAYLOAD: case FLIP_IGNORE_REMAINING_PAYLOAD: case FLIP_FORWARD_STREAM_FRAME: return remaining_payload_; } // We should never get to here. return 0; } void FlipFramer::set_error(FlipError error) { DCHECK(false); DCHECK(visitor_); error_code_ = error; visitor_->OnError(this); } const char* FlipFramer::ErrorCodeToString(int error_code) { switch (error_code) { case FLIP_NO_ERROR: return "NO_ERROR"; case FLIP_UNKNOWN_CONTROL_TYPE: return "UNKNOWN_CONTROL_TYPE"; case FLIP_INVALID_CONTROL_FRAME: return "INVALID_CONTROL_FRAME"; case FLIP_CONTROL_PAYLOAD_TOO_LARGE: return "CONTROL_PAYLOAD_TOO_LARGE"; case FLIP_ZLIB_INIT_FAILURE: return "ZLIB_INIT_FAILURE"; case FLIP_UNSUPPORTED_VERSION: return "UNSUPPORTED_VERSION"; case FLIP_DECOMPRESS_FAILURE: return "DECOMPRESS_FAILURE"; } return "UNKNOWN_STATE"; } uint32 FlipFramer::ProcessInput(const char* data, uint32 len) { DCHECK(visitor_); DCHECK(data); uint32 original_len = len; while (len != 0) { FlipControlFrame* current_control_frame = reinterpret_cast(current_frame_buffer_); FlipDataFrame* current_data_frame = reinterpret_cast(current_frame_buffer_); switch (state_) { case FLIP_ERROR: case FLIP_DONE: goto bottom; case FLIP_AUTO_RESET: case FLIP_RESET: Reset(); CHANGE_STATE(FLIP_READING_COMMON_HEADER); continue; case FLIP_READING_COMMON_HEADER: { int bytes_read = ProcessCommonHeader(data, len); len -= bytes_read; data += bytes_read; continue; } // Arguably, this case is not necessary, as no bytes are consumed here. // I felt it was a nice partitioning, however (which probably indicates // that it should be refactored into its own function!) case FLIP_INTERPRET_CONTROL_FRAME_COMMON_HEADER: DCHECK(error_code_ == 0); DCHECK(current_frame_len_ >= sizeof(FlipFrame)); // Do some sanity checking on the control frame sizes. switch (current_control_frame->type()) { case SYN_STREAM: // NOTE: sizeof(FlipSynStreamControlFrame) is not accurate. if (current_control_frame->length() < sizeof(FlipSynStreamControlFrame) - sizeof(FlipControlFrame)) set_error(FLIP_INVALID_CONTROL_FRAME); break; case SYN_REPLY: if (current_control_frame->length() < sizeof(FlipSynReplyControlFrame) - sizeof(FlipControlFrame)) set_error(FLIP_INVALID_CONTROL_FRAME); break; case FIN_STREAM: if (current_control_frame->length() != sizeof(FlipFinStreamControlFrame) - sizeof(FlipFrame)) set_error(FLIP_INVALID_CONTROL_FRAME); break; case NOOP: // NOP. Swallow it. CHANGE_STATE(FLIP_AUTO_RESET); continue; default: set_error(FLIP_UNKNOWN_CONTROL_TYPE); break; } // We only support version 1 of this protocol. if (current_control_frame->version() != kFlipProtocolVersion) set_error(FLIP_UNSUPPORTED_VERSION); if (error_code_) { CHANGE_STATE(FLIP_ERROR); goto bottom; } remaining_control_payload_ = current_control_frame->length(); if (remaining_control_payload_ > kControlFrameBufferMaxSize) { set_error(FLIP_CONTROL_PAYLOAD_TOO_LARGE); CHANGE_STATE(FLIP_ERROR); goto bottom; } ExpandControlFrameBuffer(remaining_control_payload_); CHANGE_STATE(FLIP_CONTROL_FRAME_PAYLOAD); continue; case FLIP_CONTROL_FRAME_PAYLOAD: { int bytes_read = ProcessControlFramePayload(data, len); len -= bytes_read; data += bytes_read; } // intentional fallthrough case FLIP_IGNORE_REMAINING_PAYLOAD: // control frame has too-large payload // intentional fallthrough case FLIP_FORWARD_STREAM_FRAME: if (remaining_payload_) { uint32 amount_to_forward = std::min(remaining_payload_, len); if (amount_to_forward && state_ != FLIP_IGNORE_REMAINING_PAYLOAD) { const FlipDataFrame* data_frame = reinterpret_cast(current_data_frame); if (data_frame->flags() & DATA_FLAG_COMPRESSED) { // TODO(mbelshe): Assert that the decompressor is init'ed. if (!InitializeDecompressor()) return NULL; int decompressed_max_size = amount_to_forward * 100; scoped_array decompressed(new char[decompressed_max_size]); decompressor_->next_in = reinterpret_cast( const_cast(data)); decompressor_->avail_in = amount_to_forward; decompressor_->next_out = reinterpret_cast(decompressed.get()); decompressor_->avail_out = decompressed_max_size; int rv = inflate(decompressor_.get(), Z_SYNC_FLUSH); if (rv != Z_OK) { set_error(FLIP_DECOMPRESS_FAILURE); goto bottom; } int decompressed_size = decompressed_max_size - decompressor_->avail_out; visitor_->OnStreamFrameData(current_data_frame->stream_id(), decompressed.get(), decompressed_size); amount_to_forward -= decompressor_->avail_in; } else { // The data frame was not compressed visitor_->OnStreamFrameData(current_data_frame->stream_id(), data, amount_to_forward); } } data += amount_to_forward; len -= amount_to_forward; remaining_payload_ -= amount_to_forward; } else { CHANGE_STATE(FLIP_AUTO_RESET); } continue; default: break; } } bottom: return original_len - len; } uint32 FlipFramer::ProcessCommonHeader(const char* data, uint32 len) { // This should only be called when we're in the FLIP_READING_COMMON_HEADER // state. DCHECK(state_ == FLIP_READING_COMMON_HEADER); int original_len = len; FlipDataFrame* current_frame = reinterpret_cast(current_frame_buffer_); do { if (current_frame_len_ < sizeof(FlipFrame)) { uint32 bytes_desired = sizeof(FlipFrame) - current_frame_len_; uint32 bytes_to_append = std::min(bytes_desired, len); char* header_buffer = current_frame_buffer_; memcpy(&header_buffer[current_frame_len_], data, bytes_to_append); current_frame_len_ += bytes_to_append; data += bytes_to_append; len -= bytes_to_append; // Check for an empty data packet. if (current_frame_len_ == sizeof(FlipFrame) && !current_frame->is_control_frame() && current_frame->length() == 0) { visitor_->OnStreamFrameData(current_frame->stream_id(), NULL, 0); CHANGE_STATE(FLIP_RESET); } break; } remaining_payload_ = current_frame->length(); // if we're here, then we have the common header all received. if (!current_frame->is_control_frame()) CHANGE_STATE(FLIP_FORWARD_STREAM_FRAME); else CHANGE_STATE(FLIP_INTERPRET_CONTROL_FRAME_COMMON_HEADER); } while (false); return original_len - len; } uint32 FlipFramer::ProcessControlFramePayload(const char* data, uint32 len) { int original_len = len; do { if (remaining_control_payload_) { uint32 amount_to_consume = std::min(remaining_control_payload_, len); memcpy(¤t_frame_buffer_[current_frame_len_], data, amount_to_consume); current_frame_len_ += amount_to_consume; data += amount_to_consume; len -= amount_to_consume; remaining_control_payload_ -= amount_to_consume; remaining_payload_ -= amount_to_consume; if (remaining_control_payload_) break; } FlipControlFrame* control_frame = reinterpret_cast(current_frame_buffer_); visitor_->OnControl(control_frame); CHANGE_STATE(FLIP_IGNORE_REMAINING_PAYLOAD); } while (false); return original_len - len; } void FlipFramer::ExpandControlFrameBuffer(int size) { DCHECK(size < kControlFrameBufferMaxSize); if (size < current_frame_capacity_) return; int alloc_size = size + sizeof(FlipFrame); char* new_buffer = new char[alloc_size]; memcpy(new_buffer, current_frame_buffer_, current_frame_len_); current_frame_capacity_ = alloc_size; current_frame_buffer_ = new_buffer; } bool FlipFramer::ParseHeaderBlock(const FlipFrame* frame, FlipHeaderBlock* block) { uint32 type = reinterpret_cast(frame)->type(); if (type != SYN_STREAM && type != SYN_REPLY) return false; // Find the header data within the control frame. scoped_array control_frame( reinterpret_cast(DecompressFrame(frame))); if (!control_frame.get()) return false; const char *header_data = control_frame.get()->header_block(); int header_length = control_frame.get()->header_block_len(); FlipFrameBuilder builder(header_data, header_length); void* iter = NULL; uint16 num_headers; if (builder.ReadUInt16(&iter, &num_headers)) { VLOG(2) << "found num_headers: " << num_headers; for (int index = 0; index < num_headers; ++index) { std::string name; std::string value; if (!builder.ReadString(&iter, &name)) { VLOG(1) << "couldn't read string (key)!"; break; } if (!builder.ReadString(&iter, &value)) { VLOG(1) << "couldn't read string (value)!"; break; } if (block->empty()) { (*block)[name] = value; } else { FlipHeaderBlock::iterator last = --block->end(); if (block->key_comp()(last->first, name)) { block->insert(block->end(), std::pair(name, value)); } else { return false; } } } return true; } else { VLOG(2) << "didn't find headers"; } return false; } FlipSynStreamControlFrame* FlipFramer::CreateSynStream( int stream_id, int priority, bool compress, FlipHeaderBlock* headers) { FlipFrameBuilder frame; frame.WriteUInt16(kControlFlagMask | kFlipProtocolVersion); frame.WriteUInt16(SYN_STREAM); frame.WriteUInt32(0); // Placeholder for the length. frame.WriteUInt32(stream_id); frame.WriteUInt16(ntohs(priority) << 6); // Priority. frame.WriteUInt16(headers->size()); // Number of headers. FlipHeaderBlock::iterator it; for (it = headers->begin(); it != headers->end(); ++it) { frame.WriteString(it->first); frame.WriteString(it->second); } // write the length frame.WriteUInt32ToOffset(4, frame.length() - sizeof(FlipFrame)); if (compress) { FlipSynStreamControlFrame* new_frame = reinterpret_cast( CompressFrame(frame.data())); return new_frame; } return reinterpret_cast(frame.take()); } /* static */ FlipFinStreamControlFrame* FlipFramer::CreateFinStream(int stream_id, int status) { FlipFrameBuilder frame; frame.WriteUInt16(kControlFlagMask | kFlipProtocolVersion); frame.WriteUInt16(FIN_STREAM); frame.WriteUInt32(8); frame.WriteUInt32(stream_id); frame.WriteUInt32(status); return reinterpret_cast(frame.take()); } FlipSynReplyControlFrame* FlipFramer::CreateSynReply(int stream_id, bool compressed, FlipHeaderBlock* headers) { FlipFrameBuilder frame; frame.WriteUInt16(kControlFlagMask | kFlipProtocolVersion); frame.WriteUInt16(SYN_REPLY); frame.WriteUInt32(0); // Placeholder for the length. frame.WriteUInt32(stream_id); frame.WriteUInt16(0); // Priority. frame.WriteUInt16(headers->size()); // Number of headers. FlipHeaderBlock::iterator it; for (it = headers->begin(); it != headers->end(); ++it) { // TODO(mbelshe): Headers need to be sorted. frame.WriteString(it->first); frame.WriteString(it->second); } // write the length frame.WriteUInt32ToOffset(4, frame.length() - sizeof(FlipFrame)); if (compressed) return reinterpret_cast( CompressFrame(frame.data())); return reinterpret_cast(frame.take()); } FlipDataFrame* FlipFramer::CreateDataFrame(int stream_id, const char* data, int len, bool compressed) { FlipFrameBuilder frame; frame.WriteUInt32(stream_id); frame.WriteUInt32(len); frame.WriteBytes(data, len); if (compressed) return reinterpret_cast(CompressFrame(frame.data())); return reinterpret_cast(frame.take()); } static const int kCompressorLevel = Z_DEFAULT_COMPRESSION; // This is just a hacked dictionary to use for shrinking HTTP-like headers. // TODO(mbelshe): Use a scientific methodology for computing the dictionary. static const char dictionary[] = "optionsgetheadpostputdeletetraceacceptaccept-charsetaccept-encodingaccept-" "languageauthorizationexpectfromhostif-modified-sinceif-matchif-none-matchi" "f-rangeif-unmodifiedsincemax-forwardsproxy-authorizationrangerefererteuser" "-agent10010120020120220320420520630030130230330430530630740040140240340440" "5406407408409410411412413414415416417500501502503504505accept-rangesageeta" "glocationproxy-authenticatepublicretry-afterservervarywarningwww-authentic" "ateallowcontent-basecontent-encodingcache-controlconnectiondatetrailertran" "sfer-encodingupgradeviawarningcontent-languagecontent-lengthcontent-locati" "oncontent-md5content-rangecontent-typeetagexpireslast-modifiedset-cookieMo" "ndayTuesdayWednesdayThursdayFridaySaturdaySundayJanFebMarAprMayJunJulAugSe" "pOctNovDecchunkedtext/htmlimage/pngimage/jpgimage/gifapplication/xmlapplic" "ation/xhtmltext/plainpublicmax-agecharset=iso-8859-1utf-8gzipdeflateHTTP/1" ".1statusversionurl"; static uLong dictionary_id = 0; bool FlipFramer::InitializeCompressor() { if (compressor_.get()) return true; // Already initialized. compressor_.reset(new z_stream); memset(compressor_.get(), 0, sizeof(z_stream)); int success = deflateInit(compressor_.get(), kCompressorLevel); if (success == Z_OK) success = deflateSetDictionary(compressor_.get(), reinterpret_cast(dictionary), sizeof(dictionary)); if (success != Z_OK) compressor_.reset(NULL); return success == Z_OK; } bool FlipFramer::InitializeDecompressor() { if (decompressor_.get()) return true; // Already initialized. decompressor_.reset(new z_stream); memset(decompressor_.get(), 0, sizeof(z_stream)); // Compute the id of our dictionary so that we know we're using the // right one when asked for it. if (dictionary_id == 0) { dictionary_id = adler32(0L, Z_NULL, 0); dictionary_id = adler32(dictionary_id, reinterpret_cast(dictionary), sizeof(dictionary)); } int success = inflateInit(decompressor_.get()); if (success != Z_OK) decompressor_.reset(NULL); return success == Z_OK; } bool FlipFramer::GetFrameBoundaries(const FlipFrame* frame, int* payload_length, int* header_length, const unsigned char** payload) const { if (frame->is_control_frame()) { const FlipControlFrame* control_frame = reinterpret_cast(frame); switch (control_frame->type()) { case SYN_STREAM: case SYN_REPLY: { const FlipSynStreamControlFrame *syn_frame = reinterpret_cast(frame); *payload_length = syn_frame->header_block_len(); *header_length = sizeof(FlipFrame) + syn_frame->length() - syn_frame->header_block_len(); *payload = reinterpret_cast(frame) + *header_length; } break; default: // TODO(mbelshe): set an error? return false; // We can't compress this frame! } } else { *header_length = sizeof(FlipFrame); *payload_length = frame->length(); *payload = reinterpret_cast(frame) + sizeof(FlipFrame); } DCHECK(static_cast(*header_length) <= sizeof(FlipFrame) + *payload_length); return true; } FlipFrame* FlipFramer::CompressFrame(const FlipFrame* frame) { int payload_length; int header_length; const unsigned char* payload; static StatsCounter pre_compress_bytes("flip.PreCompressSize"); static StatsCounter post_compress_bytes("flip.PostCompressSize"); if (!enable_compression_) return DuplicateFrame(frame); if (!GetFrameBoundaries(frame, &payload_length, &header_length, &payload)) return NULL; if (!InitializeCompressor()) return NULL; // TODO(mbelshe): Should we have a zlib header like what http servers do? // Create an output frame. int compressed_max_size = deflateBound(compressor_.get(), payload_length); int new_frame_size = header_length + compressed_max_size; FlipFrame* new_frame = reinterpret_cast(new char[new_frame_size]); memcpy(new_frame, frame, header_length); compressor_->next_in = const_cast(payload); compressor_->avail_in = payload_length; compressor_->next_out = reinterpret_cast(new_frame) + header_length; compressor_->avail_out = compressed_max_size; // Data packets have a 'compressed flag if (!new_frame->is_control_frame()) { FlipDataFrame* data_frame = reinterpret_cast(new_frame); data_frame->set_flags(data_frame->flags() | DATA_FLAG_COMPRESSED); } int rv = deflate(compressor_.get(), Z_SYNC_FLUSH); if (rv != Z_OK) { // How can we know that it compressed everything? // This shouldn't happen, right? free(new_frame); return NULL; } int compressed_size = compressed_max_size - compressor_->avail_out; new_frame->set_length(header_length + compressed_size - sizeof(FlipFrame)); pre_compress_bytes.Add(payload_length); post_compress_bytes.Add(new_frame->length()); return new_frame; } FlipFrame* FlipFramer::DecompressFrame(const FlipFrame* frame) { int payload_length; int header_length; const unsigned char* payload; static StatsCounter pre_decompress_bytes("flip.PreDeCompressSize"); static StatsCounter post_decompress_bytes("flip.PostDeCompressSize"); if (!enable_compression_) return DuplicateFrame(frame); if (!GetFrameBoundaries(frame, &payload_length, &header_length, &payload)) return NULL; if (!frame->is_control_frame()) { const FlipDataFrame* data_frame = reinterpret_cast(frame); if (!data_frame->flags() & DATA_FLAG_COMPRESSED) return DuplicateFrame(frame); } if (!InitializeDecompressor()) return NULL; // TODO(mbelshe): Should we have a zlib header like what http servers do? // Create an output frame. Assume it does not need to be longer than // the input data. int decompressed_max_size = kControlFrameBufferInitialSize; int new_frame_size = header_length + decompressed_max_size; FlipFrame* new_frame = reinterpret_cast(new char[new_frame_size]); memcpy(new_frame, frame, header_length); decompressor_->next_in = const_cast(payload); decompressor_->avail_in = payload_length; decompressor_->next_out = reinterpret_cast(new_frame) + header_length; decompressor_->avail_out = decompressed_max_size; int rv = inflate(decompressor_.get(), Z_SYNC_FLUSH); if (rv == Z_NEED_DICT) { // Need to try again with the right dictionary. if (decompressor_->adler == dictionary_id) { rv = inflateSetDictionary(decompressor_.get(), (const Bytef*)dictionary, sizeof(dictionary)); if (rv == Z_OK) rv = inflate(decompressor_.get(), Z_SYNC_FLUSH); } } if (rv != Z_OK) { // How can we know that it decompressed everything? free(new_frame); return NULL; } // Unset the compressed flag for data frames. if (!new_frame->is_control_frame()) { FlipDataFrame* data_frame = reinterpret_cast(new_frame); data_frame->set_flags(data_frame->flags() & ~DATA_FLAG_COMPRESSED); } int decompressed_size = decompressed_max_size - decompressor_->avail_out; new_frame->set_length(header_length + decompressed_size - sizeof(FlipFrame)); pre_decompress_bytes.Add(frame->length()); post_decompress_bytes.Add(new_frame->length()); return new_frame; } FlipFrame* FlipFramer::DuplicateFrame(const FlipFrame* frame) { int size = sizeof(FlipFrame) + frame->length(); char* new_frame = new char[size]; memcpy(new_frame, frame, size); return reinterpret_cast(new_frame); } void FlipFramer::set_enable_compression(bool value) { enable_compression_ = value; } void FlipFramer::set_enable_compression_default(bool value) { compression_default_ = value; } } // namespace flip