diff options
author | mbelshe@google.com <mbelshe@google.com@0039d316-1c4b-4281-b951-d872f2087c98> | 2009-11-25 00:43:06 +0000 |
---|---|---|
committer | mbelshe@google.com <mbelshe@google.com@0039d316-1c4b-4281-b951-d872f2087c98> | 2009-11-25 00:43:06 +0000 |
commit | 246f429937fda025fe76c9332015d7647fb33307 (patch) | |
tree | 5005419cdd6be63ce5eed0ee70144d0f727de8a7 /net/flip/flip_session.cc | |
parent | 025bc9679326c36e9c6cf9a7c466ab64a63eb0be (diff) | |
download | chromium_src-246f429937fda025fe76c9332015d7647fb33307.zip chromium_src-246f429937fda025fe76c9332015d7647fb33307.tar.gz chromium_src-246f429937fda025fe76c9332015d7647fb33307.tar.bz2 |
Fix the FlipSession to support partial writes.
Modified the FlipIOBuffer to use a DrainableIOBuffer instead
of a IOBufferWithSize. When a write completes, we drain the
bytes, and only fetch the next FlipFrame from the queue after
we have fully drained the buffer.
I will update the tests to be much more thorough in my next
CL.
BUG=none
TEST=FlipNetworkTransactionTest.PartialWrites
Review URL: http://codereview.chromium.org/436019
git-svn-id: svn://svn.chromium.org/chrome/trunk/src@33018 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'net/flip/flip_session.cc')
-rw-r--r-- | net/flip/flip_session.cc | 118 |
1 files changed, 67 insertions, 51 deletions
diff --git a/net/flip/flip_session.cc b/net/flip/flip_session.cc index de87732..8b62c50 100644 --- a/net/flip/flip_session.cc +++ b/net/flip/flip_session.cc @@ -267,9 +267,9 @@ scoped_refptr<FlipStream> FlipSession::GetOrCreateStream( flip_framer_.CreateSynStream(stream_id, priority, flags, false, &headers)); int length = flip::FlipFrame::size() + syn_frame->length(); - IOBufferWithSize* buffer = new IOBufferWithSize(length); + IOBuffer* buffer = new IOBuffer(length); memcpy(buffer->data(), syn_frame->data(), length); - queue_.push(FlipIOBuffer(buffer, priority, stream)); + queue_.push(FlipIOBuffer(buffer, length, priority, stream)); static StatsCounter flip_requests("flip.requests"); flip_requests.Increment(); @@ -324,7 +324,7 @@ int FlipSession::WriteStreamData(flip::FlipStreamId stream_id, int length = flip::FlipFrame::size() + frame->length(); IOBufferWithSize* buffer = new IOBufferWithSize(length); memcpy(buffer->data(), frame->data(), length); - queue_.push(FlipIOBuffer(buffer, stream->priority(), stream)); + queue_.push(FlipIOBuffer(buffer, length, stream->priority(), stream)); // Whenever we queue onto the socket we need to ensure that we will write to // it later. @@ -449,40 +449,52 @@ void FlipSession::OnReadComplete(int bytes_read) { void FlipSession::OnWriteComplete(int result) { DCHECK(write_pending_); + DCHECK(in_flight_write_.size()); + DCHECK(result != 0); // This shouldn't happen for write. + write_pending_ = false; LOG(INFO) << "Flip write complete (result=" << result << ")"; if (result >= 0) { - // TODO(mbelshe) Verify that we wrote ALL the bytes of the frame. - // The code current is broken in the case of a partial write. - DCHECK_EQ(static_cast<size_t>(result), in_flight_write_.size()); - - // We only notify the stream when we've fully written the pending flip - // frame. - scoped_refptr<FlipStream> stream = in_flight_write_.stream(); - DCHECK(stream.get()); - - // Report the number of bytes written to the caller, but exclude the - // frame size overhead. - if (result > 0) { - // TODO(willchan): This is an unsafe DCHECK. I'm hitting this. We should - // handle small writes appropriately. - DCHECK(result > static_cast<int>(flip::FlipFrame::size())); - result -= static_cast<int>(flip::FlipFrame::size()); + // It should not be possible to have written more bytes than our + // in_flight_write_. + DCHECK_LE(result, in_flight_write_.buffer()->BytesRemaining()); + + in_flight_write_.buffer()->DidConsume(result); + + // We only notify the stream when we've fully written the pending frame. + if (!in_flight_write_.buffer()->BytesRemaining()) { + scoped_refptr<FlipStream> stream = in_flight_write_.stream(); + DCHECK(stream.get()); + + // Report the number of bytes written to the caller, but exclude the + // frame size overhead. NOTE: if this frame was compressed the reported + // bytes written is the compressed size, not the original size. + if (result > 0) { + result = in_flight_write_.buffer()->size(); + DCHECK_GT(result, static_cast<int>(flip::FlipFrame::size())); + result -= static_cast<int>(flip::FlipFrame::size()); + } + stream->OnWriteComplete(result); + + // Cleanup the write which just completed. + in_flight_write_.release(); } - stream->OnWriteComplete(result); - - // Cleanup the write which just completed. - in_flight_write_.release(); // Write more data. We're already in a continuation, so we can // go ahead and write it immediately (without going back to the // message loop). WriteSocketLater(); } else { - // TODO(mbelshe): Deal with result < 0 error case. - NOTIMPLEMENTED(); + // The stream is now errored. Close it down. + CloseAllStreams(static_cast<net::Error>(result)); + // TODO(mbelshe): we need to cleanup the session here as well. + // Right now, but a read and a write can fail, and each will + // remove the session from the pool, which trips asserts. We + // need to cleanup the close logic so that we only call it + // once. + //session_->flip_session_pool()->Remove(this); } } @@ -535,36 +547,40 @@ void FlipSession::WriteSocket() { // Loop sending frames until we've sent everything or until the write // returns error (or ERR_IO_PENDING). - while (queue_.size()) { - // Grab the next FlipFrame to send. - FlipIOBuffer next_buffer = queue_.top(); - queue_.pop(); - - // We've deferred compression until just before we write it to the socket, - // which is now. At this time, we don't compress our data frames. - flip::FlipFrame uncompressed_frame(next_buffer.buffer()->data(), false); - size_t size; - if (uncompressed_frame.is_control_frame()) { - scoped_ptr<flip::FlipFrame> compressed_frame( - flip_framer_.CompressFrame(&uncompressed_frame)); - size = compressed_frame->length() + flip::FlipFrame::size(); - - DCHECK(size > 0); - - // TODO(mbelshe): We have too much copying of data here. - IOBufferWithSize* buffer = new IOBufferWithSize(size); - memcpy(buffer->data(), compressed_frame->data(), size); - - // Attempt to send the frame. - in_flight_write_ = FlipIOBuffer(buffer, 0, next_buffer.stream()); + while (in_flight_write_.buffer() || queue_.size()) { + if (!in_flight_write_.buffer()) { + // Grab the next FlipFrame to send. + FlipIOBuffer next_buffer = queue_.top(); + queue_.pop(); + + // We've deferred compression until just before we write it to the socket, + // which is now. At this time, we don't compress our data frames. + flip::FlipFrame uncompressed_frame(next_buffer.buffer()->data(), false); + size_t size; + if (uncompressed_frame.is_control_frame()) { + scoped_ptr<flip::FlipFrame> compressed_frame( + flip_framer_.CompressFrame(&uncompressed_frame)); + size = compressed_frame->length() + flip::FlipFrame::size(); + + DCHECK(size > 0); + + // TODO(mbelshe): We have too much copying of data here. + IOBufferWithSize* buffer = new IOBufferWithSize(size); + memcpy(buffer->data(), compressed_frame->data(), size); + + // Attempt to send the frame. + in_flight_write_ = FlipIOBuffer(buffer, size, 0, next_buffer.stream()); + } else { + size = uncompressed_frame.length() + flip::FlipFrame::size(); + in_flight_write_ = next_buffer; + } } else { - size = uncompressed_frame.length() + flip::FlipFrame::size(); - in_flight_write_ = next_buffer; + DCHECK(in_flight_write_.buffer()->BytesRemaining()); } write_pending_ = true; int rv = connection_.socket()->Write(in_flight_write_.buffer(), - size, &write_callback_); + in_flight_write_.buffer()->BytesRemaining(), &write_callback_); if (rv == net::ERR_IO_PENDING) break; @@ -599,7 +615,7 @@ void FlipSession::CloseAllStreams(net::Error code) { for (--index; index >= 0; index--) { LOG(ERROR) << "ABANDONED (stream_id=" << list[index]->stream_id() << "): " << list[index]->path(); - list[index]->OnClose(ERR_ABORTED); + list[index]->OnClose(code); } // Clear out anything pending. |