diff options
Diffstat (limited to 'net/spdy/spdy_stream.cc')
-rw-r--r-- | net/spdy/spdy_stream.cc | 456 |
1 files changed, 456 insertions, 0 deletions
diff --git a/net/spdy/spdy_stream.cc b/net/spdy/spdy_stream.cc new file mode 100644 index 0000000..78d263d --- /dev/null +++ b/net/spdy/spdy_stream.cc @@ -0,0 +1,456 @@ +// Copyright (c) 2010 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "net/spdy/spdy_stream.h" + +#include "base/logging.h" +#include "net/http/http_request_info.h" +#include "net/http/http_response_info.h" +#include "net/spdy/spdy_session.h" + +namespace net { + +FlipStream::FlipStream(FlipSession* session, flip::FlipStreamId stream_id, + bool pushed, LoadLog* log) + : stream_id_(stream_id), + priority_(0), + pushed_(pushed), + download_finished_(false), + metrics_(Singleton<BandwidthMetrics>::get()), + session_(session), + response_(NULL), + request_body_stream_(NULL), + response_complete_(false), + io_state_(STATE_NONE), + response_status_(OK), + user_callback_(NULL), + user_buffer_(NULL), + user_buffer_len_(0), + cancelled_(false), + load_log_(log), + send_bytes_(0), + recv_bytes_(0), + histograms_recorded_(false) {} + +FlipStream::~FlipStream() { + DLOG(INFO) << "Deleting FlipStream for stream " << stream_id_; + + // TODO(willchan): We're still calling CancelStream() too many times, because + // inactive pending/pushed streams will still have stream_id_ set. + if (stream_id_) { + session_->CancelStream(stream_id_); + } else if (!response_complete_) { + NOTREACHED(); + } +} + +uint64 FlipStream::GetUploadProgress() const { + if (!request_body_stream_.get()) + return 0; + + return request_body_stream_->position(); +} + +const HttpResponseInfo* FlipStream::GetResponseInfo() const { + return response_; +} + +int FlipStream::ReadResponseHeaders(CompletionCallback* callback) { + // Note: The FlipStream may have already received the response headers, so + // this call may complete synchronously. + CHECK(callback); + CHECK(io_state_ == STATE_NONE); + CHECK(!cancelled_); + + // The SYN_REPLY has already been received. + if (response_->headers) + return OK; + + io_state_ = STATE_READ_HEADERS; + CHECK(!user_callback_); + user_callback_ = callback; + return ERR_IO_PENDING; +} + +int FlipStream::ReadResponseBody( + IOBuffer* buf, int buf_len, CompletionCallback* callback) { + DCHECK_EQ(io_state_, STATE_NONE); + CHECK(buf); + CHECK(buf_len); + CHECK(callback); + CHECK(!cancelled_); + + // If we have data buffered, complete the IO immediately. + if (response_body_.size()) { + int bytes_read = 0; + while (response_body_.size() && buf_len > 0) { + scoped_refptr<IOBufferWithSize> data = response_body_.front(); + const int bytes_to_copy = std::min(buf_len, data->size()); + memcpy(&(buf->data()[bytes_read]), data->data(), bytes_to_copy); + buf_len -= bytes_to_copy; + if (bytes_to_copy == data->size()) { + response_body_.pop_front(); + } else { + const int bytes_remaining = data->size() - bytes_to_copy; + IOBufferWithSize* new_buffer = new IOBufferWithSize(bytes_remaining); + memcpy(new_buffer->data(), &(data->data()[bytes_to_copy]), + bytes_remaining); + response_body_.pop_front(); + response_body_.push_front(new_buffer); + } + bytes_read += bytes_to_copy; + } + if (bytes_read > 0) + recv_bytes_ += bytes_read; + return bytes_read; + } else if (response_complete_) { + return response_status_; + } + + CHECK(!user_callback_); + CHECK(!user_buffer_); + CHECK(user_buffer_len_ == 0); + + user_callback_ = callback; + user_buffer_ = buf; + user_buffer_len_ = buf_len; + return ERR_IO_PENDING; +} + +int FlipStream::SendRequest(UploadDataStream* upload_data, + HttpResponseInfo* response, + CompletionCallback* callback) { + CHECK(callback); + CHECK(!cancelled_); + CHECK(response); + + if (response_) { + *response = *response_; + delete response_; + } + response_ = response; + + if (upload_data) { + if (upload_data->size()) + request_body_stream_.reset(upload_data); + else + delete upload_data; + } + + send_time_ = base::TimeTicks::Now(); + + DCHECK_EQ(io_state_, STATE_NONE); + if (!pushed_) + io_state_ = STATE_SEND_HEADERS; + else { + if (response_->headers) { + io_state_ = STATE_READ_BODY; + } else { + io_state_ = STATE_READ_HEADERS; + } + } + int result = DoLoop(OK); + if (result == ERR_IO_PENDING) { + CHECK(!user_callback_); + user_callback_ = callback; + } + return result; +} + +void FlipStream::Cancel() { + cancelled_ = true; + user_callback_ = NULL; + + session_->CancelStream(stream_id_); +} + +void FlipStream::OnResponseReceived(const HttpResponseInfo& response) { + metrics_.StartStream(); + + CHECK(!response_->headers); + + *response_ = response; // TODO(mbelshe): avoid copy. + DCHECK(response_->headers); + + recv_first_byte_time_ = base::TimeTicks::Now(); + + if (io_state_ == STATE_NONE) { + CHECK(pushed_); + io_state_ = STATE_READ_HEADERS; + } else if (io_state_ == STATE_READ_HEADERS_COMPLETE) { + // This FlipStream could be in this state in both true and false pushed_ + // conditions. + // The false pushed_ condition (client request) will always go through + // this state. + // The true pushed_condition (server push) can be in this state when the + // client requests an X-Associated-Content piece of content prior + // to when the server push happens. + } else { + NOTREACHED(); + } + + int rv = DoLoop(OK); + + if (user_callback_) + DoCallback(rv); +} + +bool FlipStream::OnDataReceived(const char* data, int length) { + DCHECK_GE(length, 0); + LOG(INFO) << "FlipStream: Data (" << length << " bytes) received for " + << stream_id_; + + // If we don't have a response, then the SYN_REPLY did not come through. + // We cannot pass data up to the caller unless the reply headers have been + // received. + if (!response_->headers) { + OnClose(ERR_SYN_REPLY_NOT_RECEIVED); + return false; + } + + if (length > 0) + recv_bytes_ += length; + recv_last_byte_time_ = base::TimeTicks::Now(); + + // A zero-length read means that the stream is being closed. + if (!length) { + metrics_.StopStream(); + download_finished_ = true; + OnClose(net::OK); + return true; + } + + // Track our bandwidth. + metrics_.RecordBytes(length); + + if (length > 0) { + // TODO(mbelshe): If read is pending, we should copy the data straight into + // the read buffer here. For now, we'll queue it always. + // TODO(mbelshe): We need to have some throttling on this. We shouldn't + // buffer an infinite amount of data. + + IOBufferWithSize* io_buffer = new IOBufferWithSize(length); + memcpy(io_buffer->data(), data, length); + + response_body_.push_back(io_buffer); + } + + // Note that data may be received for a FlipStream prior to the user calling + // ReadResponseBody(), therefore user_callback_ may be NULL. This may often + // happen for server initiated streams. + if (user_callback_) { + int rv; + if (user_buffer_) { + rv = ReadResponseBody(user_buffer_, user_buffer_len_, user_callback_); + CHECK(rv != ERR_IO_PENDING); + user_buffer_ = NULL; + user_buffer_len_ = 0; + } else { + rv = OK; + } + DoCallback(rv); + } + + return true; +} + +void FlipStream::OnWriteComplete(int status) { + // TODO(mbelshe): Check for cancellation here. If we're cancelled, we + // should discontinue the DoLoop. + + if (status > 0) + send_bytes_ += status; + + DoLoop(status); +} + +void FlipStream::OnClose(int status) { + response_complete_ = true; + response_status_ = status; + stream_id_ = 0; + + if (user_callback_) + DoCallback(status); + + UpdateHistograms(); +} + +int FlipStream::DoLoop(int result) { + do { + State state = io_state_; + io_state_ = STATE_NONE; + switch (state) { + // State machine 1: Send headers and wait for response headers. + case STATE_SEND_HEADERS: + CHECK(result == OK); + LoadLog::BeginEvent(load_log_, + LoadLog::TYPE_FLIP_STREAM_SEND_HEADERS); + result = DoSendHeaders(); + break; + case STATE_SEND_HEADERS_COMPLETE: + LoadLog::EndEvent(load_log_, + LoadLog::TYPE_FLIP_STREAM_SEND_HEADERS); + result = DoSendHeadersComplete(result); + break; + case STATE_SEND_BODY: + CHECK(result == OK); + LoadLog::BeginEvent(load_log_, + LoadLog::TYPE_FLIP_STREAM_SEND_BODY); + result = DoSendBody(); + break; + case STATE_SEND_BODY_COMPLETE: + LoadLog::EndEvent(load_log_, + LoadLog::TYPE_FLIP_STREAM_SEND_BODY); + result = DoSendBodyComplete(result); + break; + case STATE_READ_HEADERS: + CHECK(result == OK); + LoadLog::BeginEvent(load_log_, + LoadLog::TYPE_FLIP_STREAM_READ_HEADERS); + result = DoReadHeaders(); + break; + case STATE_READ_HEADERS_COMPLETE: + LoadLog::EndEvent(load_log_, + LoadLog::TYPE_FLIP_STREAM_READ_HEADERS); + result = DoReadHeadersComplete(result); + break; + + // State machine 2: Read body. + // NOTE(willchan): Currently unused. Currently we handle this stuff in + // the OnDataReceived()/OnClose()/ReadResponseHeaders()/etc. Only reason + // to do this is for consistency with the Http code. + case STATE_READ_BODY: + LoadLog::BeginEvent(load_log_, + LoadLog::TYPE_FLIP_STREAM_READ_BODY); + result = DoReadBody(); + break; + case STATE_READ_BODY_COMPLETE: + LoadLog::EndEvent(load_log_, + LoadLog::TYPE_FLIP_STREAM_READ_BODY); + result = DoReadBodyComplete(result); + break; + case STATE_DONE: + DCHECK(result != ERR_IO_PENDING); + break; + default: + NOTREACHED(); + break; + } + } while (result != ERR_IO_PENDING && io_state_ != STATE_NONE); + + return result; +} + +void FlipStream::DoCallback(int rv) { + CHECK(rv != ERR_IO_PENDING); + CHECK(user_callback_); + + // Since Run may result in being called back, clear user_callback_ in advance. + CompletionCallback* c = user_callback_; + user_callback_ = NULL; + c->Run(rv); +} + +int FlipStream::DoSendHeaders() { + // The FlipSession will always call us back when the send is complete. + // TODO(willchan): This code makes the assumption that for the non-push stream + // case, the client code calls SendRequest() after creating the stream and + // before yielding back to the MessageLoop. This is true in the current code, + // but is not obvious from the headers. We should make the code handle + // SendRequest() being called after the SYN_REPLY has been received. + io_state_ = STATE_SEND_HEADERS_COMPLETE; + return ERR_IO_PENDING; +} + +int FlipStream::DoSendHeadersComplete(int result) { + if (result < 0) + return result; + + CHECK(result > 0); + + // There is no body, skip that state. + if (!request_body_stream_.get()) { + io_state_ = STATE_READ_HEADERS; + return OK; + } + + io_state_ = STATE_SEND_BODY; + return OK; +} + +// DoSendBody is called to send the optional body for the request. This call +// will also be called as each write of a chunk of the body completes. +int FlipStream::DoSendBody() { + // If we're already in the STATE_SENDING_BODY state, then we've already + // sent a portion of the body. In that case, we need to first consume + // the bytes written in the body stream. Note that the bytes written is + // the number of bytes in the frame that were written, only consume the + // data portion, of course. + io_state_ = STATE_SEND_BODY_COMPLETE; + int buf_len = static_cast<int>(request_body_stream_->buf_len()); + return session_->WriteStreamData(stream_id_, + request_body_stream_->buf(), + buf_len); +} + +int FlipStream::DoSendBodyComplete(int result) { + if (result < 0) + return result; + + CHECK(result != 0); + + request_body_stream_->DidConsume(result); + + if (!request_body_stream_->eof()) + io_state_ = STATE_SEND_BODY; + else + io_state_ = STATE_READ_HEADERS; + + return OK; +} + +int FlipStream::DoReadHeaders() { + io_state_ = STATE_READ_HEADERS_COMPLETE; + return response_->headers ? OK : ERR_IO_PENDING; +} + +int FlipStream::DoReadHeadersComplete(int result) { + return result; +} + +int FlipStream::DoReadBody() { + // TODO(mbelshe): merge FlipStreamParser with FlipStream and then this + // makes sense. + return ERR_IO_PENDING; +} + +int FlipStream::DoReadBodyComplete(int result) { + // TODO(mbelshe): merge FlipStreamParser with FlipStream and then this + // makes sense. + return ERR_IO_PENDING; +} + +void FlipStream::UpdateHistograms() { + if (histograms_recorded_) + return; + + histograms_recorded_ = true; + + // We need all timers to be filled in, otherwise metrics can be bogus. + if (send_time_.is_null() || recv_first_byte_time_.is_null() || + recv_last_byte_time_.is_null()) + return; + + UMA_HISTOGRAM_TIMES("Net.SpdyStreamTimeToFirstByte", + recv_first_byte_time_ - send_time_); + UMA_HISTOGRAM_TIMES("Net.SpdyStreamDownloadTime", + recv_last_byte_time_ - recv_first_byte_time_); + UMA_HISTOGRAM_TIMES("Net.SpdyStreamTime", + recv_last_byte_time_ - send_time_); + + UMA_HISTOGRAM_COUNTS("Net.SpdySendBytes", send_bytes_); + UMA_HISTOGRAM_COUNTS("Net.SpdyRecvBytes", recv_bytes_); +} + +} // namespace net |