diff options
-rw-r--r-- | net/quic/quic_client_session.cc | 11 | ||||
-rw-r--r-- | net/quic/quic_client_session.h | 3 | ||||
-rw-r--r-- | net/quic/quic_http_stream.cc | 5 | ||||
-rw-r--r-- | net/quic/quic_http_stream_test.cc | 89 | ||||
-rw-r--r-- | net/quic/quic_protocol.cc | 1 | ||||
-rw-r--r-- | net/quic/quic_reliable_client_stream.cc | 5 | ||||
-rw-r--r-- | net/quic/quic_session.cc | 4 | ||||
-rw-r--r-- | net/quic/quic_session.h | 20 | ||||
-rw-r--r-- | net/quic/quic_session_test.cc | 39 | ||||
-rw-r--r-- | net/quic/reliable_quic_stream.cc | 13 | ||||
-rw-r--r-- | net/quic/reliable_quic_stream.h | 23 |
11 files changed, 160 insertions, 53 deletions
diff --git a/net/quic/quic_client_session.cc b/net/quic/quic_client_session.cc index 576892e..1881815 100644 --- a/net/quic/quic_client_session.cc +++ b/net/quic/quic_client_session.cc @@ -27,7 +27,6 @@ QuicClientSession::QuicClientSession(QuicConnection* connection, } QuicClientSession::~QuicClientSession() { - STLDeleteValues(&streams_); } QuicReliableClientStream* QuicClientSession::CreateOutgoingReliableStream() { @@ -42,8 +41,6 @@ QuicReliableClientStream* QuicClientSession::CreateOutgoingReliableStream() { } QuicReliableClientStream* stream = new QuicReliableClientStream(GetNextStreamId(), this); - streams_[stream->id()] = stream; - ActivateStream(stream); return stream; } @@ -74,14 +71,6 @@ ReliableQuicStream* QuicClientSession::CreateIncomingReliableStream( void QuicClientSession::CloseStream(QuicStreamId stream_id) { QuicSession::CloseStream(stream_id); - StreamMap::iterator it = streams_.find(stream_id); - DCHECK(it != streams_.end()); - if (it != streams_.end()) { - ReliableQuicStream* stream = it->second; - streams_.erase(it); - delete stream; - } - if (GetNumOpenStreams() == 0) { stream_factory_->OnIdleSession(this); } diff --git a/net/quic/quic_client_session.h b/net/quic/quic_client_session.h index ad52106..2cd4456 100644 --- a/net/quic/quic_client_session.h +++ b/net/quic/quic_client_session.h @@ -51,15 +51,12 @@ class NET_EXPORT_PRIVATE QuicClientSession : public QuicSession { QuicStreamId id) OVERRIDE; private: - typedef base::hash_map<QuicStreamId, ReliableQuicStream*> StreamMap; - // A completion callback invoked when a read completes. void OnReadComplete(int result); base::WeakPtrFactory<QuicClientSession> weak_factory_; QuicCryptoClientStream crypto_stream_; scoped_ptr<QuicConnectionHelper> helper_; - StreamMap streams_; QuicStreamFactory* stream_factory_; scoped_refptr<IOBufferWithSize> read_buffer_; bool read_pending_; diff --git a/net/quic/quic_http_stream.cc b/net/quic/quic_http_stream.cc index 4ea35bf..8124a4f 100644 --- a/net/quic/quic_http_stream.cc +++ b/net/quic/quic_http_stream.cc @@ -160,7 +160,9 @@ int QuicHttpStream::ReadResponseBody( void QuicHttpStream::Close(bool not_reusable) { // Note: the not_reusable flag has no meaning for SPDY streams. if (stream_) { + stream_->SetDelegate(NULL); stream_->Close(QUIC_NO_ERROR); + stream_ = NULL; } } @@ -210,8 +212,7 @@ bool QuicHttpStream::IsSpdyHttpStream() const { } void QuicHttpStream::Drain(HttpNetworkSession* session) { - if (stream_) - stream_->Close(QUIC_NO_ERROR); + Close(false); delete this; } diff --git a/net/quic/quic_http_stream_test.cc b/net/quic/quic_http_stream_test.cc index ff8a749..0226d40 100644 --- a/net/quic/quic_http_stream_test.cc +++ b/net/quic/quic_http_stream_test.cc @@ -73,6 +73,20 @@ class TestCollector : public QuicReceiptMetricsCollector { DISALLOW_COPY_AND_ASSIGN(TestCollector); }; +// Subclass of QuicHttpStream that closes itself when the first piece of data +// is received. +class AutoClosingStream : public QuicHttpStream { + public: + explicit AutoClosingStream(QuicReliableClientStream* stream) + : QuicHttpStream(stream) { + } + + virtual int OnDataReceived(const char* data, int length) { + Close(false); + return OK; + } +}; + } // namespace class QuicHttpStreamTest : public ::testing::Test { @@ -92,6 +106,7 @@ class QuicHttpStreamTest : public ::testing::Test { QuicHttpStreamTest() : net_log_(BoundNetLog()), + use_closing_stream_(false), read_buffer_(new IOBufferWithSize(4096)), guid_(2), framer_(QuicDecrypter::Create(kNULL), QuicEncrypter::Create(kNULL)), @@ -159,8 +174,11 @@ class QuicHttpStreamTest : public ::testing::Test { message.tag = kSHLO; session_->GetCryptoStream()->OnHandshakeMessage(message); EXPECT_TRUE(session_->IsCryptoHandshakeComplete()); - stream_.reset(new QuicHttpStream(session_->CreateOutgoingReliableStream())); - } + QuicReliableClientStream* stream = + session_->CreateOutgoingReliableStream(); + stream_.reset(use_closing_stream_ ? new AutoClosingStream(stream) : + new QuicHttpStream(stream)); + } // Returns a newly created packet to send kData on stream 1. QuicEncryptedPacket* ConstructDataPacket( @@ -176,14 +194,27 @@ class QuicHttpStreamTest : public ::testing::Test { // Returns a newly created packet to send ack data. QuicEncryptedPacket* ConstructAckPacket( QuicPacketSequenceNumber sequence_number, - QuicPacketSequenceNumber largest_received) { + QuicPacketSequenceNumber largest_received, + QuicPacketSequenceNumber least_unacked) { InitializeHeader(sequence_number); - QuicAckFrame ack(largest_received, sequence_number); + QuicAckFrame ack(largest_received, least_unacked); return ConstructPacket(header_, QuicFrame(&ack)); } + // Returns a newly created packet to send ack data. + QuicEncryptedPacket* ConstructRstPacket( + QuicPacketSequenceNumber sequence_number, + QuicStreamId stream_id, + QuicStreamOffset offset) { + InitializeHeader(sequence_number); + + QuicRstStreamFrame rst(stream_id, offset, QUIC_NO_ERROR); + return ConstructPacket(header_, QuicFrame(&rst)); + } + BoundNetLog net_log_; + bool use_closing_stream_; MockScheduler* scheduler_; TestCollector* collector_; scoped_refptr<TestTaskRunner> runner_; @@ -247,7 +278,7 @@ TEST_F(QuicHttpStreamTest, IsConnectionReusable) { TEST_F(QuicHttpStreamTest, GetRequest) { AddWrite(SYNCHRONOUS, ConstructDataPacket(1, kFin, 0, "GET / HTTP/1.1\r\n\r\n")); - AddWrite(SYNCHRONOUS, ConstructAckPacket(2, 2)); + AddWrite(SYNCHRONOUS, ConstructAckPacket(2, 2, 2)); Initialize(); request_.method = "GET"; @@ -260,7 +291,7 @@ TEST_F(QuicHttpStreamTest, GetRequest) { EXPECT_EQ(&response_, stream_->GetResponseInfo()); // Ack the request. - scoped_ptr<QuicEncryptedPacket> ack(ConstructAckPacket(1, 1)); + scoped_ptr<QuicEncryptedPacket> ack(ConstructAckPacket(1, 1, 1)); ProcessPacket(*ack); EXPECT_EQ(ERR_IO_PENDING, @@ -290,7 +321,7 @@ TEST_F(QuicHttpStreamTest, GetRequest) { TEST_F(QuicHttpStreamTest, GetRequestFullResponseInSinglePacket) { AddWrite(SYNCHRONOUS, ConstructDataPacket(1, kFin, 0, "GET / HTTP/1.1\r\n\r\n")); - AddWrite(SYNCHRONOUS, ConstructAckPacket(2, 2)); + AddWrite(SYNCHRONOUS, ConstructAckPacket(2, 2, 2)); Initialize(); request_.method = "GET"; @@ -303,7 +334,7 @@ TEST_F(QuicHttpStreamTest, GetRequestFullResponseInSinglePacket) { EXPECT_EQ(&response_, stream_->GetResponseInfo()); // Ack the request. - scoped_ptr<QuicEncryptedPacket> ack(ConstructAckPacket(1, 1)); + scoped_ptr<QuicEncryptedPacket> ack(ConstructAckPacket(1, 1, 1)); ProcessPacket(*ack); EXPECT_EQ(ERR_IO_PENDING, @@ -336,8 +367,8 @@ TEST_F(QuicHttpStreamTest, SendPostRequest) { AddWrite(SYNCHRONOUS, ConstructDataPacket(1, kNoFin, 0, kRequestData)); AddWrite(SYNCHRONOUS, ConstructDataPacket(2, kFin, strlen(kRequestData), kUploadData)); - AddWrite(SYNCHRONOUS, ConstructAckPacket(3, 2)); - AddWrite(SYNCHRONOUS, ConstructAckPacket(4, 3)); + AddWrite(SYNCHRONOUS, ConstructAckPacket(3, 2, 3)); + AddWrite(SYNCHRONOUS, ConstructAckPacket(4, 3, 4)); Initialize(); @@ -357,7 +388,7 @@ TEST_F(QuicHttpStreamTest, SendPostRequest) { EXPECT_EQ(&response_, stream_->GetResponseInfo()); // Ack both packets in the request. - scoped_ptr<QuicEncryptedPacket> ack(ConstructAckPacket(1, 2)); + scoped_ptr<QuicEncryptedPacket> ack(ConstructAckPacket(1, 2, 1)); ProcessPacket(*ack); // Send the response headers (but not the body). @@ -388,6 +419,42 @@ TEST_F(QuicHttpStreamTest, SendPostRequest) { EXPECT_TRUE(AtEof()); } +TEST_F(QuicHttpStreamTest, DestroyedEarly) { + const char kRequest[] = "GET / HTTP/1.1\r\n\r\n"; + AddWrite(SYNCHRONOUS, ConstructDataPacket(1, kFin, 0, kRequest)); + AddWrite(SYNCHRONOUS, ConstructRstPacket(2, 3, strlen(kRequest))); + AddWrite(SYNCHRONOUS, ConstructAckPacket(3, 2, 2)); + use_closing_stream_ = true; + Initialize(); + + request_.method = "GET"; + request_.url = GURL("http://www.google.com/"); + + //stream_.reset(new TestStream(session_->CreateOutgoingReliableStream())); + EXPECT_EQ(OK, stream_->InitializeStream(&request_, net_log_, + callback_.callback())); + EXPECT_EQ(OK, stream_->SendRequest(headers_, &response_, + callback_.callback())); + EXPECT_EQ(&response_, stream_->GetResponseInfo()); + + // Ack the request. + scoped_ptr<QuicEncryptedPacket> ack(ConstructAckPacket(1, 1, 1)); + ProcessPacket(*ack); + EXPECT_EQ(ERR_IO_PENDING, + stream_->ReadResponseHeaders(callback_.callback())); + + // Send the response with a body. + const char kResponseHeaders[] = "HTTP/1.1 404 OK\r\n" + "Content-Type: text/plain\r\n\r\nhello world!"; + scoped_ptr<QuicEncryptedPacket> resp( + ConstructDataPacket(2, kFin, 0, kResponseHeaders)); + + // In the course of processing this packet, the QuicHttpStream close itself. + ProcessPacket(*resp); + + EXPECT_TRUE(AtEof()); +} + } // namespace test } // namespace net diff --git a/net/quic/quic_protocol.cc b/net/quic/quic_protocol.cc index a1ed882..76e6298 100644 --- a/net/quic/quic_protocol.cc +++ b/net/quic/quic_protocol.cc @@ -91,6 +91,7 @@ ostream& operator<<(ostream& os, const ReceivedPacketInfo& r) { it != r.missing_packets.end(); ++it) { os << *it << " "; } + os << " ] "; return os; } diff --git a/net/quic/quic_reliable_client_stream.cc b/net/quic/quic_reliable_client_stream.cc index 2e66897..7c9da7e 100644 --- a/net/quic/quic_reliable_client_stream.cc +++ b/net/quic/quic_reliable_client_stream.cc @@ -24,7 +24,9 @@ QuicReliableClientStream::~QuicReliableClientStream() { uint32 QuicReliableClientStream::ProcessData(const char* data, uint32 data_len) { // TODO(rch): buffer data if we don't have a delegate. - DCHECK(delegate_); + if (!delegate_) { + return ERR_ABORTED; + } int rv = delegate_->OnDataReceived(data, data_len); if (rv != OK) { DLOG(ERROR) << "Delegate refused data, rv: " << rv; @@ -39,6 +41,7 @@ void QuicReliableClientStream::TerminateFromPeer(bool half_close) { delegate_->OnClose(error()); delegate_ = NULL; } + ReliableQuicStream::TerminateFromPeer(half_close); } void QuicReliableClientStream::SetDelegate( diff --git a/net/quic/quic_session.cc b/net/quic/quic_session.cc index 5551972..c21682f 100644 --- a/net/quic/quic_session.cc +++ b/net/quic/quic_session.cc @@ -70,6 +70,8 @@ QuicSession::QuicSession(QuicConnection* connection, bool is_server) } QuicSession::~QuicSession() { + STLDeleteElements(&closed_streams_); + STLDeleteValues(&stream_map_); } bool QuicSession::OnPacket(const IPEndPoint& self_address, @@ -165,6 +167,8 @@ void QuicSession::CloseStream(QuicStreamId stream_id) { DLOG(INFO) << "Stream is already closed: " << stream_id; return; } + it->second->OnClose(); + closed_streams_.push_back(it->second); stream_map_.erase(it); } diff --git a/net/quic/quic_session.h b/net/quic/quic_session.h index 4841627..b124226 100644 --- a/net/quic/quic_session.h +++ b/net/quic/quic_session.h @@ -110,6 +110,20 @@ class NET_EXPORT_PRIVATE QuicSession : public QuicConnectionVisitorInterface { return max_open_streams_; } + protected: + // This is called after every call other than OnConnectionClose from the + // QuicConnectionVisitor to allow post-processing once the work has been done. + // In this case, it deletes streams given that it's safe to do so (no other + // opterations are being done on the streams at this time) + virtual void PostProcessAfterData(); + + base::hash_map<QuicStreamId, ReliableQuicStream*>* streams() { + return &stream_map_; + } + std::vector<ReliableQuicStream*>* closed_streams() { + return &closed_streams_; + } + private: friend class test::QuicSessionPeer; friend class VisitorShim; @@ -118,12 +132,6 @@ class NET_EXPORT_PRIVATE QuicSession : public QuicConnectionVisitorInterface { ReliableQuicStream* GetStream(const QuicStreamId stream_id); - // This is called after every call other than OnConnectionClose from the - // QuicConnectionVisitor to allow post-processing once the work has been done. - // In this case, it deletes streams given that it's safe to do so (no other - // opterations are being done on the streams at this time) - void PostProcessAfterData(); - scoped_ptr<QuicConnection> connection_; // A shim to stand between the connection and the session, to handle stream diff --git a/net/quic/quic_session_test.cc b/net/quic/quic_session_test.cc index a34035a..0f2d0ac 100644 --- a/net/quic/quic_session_test.cc +++ b/net/quic/quic_session_test.cc @@ -128,8 +128,10 @@ TEST_F(QuicSessionTest, IsClosedStreamDefault) { } TEST_F(QuicSessionTest, IsClosedStreamLocallyCreated) { - scoped_ptr<TestStream> stream2(session_.CreateOutgoingReliableStream()); - scoped_ptr<TestStream> stream4(session_.CreateOutgoingReliableStream()); + TestStream* stream2 = session_.CreateOutgoingReliableStream(); + EXPECT_EQ(2u, stream2->id()); + TestStream* stream4 = session_.CreateOutgoingReliableStream(); + EXPECT_EQ(4u, stream4->id()); CheckClosedStreams(); CloseStream(4); @@ -139,15 +141,15 @@ TEST_F(QuicSessionTest, IsClosedStreamLocallyCreated) { } TEST_F(QuicSessionTest, IsClosedStreamPeerCreated) { - scoped_ptr<ReliableQuicStream> stream3(session_.GetIncomingReliableStream(3)); - scoped_ptr<ReliableQuicStream> stream5(session_.GetIncomingReliableStream(5)); + session_.GetIncomingReliableStream(3); + session_.GetIncomingReliableStream(5); CheckClosedStreams(); CloseStream(3); CheckClosedStreams(); CloseStream(5); // Create stream id 9, and implicitly 7 - scoped_ptr<ReliableQuicStream> stream9(session_.GetIncomingReliableStream(9)); + session_.GetIncomingReliableStream(9); CheckClosedStreams(); // Close 9, but make sure 7 is still not closed CloseStream(9); @@ -155,35 +157,34 @@ TEST_F(QuicSessionTest, IsClosedStreamPeerCreated) { } TEST_F(QuicSessionTest, StreamIdTooLarge) { - scoped_ptr<ReliableQuicStream> stream3(session_.GetIncomingReliableStream(3)); + session_.GetIncomingReliableStream(3); EXPECT_CALL(*connection_, SendConnectionClose(QUIC_INVALID_STREAM_ID)); - scoped_ptr<ReliableQuicStream> stream5( - session_.GetIncomingReliableStream(105)); + session_.GetIncomingReliableStream(105); } TEST_F(QuicSessionTest, OnCanWrite) { - scoped_ptr<TestStream> stream2(session_.CreateOutgoingReliableStream()); - scoped_ptr<TestStream> stream4(session_.CreateOutgoingReliableStream()); - scoped_ptr<TestStream> stream6(session_.CreateOutgoingReliableStream()); + TestStream* stream2 = session_.CreateOutgoingReliableStream(); + TestStream* stream4 = session_.CreateOutgoingReliableStream(); + TestStream* stream6 = session_.CreateOutgoingReliableStream(); session_.MarkWriteBlocked(2); session_.MarkWriteBlocked(6); session_.MarkWriteBlocked(4); InSequence s; - EXPECT_CALL(*stream2.get(), OnCanWrite()).WillOnce( + EXPECT_CALL(*stream2, OnCanWrite()).WillOnce( // Reregister, to test the loop limit. testing::InvokeWithoutArgs(&session_, &TestSession::MarkTwoWriteBlocked)); - EXPECT_CALL(*stream6.get(), OnCanWrite()); - EXPECT_CALL(*stream4.get(), OnCanWrite()); + EXPECT_CALL(*stream6, OnCanWrite()); + EXPECT_CALL(*stream4, OnCanWrite()); EXPECT_FALSE(session_.OnCanWrite()); } TEST_F(QuicSessionTest, OnCanWriteWithClosedStream) { - scoped_ptr<TestStream> stream2(session_.CreateOutgoingReliableStream()); - scoped_ptr<TestStream> stream4(session_.CreateOutgoingReliableStream()); - scoped_ptr<TestStream> stream6(session_.CreateOutgoingReliableStream()); + TestStream* stream2 = session_.CreateOutgoingReliableStream(); + TestStream* stream4 = session_.CreateOutgoingReliableStream(); + session_.CreateOutgoingReliableStream(); // stream 6 session_.MarkWriteBlocked(2); session_.MarkWriteBlocked(6); @@ -191,8 +192,8 @@ TEST_F(QuicSessionTest, OnCanWriteWithClosedStream) { CloseStream(6); InSequence s; - EXPECT_CALL(*stream2.get(), OnCanWrite()); - EXPECT_CALL(*stream4.get(), OnCanWrite()); + EXPECT_CALL(*stream2, OnCanWrite()); + EXPECT_CALL(*stream4, OnCanWrite()); EXPECT_TRUE(session_.OnCanWrite()); } diff --git a/net/quic/reliable_quic_stream.cc b/net/quic/reliable_quic_stream.cc index 6b284e6..2aa477b 100644 --- a/net/quic/reliable_quic_stream.cc +++ b/net/quic/reliable_quic_stream.cc @@ -16,6 +16,9 @@ ReliableQuicStream::ReliableQuicStream(QuicStreamId id, id_(id), offset_(0), session_(session), + visitor_(NULL), + stream_bytes_read_(0), + stream_bytes_written_(0), error_(QUIC_NO_ERROR), read_side_closed_(false), write_side_closed_(false), @@ -180,4 +183,14 @@ void ReliableQuicStream::CloseWriteSide() { } } +void ReliableQuicStream::OnClose() { + if (visitor_) { + Visitor* visitor = visitor_; + // Calling Visitor::OnClose() may result the destruction of the visitor, + // so we need to ensure we don't call it again. + visitor_ = NULL; + visitor->OnClose(this); + } +} + } // namespace net diff --git a/net/quic/reliable_quic_stream.h b/net/quic/reliable_quic_stream.h index bfe7fea..c1d7406 100644 --- a/net/quic/reliable_quic_stream.h +++ b/net/quic/reliable_quic_stream.h @@ -25,6 +25,21 @@ class QuicSession; // All this does right now is send data to subclasses via the sequencer. class NET_EXPORT_PRIVATE ReliableQuicStream { public: + // Visitor receives callbacks from the stream. + class Visitor { + public: + Visitor() {} + + // Called when the stream is closed. + virtual void OnClose(ReliableQuicStream* stream) = 0; + + protected: + virtual ~Visitor() {} + + private: + DISALLOW_COPY_AND_ASSIGN(Visitor); + }; + ReliableQuicStream(QuicStreamId id, QuicSession* session); @@ -35,6 +50,9 @@ class NET_EXPORT_PRIVATE ReliableQuicStream { virtual void OnCanWrite(); + // Called by the session just before the stream is deleted. + virtual void OnClose(); + // Called when we get a stream reset from the client. // The rst will be passed through the sequencer, which will call // TerminateFromPeer when 'offset' bytes have been processed. @@ -68,6 +86,9 @@ class NET_EXPORT_PRIVATE ReliableQuicStream { const IPEndPoint& GetPeerAddress() const; + Visitor* visitor() { return visitor_; } + void set_visitor(Visitor* visitor) { visitor_ = visitor; } + protected: // TODO(alyssar): document the return value -- whether it can be negative and // how a failure is reported. @@ -101,6 +122,8 @@ class NET_EXPORT_PRIVATE ReliableQuicStream { QuicStreamId id_; QuicStreamOffset offset_; QuicSession* session_; + // Optional visitor of this stream to be notified when the stream is closed. + Visitor* visitor_; // Bytes read and written refer to payload bytes only: they do not include // framing, encryption overhead etc. uint64 stream_bytes_read_; |