summaryrefslogtreecommitdiffstats
path: root/net/flip
diff options
context:
space:
mode:
authormbelshe@google.com <mbelshe@google.com@0039d316-1c4b-4281-b951-d872f2087c98>2009-10-24 13:22:51 +0000
committermbelshe@google.com <mbelshe@google.com@0039d316-1c4b-4281-b951-d872f2087c98>2009-10-24 13:22:51 +0000
commit9330067fadd9ec988a45d701016b01c05e9a0512 (patch)
treea57d67b4823413146ecf7f87bedc40c0d1e3db0e /net/flip
parent0b0e126d1f8291705dae7d70f793738ba945dcfc (diff)
downloadchromium_src-9330067fadd9ec988a45d701016b01c05e9a0512.zip
chromium_src-9330067fadd9ec988a45d701016b01c05e9a0512.tar.gz
chromium_src-9330067fadd9ec988a45d701016b01c05e9a0512.tar.bz2
Add download metrics into FLIP. They aren't used yet,
but they do measure accurately. Also add some minor cleanup to the FlipSession and added a new test. There is a lot more cleanup to do in FlipSession. BUG=none TEST=none Review URL: http://codereview.chromium.org/333009 git-svn-id: svn://svn.chromium.org/chrome/trunk/src@30001 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'net/flip')
-rw-r--r--net/flip/flip_network_transaction_unittest.cc42
-rw-r--r--net/flip/flip_session.cc141
-rw-r--r--net/flip/flip_session.h4
3 files changed, 127 insertions, 60 deletions
diff --git a/net/flip/flip_network_transaction_unittest.cc b/net/flip/flip_network_transaction_unittest.cc
index 01a2faf..fb711b1 100644
--- a/net/flip/flip_network_transaction_unittest.cc
+++ b/net/flip/flip_network_transaction_unittest.cc
@@ -119,8 +119,6 @@ class FlipNetworkTransactionTest : public PlatformTest {
return out;
const HttpResponseInfo* response = trans->GetResponseInfo();
- EXPECT_TRUE(response != NULL);
-
EXPECT_TRUE(response->headers != NULL);
out.status_line = response->headers->GetStatusLine();
@@ -147,7 +145,7 @@ TEST_F(FlipNetworkTransactionTest, Constructor) {
}
TEST_F(FlipNetworkTransactionTest, Connect) {
- unsigned char syn_reply[] = {
+ static const unsigned char syn_reply[] = {
0x80, 0x01, 0x00, 0x02, // header
0x00, 0x00, 0x00, 0x45,
0x00, 0x00, 0x00, 0x01,
@@ -161,12 +159,12 @@ TEST_F(FlipNetworkTransactionTest, Connect) {
0x00, 0x07, 'v', 'e', 'r', 's', 'i', 'o', 'n', // "version"
0x00, 0x08, 'H', 'T', 'T', 'P', '/', '1', '.', '1', // "HTTP/1.1"
};
- unsigned char body_frame[] = {
+ static const unsigned char body_frame[] = {
0x00, 0x00, 0x00, 0x01, // header
0x00, 0x00, 0x00, 0x06,
'h', 'e', 'l', 'l', 'o', '!', // "hello"
};
- unsigned char fin_frame[] = {
+ static const unsigned char fin_frame[] = {
0x80, 0x01, 0x00, 0x03, // header
0x00, 0x00, 0x00, 0x08,
0x00, 0x00, 0x00, 0x01,
@@ -174,9 +172,10 @@ TEST_F(FlipNetworkTransactionTest, Connect) {
};
MockRead data_reads[] = {
- MockRead(true, reinterpret_cast<char*>(syn_reply), sizeof(syn_reply)),
- MockRead(true, reinterpret_cast<char*>(body_frame), sizeof(body_frame)),
- MockRead(true, reinterpret_cast<char*>(fin_frame), sizeof(fin_frame)),
+ MockRead(true, reinterpret_cast<const char*>(syn_reply), sizeof(syn_reply)),
+ MockRead(true, reinterpret_cast<const char*>(body_frame),
+ sizeof(body_frame)),
+ MockRead(true, reinterpret_cast<const char*>(fin_frame), sizeof(fin_frame)),
MockRead(true, 0, 0), // EOF
};
@@ -188,5 +187,32 @@ TEST_F(FlipNetworkTransactionTest, Connect) {
EXPECT_EQ("hello!", out.response_data);
}
+// Test that the transaction doesn't crash when we don't have a reply.
+TEST_F(FlipNetworkTransactionTest, ResponseWithoutSynReply) {
+ static const unsigned char body_frame[] = {
+ 0x00, 0x00, 0x00, 0x01, // header
+ 0x00, 0x00, 0x00, 0x06,
+ 'h', 'e', 'l', 'l', 'o', '!', // "hello"
+ };
+ static const unsigned char fin_frame[] = {
+ 0x80, 0x01, 0x00, 0x03, // header
+ 0x00, 0x00, 0x00, 0x08,
+ 0x00, 0x00, 0x00, 0x01,
+ 0x00, 0x00, 0x00, 0x00,
+ };
+
+ MockRead data_reads[] = {
+ MockRead(true, reinterpret_cast<const char*>(body_frame),
+ sizeof(body_frame)),
+ MockRead(true, reinterpret_cast<const char*>(fin_frame), sizeof(fin_frame)),
+ MockRead(true, 0, 0), // EOF
+ };
+
+ // We disable SSL for this test.
+ FlipSession::SetSSLMode(false);
+ SimpleGetHelperResult out = SimpleGetHelper(data_reads);
+ EXPECT_EQ(ERR_SYN_REPLY_NOT_RECEIVED, out.rv);
+}
+
} // namespace net
diff --git a/net/flip/flip_session.cc b/net/flip/flip_session.cc
index d73f915..b0f80524 100644
--- a/net/flip/flip_session.cc
+++ b/net/flip/flip_session.cc
@@ -10,6 +10,7 @@
#include "base/rand_util.h"
#include "base/stats_counters.h"
#include "base/string_util.h"
+#include "net/base/bandwidth_metrics.h"
#include "net/base/load_flags.h"
#include "net/flip/flip_frame_builder.h"
#include "net/flip/flip_protocol.h"
@@ -37,7 +38,8 @@ class FlipStreamImpl {
: stream_id_(stream_id),
delegate_(delegate),
response_(NULL),
- data_complete_(false) {}
+ download_finished_(false),
+ metrics_(Singleton<BandwidthMetrics>::get()) {}
virtual ~FlipStreamImpl() {
LOG(INFO) << "Deleting FlipStreamImpl for stream " << stream_id_;
};
@@ -62,10 +64,10 @@ class FlipStreamImpl {
// after this call.
bool OnData(const char* data, int length);
- // Called if the stream is prematurely aborted.
- // The caller should Deactivate and delete the FlipStreamImpl after this
- // call.
- void OnAbort();
+ // Called if the stream is being prematurely closed for an abort or an error.
+ // |err| is actually a net::Error.
+ // The caller should Deactivate and delete the FlipStreamImpl after this call.
+ void OnError(int err);
private:
flip::FlipStreamId stream_id_;
@@ -73,7 +75,8 @@ class FlipStreamImpl {
FlipDelegate* delegate_;
scoped_ptr<HttpResponseInfo> response_;
std::list<scoped_refptr<IOBufferWithSize> > response_body_;
- bool data_complete_;
+ bool download_finished_;
+ ScopedBandwidthMetrics metrics_;
};
FlipSession* FlipSession::GetFlipSession(
@@ -110,14 +113,15 @@ FlipSession::FlipSession(std::string host, HttpNetworkSession* session)
}
FlipSession::~FlipSession() {
+ // Cleanup all the streams.
+ CloseAllStreams(net::ERR_ABORTED);
+
if (connection_.is_initialized()) {
// With Flip we can't recycle sockets.
connection_.socket()->Disconnect();
}
if (session_pool_.get())
session_pool_->Remove(this);
-
- // TODO(mbelshe) - clear out streams (active and pushed) here?
}
net::Error FlipSession::Connect(const std::string& group_name,
@@ -300,7 +304,13 @@ bool FlipSession::CancelStream(int id) {
LOG(INFO) << "Cancelling stream " << id;
if (!IsStreamActive(id))
return false;
+
+ // TODO(mbelshe): Write a method for tearing down a stream
+ // that cleans it out of the active list, the pending list,
+ // etc.
+ FlipStreamImpl* stream = active_streams_[id];
DeactivateStream(id);
+ delete stream;
return true;
}
@@ -359,12 +369,17 @@ void FlipSession::OnSSLConnect(int result) {
if (IsCertificateError(result))
result = OK; // TODO(mbelshe): pretend we're happy anyway.
- connection_ready_ = true;
+ if (result == OK) {
+ connection_ready_ = true;
- // After we've connected, send any data to the server, and then issue
- // our read.
- WriteSocketLater();
- ReadSocket();
+ // After we've connected, send any data to the server, and then issue
+ // our read.
+ WriteSocketLater();
+ ReadSocket();
+ } else {
+ NOTREACHED();
+ // TODO(mbelshe): handle the error case: could not connect
+ }
}
void FlipSession::OnReadComplete(int bytes_read) {
@@ -405,13 +420,18 @@ void FlipSession::OnWriteComplete(int result) {
LOG(INFO) << "Flip write complete (result=" << result << ")";
- // Cleanup the write which just completed.
- in_flight_write_.release();
+ if (result >= 0) {
+ // Cleanup the write which just completed.
+ in_flight_write_.release();
- // Write more data. We're already in a continuation, so we can
- // go ahead and write it immediately (without going back to the
- // message loop).
- WriteSocketLater();
+ // Write more data. We're already in a continuation, so we can
+ // go ahead and write it immediately (without going back to the
+ // message loop).
+ WriteSocketLater();
+ } else {
+ // TODO(mbelshe): Deal with result < 0 error case.
+ NOTIMPLEMENTED();
+ }
}
void FlipSession::ReadSocket() {
@@ -488,20 +508,13 @@ void FlipSession::WriteSocket() {
}
DCHECK(bytes > 0);
in_flight_write_ = PrioritizedIOBuffer(buffer, 0);
+ write_pending_ = true;
int rv = connection_.socket()->Write(in_flight_write_.buffer(),
bytes, &write_callback_);
- if (rv == net::ERR_IO_PENDING) {
- write_pending_ = true;
+ if (rv == net::ERR_IO_PENDING)
break;
- }
- if (rv < 0) {
- // Uhoh - an error!
- // TODO(mbelshe): fix me!
- NOTIMPLEMENTED();
- }
- LOG(INFO) << "FLIPSession wrote " << rv << " bytes.";
- in_flight_write_.release();
+ OnWriteComplete(rv);
}
}
@@ -525,7 +538,7 @@ void FlipSession::CloseAllStreams(net::Error code) {
// Issue the aborts.
for (--index; index >= 0; index--) {
LOG(ERROR) << "ABANDONED: " << list[index]->path();
- list[index]->OnAbort();
+ list[index]->OnError(ERR_ABORTED);
delete list[index];
}
@@ -608,8 +621,7 @@ void FlipSession::OnStreamFrameData(flip::FlipStreamId stream_id,
LOG(WARNING) << "Received data frame for invalid stream " << stream_id;
return;
}
- //if (!len)
- // return; // This was just an empty data packet.
+
FlipStreamImpl* stream = active_streams_[stream_id];
if (stream->OnData(data, len)) {
DeactivateStream(stream->stream_id());
@@ -621,7 +633,7 @@ void FlipSession::OnSyn(const flip::FlipSynStreamControlFrame* frame,
const flip::FlipHeaderBlock* headers) {
flip::FlipStreamId stream_id = frame->stream_id();
- // Server-initiated streams should have odd sequence numbers.
+ // Server-initiated streams should have even sequence numbers.
if ((stream_id & 0x1) != 0) {
LOG(ERROR) << "Received invalid OnSyn stream id " << stream_id;
return;
@@ -745,14 +757,14 @@ void FlipSession::OnFin(const flip::FlipFinStreamControlFrame* frame) {
return;
}
FlipStreamImpl* stream = active_streams_[stream_id];
- // TODO(mbelshe) - status codes are HTTP codes?
- // Shouldn't it be zero/nonzero?
- // For now Roberto is sending 200, so use that as "ok".
bool cleanup_stream = false;
- if (frame->status() == 200 || frame->status() == 0) {
+ if (frame->status() == 0) {
cleanup_stream = stream->OnData(NULL, 0);
} else {
- stream->OnAbort();
+ LOG(ERROR) << "Flip stream closed: " << frame->status();
+ // TODO(mbelshe): Map from Flip-protocol errors to something sensical.
+ // For now, it doesn't matter much - it is a protocol error.
+ stream->OnError(ERR_FAILED);
cleanup_stream = true;
}
@@ -785,7 +797,7 @@ bool FlipStreamImpl::AttachDelegate(FlipDelegate* delegate) {
}
// Finally send up the end-of-stream.
- if (data_complete_) {
+ if (download_finished_) {
delegate_->OnClose(net::OK);
return true; // tell the caller to shut us down
}
@@ -800,6 +812,8 @@ void FlipStreamImpl::OnReply(const flip::FlipHeaderBlock* headers) {
// TODO(mbelshe): if no version or status is found, we need to error
// out the stream.
+ metrics_.StartStream();
+
// Server initiated streams must send a URL to us in the headers.
if (headers->find("path") != headers->end())
path_ = headers->find("path")->second;
@@ -824,35 +838,58 @@ void FlipStreamImpl::OnReply(const flip::FlipHeaderBlock* headers) {
DCHECK(response_ == NULL);
response_.reset(new HttpResponseInfo());
response_->headers = new HttpResponseHeaders(raw_headers);
+ // When pushing content from the server, we may not yet have a delegate_
+ // to notify. When the delegate is attached, it will notify then.
if (delegate_)
delegate_->OnResponseReceived(response_.get());
}
bool FlipStreamImpl::OnData(const char* data, int length) {
+ DCHECK(length >= 0);
LOG(INFO) << "FlipStream: Data (" << length << " bytes) received for "
<< stream_id_;
- if (length) {
- if (delegate_) {
- delegate_->OnDataReceived(data, length);
- } else {
- // Save the data for use later
- IOBufferWithSize* io_buffer = new IOBufferWithSize(length);
- memcpy(io_buffer->data(), data, length);
- response_body_.push_back(io_buffer);
- }
- } else {
- data_complete_ = true;
+
+ // If we don't have a response, then the SYN_REPLY did not come through.
+ // We cannot pass data up to the caller unless the reply headers have been
+ // received.
+ if (!response_.get()) {
+ if (delegate_)
+ delegate_->OnClose(ERR_SYN_REPLY_NOT_RECEIVED);
+ return true;
+ }
+
+ // A zero-length read means that the stream is being closed.
+ if (!length) {
+ metrics_.StopStream();
+ download_finished_ = true;
if (delegate_) {
delegate_->OnClose(net::OK);
return true; // Tell the caller to clean us up.
}
}
+
+ // Track our bandwidth.
+ metrics_.RecordBytes(length);
+
+ // We've received data. We try to pass it up to the caller.
+ // In the case of server-push streams, we may not have a delegate yet assigned
+ // to this stream. In that case we just queue the data for later.
+ if (delegate_) {
+ delegate_->OnDataReceived(data, length);
+ } else {
+ // Save the data for use later.
+ // TODO(mbelshe): We need to have some throttling on this. We shouldn't
+ // buffer an infinite amount of data.
+ IOBufferWithSize* io_buffer = new IOBufferWithSize(length);
+ memcpy(io_buffer->data(), data, length);
+ response_body_.push_back(io_buffer);
+ }
return false;
}
-void FlipStreamImpl::OnAbort() {
+void FlipStreamImpl::OnError(int err) {
if (delegate_)
- delegate_->OnClose(net::ERR_ABORTED);
+ delegate_->OnClose(err);
}
} // namespace net
diff --git a/net/flip/flip_session.h b/net/flip/flip_session.h
index 38d6256..1845c27 100644
--- a/net/flip/flip_session.h
+++ b/net/flip/flip_session.h
@@ -113,6 +113,7 @@ class FlipSession : public base::RefCounted<FlipSession>,
LoadState GetLoadState() const;
protected:
FRIEND_TEST(FlipNetworkTransactionTest, Connect);
+ FRIEND_TEST(FlipNetworkTransactionTest, ResponseWithoutSynReply);
friend class FlipSessionPool;
friend class HttpNetworkLayer; // Temporary for server.
@@ -195,6 +196,9 @@ class FlipSession : public base::RefCounted<FlipSession>,
int stream_hi_water_mark_; // The next stream id to use.
+ // TODO(mbelshe): We need to track these stream lists better.
+ // I suspect it is possible to remove a stream from
+ // one list, but not the other.
typedef std::map<int, FlipStreamImpl*> ActiveStreamMap;
typedef std::list<FlipStreamImpl*> ActiveStreamList;
ActiveStreamMap active_streams_;