diff options
Diffstat (limited to 'net/spdy/spdy_session.cc')
-rw-r--r-- | net/spdy/spdy_session.cc | 1056 |
1 files changed, 1056 insertions, 0 deletions
diff --git a/net/spdy/spdy_session.cc b/net/spdy/spdy_session.cc new file mode 100644 index 0000000..4a837c5 --- /dev/null +++ b/net/spdy/spdy_session.cc @@ -0,0 +1,1056 @@ +// Copyright (c) 2010 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "net/spdy/spdy_session.h" + +#include "base/basictypes.h" +#include "base/logging.h" +#include "base/message_loop.h" +#include "base/rand_util.h" +#include "base/stats_counters.h" +#include "base/stl_util-inl.h" +#include "base/string_util.h" +#include "net/base/connection_type_histograms.h" +#include "net/base/load_flags.h" +#include "net/base/load_log.h" +#include "net/base/net_util.h" +#include "net/http/http_network_session.h" +#include "net/http/http_request_info.h" +#include "net/http/http_response_headers.h" +#include "net/http/http_response_info.h" +#include "net/socket/client_socket.h" +#include "net/socket/client_socket_factory.h" +#include "net/socket/ssl_client_socket.h" +#include "net/spdy/spdy_frame_builder.h" +#include "net/spdy/spdy_protocol.h" +#include "net/spdy/spdy_stream.h" +#include "net/tools/dump_cache/url_to_filename_encoder.h" + +namespace { + +// Diagnostics function to dump the headers of a request. +// TODO(mbelshe): Remove this function. +void DumpFlipHeaders(const flip::FlipHeaderBlock& headers) { + // Because this function gets called on every request, + // take extra care to optimize it away if logging is turned off. + if (logging::LOG_INFO < logging::GetMinLogLevel()) + return; + + flip::FlipHeaderBlock::const_iterator it = headers.begin(); + while (it != headers.end()) { + std::string val = (*it).second; + std::string::size_type pos = 0; + while ((pos = val.find('\0', pos)) != val.npos) + val[pos] = '\n'; + LOG(INFO) << (*it).first << "==" << val; + ++it; + } +} + +} // namespace + +namespace net { + +namespace { + +#ifdef WIN32 +// We use an artificially small buffer size on windows because the async IO +// system will artifiially delay IO completions when we use large buffers. +const int kReadBufferSize = 2 * 1024; +#else +const int kReadBufferSize = 8 * 1024; +#endif + +// Convert a FlipHeaderBlock into an HttpResponseInfo. +// |headers| input parameter with the FlipHeaderBlock. +// |info| output parameter for the HttpResponseInfo. +// Returns true if successfully converted. False if there was a failure +// or if the FlipHeaderBlock was invalid. +bool FlipHeadersToHttpResponse(const flip::FlipHeaderBlock& headers, + HttpResponseInfo* response) { + std::string version; + std::string status; + + // The "status" and "version" headers are required. + flip::FlipHeaderBlock::const_iterator it; + it = headers.find("status"); + if (it == headers.end()) { + LOG(ERROR) << "FlipHeaderBlock without status header."; + return false; + } + status = it->second; + + // Grab the version. If not provided by the server, + it = headers.find("version"); + if (it == headers.end()) { + LOG(ERROR) << "FlipHeaderBlock without version header."; + return false; + } + version = it->second; + + std::string raw_headers(version); + raw_headers.push_back(' '); + raw_headers.append(status); + raw_headers.push_back('\0'); + for (it = headers.begin(); it != headers.end(); ++it) { + // For each value, if the server sends a NUL-separated + // list of values, we separate that back out into + // individual headers for each value in the list. + // e.g. + // Set-Cookie "foo\0bar" + // becomes + // Set-Cookie: foo\0 + // Set-Cookie: bar\0 + std::string value = it->second; + size_t start = 0; + size_t end = 0; + do { + end = value.find('\0', start); + std::string tval; + if (end != value.npos) + tval = value.substr(start, (end - start)); + else + tval = value.substr(start); + raw_headers.append(it->first); + raw_headers.push_back(':'); + raw_headers.append(tval); + raw_headers.push_back('\0'); + start = end + 1; + } while (end != value.npos); + } + + response->headers = new HttpResponseHeaders(raw_headers); + response->was_fetched_via_spdy = true; + return true; +} + +// Create a FlipHeaderBlock for a Flip SYN_STREAM Frame from +// a HttpRequestInfo block. +void CreateFlipHeadersFromHttpRequest( + const HttpRequestInfo& info, flip::FlipHeaderBlock* headers) { + static const char kHttpProtocolVersion[] = "HTTP/1.1"; + + HttpUtil::HeadersIterator it(info.extra_headers.begin(), + info.extra_headers.end(), + "\r\n"); + while (it.GetNext()) { + std::string name = StringToLowerASCII(it.name()); + if (headers->find(name) == headers->end()) { + (*headers)[name] = it.values(); + } else { + std::string new_value = (*headers)[name]; + new_value += "\0"; + new_value += it.values(); + (*headers)[name] = new_value; + } + } + + // TODO(mbelshe): Add Proxy headers here. (See http_network_transaction.cc) + // TODO(mbelshe): Add authentication headers here. + + (*headers)["method"] = info.method; + (*headers)["url"] = info.url.spec(); + (*headers)["version"] = kHttpProtocolVersion; + if (info.user_agent.length()) + (*headers)["user-agent"] = info.user_agent; + if (!info.referrer.is_empty()) + (*headers)["referer"] = info.referrer.spec(); + + // Honor load flags that impact proxy caches. + if (info.load_flags & LOAD_BYPASS_CACHE) { + (*headers)["pragma"] = "no-cache"; + (*headers)["cache-control"] = "no-cache"; + } else if (info.load_flags & LOAD_VALIDATE_CACHE) { + (*headers)["cache-control"] = "max-age=0"; + } +} + +void AdjustSocketBufferSizes(ClientSocket* socket) { + // Adjust socket buffer sizes. + // FLIP uses one socket, and we want a really big buffer. + // This greatly helps on links with packet loss - we can even + // outperform Vista's dynamic window sizing algorithm. + // TODO(mbelshe): more study. + const int kSocketBufferSize = 512 * 1024; + socket->SetReceiveBufferSize(kSocketBufferSize); + socket->SetSendBufferSize(kSocketBufferSize); +} + +} // namespace + +// static +bool FlipSession::use_ssl_ = true; + +FlipSession::FlipSession(const std::string& host, HttpNetworkSession* session) + : ALLOW_THIS_IN_INITIALIZER_LIST( + connect_callback_(this, &FlipSession::OnTCPConnect)), + ALLOW_THIS_IN_INITIALIZER_LIST( + ssl_connect_callback_(this, &FlipSession::OnSSLConnect)), + ALLOW_THIS_IN_INITIALIZER_LIST( + read_callback_(this, &FlipSession::OnReadComplete)), + ALLOW_THIS_IN_INITIALIZER_LIST( + write_callback_(this, &FlipSession::OnWriteComplete)), + domain_(host), + session_(session), + connection_(new ClientSocketHandle), + read_buffer_(new IOBuffer(kReadBufferSize)), + read_pending_(false), + stream_hi_water_mark_(1), // Always start at 1 for the first stream id. + write_pending_(false), + delayed_write_pending_(false), + is_secure_(false), + error_(OK), + state_(IDLE), + streams_initiated_count_(0), + streams_pushed_count_(0), + streams_pushed_and_claimed_count_(0), + streams_abandoned_count_(0) { + // TODO(mbelshe): consider randomization of the stream_hi_water_mark. + + flip_framer_.set_visitor(this); + + session_->ssl_config_service()->GetSSLConfig(&ssl_config_); + + // TODO(agl): This is a temporary hack for testing reasons. In the medium + // term we'll want to use NPN for all HTTPS connections and use the protocol + // suggested. + // + // In the event that the server supports Next Protocol Negotiation, but + // doesn't support either of these protocols, we'll request the first + // protocol in the list. Because of that, HTTP is listed first because it's + // what we'll actually fallback to in the case that the server doesn't + // support SPDY. + ssl_config_.next_protos = "\007http1.1\004spdy"; +} + +FlipSession::~FlipSession() { + // Cleanup all the streams. + CloseAllStreams(net::ERR_ABORTED); + + if (connection_->is_initialized()) { + // With Flip we can't recycle sockets. + connection_->socket()->Disconnect(); + } + + // TODO(willchan): Don't hardcode port 80 here. + DCHECK(!session_->flip_session_pool()->HasSession( + HostResolver::RequestInfo(domain_, 80))); + + // Record per-session histograms here. + UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamsPerSession", + streams_initiated_count_, + 0, 300, 50); + UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamsPushedPerSession", + streams_pushed_count_, + 0, 300, 50); + UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamsPushedAndClaimedPerSession", + streams_pushed_and_claimed_count_, + 0, 300, 50); + UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamsAbandonedPerSession", + streams_abandoned_count_, + 0, 300, 50); +} + +void FlipSession::InitializeWithSocket(ClientSocketHandle* connection) { + static StatsCounter flip_sessions("flip.sessions"); + flip_sessions.Increment(); + + AdjustSocketBufferSizes(connection->socket()); + + state_ = CONNECTED; + connection_.reset(connection); + + // This is a newly initialized session that no client should have a handle to + // yet, so there's no need to start writing data as in OnTCPConnect(), but we + // should start reading data. + ReadSocket(); +} + +net::Error FlipSession::Connect(const std::string& group_name, + const HostResolver::RequestInfo& host, + RequestPriority priority, + LoadLog* load_log) { + DCHECK(priority >= FLIP_PRIORITY_HIGHEST && priority <= FLIP_PRIORITY_LOWEST); + + // If the connect process is started, let the caller continue. + if (state_ > IDLE) + return net::OK; + + state_ = CONNECTING; + + static StatsCounter flip_sessions("flip.sessions"); + flip_sessions.Increment(); + + int rv = connection_->Init(group_name, host, priority, &connect_callback_, + session_->tcp_socket_pool(), load_log); + DCHECK(rv <= 0); + + // If the connect is pending, we still return ok. The APIs enqueue + // work until after the connect completes asynchronously later. + if (rv == net::ERR_IO_PENDING) + return net::OK; + return static_cast<net::Error>(rv); +} + +scoped_refptr<FlipStream> FlipSession::GetOrCreateStream( + const HttpRequestInfo& request, + const UploadDataStream* upload_data, + LoadLog* log) { + const GURL& url = request.url; + const std::string& path = url.PathForRequest(); + + scoped_refptr<FlipStream> stream; + + // Check if we have a push stream for this path. + if (request.method == "GET") { + stream = GetPushStream(path); + if (stream) { + DCHECK(streams_pushed_and_claimed_count_ < streams_pushed_count_); + streams_pushed_and_claimed_count_++; + return stream; + } + } + + // Check if we have a pending push stream for this url. + PendingStreamMap::iterator it; + it = pending_streams_.find(path); + if (it != pending_streams_.end()) { + DCHECK(!it->second); + // Server will assign a stream id when the push stream arrives. Use 0 for + // now. + LoadLog::AddEvent(log, LoadLog::TYPE_FLIP_STREAM_ADOPTED_PUSH_STREAM); + FlipStream* stream = new FlipStream(this, 0, true, log); + stream->set_path(path); + it->second = stream; + return it->second; + } + + const flip::FlipStreamId stream_id = GetNewStreamId(); + + // If we still don't have a stream, activate one now. + stream = new FlipStream(this, stream_id, false, log); + stream->set_priority(request.priority); + stream->set_path(path); + ActivateStream(stream); + + UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyPriorityCount", + static_cast<int>(request.priority), 0, 10, 11); + + LOG(INFO) << "FlipStream: Creating stream " << stream_id << " for " << url; + + // TODO(mbelshe): Optimize memory allocations + DCHECK(request.priority >= FLIP_PRIORITY_HIGHEST && + request.priority <= FLIP_PRIORITY_LOWEST); + + // Convert from HttpRequestHeaders to Flip Headers. + flip::FlipHeaderBlock headers; + CreateFlipHeadersFromHttpRequest(request, &headers); + + flip::FlipControlFlags flags = flip::CONTROL_FLAG_NONE; + if (!request.upload_data || !upload_data->size()) + flags = flip::CONTROL_FLAG_FIN; + + // Create a SYN_STREAM packet and add to the output queue. + scoped_ptr<flip::FlipSynStreamControlFrame> syn_frame( + flip_framer_.CreateSynStream(stream_id, request.priority, flags, false, + &headers)); + int length = flip::FlipFrame::size() + syn_frame->length(); + IOBuffer* buffer = new IOBuffer(length); + memcpy(buffer->data(), syn_frame->data(), length); + queue_.push(FlipIOBuffer(buffer, length, request.priority, stream)); + + static StatsCounter flip_requests("flip.requests"); + flip_requests.Increment(); + + LOG(INFO) << "FETCHING: " << request.url.spec(); + streams_initiated_count_++; + + LOG(INFO) << "FLIP SYN_STREAM HEADERS ----------------------------------"; + DumpFlipHeaders(headers); + + // Schedule to write to the socket after we've made it back + // to the message loop so that we can aggregate multiple + // requests. + // TODO(mbelshe): Should we do the "first" request immediately? + // maybe we should only 'do later' for subsequent + // requests. + WriteSocketLater(); + + return stream; +} + +int FlipSession::WriteStreamData(flip::FlipStreamId stream_id, + net::IOBuffer* data, int len) { + LOG(INFO) << "Writing Stream Data for stream " << stream_id << " (" << len + << " bytes)"; + const int kMss = 1430; // This is somewhat arbitrary and not really fixed, + // but it will always work reasonably with ethernet. + // Chop the world into 2-packet chunks. This is somewhat arbitrary, but + // is reasonably small and ensures that we elicit ACKs quickly from TCP + // (because TCP tries to only ACK every other packet). + const int kMaxFlipFrameChunkSize = (2 * kMss) - flip::FlipFrame::size(); + + // Find our stream + DCHECK(IsStreamActive(stream_id)); + scoped_refptr<FlipStream> stream = active_streams_[stream_id]; + CHECK(stream->stream_id() == stream_id); + if (!stream) + return ERR_INVALID_FLIP_STREAM; + + // TODO(mbelshe): Setting of the FIN is assuming that the caller will pass + // all data to write in a single chunk. Is this always true? + + // Set the flags on the upload. + flip::FlipDataFlags flags = flip::DATA_FLAG_FIN; + if (len > kMaxFlipFrameChunkSize) { + len = kMaxFlipFrameChunkSize; + flags = flip::DATA_FLAG_NONE; + } + + // TODO(mbelshe): reduce memory copies here. + scoped_ptr<flip::FlipDataFrame> frame( + flip_framer_.CreateDataFrame(stream_id, data->data(), len, flags)); + int length = flip::FlipFrame::size() + frame->length(); + IOBufferWithSize* buffer = new IOBufferWithSize(length); + memcpy(buffer->data(), frame->data(), length); + queue_.push(FlipIOBuffer(buffer, length, stream->priority(), stream)); + + // Whenever we queue onto the socket we need to ensure that we will write to + // it later. + WriteSocketLater(); + + return ERR_IO_PENDING; +} + +bool FlipSession::CancelStream(flip::FlipStreamId stream_id) { + LOG(INFO) << "Cancelling stream " << stream_id; + if (!IsStreamActive(stream_id)) + return false; + + // TODO(mbelshe): We should send a FIN_STREAM control frame here + // so that the server can cancel a large send. + + // TODO(mbelshe): Write a method for tearing down a stream + // that cleans it out of the active list, the pending list, + // etc. + scoped_refptr<FlipStream> stream = active_streams_[stream_id]; + DeactivateStream(stream_id); + return true; +} + +bool FlipSession::IsStreamActive(flip::FlipStreamId stream_id) const { + return ContainsKey(active_streams_, stream_id); +} + +LoadState FlipSession::GetLoadState() const { + // NOTE: The application only queries the LoadState via the + // FlipNetworkTransaction, and details are only needed when + // we're in the process of connecting. + + // If we're connecting, defer to the connection to give us the actual + // LoadState. + if (state_ == CONNECTING) + return connection_->GetLoadState(); + + // Just report that we're idle since the session could be doing + // many things concurrently. + return LOAD_STATE_IDLE; +} + +void FlipSession::OnTCPConnect(int result) { + LOG(INFO) << "Flip socket connected (result=" << result << ")"; + + // We shouldn't be coming through this path if we didn't just open a fresh + // socket (or have an error trying to do so). + DCHECK(!connection_->socket() || !connection_->is_reused()); + + UpdateConnectionTypeHistograms(CONNECTION_SPDY, result >= 0); + + if (result != net::OK) { + DCHECK_LT(result, 0); + CloseSessionOnError(static_cast<net::Error>(result)); + return; + } + + AdjustSocketBufferSizes(connection_->socket()); + + if (use_ssl_) { + // Add a SSL socket on top of our existing transport socket. + ClientSocket* socket = connection_->release_socket(); + // TODO(mbelshe): Fix the hostname. This is BROKEN without having + // a real hostname. + socket = session_->socket_factory()->CreateSSLClientSocket( + socket, "" /* request_->url.HostNoBrackets() */ , ssl_config_); + connection_->set_socket(socket); + is_secure_ = true; + // TODO(willchan): Plumb LoadLog into FLIP code. + int status = connection_->socket()->Connect(&ssl_connect_callback_, NULL); + if (status != ERR_IO_PENDING) + OnSSLConnect(status); + } else { + DCHECK_EQ(state_, CONNECTING); + state_ = CONNECTED; + + // Make sure we get any pending data sent. + WriteSocketLater(); + // Start reading + ReadSocket(); + } +} + +void FlipSession::OnSSLConnect(int result) { + // TODO(mbelshe): We need to replicate the functionality of + // HttpNetworkTransaction::DoSSLConnectComplete here, where it calls + // HandleCertificateError() and such. + if (IsCertificateError(result)) + result = OK; // TODO(mbelshe): pretend we're happy anyway. + + if (result == OK) { + DCHECK_EQ(state_, CONNECTING); + state_ = CONNECTED; + + // After we've connected, send any data to the server, and then issue + // our read. + WriteSocketLater(); + ReadSocket(); + } else { + DCHECK_LT(result, 0); // It should be an error, not a byte count. + CloseSessionOnError(static_cast<net::Error>(result)); + } +} + +void FlipSession::OnReadComplete(int bytes_read) { + // Parse a frame. For now this code requires that the frame fit into our + // buffer (32KB). + // TODO(mbelshe): support arbitrarily large frames! + + LOG(INFO) << "Flip socket read: " << bytes_read << " bytes"; + + read_pending_ = false; + + if (bytes_read <= 0) { + // Session is tearing down. + net::Error error = static_cast<net::Error>(bytes_read); + if (error == OK) + error = ERR_CONNECTION_CLOSED; + CloseSessionOnError(error); + return; + } + + // The FlipFramer will use callbacks onto |this| as it parses frames. + // When errors occur, those callbacks can lead to teardown of all references + // to |this|, so maintain a reference to self during this call for safe + // cleanup. + scoped_refptr<FlipSession> self(this); + + char *data = read_buffer_->data(); + while (bytes_read && + flip_framer_.error_code() == flip::FlipFramer::FLIP_NO_ERROR) { + uint32 bytes_processed = flip_framer_.ProcessInput(data, bytes_read); + bytes_read -= bytes_processed; + data += bytes_processed; + if (flip_framer_.state() == flip::FlipFramer::FLIP_DONE) + flip_framer_.Reset(); + } + + if (state_ != CLOSED) + ReadSocket(); +} + +void FlipSession::OnWriteComplete(int result) { + DCHECK(write_pending_); + DCHECK(in_flight_write_.size()); + DCHECK(result != 0); // This shouldn't happen for write. + + write_pending_ = false; + + LOG(INFO) << "Flip write complete (result=" << result << ") for stream: " + << in_flight_write_.stream()->stream_id(); + + if (result >= 0) { + // It should not be possible to have written more bytes than our + // in_flight_write_. + DCHECK_LE(result, in_flight_write_.buffer()->BytesRemaining()); + + in_flight_write_.buffer()->DidConsume(result); + + // We only notify the stream when we've fully written the pending frame. + if (!in_flight_write_.buffer()->BytesRemaining()) { + scoped_refptr<FlipStream> stream = in_flight_write_.stream(); + DCHECK(stream.get()); + + // Report the number of bytes written to the caller, but exclude the + // frame size overhead. NOTE: if this frame was compressed the reported + // bytes written is the compressed size, not the original size. + if (result > 0) { + result = in_flight_write_.buffer()->size(); + DCHECK_GT(result, static_cast<int>(flip::FlipFrame::size())); + result -= static_cast<int>(flip::FlipFrame::size()); + } + + // It is possible that the stream was cancelled while we were writing + // to the socket. + if (!stream->cancelled()) + stream->OnWriteComplete(result); + + // Cleanup the write which just completed. + in_flight_write_.release(); + } + + // Write more data. We're already in a continuation, so we can + // go ahead and write it immediately (without going back to the + // message loop). + WriteSocketLater(); + } else { + in_flight_write_.release(); + + // The stream is now errored. Close it down. + CloseSessionOnError(static_cast<net::Error>(result)); + } +} + +void FlipSession::ReadSocket() { + if (read_pending_) + return; + + if (state_ == CLOSED) { + NOTREACHED(); + return; + } + + CHECK(connection_.get()); + CHECK(connection_->socket()); + int bytes_read = connection_->socket()->Read(read_buffer_.get(), + kReadBufferSize, + &read_callback_); + switch (bytes_read) { + case 0: + // Socket is closed! + // TODO(mbelshe): Need to abort any active streams here. + DCHECK(!active_streams_.size()); + return; + case net::ERR_IO_PENDING: + // Waiting for data. Nothing to do now. + read_pending_ = true; + return; + default: + // Data was read, process it. + // Schedule the work through the message loop to avoid recursive + // callbacks. + read_pending_ = true; + MessageLoop::current()->PostTask(FROM_HERE, NewRunnableMethod( + this, &FlipSession::OnReadComplete, bytes_read)); + break; + } +} + +void FlipSession::WriteSocketLater() { + if (delayed_write_pending_) + return; + + delayed_write_pending_ = true; + MessageLoop::current()->PostTask(FROM_HERE, NewRunnableMethod( + this, &FlipSession::WriteSocket)); +} + +void FlipSession::WriteSocket() { + // This function should only be called via WriteSocketLater. + DCHECK(delayed_write_pending_); + delayed_write_pending_ = false; + + // If the socket isn't connected yet, just wait; we'll get called + // again when the socket connection completes. If the socket is + // closed, just return. + if (state_ < CONNECTED || state_ == CLOSED) + return; + + if (write_pending_) // Another write is in progress still. + return; + + // Loop sending frames until we've sent everything or until the write + // returns error (or ERR_IO_PENDING). + while (in_flight_write_.buffer() || queue_.size()) { + if (!in_flight_write_.buffer()) { + // Grab the next FlipFrame to send. + FlipIOBuffer next_buffer = queue_.top(); + queue_.pop(); + + // We've deferred compression until just before we write it to the socket, + // which is now. At this time, we don't compress our data frames. + flip::FlipFrame uncompressed_frame(next_buffer.buffer()->data(), false); + size_t size; + if (uncompressed_frame.is_control_frame()) { + scoped_ptr<flip::FlipFrame> compressed_frame( + flip_framer_.CompressFrame(&uncompressed_frame)); + size = compressed_frame->length() + flip::FlipFrame::size(); + + DCHECK(size > 0); + + // TODO(mbelshe): We have too much copying of data here. + IOBufferWithSize* buffer = new IOBufferWithSize(size); + memcpy(buffer->data(), compressed_frame->data(), size); + + // Attempt to send the frame. + in_flight_write_ = FlipIOBuffer(buffer, size, 0, next_buffer.stream()); + } else { + size = uncompressed_frame.length() + flip::FlipFrame::size(); + in_flight_write_ = next_buffer; + } + } else { + DCHECK(in_flight_write_.buffer()->BytesRemaining()); + } + + write_pending_ = true; + int rv = connection_->socket()->Write(in_flight_write_.buffer(), + in_flight_write_.buffer()->BytesRemaining(), &write_callback_); + if (rv == net::ERR_IO_PENDING) + break; + + // We sent the frame successfully. + OnWriteComplete(rv); + + // TODO(mbelshe): Test this error case. Maybe we should mark the socket + // as in an error state. + if (rv < 0) + break; + } +} + +void FlipSession::CloseAllStreams(net::Error code) { + LOG(INFO) << "Closing all FLIP Streams"; + + static StatsCounter abandoned_streams("flip.abandoned_streams"); + static StatsCounter abandoned_push_streams("flip.abandoned_push_streams"); + + if (active_streams_.size()) { + abandoned_streams.Add(active_streams_.size()); + + // Create a copy of the list, since aborting streams can invalidate + // our list. + FlipStream** list = new FlipStream*[active_streams_.size()]; + ActiveStreamMap::const_iterator it; + int index = 0; + for (it = active_streams_.begin(); it != active_streams_.end(); ++it) + list[index++] = it->second; + + // Issue the aborts. + for (--index; index >= 0; index--) { + LOG(ERROR) << "ABANDONED (stream_id=" << list[index]->stream_id() + << "): " << list[index]->path(); + list[index]->OnClose(code); + } + + // Clear out anything pending. + active_streams_.clear(); + + delete[] list; + } + + if (pushed_streams_.size()) { + streams_abandoned_count_ += pushed_streams_.size(); + abandoned_push_streams.Add(pushed_streams_.size()); + pushed_streams_.clear(); + } +} + +int FlipSession::GetNewStreamId() { + int id = stream_hi_water_mark_; + stream_hi_water_mark_ += 2; + if (stream_hi_water_mark_ > 0x7fff) + stream_hi_water_mark_ = 1; + return id; +} + +void FlipSession::CloseSessionOnError(net::Error err) { + DCHECK_LT(err, OK); + LOG(INFO) << "Flip::CloseSessionOnError(" << err << ")"; + + // Don't close twice. This can occur because we can have both + // a read and a write outstanding, and each can complete with + // an error. + if (state_ != CLOSED) { + state_ = CLOSED; + error_ = err; + CloseAllStreams(err); + session_->flip_session_pool()->Remove(this); + } +} + +void FlipSession::ActivateStream(FlipStream* stream) { + const flip::FlipStreamId id = stream->stream_id(); + DCHECK(!IsStreamActive(id)); + + active_streams_[id] = stream; +} + +void FlipSession::DeactivateStream(flip::FlipStreamId id) { + DCHECK(IsStreamActive(id)); + + // Verify it is not on the pushed_streams_ list. + ActiveStreamList::iterator it; + for (it = pushed_streams_.begin(); it != pushed_streams_.end(); ++it) { + scoped_refptr<FlipStream> curr = *it; + if (id == curr->stream_id()) { + pushed_streams_.erase(it); + break; + } + } + + active_streams_.erase(id); +} + +scoped_refptr<FlipStream> FlipSession::GetPushStream(const std::string& path) { + static StatsCounter used_push_streams("flip.claimed_push_streams"); + + LOG(INFO) << "Looking for push stream: " << path; + + scoped_refptr<FlipStream> stream; + + // We just walk a linear list here. + ActiveStreamList::iterator it; + for (it = pushed_streams_.begin(); it != pushed_streams_.end(); ++it) { + stream = *it; + if (path == stream->path()) { + CHECK(stream->pushed()); + pushed_streams_.erase(it); + used_push_streams.Increment(); + LOG(INFO) << "Push Stream Claim for: " << path; + break; + } + } + + return stream; +} + +void FlipSession::GetSSLInfo(SSLInfo* ssl_info) { + if (is_secure_) { + SSLClientSocket* ssl_socket = + reinterpret_cast<SSLClientSocket*>(connection_->socket()); + ssl_socket->GetSSLInfo(ssl_info); + } +} + +void FlipSession::OnError(flip::FlipFramer* framer) { + LOG(ERROR) << "FlipSession error: " << framer->error_code(); + CloseSessionOnError(net::ERR_FLIP_PROTOCOL_ERROR); +} + +void FlipSession::OnStreamFrameData(flip::FlipStreamId stream_id, + const char* data, + size_t len) { + LOG(INFO) << "Flip data for stream " << stream_id << ", " << len << " bytes"; + bool valid_stream = IsStreamActive(stream_id); + if (!valid_stream) { + // NOTE: it may just be that the stream was cancelled. + LOG(WARNING) << "Received data frame for invalid stream " << stream_id; + return; + } + + scoped_refptr<FlipStream> stream = active_streams_[stream_id]; + bool success = stream->OnDataReceived(data, len); + // |len| == 0 implies a closed stream. + if (!success || !len) + DeactivateStream(stream_id); +} + +void FlipSession::OnSyn(const flip::FlipSynStreamControlFrame* frame, + const flip::FlipHeaderBlock* headers) { + flip::FlipStreamId stream_id = frame->stream_id(); + + // Server-initiated streams should have even sequence numbers. + if ((stream_id & 0x1) != 0) { + LOG(ERROR) << "Received invalid OnSyn stream id " << stream_id; + return; + } + + if (IsStreamActive(stream_id)) { + LOG(ERROR) << "Received OnSyn for active stream " << stream_id; + return; + } + + streams_pushed_count_++; + + LOG(INFO) << "FlipSession: Syn received for stream: " << stream_id; + + LOG(INFO) << "FLIP SYN RESPONSE HEADERS -----------------------"; + DumpFlipHeaders(*headers); + + // TODO(mbelshe): DCHECK that this is a GET method? + + const std::string& path = ContainsKey(*headers, "path") ? + headers->find("path")->second : ""; + + // Verify that the response had a URL for us. + DCHECK(!path.empty()); + if (path.empty()) { + LOG(WARNING) << "Pushed stream did not contain a path."; + return; + } + + scoped_refptr<FlipStream> stream; + + // Check if we already have a delegate awaiting this stream. + PendingStreamMap::iterator it; + it = pending_streams_.find(path); + if (it != pending_streams_.end()) { + stream = it->second; + pending_streams_.erase(it); + } + + if (stream) { + CHECK(stream->pushed()); + CHECK(stream->stream_id() == 0); + stream->set_stream_id(stream_id); + } else { + // TODO(mbelshe): can we figure out how to use a LoadLog here? + stream = new FlipStream(this, stream_id, true, NULL); + + // A new HttpResponseInfo object needs to be generated so the call to + // OnResponseReceived below has something to fill in. + // When a FlipNetworkTransaction is created for this resource, the + // response_info is copied over and this version is destroyed. + // + // TODO(cbentzel): Minimize allocations and copies of HttpResponseInfo + // object. Should it just be part of FlipStream? + HttpResponseInfo* response_info = new HttpResponseInfo(); + stream->set_response_info_pointer(response_info); + } + + pushed_streams_.push_back(stream); + + // Activate a stream and parse the headers. + ActivateStream(stream); + + stream->set_path(path); + + // TODO(mbelshe): For now we convert from our nice hash map back + // to a string of headers; this is because the HttpResponseInfo + // is a bit rigid for its http (non-flip) design. + HttpResponseInfo response; + if (FlipHeadersToHttpResponse(*headers, &response)) { + GetSSLInfo(&response.ssl_info); + stream->OnResponseReceived(response); + } else { + stream->OnClose(ERR_INVALID_RESPONSE); + DeactivateStream(stream_id); + return; + } + + LOG(INFO) << "Got pushed stream for " << stream->path(); + + static StatsCounter push_requests("flip.pushed_streams"); + push_requests.Increment(); +} + +void FlipSession::OnSynReply(const flip::FlipSynReplyControlFrame* frame, + const flip::FlipHeaderBlock* headers) { + DCHECK(headers); + flip::FlipStreamId stream_id = frame->stream_id(); + bool valid_stream = IsStreamActive(stream_id); + if (!valid_stream) { + // NOTE: it may just be that the stream was cancelled. + LOG(WARNING) << "Received SYN_REPLY for invalid stream " << stream_id; + return; + } + + LOG(INFO) << "FLIP SYN_REPLY RESPONSE HEADERS for stream: " << stream_id; + DumpFlipHeaders(*headers); + + // We record content declared as being pushed so that we don't + // request a duplicate stream which is already scheduled to be + // sent to us. + flip::FlipHeaderBlock::const_iterator it; + it = headers->find("X-Associated-Content"); + if (it != headers->end()) { + const std::string& content = it->second; + std::string::size_type start = 0; + std::string::size_type end = 0; + do { + end = content.find("||", start); + if (end == std::string::npos) + end = content.length(); + std::string url = content.substr(start, end - start); + std::string::size_type pos = url.find("??"); + if (pos == std::string::npos) + break; + url = url.substr(pos + 2); + GURL gurl(url); + std::string path = gurl.PathForRequest(); + if (path.length()) + pending_streams_[path] = NULL; + else + LOG(INFO) << "Invalid X-Associated-Content path: " << url; + start = end + 2; + } while (start < content.length()); + } + + scoped_refptr<FlipStream> stream = active_streams_[stream_id]; + CHECK(stream->stream_id() == stream_id); + CHECK(!stream->cancelled()); + HttpResponseInfo response; + if (FlipHeadersToHttpResponse(*headers, &response)) { + GetSSLInfo(&response.ssl_info); + stream->OnResponseReceived(response); + } else { + stream->OnClose(ERR_INVALID_RESPONSE); + DeactivateStream(stream_id); + } +} + +void FlipSession::OnControl(const flip::FlipControlFrame* frame) { + flip::FlipHeaderBlock headers; + uint32 type = frame->type(); + if (type == flip::SYN_STREAM || type == flip::SYN_REPLY) { + if (!flip_framer_.ParseHeaderBlock(frame, &headers)) { + LOG(WARNING) << "Could not parse Flip Control Frame Header"; + // TODO(mbelshe): Error the session? + return; + } + } + + switch (type) { + case flip::SYN_STREAM: + LOG(INFO) << "Flip SynStream for stream " << frame->stream_id(); + OnSyn(reinterpret_cast<const flip::FlipSynStreamControlFrame*>(frame), + &headers); + break; + case flip::SYN_REPLY: + LOG(INFO) << "Flip SynReply for stream " << frame->stream_id(); + OnSynReply( + reinterpret_cast<const flip::FlipSynReplyControlFrame*>(frame), + &headers); + break; + case flip::FIN_STREAM: + LOG(INFO) << "Flip Fin for stream " << frame->stream_id(); + OnFin(reinterpret_cast<const flip::FlipFinStreamControlFrame*>(frame)); + break; + default: + DCHECK(false); // Error! + } +} + +void FlipSession::OnFin(const flip::FlipFinStreamControlFrame* frame) { + flip::FlipStreamId stream_id = frame->stream_id(); + bool valid_stream = IsStreamActive(stream_id); + if (!valid_stream) { + // NOTE: it may just be that the stream was cancelled. + LOG(WARNING) << "Received FIN for invalid stream" << stream_id; + return; + } + scoped_refptr<FlipStream> stream = active_streams_[stream_id]; + CHECK(stream->stream_id() == stream_id); + CHECK(!stream->cancelled()); + if (frame->status() == 0) { + stream->OnDataReceived(NULL, 0); + } else { + LOG(ERROR) << "Flip stream closed: " << frame->status(); + // TODO(mbelshe): Map from Flip-protocol errors to something sensical. + // For now, it doesn't matter much - it is a protocol error. + stream->OnClose(ERR_FAILED); + } + + DeactivateStream(stream_id); +} + +} // namespace net |