diff options
author | mbelshe@chromium.org <mbelshe@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2009-10-29 03:13:06 +0000 |
---|---|---|
committer | mbelshe@chromium.org <mbelshe@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2009-10-29 03:13:06 +0000 |
commit | cd314c828215d0ab4fb6ca721272f317014510c2 (patch) | |
tree | 93d58b91d9e1bb4146eca1db3e234cc14f00c17f /net | |
parent | dc93fc6b4a41f8f14ef4f7acef40a9e8b9ef9070 (diff) | |
download | chromium_src-cd314c828215d0ab4fb6ca721272f317014510c2.zip chromium_src-cd314c828215d0ab4fb6ca721272f317014510c2.tar.gz chromium_src-cd314c828215d0ab4fb6ca721272f317014510c2.tar.bz2 |
Rename FlipStreamImpl to FlipStream and separate it out into
its own files. This is a straight refactoring with no other
changes
BUG=none
TEST=none
Review URL: http://codereview.chromium.org/348007
git-svn-id: svn://svn.chromium.org/chrome/trunk/src@30428 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'net')
-rw-r--r-- | net/flip/flip_session.cc | 190 | ||||
-rw-r--r-- | net/flip/flip_session.h | 10 | ||||
-rwxr-xr-x | net/flip/flip_stream.cc | 127 | ||||
-rwxr-xr-x | net/flip/flip_stream.h | 82 | ||||
-rw-r--r-- | net/net.gyp | 2 |
5 files changed, 229 insertions, 182 deletions
diff --git a/net/flip/flip_session.cc b/net/flip/flip_session.cc index c081c5d..02aea85 100644 --- a/net/flip/flip_session.cc +++ b/net/flip/flip_session.cc @@ -10,11 +10,11 @@ #include "base/rand_util.h" #include "base/stats_counters.h" #include "base/string_util.h" -#include "net/base/bandwidth_metrics.h" #include "net/base/load_flags.h" #include "net/base/net_util.h" #include "net/flip/flip_frame_builder.h" #include "net/flip/flip_protocol.h" +#include "net/flip/flip_stream.h" #include "net/http/http_network_session.h" #include "net/http/http_request_info.h" #include "net/http/http_response_headers.h" @@ -30,56 +30,6 @@ scoped_ptr<FlipSessionPool> FlipSession::session_pool_; bool FlipSession::use_ssl_ = true; int PrioritizedIOBuffer::order_ = 0; -// The FlipStreamImpl is an internal class representing an active FlipStream. -class FlipStreamImpl { - public: - // A FlipStreamImpl represents an active FlipStream. - // If |delegate| is NULL, then we're receiving pushed data from the server. - FlipStreamImpl(flip::FlipStreamId stream_id, FlipDelegate* delegate) - : stream_id_(stream_id), - delegate_(delegate), - response_(NULL), - download_finished_(false), - metrics_(Singleton<BandwidthMetrics>::get()) {} - virtual ~FlipStreamImpl() { - LOG(INFO) << "Deleting FlipStreamImpl for stream " << stream_id_; - }; - - flip::FlipStreamId stream_id() const { return stream_id_; } - FlipDelegate* delegate() const { return delegate_; } - - // For pushed streams, we track a path to identify them. - std::string path() const { return path_; } - void set_path(const std::string& path) { path_ = path; } - - // Attach a delegate to a previously pushed data stream. - // Returns true if the caller should Deactivate and delete the FlipStreamImpl - // after this call. - bool AttachDelegate(FlipDelegate* delegate); - - // Called when a SYN_REPLY is received on the stream. - void OnReply(const flip::FlipHeaderBlock* headers); - - // Called when data is received on the stream. - // Returns true if the caller should Deactivate and delete the FlipStreamImpl - // after this call. - bool OnData(const char* data, int length); - - // Called if the stream is being prematurely closed for an abort or an error. - // |err| is actually a net::Error. - // The caller should Deactivate and delete the FlipStreamImpl after this call. - void OnError(int err); - - private: - flip::FlipStreamId stream_id_; - std::string path_; - FlipDelegate* delegate_; - scoped_ptr<HttpResponseInfo> response_; - std::list<scoped_refptr<IOBufferWithSize> > response_body_; - bool download_finished_; - ScopedBandwidthMetrics metrics_; -}; - FlipSession* FlipSession::GetFlipSession( const HostResolver::RequestInfo& info, HttpNetworkSession* session) { @@ -217,7 +167,7 @@ int FlipSession::CreateStream(FlipDelegate* delegate) { GURL url = delegate->request()->url; std::string path = url.PathForRequest(); - FlipStreamImpl* stream = NULL; + FlipStream* stream = NULL; // Check if we have a push stream for this path. if (delegate->request()->method == "GET") { @@ -313,7 +263,7 @@ bool FlipSession::CancelStream(int id) { // TODO(mbelshe): Write a method for tearing down a stream // that cleans it out of the active list, the pending list, // etc. - FlipStreamImpl* stream = active_streams_[id]; + FlipStream* stream = active_streams_[id]; DeactivateStream(id); delete stream; return true; @@ -534,7 +484,7 @@ void FlipSession::CloseAllStreams(net::Error code) { // Create a copy of the list, since aborting streams can invalidate // our list. - FlipStreamImpl** list = new FlipStreamImpl*[active_streams_.size()]; + FlipStream** list = new FlipStream*[active_streams_.size()]; ActiveStreamMap::const_iterator it; int index = 0; for (it = active_streams_.begin(); it != active_streams_.end(); ++it) @@ -567,11 +517,11 @@ int FlipSession::GetNewStreamId() { return id; } -FlipStreamImpl* FlipSession::ActivateStream(flip::FlipStreamId id, +FlipStream* FlipSession::ActivateStream(flip::FlipStreamId id, FlipDelegate* delegate) { DCHECK(!IsStreamActive(id)); - FlipStreamImpl* stream = new FlipStreamImpl(id, delegate); + FlipStream* stream = new FlipStream(id, delegate); active_streams_[id] = stream; return stream; } @@ -582,7 +532,7 @@ void FlipSession::DeactivateStream(flip::FlipStreamId id) { // Verify it is not on the pushed_streams_ list. ActiveStreamList::iterator it; for (it = pushed_streams_.begin(); it != pushed_streams_.end(); ++it) { - FlipStreamImpl* impl = *it; + FlipStream* impl = *it; if (id == impl->stream_id()) { pushed_streams_.erase(it); break; @@ -592,7 +542,7 @@ void FlipSession::DeactivateStream(flip::FlipStreamId id) { active_streams_.erase(id); } -FlipStreamImpl* FlipSession::GetPushStream(std::string path) { +FlipStream* FlipSession::GetPushStream(std::string path) { static StatsCounter used_push_streams("flip.claimed_push_streams"); LOG(INFO) << "Looking for push stream: " << path; @@ -600,7 +550,7 @@ FlipStreamImpl* FlipSession::GetPushStream(std::string path) { // We just walk a linear list here. ActiveStreamList::iterator it; for (it = pushed_streams_.begin(); it != pushed_streams_.end(); ++it) { - FlipStreamImpl* impl = *it; + FlipStream* impl = *it; if (path == impl->path()) { pushed_streams_.erase(it); used_push_streams.Increment(); @@ -627,7 +577,7 @@ void FlipSession::OnStreamFrameData(flip::FlipStreamId stream_id, return; } - FlipStreamImpl* stream = active_streams_[stream_id]; + FlipStream* stream = active_streams_[stream_id]; if (stream->OnData(data, len)) { DeactivateStream(stream->stream_id()); delete stream; @@ -650,7 +600,7 @@ void FlipSession::OnSyn(const flip::FlipSynStreamControlFrame* frame, } // Activate a stream and parse the headers. - FlipStreamImpl* stream = ActivateStream(stream_id, NULL); + FlipStream* stream = ActivateStream(stream_id, NULL); stream->OnReply(headers); // TODO(mbelshe): DCHECK that this is a GET method? @@ -718,7 +668,7 @@ void FlipSession::OnSynReply(const flip::FlipSynReplyControlFrame* frame, } while (end < content.length()); } - FlipStreamImpl* stream = active_streams_[stream_id]; + FlipStream* stream = active_streams_[stream_id]; stream->OnReply(headers); } @@ -761,7 +711,7 @@ void FlipSession::OnFin(const flip::FlipFinStreamControlFrame* frame) { LOG(WARNING) << "Received FIN for invalid stream" << stream_id; return; } - FlipStreamImpl* stream = active_streams_[stream_id]; + FlipStream* stream = active_streams_[stream_id]; bool cleanup_stream = false; if (frame->status() == 0) { cleanup_stream = stream->OnData(NULL, 0); @@ -783,119 +733,5 @@ void FlipSession::OnLameDuck() { NOTIMPLEMENTED(); } -bool FlipStreamImpl::AttachDelegate(FlipDelegate* delegate) { - DCHECK(delegate_ == NULL); // Don't attach if already attached. - DCHECK(path_.length() > 0); // Path needs to be set for push streams. - delegate_ = delegate; - - // If there is pending data, send it up here. - - // Check for the OnReply, and pass it up. - if (response_.get()) - delegate_->OnResponseReceived(response_.get()); - - // Pass data up - while (response_body_.size()) { - scoped_refptr<IOBufferWithSize> buffer = response_body_.front(); - response_body_.pop_front(); - delegate_->OnDataReceived(buffer->data(), buffer->size()); - } - - // Finally send up the end-of-stream. - if (download_finished_) { - delegate_->OnClose(net::OK); - return true; // tell the caller to shut us down - } - return false; -} - -void FlipStreamImpl::OnReply(const flip::FlipHeaderBlock* headers) { - DCHECK(headers); - DCHECK(headers->find("version") != headers->end()); - DCHECK(headers->find("status") != headers->end()); - - // TODO(mbelshe): if no version or status is found, we need to error - // out the stream. - - metrics_.StartStream(); - - // Server initiated streams must send a URL to us in the headers. - if (headers->find("path") != headers->end()) - path_ = headers->find("path")->second; - - // TODO(mbelshe): For now we convert from our nice hash map back - // to a string of headers; this is because the HttpResponseInfo - // is a bit rigid for its http (non-flip) design. - std::string raw_headers(headers->find("version")->second); - raw_headers.append(" ", 1); - raw_headers.append(headers->find("status")->second); - raw_headers.append("\0", 1); - flip::FlipHeaderBlock::const_iterator it; - for (it = headers->begin(); it != headers->end(); ++it) { - raw_headers.append(it->first); - raw_headers.append(":", 1); - raw_headers.append(it->second); - raw_headers.append("\0", 1); - } - - LOG(INFO) << "FlipStream: SynReply received for " << stream_id_; - - DCHECK(response_ == NULL); - response_.reset(new HttpResponseInfo()); - response_->headers = new HttpResponseHeaders(raw_headers); - // When pushing content from the server, we may not yet have a delegate_ - // to notify. When the delegate is attached, it will notify then. - if (delegate_) - delegate_->OnResponseReceived(response_.get()); -} - -bool FlipStreamImpl::OnData(const char* data, int length) { - DCHECK(length >= 0); - LOG(INFO) << "FlipStream: Data (" << length << " bytes) received for " - << stream_id_; - - // If we don't have a response, then the SYN_REPLY did not come through. - // We cannot pass data up to the caller unless the reply headers have been - // received. - if (!response_.get()) { - if (delegate_) - delegate_->OnClose(ERR_SYN_REPLY_NOT_RECEIVED); - return true; - } - - // A zero-length read means that the stream is being closed. - if (!length) { - metrics_.StopStream(); - download_finished_ = true; - if (delegate_) { - delegate_->OnClose(net::OK); - return true; // Tell the caller to clean us up. - } - } - - // Track our bandwidth. - metrics_.RecordBytes(length); - - // We've received data. We try to pass it up to the caller. - // In the case of server-push streams, we may not have a delegate yet assigned - // to this stream. In that case we just queue the data for later. - if (delegate_) { - delegate_->OnDataReceived(data, length); - } else { - // Save the data for use later. - // TODO(mbelshe): We need to have some throttling on this. We shouldn't - // buffer an infinite amount of data. - IOBufferWithSize* io_buffer = new IOBufferWithSize(length); - memcpy(io_buffer->data(), data, length); - response_body_.push_back(io_buffer); - } - return false; -} - -void FlipStreamImpl::OnError(int err) { - if (delegate_) - delegate_->OnClose(err); -} - } // namespace net diff --git a/net/flip/flip_session.h b/net/flip/flip_session.h index 1030c01..22cc578 100644 --- a/net/flip/flip_session.h +++ b/net/flip/flip_session.h @@ -26,7 +26,7 @@ namespace net { -class FlipStreamImpl; +class FlipStream; class HttpNetworkSession; class HttpRequestInfo; class HttpResponseInfo; @@ -203,13 +203,13 @@ class FlipSession : public base::RefCounted<FlipSession>, int GetNewStreamId(); // Track active streams in the active stream list. - FlipStreamImpl* ActivateStream(flip::FlipStreamId id, FlipDelegate* delegate); + FlipStream* ActivateStream(flip::FlipStreamId id, FlipDelegate* delegate); void DeactivateStream(flip::FlipStreamId id); // Check if we have a pending pushed-stream for this url // Returns the stream if found (and returns it from the pending // list), returns NULL otherwise. - FlipStreamImpl* GetPushStream(std::string url); + FlipStream* GetPushStream(std::string url); // Callbacks for the Flip session. CompletionCallbackImpl<FlipSession> connect_callback_; @@ -239,8 +239,8 @@ class FlipSession : public base::RefCounted<FlipSession>, // TODO(mbelshe): We need to track these stream lists better. // I suspect it is possible to remove a stream from // one list, but not the other. - typedef std::map<int, FlipStreamImpl*> ActiveStreamMap; - typedef std::list<FlipStreamImpl*> ActiveStreamList; + typedef std::map<int, FlipStream*> ActiveStreamMap; + typedef std::list<FlipStream*> ActiveStreamList; ActiveStreamMap active_streams_; ActiveStreamList pushed_streams_; diff --git a/net/flip/flip_stream.cc b/net/flip/flip_stream.cc new file mode 100755 index 0000000..22f8272 --- /dev/null +++ b/net/flip/flip_stream.cc @@ -0,0 +1,127 @@ +// Copyright (c) 2009 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "net/flip/flip_stream.h" + +#include "net/flip/flip_session.h" +#include "net/http/http_response_info.h" + +namespace net { + +bool FlipStream::AttachDelegate(FlipDelegate* delegate) { + DCHECK(delegate_ == NULL); // Don't attach if already attached. + DCHECK(path_.length() > 0); // Path needs to be set for push streams. + delegate_ = delegate; + + // If there is pending data, send it up here. + + // Check for the OnReply, and pass it up. + if (response_.get()) + delegate_->OnResponseReceived(response_.get()); + + // Pass data up + while (response_body_.size()) { + scoped_refptr<IOBufferWithSize> buffer = response_body_.front(); + response_body_.pop_front(); + delegate_->OnDataReceived(buffer->data(), buffer->size()); + } + + // Finally send up the end-of-stream. + if (download_finished_) { + delegate_->OnClose(net::OK); + return true; // tell the caller to shut us down + } + return false; +} + +void FlipStream::OnReply(const flip::FlipHeaderBlock* headers) { + DCHECK(headers); + DCHECK(headers->find("version") != headers->end()); + DCHECK(headers->find("status") != headers->end()); + + // TODO(mbelshe): if no version or status is found, we need to error + // out the stream. + + metrics_.StartStream(); + + // Server initiated streams must send a URL to us in the headers. + if (headers->find("path") != headers->end()) + path_ = headers->find("path")->second; + + // TODO(mbelshe): For now we convert from our nice hash map back + // to a string of headers; this is because the HttpResponseInfo + // is a bit rigid for its http (non-flip) design. + std::string raw_headers(headers->find("version")->second); + raw_headers.append(" ", 1); + raw_headers.append(headers->find("status")->second); + raw_headers.append("\0", 1); + flip::FlipHeaderBlock::const_iterator it; + for (it = headers->begin(); it != headers->end(); ++it) { + raw_headers.append(it->first); + raw_headers.append(":", 1); + raw_headers.append(it->second); + raw_headers.append("\0", 1); + } + + LOG(INFO) << "FlipStream: SynReply received for " << stream_id_; + + DCHECK(response_ == NULL); + response_.reset(new HttpResponseInfo()); + response_->headers = new HttpResponseHeaders(raw_headers); + // When pushing content from the server, we may not yet have a delegate_ + // to notify. When the delegate is attached, it will notify then. + if (delegate_) + delegate_->OnResponseReceived(response_.get()); +} + +bool FlipStream::OnData(const char* data, int length) { + DCHECK(length >= 0); + LOG(INFO) << "FlipStream: Data (" << length << " bytes) received for " + << stream_id_; + + // If we don't have a response, then the SYN_REPLY did not come through. + // We cannot pass data up to the caller unless the reply headers have been + // received. + if (!response_.get()) { + if (delegate_) + delegate_->OnClose(ERR_SYN_REPLY_NOT_RECEIVED); + return true; + } + + // A zero-length read means that the stream is being closed. + if (!length) { + metrics_.StopStream(); + download_finished_ = true; + if (delegate_) { + delegate_->OnClose(net::OK); + return true; // Tell the caller to clean us up. + } + } + + // Track our bandwidth. + metrics_.RecordBytes(length); + + // We've received data. We try to pass it up to the caller. + // In the case of server-push streams, we may not have a delegate yet assigned + // to this stream. In that case we just queue the data for later. + if (delegate_) { + delegate_->OnDataReceived(data, length); + } else { + // Save the data for use later. + // TODO(mbelshe): We need to have some throttling on this. We shouldn't + // buffer an infinite amount of data. + IOBufferWithSize* io_buffer = new IOBufferWithSize(length); + memcpy(io_buffer->data(), data, length); + response_body_.push_back(io_buffer); + } + return false; +} + +void FlipStream::OnError(int err) { + if (delegate_) + delegate_->OnClose(err); +} + +} // namespace net + diff --git a/net/flip/flip_stream.h b/net/flip/flip_stream.h new file mode 100755 index 0000000..f836f85 --- /dev/null +++ b/net/flip/flip_stream.h @@ -0,0 +1,82 @@ +// Copyright (c) 2009 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef NET_FLIP_FLIP_STREAM_H_ +#define NET_FLIP_FLIP_STREAM_H_ + +#include <string> +#include <list> + +#include "base/scoped_ptr.h" +#include "base/singleton.h" +#include "net/base/bandwidth_metrics.h" +#include "net/base/io_buffer.h" +#include "net/flip/flip_protocol.h" +#include "net/flip/flip_framer.h" + +namespace net { + +class FlipDelegate; +class HttpResponseInfo; + +// The FlipStream is used by the FlipSession to represent each stream known +// on the FlipSession. +// Streams can be created either by the client or by the server. When they +// are initiated by the client, they will usually have a delegate attached +// to them (such as a FlipNetworkTransaction). However, when streams are +// pushed by the server, they may not have a delegate yet. +class FlipStream { + public: + // FlipStream constructor + // If |delegate| is NULL, then we're receiving pushed data from the server. + FlipStream(flip::FlipStreamId stream_id, FlipDelegate* delegate) + : stream_id_(stream_id), + delegate_(delegate), + response_(NULL), + download_finished_(false), + metrics_(Singleton<BandwidthMetrics>::get()) {} + + virtual ~FlipStream() { + DLOG(INFO) << "Deleting FlipStream for stream " << stream_id_; + }; + + flip::FlipStreamId stream_id() const { return stream_id_; } + FlipDelegate* delegate() const { return delegate_; } + + // For pushed streams, we track a path to identify them. + std::string path() const { return path_; } + void set_path(const std::string& path) { path_ = path; } + + // Attach a delegate to a previously pushed data stream. + // Returns true if the caller should Deactivate and delete the FlipStream + // after this call. + bool AttachDelegate(FlipDelegate* delegate); + + // Called when a SYN_REPLY is received on the stream. + void OnReply(const flip::FlipHeaderBlock* headers); + + // Called when data is received on the stream. + // Returns true if the caller should Deactivate and delete the FlipStream + // after this call. + bool OnData(const char* data, int length); + + // Called if the stream is being prematurely closed for an abort or an error. + // |err| is actually a net::Error. + // The caller should Deactivate and delete the FlipStream after this call. + void OnError(int err); + + private: + flip::FlipStreamId stream_id_; + std::string path_; + FlipDelegate* delegate_; + scoped_ptr<HttpResponseInfo> response_; + std::list<scoped_refptr<IOBufferWithSize> > response_body_; + bool download_finished_; + ScopedBandwidthMetrics metrics_; +}; + +} // namespace net + +#endif // NET_FLIP_FLIP_STREAM_H_ + diff --git a/net/net.gyp b/net/net.gyp index 46907a0..aa8a23f 100644 --- a/net/net.gyp +++ b/net/net.gyp @@ -284,6 +284,8 @@ 'flip/flip_session.h', 'flip/flip_session_pool.cc', 'flip/flip_session_pool.h', + 'flip/flip_stream.cc', + 'flip/flip_stream.h', 'flip/flip_transaction_factory.h', 'ftp/ftp_auth_cache.cc', 'ftp/ftp_auth_cache.h', |