diff options
author | mbelshe@chromium.org <mbelshe@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2010-05-21 04:05:40 +0000 |
---|---|---|
committer | mbelshe@chromium.org <mbelshe@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2010-05-21 04:05:40 +0000 |
commit | f1525409f0a8b7e1fe50abad138d18062c27fbcf (patch) | |
tree | 3ced864709a672cdb96460dfa3633c671cfa7552 /net | |
parent | 9e380509a8bece2177f27ed5b214732b50be39db (diff) | |
download | chromium_src-f1525409f0a8b7e1fe50abad138d18062c27fbcf.zip chromium_src-f1525409f0a8b7e1fe50abad138d18062c27fbcf.tar.gz chromium_src-f1525409f0a8b7e1fe50abad138d18062c27fbcf.tar.bz2 |
Separate SPDY data-frame compression from the header compressors.
BUG=none
TEST=SpdyFramerTest.UnclosedStreamDataCompressors, SpdyFramerTest.DataCompression
Review URL: http://codereview.chromium.org/2071009
git-svn-id: svn://svn.chromium.org/chrome/trunk/src@47886 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'net')
-rw-r--r-- | net/spdy/spdy_framer.cc | 381 | ||||
-rw-r--r-- | net/spdy/spdy_framer.h | 55 | ||||
-rw-r--r-- | net/spdy/spdy_framer_test.cc | 160 | ||||
-rw-r--r-- | net/spdy/spdy_protocol.h | 12 | ||||
-rw-r--r-- | net/spdy/spdy_session.cc | 4 |
5 files changed, 473 insertions, 139 deletions
diff --git a/net/spdy/spdy_framer.cc b/net/spdy/spdy_framer.cc index 8ab5bb7d..83652d3 100644 --- a/net/spdy/spdy_framer.cc +++ b/net/spdy/spdy_framer.cc @@ -1,4 +1,4 @@ -// Copyright (c) 2009 The Chromium Authors. All rights reserved. +// Copyright (c) 2010 The Chromium Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. @@ -55,12 +55,13 @@ SpdyFramer::SpdyFramer() } SpdyFramer::~SpdyFramer() { - if (compressor_.get()) { - deflateEnd(compressor_.get()); + if (header_compressor_.get()) { + deflateEnd(header_compressor_.get()); } - if (decompressor_.get()) { - inflateEnd(decompressor_.get()); + if (header_decompressor_.get()) { + inflateEnd(header_decompressor_.get()); } + CleanupStreamCompressorsAndDecompressors(); delete [] current_frame_buffer_; } @@ -366,32 +367,35 @@ size_t SpdyFramer::ProcessDataFramePayload(const char* data, size_t len) { size_t amount_to_forward = std::min(remaining_payload_, len); if (amount_to_forward && state_ != SPDY_IGNORE_REMAINING_PAYLOAD) { if (current_data_frame.flags() & DATA_FLAG_COMPRESSED) { - // TODO(mbelshe): Assert that the decompressor is init'ed. - if (!InitializeDecompressor()) + z_stream* decompressor = + GetStreamDecompressor(current_data_frame.stream_id()); + if (!decompressor) return 0; size_t decompressed_max_size = amount_to_forward * 100; scoped_array<char> decompressed(new char[decompressed_max_size]); - decompressor_->next_in = reinterpret_cast<Bytef*>( + decompressor->next_in = reinterpret_cast<Bytef*>( const_cast<char*>(data)); - decompressor_->avail_in = amount_to_forward; - decompressor_->next_out = + decompressor->avail_in = amount_to_forward; + decompressor->next_out = reinterpret_cast<Bytef*>(decompressed.get()); - decompressor_->avail_out = decompressed_max_size; + decompressor->avail_out = decompressed_max_size; - int rv = inflate(decompressor_.get(), Z_SYNC_FLUSH); + int rv = inflate(decompressor, Z_SYNC_FLUSH); if (rv != Z_OK) { + LOG(WARNING) << "inflate failure: " << rv; set_error(SPDY_DECOMPRESS_FAILURE); return 0; } size_t decompressed_size = decompressed_max_size - - decompressor_->avail_out; + decompressor->avail_out; + // Only inform the visitor if there is data. if (decompressed_size) visitor_->OnStreamFrameData(current_data_frame.stream_id(), decompressed.get(), decompressed_size); - amount_to_forward -= decompressor_->avail_in; + amount_to_forward -= decompressor->avail_in; } else { // The data frame was not compressed. // Only inform the visitor if there is data. @@ -407,9 +411,10 @@ size_t SpdyFramer::ProcessDataFramePayload(const char* data, size_t len) { // If the FIN flag is set, and there is no more data in this data // frame, inform the visitor of EOF via a 0-length data frame. if (!remaining_payload_ && - current_data_frame.flags() & DATA_FLAG_FIN) - visitor_->OnStreamFrameData(current_data_frame.stream_id(), NULL, - 0); + current_data_frame.flags() & DATA_FLAG_FIN) { + visitor_->OnStreamFrameData(current_data_frame.stream_id(), NULL, 0); + CleanupDecompressorForStream(current_data_frame.stream_id()); + } } else { CHANGE_STATE(SPDY_AUTO_RESET); } @@ -437,7 +442,7 @@ bool SpdyFramer::ParseHeaderBlock(const SpdyFrame* frame, return false; // Find the header data within the control frame. - scoped_ptr<SpdyFrame> decompressed_frame(DecompressFrame(frame)); + scoped_ptr<SpdyFrame> decompressed_frame(DecompressFrame(*frame)); if (!decompressed_frame.get()) return false; @@ -533,7 +538,7 @@ SpdySynStreamControlFrame* SpdyFramer::CreateSynStream( scoped_ptr<SpdyFrame> syn_frame(frame.take()); if (compressed) { return reinterpret_cast<SpdySynStreamControlFrame*>( - CompressFrame(syn_frame.get())); + CompressFrame(*syn_frame.get())); } return reinterpret_cast<SpdySynStreamControlFrame*>(syn_frame.release()); } @@ -595,7 +600,6 @@ SpdySynReplyControlFrame* SpdyFramer::CreateSynReply(SpdyStreamId stream_id, frame.WriteUInt16(headers->size()); // Number of headers. SpdyHeaderBlock::iterator it; for (it = headers->begin(); it != headers->end(); ++it) { - // TODO(mbelshe): Headers need to be sorted. frame.WriteString(it->first); frame.WriteString(it->second); } @@ -611,14 +615,14 @@ SpdySynReplyControlFrame* SpdyFramer::CreateSynReply(SpdyStreamId stream_id, scoped_ptr<SpdyFrame> reply_frame(frame.take()); if (compressed) { return reinterpret_cast<SpdySynReplyControlFrame*>( - CompressFrame(reply_frame.get())); + CompressFrame(*reply_frame.get())); } return reinterpret_cast<SpdySynReplyControlFrame*>(reply_frame.release()); } SpdyDataFrame* SpdyFramer::CreateDataFrame(SpdyStreamId stream_id, const char* data, - uint32 len, SpdyDataFlags flags) { + uint32 len, uint32 flags) { SpdyFrameBuilder frame; frame.WriteUInt32(stream_id); @@ -631,9 +635,18 @@ SpdyDataFrame* SpdyFramer::CreateDataFrame(SpdyStreamId stream_id, frame.WriteBytes(data, len); scoped_ptr<SpdyFrame> data_frame(frame.take()); - if (flags & DATA_FLAG_COMPRESSED) - return reinterpret_cast<SpdyDataFrame*>(CompressFrame(data_frame.get())); - return reinterpret_cast<SpdyDataFrame*>(data_frame.release()); + SpdyDataFrame* rv; + if (flags & DATA_FLAG_COMPRESSED) { + rv = reinterpret_cast<SpdyDataFrame*>(CompressFrame(*data_frame.get())); + } else { + rv = reinterpret_cast<SpdyDataFrame*>(data_frame.release()); + } + + if (flags & DATA_FLAG_FIN) { + CleanupCompressorForStream(stream_id); + } + + return rv; } /* static */ @@ -672,34 +685,37 @@ const int SpdyFramer::kDictionarySize = arraysize(kDictionary); static uLong dictionary_id = 0; -bool SpdyFramer::InitializeCompressor() { - if (compressor_.get()) - return true; // Already initialized. +z_stream* SpdyFramer::GetHeaderCompressor() { + if (header_compressor_.get()) + return header_compressor_.get(); // Already initialized. - compressor_.reset(new z_stream); - memset(compressor_.get(), 0, sizeof(z_stream)); + header_compressor_.reset(new z_stream); + memset(header_compressor_.get(), 0, sizeof(z_stream)); - int success = deflateInit2(compressor_.get(), + int success = deflateInit2(header_compressor_.get(), kCompressorLevel, Z_DEFLATED, kCompressorWindowSizeInBits, kCompressorMemLevel, Z_DEFAULT_STRATEGY); if (success == Z_OK) - success = deflateSetDictionary(compressor_.get(), + success = deflateSetDictionary(header_compressor_.get(), reinterpret_cast<const Bytef*>(kDictionary), kDictionarySize); - if (success != Z_OK) - compressor_.reset(NULL); - return success == Z_OK; + if (success != Z_OK) { + LOG(WARNING) << "deflateSetDictionary failure: " << success; + header_compressor_.reset(NULL); + return NULL; + } + return header_compressor_.get(); } -bool SpdyFramer::InitializeDecompressor() { - if (decompressor_.get()) - return true; // Already initialized. +z_stream* SpdyFramer::GetHeaderDecompressor() { + if (header_decompressor_.get()) + return header_decompressor_.get(); // Already initialized. - decompressor_.reset(new z_stream); - memset(decompressor_.get(), 0, sizeof(z_stream)); + header_decompressor_.reset(new z_stream); + memset(header_decompressor_.get(), 0, sizeof(z_stream)); // Compute the id of our dictionary so that we know we're using the // right one when asked for it. @@ -710,39 +726,79 @@ bool SpdyFramer::InitializeDecompressor() { kDictionarySize); } - int success = inflateInit(decompressor_.get()); - if (success != Z_OK) - decompressor_.reset(NULL); - return success == Z_OK; + int success = inflateInit(header_decompressor_.get()); + if (success != Z_OK) { + LOG(WARNING) << "inflateInit failure: " << success; + header_decompressor_.reset(NULL); + return NULL; + } + return header_decompressor_.get(); } -bool SpdyFramer::GetFrameBoundaries(const SpdyFrame* frame, +z_stream* SpdyFramer::GetStreamCompressor(SpdyStreamId stream_id) { + CompressorMap::iterator it = stream_compressors_.find(stream_id); + if (it != stream_compressors_.end()) + return it->second; // Already initialized. + + scoped_ptr<z_stream> compressor(new z_stream); + memset(compressor.get(), 0, sizeof(z_stream)); + + int success = deflateInit2(compressor.get(), + kCompressorLevel, + Z_DEFLATED, + kCompressorWindowSizeInBits, + kCompressorMemLevel, + Z_DEFAULT_STRATEGY); + if (success != Z_OK) { + LOG(WARNING) << "deflateInit failure: " << success; + return NULL; + } + return stream_compressors_[stream_id] = compressor.release(); +} + +z_stream* SpdyFramer::GetStreamDecompressor(SpdyStreamId stream_id) { + CompressorMap::iterator it = stream_decompressors_.find(stream_id); + if (it != stream_decompressors_.end()) + return it->second; // Already initialized. + + scoped_ptr<z_stream> decompressor(new z_stream); + memset(decompressor.get(), 0, sizeof(z_stream)); + + int success = inflateInit(decompressor.get()); + if (success != Z_OK) { + LOG(WARNING) << "inflateInit failure: " << success; + return NULL; + } + return stream_decompressors_[stream_id] = decompressor.release(); +} + +bool SpdyFramer::GetFrameBoundaries(const SpdyFrame& frame, int* payload_length, int* header_length, const char** payload) const { size_t frame_size; - if (frame->is_control_frame()) { - const SpdyControlFrame* control_frame = - reinterpret_cast<const SpdyControlFrame*>(frame); - switch (control_frame->type()) { + if (frame.is_control_frame()) { + const SpdyControlFrame& control_frame = + reinterpret_cast<const SpdyControlFrame&>(frame); + switch (control_frame.type()) { case SYN_STREAM: { - const SpdySynStreamControlFrame *syn_frame = - reinterpret_cast<const SpdySynStreamControlFrame*>(frame); + const SpdySynStreamControlFrame& syn_frame = + reinterpret_cast<const SpdySynStreamControlFrame&>(frame); frame_size = SpdySynStreamControlFrame::size(); - *payload_length = syn_frame->header_block_len(); + *payload_length = syn_frame.header_block_len(); *header_length = frame_size; - *payload = frame->data() + *header_length; + *payload = frame.data() + *header_length; } break; case SYN_REPLY: { - const SpdySynReplyControlFrame *syn_frame = - reinterpret_cast<const SpdySynReplyControlFrame*>(frame); + const SpdySynReplyControlFrame& syn_frame = + reinterpret_cast<const SpdySynReplyControlFrame&>(frame); frame_size = SpdySynReplyControlFrame::size(); - *payload_length = syn_frame->header_block_len(); + *payload_length = syn_frame.header_block_len(); *header_length = frame_size; - *payload = frame->data() + *header_length; + *payload = frame.data() + *header_length; } break; default: @@ -752,18 +808,69 @@ bool SpdyFramer::GetFrameBoundaries(const SpdyFrame* frame, } else { frame_size = SpdyFrame::size(); *header_length = frame_size; - *payload_length = frame->length(); - *payload = frame->data() + SpdyFrame::size(); + *payload_length = frame.length(); + *payload = frame.data() + SpdyFrame::size(); } return true; } +SpdyFrame* SpdyFramer::CompressFrame(const SpdyFrame& frame) { + if (frame.is_control_frame()) { + return CompressControlFrame( + reinterpret_cast<const SpdyControlFrame&>(frame)); + } + return CompressDataFrame(reinterpret_cast<const SpdyDataFrame&>(frame)); +} + +SpdyFrame* SpdyFramer::DecompressFrame(const SpdyFrame& frame) { + if (frame.is_control_frame()) { + return DecompressControlFrame( + reinterpret_cast<const SpdyControlFrame&>(frame)); + } + return DecompressDataFrame(reinterpret_cast<const SpdyDataFrame&>(frame)); +} + +SpdyControlFrame* SpdyFramer::CompressControlFrame( + const SpdyControlFrame& frame) { + z_stream* compressor = GetHeaderCompressor(); + if (!compressor) + return NULL; + return reinterpret_cast<SpdyControlFrame*>( + CompressFrameWithZStream(frame, compressor)); +} + +SpdyControlFrame* SpdyFramer::DecompressControlFrame( + const SpdyControlFrame& frame) { + z_stream* decompressor = GetHeaderDecompressor(); + if (!decompressor) + return NULL; + return reinterpret_cast<SpdyControlFrame*>( + DecompressFrameWithZStream(frame, decompressor)); +} + +SpdyDataFrame* SpdyFramer::CompressDataFrame(const SpdyDataFrame& frame) { + z_stream* compressor = GetStreamCompressor(frame.stream_id()); + if (!compressor) + return NULL; + return reinterpret_cast<SpdyDataFrame*>( + CompressFrameWithZStream(frame, compressor)); +} + +SpdyDataFrame* SpdyFramer::DecompressDataFrame(const SpdyDataFrame& frame) { + z_stream* decompressor = GetStreamDecompressor(frame.stream_id()); + if (!decompressor) + return NULL; + return reinterpret_cast<SpdyDataFrame*>( + DecompressFrameWithZStream(frame, decompressor)); +} -SpdyFrame* SpdyFramer::CompressFrame(const SpdyFrame* frame) { +SpdyFrame* SpdyFramer::CompressFrameWithZStream(const SpdyFrame& frame, + z_stream* compressor) { int payload_length; int header_length; const char* payload; + static StatsCounter compressed_frames("spdy.CompressedFrames"); static StatsCounter pre_compress_bytes("spdy.PreCompressSize"); static StatsCounter post_compress_bytes("spdy.PostCompressSize"); @@ -773,50 +880,50 @@ SpdyFrame* SpdyFramer::CompressFrame(const SpdyFrame* frame) { if (!GetFrameBoundaries(frame, &payload_length, &header_length, &payload)) return NULL; - if (!InitializeCompressor()) - return NULL; - - // TODO(mbelshe): Should we have a zlib header like what http servers do? - // Create an output frame. - int compressed_max_size = deflateBound(compressor_.get(), payload_length); + int compressed_max_size = deflateBound(compressor, payload_length); int new_frame_size = header_length + compressed_max_size; - SpdyFrame* new_frame = new SpdyFrame(new_frame_size); - memcpy(new_frame->data(), frame->data(), frame->length() + SpdyFrame::size()); + scoped_ptr<SpdyFrame> new_frame(new SpdyFrame(new_frame_size)); + memcpy(new_frame->data(), frame.data(), frame.length() + SpdyFrame::size()); - compressor_->next_in = reinterpret_cast<Bytef*>(const_cast<char*>(payload)); - compressor_->avail_in = payload_length; - compressor_->next_out = reinterpret_cast<Bytef*>(new_frame->data()) + + compressor->next_in = reinterpret_cast<Bytef*>(const_cast<char*>(payload)); + compressor->avail_in = payload_length; + compressor->next_out = reinterpret_cast<Bytef*>(new_frame->data()) + header_length; - compressor_->avail_out = compressed_max_size; + compressor->avail_out = compressed_max_size; - // Data packets have a 'compressed flag + // Data packets have a 'compressed' flag. if (!new_frame->is_control_frame()) { - SpdyDataFrame* data_frame = reinterpret_cast<SpdyDataFrame*>(new_frame); + SpdyDataFrame* data_frame = + reinterpret_cast<SpdyDataFrame*>(new_frame.get()); data_frame->set_flags(data_frame->flags() | DATA_FLAG_COMPRESSED); } - int rv = deflate(compressor_.get(), Z_SYNC_FLUSH); + int rv = deflate(compressor, Z_SYNC_FLUSH); if (rv != Z_OK) { // How can we know that it compressed everything? // This shouldn't happen, right? - delete new_frame; + LOG(WARNING) << "deflate failure: " << rv; return NULL; } - int compressed_size = compressed_max_size - compressor_->avail_out; + int compressed_size = compressed_max_size - compressor->avail_out; new_frame->set_length(header_length + compressed_size - SpdyFrame::size()); pre_compress_bytes.Add(payload_length); post_compress_bytes.Add(new_frame->length()); - return new_frame; + compressed_frames.Increment(); + + return new_frame.release(); } -SpdyFrame* SpdyFramer::DecompressFrame(const SpdyFrame* frame) { +SpdyFrame* SpdyFramer::DecompressFrameWithZStream(const SpdyFrame& frame, + z_stream* decompressor) { int payload_length; int header_length; const char* payload; + static StatsCounter decompressed_frames("spdy.DecompressedFrames"); static StatsCounter pre_decompress_bytes("spdy.PreDeCompressSize"); static StatsCounter post_decompress_bytes("spdy.PostDeCompressSize"); @@ -826,84 +933,128 @@ SpdyFrame* SpdyFramer::DecompressFrame(const SpdyFrame* frame) { if (!GetFrameBoundaries(frame, &payload_length, &header_length, &payload)) return NULL; - if (!frame->is_control_frame()) { - const SpdyDataFrame* data_frame = - reinterpret_cast<const SpdyDataFrame*>(frame); - if ((data_frame->flags() & DATA_FLAG_COMPRESSED) == 0) + if (!frame.is_control_frame()) { + const SpdyDataFrame& data_frame = + reinterpret_cast<const SpdyDataFrame&>(frame); + if ((data_frame.flags() & DATA_FLAG_COMPRESSED) == 0) return DuplicateFrame(frame); } - if (!InitializeDecompressor()) - return NULL; - - // TODO(mbelshe): Should we have a zlib header like what http servers do? - // Create an output frame. Assume it does not need to be longer than // the input data. int decompressed_max_size = kControlFrameBufferInitialSize; int new_frame_size = header_length + decompressed_max_size; - SpdyFrame* new_frame = new SpdyFrame(new_frame_size); - memcpy(new_frame->data(), frame->data(), frame->length() + SpdyFrame::size()); + scoped_ptr<SpdyFrame> new_frame(new SpdyFrame(new_frame_size)); + memcpy(new_frame->data(), frame.data(), frame.length() + SpdyFrame::size()); - decompressor_->next_in = reinterpret_cast<Bytef*>(const_cast<char*>(payload)); - decompressor_->avail_in = payload_length; - decompressor_->next_out = reinterpret_cast<Bytef*>(new_frame->data()) + + decompressor->next_in = reinterpret_cast<Bytef*>(const_cast<char*>(payload)); + decompressor->avail_in = payload_length; + decompressor->next_out = reinterpret_cast<Bytef*>(new_frame->data()) + header_length; - decompressor_->avail_out = decompressed_max_size; + decompressor->avail_out = decompressed_max_size; - int rv = inflate(decompressor_.get(), Z_SYNC_FLUSH); + int rv = inflate(decompressor, Z_SYNC_FLUSH); if (rv == Z_NEED_DICT) { // Need to try again with the right dictionary. - if (decompressor_->adler == dictionary_id) { - rv = inflateSetDictionary(decompressor_.get(), (const Bytef*)kDictionary, + if (decompressor->adler == dictionary_id) { + rv = inflateSetDictionary(decompressor, (const Bytef*)kDictionary, kDictionarySize); if (rv == Z_OK) - rv = inflate(decompressor_.get(), Z_SYNC_FLUSH); + rv = inflate(decompressor, Z_SYNC_FLUSH); } } if (rv != Z_OK) { // How can we know that it decompressed everything? - delete new_frame; + LOG(WARNING) << "inflate failure: " << rv; return NULL; } // Unset the compressed flag for data frames. if (!new_frame->is_control_frame()) { - SpdyDataFrame* data_frame = reinterpret_cast<SpdyDataFrame*>(new_frame); + SpdyDataFrame* data_frame = + reinterpret_cast<SpdyDataFrame*>(new_frame.get()); data_frame->set_flags(data_frame->flags() & ~DATA_FLAG_COMPRESSED); } - int decompressed_size = decompressed_max_size - decompressor_->avail_out; + int decompressed_size = decompressed_max_size - decompressor->avail_out; new_frame->set_length(header_length + decompressed_size - SpdyFrame::size()); - pre_decompress_bytes.Add(frame->length()); + // If there is data left, then we're in trouble. This API assumes everything + // was consumed. + CHECK_EQ(decompressor->avail_in, 0u); + + pre_decompress_bytes.Add(frame.length()); post_decompress_bytes.Add(new_frame->length()); - return new_frame; + decompressed_frames.Increment(); + + return new_frame.release(); +} + +void SpdyFramer::CleanupCompressorForStream(SpdyStreamId id) { + CompressorMap::iterator it = stream_compressors_.find(id); + if (it != stream_compressors_.end()) { + z_stream* compressor = it->second; + deflateEnd(compressor); + delete compressor; + stream_compressors_.erase(it); + } +} + +void SpdyFramer::CleanupDecompressorForStream(SpdyStreamId id) { + CompressorMap::iterator it = stream_decompressors_.find(id); + if (it != stream_decompressors_.end()) { + z_stream* decompressor = it->second; + inflateEnd(decompressor); + delete decompressor; + stream_decompressors_.erase(it); + } +} + +void SpdyFramer::CleanupStreamCompressorsAndDecompressors() { + CompressorMap::iterator it; + + it = stream_compressors_.begin(); + while (it != stream_compressors_.end()) { + z_stream* compressor = it->second; + deflateEnd(compressor); + delete compressor; + ++it; + } + stream_compressors_.clear(); + + it = stream_decompressors_.begin(); + while (it != stream_decompressors_.end()) { + z_stream* decompressor = it->second; + inflateEnd(decompressor); + delete decompressor; + ++it; + } + stream_decompressors_.clear(); } -SpdyFrame* SpdyFramer::DuplicateFrame(const SpdyFrame* frame) { - int size = SpdyFrame::size() + frame->length(); +SpdyFrame* SpdyFramer::DuplicateFrame(const SpdyFrame& frame) { + int size = SpdyFrame::size() + frame.length(); SpdyFrame* new_frame = new SpdyFrame(size); - memcpy(new_frame->data(), frame->data(), size); + memcpy(new_frame->data(), frame.data(), size); return new_frame; } -bool SpdyFramer::IsCompressible(const SpdyFrame* frame) const { +bool SpdyFramer::IsCompressible(const SpdyFrame& frame) const { // The important frames to compress are those which contain large // amounts of compressible data - namely the headers in the SYN_STREAM // and SYN_REPLY. // TODO(mbelshe): Reconcile this with the spec when the spec is // explicit about which frames compress and which do not. - if (frame->is_control_frame()) { - const SpdyControlFrame* control_frame = - reinterpret_cast<const SpdyControlFrame*>(frame); - return control_frame->type() == SYN_STREAM || - control_frame->type() == SYN_REPLY; + if (frame.is_control_frame()) { + const SpdyControlFrame& control_frame = + reinterpret_cast<const SpdyControlFrame&>(frame); + return control_frame.type() == SYN_STREAM || + control_frame.type() == SYN_REPLY; } - const SpdyDataFrame* data_frame = - reinterpret_cast<const SpdyDataFrame*>(frame); - return (data_frame->flags() & DATA_FLAG_COMPRESSED) != 0; + const SpdyDataFrame& data_frame = + reinterpret_cast<const SpdyDataFrame&>(frame); + return (data_frame.flags() & DATA_FLAG_COMPRESSED) != 0; } void SpdyFramer::set_enable_compression(bool value) { diff --git a/net/spdy/spdy_framer.h b/net/spdy/spdy_framer.h index 2c95ae5..540da6d 100644 --- a/net/spdy/spdy_framer.h +++ b/net/spdy/spdy_framer.h @@ -1,4 +1,4 @@ -// Copyright (c) 2009 The Chromium Authors. All rights reserved. +// Copyright (c) 2010 The Chromium Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. @@ -192,7 +192,7 @@ class SpdyFramer { // To create a compressed frame, enable DATA_FLAG_COMPRESSED. // To mark this frame as the last data frame, enable DATA_FLAG_FIN. SpdyDataFrame* CreateDataFrame(SpdyStreamId stream_id, const char* data, - uint32 len, SpdyDataFlags flags); + uint32 len, uint32 flags); static SpdyControlFrame* CreateNopFrame(); @@ -212,21 +212,21 @@ class SpdyFramer { // Compression state is maintained as part of the SpdyFramer. // Returned frame must be freed with "delete". // On failure, returns NULL. - SpdyFrame* CompressFrame(const SpdyFrame* frame); + SpdyFrame* CompressFrame(const SpdyFrame& frame); // Decompresses a SpdyFrame. // On success, returns a new SpdyFrame with the payload decompressed. // Compression state is maintained as part of the SpdyFramer. // Returned frame must be freed with "delete". // On failure, returns NULL. - SpdyFrame* DecompressFrame(const SpdyFrame* frame); + SpdyFrame* DecompressFrame(const SpdyFrame& frame); // Create a copy of a frame. // Returned frame must be freed with "delete". - SpdyFrame* DuplicateFrame(const SpdyFrame* frame); + SpdyFrame* DuplicateFrame(const SpdyFrame& frame); // Returns true if a frame could be compressed. - bool IsCompressible(const SpdyFrame* frame) const; + bool IsCompressible(const SpdyFrame& frame) const; // For debugging. static const char* StateToString(int state); @@ -237,7 +237,8 @@ class SpdyFramer { static const int kDictionarySize; protected: - FRIEND_TEST(SpdyFramerTest, HeaderBlockBarfsOnOutOfOrderHeaders); + FRIEND_TEST(SpdyFramerTest, DataCompression); + FRIEND_TEST(SpdyFramerTest, UnclosedStreamDataCompressors); friend class net::SpdyNetworkTransactionTest; friend class net::HttpNetworkTransactionTest; friend class net::HttpNetworkLayer; // This is temporary for the server. @@ -251,6 +252,8 @@ class SpdyFramer { static void set_enable_compression_default(bool value); private: + typedef std::map<SpdyStreamId, z_stream*> CompressorMap; + // Internal breakout from ProcessInput. Returns the number of bytes // consumed from the data. size_t ProcessCommonHeader(const char* data, size_t len); @@ -258,9 +261,24 @@ class SpdyFramer { size_t ProcessControlFramePayload(const char* data, size_t len); size_t ProcessDataFramePayload(const char* data, size_t len); - // Initialize the ZLib state. - bool InitializeCompressor(); - bool InitializeDecompressor(); + // Get (and lazily initialize) the ZLib state. + z_stream* GetHeaderCompressor(); + z_stream* GetHeaderDecompressor(); + z_stream* GetStreamCompressor(SpdyStreamId id); + z_stream* GetStreamDecompressor(SpdyStreamId id); + + // Compression helpers + SpdyControlFrame* CompressControlFrame(const SpdyControlFrame& frame); + SpdyDataFrame* CompressDataFrame(const SpdyDataFrame& frame); + SpdyControlFrame* DecompressControlFrame(const SpdyControlFrame& frame); + SpdyDataFrame* DecompressDataFrame(const SpdyDataFrame& frame); + SpdyFrame* CompressFrameWithZStream(const SpdyFrame& frame, + z_stream* compressor); + SpdyFrame* DecompressFrameWithZStream(const SpdyFrame& frame, + z_stream* decompressor); + void CleanupCompressorForStream(SpdyStreamId id); + void CleanupDecompressorForStream(SpdyStreamId id); + void CleanupStreamCompressorsAndDecompressors(); // Not used (yet) size_t BytesSafeToRead() const; @@ -273,9 +291,12 @@ class SpdyFramer { // Given a frame, breakdown the variable payload length, the static header // header length, and variable payload pointer. - bool GetFrameBoundaries(const SpdyFrame* frame, int* payload_length, + bool GetFrameBoundaries(const SpdyFrame& frame, int* payload_length, int* header_length, const char** payload) const; + int num_stream_compressors() const { return stream_compressors_.size(); } + int num_stream_decompressors() const { return stream_decompressors_.size(); } + SpdyState state_; SpdyError error_code_; size_t remaining_payload_; @@ -285,9 +306,15 @@ class SpdyFramer { size_t current_frame_len_; // Number of bytes read into the current_frame_. size_t current_frame_capacity_; - bool enable_compression_; - scoped_ptr<z_stream> compressor_; - scoped_ptr<z_stream> decompressor_; + bool enable_compression_; // Controls all compression + // SPDY header compressors. + scoped_ptr<z_stream> header_compressor_; + scoped_ptr<z_stream> header_decompressor_; + + // Per-stream data compressors. + CompressorMap stream_compressors_; + CompressorMap stream_decompressors_; + SpdyFramerVisitorInterface* visitor_; static bool compression_default_; diff --git a/net/spdy/spdy_framer_test.cc b/net/spdy/spdy_framer_test.cc index bf9b6cd..783c4ac 100644 --- a/net/spdy/spdy_framer_test.cc +++ b/net/spdy/spdy_framer_test.cc @@ -1,4 +1,4 @@ -// Copyright (c) 2009 The Chromium Authors. All rights reserved. +// Copyright (c) 2010 The Chromium Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. @@ -112,6 +112,9 @@ class TestSpdyVisitor : public SpdyFramerVisitorInterface { } // namespace spdy +using spdy::SpdyControlFlags; +using spdy::SpdyControlFrame; +using spdy::SpdyDataFrame; using spdy::SpdyFrame; using spdy::SpdyFrameBuilder; using spdy::SpdyFramer; @@ -119,11 +122,13 @@ using spdy::SpdyHeaderBlock; using spdy::SpdySynStreamControlFrame; using spdy::kControlFlagMask; using spdy::CONTROL_FLAG_NONE; +using spdy::DATA_FLAG_COMPRESSED; +using spdy::DATA_FLAG_FIN; using spdy::SYN_STREAM; using spdy::test::FramerSetEnableCompressionHelper; using spdy::test::TestSpdyVisitor; -namespace { +namespace spdy { class SpdyFramerTest : public PlatformTest { public: @@ -251,10 +256,10 @@ TEST_F(SpdyFramerTest, BasicCompression) { EXPECT_LE(frame2->length(), frame1->length()); // Decompress the first frame - scoped_ptr<SpdyFrame> frame3(framer.DecompressFrame(frame1.get())); + scoped_ptr<SpdyFrame> frame3(framer.DecompressFrame(*frame1.get())); // Decompress the second frame - scoped_ptr<SpdyFrame> frame4(framer.DecompressFrame(frame2.get())); + scoped_ptr<SpdyFrame> frame4(framer.DecompressFrame(*frame2.get())); // Expect frames 3 & 4 to be the same. EXPECT_EQ(0, @@ -278,7 +283,7 @@ TEST_F(SpdyFramerTest, DecompressUncompressedFrame) { &headers)); // Decompress the frame - scoped_ptr<SpdyFrame> frame2(framer.DecompressFrame(frame1.get())); + scoped_ptr<SpdyFrame> frame2(framer.DecompressFrame(*frame1.get())); EXPECT_EQ(NULL, frame2.get()); } @@ -413,5 +418,150 @@ TEST_F(SpdyFramerTest, FinOnSynReplyFrame) { EXPECT_EQ(1, visitor.zero_length_data_frame_count_); } +// Basic compression & decompression +TEST_F(SpdyFramerTest, DataCompression) { + SpdyFramer send_framer; + SpdyFramer recv_framer; + + FramerSetEnableCompressionHelper(&send_framer, true); + FramerSetEnableCompressionHelper(&recv_framer, true); + + // Mix up some SYNs and DATA frames since they use different compressors. + const char kHeader1[] = "header1"; + const char kHeader2[] = "header2"; + const char kHeader3[] = "header3"; + const char kValue1[] = "value1"; + const char kValue2[] = "value2"; + const char kValue3[] = "value3"; + + // SYN_STREAM #1 + SpdyHeaderBlock block; + block[kHeader1] = kValue1; + block[kHeader2] = kValue2; + SpdyControlFlags flags(CONTROL_FLAG_NONE); + scoped_ptr<spdy::SpdyFrame> syn_frame_1( + send_framer.CreateSynStream(1, 0, 0, flags, true, &block)); + EXPECT_TRUE(syn_frame_1.get() != NULL); + + // DATA #1 + const char bytes[] = "this is a test test test test test!"; + scoped_ptr<SpdyFrame> data_frame_1( + send_framer.CreateDataFrame(1, bytes, arraysize(bytes), + DATA_FLAG_COMPRESSED)); + EXPECT_TRUE(data_frame_1.get() != NULL); + + // SYN_STREAM #2 + block[kHeader3] = kValue3; + scoped_ptr<SpdyFrame> syn_frame_2( + send_framer.CreateSynStream(3, 0, 0, flags, true, &block)); + EXPECT_TRUE(syn_frame_2.get() != NULL); + + // DATA #2 + scoped_ptr<SpdyFrame> data_frame_2( + send_framer.CreateDataFrame(3, bytes, arraysize(bytes), + DATA_FLAG_COMPRESSED)); + EXPECT_TRUE(data_frame_2.get() != NULL); + + // Now start decompressing + scoped_ptr<SpdyFrame> decompressed; + SpdyControlFrame* control_frame; + SpdyDataFrame* data_frame; + SpdyHeaderBlock decompressed_headers; + + decompressed.reset(recv_framer.DuplicateFrame(*syn_frame_1.get())); + EXPECT_TRUE(decompressed.get() != NULL); + EXPECT_TRUE(decompressed->is_control_frame()); + control_frame = reinterpret_cast<SpdyControlFrame*>(decompressed.get()); + EXPECT_EQ(SYN_STREAM, control_frame->type()); + EXPECT_TRUE(recv_framer.ParseHeaderBlock( + control_frame, &decompressed_headers)); + EXPECT_EQ(2u, decompressed_headers.size()); + EXPECT_EQ(SYN_STREAM, control_frame->type()); + EXPECT_EQ(kValue1, decompressed_headers[kHeader1]); + EXPECT_EQ(kValue2, decompressed_headers[kHeader2]); + + decompressed.reset(recv_framer.DecompressFrame(*data_frame_1.get())); + EXPECT_TRUE(decompressed.get() != NULL); + EXPECT_FALSE(decompressed->is_control_frame()); + data_frame = reinterpret_cast<SpdyDataFrame*>(decompressed.get()); + EXPECT_EQ(arraysize(bytes), data_frame->length()); + EXPECT_EQ(0, memcmp(data_frame->payload(), bytes, data_frame->length())); + + decompressed.reset(recv_framer.DuplicateFrame(*syn_frame_2.get())); + EXPECT_TRUE(decompressed.get() != NULL); + EXPECT_TRUE(decompressed->is_control_frame()); + control_frame = reinterpret_cast<SpdyControlFrame*>(decompressed.get()); + EXPECT_EQ(control_frame->type(), SYN_STREAM); + decompressed_headers.clear(); + EXPECT_TRUE(recv_framer.ParseHeaderBlock( + control_frame, &decompressed_headers)); + EXPECT_EQ(3u, decompressed_headers.size()); + EXPECT_EQ(SYN_STREAM, control_frame->type()); + EXPECT_EQ(kValue1, decompressed_headers[kHeader1]); + EXPECT_EQ(kValue2, decompressed_headers[kHeader2]); + EXPECT_EQ(kValue3, decompressed_headers[kHeader3]); + + decompressed.reset(recv_framer.DecompressFrame(*data_frame_2.get())); + EXPECT_TRUE(decompressed.get() != NULL); + EXPECT_FALSE(decompressed->is_control_frame()); + data_frame = reinterpret_cast<SpdyDataFrame*>(decompressed.get()); + EXPECT_EQ(arraysize(bytes), data_frame->length()); + EXPECT_EQ(0, memcmp(data_frame->payload(), bytes, data_frame->length())); + + // We didn't close these streams, so the compressors should be active. + EXPECT_EQ(2, send_framer.num_stream_compressors()); + EXPECT_EQ(0, send_framer.num_stream_decompressors()); + EXPECT_EQ(0, recv_framer.num_stream_compressors()); + EXPECT_EQ(2, recv_framer.num_stream_decompressors()); +} + +// Verify we don't leak when we leave streams unclosed +TEST_F(SpdyFramerTest, UnclosedStreamDataCompressors) { + SpdyFramer send_framer; + + FramerSetEnableCompressionHelper(&send_framer, true); + + const char kHeader1[] = "header1"; + const char kHeader2[] = "header2"; + const char kValue1[] = "value1"; + const char kValue2[] = "value2"; + + SpdyHeaderBlock block; + block[kHeader1] = kValue1; + block[kHeader2] = kValue2; + SpdyControlFlags flags(CONTROL_FLAG_NONE); + scoped_ptr<spdy::SpdyFrame> syn_frame( + send_framer.CreateSynStream(1, 0, 0, flags, true, &block)); + EXPECT_TRUE(syn_frame.get() != NULL); + + const char bytes[] = "this is a test test test test test!"; + scoped_ptr<SpdyFrame> send_frame( + send_framer.CreateDataFrame(1, bytes, arraysize(bytes), + DATA_FLAG_FIN | DATA_FLAG_COMPRESSED)); + EXPECT_TRUE(send_frame.get() != NULL); + + // Run the inputs through the framer. + TestSpdyVisitor visitor; + const unsigned char* data; + data = reinterpret_cast<const unsigned char*>(syn_frame->data()); + visitor.SimulateInFramer(data, syn_frame->length() + SpdyFrame::size()); + data = reinterpret_cast<const unsigned char*>(send_frame->data()); + visitor.SimulateInFramer(data, send_frame->length() + SpdyFrame::size()); + + EXPECT_EQ(0, visitor.error_count_); + EXPECT_EQ(1, visitor.syn_frame_count_); + EXPECT_EQ(0, visitor.syn_reply_frame_count_); + EXPECT_EQ(arraysize(bytes), static_cast<unsigned>(visitor.data_bytes_)); + EXPECT_EQ(0, visitor.fin_frame_count_); + EXPECT_EQ(0, visitor.fin_flag_count_); + EXPECT_EQ(1, visitor.zero_length_data_frame_count_); + + // We closed the streams, so all compressors should be down. + EXPECT_EQ(0, visitor.framer_.num_stream_compressors()); + EXPECT_EQ(0, visitor.framer_.num_stream_decompressors()); + EXPECT_EQ(0, send_framer.num_stream_compressors()); + EXPECT_EQ(0, send_framer.num_stream_decompressors()); +} + } // namespace diff --git a/net/spdy/spdy_protocol.h b/net/spdy/spdy_protocol.h index 5167b04..a7a5ab7 100644 --- a/net/spdy/spdy_protocol.h +++ b/net/spdy/spdy_protocol.h @@ -1,4 +1,4 @@ -// Copyright (c) 2009 The Chromium Authors. All rights reserved. +// Copyright (c) 2010 The Chromium Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. @@ -130,7 +130,7 @@ enum SpdyControlType { enum SpdyDataFlags { DATA_FLAG_NONE = 0, DATA_FLAG_FIN = 1, - DATA_FLAG_COMPRESSED = 2 // TODO(mbelshe): remove me. + DATA_FLAG_COMPRESSED = 2 }; // Flags on control packets @@ -157,7 +157,9 @@ enum SpdySettingsIds { SETTINGS_DOWNLOAD_BANDWIDTH = 0x2, SETTINGS_ROUND_TRIP_TIME = 0x3, SETTINGS_MAX_CONCURRENT_STREAMS = 0x4, - SETTINGS_CURRENT_CWND = 0x5 + SETTINGS_CURRENT_CWND = 0x5, + // Downstream byte retransmission rate in percentage. + SETTINGS_DOWNLOAD_RETRANS_RATE = 0x6 }; // Status codes, as used in control frames (primarily RST_STREAM). @@ -352,6 +354,10 @@ class SpdyDataFrame : public SpdyFrame { // Note: this is not the size of the SpdyDataFrame class. static size_t size() { return SpdyFrame::size(); } + const char* payload() const { + return reinterpret_cast<const char*>(frame_) + size(); + } + private: DISALLOW_COPY_AND_ASSIGN(SpdyDataFrame); }; diff --git a/net/spdy/spdy_session.cc b/net/spdy/spdy_session.cc index 85c825b..686e960 100644 --- a/net/spdy/spdy_session.cc +++ b/net/spdy/spdy_session.cc @@ -755,9 +755,9 @@ void SpdySession::WriteSocket() { // which is now. At this time, we don't compress our data frames. spdy::SpdyFrame uncompressed_frame(next_buffer.buffer()->data(), false); size_t size; - if (spdy_framer_.IsCompressible(&uncompressed_frame)) { + if (spdy_framer_.IsCompressible(uncompressed_frame)) { scoped_ptr<spdy::SpdyFrame> compressed_frame( - spdy_framer_.CompressFrame(&uncompressed_frame)); + spdy_framer_.CompressFrame(uncompressed_frame)); if (!compressed_frame.get()) { LOG(ERROR) << "SPDY Compression failure"; CloseSessionOnError(net::ERR_SPDY_PROTOCOL_ERROR); |