diff options
author | kmarshall <kmarshall@chromium.org> | 2015-12-02 11:23:15 -0800 |
---|---|---|
committer | Commit bot <commit-bot@chromium.org> | 2015-12-02 19:24:14 +0000 |
commit | d9e1c5c6a8780007ea3dc03e594889a5597d32d5 (patch) | |
tree | 9ca88561d2379eae501442921cc7b0c34d84fcab /blimp/net | |
parent | 8cafdc1d0e5f922d1c1cb7f24b827de82c2fffe9 (diff) | |
download | chromium_src-d9e1c5c6a8780007ea3dc03e594889a5597d32d5.zip chromium_src-d9e1c5c6a8780007ea3dc03e594889a5597d32d5.tar.gz chromium_src-d9e1c5c6a8780007ea3dc03e594889a5597d32d5.tar.bz2 |
Implementation for the Blimp Message Buffer.
This class provides a FIFO buffer for reliable, ordered message
delivery, and schedules completion callbacks in response
to message delivery acknowledgements.
BUG=557360
R=wez@chromium.org,haibinlu@chromium.org
Review URL: https://codereview.chromium.org/1458633002
Cr-Commit-Position: refs/heads/master@{#362769}
Diffstat (limited to 'blimp/net')
-rw-r--r-- | blimp/net/BUILD.gn | 1 | ||||
-rw-r--r-- | blimp/net/blimp_message_output_buffer.cc | 137 | ||||
-rw-r--r-- | blimp/net/blimp_message_output_buffer.h | 65 | ||||
-rw-r--r-- | blimp/net/blimp_message_output_buffer_unittest.cc | 262 | ||||
-rw-r--r-- | blimp/net/browser_connection_handler.cc | 13 | ||||
-rw-r--r-- | blimp/net/browser_connection_handler.h | 6 | ||||
-rw-r--r-- | blimp/net/test_common.h | 8 |
7 files changed, 467 insertions, 25 deletions
diff --git a/blimp/net/BUILD.gn b/blimp/net/BUILD.gn index 4010a0b..f20c77c 100644 --- a/blimp/net/BUILD.gn +++ b/blimp/net/BUILD.gn @@ -80,6 +80,7 @@ source_set("unit_tests") { "blimp_connection_unittest.cc", "blimp_message_demultiplexer_unittest.cc", "blimp_message_multiplexer_unittest.cc", + "blimp_message_output_buffer_unittest.cc", "blimp_message_pump_unittest.cc", "engine_connection_manager_unittest.cc", "input_message_unittest.cc", diff --git a/blimp/net/blimp_message_output_buffer.cc b/blimp/net/blimp_message_output_buffer.cc index dd79563a..62cc259 100644 --- a/blimp/net/blimp_message_output_buffer.cc +++ b/blimp/net/blimp_message_output_buffer.cc @@ -4,26 +4,151 @@ #include "blimp/net/blimp_message_output_buffer.h" +#include <algorithm> + #include "base/macros.h" +#include "base/message_loop/message_loop.h" +#include "blimp/common/proto/blimp_message.pb.h" +#include "net/base/net_errors.h" namespace blimp { -BlimpMessageOutputBuffer::BlimpMessageOutputBuffer() { - NOTIMPLEMENTED(); +BlimpMessageOutputBuffer::BlimpMessageOutputBuffer(int max_buffer_size_bytes) + : max_buffer_size_bytes_(max_buffer_size_bytes) {} + +BlimpMessageOutputBuffer::~BlimpMessageOutputBuffer() {} + +void BlimpMessageOutputBuffer::SetOutputProcessor( + BlimpMessageProcessor* processor) { + // Check that we are setting or removing the processor, not replacing it. + if (processor) { + DCHECK(!output_processor_); + output_processor_ = processor; + write_complete_cb_.Reset(base::Bind( + &BlimpMessageOutputBuffer::OnWriteComplete, base::Unretained(this))); + WriteNextMessageIfReady(); + } else { + DCHECK(output_processor_); + output_processor_ = nullptr; + write_complete_cb_.Cancel(); + } +} + +void BlimpMessageOutputBuffer::RetransmitBufferedMessages() { + DCHECK(output_processor_); + + // Prepend the entirety of |ack_buffer_| to |write_buffer_|. + write_buffer_.insert(write_buffer_.begin(), + std::make_move_iterator(ack_buffer_.begin()), + std::make_move_iterator(ack_buffer_.end())); + ack_buffer_.clear(); + + WriteNextMessageIfReady(); } -BlimpMessageOutputBuffer::~BlimpMessageOutputBuffer() { - NOTIMPLEMENTED(); +int BlimpMessageOutputBuffer::GetBufferByteSizeForTest() const { + return write_buffer_.size() + ack_buffer_.size(); +} + +int BlimpMessageOutputBuffer::GetUnacknowledgedMessageCountForTest() const { + return ack_buffer_.size(); } void BlimpMessageOutputBuffer::ProcessMessage( scoped_ptr<BlimpMessage> message, const net::CompletionCallback& callback) { - NOTIMPLEMENTED(); + VLOG(2) << "ProcessMessage (id=" << message->message_id() + << ", type=" << message->type() << ")"; + + message->set_message_id(++prev_message_id_); + + current_buffer_size_bytes_ += message->ByteSize(); + DCHECK_GE(max_buffer_size_bytes_, current_buffer_size_bytes_); + + write_buffer_.push_back( + make_scoped_ptr(new BufferEntry(message.Pass(), callback))); + + // Write the message + if (write_buffer_.size() == 1 && output_processor_) { + WriteNextMessageIfReady(); + } } +// Flushes acknowledged messages from the buffer and invokes their +// |callbacks|, if any. void BlimpMessageOutputBuffer::OnMessageCheckpoint(int64 message_id) { - NOTIMPLEMENTED(); + VLOG(2) << "OnMessageCheckpoint (message_id=" << message_id << ")"; + if (ack_buffer_.empty()) { + LOG(WARNING) << "Checkpoint called while buffer is empty."; + return; + } + if (message_id > prev_message_id_) { + LOG(WARNING) << "Illegal checkpoint response: " << message_id; + return; + } + + // Remove all acknowledged messages through |message_id| and invoke their + // write callbacks, if set. + while (!ack_buffer_.empty() && + ack_buffer_.front()->message->message_id() <= message_id) { + const BufferEntry& ack_entry = *ack_buffer_.front(); + current_buffer_size_bytes_ -= ack_entry.message->GetCachedSize(); + DCHECK_GE(current_buffer_size_bytes_, 0); + VLOG(3) << "Buffer size: " << current_buffer_size_bytes_ + << " (max=" << current_buffer_size_bytes_ << ")"; + + if (!ack_entry.callback.is_null()) { + base::MessageLoop::current()->PostTask( + FROM_HERE, base::Bind(ack_entry.callback, net::OK)); + } + + ack_buffer_.pop_front(); + } + + // An empty buffer should have a zero-byte footprint. + DCHECK(current_buffer_size_bytes_ > 0 || + (ack_buffer_.empty() && write_buffer_.empty())) + << "Expected zero-length buffer size, was " << current_buffer_size_bytes_ + << " bytes instead."; +} + +BlimpMessageOutputBuffer::BufferEntry::BufferEntry( + scoped_ptr<BlimpMessage> message, + net::CompletionCallback callback) + : message(message.Pass()), callback(callback) {} + +BlimpMessageOutputBuffer::BufferEntry::~BufferEntry() {} + +void BlimpMessageOutputBuffer::WriteNextMessageIfReady() { + if (write_buffer_.empty()) { + VLOG(2) << "Nothing to write."; + return; + } + + scoped_ptr<BlimpMessage> message_to_write( + new BlimpMessage(*write_buffer_.front()->message)); + VLOG(3) << "Writing message (id=" + << write_buffer_.front()->message->message_id() + << ", type=" << message_to_write->type() << ")"; + + output_processor_->ProcessMessage(message_to_write.Pass(), + write_complete_cb_.callback()); + VLOG(3) << "Queue size: " << write_buffer_.size(); +} + +void BlimpMessageOutputBuffer::OnWriteComplete(int result) { + DCHECK_LE(result, net::OK); + VLOG(2) << "Write complete, result=" << result; + + if (result == net::OK) { + ack_buffer_.push_back(std::move(write_buffer_.front())); + write_buffer_.pop_front(); + WriteNextMessageIfReady(); + } else { + // An error occurred while writing to the network connection. + // Stop writing more messages until a new connection is established. + DLOG(WARNING) << "Write error (result=" << result << ")"; + } } } // namespace blimp diff --git a/blimp/net/blimp_message_output_buffer.h b/blimp/net/blimp_message_output_buffer.h index c1df574..0a7bddf 100644 --- a/blimp/net/blimp_message_output_buffer.h +++ b/blimp/net/blimp_message_output_buffer.h @@ -5,10 +5,15 @@ #ifndef BLIMP_NET_BLIMP_MESSAGE_OUTPUT_BUFFER_H_ #define BLIMP_NET_BLIMP_MESSAGE_OUTPUT_BUFFER_H_ +#include <list> +#include <queue> +#include <utility> + #include "base/macros.h" #include "blimp/net/blimp_message_checkpoint_observer.h" #include "blimp/net/blimp_message_processor.h" #include "blimp/net/blimp_net_export.h" +#include "net/base/completion_callback.h" namespace blimp { @@ -16,19 +21,23 @@ class BlimpConnection; // Provides a FIFO buffer for reliable, ordered message delivery. // Messages are retained for redelivery until they are acknowledged by the -// receiving end (via BlimpMessageAckObserver). +// receiving end (via BlimpMessageCheckpointObserver). +// Messages can be paired with callbacks that are invoked on successful +// message acknowledgement. // (Redelivery will be used in a future CL to implement Fast Recovery // of dropped connections.) class BLIMP_NET_EXPORT BlimpMessageOutputBuffer : public BlimpMessageProcessor, public BlimpMessageCheckpointObserver { public: - BlimpMessageOutputBuffer(); + explicit BlimpMessageOutputBuffer(int max_buffer_size_bytes); ~BlimpMessageOutputBuffer() override; - // Sets the processor to receive messages from this buffer. - // TODO(kmarshall): implement this. - void set_output_processor(BlimpMessageProcessor* processor) {} + // Sets the processor that will be used for writing buffered messages. + void SetOutputProcessor(BlimpMessageProcessor* processor); + + // Marks all messages in buffer for retransmission. + void RetransmitBufferedMessages(); // BlimpMessageProcessor implementation. // |callback|, if set, will be called once the remote end has acknowledged the @@ -37,13 +46,51 @@ class BLIMP_NET_EXPORT BlimpMessageOutputBuffer const net::CompletionCallback& callback) override; // MessageCheckpointObserver implementation. - // Flushes acknowledged messages from the buffer and invokes their - // |callbacks|, if any. - // Callbacks should not destroy |this| so as to not interfere with the - // processing of any other pending callbacks. void OnMessageCheckpoint(int64 message_id) override; + int GetBufferByteSizeForTest() const; + int GetUnacknowledgedMessageCountForTest() const; + private: + struct BufferEntry { + BufferEntry(scoped_ptr<BlimpMessage> message, + net::CompletionCallback callback); + ~BufferEntry(); + + const scoped_ptr<BlimpMessage> message; + const net::CompletionCallback callback; + }; + + typedef std::list<scoped_ptr<BufferEntry>> MessageBuffer; + + // Writes the next message in the buffer if an output processor is attached + // and the buffer contains a message. + void WriteNextMessageIfReady(); + + // Receives the completion status of a write operation. + void OnWriteComplete(int result); + + BlimpMessageProcessor* output_processor_ = nullptr; + net::CancelableCompletionCallback write_complete_cb_; + + // Maximum serialized footprint of buffered messages. + int max_buffer_size_bytes_; + + // Serialized footprint of the messages contained in the write and ack + // buffers. + int current_buffer_size_bytes_ = 0; + + // The ID used by the last outgoing message. + int64 prev_message_id_ = 0; + + // List of unsent messages. + MessageBuffer write_buffer_; + + // List of messages that are sent and awaiting acknoweldgement. + // The messages in |ack_buffer_| are contiguous with the messages in + // |write_buffer_|. + MessageBuffer ack_buffer_; + DISALLOW_COPY_AND_ASSIGN(BlimpMessageOutputBuffer); }; diff --git a/blimp/net/blimp_message_output_buffer_unittest.cc b/blimp/net/blimp_message_output_buffer_unittest.cc new file mode 100644 index 0000000..2278fb4 --- /dev/null +++ b/blimp/net/blimp_message_output_buffer_unittest.cc @@ -0,0 +1,262 @@ +// Copyright 2015 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "blimp/net/blimp_message_output_buffer.h" + +#include "base/callback_helpers.h" +#include "base/logging.h" +#include "base/message_loop/message_loop.h" +#include "blimp/common/proto/blimp_message.pb.h" +#include "blimp/net/test_common.h" +#include "net/base/net_errors.h" +#include "net/base/test_completion_callback.h" +#include "testing/gmock/include/gmock/gmock.h" +#include "testing/gtest/include/gtest/gtest.h" + +using testing::_; +using testing::InvokeArgument; +using testing::Ref; +using testing::Return; +using testing::SaveArg; + +namespace blimp { +namespace { + +class BlimpMessageOutputBufferTest : public testing::Test { + public: + BlimpMessageOutputBufferTest() {} + + void SetUp() override { + input_msg_.set_type(BlimpMessage::INPUT); + input_msg_.set_message_id(1); + compositor_msg_.set_type(BlimpMessage::COMPOSITOR); + compositor_msg_.set_message_id(2); + + // Buffer should only have space for two unacknowledged messages + // (with message IDs). + ASSERT_EQ(input_msg_.ByteSize(), compositor_msg_.ByteSize()); + buffer_.reset(new BlimpMessageOutputBuffer(2 * input_msg_.GetCachedSize())); + } + + protected: + void AddOutputExpectation(const BlimpMessage& msg) { + EXPECT_CALL(output_processor_, MockableProcessMessage(EqualsProto(msg), _)) + .WillOnce(SaveArg<1>(&captured_cb_)) + .RetiresOnSaturation(); + } + + BlimpMessage WithMessageId(const BlimpMessage& message, int64 message_id) { + BlimpMessage output = message; + output.set_message_id(message_id); + return output; + } + + BlimpMessage input_msg_; + BlimpMessage compositor_msg_; + + base::MessageLoop message_loop_; + net::CompletionCallback captured_cb_; + MockBlimpMessageProcessor output_processor_; + scoped_ptr<BlimpMessageOutputBuffer> buffer_; + testing::InSequence s; +}; + +// Verify batched writes and acknowledgements. +TEST_F(BlimpMessageOutputBufferTest, SeparatelyBufferWriteAck) { + net::TestCompletionCallback complete_cb_1; + net::TestCompletionCallback complete_cb_2; + + AddOutputExpectation(input_msg_); + AddOutputExpectation(compositor_msg_); + + // Accumulate two messages. + buffer_->ProcessMessage(make_scoped_ptr(new BlimpMessage(input_msg_)), + complete_cb_1.callback()); + buffer_->ProcessMessage(make_scoped_ptr(new BlimpMessage(compositor_msg_)), + complete_cb_2.callback()); + ASSERT_EQ(2, buffer_->GetBufferByteSizeForTest()); + + // Write two messages. + ASSERT_TRUE(captured_cb_.is_null()); + buffer_->SetOutputProcessor(&output_processor_); + ASSERT_FALSE(captured_cb_.is_null()); + base::ResetAndReturn(&captured_cb_).Run(net::OK); + ASSERT_EQ(2, buffer_->GetBufferByteSizeForTest()); + ASSERT_EQ(1, buffer_->GetUnacknowledgedMessageCountForTest()); + base::ResetAndReturn(&captured_cb_).Run(net::OK); + ASSERT_EQ(2, buffer_->GetBufferByteSizeForTest()); + ASSERT_EQ(2, buffer_->GetUnacknowledgedMessageCountForTest()); + + // Both messages are acknowledged by separate checkpoints. + buffer_->OnMessageCheckpoint(1); + ASSERT_EQ(1, buffer_->GetBufferByteSizeForTest()); + ASSERT_EQ(1, buffer_->GetUnacknowledgedMessageCountForTest()); + EXPECT_EQ(net::OK, complete_cb_1.WaitForResult()); + buffer_->OnMessageCheckpoint(2); + ASSERT_EQ(0, buffer_->GetBufferByteSizeForTest()); + ASSERT_EQ(0, buffer_->GetUnacknowledgedMessageCountForTest()); + EXPECT_EQ(net::OK, complete_cb_2.WaitForResult()); +} + +// Verify buffer writes from an empty state. +TEST_F(BlimpMessageOutputBufferTest, WritesFromEmptyBuffer) { + net::TestCompletionCallback complete_cb_1; + net::TestCompletionCallback complete_cb_2; + + AddOutputExpectation(input_msg_); + AddOutputExpectation(compositor_msg_); + + ASSERT_TRUE(captured_cb_.is_null()); + buffer_->SetOutputProcessor(&output_processor_); + + // Message #0 is buffered, sent, acknowledged. + buffer_->ProcessMessage(make_scoped_ptr(new BlimpMessage(input_msg_)), + complete_cb_1.callback()); + ASSERT_EQ(1, buffer_->GetBufferByteSizeForTest()); + ASSERT_FALSE(captured_cb_.is_null()); + base::ResetAndReturn(&captured_cb_).Run(net::OK); + ASSERT_EQ(1, buffer_->GetBufferByteSizeForTest()); + ASSERT_EQ(1, buffer_->GetUnacknowledgedMessageCountForTest()); + buffer_->OnMessageCheckpoint(1); + ASSERT_EQ(0, buffer_->GetBufferByteSizeForTest()); + ASSERT_EQ(0, buffer_->GetUnacknowledgedMessageCountForTest()); + + buffer_->ProcessMessage(make_scoped_ptr(new BlimpMessage(compositor_msg_)), + complete_cb_2.callback()); + ASSERT_EQ(1, buffer_->GetBufferByteSizeForTest()); + ASSERT_FALSE(captured_cb_.is_null()); + base::ResetAndReturn(&captured_cb_).Run(net::OK); + ASSERT_EQ(1, buffer_->GetBufferByteSizeForTest()); + ASSERT_EQ(1, buffer_->GetUnacknowledgedMessageCountForTest()); + buffer_->OnMessageCheckpoint(2); + ASSERT_EQ(0, buffer_->GetBufferByteSizeForTest()); + ASSERT_EQ(0, buffer_->GetUnacknowledgedMessageCountForTest()); +} + +// Verify that a single checkpoint can be used to acknowledge two writes. +TEST_F(BlimpMessageOutputBufferTest, SharedCheckpoint) { + net::TestCompletionCallback complete_cb_1; + net::TestCompletionCallback complete_cb_2; + + AddOutputExpectation(input_msg_); + AddOutputExpectation(compositor_msg_); + + // Message #1 is written but unacknowledged. + buffer_->ProcessMessage(make_scoped_ptr(new BlimpMessage(input_msg_)), + complete_cb_1.callback()); + ASSERT_EQ(1, buffer_->GetBufferByteSizeForTest()); + ASSERT_TRUE(captured_cb_.is_null()); + buffer_->SetOutputProcessor(&output_processor_); + ASSERT_FALSE(captured_cb_.is_null()); + base::ResetAndReturn(&captured_cb_).Run(net::OK); + ASSERT_EQ(1, buffer_->GetBufferByteSizeForTest()); + ASSERT_EQ(1, buffer_->GetUnacknowledgedMessageCountForTest()); + + // Message #2 is written but unacknowledged. + buffer_->ProcessMessage(make_scoped_ptr(new BlimpMessage(compositor_msg_)), + complete_cb_2.callback()); + ASSERT_EQ(2, buffer_->GetBufferByteSizeForTest()); + ASSERT_FALSE(captured_cb_.is_null()); + base::ResetAndReturn(&captured_cb_).Run(net::OK); + ASSERT_TRUE(captured_cb_.is_null()); + ASSERT_EQ(2, buffer_->GetBufferByteSizeForTest()); + ASSERT_EQ(2, buffer_->GetUnacknowledgedMessageCountForTest()); + + // Both messages are acknowledged in one checkpoint. + buffer_->OnMessageCheckpoint(2); + ASSERT_EQ(0, buffer_->GetBufferByteSizeForTest()); + ASSERT_EQ(0, buffer_->GetUnacknowledgedMessageCountForTest()); + EXPECT_EQ(net::OK, complete_cb_1.WaitForResult()); + EXPECT_EQ(net::OK, complete_cb_2.WaitForResult()); +} + +// Verify that messages that fail to write are kept in a pending write state. +TEST_F(BlimpMessageOutputBufferTest, WriteError) { + net::TestCompletionCallback complete_cb_1; + net::TestCompletionCallback complete_cb_2; + + AddOutputExpectation(input_msg_); + AddOutputExpectation(input_msg_); + + // Accumulate two messages. + buffer_->ProcessMessage(make_scoped_ptr(new BlimpMessage(input_msg_)), + complete_cb_1.callback()); + ASSERT_EQ(1, buffer_->GetBufferByteSizeForTest()); + + // First write attempt, which fails. + ASSERT_TRUE(captured_cb_.is_null()); + buffer_->SetOutputProcessor(&output_processor_); + ASSERT_FALSE(captured_cb_.is_null()); + base::ResetAndReturn(&captured_cb_).Run(net::ERR_FAILED); + ASSERT_EQ(1, buffer_->GetBufferByteSizeForTest()); + ASSERT_EQ(0, buffer_->GetUnacknowledgedMessageCountForTest()); + + // Simulate disconnect. + buffer_->SetOutputProcessor(nullptr); + + // Reconnect. Should immediately try to write the contents of the buffer. + buffer_->SetOutputProcessor(&output_processor_); + ASSERT_FALSE(captured_cb_.is_null()); + base::ResetAndReturn(&captured_cb_).Run(net::OK); + ASSERT_EQ(1, buffer_->GetBufferByteSizeForTest()); + ASSERT_EQ(1, buffer_->GetUnacknowledgedMessageCountForTest()); + buffer_->OnMessageCheckpoint(1); + ASSERT_EQ(0, buffer_->GetBufferByteSizeForTest()); + ASSERT_EQ(0, buffer_->GetUnacknowledgedMessageCountForTest()); + EXPECT_EQ(net::OK, complete_cb_1.WaitForResult()); +} + +// Verify that unacknowledged messages can be moved back to a pending write +// state (recovery after a lost connection.) +TEST_F(BlimpMessageOutputBufferTest, MessageRetransmit) { + net::TestCompletionCallback complete_cb_1; + net::TestCompletionCallback complete_cb_2; + + AddOutputExpectation(input_msg_); + AddOutputExpectation(compositor_msg_); + AddOutputExpectation(compositor_msg_); // Retransmitted message. + + // Accumulate two messages. + buffer_->ProcessMessage(make_scoped_ptr(new BlimpMessage(input_msg_)), + complete_cb_1.callback()); + buffer_->ProcessMessage(make_scoped_ptr(new BlimpMessage(compositor_msg_)), + complete_cb_2.callback()); + ASSERT_EQ(2, buffer_->GetBufferByteSizeForTest()); + + // Write two messages. + ASSERT_TRUE(captured_cb_.is_null()); + buffer_->SetOutputProcessor(&output_processor_); + ASSERT_FALSE(captured_cb_.is_null()); + base::ResetAndReturn(&captured_cb_).Run(net::OK); + ASSERT_EQ(2, buffer_->GetBufferByteSizeForTest()); + ASSERT_EQ(1, buffer_->GetUnacknowledgedMessageCountForTest()); + base::ResetAndReturn(&captured_cb_).Run(net::OK); + ASSERT_EQ(2, buffer_->GetBufferByteSizeForTest()); + ASSERT_EQ(2, buffer_->GetUnacknowledgedMessageCountForTest()); + + // Simulate disconnect & reconnect. + buffer_->SetOutputProcessor(nullptr); + buffer_->SetOutputProcessor(&output_processor_); + + // Remote end indicates that it only received message #0. + // Message #1 should be moved from an unacknowledged state to a pending write + // state. + ASSERT_TRUE(captured_cb_.is_null()); + buffer_->OnMessageCheckpoint(1); + buffer_->RetransmitBufferedMessages(); + ASSERT_FALSE(captured_cb_.is_null()); + ASSERT_EQ(1, buffer_->GetBufferByteSizeForTest()); + ASSERT_EQ(0, buffer_->GetUnacknowledgedMessageCountForTest()); + base::ResetAndReturn(&captured_cb_).Run(net::OK); + ASSERT_EQ(1, buffer_->GetUnacknowledgedMessageCountForTest()); + + // Remote end acknowledges #1, buffer should be empty. + buffer_->OnMessageCheckpoint(2); + ASSERT_EQ(0, buffer_->GetBufferByteSizeForTest()); + ASSERT_EQ(0, buffer_->GetUnacknowledgedMessageCountForTest()); + EXPECT_EQ(net::OK, complete_cb_2.WaitForResult()); +} + +} // namespace +} // namespace blimp diff --git a/blimp/net/browser_connection_handler.cc b/blimp/net/browser_connection_handler.cc index d47623e8..897ae62 100644 --- a/blimp/net/browser_connection_handler.cc +++ b/blimp/net/browser_connection_handler.cc @@ -13,10 +13,17 @@ #include "blimp/net/blimp_message_processor.h" namespace blimp { +namespace { + +// Maximum footprint of the output buffer. +// TODO(kmarshall): Use a value that's computed from the platform. +const int kMaxBufferSizeBytes = 1 << 24; + +} // namespace BrowserConnectionHandler::BrowserConnectionHandler() : demultiplexer_(new BlimpMessageDemultiplexer), - output_buffer_(new BlimpMessageOutputBuffer), + output_buffer_(new BlimpMessageOutputBuffer(kMaxBufferSizeBytes)), multiplexer_(new BlimpMessageMultiplexer(output_buffer_.get())) {} BrowserConnectionHandler::~BrowserConnectionHandler() {} @@ -37,7 +44,7 @@ void BrowserConnectionHandler::HandleConnection( // Connect the incoming & outgoing message streams. connection_->SetIncomingMessageProcessor(demultiplexer_.get()); - output_buffer_->set_output_processor( + output_buffer_->SetOutputProcessor( connection_->GetOutgoingMessageProcessor()); } @@ -45,7 +52,7 @@ void BrowserConnectionHandler::DropCurrentConnection() { if (!connection_) return; connection_->SetIncomingMessageProcessor(nullptr); - output_buffer_->set_output_processor(nullptr); + output_buffer_->SetOutputProcessor(nullptr); connection_.reset(); } diff --git a/blimp/net/browser_connection_handler.h b/blimp/net/browser_connection_handler.h index d739e50..03b2cd0 100644 --- a/blimp/net/browser_connection_handler.h +++ b/blimp/net/browser_connection_handler.h @@ -49,9 +49,6 @@ class BLIMP_NET_EXPORT BrowserConnectionHandler private: void DropCurrentConnection(); - // Holds network resources while there is a Client connected. - scoped_ptr<BlimpConnection> connection_; - // Routes incoming messages to the relevant feature-specific handlers. scoped_ptr<BlimpMessageDemultiplexer> demultiplexer_; @@ -62,6 +59,9 @@ class BLIMP_NET_EXPORT BrowserConnectionHandler // message stream. scoped_ptr<BlimpMessageMultiplexer> multiplexer_; + // Holds network resources while there is a Client connected. + scoped_ptr<BlimpConnection> connection_; + DISALLOW_COPY_AND_ASSIGN(BrowserConnectionHandler); }; diff --git a/blimp/net/test_common.h b/blimp/net/test_common.h index ca766a6..be66ab2 100644 --- a/blimp/net/test_common.h +++ b/blimp/net/test_common.h @@ -67,6 +67,10 @@ ACTION_TEMPLATE(FillBufferFromString, memcpy(testing::get<buf_idx>(args)->data(), str.data(), str.size()); } +// Returns true if |buf| has a prefix of |str|. +// Behavior is undefined if len(buf) < len(str). +bool BufferStartsWith(net::GrowableIOBuffer* buf, const std::string& str); + // GMock action that writes data from a blimp message to an IOBuffer . // // buf_idx (template parameter 0): 0-based index of the IOBuffer arg. @@ -177,10 +181,6 @@ class MockBlimpMessageProcessor : public BlimpMessageProcessor { const net::CompletionCallback& callback)); }; -// Returns true if |buf| has a prefix of |str|. -// Behavior is undefined if len(buf) < len(str). -bool BufferStartsWith(net::GrowableIOBuffer* buf, const std::string& str); - } // namespace blimp #endif // BLIMP_NET_TEST_COMMON_H_ |