diff options
-rw-r--r-- | net/flip/flip_network_transaction.cc | 20 | ||||
-rw-r--r-- | net/flip/flip_network_transaction.h | 8 | ||||
-rw-r--r-- | net/flip/flip_session.cc | 41 | ||||
-rw-r--r-- | net/flip/flip_session.h | 11 | ||||
-rwxr-xr-x | net/flip/flip_stream.h | 2 |
5 files changed, 60 insertions, 22 deletions
diff --git a/net/flip/flip_network_transaction.cc b/net/flip/flip_network_transaction.cc index 9ec9b85..0c02a8d 100644 --- a/net/flip/flip_network_transaction.cc +++ b/net/flip/flip_network_transaction.cc @@ -33,7 +33,8 @@ FlipStreamParser::FlipStreamParser() response_status_(OK), user_callback_(NULL), user_buffer_(NULL), - user_buffer_len_(0) { + user_buffer_len_(0), + cancelled_(false) { } FlipStreamParser::~FlipStreamParser() { @@ -50,6 +51,7 @@ int FlipStreamParser::SendRequest(FlipSession* flip, CHECK(flip); CHECK(request); CHECK(callback); + DCHECK(!cancelled_); request_ = request; flip_ = flip; @@ -68,6 +70,7 @@ int FlipStreamParser::ReadResponseHeaders(CompletionCallback* callback) { // Note: The FlipStream may have already received the response headers, so // this call may complete synchronously. DCHECK_GT(io_state_, STATE_HEADERS_SENT); + DCHECK(!cancelled_); CHECK(callback); // The SYN_REPLY has already been received. @@ -85,6 +88,7 @@ int FlipStreamParser::ReadResponseBody( DCHECK(io_state_ == STATE_BODY_PENDING || io_state_ == STATE_READ_BODY || io_state_ == STATE_DONE); + DCHECK(!cancelled_); CHECK(buf); CHECK(buf_len); CHECK(callback); @@ -137,6 +141,11 @@ const HttpResponseInfo* FlipStreamParser::GetResponseInfo() const { return response_.get(); } +void FlipStreamParser::Cancel() { + cancelled_ = true; + user_callback_ = NULL; +} + const HttpRequestInfo* FlipStreamParser::request() const { return request_; } @@ -283,6 +292,9 @@ int FlipStreamParser::DoReadBodyComplete(int result) { int FlipStreamParser::DoLoop(int result) { bool can_do_more = true; + if (cancelled_) + return ERR_ABORTED; + do { switch (io_state_) { case STATE_SENDING_HEADERS: @@ -344,6 +356,8 @@ FlipNetworkTransaction::FlipNetworkTransaction(HttpNetworkSession* session) FlipNetworkTransaction::~FlipNetworkTransaction() { LOG(INFO) << "FlipNetworkTransaction dead. " << this; + if (flip_stream_parser_.get()) + flip_stream_parser_->Cancel(); } int FlipNetworkTransaction::Start(const HttpRequestInfo* request_info, @@ -530,7 +544,7 @@ int FlipNetworkTransaction::DoInitConnectionComplete(int result) { int FlipNetworkTransaction::DoSendRequest() { next_state_ = STATE_SEND_REQUEST_COMPLETE; CHECK(!flip_stream_parser_.get()); - flip_stream_parser_.reset(new FlipStreamParser); + flip_stream_parser_ = new FlipStreamParser; return flip_stream_parser_->SendRequest(flip_, request_, &io_callback_); } @@ -564,7 +578,7 @@ int FlipNetworkTransaction::DoReadBodyComplete(int result) { user_buffer_len_ = 0; if (result <= 0) - flip_stream_parser_.reset(); + flip_stream_parser_.release(); return result; } diff --git a/net/flip/flip_network_transaction.h b/net/flip/flip_network_transaction.h index 75bcc05..a0cd0c8 100644 --- a/net/flip/flip_network_transaction.h +++ b/net/flip/flip_network_transaction.h @@ -52,6 +52,10 @@ class FlipStreamParser : public FlipDelegate { const HttpResponseInfo* GetResponseInfo() const; + // Cancels the stream. + // Once cancelled, no callbacks will be made on this stream. + void Cancel(); + // FlipDelegate methods: virtual const HttpRequestInfo* request() const; virtual const UploadDataStream* data() const; @@ -116,6 +120,8 @@ class FlipStreamParser : public FlipDelegate { scoped_refptr<IOBuffer> user_buffer_; int user_buffer_len_; + bool cancelled_; + DISALLOW_COPY_AND_ASSIGN(FlipStreamParser); }; @@ -201,7 +207,7 @@ class FlipNetworkTransaction : public HttpTransaction { // The next state in the state machine. State next_state_; - scoped_ptr<FlipStreamParser> flip_stream_parser_; + scoped_refptr<FlipStreamParser> flip_stream_parser_; DISALLOW_COPY_AND_ASSIGN(FlipNetworkTransaction); }; diff --git a/net/flip/flip_session.cc b/net/flip/flip_session.cc index c89db24..82aa9b8 100644 --- a/net/flip/flip_session.cc +++ b/net/flip/flip_session.cc @@ -150,7 +150,7 @@ int FlipSession::CreateStream(FlipDelegate* delegate) { // Check if we have a pending push stream for this url. std::string url_path = delegate->request()->url.PathForRequest(); - std::map<std::string, FlipDelegate*>::iterator it; + PendingStreamMap::iterator it; it = pending_streams_.find(url_path); if (it != pending_streams_.end()) { DCHECK(it->second == NULL); @@ -233,6 +233,9 @@ int FlipSession::WriteStreamData(flip::FlipStreamId stream_id, if (!stream) return ERR_INVALID_FLIP_STREAM; + // TODO(mbelshe): Setting of the FIN is assuming that the caller will pass + // all data to write in a single chunk. Is this always true? + // Set the flags on the upload. flip::FlipDataFlags flags = flip::DATA_FLAG_FIN; if (len > kMaxFlipFrameChunkSize) { @@ -248,6 +251,11 @@ int FlipSession::WriteStreamData(flip::FlipStreamId stream_id, memcpy(buffer->data(), frame->data(), length); queue_.push(FlipIOBuffer(buffer, stream->delegate()->request()->priority, stream)); + + // Whenever we queue onto the socket we need to ensure that we will write to + // it later. + WriteSocketLater(); + return ERR_IO_PENDING; } @@ -458,20 +466,27 @@ void FlipSession::WriteSocket() { queue_.pop(); // We've deferred compression until just before we write it to the socket, - // which is now. + // which is now. At this time, we don't compress our data frames. flip::FlipFrame uncompressed_frame(next_buffer.buffer()->data(), false); - scoped_ptr<flip::FlipFrame> compressed_frame( - flip_framer_.CompressFrame(&uncompressed_frame)); - size_t size = compressed_frame.get()->length() + flip::FlipFrame::size(); + size_t size; + if (uncompressed_frame.is_control_frame()) { + scoped_ptr<flip::FlipFrame> compressed_frame( + flip_framer_.CompressFrame(&uncompressed_frame)); + size = compressed_frame->length() + flip::FlipFrame::size(); - DCHECK(size > 0); + DCHECK(size > 0); - // TODO(mbelshe): We have too much copying of data here. - IOBufferWithSize* buffer = new IOBufferWithSize(size); - memcpy(buffer->data(), compressed_frame->data(), size); + // TODO(mbelshe): We have too much copying of data here. + IOBufferWithSize* buffer = new IOBufferWithSize(size); + memcpy(buffer->data(), compressed_frame->data(), size); + + // Attempt to send the frame. + in_flight_write_ = FlipIOBuffer(buffer, 0, next_buffer.stream()); + } else { + size = uncompressed_frame.length() + flip::FlipFrame::size(); + in_flight_write_ = next_buffer; + } - // Attempt to send the frame. - in_flight_write_ = FlipIOBuffer(buffer, 0, next_buffer.stream()); write_pending_ = true; int rv = connection_.socket()->Write(in_flight_write_.buffer(), size, &write_callback_); @@ -578,7 +593,7 @@ FlipStream* FlipSession::GetPushStream(const std::string& path) { } void FlipSession::OnError(flip::FlipFramer* framer) { - LOG(ERROR) << "FlipSession error!"; + LOG(ERROR) << "FlipSession error: " << framer->error_code(); CloseAllStreams(net::ERR_UNEXPECTED); Release(); } @@ -631,7 +646,7 @@ void FlipSession::OnSyn(const flip::FlipSynStreamControlFrame* frame, } // Check if we already have a delegate awaiting this stream. - std::map<std::string, FlipDelegate*>::iterator it; + PendingStreamMap::iterator it; it = pending_streams_.find(stream->path()); if (it != pending_streams_.end()) { FlipDelegate* delegate = it->second; diff --git a/net/flip/flip_session.h b/net/flip/flip_session.h index f381b20..fdefb6e 100644 --- a/net/flip/flip_session.h +++ b/net/flip/flip_session.h @@ -34,10 +34,8 @@ class HttpResponseInfo; // The FlipDelegate interface is an interface so that the FlipSession // can interact with the provider of a given Flip stream. -class FlipDelegate { +class FlipDelegate : public base::RefCounted<FlipDelegate> { public: - virtual ~FlipDelegate() {} - // Accessors from the delegate. // The delegate provides access to the HttpRequestInfo for use by the flip @@ -77,6 +75,10 @@ class FlipDelegate { // delegate will be made after this call. // |status| is an error code or OK. virtual void OnClose(int status) = 0; + + protected: + friend class base::RefCounted<FlipDelegate>; + virtual ~FlipDelegate() {} }; class FlipSession : public base::RefCounted<FlipSession>, @@ -208,7 +210,8 @@ class FlipSession : public base::RefCounted<FlipSession>, ActiveStreamList pushed_streams_; // List of streams declared in X-Associated-Content headers. // The key is a string representing the path of the URI being pushed. - std::map<std::string, FlipDelegate*> pending_streams_; + typedef std::map<std::string, scoped_refptr<FlipDelegate> > PendingStreamMap; + PendingStreamMap pending_streams_; // As we gather data to be sent, we put it into the output queue. typedef std::priority_queue<FlipIOBuffer> OutputQueue; diff --git a/net/flip/flip_stream.h b/net/flip/flip_stream.h index 1bd81c3..51266b3 100755 --- a/net/flip/flip_stream.h +++ b/net/flip/flip_stream.h @@ -74,7 +74,7 @@ class FlipStream { private: flip::FlipStreamId stream_id_; std::string path_; - FlipDelegate* delegate_; + scoped_refptr<FlipDelegate> delegate_; scoped_ptr<HttpResponseInfo> response_; std::list<scoped_refptr<IOBufferWithSize> > response_body_; bool download_finished_; |