summaryrefslogtreecommitdiffstats
path: root/net
diff options
context:
space:
mode:
authormbelshe@google.com <mbelshe@google.com@0039d316-1c4b-4281-b951-d872f2087c98>2009-10-24 13:22:51 +0000
committermbelshe@google.com <mbelshe@google.com@0039d316-1c4b-4281-b951-d872f2087c98>2009-10-24 13:22:51 +0000
commit9330067fadd9ec988a45d701016b01c05e9a0512 (patch)
treea57d67b4823413146ecf7f87bedc40c0d1e3db0e /net
parent0b0e126d1f8291705dae7d70f793738ba945dcfc (diff)
downloadchromium_src-9330067fadd9ec988a45d701016b01c05e9a0512.zip
chromium_src-9330067fadd9ec988a45d701016b01c05e9a0512.tar.gz
chromium_src-9330067fadd9ec988a45d701016b01c05e9a0512.tar.bz2
Add download metrics into FLIP. They aren't used yet,
but they do measure accurately. Also add some minor cleanup to the FlipSession and added a new test. There is a lot more cleanup to do in FlipSession. BUG=none TEST=none Review URL: http://codereview.chromium.org/333009 git-svn-id: svn://svn.chromium.org/chrome/trunk/src@30001 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'net')
-rw-r--r--net/base/bandwidth_metrics.h149
-rw-r--r--net/base/net_error_list.h12
-rw-r--r--net/flip/flip_network_transaction_unittest.cc42
-rw-r--r--net/flip/flip_session.cc141
-rw-r--r--net/flip/flip_session.h4
5 files changed, 288 insertions, 60 deletions
diff --git a/net/base/bandwidth_metrics.h b/net/base/bandwidth_metrics.h
new file mode 100644
index 0000000..078c22a
--- /dev/null
+++ b/net/base/bandwidth_metrics.h
@@ -0,0 +1,149 @@
+// Copyright (c) 2009 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef NET_BASE_BANDWIDTH_METRICS_H_
+#define NET_BASE_BANDWIDTH_METRICS_H_
+
+#include <list>
+
+#include "base/logging.h"
+#include "base/time.h"
+
+namespace net {
+
+// Tracks statistics about the bandwidth metrics over time. In order to
+// measure, this class needs to know when individual streams are in progress,
+// so that it can know when to discount idle time. The BandwidthMetrics
+// is unidirectional - it should only be used to record upload or download
+// bandwidth, but not both.
+//
+// Note, the easiest thing to do is to just measure each stream and average
+// them or add them. However, this does not work. If multiple streams are in
+// progress concurrently, you have to look at the aggregate bandwidth at any
+// point in time.
+//
+// Example:
+// Imagine 4 streams opening and closing with overlapping time.
+// We can't measure bandwidth by looking at any individual stream.
+// We can only measure actual bandwidth by looking at the bandwidth
+// across all open streams.
+//
+// Time --------------------------------------->
+// s1 +----------------+
+// s2 +----------------+
+// s3 +--------------+
+// s4 +--------------+
+//
+// Example usage:
+//
+// BandwidthMetrics tracker;
+//
+// // When a stream is created
+// tracker.StartStream();
+//
+// // When data is transferred on any stream
+// tracker.RecordSample(bytes);
+//
+// // When the stream is finished
+// tracker.StopStream();
+//
+// NOTE: This class is not thread safe.
+//
+class BandwidthMetrics {
+ public:
+ BandwidthMetrics()
+ : num_streams_in_progress_(0),
+ num_data_samples_(0),
+ data_sum_(0.0),
+ bytes_since_last_start_(0) {
+ }
+
+ // Get the bandwidth. Returns Kbps (kilo-bits-per-second).
+ double bandwidth() const {
+ return data_sum_ / num_data_samples_;
+ }
+
+ // Record that we've started a stream.
+ void StartStream() {
+ // If we're the only stream, we've finished some idle time. Record a new
+ // timestamp to indicate the start of data flow.
+ if (++num_streams_in_progress_ == 1) {
+ last_start_ = base::TimeTicks::HighResNow();
+ bytes_since_last_start_ = 0;
+ }
+ }
+
+ // Track that we've completed a stream.
+ void StopStream() {
+ if (--num_streams_in_progress_ == 0) {
+ // We don't use small streams when tracking bandwidth because they are not
+ // precise; imagine a 25 byte stream. The sample is too small to make
+ // a good measurement.
+ // 20KB is an arbitrary value. We might want to use a lesser value.
+ static const int64 kRecordSizeThreshold = 20 * 1024;
+ if (bytes_since_last_start_ < kRecordSizeThreshold)
+ return;
+
+ base::TimeDelta delta = base::TimeTicks::HighResNow() - last_start_;
+ double ms = delta.InMillisecondsF();
+ if (ms > 0.0) {
+ double kbps = static_cast<double>(bytes_since_last_start_) * 8 / ms;
+ ++num_data_samples_;
+ data_sum_ += kbps;
+ LOG(INFO) << "Bandwidth: " << kbps
+ << "Kbps (avg " << bandwidth() << "Kbps)";
+ }
+ }
+ }
+
+ // Add a sample of the number of bytes read from the network into the tracker.
+ void RecordBytes(int bytes) {
+ DCHECK(num_streams_in_progress_);
+ bytes_since_last_start_ += static_cast<int64>(bytes);
+ }
+
+ private:
+ int num_streams_in_progress_; // The number of streams in progress.
+ // TODO(mbelshe): Use a rolling buffer of 30 samples instead of an average.
+ int num_data_samples_; // The number of samples collected.
+ double data_sum_; // The sum of all samples collected.
+ int64 bytes_since_last_start_; // Bytes tracked during this "session".
+ base::TimeTicks last_start_; // Timestamp of the begin of this "session".
+};
+
+// A utility class for managing the lifecycle of a measured stream.
+// It is important that we not leave unclosed streams, and this class helps
+// ensure we always stop them.
+class ScopedBandwidthMetrics {
+ public:
+ explicit ScopedBandwidthMetrics(BandwidthMetrics* metrics)
+ : metrics_(metrics), started_(false) {
+ }
+
+ ~ScopedBandwidthMetrics() {
+ if (started_)
+ metrics_->StopStream();
+ }
+
+ void StartStream() {
+ started_ = true;
+ metrics_->StartStream();
+ }
+
+ void StopStream() {
+ started_ = false;
+ metrics_->StopStream();
+ }
+
+ void RecordBytes(int bytes) { metrics_->RecordBytes(bytes); }
+
+ private:
+ BandwidthMetrics* metrics_;
+ bool started_;
+};
+
+} // namespace net
+
+#endif // NET_BASE_BANDWIDTH_METRICS_H_
+
diff --git a/net/base/net_error_list.h b/net/base/net_error_list.h
index 15a596b..f2aeed6 100644
--- a/net/base/net_error_list.h
+++ b/net/base/net_error_list.h
@@ -4,6 +4,15 @@
// This file contains the list of network errors.
+//
+// Ranges:
+// 0- 99 System related errors
+// 100-199 Connection related errors
+// 200-299 Certificate errors
+// 300-399 HTTP errors
+// 400-499 Cache errors
+//
+
// An asynchronous IO operation is not yet complete. This usually does not
// indicate a fatal error. Typically this error will be generated as a
// notification to wait for some external notification that the IO operation
@@ -267,6 +276,9 @@ NET_ERROR(CONTENT_DECODING_FAILED, -330)
// is suspended.
NET_ERROR(NETWORK_IO_SUSPENDED, -331)
+// FLIP data received without receiving a SYN_REPLY on the stream.
+NET_ERROR(SYN_REPLY_NOT_RECEIVED, -332)
+
// The cache does not have the requested entry.
NET_ERROR(CACHE_MISS, -400)
diff --git a/net/flip/flip_network_transaction_unittest.cc b/net/flip/flip_network_transaction_unittest.cc
index 01a2faf..fb711b1 100644
--- a/net/flip/flip_network_transaction_unittest.cc
+++ b/net/flip/flip_network_transaction_unittest.cc
@@ -119,8 +119,6 @@ class FlipNetworkTransactionTest : public PlatformTest {
return out;
const HttpResponseInfo* response = trans->GetResponseInfo();
- EXPECT_TRUE(response != NULL);
-
EXPECT_TRUE(response->headers != NULL);
out.status_line = response->headers->GetStatusLine();
@@ -147,7 +145,7 @@ TEST_F(FlipNetworkTransactionTest, Constructor) {
}
TEST_F(FlipNetworkTransactionTest, Connect) {
- unsigned char syn_reply[] = {
+ static const unsigned char syn_reply[] = {
0x80, 0x01, 0x00, 0x02, // header
0x00, 0x00, 0x00, 0x45,
0x00, 0x00, 0x00, 0x01,
@@ -161,12 +159,12 @@ TEST_F(FlipNetworkTransactionTest, Connect) {
0x00, 0x07, 'v', 'e', 'r', 's', 'i', 'o', 'n', // "version"
0x00, 0x08, 'H', 'T', 'T', 'P', '/', '1', '.', '1', // "HTTP/1.1"
};
- unsigned char body_frame[] = {
+ static const unsigned char body_frame[] = {
0x00, 0x00, 0x00, 0x01, // header
0x00, 0x00, 0x00, 0x06,
'h', 'e', 'l', 'l', 'o', '!', // "hello"
};
- unsigned char fin_frame[] = {
+ static const unsigned char fin_frame[] = {
0x80, 0x01, 0x00, 0x03, // header
0x00, 0x00, 0x00, 0x08,
0x00, 0x00, 0x00, 0x01,
@@ -174,9 +172,10 @@ TEST_F(FlipNetworkTransactionTest, Connect) {
};
MockRead data_reads[] = {
- MockRead(true, reinterpret_cast<char*>(syn_reply), sizeof(syn_reply)),
- MockRead(true, reinterpret_cast<char*>(body_frame), sizeof(body_frame)),
- MockRead(true, reinterpret_cast<char*>(fin_frame), sizeof(fin_frame)),
+ MockRead(true, reinterpret_cast<const char*>(syn_reply), sizeof(syn_reply)),
+ MockRead(true, reinterpret_cast<const char*>(body_frame),
+ sizeof(body_frame)),
+ MockRead(true, reinterpret_cast<const char*>(fin_frame), sizeof(fin_frame)),
MockRead(true, 0, 0), // EOF
};
@@ -188,5 +187,32 @@ TEST_F(FlipNetworkTransactionTest, Connect) {
EXPECT_EQ("hello!", out.response_data);
}
+// Test that the transaction doesn't crash when we don't have a reply.
+TEST_F(FlipNetworkTransactionTest, ResponseWithoutSynReply) {
+ static const unsigned char body_frame[] = {
+ 0x00, 0x00, 0x00, 0x01, // header
+ 0x00, 0x00, 0x00, 0x06,
+ 'h', 'e', 'l', 'l', 'o', '!', // "hello"
+ };
+ static const unsigned char fin_frame[] = {
+ 0x80, 0x01, 0x00, 0x03, // header
+ 0x00, 0x00, 0x00, 0x08,
+ 0x00, 0x00, 0x00, 0x01,
+ 0x00, 0x00, 0x00, 0x00,
+ };
+
+ MockRead data_reads[] = {
+ MockRead(true, reinterpret_cast<const char*>(body_frame),
+ sizeof(body_frame)),
+ MockRead(true, reinterpret_cast<const char*>(fin_frame), sizeof(fin_frame)),
+ MockRead(true, 0, 0), // EOF
+ };
+
+ // We disable SSL for this test.
+ FlipSession::SetSSLMode(false);
+ SimpleGetHelperResult out = SimpleGetHelper(data_reads);
+ EXPECT_EQ(ERR_SYN_REPLY_NOT_RECEIVED, out.rv);
+}
+
} // namespace net
diff --git a/net/flip/flip_session.cc b/net/flip/flip_session.cc
index d73f915..b0f80524 100644
--- a/net/flip/flip_session.cc
+++ b/net/flip/flip_session.cc
@@ -10,6 +10,7 @@
#include "base/rand_util.h"
#include "base/stats_counters.h"
#include "base/string_util.h"
+#include "net/base/bandwidth_metrics.h"
#include "net/base/load_flags.h"
#include "net/flip/flip_frame_builder.h"
#include "net/flip/flip_protocol.h"
@@ -37,7 +38,8 @@ class FlipStreamImpl {
: stream_id_(stream_id),
delegate_(delegate),
response_(NULL),
- data_complete_(false) {}
+ download_finished_(false),
+ metrics_(Singleton<BandwidthMetrics>::get()) {}
virtual ~FlipStreamImpl() {
LOG(INFO) << "Deleting FlipStreamImpl for stream " << stream_id_;
};
@@ -62,10 +64,10 @@ class FlipStreamImpl {
// after this call.
bool OnData(const char* data, int length);
- // Called if the stream is prematurely aborted.
- // The caller should Deactivate and delete the FlipStreamImpl after this
- // call.
- void OnAbort();
+ // Called if the stream is being prematurely closed for an abort or an error.
+ // |err| is actually a net::Error.
+ // The caller should Deactivate and delete the FlipStreamImpl after this call.
+ void OnError(int err);
private:
flip::FlipStreamId stream_id_;
@@ -73,7 +75,8 @@ class FlipStreamImpl {
FlipDelegate* delegate_;
scoped_ptr<HttpResponseInfo> response_;
std::list<scoped_refptr<IOBufferWithSize> > response_body_;
- bool data_complete_;
+ bool download_finished_;
+ ScopedBandwidthMetrics metrics_;
};
FlipSession* FlipSession::GetFlipSession(
@@ -110,14 +113,15 @@ FlipSession::FlipSession(std::string host, HttpNetworkSession* session)
}
FlipSession::~FlipSession() {
+ // Cleanup all the streams.
+ CloseAllStreams(net::ERR_ABORTED);
+
if (connection_.is_initialized()) {
// With Flip we can't recycle sockets.
connection_.socket()->Disconnect();
}
if (session_pool_.get())
session_pool_->Remove(this);
-
- // TODO(mbelshe) - clear out streams (active and pushed) here?
}
net::Error FlipSession::Connect(const std::string& group_name,
@@ -300,7 +304,13 @@ bool FlipSession::CancelStream(int id) {
LOG(INFO) << "Cancelling stream " << id;
if (!IsStreamActive(id))
return false;
+
+ // TODO(mbelshe): Write a method for tearing down a stream
+ // that cleans it out of the active list, the pending list,
+ // etc.
+ FlipStreamImpl* stream = active_streams_[id];
DeactivateStream(id);
+ delete stream;
return true;
}
@@ -359,12 +369,17 @@ void FlipSession::OnSSLConnect(int result) {
if (IsCertificateError(result))
result = OK; // TODO(mbelshe): pretend we're happy anyway.
- connection_ready_ = true;
+ if (result == OK) {
+ connection_ready_ = true;
- // After we've connected, send any data to the server, and then issue
- // our read.
- WriteSocketLater();
- ReadSocket();
+ // After we've connected, send any data to the server, and then issue
+ // our read.
+ WriteSocketLater();
+ ReadSocket();
+ } else {
+ NOTREACHED();
+ // TODO(mbelshe): handle the error case: could not connect
+ }
}
void FlipSession::OnReadComplete(int bytes_read) {
@@ -405,13 +420,18 @@ void FlipSession::OnWriteComplete(int result) {
LOG(INFO) << "Flip write complete (result=" << result << ")";
- // Cleanup the write which just completed.
- in_flight_write_.release();
+ if (result >= 0) {
+ // Cleanup the write which just completed.
+ in_flight_write_.release();
- // Write more data. We're already in a continuation, so we can
- // go ahead and write it immediately (without going back to the
- // message loop).
- WriteSocketLater();
+ // Write more data. We're already in a continuation, so we can
+ // go ahead and write it immediately (without going back to the
+ // message loop).
+ WriteSocketLater();
+ } else {
+ // TODO(mbelshe): Deal with result < 0 error case.
+ NOTIMPLEMENTED();
+ }
}
void FlipSession::ReadSocket() {
@@ -488,20 +508,13 @@ void FlipSession::WriteSocket() {
}
DCHECK(bytes > 0);
in_flight_write_ = PrioritizedIOBuffer(buffer, 0);
+ write_pending_ = true;
int rv = connection_.socket()->Write(in_flight_write_.buffer(),
bytes, &write_callback_);
- if (rv == net::ERR_IO_PENDING) {
- write_pending_ = true;
+ if (rv == net::ERR_IO_PENDING)
break;
- }
- if (rv < 0) {
- // Uhoh - an error!
- // TODO(mbelshe): fix me!
- NOTIMPLEMENTED();
- }
- LOG(INFO) << "FLIPSession wrote " << rv << " bytes.";
- in_flight_write_.release();
+ OnWriteComplete(rv);
}
}
@@ -525,7 +538,7 @@ void FlipSession::CloseAllStreams(net::Error code) {
// Issue the aborts.
for (--index; index >= 0; index--) {
LOG(ERROR) << "ABANDONED: " << list[index]->path();
- list[index]->OnAbort();
+ list[index]->OnError(ERR_ABORTED);
delete list[index];
}
@@ -608,8 +621,7 @@ void FlipSession::OnStreamFrameData(flip::FlipStreamId stream_id,
LOG(WARNING) << "Received data frame for invalid stream " << stream_id;
return;
}
- //if (!len)
- // return; // This was just an empty data packet.
+
FlipStreamImpl* stream = active_streams_[stream_id];
if (stream->OnData(data, len)) {
DeactivateStream(stream->stream_id());
@@ -621,7 +633,7 @@ void FlipSession::OnSyn(const flip::FlipSynStreamControlFrame* frame,
const flip::FlipHeaderBlock* headers) {
flip::FlipStreamId stream_id = frame->stream_id();
- // Server-initiated streams should have odd sequence numbers.
+ // Server-initiated streams should have even sequence numbers.
if ((stream_id & 0x1) != 0) {
LOG(ERROR) << "Received invalid OnSyn stream id " << stream_id;
return;
@@ -745,14 +757,14 @@ void FlipSession::OnFin(const flip::FlipFinStreamControlFrame* frame) {
return;
}
FlipStreamImpl* stream = active_streams_[stream_id];
- // TODO(mbelshe) - status codes are HTTP codes?
- // Shouldn't it be zero/nonzero?
- // For now Roberto is sending 200, so use that as "ok".
bool cleanup_stream = false;
- if (frame->status() == 200 || frame->status() == 0) {
+ if (frame->status() == 0) {
cleanup_stream = stream->OnData(NULL, 0);
} else {
- stream->OnAbort();
+ LOG(ERROR) << "Flip stream closed: " << frame->status();
+ // TODO(mbelshe): Map from Flip-protocol errors to something sensical.
+ // For now, it doesn't matter much - it is a protocol error.
+ stream->OnError(ERR_FAILED);
cleanup_stream = true;
}
@@ -785,7 +797,7 @@ bool FlipStreamImpl::AttachDelegate(FlipDelegate* delegate) {
}
// Finally send up the end-of-stream.
- if (data_complete_) {
+ if (download_finished_) {
delegate_->OnClose(net::OK);
return true; // tell the caller to shut us down
}
@@ -800,6 +812,8 @@ void FlipStreamImpl::OnReply(const flip::FlipHeaderBlock* headers) {
// TODO(mbelshe): if no version or status is found, we need to error
// out the stream.
+ metrics_.StartStream();
+
// Server initiated streams must send a URL to us in the headers.
if (headers->find("path") != headers->end())
path_ = headers->find("path")->second;
@@ -824,35 +838,58 @@ void FlipStreamImpl::OnReply(const flip::FlipHeaderBlock* headers) {
DCHECK(response_ == NULL);
response_.reset(new HttpResponseInfo());
response_->headers = new HttpResponseHeaders(raw_headers);
+ // When pushing content from the server, we may not yet have a delegate_
+ // to notify. When the delegate is attached, it will notify then.
if (delegate_)
delegate_->OnResponseReceived(response_.get());
}
bool FlipStreamImpl::OnData(const char* data, int length) {
+ DCHECK(length >= 0);
LOG(INFO) << "FlipStream: Data (" << length << " bytes) received for "
<< stream_id_;
- if (length) {
- if (delegate_) {
- delegate_->OnDataReceived(data, length);
- } else {
- // Save the data for use later
- IOBufferWithSize* io_buffer = new IOBufferWithSize(length);
- memcpy(io_buffer->data(), data, length);
- response_body_.push_back(io_buffer);
- }
- } else {
- data_complete_ = true;
+
+ // If we don't have a response, then the SYN_REPLY did not come through.
+ // We cannot pass data up to the caller unless the reply headers have been
+ // received.
+ if (!response_.get()) {
+ if (delegate_)
+ delegate_->OnClose(ERR_SYN_REPLY_NOT_RECEIVED);
+ return true;
+ }
+
+ // A zero-length read means that the stream is being closed.
+ if (!length) {
+ metrics_.StopStream();
+ download_finished_ = true;
if (delegate_) {
delegate_->OnClose(net::OK);
return true; // Tell the caller to clean us up.
}
}
+
+ // Track our bandwidth.
+ metrics_.RecordBytes(length);
+
+ // We've received data. We try to pass it up to the caller.
+ // In the case of server-push streams, we may not have a delegate yet assigned
+ // to this stream. In that case we just queue the data for later.
+ if (delegate_) {
+ delegate_->OnDataReceived(data, length);
+ } else {
+ // Save the data for use later.
+ // TODO(mbelshe): We need to have some throttling on this. We shouldn't
+ // buffer an infinite amount of data.
+ IOBufferWithSize* io_buffer = new IOBufferWithSize(length);
+ memcpy(io_buffer->data(), data, length);
+ response_body_.push_back(io_buffer);
+ }
return false;
}
-void FlipStreamImpl::OnAbort() {
+void FlipStreamImpl::OnError(int err) {
if (delegate_)
- delegate_->OnClose(net::ERR_ABORTED);
+ delegate_->OnClose(err);
}
} // namespace net
diff --git a/net/flip/flip_session.h b/net/flip/flip_session.h
index 38d6256..1845c27 100644
--- a/net/flip/flip_session.h
+++ b/net/flip/flip_session.h
@@ -113,6 +113,7 @@ class FlipSession : public base::RefCounted<FlipSession>,
LoadState GetLoadState() const;
protected:
FRIEND_TEST(FlipNetworkTransactionTest, Connect);
+ FRIEND_TEST(FlipNetworkTransactionTest, ResponseWithoutSynReply);
friend class FlipSessionPool;
friend class HttpNetworkLayer; // Temporary for server.
@@ -195,6 +196,9 @@ class FlipSession : public base::RefCounted<FlipSession>,
int stream_hi_water_mark_; // The next stream id to use.
+ // TODO(mbelshe): We need to track these stream lists better.
+ // I suspect it is possible to remove a stream from
+ // one list, but not the other.
typedef std::map<int, FlipStreamImpl*> ActiveStreamMap;
typedef std::list<FlipStreamImpl*> ActiveStreamList;
ActiveStreamMap active_streams_;