diff options
author | mbelshe@google.com <mbelshe@google.com@0039d316-1c4b-4281-b951-d872f2087c98> | 2009-10-24 13:22:51 +0000 |
---|---|---|
committer | mbelshe@google.com <mbelshe@google.com@0039d316-1c4b-4281-b951-d872f2087c98> | 2009-10-24 13:22:51 +0000 |
commit | 9330067fadd9ec988a45d701016b01c05e9a0512 (patch) | |
tree | a57d67b4823413146ecf7f87bedc40c0d1e3db0e /net | |
parent | 0b0e126d1f8291705dae7d70f793738ba945dcfc (diff) | |
download | chromium_src-9330067fadd9ec988a45d701016b01c05e9a0512.zip chromium_src-9330067fadd9ec988a45d701016b01c05e9a0512.tar.gz chromium_src-9330067fadd9ec988a45d701016b01c05e9a0512.tar.bz2 |
Add download metrics into FLIP. They aren't used yet,
but they do measure accurately.
Also add some minor cleanup to the FlipSession and added
a new test. There is a lot more cleanup to do in FlipSession.
BUG=none
TEST=none
Review URL: http://codereview.chromium.org/333009
git-svn-id: svn://svn.chromium.org/chrome/trunk/src@30001 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'net')
-rw-r--r-- | net/base/bandwidth_metrics.h | 149 | ||||
-rw-r--r-- | net/base/net_error_list.h | 12 | ||||
-rw-r--r-- | net/flip/flip_network_transaction_unittest.cc | 42 | ||||
-rw-r--r-- | net/flip/flip_session.cc | 141 | ||||
-rw-r--r-- | net/flip/flip_session.h | 4 |
5 files changed, 288 insertions, 60 deletions
diff --git a/net/base/bandwidth_metrics.h b/net/base/bandwidth_metrics.h new file mode 100644 index 0000000..078c22a --- /dev/null +++ b/net/base/bandwidth_metrics.h @@ -0,0 +1,149 @@ +// Copyright (c) 2009 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef NET_BASE_BANDWIDTH_METRICS_H_ +#define NET_BASE_BANDWIDTH_METRICS_H_ + +#include <list> + +#include "base/logging.h" +#include "base/time.h" + +namespace net { + +// Tracks statistics about the bandwidth metrics over time. In order to +// measure, this class needs to know when individual streams are in progress, +// so that it can know when to discount idle time. The BandwidthMetrics +// is unidirectional - it should only be used to record upload or download +// bandwidth, but not both. +// +// Note, the easiest thing to do is to just measure each stream and average +// them or add them. However, this does not work. If multiple streams are in +// progress concurrently, you have to look at the aggregate bandwidth at any +// point in time. +// +// Example: +// Imagine 4 streams opening and closing with overlapping time. +// We can't measure bandwidth by looking at any individual stream. +// We can only measure actual bandwidth by looking at the bandwidth +// across all open streams. +// +// Time ---------------------------------------> +// s1 +----------------+ +// s2 +----------------+ +// s3 +--------------+ +// s4 +--------------+ +// +// Example usage: +// +// BandwidthMetrics tracker; +// +// // When a stream is created +// tracker.StartStream(); +// +// // When data is transferred on any stream +// tracker.RecordSample(bytes); +// +// // When the stream is finished +// tracker.StopStream(); +// +// NOTE: This class is not thread safe. +// +class BandwidthMetrics { + public: + BandwidthMetrics() + : num_streams_in_progress_(0), + num_data_samples_(0), + data_sum_(0.0), + bytes_since_last_start_(0) { + } + + // Get the bandwidth. Returns Kbps (kilo-bits-per-second). + double bandwidth() const { + return data_sum_ / num_data_samples_; + } + + // Record that we've started a stream. + void StartStream() { + // If we're the only stream, we've finished some idle time. Record a new + // timestamp to indicate the start of data flow. + if (++num_streams_in_progress_ == 1) { + last_start_ = base::TimeTicks::HighResNow(); + bytes_since_last_start_ = 0; + } + } + + // Track that we've completed a stream. + void StopStream() { + if (--num_streams_in_progress_ == 0) { + // We don't use small streams when tracking bandwidth because they are not + // precise; imagine a 25 byte stream. The sample is too small to make + // a good measurement. + // 20KB is an arbitrary value. We might want to use a lesser value. + static const int64 kRecordSizeThreshold = 20 * 1024; + if (bytes_since_last_start_ < kRecordSizeThreshold) + return; + + base::TimeDelta delta = base::TimeTicks::HighResNow() - last_start_; + double ms = delta.InMillisecondsF(); + if (ms > 0.0) { + double kbps = static_cast<double>(bytes_since_last_start_) * 8 / ms; + ++num_data_samples_; + data_sum_ += kbps; + LOG(INFO) << "Bandwidth: " << kbps + << "Kbps (avg " << bandwidth() << "Kbps)"; + } + } + } + + // Add a sample of the number of bytes read from the network into the tracker. + void RecordBytes(int bytes) { + DCHECK(num_streams_in_progress_); + bytes_since_last_start_ += static_cast<int64>(bytes); + } + + private: + int num_streams_in_progress_; // The number of streams in progress. + // TODO(mbelshe): Use a rolling buffer of 30 samples instead of an average. + int num_data_samples_; // The number of samples collected. + double data_sum_; // The sum of all samples collected. + int64 bytes_since_last_start_; // Bytes tracked during this "session". + base::TimeTicks last_start_; // Timestamp of the begin of this "session". +}; + +// A utility class for managing the lifecycle of a measured stream. +// It is important that we not leave unclosed streams, and this class helps +// ensure we always stop them. +class ScopedBandwidthMetrics { + public: + explicit ScopedBandwidthMetrics(BandwidthMetrics* metrics) + : metrics_(metrics), started_(false) { + } + + ~ScopedBandwidthMetrics() { + if (started_) + metrics_->StopStream(); + } + + void StartStream() { + started_ = true; + metrics_->StartStream(); + } + + void StopStream() { + started_ = false; + metrics_->StopStream(); + } + + void RecordBytes(int bytes) { metrics_->RecordBytes(bytes); } + + private: + BandwidthMetrics* metrics_; + bool started_; +}; + +} // namespace net + +#endif // NET_BASE_BANDWIDTH_METRICS_H_ + diff --git a/net/base/net_error_list.h b/net/base/net_error_list.h index 15a596b..f2aeed6 100644 --- a/net/base/net_error_list.h +++ b/net/base/net_error_list.h @@ -4,6 +4,15 @@ // This file contains the list of network errors. +// +// Ranges: +// 0- 99 System related errors +// 100-199 Connection related errors +// 200-299 Certificate errors +// 300-399 HTTP errors +// 400-499 Cache errors +// + // An asynchronous IO operation is not yet complete. This usually does not // indicate a fatal error. Typically this error will be generated as a // notification to wait for some external notification that the IO operation @@ -267,6 +276,9 @@ NET_ERROR(CONTENT_DECODING_FAILED, -330) // is suspended. NET_ERROR(NETWORK_IO_SUSPENDED, -331) +// FLIP data received without receiving a SYN_REPLY on the stream. +NET_ERROR(SYN_REPLY_NOT_RECEIVED, -332) + // The cache does not have the requested entry. NET_ERROR(CACHE_MISS, -400) diff --git a/net/flip/flip_network_transaction_unittest.cc b/net/flip/flip_network_transaction_unittest.cc index 01a2faf..fb711b1 100644 --- a/net/flip/flip_network_transaction_unittest.cc +++ b/net/flip/flip_network_transaction_unittest.cc @@ -119,8 +119,6 @@ class FlipNetworkTransactionTest : public PlatformTest { return out; const HttpResponseInfo* response = trans->GetResponseInfo(); - EXPECT_TRUE(response != NULL); - EXPECT_TRUE(response->headers != NULL); out.status_line = response->headers->GetStatusLine(); @@ -147,7 +145,7 @@ TEST_F(FlipNetworkTransactionTest, Constructor) { } TEST_F(FlipNetworkTransactionTest, Connect) { - unsigned char syn_reply[] = { + static const unsigned char syn_reply[] = { 0x80, 0x01, 0x00, 0x02, // header 0x00, 0x00, 0x00, 0x45, 0x00, 0x00, 0x00, 0x01, @@ -161,12 +159,12 @@ TEST_F(FlipNetworkTransactionTest, Connect) { 0x00, 0x07, 'v', 'e', 'r', 's', 'i', 'o', 'n', // "version" 0x00, 0x08, 'H', 'T', 'T', 'P', '/', '1', '.', '1', // "HTTP/1.1" }; - unsigned char body_frame[] = { + static const unsigned char body_frame[] = { 0x00, 0x00, 0x00, 0x01, // header 0x00, 0x00, 0x00, 0x06, 'h', 'e', 'l', 'l', 'o', '!', // "hello" }; - unsigned char fin_frame[] = { + static const unsigned char fin_frame[] = { 0x80, 0x01, 0x00, 0x03, // header 0x00, 0x00, 0x00, 0x08, 0x00, 0x00, 0x00, 0x01, @@ -174,9 +172,10 @@ TEST_F(FlipNetworkTransactionTest, Connect) { }; MockRead data_reads[] = { - MockRead(true, reinterpret_cast<char*>(syn_reply), sizeof(syn_reply)), - MockRead(true, reinterpret_cast<char*>(body_frame), sizeof(body_frame)), - MockRead(true, reinterpret_cast<char*>(fin_frame), sizeof(fin_frame)), + MockRead(true, reinterpret_cast<const char*>(syn_reply), sizeof(syn_reply)), + MockRead(true, reinterpret_cast<const char*>(body_frame), + sizeof(body_frame)), + MockRead(true, reinterpret_cast<const char*>(fin_frame), sizeof(fin_frame)), MockRead(true, 0, 0), // EOF }; @@ -188,5 +187,32 @@ TEST_F(FlipNetworkTransactionTest, Connect) { EXPECT_EQ("hello!", out.response_data); } +// Test that the transaction doesn't crash when we don't have a reply. +TEST_F(FlipNetworkTransactionTest, ResponseWithoutSynReply) { + static const unsigned char body_frame[] = { + 0x00, 0x00, 0x00, 0x01, // header + 0x00, 0x00, 0x00, 0x06, + 'h', 'e', 'l', 'l', 'o', '!', // "hello" + }; + static const unsigned char fin_frame[] = { + 0x80, 0x01, 0x00, 0x03, // header + 0x00, 0x00, 0x00, 0x08, + 0x00, 0x00, 0x00, 0x01, + 0x00, 0x00, 0x00, 0x00, + }; + + MockRead data_reads[] = { + MockRead(true, reinterpret_cast<const char*>(body_frame), + sizeof(body_frame)), + MockRead(true, reinterpret_cast<const char*>(fin_frame), sizeof(fin_frame)), + MockRead(true, 0, 0), // EOF + }; + + // We disable SSL for this test. + FlipSession::SetSSLMode(false); + SimpleGetHelperResult out = SimpleGetHelper(data_reads); + EXPECT_EQ(ERR_SYN_REPLY_NOT_RECEIVED, out.rv); +} + } // namespace net diff --git a/net/flip/flip_session.cc b/net/flip/flip_session.cc index d73f915..b0f80524 100644 --- a/net/flip/flip_session.cc +++ b/net/flip/flip_session.cc @@ -10,6 +10,7 @@ #include "base/rand_util.h" #include "base/stats_counters.h" #include "base/string_util.h" +#include "net/base/bandwidth_metrics.h" #include "net/base/load_flags.h" #include "net/flip/flip_frame_builder.h" #include "net/flip/flip_protocol.h" @@ -37,7 +38,8 @@ class FlipStreamImpl { : stream_id_(stream_id), delegate_(delegate), response_(NULL), - data_complete_(false) {} + download_finished_(false), + metrics_(Singleton<BandwidthMetrics>::get()) {} virtual ~FlipStreamImpl() { LOG(INFO) << "Deleting FlipStreamImpl for stream " << stream_id_; }; @@ -62,10 +64,10 @@ class FlipStreamImpl { // after this call. bool OnData(const char* data, int length); - // Called if the stream is prematurely aborted. - // The caller should Deactivate and delete the FlipStreamImpl after this - // call. - void OnAbort(); + // Called if the stream is being prematurely closed for an abort or an error. + // |err| is actually a net::Error. + // The caller should Deactivate and delete the FlipStreamImpl after this call. + void OnError(int err); private: flip::FlipStreamId stream_id_; @@ -73,7 +75,8 @@ class FlipStreamImpl { FlipDelegate* delegate_; scoped_ptr<HttpResponseInfo> response_; std::list<scoped_refptr<IOBufferWithSize> > response_body_; - bool data_complete_; + bool download_finished_; + ScopedBandwidthMetrics metrics_; }; FlipSession* FlipSession::GetFlipSession( @@ -110,14 +113,15 @@ FlipSession::FlipSession(std::string host, HttpNetworkSession* session) } FlipSession::~FlipSession() { + // Cleanup all the streams. + CloseAllStreams(net::ERR_ABORTED); + if (connection_.is_initialized()) { // With Flip we can't recycle sockets. connection_.socket()->Disconnect(); } if (session_pool_.get()) session_pool_->Remove(this); - - // TODO(mbelshe) - clear out streams (active and pushed) here? } net::Error FlipSession::Connect(const std::string& group_name, @@ -300,7 +304,13 @@ bool FlipSession::CancelStream(int id) { LOG(INFO) << "Cancelling stream " << id; if (!IsStreamActive(id)) return false; + + // TODO(mbelshe): Write a method for tearing down a stream + // that cleans it out of the active list, the pending list, + // etc. + FlipStreamImpl* stream = active_streams_[id]; DeactivateStream(id); + delete stream; return true; } @@ -359,12 +369,17 @@ void FlipSession::OnSSLConnect(int result) { if (IsCertificateError(result)) result = OK; // TODO(mbelshe): pretend we're happy anyway. - connection_ready_ = true; + if (result == OK) { + connection_ready_ = true; - // After we've connected, send any data to the server, and then issue - // our read. - WriteSocketLater(); - ReadSocket(); + // After we've connected, send any data to the server, and then issue + // our read. + WriteSocketLater(); + ReadSocket(); + } else { + NOTREACHED(); + // TODO(mbelshe): handle the error case: could not connect + } } void FlipSession::OnReadComplete(int bytes_read) { @@ -405,13 +420,18 @@ void FlipSession::OnWriteComplete(int result) { LOG(INFO) << "Flip write complete (result=" << result << ")"; - // Cleanup the write which just completed. - in_flight_write_.release(); + if (result >= 0) { + // Cleanup the write which just completed. + in_flight_write_.release(); - // Write more data. We're already in a continuation, so we can - // go ahead and write it immediately (without going back to the - // message loop). - WriteSocketLater(); + // Write more data. We're already in a continuation, so we can + // go ahead and write it immediately (without going back to the + // message loop). + WriteSocketLater(); + } else { + // TODO(mbelshe): Deal with result < 0 error case. + NOTIMPLEMENTED(); + } } void FlipSession::ReadSocket() { @@ -488,20 +508,13 @@ void FlipSession::WriteSocket() { } DCHECK(bytes > 0); in_flight_write_ = PrioritizedIOBuffer(buffer, 0); + write_pending_ = true; int rv = connection_.socket()->Write(in_flight_write_.buffer(), bytes, &write_callback_); - if (rv == net::ERR_IO_PENDING) { - write_pending_ = true; + if (rv == net::ERR_IO_PENDING) break; - } - if (rv < 0) { - // Uhoh - an error! - // TODO(mbelshe): fix me! - NOTIMPLEMENTED(); - } - LOG(INFO) << "FLIPSession wrote " << rv << " bytes."; - in_flight_write_.release(); + OnWriteComplete(rv); } } @@ -525,7 +538,7 @@ void FlipSession::CloseAllStreams(net::Error code) { // Issue the aborts. for (--index; index >= 0; index--) { LOG(ERROR) << "ABANDONED: " << list[index]->path(); - list[index]->OnAbort(); + list[index]->OnError(ERR_ABORTED); delete list[index]; } @@ -608,8 +621,7 @@ void FlipSession::OnStreamFrameData(flip::FlipStreamId stream_id, LOG(WARNING) << "Received data frame for invalid stream " << stream_id; return; } - //if (!len) - // return; // This was just an empty data packet. + FlipStreamImpl* stream = active_streams_[stream_id]; if (stream->OnData(data, len)) { DeactivateStream(stream->stream_id()); @@ -621,7 +633,7 @@ void FlipSession::OnSyn(const flip::FlipSynStreamControlFrame* frame, const flip::FlipHeaderBlock* headers) { flip::FlipStreamId stream_id = frame->stream_id(); - // Server-initiated streams should have odd sequence numbers. + // Server-initiated streams should have even sequence numbers. if ((stream_id & 0x1) != 0) { LOG(ERROR) << "Received invalid OnSyn stream id " << stream_id; return; @@ -745,14 +757,14 @@ void FlipSession::OnFin(const flip::FlipFinStreamControlFrame* frame) { return; } FlipStreamImpl* stream = active_streams_[stream_id]; - // TODO(mbelshe) - status codes are HTTP codes? - // Shouldn't it be zero/nonzero? - // For now Roberto is sending 200, so use that as "ok". bool cleanup_stream = false; - if (frame->status() == 200 || frame->status() == 0) { + if (frame->status() == 0) { cleanup_stream = stream->OnData(NULL, 0); } else { - stream->OnAbort(); + LOG(ERROR) << "Flip stream closed: " << frame->status(); + // TODO(mbelshe): Map from Flip-protocol errors to something sensical. + // For now, it doesn't matter much - it is a protocol error. + stream->OnError(ERR_FAILED); cleanup_stream = true; } @@ -785,7 +797,7 @@ bool FlipStreamImpl::AttachDelegate(FlipDelegate* delegate) { } // Finally send up the end-of-stream. - if (data_complete_) { + if (download_finished_) { delegate_->OnClose(net::OK); return true; // tell the caller to shut us down } @@ -800,6 +812,8 @@ void FlipStreamImpl::OnReply(const flip::FlipHeaderBlock* headers) { // TODO(mbelshe): if no version or status is found, we need to error // out the stream. + metrics_.StartStream(); + // Server initiated streams must send a URL to us in the headers. if (headers->find("path") != headers->end()) path_ = headers->find("path")->second; @@ -824,35 +838,58 @@ void FlipStreamImpl::OnReply(const flip::FlipHeaderBlock* headers) { DCHECK(response_ == NULL); response_.reset(new HttpResponseInfo()); response_->headers = new HttpResponseHeaders(raw_headers); + // When pushing content from the server, we may not yet have a delegate_ + // to notify. When the delegate is attached, it will notify then. if (delegate_) delegate_->OnResponseReceived(response_.get()); } bool FlipStreamImpl::OnData(const char* data, int length) { + DCHECK(length >= 0); LOG(INFO) << "FlipStream: Data (" << length << " bytes) received for " << stream_id_; - if (length) { - if (delegate_) { - delegate_->OnDataReceived(data, length); - } else { - // Save the data for use later - IOBufferWithSize* io_buffer = new IOBufferWithSize(length); - memcpy(io_buffer->data(), data, length); - response_body_.push_back(io_buffer); - } - } else { - data_complete_ = true; + + // If we don't have a response, then the SYN_REPLY did not come through. + // We cannot pass data up to the caller unless the reply headers have been + // received. + if (!response_.get()) { + if (delegate_) + delegate_->OnClose(ERR_SYN_REPLY_NOT_RECEIVED); + return true; + } + + // A zero-length read means that the stream is being closed. + if (!length) { + metrics_.StopStream(); + download_finished_ = true; if (delegate_) { delegate_->OnClose(net::OK); return true; // Tell the caller to clean us up. } } + + // Track our bandwidth. + metrics_.RecordBytes(length); + + // We've received data. We try to pass it up to the caller. + // In the case of server-push streams, we may not have a delegate yet assigned + // to this stream. In that case we just queue the data for later. + if (delegate_) { + delegate_->OnDataReceived(data, length); + } else { + // Save the data for use later. + // TODO(mbelshe): We need to have some throttling on this. We shouldn't + // buffer an infinite amount of data. + IOBufferWithSize* io_buffer = new IOBufferWithSize(length); + memcpy(io_buffer->data(), data, length); + response_body_.push_back(io_buffer); + } return false; } -void FlipStreamImpl::OnAbort() { +void FlipStreamImpl::OnError(int err) { if (delegate_) - delegate_->OnClose(net::ERR_ABORTED); + delegate_->OnClose(err); } } // namespace net diff --git a/net/flip/flip_session.h b/net/flip/flip_session.h index 38d6256..1845c27 100644 --- a/net/flip/flip_session.h +++ b/net/flip/flip_session.h @@ -113,6 +113,7 @@ class FlipSession : public base::RefCounted<FlipSession>, LoadState GetLoadState() const; protected: FRIEND_TEST(FlipNetworkTransactionTest, Connect); + FRIEND_TEST(FlipNetworkTransactionTest, ResponseWithoutSynReply); friend class FlipSessionPool; friend class HttpNetworkLayer; // Temporary for server. @@ -195,6 +196,9 @@ class FlipSession : public base::RefCounted<FlipSession>, int stream_hi_water_mark_; // The next stream id to use. + // TODO(mbelshe): We need to track these stream lists better. + // I suspect it is possible to remove a stream from + // one list, but not the other. typedef std::map<int, FlipStreamImpl*> ActiveStreamMap; typedef std::list<FlipStreamImpl*> ActiveStreamList; ActiveStreamMap active_streams_; |