diff options
author | rch@chromium.org <rch@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2012-11-03 16:55:32 +0000 |
---|---|---|
committer | rch@chromium.org <rch@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2012-11-03 16:55:32 +0000 |
commit | 1bb0fd5dbf2357a6c7fa837c6e3e1e427d9a7e99 (patch) | |
tree | a2181772ce5955c2e9a589f09f97bc3e8559265a /net | |
parent | 843552f0c31d71c618c8a332c055aa52fbb55ee0 (diff) | |
download | chromium_src-1bb0fd5dbf2357a6c7fa837c6e3e1e427d9a7e99.zip chromium_src-1bb0fd5dbf2357a6c7fa837c6e3e1e427d9a7e99.tar.gz chromium_src-1bb0fd5dbf2357a6c7fa837c6e3e1e427d9a7e99.tar.bz2 |
Add QuicStream and friends to QUIC code.
Review URL: https://chromiumcodereview.appspot.com/11300020
git-svn-id: svn://svn.chromium.org/chrome/trunk/src@165858 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'net')
-rw-r--r-- | net/net.gyp | 11 | ||||
-rw-r--r-- | net/quic/quic_crypto_stream.cc | 46 | ||||
-rw-r--r-- | net/quic/quic_crypto_stream.h | 59 | ||||
-rw-r--r-- | net/quic/quic_crypto_stream_test.cc | 100 | ||||
-rw-r--r-- | net/quic/quic_session.cc | 195 | ||||
-rw-r--r-- | net/quic/quic_session.h | 117 | ||||
-rw-r--r-- | net/quic/quic_session_test.cc | 158 | ||||
-rw-r--r-- | net/quic/quic_stream_sequencer.cc | 203 | ||||
-rw-r--r-- | net/quic/quic_stream_sequencer.h | 90 | ||||
-rw-r--r-- | net/quic/quic_stream_sequencer_test.cc | 413 | ||||
-rw-r--r-- | net/quic/reliable_quic_stream.cc | 126 | ||||
-rw-r--r-- | net/quic/reliable_quic_stream.h | 80 | ||||
-rw-r--r-- | net/quic/test_tools/quic_test_utils.cc | 41 | ||||
-rw-r--r-- | net/quic/test_tools/quic_test_utils.h | 77 |
14 files changed, 1714 insertions, 2 deletions
diff --git a/net/net.gyp b/net/net.gyp index e6a3279..d2cbb7307 100644 --- a/net/net.gyp +++ b/net/net.gyp @@ -655,6 +655,8 @@ 'quic/crypto/quic_decrypter.cc', 'quic/crypto/quic_encrypter.h', 'quic/crypto/quic_encrypter.cc', + 'quic/quic_crypto_stream.cc', + 'quic/quic_crypto_stream.h', 'quic/quic_clock.cc', 'quic/quic_clock.h', 'quic/quic_connection.cc', @@ -671,8 +673,14 @@ 'quic/quic_packet_creator.h', 'quic/quic_protocol.cc', 'quic/quic_protocol.h', + 'quic/quic_session.cc', + 'quic/quic_session.h', + 'quic/quic_stream_sequencer.cc', + 'quic/quic_stream_sequencer.h', 'quic/quic_utils.cc', 'quic/quic_utils.h', + 'quic/reliable_quic_stream.cc', + 'quic/reliable_quic_stream.h', 'socket/buffered_write_stream_socket.cc', 'socket/buffered_write_stream_socket.h', 'socket/client_socket_factory.cc', @@ -1408,9 +1416,12 @@ 'quic/test_tools/quic_test_utils.cc', 'quic/test_tools/quic_test_utils.h', 'quic/quic_connection_test.cc', + 'quic/quic_crypto_stream_test.cc', 'quic/quic_fec_group_test.cc', 'quic/quic_framer_test.cc', 'quic/quic_packet_creator_test.cc', + 'quic/quic_session_test.cc', + 'quic/quic_stream_sequencer_test.cc', 'socket/buffered_write_stream_socket_unittest.cc', 'socket/client_socket_pool_base_unittest.cc', 'socket/deterministic_socket_data_unittest.cc', diff --git a/net/quic/quic_crypto_stream.cc b/net/quic/quic_crypto_stream.cc new file mode 100644 index 0000000..64c02ee --- /dev/null +++ b/net/quic/quic_crypto_stream.cc @@ -0,0 +1,46 @@ +// Copyright (c) 2012 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "net/quic/quic_crypto_stream.h" +#include "net/quic/quic_session.h" + +using base::StringPiece; + +namespace net { + +QuicCryptoStream::QuicCryptoStream(QuicSession* session) + : ReliableQuicStream(kCryptoStreamId, session), + handshake_complete_(false) { + crypto_framer_.set_visitor(this); +} + +void QuicCryptoStream::OnError(CryptoFramer* framer) { + session()->ConnectionClose(framer->error(), false); +} + +uint32 QuicCryptoStream::ProcessData(const char* data, + uint32 data_len) { + // Do not process handshake messages after the handshake is complete. + if (handshake_complete()) { + CloseConnection(QUIC_CRYPTO_MESSAGE_AFTER_HANDSHAKE_COMPLETE); + return 0; + } + if (!crypto_framer_.ProcessInput(StringPiece(data, data_len))) { + CloseConnection(crypto_framer_.error()); + return 0; + } + return data_len; +} + +void QuicCryptoStream::CloseConnection(QuicErrorCode error) { + session()->connection()->SendConnectionClose(error); +} + +void QuicCryptoStream::SendHandshakeMessage( + const CryptoHandshakeMessage& message) { + scoped_ptr<QuicData> data(crypto_framer_.ConstructHandshakeMessage(message)); + WriteData(string(data->data(), data->length()), false); +} + +} // namespace net diff --git a/net/quic/quic_crypto_stream.h b/net/quic/quic_crypto_stream.h new file mode 100644 index 0000000..dfcd66c --- /dev/null +++ b/net/quic/quic_crypto_stream.h @@ -0,0 +1,59 @@ +// Copyright (c) 2012 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef NET_QUIC_QUIC_CRYPTO_STREAM_H_ +#define NET_QUIC_QUIC_CRYPTO_STREAM_H_ + +#include "net/quic/crypto/crypto_framer.h" +#include "net/quic/reliable_quic_stream.h" + +namespace net { + +class QuicSession; + +// Crypto handshake messages in QUIC take place over a reserved +// reliable stream with the id 1. Each endpoint (client and server) +// will allocate an instance of a subclass of QuicCryptoStream +// to send and receive handshake messages. (In the normal 1-RTT +// handshake, the client will send a client hello, CHLO, message. +// The server will receive this message and respond with a server +// hello message, SHLO. At this point both sides will have established +// a crypto context they can use to send encrypted messages. +// +// For more details: http://goto.google.com/quic-crypto +class NET_EXPORT_PRIVATE QuicCryptoStream + : public ReliableQuicStream, + public CryptoFramerVisitorInterface { + + public: + explicit QuicCryptoStream(QuicSession* session); + + // CryptoFramerVisitorInterface implementation + virtual void OnError(CryptoFramer* framer) OVERRIDE; + virtual void OnHandshakeMessage(const CryptoHandshakeMessage& message) = 0; + + // ReliableQuicStream implementation + virtual uint32 ProcessData(const char* data, uint32 data_len) OVERRIDE; + + // Sends |message| to the peer. + void SendHandshakeMessage(const CryptoHandshakeMessage& message); + + bool handshake_complete() { return handshake_complete_; } + + protected: + // Closes the connection + void CloseConnection(QuicErrorCode error); + + void set_handshake_complete(bool complete) { + handshake_complete_ = complete; + } + + private: + CryptoFramer crypto_framer_; + bool handshake_complete_; +}; + +} // namespace net + +#endif // NET_QUIC_QUIC_CRYPTO_STREAM_H_ diff --git a/net/quic/quic_crypto_stream_test.cc b/net/quic/quic_crypto_stream_test.cc new file mode 100644 index 0000000..5aee0e4 --- /dev/null +++ b/net/quic/quic_crypto_stream_test.cc @@ -0,0 +1,100 @@ +// Copyright (c) 2012 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "net/quic/quic_crypto_stream.h" + +#include <map> +#include <string> + +#include "net/quic/quic_utils.h" +#include "net/quic/test_tools/quic_test_utils.h" +#include "testing/gmock/include/gmock/gmock.h" +#include "testing/gtest/include/gtest/gtest.h" + +using std::map; +using std::string; + +namespace net { +namespace test { +namespace { + +class MockQuicCryptoStream : public QuicCryptoStream { + public: + explicit MockQuicCryptoStream(QuicSession* session) + : QuicCryptoStream(session) { + } + + void OnHandshakeMessage(const CryptoHandshakeMessage& message) { + message_tags_.push_back(message.tag); + message_maps_.push_back(map<CryptoTag, string>()); + CryptoTagValueMap::const_iterator it = message.tag_value_map.begin(); + while (it != message.tag_value_map.end()) { + message_maps_.back()[it->first] = it->second.as_string(); + ++it; + } + } + + std::vector<CryptoTag> message_tags_; + std::vector<map<CryptoTag, string> > message_maps_; +}; + +class QuicCryptoStreamTest : public ::testing::Test { + public: + QuicCryptoStreamTest() + : addr_(IPAddressNumber(), 1), + connection_(new MockConnection(1, addr_)), + session_(connection_, true), + stream_(&session_) { + message_.tag = kSHLO; + message_.tag_value_map[1] = "abc"; + message_.tag_value_map[2] = "def"; + ConstructHandshakeMessage(); + } + + void ConstructHandshakeMessage() { + CryptoFramer framer; + message_data_.reset(framer.ConstructHandshakeMessage(message_)); + } + + IPEndPoint addr_; + MockConnection* connection_; + MockSession session_; + MockQuicCryptoStream stream_; + CryptoHandshakeMessage message_; + scoped_ptr<QuicData> message_data_; +}; + +TEST_F(QuicCryptoStreamTest, NotInitiallyConected) { + EXPECT_FALSE(stream_.handshake_complete()); +} + +TEST_F(QuicCryptoStreamTest, OnErrorClosesConnection) { + CryptoFramer framer; + EXPECT_CALL(session_, ConnectionClose(QUIC_NO_ERROR, false)); + stream_.OnError(&framer); +} + +TEST_F(QuicCryptoStreamTest, ProcessData) { + EXPECT_EQ(message_data_->length(), + stream_.ProcessData(message_data_->data(), + message_data_->length())); + ASSERT_EQ(1u, stream_.message_tags_.size()); + EXPECT_EQ(kSHLO, stream_.message_tags_[0]); + EXPECT_EQ(2u, stream_.message_maps_[0].size()); + EXPECT_EQ("abc",stream_.message_maps_[0][1]); + EXPECT_EQ("def", stream_.message_maps_[0][2]); +} + +TEST_F(QuicCryptoStreamTest, ProcessBadData) { + string bad(message_data_->data(), message_data_->length()); + bad[6] = 0x7F; // out of order tag + + EXPECT_CALL(*connection_, + SendConnectionClose(QUIC_CRYPTO_TAGS_OUT_OF_ORDER)); + EXPECT_EQ(0u, stream_.ProcessData(bad.data(), bad.length())); +} + +} // namespace +} // namespace test +} // namespace net diff --git a/net/quic/quic_session.cc b/net/quic/quic_session.cc new file mode 100644 index 0000000..2021887 --- /dev/null +++ b/net/quic/quic_session.cc @@ -0,0 +1,195 @@ +// Copyright (c) 2012 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "net/quic/quic_session.h" + +#include "net/quic/quic_connection.h" + +using base::StringPiece; +using base::hash_map; +using base::hash_set; +using std::vector; + +namespace net { + +QuicSession::QuicSession(QuicConnection* connection, bool is_server) + : connection_(connection), + max_open_streams_(kDefaultMaxStreamsPerConnection), + next_stream_id_(is_server ? 2 : 3), + is_server_(is_server), + largest_peer_created_stream_id_(0) { + connection_->set_visitor(this); +} + +QuicSession::~QuicSession() { +} + +bool QuicSession::OnPacket(const IPEndPoint& self_address, + const IPEndPoint& peer_address, + const QuicPacketHeader& header, + const vector<QuicStreamFrame>& frames) { + if (header.guid != connection()->guid()) { + DLOG(INFO) << "Got packet header for invalid GUID: " << header.guid; + return false; + } + for (size_t i = 0; i < frames.size(); ++i) { + // TODO(rch) deal with the error case of stream id 0 + if (IsClosedStream(frames[i].stream_id)) continue; + + ReliableQuicStream* stream = GetStream(frames[i].stream_id); + if (stream == NULL) return false; + if (!stream->WillAcceptStreamFrame(frames[i])) return false; + + // TODO(alyssar) check against existing connection address: if changed, make + // sure we update the connection. + } + + for (size_t i = 0; i < frames.size(); ++i) { + ReliableQuicStream* stream = GetStream(frames[i].stream_id); + if (stream) { + stream->OnStreamFrame(frames[i]); + } + } + return true; +} + +void QuicSession::OnRstStream(const QuicRstStreamFrame& frame) { + ReliableQuicStream* stream = GetStream(frame.stream_id); + if (!stream) { + return; // Errors are handled by GetStream. + } + stream->OnStreamReset(frame.error_code, frame.offset); +} + +void QuicSession::ConnectionClose(QuicErrorCode error, bool from_peer) { + while (stream_map_.size() != 0) { + ReliableStreamMap::iterator it = stream_map_.begin(); + QuicStreamId id = it->first; + it->second->ConnectionClose(error, from_peer); + // The stream should call CloseStream as part of ConnectionClose. + if (stream_map_.find(id) != stream_map_.end()) { + LOG(DFATAL) << "Stream failed to close under ConnectionClose"; + CloseStream(id); + } + } +} + +int QuicSession::WriteData(QuicStreamId id, StringPiece data, + QuicStreamOffset offset, bool fin) { + return connection_->SendStreamData(id, data, offset, fin, NULL); +} + +void QuicSession::SendRstStream(QuicStreamId id, + QuicErrorCode error, + QuicStreamOffset offset) { + connection_->SendRstStream(id, error, offset); + CloseStream(id); +} + +void QuicSession::CloseStream(QuicStreamId stream_id) { + DLOG(INFO) << "Closing stream " << stream_id; + + ReliableStreamMap::iterator it = stream_map_.find(stream_id); + if (it == stream_map_.end()) { + DLOG(INFO) << "Stream is already closed: " << stream_id; + return; + } + stream_map_.erase(it); +} + +bool QuicSession::IsHandshakeComplete() { + return GetCryptoStream()->handshake_complete(); +} + +void QuicSession::ActivateStream(ReliableQuicStream* stream) { + LOG(INFO) << "num_streams: " << stream_map_.size() + << ". activating " << stream->id(); + DCHECK(stream_map_.count(stream->id()) == 0); + stream_map_[stream->id()] = stream; +} + +QuicStreamId QuicSession::GetNextStreamId() { + QuicStreamId id = next_stream_id_; + next_stream_id_ += 2; + return id; +} + +ReliableQuicStream* QuicSession::GetStream(const QuicStreamId stream_id) { + if (stream_id == kCryptoStreamId) { + return GetCryptoStream(); + } + + ReliableStreamMap::iterator it = stream_map_.find(stream_id); + if (it != stream_map_.end()) { + return it->second; + } + + if (stream_id % 2 == next_stream_id_ % 2) { + // We've received a frame for a locally-created stream that is not + // currently active. This is an error. + connection()->SendConnectionClose(QUIC_PACKET_FOR_NONEXISTENT_STREAM); + return NULL; + } + + return GetIncomingReliableStream(stream_id); +} + +ReliableQuicStream* QuicSession::GetIncomingReliableStream( + QuicStreamId stream_id) { + if (IsClosedStream(stream_id)) { + return NULL; + } + + implicitly_created_streams_.erase(stream_id); + if (stream_id > largest_peer_created_stream_id_) { + // TODO(rch) add unit test for this + if (stream_id - largest_peer_created_stream_id_ > kMaxStreamIdDelta) { + connection()->SendConnectionClose(QUIC_INVALID_STREAM_ID); + return NULL; + } + if (largest_peer_created_stream_id_ != 0) { + for (QuicStreamId id = largest_peer_created_stream_id_ + 2; + id < stream_id; + id += 2) { + implicitly_created_streams_.insert(id); + } + } + largest_peer_created_stream_id_ = stream_id; + } + ReliableQuicStream* stream = CreateIncomingReliableStream(stream_id); + if (stream == NULL) { + connection()->SendConnectionClose(QUIC_TOO_MANY_OPEN_STREAMS); + return NULL; + } + ActivateStream(stream); + return stream; +} + +bool QuicSession::IsClosedStream(QuicStreamId id) { + DCHECK_NE(0u, id); + if (id == kCryptoStreamId) { + return false; + } + if (stream_map_.count(id) != 0) { + // Stream is active + return false; + } + if (id % 2 == next_stream_id_ % 2) { + // If the stream was locally initiated we strictly in-order creation. + // If the id is in the range of created streams and it's not active, it + // must have been closed. + return id < next_stream_id_; + } else { + // For peer created streams, we also need to consider + // implicitly created streams. + return id <= largest_peer_created_stream_id_ && + implicitly_created_streams_.count(id) == 0; + } +} + +size_t QuicSession::GetNumOpenStreams() { + return stream_map_.size() + implicitly_created_streams_.size(); +} + +} // namespace net diff --git a/net/quic/quic_session.h b/net/quic/quic_session.h new file mode 100644 index 0000000..cc4edc6 --- /dev/null +++ b/net/quic/quic_session.h @@ -0,0 +1,117 @@ +// Copyright (c) 2012 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. +// +// A QuicSession, which demuxes a single connection to individual streams. + +#ifndef NET_QUIC_QUIC_SESSION_H_ +#define NET_QUIC_QUIC_SESSION_H_ + +#include <vector> + +#include "base/compiler_specific.h" +#include "base/hash_tables.h" +#include "net/base/ip_endpoint.h" +#include "net/quic/quic_connection.h" +#include "net/quic/quic_crypto_stream.h" +#include "net/quic/quic_packet_creator.h" +#include "net/quic/quic_protocol.h" +#include "net/quic/reliable_quic_stream.h" + +namespace net { + +class NET_EXPORT_PRIVATE QuicSession : public QuicConnectionVisitorInterface { + public: + QuicSession(QuicConnection* connection, bool is_server); + + virtual ~QuicSession(); + + // QuicConnectionVisitorInterface methods: + virtual bool OnPacket(const IPEndPoint& self_address, + const IPEndPoint& peer_address, + const QuicPacketHeader& header, + const std::vector<QuicStreamFrame>& frame) OVERRIDE; + virtual void OnRstStream(const QuicRstStreamFrame& frame) OVERRIDE; + virtual void ConnectionClose(QuicErrorCode error, bool from_peer) OVERRIDE; + // Not needed for HTTP. + virtual void OnAck(AckedPackets acked_packets) OVERRIDE {} + + // Called by streams when they want to write data to the peer. + virtual int WriteData(QuicStreamId id, base::StringPiece data, + QuicStreamOffset offset, bool fin); + // Called by streams when they want to close the stream in both directions. + void SendRstStream(QuicStreamId id, + QuicErrorCode error, + QuicStreamOffset offset); + + // Removes the stream associated with 'stream_id' from the active stream map. + virtual void CloseStream(QuicStreamId stream_id); + + // Returns true once the crypto handshake is complete. + virtual bool IsHandshakeComplete(); + + QuicConnection* connection() { return connection_.get(); } + size_t num_active_requests() const { return stream_map_.size(); } + const IPEndPoint& peer_address() const { + return connection_->peer_address(); + } + + QuicPacketCreator::Options* options() { return connection()->options(); } + + // Returns the number of currently open streams, including those which have + // been implicitly created. + virtual size_t GetNumOpenStreams(); + + protected: + // Creates a new stream, owned by the caller, to handle a peer-initiated + // stream. Returns NULL if max streams have already been opened. + virtual ReliableQuicStream* CreateIncomingReliableStream(QuicStreamId id) = 0; + + // Create a new stream, owned by the caller, to handle a locally-initiated + // stream. Returns NULL if max streams have already been opened. + virtual ReliableQuicStream* CreateOutgoingReliableStream() = 0; + + // Return the reserved crypto stream. + virtual QuicCryptoStream* GetCryptoStream() = 0; + + // Adds 'stream' to the active stream map. + void ActivateStream(ReliableQuicStream* stream); + + // Returns the stream id for a new stream. + QuicStreamId GetNextStreamId(); + + // Returns true if the stream existed previously and has been closed. + bool IsClosedStream(QuicStreamId id); + + ReliableQuicStream* GetIncomingReliableStream(QuicStreamId stream_id); + + size_t get_max_open_streams() const { + return max_open_streams_; + } + + private: + friend class QuicSessionPeer; + + typedef base::hash_map<QuicStreamId, ReliableQuicStream*> ReliableStreamMap; + + ReliableQuicStream* GetStream(const QuicStreamId stream_id); + + scoped_ptr<QuicConnection> connection_; + + // Returns the maximum number of streams this connection can open. + const size_t max_open_streams_; + + // Map from StreamId to pointers to streams that are owned by the caller. + ReliableStreamMap stream_map_; + QuicStreamId next_stream_id_; + bool is_server_; + + // Set of stream ids that have been "implicitly created" by receipt + // of a stream id larger than the next expected stream id. + base::hash_set<QuicStreamId> implicitly_created_streams_; + QuicStreamId largest_peer_created_stream_id_; +}; + +} // namespace net + +#endif // NET_QUIC_QUIC_SESSION_H_ diff --git a/net/quic/quic_session_test.cc b/net/quic/quic_session_test.cc new file mode 100644 index 0000000..deebc78 --- /dev/null +++ b/net/quic/quic_session_test.cc @@ -0,0 +1,158 @@ +// Copyright (c) 2012 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "net/quic/quic_session.h" +#include "net/quic/quic_connection.h" + +#include <set> + +#include "base/hash_tables.h" +#include "net/quic/test_tools/quic_test_utils.h" +#include "testing/gmock/include/gmock/gmock.h" +#include "testing/gtest/include/gtest/gtest.h" + +using base::hash_map; +using std::set; +using testing::_; + +namespace net { +namespace test { +namespace { + +class TestCryptoStream : public QuicCryptoStream { + public: + explicit TestCryptoStream(QuicSession* session) + : QuicCryptoStream(session) { + } + + void OnHandshakeMessage(const CryptoHandshakeMessage& message) { + set_handshake_complete(true); + } +}; + +class TestStream : public ReliableQuicStream { + public: + TestStream(QuicStreamId id, QuicSession* session) + : ReliableQuicStream(id, session) { + } + + virtual uint32 ProcessData(const char* data, uint32 data_len) { + return data_len; + } +}; + +class TestSession : public QuicSession { + public: + TestSession(QuicConnection* connection, bool is_server) + : QuicSession(connection, is_server), + crypto_stream_(this) { + } + + virtual QuicCryptoStream* GetCryptoStream() { + return &crypto_stream_; + } + + virtual TestStream* CreateOutgoingReliableStream() { + TestStream* stream = new TestStream(GetNextStreamId(), this); + ActivateStream(stream); + return stream; + } + + virtual TestStream* CreateIncomingReliableStream(QuicStreamId id) { + return new TestStream(id, this); + } + + bool IsClosedStream(QuicStreamId id) { + return QuicSession::IsClosedStream(id); + } + + ReliableQuicStream* GetIncomingReliableStream(QuicStreamId stream_id) { + return QuicSession::GetIncomingReliableStream(stream_id); + } + + TestCryptoStream crypto_stream_; +}; + +class QuicSessionTest : public ::testing::Test { + protected: + QuicSessionTest() + : guid_(1), + connection_(new MockConnection(guid_, IPEndPoint())), + session_(connection_, true) { + } + + void CheckClosedStreams() { + for (int i = kCryptoStreamId; i < 100; i++) { + if (closed_streams_.count(i) == 0) { + EXPECT_FALSE(session_.IsClosedStream(i)) << " stream id: " << i; + } else { + EXPECT_TRUE(session_.IsClosedStream(i)) << " stream id: " << i; + } + } + } + + void CloseStream(QuicStreamId id) { + session_.CloseStream(id); + closed_streams_.insert(id); + } + + QuicGuid guid_; + MockConnection* connection_; + TestSession session_; + QuicConnectionVisitorInterface* visitor_; + hash_map<QuicStreamId, ReliableQuicStream*>* streams_; + set<QuicStreamId> closed_streams_; +}; + +TEST_F(QuicSessionTest, IsHandshakeComplete) { + EXPECT_FALSE(session_.IsHandshakeComplete()); + CryptoHandshakeMessage message; + session_.crypto_stream_.OnHandshakeMessage(message); + EXPECT_TRUE(session_.IsHandshakeComplete()); +} + +TEST_F(QuicSessionTest, IsClosedStreamDefault) { + // Ensure that no streams are initially closed. + for (int i = kCryptoStreamId; i < 100; i++) { + EXPECT_FALSE(session_.IsClosedStream(i)); + } +} + +TEST_F(QuicSessionTest, IsClosedStreamLocallyCreated) { + scoped_ptr<TestStream> stream2(session_.CreateOutgoingReliableStream()); + scoped_ptr<TestStream> stream4(session_.CreateOutgoingReliableStream()); + + CheckClosedStreams(); + CloseStream(4); + CheckClosedStreams(); + CloseStream(2); + CheckClosedStreams(); +} + +TEST_F(QuicSessionTest, IsClosedStreamPeerCreated) { + scoped_ptr<ReliableQuicStream> stream3(session_.GetIncomingReliableStream(3)); + scoped_ptr<ReliableQuicStream> stream5(session_.GetIncomingReliableStream(5)); + + CheckClosedStreams(); + CloseStream(3); + CheckClosedStreams(); + CloseStream(5); + // Create stream id 9, and implicitly 7 + scoped_ptr<ReliableQuicStream> stream9(session_.GetIncomingReliableStream(9)); + CheckClosedStreams(); + // Close 9, but make sure 7 is still not closed + CloseStream(9); + CheckClosedStreams(); +} + +TEST_F(QuicSessionTest, StreamIdTooLarge) { + scoped_ptr<ReliableQuicStream> stream3(session_.GetIncomingReliableStream(3)); + EXPECT_CALL(*connection_, SendConnectionClose(QUIC_INVALID_STREAM_ID)); + scoped_ptr<ReliableQuicStream> stream5( + session_.GetIncomingReliableStream(105)); +} + +} // namespace +} // namespace test +} // namespace net diff --git a/net/quic/quic_stream_sequencer.cc b/net/quic/quic_stream_sequencer.cc new file mode 100644 index 0000000..f730379 --- /dev/null +++ b/net/quic/quic_stream_sequencer.cc @@ -0,0 +1,203 @@ +// Copyright (c) 2012 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "net/quic/quic_stream_sequencer.h" + +#include <algorithm> +#include <limits> + +#include "base/logging.h" +#include "net/quic/reliable_quic_stream.h" + +using std::min; +using std::numeric_limits; + +namespace net { + +QuicStreamSequencer::QuicStreamSequencer(ReliableQuicStream* quic_stream) + : stream_(quic_stream), + num_bytes_consumed_(0), + max_frame_memory_(numeric_limits<size_t>::max()), + close_offset_(numeric_limits<QuicStreamOffset>::max()), + half_close_(true) { +} + +QuicStreamSequencer::QuicStreamSequencer(size_t max_frame_memory, + ReliableQuicStream* quic_stream) + : stream_(quic_stream), + num_bytes_consumed_(0), + max_frame_memory_(max_frame_memory), + close_offset_(numeric_limits<QuicStreamOffset>::max()), + half_close_(true) { + if (max_frame_memory < kMaxPacketSize) { + LOG(DFATAL) << "Setting max frame memory to " << max_frame_memory + << ". Some frames will be impossible to handle."; + } +} + +QuicStreamSequencer::~QuicStreamSequencer() { +} + +bool QuicStreamSequencer::WillAcceptStreamFrame( + const QuicStreamFrame& frame) const { + QuicStreamOffset byte_offset = frame.offset; + size_t data_len = frame.data.size(); + DCHECK_LE(data_len, max_frame_memory_); + + if (byte_offset < num_bytes_consumed_ || + frames_.find(byte_offset) != frames_.end()) { + return false; + } + if (data_len > max_frame_memory_) { + // We're never going to buffer this frame and we can't pass it up the + // stream might only consume part of it and we'd need a partial ack. + // + // Ideally this should never happen, as we check that + // max_frame_memory_ > kMaxPacketSize and lower levels should reject + // frames larger than that. + return false; + } + if (byte_offset + data_len - num_bytes_consumed_ > max_frame_memory_) { + // We can buffer this but not right now. Toss it. + // It might be worth trying an experiment where we try best-effort buffering + return false; + } + return true; +} + +bool QuicStreamSequencer::OnStreamFrame(const QuicStreamFrame& frame) { + QuicStreamOffset byte_offset = frame.offset; + const char* data = frame.data.data(); + size_t data_len = frame.data.size(); + + if (!WillAcceptStreamFrame(frame)) { + // This should not happen, as WillAcceptFrame should be called before + // OnStreamFrame. Error handling should be done by the caller. + return false; + } + + if (byte_offset == num_bytes_consumed_) { + DVLOG(1) << "Processing byte offset " << byte_offset; + size_t bytes_consumed = stream_->ProcessData(data, data_len); + num_bytes_consumed_ += bytes_consumed; + + if (MaybeCloseStream()) { + return true; + } + if (bytes_consumed > data_len) { + stream_->Close(QUIC_SERVER_ERROR_PROCESSING_STREAM); + return false; + } else if (bytes_consumed == data_len) { + FlushBufferedFrames(); + return true; // it's safe to ack this frame. + } else { + // Set ourselves up to buffer what's left + data_len -= bytes_consumed; + data += bytes_consumed; + byte_offset += bytes_consumed; + } + } + + DVLOG(1) << "Buffering packet at offset " << byte_offset; + frames_.insert(make_pair(byte_offset, string(data, data_len))); + return true; +} + +void QuicStreamSequencer::CloseStreamAtOffset(QuicStreamOffset offset, + bool half_close) { + const QuicStreamOffset kMaxOffset = numeric_limits<QuicStreamOffset>::max(); + + // If we have a scheduled termination or close, any new offset should match + // it. + if (close_offset_ != kMaxOffset && offset != close_offset_) { + stream_->Close(QUIC_MULTIPLE_TERMINATION_OFFSETS); + return; + } + + close_offset_ = offset; + // Full close overrides half close. + if (half_close == false) { + half_close_ = false; + } + + MaybeCloseStream(); +} + +bool QuicStreamSequencer::MaybeCloseStream() { + if (IsHalfClosed()) { + DVLOG(1) << "Passing up termination, as we've processed " + << num_bytes_consumed_ << " of " << close_offset_ + << " bytes."; + // Technically it's an error if num_bytes_consumed isn't exactly + // equal, but error handling seems silly at this point. + stream_->TerminateFromPeer(half_close_); + return true; + } + return false; +} + +void QuicStreamSequencer::AdvanceReadablePtr(size_t data_read) { + FrameMap::iterator it = frames_.begin(); + + while (data_read) { + if (it->first != num_bytes_consumed_ || it == frames_.end()) { + stream_->Close(QUIC_SERVER_ERROR_PROCESSING_STREAM); // Programming error + return; + } + + if (data_read >= it->second.size()) { + data_read -= it->second.size(); + num_bytes_consumed_ += it->second.size(); + frames_.erase(it); + it = frames_.begin(); + } else { + frames_.insert(make_pair(it->first + data_read, + it->second.substr(data_read))); + frames_.erase(frames_.begin()); + num_bytes_consumed_ += data_read; + data_read = 0; + } + } +} + +bool QuicStreamSequencer::HasBytesToRead() { + FrameMap::iterator it = frames_.begin(); + + return it != frames_.end() && it->first == num_bytes_consumed_; +} + +bool QuicStreamSequencer::IsHalfClosed() { + return num_bytes_consumed_ >= close_offset_; +} + +bool QuicStreamSequencer::IsClosed() { + return num_bytes_consumed_ >= close_offset_ && half_close_ == false; +} + +void QuicStreamSequencer::FlushBufferedFrames() { + FrameMap::iterator it = frames_.find(num_bytes_consumed_); + while (it != frames_.end()) { + DVLOG(1) << "Flushing buffered packet at offset " << it->first; + string* data = &it->second; + size_t bytes_consumed = stream_->ProcessData(data->c_str(), data->size()); + num_bytes_consumed_ += bytes_consumed; + if (MaybeCloseStream()) { + return; + } + if (bytes_consumed > data->size()) { + stream_->Close(QUIC_SERVER_ERROR_PROCESSING_STREAM); // Programming error + return; + } else if (bytes_consumed == data->size()) { + frames_.erase(it); + it = frames_.find(num_bytes_consumed_); + } else { + string new_data = it->second.substr(bytes_consumed); + frames_.erase(it); + frames_.insert(make_pair(num_bytes_consumed_, new_data)); + return; + } + } +} + +} // namespace net diff --git a/net/quic/quic_stream_sequencer.h b/net/quic/quic_stream_sequencer.h new file mode 100644 index 0000000..971aaec --- /dev/null +++ b/net/quic/quic_stream_sequencer.h @@ -0,0 +1,90 @@ +// Copyright (c) 2012 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef NET_QUIC_QUIC_STREAM_SEQUENCER_H_ +#define NET_QUIC_QUIC_STREAM_SEQUENCER_H_ + +#include <map> + +#include "base/basictypes.h" +#include "base/memory/scoped_ptr.h" +#include "net/quic/quic_protocol.h" + +using std::map; +using std::string; + +namespace net { + +class QuicSession; +class ReliableQuicStream; + +// Buffers frames until we have something which can be passed +// up to the next layer. +// TOOD(alyssar) add some checks for overflow attempts [1, 256,] [2, 256] +class NET_EXPORT_PRIVATE QuicStreamSequencer { + public: + static size_t kMaxUdpPacketSize; + + explicit QuicStreamSequencer(ReliableQuicStream* quic_stream); + QuicStreamSequencer(size_t max_frame_memory, + ReliableQuicStream* quic_stream); + + virtual ~QuicStreamSequencer(); + + // Returns the expected value of OnStreamFrame for this frame. + bool WillAcceptStreamFrame(const QuicStreamFrame& frame) const; + + // If the frame is the next one we need in order to process in-order data, + // ProcessData will be immediately called on the stream until all buffered + // data is processed or the stream fails to consume data. Any unconsumed + // data will be buffered. + // + // If the frame is not the next in line, it will either be buffered, and + // this will return true, or it will be rejected and this will return false. + bool OnStreamFrame(const QuicStreamFrame& frame); + + // Wait until we've seen 'offset' bytes, and then terminate the stream. + void CloseStreamAtOffset(QuicStreamOffset offset, bool half_close); + + // Once data is buffered, it's up to the stream to read it when the stream + // can handle more data. The following three functions make that possible. + + // Advances the readable pointer num_bytes bytes, releasing any buffered data + // which is no longer in uses + void AdvanceReadablePtr(size_t num_bytes); + + // Returns true if the sequncer has bytes available for reading. + bool HasBytesToRead(); + + // Returns true if the sequencer has delivered a half close. + bool IsHalfClosed(); + + // Returns true if the sequencer has delivered a full close. + bool IsClosed(); + + private: + bool MaybeCloseStream(); + + friend class QuicStreamSequencerPeer; + // TODO(alyssar) use something better than strings. + typedef map<QuicStreamOffset, string> FrameMap; + + void FlushBufferedFrames(); + + ReliableQuicStream* stream_; // The stream which owns this sequencer. + QuicStreamOffset num_bytes_consumed_; // The last data consumed by the stream + FrameMap frames_; // sequence number -> frame + size_t max_frame_memory_; // the maximum memory the sequencer can buffer. + // The offset, if any, we got a stream cancelation for. When this many bytes + // have been processed, the stream will be half or full closed depending on + // the half_close_ bool. + QuicStreamOffset close_offset_; + // Only valid if close_offset_ is set. Indicates if it's a half or a full + // close. + bool half_close_; +}; + +} // namespace net + +#endif // NET_QUIC_QUIC_STREAM_SEQUENCER_H_ diff --git a/net/quic/quic_stream_sequencer_test.cc b/net/quic/quic_stream_sequencer_test.cc new file mode 100644 index 0000000..e24d309 --- /dev/null +++ b/net/quic/quic_stream_sequencer_test.cc @@ -0,0 +1,413 @@ +// Copyright (c) 2012 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "net/quic/quic_stream_sequencer.h" + +#include <utility> +#include <vector> + +#include "base/rand_util.h" +#include "net/quic/reliable_quic_stream.h" +#include "testing/gmock/include/gmock/gmock.h" +#include "testing/gtest/include/gtest/gtest.h" + +using base::StringPiece; +using std::min; +using std::pair; +using std::vector; +using testing::_; +using testing::AnyNumber; +using testing::InSequence; +using testing::Return; +using testing::StrEq; + +namespace net { + +class QuicStreamSequencerPeer : public QuicStreamSequencer { + public: + explicit QuicStreamSequencerPeer(ReliableQuicStream* stream) + : QuicStreamSequencer(stream) { + } + + QuicStreamSequencerPeer(int32 max_mem, ReliableQuicStream* stream) + : QuicStreamSequencer(max_mem, stream) {} + + virtual bool OnFrame(QuicStreamOffset byte_offset, + const char* data, + uint32 data_len) { + QuicStreamFrame frame; + frame.stream_id = 1; + frame.offset = byte_offset; + frame.data = StringPiece(data, data_len); + return OnStreamFrame(frame); + } + + void SetMemoryLimit(size_t limit) { + max_frame_memory_ = limit; + } + + ReliableQuicStream* stream() { return stream_; } + uint64 num_bytes_consumed() { return num_bytes_consumed_; } + FrameMap* frames() { return &frames_; } + int32 max_frame_memory() { return max_frame_memory_; } + QuicStreamOffset close_offset() { return close_offset_; } +}; + +class MockStream : public ReliableQuicStream { + public: + MockStream(QuicSession* session, QuicStreamId id) + : ReliableQuicStream(id, session) { + } + + MOCK_METHOD1(TerminateFromPeer, void(bool half_close)); + MOCK_METHOD2(ProcessData, uint32(const char* data, uint32 data_len)); + MOCK_METHOD1(Close, void(QuicErrorCode error)); +}; + +namespace { + +static const char kPayload[] = + "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"; + +class QuicStreamSequencerTest : public ::testing::Test { + protected: + QuicStreamSequencerTest() + : session_(NULL), + stream_(session_, 1), + sequencer_(new QuicStreamSequencerPeer(&stream_)) { + } + + QuicSession* session_; + testing::StrictMock<MockStream> stream_; + scoped_ptr<QuicStreamSequencerPeer> sequencer_; +}; + +TEST_F(QuicStreamSequencerTest, RejectOldFrame) { + EXPECT_CALL(stream_, ProcessData("abc", 3)) + .WillOnce(Return(3)); + + EXPECT_TRUE(sequencer_->OnFrame(0, "abc", 3)); + EXPECT_EQ(0u, sequencer_->frames()->size()); + EXPECT_EQ(3u, sequencer_->num_bytes_consumed()); + // Nack this - it matches a past sequence number and we should not see it + // again. + EXPECT_FALSE(sequencer_->OnFrame(0, "def", 3)); + EXPECT_EQ(0u, sequencer_->frames()->size()); +} + +TEST_F(QuicStreamSequencerTest, RejectOverlyLargeFrame) { + /* + EXPECT_DFATAL(sequencer_.reset(new QuicStreamSequencerPeer(2, &stream_)), + "Setting max frame memory to 2. " + "Some frames will be impossible to handle."); + + EXPECT_DEBUG_DEATH(sequencer_->OnFrame(0, "abc", 3), ""); + */ +} + +TEST_F(QuicStreamSequencerTest, DropFramePastBuffering) { + sequencer_->SetMemoryLimit(3); + + EXPECT_FALSE(sequencer_->OnFrame(3, "abc", 3)); +} + +TEST_F(QuicStreamSequencerTest, RejectBufferedFrame) { + EXPECT_CALL(stream_, ProcessData("abc", 3)); + + EXPECT_TRUE(sequencer_->OnFrame(0, "abc", 3)); + EXPECT_EQ(1u, sequencer_->frames()->size()); + EXPECT_EQ(0u, sequencer_->num_bytes_consumed()); + // Ignore this - it matches a buffered frame. + // Right now there's no checking that the payload is consistent. + EXPECT_FALSE(sequencer_->OnFrame(0, "def", 3)); + EXPECT_EQ(1u, sequencer_->frames()->size()); +} + +TEST_F(QuicStreamSequencerTest, FullFrameConsumed) { + EXPECT_CALL(stream_, ProcessData("abc", 3)) + .WillOnce(Return(3)); + + EXPECT_TRUE(sequencer_->OnFrame(0, "abc", 3)); + EXPECT_EQ(0u, sequencer_->frames()->size()); + EXPECT_EQ(3u, sequencer_->num_bytes_consumed()); +} + +TEST_F(QuicStreamSequencerTest, PartialFrameConsumed) { + EXPECT_CALL(stream_, ProcessData("abc", 3)) + .WillOnce(Return(2)); + + EXPECT_TRUE(sequencer_->OnFrame(0, "abc", 3)); + EXPECT_EQ(1u, sequencer_->frames()->size()); + EXPECT_EQ(2u, sequencer_->num_bytes_consumed()); + EXPECT_EQ("c", sequencer_->frames()->find(2)->second); +} + +TEST_F(QuicStreamSequencerTest, NextxFrameNotConsumed) { + EXPECT_CALL(stream_, ProcessData("abc", 3)) + .WillOnce(Return(0)); + + EXPECT_TRUE(sequencer_->OnFrame(0, "abc", 3)); + EXPECT_EQ(1u, sequencer_->frames()->size()); + EXPECT_EQ(0u, sequencer_->num_bytes_consumed()); + EXPECT_EQ("abc", sequencer_->frames()->find(0)->second); +} + +TEST_F(QuicStreamSequencerTest, FutureFrameNotProcessed) { + EXPECT_TRUE(sequencer_->OnFrame(3, "abc", 3)); + EXPECT_EQ(1u, sequencer_->frames()->size()); + EXPECT_EQ(0u, sequencer_->num_bytes_consumed()); + EXPECT_EQ("abc", sequencer_->frames()->find(3)->second); +} + +TEST_F(QuicStreamSequencerTest, OutOfOrderFrameProcessed) { + // Buffer the first + EXPECT_TRUE(sequencer_->OnFrame(6, "ghi", 3)); + EXPECT_EQ(1u, sequencer_->frames()->size()); + EXPECT_EQ(0u, sequencer_->num_bytes_consumed()); + // Buffer the second + EXPECT_TRUE(sequencer_->OnFrame(3, "def", 3)); + EXPECT_EQ(2u, sequencer_->frames()->size()); + EXPECT_EQ(0u, sequencer_->num_bytes_consumed()); + + InSequence s; + EXPECT_CALL(stream_, ProcessData("abc", 3)).WillOnce(Return(3)); + EXPECT_CALL(stream_, ProcessData(StrEq("def"), 3)).WillOnce(Return(3)); + EXPECT_CALL(stream_, ProcessData(StrEq("ghi"), 3)).WillOnce(Return(3)); + + // Ack right away + EXPECT_TRUE(sequencer_->OnFrame(0, "abc", 3)); + EXPECT_EQ(9u, sequencer_->num_bytes_consumed()); + + EXPECT_EQ(0u, sequencer_->frames()->size()); +} + +TEST_F(QuicStreamSequencerTest, OutOfOrderFramesProcessedWithBuffering) { + sequencer_->SetMemoryLimit(9); + + // Too far to buffer. + EXPECT_FALSE(sequencer_->OnFrame(9, "jkl", 3)); + + // We can afford to buffer this. + EXPECT_TRUE(sequencer_->OnFrame(6, "ghi", 3)); + EXPECT_EQ(0u, sequencer_->num_bytes_consumed()); + + InSequence s; + EXPECT_CALL(stream_, ProcessData("abc", 3)).WillOnce(Return(3)); + + // Ack right away + EXPECT_TRUE(sequencer_->OnFrame(0, "abc", 3)); + EXPECT_EQ(3u, sequencer_->num_bytes_consumed()); + + // We should be willing to buffer this now. + EXPECT_TRUE(sequencer_->OnFrame(9, "jkl", 3)); + EXPECT_EQ(3u, sequencer_->num_bytes_consumed()); + + EXPECT_CALL(stream_, ProcessData(StrEq("def"), 3)).WillOnce(Return(3)); + EXPECT_CALL(stream_, ProcessData(StrEq("ghi"), 3)).WillOnce(Return(3)); + EXPECT_CALL(stream_, ProcessData(StrEq("jkl"), 3)).WillOnce(Return(3)); + + EXPECT_TRUE(sequencer_->OnFrame(3, "def", 3)); + EXPECT_EQ(12u, sequencer_->num_bytes_consumed()); + EXPECT_EQ(0u, sequencer_->frames()->size()); +} + +TEST_F(QuicStreamSequencerTest, BasicCloseOrdered) { + InSequence s; + EXPECT_CALL(stream_, ProcessData(StrEq("abc"), 3)).WillOnce(Return(3)); + EXPECT_TRUE(sequencer_->OnFrame(0, "abc", 3)); + + EXPECT_CALL(stream_, TerminateFromPeer(false)); + sequencer_->CloseStreamAtOffset(3, false); + EXPECT_EQ(3u, sequencer_->close_offset()); +} + +TEST_F(QuicStreamSequencerTest, BasicHalfOrdered) { + InSequence s; + + EXPECT_CALL(stream_, ProcessData(StrEq("abc"), 3)).WillOnce(Return(3)); + EXPECT_TRUE(sequencer_->OnFrame(0, "abc", 3)); + + EXPECT_CALL(stream_, TerminateFromPeer(true)); + sequencer_->CloseStreamAtOffset(3, true); + EXPECT_EQ(3u, sequencer_->close_offset()); +} + +TEST_F(QuicStreamSequencerTest, BasicCloseUnordered) { + sequencer_->CloseStreamAtOffset(3, false); + EXPECT_EQ(3u, sequencer_->close_offset()); + + InSequence s; + EXPECT_CALL(stream_, ProcessData(StrEq("abc"), 3)).WillOnce(Return(3)); + EXPECT_CALL(stream_, TerminateFromPeer(false)); + + EXPECT_TRUE(sequencer_->OnFrame(0, "abc", 3)); +} + +TEST_F(QuicStreamSequencerTest, BasicHalfUnorderedWithFlush) { + sequencer_->CloseStreamAtOffset(6, true); + EXPECT_EQ(6u, sequencer_->close_offset()); + InSequence s; + EXPECT_CALL(stream_, ProcessData(StrEq("abc"), 3)).WillOnce(Return(3)); + EXPECT_CALL(stream_, ProcessData(StrEq("def"), 3)).WillOnce(Return(3)); + EXPECT_CALL(stream_, TerminateFromPeer(true)); + + EXPECT_TRUE(sequencer_->OnFrame(3, "def", 3)); + EXPECT_TRUE(sequencer_->OnFrame(0, "abc", 3)); +} + +TEST_F(QuicStreamSequencerTest, BasicCloseUnorderedWithFlush) { + sequencer_->CloseStreamAtOffset(6, false); + EXPECT_EQ(6u, sequencer_->close_offset()); + + InSequence s; + EXPECT_CALL(stream_, ProcessData(StrEq("abc"), 3)).WillOnce(Return(3)); + EXPECT_CALL(stream_, ProcessData(StrEq("def"), 3)).WillOnce(Return(3)); + EXPECT_CALL(stream_, TerminateFromPeer(false)); + + EXPECT_TRUE(sequencer_->OnFrame(3, "def", 3)); + EXPECT_TRUE(sequencer_->OnFrame(0, "abc", 3)); +} + +TEST_F(QuicStreamSequencerTest, BasicHalfUnordered) { + sequencer_->CloseStreamAtOffset(3, true); + EXPECT_EQ(3u, sequencer_->close_offset()); + InSequence s; + EXPECT_CALL(stream_, ProcessData(StrEq("abc"), 3)).WillOnce(Return(3)); + EXPECT_CALL(stream_, TerminateFromPeer(true)); + + EXPECT_TRUE(sequencer_->OnFrame(0, "abc", 3)); +} + +TEST_F(QuicStreamSequencerTest, CloseStreamBeforeCloseEqual) { + sequencer_->CloseStreamAtOffset(3, true); + EXPECT_EQ(3u, sequencer_->close_offset()); + + sequencer_->CloseStreamAtOffset(3, false); + EXPECT_EQ(3u, sequencer_->close_offset()); + + InSequence s; + EXPECT_CALL(stream_, ProcessData(StrEq("abc"), 3)).WillOnce(Return(3)); + EXPECT_CALL(stream_, TerminateFromPeer(false)); + EXPECT_TRUE(sequencer_->OnFrame(0, "abc", 3)); +} + +TEST_F(QuicStreamSequencerTest, CloseBeforeTermianteEqual) { + sequencer_->CloseStreamAtOffset(3, false); + EXPECT_EQ(3u, sequencer_->close_offset()); + + sequencer_->CloseStreamAtOffset(3, true); + EXPECT_EQ(3u, sequencer_->close_offset()); + + InSequence s; + EXPECT_CALL(stream_, ProcessData(StrEq("abc"), 3)).WillOnce(Return(3)); + EXPECT_CALL(stream_, TerminateFromPeer(false)); + EXPECT_TRUE(sequencer_->OnFrame(0, "abc", 3)); +} + +TEST_F(QuicStreamSequencerTest, MutipleOffsets) { + sequencer_->CloseStreamAtOffset(3, false); + EXPECT_EQ(3u, sequencer_->close_offset()); + + EXPECT_CALL(stream_, Close(QUIC_MULTIPLE_TERMINATION_OFFSETS)); + sequencer_->CloseStreamAtOffset(5, false); + EXPECT_EQ(3u, sequencer_->close_offset()); + + EXPECT_CALL(stream_, Close(QUIC_MULTIPLE_TERMINATION_OFFSETS)); + sequencer_->CloseStreamAtOffset(1, false); + EXPECT_EQ(3u, sequencer_->close_offset()); + + sequencer_->CloseStreamAtOffset(3, false); + EXPECT_EQ(3u, sequencer_->close_offset()); +} + +class QuicSequencerRandomTest : public QuicStreamSequencerTest { + public: + typedef pair<int, string> Frame; + typedef vector<Frame> FrameList; + + void CreateFrames() { + int payload_size = arraysize(kPayload) - 1; + int remaining_payload = payload_size; + while (remaining_payload != 0) { + int size = min(OneToN(6), remaining_payload); + int idx = payload_size - remaining_payload; + list_.push_back(make_pair(idx, string(kPayload + idx, size))); + remaining_payload -= size; + } + } + + QuicSequencerRandomTest() { + //int32 seed = ACMRandom::HostnamePidTimeSeed(); + //LOG(INFO) << "**** The current seed is " << seed << " ****"; + //random_.reset(new ACMRandom(seed)); + + CreateFrames(); + } + + int OneToN(int n) { + return base::RandInt(1, n); + } + + int MaybeProcessMaybeBuffer(const char* data, uint32 len) { + int to_process = len; + if (base::RandUint64() % 2 != 0) { + to_process = base::RandInt(0, len); + } + output_.append(data, to_process); + LOG(ERROR) << output_; + return to_process; + } + + string output_; + //scoped_ptr<ACMRandom> random_; + FrameList list_; +}; + +// All frames are processed as soon as we have sequential data. +// Infinite buffering, so all frames are acked right away. +TEST_F(QuicSequencerRandomTest, RandomFramesNoDroppingNoBackup) { + InSequence s; + for (size_t i = 0; i < list_.size(); ++i) { + string* data = &list_[i].second; + EXPECT_CALL(stream_, ProcessData(StrEq(*data), data->size())) + .WillOnce(Return(data->size())); + } + + while (list_.size() != 0) { + int idx = OneToN(list_.size()) - 1; + LOG(ERROR) << "Sending index " << idx << " " << list_[idx].second.c_str(); + EXPECT_TRUE(sequencer_->OnFrame( + list_[idx].first, list_[idx].second.c_str(), + list_[idx].second.size())); + list_.erase(list_.begin() + idx); + } +} + +// All frames are processed as soon as we have sequential data. +// Buffering, so some frames are rejected. +TEST_F(QuicSequencerRandomTest, RandomFramesDroppingNoBackup) { + sequencer_->SetMemoryLimit(26); + + InSequence s; + for (size_t i = 0; i < list_.size(); ++i) { + string* data = &list_[i].second; + EXPECT_CALL(stream_, ProcessData(StrEq(*data), data->size())) + .WillOnce(Return(data->size())); + } + + while (list_.size() != 0) { + int idx = OneToN(list_.size()) - 1; + LOG(ERROR) << "Sending index " << idx << " " << list_[idx].second.c_str(); + bool acked = sequencer_->OnFrame( + list_[idx].first, list_[idx].second.c_str(), + list_[idx].second.size()); + if (acked) { + list_.erase(list_.begin() + idx); + } + } +} + +} // namespace + +} // namespace net diff --git a/net/quic/reliable_quic_stream.cc b/net/quic/reliable_quic_stream.cc new file mode 100644 index 0000000..3cdde26 --- /dev/null +++ b/net/quic/reliable_quic_stream.cc @@ -0,0 +1,126 @@ +// Copyright (c) 2012 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "net/quic/reliable_quic_stream.h" + +#include "net/quic/quic_session.h" + +using base::StringPiece; + +namespace net { + +ReliableQuicStream::ReliableQuicStream(QuicStreamId id, + QuicSession* session) + : sequencer_(this), + id_(id), + offset_(0), + session_(session), + error_(QUIC_NO_ERROR), + read_side_closed_(false), + write_side_closed_(false) { +} + +ReliableQuicStream::~ReliableQuicStream() { +} + +bool ReliableQuicStream::WillAcceptStreamFrame( + const QuicStreamFrame& frame) const { + if (read_side_closed_) { + return false; + } + if (frame.stream_id != id_) { + LOG(ERROR) << "Error!"; + return false; + } + return sequencer_.WillAcceptStreamFrame(frame); +} + +bool ReliableQuicStream::OnStreamFrame(const QuicStreamFrame& frame) { + DCHECK_EQ(frame.stream_id, id_); + if (read_side_closed_) { + // This can only happen if a client sends data after sending a fin or stream + // reset. + Close(QUIC_STREAM_DATA_AFTER_TERMINATION); + return false; + } + + bool accepted = sequencer_.OnStreamFrame(frame); + + if (frame.fin) { + sequencer_.CloseStreamAtOffset(frame.offset + frame.data.size(), + true); + } + + return accepted; +} + +void ReliableQuicStream::OnStreamReset(QuicErrorCode error, + QuicStreamOffset offset) { + error_ = error; + sequencer_.CloseStreamAtOffset(offset, false); // Full close +} + +void ReliableQuicStream::ConnectionClose(QuicErrorCode error, bool from_peer) { + error_ = error; + if (from_peer) { + TerminateFromPeer(false); + } else { + CloseWriteSide(); + CloseReadSide(); + } +} + +void ReliableQuicStream::TerminateFromPeer(bool half_close) { + if (!half_close) { + CloseWriteSide(); + } + CloseReadSide(); +} + +void ReliableQuicStream::Close(QuicErrorCode error) { + error_ = error; + session()->SendRstStream(id(), error, offset_); +} + +bool ReliableQuicStream::IsHalfClosed() { + return sequencer_.IsHalfClosed(); +} + +bool ReliableQuicStream::HasBytesToRead() { + return sequencer_.HasBytesToRead(); +} + +int ReliableQuicStream::WriteData(StringPiece data, bool fin) { + if (write_side_closed_) { + DLOG(ERROR) << "Attempt to write when the write side is closed"; + return 0; + } + + session()->WriteData(id(), data, offset_, fin); + offset_ += data.length(); + if (fin) { + CloseWriteSide(); + } + return data.length(); +} + +void ReliableQuicStream::CloseReadSide() { + DLOG(INFO) << "Done reading from stream " << id(); + + read_side_closed_ = true; + if (write_side_closed_) { + session_->CloseStream(id()); + } +} + +void ReliableQuicStream::CloseWriteSide() { + DLOG(INFO) << "Done writing to stream " << id(); + + write_side_closed_ = true; + if (read_side_closed_) { + session_->CloseStream(id()); + } +} + +} // namespace net diff --git a/net/quic/reliable_quic_stream.h b/net/quic/reliable_quic_stream.h new file mode 100644 index 0000000..366a310 --- /dev/null +++ b/net/quic/reliable_quic_stream.h @@ -0,0 +1,80 @@ +// Copyright (c) 2012 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. +// +// The base class for client/server reliable streams. + +#ifndef NET_QUIC_RELIABLE_QUIC_STREAM_H_ +#define NET_QUIC_RELIABLE_QUIC_STREAM_H_ + +#include "net/quic/quic_stream_sequencer.h" + +namespace net { + +class QuicSession; + +// All this does right now is send data to subclasses via the sequencer. +class NET_EXPORT_PRIVATE ReliableQuicStream { + public: + ReliableQuicStream(QuicStreamId id, + QuicSession* session); + + virtual ~ReliableQuicStream(); + + bool WillAcceptStreamFrame(const QuicStreamFrame& frame) const; + bool OnStreamFrame(const QuicStreamFrame& frame); + + // Called when we get a stream reset from the client. + // The rst will be passed through the sequencer, which will call + // TerminateFromPeer when 'offset' bytes have been processed. + void OnStreamReset(QuicErrorCode error, QuicStreamOffset ofset); + + // Called when we get or send a connection close, and should immediately + // close the stream. This is not passed through the sequencer, + // but is handled immediately. + virtual void ConnectionClose(QuicErrorCode error, bool from_peer); + + // Called by the sequencer, when we should process a stream termination or + // stream close from the peer. + virtual void TerminateFromPeer(bool half_close); + + virtual uint32 ProcessData(const char* data, uint32 data_len) = 0; + + // Called to close the stream from this end. + virtual void Close(QuicErrorCode error); + + // This block of functions wraps the sequencer's functions of the same + // name. + virtual bool IsHalfClosed(); + virtual bool HasBytesToRead(); + + QuicStreamId id() { return id_; } + + QuicErrorCode error() { return error_; } + + protected: + virtual int WriteData(base::StringPiece data, bool fin); + // Close the read side of the socket. Further frames will not be accepted. + virtual void CloseReadSide(); + // Close the write side of the socket. Further writes will fail. + void CloseWriteSide(); + + QuicSession* session() { return session_; } + + private: + friend class ReliableQuicStreamPeer; + + QuicStreamSequencer sequencer_; + QuicStreamId id_; + QuicStreamOffset offset_; + QuicSession* session_; + QuicErrorCode error_; + // True if the read side is closed and further frames should be rejected. + bool read_side_closed_; + // True if the write side is closed, and further writes should fail. + bool write_side_closed_; +}; + +} // namespace net + +#endif // NET_QUIC_RELIABLE_QUIC_STREAM_H_ diff --git a/net/quic/test_tools/quic_test_utils.cc b/net/quic/test_tools/quic_test_utils.cc index a1c1179..3e3171f 100644 --- a/net/quic/test_tools/quic_test_utils.cc +++ b/net/quic/test_tools/quic_test_utils.cc @@ -3,6 +3,7 @@ // found in the LICENSE file. #include "net/quic/test_tools/quic_test_utils.h" + #include "net/quic/crypto/crypto_framer.h" using std::max; @@ -34,6 +35,16 @@ void FramerVisitorCapturingAcks::OnAckFrame(const QuicAckFrame& frame) { frame_ = frame; } +MockHelper::MockHelper() { +} + +MockHelper::~MockHelper() { +} + +QuicClock* MockHelper::GetClock() { + return &clock_; +} + MockConnectionVisitor::MockConnectionVisitor() { } @@ -156,6 +167,36 @@ QuicPacket* ConstructHandshakePacket(QuicGuid guid, CryptoTag tag) { return packet; } +MockConnection::MockConnection(QuicGuid guid, IPEndPoint address) + : QuicConnection(guid, address, new MockHelper()) { +} + +MockConnection::~MockConnection() { +} + +PacketSavingConnection::PacketSavingConnection(QuicGuid guid, + IPEndPoint address) + : MockConnection(guid, address) { +} + +PacketSavingConnection::~PacketSavingConnection() { +} + +bool PacketSavingConnection::SendPacket(QuicPacketSequenceNumber number, + QuicPacket* packet, + bool resend, + bool force) { + packets_.push_back(packet); + return true; +} + +MockSession::MockSession(QuicConnection* connection, bool is_server) + : QuicSession(connection, is_server) { +} + +MockSession::~MockSession() { +} + } // namespace test } // namespace net diff --git a/net/quic/test_tools/quic_test_utils.h b/net/quic/test_tools/quic_test_utils.h index e3a75c4..acabd7c 100644 --- a/net/quic/test_tools/quic_test_utils.h +++ b/net/quic/test_tools/quic_test_utils.h @@ -10,6 +10,8 @@ #include "net/quic/congestion_control/quic_send_scheduler.h" #include "net/quic/quic_connection.h" #include "net/quic/quic_framer.h" +#include "net/quic/quic_session.h" +#include "net/quic/test_tools/mock_clock.h" #include "testing/gmock/include/gmock/gmock.h" namespace net { @@ -66,7 +68,6 @@ class NoOpFramerVisitor : public QuicFramerVisitorInterface { virtual void OnPacketComplete() OVERRIDE {} }; - class FramerVisitorCapturingAcks : public NoOpFramerVisitor { public: // NoOpFramerVisitor @@ -83,7 +84,7 @@ class FramerVisitorCapturingAcks : public NoOpFramerVisitor { class MockConnectionVisitor : public QuicConnectionVisitorInterface { public: MockConnectionVisitor(); - ~MockConnectionVisitor(); + virtual ~MockConnectionVisitor(); MOCK_METHOD4(OnPacket, bool(const IPEndPoint& self_address, const IPEndPoint& peer_address, const QuicPacketHeader& header, @@ -103,6 +104,78 @@ class MockScheduler : public QuicSendScheduler { MOCK_METHOD3(SentPacket, void(QuicPacketSequenceNumber, size_t, bool)); }; +class MockHelper : public QuicConnectionHelperInterface { + public: + MockHelper(); + virtual ~MockHelper(); + + MOCK_METHOD1(SetConnection, void(QuicConnection* connection)); + QuicClock* GetClock(); + MOCK_METHOD4(WritePacketToWire, int(QuicPacketSequenceNumber number, + const QuicEncryptedPacket& packet, + bool resend, + int* error)); + MOCK_METHOD2(SetResendAlarm, void(QuicPacketSequenceNumber sequence_number, + uint64 delay_in_us)); + MOCK_METHOD1(SetSendAlarm, void(uint64 delay_in_us)); + MOCK_METHOD1(SetTimeoutAlarm, void(uint64 delay_in_us)); + MOCK_METHOD0(IsSendAlarmSet, bool()); + MOCK_METHOD0(UnregisterSendAlarmIfRegistered, void()); + private: + MockClock clock_; +}; + +class MockConnection : public QuicConnection { + public: + MockConnection(QuicGuid guid, IPEndPoint address); + virtual ~MockConnection(); + + MOCK_METHOD3(ProcessUdpPacket, void(const IPEndPoint& self_address, + const IPEndPoint& peer_address, + const QuicEncryptedPacket& packet)); + MOCK_METHOD1(SendConnectionClose, void(QuicErrorCode error)); + + MOCK_METHOD3(SendRstStream, void(QuicStreamId id, + QuicErrorCode error, + QuicStreamOffset offset)); + + MOCK_METHOD0(OnCanWrite, bool()); +}; + +class PacketSavingConnection : public MockConnection { + public: + PacketSavingConnection(QuicGuid guid, IPEndPoint address); + virtual ~PacketSavingConnection(); + + virtual bool SendPacket(QuicPacketSequenceNumber number, + QuicPacket* packet, + bool resend, + bool force) OVERRIDE; + + std::vector<QuicPacket*> packets_; +}; + +class MockSession : public QuicSession { + public: + MockSession(QuicConnection* connection, bool is_server); + ~MockSession(); + + MOCK_METHOD4(OnPacket, bool(const IPEndPoint& seld_address, + const IPEndPoint& peer_address, + const QuicPacketHeader& header, + const std::vector<QuicStreamFrame>& frame)); + MOCK_METHOD2(ConnectionClose, void(QuicErrorCode error, bool from_peer)); + MOCK_METHOD1(CreateIncomingReliableStream, + ReliableQuicStream*(QuicStreamId id)); + MOCK_METHOD0(GetCryptoStream, QuicCryptoStream*()); + MOCK_METHOD0(CreateOutgoingReliableStream, ReliableQuicStream*()); + MOCK_METHOD3(WriteData, + void(QuicStreamId id, base::StringPiece data, bool fin)); + MOCK_METHOD4(WriteData, int(QuicStreamId id, base::StringPiece data, + QuicStreamOffset offset, bool fin)); + MOCK_METHOD0(IsHandshakeComplete, bool()); +}; + } // namespace test } // namespace net |