summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--net/quic/quic_client_session.cc11
-rw-r--r--net/quic/quic_client_session.h3
-rw-r--r--net/quic/quic_http_stream.cc5
-rw-r--r--net/quic/quic_http_stream_test.cc89
-rw-r--r--net/quic/quic_protocol.cc1
-rw-r--r--net/quic/quic_reliable_client_stream.cc5
-rw-r--r--net/quic/quic_session.cc4
-rw-r--r--net/quic/quic_session.h20
-rw-r--r--net/quic/quic_session_test.cc39
-rw-r--r--net/quic/reliable_quic_stream.cc13
-rw-r--r--net/quic/reliable_quic_stream.h23
11 files changed, 160 insertions, 53 deletions
diff --git a/net/quic/quic_client_session.cc b/net/quic/quic_client_session.cc
index 576892e..1881815 100644
--- a/net/quic/quic_client_session.cc
+++ b/net/quic/quic_client_session.cc
@@ -27,7 +27,6 @@ QuicClientSession::QuicClientSession(QuicConnection* connection,
}
QuicClientSession::~QuicClientSession() {
- STLDeleteValues(&streams_);
}
QuicReliableClientStream* QuicClientSession::CreateOutgoingReliableStream() {
@@ -42,8 +41,6 @@ QuicReliableClientStream* QuicClientSession::CreateOutgoingReliableStream() {
}
QuicReliableClientStream* stream =
new QuicReliableClientStream(GetNextStreamId(), this);
- streams_[stream->id()] = stream;
-
ActivateStream(stream);
return stream;
}
@@ -74,14 +71,6 @@ ReliableQuicStream* QuicClientSession::CreateIncomingReliableStream(
void QuicClientSession::CloseStream(QuicStreamId stream_id) {
QuicSession::CloseStream(stream_id);
- StreamMap::iterator it = streams_.find(stream_id);
- DCHECK(it != streams_.end());
- if (it != streams_.end()) {
- ReliableQuicStream* stream = it->second;
- streams_.erase(it);
- delete stream;
- }
-
if (GetNumOpenStreams() == 0) {
stream_factory_->OnIdleSession(this);
}
diff --git a/net/quic/quic_client_session.h b/net/quic/quic_client_session.h
index ad52106..2cd4456 100644
--- a/net/quic/quic_client_session.h
+++ b/net/quic/quic_client_session.h
@@ -51,15 +51,12 @@ class NET_EXPORT_PRIVATE QuicClientSession : public QuicSession {
QuicStreamId id) OVERRIDE;
private:
- typedef base::hash_map<QuicStreamId, ReliableQuicStream*> StreamMap;
-
// A completion callback invoked when a read completes.
void OnReadComplete(int result);
base::WeakPtrFactory<QuicClientSession> weak_factory_;
QuicCryptoClientStream crypto_stream_;
scoped_ptr<QuicConnectionHelper> helper_;
- StreamMap streams_;
QuicStreamFactory* stream_factory_;
scoped_refptr<IOBufferWithSize> read_buffer_;
bool read_pending_;
diff --git a/net/quic/quic_http_stream.cc b/net/quic/quic_http_stream.cc
index 4ea35bf..8124a4f 100644
--- a/net/quic/quic_http_stream.cc
+++ b/net/quic/quic_http_stream.cc
@@ -160,7 +160,9 @@ int QuicHttpStream::ReadResponseBody(
void QuicHttpStream::Close(bool not_reusable) {
// Note: the not_reusable flag has no meaning for SPDY streams.
if (stream_) {
+ stream_->SetDelegate(NULL);
stream_->Close(QUIC_NO_ERROR);
+ stream_ = NULL;
}
}
@@ -210,8 +212,7 @@ bool QuicHttpStream::IsSpdyHttpStream() const {
}
void QuicHttpStream::Drain(HttpNetworkSession* session) {
- if (stream_)
- stream_->Close(QUIC_NO_ERROR);
+ Close(false);
delete this;
}
diff --git a/net/quic/quic_http_stream_test.cc b/net/quic/quic_http_stream_test.cc
index ff8a749..0226d40 100644
--- a/net/quic/quic_http_stream_test.cc
+++ b/net/quic/quic_http_stream_test.cc
@@ -73,6 +73,20 @@ class TestCollector : public QuicReceiptMetricsCollector {
DISALLOW_COPY_AND_ASSIGN(TestCollector);
};
+// Subclass of QuicHttpStream that closes itself when the first piece of data
+// is received.
+class AutoClosingStream : public QuicHttpStream {
+ public:
+ explicit AutoClosingStream(QuicReliableClientStream* stream)
+ : QuicHttpStream(stream) {
+ }
+
+ virtual int OnDataReceived(const char* data, int length) {
+ Close(false);
+ return OK;
+ }
+};
+
} // namespace
class QuicHttpStreamTest : public ::testing::Test {
@@ -92,6 +106,7 @@ class QuicHttpStreamTest : public ::testing::Test {
QuicHttpStreamTest()
: net_log_(BoundNetLog()),
+ use_closing_stream_(false),
read_buffer_(new IOBufferWithSize(4096)),
guid_(2),
framer_(QuicDecrypter::Create(kNULL), QuicEncrypter::Create(kNULL)),
@@ -159,8 +174,11 @@ class QuicHttpStreamTest : public ::testing::Test {
message.tag = kSHLO;
session_->GetCryptoStream()->OnHandshakeMessage(message);
EXPECT_TRUE(session_->IsCryptoHandshakeComplete());
- stream_.reset(new QuicHttpStream(session_->CreateOutgoingReliableStream()));
- }
+ QuicReliableClientStream* stream =
+ session_->CreateOutgoingReliableStream();
+ stream_.reset(use_closing_stream_ ? new AutoClosingStream(stream) :
+ new QuicHttpStream(stream));
+ }
// Returns a newly created packet to send kData on stream 1.
QuicEncryptedPacket* ConstructDataPacket(
@@ -176,14 +194,27 @@ class QuicHttpStreamTest : public ::testing::Test {
// Returns a newly created packet to send ack data.
QuicEncryptedPacket* ConstructAckPacket(
QuicPacketSequenceNumber sequence_number,
- QuicPacketSequenceNumber largest_received) {
+ QuicPacketSequenceNumber largest_received,
+ QuicPacketSequenceNumber least_unacked) {
InitializeHeader(sequence_number);
- QuicAckFrame ack(largest_received, sequence_number);
+ QuicAckFrame ack(largest_received, least_unacked);
return ConstructPacket(header_, QuicFrame(&ack));
}
+ // Returns a newly created packet to send ack data.
+ QuicEncryptedPacket* ConstructRstPacket(
+ QuicPacketSequenceNumber sequence_number,
+ QuicStreamId stream_id,
+ QuicStreamOffset offset) {
+ InitializeHeader(sequence_number);
+
+ QuicRstStreamFrame rst(stream_id, offset, QUIC_NO_ERROR);
+ return ConstructPacket(header_, QuicFrame(&rst));
+ }
+
BoundNetLog net_log_;
+ bool use_closing_stream_;
MockScheduler* scheduler_;
TestCollector* collector_;
scoped_refptr<TestTaskRunner> runner_;
@@ -247,7 +278,7 @@ TEST_F(QuicHttpStreamTest, IsConnectionReusable) {
TEST_F(QuicHttpStreamTest, GetRequest) {
AddWrite(SYNCHRONOUS, ConstructDataPacket(1, kFin, 0,
"GET / HTTP/1.1\r\n\r\n"));
- AddWrite(SYNCHRONOUS, ConstructAckPacket(2, 2));
+ AddWrite(SYNCHRONOUS, ConstructAckPacket(2, 2, 2));
Initialize();
request_.method = "GET";
@@ -260,7 +291,7 @@ TEST_F(QuicHttpStreamTest, GetRequest) {
EXPECT_EQ(&response_, stream_->GetResponseInfo());
// Ack the request.
- scoped_ptr<QuicEncryptedPacket> ack(ConstructAckPacket(1, 1));
+ scoped_ptr<QuicEncryptedPacket> ack(ConstructAckPacket(1, 1, 1));
ProcessPacket(*ack);
EXPECT_EQ(ERR_IO_PENDING,
@@ -290,7 +321,7 @@ TEST_F(QuicHttpStreamTest, GetRequest) {
TEST_F(QuicHttpStreamTest, GetRequestFullResponseInSinglePacket) {
AddWrite(SYNCHRONOUS, ConstructDataPacket(1, kFin, 0,
"GET / HTTP/1.1\r\n\r\n"));
- AddWrite(SYNCHRONOUS, ConstructAckPacket(2, 2));
+ AddWrite(SYNCHRONOUS, ConstructAckPacket(2, 2, 2));
Initialize();
request_.method = "GET";
@@ -303,7 +334,7 @@ TEST_F(QuicHttpStreamTest, GetRequestFullResponseInSinglePacket) {
EXPECT_EQ(&response_, stream_->GetResponseInfo());
// Ack the request.
- scoped_ptr<QuicEncryptedPacket> ack(ConstructAckPacket(1, 1));
+ scoped_ptr<QuicEncryptedPacket> ack(ConstructAckPacket(1, 1, 1));
ProcessPacket(*ack);
EXPECT_EQ(ERR_IO_PENDING,
@@ -336,8 +367,8 @@ TEST_F(QuicHttpStreamTest, SendPostRequest) {
AddWrite(SYNCHRONOUS, ConstructDataPacket(1, kNoFin, 0, kRequestData));
AddWrite(SYNCHRONOUS, ConstructDataPacket(2, kFin, strlen(kRequestData),
kUploadData));
- AddWrite(SYNCHRONOUS, ConstructAckPacket(3, 2));
- AddWrite(SYNCHRONOUS, ConstructAckPacket(4, 3));
+ AddWrite(SYNCHRONOUS, ConstructAckPacket(3, 2, 3));
+ AddWrite(SYNCHRONOUS, ConstructAckPacket(4, 3, 4));
Initialize();
@@ -357,7 +388,7 @@ TEST_F(QuicHttpStreamTest, SendPostRequest) {
EXPECT_EQ(&response_, stream_->GetResponseInfo());
// Ack both packets in the request.
- scoped_ptr<QuicEncryptedPacket> ack(ConstructAckPacket(1, 2));
+ scoped_ptr<QuicEncryptedPacket> ack(ConstructAckPacket(1, 2, 1));
ProcessPacket(*ack);
// Send the response headers (but not the body).
@@ -388,6 +419,42 @@ TEST_F(QuicHttpStreamTest, SendPostRequest) {
EXPECT_TRUE(AtEof());
}
+TEST_F(QuicHttpStreamTest, DestroyedEarly) {
+ const char kRequest[] = "GET / HTTP/1.1\r\n\r\n";
+ AddWrite(SYNCHRONOUS, ConstructDataPacket(1, kFin, 0, kRequest));
+ AddWrite(SYNCHRONOUS, ConstructRstPacket(2, 3, strlen(kRequest)));
+ AddWrite(SYNCHRONOUS, ConstructAckPacket(3, 2, 2));
+ use_closing_stream_ = true;
+ Initialize();
+
+ request_.method = "GET";
+ request_.url = GURL("http://www.google.com/");
+
+ //stream_.reset(new TestStream(session_->CreateOutgoingReliableStream()));
+ EXPECT_EQ(OK, stream_->InitializeStream(&request_, net_log_,
+ callback_.callback()));
+ EXPECT_EQ(OK, stream_->SendRequest(headers_, &response_,
+ callback_.callback()));
+ EXPECT_EQ(&response_, stream_->GetResponseInfo());
+
+ // Ack the request.
+ scoped_ptr<QuicEncryptedPacket> ack(ConstructAckPacket(1, 1, 1));
+ ProcessPacket(*ack);
+ EXPECT_EQ(ERR_IO_PENDING,
+ stream_->ReadResponseHeaders(callback_.callback()));
+
+ // Send the response with a body.
+ const char kResponseHeaders[] = "HTTP/1.1 404 OK\r\n"
+ "Content-Type: text/plain\r\n\r\nhello world!";
+ scoped_ptr<QuicEncryptedPacket> resp(
+ ConstructDataPacket(2, kFin, 0, kResponseHeaders));
+
+ // In the course of processing this packet, the QuicHttpStream close itself.
+ ProcessPacket(*resp);
+
+ EXPECT_TRUE(AtEof());
+}
+
} // namespace test
} // namespace net
diff --git a/net/quic/quic_protocol.cc b/net/quic/quic_protocol.cc
index a1ed882..76e6298 100644
--- a/net/quic/quic_protocol.cc
+++ b/net/quic/quic_protocol.cc
@@ -91,6 +91,7 @@ ostream& operator<<(ostream& os, const ReceivedPacketInfo& r) {
it != r.missing_packets.end(); ++it) {
os << *it << " ";
}
+ os << " ] ";
return os;
}
diff --git a/net/quic/quic_reliable_client_stream.cc b/net/quic/quic_reliable_client_stream.cc
index 2e66897..7c9da7e 100644
--- a/net/quic/quic_reliable_client_stream.cc
+++ b/net/quic/quic_reliable_client_stream.cc
@@ -24,7 +24,9 @@ QuicReliableClientStream::~QuicReliableClientStream() {
uint32 QuicReliableClientStream::ProcessData(const char* data,
uint32 data_len) {
// TODO(rch): buffer data if we don't have a delegate.
- DCHECK(delegate_);
+ if (!delegate_) {
+ return ERR_ABORTED;
+ }
int rv = delegate_->OnDataReceived(data, data_len);
if (rv != OK) {
DLOG(ERROR) << "Delegate refused data, rv: " << rv;
@@ -39,6 +41,7 @@ void QuicReliableClientStream::TerminateFromPeer(bool half_close) {
delegate_->OnClose(error());
delegate_ = NULL;
}
+ ReliableQuicStream::TerminateFromPeer(half_close);
}
void QuicReliableClientStream::SetDelegate(
diff --git a/net/quic/quic_session.cc b/net/quic/quic_session.cc
index 5551972..c21682f 100644
--- a/net/quic/quic_session.cc
+++ b/net/quic/quic_session.cc
@@ -70,6 +70,8 @@ QuicSession::QuicSession(QuicConnection* connection, bool is_server)
}
QuicSession::~QuicSession() {
+ STLDeleteElements(&closed_streams_);
+ STLDeleteValues(&stream_map_);
}
bool QuicSession::OnPacket(const IPEndPoint& self_address,
@@ -165,6 +167,8 @@ void QuicSession::CloseStream(QuicStreamId stream_id) {
DLOG(INFO) << "Stream is already closed: " << stream_id;
return;
}
+ it->second->OnClose();
+ closed_streams_.push_back(it->second);
stream_map_.erase(it);
}
diff --git a/net/quic/quic_session.h b/net/quic/quic_session.h
index 4841627..b124226 100644
--- a/net/quic/quic_session.h
+++ b/net/quic/quic_session.h
@@ -110,6 +110,20 @@ class NET_EXPORT_PRIVATE QuicSession : public QuicConnectionVisitorInterface {
return max_open_streams_;
}
+ protected:
+ // This is called after every call other than OnConnectionClose from the
+ // QuicConnectionVisitor to allow post-processing once the work has been done.
+ // In this case, it deletes streams given that it's safe to do so (no other
+ // opterations are being done on the streams at this time)
+ virtual void PostProcessAfterData();
+
+ base::hash_map<QuicStreamId, ReliableQuicStream*>* streams() {
+ return &stream_map_;
+ }
+ std::vector<ReliableQuicStream*>* closed_streams() {
+ return &closed_streams_;
+ }
+
private:
friend class test::QuicSessionPeer;
friend class VisitorShim;
@@ -118,12 +132,6 @@ class NET_EXPORT_PRIVATE QuicSession : public QuicConnectionVisitorInterface {
ReliableQuicStream* GetStream(const QuicStreamId stream_id);
- // This is called after every call other than OnConnectionClose from the
- // QuicConnectionVisitor to allow post-processing once the work has been done.
- // In this case, it deletes streams given that it's safe to do so (no other
- // opterations are being done on the streams at this time)
- void PostProcessAfterData();
-
scoped_ptr<QuicConnection> connection_;
// A shim to stand between the connection and the session, to handle stream
diff --git a/net/quic/quic_session_test.cc b/net/quic/quic_session_test.cc
index a34035a..0f2d0ac 100644
--- a/net/quic/quic_session_test.cc
+++ b/net/quic/quic_session_test.cc
@@ -128,8 +128,10 @@ TEST_F(QuicSessionTest, IsClosedStreamDefault) {
}
TEST_F(QuicSessionTest, IsClosedStreamLocallyCreated) {
- scoped_ptr<TestStream> stream2(session_.CreateOutgoingReliableStream());
- scoped_ptr<TestStream> stream4(session_.CreateOutgoingReliableStream());
+ TestStream* stream2 = session_.CreateOutgoingReliableStream();
+ EXPECT_EQ(2u, stream2->id());
+ TestStream* stream4 = session_.CreateOutgoingReliableStream();
+ EXPECT_EQ(4u, stream4->id());
CheckClosedStreams();
CloseStream(4);
@@ -139,15 +141,15 @@ TEST_F(QuicSessionTest, IsClosedStreamLocallyCreated) {
}
TEST_F(QuicSessionTest, IsClosedStreamPeerCreated) {
- scoped_ptr<ReliableQuicStream> stream3(session_.GetIncomingReliableStream(3));
- scoped_ptr<ReliableQuicStream> stream5(session_.GetIncomingReliableStream(5));
+ session_.GetIncomingReliableStream(3);
+ session_.GetIncomingReliableStream(5);
CheckClosedStreams();
CloseStream(3);
CheckClosedStreams();
CloseStream(5);
// Create stream id 9, and implicitly 7
- scoped_ptr<ReliableQuicStream> stream9(session_.GetIncomingReliableStream(9));
+ session_.GetIncomingReliableStream(9);
CheckClosedStreams();
// Close 9, but make sure 7 is still not closed
CloseStream(9);
@@ -155,35 +157,34 @@ TEST_F(QuicSessionTest, IsClosedStreamPeerCreated) {
}
TEST_F(QuicSessionTest, StreamIdTooLarge) {
- scoped_ptr<ReliableQuicStream> stream3(session_.GetIncomingReliableStream(3));
+ session_.GetIncomingReliableStream(3);
EXPECT_CALL(*connection_, SendConnectionClose(QUIC_INVALID_STREAM_ID));
- scoped_ptr<ReliableQuicStream> stream5(
- session_.GetIncomingReliableStream(105));
+ session_.GetIncomingReliableStream(105);
}
TEST_F(QuicSessionTest, OnCanWrite) {
- scoped_ptr<TestStream> stream2(session_.CreateOutgoingReliableStream());
- scoped_ptr<TestStream> stream4(session_.CreateOutgoingReliableStream());
- scoped_ptr<TestStream> stream6(session_.CreateOutgoingReliableStream());
+ TestStream* stream2 = session_.CreateOutgoingReliableStream();
+ TestStream* stream4 = session_.CreateOutgoingReliableStream();
+ TestStream* stream6 = session_.CreateOutgoingReliableStream();
session_.MarkWriteBlocked(2);
session_.MarkWriteBlocked(6);
session_.MarkWriteBlocked(4);
InSequence s;
- EXPECT_CALL(*stream2.get(), OnCanWrite()).WillOnce(
+ EXPECT_CALL(*stream2, OnCanWrite()).WillOnce(
// Reregister, to test the loop limit.
testing::InvokeWithoutArgs(&session_, &TestSession::MarkTwoWriteBlocked));
- EXPECT_CALL(*stream6.get(), OnCanWrite());
- EXPECT_CALL(*stream4.get(), OnCanWrite());
+ EXPECT_CALL(*stream6, OnCanWrite());
+ EXPECT_CALL(*stream4, OnCanWrite());
EXPECT_FALSE(session_.OnCanWrite());
}
TEST_F(QuicSessionTest, OnCanWriteWithClosedStream) {
- scoped_ptr<TestStream> stream2(session_.CreateOutgoingReliableStream());
- scoped_ptr<TestStream> stream4(session_.CreateOutgoingReliableStream());
- scoped_ptr<TestStream> stream6(session_.CreateOutgoingReliableStream());
+ TestStream* stream2 = session_.CreateOutgoingReliableStream();
+ TestStream* stream4 = session_.CreateOutgoingReliableStream();
+ session_.CreateOutgoingReliableStream(); // stream 6
session_.MarkWriteBlocked(2);
session_.MarkWriteBlocked(6);
@@ -191,8 +192,8 @@ TEST_F(QuicSessionTest, OnCanWriteWithClosedStream) {
CloseStream(6);
InSequence s;
- EXPECT_CALL(*stream2.get(), OnCanWrite());
- EXPECT_CALL(*stream4.get(), OnCanWrite());
+ EXPECT_CALL(*stream2, OnCanWrite());
+ EXPECT_CALL(*stream4, OnCanWrite());
EXPECT_TRUE(session_.OnCanWrite());
}
diff --git a/net/quic/reliable_quic_stream.cc b/net/quic/reliable_quic_stream.cc
index 6b284e6..2aa477b 100644
--- a/net/quic/reliable_quic_stream.cc
+++ b/net/quic/reliable_quic_stream.cc
@@ -16,6 +16,9 @@ ReliableQuicStream::ReliableQuicStream(QuicStreamId id,
id_(id),
offset_(0),
session_(session),
+ visitor_(NULL),
+ stream_bytes_read_(0),
+ stream_bytes_written_(0),
error_(QUIC_NO_ERROR),
read_side_closed_(false),
write_side_closed_(false),
@@ -180,4 +183,14 @@ void ReliableQuicStream::CloseWriteSide() {
}
}
+void ReliableQuicStream::OnClose() {
+ if (visitor_) {
+ Visitor* visitor = visitor_;
+ // Calling Visitor::OnClose() may result the destruction of the visitor,
+ // so we need to ensure we don't call it again.
+ visitor_ = NULL;
+ visitor->OnClose(this);
+ }
+}
+
} // namespace net
diff --git a/net/quic/reliable_quic_stream.h b/net/quic/reliable_quic_stream.h
index bfe7fea..c1d7406 100644
--- a/net/quic/reliable_quic_stream.h
+++ b/net/quic/reliable_quic_stream.h
@@ -25,6 +25,21 @@ class QuicSession;
// All this does right now is send data to subclasses via the sequencer.
class NET_EXPORT_PRIVATE ReliableQuicStream {
public:
+ // Visitor receives callbacks from the stream.
+ class Visitor {
+ public:
+ Visitor() {}
+
+ // Called when the stream is closed.
+ virtual void OnClose(ReliableQuicStream* stream) = 0;
+
+ protected:
+ virtual ~Visitor() {}
+
+ private:
+ DISALLOW_COPY_AND_ASSIGN(Visitor);
+ };
+
ReliableQuicStream(QuicStreamId id,
QuicSession* session);
@@ -35,6 +50,9 @@ class NET_EXPORT_PRIVATE ReliableQuicStream {
virtual void OnCanWrite();
+ // Called by the session just before the stream is deleted.
+ virtual void OnClose();
+
// Called when we get a stream reset from the client.
// The rst will be passed through the sequencer, which will call
// TerminateFromPeer when 'offset' bytes have been processed.
@@ -68,6 +86,9 @@ class NET_EXPORT_PRIVATE ReliableQuicStream {
const IPEndPoint& GetPeerAddress() const;
+ Visitor* visitor() { return visitor_; }
+ void set_visitor(Visitor* visitor) { visitor_ = visitor; }
+
protected:
// TODO(alyssar): document the return value -- whether it can be negative and
// how a failure is reported.
@@ -101,6 +122,8 @@ class NET_EXPORT_PRIVATE ReliableQuicStream {
QuicStreamId id_;
QuicStreamOffset offset_;
QuicSession* session_;
+ // Optional visitor of this stream to be notified when the stream is closed.
+ Visitor* visitor_;
// Bytes read and written refer to payload bytes only: they do not include
// framing, encryption overhead etc.
uint64 stream_bytes_read_;