diff options
72 files changed, 1950 insertions, 1824 deletions
diff --git a/net/net.gyp b/net/net.gyp index b48a4c7..971a988 100644 --- a/net/net.gyp +++ b/net/net.gyp @@ -860,6 +860,8 @@ 'quic/quic_crypto_stream.h', 'quic/quic_data_reader.cc', 'quic/quic_data_reader.h', + 'quic/quic_data_stream.cc', + 'quic/quic_data_stream.h', 'quic/quic_data_writer.cc', 'quic/quic_data_writer.h', 'quic/quic_default_packet_writer.cc', @@ -1820,6 +1822,8 @@ 'quic/test_tools/quic_client_session_peer.h', 'quic/test_tools/quic_connection_peer.cc', 'quic/test_tools/quic_connection_peer.h', + 'quic/test_tools/quic_data_stream_peer.cc', + 'quic/test_tools/quic_data_stream_peer.h', 'quic/test_tools/quic_framer_peer.cc', 'quic/test_tools/quic_framer_peer.h', 'quic/test_tools/quic_packet_creator_peer.cc', @@ -1851,6 +1855,7 @@ 'quic/quic_crypto_client_stream_test.cc', 'quic/quic_crypto_server_stream_test.cc', 'quic/quic_crypto_stream_test.cc', + 'quic/quic_data_stream_test.cc', 'quic/quic_data_writer_test.cc', 'quic/quic_fec_group_test.cc', 'quic/quic_framer_test.cc', @@ -2001,10 +2006,9 @@ 'tools/quic/quic_epoll_clock_test.cc', 'tools/quic/quic_epoll_connection_helper_test.cc', 'tools/quic/quic_in_memory_cache_test.cc', - 'tools/quic/quic_reliable_client_stream_test.cc', - 'tools/quic/quic_reliable_server_stream_test.cc', 'tools/quic/quic_server_session_test.cc', 'tools/quic/quic_server_test.cc', + 'tools/quic/quic_spdy_client_stream_test.cc', 'tools/quic/quic_spdy_server_stream_test.cc', 'tools/quic/quic_time_wait_list_manager_test.cc', 'tools/quic/test_tools/http_message_test_utils.cc', @@ -2864,10 +2868,6 @@ 'tools/quic/quic_epoll_connection_helper.h', 'tools/quic/quic_in_memory_cache.cc', 'tools/quic/quic_in_memory_cache.h', - 'tools/quic/quic_reliable_client_stream.cc', - 'tools/quic/quic_reliable_client_stream.h', - 'tools/quic/quic_reliable_server_stream.cc', - 'tools/quic/quic_reliable_server_stream.h', 'tools/quic/quic_server.cc', 'tools/quic/quic_server.h', 'tools/quic/quic_server_session.cc', diff --git a/net/quic/crypto/quic_random.cc b/net/quic/crypto/quic_random.cc index c96f01a..73fce2e 100644 --- a/net/quic/crypto/quic_random.cc +++ b/net/quic/crypto/quic_random.cc @@ -19,7 +19,6 @@ class DefaultRandom : public QuicRandom { // QuicRandom implementation virtual void RandBytes(void* data, size_t len) OVERRIDE; virtual uint64 RandUint64() OVERRIDE; - virtual bool RandBool() OVERRIDE; virtual void Reseed(const void* additional_entropy, size_t entropy_len) OVERRIDE; @@ -45,12 +44,6 @@ uint64 DefaultRandom::RandUint64() { return value; } -bool DefaultRandom::RandBool() { - char value; - RandBytes(&value, sizeof(value)); - return (value & 1) == 1; -} - void DefaultRandom::Reseed(const void* additional_entropy, size_t entropy_len) { // No such function exists in crypto/random.h. } diff --git a/net/quic/crypto/quic_random.h b/net/quic/crypto/quic_random.h index 68640c1..ac69b85 100644 --- a/net/quic/crypto/quic_random.h +++ b/net/quic/crypto/quic_random.h @@ -27,9 +27,6 @@ class NET_EXPORT_PRIVATE QuicRandom { // Returns a random number in the range [0, kuint64max]. virtual uint64 RandUint64() = 0; - // Returns a random boolean value. - virtual bool RandBool() = 0; - // Reseeds the random number generator with additional entropy input. // NOTE: the constructor of a QuicRandom object is responsible for seeding // itself with enough entropy input. diff --git a/net/quic/quic_client_session.cc b/net/quic/quic_client_session.cc index a4bf743..e4a7df0 100644 --- a/net/quic/quic_client_session.cc +++ b/net/quic/quic_client_session.cc @@ -214,7 +214,7 @@ void QuicClientSession::CancelRequest(StreamRequest* request) { } } -QuicReliableClientStream* QuicClientSession::CreateOutgoingReliableStream() { +QuicReliableClientStream* QuicClientSession::CreateOutgoingDataStream() { if (!crypto_stream_->encryption_established()) { DVLOG(1) << "Encryption not active so no outgoing stream created."; return NULL; @@ -277,7 +277,7 @@ int QuicClientSession::GetNumSentClientHellos() const { return crypto_stream_->num_sent_client_hellos(); } -ReliableQuicStream* QuicClientSession::CreateIncomingReliableStream( +QuicDataStream* QuicClientSession::CreateIncomingDataStream( QuicStreamId id) { DLOG(ERROR) << "Server push not supported"; return NULL; diff --git a/net/quic/quic_client_session.h b/net/quic/quic_client_session.h index 6e8ea47..07da96a 100644 --- a/net/quic/quic_client_session.h +++ b/net/quic/quic_client_session.h @@ -114,7 +114,7 @@ class NET_EXPORT_PRIVATE QuicClientSession : public QuicSession { // QuicSession methods: virtual bool OnStreamFrames( const std::vector<QuicStreamFrame>& frames) OVERRIDE; - virtual QuicReliableClientStream* CreateOutgoingReliableStream() OVERRIDE; + virtual QuicReliableClientStream* CreateOutgoingDataStream() OVERRIDE; virtual QuicCryptoClientStream* GetCryptoStream() OVERRIDE; virtual void CloseStream(QuicStreamId stream_id) OVERRIDE; virtual void SendRstStream(QuicStreamId id, @@ -156,8 +156,7 @@ class NET_EXPORT_PRIVATE QuicClientSession : public QuicSession { protected: // QuicSession methods: - virtual ReliableQuicStream* CreateIncomingReliableStream( - QuicStreamId id) OVERRIDE; + virtual QuicDataStream* CreateIncomingDataStream(QuicStreamId id) OVERRIDE; private: friend class test::QuicClientSessionPeer; diff --git a/net/quic/quic_client_session_test.cc b/net/quic/quic_client_session_test.cc index f04e640..f18f71b 100644 --- a/net/quic/quic_client_session_test.cc +++ b/net/quic/quic_client_session_test.cc @@ -116,15 +116,15 @@ TEST_F(QuicClientSessionTest, MaxNumStreams) { std::vector<QuicReliableClientStream*> streams; for (size_t i = 0; i < kDefaultMaxStreamsPerConnection; i++) { - QuicReliableClientStream* stream = session_.CreateOutgoingReliableStream(); + QuicReliableClientStream* stream = session_.CreateOutgoingDataStream(); EXPECT_TRUE(stream); streams.push_back(stream); } - EXPECT_FALSE(session_.CreateOutgoingReliableStream()); + EXPECT_FALSE(session_.CreateOutgoingDataStream()); // Close a stream and ensure I can now open a new one. session_.CloseStream(streams[0]->id()); - EXPECT_TRUE(session_.CreateOutgoingReliableStream()); + EXPECT_TRUE(session_.CreateOutgoingDataStream()); } TEST_F(QuicClientSessionTest, MaxNumStreamsViaRequest) { @@ -132,7 +132,7 @@ TEST_F(QuicClientSessionTest, MaxNumStreamsViaRequest) { std::vector<QuicReliableClientStream*> streams; for (size_t i = 0; i < kDefaultMaxStreamsPerConnection; i++) { - QuicReliableClientStream* stream = session_.CreateOutgoingReliableStream(); + QuicReliableClientStream* stream = session_.CreateOutgoingDataStream(); EXPECT_TRUE(stream); streams.push_back(stream); } @@ -157,7 +157,7 @@ TEST_F(QuicClientSessionTest, GoAwayReceived) { // After receiving a GoAway, I should no longer be able to create outgoing // streams. session_.OnGoAway(QuicGoAwayFrame(QUIC_PEER_GOING_AWAY, 1u, "Going away.")); - EXPECT_EQ(NULL, session_.CreateOutgoingReliableStream()); + EXPECT_EQ(NULL, session_.CreateOutgoingDataStream()); } } // namespace diff --git a/net/quic/quic_connection.cc b/net/quic/quic_connection.cc index ca92f44..b93fdee 100644 --- a/net/quic/quic_connection.cc +++ b/net/quic/quic_connection.cc @@ -101,23 +101,6 @@ class RetransmissionAlarm : public QuicAlarm::Delegate { QuicConnection* connection_; }; -// This alarm will be scheduled any time a FEC-bearing packet is sent out. -// When the alarm goes off, the connection checks to see if the oldest packets -// have been acked, and removes them from the congestion window if not. -class AbandonFecAlarm : public QuicAlarm::Delegate { - public: - explicit AbandonFecAlarm(QuicConnection* connection) - : connection_(connection) { - } - - virtual QuicTime OnAlarm() OVERRIDE { - return connection_->OnAbandonFecTimeout(); - } - - private: - QuicConnection* connection_; -}; - // An alarm that is scheduled when the sent scheduler requires a // a delay before sending packets and fires when the packet may be sent. class SendAlarm : public QuicAlarm::Delegate { @@ -208,7 +191,6 @@ QuicConnection::QuicConnection(QuicGuid guid, received_packet_manager_(kTCP), ack_alarm_(helper->CreateAlarm(new AckAlarm(this))), retransmission_alarm_(helper->CreateAlarm(new RetransmissionAlarm(this))), - abandon_fec_alarm_(helper->CreateAlarm(new AbandonFecAlarm(this))), send_alarm_(helper->CreateAlarm(new SendAlarm(this))), resume_writes_alarm_(helper->CreateAlarm(new SendAlarm(this))), timeout_alarm_(helper->CreateAlarm(new TimeoutAlarm(this))), @@ -524,18 +506,13 @@ void QuicConnection::ProcessAckFrame(const QuicAckFrame& incoming_ack) { if (reset_retransmission_alarm) { retransmission_alarm_->Cancel(); - abandon_fec_alarm_->Cancel(); // Reset the RTO and FEC alarms if the are unacked packets. - QuicTime::Delta retransmission_delay = - sent_packet_manager_.GetRetransmissionDelay(); if (sent_packet_manager_.HasUnackedPackets()) { + QuicTime::Delta retransmission_delay = + sent_packet_manager_.GetRetransmissionDelay(); retransmission_alarm_->Set( clock_->ApproximateNow().Add(retransmission_delay)); } - if (sent_packet_manager_.HasUnackedFecPackets()) { - abandon_fec_alarm_->Set( - clock_->ApproximateNow().Add(retransmission_delay)); - } } } @@ -1108,9 +1085,8 @@ bool QuicConnection::CanWrite(TransmissionType transmission_type, return true; } -void QuicConnection::SetupRetransmission( - QuicPacketSequenceNumber sequence_number, - EncryptionLevel level) { +void QuicConnection::SetupRetransmissionAlarm( + QuicPacketSequenceNumber sequence_number) { if (!sent_packet_manager_.HasRetransmittableFrames(sequence_number)) { DVLOG(1) << ENDPOINT << "Will not retransmit packet " << sequence_number; return; @@ -1128,16 +1104,6 @@ void QuicConnection::SetupRetransmission( clock_->ApproximateNow().Add(retransmission_delay)); } -void QuicConnection::SetupAbandonFecTimer( - QuicPacketSequenceNumber sequence_number) { - if (abandon_fec_alarm_->IsSet()) { - return; - } - QuicTime::Delta retransmission_delay = - sent_packet_manager_.GetRetransmissionDelay(); - abandon_fec_alarm_->Set(clock_->ApproximateNow().Add(retransmission_delay)); -} - bool QuicConnection::WritePacket(EncryptionLevel level, QuicPacketSequenceNumber sequence_number, QuicPacket* packet, @@ -1314,7 +1280,6 @@ bool QuicConnection::OnPacketSent(WriteResult result) { QuicPacketSequenceNumber sequence_number = pending_write_->sequence_number; TransmissionType transmission_type = pending_write_->transmission_type; HasRetransmittableData retransmittable = pending_write_->retransmittable; - EncryptionLevel level = pending_write_->level; bool is_fec_packet = pending_write_->is_fec_packet; size_t length = pending_write_->length; pending_write_.reset(); @@ -1336,10 +1301,8 @@ bool QuicConnection::OnPacketSent(WriteResult result) { // Set the retransmit alarm only when we have sent the packet to the client // and not when it goes to the pending queue, otherwise we will end up adding // an entry to retransmission_timeout_ every time we attempt a write. - if (retransmittable == HAS_RETRANSMITTABLE_DATA) { - SetupRetransmission(sequence_number, level); - } else if (is_fec_packet) { - SetupAbandonFecTimer(sequence_number); + if (retransmittable == HAS_RETRANSMITTABLE_DATA || is_fec_packet) { + SetupRetransmissionAlarm(sequence_number); } // TODO(ianswett): Change the sequence number length and other packet creator @@ -1350,7 +1313,7 @@ bool QuicConnection::OnPacketSent(WriteResult result) { sent_packet_manager_.SmoothedRtt())); sent_packet_manager_.OnPacketSent(sequence_number, now, length, - transmission_type, retransmittable); + transmission_type, retransmittable); stats_.bytes_sent += result.bytes_written; ++stats_.packets_sent; @@ -1370,8 +1333,7 @@ bool QuicConnection::OnSerializedPacket( serialized_packet.retransmittable_frames-> set_encryption_level(encryption_level_); } - sent_packet_manager_.OnSerializedPacket(serialized_packet, - clock_->ApproximateNow()); + sent_packet_manager_.OnSerializedPacket(serialized_packet); // The TransmissionType is NOT_RETRANSMISSION because all retransmissions // serialize packets and invoke SendOrQueuePacket directly. return SendOrQueuePacket(encryption_level_, @@ -1436,16 +1398,14 @@ void QuicConnection::OnRetransmissionTimeout() { sent_packet_manager_.OnRetransmissionTimeout(); WriteIfNotBlocked(); -} - -QuicTime QuicConnection::OnAbandonFecTimeout() { - QuicTime fec_timeout = sent_packet_manager_.OnAbandonFecTimeout(); - // If a packet was abandoned, then the congestion window may have - // opened up, so attempt to write. - WriteIfNotBlocked(); - - return fec_timeout; + // Ensure the retransmission alarm is always set if there are unacked packets. + if (sent_packet_manager_.HasUnackedPackets() && !HasQueuedData() && + !retransmission_alarm_->IsSet()) { + QuicTime rto_timeout = clock_->ApproximateNow().Add( + sent_packet_manager_.GetRetransmissionDelay()); + retransmission_alarm_->Set(rto_timeout); + } } void QuicConnection::SetEncrypter(EncryptionLevel level, diff --git a/net/quic/quic_connection.h b/net/quic/quic_connection.h index 3cf3384..beb8985 100644 --- a/net/quic/quic_connection.h +++ b/net/quic/quic_connection.h @@ -359,14 +359,10 @@ class NET_EXPORT_PRIVATE QuicConnection // Sets up a packet with an QuicAckFrame and sends it out. void SendAck(); - // Called when an RTO fires. Returns the time when this alarm - // should next fire, or 0 if no retransmission alarm should be set. + // Called when an RTO fires. Resets the retransmission alarm if there are + // remaining unacked packets. void OnRetransmissionTimeout(); - // Called when an alarm to abandon sent FEC packets fires. The alarm is set - // by the same policy as the RTO alarm, but is a separate alarm. - QuicTime OnAbandonFecTimeout(); - // Retransmits all unacked packets with retransmittable frames if // |retransmission_type| is ALL_PACKETS, otherwise retransmits only initially // encrypted packets. Used when the negotiated protocol version is different @@ -579,8 +575,7 @@ class NET_EXPORT_PRIVATE QuicConnection // Sends a version negotiation packet to the peer. void SendVersionNegotiationPacket(); - void SetupRetransmission(QuicPacketSequenceNumber sequence_number, - EncryptionLevel level); + void SetupRetransmissionAlarm(QuicPacketSequenceNumber sequence_number); bool IsRetransmission(QuicPacketSequenceNumber sequence_number); void SetupAbandonFecTimer(QuicPacketSequenceNumber sequence_number); @@ -708,8 +703,6 @@ class NET_EXPORT_PRIVATE QuicConnection scoped_ptr<QuicAlarm> ack_alarm_; // An alarm that fires when a packet needs to be retransmitted. scoped_ptr<QuicAlarm> retransmission_alarm_; - // An alarm that fires when one or more FEC packets are to be discarded. - scoped_ptr<QuicAlarm> abandon_fec_alarm_; // An alarm that is scheduled when the sent scheduler requires a // a delay before sending packets and fires when the packet may be sent. scoped_ptr<QuicAlarm> send_alarm_; diff --git a/net/quic/quic_connection_test.cc b/net/quic/quic_connection_test.cc index 085c528..05aeeeb 100644 --- a/net/quic/quic_connection_test.cc +++ b/net/quic/quic_connection_test.cc @@ -13,7 +13,6 @@ #include "net/quic/crypto/null_encrypter.h" #include "net/quic/crypto/quic_decrypter.h" #include "net/quic/crypto/quic_encrypter.h" -#include "net/quic/crypto/quic_random.h" #include "net/quic/quic_protocol.h" #include "net/quic/quic_sent_packet_manager.h" #include "net/quic/quic_utils.h" @@ -473,11 +472,6 @@ class TestConnection : public QuicConnection { QuicConnectionPeer::GetRetransmissionAlarm(this)); } - TestConnectionHelper::TestAlarm* GetAbandonFecAlarm() { - return reinterpret_cast<TestConnectionHelper::TestAlarm*>( - QuicConnectionPeer::GetAbandonFecAlarm(this)); - } - TestConnectionHelper::TestAlarm* GetSendAlarm() { return reinterpret_cast<TestConnectionHelper::TestAlarm*>( QuicConnectionPeer::GetSendAlarm(this)); @@ -507,7 +501,7 @@ class QuicConnectionTest : public ::testing::TestWithParam<bool> { QuicConnectionTest() : guid_(42), framer_(QuicSupportedVersions(), QuicTime::Zero(), false), - creator_(guid_, &framer_, QuicRandom::GetInstance(), false), + creator_(guid_, &framer_, &random_generator_, false), send_algorithm_(new StrictMock<MockSendAlgorithm>), helper_(new TestConnectionHelper(&clock_, &random_generator_)), writer_(new TestPacketWriter()), @@ -920,6 +914,7 @@ TEST_F(QuicConnectionTest, TruncatedAck) { for (QuicPacketSequenceNumber i = 1; i <= 256; ++i) { frame.received_info.missing_packets.insert(i * 2); } + frame.received_info.entropy_hash = 0; EXPECT_CALL(entropy_calculator_, EntropyHash(511)).WillOnce(testing::Return(0)); EXPECT_CALL(*send_algorithm_, OnPacketAcked(_, _, _)).Times(256); @@ -934,6 +929,7 @@ TEST_F(QuicConnectionTest, TruncatedAck) { received_packet_manager->peer_largest_observed_packet()); frame.received_info.missing_packets.erase(192); + frame.received_info.entropy_hash = 2; // Removing one missing packet allows us to ack 192 and one more range. EXPECT_CALL(*send_algorithm_, OnPacketAcked(_, _, _)).Times(2); @@ -1024,6 +1020,8 @@ TEST_F(QuicConnectionTest, AckReceiptCausesAckSend) { // But an ack with no missing packets will not send an ack. frame2.received_info.missing_packets.clear(); + frame2.received_info.entropy_hash = + QuicConnectionPeer::GetSentEntropyHash(&connection_, retransmission); ProcessAckPacket(&frame2); ProcessAckPacket(&frame2); } @@ -1296,10 +1294,12 @@ TEST_F(QuicConnectionTest, AbandonFECFromCongestionWindow) { QuicTime::Delta::FromMilliseconds(5000); clock_.AdvanceTime(retransmission_time); - // Abandon FEC packet. - EXPECT_CALL(*send_algorithm_, OnPacketAbandoned(_, _)).Times(1); + // Abandon FEC packet and data packet. + EXPECT_CALL(*send_algorithm_, OnPacketAbandoned(_, _)).Times(2); + EXPECT_CALL(*send_algorithm_, OnRetransmissionTimeout()); + EXPECT_CALL(*send_algorithm_, OnPacketSent(_, _, _, _, _)).Times(1); EXPECT_CALL(visitor_, OnCanWrite()); - connection_.OnAbandonFecTimeout(); + connection_.OnRetransmissionTimeout(); } TEST_F(QuicConnectionTest, DontAbandonAckedFEC) { @@ -1330,8 +1330,53 @@ TEST_F(QuicConnectionTest, DontAbandonAckedFEC) { // Don't abandon the acked FEC packet, but it will abandon 2 the subsequent // FEC packets. - EXPECT_CALL(*send_algorithm_, OnPacketAbandoned(_, _)).Times(2); - connection_.GetAbandonFecAlarm()->Fire(); + EXPECT_CALL(*send_algorithm_, OnPacketAbandoned(_, _)).Times(5); + EXPECT_CALL(*send_algorithm_, OnRetransmissionTimeout()); + EXPECT_CALL(*send_algorithm_, OnPacketSent(_, _, _, _, _)).Times(3); + connection_.GetRetransmissionAlarm()->Fire(); +} + +TEST_F(QuicConnectionTest, DontAbandonAllFEC) { + EXPECT_CALL(visitor_, OnSuccessfulVersionNegotiation(_)); + connection_.options()->max_packets_per_fec_group = 1; + + // 1 Data and 1 FEC packet. + EXPECT_CALL(*send_algorithm_, OnPacketSent(_, _, _, _, _)).Times(6); + connection_.SendStreamDataWithString(1, "foo", 0, !kFin, NULL); + // Send some more data afterwards to ensure early retransmit doesn't trigger. + connection_.SendStreamDataWithString(1, "foo", 3, !kFin, NULL); + // Advance the time so not all the FEC packets are abandoned. + clock_.AdvanceTime(QuicTime::Delta::FromMilliseconds(1)); + connection_.SendStreamDataWithString(1, "foo", 6, !kFin, NULL); + + QuicAckFrame ack_fec(5, QuicTime::Zero(), 1); + // Ack all data packets, but no fec packets. + ack_fec.received_info.missing_packets.insert(2); + ack_fec.received_info.missing_packets.insert(4); + ack_fec.received_info.entropy_hash = + QuicConnectionPeer::GetSentEntropyHash(&connection_, 5) ^ + QuicConnectionPeer::GetSentEntropyHash(&connection_, 4) ^ + QuicConnectionPeer::GetSentEntropyHash(&connection_, 3) ^ + QuicConnectionPeer::GetSentEntropyHash(&connection_, 2) ^ + QuicConnectionPeer::GetSentEntropyHash(&connection_, 1); + + // Lose the first FEC packet and ack the three data packets. + EXPECT_CALL(*send_algorithm_, OnPacketAcked(_, _, _)).Times(3); + EXPECT_CALL(*send_algorithm_, OnPacketAbandoned(2, _)); + EXPECT_CALL(*send_algorithm_, OnPacketLost(2, _)); + ProcessAckPacket(&ack_fec); + + clock_.AdvanceTime(DefaultRetransmissionTime().Subtract( + QuicTime::Delta::FromMilliseconds(1))); + + // Don't abandon the acked FEC packet, but it will abandon 1 of the subsequent + // FEC packets. + EXPECT_CALL(*send_algorithm_, OnPacketAbandoned(4, _)); + connection_.GetRetransmissionAlarm()->Fire(); + + // Ensure the connection's alarm is still set, in order to abandon the third + // FEC packet. + EXPECT_TRUE(connection_.GetRetransmissionAlarm()->IsSet()); } TEST_F(QuicConnectionTest, FramePacking) { @@ -1653,7 +1698,7 @@ TEST_F(QuicConnectionTest, DiscardRetransmit) { // Now, ack the previous transmission. QuicAckFrame ack_all(3, QuicTime::Zero(), 0); - nack_two.received_info.entropy_hash = + ack_all.received_info.entropy_hash = QuicConnectionPeer::GetSentEntropyHash(&connection_, 3); ProcessAckPacket(&ack_all); @@ -1795,7 +1840,7 @@ TEST_F(QuicConnectionTest, MultipleAcks) { frame1.received_info.entropy_hash = QuicConnectionPeer::GetSentEntropyHash(&connection_, 5) ^ QuicConnectionPeer::GetSentEntropyHash(&connection_, 3) ^ - QuicConnectionPeer::GetSentEntropyHash(&connection_, 1); + QuicConnectionPeer::GetSentEntropyHash(&connection_, 2); EXPECT_CALL(visitor_, OnSuccessfulVersionNegotiation(_)); @@ -1932,9 +1977,9 @@ TEST_F(QuicConnectionTest, RTOWithSameEncryptionLevel) { connection_.GetRetransmissionAlarm()->deadline()); { InSequence s; - EXPECT_CALL(*send_algorithm_, OnRetransmissionTimeout()); EXPECT_CALL(*send_algorithm_, OnPacketAbandoned(1, _)); EXPECT_CALL(*send_algorithm_, OnPacketAbandoned(2, _)); + EXPECT_CALL(*send_algorithm_, OnRetransmissionTimeout()); EXPECT_CALL(*send_algorithm_, OnPacketSent(_, 3, _, RTO_RETRANSMISSION, _)); EXPECT_CALL(*send_algorithm_, OnPacketSent(_, 4, _, RTO_RETRANSMISSION, _)); } @@ -2130,8 +2175,11 @@ TEST_F(QuicConnectionTest, RetransmissionCountCalculation) { // Ack the retransmitted packet. ack.received_info.missing_packets.insert(original_sequence_number); ack.received_info.missing_packets.insert(rto_sequence_number); - ack.received_info.entropy_hash = QuicConnectionPeer::GetSentEntropyHash( - &connection_, rto_sequence_number - 1); + ack.received_info.entropy_hash = + QuicConnectionPeer::GetSentEntropyHash(&connection_, + rto_sequence_number - 1) ^ + QuicConnectionPeer::GetSentEntropyHash(&connection_, + original_sequence_number); for (int i = 0; i < 3; i++) { ProcessAckPacket(&ack); } @@ -2170,6 +2218,8 @@ TEST_F(QuicConnectionTest, DelayRTOWithAckReceipt) { clock_.AdvanceTime(DefaultRetransmissionTime()); EXPECT_CALL(*send_algorithm_, OnPacketAcked(_, _, _)).Times(1); QuicAckFrame ack(1, QuicTime::Zero(), 0); + ack.received_info.entropy_hash = + QuicConnectionPeer::GetSentEntropyHash(&connection_, 1); ProcessAckPacket(&ack); EXPECT_TRUE(retransmission_alarm->IsSet()); @@ -3144,6 +3194,8 @@ TEST_F(QuicConnectionTest, AckNotifierTriggerCallback) { // Process an ACK from the server which should trigger the callback. EXPECT_CALL(*send_algorithm_, OnPacketAcked(_, _, _)).Times(1); QuicAckFrame frame(1, QuicTime::Zero(), 0); + frame.received_info.entropy_hash = + QuicConnectionPeer::GetSentEntropyHash(&connection_, 1); ProcessAckPacket(&frame); } @@ -3168,6 +3220,9 @@ TEST_F(QuicConnectionTest, AckNotifierFailToTriggerCallback) { // which we registered to be notified about. QuicAckFrame frame(3, QuicTime::Zero(), 0); frame.received_info.missing_packets.insert(1); + frame.received_info.entropy_hash = + QuicConnectionPeer::GetSentEntropyHash(&connection_, 3) ^ + QuicConnectionPeer::GetSentEntropyHash(&connection_, 1); EXPECT_CALL(*send_algorithm_, OnPacketLost(_, _)); EXPECT_CALL(*send_algorithm_, OnPacketAbandoned(_, _)); ProcessAckPacket(&frame); @@ -3192,6 +3247,10 @@ TEST_F(QuicConnectionTest, AckNotifierCallbackAfterRetransmission) { // Now we receive ACK for packets 1, 3, and 4, which invokes fast retransmit. QuicAckFrame frame(4, QuicTime::Zero(), 0); frame.received_info.missing_packets.insert(2); + frame.received_info.entropy_hash = + QuicConnectionPeer::GetSentEntropyHash(&connection_, 4) ^ + QuicConnectionPeer::GetSentEntropyHash(&connection_, 2) ^ + QuicConnectionPeer::GetSentEntropyHash(&connection_, 1); EXPECT_CALL(*send_algorithm_, OnPacketLost(2, _)); EXPECT_CALL(*send_algorithm_, OnPacketAbandoned(2, _)); EXPECT_CALL(*send_algorithm_, OnPacketSent(_, _, _, _, _)); @@ -3200,6 +3259,8 @@ TEST_F(QuicConnectionTest, AckNotifierCallbackAfterRetransmission) { // Now we get an ACK for packet 5 (retransmitted packet 2), which should // trigger the callback. QuicAckFrame second_ack_frame(5, QuicTime::Zero(), 0); + second_ack_frame.received_info.entropy_hash = + QuicConnectionPeer::GetSentEntropyHash(&connection_, 5); ProcessAckPacket(&second_ack_frame); } @@ -3224,6 +3285,8 @@ TEST_F(QuicConnectionTest, AckNotifierCallbackAfterFECRecovery) { QuicFrames frames; QuicAckFrame ack_frame(1, QuicTime::Zero(), 0); + ack_frame.received_info.entropy_hash = + QuicConnectionPeer::GetSentEntropyHash(&connection_, 1); frames.push_back(QuicFrame(&ack_frame)); // Dummy stream frame to satisfy expectations set elsewhere. diff --git a/net/quic/quic_crypto_client_stream_test.cc b/net/quic/quic_crypto_client_stream_test.cc index 0af2e61..d49e2c5 100644 --- a/net/quic/quic_crypto_client_stream_test.cc +++ b/net/quic/quic_crypto_client_stream_test.cc @@ -70,7 +70,7 @@ TEST_F(QuicCryptoClientStreamTest, MessageAfterHandshake) { QUIC_CRYPTO_MESSAGE_AFTER_HANDSHAKE_COMPLETE)); message_.set_tag(kCHLO); ConstructHandshakeMessage(); - stream_->ProcessData(message_data_->data(), message_data_->length()); + stream_->ProcessRawData(message_data_->data(), message_data_->length()); } TEST_F(QuicCryptoClientStreamTest, BadMessageType) { @@ -81,7 +81,7 @@ TEST_F(QuicCryptoClientStreamTest, BadMessageType) { EXPECT_CALL(*connection_, SendConnectionCloseWithDetails( QUIC_INVALID_CRYPTO_MESSAGE_TYPE, "Expected REJ")); - stream_->ProcessData(message_data_->data(), message_data_->length()); + stream_->ProcessRawData(message_data_->data(), message_data_->length()); } TEST_F(QuicCryptoClientStreamTest, NegotiatedParameters) { diff --git a/net/quic/quic_crypto_server_stream_test.cc b/net/quic/quic_crypto_server_stream_test.cc index d2b6e99..cc50dce 100644 --- a/net/quic/quic_crypto_server_stream_test.cc +++ b/net/quic/quic_crypto_server_stream_test.cc @@ -238,7 +238,7 @@ TEST_P(QuicCryptoServerStreamTest, MessageAfterHandshake) { QUIC_CRYPTO_MESSAGE_AFTER_HANDSHAKE_COMPLETE)); message_.set_tag(kCHLO); ConstructHandshakeMessage(); - stream_.ProcessData(message_data_->data(), message_data_->length()); + stream_.ProcessRawData(message_data_->data(), message_data_->length()); } TEST_P(QuicCryptoServerStreamTest, BadMessageType) { @@ -246,7 +246,7 @@ TEST_P(QuicCryptoServerStreamTest, BadMessageType) { ConstructHandshakeMessage(); EXPECT_CALL(*connection_, SendConnectionClose( QUIC_INVALID_CRYPTO_MESSAGE_TYPE)); - stream_.ProcessData(message_data_->data(), message_data_->length()); + stream_.ProcessRawData(message_data_->data(), message_data_->length()); } TEST_P(QuicCryptoServerStreamTest, WithoutCertificates) { diff --git a/net/quic/quic_crypto_stream.cc b/net/quic/quic_crypto_stream.cc index e73e61d..d1ad84b 100644 --- a/net/quic/quic_crypto_stream.cc +++ b/net/quic/quic_crypto_stream.cc @@ -33,8 +33,8 @@ void QuicCryptoStream::OnHandshakeMessage( session()->OnCryptoHandshakeMessageReceived(message); } -uint32 QuicCryptoStream::ProcessData(const char* data, - uint32 data_len) { +uint32 QuicCryptoStream::ProcessRawData(const char* data, + uint32 data_len) { // Do not process handshake messages after the handshake is confirmed. if (handshake_confirmed()) { CloseConnection(QUIC_CRYPTO_MESSAGE_AFTER_HANDSHAKE_COMPLETE); @@ -47,6 +47,10 @@ uint32 QuicCryptoStream::ProcessData(const char* data, return data_len; } +QuicPriority QuicCryptoStream::EffectivePriority() const { + return 0; +} + void QuicCryptoStream::SendHandshakeMessage( const CryptoHandshakeMessage& message) { session()->OnCryptoHandshakeMessageSent(message); @@ -55,7 +59,7 @@ void QuicCryptoStream::SendHandshakeMessage( // any other frames in a single packet. session()->connection()->Flush(); // TODO(wtc): check the return value. - WriteData(string(data.data(), data.length()), false); + WriteOrBufferData(string(data.data(), data.length()), false); session()->connection()->Flush(); } diff --git a/net/quic/quic_crypto_stream.h b/net/quic/quic_crypto_stream.h index d5b3417..a082d50 100644 --- a/net/quic/quic_crypto_stream.h +++ b/net/quic/quic_crypto_stream.h @@ -38,7 +38,8 @@ class NET_EXPORT_PRIVATE QuicCryptoStream const CryptoHandshakeMessage& message) OVERRIDE; // ReliableQuicStream implementation - virtual uint32 ProcessData(const char* data, uint32 data_len) OVERRIDE; + virtual uint32 ProcessRawData(const char* data, uint32 data_len) OVERRIDE; + virtual QuicPriority EffectivePriority() const OVERRIDE; // Sends |message| to the peer. // TODO(wtc): return a success/failure status. diff --git a/net/quic/quic_crypto_stream_test.cc b/net/quic/quic_crypto_stream_test.cc index 0403eb3..ea59659 100644 --- a/net/quic/quic_crypto_stream_test.cc +++ b/net/quic/quic_crypto_stream_test.cc @@ -78,10 +78,10 @@ TEST_F(QuicCryptoStreamTest, NotInitiallyConected) { EXPECT_FALSE(stream_.handshake_confirmed()); } -TEST_F(QuicCryptoStreamTest, ProcessData) { +TEST_F(QuicCryptoStreamTest, ProcessRawData) { EXPECT_EQ(message_data_->length(), - stream_.ProcessData(message_data_->data(), - message_data_->length())); + stream_.ProcessRawData(message_data_->data(), + message_data_->length())); ASSERT_EQ(1u, stream_.messages()->size()); const CryptoHandshakeMessage& message = (*stream_.messages())[0]; EXPECT_EQ(kSHLO, message.tag()); @@ -100,7 +100,7 @@ TEST_F(QuicCryptoStreamTest, ProcessBadData) { EXPECT_CALL(*connection_, SendConnectionClose(QUIC_CRYPTO_TAGS_OUT_OF_ORDER)); - EXPECT_EQ(0u, stream_.ProcessData(bad.data(), bad.length())); + EXPECT_EQ(0u, stream_.ProcessRawData(bad.data(), bad.length())); } } // namespace diff --git a/net/quic/quic_data_stream.cc b/net/quic/quic_data_stream.cc new file mode 100644 index 0000000..c19f484 --- /dev/null +++ b/net/quic/quic_data_stream.cc @@ -0,0 +1,333 @@ +// Copyright 2013 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "net/quic/quic_data_stream.h" + +#include "base/logging.h" +#include "net/quic/quic_session.h" +#include "net/quic/quic_spdy_decompressor.h" +#include "net/spdy/write_blocked_list.h" + +using base::StringPiece; +using std::min; + +namespace net { + +#define ENDPOINT (session()->is_server() ? "Server: " : " Client: ") + +namespace { + +// This is somewhat arbitrary. It's possible, but unlikely, we will either fail +// to set a priority client-side, or cancel a stream before stripping the +// priority from the wire server-side. In either case, start out with a +// priority in the middle. +QuicPriority kDefaultPriority = 3; + +// Appends bytes from data into partial_data_buffer. Once partial_data_buffer +// reaches 4 bytes, copies the data into 'result' and clears +// partial_data_buffer. +// Returns the number of bytes consumed. +uint32 StripUint32(const char* data, uint32 data_len, + string* partial_data_buffer, + uint32* result) { + DCHECK_GT(4u, partial_data_buffer->length()); + size_t missing_size = 4 - partial_data_buffer->length(); + if (data_len < missing_size) { + StringPiece(data, data_len).AppendToString(partial_data_buffer); + return data_len; + } + StringPiece(data, missing_size).AppendToString(partial_data_buffer); + DCHECK_EQ(4u, partial_data_buffer->length()); + memcpy(result, partial_data_buffer->data(), 4); + partial_data_buffer->clear(); + return missing_size; +} + +} // namespace + +QuicDataStream::QuicDataStream(QuicStreamId id, + QuicSession* session) + : ReliableQuicStream(id, session), + visitor_(NULL), + headers_decompressed_(false), + priority_(kDefaultPriority), + headers_id_(0), + decompression_failed_(false), + priority_parsed_(false) { + DCHECK_NE(kCryptoStreamId, id); +} + +QuicDataStream::~QuicDataStream() { +} + +size_t QuicDataStream::Readv(const struct iovec* iov, size_t iov_len) { + if (FinishedReadingHeaders()) { + // If the headers have been read, simply delegate to the sequencer's + // Readv method. + return sequencer()->Readv(iov, iov_len); + } + // Otherwise, copy decompressed header data into |iov|. + size_t bytes_consumed = 0; + size_t iov_index = 0; + while (iov_index < iov_len && + decompressed_headers_.length() > bytes_consumed) { + size_t bytes_to_read = min(iov[iov_index].iov_len, + decompressed_headers_.length() - bytes_consumed); + char* iov_ptr = static_cast<char*>(iov[iov_index].iov_base); + memcpy(iov_ptr, + decompressed_headers_.data() + bytes_consumed, bytes_to_read); + bytes_consumed += bytes_to_read; + ++iov_index; + } + decompressed_headers_.erase(0, bytes_consumed); + return bytes_consumed; +} + +int QuicDataStream::GetReadableRegions(iovec* iov, size_t iov_len) { + if (FinishedReadingHeaders()) { + return sequencer()->GetReadableRegions(iov, iov_len); + } + if (iov_len == 0) { + return 0; + } + iov[0].iov_base = static_cast<void*>( + const_cast<char*>(decompressed_headers_.data())); + iov[0].iov_len = decompressed_headers_.length(); + return 1; +} + +bool QuicDataStream::IsDoneReading() const { + if (!headers_decompressed_ || !decompressed_headers_.empty()) { + return false; + } + return sequencer()->IsClosed(); +} + +bool QuicDataStream::HasBytesToRead() const { + return !decompressed_headers_.empty() || sequencer()->HasBytesToRead(); +} + +void QuicDataStream::set_priority(QuicPriority priority) { + DCHECK_EQ(0u, stream_bytes_written()); + priority_ = priority; +} + +QuicPriority QuicDataStream::EffectivePriority() const { + return priority(); +} + +uint32 QuicDataStream::ProcessRawData(const char* data, uint32 data_len) { + DCHECK_NE(0u, data_len); + + uint32 total_bytes_consumed = 0; + if (headers_id_ == 0u) { + total_bytes_consumed += StripPriorityAndHeaderId(data, data_len); + data += total_bytes_consumed; + data_len -= total_bytes_consumed; + if (data_len == 0 || total_bytes_consumed == 0) { + return total_bytes_consumed; + } + } + DCHECK_NE(0u, headers_id_); + + // Once the headers are finished, we simply pass the data through. + if (headers_decompressed_) { + // Some buffered header data remains. + if (!decompressed_headers_.empty()) { + ProcessHeaderData(); + } + if (decompressed_headers_.empty()) { + DVLOG(1) << "Delegating procesing to ProcessData"; + total_bytes_consumed += ProcessData(data, data_len); + } + return total_bytes_consumed; + } + + QuicHeaderId current_header_id = + session()->decompressor()->current_header_id(); + // Ensure that this header id looks sane. + if (headers_id_ < current_header_id || + headers_id_ > kMaxHeaderIdDelta + current_header_id) { + DVLOG(1) << ENDPOINT + << "Invalid headers for stream: " << id() + << " header_id: " << headers_id_ + << " current_header_id: " << current_header_id; + session()->connection()->SendConnectionClose(QUIC_INVALID_HEADER_ID); + return total_bytes_consumed; + } + + // If we are head-of-line blocked on decompression, then back up. + if (current_header_id != headers_id_) { + session()->MarkDecompressionBlocked(headers_id_, id()); + DVLOG(1) << ENDPOINT + << "Unable to decompress header data for stream: " << id() + << " header_id: " << headers_id_; + return total_bytes_consumed; + } + + // Decompressed data will be delivered to decompressed_headers_. + size_t bytes_consumed = session()->decompressor()->DecompressData( + StringPiece(data, data_len), this); + DCHECK_NE(0u, bytes_consumed); + if (bytes_consumed > data_len) { + DCHECK(false) << "DecompressData returned illegal value"; + OnDecompressionError(); + return total_bytes_consumed; + } + total_bytes_consumed += bytes_consumed; + data += bytes_consumed; + data_len -= bytes_consumed; + + if (decompression_failed_) { + // The session will have been closed in OnDecompressionError. + return total_bytes_consumed; + } + + // Headers are complete if the decompressor has moved on to the + // next stream. + headers_decompressed_ = + session()->decompressor()->current_header_id() != headers_id_; + if (!headers_decompressed_) { + DCHECK_EQ(0u, data_len); + } + + ProcessHeaderData(); + + if (!headers_decompressed_ || !decompressed_headers_.empty()) { + return total_bytes_consumed; + } + + // We have processed all of the decompressed data but we might + // have some more raw data to process. + if (data_len > 0) { + total_bytes_consumed += ProcessData(data, data_len); + } + + // The sequencer will push any additional buffered frames if this data + // has been completely consumed. + return total_bytes_consumed; +} + +const IPEndPoint& QuicDataStream::GetPeerAddress() { + return session()->peer_address(); +} + +QuicSpdyCompressor* QuicDataStream::compressor() { + return session()->compressor(); +} + +bool QuicDataStream::GetSSLInfo(SSLInfo* ssl_info) { + return session()->GetSSLInfo(ssl_info); +} + +uint32 QuicDataStream::ProcessHeaderData() { + if (decompressed_headers_.empty()) { + return 0; + } + + size_t bytes_processed = ProcessData(decompressed_headers_.data(), + decompressed_headers_.length()); + if (bytes_processed == decompressed_headers_.length()) { + decompressed_headers_.clear(); + } else { + decompressed_headers_ = decompressed_headers_.erase(0, bytes_processed); + } + return bytes_processed; +} + +void QuicDataStream::OnDecompressorAvailable() { + DCHECK_EQ(headers_id_, + session()->decompressor()->current_header_id()); + DCHECK(!headers_decompressed_); + DCHECK(!decompression_failed_); + DCHECK_EQ(0u, decompressed_headers_.length()); + + while (!headers_decompressed_) { + struct iovec iovec; + if (sequencer()->GetReadableRegions(&iovec, 1) == 0) { + return; + } + + size_t bytes_consumed = session()->decompressor()->DecompressData( + StringPiece(static_cast<char*>(iovec.iov_base), + iovec.iov_len), + this); + DCHECK_LE(bytes_consumed, iovec.iov_len); + if (decompression_failed_) { + return; + } + sequencer()->MarkConsumed(bytes_consumed); + + headers_decompressed_ = + session()->decompressor()->current_header_id() != headers_id_; + } + + // Either the headers are complete, or the all data as been consumed. + ProcessHeaderData(); // Unprocessed headers remain in decompressed_headers_. + if (IsDoneReading()) { + OnFinRead(); + } else if (FinishedReadingHeaders()) { + sequencer()->FlushBufferedFrames(); + } +} + +bool QuicDataStream::OnDecompressedData(StringPiece data) { + data.AppendToString(&decompressed_headers_); + return true; +} + +void QuicDataStream::OnDecompressionError() { + DCHECK(!decompression_failed_); + decompression_failed_ = true; + session()->connection()->SendConnectionClose(QUIC_DECOMPRESSION_FAILURE); +} + +void QuicDataStream::OnClose() { + ReliableQuicStream::OnClose(); + + if (visitor_) { + Visitor* visitor = visitor_; + // Calling Visitor::OnClose() may result the destruction of the visitor, + // so we need to ensure we don't call it again. + visitor_ = NULL; + visitor->OnClose(this); + } +} + +uint32 QuicDataStream::StripPriorityAndHeaderId( + const char* data, uint32 data_len) { + uint32 total_bytes_parsed = 0; + + if (!priority_parsed_ && session()->connection()->is_server()) { + QuicPriority temporary_priority = priority_; + total_bytes_parsed = StripUint32( + data, data_len, &headers_id_and_priority_buffer_, &temporary_priority); + if (total_bytes_parsed > 0 && headers_id_and_priority_buffer_.size() == 0) { + priority_parsed_ = true; + + // Spdy priorities are inverted, so the highest numerical value is the + // lowest legal priority. + if (temporary_priority > static_cast<QuicPriority>(kLowestPriority)) { + session()->connection()->SendConnectionClose(QUIC_INVALID_PRIORITY); + return 0; + } + priority_ = temporary_priority; + } + data += total_bytes_parsed; + data_len -= total_bytes_parsed; + } + if (data_len > 0 && headers_id_ == 0u) { + // The headers ID has not yet been read. Strip it from the beginning of + // the data stream. + total_bytes_parsed += StripUint32( + data, data_len, &headers_id_and_priority_buffer_, &headers_id_); + } + return total_bytes_parsed; +} + +bool QuicDataStream::FinishedReadingHeaders() { + return headers_decompressed_ && decompressed_headers_.empty(); +} + +} // namespace net diff --git a/net/quic/quic_data_stream.h b/net/quic/quic_data_stream.h new file mode 100644 index 0000000..84990439 --- /dev/null +++ b/net/quic/quic_data_stream.h @@ -0,0 +1,140 @@ +// Copyright 2013 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. +// +// The base class for streams which deliver data to/from an application. +// In each direction, the data on such a stream first contains compressed +// headers then body data. + +#ifndef NET_QUIC_QUIC_DATA_STREAM_H_ +#define NET_QUIC_QUIC_DATA_STREAM_H_ + +#include <sys/types.h> + +#include <list> + +#include "base/strings/string_piece.h" +#include "net/base/iovec.h" +#include "net/base/net_export.h" +#include "net/quic/quic_ack_notifier.h" +#include "net/quic/quic_spdy_compressor.h" +#include "net/quic/quic_spdy_decompressor.h" +#include "net/quic/quic_stream_sequencer.h" +#include "net/quic/reliable_quic_stream.h" + +namespace net { + +namespace test { +class QuicDataStreamPeer; +class ReliableQuicStreamPeer; +} // namespace test + +class IPEndPoint; +class QuicSession; +class SSLInfo; + +// All this does right now is send data to subclasses via the sequencer. +class NET_EXPORT_PRIVATE QuicDataStream : public ReliableQuicStream, + public QuicSpdyDecompressor::Visitor { + public: + // Visitor receives callbacks from the stream. + class Visitor { + public: + Visitor() {} + + // Called when the stream is closed. + virtual void OnClose(QuicDataStream* stream) = 0; + + protected: + virtual ~Visitor() {} + + private: + DISALLOW_COPY_AND_ASSIGN(Visitor); + }; + + QuicDataStream(QuicStreamId id, QuicSession* session); + + virtual ~QuicDataStream(); + + // ReliableQuicStream implementation + virtual void OnClose() OVERRIDE; + // By default, this is the same as priority(), however it allows streams + // to temporarily alter effective priority. For example if a SPDY stream has + // compressed but not written headers it can write the headers with a higher + // priority. + virtual QuicPriority EffectivePriority() const OVERRIDE; + virtual uint32 ProcessRawData(const char* data, uint32 data_len) OVERRIDE; + + // QuicSpdyDecompressor::Visitor implementation. + virtual bool OnDecompressedData(base::StringPiece data) OVERRIDE; + virtual void OnDecompressionError() OVERRIDE; + + virtual uint32 ProcessData(const char* data, uint32 data_len) = 0; + + // This block of functions wraps the sequencer's functions of the same + // name. These methods return uncompressed data until that has + // been fully processed. Then they simply delegate to the sequencer. + virtual size_t Readv(const struct iovec* iov, size_t iov_len); + virtual int GetReadableRegions(iovec* iov, size_t iov_len); + // Returns true when all data has been read from the peer, including the fin. + virtual bool IsDoneReading() const; + virtual bool HasBytesToRead() const; + + // Called by the session when a decompression blocked stream + // becomes unblocked. + virtual void OnDecompressorAvailable(); + + void set_visitor(Visitor* visitor) { visitor_ = visitor; } + + bool headers_decompressed() const { return headers_decompressed_; } + + const IPEndPoint& GetPeerAddress(); + + QuicSpdyCompressor* compressor(); + + // Gets the SSL connection information. + bool GetSSLInfo(SSLInfo* ssl_info); + + protected: + // Sets priority_ to priority. This should only be called before bytes are + // written to the server. + void set_priority(QuicPriority priority); + // This is protected because external classes should use EffectivePriority + // instead. + QuicPriority priority() const { return priority_; } + + private: + friend class test::QuicDataStreamPeer; + friend class test::ReliableQuicStreamPeer; + friend class QuicStreamUtils; + + uint32 ProcessHeaderData(); + + uint32 StripPriorityAndHeaderId(const char* data, uint32 data_len); + + bool FinishedReadingHeaders(); + + Visitor* visitor_; + // True if the headers have been completely decompresssed. + bool headers_decompressed_; + // The priority of the stream, once parsed. + QuicPriority priority_; + // ID of the header block sent by the peer, once parsed. + QuicHeaderId headers_id_; + // Buffer into which we write bytes from priority_ and headers_id_ + // until each is fully parsed. + string headers_id_and_priority_buffer_; + // Contains a copy of the decompressed headers until they are consumed + // via ProcessData or Readv. + string decompressed_headers_; + // True if an error was encountered during decompression. + bool decompression_failed_; + // True if the priority has been read, false otherwise. + bool priority_parsed_; + + DISALLOW_COPY_AND_ASSIGN(QuicDataStream); +}; + +} // namespace net + +#endif // NET_QUIC_QUIC_DATA_STREAM_H_ diff --git a/net/quic/quic_data_stream_test.cc b/net/quic/quic_data_stream_test.cc new file mode 100644 index 0000000..4dd90bb --- /dev/null +++ b/net/quic/quic_data_stream_test.cc @@ -0,0 +1,455 @@ +// Copyright 2013 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "net/quic/quic_data_stream.h" + +#include "net/quic/quic_ack_notifier.h" +#include "net/quic/quic_connection.h" +#include "net/quic/quic_spdy_compressor.h" +#include "net/quic/quic_spdy_decompressor.h" +#include "net/quic/quic_utils.h" +#include "net/quic/spdy_utils.h" +#include "net/quic/test_tools/quic_session_peer.h" +#include "net/quic/test_tools/quic_test_utils.h" +#include "testing/gmock/include/gmock/gmock.h" + +using base::StringPiece; +using std::min; +using testing::_; +using testing::InSequence; +using testing::Return; +using testing::SaveArg; +using testing::StrEq; +using testing::StrictMock; + +namespace net { +namespace test { +namespace { + +const QuicGuid kGuid = 42; +const QuicGuid kStreamId = 3; +const bool kIsServer = true; +const bool kShouldProcessData = true; + +class TestStream : public QuicDataStream { + public: + TestStream(QuicStreamId id, + QuicSession* session, + bool should_process_data) + : QuicDataStream(id, session), + should_process_data_(should_process_data) {} + + virtual uint32 ProcessData(const char* data, uint32 data_len) OVERRIDE { + EXPECT_NE(0u, data_len); + DVLOG(1) << "ProcessData data_len: " << data_len; + data_ += string(data, data_len); + return should_process_data_ ? data_len : 0; + } + + using ReliableQuicStream::WriteOrBufferData; + using ReliableQuicStream::CloseReadSide; + using ReliableQuicStream::CloseWriteSide; + + const string& data() const { return data_; } + + private: + bool should_process_data_; + string data_; +}; + +class QuicDataStreamTest : public ::testing::TestWithParam<bool> { + public: + QuicDataStreamTest() { + headers_[":host"] = "www.google.com"; + headers_[":path"] = "/index.hml"; + headers_[":scheme"] = "https"; + headers_["cookie"] = + "__utma=208381060.1228362404.1372200928.1372200928.1372200928.1; " + "__utmc=160408618; " + "GX=DQAAAOEAAACWJYdewdE9rIrW6qw3PtVi2-d729qaa-74KqOsM1NVQblK4VhX" + "hoALMsy6HOdDad2Sz0flUByv7etmo3mLMidGrBoljqO9hSVA40SLqpG_iuKKSHX" + "RW3Np4bq0F0SDGDNsW0DSmTS9ufMRrlpARJDS7qAI6M3bghqJp4eABKZiRqebHT" + "pMU-RXvTI5D5oCF1vYxYofH_l1Kviuiy3oQ1kS1enqWgbhJ2t61_SNdv-1XJIS0" + "O3YeHLmVCs62O6zp89QwakfAWK9d3IDQvVSJzCQsvxvNIvaZFa567MawWlXg0Rh" + "1zFMi5vzcns38-8_Sns; " + "GA=v*2%2Fmem*57968640*47239936%2Fmem*57968640*47114716%2Fno-nm-" + "yj*15%2Fno-cc-yj*5%2Fpc-ch*133685%2Fpc-s-cr*133947%2Fpc-s-t*1339" + "47%2Fno-nm-yj*4%2Fno-cc-yj*1%2Fceft-as*1%2Fceft-nqas*0%2Fad-ra-c" + "v_p%2Fad-nr-cv_p-f*1%2Fad-v-cv_p*859%2Fad-ns-cv_p-f*1%2Ffn-v-ad%" + "2Fpc-t*250%2Fpc-cm*461%2Fpc-s-cr*722%2Fpc-s-t*722%2Fau_p*4" + "SICAID=AJKiYcHdKgxum7KMXG0ei2t1-W4OD1uW-ecNsCqC0wDuAXiDGIcT_HA2o1" + "3Rs1UKCuBAF9g8rWNOFbxt8PSNSHFuIhOo2t6bJAVpCsMU5Laa6lewuTMYI8MzdQP" + "ARHKyW-koxuhMZHUnGBJAM1gJODe0cATO_KGoX4pbbFxxJ5IicRxOrWK_5rU3cdy6" + "edlR9FsEdH6iujMcHkbE5l18ehJDwTWmBKBzVD87naobhMMrF6VvnDGxQVGp9Ir_b" + "Rgj3RWUoPumQVCxtSOBdX0GlJOEcDTNCzQIm9BSfetog_eP_TfYubKudt5eMsXmN6" + "QnyXHeGeK2UINUzJ-D30AFcpqYgH9_1BvYSpi7fc7_ydBU8TaD8ZRxvtnzXqj0RfG" + "tuHghmv3aD-uzSYJ75XDdzKdizZ86IG6Fbn1XFhYZM-fbHhm3mVEXnyRW4ZuNOLFk" + "Fas6LMcVC6Q8QLlHYbXBpdNFuGbuZGUnav5C-2I_-46lL0NGg3GewxGKGHvHEfoyn" + "EFFlEYHsBQ98rXImL8ySDycdLEFvBPdtctPmWCfTxwmoSMLHU2SCVDhbqMWU5b0yr" + "JBCScs_ejbKaqBDoB7ZGxTvqlrB__2ZmnHHjCr8RgMRtKNtIeuZAo "; + } + + void Initialize(bool stream_should_process_data) { + connection_ = new testing::StrictMock<MockConnection>( + kGuid, IPEndPoint(), kIsServer); + session_.reset(new testing::StrictMock<MockSession>( + connection_, kIsServer)); + stream_.reset(new TestStream(kStreamId, session_.get(), + stream_should_process_data)); + stream2_.reset(new TestStream(kStreamId + 2, session_.get(), + stream_should_process_data)); + compressor_.reset(new QuicSpdyCompressor()); + decompressor_.reset(new QuicSpdyDecompressor); + write_blocked_list_ = + QuicSessionPeer::GetWriteblockedStreams(session_.get()); + } + + protected: + MockConnection* connection_; + scoped_ptr<MockSession> session_; + scoped_ptr<TestStream> stream_; + scoped_ptr<TestStream> stream2_; + scoped_ptr<QuicSpdyCompressor> compressor_; + scoped_ptr<QuicSpdyDecompressor> decompressor_; + SpdyHeaderBlock headers_; + WriteBlockedList<QuicStreamId>* write_blocked_list_; +}; + +TEST_F(QuicDataStreamTest, ProcessHeaders) { + Initialize(kShouldProcessData); + + string compressed_headers = compressor_->CompressHeadersWithPriority( + kHighestPriority, headers_); + QuicStreamFrame frame(kStreamId, false, 0, MakeIOVector(compressed_headers)); + + stream_->OnStreamFrame(frame); + EXPECT_EQ(SpdyUtils::SerializeUncompressedHeaders(headers_), stream_->data()); + EXPECT_EQ(static_cast<QuicPriority>(kHighestPriority), + stream_->EffectivePriority()); +} + +TEST_F(QuicDataStreamTest, ProcessHeadersWithInvalidHeaderId) { + Initialize(kShouldProcessData); + + string compressed_headers = compressor_->CompressHeadersWithPriority( + kHighestPriority, headers_); + compressed_headers[4] = '\xFF'; // Illegal header id. + QuicStreamFrame frame(kStreamId, false, 0, MakeIOVector(compressed_headers)); + + EXPECT_CALL(*connection_, SendConnectionClose(QUIC_INVALID_HEADER_ID)); + stream_->OnStreamFrame(frame); +} + +TEST_F(QuicDataStreamTest, ProcessHeadersWithInvalidPriority) { + Initialize(kShouldProcessData); + + string compressed_headers = compressor_->CompressHeadersWithPriority( + kHighestPriority, headers_); + compressed_headers[0] = '\xFF'; // Illegal priority. + QuicStreamFrame frame(kStreamId, false, 0, MakeIOVector(compressed_headers)); + + EXPECT_CALL(*connection_, SendConnectionClose(QUIC_INVALID_PRIORITY)); + stream_->OnStreamFrame(frame); +} + +TEST_F(QuicDataStreamTest, ProcessHeadersAndBody) { + Initialize(kShouldProcessData); + + string compressed_headers = compressor_->CompressHeadersWithPriority( + kHighestPriority, headers_); + string body = "this is the body"; + string data = compressed_headers + body; + QuicStreamFrame frame(kStreamId, false, 0, MakeIOVector(data)); + + stream_->OnStreamFrame(frame); + EXPECT_EQ(SpdyUtils::SerializeUncompressedHeaders(headers_) + body, + stream_->data()); +} + +TEST_F(QuicDataStreamTest, ProcessHeadersAndBodyFragments) { + Initialize(kShouldProcessData); + + string compressed_headers = compressor_->CompressHeadersWithPriority( + kLowestPriority, headers_); + string body = "this is the body"; + string data = compressed_headers + body; + + for (size_t fragment_size = 1; fragment_size < data.size(); ++fragment_size) { + Initialize(kShouldProcessData); + for (size_t offset = 0; offset < data.size(); offset += fragment_size) { + size_t remaining_data = data.length() - offset; + StringPiece fragment(data.data() + offset, + min(fragment_size, remaining_data)); + QuicStreamFrame frame(kStreamId, false, offset, MakeIOVector(fragment)); + + stream_->OnStreamFrame(frame); + } + ASSERT_EQ(SpdyUtils::SerializeUncompressedHeaders(headers_) + body, + stream_->data()) << "fragment_size: " << fragment_size; + } + + for (size_t split_point = 1; split_point < data.size() - 1; ++split_point) { + Initialize(kShouldProcessData); + + StringPiece fragment1(data.data(), split_point); + QuicStreamFrame frame1(kStreamId, false, 0, MakeIOVector(fragment1)); + stream_->OnStreamFrame(frame1); + + StringPiece fragment2(data.data() + split_point, data.size() - split_point); + QuicStreamFrame frame2( + kStreamId, false, split_point, MakeIOVector(fragment2)); + stream_->OnStreamFrame(frame2); + + ASSERT_EQ(SpdyUtils::SerializeUncompressedHeaders(headers_) + body, + stream_->data()) << "split_point: " << split_point; + } + EXPECT_EQ(static_cast<QuicPriority>(kLowestPriority), + stream_->EffectivePriority()); +} + +TEST_F(QuicDataStreamTest, ProcessHeadersAndBodyReadv) { + Initialize(!kShouldProcessData); + + string compressed_headers = compressor_->CompressHeadersWithPriority( + kHighestPriority, headers_); + string body = "this is the body"; + string data = compressed_headers + body; + QuicStreamFrame frame(kStreamId, false, 0, MakeIOVector(data)); + string uncompressed_headers = + SpdyUtils::SerializeUncompressedHeaders(headers_); + string uncompressed_data = uncompressed_headers + body; + + stream_->OnStreamFrame(frame); + EXPECT_EQ(uncompressed_headers, stream_->data()); + + char buffer[2048]; + ASSERT_LT(data.length(), arraysize(buffer)); + struct iovec vec; + vec.iov_base = buffer; + vec.iov_len = arraysize(buffer); + + size_t bytes_read = stream_->Readv(&vec, 1); + EXPECT_EQ(uncompressed_headers.length(), bytes_read); + EXPECT_EQ(uncompressed_headers, string(buffer, bytes_read)); + + bytes_read = stream_->Readv(&vec, 1); + EXPECT_EQ(body.length(), bytes_read); + EXPECT_EQ(body, string(buffer, bytes_read)); +} + +TEST_F(QuicDataStreamTest, ProcessHeadersAndBodyIncrementalReadv) { + Initialize(!kShouldProcessData); + + string compressed_headers = compressor_->CompressHeadersWithPriority( + kHighestPriority, headers_); + string body = "this is the body"; + string data = compressed_headers + body; + QuicStreamFrame frame(kStreamId, false, 0, MakeIOVector(data)); + string uncompressed_headers = + SpdyUtils::SerializeUncompressedHeaders(headers_); + string uncompressed_data = uncompressed_headers + body; + + stream_->OnStreamFrame(frame); + EXPECT_EQ(uncompressed_headers, stream_->data()); + + char buffer[1]; + struct iovec vec; + vec.iov_base = buffer; + vec.iov_len = arraysize(buffer); + for (size_t i = 0; i < uncompressed_data.length(); ++i) { + size_t bytes_read = stream_->Readv(&vec, 1); + ASSERT_EQ(1u, bytes_read); + EXPECT_EQ(uncompressed_data.data()[i], buffer[0]); + } +} + +TEST_F(QuicDataStreamTest, ProcessHeadersUsingReadvWithMultipleIovecs) { + Initialize(!kShouldProcessData); + + string compressed_headers = compressor_->CompressHeadersWithPriority( + kHighestPriority, headers_); + string body = "this is the body"; + string data = compressed_headers + body; + QuicStreamFrame frame(kStreamId, false, 0, MakeIOVector(data)); + string uncompressed_headers = + SpdyUtils::SerializeUncompressedHeaders(headers_); + string uncompressed_data = uncompressed_headers + body; + + stream_->OnStreamFrame(frame); + EXPECT_EQ(uncompressed_headers, stream_->data()); + + char buffer1[1]; + char buffer2[1]; + struct iovec vec[2]; + vec[0].iov_base = buffer1; + vec[0].iov_len = arraysize(buffer1); + vec[1].iov_base = buffer2; + vec[1].iov_len = arraysize(buffer2); + for (size_t i = 0; i < uncompressed_data.length(); i += 2) { + size_t bytes_read = stream_->Readv(vec, 2); + ASSERT_EQ(2u, bytes_read) << i; + ASSERT_EQ(uncompressed_data.data()[i], buffer1[0]) << i; + ASSERT_EQ(uncompressed_data.data()[i + 1], buffer2[0]) << i; + } +} + +TEST_F(QuicDataStreamTest, ProcessCorruptHeadersEarly) { + Initialize(kShouldProcessData); + + string compressed_headers1 = compressor_->CompressHeadersWithPriority( + kHighestPriority, headers_); + QuicStreamFrame frame1( + stream_->id(), false, 0, MakeIOVector(compressed_headers1)); + string decompressed_headers1 = + SpdyUtils::SerializeUncompressedHeaders(headers_); + + headers_["content-type"] = "text/plain"; + string compressed_headers2 = compressor_->CompressHeadersWithPriority( + kHighestPriority, headers_); + // Corrupt the compressed data. + compressed_headers2[compressed_headers2.length() - 1] ^= 0xA1; + QuicStreamFrame frame2( + stream2_->id(), false, 0, MakeIOVector(compressed_headers2)); + string decompressed_headers2 = + SpdyUtils::SerializeUncompressedHeaders(headers_); + + // Deliver frame2 to stream2 out of order. The decompressor is not + // available yet, so no data will be processed. The compressed data + // will be buffered until OnDecompressorAvailable() is called + // to process it. + stream2_->OnStreamFrame(frame2); + EXPECT_EQ("", stream2_->data()); + + // Now deliver frame1 to stream1. The decompressor is available so + // the data will be processed, and the decompressor will become + // available for stream2. + stream_->OnStreamFrame(frame1); + EXPECT_EQ(decompressed_headers1, stream_->data()); + + // Verify that the decompressor is available, and inform stream2 + // that it can now decompress the buffered compressed data. Since + // the compressed data is corrupt, the stream will shutdown the session. + EXPECT_EQ(2u, session_->decompressor()->current_header_id()); + EXPECT_CALL(*connection_, SendConnectionClose(QUIC_DECOMPRESSION_FAILURE)); + stream2_->OnDecompressorAvailable(); + EXPECT_EQ("", stream2_->data()); +} + +TEST_F(QuicDataStreamTest, ProcessPartialHeadersEarly) { + Initialize(kShouldProcessData); + + string compressed_headers1 = compressor_->CompressHeadersWithPriority( + kHighestPriority, headers_); + QuicStreamFrame frame1( + stream_->id(), false, 0, MakeIOVector(compressed_headers1)); + string decompressed_headers1 = + SpdyUtils::SerializeUncompressedHeaders(headers_); + + headers_["content-type"] = "text/plain"; + string compressed_headers2 = compressor_->CompressHeadersWithPriority( + kHighestPriority, headers_); + string partial_compressed_headers = + compressed_headers2.substr(0, compressed_headers2.length() / 2); + QuicStreamFrame frame2( + stream2_->id(), false, 0, MakeIOVector(partial_compressed_headers)); + string decompressed_headers2 = + SpdyUtils::SerializeUncompressedHeaders(headers_); + + // Deliver frame2 to stream2 out of order. The decompressor is not + // available yet, so no data will be processed. The compressed data + // will be buffered until OnDecompressorAvailable() is called + // to process it. + stream2_->OnStreamFrame(frame2); + EXPECT_EQ("", stream2_->data()); + + // Now deliver frame1 to stream1. The decompressor is available so + // the data will be processed, and the decompressor will become + // available for stream2. + stream_->OnStreamFrame(frame1); + EXPECT_EQ(decompressed_headers1, stream_->data()); + + // Verify that the decompressor is available, and inform stream2 + // that it can now decompress the buffered compressed data. Since + // the compressed data is incomplete it will not be passed to + // the stream. + EXPECT_EQ(2u, session_->decompressor()->current_header_id()); + stream2_->OnDecompressorAvailable(); + EXPECT_EQ("", stream2_->data()); + + // Now send remaining data and verify that we have now received the + // compressed headers. + string remaining_compressed_headers = + compressed_headers2.substr(partial_compressed_headers.length()); + + QuicStreamFrame frame3(stream2_->id(), false, + partial_compressed_headers.length(), + MakeIOVector(remaining_compressed_headers)); + stream2_->OnStreamFrame(frame3); + EXPECT_EQ(decompressed_headers2, stream2_->data()); +} + +TEST_F(QuicDataStreamTest, ProcessHeadersEarly) { + Initialize(kShouldProcessData); + + string compressed_headers1 = compressor_->CompressHeadersWithPriority( + kHighestPriority, headers_); + QuicStreamFrame frame1( + stream_->id(), false, 0, MakeIOVector(compressed_headers1)); + string decompressed_headers1 = + SpdyUtils::SerializeUncompressedHeaders(headers_); + + headers_["content-type"] = "text/plain"; + string compressed_headers2 = compressor_->CompressHeadersWithPriority( + kHighestPriority, headers_); + QuicStreamFrame frame2( + stream2_->id(), false, 0, MakeIOVector(compressed_headers2)); + string decompressed_headers2 = + SpdyUtils::SerializeUncompressedHeaders(headers_); + + // Deliver frame2 to stream2 out of order. The decompressor is not + // available yet, so no data will be processed. The compressed data + // will be buffered until OnDecompressorAvailable() is called + // to process it. + stream2_->OnStreamFrame(frame2); + EXPECT_EQ("", stream2_->data()); + + // Now deliver frame1 to stream1. The decompressor is available so + // the data will be processed, and the decompressor will become + // available for stream2. + stream_->OnStreamFrame(frame1); + EXPECT_EQ(decompressed_headers1, stream_->data()); + + // Verify that the decompressor is available, and inform stream2 + // that it can now decompress the buffered compressed data. + EXPECT_EQ(2u, session_->decompressor()->current_header_id()); + stream2_->OnDecompressorAvailable(); + EXPECT_EQ(decompressed_headers2, stream2_->data()); +} + +TEST_F(QuicDataStreamTest, ProcessHeadersDelay) { + Initialize(!kShouldProcessData); + + string compressed_headers = compressor_->CompressHeadersWithPriority( + kHighestPriority, headers_); + QuicStreamFrame frame1( + stream_->id(), false, 0, MakeIOVector(compressed_headers)); + string decompressed_headers = + SpdyUtils::SerializeUncompressedHeaders(headers_); + + // Send the headers to the stream and verify they were decompressed. + stream_->OnStreamFrame(frame1); + EXPECT_EQ(2u, session_->decompressor()->current_header_id()); + + // Verify that we are now able to handle the body data, + // even though the stream has not processed the headers. + EXPECT_CALL(*connection_, SendConnectionClose(QUIC_INVALID_HEADER_ID)) + .Times(0); + QuicStreamFrame frame2(stream_->id(), false, compressed_headers.length(), + MakeIOVector("body data")); + stream_->OnStreamFrame(frame2); +} + +} // namespace +} // namespace test +} // namespace net diff --git a/net/quic/quic_framer.cc b/net/quic/quic_framer.cc index 92b4895..cc02882 100644 --- a/net/quic/quic_framer.cc +++ b/net/quic/quic_framer.cc @@ -261,12 +261,7 @@ QuicFramer::AckFrameInfo::~AckFrameInfo() { } QuicPacketEntropyHash QuicFramer::GetPacketEntropyHash( const QuicPacketHeader& header) const { - if (!header.entropy_flag) { - // TODO(satyamshekhar): Return some more better value here (something that - // is not a constant). - return 0; - } - return 1 << (header.packet_sequence_number % 8); + return header.entropy_flag << (header.packet_sequence_number % 8); } // Test only. @@ -1823,9 +1818,9 @@ bool QuicFramer::AppendAckFramePayloadAndTypeByte( max_num_ranges = min(static_cast<size_t>(numeric_limits<uint8>::max()), max_num_ranges); bool truncated = ack_info.nack_ranges.size() > max_num_ranges; - DLOG_IF(INFO, truncated) << "Truncating ack from " - << ack_info.nack_ranges.size() << " ranges to " - << max_num_ranges; + DVLOG_IF(1, truncated) << "Truncating ack from " + << ack_info.nack_ranges.size() << " ranges to " + << max_num_ranges; // Write out the type byte by setting the low order bits and doing shifts // to make room for the next bit flags to be set. diff --git a/net/quic/quic_http_stream_test.cc b/net/quic/quic_http_stream_test.cc index 3e58dd0..97ae972 100644 --- a/net/quic/quic_http_stream_test.cc +++ b/net/quic/quic_http_stream_test.cc @@ -136,7 +136,8 @@ class QuicHttpStreamTest : public ::testing::TestWithParam<bool> { read_buffer_(new IOBufferWithSize(4096)), guid_(2), framer_(QuicSupportedVersions(), QuicTime::Zero(), false), - creator_(guid_, &framer_, &random_, false) { + random_generator_(0), + creator_(guid_, &framer_, &random_generator_, false) { IPAddressNumber ip; CHECK(ParseIPLiteralToNumber("192.0.2.33", &ip)); peer_addr_ = IPEndPoint(ip, 443); @@ -305,7 +306,6 @@ class QuicHttpStreamTest : public ::testing::TestWithParam<bool> { scoped_refptr<TestTaskRunner> runner_; scoped_ptr<MockWrite[]> mock_writes_; MockClock clock_; - MockRandom random_generator_; TestQuicConnection* connection_; scoped_ptr<QuicConnectionHelper> helper_; testing::StrictMock<MockConnectionVisitor> visitor_; @@ -348,7 +348,7 @@ class QuicHttpStreamTest : public ::testing::TestWithParam<bool> { QuicFramer framer_; IPEndPoint self_addr_; IPEndPoint peer_addr_; - MockRandom random_; + MockRandom random_generator_; MockCryptoClientStreamFactory crypto_client_stream_factory_; QuicPacketCreator creator_; QuicPacketHeader header_; diff --git a/net/quic/quic_network_transaction_unittest.cc b/net/quic/quic_network_transaction_unittest.cc index 3018523..2e4610c 100644 --- a/net/quic/quic_network_transaction_unittest.cc +++ b/net/quic/quic_network_transaction_unittest.cc @@ -64,6 +64,7 @@ class QuicNetworkTransactionTest : public PlatformTest { compressor_(new QuicSpdyCompressor()), auth_handler_factory_( HttpAuthHandlerFactory::CreateDefault(&host_resolver_)), + random_generator_(0), hanging_data_(NULL, 0, NULL, 0) { request_.method = "GET"; request_.url = GURL("http://www.google.com/"); @@ -90,7 +91,7 @@ class QuicNetworkTransactionTest : public PlatformTest { QuicPacketSequenceNumber num, QuicStreamId stream_id) { QuicPacketHeader header; - header.public_header.guid = 0xDEADBEEF; + header.public_header.guid = random_generator_.RandUint64(); header.public_header.reset_flag = false; header.public_header.version_flag = false; header.public_header.sequence_number_length = PACKET_1BYTE_SEQUENCE_NUMBER; @@ -107,7 +108,7 @@ class QuicNetworkTransactionTest : public PlatformTest { scoped_ptr<QuicEncryptedPacket> ConstructConnectionClosePacket( QuicPacketSequenceNumber num) { QuicPacketHeader header; - header.public_header.guid = 0xDEADBEEF; + header.public_header.guid = random_generator_.RandUint64(); header.public_header.reset_flag = false; header.public_header.version_flag = false; header.public_header.sequence_number_length = PACKET_1BYTE_SEQUENCE_NUMBER; @@ -127,7 +128,7 @@ class QuicNetworkTransactionTest : public PlatformTest { QuicPacketSequenceNumber largest_received, QuicPacketSequenceNumber least_unacked) { QuicPacketHeader header; - header.public_header.guid = 0xDEADBEEF; + header.public_header.guid = random_generator_.RandUint64(); header.public_header.reset_flag = false; header.public_header.version_flag = false; header.public_header.sequence_number_length = PACKET_1BYTE_SEQUENCE_NUMBER; diff --git a/net/quic/quic_packet_creator.cc b/net/quic/quic_packet_creator.cc index 901ae2a..5b0e1c6 100644 --- a/net/quic/quic_packet_creator.cc +++ b/net/quic/quic_packet_creator.cc @@ -23,13 +23,48 @@ bool FLAGS_pad_quic_handshake_packets = true; namespace net { +// A QuicRandom wrapper that gets a bucket of entropy and distributes it +// bit-by-bit. Replenishes the bucket as needed. Not thread-safe. Expose this +// class if single bit randomness is needed elsewhere. +class QuicRandomBoolSource { + public: + // random: Source of entropy. Not owned. + explicit QuicRandomBoolSource(QuicRandom* random) + : random_(random), + bit_bucket_(0), + bit_mask_(0) {} + + ~QuicRandomBoolSource() {} + + // Returns the next random bit from the bucket. + bool RandBool() { + if (bit_mask_ == 0) { + bit_bucket_ = random_->RandUint64(); + bit_mask_ = 1; + } + bool result = ((bit_bucket_ & bit_mask_) != 0); + bit_mask_ <<= 1; + return result; + } + + private: + // Source of entropy. + QuicRandom* random_; + // Stored random bits. + uint64 bit_bucket_; + // The next available bit has "1" in the mask. Zero means empty bucket. + uint64 bit_mask_; + + DISALLOW_COPY_AND_ASSIGN(QuicRandomBoolSource); +}; + QuicPacketCreator::QuicPacketCreator(QuicGuid guid, QuicFramer* framer, QuicRandom* random_generator, bool is_server) : guid_(guid), framer_(framer), - random_generator_(random_generator), + random_bool_source_(new QuicRandomBoolSource(random_generator)), sequence_number_(0), fec_group_number_(0), is_server_(is_server), @@ -372,17 +407,12 @@ void QuicPacketCreator::FillPacketHeader(QuicFecGroupNumber fec_group, header->public_header.sequence_number_length = sequence_number_length_; bool entropy_flag; - if (header->packet_sequence_number == 1) { - DCHECK(!fec_flag); - // TODO(satyamshekhar): No entropy in the first message. - // For crypto tests to pass. Fix this by using deterministic QuicRandom. - entropy_flag = 0; - } else if (fec_flag) { + if (fec_flag) { // FEC packets don't have an entropy of their own. Entropy flag for FEC // packets is the XOR of entropy of previous packets. entropy_flag = fec_entropy_flag; } else { - entropy_flag = random_generator_->RandBool(); + entropy_flag = random_bool_source_->RandBool(); } header->entropy_flag = entropy_flag; header->is_in_fec_group = fec_group == 0 ? NOT_IN_FEC_GROUP : IN_FEC_GROUP; diff --git a/net/quic/quic_packet_creator.h b/net/quic/quic_packet_creator.h index 25e4e41..1416836 100644 --- a/net/quic/quic_packet_creator.h +++ b/net/quic/quic_packet_creator.h @@ -27,6 +27,7 @@ class QuicPacketCreatorPeer; class QuicAckNotifier; class QuicRandom; +class QuicRandomBoolSource; class NET_EXPORT_PRIVATE QuicPacketCreator : public QuicFecBuilderInterface { public: @@ -202,7 +203,7 @@ class NET_EXPORT_PRIVATE QuicPacketCreator : public QuicFecBuilderInterface { Options options_; QuicGuid guid_; QuicFramer* framer_; - QuicRandom* random_generator_; + scoped_ptr<QuicRandomBoolSource> random_bool_source_; QuicPacketSequenceNumber sequence_number_; QuicFecGroupNumber fec_group_number_; scoped_ptr<QuicFecGroup> fec_group_; diff --git a/net/quic/quic_packet_creator_test.cc b/net/quic/quic_packet_creator_test.cc index e8dd174..7b38819 100644 --- a/net/quic/quic_packet_creator_test.cc +++ b/net/quic/quic_packet_creator_test.cc @@ -8,8 +8,8 @@ #include "net/quic/crypto/null_encrypter.h" #include "net/quic/crypto/quic_decrypter.h" #include "net/quic/crypto/quic_encrypter.h" -#include "net/quic/crypto/quic_random.h" #include "net/quic/quic_utils.h" +#include "net/quic/test_tools/mock_random.h" #include "net/quic/test_tools/quic_packet_creator_peer.h" #include "net/quic/test_tools/quic_test_utils.h" #include "testing/gmock/include/gmock/gmock.h" @@ -35,7 +35,7 @@ class QuicPacketCreatorTest : public ::testing::TestWithParam<bool> { sequence_number_(0), guid_(2), data_("foo"), - creator_(guid_, &client_framer_, QuicRandom::GetInstance(), false) { + creator_(guid_, &client_framer_, &mock_random_, false) { client_framer_.set_visitor(&framer_visitor_); server_framer_.set_visitor(&framer_visitor_); } @@ -97,6 +97,7 @@ class QuicPacketCreatorTest : public ::testing::TestWithParam<bool> { QuicPacketSequenceNumber sequence_number_; QuicGuid guid_; string data_; + MockRandom mock_random_; QuicPacketCreator creator_; }; @@ -625,6 +626,29 @@ TEST_P(QuicPacketCreatorTest, AddFrameAndSerialize) { creator_.BytesFree()); } +TEST_F(QuicPacketCreatorTest, EntropyFlag) { + frames_.push_back(QuicFrame(new QuicStreamFrame(0u, false, 0u, IOVector()))); + + for (int i = 0; i < 2; ++i) { + for (int j = 0; j < 64; ++j) { + SerializedPacket serialized = creator_.SerializeAllFrames(frames_); + // Verify both BoolSource and hash algorithm. + bool expected_rand_bool = + (mock_random_.RandUint64() & (GG_UINT64_C(1) << j)) != 0; + bool observed_rand_bool = + (serialized.entropy_hash & (1 << ((j+1) % 8))) != 0; + uint8 rest_of_hash = serialized.entropy_hash & ~(1 << ((j+1) % 8)); + EXPECT_EQ(expected_rand_bool, observed_rand_bool); + EXPECT_EQ(0, rest_of_hash); + delete serialized.packet; + } + // After 64 calls, BoolSource will refresh the bucket - make sure it does. + mock_random_.ChangeValue(); + } + + delete frames_[0].stream_frame; +} + } // namespace } // namespace test } // namespace net diff --git a/net/quic/quic_protocol.h b/net/quic/quic_protocol.h index 7f55847..e97e2fc 100644 --- a/net/quic/quic_protocol.h +++ b/net/quic/quic_protocol.h @@ -56,7 +56,9 @@ const QuicByteCount kMaxPacketSize = 1452; // Maximum size of the initial congestion window in packets. const size_t kDefaultInitialWindow = 10; -const size_t kMaxInitialWindow = 100; +// TODO(ianswett): Temporarily changed to 10 due to a large number of clients +// mistakenly negotiating 100 initially and suffering the consequences. +const size_t kMaxInitialWindow = 10; // Maximum size of the congestion window, in packets, for TCP congestion control // algorithms. diff --git a/net/quic/quic_reliable_client_stream.cc b/net/quic/quic_reliable_client_stream.cc index 919913f..af12b8b 100644 --- a/net/quic/quic_reliable_client_stream.cc +++ b/net/quic/quic_reliable_client_stream.cc @@ -14,7 +14,7 @@ namespace net { QuicReliableClientStream::QuicReliableClientStream(QuicStreamId id, QuicSession* session, const BoundNetLog& net_log) - : ReliableQuicStream(id, session), + : QuicDataStream(id, session), net_log_(net_log), delegate_(NULL) { } @@ -57,7 +57,7 @@ void QuicReliableClientStream::OnCanWrite() { QuicPriority QuicReliableClientStream::EffectivePriority() const { if (delegate_ && delegate_->HasSendHeadersComplete()) { - return ReliableQuicStream::EffectivePriority(); + return QuicDataStream::EffectivePriority(); } return kHighestPriority; } @@ -69,7 +69,7 @@ int QuicReliableClientStream::WriteStreamData( // We should not have data buffered. DCHECK(!HasBufferedData()); // Writes the data, or buffers it. - WriteData(data, fin); + WriteOrBufferData(data, fin); if (!HasBufferedData()) { return OK; } diff --git a/net/quic/quic_reliable_client_stream.h b/net/quic/quic_reliable_client_stream.h index cd1e4af..2d5b818 100644 --- a/net/quic/quic_reliable_client_stream.h +++ b/net/quic/quic_reliable_client_stream.h @@ -12,7 +12,7 @@ #include "net/http/http_request_info.h" #include "net/http/http_response_info.h" #include "net/http/http_stream.h" -#include "net/quic/reliable_quic_stream.h" +#include "net/quic/quic_data_stream.h" namespace net { @@ -20,7 +20,7 @@ class QuicClientSession; // A client-initiated ReliableQuicStream. Instances of this class // are owned by the QuicClientSession which created them. -class NET_EXPORT_PRIVATE QuicReliableClientStream : public ReliableQuicStream { +class NET_EXPORT_PRIVATE QuicReliableClientStream : public QuicDataStream { public: // Delegate handles protocol specific behavior of a quic stream. class NET_EXPORT_PRIVATE Delegate { @@ -53,7 +53,7 @@ class NET_EXPORT_PRIVATE QuicReliableClientStream : public ReliableQuicStream { virtual ~QuicReliableClientStream(); - // ReliableQuicStream + // QuicDataStream virtual uint32 ProcessData(const char* data, uint32 data_len) OVERRIDE; virtual void OnFinRead() OVERRIDE; virtual void OnCanWrite() OVERRIDE; @@ -61,7 +61,7 @@ class NET_EXPORT_PRIVATE QuicReliableClientStream : public ReliableQuicStream { // While the server's set_priority shouldn't be called externally, the creator // of client-side streams should be able to set the priority. - using ReliableQuicStream::set_priority; + using QuicDataStream::set_priority; int WriteStreamData(base::StringPiece data, bool fin, @@ -81,7 +81,7 @@ class NET_EXPORT_PRIVATE QuicReliableClientStream : public ReliableQuicStream { const BoundNetLog& net_log() const { return net_log_; } - using ReliableQuicStream::HasBufferedData; + using QuicDataStream::HasBufferedData; private: BoundNetLog net_log_; diff --git a/net/quic/quic_reliable_client_stream_test.cc b/net/quic/quic_reliable_client_stream_test.cc index 328dc87..8999d26 100644 --- a/net/quic/quic_reliable_client_stream_test.cc +++ b/net/quic/quic_reliable_client_stream_test.cc @@ -20,6 +20,8 @@ namespace net { namespace test { namespace { +const QuicGuid kStreamId = 3; + class MockDelegate : public QuicReliableClientStream::Delegate { public: MockDelegate() {} @@ -39,7 +41,7 @@ class QuicReliableClientStreamTest : public ::testing::Test { public: QuicReliableClientStreamTest() : session_(new MockConnection(1, IPEndPoint(), false), false), - stream_(1, &session_, BoundNetLog()) { + stream_(kStreamId, &session_, BoundNetLog()) { stream_.SetDelegate(&delegate_); } @@ -82,17 +84,17 @@ class QuicReliableClientStreamTest : public ::testing::Test { TEST_F(QuicReliableClientStreamTest, OnFinRead) { InitializeHeaders(); - const QuicGuid kStreamId = 1; + QuicSpdyCompressor compressor; + string compressed_headers = compressor.CompressHeaders(headers_); + QuicStreamFrame frame1(kStreamId, false, 0, MakeIOVector(compressed_headers)); string uncompressed_headers = SpdyUtils::SerializeUncompressedHeaders(headers_); - QuicStreamFrame frame1(kStreamId, false, 0, - MakeIOVector(uncompressed_headers)); EXPECT_CALL(delegate_, OnDataReceived(StrEq(uncompressed_headers.data()), uncompressed_headers.size())); stream_.OnStreamFrame(frame1); IOVector iov; - QuicStreamFrame frame2(kStreamId, true, uncompressed_headers.length(), iov); + QuicStreamFrame frame2(kStreamId, true, compressed_headers.length(), iov); EXPECT_CALL(delegate_, OnClose(QUIC_NO_ERROR)); stream_.OnStreamFrame(frame2); } diff --git a/net/quic/quic_sent_packet_manager.cc b/net/quic/quic_sent_packet_manager.cc index a71943e..55693e5 100644 --- a/net/quic/quic_sent_packet_manager.cc +++ b/net/quic/quic_sent_packet_manager.cc @@ -112,15 +112,9 @@ void QuicSentPacketManager::SetMaxPacketSize(QuicByteCount max_packet_size) { } void QuicSentPacketManager::OnSerializedPacket( - const SerializedPacket& serialized_packet, QuicTime serialized_time) { - if (serialized_packet.packet->is_fec_packet()) { - DCHECK(!serialized_packet.retransmittable_frames); - unacked_fec_packets_.insert(make_pair( - serialized_packet.sequence_number, serialized_time)); - return; - } - - if (serialized_packet.retransmittable_frames == NULL) { + const SerializedPacket& serialized_packet) { + if (serialized_packet.retransmittable_frames == NULL && + !serialized_packet.packet->is_fec_packet()) { // Don't track ack/congestion feedback packets. return; } @@ -183,12 +177,11 @@ bool QuicSentPacketManager::OnIncomingAck( const ReceivedPacketInfo& received_info, QuicTime ack_receive_time) { // Determine if the least unacked sequence number is being acked. QuicPacketSequenceNumber least_unacked_sent_before = - min(GetLeastUnackedSentPacket(), GetLeastUnackedFecPacket()); + GetLeastUnackedSentPacket(); bool new_least_unacked = !IsAwaitingPacket(received_info, least_unacked_sent_before); HandleAckForSentPackets(received_info); - HandleAckForSentFecPackets(received_info); SequenceNumberSet retransmission_packets = OnIncomingAckFrame(received_info, ack_receive_time); @@ -446,35 +439,6 @@ void QuicSentPacketManager::DiscardPacket( return; } -void QuicSentPacketManager::HandleAckForSentFecPackets( - const ReceivedPacketInfo& received_info) { - UnackedFecPacketMap::iterator it = unacked_fec_packets_.begin(); - while (it != unacked_fec_packets_.end()) { - QuicPacketSequenceNumber sequence_number = it->first; - if (sequence_number > received_info.largest_observed) { - break; - } - - if (!IsAwaitingPacket(received_info, sequence_number)) { - DVLOG(1) << ENDPOINT << "Got an ack for fec packet: " << sequence_number; - unacked_fec_packets_.erase(it++); - } else { - // TODO(rch): treat these packets more consistently. They should - // be subject to NACK and RTO based loss. (Thought obviously, they - // should not be retransmitted.) - DVLOG(1) << ENDPOINT << "Still missing ack for fec packet: " - << sequence_number; - ++it; - } - } -} - -void QuicSentPacketManager::DiscardFecPacket( - QuicPacketSequenceNumber sequence_number) { - DCHECK(ContainsKey(unacked_fec_packets_, sequence_number)); - unacked_fec_packets_.erase(sequence_number); -} - bool QuicSentPacketManager::IsUnacked( QuicPacketSequenceNumber sequence_number) const { return ContainsKey(unacked_packets_, sequence_number); @@ -487,13 +451,6 @@ QuicSequenceNumberLength QuicSentPacketManager::GetSequenceNumberLength( return unacked_packets_.find(sequence_number)->second.sequence_number_length; } -QuicTime QuicSentPacketManager::GetFecSentTime( - QuicPacketSequenceNumber sequence_number) const { - DCHECK(ContainsKey(unacked_fec_packets_, sequence_number)); - - return unacked_fec_packets_.find(sequence_number)->second; -} - bool QuicSentPacketManager::HasUnackedPackets() const { return !unacked_packets_.empty(); } @@ -510,10 +467,6 @@ size_t QuicSentPacketManager::GetNumRetransmittablePackets() const { return num_unacked_packets; } -bool QuicSentPacketManager::HasUnackedFecPackets() const { - return !unacked_fec_packets_.empty(); -} - QuicPacketSequenceNumber QuicSentPacketManager::GetLeastUnackedSentPacket() const { if (unacked_packets_.empty()) { @@ -525,17 +478,6 @@ QuicSentPacketManager::GetLeastUnackedSentPacket() const { return unacked_packets_.begin()->first; } -QuicPacketSequenceNumber -QuicSentPacketManager::GetLeastUnackedFecPacket() const { - if (unacked_fec_packets_.empty()) { - // If there are no unacked packets, set the least unacked packet to - // the sequence number of the next packet sent. - return helper_->GetNextPacketSequenceNumber(); - } - - return unacked_fec_packets_.begin()->first; -} - SequenceNumberSet QuicSentPacketManager::GetUnackedPackets() const { SequenceNumberSet unacked_packets; for (UnackedPacketMap::const_iterator it = unacked_packets_.begin(); @@ -553,6 +495,9 @@ void QuicSentPacketManager::OnPacketSent( HasRetransmittableData has_retransmittable_data) { DCHECK_LT(0u, sequence_number); DCHECK(!ContainsKey(pending_packets_, sequence_number)); + if (ContainsKey(unacked_packets_, sequence_number)) { + unacked_packets_[sequence_number].sent_time = sent_time; + } // Only track packets the send algorithm wants us to track. if (!send_algorithm_->OnPacketSent(sent_time, sequence_number, bytes, @@ -567,18 +512,28 @@ void QuicSentPacketManager::OnPacketSent( } void QuicSentPacketManager::OnRetransmissionTimeout() { - ++consecutive_rto_count_; - send_algorithm_->OnRetransmissionTimeout(); // Abandon all pending packets to ensure the congestion window // opens up before we attempt to retransmit packets. - for (SequenceNumberSet::const_iterator it = pending_packets_.begin(); - it != pending_packets_.end(); ++it) { + QuicTime::Delta retransmission_delay = GetRetransmissionDelay(); + QuicTime max_send_time = + clock_->ApproximateNow().Subtract(retransmission_delay); + for (SequenceNumberSet::iterator it = pending_packets_.begin(); + it != pending_packets_.end();) { QuicPacketSequenceNumber sequence_number = *it; DCHECK(ContainsKey(packet_history_map_, sequence_number)); - send_algorithm_->OnPacketAbandoned( - sequence_number, packet_history_map_[sequence_number]->bytes_sent()); + DCHECK(ContainsKey(unacked_packets_, sequence_number)); + const TransmissionInfo& transmission_info = + unacked_packets_.find(sequence_number)->second; + // Abandon retransmittable packet and old non-retransmittable packets. + if (transmission_info.retransmittable_frames || + transmission_info.sent_time <= max_send_time) { + pending_packets_.erase(it++); + send_algorithm_->OnPacketAbandoned( + sequence_number, packet_history_map_[sequence_number]->bytes_sent()); + } else { + ++it; + } } - pending_packets_.clear(); // Attempt to send all the unacked packets when the RTO fires, let the // congestion manager decide how many to send immediately and the remaining @@ -587,31 +542,20 @@ void QuicSentPacketManager::OnRetransmissionTimeout() { << unacked_packets_.size() << " unacked packets."; // Retransmit any packet with retransmittable frames. + bool packets_retransmitted = false; for (UnackedPacketMap::const_iterator it = unacked_packets_.begin(); it != unacked_packets_.end(); ++it) { if (it->second.retransmittable_frames != NULL) { + packets_retransmitted = true; MarkForRetransmission(it->first, RTO_RETRANSMISSION); } } -} -QuicTime QuicSentPacketManager::OnAbandonFecTimeout() { - // Abandon all the FEC packets older than the current RTO, then reschedule - // the alarm if there are more pending fec packets. - QuicTime::Delta retransmission_delay = GetRetransmissionDelay(); - QuicTime max_send_time = - clock_->ApproximateNow().Subtract(retransmission_delay); - while (HasUnackedFecPackets()) { - QuicPacketSequenceNumber oldest_unacked_fec = GetLeastUnackedFecPacket(); - QuicTime fec_sent_time = GetFecSentTime(oldest_unacked_fec); - if (fec_sent_time > max_send_time) { - return fec_sent_time.Add(retransmission_delay); - } - DiscardFecPacket(oldest_unacked_fec); - OnPacketAbandoned(oldest_unacked_fec); + // Only inform the sent packet manager of an RTO if data was retransmitted. + if (packets_retransmitted) { + ++consecutive_rto_count_; + send_algorithm_->OnRetransmissionTimeout(); } - - return QuicTime::Zero(); } void QuicSentPacketManager::OnPacketAbandoned( @@ -751,10 +695,6 @@ QuicTime::Delta QuicSentPacketManager::TimeUntilSend( handshake); } -const QuicTime::Delta QuicSentPacketManager::DefaultRetransmissionTime() { - return QuicTime::Delta::FromMilliseconds(kDefaultRetransmissionTimeMs); -} - // Ensures that the Delayed Ack timer is always set to a value lesser // than the retransmission timer's minimum value (MinRTO). We want the // delayed ack to get back to the QUIC peer before the sender's diff --git a/net/quic/quic_sent_packet_manager.h b/net/quic/quic_sent_packet_manager.h index e4c16be..9e7bdb8 100644 --- a/net/quic/quic_sent_packet_manager.h +++ b/net/quic/quic_sent_packet_manager.h @@ -79,8 +79,7 @@ class NET_EXPORT_PRIVATE QuicSentPacketManager { // Called when a new packet is serialized. If the packet contains // retransmittable data, it will be added to the unacked packet map. - void OnSerializedPacket(const SerializedPacket& serialized_packet, - QuicTime serialized_time); + void OnSerializedPacket(const SerializedPacket& serialized_packet); // Called when a packet is retransmitted with a new sequence number. // Replaces the old entry in the unacked packet map with the new @@ -98,9 +97,6 @@ class NET_EXPORT_PRIVATE QuicSentPacketManager { // will be discarded as well. void DiscardUnackedPacket(QuicPacketSequenceNumber sequence_number); - // Discards all information about fec packet |sequence_number|. - void DiscardFecPacket(QuicPacketSequenceNumber sequence_number); - // Returns true if the non-FEC packet |sequence_number| is unacked. bool IsUnacked(QuicPacketSequenceNumber sequence_number) const; @@ -119,30 +115,19 @@ class NET_EXPORT_PRIVATE QuicSentPacketManager { // Retrieves the next pending retransmission. PendingRetransmission NextPendingRetransmission(); - // Returns the time the fec packet was sent. - QuicTime GetFecSentTime(QuicPacketSequenceNumber sequence_number) const; - - // Returns true if there are any unacked packets. bool HasUnackedPackets() const; // Returns the number of unacked packets which have retransmittable frames. size_t GetNumRetransmittablePackets() const; - // Returns true if there are any unacked FEC packets. - bool HasUnackedFecPackets() const; - // Returns the smallest sequence number of a sent packet which has not been // acked by the peer. Excludes any packets which have been retransmitted // with a new sequence number. If all packets have been acked, returns the // sequence number of the next packet that will be sent. QuicPacketSequenceNumber GetLeastUnackedSentPacket() const; - // Returns the smallest sequence number of a sent fec packet which has not - // been acked by the peer. If all packets have been acked, returns the - // sequence number of the next packet that will be sent. - QuicPacketSequenceNumber GetLeastUnackedFecPacket() const; - // Returns the set of sequence numbers of all unacked packets. + // Test only. SequenceNumberSet GetUnackedPackets() const; // Returns true if |sequence_number| is a previous transmission of packet. @@ -173,10 +158,6 @@ class NET_EXPORT_PRIVATE QuicSentPacketManager { // Called when the retransmission timer expires. virtual void OnRetransmissionTimeout(); - // Called when the fec timout timer expires. Returns the next timeout of the - // FEC timer if it should be reset, and QuicTime::Zero() otherwise. - virtual QuicTime OnAbandonFecTimeout(); - // Called when a packet is timed out, such as an RTO. Removes the bytes from // the congestion manager, but does not change the congestion window size. virtual void OnPacketAbandoned(QuicPacketSequenceNumber sequence_number); @@ -191,8 +172,6 @@ class NET_EXPORT_PRIVATE QuicSentPacketManager { HasRetransmittableData retransmittable, IsHandshake handshake); - const QuicTime::Delta DefaultRetransmissionTime(); - // Returns amount of time for delayed ack timer. const QuicTime::Delta DelayedAckTime(); @@ -222,22 +201,26 @@ class NET_EXPORT_PRIVATE QuicSentPacketManager { friend class test::QuicSentPacketManagerPeer; struct TransmissionInfo { - TransmissionInfo() {} + TransmissionInfo() + : retransmittable_frames(NULL), + sequence_number_length(PACKET_1BYTE_SEQUENCE_NUMBER), + sent_time(QuicTime::Zero()) { } TransmissionInfo(RetransmittableFrames* retransmittable_frames, QuicSequenceNumberLength sequence_number_length) : retransmittable_frames(retransmittable_frames), - sequence_number_length(sequence_number_length) { + sequence_number_length(sequence_number_length), + sent_time(QuicTime::Zero()) { } RetransmittableFrames* retransmittable_frames; QuicSequenceNumberLength sequence_number_length; + // Zero when the packet is serialized, non-zero once it's sent. + QuicTime sent_time; }; typedef linked_hash_map<QuicPacketSequenceNumber, TransmissionInfo> UnackedPacketMap; typedef linked_hash_map<QuicPacketSequenceNumber, - QuicTime> UnackedFecPacketMap; - typedef linked_hash_map<QuicPacketSequenceNumber, TransmissionType> PendingRetransmissionMap; typedef base::hash_map<QuicPacketSequenceNumber, SequenceNumberSet*> PreviousTransmissionMap; @@ -245,9 +228,6 @@ class NET_EXPORT_PRIVATE QuicSentPacketManager { // Process the incoming ack looking for newly ack'd data packets. void HandleAckForSentPackets(const ReceivedPacketInfo& received_info); - // Process the incoming ack looking for newly ack'd FEC packets. - void HandleAckForSentFecPackets(const ReceivedPacketInfo& received_info); - // Update the RTT if the ack is for the largest acked sequence number. void MaybeUpdateRTT(const ReceivedPacketInfo& received_info, const QuicTime& ack_receive_time); @@ -283,21 +263,16 @@ class NET_EXPORT_PRIVATE QuicSentPacketManager { void CleanupPacketHistory(); - // When new packets are created which may be retransmitted, they are added - // to this map, which contains owning pointers to the contained frames. If - // a packet is retransmitted, this map will contain entries for both the old - // and the new packet. The old packet's retransmittable frames entry will be - // NULL, while the new packet's entry will contain the frames to retransmit. + // Newly serialized retransmittable and fec packets are added to this map, + // which contains owning pointers to any contained frames. If a packet is + // retransmitted, this map will contain entries for both the old and the new + // packet. The old packet's retransmittable frames entry will be NULL, while + // the new packet's entry will contain the frames to retransmit. // If the old packet is acked before the new packet, then the old entry will // be removed from the map and the new entry's retransmittable frames will be // set to NULL. UnackedPacketMap unacked_packets_; - // Pending fec packets that have not been acked yet. These packets need to be - // cleared out of the cgst_window after a timeout since FEC packets are never - // retransmitted. - UnackedFecPacketMap unacked_fec_packets_; - // Pending retransmissions which have not been packetized and sent yet. PendingRetransmissionMap pending_retransmissions_; diff --git a/net/quic/quic_sent_packet_manager_test.cc b/net/quic/quic_sent_packet_manager_test.cc index 1a0885a..0429a26 100644 --- a/net/quic/quic_sent_packet_manager_test.cc +++ b/net/quic/quic_sent_packet_manager_test.cc @@ -41,7 +41,6 @@ class QuicSentPacketManagerTest : public ::testing::TestWithParam<bool> { if (num_packets == 0) { EXPECT_FALSE(manager_.HasUnackedPackets()); EXPECT_EQ(0u, manager_.GetNumRetransmittablePackets()); - EXPECT_FALSE(manager_.HasUnackedPackets()); return; } @@ -113,6 +112,32 @@ class QuicSentPacketManagerTest : public ::testing::TestWithParam<bool> { packets_.back(), 0u, NULL); } + void SendDataPacket(QuicPacketSequenceNumber sequence_number) { + EXPECT_CALL(*send_algorithm_, OnPacketSent(_, sequence_number, _, _, _)) + .Times(1).WillOnce(Return(true)); + SerializedPacket packet(CreatePacket(sequence_number)); + manager_.OnSerializedPacket(packet); + manager_.OnPacketSent(sequence_number, clock_.ApproximateNow(), + packet.packet->length(), NOT_RETRANSMISSION, + HAS_RETRANSMITTABLE_DATA); + } + + // Based on QuicConnection's WritePendingRetransmissions. + void RetransmitNextPacket( + QuicPacketSequenceNumber retransmission_sequence_number) { + EXPECT_TRUE(manager_.HasPendingRetransmissions()); + EXPECT_CALL(*send_algorithm_, + OnPacketSent(_, retransmission_sequence_number, _, _, _)) + .Times(1).WillOnce(Return(true)); + const QuicSentPacketManager::PendingRetransmission pending = + manager_.NextPendingRetransmission(); + manager_.OnRetransmittedPacket( + pending.sequence_number, retransmission_sequence_number); + manager_.OnPacketSent(retransmission_sequence_number, + clock_.ApproximateNow(), 1000, + pending.transmission_type, HAS_RETRANSMITTABLE_DATA); + } + testing::StrictMock<MockHelper> helper_; QuicSentPacketManager manager_; vector<QuicPacket*> packets_; @@ -125,7 +150,7 @@ TEST_F(QuicSentPacketManagerTest, IsUnacked) { SerializedPacket serialized_packet(CreatePacket(1)); - manager_.OnSerializedPacket(serialized_packet, QuicTime::Zero()); + manager_.OnSerializedPacket(serialized_packet); QuicPacketSequenceNumber unacked[] = { 1 }; VerifyUnackedPackets(unacked, arraysize(unacked)); @@ -136,7 +161,7 @@ TEST_F(QuicSentPacketManagerTest, IsUnacked) { TEST_F(QuicSentPacketManagerTest, IsUnAckedRetransmit) { SerializedPacket serialized_packet(CreatePacket(1)); - manager_.OnSerializedPacket(serialized_packet, QuicTime::Zero()); + manager_.OnSerializedPacket(serialized_packet); RetransmitPacket(1, 2); EXPECT_TRUE(QuicSentPacketManagerPeer::IsRetransmission(&manager_, 2)); @@ -149,14 +174,13 @@ TEST_F(QuicSentPacketManagerTest, IsUnAckedRetransmit) { TEST_F(QuicSentPacketManagerTest, RetransmitThenAck) { SerializedPacket serialized_packet(CreatePacket(1)); - manager_.OnSerializedPacket(serialized_packet, QuicTime::Zero()); + manager_.OnSerializedPacket(serialized_packet); RetransmitPacket(1, 2); // Ack 2 but not 1. ReceivedPacketInfo received_info; received_info.largest_observed = 2; received_info.missing_packets.insert(1); - EXPECT_CALL(helper_, GetNextPacketSequenceNumber()).WillOnce(Return(3u)); manager_.OnIncomingAck(received_info, QuicTime::Zero()); // No unacked packets remain. @@ -167,7 +191,7 @@ TEST_F(QuicSentPacketManagerTest, RetransmitThenAck) { TEST_F(QuicSentPacketManagerTest, RetransmitThenAckBeforeSend) { SerializedPacket serialized_packet(CreatePacket(1)); - manager_.OnSerializedPacket(serialized_packet, QuicTime::Zero()); + manager_.OnSerializedPacket(serialized_packet); QuicSentPacketManagerPeer::MarkForRetransmission( &manager_, 1, NACK_RETRANSMISSION); EXPECT_TRUE(manager_.HasPendingRetransmissions()); @@ -175,7 +199,6 @@ TEST_F(QuicSentPacketManagerTest, RetransmitThenAckBeforeSend) { // Ack 1. ReceivedPacketInfo received_info; received_info.largest_observed = 1; - EXPECT_CALL(helper_, GetNextPacketSequenceNumber()).WillOnce(Return(2u)); manager_.OnIncomingAck(received_info, QuicTime::Zero()); // There should no longer be a pending retransmission. @@ -189,44 +212,56 @@ TEST_F(QuicSentPacketManagerTest, RetransmitThenAckBeforeSend) { TEST_F(QuicSentPacketManagerTest, RetransmitThenAckPrevious) { SerializedPacket serialized_packet(CreatePacket(1)); - manager_.OnSerializedPacket(serialized_packet, QuicTime::Zero()); + manager_.OnSerializedPacket(serialized_packet); RetransmitPacket(1, 2); // Ack 1 but not 2. ReceivedPacketInfo received_info; received_info.largest_observed = 1; - EXPECT_CALL(helper_, GetNextPacketSequenceNumber()).WillOnce(Return(3u)); manager_.OnIncomingAck(received_info, QuicTime::Zero()); // 2 remains unacked, but no packets have retransmittable data. QuicPacketSequenceNumber unacked[] = { 2 }; VerifyUnackedPackets(unacked, arraysize(unacked)); VerifyRetransmittablePackets(NULL, 0); + + // Verify that if the retransmission alarm does fire to abandon packet 2, + // the sent packet manager is not notified, since there is no retransmittable + // data outstanding. + EXPECT_CALL(*send_algorithm_, RetransmissionDelay()) + .WillOnce(Return(QuicTime::Delta::FromMilliseconds(1))); + manager_.OnRetransmissionTimeout(); } TEST_F(QuicSentPacketManagerTest, RetransmitTwiceThenAckFirst) { SerializedPacket serialized_packet(CreatePacket(1)); - manager_.OnSerializedPacket(serialized_packet, QuicTime::Zero()); + manager_.OnSerializedPacket(serialized_packet); RetransmitPacket(1, 2); RetransmitPacket(2, 3); // Ack 1 but not 2 or 3. ReceivedPacketInfo received_info; received_info.largest_observed = 1; - EXPECT_CALL(helper_, GetNextPacketSequenceNumber()).WillOnce(Return(4u)); manager_.OnIncomingAck(received_info, QuicTime::Zero()); // 3 remains unacked, but no packets have retransmittable data. QuicPacketSequenceNumber unacked[] = { 3 }; VerifyUnackedPackets(unacked, arraysize(unacked)); VerifyRetransmittablePackets(NULL, 0); + + // Verify that if the retransmission alarm does fire to abandon packet 3, + // the sent packet manager is not notified, since there is no retransmittable + // data outstanding. + EXPECT_CALL(*send_algorithm_, RetransmissionDelay()) + .WillOnce(Return(QuicTime::Delta::FromMilliseconds(1))); + manager_.OnRetransmissionTimeout(); } TEST_F(QuicSentPacketManagerTest, TruncatedAck) { SerializedPacket serialized_packet(CreatePacket(1)); - manager_.OnSerializedPacket(serialized_packet, QuicTime::Zero()); + manager_.OnSerializedPacket(serialized_packet); RetransmitPacket(1, 2); RetransmitPacket(2, 3); RetransmitPacket(3, 4); @@ -237,7 +272,6 @@ TEST_F(QuicSentPacketManagerTest, TruncatedAck) { received_info.missing_packets.insert(1); received_info.missing_packets.insert(2); received_info.is_truncated = true; - EXPECT_CALL(helper_, GetNextPacketSequenceNumber()).WillOnce(Return(5u)); manager_.OnIncomingAck(received_info, QuicTime::Zero()); // High water mark will be raised. @@ -248,16 +282,15 @@ TEST_F(QuicSentPacketManagerTest, TruncatedAck) { } TEST_F(QuicSentPacketManagerTest, SendDropAckRetransmitManyPackets) { - manager_.OnSerializedPacket(CreatePacket(1), QuicTime::Zero()); - manager_.OnSerializedPacket(CreatePacket(2), QuicTime::Zero()); - manager_.OnSerializedPacket(CreatePacket(3), QuicTime::Zero()); + manager_.OnSerializedPacket(CreatePacket(1)); + manager_.OnSerializedPacket(CreatePacket(2)); + manager_.OnSerializedPacket(CreatePacket(3)); { // Ack packets 1 and 3. ReceivedPacketInfo received_info; received_info.largest_observed = 3; received_info.missing_packets.insert(2); - EXPECT_CALL(helper_, GetNextPacketSequenceNumber()).WillOnce(Return(4u)); manager_.OnIncomingAck(received_info, QuicTime::Zero()); QuicPacketSequenceNumber unacked[] = { 2 }; @@ -266,8 +299,8 @@ TEST_F(QuicSentPacketManagerTest, SendDropAckRetransmitManyPackets) { VerifyRetransmittablePackets(retransmittable, arraysize(retransmittable)); } - manager_.OnSerializedPacket(CreatePacket(4), QuicTime::Zero()); - manager_.OnSerializedPacket(CreatePacket(5), QuicTime::Zero()); + manager_.OnSerializedPacket(CreatePacket(4)); + manager_.OnSerializedPacket(CreatePacket(5)); { // Ack packets 5. @@ -275,7 +308,6 @@ TEST_F(QuicSentPacketManagerTest, SendDropAckRetransmitManyPackets) { received_info.largest_observed = 5; received_info.missing_packets.insert(2); received_info.missing_packets.insert(4); - EXPECT_CALL(helper_, GetNextPacketSequenceNumber()).WillOnce(Return(6u)); manager_.OnIncomingAck(received_info, QuicTime::Zero()); QuicPacketSequenceNumber unacked[] = { 2, 4 }; @@ -284,8 +316,8 @@ TEST_F(QuicSentPacketManagerTest, SendDropAckRetransmitManyPackets) { VerifyRetransmittablePackets(retransmittable, arraysize(retransmittable)); } - manager_.OnSerializedPacket(CreatePacket(6), QuicTime::Zero()); - manager_.OnSerializedPacket(CreatePacket(7), QuicTime::Zero()); + manager_.OnSerializedPacket(CreatePacket(6)); + manager_.OnSerializedPacket(CreatePacket(7)); { // Ack packets 7. @@ -294,7 +326,6 @@ TEST_F(QuicSentPacketManagerTest, SendDropAckRetransmitManyPackets) { received_info.missing_packets.insert(2); received_info.missing_packets.insert(4); received_info.missing_packets.insert(6); - EXPECT_CALL(helper_, GetNextPacketSequenceNumber()).WillOnce(Return(8u)); manager_.OnIncomingAck(received_info, QuicTime::Zero()); QuicPacketSequenceNumber unacked[] = { 2, 4, 6 }; @@ -304,8 +335,8 @@ TEST_F(QuicSentPacketManagerTest, SendDropAckRetransmitManyPackets) { } RetransmitPacket(2, 8); - manager_.OnSerializedPacket(CreatePacket(9), QuicTime::Zero()); - manager_.OnSerializedPacket(CreatePacket(10), QuicTime::Zero()); + manager_.OnSerializedPacket(CreatePacket(9)); + manager_.OnSerializedPacket(CreatePacket(10)); { // Ack packet 10. @@ -316,7 +347,6 @@ TEST_F(QuicSentPacketManagerTest, SendDropAckRetransmitManyPackets) { received_info.missing_packets.insert(6); received_info.missing_packets.insert(8); received_info.missing_packets.insert(9); - EXPECT_CALL(helper_, GetNextPacketSequenceNumber()).WillOnce(Return(11u)); manager_.OnIncomingAck(received_info, QuicTime::Zero()); QuicPacketSequenceNumber unacked[] = { 2, 4, 6, 8, 9 }; @@ -327,8 +357,8 @@ TEST_F(QuicSentPacketManagerTest, SendDropAckRetransmitManyPackets) { RetransmitPacket(4, 11); - manager_.OnSerializedPacket(CreatePacket(12), QuicTime::Zero()); - manager_.OnSerializedPacket(CreatePacket(13), QuicTime::Zero()); + manager_.OnSerializedPacket(CreatePacket(12)); + manager_.OnSerializedPacket(CreatePacket(13)); { // Ack packet 13. @@ -341,7 +371,6 @@ TEST_F(QuicSentPacketManagerTest, SendDropAckRetransmitManyPackets) { received_info.missing_packets.insert(9); received_info.missing_packets.insert(11); received_info.missing_packets.insert(12); - EXPECT_CALL(helper_, GetNextPacketSequenceNumber()).WillOnce(Return(14u)); manager_.OnIncomingAck(received_info, QuicTime::Zero()); QuicPacketSequenceNumber unacked[] = { 2, 4, 6, 8, 9, 11, 12 }; @@ -351,8 +380,8 @@ TEST_F(QuicSentPacketManagerTest, SendDropAckRetransmitManyPackets) { } RetransmitPacket(6, 14); - manager_.OnSerializedPacket(CreatePacket(15), QuicTime::Zero()); - manager_.OnSerializedPacket(CreatePacket(16), QuicTime::Zero()); + manager_.OnSerializedPacket(CreatePacket(15)); + manager_.OnSerializedPacket(CreatePacket(16)); { // Ack packet 16. @@ -366,7 +395,6 @@ TEST_F(QuicSentPacketManagerTest, SendDropAckRetransmitManyPackets) { received_info.missing_packets.insert(11); received_info.missing_packets.insert(12); received_info.is_truncated = true; - EXPECT_CALL(helper_, GetNextPacketSequenceNumber()).WillOnce(Return(17u)); manager_.OnIncomingAck(received_info, QuicTime::Zero()); // Truncated ack raises the high water mark by clearing out 2, 4, and 6. @@ -385,77 +413,85 @@ TEST_F(QuicSentPacketManagerTest, GetLeastUnackedSentPacket) { TEST_F(QuicSentPacketManagerTest, GetLeastUnackedSentPacketUnacked) { SerializedPacket serialized_packet(CreatePacket(1)); - manager_.OnSerializedPacket(serialized_packet, QuicTime::Zero()); + manager_.OnSerializedPacket(serialized_packet); EXPECT_EQ(1u, manager_.GetLeastUnackedSentPacket()); } TEST_F(QuicSentPacketManagerTest, GetLeastUnackedSentPacketUnackedFec) { SerializedPacket serialized_packet(CreateFecPacket(1)); - manager_.OnSerializedPacket(serialized_packet, QuicTime::Zero()); - // FEC packets do not count as "unacked". - EXPECT_CALL(helper_, GetNextPacketSequenceNumber()).WillOnce(Return(2u)); - EXPECT_EQ(2u, manager_.GetLeastUnackedSentPacket()); + manager_.OnSerializedPacket(serialized_packet); + EXPECT_EQ(1u, manager_.GetLeastUnackedSentPacket()); } TEST_F(QuicSentPacketManagerTest, GetLeastUnackedSentPacketDiscardUnacked) { SerializedPacket serialized_packet(CreatePacket(1)); - manager_.OnSerializedPacket(serialized_packet, QuicTime::Zero()); + manager_.OnSerializedPacket(serialized_packet); manager_.DiscardUnackedPacket(1u); EXPECT_CALL(helper_, GetNextPacketSequenceNumber()).WillOnce(Return(2u)); EXPECT_EQ(2u, manager_.GetLeastUnackedSentPacket()); } -TEST_F(QuicSentPacketManagerTest, GetLeastUnackedFecPacketAndDiscard) { +TEST_F(QuicSentPacketManagerTest, GetLeastUnackedPacketAndDiscard) { VerifyUnackedPackets(NULL, 0); SerializedPacket serialized_packet(CreateFecPacket(1)); - manager_.OnSerializedPacket(serialized_packet, QuicTime::Zero()); - EXPECT_EQ(1u, manager_.GetLeastUnackedFecPacket()); + manager_.OnSerializedPacket(serialized_packet); + EXPECT_EQ(1u, manager_.GetLeastUnackedSentPacket()); SerializedPacket serialized_packet2(CreateFecPacket(2)); - manager_.OnSerializedPacket(serialized_packet2, QuicTime::Zero()); - EXPECT_EQ(1u, manager_.GetLeastUnackedFecPacket()); + manager_.OnSerializedPacket(serialized_packet2); + EXPECT_EQ(1u, manager_.GetLeastUnackedSentPacket()); SerializedPacket serialized_packet3(CreateFecPacket(3)); - manager_.OnSerializedPacket(serialized_packet3, QuicTime::Zero()); - EXPECT_EQ(1u, manager_.GetLeastUnackedFecPacket()); + manager_.OnSerializedPacket(serialized_packet3); + EXPECT_EQ(1u, manager_.GetLeastUnackedSentPacket()); - VerifyUnackedPackets(NULL, 0); + QuicPacketSequenceNumber unacked[] = { 1, 2, 3 }; + VerifyUnackedPackets(unacked, arraysize(unacked)); VerifyRetransmittablePackets(NULL, 0); - manager_.DiscardFecPacket(1); - EXPECT_EQ(2u, manager_.GetLeastUnackedFecPacket()); + manager_.DiscardUnackedPacket(1); + EXPECT_EQ(2u, manager_.GetLeastUnackedSentPacket()); // Ack 2. ReceivedPacketInfo received_info; received_info.largest_observed = 2; - EXPECT_CALL(helper_, GetNextPacketSequenceNumber()).WillOnce(Return(4u)); manager_.OnIncomingAck(received_info, QuicTime::Zero()); - EXPECT_EQ(3u, manager_.GetLeastUnackedFecPacket()); + EXPECT_EQ(3u, manager_.GetLeastUnackedSentPacket()); // Discard the 3rd packet and ensure there are no FEC packets. - manager_.DiscardFecPacket(3); - EXPECT_FALSE(manager_.HasUnackedFecPackets()); + manager_.DiscardUnackedPacket(3); + EXPECT_FALSE(manager_.HasUnackedPackets()); } -TEST_F(QuicSentPacketManagerTest, GetFecSentTime) { +TEST_F(QuicSentPacketManagerTest, GetSentTime) { VerifyUnackedPackets(NULL, 0); SerializedPacket serialized_packet(CreateFecPacket(1)); - manager_.OnSerializedPacket(serialized_packet, QuicTime::Zero()); + manager_.OnSerializedPacket(serialized_packet); + EXPECT_CALL(*send_algorithm_, OnPacketSent(_, 1, _, _, _)) + .Times(1).WillOnce(Return(true)); + manager_.OnPacketSent( + 1, QuicTime::Zero(), 0, NOT_RETRANSMISSION, HAS_RETRANSMITTABLE_DATA); SerializedPacket serialized_packet2(CreateFecPacket(2)); QuicTime sent_time = QuicTime::Zero().Add(QuicTime::Delta::FromSeconds(1)); - manager_.OnSerializedPacket(serialized_packet2, sent_time); + manager_.OnSerializedPacket(serialized_packet2); + EXPECT_CALL(*send_algorithm_, OnPacketSent(_, 2, _, _, _)) + .Times(1).WillOnce(Return(true)); + manager_.OnPacketSent( + 2, sent_time, 0, NOT_RETRANSMISSION, HAS_RETRANSMITTABLE_DATA); - VerifyUnackedPackets(NULL, 0); + QuicPacketSequenceNumber unacked[] = { 1, 2 }; + VerifyUnackedPackets(unacked, arraysize(unacked)); VerifyRetransmittablePackets(NULL, 0); - EXPECT_TRUE(manager_.HasUnackedFecPackets()); - EXPECT_EQ(QuicTime::Zero(), manager_.GetFecSentTime(1)); - EXPECT_EQ(sent_time, manager_.GetFecSentTime(2)); + EXPECT_TRUE(manager_.HasUnackedPackets()); + EXPECT_EQ(QuicTime::Zero(), + QuicSentPacketManagerPeer::GetSentTime(&manager_, 1)); + EXPECT_EQ(sent_time, QuicSentPacketManagerPeer::GetSentTime(&manager_, 2)); } TEST_F(QuicSentPacketManagerTest, NackRetransmit1Packet) { @@ -790,41 +826,35 @@ TEST_F(QuicSentPacketManagerTest, RetransmissionTimeout) { // Send 100 packets and then ensure all are abandoned when the RTO fires. const size_t kNumSentPackets = 100; for (size_t i = 1; i <= kNumSentPackets; ++i) { - EXPECT_CALL(*send_algorithm_, OnPacketSent(_, _, _, _, _)) - .Times(1).WillOnce(Return(true)); - manager_.OnPacketSent(i, clock_.Now(), 1000, - NOT_RETRANSMISSION, HAS_RETRANSMITTABLE_DATA); + SendDataPacket(i); } - EXPECT_CALL(*send_algorithm_, OnRetransmissionTimeout()); EXPECT_CALL(*send_algorithm_, OnPacketAbandoned(_, _)).Times(kNumSentPackets); + EXPECT_CALL(*send_algorithm_, RetransmissionDelay()) + .WillOnce(Return(QuicTime::Delta::FromMilliseconds(1))); + EXPECT_CALL(*send_algorithm_, OnRetransmissionTimeout()); manager_.OnRetransmissionTimeout(); } TEST_F(QuicSentPacketManagerTest, GetTransmissionDelayMin) { - EXPECT_CALL(*send_algorithm_, OnRetransmissionTimeout()); - QuicTime::Delta delay = QuicTime::Delta::FromMilliseconds(1); EXPECT_CALL(*send_algorithm_, RetransmissionDelay()) - .WillOnce(Return(delay)); + .WillOnce(Return(QuicTime::Delta::FromMilliseconds(1))); - manager_.OnRetransmissionTimeout(); EXPECT_EQ(QuicTime::Delta::FromMilliseconds(200), manager_.GetRetransmissionDelay()); } TEST_F(QuicSentPacketManagerTest, GetTransmissionDelayMax) { - EXPECT_CALL(*send_algorithm_, OnRetransmissionTimeout()); - QuicTime::Delta delay = QuicTime::Delta::FromSeconds(500); EXPECT_CALL(*send_algorithm_, RetransmissionDelay()) - .WillOnce(Return(delay)); + .WillOnce(Return(QuicTime::Delta::FromSeconds(500))); - manager_.OnRetransmissionTimeout(); EXPECT_EQ(QuicTime::Delta::FromSeconds(60), manager_.GetRetransmissionDelay()); } TEST_F(QuicSentPacketManagerTest, GetTransmissionDelay) { + SendDataPacket(1); QuicTime::Delta delay = QuicTime::Delta::FromMilliseconds(500); EXPECT_CALL(*send_algorithm_, RetransmissionDelay()) .WillRepeatedly(Return(delay)); @@ -833,14 +863,17 @@ TEST_F(QuicSentPacketManagerTest, GetTransmissionDelay) { for (int i = 0; i < 5; ++i) { EXPECT_EQ(delay, manager_.GetRetransmissionDelay()); delay = delay.Add(delay); + EXPECT_CALL(*send_algorithm_, OnPacketAbandoned(i + 1, _)); EXPECT_CALL(*send_algorithm_, OnRetransmissionTimeout()); manager_.OnRetransmissionTimeout(); + RetransmitNextPacket(i + 2); } } TEST_F(QuicSentPacketManagerTest, GetTestTransmissionDelayTailDrop) { FLAGS_limit_rto_increase_for_tests = true; + SendDataPacket(1); QuicTime::Delta delay = QuicTime::Delta::FromMilliseconds(500); EXPECT_CALL(*send_algorithm_, RetransmissionDelay()) .WillRepeatedly(Return(delay)); @@ -848,8 +881,10 @@ TEST_F(QuicSentPacketManagerTest, GetTestTransmissionDelayTailDrop) { // No backoff for the first 5 retransmissions. for (int i = 0; i < 5; ++i) { EXPECT_EQ(delay, manager_.GetRetransmissionDelay()); + EXPECT_CALL(*send_algorithm_, OnPacketAbandoned(i + 1, _)); EXPECT_CALL(*send_algorithm_, OnRetransmissionTimeout()); manager_.OnRetransmissionTimeout(); + RetransmitNextPacket(i + 2); } // Then backoff starts diff --git a/net/quic/quic_session.cc b/net/quic/quic_session.cc index c035c17..360c79e 100644 --- a/net/quic/quic_session.cc +++ b/net/quic/quic_session.cc @@ -137,11 +137,11 @@ bool QuicSession::OnStreamFrames(const vector<QuicStreamFrame>& frames) { } stream->OnStreamFrame(frames[i]); - // If the stream had been prematurely closed, and the + // If the stream is a data stream had been prematurely closed, and the // headers are now decompressed, then we are finally finished // with this stream. if (ContainsKey(zombie_streams_, stream_id) && - stream->headers_decompressed()) { + static_cast<QuicDataStream*>(stream)->headers_decompressed()) { CloseZombieStream(stream_id); } } @@ -153,7 +153,7 @@ bool QuicSession::OnStreamFrames(const vector<QuicStreamFrame>& frames) { } QuicStreamId stream_id = decompression_blocked_streams_.begin()->second; decompression_blocked_streams_.erase(header_id); - ReliableQuicStream* stream = GetStream(stream_id); + QuicDataStream* stream = GetDataStream(stream_id); if (!stream) { connection()->SendConnectionClose( QUIC_STREAM_RST_BEFORE_HEADERS_DECOMPRESSED); @@ -165,7 +165,13 @@ bool QuicSession::OnStreamFrames(const vector<QuicStreamFrame>& frames) { } void QuicSession::OnRstStream(const QuicRstStreamFrame& frame) { - ReliableQuicStream* stream = GetStream(frame.stream_id); + if (frame.stream_id == kCryptoStreamId) { + connection()->SendConnectionCloseWithDetails( + QUIC_INVALID_STREAM_ID, + "Attempt to reset the crypto stream"); + return; + } + QuicDataStream* stream = GetDataStream(frame.stream_id); if (!stream) { return; // Errors are handled by GetStream. } @@ -198,7 +204,7 @@ void QuicSession::OnConnectionClosed(QuicErrorCode error, bool from_peer) { } while (!stream_map_.empty()) { - ReliableStreamMap::iterator it = stream_map_.begin(); + DataStreamMap::iterator it = stream_map_.begin(); QuicStreamId id = it->first; it->second->OnConnectionClosed(error, from_peer); // The stream should call CloseStream as part of OnConnectionClosed. @@ -276,12 +282,12 @@ void QuicSession::CloseStreamInner(QuicStreamId stream_id, bool locally_reset) { DVLOG(1) << ENDPOINT << "Closing stream " << stream_id; - ReliableStreamMap::iterator it = stream_map_.find(stream_id); + DataStreamMap::iterator it = stream_map_.find(stream_id); if (it == stream_map_.end()) { DVLOG(1) << ENDPOINT << "Stream is already closed: " << stream_id; return; } - ReliableQuicStream* stream = it->second; + QuicDataStream* stream = it->second; if (connection_->connected() && !stream->headers_decompressed()) { // If the stream is being closed locally (for example a client cancelling // a request before receiving the response) then we need to make sure that @@ -324,7 +330,7 @@ void QuicSession::AddZombieStream(QuicStreamId stream_id) { void QuicSession::CloseZombieStream(QuicStreamId stream_id) { DCHECK(ContainsKey(zombie_streams_, stream_id)); zombie_streams_.erase(stream_id); - ReliableQuicStream* stream = GetStream(stream_id); + QuicDataStream* stream = GetDataStream(stream_id); if (!stream) { return; } @@ -390,7 +396,7 @@ QuicConfig* QuicSession::config() { return &config_; } -void QuicSession::ActivateStream(ReliableQuicStream* stream) { +void QuicSession::ActivateStream(QuicDataStream* stream) { DVLOG(1) << ENDPOINT << "num_streams: " << stream_map_.size() << ". activating " << stream->id(); DCHECK_EQ(stream_map_.count(stream->id()), 0u); @@ -407,8 +413,16 @@ ReliableQuicStream* QuicSession::GetStream(const QuicStreamId stream_id) { if (stream_id == kCryptoStreamId) { return GetCryptoStream(); } + return GetDataStream(stream_id); +} + +QuicDataStream* QuicSession::GetDataStream(const QuicStreamId stream_id) { + if (stream_id == kCryptoStreamId) { + DLOG(FATAL) << "Attempt to call GetDataStream with the crypto stream id"; + return NULL; + } - ReliableStreamMap::iterator it = stream_map_.find(stream_id); + DataStreamMap::iterator it = stream_map_.find(stream_id); if (it != stream_map_.end()) { return it->second; } @@ -427,7 +441,7 @@ ReliableQuicStream* QuicSession::GetStream(const QuicStreamId stream_id) { return GetIncomingReliableStream(stream_id); } -ReliableQuicStream* QuicSession::GetIncomingReliableStream( +QuicDataStream* QuicSession::GetIncomingReliableStream( QuicStreamId stream_id) { if (IsClosedStream(stream_id)) { return NULL; @@ -456,7 +470,7 @@ ReliableQuicStream* QuicSession::GetIncomingReliableStream( } largest_peer_created_stream_id_ = stream_id; } - ReliableQuicStream* stream = CreateIncomingReliableStream(stream_id); + QuicDataStream* stream = CreateIncomingDataStream(stream_id); if (stream == NULL) { return NULL; } diff --git a/net/quic/quic_session.h b/net/quic/quic_session.h index 07a41c3..515f2b9 100644 --- a/net/quic/quic_session.h +++ b/net/quic/quic_session.h @@ -15,6 +15,7 @@ #include "net/base/linked_hash_map.h" #include "net/quic/quic_connection.h" #include "net/quic/quic_crypto_stream.h" +#include "net/quic/quic_data_stream.h" #include "net/quic/quic_packet_creator.h" #include "net/quic/quic_protocol.h" #include "net/quic/quic_spdy_compressor.h" @@ -65,7 +66,7 @@ class NET_EXPORT_PRIVATE QuicSession : public QuicConnectionVisitorInterface { virtual void OnGoAway(const QuicGoAwayFrame& frame) OVERRIDE; virtual void OnConnectionClosed(QuicErrorCode error, bool from_peer) OVERRIDE; virtual void OnSuccessfulVersionNegotiation( - const QuicVersion& version) OVERRIDE{} + const QuicVersion& version) OVERRIDE {} virtual void OnConfigNegotiated() OVERRIDE; // Not needed for HTTP. virtual bool OnCanWrite() OVERRIDE; @@ -175,25 +176,29 @@ class NET_EXPORT_PRIVATE QuicSession : public QuicConnectionVisitorInterface { bool is_server() const { return is_server_; } protected: + typedef base::hash_map<QuicStreamId, QuicDataStream*> DataStreamMap; + // Creates a new stream, owned by the caller, to handle a peer-initiated // stream. Returns NULL and does error handling if the stream can not be // created. - virtual ReliableQuicStream* CreateIncomingReliableStream(QuicStreamId id) = 0; + virtual QuicDataStream* CreateIncomingDataStream(QuicStreamId id) = 0; // Create a new stream, owned by the caller, to handle a locally-initiated // stream. Returns NULL if max streams have already been opened. - virtual ReliableQuicStream* CreateOutgoingReliableStream() = 0; + virtual QuicDataStream* CreateOutgoingDataStream() = 0; // Return the reserved crypto stream. virtual QuicCryptoStream* GetCryptoStream() = 0; // Adds 'stream' to the active stream map. - virtual void ActivateStream(ReliableQuicStream* stream); + virtual void ActivateStream(QuicDataStream* stream); // Returns the stream id for a new stream. QuicStreamId GetNextStreamId(); - ReliableQuicStream* GetIncomingReliableStream(QuicStreamId stream_id); + QuicDataStream* GetIncomingReliableStream(QuicStreamId stream_id); + + QuicDataStream* GetDataStream(const QuicStreamId stream_id); ReliableQuicStream* GetStream(const QuicStreamId stream_id); @@ -203,17 +208,15 @@ class NET_EXPORT_PRIVATE QuicSession : public QuicConnectionVisitorInterface { // operations are being done on the streams at this time) virtual void PostProcessAfterData(); - base::hash_map<QuicStreamId, ReliableQuicStream*>* streams() { + base::hash_map<QuicStreamId, QuicDataStream*>* streams() { return &stream_map_; } - const base::hash_map<QuicStreamId, ReliableQuicStream*>* streams() const { + const base::hash_map<QuicStreamId, QuicDataStream*>* streams() const { return &stream_map_; } - std::vector<ReliableQuicStream*>* closed_streams() { - return &closed_streams_; - } + std::vector<QuicDataStream*>* closed_streams() { return &closed_streams_; } size_t get_max_open_streams() const { return max_open_streams_; @@ -223,8 +226,6 @@ class NET_EXPORT_PRIVATE QuicSession : public QuicConnectionVisitorInterface { friend class test::QuicSessionPeer; friend class VisitorShim; - typedef base::hash_map<QuicStreamId, ReliableQuicStream*> ReliableStreamMap; - // Performs the work required to close |stream_id|. If |locally_reset| // then the stream has been reset by this endpoint, not by the peer. This // means the stream may become a zombie stream which needs to stay @@ -260,7 +261,7 @@ class NET_EXPORT_PRIVATE QuicSession : public QuicConnectionVisitorInterface { // deletions. scoped_ptr<VisitorShim> visitor_shim_; - std::vector<ReliableQuicStream*> closed_streams_; + std::vector<QuicDataStream*> closed_streams_; QuicSpdyDecompressor decompressor_; QuicSpdyCompressor compressor_; @@ -271,7 +272,7 @@ class NET_EXPORT_PRIVATE QuicSession : public QuicConnectionVisitorInterface { size_t max_open_streams_; // Map from StreamId to pointers to streams that are owned by the caller. - ReliableStreamMap stream_map_; + DataStreamMap stream_map_; QuicStreamId next_stream_id_; bool is_server_; diff --git a/net/quic/quic_session_test.cc b/net/quic/quic_session_test.cc index 7c3520c..f0d4e5c 100644 --- a/net/quic/quic_session_test.cc +++ b/net/quic/quic_session_test.cc @@ -12,6 +12,7 @@ #include "net/quic/quic_connection.h" #include "net/quic/quic_protocol.h" #include "net/quic/test_tools/quic_connection_peer.h" +#include "net/quic/test_tools/quic_data_stream_peer.h" #include "net/quic/test_tools/quic_test_utils.h" #include "net/quic/test_tools/reliable_quic_stream_peer.h" #include "net/spdy/spdy_framer.h" @@ -55,10 +56,10 @@ class TestCryptoStream : public QuicCryptoStream { MOCK_METHOD0(OnCanWrite, void()); }; -class TestStream : public ReliableQuicStream { +class TestStream : public QuicDataStream { public: TestStream(QuicStreamId id, QuicSession* session) - : ReliableQuicStream(id, session) { + : QuicDataStream(id, session) { } using ReliableQuicStream::CloseWriteSide; @@ -98,13 +99,13 @@ class TestSession : public QuicSession { return &crypto_stream_; } - virtual TestStream* CreateOutgoingReliableStream() OVERRIDE { + virtual TestStream* CreateOutgoingDataStream() OVERRIDE { TestStream* stream = new TestStream(GetNextStreamId(), this); ActivateStream(stream); return stream; } - virtual TestStream* CreateIncomingReliableStream(QuicStreamId id) OVERRIDE { + virtual TestStream* CreateIncomingDataStream(QuicStreamId id) OVERRIDE { return new TestStream(id, this); } @@ -112,7 +113,7 @@ class TestSession : public QuicSession { return QuicSession::IsClosedStream(id); } - ReliableQuicStream* GetIncomingReliableStream(QuicStreamId stream_id) { + QuicDataStream* GetIncomingReliableStream(QuicStreamId stream_id) { return QuicSession::GetIncomingReliableStream(stream_id); } @@ -204,12 +205,12 @@ TEST_F(QuicSessionTest, ImplicitlyCreatedStreams) { } TEST_F(QuicSessionTest, IsClosedStreamLocallyCreated) { - TestStream* stream2 = session_.CreateOutgoingReliableStream(); + TestStream* stream2 = session_.CreateOutgoingDataStream(); EXPECT_EQ(2u, stream2->id()); - ReliableQuicStreamPeer::SetHeadersDecompressed(stream2, true); - TestStream* stream4 = session_.CreateOutgoingReliableStream(); + QuicDataStreamPeer::SetHeadersDecompressed(stream2, true); + TestStream* stream4 = session_.CreateOutgoingDataStream(); EXPECT_EQ(4u, stream4->id()); - ReliableQuicStreamPeer::SetHeadersDecompressed(stream4, true); + QuicDataStreamPeer::SetHeadersDecompressed(stream4, true); CheckClosedStreams(); CloseStream(4); @@ -219,18 +220,18 @@ TEST_F(QuicSessionTest, IsClosedStreamLocallyCreated) { } TEST_F(QuicSessionTest, IsClosedStreamPeerCreated) { - ReliableQuicStream* stream3 = session_.GetIncomingReliableStream(3); - ReliableQuicStreamPeer::SetHeadersDecompressed(stream3, true); - ReliableQuicStream* stream5 = session_.GetIncomingReliableStream(5); - ReliableQuicStreamPeer::SetHeadersDecompressed(stream5, true); + QuicDataStream* stream3 = session_.GetIncomingReliableStream(3); + QuicDataStreamPeer::SetHeadersDecompressed(stream3, true); + QuicDataStream* stream5 = session_.GetIncomingReliableStream(5); + QuicDataStreamPeer::SetHeadersDecompressed(stream5, true); CheckClosedStreams(); CloseStream(3); CheckClosedStreams(); CloseStream(5); // Create stream id 9, and implicitly 7 - ReliableQuicStream* stream9 = session_.GetIncomingReliableStream(9); - ReliableQuicStreamPeer::SetHeadersDecompressed(stream9, true); + QuicDataStream* stream9 = session_.GetIncomingReliableStream(9); + QuicDataStreamPeer::SetHeadersDecompressed(stream9, true); CheckClosedStreams(); // Close 9, but make sure 7 is still not closed CloseStream(9); @@ -254,9 +255,9 @@ TEST_F(QuicSessionTest, DecompressionError) { } TEST_F(QuicSessionTest, OnCanWrite) { - TestStream* stream2 = session_.CreateOutgoingReliableStream(); - TestStream* stream4 = session_.CreateOutgoingReliableStream(); - TestStream* stream6 = session_.CreateOutgoingReliableStream(); + TestStream* stream2 = session_.CreateOutgoingDataStream(); + TestStream* stream4 = session_.CreateOutgoingDataStream(); + TestStream* stream6 = session_.CreateOutgoingDataStream(); session_.MarkWriteBlocked(stream2->id(), kSomeMiddlePriority); session_.MarkWriteBlocked(stream6->id(), kSomeMiddlePriority); @@ -277,12 +278,12 @@ TEST_F(QuicSessionTest, BufferedHandshake) { EXPECT_FALSE(session_.HasPendingHandshake()); // Default value. // Test that blocking other streams does not change our status. - TestStream* stream2 = session_.CreateOutgoingReliableStream(); + TestStream* stream2 = session_.CreateOutgoingDataStream(); StreamBlocker stream2_blocker(&session_, stream2->id()); stream2_blocker.MarkWriteBlocked(); EXPECT_FALSE(session_.HasPendingHandshake()); - TestStream* stream3 = session_.CreateOutgoingReliableStream(); + TestStream* stream3 = session_.CreateOutgoingDataStream(); StreamBlocker stream3_blocker(&session_, stream3->id()); stream3_blocker.MarkWriteBlocked(); EXPECT_FALSE(session_.HasPendingHandshake()); @@ -291,7 +292,7 @@ TEST_F(QuicSessionTest, BufferedHandshake) { session_.MarkWriteBlocked(kCryptoStreamId, kSomeMiddlePriority); EXPECT_TRUE(session_.HasPendingHandshake()); - TestStream* stream4 = session_.CreateOutgoingReliableStream(); + TestStream* stream4 = session_.CreateOutgoingDataStream(); StreamBlocker stream4_blocker(&session_, stream4->id()); stream4_blocker.MarkWriteBlocked(); EXPECT_TRUE(session_.HasPendingHandshake()); @@ -321,9 +322,9 @@ TEST_F(QuicSessionTest, BufferedHandshake) { } TEST_F(QuicSessionTest, OnCanWriteWithClosedStream) { - TestStream* stream2 = session_.CreateOutgoingReliableStream(); - TestStream* stream4 = session_.CreateOutgoingReliableStream(); - TestStream* stream6 = session_.CreateOutgoingReliableStream(); + TestStream* stream2 = session_.CreateOutgoingDataStream(); + TestStream* stream4 = session_.CreateOutgoingDataStream(); + TestStream* stream6 = session_.CreateOutgoingDataStream(); session_.MarkWriteBlocked(stream2->id(), kSomeMiddlePriority); session_.MarkWriteBlocked(stream6->id(), kSomeMiddlePriority); @@ -343,8 +344,8 @@ TEST_F(QuicSessionTest, OutOfOrderHeaders) { QuicPacketHeader header; header.public_header.guid = session_.guid(); - TestStream* stream2 = session_.CreateOutgoingReliableStream(); - TestStream* stream4 = session_.CreateOutgoingReliableStream(); + TestStream* stream2 = session_.CreateOutgoingDataStream(); + TestStream* stream4 = session_.CreateOutgoingDataStream(); stream2->CloseWriteSide(); stream4->CloseWriteSide(); @@ -397,9 +398,9 @@ TEST_F(QuicSessionTest, ZombieStream) { new StrictMock<MockConnection>(guid_, IPEndPoint(), false); TestSession session(connection, /*is_server=*/ false); - TestStream* stream3 = session.CreateOutgoingReliableStream(); + TestStream* stream3 = session.CreateOutgoingDataStream(); EXPECT_EQ(3u, stream3->id()); - TestStream* stream5 = session.CreateOutgoingReliableStream(); + TestStream* stream5 = session.CreateOutgoingDataStream(); EXPECT_EQ(5u, stream5->id()); EXPECT_EQ(2u, session.GetNumOpenStreams()); @@ -437,9 +438,9 @@ TEST_F(QuicSessionTest, ZombieStreamConnectionClose) { new StrictMock<MockConnection>(guid_, IPEndPoint(), false); TestSession session(connection, /*is_server=*/ false); - TestStream* stream3 = session.CreateOutgoingReliableStream(); + TestStream* stream3 = session.CreateOutgoingDataStream(); EXPECT_EQ(3u, stream3->id()); - TestStream* stream5 = session.CreateOutgoingReliableStream(); + TestStream* stream5 = session.CreateOutgoingDataStream(); EXPECT_EQ(5u, stream5->id()); EXPECT_EQ(2u, session.GetNumOpenStreams()); diff --git a/net/quic/quic_stream_factory_test.cc b/net/quic/quic_stream_factory_test.cc index 856ba7f..1c1f8e1 100644 --- a/net/quic/quic_stream_factory_test.cc +++ b/net/quic/quic_stream_factory_test.cc @@ -64,7 +64,8 @@ class QuicStreamFactoryPeer { class QuicStreamFactoryTest : public ::testing::Test { protected: QuicStreamFactoryTest() - : clock_(new MockClock()), + : random_generator_(0), + clock_(new MockClock()), factory_(&host_resolver_, &socket_factory_, base::WeakPtr<HttpServerProperties>(), &crypto_client_stream_factory_, @@ -80,7 +81,7 @@ class QuicStreamFactoryTest : public ::testing::Test { QuicPacketSequenceNumber num, QuicStreamId stream_id) { QuicPacketHeader header; - header.public_header.guid = 0xDEADBEEF; + header.public_header.guid = random_generator_.RandUint64(); header.public_header.reset_flag = false; header.public_header.version_flag = true; header.packet_sequence_number = num; @@ -98,7 +99,7 @@ class QuicStreamFactoryTest : public ::testing::Test { QuicPacketSequenceNumber largest_received, QuicPacketSequenceNumber least_unacked) { QuicPacketHeader header; - header.public_header.guid = 0xDEADBEEF; + header.public_header.guid = random_generator_.RandUint64(); header.public_header.reset_flag = false; header.public_header.version_flag = false; header.packet_sequence_number = 2; @@ -126,7 +127,7 @@ class QuicStreamFactoryTest : public ::testing::Test { scoped_ptr<QuicEncryptedPacket> ConstructFeedbackPacket( QuicPacketSequenceNumber sequence_number) { QuicPacketHeader header; - header.public_header.guid = 0xDEADBEEF; + header.public_header.guid = random_generator_.RandUint64(); header.public_header.reset_flag = false; header.public_header.version_flag = false; header.packet_sequence_number = sequence_number; diff --git a/net/quic/quic_stream_sequencer_test.cc b/net/quic/quic_stream_sequencer_test.cc index b4ea18c..eb760af 100644 --- a/net/quic/quic_stream_sequencer_test.cc +++ b/net/quic/quic_stream_sequencer_test.cc @@ -70,11 +70,12 @@ class MockStream : public ReliableQuicStream { } MOCK_METHOD0(OnFinRead, void()); - MOCK_METHOD2(ProcessData, uint32(const char* data, uint32 data_len)); + MOCK_METHOD2(ProcessRawData, uint32(const char* data, uint32 data_len)); MOCK_METHOD2(CloseConnectionWithDetails, void(QuicErrorCode error, const string& details)); MOCK_METHOD1(Reset, void(QuicRstStreamErrorCode error)); MOCK_METHOD0(OnCanWrite, void()); + virtual QuicPriority EffectivePriority() const { return 0; } }; namespace { @@ -136,7 +137,7 @@ class QuicStreamSequencerTest : public ::testing::Test { }; TEST_F(QuicStreamSequencerTest, RejectOldFrame) { - EXPECT_CALL(stream_, ProcessData(StrEq("abc"), 3)) + EXPECT_CALL(stream_, ProcessRawData(StrEq("abc"), 3)) .WillOnce(Return(3)); EXPECT_TRUE(sequencer_->OnFrame(0, "abc")); @@ -166,7 +167,7 @@ TEST_F(QuicStreamSequencerTest, DropFramePastBuffering) { } TEST_F(QuicStreamSequencerTest, RejectBufferedFrame) { - EXPECT_CALL(stream_, ProcessData(StrEq("abc"), 3)); + EXPECT_CALL(stream_, ProcessRawData(StrEq("abc"), 3)); EXPECT_TRUE(sequencer_->OnFrame(0, "abc")); EXPECT_EQ(1u, sequencer_->frames()->size()); @@ -178,7 +179,7 @@ TEST_F(QuicStreamSequencerTest, RejectBufferedFrame) { } TEST_F(QuicStreamSequencerTest, FullFrameConsumed) { - EXPECT_CALL(stream_, ProcessData(StrEq("abc"), 3)).WillOnce(Return(3)); + EXPECT_CALL(stream_, ProcessRawData(StrEq("abc"), 3)).WillOnce(Return(3)); EXPECT_TRUE(sequencer_->OnFrame(0, "abc")); EXPECT_EQ(0u, sequencer_->frames()->size()); @@ -201,7 +202,7 @@ TEST_F(QuicStreamSequencerTest, EmptyFinFrame) { } TEST_F(QuicStreamSequencerTest, PartialFrameConsumed) { - EXPECT_CALL(stream_, ProcessData(StrEq("abc"), 3)).WillOnce(Return(2)); + EXPECT_CALL(stream_, ProcessRawData(StrEq("abc"), 3)).WillOnce(Return(2)); EXPECT_TRUE(sequencer_->OnFrame(0, "abc")); EXPECT_EQ(1u, sequencer_->frames()->size()); @@ -210,7 +211,7 @@ TEST_F(QuicStreamSequencerTest, PartialFrameConsumed) { } TEST_F(QuicStreamSequencerTest, NextxFrameNotConsumed) { - EXPECT_CALL(stream_, ProcessData(StrEq("abc"), 3)).WillOnce(Return(0)); + EXPECT_CALL(stream_, ProcessRawData(StrEq("abc"), 3)).WillOnce(Return(0)); EXPECT_TRUE(sequencer_->OnFrame(0, "abc")); EXPECT_EQ(1u, sequencer_->frames()->size()); @@ -236,9 +237,9 @@ TEST_F(QuicStreamSequencerTest, OutOfOrderFrameProcessed) { EXPECT_EQ(0u, sequencer_->num_bytes_consumed()); InSequence s; - EXPECT_CALL(stream_, ProcessData(StrEq("abc"), 3)).WillOnce(Return(3)); - EXPECT_CALL(stream_, ProcessData(StrEq("def"), 3)).WillOnce(Return(3)); - EXPECT_CALL(stream_, ProcessData(StrEq("ghi"), 3)).WillOnce(Return(3)); + EXPECT_CALL(stream_, ProcessRawData(StrEq("abc"), 3)).WillOnce(Return(3)); + EXPECT_CALL(stream_, ProcessRawData(StrEq("def"), 3)).WillOnce(Return(3)); + EXPECT_CALL(stream_, ProcessRawData(StrEq("ghi"), 3)).WillOnce(Return(3)); // Ack right away EXPECT_TRUE(sequencer_->OnFrame(0, "abc")); @@ -258,7 +259,7 @@ TEST_F(QuicStreamSequencerTest, OutOfOrderFramesProcessedWithBuffering) { EXPECT_EQ(0u, sequencer_->num_bytes_consumed()); InSequence s; - EXPECT_CALL(stream_, ProcessData(StrEq("abc"), 3)).WillOnce(Return(3)); + EXPECT_CALL(stream_, ProcessRawData(StrEq("abc"), 3)).WillOnce(Return(3)); // Ack right away EXPECT_TRUE(sequencer_->OnFrame(0, "abc")); @@ -268,9 +269,9 @@ TEST_F(QuicStreamSequencerTest, OutOfOrderFramesProcessedWithBuffering) { EXPECT_TRUE(sequencer_->OnFrame(9, "jkl")); EXPECT_EQ(3u, sequencer_->num_bytes_consumed()); - EXPECT_CALL(stream_, ProcessData(StrEq("def"), 3)).WillOnce(Return(3)); - EXPECT_CALL(stream_, ProcessData(StrEq("ghi"), 3)).WillOnce(Return(3)); - EXPECT_CALL(stream_, ProcessData(StrEq("jkl"), 3)).WillOnce(Return(3)); + EXPECT_CALL(stream_, ProcessRawData(StrEq("def"), 3)).WillOnce(Return(3)); + EXPECT_CALL(stream_, ProcessRawData(StrEq("ghi"), 3)).WillOnce(Return(3)); + EXPECT_CALL(stream_, ProcessRawData(StrEq("jkl"), 3)).WillOnce(Return(3)); EXPECT_TRUE(sequencer_->OnFrame(3, "def")); EXPECT_EQ(12u, sequencer_->num_bytes_consumed()); @@ -297,9 +298,9 @@ TEST_F(QuicStreamSequencerTest, OutOfOrderFramesBlockignWithReadv) { // Push pqr - process InSequence s; - EXPECT_CALL(stream_, ProcessData(StrEq("abc"), 3)).WillOnce(Return(3)); - EXPECT_CALL(stream_, ProcessData(StrEq("def"), 3)).WillOnce(Return(0)); - EXPECT_CALL(stream_, ProcessData(StrEq("pqr"), 3)).WillOnce(Return(3)); + EXPECT_CALL(stream_, ProcessRawData(StrEq("abc"), 3)).WillOnce(Return(3)); + EXPECT_CALL(stream_, ProcessRawData(StrEq("def"), 3)).WillOnce(Return(0)); + EXPECT_CALL(stream_, ProcessRawData(StrEq("pqr"), 3)).WillOnce(Return(3)); EXPECT_TRUE(sequencer_->OnFrame(0, "abc")); EXPECT_TRUE(sequencer_->OnFrame(3, "def")); @@ -327,9 +328,9 @@ TEST_F(QuicStreamSequencerTest, OutOfOrderFramesBlockignWithGetReadableRegion) { sequencer_->SetMemoryLimit(9); InSequence s; - EXPECT_CALL(stream_, ProcessData(StrEq("abc"), 3)).WillOnce(Return(3)); - EXPECT_CALL(stream_, ProcessData(StrEq("def"), 3)).WillOnce(Return(0)); - EXPECT_CALL(stream_, ProcessData(StrEq("pqr"), 3)).WillOnce(Return(3)); + EXPECT_CALL(stream_, ProcessRawData(StrEq("abc"), 3)).WillOnce(Return(3)); + EXPECT_CALL(stream_, ProcessRawData(StrEq("def"), 3)).WillOnce(Return(0)); + EXPECT_CALL(stream_, ProcessRawData(StrEq("pqr"), 3)).WillOnce(Return(3)); EXPECT_TRUE(sequencer_->OnFrame(0, "abc")); EXPECT_TRUE(sequencer_->OnFrame(3, "def")); @@ -361,7 +362,7 @@ TEST_F(QuicStreamSequencerTest, MarkConsumed) { sequencer_->SetMemoryLimit(9); InSequence s; - EXPECT_CALL(stream_, ProcessData(StrEq("abc"), 3)).WillOnce(Return(0)); + EXPECT_CALL(stream_, ProcessRawData(StrEq("abc"), 3)).WillOnce(Return(0)); EXPECT_TRUE(sequencer_->OnFrame(0, "abc")); EXPECT_TRUE(sequencer_->OnFrame(3, "def")); @@ -393,7 +394,7 @@ TEST_F(QuicStreamSequencerTest, MarkConsumed) { TEST_F(QuicStreamSequencerTest, MarkConsumedError) { // TODO(rch): enable when chromium supports EXPECT_DFATAL. /* - EXPECT_CALL(stream_, ProcessData(StrEq("abc"), 3)).WillOnce(Return(0)); + EXPECT_CALL(stream_, ProcessRawData(StrEq("abc"), 3)).WillOnce(Return(0)); EXPECT_TRUE(sequencer_->OnFrame(0, "abc")); EXPECT_TRUE(sequencer_->OnFrame(9, "jklmnopqrstuvwxyz")); @@ -414,7 +415,7 @@ TEST_F(QuicStreamSequencerTest, MarkConsumedError) { TEST_F(QuicStreamSequencerTest, MarkConsumedWithMissingPacket) { InSequence s; - EXPECT_CALL(stream_, ProcessData(StrEq("abc"), 3)).WillOnce(Return(0)); + EXPECT_CALL(stream_, ProcessRawData(StrEq("abc"), 3)).WillOnce(Return(0)); EXPECT_TRUE(sequencer_->OnFrame(0, "abc")); EXPECT_TRUE(sequencer_->OnFrame(3, "def")); @@ -430,7 +431,7 @@ TEST_F(QuicStreamSequencerTest, MarkConsumedWithMissingPacket) { TEST_F(QuicStreamSequencerTest, BasicHalfCloseOrdered) { InSequence s; - EXPECT_CALL(stream_, ProcessData(StrEq("abc"), 3)).WillOnce(Return(3)); + EXPECT_CALL(stream_, ProcessRawData(StrEq("abc"), 3)).WillOnce(Return(3)); EXPECT_CALL(stream_, OnFinRead()); EXPECT_TRUE(sequencer_->OnFinFrame(0, "abc")); @@ -441,8 +442,8 @@ TEST_F(QuicStreamSequencerTest, BasicHalfCloseUnorderedWithFlush) { sequencer_->OnFinFrame(6, ""); EXPECT_EQ(6u, sequencer_->close_offset()); InSequence s; - EXPECT_CALL(stream_, ProcessData(StrEq("abc"), 3)).WillOnce(Return(3)); - EXPECT_CALL(stream_, ProcessData(StrEq("def"), 3)).WillOnce(Return(3)); + EXPECT_CALL(stream_, ProcessRawData(StrEq("abc"), 3)).WillOnce(Return(3)); + EXPECT_CALL(stream_, ProcessRawData(StrEq("def"), 3)).WillOnce(Return(3)); EXPECT_CALL(stream_, OnFinRead()); EXPECT_TRUE(sequencer_->OnFrame(3, "def")); @@ -453,7 +454,7 @@ TEST_F(QuicStreamSequencerTest, BasicHalfUnordered) { sequencer_->OnFinFrame(3, ""); EXPECT_EQ(3u, sequencer_->close_offset()); InSequence s; - EXPECT_CALL(stream_, ProcessData(StrEq("abc"), 3)).WillOnce(Return(3)); + EXPECT_CALL(stream_, ProcessRawData(StrEq("abc"), 3)).WillOnce(Return(3)); EXPECT_CALL(stream_, OnFinRead()); EXPECT_TRUE(sequencer_->OnFrame(0, "abc")); @@ -467,7 +468,7 @@ TEST_F(QuicStreamSequencerTest, TerminateWithReadv) { EXPECT_FALSE(sequencer_->IsClosed()); - EXPECT_CALL(stream_, ProcessData(StrEq("abc"), 3)).WillOnce(Return(0)); + EXPECT_CALL(stream_, ProcessRawData(StrEq("abc"), 3)).WillOnce(Return(0)); EXPECT_TRUE(sequencer_->OnFrame(0, "abc")); iovec iov = { &buffer[0], 3 }; @@ -535,7 +536,7 @@ TEST_F(QuicSequencerRandomTest, RandomFramesNoDroppingNoBackup) { InSequence s; for (size_t i = 0; i < list_.size(); ++i) { string* data = &list_[i].second; - EXPECT_CALL(stream_, ProcessData(StrEq(*data), data->size())) + EXPECT_CALL(stream_, ProcessRawData(StrEq(*data), data->size())) .WillOnce(Return(data->size())); } @@ -558,7 +559,7 @@ TEST_F(QuicSequencerRandomTest, RandomFramesDroppingNoBackup) { InSequence s; for (size_t i = 0; i < list_.size(); ++i) { string* data = &list_[i].second; - EXPECT_CALL(stream_, ProcessData(StrEq(*data), data->size())) + EXPECT_CALL(stream_, ProcessRawData(StrEq(*data), data->size())) .WillOnce(Return(data->size())); } diff --git a/net/quic/reliable_quic_stream.cc b/net/quic/reliable_quic_stream.cc index bc472f0..1db5357 100644 --- a/net/quic/reliable_quic_stream.cc +++ b/net/quic/reliable_quic_stream.cc @@ -13,32 +13,14 @@ using std::min; namespace net { +#define ENDPOINT (is_server_ ? "Server: " : " Client: ") + namespace { -// This is somewhat arbitrary. It's possible, but unlikely, we will either fail -// to set a priority client-side, or cancel a stream before stripping the -// priority from the wire server-side. In either case, start out with a -// priority in the middle. -QuicPriority kDefaultPriority = 3; - -// Appends bytes from data into partial_data_buffer. Once partial_data_buffer -// reaches 4 bytes, copies the data into 'result' and clears -// partial_data_buffer. -// Returns the number of bytes consumed. -uint32 StripUint32(const char* data, uint32 data_len, - string* partial_data_buffer, - uint32* result) { - DCHECK_GT(4u, partial_data_buffer->length()); - size_t missing_size = 4 - partial_data_buffer->length(); - if (data_len < missing_size) { - StringPiece(data, data_len).AppendToString(partial_data_buffer); - return data_len; - } - StringPiece(data, missing_size).AppendToString(partial_data_buffer); - DCHECK_EQ(4u, partial_data_buffer->length()); - memcpy(result, partial_data_buffer->data(), 4); - partial_data_buffer->clear(); - return missing_size; +struct iovec MakeIovec(StringPiece data) { + struct iovec iov = {const_cast<char*>(data.data()), + static_cast<size_t>(data.size())}; + return iov; } } // namespace @@ -48,18 +30,12 @@ ReliableQuicStream::ReliableQuicStream(QuicStreamId id, : sequencer_(this), id_(id), session_(session), - visitor_(NULL), stream_bytes_read_(0), stream_bytes_written_(0), - headers_decompressed_(false), - priority_(kDefaultPriority), - headers_id_(0), - decompression_failed_(false), stream_error_(QUIC_STREAM_NO_ERROR), connection_error_(QUIC_NO_ERROR), read_side_closed_(false), write_side_closed_(false), - priority_parsed_(false), fin_buffered_(false), fin_sent_(false), is_server_(session_->is_server()) { @@ -136,81 +112,16 @@ void ReliableQuicStream::CloseConnectionWithDetails(QuicErrorCode error, session()->connection()->SendConnectionCloseWithDetails(error, details); } -size_t ReliableQuicStream::Readv(const struct iovec* iov, size_t iov_len) { - if (headers_decompressed_ && decompressed_headers_.empty()) { - return sequencer_.Readv(iov, iov_len); - } - size_t bytes_consumed = 0; - size_t iov_index = 0; - while (iov_index < iov_len && - decompressed_headers_.length() > bytes_consumed) { - size_t bytes_to_read = min(iov[iov_index].iov_len, - decompressed_headers_.length() - bytes_consumed); - char* iov_ptr = static_cast<char*>(iov[iov_index].iov_base); - memcpy(iov_ptr, - decompressed_headers_.data() + bytes_consumed, bytes_to_read); - bytes_consumed += bytes_to_read; - ++iov_index; - } - decompressed_headers_.erase(0, bytes_consumed); - return bytes_consumed; -} - -int ReliableQuicStream::GetReadableRegions(iovec* iov, size_t iov_len) { - if (headers_decompressed_ && decompressed_headers_.empty()) { - return sequencer_.GetReadableRegions(iov, iov_len); - } - if (iov_len == 0) { - return 0; - } - iov[0].iov_base = static_cast<void*>( - const_cast<char*>(decompressed_headers_.data())); - iov[0].iov_len = decompressed_headers_.length(); - return 1; -} - -bool ReliableQuicStream::IsDoneReading() const { - if (!headers_decompressed_ || !decompressed_headers_.empty()) { - return false; - } - return sequencer_.IsClosed(); -} - -bool ReliableQuicStream::HasBytesToRead() const { - return !decompressed_headers_.empty() || sequencer_.HasBytesToRead(); -} - -const IPEndPoint& ReliableQuicStream::GetPeerAddress() const { - return session_->peer_address(); -} - -QuicSpdyCompressor* ReliableQuicStream::compressor() { - return session_->compressor(); -} - -bool ReliableQuicStream::GetSSLInfo(SSLInfo* ssl_info) { - return session_->GetSSLInfo(ssl_info); -} - -QuicConsumedData ReliableQuicStream::WriteData(StringPiece data, bool fin) { +void ReliableQuicStream::WriteOrBufferData(StringPiece data, bool fin) { DCHECK(data.size() > 0 || fin); - return WriteOrBuffer(data, fin); -} - - -void ReliableQuicStream::set_priority(QuicPriority priority) { - DCHECK_EQ(0u, stream_bytes_written_); - priority_ = priority; -} - -QuicConsumedData ReliableQuicStream::WriteOrBuffer(StringPiece data, bool fin) { DCHECK(!fin_buffered_); QuicConsumedData consumed_data(0, false); fin_buffered_ = fin; if (queued_data_.empty()) { - consumed_data = WriteDataInternal(string(data.data(), data.length()), fin); + struct iovec iov(MakeIovec(data)); + consumed_data = WritevData(&iov, 1, fin, NULL); DCHECK_LE(consumed_data.bytes_consumed, data.length()); } @@ -221,8 +132,6 @@ QuicConsumedData ReliableQuicStream::WriteOrBuffer(StringPiece data, bool fin) { string(data.data() + consumed_data.bytes_consumed, data.length() - consumed_data.bytes_consumed)); } - - return QuicConsumedData(data.size(), true); } void ReliableQuicStream::OnCanWrite() { @@ -232,7 +141,8 @@ void ReliableQuicStream::OnCanWrite() { if (queued_data_.size() == 1 && fin_buffered_) { fin = true; } - QuicConsumedData consumed_data = WriteDataInternal(data, fin); + struct iovec iov(MakeIovec(data)); + QuicConsumedData consumed_data = WritevData(&iov, 1, fin, NULL); if (consumed_data.bytes_consumed == data.size() && fin == consumed_data.fin_consumed) { queued_data_.pop_front(); @@ -243,14 +153,7 @@ void ReliableQuicStream::OnCanWrite() { } } -QuicConsumedData ReliableQuicStream::WriteDataInternal( - StringPiece data, bool fin) { - struct iovec iov = {const_cast<char*>(data.data()), - static_cast<size_t>(data.size())}; - return WritevDataInternal(&iov, 1, fin, NULL); -} - -QuicConsumedData ReliableQuicStream::WritevDataInternal( +QuicConsumedData ReliableQuicStream::WritevData( const struct iovec* iov, int iov_count, bool fin, @@ -280,10 +183,6 @@ QuicConsumedData ReliableQuicStream::WritevDataInternal( return consumed_data; } -QuicPriority ReliableQuicStream::EffectivePriority() const { - return priority(); -} - void ReliableQuicStream::CloseReadSide() { if (read_side_closed_) { return; @@ -297,165 +196,6 @@ void ReliableQuicStream::CloseReadSide() { } } -uint32 ReliableQuicStream::ProcessRawData(const char* data, uint32 data_len) { - DCHECK_NE(0u, data_len); - if (id() == kCryptoStreamId) { - // The crypto stream does not use compression. - return ProcessData(data, data_len); - } - - uint32 total_bytes_consumed = 0; - if (headers_id_ == 0u) { - total_bytes_consumed += StripPriorityAndHeaderId(data, data_len); - data += total_bytes_consumed; - data_len -= total_bytes_consumed; - if (data_len == 0 || total_bytes_consumed == 0) { - return total_bytes_consumed; - } - } - DCHECK_NE(0u, headers_id_); - - // Once the headers are finished, we simply pass the data through. - if (headers_decompressed_) { - // Some buffered header data remains. - if (!decompressed_headers_.empty()) { - ProcessHeaderData(); - } - if (decompressed_headers_.empty()) { - DVLOG(1) << "Delegating procesing to ProcessData"; - total_bytes_consumed += ProcessData(data, data_len); - } - return total_bytes_consumed; - } - - QuicHeaderId current_header_id = - session_->decompressor()->current_header_id(); - // Ensure that this header id looks sane. - if (headers_id_ < current_header_id || - headers_id_ > kMaxHeaderIdDelta + current_header_id) { - DVLOG(1) << ENDPOINT - << "Invalid headers for stream: " << id() - << " header_id: " << headers_id_ - << " current_header_id: " << current_header_id; - session_->connection()->SendConnectionClose(QUIC_INVALID_HEADER_ID); - return total_bytes_consumed; - } - - // If we are head-of-line blocked on decompression, then back up. - if (current_header_id != headers_id_) { - session_->MarkDecompressionBlocked(headers_id_, id()); - DVLOG(1) << ENDPOINT - << "Unable to decompress header data for stream: " << id() - << " header_id: " << headers_id_; - return total_bytes_consumed; - } - - // Decompressed data will be delivered to decompressed_headers_. - size_t bytes_consumed = session_->decompressor()->DecompressData( - StringPiece(data, data_len), this); - DCHECK_NE(0u, bytes_consumed); - if (bytes_consumed > data_len) { - DCHECK(false) << "DecompressData returned illegal value"; - OnDecompressionError(); - return total_bytes_consumed; - } - total_bytes_consumed += bytes_consumed; - data += bytes_consumed; - data_len -= bytes_consumed; - - if (decompression_failed_) { - // The session will have been closed in OnDecompressionError. - return total_bytes_consumed; - } - - // Headers are complete if the decompressor has moved on to the - // next stream. - headers_decompressed_ = - session_->decompressor()->current_header_id() != headers_id_; - if (!headers_decompressed_) { - DCHECK_EQ(0u, data_len); - } - - ProcessHeaderData(); - - if (!headers_decompressed_ || !decompressed_headers_.empty()) { - return total_bytes_consumed; - } - - // We have processed all of the decompressed data but we might - // have some more raw data to process. - if (data_len > 0) { - total_bytes_consumed += ProcessData(data, data_len); - } - - // The sequencer will push any additional buffered frames if this data - // has been completely consumed. - return total_bytes_consumed; -} - -uint32 ReliableQuicStream::ProcessHeaderData() { - if (decompressed_headers_.empty()) { - return 0; - } - - size_t bytes_processed = ProcessData(decompressed_headers_.data(), - decompressed_headers_.length()); - if (bytes_processed == decompressed_headers_.length()) { - decompressed_headers_.clear(); - } else { - decompressed_headers_ = decompressed_headers_.erase(0, bytes_processed); - } - return bytes_processed; -} - -void ReliableQuicStream::OnDecompressorAvailable() { - DCHECK_EQ(headers_id_, - session_->decompressor()->current_header_id()); - DCHECK(!headers_decompressed_); - DCHECK(!decompression_failed_); - DCHECK_EQ(0u, decompressed_headers_.length()); - - while (!headers_decompressed_) { - struct iovec iovec; - if (sequencer_.GetReadableRegions(&iovec, 1) == 0) { - return; - } - - size_t bytes_consumed = session_->decompressor()->DecompressData( - StringPiece(static_cast<char*>(iovec.iov_base), - iovec.iov_len), - this); - DCHECK_LE(bytes_consumed, iovec.iov_len); - if (decompression_failed_) { - return; - } - sequencer_.MarkConsumed(bytes_consumed); - - headers_decompressed_ = - session_->decompressor()->current_header_id() != headers_id_; - } - - // Either the headers are complete, or the all data as been consumed. - ProcessHeaderData(); // Unprocessed headers remain in decompressed_headers_. - if (IsDoneReading()) { - OnFinRead(); - } else if (headers_decompressed_ && decompressed_headers_.empty()) { - sequencer_.FlushBufferedFrames(); - } -} - -bool ReliableQuicStream::OnDecompressedData(StringPiece data) { - data.AppendToString(&decompressed_headers_); - return true; -} - -void ReliableQuicStream::OnDecompressionError() { - DCHECK(!decompression_failed_); - decompression_failed_ = true; - session_->connection()->SendConnectionClose(QUIC_DECOMPRESSION_FAILURE); -} - - void ReliableQuicStream::CloseWriteSide() { if (write_side_closed_) { return; @@ -476,45 +216,6 @@ bool ReliableQuicStream::HasBufferedData() { void ReliableQuicStream::OnClose() { CloseReadSide(); CloseWriteSide(); - - if (visitor_) { - Visitor* visitor = visitor_; - // Calling Visitor::OnClose() may result the destruction of the visitor, - // so we need to ensure we don't call it again. - visitor_ = NULL; - visitor->OnClose(this); - } -} - -uint32 ReliableQuicStream::StripPriorityAndHeaderId( - const char* data, uint32 data_len) { - uint32 total_bytes_parsed = 0; - - if (!priority_parsed_ && session_->connection()->is_server()) { - QuicPriority temporary_priority = priority_; - total_bytes_parsed = StripUint32( - data, data_len, &headers_id_and_priority_buffer_, &temporary_priority); - if (total_bytes_parsed > 0 && headers_id_and_priority_buffer_.empty()) { - priority_parsed_ = true; - - // Spdy priorities are inverted, so the highest numerical value is the - // lowest legal priority. - if (temporary_priority > static_cast<QuicPriority>(kLowestPriority)) { - session_->connection()->SendConnectionClose(QUIC_INVALID_PRIORITY); - return 0; - } - priority_ = temporary_priority; - } - data += total_bytes_parsed; - data_len -= total_bytes_parsed; - } - if (data_len > 0 && headers_id_ == 0u) { - // The headers ID has not yet been read. Strip it from the beginning of - // the data stream. - total_bytes_parsed += StripUint32( - data, data_len, &headers_id_and_priority_buffer_, &headers_id_); - } - return total_bytes_parsed; } } // namespace net diff --git a/net/quic/reliable_quic_stream.h b/net/quic/reliable_quic_stream.h index f39b660..210b8ad 100644 --- a/net/quic/reliable_quic_stream.h +++ b/net/quic/reliable_quic_stream.h @@ -15,8 +15,8 @@ #include "net/base/iovec.h" #include "net/base/net_export.h" #include "net/quic/quic_ack_notifier.h" +#include "net/quic/quic_protocol.h" #include "net/quic/quic_spdy_compressor.h" -#include "net/quic/quic_spdy_decompressor.h" #include "net/quic/quic_stream_sequencer.h" namespace net { @@ -29,27 +29,8 @@ class IPEndPoint; class QuicSession; class SSLInfo; -#define ENDPOINT (is_server_ ? "Server: " : " Client: ") - -// All this does right now is send data to subclasses via the sequencer. -class NET_EXPORT_PRIVATE ReliableQuicStream : public - QuicSpdyDecompressor::Visitor { +class NET_EXPORT_PRIVATE ReliableQuicStream { public: - // Visitor receives callbacks from the stream. - class Visitor { - public: - Visitor() {} - - // Called when the stream is closed. - virtual void OnClose(ReliableQuicStream* stream) = 0; - - protected: - virtual ~Visitor() {} - - private: - DISALLOW_COPY_AND_ASSIGN(Visitor); - }; - ReliableQuicStream(QuicStreamId id, QuicSession* session); @@ -80,12 +61,7 @@ class NET_EXPORT_PRIVATE ReliableQuicStream : public // Called when the final data has been read. virtual void OnFinRead(); - virtual uint32 ProcessRawData(const char* data, uint32 data_len); - - virtual uint32 ProcessData(const char* data, uint32 data_len) = 0; - - virtual bool OnDecompressedData(base::StringPiece data) OVERRIDE; - virtual void OnDecompressionError() OVERRIDE; + virtual uint32 ProcessRawData(const char* data, uint32 data_len) = 0; // Called to reset the stream from this end. virtual void Reset(QuicRstStreamErrorCode error); @@ -95,24 +71,9 @@ class NET_EXPORT_PRIVATE ReliableQuicStream : public virtual void CloseConnectionWithDetails(QuicErrorCode error, const string& details); - // This block of functions wraps the sequencer's functions of the same - // name. These methods return uncompressed data until that has - // been fully processed. Then they simply delegate to the sequencer. - virtual size_t Readv(const struct iovec* iov, size_t iov_len); - virtual int GetReadableRegions(iovec* iov, size_t iov_len); - // Returns true when all data has been read from the peer, including the fin. - virtual bool IsDoneReading() const; - virtual bool HasBytesToRead() const; - - // Called by the session when a decompression blocked stream - // becomes unblocked. - virtual void OnDecompressorAvailable(); - - // By default, this is the same as priority(), however it allows streams - // to temporarily alter effective priority. For example if a SPDY stream has - // compressed but not written headers it can write the headers with a higher - // priority. - virtual QuicPriority EffectivePriority() const; + // Returns the effective priority for the stream. This value may change + // during the life of the stream. + virtual QuicPriority EffectivePriority() const = 0; QuicStreamId id() const { return id_; } @@ -125,27 +86,21 @@ class NET_EXPORT_PRIVATE ReliableQuicStream : public uint64 stream_bytes_read() { return stream_bytes_read_; } uint64 stream_bytes_written() { return stream_bytes_written_; } - const IPEndPoint& GetPeerAddress() const; - - void set_visitor(Visitor* visitor) { visitor_ = visitor; } - - QuicSpdyCompressor* compressor(); - - // Gets the SSL connection information. - bool GetSSLInfo(SSLInfo* ssl_info); - - bool headers_decompressed() const { return headers_decompressed_; } - protected: - // Returns a pair with the number of bytes consumed from data, and a boolean - // indicating if the fin bit was consumed. This does not indicate the data - // has been sent on the wire: it may have been turned into a packet and queued - // if the socket was unexpectedly blocked. - // - // The default implementation always consumed all bytes and any fin, but - // this behavior is not guaranteed for subclasses so callers should check the - // return value. - virtual QuicConsumedData WriteData(base::StringPiece data, bool fin); + // Sends as much of 'data' to the connection as the connection will consume, + // and then buffers any remaining data in queued_data_. + void WriteOrBufferData(base::StringPiece data, bool fin); + + // Sends as many bytes in the first |count| buffers of |iov| to the connection + // as the connection will consume. + // If |ack_notifier_delegate| is provided, then it will be notified once all + // the ACKs for this write have been received. + // Returns the number of bytes consumed by the connection. + QuicConsumedData WritevData( + const struct iovec* iov, + int iov_count, + bool fin, + QuicAckNotifier::DelegateInterface* ack_notifier_delegate); // Close the read side of the socket. Further frames will not be accepted. virtual void CloseReadSide(); @@ -159,67 +114,22 @@ class NET_EXPORT_PRIVATE ReliableQuicStream : public QuicSession* session() { return session_; } - // Sets priority_ to priority. This should only be called before bytes are - // written to the server. - void set_priority(QuicPriority priority); - // This is protected because external classes should use EffectivePriority - // instead. - QuicPriority priority() const { return priority_; } - - // Sends as much of 'data' to the connection as the connection will consume, - // and then buffers any remaining data in queued_data_. - // Returns (data.size(), true) as it always consumed all data: it returns for - // convenience to have the same return type as WriteDataInternal. - QuicConsumedData WriteOrBuffer(base::StringPiece data, bool fin); - - // Sends as much of 'data' to the connection as the connection will consume. - // Returns the number of bytes consumed by the connection. - QuicConsumedData WriteDataInternal(base::StringPiece data, bool fin); - - // Sends as many bytes in the first |count| buffers of |iov| to the connection - // as the connection will consume. - // If |ack_notifier_delegate| is provided, then it will be notified once all - // the ACKs for this write have been received. - // Returns the number of bytes consumed by the connection. - QuicConsumedData WritevDataInternal( - const struct iovec* iov, - int iov_count, - bool fin, - QuicAckNotifier::DelegateInterface* ack_notifier_delegate); + const QuicStreamSequencer* sequencer() const { return &sequencer_; } + QuicStreamSequencer* sequencer() { return &sequencer_; } private: friend class test::ReliableQuicStreamPeer; friend class QuicStreamUtils; - uint32 ProcessHeaderData(); - - uint32 StripPriorityAndHeaderId(const char* data, uint32 data_len); - std::list<string> queued_data_; QuicStreamSequencer sequencer_; QuicStreamId id_; QuicSession* session_; - // Optional visitor of this stream to be notified when the stream is closed. - Visitor* visitor_; // Bytes read and written refer to payload bytes only: they do not include // framing, encryption overhead etc. uint64 stream_bytes_read_; uint64 stream_bytes_written_; - // True if the headers have been completely decompresssed. - bool headers_decompressed_; - // The priority of the stream, once parsed. - QuicPriority priority_; - // ID of the header block sent by the peer, once parsed. - QuicHeaderId headers_id_; - // Buffer into which we write bytes from priority_ and headers_id_ - // until each is fully parsed. - string headers_id_and_priority_buffer_; - // Contains a copy of the decompressed headers_ until they are consumed - // via ProcessData or Readv. - string decompressed_headers_; - // True if an error was encountered during decompression. - bool decompression_failed_; // Stream error code received from a RstStreamFrame or error code sent by the // visitor or sequencer in the RstStreamFrame. @@ -234,13 +144,13 @@ class NET_EXPORT_PRIVATE ReliableQuicStream : public // True if the write side is closed, and further writes should fail. bool write_side_closed_; - // True if the priority has been read, false otherwise. - bool priority_parsed_; bool fin_buffered_; bool fin_sent_; // True if the session this stream is running under is a server session. bool is_server_; + + DISALLOW_COPY_AND_ASSIGN(ReliableQuicStream); }; } // namespace net diff --git a/net/quic/reliable_quic_stream_test.cc b/net/quic/reliable_quic_stream_test.cc index bfbe9c7..60414ae 100644 --- a/net/quic/reliable_quic_stream_test.cc +++ b/net/quic/reliable_quic_stream_test.cc @@ -43,14 +43,16 @@ class TestStream : public ReliableQuicStream { : ReliableQuicStream(id, session), should_process_data_(should_process_data) {} - virtual uint32 ProcessData(const char* data, uint32 data_len) OVERRIDE { + virtual uint32 ProcessRawData(const char* data, uint32 data_len) OVERRIDE { EXPECT_NE(0u, data_len); DVLOG(1) << "ProcessData data_len: " << data_len; data_ += string(data, data_len); return should_process_data_ ? data_len : 0; } - using ReliableQuicStream::WriteData; + virtual QuicPriority EffectivePriority() const OVERRIDE { return 0; } + + using ReliableQuicStream::WriteOrBufferData; using ReliableQuicStream::CloseReadSide; using ReliableQuicStream::CloseWriteSide; @@ -128,7 +130,7 @@ TEST_F(ReliableQuicStreamTest, WriteAllData) { PACKET_6BYTE_SEQUENCE_NUMBER, NOT_IN_FEC_GROUP); EXPECT_CALL(*session_, WritevData(kStreamId, _, 1, _, _, _)).WillOnce( Return(QuicConsumedData(kDataLen, true))); - EXPECT_EQ(kDataLen, stream_->WriteData(kData1, false).bytes_consumed); + stream_->WriteOrBufferData(kData1, false); EXPECT_FALSE(write_blocked_list_->HasWriteBlockedStreams()); } @@ -142,7 +144,7 @@ TEST_F(ReliableQuicStreamTest, NoBlockingIfNoDataOrFin) { EXPECT_DEBUG_DEATH({ EXPECT_CALL(*session_, WritevData(kStreamId, _, 1, _, _, _)).WillOnce( Return(QuicConsumedData(0, false))); - stream_->WriteData(StringPiece(), false); + stream_->WriteOrBufferData(StringPiece(), false); EXPECT_FALSE(write_blocked_list_->HasWriteBlockedStreams()); }, ""); } @@ -155,7 +157,7 @@ TEST_F(ReliableQuicStreamTest, BlockIfOnlySomeDataConsumed) { // we should be write blocked a not all the data was consumed. EXPECT_CALL(*session_, WritevData(kStreamId, _, 1, _, _, _)).WillOnce( Return(QuicConsumedData(1, false))); - stream_->WriteData(StringPiece(kData1, 2), false); + stream_->WriteOrBufferData(StringPiece(kData1, 2), false); ASSERT_EQ(1u, write_blocked_list_->NumBlockedStreams()); } @@ -169,7 +171,7 @@ TEST_F(ReliableQuicStreamTest, BlockIfFinNotConsumedWithData) { // last data) EXPECT_CALL(*session_, WritevData(kStreamId, _, 1, _, _, _)).WillOnce( Return(QuicConsumedData(2, false))); - stream_->WriteData(StringPiece(kData1, 2), true); + stream_->WriteOrBufferData(StringPiece(kData1, 2), true); ASSERT_EQ(1u, write_blocked_list_->NumBlockedStreams()); } @@ -180,11 +182,11 @@ TEST_F(ReliableQuicStreamTest, BlockIfSoloFinNotConsumed) { // as the fin was not consumed. EXPECT_CALL(*session_, WritevData(kStreamId, _, 1, _, _, _)).WillOnce( Return(QuicConsumedData(0, false))); - stream_->WriteData(StringPiece(), true); + stream_->WriteOrBufferData(StringPiece(), true); ASSERT_EQ(1u, write_blocked_list_->NumBlockedStreams()); } -TEST_F(ReliableQuicStreamTest, WriteData) { +TEST_F(ReliableQuicStreamTest, WriteOrBufferData) { Initialize(kShouldProcessData); EXPECT_FALSE(write_blocked_list_->HasWriteBlockedStreams()); @@ -194,12 +196,11 @@ TEST_F(ReliableQuicStreamTest, WriteData) { PACKET_6BYTE_SEQUENCE_NUMBER, NOT_IN_FEC_GROUP); EXPECT_CALL(*session_, WritevData(_, _, 1, _, _, _)).WillOnce( Return(QuicConsumedData(kDataLen - 1, false))); - // The return will be kDataLen, because the last byte gets buffered. - EXPECT_EQ(kDataLen, stream_->WriteData(kData1, false).bytes_consumed); + stream_->WriteOrBufferData(kData1, false); EXPECT_TRUE(write_blocked_list_->HasWriteBlockedStreams()); // Queue a bytes_consumed write. - EXPECT_EQ(kDataLen, stream_->WriteData(kData2, false).bytes_consumed); + stream_->WriteOrBufferData(kData2, false); // Make sure we get the tail of the first write followed by the bytes_consumed InSequence s; @@ -227,340 +228,6 @@ TEST_F(ReliableQuicStreamTest, ConnectionCloseAfterStreamClose) { EXPECT_EQ(QUIC_NO_ERROR, stream_->connection_error()); } -TEST_F(ReliableQuicStreamTest, ProcessHeaders) { - Initialize(kShouldProcessData); - - string compressed_headers = compressor_->CompressHeadersWithPriority( - kHighestPriority, headers_); - QuicStreamFrame frame(kStreamId, false, 0, MakeIOVector(compressed_headers)); - - stream_->OnStreamFrame(frame); - EXPECT_EQ(SpdyUtils::SerializeUncompressedHeaders(headers_), stream_->data()); - EXPECT_EQ(static_cast<QuicPriority>(kHighestPriority), - stream_->EffectivePriority()); -} - -TEST_F(ReliableQuicStreamTest, ProcessHeadersWithInvalidHeaderId) { - Initialize(kShouldProcessData); - - string compressed_headers = compressor_->CompressHeadersWithPriority( - kHighestPriority, headers_); - compressed_headers[4] = '\xFF'; // Illegal header id. - QuicStreamFrame frame(kStreamId, false, 0, MakeIOVector(compressed_headers)); - - EXPECT_CALL(*connection_, SendConnectionClose(QUIC_INVALID_HEADER_ID)); - stream_->OnStreamFrame(frame); -} - -TEST_F(ReliableQuicStreamTest, ProcessHeadersWithInvalidPriority) { - Initialize(kShouldProcessData); - - string compressed_headers = compressor_->CompressHeadersWithPriority( - kHighestPriority, headers_); - compressed_headers[0] = '\xFF'; // Illegal priority. - QuicStreamFrame frame(kStreamId, false, 0, MakeIOVector(compressed_headers)); - - EXPECT_CALL(*connection_, SendConnectionClose(QUIC_INVALID_PRIORITY)); - stream_->OnStreamFrame(frame); -} - -TEST_F(ReliableQuicStreamTest, ProcessHeadersAndBody) { - Initialize(kShouldProcessData); - - string compressed_headers = compressor_->CompressHeadersWithPriority( - kHighestPriority, headers_); - string body = "this is the body"; - string data = compressed_headers + body; - QuicStreamFrame frame(kStreamId, false, 0, MakeIOVector(data)); - - stream_->OnStreamFrame(frame); - EXPECT_EQ(SpdyUtils::SerializeUncompressedHeaders(headers_) + body, - stream_->data()); -} - -TEST_F(ReliableQuicStreamTest, ProcessHeadersAndBodyFragments) { - Initialize(kShouldProcessData); - - string compressed_headers = compressor_->CompressHeadersWithPriority( - kLowestPriority, headers_); - string body = "this is the body"; - string data = compressed_headers + body; - - for (size_t fragment_size = 1; fragment_size < data.size(); ++fragment_size) { - Initialize(kShouldProcessData); - for (size_t offset = 0; offset < data.size(); offset += fragment_size) { - size_t remaining_data = data.length() - offset; - StringPiece fragment(data.data() + offset, - min(fragment_size, remaining_data)); - QuicStreamFrame frame(kStreamId, false, offset, MakeIOVector(fragment)); - - stream_->OnStreamFrame(frame); - } - ASSERT_EQ(SpdyUtils::SerializeUncompressedHeaders(headers_) + body, - stream_->data()) << "fragment_size: " << fragment_size; - } - - for (size_t split_point = 1; split_point < data.size() - 1; ++split_point) { - Initialize(kShouldProcessData); - - StringPiece fragment1(data.data(), split_point); - QuicStreamFrame frame1(kStreamId, false, 0, MakeIOVector(fragment1)); - stream_->OnStreamFrame(frame1); - - StringPiece fragment2(data.data() + split_point, data.size() - split_point); - QuicStreamFrame frame2( - kStreamId, false, split_point, MakeIOVector(fragment2)); - stream_->OnStreamFrame(frame2); - - ASSERT_EQ(SpdyUtils::SerializeUncompressedHeaders(headers_) + body, - stream_->data()) << "split_point: " << split_point; - } - EXPECT_EQ(static_cast<QuicPriority>(kLowestPriority), - stream_->EffectivePriority()); -} - -TEST_F(ReliableQuicStreamTest, ProcessHeadersAndBodyReadv) { - Initialize(!kShouldProcessData); - - string compressed_headers = compressor_->CompressHeadersWithPriority( - kHighestPriority, headers_); - string body = "this is the body"; - string data = compressed_headers + body; - QuicStreamFrame frame(kStreamId, false, 0, MakeIOVector(data)); - string uncompressed_headers = - SpdyUtils::SerializeUncompressedHeaders(headers_); - string uncompressed_data = uncompressed_headers + body; - - stream_->OnStreamFrame(frame); - EXPECT_EQ(uncompressed_headers, stream_->data()); - - char buffer[2048]; - ASSERT_LT(data.length(), arraysize(buffer)); - struct iovec vec; - vec.iov_base = buffer; - vec.iov_len = arraysize(buffer); - - size_t bytes_read = stream_->Readv(&vec, 1); - EXPECT_EQ(uncompressed_headers.length(), bytes_read); - EXPECT_EQ(uncompressed_headers, string(buffer, bytes_read)); - - bytes_read = stream_->Readv(&vec, 1); - EXPECT_EQ(body.length(), bytes_read); - EXPECT_EQ(body, string(buffer, bytes_read)); -} - -TEST_F(ReliableQuicStreamTest, ProcessHeadersAndBodyIncrementalReadv) { - Initialize(!kShouldProcessData); - - string compressed_headers = compressor_->CompressHeadersWithPriority( - kHighestPriority, headers_); - string body = "this is the body"; - string data = compressed_headers + body; - QuicStreamFrame frame(kStreamId, false, 0, MakeIOVector(data)); - string uncompressed_headers = - SpdyUtils::SerializeUncompressedHeaders(headers_); - string uncompressed_data = uncompressed_headers + body; - - stream_->OnStreamFrame(frame); - EXPECT_EQ(uncompressed_headers, stream_->data()); - - char buffer[1]; - struct iovec vec; - vec.iov_base = buffer; - vec.iov_len = arraysize(buffer); - for (size_t i = 0; i < uncompressed_data.length(); ++i) { - size_t bytes_read = stream_->Readv(&vec, 1); - ASSERT_EQ(1u, bytes_read); - EXPECT_EQ(uncompressed_data.data()[i], buffer[0]); - } -} - -TEST_F(ReliableQuicStreamTest, ProcessHeadersUsingReadvWithMultipleIovecs) { - Initialize(!kShouldProcessData); - - string compressed_headers = compressor_->CompressHeadersWithPriority( - kHighestPriority, headers_); - string body = "this is the body"; - string data = compressed_headers + body; - QuicStreamFrame frame(kStreamId, false, 0, MakeIOVector(data)); - string uncompressed_headers = - SpdyUtils::SerializeUncompressedHeaders(headers_); - string uncompressed_data = uncompressed_headers + body; - - stream_->OnStreamFrame(frame); - EXPECT_EQ(uncompressed_headers, stream_->data()); - - char buffer1[1]; - char buffer2[1]; - struct iovec vec[2]; - vec[0].iov_base = buffer1; - vec[0].iov_len = arraysize(buffer1); - vec[1].iov_base = buffer2; - vec[1].iov_len = arraysize(buffer2); - for (size_t i = 0; i < uncompressed_data.length(); i += 2) { - size_t bytes_read = stream_->Readv(vec, 2); - ASSERT_EQ(2u, bytes_read) << i; - ASSERT_EQ(uncompressed_data.data()[i], buffer1[0]) << i; - ASSERT_EQ(uncompressed_data.data()[i + 1], buffer2[0]) << i; - } -} - -TEST_F(ReliableQuicStreamTest, ProcessCorruptHeadersEarly) { - Initialize(kShouldProcessData); - - string compressed_headers1 = compressor_->CompressHeadersWithPriority( - kHighestPriority, headers_); - QuicStreamFrame frame1( - stream_->id(), false, 0, MakeIOVector(compressed_headers1)); - string decompressed_headers1 = - SpdyUtils::SerializeUncompressedHeaders(headers_); - - headers_["content-type"] = "text/plain"; - string compressed_headers2 = compressor_->CompressHeadersWithPriority( - kHighestPriority, headers_); - // Corrupt the compressed data. - compressed_headers2[compressed_headers2.length() - 1] ^= 0xA1; - QuicStreamFrame frame2( - stream2_->id(), false, 0, MakeIOVector(compressed_headers2)); - string decompressed_headers2 = - SpdyUtils::SerializeUncompressedHeaders(headers_); - - // Deliver frame2 to stream2 out of order. The decompressor is not - // available yet, so no data will be processed. The compressed data - // will be buffered until OnDecompressorAvailable() is called - // to process it. - stream2_->OnStreamFrame(frame2); - EXPECT_EQ("", stream2_->data()); - - // Now deliver frame1 to stream1. The decompressor is available so - // the data will be processed, and the decompressor will become - // available for stream2. - stream_->OnStreamFrame(frame1); - EXPECT_EQ(decompressed_headers1, stream_->data()); - - // Verify that the decompressor is available, and inform stream2 - // that it can now decompress the buffered compressed data. Since - // the compressed data is corrupt, the stream will shutdown the session. - EXPECT_EQ(2u, session_->decompressor()->current_header_id()); - EXPECT_CALL(*connection_, SendConnectionClose(QUIC_DECOMPRESSION_FAILURE)); - stream2_->OnDecompressorAvailable(); - EXPECT_EQ("", stream2_->data()); -} - -TEST_F(ReliableQuicStreamTest, ProcessPartialHeadersEarly) { - Initialize(kShouldProcessData); - - string compressed_headers1 = compressor_->CompressHeadersWithPriority( - kHighestPriority, headers_); - QuicStreamFrame frame1( - stream_->id(), false, 0, MakeIOVector(compressed_headers1)); - string decompressed_headers1 = - SpdyUtils::SerializeUncompressedHeaders(headers_); - - headers_["content-type"] = "text/plain"; - string compressed_headers2 = compressor_->CompressHeadersWithPriority( - kHighestPriority, headers_); - string partial_compressed_headers = - compressed_headers2.substr(0, compressed_headers2.length() / 2); - QuicStreamFrame frame2( - stream2_->id(), false, 0, MakeIOVector(partial_compressed_headers)); - string decompressed_headers2 = - SpdyUtils::SerializeUncompressedHeaders(headers_); - - // Deliver frame2 to stream2 out of order. The decompressor is not - // available yet, so no data will be processed. The compressed data - // will be buffered until OnDecompressorAvailable() is called - // to process it. - stream2_->OnStreamFrame(frame2); - EXPECT_EQ("", stream2_->data()); - - // Now deliver frame1 to stream1. The decompressor is available so - // the data will be processed, and the decompressor will become - // available for stream2. - stream_->OnStreamFrame(frame1); - EXPECT_EQ(decompressed_headers1, stream_->data()); - - // Verify that the decompressor is available, and inform stream2 - // that it can now decompress the buffered compressed data. Since - // the compressed data is incomplete it will not be passed to - // the stream. - EXPECT_EQ(2u, session_->decompressor()->current_header_id()); - stream2_->OnDecompressorAvailable(); - EXPECT_EQ("", stream2_->data()); - - // Now send remaining data and verify that we have now received the - // compressed headers. - string remaining_compressed_headers = - compressed_headers2.substr(partial_compressed_headers.length()); - - QuicStreamFrame frame3(stream2_->id(), false, - partial_compressed_headers.length(), - MakeIOVector(remaining_compressed_headers)); - stream2_->OnStreamFrame(frame3); - EXPECT_EQ(decompressed_headers2, stream2_->data()); -} - -TEST_F(ReliableQuicStreamTest, ProcessHeadersEarly) { - Initialize(kShouldProcessData); - - string compressed_headers1 = compressor_->CompressHeadersWithPriority( - kHighestPriority, headers_); - QuicStreamFrame frame1( - stream_->id(), false, 0, MakeIOVector(compressed_headers1)); - string decompressed_headers1 = - SpdyUtils::SerializeUncompressedHeaders(headers_); - - headers_["content-type"] = "text/plain"; - string compressed_headers2 = compressor_->CompressHeadersWithPriority( - kHighestPriority, headers_); - QuicStreamFrame frame2( - stream2_->id(), false, 0, MakeIOVector(compressed_headers2)); - string decompressed_headers2 = - SpdyUtils::SerializeUncompressedHeaders(headers_); - - // Deliver frame2 to stream2 out of order. The decompressor is not - // available yet, so no data will be processed. The compressed data - // will be buffered until OnDecompressorAvailable() is called - // to process it. - stream2_->OnStreamFrame(frame2); - EXPECT_EQ("", stream2_->data()); - - // Now deliver frame1 to stream1. The decompressor is available so - // the data will be processed, and the decompressor will become - // available for stream2. - stream_->OnStreamFrame(frame1); - EXPECT_EQ(decompressed_headers1, stream_->data()); - - // Verify that the decompressor is available, and inform stream2 - // that it can now decompress the buffered compressed data. - EXPECT_EQ(2u, session_->decompressor()->current_header_id()); - stream2_->OnDecompressorAvailable(); - EXPECT_EQ(decompressed_headers2, stream2_->data()); -} - -TEST_F(ReliableQuicStreamTest, ProcessHeadersDelay) { - Initialize(!kShouldProcessData); - - string compressed_headers = compressor_->CompressHeadersWithPriority( - kHighestPriority, headers_); - QuicStreamFrame frame1( - stream_->id(), false, 0, MakeIOVector(compressed_headers)); - string decompressed_headers = - SpdyUtils::SerializeUncompressedHeaders(headers_); - - // Send the headers to the stream and verify they were decompressed. - stream_->OnStreamFrame(frame1); - EXPECT_EQ(2u, session_->decompressor()->current_header_id()); - - // Verify that we are now able to handle the body data, - // even though the stream has not processed the headers. - EXPECT_CALL(*connection_, SendConnectionClose(QUIC_INVALID_HEADER_ID)) - .Times(0); - QuicStreamFrame frame2(stream_->id(), false, compressed_headers.length(), - MakeIOVector("body data")); - stream_->OnStreamFrame(frame2); -} - } // namespace } // namespace test } // namespace net diff --git a/net/quic/test_tools/mock_random.cc b/net/quic/test_tools/mock_random.cc index 19a2832..ab71d16 100644 --- a/net/quic/test_tools/mock_random.cc +++ b/net/quic/test_tools/mock_random.cc @@ -7,7 +7,13 @@ namespace net { MockRandom::MockRandom() - : increment_(0) { + : base_(0xDEADBEEF), + increment_(0) { +} + +MockRandom::MockRandom(uint32 base) + : base_(base), + increment_(0) { } void MockRandom::RandBytes(void* data, size_t len) { @@ -15,11 +21,7 @@ void MockRandom::RandBytes(void* data, size_t len) { } uint64 MockRandom::RandUint64() { - return 0xDEADBEEF + increment_; -} - -bool MockRandom::RandBool() { - return false; + return base_ + increment_; } void MockRandom::Reseed(const void* additional_entropy, size_t entropy_len) { diff --git a/net/quic/test_tools/mock_random.h b/net/quic/test_tools/mock_random.h index 544f5ce..3a3c87f 100644 --- a/net/quic/test_tools/mock_random.h +++ b/net/quic/test_tools/mock_random.h @@ -12,15 +12,15 @@ namespace net { class MockRandom : public QuicRandom { public: + // Initializes base_ to 0xDEADBEEF. MockRandom(); + MockRandom(uint32 base); // QuicRandom: // Fills the |data| buffer with a repeating byte, initially 'r'. virtual void RandBytes(void* data, size_t len) OVERRIDE; - // Returns 0xDEADBEEF + the current increment. + // Returns base + the current increment. virtual uint64 RandUint64() OVERRIDE; - // Returns false. - virtual bool RandBool() OVERRIDE; // Does nothing. virtual void Reseed(const void* additional_entropy, size_t entropy_len) OVERRIDE; @@ -30,6 +30,7 @@ class MockRandom : public QuicRandom { void ChangeValue(); private: + uint32 base_; uint8 increment_; }; diff --git a/net/quic/test_tools/quic_connection_peer.cc b/net/quic/test_tools/quic_connection_peer.cc index e305cbea..54410b3 100644 --- a/net/quic/test_tools/quic_connection_peer.cc +++ b/net/quic/test_tools/quic_connection_peer.cc @@ -84,6 +84,7 @@ bool QuicConnectionPeer::IsRetransmission( } // static +// TODO(ianswett): Create a GetSentEntropyHash which accepts an AckFrame. QuicPacketEntropyHash QuicConnectionPeer::GetSentEntropyHash( QuicConnection* connection, QuicPacketSequenceNumber sequence_number) { @@ -178,11 +179,6 @@ QuicAlarm* QuicConnectionPeer::GetRetransmissionAlarm( } // static -QuicAlarm* QuicConnectionPeer::GetAbandonFecAlarm(QuicConnection* connection) { - return connection->abandon_fec_alarm_.get(); -} - -// static QuicAlarm* QuicConnectionPeer::GetSendAlarm(QuicConnection* connection) { return connection->send_alarm_.get(); } diff --git a/net/quic/test_tools/quic_connection_peer.h b/net/quic/test_tools/quic_connection_peer.h index 5916446..fd43661 100644 --- a/net/quic/test_tools/quic_connection_peer.h +++ b/net/quic/test_tools/quic_connection_peer.h @@ -98,7 +98,6 @@ class QuicConnectionPeer { static QuicAlarm* GetAckAlarm(QuicConnection* connection); static QuicAlarm* GetRetransmissionAlarm(QuicConnection* connection); - static QuicAlarm* GetAbandonFecAlarm(QuicConnection* connection); static QuicAlarm* GetSendAlarm(QuicConnection* connection); static QuicAlarm* GetResumeWritesAlarm(QuicConnection* connection); static QuicAlarm* GetTimeoutAlarm(QuicConnection* connection); diff --git a/net/quic/test_tools/quic_data_stream_peer.cc b/net/quic/test_tools/quic_data_stream_peer.cc new file mode 100644 index 0000000..6165675 --- /dev/null +++ b/net/quic/test_tools/quic_data_stream_peer.cc @@ -0,0 +1,19 @@ +// Copyright 2013 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "net/quic/test_tools/quic_data_stream_peer.h" + +#include "net/quic/quic_data_stream.h" + +namespace net { +namespace test { + +// static +void QuicDataStreamPeer::SetHeadersDecompressed(QuicDataStream* stream, + bool headers_decompressed) { + stream->headers_decompressed_ = headers_decompressed; +} + +} // namespace test +} // namespace net diff --git a/net/quic/test_tools/quic_data_stream_peer.h b/net/quic/test_tools/quic_data_stream_peer.h new file mode 100644 index 0000000..bfaf826 --- /dev/null +++ b/net/quic/test_tools/quic_data_stream_peer.h @@ -0,0 +1,30 @@ +// Copyright 2013 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef NET_QUIC_TEST_TOOLS_QUIC_DATA_STREAM_PEER_H_ +#define NET_QUIC_TEST_TOOLS_QUIC_DATA_STREAM_PEER_H_ + +#include "base/basictypes.h" +#include "net/quic/quic_protocol.h" + +namespace net { + +class QuicDataStream; + +namespace test { + +class QuicDataStreamPeer { + public: + static void SetHeadersDecompressed(QuicDataStream* stream, + bool headers_decompressed); + + private: + DISALLOW_COPY_AND_ASSIGN(QuicDataStreamPeer); +}; + +} // namespace test + +} // namespace net + +#endif // NET_QUIC_TEST_TOOLS_QUIC_DATA_STREAM_PEER_H_ diff --git a/net/quic/test_tools/quic_sent_packet_manager_peer.cc b/net/quic/test_tools/quic_sent_packet_manager_peer.cc index 89ddb1b..2284d0e 100644 --- a/net/quic/test_tools/quic_sent_packet_manager_peer.cc +++ b/net/quic/test_tools/quic_sent_packet_manager_peer.cc @@ -28,6 +28,16 @@ size_t QuicSentPacketManagerPeer::GetNackCount( } // static +QuicTime QuicSentPacketManagerPeer::GetSentTime( + const QuicSentPacketManager* sent_packet_manager, + QuicPacketSequenceNumber sequence_number) { + DCHECK(ContainsKey(sent_packet_manager->unacked_packets_, sequence_number)); + + return sent_packet_manager->unacked_packets_ + .find(sequence_number)->second.sent_time; +} + +// static QuicTime::Delta QuicSentPacketManagerPeer::rtt( QuicSentPacketManager* sent_packet_manager) { return sent_packet_manager->rtt_sample_; diff --git a/net/quic/test_tools/quic_sent_packet_manager_peer.h b/net/quic/test_tools/quic_sent_packet_manager_peer.h index 184ac72..2ed9b21 100644 --- a/net/quic/test_tools/quic_sent_packet_manager_peer.h +++ b/net/quic/test_tools/quic_sent_packet_manager_peer.h @@ -23,6 +23,9 @@ class QuicSentPacketManagerPeer { const QuicSentPacketManager* sent_packet_manager, QuicPacketSequenceNumber sequence_number); + static QuicTime GetSentTime(const QuicSentPacketManager* sent_packet_manager, + QuicPacketSequenceNumber sequence_number); + static QuicTime::Delta rtt(QuicSentPacketManager* sent_packet_manager); // Returns true if |sequence_number| is a retransmission of a packet. diff --git a/net/quic/test_tools/quic_test_utils.h b/net/quic/test_tools/quic_test_utils.h index a6bcb5e..3bec87c 100644 --- a/net/quic/test_tools/quic_test_utils.h +++ b/net/quic/test_tools/quic_test_utils.h @@ -315,10 +315,9 @@ class MockSession : public QuicSession { const QuicPacketHeader& header, const std::vector<QuicStreamFrame>& frame)); MOCK_METHOD2(OnConnectionClosed, void(QuicErrorCode error, bool from_peer)); - MOCK_METHOD1(CreateIncomingReliableStream, - ReliableQuicStream*(QuicStreamId id)); + MOCK_METHOD1(CreateIncomingDataStream, QuicDataStream*(QuicStreamId id)); MOCK_METHOD0(GetCryptoStream, QuicCryptoStream*()); - MOCK_METHOD0(CreateOutgoingReliableStream, ReliableQuicStream*()); + MOCK_METHOD0(CreateOutgoingDataStream, QuicDataStream*()); MOCK_METHOD6(WritevData, QuicConsumedData(QuicStreamId id, const struct iovec* iov, @@ -340,9 +339,8 @@ class TestSession : public QuicSession { bool is_server); virtual ~TestSession(); - MOCK_METHOD1(CreateIncomingReliableStream, - ReliableQuicStream*(QuicStreamId id)); - MOCK_METHOD0(CreateOutgoingReliableStream, ReliableQuicStream*()); + MOCK_METHOD1(CreateIncomingDataStream, QuicDataStream*(QuicStreamId id)); + MOCK_METHOD0(CreateOutgoingDataStream, QuicDataStream*()); void SetCryptoStream(QuicCryptoStream* stream); diff --git a/net/quic/test_tools/reliable_quic_stream_peer.cc b/net/quic/test_tools/reliable_quic_stream_peer.cc index 31a64e9..5119d03 100644 --- a/net/quic/test_tools/reliable_quic_stream_peer.cc +++ b/net/quic/test_tools/reliable_quic_stream_peer.cc @@ -22,11 +22,5 @@ void ReliableQuicStreamPeer::SetStreamBytesWritten( stream->stream_bytes_written_ = stream_bytes_written; } -void ReliableQuicStreamPeer::SetHeadersDecompressed( - ReliableQuicStream* stream, - bool headers_decompressed) { - stream->headers_decompressed_ = headers_decompressed; -} - } // namespace test } // namespace net diff --git a/net/quic/test_tools/reliable_quic_stream_peer.h b/net/quic/test_tools/reliable_quic_stream_peer.h index 346a9b4..da229da 100644 --- a/net/quic/test_tools/reliable_quic_stream_peer.h +++ b/net/quic/test_tools/reliable_quic_stream_peer.h @@ -19,8 +19,6 @@ class ReliableQuicStreamPeer { static void SetWriteSideClosed(bool value, ReliableQuicStream* stream); static void SetStreamBytesWritten(QuicStreamOffset stream_bytes_written, ReliableQuicStream* stream); - static void SetHeadersDecompressed(ReliableQuicStream* stream, - bool headers_decompressed); private: DISALLOW_COPY_AND_ASSIGN(ReliableQuicStreamPeer); diff --git a/net/tools/quic/end_to_end_test.cc b/net/tools/quic/end_to_end_test.cc index e9640b1..9560630 100644 --- a/net/tools/quic/end_to_end_test.cc +++ b/net/tools/quic/end_to_end_test.cc @@ -26,6 +26,7 @@ #include "net/tools/quic/quic_in_memory_cache.h" #include "net/tools/quic/quic_server.h" #include "net/tools/quic/quic_socket_utils.h" +#include "net/tools/quic/quic_spdy_client_stream.h" #include "net/tools/quic/test_tools/http_message_test_utils.h" #include "net/tools/quic/test_tools/packet_dropping_test_writer.h" #include "net/tools/quic/test_tools/quic_client_peer.h" @@ -622,7 +623,7 @@ TEST_P(EndToEndTest, DISABLED_MultipleTermination) { // Set the offset so we won't frame. Otherwise when we pick up termination // before HTTP framing is complete, we send an error and close the stream, // and the second write is picked up as writing on a closed stream. - QuicReliableClientStream* stream = client_->GetOrCreateStream(); + QuicSpdyClientStream* stream = client_->GetOrCreateStream(); ASSERT_TRUE(stream != NULL); ReliableQuicStreamPeer::SetStreamBytesWritten(3, stream); diff --git a/net/tools/quic/quic_client.cc b/net/tools/quic/quic_client.cc index d0b7cea..dcf9612 100644 --- a/net/tools/quic/quic_client.cc +++ b/net/tools/quic/quic_client.cc @@ -18,8 +18,8 @@ #include "net/quic/quic_protocol.h" #include "net/tools/balsa/balsa_headers.h" #include "net/tools/quic/quic_epoll_connection_helper.h" -#include "net/tools/quic/quic_reliable_client_stream.h" #include "net/tools/quic/quic_socket_utils.h" +#include "net/tools/quic/quic_spdy_client_stream.h" #ifndef SO_RXQ_OVFL #define SO_RXQ_OVFL 40 @@ -188,10 +188,10 @@ void QuicClient::Disconnect() { void QuicClient::SendRequestsAndWaitForResponse( const CommandLine::StringVector& args) { - for (size_t i = 0; i < args.size(); i++) { + for (size_t i = 0; i < args.size(); ++i) { BalsaHeaders headers; headers.SetRequestFirstlineFromStringPieces("GET", args[i], "HTTP/1.1"); - QuicReliableClientStream* stream = CreateReliableClientStream(); + QuicSpdyClientStream* stream = CreateReliableClientStream(); stream->SendRequest(headers, "", true); stream->set_visitor(this); } @@ -199,12 +199,12 @@ void QuicClient::SendRequestsAndWaitForResponse( while (WaitForEvents()) { } } -QuicReliableClientStream* QuicClient::CreateReliableClientStream() { +QuicSpdyClientStream* QuicClient::CreateReliableClientStream() { if (!connected()) { return NULL; } - return session_->CreateOutgoingReliableStream(); + return session_->CreateOutgoingDataStream(); } void QuicClient::WaitForStreamToClose(QuicStreamId id) { @@ -245,13 +245,13 @@ void QuicClient::OnEvent(int fd, EpollEvent* event) { } } -void QuicClient::OnClose(ReliableQuicStream* stream) { +void QuicClient::OnClose(QuicDataStream* stream) { if (!print_response_) { return; } - QuicReliableClientStream* client_stream = - static_cast<QuicReliableClientStream*>(stream); + QuicSpdyClientStream* client_stream = + static_cast<QuicSpdyClientStream*>(stream); const BalsaHeaders& headers = client_stream->headers(); printf("%s\n", headers.first_line().as_string().c_str()); for (BalsaHeaders::const_header_lines_iterator i = diff --git a/net/tools/quic/quic_client.h b/net/tools/quic/quic_client.h index 4835f4b..02f45e6 100644 --- a/net/tools/quic/quic_client.h +++ b/net/tools/quic/quic_client.h @@ -11,7 +11,6 @@ #include <string> #include "base/command_line.h" -#include "base/containers/hash_tables.h" #include "base/memory/scoped_ptr.h" #include "net/base/ip_endpoint.h" #include "net/quic/crypto/crypto_handshake.h" @@ -20,7 +19,7 @@ #include "net/quic/quic_packet_creator.h" #include "net/tools/epoll_server/epoll_server.h" #include "net/tools/quic/quic_client_session.h" -#include "net/tools/quic/quic_reliable_client_stream.h" +#include "net/tools/quic/quic_spdy_client_stream.h" namespace net { @@ -35,7 +34,7 @@ class QuicClientPeer; } // namespace test class QuicClient : public EpollCallbackInterface, - public ReliableQuicStream::Visitor { + public QuicDataStream::Visitor { public: QuicClient(IPEndPoint server_address, const string& server_hostname, @@ -76,7 +75,7 @@ class QuicClient : public EpollCallbackInterface, // Returns a newly created CreateReliableClientStream, owned by the // QuicClient. - QuicReliableClientStream* CreateReliableClientStream(); + QuicSpdyClientStream* CreateReliableClientStream(); // Wait for events until the stream with the given ID is closed. void WaitForStreamToClose(QuicStreamId id); @@ -99,8 +98,8 @@ class QuicClient : public EpollCallbackInterface, virtual void OnUnregistration(int fd, bool replaced) OVERRIDE {} virtual void OnShutdown(EpollServer* eps, int fd) OVERRIDE {} - // ReliableQuicStream::Visitor - virtual void OnClose(ReliableQuicStream* stream) OVERRIDE; + // QuicDataStream::Visitor + virtual void OnClose(QuicDataStream* stream) OVERRIDE; QuicPacketCreator::Options* options(); @@ -154,9 +153,6 @@ class QuicClient : public EpollCallbackInterface, // Read a UDP packet and hand it to the framer. bool ReadAndProcessPacket(); - // Set of streams created (and owned) by this client - base::hash_set<QuicReliableClientStream*> streams_; - // Address of the server. const IPEndPoint server_address_; diff --git a/net/tools/quic/quic_client_session.cc b/net/tools/quic/quic_client_session.cc index f993908..02adb08 100644 --- a/net/tools/quic/quic_client_session.cc +++ b/net/tools/quic/quic_client_session.cc @@ -6,7 +6,6 @@ #include "base/logging.h" #include "net/quic/crypto/crypto_protocol.h" -#include "net/tools/quic/quic_reliable_client_stream.h" #include "net/tools/quic/quic_spdy_client_stream.h" using std::string; @@ -26,7 +25,7 @@ QuicClientSession::QuicClientSession( QuicClientSession::~QuicClientSession() { } -QuicReliableClientStream* QuicClientSession::CreateOutgoingReliableStream() { +QuicSpdyClientStream* QuicClientSession::CreateOutgoingDataStream() { if (!crypto_stream_.encryption_established()) { DLOG(INFO) << "Encryption not active so no outgoing stream created."; return NULL; @@ -41,7 +40,7 @@ QuicReliableClientStream* QuicClientSession::CreateOutgoingReliableStream() { << "Already received goaway."; return NULL; } - QuicReliableClientStream* stream + QuicSpdyClientStream* stream = new QuicSpdyClientStream(GetNextStreamId(), this); ActivateStream(stream); return stream; @@ -59,7 +58,7 @@ int QuicClientSession::GetNumSentClientHellos() const { return crypto_stream_.num_sent_client_hellos(); } -ReliableQuicStream* QuicClientSession::CreateIncomingReliableStream( +QuicDataStream* QuicClientSession::CreateIncomingDataStream( QuicStreamId id) { DLOG(ERROR) << "Server push not supported"; return NULL; diff --git a/net/tools/quic/quic_client_session.h b/net/tools/quic/quic_client_session.h index a73d721..dcee15e 100644 --- a/net/tools/quic/quic_client_session.h +++ b/net/tools/quic/quic_client_session.h @@ -12,7 +12,7 @@ #include "net/quic/quic_crypto_client_stream.h" #include "net/quic/quic_protocol.h" #include "net/quic/quic_session.h" -#include "net/tools/quic/quic_reliable_client_stream.h" +#include "net/tools/quic/quic_spdy_client_stream.h" namespace net { @@ -21,8 +21,6 @@ class ReliableQuicStream; namespace tools { -class QuicReliableClientStream; - class QuicClientSession : public QuicSession { public: QuicClientSession(const std::string& server_hostname, @@ -32,7 +30,7 @@ class QuicClientSession : public QuicSession { virtual ~QuicClientSession(); // QuicSession methods: - virtual QuicReliableClientStream* CreateOutgoingReliableStream() OVERRIDE; + virtual QuicSpdyClientStream* CreateOutgoingDataStream() OVERRIDE; virtual QuicCryptoClientStream* GetCryptoStream() OVERRIDE; // Performs a crypto handshake with the server. Returns true if the crypto @@ -46,8 +44,7 @@ class QuicClientSession : public QuicSession { protected: // QuicSession methods: - virtual ReliableQuicStream* CreateIncomingReliableStream( - QuicStreamId id) OVERRIDE; + virtual QuicDataStream* CreateIncomingDataStream(QuicStreamId id) OVERRIDE; private: QuicCryptoClientStream crypto_stream_; diff --git a/net/tools/quic/quic_client_session_test.cc b/net/tools/quic/quic_client_session_test.cc index 50201c9..6de227e 100644 --- a/net/tools/quic/quic_client_session_test.cc +++ b/net/tools/quic/quic_client_session_test.cc @@ -10,7 +10,7 @@ #include "net/quic/crypto/aes_128_gcm_12_encrypter.h" #include "net/quic/test_tools/crypto_test_utils.h" #include "net/quic/test_tools/quic_test_utils.h" -#include "net/tools/quic/quic_reliable_client_stream.h" +#include "net/tools/quic/quic_spdy_client_stream.h" #include "testing/gtest/include/gtest/gtest.h" using net::test::CryptoTestUtils; @@ -58,14 +58,14 @@ TEST_F(ToolsQuicClientSessionTest, MaxNumStreams) { // Initialize crypto before the client session will create a stream. CompleteCryptoHandshake(); - QuicReliableClientStream* stream = - session_->CreateOutgoingReliableStream(); + QuicSpdyClientStream* stream = + session_->CreateOutgoingDataStream(); ASSERT_TRUE(stream); - EXPECT_FALSE(session_->CreateOutgoingReliableStream()); + EXPECT_FALSE(session_->CreateOutgoingDataStream()); // Close a stream and ensure I can now open a new one. session_->CloseStream(stream->id()); - stream = session_->CreateOutgoingReliableStream(); + stream = session_->CreateOutgoingDataStream(); EXPECT_TRUE(stream); } @@ -75,7 +75,7 @@ TEST_F(ToolsQuicClientSessionTest, GoAwayReceived) { // After receiving a GoAway, I should no longer be able to create outgoing // streams. session_->OnGoAway(QuicGoAwayFrame(QUIC_PEER_GOING_AWAY, 1u, "Going away.")); - EXPECT_EQ(NULL, session_->CreateOutgoingReliableStream()); + EXPECT_EQ(NULL, session_->CreateOutgoingDataStream()); } } // namespace diff --git a/net/tools/quic/quic_reliable_client_stream.cc b/net/tools/quic/quic_reliable_client_stream.cc deleted file mode 100644 index 359fec4..0000000 --- a/net/tools/quic/quic_reliable_client_stream.cc +++ /dev/null @@ -1,27 +0,0 @@ -// Copyright (c) 2012 The Chromium Authors. All rights reserved. -// Use of this source code is governed by a BSD-style license that can be -// found in the LICENSE file. - -#include "net/tools/quic/quic_reliable_client_stream.h" - -using std::string; - -namespace net { -namespace tools { - -// Sends body data to the server and returns the number of bytes sent. -ssize_t QuicReliableClientStream::SendBody(const string& data, bool fin) { - return WriteData(data, fin).bytes_consumed; -} - -bool QuicReliableClientStream::OnStreamFrame(const QuicStreamFrame& frame) { - if (!write_side_closed()) { - DLOG(INFO) << "Got a response before the request was complete. " - << "Aborting request."; - CloseWriteSide(); - } - return ReliableQuicStream::OnStreamFrame(frame); -} - -} // namespace tools -} // namespace net diff --git a/net/tools/quic/quic_reliable_client_stream.h b/net/tools/quic/quic_reliable_client_stream.h deleted file mode 100644 index 3944b7fb..0000000 --- a/net/tools/quic/quic_reliable_client_stream.h +++ /dev/null @@ -1,66 +0,0 @@ -// Copyright (c) 2012 The Chromium Authors. All rights reserved. -// Use of this source code is governed by a BSD-style license that can be -// found in the LICENSE file. - -#ifndef NET_TOOLS_QUIC_QUIC_RELIABLE_CLIENT_STREAM_H_ -#define NET_TOOLS_QUIC_QUIC_RELIABLE_CLIENT_STREAM_H_ - -#include <sys/types.h> -#include <string> - -#include "base/strings/string_piece.h" -#include "net/quic/quic_protocol.h" -#include "net/quic/reliable_quic_stream.h" -#include "net/tools/balsa/balsa_frame.h" -#include "net/tools/balsa/balsa_headers.h" - -namespace net { - -class QuicSession; - -namespace tools { - -class QuicClientSession; - -// A base class for spdy/http client streams which handles the concept -// of sending and receiving headers and bodies. -class QuicReliableClientStream : public ReliableQuicStream { - public: - QuicReliableClientStream(QuicStreamId id, QuicSession* session) - : ReliableQuicStream(id, session) { - } - - // Serializes the headers and body, sends it to the server, and - // returns the number of bytes sent. - virtual ssize_t SendRequest(const BalsaHeaders& headers, - base::StringPiece body, - bool fin) = 0; - // Sends body data to the server and returns the number of bytes sent. - virtual ssize_t SendBody(const std::string& data, bool fin); - - // Override the base class to close the Write side as soon as we get a - // response. - // SPDY/HTTP do not support bidirectional streaming. - virtual bool OnStreamFrame(const QuicStreamFrame& frame) OVERRIDE; - - // Returns the response data. - const std::string& data() { return data_; } - - // Returns whatever headers have been received for this stream. - const BalsaHeaders& headers() { return headers_; } - - protected: - std::string* mutable_data() { return &data_; } - BalsaHeaders* mutable_headers() { return &headers_; } - - private: - BalsaHeaders headers_; - std::string data_; - - DISALLOW_COPY_AND_ASSIGN(QuicReliableClientStream); -}; - -} // namespace tools -} // namespace net - -#endif // NET_TOOLS_QUIC_QUIC_RELIABLE_CLIENT_STREAM_H_ diff --git a/net/tools/quic/quic_reliable_server_stream.cc b/net/tools/quic/quic_reliable_server_stream.cc deleted file mode 100644 index 58b884a..0000000 --- a/net/tools/quic/quic_reliable_server_stream.cc +++ /dev/null @@ -1,56 +0,0 @@ -// Copyright (c) 2012 The Chromium Authors. All rights reserved. -// Use of this source code is governed by a BSD-style license that can be -// found in the LICENSE file. - -#include "net/tools/quic/quic_reliable_server_stream.h" - -#include "base/logging.h" -#include "base/memory/singleton.h" -#include "net/tools/quic/quic_in_memory_cache.h" - -using base::StringPiece; - -namespace net { -namespace tools { - -QuicReliableServerStream::QuicReliableServerStream(QuicStreamId id, - QuicSession* session) - : ReliableQuicStream(id, session) { -} - - -void QuicReliableServerStream::SendResponse() { - // Find response in cache. If not found, send error response. - const QuicInMemoryCache::Response* response = - QuicInMemoryCache::GetInstance()->GetResponse(headers_); - if (response == NULL) { - SendErrorResponse(); - return; - } - - DLOG(INFO) << "Sending response for stream " << id(); - SendHeaders(response->headers()); - WriteData(response->body(), true); -} - -void QuicReliableServerStream::SendErrorResponse() { - DLOG(INFO) << "Sending error response for stream " << id(); - BalsaHeaders headers; - headers.SetResponseFirstlineFromStringPieces( - "HTTP/1.1", "500", "Server Error"); - headers.ReplaceOrAppendHeader("content-length", "3"); - SendHeaders(headers); - WriteData("bad", true); -} - -QuicConsumedData QuicReliableServerStream::WriteData(StringPiece data, - bool fin) { - // We only support SPDY and HTTP, and neither handles bidirectional streaming. - if (!read_side_closed()) { - CloseReadSide(); - } - return ReliableQuicStream::WriteData(data, fin); -} - -} // namespace tools -} // namespace net diff --git a/net/tools/quic/quic_reliable_server_stream.h b/net/tools/quic/quic_reliable_server_stream.h deleted file mode 100644 index d0a389b..0000000 --- a/net/tools/quic/quic_reliable_server_stream.h +++ /dev/null @@ -1,66 +0,0 @@ -// Copyright (c) 2012 The Chromium Authors. All rights reserved. -// Use of this source code is governed by a BSD-style license that can be -// found in the LICENSE file. - -#ifndef NET_TOOLS_QUIC_QUIC_RELIABLE_SERVER_STREAM_H_ -#define NET_TOOLS_QUIC_QUIC_RELIABLE_SERVER_STREAM_H_ - -#include <string> - -#include "net/quic/quic_protocol.h" -#include "net/quic/reliable_quic_stream.h" -#include "net/tools/balsa/balsa_headers.h" - -namespace net { - -class QuicSession; - -namespace tools { - -namespace test { -class QuicReliableServerStreamPeer; -} // namespace test - -// A base class for spdy/http server streams which handles the concept -// of sending and receiving headers and bodies. -class QuicReliableServerStream : public ReliableQuicStream { - public: - QuicReliableServerStream(QuicStreamId id, QuicSession* session); - virtual ~QuicReliableServerStream() {} - - // Subclasses should process and frame data when this is called, returning - // how many bytes are processed. - virtual uint32 ProcessData(const char* data, uint32 data_len) = 0; - // Subclasses should implement this to serialize headers in a - // protocol-specific manner, and send it out to the client. - virtual void SendHeaders(const BalsaHeaders& response_headers) = 0; - - // Sends a basic 200 response using SendHeaders for the headers and WriteData - // for the body. - void SendResponse(); - // Sends a basic 500 response using SendHeaders for the headers and WriteData - // for the body - void SendErrorResponse(); - // Make sure that as soon as we start writing data, we stop reading. - virtual QuicConsumedData WriteData(base::StringPiece data, bool fin) OVERRIDE; - - // Returns whatever headers have been received for this stream. - const BalsaHeaders& headers() { return headers_; } - - const string& body() { return body_; } - - protected: - BalsaHeaders* mutable_headers() { return &headers_; } - string* mutable_body() { return &body_; } - - private: - friend class test::QuicReliableServerStreamPeer; - - BalsaHeaders headers_; - string body_; -}; - -} // namespace tools -} // namespace net - -#endif // NET_TOOLS_QUIC_QUIC_RELIABLE_SERVER_STREAM_H_ diff --git a/net/tools/quic/quic_reliable_server_stream_test.cc b/net/tools/quic/quic_reliable_server_stream_test.cc deleted file mode 100644 index 2182192..0000000 --- a/net/tools/quic/quic_reliable_server_stream_test.cc +++ /dev/null @@ -1,230 +0,0 @@ -// Copyright (c) 2012 The Chromium Authors. All rights reserved. -// Use of this source code is governed by a BSD-style license that can be -// found in the LICENSE file. - -#include "net/tools/quic/quic_reliable_server_stream.h" - -#include "base/strings/string_number_conversions.h" -#include "net/quic/quic_spdy_compressor.h" -#include "net/quic/quic_utils.h" -#include "net/quic/test_tools/quic_test_utils.h" -#include "net/tools/epoll_server/epoll_server.h" -#include "net/tools/quic/quic_in_memory_cache.h" -#include "net/tools/quic/quic_spdy_server_stream.h" -#include "net/tools/quic/spdy_utils.h" -#include "net/tools/quic/test_tools/quic_in_memory_cache_peer.h" -#include "net/tools/quic/test_tools/quic_test_utils.h" -#include "testing/gmock/include/gmock/gmock.h" -#include "testing/gtest/include/gtest/gtest.h" - -using base::StringPiece; -using net::tools::test::MockConnection; -using net::test::MockSession; -using std::string; -using testing::_; -using testing::AnyNumber; -using testing::Invoke; -using testing::InvokeArgument; -using testing::InSequence; -using testing::Return; -using testing::StrEq; -using testing::StrictMock; -using testing::WithArgs; - -namespace net { -namespace tools { -namespace test { - -class QuicReliableServerStreamPeer { - public: - static BalsaHeaders* GetMutableHeaders( - QuicReliableServerStream* stream) { - return &(stream->headers_); - } -}; - -namespace { - -class QuicReliableServerStreamTest : public ::testing::Test { - public: - QuicReliableServerStreamTest() - : session_(new MockConnection(1, IPEndPoint(), 0, &eps_, true), true), - body_("hello world") { - BalsaHeaders request_headers; - request_headers.SetRequestFirstlineFromStringPieces( - "POST", "https://www.google.com/", "HTTP/1.1"); - request_headers.ReplaceOrAppendHeader("content-length", "11"); - - headers_string_ = SpdyUtils::SerializeRequestHeaders(request_headers); - stream_.reset(new QuicSpdyServerStream(3, &session_)); - } - - QuicConsumedData ValidateHeaders(const struct iovec* iov) { - StringPiece headers = - StringPiece(static_cast<const char*>(iov[0].iov_base), iov[0].iov_len); - headers_string_ = SpdyUtils::SerializeResponseHeaders( - response_headers_); - QuicSpdyDecompressor decompressor; - TestDecompressorVisitor visitor; - - // First the header id, then the compressed data. - EXPECT_EQ(1, headers[0]); - EXPECT_EQ(0, headers[1]); - EXPECT_EQ(0, headers[2]); - EXPECT_EQ(0, headers[3]); - EXPECT_EQ(static_cast<size_t>(headers.length() - 4), - decompressor.DecompressData(headers.substr(4), &visitor)); - - EXPECT_EQ(headers_string_, visitor.data()); - - return QuicConsumedData(headers.size(), false); - } - - static void SetUpTestCase() { - QuicInMemoryCachePeer::ResetForTests(); - } - - virtual void SetUp() { - QuicInMemoryCache* cache = QuicInMemoryCache::GetInstance(); - - BalsaHeaders request_headers, response_headers; - StringPiece body("Yum"); - request_headers.SetRequestFirstlineFromStringPieces( - "GET", - "https://www.google.com/foo", - "HTTP/1.1"); - response_headers.SetRequestFirstlineFromStringPieces("HTTP/1.1", - "200", - "OK"); - response_headers.AppendHeader("content-length", - base::IntToString(body.length())); - - // Check if response already exists and matches. - const QuicInMemoryCache::Response* cached_response = - cache->GetResponse(request_headers); - if (cached_response != NULL) { - string cached_response_headers_str, response_headers_str; - cached_response->headers().DumpToString(&cached_response_headers_str); - response_headers.DumpToString(&response_headers_str); - CHECK_EQ(cached_response_headers_str, response_headers_str); - CHECK_EQ(cached_response->body(), body); - return; - } - - cache->AddResponse(request_headers, response_headers, body); - } - - BalsaHeaders response_headers_; - EpollServer eps_; - StrictMock<MockSession> session_; - scoped_ptr<QuicReliableServerStream> stream_; - string headers_string_; - string body_; -}; - -QuicConsumedData ConsumeAllData( - QuicStreamId id, - const struct iovec* iov, - int iov_count, - QuicStreamOffset offset, - bool fin, - QuicAckNotifier::DelegateInterface* /*ack_notifier_delegate*/) { - ssize_t consumed_length = 0; - for (int i = 0; i < iov_count; ++i) { - consumed_length += iov[i].iov_len; - } - return QuicConsumedData(consumed_length, fin); -} - -TEST_F(QuicReliableServerStreamTest, TestFraming) { - EXPECT_CALL(session_, WritevData(_, _, _, _, _, _)).Times(AnyNumber()). - WillRepeatedly(Invoke(ConsumeAllData)); - - EXPECT_EQ(headers_string_.size(), stream_->ProcessData( - headers_string_.c_str(), headers_string_.size())); - EXPECT_EQ(body_.size(), stream_->ProcessData(body_.c_str(), body_.size())); - EXPECT_EQ(11u, stream_->headers().content_length()); - EXPECT_EQ("https://www.google.com/", stream_->headers().request_uri()); - EXPECT_EQ("POST", stream_->headers().request_method()); - EXPECT_EQ(body_, stream_->body()); -} - -TEST_F(QuicReliableServerStreamTest, TestFramingOnePacket) { - EXPECT_CALL(session_, WritevData(_, _, _, _, _, _)).Times(AnyNumber()). - WillRepeatedly(Invoke(ConsumeAllData)); - - string message = headers_string_ + body_; - - EXPECT_EQ(message.size(), stream_->ProcessData( - message.c_str(), message.size())); - EXPECT_EQ(11u, stream_->headers().content_length()); - EXPECT_EQ("https://www.google.com/", - stream_->headers().request_uri()); - EXPECT_EQ("POST", stream_->headers().request_method()); - EXPECT_EQ(body_, stream_->body()); -} - -TEST_F(QuicReliableServerStreamTest, TestFramingExtraData) { - string large_body = "hello world!!!!!!"; - - // We'll automatically write out an error (headers + body) - EXPECT_CALL(session_, WritevData(_, _, _, _, _, _)).Times(AnyNumber()). - WillRepeatedly(Invoke(ConsumeAllData)); - - EXPECT_EQ(headers_string_.size(), stream_->ProcessData( - headers_string_.c_str(), headers_string_.size())); - // Content length is still 11. This will register as an error and we won't - // accept the bytes. - stream_->ProcessData(large_body.c_str(), large_body.size()); - EXPECT_EQ(11u, stream_->headers().content_length()); - EXPECT_EQ("https://www.google.com/", stream_->headers().request_uri()); - EXPECT_EQ("POST", stream_->headers().request_method()); -} - -TEST_F(QuicReliableServerStreamTest, TestSendResponse) { - BalsaHeaders* request_headers = - QuicReliableServerStreamPeer::GetMutableHeaders(stream_.get()); - request_headers->SetRequestFirstlineFromStringPieces( - "GET", - "https://www.google.com/foo", - "HTTP/1.1"); - - response_headers_.SetResponseFirstlineFromStringPieces( - "HTTP/1.1", "200", "OK"); - response_headers_.ReplaceOrAppendHeader("content-length", "3"); - - InSequence s; - EXPECT_CALL(session_, WritevData(_, _, 1, _, _, _)).Times(1) - .WillOnce(WithArgs<1>(Invoke( - this, &QuicReliableServerStreamTest::ValidateHeaders))); - - EXPECT_CALL(session_, WritevData(_, _, 1, _, _, _)).Times(1). - WillOnce(Return(QuicConsumedData(3, true))); - - stream_->SendResponse(); - EXPECT_TRUE(stream_->read_side_closed()); - EXPECT_TRUE(stream_->write_side_closed()); -} - -TEST_F(QuicReliableServerStreamTest, TestSendErrorResponse) { - response_headers_.SetResponseFirstlineFromStringPieces( - "HTTP/1.1", "500", "Server Error"); - response_headers_.ReplaceOrAppendHeader("content-length", "3"); - - InSequence s; - EXPECT_CALL(session_, WritevData(_, _, 1, _, _, _)).Times(1) - .WillOnce(WithArgs<1>(Invoke( - this, &QuicReliableServerStreamTest::ValidateHeaders))); - - EXPECT_CALL(session_, WritevData(_, _, 1, _, _, _)).Times(1). - WillOnce(Return(QuicConsumedData(3, true))); - - stream_->SendErrorResponse(); - EXPECT_TRUE(stream_->read_side_closed()); - EXPECT_TRUE(stream_->write_side_closed()); -} - -} // namespace -} // namespace test -} // namespace tools -} // namespace net diff --git a/net/tools/quic/quic_server_session.cc b/net/tools/quic/quic_server_session.cc index bea957c..b408976 100644 --- a/net/tools/quic/quic_server_session.cc +++ b/net/tools/quic/quic_server_session.cc @@ -5,6 +5,7 @@ #include "net/tools/quic/quic_server_session.h" #include "base/logging.h" +#include "net/quic/quic_connection.h" #include "net/quic/reliable_quic_stream.h" #include "net/tools/quic/quic_spdy_server_stream.h" @@ -38,7 +39,7 @@ void QuicServerSession::OnConnectionClosed(QuicErrorCode error, owner_->OnConnectionClosed(connection()->guid(), error); } -bool QuicServerSession::ShouldCreateIncomingReliableStream(QuicStreamId id) { +bool QuicServerSession::ShouldCreateIncomingDataStream(QuicStreamId id) { if (id % 2 == 0) { DLOG(INFO) << "Invalid incoming even stream_id:" << id; connection()->SendConnectionClose(QUIC_INVALID_STREAM_ID); @@ -53,16 +54,16 @@ bool QuicServerSession::ShouldCreateIncomingReliableStream(QuicStreamId id) { return true; } -ReliableQuicStream* QuicServerSession::CreateIncomingReliableStream( +QuicDataStream* QuicServerSession::CreateIncomingDataStream( QuicStreamId id) { - if (!ShouldCreateIncomingReliableStream(id)) { + if (!ShouldCreateIncomingDataStream(id)) { return NULL; } return new QuicSpdyServerStream(id, this); } -ReliableQuicStream* QuicServerSession::CreateOutgoingReliableStream() { +QuicDataStream* QuicServerSession::CreateOutgoingDataStream() { DLOG(ERROR) << "Server push not yet supported"; return NULL; } diff --git a/net/tools/quic/quic_server_session.h b/net/tools/quic/quic_server_session.h index 1a5bfa3..f470551 100644 --- a/net/tools/quic/quic_server_session.h +++ b/net/tools/quic/quic_server_session.h @@ -56,15 +56,14 @@ class QuicServerSession : public QuicSession { protected: // QuicSession methods: - virtual ReliableQuicStream* CreateIncomingReliableStream( - QuicStreamId id) OVERRIDE; - virtual ReliableQuicStream* CreateOutgoingReliableStream() OVERRIDE; + virtual QuicDataStream* CreateIncomingDataStream(QuicStreamId id) OVERRIDE; + virtual QuicDataStream* CreateOutgoingDataStream() OVERRIDE; virtual QuicCryptoServerStream* GetCryptoStream() OVERRIDE; // If we should create an incoming stream, returns true. Otherwise // does error handling, including communicating the error to the client and // possibly closing the connection, and returns false. - virtual bool ShouldCreateIncomingReliableStream(QuicStreamId id); + virtual bool ShouldCreateIncomingDataStream(QuicStreamId id); virtual QuicCryptoServerStream* CreateQuicCryptoServerStream( const QuicCryptoServerConfig& crypto_config); diff --git a/net/tools/quic/quic_server_session_test.cc b/net/tools/quic/quic_server_session_test.cc index 4357da2..8d3c8df9 100644 --- a/net/tools/quic/quic_server_session_test.cc +++ b/net/tools/quic/quic_server_session_test.cc @@ -4,13 +4,12 @@ #include "net/tools/quic/quic_server_session.h" - #include "net/quic/crypto/quic_crypto_server_config.h" #include "net/quic/crypto/quic_random.h" #include "net/quic/quic_connection.h" #include "net/quic/test_tools/quic_connection_peer.h" +#include "net/quic/test_tools/quic_data_stream_peer.h" #include "net/quic/test_tools/quic_test_utils.h" -#include "net/quic/test_tools/reliable_quic_stream_peer.h" #include "net/tools/epoll_server/epoll_server.h" #include "net/tools/quic/quic_spdy_server_stream.h" #include "net/tools/quic/test_tools/quic_test_utils.h" @@ -20,7 +19,7 @@ using __gnu_cxx::vector; using net::test::MockConnection; using net::test::QuicConnectionPeer; -using net::test::ReliableQuicStreamPeer; +using net::test::QuicDataStreamPeer; using testing::_; using testing::StrictMock; @@ -30,19 +29,19 @@ namespace test { class QuicServerSessionPeer { public: - static ReliableQuicStream* GetIncomingReliableStream( + static QuicDataStream* GetIncomingReliableStream( QuicServerSession* s, QuicStreamId id) { return s->GetIncomingReliableStream(id); } - static ReliableQuicStream* GetStream(QuicServerSession* s, QuicStreamId id) { - return s->GetStream(id); + static QuicDataStream* GetDataStream(QuicServerSession* s, QuicStreamId id) { + return s->GetDataStream(id); } }; -class CloseOnDataStream : public ReliableQuicStream { +class CloseOnDataStream : public QuicDataStream { public: CloseOnDataStream(QuicStreamId id, QuicSession* session) - : ReliableQuicStream(id, session) { + : QuicDataStream(id, session) { } virtual bool OnStreamFrame(const QuicStreamFrame& frame) OVERRIDE { @@ -65,9 +64,9 @@ class TestQuicQuicServerSession : public QuicServerSession { close_stream_on_data_(false) { } - virtual ReliableQuicStream* CreateIncomingReliableStream( + virtual QuicDataStream* CreateIncomingDataStream( QuicStreamId id) OVERRIDE { - if (!ShouldCreateIncomingReliableStream(id)) { + if (!ShouldCreateIncomingDataStream(id)) { return NULL; } if (close_stream_on_data_) { @@ -104,10 +103,10 @@ class QuicServerSessionTest : public ::testing::Test { } void MarkHeadersReadForStream(QuicStreamId id) { - ReliableQuicStream* stream = QuicServerSessionPeer::GetStream( + QuicDataStream* stream = QuicServerSessionPeer::GetDataStream( session_.get(), id); ASSERT_TRUE(stream != NULL); - ReliableQuicStreamPeer::SetHeadersDecompressed(stream, true); + QuicDataStreamPeer::SetHeadersDecompressed(stream, true); } QuicGuid guid_; diff --git a/net/tools/quic/quic_spdy_client_stream.cc b/net/tools/quic/quic_spdy_client_stream.cc index 761c829..1956c79 100644 --- a/net/tools/quic/quic_spdy_client_stream.cc +++ b/net/tools/quic/quic_spdy_client_stream.cc @@ -18,7 +18,7 @@ static const size_t kHeaderBufInitialSize = 4096; QuicSpdyClientStream::QuicSpdyClientStream(QuicStreamId id, QuicClientSession* session) - : QuicReliableClientStream(id, session), + : QuicDataStream(id, session), read_buf_(new GrowableIOBuffer()), response_headers_received_(false) { } @@ -26,6 +26,15 @@ QuicSpdyClientStream::QuicSpdyClientStream(QuicStreamId id, QuicSpdyClientStream::~QuicSpdyClientStream() { } +bool QuicSpdyClientStream::OnStreamFrame(const QuicStreamFrame& frame) { + if (!write_side_closed()) { + DLOG(INFO) << "Got a response before the request was complete. " + << "Aborting request."; + CloseWriteSide(); + } + return QuicDataStream::OnStreamFrame(frame); +} + uint32 QuicSpdyClientStream::ProcessData(const char* data, uint32 length) { uint32 total_bytes_processed = 0; @@ -39,8 +48,7 @@ uint32 QuicSpdyClientStream::ProcessData(const char* data, uint32 length) { read_buf_->set_offset(read_buf_->offset() + length); ParseResponseHeaders(); } else { - mutable_data()->append(data + total_bytes_processed, - length - total_bytes_processed); + data_.append(data + total_bytes_processed, length - total_bytes_processed); } return length; } @@ -51,7 +59,7 @@ void QuicSpdyClientStream::OnFinRead() { Reset(QUIC_BAD_APPLICATION_PAYLOAD); } else if ((headers().content_length_status() == BalsaHeadersEnums::VALID_CONTENT_LENGTH) && - mutable_data()->size() != headers().content_length()) { + data_.size() != headers().content_length()) { Reset(QUIC_BAD_APPLICATION_PAYLOAD); } } @@ -62,15 +70,13 @@ ssize_t QuicSpdyClientStream::SendRequest(const BalsaHeaders& headers, SpdyHeaderBlock header_block = SpdyUtils::RequestHeadersToSpdyHeaders(headers); + bool send_fin_with_headers = fin && body.empty(); string headers_string = session()->compressor()->CompressHeadersWithPriority( priority(), header_block); + WriteOrBufferData(headers_string, send_fin_with_headers); - bool has_body = !body.empty(); - - WriteData(headers_string, fin && !has_body); // last_data - - if (has_body) { - WriteData(body, fin); + if (!body.empty()) { + WriteOrBufferData(body, fin); } return headers_string.size() + body.size(); @@ -87,7 +93,7 @@ int QuicSpdyClientStream::ParseResponseHeaders() { return -1; } - if (!SpdyUtils::FillBalsaResponseHeaders(headers, mutable_headers())) { + if (!SpdyUtils::FillBalsaResponseHeaders(headers, &headers_)) { Reset(QUIC_BAD_APPLICATION_PAYLOAD); return -1; } @@ -95,11 +101,16 @@ int QuicSpdyClientStream::ParseResponseHeaders() { size_t delta = read_buf_len - len; if (delta > 0) { - mutable_data()->append(data + len, delta); + data_.append(data + len, delta); } return len; } +// Sends body data to the server and returns the number of bytes sent. +void QuicSpdyClientStream::SendBody(const string& data, bool fin) { + return WriteOrBufferData(data, fin); +} + } // namespace tools } // namespace net diff --git a/net/tools/quic/quic_spdy_client_stream.h b/net/tools/quic/quic_spdy_client_stream.h index 7c3e53e..2b17205 100644 --- a/net/tools/quic/quic_spdy_client_stream.h +++ b/net/tools/quic/quic_spdy_client_stream.h @@ -5,42 +5,65 @@ #ifndef NET_TOOLS_QUIC_QUIC_SPDY_CLIENT_STREAM_H_ #define NET_TOOLS_QUIC_QUIC_SPDY_CLIENT_STREAM_H_ +#include <sys/types.h> +#include <string> + #include "base/strings/string_piece.h" #include "net/base/io_buffer.h" -#include "net/tools/quic/quic_reliable_client_stream.h" +#include "net/quic/quic_data_stream.h" +#include "net/quic/quic_protocol.h" +#include "net/tools/balsa/balsa_frame.h" +#include "net/tools/balsa/balsa_headers.h" namespace net { -class BalsaHeaders; - namespace tools { class QuicClientSession; // All this does right now is send an SPDY request, and aggregate the // SPDY response. -class QuicSpdyClientStream : public QuicReliableClientStream { +class QuicSpdyClientStream : public QuicDataStream { public: QuicSpdyClientStream(QuicStreamId id, QuicClientSession* session); virtual ~QuicSpdyClientStream(); + // Override the base class to close the write side as soon as we get a + // response. + // SPDY/HTTP does not support bidirectional streaming. + virtual bool OnStreamFrame(const QuicStreamFrame& frame) OVERRIDE; + // ReliableQuicStream implementation called by the session when there's // data for us. virtual uint32 ProcessData(const char* data, uint32 data_len) OVERRIDE; virtual void OnFinRead() OVERRIDE; - virtual ssize_t SendRequest(const BalsaHeaders& headers, - base::StringPiece body, - bool fin) OVERRIDE; + // Serializes the headers and body, sends it to the server, and + // returns the number of bytes sent. + ssize_t SendRequest(const BalsaHeaders& headers, + base::StringPiece body, + bool fin); + + // Sends body data to the server, or buffers if it can't be sent immediately. + void SendBody(const std::string& data, bool fin); + + // Returns the response data. + const std::string& data() { return data_; } + + // Returns whatever headers have been received for this stream. + const BalsaHeaders& headers() { return headers_; } // While the server's set_priority shouldn't be called externally, the creator // of client-side streams should be able to set the priority. - using QuicReliableClientStream::set_priority; + using QuicDataStream::set_priority; private: int ParseResponseHeaders(); + BalsaHeaders headers_; + std::string data_; + scoped_refptr<GrowableIOBuffer> read_buf_; bool response_headers_received_; }; diff --git a/net/tools/quic/quic_reliable_client_stream_test.cc b/net/tools/quic/quic_spdy_client_stream_test.cc index 8233ad9..e7a63a6 100644 --- a/net/tools/quic/quic_reliable_client_stream_test.cc +++ b/net/tools/quic/quic_spdy_client_stream_test.cc @@ -1,8 +1,8 @@ -// Copyright (c) 2012 The Chromium Authors. All rights reserved. +// Copyright 2013 The Chromium Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. -#include "net/tools/quic/quic_reliable_client_stream.h" +#include "net/tools/quic/quic_spdy_client_stream.h" #include "base/strings/string_number_conversions.h" #include "net/quic/quic_utils.h" @@ -23,9 +23,9 @@ namespace tools { namespace test { namespace { -class QuicClientStreamTest : public ::testing::Test { +class QuicSpdyClientStreamTest : public ::testing::Test { public: - QuicClientStreamTest() + QuicSpdyClientStreamTest() : session_("example.com", DefaultQuicConfig(), new MockConnection(1, IPEndPoint(), 0, &eps_, false), &crypto_config_), @@ -42,14 +42,14 @@ class QuicClientStreamTest : public ::testing::Test { EpollServer eps_; QuicClientSession session_; - scoped_ptr<QuicReliableClientStream> stream_; + scoped_ptr<QuicSpdyClientStream> stream_; BalsaHeaders headers_; string headers_string_; string body_; QuicCryptoClientConfig crypto_config_; }; -TEST_F(QuicClientStreamTest, TestFraming) { +TEST_F(QuicSpdyClientStreamTest, TestFraming) { EXPECT_EQ(headers_string_.size(), stream_->ProcessData( headers_string_.c_str(), headers_string_.size())); EXPECT_EQ(body_.size(), @@ -58,7 +58,7 @@ TEST_F(QuicClientStreamTest, TestFraming) { EXPECT_EQ(body_, stream_->data()); } -TEST_F(QuicClientStreamTest, TestFramingOnePacket) { +TEST_F(QuicSpdyClientStreamTest, TestFramingOnePacket) { string message = headers_string_ + body_; EXPECT_EQ(message.size(), stream_->ProcessData( @@ -67,7 +67,7 @@ TEST_F(QuicClientStreamTest, TestFramingOnePacket) { EXPECT_EQ(body_, stream_->data()); } -TEST_F(QuicClientStreamTest, DISABLED_TestFramingExtraData) { +TEST_F(QuicSpdyClientStreamTest, DISABLED_TestFramingExtraData) { string large_body = "hello world!!!!!!"; EXPECT_EQ(headers_string_.size(), stream_->ProcessData( @@ -77,12 +77,11 @@ TEST_F(QuicClientStreamTest, DISABLED_TestFramingExtraData) { EXPECT_EQ(200u, stream_->headers().parsed_response_code()); stream_->ProcessData(large_body.c_str(), large_body.size()); - stream_->OnFinRead(); EXPECT_NE(QUIC_STREAM_NO_ERROR, stream_->stream_error()); } -TEST_F(QuicClientStreamTest, TestNoBidirectionalStreaming) { +TEST_F(QuicSpdyClientStreamTest, TestNoBidirectionalStreaming) { QuicStreamFrame frame(3, false, 3, MakeIOVector("asd")); EXPECT_FALSE(stream_->write_side_closed()); diff --git a/net/tools/quic/quic_spdy_server_stream.cc b/net/tools/quic/quic_spdy_server_stream.cc index e325fe3..c1a9cf1 100644 --- a/net/tools/quic/quic_spdy_server_stream.cc +++ b/net/tools/quic/quic_spdy_server_stream.cc @@ -4,10 +4,13 @@ #include "net/tools/quic/quic_spdy_server_stream.h" +#include "base/memory/singleton.h" #include "net/quic/quic_session.h" #include "net/spdy/spdy_framer.h" +#include "net/tools/quic/quic_in_memory_cache.h" #include "net/tools/quic/spdy_utils.h" +using base::StringPiece; using std::string; namespace net { @@ -17,7 +20,7 @@ static const size_t kHeaderBufInitialSize = 4096; QuicSpdyServerStream::QuicSpdyServerStream(QuicStreamId id, QuicSession* session) - : QuicReliableServerStream(id, session), + : QuicDataStream(id, session), read_buf_(new GrowableIOBuffer()), request_headers_received_(false) { } @@ -38,8 +41,7 @@ uint32 QuicSpdyServerStream::ProcessData(const char* data, uint32 length) { read_buf_->set_offset(read_buf_->offset() + length); ParseRequestHeaders(); } else { - mutable_body()->append(data + total_bytes_processed, - length - total_bytes_processed); + body_.append(data + total_bytes_processed, length - total_bytes_processed); } return length; } @@ -52,26 +54,15 @@ void QuicSpdyServerStream::OnFinRead() { if (!request_headers_received_) { SendErrorResponse(); // We're not done reading headers. - } else if ((headers().content_length_status() == + } else if ((headers_.content_length_status() == BalsaHeadersEnums::VALID_CONTENT_LENGTH) && - mutable_body()->size() != headers().content_length()) { + body_.size() != headers_.content_length()) { SendErrorResponse(); // Invalid content length } else { SendResponse(); } } -void QuicSpdyServerStream::SendHeaders( - const BalsaHeaders& response_headers) { - SpdyHeaderBlock header_block = - SpdyUtils::ResponseHeadersToSpdyHeaders(response_headers); - - string headers_string; - headers_string = session()->compressor()->CompressHeaders(header_block); - - WriteData(headers_string, false); -} - int QuicSpdyServerStream::ParseRequestHeaders() { size_t read_buf_len = static_cast<size_t>(read_buf_->offset()); SpdyFramer framer(SPDY3); @@ -83,19 +74,61 @@ int QuicSpdyServerStream::ParseRequestHeaders() { return -1; } - if (!SpdyUtils::FillBalsaRequestHeaders(headers, mutable_headers())) { + if (!SpdyUtils::FillBalsaRequestHeaders(headers, &headers_)) { SendErrorResponse(); return -1; } size_t delta = read_buf_len - len; if (delta > 0) { - mutable_body()->append(data + len, delta); + body_.append(data + len, delta); } request_headers_received_ = true; return len; } +void QuicSpdyServerStream::SendResponse() { + // Find response in cache. If not found, send error response. + const QuicInMemoryCache::Response* response = + QuicInMemoryCache::GetInstance()->GetResponse(headers_); + if (response == NULL) { + SendErrorResponse(); + return; + } + + DLOG(INFO) << "Sending response for stream " << id(); + SendHeadersAndBody(response->headers(), response->body()); +} + +void QuicSpdyServerStream::SendErrorResponse() { + DLOG(INFO) << "Sending error response for stream " << id(); + BalsaHeaders headers; + headers.SetResponseFirstlineFromStringPieces( + "HTTP/1.1", "500", "Server Error"); + headers.ReplaceOrAppendHeader("content-length", "3"); + SendHeadersAndBody(headers, "bad"); +} + +void QuicSpdyServerStream:: SendHeadersAndBody( + const BalsaHeaders& response_headers, + StringPiece body) { + // We only support SPDY and HTTP, and neither handles bidirectional streaming. + if (!read_side_closed()) { + CloseReadSide(); + } + + SpdyHeaderBlock header_block = + SpdyUtils::ResponseHeadersToSpdyHeaders(response_headers); + + string headers_string = + session()->compressor()->CompressHeaders(header_block); + WriteOrBufferData(headers_string, body.empty()); + + if (!body.empty()) { + WriteOrBufferData(body, true); + } +} + } // namespace tools } // namespace net diff --git a/net/tools/quic/quic_spdy_server_stream.h b/net/tools/quic/quic_spdy_server_stream.h index b5f100b..574ef76 100644 --- a/net/tools/quic/quic_spdy_server_stream.h +++ b/net/tools/quic/quic_spdy_server_stream.h @@ -8,7 +8,9 @@ #include <string> #include "net/base/io_buffer.h" -#include "net/tools/quic/quic_reliable_server_stream.h" +#include "net/quic/quic_data_stream.h" +#include "net/quic/quic_protocol.h" +#include "net/tools/balsa/balsa_headers.h" namespace net { @@ -16,9 +18,13 @@ class QuicSession; namespace tools { -// All this does right now is aggregate data, and on fin, send a cached +namespace test { +class QuicSpdyServerStreamPeer; +} // namespace test + +// All this does right now is aggregate data, and on fin, send an HTTP // response. -class QuicSpdyServerStream : public QuicReliableServerStream { +class QuicSpdyServerStream : public QuicDataStream { public: QuicSpdyServerStream(QuicStreamId id, QuicSession* session); virtual ~QuicSpdyServerStream(); @@ -26,13 +32,26 @@ class QuicSpdyServerStream : public QuicReliableServerStream { // ReliableQuicStream implementation called by the session when there's // data for us. virtual uint32 ProcessData(const char* data, uint32 data_len) OVERRIDE; - - virtual void SendHeaders(const BalsaHeaders& response_headers) OVERRIDE; + virtual void OnFinRead() OVERRIDE; int ParseRequestHeaders(); - protected: - virtual void OnFinRead() OVERRIDE; + private: + friend class test::QuicSpdyServerStreamPeer; + + // Sends a basic 200 response using SendHeaders for the headers and WriteData + // for the body. + void SendResponse(); + + // Sends a basic 500 response using SendHeaders for the headers and WriteData + // for the body + void SendErrorResponse(); + + void SendHeadersAndBody(const BalsaHeaders& response_headers, + base::StringPiece body); + + BalsaHeaders headers_; + string body_; // Buffer into which response header data is read. scoped_refptr<GrowableIOBuffer> read_buf_; diff --git a/net/tools/quic/quic_spdy_server_stream_test.cc b/net/tools/quic/quic_spdy_server_stream_test.cc index ecdd1c7..ad67f0f 100644 --- a/net/tools/quic/quic_spdy_server_stream_test.cc +++ b/net/tools/quic/quic_spdy_server_stream_test.cc @@ -4,35 +4,241 @@ #include "net/tools/quic/quic_spdy_server_stream.h" +#include "base/strings/string_number_conversions.h" #include "base/strings/string_piece.h" #include "net/quic/quic_connection.h" #include "net/quic/quic_protocol.h" +#include "net/quic/quic_spdy_compressor.h" +#include "net/quic/quic_utils.h" #include "net/quic/test_tools/quic_test_utils.h" +#include "net/tools/epoll_server/epoll_server.h" +#include "net/tools/quic/quic_in_memory_cache.h" +#include "net/tools/quic/spdy_utils.h" +#include "net/tools/quic/test_tools/quic_in_memory_cache_peer.h" #include "net/tools/quic/test_tools/quic_test_utils.h" +#include "testing/gmock/include/gmock/gmock.h" #include "testing/gtest/include/gtest/gtest.h" using base::StringPiece; using net::tools::test::MockConnection; using net::test::MockSession; +using std::string; +using testing::_; +using testing::AnyNumber; +using testing::Invoke; +using testing::InvokeArgument; +using testing::InSequence; +using testing::Return; +using testing::StrEq; +using testing::StrictMock; +using testing::WithArgs; namespace net { namespace tools { namespace test { + +class QuicSpdyServerStreamPeer : public QuicSpdyServerStream { + public: + QuicSpdyServerStreamPeer(QuicStreamId stream_id, QuicSession* session) + : QuicSpdyServerStream(stream_id, session) { + } + + using QuicSpdyServerStream::SendResponse; + using QuicSpdyServerStream::SendErrorResponse; + + const string& body() { + return body_; + } + + const BalsaHeaders& headers() { + return headers_; + } + + BalsaHeaders* mutable_headers() { + return &headers_; + } +}; + namespace { class QuicSpdyServerStreamTest : public ::testing::Test { public: QuicSpdyServerStreamTest() - : connection_(new MockConnection(1, IPEndPoint(), false)), - session_(connection_, true), - stream_(1, &session_) { + : session_(new MockConnection(1, IPEndPoint(), true), true), + body_("hello world") { + BalsaHeaders request_headers; + request_headers.SetRequestFirstlineFromStringPieces( + "POST", "https://www.google.com/", "HTTP/1.1"); + request_headers.ReplaceOrAppendHeader("content-length", "11"); + + headers_string_ = SpdyUtils::SerializeRequestHeaders(request_headers); + stream_.reset(new QuicSpdyServerStreamPeer(3, &session_)); + } + + QuicConsumedData ValidateHeaders(const struct iovec* iov) { + StringPiece headers = + StringPiece(static_cast<const char*>(iov[0].iov_base), iov[0].iov_len); + headers_string_ = SpdyUtils::SerializeResponseHeaders( + response_headers_); + QuicSpdyDecompressor decompressor; + TestDecompressorVisitor visitor; + + // First the header id, then the compressed data. + EXPECT_EQ(1, headers[0]); + EXPECT_EQ(0, headers[1]); + EXPECT_EQ(0, headers[2]); + EXPECT_EQ(0, headers[3]); + EXPECT_EQ(static_cast<size_t>(headers.length() - 4), + decompressor.DecompressData(headers.substr(4), &visitor)); + + EXPECT_EQ(headers_string_, visitor.data()); + + return QuicConsumedData(headers.size(), false); + } + + static void SetUpTestCase() { + QuicInMemoryCachePeer::ResetForTests(); + } + + virtual void SetUp() { + QuicInMemoryCache* cache = QuicInMemoryCache::GetInstance(); + + BalsaHeaders request_headers, response_headers; + StringPiece body("Yum"); + request_headers.SetRequestFirstlineFromStringPieces( + "GET", + "https://www.google.com/foo", + "HTTP/1.1"); + response_headers.SetRequestFirstlineFromStringPieces("HTTP/1.1", + "200", + "OK"); + response_headers.AppendHeader("content-length", + base::IntToString(body.length())); + + // Check if response already exists and matches. + const QuicInMemoryCache::Response* cached_response = + cache->GetResponse(request_headers); + if (cached_response != NULL) { + string cached_response_headers_str, response_headers_str; + cached_response->headers().DumpToString(&cached_response_headers_str); + response_headers.DumpToString(&response_headers_str); + CHECK_EQ(cached_response_headers_str, response_headers_str); + CHECK_EQ(cached_response->body(), body); + return; + } + + cache->AddResponse(request_headers, response_headers, body); } - MockConnection* connection_; - MockSession session_; - QuicSpdyServerStream stream_; + BalsaHeaders response_headers_; + EpollServer eps_; + StrictMock<MockSession> session_; + scoped_ptr<QuicSpdyServerStreamPeer> stream_; + string headers_string_; + string body_; }; +QuicConsumedData ConsumeAllData( + QuicStreamId id, + const struct iovec* iov, + int iov_count, + QuicStreamOffset offset, + bool fin, + QuicAckNotifier::DelegateInterface* /*ack_notifier_delegate*/) { + ssize_t consumed_length = 0; + for (int i = 0; i < iov_count; ++i) { + consumed_length += iov[i].iov_len; + } + return QuicConsumedData(consumed_length, fin); +} + +TEST_F(QuicSpdyServerStreamTest, TestFraming) { + EXPECT_CALL(session_, WritevData(_, _, _, _, _, _)).Times(AnyNumber()). + WillRepeatedly(Invoke(ConsumeAllData)); + + EXPECT_EQ(headers_string_.size(), stream_->ProcessData( + headers_string_.c_str(), headers_string_.size())); + EXPECT_EQ(body_.size(), stream_->ProcessData(body_.c_str(), body_.size())); + EXPECT_EQ(11u, stream_->headers().content_length()); + EXPECT_EQ("https://www.google.com/", stream_->headers().request_uri()); + EXPECT_EQ("POST", stream_->headers().request_method()); + EXPECT_EQ(body_, stream_->body()); +} + +TEST_F(QuicSpdyServerStreamTest, TestFramingOnePacket) { + EXPECT_CALL(session_, WritevData(_, _, _, _, _, _)).Times(AnyNumber()). + WillRepeatedly(Invoke(ConsumeAllData)); + + string message = headers_string_ + body_; + + EXPECT_EQ(message.size(), stream_->ProcessData( + message.c_str(), message.size())); + EXPECT_EQ(11u, stream_->headers().content_length()); + EXPECT_EQ("https://www.google.com/", + stream_->headers().request_uri()); + EXPECT_EQ("POST", stream_->headers().request_method()); + EXPECT_EQ(body_, stream_->body()); +} + +TEST_F(QuicSpdyServerStreamTest, TestFramingExtraData) { + string large_body = "hello world!!!!!!"; + + // We'll automatically write out an error (headers + body) + EXPECT_CALL(session_, WritevData(_, _, _, _, _, _)).Times(AnyNumber()). + WillRepeatedly(Invoke(ConsumeAllData)); + + EXPECT_EQ(headers_string_.size(), stream_->ProcessData( + headers_string_.c_str(), headers_string_.size())); + // Content length is still 11. This will register as an error and we won't + // accept the bytes. + stream_->ProcessData(large_body.c_str(), large_body.size()); + EXPECT_EQ(11u, stream_->headers().content_length()); + EXPECT_EQ("https://www.google.com/", stream_->headers().request_uri()); + EXPECT_EQ("POST", stream_->headers().request_method()); +} + +TEST_F(QuicSpdyServerStreamTest, TestSendResponse) { + BalsaHeaders* request_headers = stream_->mutable_headers(); + request_headers->SetRequestFirstlineFromStringPieces( + "GET", + "https://www.google.com/foo", + "HTTP/1.1"); + + response_headers_.SetResponseFirstlineFromStringPieces( + "HTTP/1.1", "200", "OK"); + response_headers_.ReplaceOrAppendHeader("content-length", "3"); + + InSequence s; + EXPECT_CALL(session_, WritevData(_, _, 1, _, _, _)).Times(1) + .WillOnce(WithArgs<1>(Invoke( + this, &QuicSpdyServerStreamTest::ValidateHeaders))); + + EXPECT_CALL(session_, WritevData(_, _, 1, _, _, _)).Times(1). + WillOnce(Return(QuicConsumedData(3, true))); + + stream_->SendResponse(); + EXPECT_TRUE(stream_->read_side_closed()); + EXPECT_TRUE(stream_->write_side_closed()); +} + +TEST_F(QuicSpdyServerStreamTest, TestSendErrorResponse) { + response_headers_.SetResponseFirstlineFromStringPieces( + "HTTP/1.1", "500", "Server Error"); + response_headers_.ReplaceOrAppendHeader("content-length", "3"); + + InSequence s; + EXPECT_CALL(session_, WritevData(_, _, 1, _, _, _)).Times(1) + .WillOnce(WithArgs<1>(Invoke( + this, &QuicSpdyServerStreamTest::ValidateHeaders))); + + EXPECT_CALL(session_, WritevData(_, _, 1, _, _, _)).Times(1). + WillOnce(Return(QuicConsumedData(3, true))); + + stream_->SendErrorResponse(); + EXPECT_TRUE(stream_->read_side_closed()); + EXPECT_TRUE(stream_->write_side_closed()); +} + TEST_F(QuicSpdyServerStreamTest, InvalidHeadersWithFin) { char arr[] = { 0x00, 0x00, 0x00, 0x05, // .... @@ -60,9 +266,9 @@ TEST_F(QuicSpdyServerStreamTest, InvalidHeadersWithFin) { 0x31, 0x2e, 0x31, // 1.1 }; QuicStreamFrame frame( - 1, true, 0, MakeIOVector(StringPiece(arr, arraysize(arr)))); + stream_->id(), true, 0, MakeIOVector(StringPiece(arr, arraysize(arr)))); // Verify that we don't crash when we get a invalid headers in stream frame. - stream_.OnStreamFrame(frame); + stream_->OnStreamFrame(frame); } } // namespace diff --git a/net/tools/quic/test_tools/quic_test_client.cc b/net/tools/quic/test_tools/quic_test_client.cc index 7d396db..57edade 100644 --- a/net/tools/quic/test_tools/quic_test_client.cc +++ b/net/tools/quic/test_tools/quic_test_client.cc @@ -215,7 +215,7 @@ ssize_t QuicTestClient::SendMessage(const HTTPMessage& message) { } } - QuicReliableClientStream* stream = GetOrCreateStream(); + QuicSpdyClientStream* stream = GetOrCreateStream(); if (!stream) { return 0; } scoped_ptr<BalsaHeaders> munged_headers(MungeHeaders(message.headers(), @@ -229,7 +229,7 @@ ssize_t QuicTestClient::SendMessage(const HTTPMessage& message) { } ssize_t QuicTestClient::SendData(string data, bool last_data) { - QuicReliableClientStream* stream = GetOrCreateStream(); + QuicSpdyClientStream* stream = GetOrCreateStream(); if (!stream) { return 0; } GetOrCreateStream()->SendBody(data, last_data); WaitForWriteToFlush(); @@ -252,7 +252,7 @@ string QuicTestClient::SendSynchronousRequest(const string& uri) { return response_; } -QuicReliableClientStream* QuicTestClient::GetOrCreateStream() { +QuicSpdyClientStream* QuicTestClient::GetOrCreateStream() { if (!connect_attempted_ || auto_reconnect_) { if (!connected()) { Connect(); @@ -401,7 +401,7 @@ size_t QuicTestClient::bytes_written() const { return bytes_written_; } -void QuicTestClient::OnClose(ReliableQuicStream* stream) { +void QuicTestClient::OnClose(QuicDataStream* stream) { if (stream_ != stream) { return; } diff --git a/net/tools/quic/test_tools/quic_test_client.h b/net/tools/quic/test_tools/quic_test_client.h index 6abdac1..e67a2a1 100644 --- a/net/tools/quic/test_tools/quic_test_client.h +++ b/net/tools/quic/test_tools/quic_test_client.h @@ -26,7 +26,7 @@ namespace test { class HTTPMessage; // A toy QUIC client used for testing. -class QuicTestClient : public ReliableQuicStream::Visitor { +class QuicTestClient : public QuicDataStream::Visitor { public: QuicTestClient(IPEndPoint server_address, const string& server_hostname, const QuicVersionVector& supported_versions); @@ -81,8 +81,8 @@ class QuicTestClient : public ReliableQuicStream::Visitor { bool buffer_body() const { return buffer_body_; } void set_buffer_body(bool buffer_body) { buffer_body_ = buffer_body; } - // From ReliableQuicStream::Visitor - virtual void OnClose(ReliableQuicStream* stream) OVERRIDE; + // From QuicDataStream::Visitor + virtual void OnClose(QuicDataStream* stream) OVERRIDE; // Configures client_ to take ownership of and use the writer. // Must be called before initial connect. @@ -92,7 +92,7 @@ class QuicTestClient : public ReliableQuicStream::Visitor { void UseGuid(QuicGuid guid); // Returns NULL if the maximum number of streams have already been created. - QuicReliableClientStream* GetOrCreateStream(); + QuicSpdyClientStream* GetOrCreateStream(); QuicRstStreamErrorCode stream_error() { return stream_error_; } QuicErrorCode connection_error() { return client()->session()->error(); } @@ -118,7 +118,7 @@ class QuicTestClient : public ReliableQuicStream::Visitor { IPEndPoint server_address_; IPEndPoint client_address_; scoped_ptr<QuicClient> client_; // The actual client - QuicReliableClientStream* stream_; + QuicSpdyClientStream* stream_; QuicRstStreamErrorCode stream_error_; diff --git a/net/tools/quic/test_tools/quic_test_utils.h b/net/tools/quic/test_tools/quic_test_utils.h index 52b74c1..c559b30 100644 --- a/net/tools/quic/test_tools/quic_test_utils.h +++ b/net/tools/quic/test_tools/quic_test_utils.h @@ -105,9 +105,8 @@ class TestSession : public QuicSession { bool is_server); virtual ~TestSession(); - MOCK_METHOD1(CreateIncomingReliableStream, - ReliableQuicStream*(QuicStreamId id)); - MOCK_METHOD0(CreateOutgoingReliableStream, ReliableQuicStream*()); + MOCK_METHOD1(CreateIncomingDataStream, QuicDataStream*(QuicStreamId id)); + MOCK_METHOD0(CreateOutgoingDataStream, QuicDataStream*()); void SetCryptoStream(QuicCryptoStream* stream); |