summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorwillchan@chromium.org <willchan@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2009-11-22 00:43:00 +0000
committerwillchan@chromium.org <willchan@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2009-11-22 00:43:00 +0000
commita677f2b546d2034b459d40e6d0f24977e9b9ade3 (patch)
tree899e1150d0518fe318309301b215bc4ccc7690df
parent967dd54183a539e2c178fed766c48a61f407f8ff (diff)
downloadchromium_src-a677f2b546d2034b459d40e6d0f24977e9b9ade3.zip
chromium_src-a677f2b546d2034b459d40e6d0f24977e9b9ade3.tar.gz
chromium_src-a677f2b546d2034b459d40e6d0f24977e9b9ade3.tar.bz2
Flip: Merge FlipStreamParser and FlipStream. Eliminate FlipDelegate.
FlipStream now conceptually contains everything associated with a single Flip stream. This primarily consists of 2 things: (1) FlipStream as a consumer of network events from FlipSession (2) FlipStream as a network provider to consumers (such as FlipNetworkTransaction). Conceptually, FlipStream also should be agnostic of wire level protocol framing details, only dealing with HTTP style headers and responses. Anything wire level has been moved out of FlipStream into FlipSession. FlipStream is now reference counted since it is referenced by both the FlipSession and the consumer (only FlipNetworkTransaction currently). FlipNetworkTransaction can be cancelled prior to all network events finishing up, therefore the code needs to handle this gracefully. FlipStream supports a Cancel() function for this reason. FlipStream now communicates with consumers via CompletionCallbacks rather than using FlipDelegates. Review URL: http://codereview.chromium.org/410006 git-svn-id: svn://svn.chromium.org/chrome/trunk/src@32765 0039d316-1c4b-4281-b951-d872f2087c98
-rw-r--r--net/flip/flip_io_buffer.cc13
-rw-r--r--net/flip/flip_io_buffer.h20
-rw-r--r--net/flip/flip_network_transaction.cc355
-rw-r--r--net/flip/flip_network_transaction.h111
-rw-r--r--net/flip/flip_network_transaction_unittest.cc57
-rw-r--r--net/flip/flip_session.cc315
-rw-r--r--net/flip/flip_session.h93
-rw-r--r--net/flip/flip_session_unittest.cc2
-rwxr-xr-xnet/flip/flip_stream.cc410
-rwxr-xr-xnet/flip/flip_stream.h185
-rw-r--r--net/flip/flip_stream_unittest.cc98
-rw-r--r--net/net.gyp1
12 files changed, 826 insertions, 834 deletions
diff --git a/net/flip/flip_io_buffer.cc b/net/flip/flip_io_buffer.cc
index 963aa3e..b3aecc5 100644
--- a/net/flip/flip_io_buffer.cc
+++ b/net/flip/flip_io_buffer.cc
@@ -3,11 +3,22 @@
// found in the LICENSE file.
#include "net/flip/flip_io_buffer.h"
+#include "net/flip/flip_stream.h"
namespace net {
// static
uint64 FlipIOBuffer::order_ = 0;
-} // namespace net
+FlipIOBuffer::FlipIOBuffer(
+ IOBufferWithSize* buffer, int priority, FlipStream* stream)
+ : buffer_(buffer),
+ priority_(priority),
+ position_(++order_),
+ stream_(stream) {}
+
+FlipIOBuffer::FlipIOBuffer() : priority_(0), position_(0), stream_(NULL) {}
+FlipIOBuffer::~FlipIOBuffer() {}
+
+} // namespace net
diff --git a/net/flip/flip_io_buffer.h b/net/flip/flip_io_buffer.h
index ce3d6367..85d9510 100644
--- a/net/flip/flip_io_buffer.h
+++ b/net/flip/flip_io_buffer.h
@@ -23,20 +23,19 @@ class FlipIOBuffer {
// |priority| is the priority of this buffer. Lower numbers are higher
// priority.
// |stream| is a pointer to the stream which is managing this buffer.
- FlipIOBuffer(IOBufferWithSize* buffer, int priority, FlipStream* stream)
- : buffer_(buffer),
- priority_(priority),
- position_(++order_),
- stream_(stream) {
- }
- FlipIOBuffer() : priority_(0), position_(0), stream_(NULL) {}
+ FlipIOBuffer(IOBufferWithSize* buffer, int priority, FlipStream* stream);
+ FlipIOBuffer();
+ ~FlipIOBuffer();
// Accessors.
IOBuffer* buffer() const { return buffer_; }
size_t size() const { return buffer_->size(); }
- void release() { buffer_ = NULL; }
+ void release() {
+ buffer_.release();
+ stream_.release();
+ }
int priority() const { return priority_; }
- FlipStream* stream() const { return stream_; }
+ const scoped_refptr<FlipStream>& stream() const { return stream_; }
// Comparison operator to support sorting.
bool operator<(const FlipIOBuffer& other) const {
@@ -49,11 +48,10 @@ class FlipIOBuffer {
scoped_refptr<IOBufferWithSize> buffer_;
int priority_;
uint64 position_;
- FlipStream* stream_;
+ scoped_refptr<FlipStream> stream_;
static uint64 order_; // Maintains a FIFO order for equal priorities.
};
} // namespace net
#endif // NET_FLIP_FLIP_IO_BUFFER_H_
-
diff --git a/net/flip/flip_network_transaction.cc b/net/flip/flip_network_transaction.cc
index 633c76f..d5cc1ee 100644
--- a/net/flip/flip_network_transaction.cc
+++ b/net/flip/flip_network_transaction.cc
@@ -4,17 +4,19 @@
#include "net/flip/flip_network_transaction.h"
-#include "base/scoped_ptr.h"
#include "base/compiler_specific.h"
+#include "base/logging.h"
+#include "base/scoped_ptr.h"
#include "net/base/host_resolver.h"
#include "net/base/io_buffer.h"
#include "net/base/load_flags.h"
#include "net/base/net_errors.h"
#include "net/base/net_util.h"
#include "net/base/upload_data_stream.h"
+#include "net/flip/flip_stream.h"
#include "net/http/http_network_session.h"
#include "net/http/http_request_info.h"
-#include "net/http/http_response_headers.h"
+#include "net/http/http_response_info.h"
using base::Time;
@@ -22,328 +24,6 @@ namespace net {
//-----------------------------------------------------------------------------
-FlipStreamParser::FlipStreamParser()
- : flip_(NULL),
- flip_stream_id_(0),
- request_(NULL),
- response_(NULL),
- request_body_stream_(NULL),
- response_complete_(false),
- io_state_(STATE_NONE),
- response_status_(OK),
- user_callback_(NULL),
- user_buffer_(NULL),
- user_buffer_len_(0),
- cancelled_(false) {
-}
-
-FlipStreamParser::~FlipStreamParser() {
- if (flip_ && flip_stream_id_) {
- flip_->CancelStream(flip_stream_id_);
- } else if (!response_complete_) {
- NOTREACHED();
- }
-}
-
-int FlipStreamParser::SendRequest(FlipSession* flip,
- const HttpRequestInfo* request,
- CompletionCallback* callback) {
- CHECK(flip);
- CHECK(request);
- CHECK(callback);
- DCHECK(!cancelled_);
-
- request_ = request;
- flip_ = flip;
-
- DCHECK(io_state_ == STATE_NONE);
- io_state_ = STATE_SENDING_HEADERS;
- int result = DoLoop(OK);
- if (result == ERR_IO_PENDING) {
- CHECK(!user_callback_);
- user_callback_ = callback;
- }
- return result;
-}
-
-int FlipStreamParser::ReadResponseHeaders(CompletionCallback* callback) {
- // Note: The FlipStream may have already received the response headers, so
- // this call may complete synchronously.
- DCHECK_GT(io_state_, STATE_HEADERS_SENT);
- DCHECK(!cancelled_);
- CHECK(callback);
-
- // The SYN_REPLY has already been received.
- if (response_.get())
- return OK;
-
- io_state_ = STATE_READ_HEADERS;
- CHECK(!user_callback_);
- user_callback_ = callback;
- return ERR_IO_PENDING;
-}
-
-int FlipStreamParser::ReadResponseBody(
- IOBuffer* buf, int buf_len, CompletionCallback* callback) {
- DCHECK(io_state_ == STATE_BODY_PENDING ||
- io_state_ == STATE_READ_BODY ||
- io_state_ == STATE_DONE);
- DCHECK(!cancelled_);
- CHECK(buf);
- CHECK(buf_len);
- CHECK(callback);
-
- io_state_ = STATE_READ_BODY;
-
- // If we have data buffered, complete the IO immediately.
- if (response_body_.size()) {
- int bytes_read = 0;
- while (response_body_.size() && buf_len > 0) {
- scoped_refptr<IOBufferWithSize> data = response_body_.front();
- const int bytes_to_copy = std::min(buf_len, data->size());
- memcpy(&(buf->data()[bytes_read]), data->data(), bytes_to_copy);
- buf_len -= bytes_to_copy;
- if (bytes_to_copy == data->size()) {
- response_body_.pop_front();
- } else {
- const int bytes_remaining = data->size() - bytes_to_copy;
- IOBufferWithSize* new_buffer = new IOBufferWithSize(bytes_remaining);
- memcpy(new_buffer->data(), &(data->data()[bytes_to_copy]),
- bytes_remaining);
- response_body_.pop_front();
- response_body_.push_front(new_buffer);
- }
- bytes_read += bytes_to_copy;
- }
- return bytes_read;
- } else if (response_complete_) {
- return response_status_;
- }
-
- CHECK(!user_callback_);
- CHECK(!user_buffer_);
- CHECK(user_buffer_len_ == 0);
-
- user_callback_ = callback;
- user_buffer_ = buf;
- user_buffer_len_ = buf_len;
- return ERR_IO_PENDING;
-}
-
-uint64 FlipStreamParser::GetUploadProgress() const {
- if (!request_body_stream_.get())
- return 0;
-
- return request_body_stream_->position();
-}
-
-const HttpResponseInfo* FlipStreamParser::GetResponseInfo() const {
- return response_.get();
-}
-
-void FlipStreamParser::Cancel() {
- cancelled_ = true;
- user_callback_ = NULL;
-}
-
-const HttpRequestInfo* FlipStreamParser::request() const {
- return request_;
-}
-
-const UploadDataStream* FlipStreamParser::data() const {
- return request_body_stream_.get();
-}
-
-void FlipStreamParser::OnWriteComplete(int status) {
- if (io_state_ == STATE_SENDING_HEADERS)
- io_state_ = STATE_HEADERS_SENT;
-
- DoLoop(status);
-}
-
-void FlipStreamParser::OnResponseReceived(HttpResponseInfo* response) {
- response_.reset(new HttpResponseInfo);
- *response_ = *response; // TODO(mbelshe): avoid copy.
-
- DCHECK_GE(io_state_, STATE_HEADERS_SENT);
- io_state_ = STATE_BODY_PENDING;
-
- if (user_callback_)
- DoCallback(OK);
-}
-
-void FlipStreamParser::OnDataReceived(const char* buffer, int bytes) {
- // TODO(mbelshe): if data is received before a syn reply, this will crash.
-
- DCHECK_GE(bytes, 0);
- if (bytes > 0) {
- DCHECK(buffer);
-
- // TODO(mbelshe): If read is pending, we should copy the data straight into
- // the read buffer here. For now, we'll queue it always.
-
- IOBufferWithSize* io_buffer = new IOBufferWithSize(bytes);
- memcpy(io_buffer->data(), buffer, bytes);
-
- response_body_.push_back(io_buffer);
- }
-
- // Note that data may be received for a FlipStream prior to the user calling
- // ReadResponseBody(), therefore user_callback_ may be NULL.
- if (user_callback_) {
- int rv = ReadResponseBody(user_buffer_, user_buffer_len_, user_callback_);
- CHECK(rv != ERR_IO_PENDING);
- user_buffer_ = NULL;
- user_buffer_len_ = 0;
- DoCallback(rv);
- }
-}
-
-void FlipStreamParser::OnClose(int status) {
- response_complete_ = true;
- response_status_ = status;
- flip_stream_id_ = 0;
-
- if (user_callback_)
- DoCallback(status);
-}
-
-void FlipStreamParser::DoCallback(int rv) {
- CHECK(rv != ERR_IO_PENDING);
- CHECK(user_callback_);
-
- // Since Run may result in being called back, clear user_callback_ in advance.
- CompletionCallback* c = user_callback_;
- user_callback_ = NULL;
- c->Run(rv);
-}
-
-int FlipStreamParser::DoSendHeaders(int result) {
- // TODO(mbelshe): rethink this UploadDataStream wrapper.
- if (request_->upload_data)
- request_body_stream_.reset(new UploadDataStream(request_->upload_data));
-
- CHECK(flip_stream_id_ == 0);
- flip_stream_id_ = flip_->CreateStream(this);
-
- // The FlipSession will always call us back when the send is complete.
- return ERR_IO_PENDING;
-}
-
-// DoSendBody is called to send the optional body for the request. This call
-// will also be called as each write of a chunk of the body completes.
-int FlipStreamParser::DoSendBody(int result) {
- // There is no body, move to the next state.
- if (!request_body_stream_.get()) {
- io_state_ = STATE_REQUEST_SENT;
- return result;
- }
-
- DCHECK(result != 0); // This should not happen.
- if (result <= 0)
- return result;
-
- // If we're already in the STATE_SENDING_BODY state, then we've already
- // sent a portion of the body. In that case, we need to first consume
- // the bytes written in the body stream. Note that the bytes written is
- // the number of bytes in the frame that were written, only consume the
- // data portion, of course.
- if (io_state_ == STATE_SENDING_BODY)
- request_body_stream_->DidConsume(result);
- else
- io_state_ = STATE_SENDING_BODY;
-
- if (request_body_stream_->position() < request_body_stream_->size()) {
- int buf_len = static_cast<int>(request_body_stream_->buf_len());
- int rv = flip_->WriteStreamData(flip_stream_id_,
- request_body_stream_->buf(),
- buf_len);
- return rv;
- }
-
- io_state_ = STATE_REQUEST_SENT;
- return result;
-}
-
-int FlipStreamParser::DoReadHeaders() {
- // TODO(mbelshe): merge FlipStreamParser with FlipStream and then this
- // makes sense.
- return ERR_IO_PENDING;
-}
-
-int FlipStreamParser::DoReadHeadersComplete(int result) {
- // TODO(mbelshe): merge FlipStreamParser with FlipStream and then this
- // makes sense.
- io_state_ = STATE_BODY_PENDING;
- return ERR_IO_PENDING;
-}
-
-int FlipStreamParser::DoReadBody() {
- // TODO(mbelshe): merge FlipStreamParser with FlipStream and then this
- // makes sense.
- return ERR_IO_PENDING;
-}
-
-int FlipStreamParser::DoReadBodyComplete(int result) {
- // TODO(mbelshe): merge FlipStreamParser with FlipStream and then this
- // makes sense.
- return ERR_IO_PENDING;
-}
-
-int FlipStreamParser::DoLoop(int result) {
- bool can_do_more = true;
- if (cancelled_)
- return ERR_ABORTED;
-
- do {
- switch (io_state_) {
- case STATE_SENDING_HEADERS:
- result = DoSendHeaders(result);
- break;
- case STATE_HEADERS_SENT:
- case STATE_SENDING_BODY:
- if (result < 0)
- can_do_more = false;
- else
- result = DoSendBody(result);
- break;
- case STATE_REQUEST_SENT:
- DCHECK(result != ERR_IO_PENDING);
- can_do_more = false;
- break;
- case STATE_READ_HEADERS:
- result = DoReadHeaders();
- break;
- case STATE_READ_HEADERS_COMPLETE:
- result = DoReadHeadersComplete(result);
- break;
- case STATE_BODY_PENDING:
- DCHECK(result != ERR_IO_PENDING);
- can_do_more = false;
- break;
- case STATE_READ_BODY:
- result = DoReadBody();
- // DoReadBodyComplete handles error conditions.
- break;
- case STATE_READ_BODY_COMPLETE:
- result = DoReadBodyComplete(result);
- break;
- case STATE_DONE:
- DCHECK(result != ERR_IO_PENDING);
- can_do_more = false;
- break;
- default:
- NOTREACHED();
- can_do_more = false;
- break;
- }
- } while (result != ERR_IO_PENDING && can_do_more);
-
- return result;
-}
-
-//-----------------------------------------------------------------------------
-
FlipNetworkTransaction::FlipNetworkTransaction(HttpNetworkSession* session)
: ALLOW_THIS_IN_INITIALIZER_LIST(
io_callback_(this, &FlipNetworkTransaction::OnIOComplete)),
@@ -351,13 +31,14 @@ FlipNetworkTransaction::FlipNetworkTransaction(HttpNetworkSession* session)
user_buffer_len_(0),
session_(session),
request_(NULL),
- next_state_(STATE_NONE) {
+ next_state_(STATE_NONE),
+ stream_(NULL) {
}
FlipNetworkTransaction::~FlipNetworkTransaction() {
LOG(INFO) << "FlipNetworkTransaction dead. " << this;
- if (flip_stream_parser_.get())
- flip_stream_parser_->Cancel();
+ if (stream_.get())
+ stream_->Cancel();
}
int FlipNetworkTransaction::Start(const HttpRequestInfo* request_info,
@@ -417,7 +98,7 @@ int FlipNetworkTransaction::Read(IOBuffer* buf, int buf_len,
}
const HttpResponseInfo* FlipNetworkTransaction::GetResponseInfo() const {
- const HttpResponseInfo* response = flip_stream_parser_->GetResponseInfo();
+ const HttpResponseInfo* response = stream_->GetResponseInfo();
return (response->headers || response->ssl_info.cert) ? response : NULL;
}
@@ -439,10 +120,10 @@ LoadState FlipNetworkTransaction::GetLoadState() const {
}
uint64 FlipNetworkTransaction::GetUploadProgress() const {
- if (!flip_stream_parser_.get())
+ if (!stream_.get())
return 0;
- return flip_stream_parser_->GetUploadProgress();
+ return stream_->GetUploadProgress();
}
void FlipNetworkTransaction::DoCallback(int rv) {
@@ -543,9 +224,11 @@ int FlipNetworkTransaction::DoInitConnectionComplete(int result) {
int FlipNetworkTransaction::DoSendRequest() {
next_state_ = STATE_SEND_REQUEST_COMPLETE;
- CHECK(!flip_stream_parser_.get());
- flip_stream_parser_ = new FlipStreamParser;
- return flip_stream_parser_->SendRequest(flip_, request_, &io_callback_);
+ CHECK(!stream_.get());
+ UploadDataStream* upload_data = request_->upload_data ?
+ new UploadDataStream(request_->upload_data) : NULL;
+ stream_ = flip_->GetOrCreateStream(*request_, upload_data);
+ return stream_->SendRequest(upload_data, &io_callback_);
}
int FlipNetworkTransaction::DoSendRequestComplete(int result) {
@@ -558,7 +241,7 @@ int FlipNetworkTransaction::DoSendRequestComplete(int result) {
int FlipNetworkTransaction::DoReadHeaders() {
next_state_ = STATE_READ_HEADERS_COMPLETE;
- return flip_stream_parser_->ReadResponseHeaders(&io_callback_);
+ return stream_->ReadResponseHeaders(&io_callback_);
}
int FlipNetworkTransaction::DoReadHeadersComplete(int result) {
@@ -569,7 +252,7 @@ int FlipNetworkTransaction::DoReadHeadersComplete(int result) {
int FlipNetworkTransaction::DoReadBody() {
next_state_ = STATE_READ_BODY_COMPLETE;
- return flip_stream_parser_->ReadResponseBody(
+ return stream_->ReadResponseBody(
user_buffer_, user_buffer_len_, &io_callback_);
}
@@ -578,7 +261,7 @@ int FlipNetworkTransaction::DoReadBodyComplete(int result) {
user_buffer_len_ = 0;
if (result <= 0)
- flip_stream_parser_ = NULL;
+ stream_ = NULL;
return result;
}
diff --git a/net/flip/flip_network_transaction.h b/net/flip/flip_network_transaction.h
index a0cd0c8..ae1588d 100644
--- a/net/flip/flip_network_transaction.h
+++ b/net/flip/flip_network_transaction.h
@@ -12,119 +12,20 @@
#include "base/ref_counted.h"
#include "base/scoped_ptr.h"
#include "base/time.h"
-#include "net/base/io_buffer.h"
+#include "net/base/completion_callback.h"
#include "net/base/load_states.h"
#include "net/flip/flip_session.h"
-#include "net/http/http_response_info.h"
#include "net/http/http_transaction.h"
namespace net {
class FlipSession;
+class FlipStream;
class HttpNetworkSession;
+class HttpResponseInfo;
+class IOBuffer;
class UploadDataStream;
-// FlipStreamParser is a class to encapsulate the IO of a FLIP
-// stream on top of a FlipSession. All read/writes go through
-// the FlipStreamParser.
-class FlipStreamParser : public FlipDelegate {
- public:
- FlipStreamParser();
- ~FlipStreamParser();
-
- // Creates a FLIP stream from |flip| and send the HTTP request over it.
- // |request|'s lifetime must persist longer than |this|. This always
- // completes asynchronously, so |callback| must be non-NULL. Returns a net
- // error code.
- int SendRequest(FlipSession* flip, const HttpRequestInfo* request,
- CompletionCallback* callback);
-
- // Reads the response headers. Returns a net error code.
- int ReadResponseHeaders(CompletionCallback* callback);
-
- // Reads the response body. Returns a net error code or the number of bytes
- // read.
- int ReadResponseBody(
- IOBuffer* buf, int buf_len, CompletionCallback* callback);
-
- // Returns the number of bytes uploaded.
- uint64 GetUploadProgress() const;
-
- const HttpResponseInfo* GetResponseInfo() const;
-
- // Cancels the stream.
- // Once cancelled, no callbacks will be made on this stream.
- void Cancel();
-
- // FlipDelegate methods:
- virtual const HttpRequestInfo* request() const;
- virtual const UploadDataStream* data() const;
- virtual void OnResponseReceived(HttpResponseInfo* response);
- virtual void OnDataReceived(const char* buffer, int bytes);
- virtual void OnWriteComplete(int status);
- virtual void OnClose(int status);
-
- private:
- friend class FlipStreamParserPeer;
-
- enum State {
- STATE_NONE,
- STATE_SENDING_HEADERS,
- STATE_HEADERS_SENT,
- STATE_SENDING_BODY,
- STATE_REQUEST_SENT,
- STATE_READ_HEADERS,
- STATE_READ_HEADERS_COMPLETE,
- STATE_BODY_PENDING,
- STATE_READ_BODY,
- STATE_READ_BODY_COMPLETE,
- STATE_DONE
- };
-
- // Try to make progress sending/receiving the request/response.
- int DoLoop(int result);
-
- // The implementations of each state of the state machine.
- int DoSendHeaders(int result);
- int DoHeadersSent(int result);
- int DoSendBody(int result);
- int DoReadHeaders();
- int DoReadHeadersComplete(int result);
- int DoReadBody();
- int DoReadBodyComplete(int result);
-
- void DoCallback(int rv);
-
- // The Flip request id for this request.
- scoped_refptr<FlipSession> flip_;
- flip::FlipStreamId flip_stream_id_;
-
- const HttpRequestInfo* request_;
- scoped_ptr<HttpResponseInfo> response_;
- scoped_ptr<UploadDataStream> request_body_stream_;
-
- bool response_complete_; // TODO(mbelshe): fold this into the io_state.
- State io_state_;
-
- // We buffer the response body as it arrives asynchronously from the stream.
- // TODO(mbelshe): is this infinite buffering?
- std::deque<scoped_refptr<IOBufferWithSize> > response_body_;
-
- // Since we buffer the response, we also buffer the response status.
- // Not valid until response_complete_ is true.
- int response_status_;
-
- CompletionCallback* user_callback_;
-
- // User provided buffer for the ReadResponseBody() response.
- scoped_refptr<IOBuffer> user_buffer_;
- int user_buffer_len_;
-
- bool cancelled_;
-
- DISALLOW_COPY_AND_ASSIGN(FlipStreamParser);
-};
-
// A FlipNetworkTransaction can be used to fetch HTTP conent.
// The FlipDelegate is the consumer of events from the FlipSession.
class FlipNetworkTransaction : public HttpTransaction {
@@ -193,7 +94,7 @@ class FlipNetworkTransaction : public HttpTransaction {
CompletionCallbackImpl<FlipNetworkTransaction> io_callback_;
CompletionCallback* user_callback_;
- // Used to pass onto the FlipStreamParser.
+ // Used to pass onto the FlipStream
scoped_refptr<IOBuffer> user_buffer_;
int user_buffer_len_;
@@ -207,7 +108,7 @@ class FlipNetworkTransaction : public HttpTransaction {
// The next state in the state machine.
State next_state_;
- scoped_refptr<FlipStreamParser> flip_stream_parser_;
+ scoped_refptr<FlipStream> stream_;
DISALLOW_COPY_AND_ASSIGN(FlipNetworkTransaction);
};
diff --git a/net/flip/flip_network_transaction_unittest.cc b/net/flip/flip_network_transaction_unittest.cc
index 775da10..8405718 100644
--- a/net/flip/flip_network_transaction_unittest.cc
+++ b/net/flip/flip_network_transaction_unittest.cc
@@ -22,19 +22,6 @@
namespace net {
-class FlipStreamParserPeer {
- public:
- explicit FlipStreamParserPeer(FlipStreamParser* flip_stream_parser)
- : flip_stream_parser_(flip_stream_parser) {}
-
- int flip_stream_id() const { return flip_stream_parser_->flip_stream_id_; }
-
- private:
- FlipStreamParser* const flip_stream_parser_;
-
- DISALLOW_COPY_AND_ASSIGN(FlipStreamParserPeer);
-};
-
namespace {
// Create a proxy service which fails on all requests (falls back to direct).
@@ -82,47 +69,6 @@ HttpNetworkSession* CreateSession(SessionDependencies* session_deps) {
session_deps->flip_session_pool);
}
-class FlipStreamParserTest : public PlatformTest {
- protected:
- FlipStreamParserTest()
- : session_(CreateSession(&session_deps_)),
- parser_peer_(&parser_) {}
-
- FlipSession* CreateFlipSession() {
- HostResolver::RequestInfo resolve_info("www.google.com", 80);
- FlipSession* session = session_->flip_session_pool()->Get(
- resolve_info, session_);
- return session;
- }
-
- virtual void TearDown() {
- MessageLoop::current()->RunAllPending();
- PlatformTest::TearDown();
- }
-
- SessionDependencies session_deps_;
- scoped_refptr<HttpNetworkSession> session_;
- FlipStreamParser parser_;
- FlipStreamParserPeer parser_peer_;
-};
-
-// TODO(willchan): Look into why TCPConnectJobs are still alive when this test
-// goes away. They're calling into the ClientSocketFactory which doesn't exist
-// anymore, so it crashes.
-TEST_F(FlipStreamParserTest, DISABLED_SendRequest) {
- scoped_refptr<FlipSession> flip(CreateFlipSession());
- HttpRequestInfo request;
- request.method = "GET";
- request.url = GURL("http://www.google.com/");
- TestCompletionCallback callback;
-
- EXPECT_EQ(ERR_IO_PENDING, parser_.SendRequest(flip, &request, &callback));
- EXPECT_TRUE(flip->IsStreamActive(parser_peer_.flip_stream_id()));
-}
-
-// TODO(willchan): Write a longer test for FlipStreamParser that exercises all
-// methods.
-
} // namespace
// A DataProvider where the client must write a request before the reads (e.g.
@@ -350,7 +296,8 @@ TEST_F(FlipNetworkTransactionTest, Post) {
0x00, 0x06, 's', 't', 'a', 't', 'u', 's', // "status"
0x00, 0x03, '2', '0', '0', // "200"
0x00, 0x03, 'u', 'r', 'l', // "url"
- 0x00, 0x0a, '/', 'i', 'n', 'd', 'e', 'x', '.', 'p', 'h', 'p', // "HTTP/1.1"
+ // "/index.php"
+ 0x00, 0x0a, '/', 'i', 'n', 'd', 'e', 'x', '.', 'p', 'h', 'p',
0x00, 0x07, 'v', 'e', 'r', 's', 'i', 'o', 'n', // "version"
0x00, 0x08, 'H', 'T', 'T', 'P', '/', '1', '.', '1', // "HTTP/1.1"
};
diff --git a/net/flip/flip_session.cc b/net/flip/flip_session.cc
index 7ce6da8..cede942 100644
--- a/net/flip/flip_session.cc
+++ b/net/flip/flip_session.cc
@@ -9,6 +9,7 @@
#include "base/message_loop.h"
#include "base/rand_util.h"
#include "base/stats_counters.h"
+#include "base/stl_util-inl.h"
#include "base/string_util.h"
#include "net/base/load_flags.h"
#include "net/base/net_util.h"
@@ -48,6 +49,89 @@ void DumpFlipHeaders(const flip::FlipHeaderBlock& headers) {
namespace net {
+namespace {
+
+const int kReadBufferSize = 4 * 1024;
+
+HttpResponseInfo FlipHeadersToHttpResponse(
+ const flip::FlipHeaderBlock& headers) {
+ std::string raw_headers(headers.find("version")->second);
+ raw_headers.push_back(' ');
+ raw_headers.append(headers.find("status")->second);
+ raw_headers.push_back('\0');
+ flip::FlipHeaderBlock::const_iterator it;
+ for (it = headers.begin(); it != headers.end(); ++it) {
+ // For each value, if the server sends a NUL-separated
+ // list of values, we separate that back out into
+ // individual headers for each value in the list.
+ // e.g.
+ // Set-Cookie "foo\0bar"
+ // becomes
+ // Set-Cookie: foo\0
+ // Set-Cookie: bar\0
+ std::string value = it->second;
+ size_t start = 0;
+ size_t end = 0;
+ do {
+ end = value.find('\0', start);
+ std::string tval;
+ if (end != value.npos)
+ tval = value.substr(start, (end - start));
+ else
+ tval = value.substr(start);
+ raw_headers.append(it->first);
+ raw_headers.push_back(':');
+ raw_headers.append(tval);
+ raw_headers.push_back('\0');
+ start = end + 1;
+ } while (end != value.npos);
+ }
+
+ HttpResponseInfo response;
+ response.headers = new HttpResponseHeaders(raw_headers);
+ return response;
+}
+
+// Create a FlipHeaderBlock for a Flip SYN_STREAM Frame from
+// a HttpRequestInfo block.
+void CreateFlipHeadersFromHttpRequest(
+ const HttpRequestInfo& info, flip::FlipHeaderBlock* headers) {
+ static const char kHttpProtocolVersion[] = "HTTP/1.1";
+
+ HttpUtil::HeadersIterator it(info.extra_headers.begin(),
+ info.extra_headers.end(),
+ "\r\n");
+ while (it.GetNext()) {
+ std::string name = StringToLowerASCII(it.name());
+ if (headers->find(name) == headers->end()) {
+ (*headers)[name] = it.values();
+ } else {
+ std::string new_value = (*headers)[name];
+ new_value += "\0";
+ new_value += it.values();
+ (*headers)[name] = new_value;
+ }
+ }
+
+ (*headers)["method"] = info.method;
+ (*headers)["url"] = info.url.spec();
+ (*headers)["version"] = kHttpProtocolVersion;
+ if (info.user_agent.length())
+ (*headers)["user-agent"] = info.user_agent;
+ if (!info.referrer.is_empty())
+ (*headers)["referer"] = info.referrer.spec();
+
+ // Honor load flags that impact proxy caches.
+ if (info.load_flags & LOAD_BYPASS_CACHE) {
+ (*headers)["pragma"] = "no-cache";
+ (*headers)["cache-control"] = "no-cache";
+ } else if (info.load_flags & LOAD_VALIDATE_CACHE) {
+ (*headers)["cache-control"] = "max-age=0";
+ }
+}
+
+} // namespace
+
// static
bool FlipSession::use_ssl_ = true;
@@ -112,85 +196,46 @@ net::Error FlipSession::Connect(const std::string& group_name,
return static_cast<net::Error>(rv);
}
-// Create a FlipHeaderBlock for a Flip SYN_STREAM Frame from
-// a HttpRequestInfo block.
-void CreateFlipHeadersFromHttpRequest(
- const HttpRequestInfo* info, flip::FlipHeaderBlock* headers) {
- static const std::string kHttpProtocolVersion("HTTP/1.1");
-
- HttpUtil::HeadersIterator it(info->extra_headers.begin(),
- info->extra_headers.end(),
- "\r\n");
- while (it.GetNext()) {
- std::string name = StringToLowerASCII(it.name());
- if (headers->find(name) == headers->end()) {
- (*headers)[name] = it.values();
- } else {
- std::string new_value = (*headers)[name];
- new_value += "\0";
- new_value += it.values();
- (*headers)[name] = new_value;
- }
- }
-
- (*headers)["method"] = info->method;
- (*headers)["url"] = info->url.spec();
- (*headers)["version"] = kHttpProtocolVersion;
- if (info->user_agent.length())
- (*headers)["user-agent"] = info->user_agent;
- if (!info->referrer.is_empty())
- (*headers)["referer"] = info->referrer.spec();
-
- // Honor load flags that impact proxy caches.
- if (info->load_flags & LOAD_BYPASS_CACHE) {
- (*headers)["pragma"] = "no-cache";
- (*headers)["cache-control"] = "no-cache";
- } else if (info->load_flags & LOAD_VALIDATE_CACHE) {
- (*headers)["cache-control"] = "max-age=0";
- }
-}
-
-int FlipSession::CreateStream(FlipDelegate* delegate) {
- flip::FlipStreamId stream_id = GetNewStreamId();
-
- GURL url = delegate->request()->url;
- std::string path = url.PathForRequest();
+scoped_refptr<FlipStream> FlipSession::GetOrCreateStream(
+ const HttpRequestInfo& request,
+ const UploadDataStream* upload_data) {
+ const GURL& url = request.url;
+ const std::string& path = url.PathForRequest();
- FlipStream* stream = NULL;
+ scoped_refptr<FlipStream> stream;
// Check if we have a push stream for this path.
- if (delegate->request()->method == "GET") {
+ if (request.method == "GET") {
stream = GetPushStream(path);
- if (stream) {
- if (stream->AttachDelegate(delegate)) {
- DeactivateStream(stream->stream_id());
- delete stream;
- return 0;
- }
- return stream->stream_id();
- }
+ if (stream)
+ return stream;
}
// Check if we have a pending push stream for this url.
- std::string url_path = delegate->request()->url.PathForRequest();
PendingStreamMap::iterator it;
- it = pending_streams_.find(url_path);
+ it = pending_streams_.find(path);
if (it != pending_streams_.end()) {
- DCHECK(it->second == NULL);
- it->second = delegate;
- return 0; // TODO(mbelshe): this is overloaded with the fail case.
+ DCHECK(!it->second);
+ // Server will assign a stream id when the push stream arrives. Use 0 for
+ // now.
+ FlipStream* stream = new FlipStream(this, 0, true);
+ stream->set_path(path);
+ it->second = stream;
+ return it->second;
}
+ const flip::FlipStreamId stream_id = GetNewStreamId();
+
// If we still don't have a stream, activate one now.
- stream = ActivateStream(stream_id, delegate);
- if (!stream)
- return net::ERR_FAILED;
+ stream = new FlipStream(this, stream_id, false);
+ stream->set_priority(request.priority);
+ stream->set_path(path);
+ ActivateStream(stream);
- LOG(INFO) << "FlipStream: Creating stream " << stream_id << " for "
- << delegate->request()->url;
+ LOG(INFO) << "FlipStream: Creating stream " << stream_id << " for " << url;
// TODO(mbelshe): Optimize memory allocations
- int priority = delegate->request()->priority;
+ int priority = request.priority;
// Hack for the priorities
// TODO(mbelshe): These need to be plumbed through the Http Network Stack.
@@ -209,10 +254,10 @@ int FlipSession::CreateStream(FlipDelegate* delegate) {
// Convert from HttpRequestHeaders to Flip Headers.
flip::FlipHeaderBlock headers;
- CreateFlipHeadersFromHttpRequest(delegate->request(), &headers);
+ CreateFlipHeadersFromHttpRequest(request, &headers);
flip::FlipControlFlags flags = flip::CONTROL_FLAG_NONE;
- if (!delegate->data() || !delegate->data()->size())
+ if (!request.upload_data || !upload_data->size())
flags = flip::CONTROL_FLAG_FIN;
// Create a SYN_STREAM packet and add to the output queue.
@@ -227,7 +272,7 @@ int FlipSession::CreateStream(FlipDelegate* delegate) {
static StatsCounter flip_requests("flip.requests");
flip_requests.Increment();
- LOG(INFO) << "FETCHING: " << delegate->request()->url.spec();
+ LOG(INFO) << "FETCHING: " << request.url.spec();
LOG(INFO) << "FLIP SYN_STREAM HEADERS ----------------------------------";
DumpFlipHeaders(headers);
@@ -240,7 +285,7 @@ int FlipSession::CreateStream(FlipDelegate* delegate) {
// requests.
WriteSocketLater();
- return stream_id;
+ return stream;
}
int FlipSession::WriteStreamData(flip::FlipStreamId stream_id,
@@ -256,7 +301,8 @@ int FlipSession::WriteStreamData(flip::FlipStreamId stream_id,
// Find our stream
DCHECK(IsStreamActive(stream_id));
- FlipStream* stream = active_streams_[stream_id];
+ scoped_refptr<FlipStream> stream = active_streams_[stream_id];
+ CHECK(stream->stream_id() == stream_id);
if (!stream)
return ERR_INVALID_FLIP_STREAM;
@@ -276,8 +322,7 @@ int FlipSession::WriteStreamData(flip::FlipStreamId stream_id,
int length = flip::FlipFrame::size() + frame->length();
IOBufferWithSize* buffer = new IOBufferWithSize(length);
memcpy(buffer->data(), frame->data(), length);
- queue_.push(FlipIOBuffer(buffer, stream->delegate()->request()->priority,
- stream));
+ queue_.push(FlipIOBuffer(buffer, stream->priority(), stream));
// Whenever we queue onto the socket we need to ensure that we will write to
// it later.
@@ -294,14 +339,13 @@ bool FlipSession::CancelStream(flip::FlipStreamId stream_id) {
// TODO(mbelshe): Write a method for tearing down a stream
// that cleans it out of the active list, the pending list,
// etc.
- FlipStream* stream = active_streams_[stream_id];
+ scoped_refptr<FlipStream> stream = active_streams_[stream_id];
DeactivateStream(stream_id);
- delete stream;
return true;
}
bool FlipSession::IsStreamActive(flip::FlipStreamId stream_id) const {
- return active_streams_.find(stream_id) != active_streams_.end();
+ return ContainsKey(active_streams_, stream_id);
}
LoadState FlipSession::GetLoadState() const {
@@ -414,16 +458,18 @@ void FlipSession::OnWriteComplete(int result) {
// We only notify the stream when we've fully written the pending flip
// frame.
- FlipStream* stream = in_flight_write_.stream();
- DCHECK(stream);
-
- // Report the number of bytes written to the caller, but exclude the
- // frame size overhead.
- if (result > 0) {
- DCHECK(result > static_cast<int>(flip::FlipFrame::size()));
- result -= static_cast<int>(flip::FlipFrame::size());
+ scoped_refptr<FlipStream> stream = in_flight_write_.stream();
+ DCHECK(stream.get());
+
+ if (!stream->cancelled()) {
+ // Report the number of bytes written to the caller, but exclude the
+ // frame size overhead.
+ if (result > 0) {
+ DCHECK(result > static_cast<int>(flip::FlipFrame::size()));
+ result -= static_cast<int>(flip::FlipFrame::size());
+ }
+ stream->OnWriteComplete(result);
}
- stream->OnWriteComplete(result);
// Cleanup the write which just completed.
in_flight_write_.release();
@@ -551,8 +597,7 @@ void FlipSession::CloseAllStreams(net::Error code) {
for (--index; index >= 0; index--) {
LOG(ERROR) << "ABANDONED (stream_id=" << list[index]->stream_id()
<< "): " << list[index]->path();
- list[index]->OnError(ERR_ABORTED);
- delete list[index];
+ list[index]->OnClose(ERR_ABORTED);
}
// Clear out anything pending.
@@ -575,13 +620,11 @@ int FlipSession::GetNewStreamId() {
return id;
}
-FlipStream* FlipSession::ActivateStream(flip::FlipStreamId id,
- FlipDelegate* delegate) {
+void FlipSession::ActivateStream(FlipStream* stream) {
+ const flip::FlipStreamId id = stream->stream_id();
DCHECK(!IsStreamActive(id));
- FlipStream* stream = new FlipStream(id, delegate);
active_streams_[id] = stream;
- return stream;
}
void FlipSession::DeactivateStream(flip::FlipStreamId id) {
@@ -590,8 +633,8 @@ void FlipSession::DeactivateStream(flip::FlipStreamId id) {
// Verify it is not on the pushed_streams_ list.
ActiveStreamList::iterator it;
for (it = pushed_streams_.begin(); it != pushed_streams_.end(); ++it) {
- FlipStream* impl = *it;
- if (id == impl->stream_id()) {
+ scoped_refptr<FlipStream> curr = *it;
+ if (id == curr->stream_id()) {
pushed_streams_.erase(it);
break;
}
@@ -600,23 +643,27 @@ void FlipSession::DeactivateStream(flip::FlipStreamId id) {
active_streams_.erase(id);
}
-FlipStream* FlipSession::GetPushStream(const std::string& path) {
+scoped_refptr<FlipStream> FlipSession::GetPushStream(const std::string& path) {
static StatsCounter used_push_streams("flip.claimed_push_streams");
LOG(INFO) << "Looking for push stream: " << path;
+ scoped_refptr<FlipStream> stream;
+
// We just walk a linear list here.
ActiveStreamList::iterator it;
for (it = pushed_streams_.begin(); it != pushed_streams_.end(); ++it) {
- FlipStream* impl = *it;
- if (path == impl->path()) {
+ stream = *it;
+ if (path == stream->path()) {
+ CHECK(stream->pushed());
pushed_streams_.erase(it);
used_push_streams.Increment();
LOG(INFO) << "Push Stream Claim for: " << path;
- return impl;
+ break;
}
}
- return NULL;
+
+ return stream;
}
void FlipSession::OnError(flip::FlipFramer* framer) {
@@ -635,11 +682,11 @@ void FlipSession::OnStreamFrameData(flip::FlipStreamId stream_id,
return;
}
- FlipStream* stream = active_streams_[stream_id];
- if (stream->OnData(data, len)) {
- DeactivateStream(stream->stream_id());
- delete stream;
- }
+ scoped_refptr<FlipStream> stream = active_streams_[stream_id];
+ bool success = stream->OnDataReceived(data, len);
+ // |len| == 0 implies a closed stream.
+ if (!success || !len)
+ DeactivateStream(stream_id);
}
void FlipSession::OnSyn(const flip::FlipSynStreamControlFrame* frame,
@@ -657,38 +704,58 @@ void FlipSession::OnSyn(const flip::FlipSynStreamControlFrame* frame,
return;
}
+ LOG(INFO) << "FlipSession: SynReply received for stream: " << stream_id;
+
+ DCHECK(ContainsKey(*headers, "version"));
+ DCHECK(ContainsKey(*headers, "status"));
LOG(INFO) << "FLIP SYN_REPLY RESPONSE HEADERS -----------------------";
DumpFlipHeaders(*headers);
- // Activate a stream and parse the headers.
- FlipStream* stream = ActivateStream(stream_id, NULL);
- stream->OnReply(headers);
-
// TODO(mbelshe): DCHECK that this is a GET method?
+ const std::string& path = ContainsKey(*headers, "path") ?
+ headers->find("path")->second : "";
+
// Verify that the response had a URL for us.
- DCHECK(stream->path().length() != 0);
- if (stream->path().length() == 0) {
+ DCHECK(!path.empty());
+ if (path.empty()) {
LOG(WARNING) << "Pushed stream did not contain a path.";
- DeactivateStream(stream_id);
- delete stream;
return;
}
+ scoped_refptr<FlipStream> stream;
+
// Check if we already have a delegate awaiting this stream.
PendingStreamMap::iterator it;
- it = pending_streams_.find(stream->path());
+ it = pending_streams_.find(path);
if (it != pending_streams_.end()) {
- FlipDelegate* delegate = it->second;
+ stream = it->second;
pending_streams_.erase(it);
- if (delegate)
- stream->AttachDelegate(delegate);
- else
+ if (stream)
pushed_streams_.push_back(stream);
} else {
pushed_streams_.push_back(stream);
}
+ if (stream) {
+ CHECK(stream->pushed());
+ CHECK(stream->stream_id() == 0);
+ stream->set_stream_id(stream_id);
+ } else {
+ stream = new FlipStream(this, stream_id, true);
+ }
+
+ // Activate a stream and parse the headers.
+ ActivateStream(stream);
+
+ stream->set_path(path);
+
+ // TODO(mbelshe): For now we convert from our nice hash map back
+ // to a string of headers; this is because the HttpResponseInfo
+ // is a bit rigid for its http (non-flip) design.
+ const HttpResponseInfo response = FlipHeadersToHttpResponse(*headers);
+ stream->OnResponseReceived(response);
+
LOG(INFO) << "Got pushed stream for " << stream->path();
static StatsCounter push_requests("flip.pushed_streams");
@@ -733,8 +800,10 @@ void FlipSession::OnSynReply(const flip::FlipSynReplyControlFrame* frame,
} while (start < content.length());
}
- FlipStream* stream = active_streams_[stream_id];
- stream->OnReply(headers);
+ scoped_refptr<FlipStream> stream = active_streams_[stream_id];
+ CHECK(stream->stream_id() == stream_id);
+ const HttpResponseInfo response = FlipHeadersToHttpResponse(*headers);
+ stream->OnResponseReceived(response);
}
void FlipSession::OnControl(const flip::FlipControlFrame* frame) {
@@ -776,22 +845,18 @@ void FlipSession::OnFin(const flip::FlipFinStreamControlFrame* frame) {
LOG(WARNING) << "Received FIN for invalid stream" << stream_id;
return;
}
- FlipStream* stream = active_streams_[stream_id];
- bool cleanup_stream = false;
+ scoped_refptr<FlipStream> stream = active_streams_[stream_id];
+ CHECK(stream->stream_id() == stream_id);
if (frame->status() == 0) {
- cleanup_stream = stream->OnData(NULL, 0);
+ stream->OnDataReceived(NULL, 0);
} else {
LOG(ERROR) << "Flip stream closed: " << frame->status();
// TODO(mbelshe): Map from Flip-protocol errors to something sensical.
// For now, it doesn't matter much - it is a protocol error.
- stream->OnError(ERR_FAILED);
- cleanup_stream = true;
+ stream->OnClose(ERR_FAILED);
}
- if (cleanup_stream) {
- DeactivateStream(stream_id);
- delete stream;
- }
+ DeactivateStream(stream_id);
}
} // namespace net
diff --git a/net/flip/flip_session.h b/net/flip/flip_session.h
index fdefb6e..3ebd6d2 100644
--- a/net/flip/flip_session.h
+++ b/net/flip/flip_session.h
@@ -32,55 +32,6 @@ class HttpNetworkSession;
class HttpRequestInfo;
class HttpResponseInfo;
-// The FlipDelegate interface is an interface so that the FlipSession
-// can interact with the provider of a given Flip stream.
-class FlipDelegate : public base::RefCounted<FlipDelegate> {
- public:
- // Accessors from the delegate.
-
- // The delegate provides access to the HttpRequestInfo for use by the flip
- // session.
- virtual const HttpRequestInfo* request() const = 0;
-
- // The delegate provides access to an UploadDataStream for use by the
- // flip session. If the delegate is not uploading content, this call
- // must return NULL.
- virtual const UploadDataStream* data() const = 0;
-
- // Callbacks.
-
- // Called by the FlipSession when a response (e.g. a SYN_REPLY) has been
- // received for this request. This callback will never be called prior
- // to the OnRequestSent() callback.
- virtual void OnResponseReceived(HttpResponseInfo* response) = 0;
-
- // Called by the FlipSession when response data has been received for this
- // request. This callback may be called multiple times as data arrives
- // from the network, and will never be called prior to OnResponseReceived.
- // |buffer| contains the data received. The delegate must copy any data
- // from this buffer before returning from this callback.
- // |bytes| is the number of bytes received or an error.
- // A zero-length count does not indicate end-of-stream.
- virtual void OnDataReceived(const char* buffer, int bytes) = 0;
-
- // Called by the FlipSession when a write has completed. This callback
- // will be called multiple times for each write which completes. Writes
- // include the SYN_STREAM write and also DATA frame writes.
- // |result| is the number of bytes written or a net error code.
- virtual void OnWriteComplete(int result) = 0;
-
- // Called by the FlipSession when the request is finished. This callback
- // will always be called at the end of the request and signals to the
- // delegate that the delegate can be torn down. No further callbacks to the
- // delegate will be made after this call.
- // |status| is an error code or OK.
- virtual void OnClose(int status) = 0;
-
- protected:
- friend class base::RefCounted<FlipDelegate>;
- virtual ~FlipDelegate() {}
-};
-
class FlipSession : public base::RefCounted<FlipSession>,
public flip::FlipFramerVisitorInterface {
public:
@@ -94,12 +45,14 @@ class FlipSession : public base::RefCounted<FlipSession>,
net::Error Connect(const std::string& group_name,
const HostResolver::RequestInfo& host, int priority);
- // Create a new stream.
- // FlipDelegate must remain valid until the stream is either cancelled by the
- // creator via CancelStream or the FlipDelegate OnClose or OnCancel callbacks
- // have been made.
- // Once the stream is created, the delegate should wait for a callback.
- int CreateStream(FlipDelegate* delegate);
+ // Get a stream for a given |request|. In the typical case, this will involve
+ // the creation of a new stream (and will send the SYN frame). If the server
+ // initiates a stream, it might already exist for a given path. The server
+ // might also not have initiated the stream yet, but indicated it will via
+ // X-Associated-Content.
+ // Returns the new or existing stream. Never returns NULL.
+ scoped_refptr<FlipStream> GetOrCreateStream(
+ const HttpRequestInfo& request, const UploadDataStream* upload_data);
// Write a data frame to the stream.
// Used to create and queue a data frame for the given stream.
@@ -134,6 +87,12 @@ class FlipSession : public base::RefCounted<FlipSession>,
private:
friend class base::RefCounted<FlipSession>;
+
+ typedef std::map<int, scoped_refptr<FlipStream> > ActiveStreamMap;
+ typedef std::list<scoped_refptr<FlipStream> > ActiveStreamList;
+ typedef std::map<std::string, scoped_refptr<FlipStream> > PendingStreamMap;
+ typedef std::priority_queue<FlipIOBuffer> OutputQueue;
+
virtual ~FlipSession();
// FlipFramerVisitorInterface
@@ -167,13 +126,13 @@ class FlipSession : public base::RefCounted<FlipSession>,
int GetNewStreamId();
// Track active streams in the active stream list.
- FlipStream* ActivateStream(flip::FlipStreamId id, FlipDelegate* delegate);
+ void ActivateStream(FlipStream* stream);
void DeactivateStream(flip::FlipStreamId id);
// Check if we have a pending pushed-stream for this url
// Returns the stream if found (and returns it from the pending
// list), returns NULL otherwise.
- FlipStream* GetPushStream(const std::string& url);
+ scoped_refptr<FlipStream> GetPushStream(const std::string& url);
// Callbacks for the Flip session.
CompletionCallbackImpl<FlipSession> connect_callback_;
@@ -194,7 +153,6 @@ class FlipSession : public base::RefCounted<FlipSession>,
bool connection_ready_; // Is the connection ready for use.
// The read buffer used to read data from the socket.
- enum { kReadBufferSize = (4 * 1024) };
scoped_refptr<IOBuffer> read_buffer_;
bool read_pending_;
@@ -203,18 +161,25 @@ class FlipSession : public base::RefCounted<FlipSession>,
// TODO(mbelshe): We need to track these stream lists better.
// I suspect it is possible to remove a stream from
// one list, but not the other.
- typedef std::map<int, FlipStream*> ActiveStreamMap;
- typedef std::list<FlipStream*> ActiveStreamList;
- ActiveStreamMap active_streams_;
+ // Map from stream id to all active streams. Streams are active in the sense
+ // that they have a consumer (typically FlipNetworkTransaction and regardless
+ // of whether or not there is currently any ongoing IO [might be waiting for
+ // the server to start pushing the stream]) or there are still network events
+ // incoming even though the consumer has already gone away (cancellation).
+ // TODO(willchan): Perhaps we should separate out cancelled streams and move
+ // them into a separate ActiveStreamMap, and not deliver network events to
+ // them?
+ ActiveStreamMap active_streams_;
+ // List of all the streams that have already started to be pushed by the
+ // server, but do not have consumers yet.
ActiveStreamList pushed_streams_;
- // List of streams declared in X-Associated-Content headers.
+ // List of streams declared in X-Associated-Content headers, but do not have
+ // consumers yet.
// The key is a string representing the path of the URI being pushed.
- typedef std::map<std::string, scoped_refptr<FlipDelegate> > PendingStreamMap;
PendingStreamMap pending_streams_;
// As we gather data to be sent, we put it into the output queue.
- typedef std::priority_queue<FlipIOBuffer> OutputQueue;
OutputQueue queue_;
// TODO(mbelshe): this is ugly!!
diff --git a/net/flip/flip_session_unittest.cc b/net/flip/flip_session_unittest.cc
index c27f478..20604fe 100644
--- a/net/flip/flip_session_unittest.cc
+++ b/net/flip/flip_session_unittest.cc
@@ -5,6 +5,7 @@
#include "net/base/test_completion_callback.h"
#include "net/flip/flip_io_buffer.h"
#include "net/flip/flip_session.h"
+#include "net/flip/flip_stream.h"
#include "net/socket/socket_test_util.h"
#include "testing/platform_test.h"
@@ -52,4 +53,3 @@ TEST_F(FlipSessionTest, FlipIOBuffer) {
}
} // namespace net
-
diff --git a/net/flip/flip_stream.cc b/net/flip/flip_stream.cc
index 051a827..b07f5bf 100755
--- a/net/flip/flip_stream.cc
+++ b/net/flip/flip_stream.cc
@@ -4,99 +4,177 @@
#include "net/flip/flip_stream.h"
+#include "base/logging.h"
#include "net/flip/flip_session.h"
+#include "net/http/http_request_info.h"
#include "net/http/http_response_info.h"
namespace net {
-bool FlipStream::AttachDelegate(FlipDelegate* delegate) {
- DCHECK(delegate_ == NULL); // Don't attach if already attached.
- DCHECK(path_.length() > 0); // Path needs to be set for push streams.
- delegate_ = delegate;
+FlipStream::FlipStream(FlipSession* session,
+ flip::FlipStreamId stream_id,
+ bool pushed)
+ : stream_id_(stream_id),
+ priority_(0),
+ pushed_(pushed),
+ download_finished_(false),
+ metrics_(Singleton<BandwidthMetrics>::get()),
+ session_(session),
+ response_(NULL),
+ request_body_stream_(NULL),
+ response_complete_(false),
+ io_state_(STATE_NONE),
+ response_status_(OK),
+ user_callback_(NULL),
+ user_buffer_(NULL),
+ user_buffer_len_(0),
+ cancelled_(false) {}
- // If there is pending data, send it up here.
+FlipStream::~FlipStream() {
+ DLOG(INFO) << "Deleting FlipStream for stream " << stream_id_;
- // Check for the OnReply, and pass it up.
+ // TODO(willchan): We're still calling CancelStream() too many times, because
+ // inactive pending/pushed streams will still have stream_id_ set.
+ if (stream_id_) {
+ session_->CancelStream(stream_id_);
+ } else if (!response_complete_) {
+ NOTREACHED();
+ }
+}
+
+uint64 FlipStream::GetUploadProgress() const {
+ if (!request_body_stream_.get())
+ return 0;
+
+ return request_body_stream_->position();
+}
+
+const HttpResponseInfo* FlipStream::GetResponseInfo() const {
+ return response_.get();
+}
+
+int FlipStream::ReadResponseHeaders(CompletionCallback* callback) {
+ // Note: The FlipStream may have already received the response headers, so
+ // this call may complete synchronously.
+ CHECK(callback);
+ CHECK(io_state_ == STATE_NONE);
+ CHECK(!cancelled_);
+
+ // The SYN_REPLY has already been received.
if (response_.get())
- delegate_->OnResponseReceived(response_.get());
+ return OK;
+
+ io_state_ = STATE_READ_HEADERS;
+ CHECK(!user_callback_);
+ user_callback_ = callback;
+ return ERR_IO_PENDING;
+}
+
+int FlipStream::ReadResponseBody(
+ IOBuffer* buf, int buf_len, CompletionCallback* callback) {
+ DCHECK_EQ(io_state_, STATE_NONE);
+ CHECK(buf);
+ CHECK(buf_len);
+ CHECK(callback);
+ CHECK(!cancelled_);
- // Pass data up
- while (response_body_.size()) {
- scoped_refptr<IOBufferWithSize> buffer = response_body_.front();
- response_body_.pop_front();
- delegate_->OnDataReceived(buffer->data(), buffer->size());
+ // If we have data buffered, complete the IO immediately.
+ if (response_body_.size()) {
+ int bytes_read = 0;
+ while (response_body_.size() && buf_len > 0) {
+ scoped_refptr<IOBufferWithSize> data = response_body_.front();
+ const int bytes_to_copy = std::min(buf_len, data->size());
+ memcpy(&(buf->data()[bytes_read]), data->data(), bytes_to_copy);
+ buf_len -= bytes_to_copy;
+ if (bytes_to_copy == data->size()) {
+ response_body_.pop_front();
+ } else {
+ const int bytes_remaining = data->size() - bytes_to_copy;
+ IOBufferWithSize* new_buffer = new IOBufferWithSize(bytes_remaining);
+ memcpy(new_buffer->data(), &(data->data()[bytes_to_copy]),
+ bytes_remaining);
+ response_body_.pop_front();
+ response_body_.push_front(new_buffer);
+ }
+ bytes_read += bytes_to_copy;
+ }
+ return bytes_read;
+ } else if (response_complete_) {
+ return response_status_;
}
- // Finally send up the end-of-stream.
- if (download_finished_) {
- delegate_->OnClose(net::OK);
- return true; // tell the caller to shut us down
+ CHECK(!user_callback_);
+ CHECK(!user_buffer_);
+ CHECK(user_buffer_len_ == 0);
+
+ user_callback_ = callback;
+ user_buffer_ = buf;
+ user_buffer_len_ = buf_len;
+ return ERR_IO_PENDING;
+}
+
+int FlipStream::SendRequest(UploadDataStream* upload_data,
+ CompletionCallback* callback) {
+ CHECK(callback);
+ CHECK(!cancelled_);
+
+ // TODO(mbelshe): rethink this UploadDataStream wrapper.
+ if (upload_data)
+ request_body_stream_.reset(upload_data);
+
+ DCHECK_EQ(io_state_, STATE_NONE);
+ if (!pushed_)
+ io_state_ = STATE_SEND_HEADERS;
+ else
+ io_state_ = STATE_READ_HEADERS;
+ int result = DoLoop(OK);
+ if (result == ERR_IO_PENDING) {
+ CHECK(!user_callback_);
+ user_callback_ = callback;
}
- return false;
+ return result;
}
-void FlipStream::OnReply(const flip::FlipHeaderBlock* headers) {
- DCHECK(headers);
- DCHECK(headers->find("version") != headers->end());
- DCHECK(headers->find("status") != headers->end());
+void FlipStream::Cancel() {
+ cancelled_ = true;
+ user_callback_ = NULL;
- // TODO(mbelshe): if no version or status is found, we need to error
- // out the stream.
+ // TODO(willchan): Should we also call FlipSession::CancelStream()? What
+ // makes more sense? If we cancel the stream, perhaps we should send the
+ // server a control packet to tell it to stop sending to the dead stream. But
+ // then does FlipSession deactivate the stream? It would log warnings for
+ // data packets for an invalid stream then, unless we maintained some data
+ // structure to keep track of cancelled stream ids. Ugh. Currently
+ // FlipStream does not call CancelStream(). We should free up all the memory
+ // associated with the stream though (ditch all the IOBuffers) and at all
+ // FlipSession's entrypoints into FlipStream check |cancelled_| and not copy
+ // the data.
+}
+void FlipStream::OnResponseReceived(const HttpResponseInfo& response) {
metrics_.StartStream();
- // Server initiated streams must send a URL to us in the headers.
- if (headers->find("path") != headers->end())
- path_ = headers->find("path")->second;
-
- // TODO(mbelshe): For now we convert from our nice hash map back
- // to a string of headers; this is because the HttpResponseInfo
- // is a bit rigid for its http (non-flip) design.
- std::string raw_headers(headers->find("version")->second);
- raw_headers.append(" ", 1);
- raw_headers.append(headers->find("status")->second);
- raw_headers.append("\0", 1);
- flip::FlipHeaderBlock::const_iterator it;
- for (it = headers->begin(); it != headers->end(); ++it) {
- // For each value, if the server sends a NUL-separated
- // list of values, we separate that back out into
- // individual headers for each value in the list.
- // e.g.
- // Set-Cookie "foo\0bar"
- // becomes
- // Set-Cookie: foo\0
- // Set-Cookie: bar\0
- std::string value = it->second;
- size_t start = 0;
- size_t end = 0;
- do {
- end = value.find('\0', start);
- std::string tval;
- if (end != value.npos)
- tval = value.substr(start, (end - start));
- else
- tval = value.substr(start);
- raw_headers.append(it->first);
- raw_headers.append(":", 1);
- raw_headers.append(tval);
- raw_headers.append("\0", 1);
- start = end + 1;
- } while (end != value.npos);
+ CHECK(!response_.get());
+
+ response_.reset(new HttpResponseInfo);
+ *response_ = response; // TODO(mbelshe): avoid copy.
+
+ if (io_state_ == STATE_NONE) {
+ CHECK(pushed_);
+ } else if (io_state_ == STATE_READ_HEADERS_COMPLETE) {
+ CHECK(!pushed_);
+ } else {
+ NOTREACHED();
}
- LOG(INFO) << "FlipStream: SynReply received for " << stream_id_;
+ int rv = DoLoop(OK);
- DCHECK(response_ == NULL);
- response_.reset(new HttpResponseInfo());
- response_->headers = new HttpResponseHeaders(raw_headers);
- // When pushing content from the server, we may not yet have a delegate_
- // to notify. When the delegate is attached, it will notify then.
- if (delegate_)
- delegate_->OnResponseReceived(response_.get());
+ if (user_callback_)
+ DoCallback(rv);
}
-bool FlipStream::OnData(const char* data, int length) {
- DCHECK(length >= 0);
+bool FlipStream::OnDataReceived(const char* data, int length) {
+ DCHECK_GE(length, 0);
LOG(INFO) << "FlipStream: Data (" << length << " bytes) received for "
<< stream_id_;
@@ -104,55 +182,197 @@ bool FlipStream::OnData(const char* data, int length) {
// We cannot pass data up to the caller unless the reply headers have been
// received.
if (!response_.get()) {
- if (delegate_)
- delegate_->OnClose(ERR_SYN_REPLY_NOT_RECEIVED);
- return true;
+ OnClose(ERR_SYN_REPLY_NOT_RECEIVED);
+ return false;
}
// A zero-length read means that the stream is being closed.
if (!length) {
metrics_.StopStream();
download_finished_ = true;
- if (delegate_) {
- delegate_->OnClose(net::OK);
- return true; // Tell the caller to clean us up.
- }
+ OnClose(net::OK);
+ return true;
}
// Track our bandwidth.
metrics_.RecordBytes(length);
- // We've received data. We try to pass it up to the caller.
- // In the case of server-push streams, we may not have a delegate yet assigned
- // to this stream. In that case we just queue the data for later.
- if (delegate_) {
- delegate_->OnDataReceived(data, length);
- } else {
- // Save the data for use later.
+ if (length > 0) {
+ // TODO(mbelshe): If read is pending, we should copy the data straight into
+ // the read buffer here. For now, we'll queue it always.
// TODO(mbelshe): We need to have some throttling on this. We shouldn't
// buffer an infinite amount of data.
+
IOBufferWithSize* io_buffer = new IOBufferWithSize(length);
memcpy(io_buffer->data(), data, length);
+
response_body_.push_back(io_buffer);
}
- return false;
+
+ // Note that data may be received for a FlipStream prior to the user calling
+ // ReadResponseBody(), therefore user_callback_ may be NULL. This may often
+ // happen for server initiated streams.
+ if (user_callback_) {
+ int rv = ReadResponseBody(user_buffer_, user_buffer_len_, user_callback_);
+ CHECK(rv != ERR_IO_PENDING);
+ user_buffer_ = NULL;
+ user_buffer_len_ = 0;
+ DoCallback(rv);
+ }
+
+ return true;
+}
+
+void FlipStream::OnWriteComplete(int status) {
+ DoLoop(status);
+}
+
+void FlipStream::OnClose(int status) {
+ response_complete_ = true;
+ response_status_ = status;
+ stream_id_ = 0;
+
+ if (user_callback_)
+ DoCallback(status);
+}
+
+int FlipStream::DoLoop(int result) {
+ do {
+ State state = io_state_;
+ io_state_ = STATE_NONE;
+ switch (state) {
+ // State machine 1: Send headers and wait for response headers.
+ case STATE_SEND_HEADERS:
+ CHECK(result == OK);
+ result = DoSendHeaders();
+ break;
+ case STATE_SEND_HEADERS_COMPLETE:
+ result = DoSendHeadersComplete(result);
+ break;
+ case STATE_SEND_BODY:
+ CHECK(result == OK);
+ result = DoSendBody();
+ break;
+ case STATE_SEND_BODY_COMPLETE:
+ result = DoSendBodyComplete(result);
+ break;
+ case STATE_READ_HEADERS:
+ CHECK(result == OK);
+ result = DoReadHeaders();
+ break;
+ case STATE_READ_HEADERS_COMPLETE:
+ result = DoReadHeadersComplete(result);
+ break;
+
+ // State machine 2: Read body.
+ // NOTE(willchan): Currently unused. Currently we handle this stuff in
+ // the OnDataReceived()/OnClose()/ReadResponseHeaders()/etc. Only reason
+ // to do this is for consistency with the Http code.
+ case STATE_READ_BODY:
+ result = DoReadBody();
+ break;
+ case STATE_READ_BODY_COMPLETE:
+ result = DoReadBodyComplete(result);
+ break;
+ case STATE_DONE:
+ DCHECK(result != ERR_IO_PENDING);
+ break;
+ default:
+ NOTREACHED();
+ break;
+ }
+ } while (result != ERR_IO_PENDING && io_state_ != STATE_NONE);
+
+ return result;
+}
+
+void FlipStream::DoCallback(int rv) {
+ CHECK(rv != ERR_IO_PENDING);
+ CHECK(user_callback_);
+
+ // Since Run may result in being called back, clear user_callback_ in advance.
+ CompletionCallback* c = user_callback_;
+ user_callback_ = NULL;
+ c->Run(rv);
+}
+
+int FlipStream::DoSendHeaders() {
+ // The FlipSession will always call us back when the send is complete.
+ // TODO(willchan): This code makes the assumption that for the non-push stream
+ // case, the client code calls SendRequest() after creating the stream and
+ // before yielding back to the MessageLoop. This is true in the current code,
+ // but is not obvious from the headers. We should make the code handle
+ // SendRequest() being called after the SYN_REPLY has been received.
+ io_state_ = STATE_SEND_HEADERS_COMPLETE;
+ return ERR_IO_PENDING;
+}
+
+int FlipStream::DoSendHeadersComplete(int result) {
+ if (result < 0)
+ return result;
+
+ CHECK(result > 0);
+
+ // There is no body, skip that state.
+ if (!request_body_stream_.get()) {
+ io_state_ = STATE_READ_HEADERS;
+ return OK;
+ }
+
+ io_state_ = STATE_SEND_BODY;
+ return OK;
}
-void FlipStream::OnError(int err) {
- if (delegate_)
- delegate_->OnClose(err);
+// DoSendBody is called to send the optional body for the request. This call
+// will also be called as each write of a chunk of the body completes.
+int FlipStream::DoSendBody() {
+ // If we're already in the STATE_SENDING_BODY state, then we've already
+ // sent a portion of the body. In that case, we need to first consume
+ // the bytes written in the body stream. Note that the bytes written is
+ // the number of bytes in the frame that were written, only consume the
+ // data portion, of course.
+ io_state_ = STATE_SEND_BODY_COMPLETE;
+ int buf_len = static_cast<int>(request_body_stream_->buf_len());
+ return session_->WriteStreamData(stream_id_,
+ request_body_stream_->buf(),
+ buf_len);
}
-int FlipStream::OnWriteComplete(int result) {
- // We only write to the socket when there is a delegate.
- DCHECK(delegate_);
+int FlipStream::DoSendBodyComplete(int result) {
+ if (result < 0)
+ return result;
+
+ CHECK(result != 0);
- delegate_->OnWriteComplete(result);
+ request_body_stream_->DidConsume(result);
+
+ if (request_body_stream_->position() < request_body_stream_->size())
+ io_state_ = STATE_SEND_BODY;
+ else
+ io_state_ = STATE_READ_HEADERS;
- // TODO(mbelshe): we might want to remove the status code
- // since we're not doing anything useful with it.
return OK;
}
-} // namespace net
+int FlipStream::DoReadHeaders() {
+ io_state_ = STATE_READ_HEADERS_COMPLETE;
+ return response_.get() ? OK : ERR_IO_PENDING;
+}
+
+int FlipStream::DoReadHeadersComplete(int result) {
+ return result;
+}
+
+int FlipStream::DoReadBody() {
+ // TODO(mbelshe): merge FlipStreamParser with FlipStream and then this
+ // makes sense.
+ return ERR_IO_PENDING;
+}
+
+int FlipStream::DoReadBodyComplete(int result) {
+ // TODO(mbelshe): merge FlipStreamParser with FlipStream and then this
+ // makes sense.
+ return ERR_IO_PENDING;
+}
+} // namespace net
diff --git a/net/flip/flip_stream.h b/net/flip/flip_stream.h
index 51266b3..54c5cd4 100755
--- a/net/flip/flip_stream.h
+++ b/net/flip/flip_stream.h
@@ -8,80 +8,183 @@
#include <string>
#include <list>
+#include "base/basictypes.h"
+#include "base/ref_counted.h"
#include "base/scoped_ptr.h"
#include "base/singleton.h"
#include "net/base/bandwidth_metrics.h"
+#include "net/base/completion_callback.h"
#include "net/base/io_buffer.h"
-#include "net/flip/flip_protocol.h"
#include "net/flip/flip_framer.h"
+#include "net/flip/flip_protocol.h"
namespace net {
-class FlipDelegate;
+class FlipSession;
+class HttpRequestInfo;
class HttpResponseInfo;
+class UploadData;
+class UploadDataStream;
// The FlipStream is used by the FlipSession to represent each stream known
// on the FlipSession.
// Streams can be created either by the client or by the server. When they
-// are initiated by the client, they will usually have a delegate attached
-// to them (such as a FlipNetworkTransaction). However, when streams are
-// pushed by the server, they may not have a delegate yet.
-class FlipStream {
+// are initiated by the client, both the FlipSession and client object (such as
+// a FlipNetworkTransaction) will maintain a reference to the stream. When
+// initiated by the server, only the FlipSession will maintain any reference,
+// until such a time as a client object requests a stream for the path.
+class FlipStream : public base::RefCounted<FlipStream> {
public:
// FlipStream constructor
- // If |delegate| is NULL, then we're receiving pushed data from the server.
- FlipStream(flip::FlipStreamId stream_id, FlipDelegate* delegate)
- : stream_id_(stream_id),
- delegate_(delegate),
- response_(NULL),
- download_finished_(false),
- metrics_(Singleton<BandwidthMetrics>::get()) {}
-
- virtual ~FlipStream() {
- DLOG(INFO) << "Deleting FlipStream for stream " << stream_id_;
- };
+ FlipStream(FlipSession* session, flip::FlipStreamId stream_id, bool pushed);
+
+ // Ideally I'd use two abstract classes as interfaces for these two sections,
+ // but since we're ref counted, I can't make both abstract classes inherit
+ // from RefCounted or we'll have two separate ref counts for the same object.
+ // TODO(willchan): Consider using linked_ptr here orcreating proxy wrappers
+ // for FlipStream to provide the appropriate interface.
+
+ // ===================================================
+ // Interface for [Http|Flip]NetworkTransaction to use.
+
+ // Sends the request. If |upload_data| is non-NULL, sends that in the request
+ // body. |callback| is used when this completes asynchronously. Note that
+ // the actual SYN_STREAM packet will have already been sent by this point.
+ // Also note that FlipStream takes ownership of |upload_data|.
+ int SendRequest(UploadDataStream* upload_data, CompletionCallback* callback);
+
+ // Reads the response headers. Returns a net error code.
+ int ReadResponseHeaders(CompletionCallback* callback);
+
+ // Reads the response body. Returns a net error code or the number of bytes
+ // read.
+ int ReadResponseBody(
+ IOBuffer* buf, int buf_len, CompletionCallback* callback);
+
+ // Cancels the stream. Note that this does not immediately cause deletion of
+ // the stream. This function is used to cancel any callbacks from being
+ // invoked. TODO(willchan): It should also free up any memory associated with
+ // the stream, such as IOBuffers.
+ void Cancel();
+
+ // Returns the number of bytes uploaded.
+ uint64 GetUploadProgress() const;
+
+ const HttpResponseInfo* GetResponseInfo() const;
+
+ // Is this stream a pushed stream from the server.
+ bool pushed() const { return pushed_; }
+
+ // =================================
+ // Interface for FlipSession to use.
flip::FlipStreamId stream_id() const { return stream_id_; }
- FlipDelegate* delegate() const { return delegate_; }
+ void set_stream_id(flip::FlipStreamId stream_id) { stream_id_ = stream_id; }
// For pushed streams, we track a path to identify them.
- std::string path() const { return path_; }
+ const std::string& path() const { return path_; }
void set_path(const std::string& path) { path_ = path; }
- // Attach a delegate to a previously pushed data stream.
- // Returns true if the caller should Deactivate and delete the FlipStream
- // after this call.
- bool AttachDelegate(FlipDelegate* delegate);
+ int priority() const { return priority_; }
+ void set_priority(int priority) { priority_ = priority; }
+
+ // Called by the FlipSession when a response (e.g. a SYN_REPLY) has been
+ // received for this stream. |path| is the path of the URL for a server
+ // initiated stream, otherwise is empty.
+ void OnResponseReceived(const HttpResponseInfo& response);
+
+ // Called by the FlipSession when response data has been received for this
+ // stream. This callback may be called multiple times as data arrives
+ // from the network, and will never be called prior to OnResponseReceived.
+ // |buffer| contains the data received. The stream must copy any data
+ // from this buffer before returning from this callback.
+ // |length| is the number of bytes received or an error.
+ // A zero-length count does not indicate end-of-stream.
+ // Returns true on success and false on error.
+ bool OnDataReceived(const char* buffer, int bytes);
+
+ // Called by the FlipSession when a write has completed. This callback
+ // will be called multiple times for each write which completes. Writes
+ // include the SYN_STREAM write and also DATA frame writes.
+ // |result| is the number of bytes written or a net error code.
+ void OnWriteComplete(int status);
+
+ // Called by the FlipSession when the request is finished. This callback
+ // will always be called at the end of the request and signals to the
+ // stream that the stream has no more network events. No further callbacks
+ // to the stream will be made after this call.
+ // |status| is an error code or OK.
+ void OnClose(int status);
+
+ bool cancelled() const { return cancelled_; }
+
+ private:
+ friend class base::RefCounted<FlipStream>;
+
+ enum State {
+ STATE_NONE,
+ STATE_SEND_HEADERS,
+ STATE_SEND_HEADERS_COMPLETE,
+ STATE_SEND_BODY,
+ STATE_SEND_BODY_COMPLETE,
+ STATE_READ_HEADERS,
+ STATE_READ_HEADERS_COMPLETE,
+ STATE_READ_BODY,
+ STATE_READ_BODY_COMPLETE,
+ STATE_DONE
+ };
- // Called when a SYN_REPLY is received on the stream.
- void OnReply(const flip::FlipHeaderBlock* headers);
+ ~FlipStream();
- // Called when data is received on the stream.
- // Returns true if the caller should Deactivate and delete the FlipStream
- // after this call.
- bool OnData(const char* data, int length);
+ // Try to make progress sending/receiving the request/response.
+ int DoLoop(int result);
- // Called if the stream is being prematurely closed for an abort or an error.
- // |err| is actually a net::Error.
- // The caller should Deactivate and delete the FlipStream after this call.
- void OnError(int err);
+ // Call the user callback.
+ void DoCallback(int rv);
- // Called when data has been sent on the stream.
- // |result| is either an error or the number of bytes written.
- // Returns an error if the stream is errored, or OK.
- int OnWriteComplete(int result);
+ // The implementations of each state of the state machine.
+ int DoSendHeaders();
+ int DoSendHeadersComplete(int result);
+ int DoSendBody();
+ int DoSendBodyComplete(int result);
+ int DoReadHeaders();
+ int DoReadHeadersComplete(int result);
+ int DoReadBody();
+ int DoReadBodyComplete(int result);
- private:
flip::FlipStreamId stream_id_;
std::string path_;
- scoped_refptr<FlipDelegate> delegate_;
- scoped_ptr<HttpResponseInfo> response_;
+ int priority_;
+ const bool pushed_;
+ // We buffer the response body as it arrives asynchronously from the stream.
+ // TODO(mbelshe): is this infinite buffering?
std::list<scoped_refptr<IOBufferWithSize> > response_body_;
bool download_finished_;
ScopedBandwidthMetrics metrics_;
+
+ scoped_refptr<FlipSession> session_;
+
+ scoped_ptr<HttpResponseInfo> response_;
+ scoped_ptr<UploadDataStream> request_body_stream_;
+
+ bool response_complete_; // TODO(mbelshe): fold this into the io_state.
+ State io_state_;
+
+ // Since we buffer the response, we also buffer the response status.
+ // Not valid until response_complete_ is true.
+ int response_status_;
+
+ CompletionCallback* user_callback_;
+
+ // User provided buffer for the ReadResponseBody() response.
+ scoped_refptr<IOBuffer> user_buffer_;
+ int user_buffer_len_;
+
+ bool cancelled_;
+
+ DISALLOW_COPY_AND_ASSIGN(FlipStream);
};
} // namespace net
#endif // NET_FLIP_FLIP_STREAM_H_
-
diff --git a/net/flip/flip_stream_unittest.cc b/net/flip/flip_stream_unittest.cc
new file mode 100644
index 0000000..bb661c0
--- /dev/null
+++ b/net/flip/flip_stream_unittest.cc
@@ -0,0 +1,98 @@
+// Copyright (c) 2009 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "net/flip/flip_stream.h"
+#include "base/ref_counted.h"
+#include "net/base/mock_host_resolver.h"
+#include "net/base/net_errors.h"
+#include "net/base/ssl_config_service.h"
+#include "net/base/ssl_config_service_defaults.h"
+#include "net/base/test_completion_callback.h"
+#include "net/flip/flip_session.h"
+#include "net/flip/flip_session_pool.h"
+#include "net/http/http_network_session.h"
+#include "net/http/http_request_info.h"
+#include "net/proxy/proxy_service.h"
+#include "net/socket/socket_test_util.h"
+#include "testing/gtest/include/gtest/gtest.h"
+
+namespace net {
+
+namespace {
+
+// Create a proxy service which fails on all requests (falls back to direct).
+ProxyService* CreateNullProxyService() {
+ return ProxyService::CreateNull();
+}
+
+// Helper to manage the lifetimes of the dependencies for a
+// FlipNetworkTransaction.
+class SessionDependencies {
+ public:
+ // Default set of dependencies -- "null" proxy service.
+ SessionDependencies()
+ : host_resolver(new MockHostResolver),
+ proxy_service(CreateNullProxyService()),
+ ssl_config_service(new SSLConfigServiceDefaults),
+ flip_session_pool(new FlipSessionPool) {}
+
+ // Custom proxy service dependency.
+ explicit SessionDependencies(ProxyService* proxy_service)
+ : host_resolver(new MockHostResolver),
+ proxy_service(proxy_service),
+ ssl_config_service(new SSLConfigServiceDefaults),
+ flip_session_pool(new FlipSessionPool) {}
+
+ scoped_refptr<MockHostResolverBase> host_resolver;
+ scoped_refptr<ProxyService> proxy_service;
+ scoped_refptr<SSLConfigService> ssl_config_service;
+ MockClientSocketFactory socket_factory;
+ scoped_refptr<FlipSessionPool> flip_session_pool;
+};
+
+HttpNetworkSession* CreateSession(SessionDependencies* session_deps) {
+ return new HttpNetworkSession(session_deps->host_resolver,
+ session_deps->proxy_service,
+ &session_deps->socket_factory,
+ session_deps->ssl_config_service,
+ session_deps->flip_session_pool);
+}
+
+class FlipStreamTest : public testing::Test {
+ protected:
+ FlipStreamTest()
+ : session_(CreateSession(&session_deps_)) {}
+
+ scoped_refptr<FlipSession> CreateFlipSession() {
+ HostResolver::RequestInfo resolve_info("www.google.com", 80);
+ scoped_refptr<FlipSession> session(
+ session_->flip_session_pool()->Get(resolve_info, session_));
+ return session;
+ }
+
+ virtual void TearDown() {
+ MessageLoop::current()->RunAllPending();
+ }
+
+ SessionDependencies session_deps_;
+ scoped_refptr<HttpNetworkSession> session_;
+};
+
+TEST_F(FlipStreamTest, SendRequest) {
+ scoped_refptr<FlipSession> session(CreateFlipSession());
+ HttpRequestInfo request;
+ request.method = "GET";
+ request.url = GURL("http://www.google.com/");
+ TestCompletionCallback callback;
+
+ scoped_refptr<FlipStream> stream(new FlipStream(session, 1, false));
+ EXPECT_EQ(ERR_IO_PENDING, stream->SendRequest(NULL, &callback));
+}
+
+// TODO(willchan): Write a longer test for FlipStream that exercises all
+// methods.
+
+} // namespace
+
+} // namespace net
diff --git a/net/net.gyp b/net/net.gyp
index 3b1cfd9..4a302f98 100644
--- a/net/net.gyp
+++ b/net/net.gyp
@@ -619,6 +619,7 @@
'flip/flip_framer_test.cc',
'flip/flip_network_transaction_unittest.cc',
'flip/flip_session_unittest.cc',
+ 'flip/flip_stream_unittest.cc',
'http/http_auth_cache_unittest.cc',
'http/http_auth_handler_basic_unittest.cc',
'http/http_auth_handler_digest_unittest.cc',