From 246f429937fda025fe76c9332015d7647fb33307 Mon Sep 17 00:00:00 2001 From: "mbelshe@google.com" Date: Wed, 25 Nov 2009 00:43:06 +0000 Subject: Fix the FlipSession to support partial writes. Modified the FlipIOBuffer to use a DrainableIOBuffer instead of a IOBufferWithSize. When a write completes, we drain the bytes, and only fetch the next FlipFrame from the queue after we have fully drained the buffer. I will update the tests to be much more thorough in my next CL. BUG=none TEST=FlipNetworkTransactionTest.PartialWrites Review URL: http://codereview.chromium.org/436019 git-svn-id: svn://svn.chromium.org/chrome/trunk/src@33018 0039d316-1c4b-4281-b951-d872f2087c98 --- net/flip/flip_session.cc | 118 +++++++++++++++++++++++++++-------------------- 1 file changed, 67 insertions(+), 51 deletions(-) (limited to 'net/flip/flip_session.cc') diff --git a/net/flip/flip_session.cc b/net/flip/flip_session.cc index de87732..8b62c50 100644 --- a/net/flip/flip_session.cc +++ b/net/flip/flip_session.cc @@ -267,9 +267,9 @@ scoped_refptr FlipSession::GetOrCreateStream( flip_framer_.CreateSynStream(stream_id, priority, flags, false, &headers)); int length = flip::FlipFrame::size() + syn_frame->length(); - IOBufferWithSize* buffer = new IOBufferWithSize(length); + IOBuffer* buffer = new IOBuffer(length); memcpy(buffer->data(), syn_frame->data(), length); - queue_.push(FlipIOBuffer(buffer, priority, stream)); + queue_.push(FlipIOBuffer(buffer, length, priority, stream)); static StatsCounter flip_requests("flip.requests"); flip_requests.Increment(); @@ -324,7 +324,7 @@ int FlipSession::WriteStreamData(flip::FlipStreamId stream_id, int length = flip::FlipFrame::size() + frame->length(); IOBufferWithSize* buffer = new IOBufferWithSize(length); memcpy(buffer->data(), frame->data(), length); - queue_.push(FlipIOBuffer(buffer, stream->priority(), stream)); + queue_.push(FlipIOBuffer(buffer, length, stream->priority(), stream)); // Whenever we queue onto the socket we need to ensure that we will write to // it later. @@ -449,40 +449,52 @@ void FlipSession::OnReadComplete(int bytes_read) { void FlipSession::OnWriteComplete(int result) { DCHECK(write_pending_); + DCHECK(in_flight_write_.size()); + DCHECK(result != 0); // This shouldn't happen for write. + write_pending_ = false; LOG(INFO) << "Flip write complete (result=" << result << ")"; if (result >= 0) { - // TODO(mbelshe) Verify that we wrote ALL the bytes of the frame. - // The code current is broken in the case of a partial write. - DCHECK_EQ(static_cast(result), in_flight_write_.size()); - - // We only notify the stream when we've fully written the pending flip - // frame. - scoped_refptr stream = in_flight_write_.stream(); - DCHECK(stream.get()); - - // Report the number of bytes written to the caller, but exclude the - // frame size overhead. - if (result > 0) { - // TODO(willchan): This is an unsafe DCHECK. I'm hitting this. We should - // handle small writes appropriately. - DCHECK(result > static_cast(flip::FlipFrame::size())); - result -= static_cast(flip::FlipFrame::size()); + // It should not be possible to have written more bytes than our + // in_flight_write_. + DCHECK_LE(result, in_flight_write_.buffer()->BytesRemaining()); + + in_flight_write_.buffer()->DidConsume(result); + + // We only notify the stream when we've fully written the pending frame. + if (!in_flight_write_.buffer()->BytesRemaining()) { + scoped_refptr stream = in_flight_write_.stream(); + DCHECK(stream.get()); + + // Report the number of bytes written to the caller, but exclude the + // frame size overhead. NOTE: if this frame was compressed the reported + // bytes written is the compressed size, not the original size. + if (result > 0) { + result = in_flight_write_.buffer()->size(); + DCHECK_GT(result, static_cast(flip::FlipFrame::size())); + result -= static_cast(flip::FlipFrame::size()); + } + stream->OnWriteComplete(result); + + // Cleanup the write which just completed. + in_flight_write_.release(); } - stream->OnWriteComplete(result); - - // Cleanup the write which just completed. - in_flight_write_.release(); // Write more data. We're already in a continuation, so we can // go ahead and write it immediately (without going back to the // message loop). WriteSocketLater(); } else { - // TODO(mbelshe): Deal with result < 0 error case. - NOTIMPLEMENTED(); + // The stream is now errored. Close it down. + CloseAllStreams(static_cast(result)); + // TODO(mbelshe): we need to cleanup the session here as well. + // Right now, but a read and a write can fail, and each will + // remove the session from the pool, which trips asserts. We + // need to cleanup the close logic so that we only call it + // once. + //session_->flip_session_pool()->Remove(this); } } @@ -535,36 +547,40 @@ void FlipSession::WriteSocket() { // Loop sending frames until we've sent everything or until the write // returns error (or ERR_IO_PENDING). - while (queue_.size()) { - // Grab the next FlipFrame to send. - FlipIOBuffer next_buffer = queue_.top(); - queue_.pop(); - - // We've deferred compression until just before we write it to the socket, - // which is now. At this time, we don't compress our data frames. - flip::FlipFrame uncompressed_frame(next_buffer.buffer()->data(), false); - size_t size; - if (uncompressed_frame.is_control_frame()) { - scoped_ptr compressed_frame( - flip_framer_.CompressFrame(&uncompressed_frame)); - size = compressed_frame->length() + flip::FlipFrame::size(); - - DCHECK(size > 0); - - // TODO(mbelshe): We have too much copying of data here. - IOBufferWithSize* buffer = new IOBufferWithSize(size); - memcpy(buffer->data(), compressed_frame->data(), size); - - // Attempt to send the frame. - in_flight_write_ = FlipIOBuffer(buffer, 0, next_buffer.stream()); + while (in_flight_write_.buffer() || queue_.size()) { + if (!in_flight_write_.buffer()) { + // Grab the next FlipFrame to send. + FlipIOBuffer next_buffer = queue_.top(); + queue_.pop(); + + // We've deferred compression until just before we write it to the socket, + // which is now. At this time, we don't compress our data frames. + flip::FlipFrame uncompressed_frame(next_buffer.buffer()->data(), false); + size_t size; + if (uncompressed_frame.is_control_frame()) { + scoped_ptr compressed_frame( + flip_framer_.CompressFrame(&uncompressed_frame)); + size = compressed_frame->length() + flip::FlipFrame::size(); + + DCHECK(size > 0); + + // TODO(mbelshe): We have too much copying of data here. + IOBufferWithSize* buffer = new IOBufferWithSize(size); + memcpy(buffer->data(), compressed_frame->data(), size); + + // Attempt to send the frame. + in_flight_write_ = FlipIOBuffer(buffer, size, 0, next_buffer.stream()); + } else { + size = uncompressed_frame.length() + flip::FlipFrame::size(); + in_flight_write_ = next_buffer; + } } else { - size = uncompressed_frame.length() + flip::FlipFrame::size(); - in_flight_write_ = next_buffer; + DCHECK(in_flight_write_.buffer()->BytesRemaining()); } write_pending_ = true; int rv = connection_.socket()->Write(in_flight_write_.buffer(), - size, &write_callback_); + in_flight_write_.buffer()->BytesRemaining(), &write_callback_); if (rv == net::ERR_IO_PENDING) break; @@ -599,7 +615,7 @@ void FlipSession::CloseAllStreams(net::Error code) { for (--index; index >= 0; index--) { LOG(ERROR) << "ABANDONED (stream_id=" << list[index]->stream_id() << "): " << list[index]->path(); - list[index]->OnClose(ERR_ABORTED); + list[index]->OnClose(code); } // Clear out anything pending. -- cgit v1.1