summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--net/flip/flip_network_transaction.cc20
-rw-r--r--net/flip/flip_network_transaction.h8
-rw-r--r--net/flip/flip_session.cc41
-rw-r--r--net/flip/flip_session.h11
-rwxr-xr-xnet/flip/flip_stream.h2
5 files changed, 60 insertions, 22 deletions
diff --git a/net/flip/flip_network_transaction.cc b/net/flip/flip_network_transaction.cc
index 9ec9b85..0c02a8d 100644
--- a/net/flip/flip_network_transaction.cc
+++ b/net/flip/flip_network_transaction.cc
@@ -33,7 +33,8 @@ FlipStreamParser::FlipStreamParser()
response_status_(OK),
user_callback_(NULL),
user_buffer_(NULL),
- user_buffer_len_(0) {
+ user_buffer_len_(0),
+ cancelled_(false) {
}
FlipStreamParser::~FlipStreamParser() {
@@ -50,6 +51,7 @@ int FlipStreamParser::SendRequest(FlipSession* flip,
CHECK(flip);
CHECK(request);
CHECK(callback);
+ DCHECK(!cancelled_);
request_ = request;
flip_ = flip;
@@ -68,6 +70,7 @@ int FlipStreamParser::ReadResponseHeaders(CompletionCallback* callback) {
// Note: The FlipStream may have already received the response headers, so
// this call may complete synchronously.
DCHECK_GT(io_state_, STATE_HEADERS_SENT);
+ DCHECK(!cancelled_);
CHECK(callback);
// The SYN_REPLY has already been received.
@@ -85,6 +88,7 @@ int FlipStreamParser::ReadResponseBody(
DCHECK(io_state_ == STATE_BODY_PENDING ||
io_state_ == STATE_READ_BODY ||
io_state_ == STATE_DONE);
+ DCHECK(!cancelled_);
CHECK(buf);
CHECK(buf_len);
CHECK(callback);
@@ -137,6 +141,11 @@ const HttpResponseInfo* FlipStreamParser::GetResponseInfo() const {
return response_.get();
}
+void FlipStreamParser::Cancel() {
+ cancelled_ = true;
+ user_callback_ = NULL;
+}
+
const HttpRequestInfo* FlipStreamParser::request() const {
return request_;
}
@@ -283,6 +292,9 @@ int FlipStreamParser::DoReadBodyComplete(int result) {
int FlipStreamParser::DoLoop(int result) {
bool can_do_more = true;
+ if (cancelled_)
+ return ERR_ABORTED;
+
do {
switch (io_state_) {
case STATE_SENDING_HEADERS:
@@ -344,6 +356,8 @@ FlipNetworkTransaction::FlipNetworkTransaction(HttpNetworkSession* session)
FlipNetworkTransaction::~FlipNetworkTransaction() {
LOG(INFO) << "FlipNetworkTransaction dead. " << this;
+ if (flip_stream_parser_.get())
+ flip_stream_parser_->Cancel();
}
int FlipNetworkTransaction::Start(const HttpRequestInfo* request_info,
@@ -530,7 +544,7 @@ int FlipNetworkTransaction::DoInitConnectionComplete(int result) {
int FlipNetworkTransaction::DoSendRequest() {
next_state_ = STATE_SEND_REQUEST_COMPLETE;
CHECK(!flip_stream_parser_.get());
- flip_stream_parser_.reset(new FlipStreamParser);
+ flip_stream_parser_ = new FlipStreamParser;
return flip_stream_parser_->SendRequest(flip_, request_, &io_callback_);
}
@@ -564,7 +578,7 @@ int FlipNetworkTransaction::DoReadBodyComplete(int result) {
user_buffer_len_ = 0;
if (result <= 0)
- flip_stream_parser_.reset();
+ flip_stream_parser_.release();
return result;
}
diff --git a/net/flip/flip_network_transaction.h b/net/flip/flip_network_transaction.h
index 75bcc05..a0cd0c8 100644
--- a/net/flip/flip_network_transaction.h
+++ b/net/flip/flip_network_transaction.h
@@ -52,6 +52,10 @@ class FlipStreamParser : public FlipDelegate {
const HttpResponseInfo* GetResponseInfo() const;
+ // Cancels the stream.
+ // Once cancelled, no callbacks will be made on this stream.
+ void Cancel();
+
// FlipDelegate methods:
virtual const HttpRequestInfo* request() const;
virtual const UploadDataStream* data() const;
@@ -116,6 +120,8 @@ class FlipStreamParser : public FlipDelegate {
scoped_refptr<IOBuffer> user_buffer_;
int user_buffer_len_;
+ bool cancelled_;
+
DISALLOW_COPY_AND_ASSIGN(FlipStreamParser);
};
@@ -201,7 +207,7 @@ class FlipNetworkTransaction : public HttpTransaction {
// The next state in the state machine.
State next_state_;
- scoped_ptr<FlipStreamParser> flip_stream_parser_;
+ scoped_refptr<FlipStreamParser> flip_stream_parser_;
DISALLOW_COPY_AND_ASSIGN(FlipNetworkTransaction);
};
diff --git a/net/flip/flip_session.cc b/net/flip/flip_session.cc
index c89db24..82aa9b8 100644
--- a/net/flip/flip_session.cc
+++ b/net/flip/flip_session.cc
@@ -150,7 +150,7 @@ int FlipSession::CreateStream(FlipDelegate* delegate) {
// Check if we have a pending push stream for this url.
std::string url_path = delegate->request()->url.PathForRequest();
- std::map<std::string, FlipDelegate*>::iterator it;
+ PendingStreamMap::iterator it;
it = pending_streams_.find(url_path);
if (it != pending_streams_.end()) {
DCHECK(it->second == NULL);
@@ -233,6 +233,9 @@ int FlipSession::WriteStreamData(flip::FlipStreamId stream_id,
if (!stream)
return ERR_INVALID_FLIP_STREAM;
+ // TODO(mbelshe): Setting of the FIN is assuming that the caller will pass
+ // all data to write in a single chunk. Is this always true?
+
// Set the flags on the upload.
flip::FlipDataFlags flags = flip::DATA_FLAG_FIN;
if (len > kMaxFlipFrameChunkSize) {
@@ -248,6 +251,11 @@ int FlipSession::WriteStreamData(flip::FlipStreamId stream_id,
memcpy(buffer->data(), frame->data(), length);
queue_.push(FlipIOBuffer(buffer, stream->delegate()->request()->priority,
stream));
+
+ // Whenever we queue onto the socket we need to ensure that we will write to
+ // it later.
+ WriteSocketLater();
+
return ERR_IO_PENDING;
}
@@ -458,20 +466,27 @@ void FlipSession::WriteSocket() {
queue_.pop();
// We've deferred compression until just before we write it to the socket,
- // which is now.
+ // which is now. At this time, we don't compress our data frames.
flip::FlipFrame uncompressed_frame(next_buffer.buffer()->data(), false);
- scoped_ptr<flip::FlipFrame> compressed_frame(
- flip_framer_.CompressFrame(&uncompressed_frame));
- size_t size = compressed_frame.get()->length() + flip::FlipFrame::size();
+ size_t size;
+ if (uncompressed_frame.is_control_frame()) {
+ scoped_ptr<flip::FlipFrame> compressed_frame(
+ flip_framer_.CompressFrame(&uncompressed_frame));
+ size = compressed_frame->length() + flip::FlipFrame::size();
- DCHECK(size > 0);
+ DCHECK(size > 0);
- // TODO(mbelshe): We have too much copying of data here.
- IOBufferWithSize* buffer = new IOBufferWithSize(size);
- memcpy(buffer->data(), compressed_frame->data(), size);
+ // TODO(mbelshe): We have too much copying of data here.
+ IOBufferWithSize* buffer = new IOBufferWithSize(size);
+ memcpy(buffer->data(), compressed_frame->data(), size);
+
+ // Attempt to send the frame.
+ in_flight_write_ = FlipIOBuffer(buffer, 0, next_buffer.stream());
+ } else {
+ size = uncompressed_frame.length() + flip::FlipFrame::size();
+ in_flight_write_ = next_buffer;
+ }
- // Attempt to send the frame.
- in_flight_write_ = FlipIOBuffer(buffer, 0, next_buffer.stream());
write_pending_ = true;
int rv = connection_.socket()->Write(in_flight_write_.buffer(),
size, &write_callback_);
@@ -578,7 +593,7 @@ FlipStream* FlipSession::GetPushStream(const std::string& path) {
}
void FlipSession::OnError(flip::FlipFramer* framer) {
- LOG(ERROR) << "FlipSession error!";
+ LOG(ERROR) << "FlipSession error: " << framer->error_code();
CloseAllStreams(net::ERR_UNEXPECTED);
Release();
}
@@ -631,7 +646,7 @@ void FlipSession::OnSyn(const flip::FlipSynStreamControlFrame* frame,
}
// Check if we already have a delegate awaiting this stream.
- std::map<std::string, FlipDelegate*>::iterator it;
+ PendingStreamMap::iterator it;
it = pending_streams_.find(stream->path());
if (it != pending_streams_.end()) {
FlipDelegate* delegate = it->second;
diff --git a/net/flip/flip_session.h b/net/flip/flip_session.h
index f381b20..fdefb6e 100644
--- a/net/flip/flip_session.h
+++ b/net/flip/flip_session.h
@@ -34,10 +34,8 @@ class HttpResponseInfo;
// The FlipDelegate interface is an interface so that the FlipSession
// can interact with the provider of a given Flip stream.
-class FlipDelegate {
+class FlipDelegate : public base::RefCounted<FlipDelegate> {
public:
- virtual ~FlipDelegate() {}
-
// Accessors from the delegate.
// The delegate provides access to the HttpRequestInfo for use by the flip
@@ -77,6 +75,10 @@ class FlipDelegate {
// delegate will be made after this call.
// |status| is an error code or OK.
virtual void OnClose(int status) = 0;
+
+ protected:
+ friend class base::RefCounted<FlipDelegate>;
+ virtual ~FlipDelegate() {}
};
class FlipSession : public base::RefCounted<FlipSession>,
@@ -208,7 +210,8 @@ class FlipSession : public base::RefCounted<FlipSession>,
ActiveStreamList pushed_streams_;
// List of streams declared in X-Associated-Content headers.
// The key is a string representing the path of the URI being pushed.
- std::map<std::string, FlipDelegate*> pending_streams_;
+ typedef std::map<std::string, scoped_refptr<FlipDelegate> > PendingStreamMap;
+ PendingStreamMap pending_streams_;
// As we gather data to be sent, we put it into the output queue.
typedef std::priority_queue<FlipIOBuffer> OutputQueue;
diff --git a/net/flip/flip_stream.h b/net/flip/flip_stream.h
index 1bd81c3..51266b3 100755
--- a/net/flip/flip_stream.h
+++ b/net/flip/flip_stream.h
@@ -74,7 +74,7 @@ class FlipStream {
private:
flip::FlipStreamId stream_id_;
std::string path_;
- FlipDelegate* delegate_;
+ scoped_refptr<FlipDelegate> delegate_;
scoped_ptr<HttpResponseInfo> response_;
std::list<scoped_refptr<IOBufferWithSize> > response_body_;
bool download_finished_;