From 188bd40f723812b8a718195fdc5af274c21891e4 Mon Sep 17 00:00:00 2001 From: xunjieli Date: Fri, 11 Mar 2016 16:17:25 -0800 Subject: Only MarkTrailersConsumed when actually sending trailers notification MarkTrailersConsumed is currently done before the notification is posted asynchronously. However, there might be a pending read before that notification is executed but after MarkTrailersConsumed, which leads to delegate mistakenly to assume that the trailers are consumed. This CL changes so that we MarkTrailersConsumed when we actually will immediately notify delegate. This CL also patches some internal trailers changes in quic_spdy_stream.cc and quic_spdy_client_stream.cc. BUG=586207 Review URL: https://codereview.chromium.org/1776423004 Cr-Commit-Position: refs/heads/master@{#380799} --- net/quic/quic_chromium_client_stream.cc | 39 +++++------ net/quic/quic_chromium_client_stream_test.cc | 96 ++++++++++++++++++++++++++- net/quic/quic_http_stream_test.cc | 2 + net/quic/quic_spdy_stream.cc | 18 ++++- net/quic/quic_spdy_stream.h | 7 ++ net/quic/quic_spdy_stream_test.cc | 39 +++++++++++ net/tools/quic/quic_client.cc | 3 +- net/tools/quic/quic_spdy_client_stream.cc | 14 +--- net/tools/quic/quic_spdy_client_stream.h | 6 -- net/tools/quic/test_tools/quic_test_client.cc | 2 +- 10 files changed, 180 insertions(+), 46 deletions(-) diff --git a/net/quic/quic_chromium_client_stream.cc b/net/quic/quic_chromium_client_stream.cc index b68fa82..529f0a7 100644 --- a/net/quic/quic_chromium_client_stream.cc +++ b/net/quic/quic_chromium_client_stream.cc @@ -36,35 +36,27 @@ QuicChromiumClientStream::~QuicChromiumClientStream() { void QuicChromiumClientStream::OnStreamHeadersComplete(bool fin, size_t frame_len) { QuicSpdyStream::OnStreamHeadersComplete(fin, frame_len); - SpdyHeaderBlock headers; - SpdyFramer framer(HTTP2); - - size_t headers_len; - const char* header_data; if (decompressed_headers().empty() && !decompressed_trailers().empty()) { DCHECK(trailers_decompressed()); - headers_len = decompressed_trailers().length(); - header_data = decompressed_trailers().data(); + // The delegate will read the trailers via a posted task. + NotifyDelegateOfHeadersCompleteLater(response_trailers(), frame_len); } else { DCHECK(!headers_delivered_); - headers_len = decompressed_headers().length(); - header_data = decompressed_headers().data(); - } - if (!framer.ParseHeaderBlockInBuffer(header_data, headers_len, &headers)) { - DLOG(WARNING) << "Invalid headers"; - Reset(QUIC_BAD_APPLICATION_PAYLOAD); - return; - } - - if (!headers_delivered_) { + SpdyHeaderBlock headers; + SpdyFramer framer(HTTP2); + size_t headers_len = decompressed_headers().length(); + const char* header_data = decompressed_headers().data(); + if (!framer.ParseHeaderBlockInBuffer(header_data, headers_len, &headers)) { + DLOG(WARNING) << "Invalid headers"; + Reset(QUIC_BAD_APPLICATION_PAYLOAD); + return; + } MarkHeadersConsumed(headers_len); session_->OnInitialHeadersComplete(id(), headers); - } else { - MarkTrailersConsumed(headers_len); - } - // The delegate will read the headers via a posted task. - NotifyDelegateOfHeadersCompleteLater(headers, frame_len); + // The delegate will read the headers via a posted task. + NotifyDelegateOfHeadersCompleteLater(headers, frame_len); + } } void QuicChromiumClientStream::OnPromiseHeadersComplete( @@ -194,6 +186,9 @@ void QuicChromiumClientStream::NotifyDelegateOfHeadersComplete( size_t frame_len) { if (!delegate_) return; + // Only mark trailers consumed when we are about to notify delegate. + if (headers_delivered_) + MarkTrailersConsumed(decompressed_trailers().length()); headers_delivered_ = true; delegate_->OnHeadersAvailable(headers, frame_len); diff --git a/net/quic/quic_chromium_client_stream_test.cc b/net/quic/quic_chromium_client_stream_test.cc index 19c427c..4e9e2bc 100644 --- a/net/quic/quic_chromium_client_stream_test.cc +++ b/net/quic/quic_chromium_client_stream_test.cc @@ -5,6 +5,8 @@ #include "net/quic/quic_chromium_client_stream.h" #include "base/macros.h" +#include "base/run_loop.h" +#include "base/strings/string_number_conversions.h" #include "net/base/net_errors.h" #include "net/base/test_completion_callback.h" #include "net/quic/quic_chromium_client_session.h" @@ -35,7 +37,8 @@ class MockDelegate : public QuicChromiumClientStream::Delegate { MOCK_METHOD0(OnSendData, int()); MOCK_METHOD2(OnSendDataComplete, int(int, bool*)); - MOCK_METHOD2(OnHeadersAvailable, void(const SpdyHeaderBlock&, size_t)); + MOCK_METHOD2(OnHeadersAvailable, + void(const SpdyHeaderBlock& headers, size_t frame_len)); MOCK_METHOD2(OnDataReceived, int(const char*, int)); MOCK_METHOD0(OnDataAvailable, void()); MOCK_METHOD1(OnClose, void(QuicErrorCode)); @@ -313,14 +316,103 @@ TEST_P(QuicChromiumClientStreamTest, OnTrailers) { SpdyHeaderBlock trailers; trailers["bar"] = "foo"; + trailers[kFinalOffsetHeaderKey] = base::IntToString(strlen(data)); std::string uncompressed_trailers = SpdyUtils::SerializeUncompressedHeaders(trailers); stream_->OnStreamHeaders(uncompressed_trailers); stream_->OnStreamHeadersComplete(true, uncompressed_trailers.length()); + SpdyHeaderBlock actual_trailers; + + base::RunLoop run_loop; + EXPECT_CALL(delegate_, OnHeadersAvailable(_, uncompressed_trailers.length())) + .WillOnce(testing::DoAll( + testing::SaveArg<0>(&actual_trailers), + testing::InvokeWithoutArgs([&run_loop]() { run_loop.Quit(); }))); + + run_loop.Run(); + // Make sure kFinalOffsetHeaderKey is gone from the delivered actual trailers. + trailers.erase(kFinalOffsetHeaderKey); + EXPECT_EQ(trailers, actual_trailers); + base::MessageLoop::current()->RunUntilIdle(); + EXPECT_CALL(delegate_, OnClose(QUIC_NO_ERROR)); +} + +// Tests that trailers are marked as consumed only before delegate is to be +// immediately notified about trailers. +TEST_P(QuicChromiumClientStreamTest, MarkTrailersConsumedWhenNotifyDelegate) { + InitializeHeaders(); + std::string uncompressed_headers = + SpdyUtils::SerializeUncompressedHeaders(headers_); + stream_->OnStreamHeaders(uncompressed_headers); + stream_->OnStreamHeadersComplete(false, uncompressed_headers.length()); + + EXPECT_CALL(delegate_, + OnHeadersAvailable(headers_, uncompressed_headers.length())); + base::MessageLoop::current()->RunUntilIdle(); + EXPECT_TRUE(stream_->decompressed_headers().empty()); + + const char data[] = "hello world!"; + stream_->OnStreamFrame(QuicStreamFrame(kTestStreamId, /*fin=*/false, + /*offset=*/0, data)); + + base::RunLoop run_loop; + EXPECT_CALL(delegate_, OnDataAvailable()) + .Times(1) + .WillOnce(testing::DoAll( + testing::Invoke(CreateFunctor( + &QuicChromiumClientStreamTest::ReadData, base::Unretained(this), + StringPiece(data, arraysize(data) - 1))), + testing::Invoke([&run_loop]() { run_loop.Quit(); }))); + + // Wait for the read to complete. + run_loop.Run(); + + // Read again, and it will be pending. + scoped_refptr buffer(new IOBuffer(1)); + EXPECT_EQ(ERR_IO_PENDING, stream_->Read(buffer.get(), 1)); + + SpdyHeaderBlock trailers; + trailers["bar"] = "foo"; + trailers[kFinalOffsetHeaderKey] = base::IntToString(strlen(data)); + std::string uncompressed_trailers = + SpdyUtils::SerializeUncompressedHeaders(trailers); + + stream_->OnStreamHeaders(uncompressed_trailers); + stream_->OnStreamHeadersComplete(true, uncompressed_trailers.length()); + EXPECT_FALSE(stream_->IsDoneReading()); + + // Now the pending should complete. Make sure that IsDoneReading() is false + // even though ReadData returns 0 byte, because OnHeadersAvailable callback + // comes after this OnDataAvailable callback. + base::RunLoop run_loop2; + EXPECT_CALL(delegate_, OnDataAvailable()) + .Times(1) + .WillOnce(testing::DoAll( + testing::Invoke(CreateFunctor(&QuicChromiumClientStreamTest::ReadData, + base::Unretained(this), StringPiece())), + testing::InvokeWithoutArgs([&run_loop2]() { run_loop2.Quit(); }))); + run_loop2.Run(); + // Make sure that the stream is not closed, even though ReadData returns 0. + EXPECT_FALSE(stream_->IsDoneReading()); + + // The OnHeadersAvailable call should follow. + base::RunLoop run_loop3; + SpdyHeaderBlock actual_trailers; EXPECT_CALL(delegate_, - OnHeadersAvailable(trailers, uncompressed_trailers.length())); + OnHeadersAvailable(trailers, uncompressed_trailers.length())) + .WillOnce(testing::DoAll( + testing::SaveArg<0>(&actual_trailers), + testing::InvokeWithoutArgs([&run_loop3]() { run_loop3.Quit(); }))); + + run_loop3.Run(); + // Make sure the stream is properly closed since trailers and data are all + // consumed. + EXPECT_TRUE(stream_->IsDoneReading()); + // Make sure kFinalOffsetHeaderKey is gone from the delivered actual trailers. + trailers.erase(kFinalOffsetHeaderKey); + EXPECT_EQ(trailers, actual_trailers); base::MessageLoop::current()->RunUntilIdle(); EXPECT_CALL(delegate_, OnClose(QUIC_NO_ERROR)); diff --git a/net/quic/quic_http_stream_test.cc b/net/quic/quic_http_stream_test.cc index 3399cc9..e64a8aa 100644 --- a/net/quic/quic_http_stream_test.cc +++ b/net/quic/quic_http_stream_test.cc @@ -9,6 +9,7 @@ #include #include "base/memory/scoped_ptr.h" +#include "base/strings/string_number_conversions.h" #include "base/thread_task_runner_handle.h" #include "net/base/chunked_upload_data_stream.h" #include "net/base/elements_upload_data_stream.h" @@ -580,6 +581,7 @@ TEST_P(QuicHttpStreamTest, GetRequestWithTrailers) { SpdyHeaderBlock trailers; size_t spdy_trailers_frame_length; trailers["foo"] = "bar"; + trailers[kFinalOffsetHeaderKey] = base::IntToString(strlen(kResponseBody)); ProcessPacket(ConstructResponseTrailersPacket( 4, kFin, trailers, &spdy_trailers_frame_length, &offset)); diff --git a/net/quic/quic_spdy_stream.cc b/net/quic/quic_spdy_stream.cc index 75b160e..3486593 100644 --- a/net/quic/quic_spdy_stream.cc +++ b/net/quic/quic_spdy_stream.cc @@ -10,6 +10,7 @@ #include "net/quic/quic_spdy_session.h" #include "net/quic/quic_utils.h" #include "net/quic/quic_write_blocked_list.h" +#include "net/quic/spdy_utils.h" using base::StringPiece; using std::min; @@ -213,7 +214,22 @@ void QuicSpdyStream::OnTrailingHeadersComplete(bool fin, size_t /*frame_len*/) { return; } - OnStreamFrame(QuicStreamFrame(id(), fin, stream_bytes_read(), StringPiece())); + size_t final_byte_offset = 0; + SpdyHeaderBlock trailers; + if (!SpdyUtils::ParseTrailers(decompressed_trailers().data(), + decompressed_trailers().length(), + &final_byte_offset, &response_trailers_)) { + DLOG(ERROR) << "Trailers are malformed: " << id(); + session()->connection()->SendConnectionCloseWithDetails( + QUIC_INVALID_HEADERS_STREAM_DATA, "Trailers are malformed"); + return; + } + + // The data on this stream ends at |final_byte_offset|. + DVLOG(1) << "Stream ends at byte offset: " << final_byte_offset + << " currently read: " << stream_bytes_read(); + + OnStreamFrame(QuicStreamFrame(id(), fin, final_byte_offset, StringPiece())); trailers_decompressed_ = true; } diff --git a/net/quic/quic_spdy_stream.h b/net/quic/quic_spdy_stream.h index 65304d6..a5adfe2 100644 --- a/net/quic/quic_spdy_stream.h +++ b/net/quic/quic_spdy_stream.h @@ -141,6 +141,11 @@ class NET_EXPORT_PRIVATE QuicSpdyStream : public ReliableQuicStream { return decompressed_trailers_; } + // Returns whatever trailers have been received for this stream. + const SpdyHeaderBlock& response_trailers() const { + return response_trailers_; + } + virtual SpdyPriority priority() const; // Sets priority_ to priority. This should only be called before bytes are @@ -182,6 +187,8 @@ class NET_EXPORT_PRIVATE QuicSpdyStream : public ReliableQuicStream { // Contains a copy of the decompressed trailers until they are consumed // via ProcessData or Readv. std::string decompressed_trailers_; + // The parsed trailers received from the peer. + SpdyHeaderBlock response_trailers_; DISALLOW_COPY_AND_ASSIGN(QuicSpdyStream); }; diff --git a/net/quic/quic_spdy_stream_test.cc b/net/quic/quic_spdy_stream_test.cc index 4f71cb7..1e7abc6 100644 --- a/net/quic/quic_spdy_stream_test.cc +++ b/net/quic/quic_spdy_stream_test.cc @@ -631,6 +631,7 @@ TEST_P(QuicSpdyStreamTest, ReceivingTrailers) { trailers_block["key1"] = "value1"; trailers_block["key2"] = "value2"; trailers_block["key3"] = "value3"; + trailers_block[kFinalOffsetHeaderKey] = "0"; string trailers = SpdyUtils::SerializeUncompressedHeaders(trailers_block); stream_->OnStreamHeaders(trailers); stream_->OnStreamHeadersComplete(/*fin=*/true, trailers.size()); @@ -709,6 +710,44 @@ TEST_P(QuicSpdyStreamTest, ReceivingTrailersAfterBodyWithFin) { stream_->OnStreamHeadersComplete(/*fin=*/true, trailers.size()); } +TEST_P(QuicSpdyStreamTest, ReceivingTrailersWithOffset) { + // Test that when receiving trailing headers with an offset before response + // body, stream is closed at the right offset. + Initialize(kShouldProcessData); + + // Receive initial headers. + string headers = SpdyUtils::SerializeUncompressedHeaders(headers_); + stream_->OnStreamHeaders(headers); + stream_->OnStreamHeadersComplete(false, headers.size()); + stream_->MarkHeadersConsumed(stream_->decompressed_headers().size()); + + const string body = "this is the body"; + // Receive trailing headers. + SpdyHeaderBlock trailers_block; + trailers_block["key1"] = "value1"; + trailers_block["key2"] = "value2"; + trailers_block["key3"] = "value3"; + trailers_block[kFinalOffsetHeaderKey] = base::IntToString(body.size()); + string trailers = SpdyUtils::SerializeUncompressedHeaders(trailers_block); + stream_->OnStreamHeaders(trailers); + stream_->OnStreamHeadersComplete(/*fin=*/true, trailers.size()); + + // The trailers should be decompressed, and readable from the stream. + EXPECT_TRUE(stream_->trailers_decompressed()); + const string decompressed_trailers = stream_->decompressed_trailers(); + EXPECT_EQ(trailers, decompressed_trailers); + // Consuming the trailers erases them from the stream. + stream_->MarkTrailersConsumed(decompressed_trailers.size()); + EXPECT_EQ("", stream_->decompressed_trailers()); + + EXPECT_FALSE(stream_->IsDoneReading()); + // Receive and consume body. + QuicStreamFrame frame(kClientDataStreamId1, /*fin=*/false, 0, body); + stream_->OnStreamFrame(frame); + EXPECT_EQ(body, stream_->data()); + EXPECT_TRUE(stream_->IsDoneReading()); +} + TEST_P(QuicSpdyStreamTest, ClosingStreamWithNoTrailers) { // Verify that a stream receiving headers, body, and no trailers is correctly // marked as done reading on consumption of headers and body. diff --git a/net/tools/quic/quic_client.cc b/net/tools/quic/quic_client.cc index 56514ae..04b7d2f 100644 --- a/net/tools/quic/quic_client.cc +++ b/net/tools/quic/quic_client.cc @@ -434,7 +434,8 @@ void QuicClient::OnClose(QuicSpdyStream* stream) { latest_response_code_ = headers.parsed_response_code(); headers.DumpHeadersToString(&latest_response_headers_); latest_response_body_ = client_stream->data(); - latest_response_trailers_ = client_stream->trailers().DebugString(); + latest_response_trailers_ = + client_stream->response_trailers().DebugString(); } } diff --git a/net/tools/quic/quic_spdy_client_stream.cc b/net/tools/quic/quic_spdy_client_stream.cc index 1228936..615fab4 100644 --- a/net/tools/quic/quic_spdy_client_stream.cc +++ b/net/tools/quic/quic_spdy_client_stream.cc @@ -72,20 +72,8 @@ void QuicSpdyClientStream::OnInitialHeadersComplete(bool fin, void QuicSpdyClientStream::OnTrailingHeadersComplete(bool fin, size_t frame_len) { - size_t final_byte_offset = 0; - if (!SpdyUtils::ParseTrailers(decompressed_trailers().data(), - decompressed_trailers().length(), - &final_byte_offset, &response_trailers_)) { - Reset(QUIC_BAD_APPLICATION_PAYLOAD); - return; - } + QuicSpdyStream::OnTrailingHeadersComplete(fin, frame_len); MarkTrailersConsumed(decompressed_trailers().length()); - - // The data on this stream ends at |final_byte_offset|. - DVLOG(1) << "Stream ends at byte offset: " << final_byte_offset - << " currently read: " << stream_bytes_read(); - OnStreamFrame( - QuicStreamFrame(id(), /*fin=*/true, final_byte_offset, StringPiece())); } void QuicSpdyClientStream::OnPromiseHeadersComplete(QuicStreamId promised_id, diff --git a/net/tools/quic/quic_spdy_client_stream.h b/net/tools/quic/quic_spdy_client_stream.h index 7268ff0..dcb8896 100644 --- a/net/tools/quic/quic_spdy_client_stream.h +++ b/net/tools/quic/quic_spdy_client_stream.h @@ -64,9 +64,6 @@ class QuicSpdyClientStream : public QuicSpdyStream { // Returns whatever headers have been received for this stream. const SpdyHeaderBlock& headers() { return response_headers_; } - // Returns whatever trailers have been received for this stream. - const SpdyHeaderBlock& trailers() { return response_trailers_; } - size_t header_bytes_read() const { return header_bytes_read_; } size_t header_bytes_written() const { return header_bytes_written_; } @@ -89,9 +86,6 @@ class QuicSpdyClientStream : public QuicSpdyStream { // The parsed headers received from the server. SpdyHeaderBlock response_headers_; - // The parsed trailers received from the server. - SpdyHeaderBlock response_trailers_; - // The parsed content-length, or -1 if none is specified. int content_length_; int response_code_; diff --git a/net/tools/quic/test_tools/quic_test_client.cc b/net/tools/quic/test_tools/quic_test_client.cc index e926279a..fad3ab9 100644 --- a/net/tools/quic/test_tools/quic_test_client.cc +++ b/net/tools/quic/test_tools/quic_test_client.cc @@ -568,7 +568,7 @@ void QuicTestClient::OnClose(QuicSpdyStream* stream) { response_complete_ = true; response_headers_complete_ = stream_->headers_decompressed(); SpdyBalsaUtils::SpdyHeadersToResponseHeaders(stream_->headers(), &headers_); - response_trailers_ = stream_->trailers(); + response_trailers_ = stream_->response_trailers(); stream_error_ = stream_->stream_error(); bytes_read_ = stream_->stream_bytes_read() + stream_->header_bytes_read(); bytes_written_ = -- cgit v1.1