summaryrefslogtreecommitdiffstats
path: root/net/flip/flip_session.cc
diff options
context:
space:
mode:
authormbelshe@google.com <mbelshe@google.com@0039d316-1c4b-4281-b951-d872f2087c98>2009-11-25 00:43:06 +0000
committermbelshe@google.com <mbelshe@google.com@0039d316-1c4b-4281-b951-d872f2087c98>2009-11-25 00:43:06 +0000
commit246f429937fda025fe76c9332015d7647fb33307 (patch)
tree5005419cdd6be63ce5eed0ee70144d0f727de8a7 /net/flip/flip_session.cc
parent025bc9679326c36e9c6cf9a7c466ab64a63eb0be (diff)
downloadchromium_src-246f429937fda025fe76c9332015d7647fb33307.zip
chromium_src-246f429937fda025fe76c9332015d7647fb33307.tar.gz
chromium_src-246f429937fda025fe76c9332015d7647fb33307.tar.bz2
Fix the FlipSession to support partial writes.
Modified the FlipIOBuffer to use a DrainableIOBuffer instead of a IOBufferWithSize. When a write completes, we drain the bytes, and only fetch the next FlipFrame from the queue after we have fully drained the buffer. I will update the tests to be much more thorough in my next CL. BUG=none TEST=FlipNetworkTransactionTest.PartialWrites Review URL: http://codereview.chromium.org/436019 git-svn-id: svn://svn.chromium.org/chrome/trunk/src@33018 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'net/flip/flip_session.cc')
-rw-r--r--net/flip/flip_session.cc118
1 files changed, 67 insertions, 51 deletions
diff --git a/net/flip/flip_session.cc b/net/flip/flip_session.cc
index de87732..8b62c50 100644
--- a/net/flip/flip_session.cc
+++ b/net/flip/flip_session.cc
@@ -267,9 +267,9 @@ scoped_refptr<FlipStream> FlipSession::GetOrCreateStream(
flip_framer_.CreateSynStream(stream_id, priority, flags, false,
&headers));
int length = flip::FlipFrame::size() + syn_frame->length();
- IOBufferWithSize* buffer = new IOBufferWithSize(length);
+ IOBuffer* buffer = new IOBuffer(length);
memcpy(buffer->data(), syn_frame->data(), length);
- queue_.push(FlipIOBuffer(buffer, priority, stream));
+ queue_.push(FlipIOBuffer(buffer, length, priority, stream));
static StatsCounter flip_requests("flip.requests");
flip_requests.Increment();
@@ -324,7 +324,7 @@ int FlipSession::WriteStreamData(flip::FlipStreamId stream_id,
int length = flip::FlipFrame::size() + frame->length();
IOBufferWithSize* buffer = new IOBufferWithSize(length);
memcpy(buffer->data(), frame->data(), length);
- queue_.push(FlipIOBuffer(buffer, stream->priority(), stream));
+ queue_.push(FlipIOBuffer(buffer, length, stream->priority(), stream));
// Whenever we queue onto the socket we need to ensure that we will write to
// it later.
@@ -449,40 +449,52 @@ void FlipSession::OnReadComplete(int bytes_read) {
void FlipSession::OnWriteComplete(int result) {
DCHECK(write_pending_);
+ DCHECK(in_flight_write_.size());
+ DCHECK(result != 0); // This shouldn't happen for write.
+
write_pending_ = false;
LOG(INFO) << "Flip write complete (result=" << result << ")";
if (result >= 0) {
- // TODO(mbelshe) Verify that we wrote ALL the bytes of the frame.
- // The code current is broken in the case of a partial write.
- DCHECK_EQ(static_cast<size_t>(result), in_flight_write_.size());
-
- // We only notify the stream when we've fully written the pending flip
- // frame.
- scoped_refptr<FlipStream> stream = in_flight_write_.stream();
- DCHECK(stream.get());
-
- // Report the number of bytes written to the caller, but exclude the
- // frame size overhead.
- if (result > 0) {
- // TODO(willchan): This is an unsafe DCHECK. I'm hitting this. We should
- // handle small writes appropriately.
- DCHECK(result > static_cast<int>(flip::FlipFrame::size()));
- result -= static_cast<int>(flip::FlipFrame::size());
+ // It should not be possible to have written more bytes than our
+ // in_flight_write_.
+ DCHECK_LE(result, in_flight_write_.buffer()->BytesRemaining());
+
+ in_flight_write_.buffer()->DidConsume(result);
+
+ // We only notify the stream when we've fully written the pending frame.
+ if (!in_flight_write_.buffer()->BytesRemaining()) {
+ scoped_refptr<FlipStream> stream = in_flight_write_.stream();
+ DCHECK(stream.get());
+
+ // Report the number of bytes written to the caller, but exclude the
+ // frame size overhead. NOTE: if this frame was compressed the reported
+ // bytes written is the compressed size, not the original size.
+ if (result > 0) {
+ result = in_flight_write_.buffer()->size();
+ DCHECK_GT(result, static_cast<int>(flip::FlipFrame::size()));
+ result -= static_cast<int>(flip::FlipFrame::size());
+ }
+ stream->OnWriteComplete(result);
+
+ // Cleanup the write which just completed.
+ in_flight_write_.release();
}
- stream->OnWriteComplete(result);
-
- // Cleanup the write which just completed.
- in_flight_write_.release();
// Write more data. We're already in a continuation, so we can
// go ahead and write it immediately (without going back to the
// message loop).
WriteSocketLater();
} else {
- // TODO(mbelshe): Deal with result < 0 error case.
- NOTIMPLEMENTED();
+ // The stream is now errored. Close it down.
+ CloseAllStreams(static_cast<net::Error>(result));
+ // TODO(mbelshe): we need to cleanup the session here as well.
+ // Right now, but a read and a write can fail, and each will
+ // remove the session from the pool, which trips asserts. We
+ // need to cleanup the close logic so that we only call it
+ // once.
+ //session_->flip_session_pool()->Remove(this);
}
}
@@ -535,36 +547,40 @@ void FlipSession::WriteSocket() {
// Loop sending frames until we've sent everything or until the write
// returns error (or ERR_IO_PENDING).
- while (queue_.size()) {
- // Grab the next FlipFrame to send.
- FlipIOBuffer next_buffer = queue_.top();
- queue_.pop();
-
- // We've deferred compression until just before we write it to the socket,
- // which is now. At this time, we don't compress our data frames.
- flip::FlipFrame uncompressed_frame(next_buffer.buffer()->data(), false);
- size_t size;
- if (uncompressed_frame.is_control_frame()) {
- scoped_ptr<flip::FlipFrame> compressed_frame(
- flip_framer_.CompressFrame(&uncompressed_frame));
- size = compressed_frame->length() + flip::FlipFrame::size();
-
- DCHECK(size > 0);
-
- // TODO(mbelshe): We have too much copying of data here.
- IOBufferWithSize* buffer = new IOBufferWithSize(size);
- memcpy(buffer->data(), compressed_frame->data(), size);
-
- // Attempt to send the frame.
- in_flight_write_ = FlipIOBuffer(buffer, 0, next_buffer.stream());
+ while (in_flight_write_.buffer() || queue_.size()) {
+ if (!in_flight_write_.buffer()) {
+ // Grab the next FlipFrame to send.
+ FlipIOBuffer next_buffer = queue_.top();
+ queue_.pop();
+
+ // We've deferred compression until just before we write it to the socket,
+ // which is now. At this time, we don't compress our data frames.
+ flip::FlipFrame uncompressed_frame(next_buffer.buffer()->data(), false);
+ size_t size;
+ if (uncompressed_frame.is_control_frame()) {
+ scoped_ptr<flip::FlipFrame> compressed_frame(
+ flip_framer_.CompressFrame(&uncompressed_frame));
+ size = compressed_frame->length() + flip::FlipFrame::size();
+
+ DCHECK(size > 0);
+
+ // TODO(mbelshe): We have too much copying of data here.
+ IOBufferWithSize* buffer = new IOBufferWithSize(size);
+ memcpy(buffer->data(), compressed_frame->data(), size);
+
+ // Attempt to send the frame.
+ in_flight_write_ = FlipIOBuffer(buffer, size, 0, next_buffer.stream());
+ } else {
+ size = uncompressed_frame.length() + flip::FlipFrame::size();
+ in_flight_write_ = next_buffer;
+ }
} else {
- size = uncompressed_frame.length() + flip::FlipFrame::size();
- in_flight_write_ = next_buffer;
+ DCHECK(in_flight_write_.buffer()->BytesRemaining());
}
write_pending_ = true;
int rv = connection_.socket()->Write(in_flight_write_.buffer(),
- size, &write_callback_);
+ in_flight_write_.buffer()->BytesRemaining(), &write_callback_);
if (rv == net::ERR_IO_PENDING)
break;
@@ -599,7 +615,7 @@ void FlipSession::CloseAllStreams(net::Error code) {
for (--index; index >= 0; index--) {
LOG(ERROR) << "ABANDONED (stream_id=" << list[index]->stream_id()
<< "): " << list[index]->path();
- list[index]->OnClose(ERR_ABORTED);
+ list[index]->OnClose(code);
}
// Clear out anything pending.