diff options
author | mbelshe@google.com <mbelshe@google.com@0039d316-1c4b-4281-b951-d872f2087c98> | 2009-10-24 13:22:51 +0000 |
---|---|---|
committer | mbelshe@google.com <mbelshe@google.com@0039d316-1c4b-4281-b951-d872f2087c98> | 2009-10-24 13:22:51 +0000 |
commit | 9330067fadd9ec988a45d701016b01c05e9a0512 (patch) | |
tree | a57d67b4823413146ecf7f87bedc40c0d1e3db0e /net/flip | |
parent | 0b0e126d1f8291705dae7d70f793738ba945dcfc (diff) | |
download | chromium_src-9330067fadd9ec988a45d701016b01c05e9a0512.zip chromium_src-9330067fadd9ec988a45d701016b01c05e9a0512.tar.gz chromium_src-9330067fadd9ec988a45d701016b01c05e9a0512.tar.bz2 |
Add download metrics into FLIP. They aren't used yet,
but they do measure accurately.
Also add some minor cleanup to the FlipSession and added
a new test. There is a lot more cleanup to do in FlipSession.
BUG=none
TEST=none
Review URL: http://codereview.chromium.org/333009
git-svn-id: svn://svn.chromium.org/chrome/trunk/src@30001 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'net/flip')
-rw-r--r-- | net/flip/flip_network_transaction_unittest.cc | 42 | ||||
-rw-r--r-- | net/flip/flip_session.cc | 141 | ||||
-rw-r--r-- | net/flip/flip_session.h | 4 |
3 files changed, 127 insertions, 60 deletions
diff --git a/net/flip/flip_network_transaction_unittest.cc b/net/flip/flip_network_transaction_unittest.cc index 01a2faf..fb711b1 100644 --- a/net/flip/flip_network_transaction_unittest.cc +++ b/net/flip/flip_network_transaction_unittest.cc @@ -119,8 +119,6 @@ class FlipNetworkTransactionTest : public PlatformTest { return out; const HttpResponseInfo* response = trans->GetResponseInfo(); - EXPECT_TRUE(response != NULL); - EXPECT_TRUE(response->headers != NULL); out.status_line = response->headers->GetStatusLine(); @@ -147,7 +145,7 @@ TEST_F(FlipNetworkTransactionTest, Constructor) { } TEST_F(FlipNetworkTransactionTest, Connect) { - unsigned char syn_reply[] = { + static const unsigned char syn_reply[] = { 0x80, 0x01, 0x00, 0x02, // header 0x00, 0x00, 0x00, 0x45, 0x00, 0x00, 0x00, 0x01, @@ -161,12 +159,12 @@ TEST_F(FlipNetworkTransactionTest, Connect) { 0x00, 0x07, 'v', 'e', 'r', 's', 'i', 'o', 'n', // "version" 0x00, 0x08, 'H', 'T', 'T', 'P', '/', '1', '.', '1', // "HTTP/1.1" }; - unsigned char body_frame[] = { + static const unsigned char body_frame[] = { 0x00, 0x00, 0x00, 0x01, // header 0x00, 0x00, 0x00, 0x06, 'h', 'e', 'l', 'l', 'o', '!', // "hello" }; - unsigned char fin_frame[] = { + static const unsigned char fin_frame[] = { 0x80, 0x01, 0x00, 0x03, // header 0x00, 0x00, 0x00, 0x08, 0x00, 0x00, 0x00, 0x01, @@ -174,9 +172,10 @@ TEST_F(FlipNetworkTransactionTest, Connect) { }; MockRead data_reads[] = { - MockRead(true, reinterpret_cast<char*>(syn_reply), sizeof(syn_reply)), - MockRead(true, reinterpret_cast<char*>(body_frame), sizeof(body_frame)), - MockRead(true, reinterpret_cast<char*>(fin_frame), sizeof(fin_frame)), + MockRead(true, reinterpret_cast<const char*>(syn_reply), sizeof(syn_reply)), + MockRead(true, reinterpret_cast<const char*>(body_frame), + sizeof(body_frame)), + MockRead(true, reinterpret_cast<const char*>(fin_frame), sizeof(fin_frame)), MockRead(true, 0, 0), // EOF }; @@ -188,5 +187,32 @@ TEST_F(FlipNetworkTransactionTest, Connect) { EXPECT_EQ("hello!", out.response_data); } +// Test that the transaction doesn't crash when we don't have a reply. +TEST_F(FlipNetworkTransactionTest, ResponseWithoutSynReply) { + static const unsigned char body_frame[] = { + 0x00, 0x00, 0x00, 0x01, // header + 0x00, 0x00, 0x00, 0x06, + 'h', 'e', 'l', 'l', 'o', '!', // "hello" + }; + static const unsigned char fin_frame[] = { + 0x80, 0x01, 0x00, 0x03, // header + 0x00, 0x00, 0x00, 0x08, + 0x00, 0x00, 0x00, 0x01, + 0x00, 0x00, 0x00, 0x00, + }; + + MockRead data_reads[] = { + MockRead(true, reinterpret_cast<const char*>(body_frame), + sizeof(body_frame)), + MockRead(true, reinterpret_cast<const char*>(fin_frame), sizeof(fin_frame)), + MockRead(true, 0, 0), // EOF + }; + + // We disable SSL for this test. + FlipSession::SetSSLMode(false); + SimpleGetHelperResult out = SimpleGetHelper(data_reads); + EXPECT_EQ(ERR_SYN_REPLY_NOT_RECEIVED, out.rv); +} + } // namespace net diff --git a/net/flip/flip_session.cc b/net/flip/flip_session.cc index d73f915..b0f80524 100644 --- a/net/flip/flip_session.cc +++ b/net/flip/flip_session.cc @@ -10,6 +10,7 @@ #include "base/rand_util.h" #include "base/stats_counters.h" #include "base/string_util.h" +#include "net/base/bandwidth_metrics.h" #include "net/base/load_flags.h" #include "net/flip/flip_frame_builder.h" #include "net/flip/flip_protocol.h" @@ -37,7 +38,8 @@ class FlipStreamImpl { : stream_id_(stream_id), delegate_(delegate), response_(NULL), - data_complete_(false) {} + download_finished_(false), + metrics_(Singleton<BandwidthMetrics>::get()) {} virtual ~FlipStreamImpl() { LOG(INFO) << "Deleting FlipStreamImpl for stream " << stream_id_; }; @@ -62,10 +64,10 @@ class FlipStreamImpl { // after this call. bool OnData(const char* data, int length); - // Called if the stream is prematurely aborted. - // The caller should Deactivate and delete the FlipStreamImpl after this - // call. - void OnAbort(); + // Called if the stream is being prematurely closed for an abort or an error. + // |err| is actually a net::Error. + // The caller should Deactivate and delete the FlipStreamImpl after this call. + void OnError(int err); private: flip::FlipStreamId stream_id_; @@ -73,7 +75,8 @@ class FlipStreamImpl { FlipDelegate* delegate_; scoped_ptr<HttpResponseInfo> response_; std::list<scoped_refptr<IOBufferWithSize> > response_body_; - bool data_complete_; + bool download_finished_; + ScopedBandwidthMetrics metrics_; }; FlipSession* FlipSession::GetFlipSession( @@ -110,14 +113,15 @@ FlipSession::FlipSession(std::string host, HttpNetworkSession* session) } FlipSession::~FlipSession() { + // Cleanup all the streams. + CloseAllStreams(net::ERR_ABORTED); + if (connection_.is_initialized()) { // With Flip we can't recycle sockets. connection_.socket()->Disconnect(); } if (session_pool_.get()) session_pool_->Remove(this); - - // TODO(mbelshe) - clear out streams (active and pushed) here? } net::Error FlipSession::Connect(const std::string& group_name, @@ -300,7 +304,13 @@ bool FlipSession::CancelStream(int id) { LOG(INFO) << "Cancelling stream " << id; if (!IsStreamActive(id)) return false; + + // TODO(mbelshe): Write a method for tearing down a stream + // that cleans it out of the active list, the pending list, + // etc. + FlipStreamImpl* stream = active_streams_[id]; DeactivateStream(id); + delete stream; return true; } @@ -359,12 +369,17 @@ void FlipSession::OnSSLConnect(int result) { if (IsCertificateError(result)) result = OK; // TODO(mbelshe): pretend we're happy anyway. - connection_ready_ = true; + if (result == OK) { + connection_ready_ = true; - // After we've connected, send any data to the server, and then issue - // our read. - WriteSocketLater(); - ReadSocket(); + // After we've connected, send any data to the server, and then issue + // our read. + WriteSocketLater(); + ReadSocket(); + } else { + NOTREACHED(); + // TODO(mbelshe): handle the error case: could not connect + } } void FlipSession::OnReadComplete(int bytes_read) { @@ -405,13 +420,18 @@ void FlipSession::OnWriteComplete(int result) { LOG(INFO) << "Flip write complete (result=" << result << ")"; - // Cleanup the write which just completed. - in_flight_write_.release(); + if (result >= 0) { + // Cleanup the write which just completed. + in_flight_write_.release(); - // Write more data. We're already in a continuation, so we can - // go ahead and write it immediately (without going back to the - // message loop). - WriteSocketLater(); + // Write more data. We're already in a continuation, so we can + // go ahead and write it immediately (without going back to the + // message loop). + WriteSocketLater(); + } else { + // TODO(mbelshe): Deal with result < 0 error case. + NOTIMPLEMENTED(); + } } void FlipSession::ReadSocket() { @@ -488,20 +508,13 @@ void FlipSession::WriteSocket() { } DCHECK(bytes > 0); in_flight_write_ = PrioritizedIOBuffer(buffer, 0); + write_pending_ = true; int rv = connection_.socket()->Write(in_flight_write_.buffer(), bytes, &write_callback_); - if (rv == net::ERR_IO_PENDING) { - write_pending_ = true; + if (rv == net::ERR_IO_PENDING) break; - } - if (rv < 0) { - // Uhoh - an error! - // TODO(mbelshe): fix me! - NOTIMPLEMENTED(); - } - LOG(INFO) << "FLIPSession wrote " << rv << " bytes."; - in_flight_write_.release(); + OnWriteComplete(rv); } } @@ -525,7 +538,7 @@ void FlipSession::CloseAllStreams(net::Error code) { // Issue the aborts. for (--index; index >= 0; index--) { LOG(ERROR) << "ABANDONED: " << list[index]->path(); - list[index]->OnAbort(); + list[index]->OnError(ERR_ABORTED); delete list[index]; } @@ -608,8 +621,7 @@ void FlipSession::OnStreamFrameData(flip::FlipStreamId stream_id, LOG(WARNING) << "Received data frame for invalid stream " << stream_id; return; } - //if (!len) - // return; // This was just an empty data packet. + FlipStreamImpl* stream = active_streams_[stream_id]; if (stream->OnData(data, len)) { DeactivateStream(stream->stream_id()); @@ -621,7 +633,7 @@ void FlipSession::OnSyn(const flip::FlipSynStreamControlFrame* frame, const flip::FlipHeaderBlock* headers) { flip::FlipStreamId stream_id = frame->stream_id(); - // Server-initiated streams should have odd sequence numbers. + // Server-initiated streams should have even sequence numbers. if ((stream_id & 0x1) != 0) { LOG(ERROR) << "Received invalid OnSyn stream id " << stream_id; return; @@ -745,14 +757,14 @@ void FlipSession::OnFin(const flip::FlipFinStreamControlFrame* frame) { return; } FlipStreamImpl* stream = active_streams_[stream_id]; - // TODO(mbelshe) - status codes are HTTP codes? - // Shouldn't it be zero/nonzero? - // For now Roberto is sending 200, so use that as "ok". bool cleanup_stream = false; - if (frame->status() == 200 || frame->status() == 0) { + if (frame->status() == 0) { cleanup_stream = stream->OnData(NULL, 0); } else { - stream->OnAbort(); + LOG(ERROR) << "Flip stream closed: " << frame->status(); + // TODO(mbelshe): Map from Flip-protocol errors to something sensical. + // For now, it doesn't matter much - it is a protocol error. + stream->OnError(ERR_FAILED); cleanup_stream = true; } @@ -785,7 +797,7 @@ bool FlipStreamImpl::AttachDelegate(FlipDelegate* delegate) { } // Finally send up the end-of-stream. - if (data_complete_) { + if (download_finished_) { delegate_->OnClose(net::OK); return true; // tell the caller to shut us down } @@ -800,6 +812,8 @@ void FlipStreamImpl::OnReply(const flip::FlipHeaderBlock* headers) { // TODO(mbelshe): if no version or status is found, we need to error // out the stream. + metrics_.StartStream(); + // Server initiated streams must send a URL to us in the headers. if (headers->find("path") != headers->end()) path_ = headers->find("path")->second; @@ -824,35 +838,58 @@ void FlipStreamImpl::OnReply(const flip::FlipHeaderBlock* headers) { DCHECK(response_ == NULL); response_.reset(new HttpResponseInfo()); response_->headers = new HttpResponseHeaders(raw_headers); + // When pushing content from the server, we may not yet have a delegate_ + // to notify. When the delegate is attached, it will notify then. if (delegate_) delegate_->OnResponseReceived(response_.get()); } bool FlipStreamImpl::OnData(const char* data, int length) { + DCHECK(length >= 0); LOG(INFO) << "FlipStream: Data (" << length << " bytes) received for " << stream_id_; - if (length) { - if (delegate_) { - delegate_->OnDataReceived(data, length); - } else { - // Save the data for use later - IOBufferWithSize* io_buffer = new IOBufferWithSize(length); - memcpy(io_buffer->data(), data, length); - response_body_.push_back(io_buffer); - } - } else { - data_complete_ = true; + + // If we don't have a response, then the SYN_REPLY did not come through. + // We cannot pass data up to the caller unless the reply headers have been + // received. + if (!response_.get()) { + if (delegate_) + delegate_->OnClose(ERR_SYN_REPLY_NOT_RECEIVED); + return true; + } + + // A zero-length read means that the stream is being closed. + if (!length) { + metrics_.StopStream(); + download_finished_ = true; if (delegate_) { delegate_->OnClose(net::OK); return true; // Tell the caller to clean us up. } } + + // Track our bandwidth. + metrics_.RecordBytes(length); + + // We've received data. We try to pass it up to the caller. + // In the case of server-push streams, we may not have a delegate yet assigned + // to this stream. In that case we just queue the data for later. + if (delegate_) { + delegate_->OnDataReceived(data, length); + } else { + // Save the data for use later. + // TODO(mbelshe): We need to have some throttling on this. We shouldn't + // buffer an infinite amount of data. + IOBufferWithSize* io_buffer = new IOBufferWithSize(length); + memcpy(io_buffer->data(), data, length); + response_body_.push_back(io_buffer); + } return false; } -void FlipStreamImpl::OnAbort() { +void FlipStreamImpl::OnError(int err) { if (delegate_) - delegate_->OnClose(net::ERR_ABORTED); + delegate_->OnClose(err); } } // namespace net diff --git a/net/flip/flip_session.h b/net/flip/flip_session.h index 38d6256..1845c27 100644 --- a/net/flip/flip_session.h +++ b/net/flip/flip_session.h @@ -113,6 +113,7 @@ class FlipSession : public base::RefCounted<FlipSession>, LoadState GetLoadState() const; protected: FRIEND_TEST(FlipNetworkTransactionTest, Connect); + FRIEND_TEST(FlipNetworkTransactionTest, ResponseWithoutSynReply); friend class FlipSessionPool; friend class HttpNetworkLayer; // Temporary for server. @@ -195,6 +196,9 @@ class FlipSession : public base::RefCounted<FlipSession>, int stream_hi_water_mark_; // The next stream id to use. + // TODO(mbelshe): We need to track these stream lists better. + // I suspect it is possible to remove a stream from + // one list, but not the other. typedef std::map<int, FlipStreamImpl*> ActiveStreamMap; typedef std::list<FlipStreamImpl*> ActiveStreamList; ActiveStreamMap active_streams_; |