diff options
5 files changed, 229 insertions, 182 deletions
diff --git a/net/flip/ b/net/flip/
index c081c5d..02aea85 100644
--- a/net/flip/
+++ b/net/flip/
@@ -10,11 +10,11 @@
#include "base/rand_util.h"
#include "base/stats_counters.h"
#include "base/string_util.h"
-#include "net/base/bandwidth_metrics.h"
#include "net/base/load_flags.h"
#include "net/base/net_util.h"
#include "net/flip/flip_frame_builder.h"
#include "net/flip/flip_protocol.h"
+#include "net/flip/flip_stream.h"
#include "net/http/http_network_session.h"
#include "net/http/http_request_info.h"
#include "net/http/http_response_headers.h"
@@ -30,56 +30,6 @@ scoped_ptr<FlipSessionPool> FlipSession::session_pool_;
bool FlipSession::use_ssl_ = true;
int PrioritizedIOBuffer::order_ = 0;
-// The FlipStreamImpl is an internal class representing an active FlipStream.
-class FlipStreamImpl {
- public:
- // A FlipStreamImpl represents an active FlipStream.
- // If |delegate| is NULL, then we're receiving pushed data from the server.
- FlipStreamImpl(flip::FlipStreamId stream_id, FlipDelegate* delegate)
- : stream_id_(stream_id),
- delegate_(delegate),
- response_(NULL),
- download_finished_(false),
- metrics_(Singleton<BandwidthMetrics>::get()) {}
- virtual ~FlipStreamImpl() {
- LOG(INFO) << "Deleting FlipStreamImpl for stream " << stream_id_;
- };
- flip::FlipStreamId stream_id() const { return stream_id_; }
- FlipDelegate* delegate() const { return delegate_; }
- // For pushed streams, we track a path to identify them.
- std::string path() const { return path_; }
- void set_path(const std::string& path) { path_ = path; }
- // Attach a delegate to a previously pushed data stream.
- // Returns true if the caller should Deactivate and delete the FlipStreamImpl
- // after this call.
- bool AttachDelegate(FlipDelegate* delegate);
- // Called when a SYN_REPLY is received on the stream.
- void OnReply(const flip::FlipHeaderBlock* headers);
- // Called when data is received on the stream.
- // Returns true if the caller should Deactivate and delete the FlipStreamImpl
- // after this call.
- bool OnData(const char* data, int length);
- // Called if the stream is being prematurely closed for an abort or an error.
- // |err| is actually a net::Error.
- // The caller should Deactivate and delete the FlipStreamImpl after this call.
- void OnError(int err);
- private:
- flip::FlipStreamId stream_id_;
- std::string path_;
- FlipDelegate* delegate_;
- scoped_ptr<HttpResponseInfo> response_;
- std::list<scoped_refptr<IOBufferWithSize> > response_body_;
- bool download_finished_;
- ScopedBandwidthMetrics metrics_;
FlipSession* FlipSession::GetFlipSession(
const HostResolver::RequestInfo& info,
HttpNetworkSession* session) {
@@ -217,7 +167,7 @@ int FlipSession::CreateStream(FlipDelegate* delegate) {
GURL url = delegate->request()->url;
std::string path = url.PathForRequest();
- FlipStreamImpl* stream = NULL;
+ FlipStream* stream = NULL;
// Check if we have a push stream for this path.
if (delegate->request()->method == "GET") {
@@ -313,7 +263,7 @@ bool FlipSession::CancelStream(int id) {
// TODO(mbelshe): Write a method for tearing down a stream
// that cleans it out of the active list, the pending list,
// etc.
- FlipStreamImpl* stream = active_streams_[id];
+ FlipStream* stream = active_streams_[id];
delete stream;
return true;
@@ -534,7 +484,7 @@ void FlipSession::CloseAllStreams(net::Error code) {
// Create a copy of the list, since aborting streams can invalidate
// our list.
- FlipStreamImpl** list = new FlipStreamImpl*[active_streams_.size()];
+ FlipStream** list = new FlipStream*[active_streams_.size()];
ActiveStreamMap::const_iterator it;
int index = 0;
for (it = active_streams_.begin(); it != active_streams_.end(); ++it)
@@ -567,11 +517,11 @@ int FlipSession::GetNewStreamId() {
return id;
-FlipStreamImpl* FlipSession::ActivateStream(flip::FlipStreamId id,
+FlipStream* FlipSession::ActivateStream(flip::FlipStreamId id,
FlipDelegate* delegate) {
- FlipStreamImpl* stream = new FlipStreamImpl(id, delegate);
+ FlipStream* stream = new FlipStream(id, delegate);
active_streams_[id] = stream;
return stream;
@@ -582,7 +532,7 @@ void FlipSession::DeactivateStream(flip::FlipStreamId id) {
// Verify it is not on the pushed_streams_ list.
ActiveStreamList::iterator it;
for (it = pushed_streams_.begin(); it != pushed_streams_.end(); ++it) {
- FlipStreamImpl* impl = *it;
+ FlipStream* impl = *it;
if (id == impl->stream_id()) {
@@ -592,7 +542,7 @@ void FlipSession::DeactivateStream(flip::FlipStreamId id) {
-FlipStreamImpl* FlipSession::GetPushStream(std::string path) {
+FlipStream* FlipSession::GetPushStream(std::string path) {
static StatsCounter used_push_streams("flip.claimed_push_streams");
LOG(INFO) << "Looking for push stream: " << path;
@@ -600,7 +550,7 @@ FlipStreamImpl* FlipSession::GetPushStream(std::string path) {
// We just walk a linear list here.
ActiveStreamList::iterator it;
for (it = pushed_streams_.begin(); it != pushed_streams_.end(); ++it) {
- FlipStreamImpl* impl = *it;
+ FlipStream* impl = *it;
if (path == impl->path()) {
@@ -627,7 +577,7 @@ void FlipSession::OnStreamFrameData(flip::FlipStreamId stream_id,
- FlipStreamImpl* stream = active_streams_[stream_id];
+ FlipStream* stream = active_streams_[stream_id];
if (stream->OnData(data, len)) {
delete stream;
@@ -650,7 +600,7 @@ void FlipSession::OnSyn(const flip::FlipSynStreamControlFrame* frame,
// Activate a stream and parse the headers.
- FlipStreamImpl* stream = ActivateStream(stream_id, NULL);
+ FlipStream* stream = ActivateStream(stream_id, NULL);
// TODO(mbelshe): DCHECK that this is a GET method?
@@ -718,7 +668,7 @@ void FlipSession::OnSynReply(const flip::FlipSynReplyControlFrame* frame,
} while (end < content.length());
- FlipStreamImpl* stream = active_streams_[stream_id];
+ FlipStream* stream = active_streams_[stream_id];
@@ -761,7 +711,7 @@ void FlipSession::OnFin(const flip::FlipFinStreamControlFrame* frame) {
LOG(WARNING) << "Received FIN for invalid stream" << stream_id;
- FlipStreamImpl* stream = active_streams_[stream_id];
+ FlipStream* stream = active_streams_[stream_id];
bool cleanup_stream = false;
if (frame->status() == 0) {
cleanup_stream = stream->OnData(NULL, 0);
@@ -783,119 +733,5 @@ void FlipSession::OnLameDuck() {
-bool FlipStreamImpl::AttachDelegate(FlipDelegate* delegate) {
- DCHECK(delegate_ == NULL); // Don't attach if already attached.
- DCHECK(path_.length() > 0); // Path needs to be set for push streams.
- delegate_ = delegate;
- // If there is pending data, send it up here.
- // Check for the OnReply, and pass it up.
- if (response_.get())
- delegate_->OnResponseReceived(response_.get());
- // Pass data up
- while (response_body_.size()) {
- scoped_refptr<IOBufferWithSize> buffer = response_body_.front();
- response_body_.pop_front();
- delegate_->OnDataReceived(buffer->data(), buffer->size());
- }
- // Finally send up the end-of-stream.
- if (download_finished_) {
- delegate_->OnClose(net::OK);
- return true; // tell the caller to shut us down
- }
- return false;
-void FlipStreamImpl::OnReply(const flip::FlipHeaderBlock* headers) {
- DCHECK(headers);
- DCHECK(headers->find("version") != headers->end());
- DCHECK(headers->find("status") != headers->end());
- // TODO(mbelshe): if no version or status is found, we need to error
- // out the stream.
- metrics_.StartStream();
- // Server initiated streams must send a URL to us in the headers.
- if (headers->find("path") != headers->end())
- path_ = headers->find("path")->second;
- // TODO(mbelshe): For now we convert from our nice hash map back
- // to a string of headers; this is because the HttpResponseInfo
- // is a bit rigid for its http (non-flip) design.
- std::string raw_headers(headers->find("version")->second);
- raw_headers.append(" ", 1);
- raw_headers.append(headers->find("status")->second);
- raw_headers.append("\0", 1);
- flip::FlipHeaderBlock::const_iterator it;
- for (it = headers->begin(); it != headers->end(); ++it) {
- raw_headers.append(it->first);
- raw_headers.append(":", 1);
- raw_headers.append(it->second);
- raw_headers.append("\0", 1);
- }
- LOG(INFO) << "FlipStream: SynReply received for " << stream_id_;
- DCHECK(response_ == NULL);
- response_.reset(new HttpResponseInfo());
- response_->headers = new HttpResponseHeaders(raw_headers);
- // When pushing content from the server, we may not yet have a delegate_
- // to notify. When the delegate is attached, it will notify then.
- if (delegate_)
- delegate_->OnResponseReceived(response_.get());
-bool FlipStreamImpl::OnData(const char* data, int length) {
- DCHECK(length >= 0);
- LOG(INFO) << "FlipStream: Data (" << length << " bytes) received for "
- << stream_id_;
- // If we don't have a response, then the SYN_REPLY did not come through.
- // We cannot pass data up to the caller unless the reply headers have been
- // received.
- if (!response_.get()) {
- if (delegate_)
- delegate_->OnClose(ERR_SYN_REPLY_NOT_RECEIVED);
- return true;
- }
- // A zero-length read means that the stream is being closed.
- if (!length) {
- metrics_.StopStream();
- download_finished_ = true;
- if (delegate_) {
- delegate_->OnClose(net::OK);
- return true; // Tell the caller to clean us up.
- }
- }
- // Track our bandwidth.
- metrics_.RecordBytes(length);
- // We've received data. We try to pass it up to the caller.
- // In the case of server-push streams, we may not have a delegate yet assigned
- // to this stream. In that case we just queue the data for later.
- if (delegate_) {
- delegate_->OnDataReceived(data, length);
- } else {
- // Save the data for use later.
- // TODO(mbelshe): We need to have some throttling on this. We shouldn't
- // buffer an infinite amount of data.
- IOBufferWithSize* io_buffer = new IOBufferWithSize(length);
- memcpy(io_buffer->data(), data, length);
- response_body_.push_back(io_buffer);
- }
- return false;
-void FlipStreamImpl::OnError(int err) {
- if (delegate_)
- delegate_->OnClose(err);
} // namespace net
diff --git a/net/flip/flip_session.h b/net/flip/flip_session.h
index 1030c01..22cc578 100644
--- a/net/flip/flip_session.h
+++ b/net/flip/flip_session.h
@@ -26,7 +26,7 @@
namespace net {
-class FlipStreamImpl;
+class FlipStream;
class HttpNetworkSession;
class HttpRequestInfo;
class HttpResponseInfo;
@@ -203,13 +203,13 @@ class FlipSession : public base::RefCounted<FlipSession>,
int GetNewStreamId();
// Track active streams in the active stream list.
- FlipStreamImpl* ActivateStream(flip::FlipStreamId id, FlipDelegate* delegate);
+ FlipStream* ActivateStream(flip::FlipStreamId id, FlipDelegate* delegate);
void DeactivateStream(flip::FlipStreamId id);
// Check if we have a pending pushed-stream for this url
// Returns the stream if found (and returns it from the pending
// list), returns NULL otherwise.
- FlipStreamImpl* GetPushStream(std::string url);
+ FlipStream* GetPushStream(std::string url);
// Callbacks for the Flip session.
CompletionCallbackImpl<FlipSession> connect_callback_;
@@ -239,8 +239,8 @@ class FlipSession : public base::RefCounted<FlipSession>,
// TODO(mbelshe): We need to track these stream lists better.
// I suspect it is possible to remove a stream from
// one list, but not the other.
- typedef std::map<int, FlipStreamImpl*> ActiveStreamMap;
- typedef std::list<FlipStreamImpl*> ActiveStreamList;
+ typedef std::map<int, FlipStream*> ActiveStreamMap;
+ typedef std::list<FlipStream*> ActiveStreamList;
ActiveStreamMap active_streams_;
ActiveStreamList pushed_streams_;
diff --git a/net/flip/ b/net/flip/
new file mode 100755
index 0000000..22f8272
--- /dev/null
+++ b/net/flip/
@@ -0,0 +1,127 @@
+// Copyright (c) 2009 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+#include "net/flip/flip_stream.h"
+#include "net/flip/flip_session.h"
+#include "net/http/http_response_info.h"
+namespace net {
+bool FlipStream::AttachDelegate(FlipDelegate* delegate) {
+ DCHECK(delegate_ == NULL); // Don't attach if already attached.
+ DCHECK(path_.length() > 0); // Path needs to be set for push streams.
+ delegate_ = delegate;
+ // If there is pending data, send it up here.
+ // Check for the OnReply, and pass it up.
+ if (response_.get())
+ delegate_->OnResponseReceived(response_.get());
+ // Pass data up
+ while (response_body_.size()) {
+ scoped_refptr<IOBufferWithSize> buffer = response_body_.front();
+ response_body_.pop_front();
+ delegate_->OnDataReceived(buffer->data(), buffer->size());
+ }
+ // Finally send up the end-of-stream.
+ if (download_finished_) {
+ delegate_->OnClose(net::OK);
+ return true; // tell the caller to shut us down
+ }
+ return false;
+void FlipStream::OnReply(const flip::FlipHeaderBlock* headers) {
+ DCHECK(headers);
+ DCHECK(headers->find("version") != headers->end());
+ DCHECK(headers->find("status") != headers->end());
+ // TODO(mbelshe): if no version or status is found, we need to error
+ // out the stream.
+ metrics_.StartStream();
+ // Server initiated streams must send a URL to us in the headers.
+ if (headers->find("path") != headers->end())
+ path_ = headers->find("path")->second;
+ // TODO(mbelshe): For now we convert from our nice hash map back
+ // to a string of headers; this is because the HttpResponseInfo
+ // is a bit rigid for its http (non-flip) design.
+ std::string raw_headers(headers->find("version")->second);
+ raw_headers.append(" ", 1);
+ raw_headers.append(headers->find("status")->second);
+ raw_headers.append("\0", 1);
+ flip::FlipHeaderBlock::const_iterator it;
+ for (it = headers->begin(); it != headers->end(); ++it) {
+ raw_headers.append(it->first);
+ raw_headers.append(":", 1);
+ raw_headers.append(it->second);
+ raw_headers.append("\0", 1);
+ }
+ LOG(INFO) << "FlipStream: SynReply received for " << stream_id_;
+ DCHECK(response_ == NULL);
+ response_.reset(new HttpResponseInfo());
+ response_->headers = new HttpResponseHeaders(raw_headers);
+ // When pushing content from the server, we may not yet have a delegate_
+ // to notify. When the delegate is attached, it will notify then.
+ if (delegate_)
+ delegate_->OnResponseReceived(response_.get());
+bool FlipStream::OnData(const char* data, int length) {
+ DCHECK(length >= 0);
+ LOG(INFO) << "FlipStream: Data (" << length << " bytes) received for "
+ << stream_id_;
+ // If we don't have a response, then the SYN_REPLY did not come through.
+ // We cannot pass data up to the caller unless the reply headers have been
+ // received.
+ if (!response_.get()) {
+ if (delegate_)
+ delegate_->OnClose(ERR_SYN_REPLY_NOT_RECEIVED);
+ return true;
+ }
+ // A zero-length read means that the stream is being closed.
+ if (!length) {
+ metrics_.StopStream();
+ download_finished_ = true;
+ if (delegate_) {
+ delegate_->OnClose(net::OK);
+ return true; // Tell the caller to clean us up.
+ }
+ }
+ // Track our bandwidth.
+ metrics_.RecordBytes(length);
+ // We've received data. We try to pass it up to the caller.
+ // In the case of server-push streams, we may not have a delegate yet assigned
+ // to this stream. In that case we just queue the data for later.
+ if (delegate_) {
+ delegate_->OnDataReceived(data, length);
+ } else {
+ // Save the data for use later.
+ // TODO(mbelshe): We need to have some throttling on this. We shouldn't
+ // buffer an infinite amount of data.
+ IOBufferWithSize* io_buffer = new IOBufferWithSize(length);
+ memcpy(io_buffer->data(), data, length);
+ response_body_.push_back(io_buffer);
+ }
+ return false;
+void FlipStream::OnError(int err) {
+ if (delegate_)
+ delegate_->OnClose(err);
+} // namespace net
diff --git a/net/flip/flip_stream.h b/net/flip/flip_stream.h
new file mode 100755
index 0000000..f836f85
--- /dev/null
+++ b/net/flip/flip_stream.h
@@ -0,0 +1,82 @@
+// Copyright (c) 2009 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+#include <string>
+#include <list>
+#include "base/scoped_ptr.h"
+#include "base/singleton.h"
+#include "net/base/bandwidth_metrics.h"
+#include "net/base/io_buffer.h"
+#include "net/flip/flip_protocol.h"
+#include "net/flip/flip_framer.h"
+namespace net {
+class FlipDelegate;
+class HttpResponseInfo;
+// The FlipStream is used by the FlipSession to represent each stream known
+// on the FlipSession.
+// Streams can be created either by the client or by the server. When they
+// are initiated by the client, they will usually have a delegate attached
+// to them (such as a FlipNetworkTransaction). However, when streams are
+// pushed by the server, they may not have a delegate yet.
+class FlipStream {
+ public:
+ // FlipStream constructor
+ // If |delegate| is NULL, then we're receiving pushed data from the server.
+ FlipStream(flip::FlipStreamId stream_id, FlipDelegate* delegate)
+ : stream_id_(stream_id),
+ delegate_(delegate),
+ response_(NULL),
+ download_finished_(false),
+ metrics_(Singleton<BandwidthMetrics>::get()) {}
+ virtual ~FlipStream() {
+ DLOG(INFO) << "Deleting FlipStream for stream " << stream_id_;
+ };
+ flip::FlipStreamId stream_id() const { return stream_id_; }
+ FlipDelegate* delegate() const { return delegate_; }
+ // For pushed streams, we track a path to identify them.
+ std::string path() const { return path_; }
+ void set_path(const std::string& path) { path_ = path; }
+ // Attach a delegate to a previously pushed data stream.
+ // Returns true if the caller should Deactivate and delete the FlipStream
+ // after this call.
+ bool AttachDelegate(FlipDelegate* delegate);
+ // Called when a SYN_REPLY is received on the stream.
+ void OnReply(const flip::FlipHeaderBlock* headers);
+ // Called when data is received on the stream.
+ // Returns true if the caller should Deactivate and delete the FlipStream
+ // after this call.
+ bool OnData(const char* data, int length);
+ // Called if the stream is being prematurely closed for an abort or an error.
+ // |err| is actually a net::Error.
+ // The caller should Deactivate and delete the FlipStream after this call.
+ void OnError(int err);
+ private:
+ flip::FlipStreamId stream_id_;
+ std::string path_;
+ FlipDelegate* delegate_;
+ scoped_ptr<HttpResponseInfo> response_;
+ std::list<scoped_refptr<IOBufferWithSize> > response_body_;
+ bool download_finished_;
+ ScopedBandwidthMetrics metrics_;
+} // namespace net
diff --git a/net/net.gyp b/net/net.gyp
index 46907a0..aa8a23f 100644
--- a/net/net.gyp
+++ b/net/net.gyp
@@ -284,6 +284,8 @@
+ 'flip/',
+ 'flip/flip_stream.h',