summaryrefslogtreecommitdiffstats
path: root/blimp/net
diff options
context:
space:
mode:
authorkmarshall <kmarshall@chromium.org>2015-12-02 11:23:15 -0800
committerCommit bot <commit-bot@chromium.org>2015-12-02 19:24:14 +0000
commitd9e1c5c6a8780007ea3dc03e594889a5597d32d5 (patch)
tree9ca88561d2379eae501442921cc7b0c34d84fcab /blimp/net
parent8cafdc1d0e5f922d1c1cb7f24b827de82c2fffe9 (diff)
downloadchromium_src-d9e1c5c6a8780007ea3dc03e594889a5597d32d5.zip
chromium_src-d9e1c5c6a8780007ea3dc03e594889a5597d32d5.tar.gz
chromium_src-d9e1c5c6a8780007ea3dc03e594889a5597d32d5.tar.bz2
Implementation for the Blimp Message Buffer.
This class provides a FIFO buffer for reliable, ordered message delivery, and schedules completion callbacks in response to message delivery acknowledgements. BUG=557360 R=wez@chromium.org,haibinlu@chromium.org Review URL: https://codereview.chromium.org/1458633002 Cr-Commit-Position: refs/heads/master@{#362769}
Diffstat (limited to 'blimp/net')
-rw-r--r--blimp/net/BUILD.gn1
-rw-r--r--blimp/net/blimp_message_output_buffer.cc137
-rw-r--r--blimp/net/blimp_message_output_buffer.h65
-rw-r--r--blimp/net/blimp_message_output_buffer_unittest.cc262
-rw-r--r--blimp/net/browser_connection_handler.cc13
-rw-r--r--blimp/net/browser_connection_handler.h6
-rw-r--r--blimp/net/test_common.h8
7 files changed, 467 insertions, 25 deletions
diff --git a/blimp/net/BUILD.gn b/blimp/net/BUILD.gn
index 4010a0b..f20c77c 100644
--- a/blimp/net/BUILD.gn
+++ b/blimp/net/BUILD.gn
@@ -80,6 +80,7 @@ source_set("unit_tests") {
"blimp_connection_unittest.cc",
"blimp_message_demultiplexer_unittest.cc",
"blimp_message_multiplexer_unittest.cc",
+ "blimp_message_output_buffer_unittest.cc",
"blimp_message_pump_unittest.cc",
"engine_connection_manager_unittest.cc",
"input_message_unittest.cc",
diff --git a/blimp/net/blimp_message_output_buffer.cc b/blimp/net/blimp_message_output_buffer.cc
index dd79563a..62cc259 100644
--- a/blimp/net/blimp_message_output_buffer.cc
+++ b/blimp/net/blimp_message_output_buffer.cc
@@ -4,26 +4,151 @@
#include "blimp/net/blimp_message_output_buffer.h"
+#include <algorithm>
+
#include "base/macros.h"
+#include "base/message_loop/message_loop.h"
+#include "blimp/common/proto/blimp_message.pb.h"
+#include "net/base/net_errors.h"
namespace blimp {
-BlimpMessageOutputBuffer::BlimpMessageOutputBuffer() {
- NOTIMPLEMENTED();
+BlimpMessageOutputBuffer::BlimpMessageOutputBuffer(int max_buffer_size_bytes)
+ : max_buffer_size_bytes_(max_buffer_size_bytes) {}
+
+BlimpMessageOutputBuffer::~BlimpMessageOutputBuffer() {}
+
+void BlimpMessageOutputBuffer::SetOutputProcessor(
+ BlimpMessageProcessor* processor) {
+ // Check that we are setting or removing the processor, not replacing it.
+ if (processor) {
+ DCHECK(!output_processor_);
+ output_processor_ = processor;
+ write_complete_cb_.Reset(base::Bind(
+ &BlimpMessageOutputBuffer::OnWriteComplete, base::Unretained(this)));
+ WriteNextMessageIfReady();
+ } else {
+ DCHECK(output_processor_);
+ output_processor_ = nullptr;
+ write_complete_cb_.Cancel();
+ }
+}
+
+void BlimpMessageOutputBuffer::RetransmitBufferedMessages() {
+ DCHECK(output_processor_);
+
+ // Prepend the entirety of |ack_buffer_| to |write_buffer_|.
+ write_buffer_.insert(write_buffer_.begin(),
+ std::make_move_iterator(ack_buffer_.begin()),
+ std::make_move_iterator(ack_buffer_.end()));
+ ack_buffer_.clear();
+
+ WriteNextMessageIfReady();
}
-BlimpMessageOutputBuffer::~BlimpMessageOutputBuffer() {
- NOTIMPLEMENTED();
+int BlimpMessageOutputBuffer::GetBufferByteSizeForTest() const {
+ return write_buffer_.size() + ack_buffer_.size();
+}
+
+int BlimpMessageOutputBuffer::GetUnacknowledgedMessageCountForTest() const {
+ return ack_buffer_.size();
}
void BlimpMessageOutputBuffer::ProcessMessage(
scoped_ptr<BlimpMessage> message,
const net::CompletionCallback& callback) {
- NOTIMPLEMENTED();
+ VLOG(2) << "ProcessMessage (id=" << message->message_id()
+ << ", type=" << message->type() << ")";
+
+ message->set_message_id(++prev_message_id_);
+
+ current_buffer_size_bytes_ += message->ByteSize();
+ DCHECK_GE(max_buffer_size_bytes_, current_buffer_size_bytes_);
+
+ write_buffer_.push_back(
+ make_scoped_ptr(new BufferEntry(message.Pass(), callback)));
+
+ // Write the message
+ if (write_buffer_.size() == 1 && output_processor_) {
+ WriteNextMessageIfReady();
+ }
}
+// Flushes acknowledged messages from the buffer and invokes their
+// |callbacks|, if any.
void BlimpMessageOutputBuffer::OnMessageCheckpoint(int64 message_id) {
- NOTIMPLEMENTED();
+ VLOG(2) << "OnMessageCheckpoint (message_id=" << message_id << ")";
+ if (ack_buffer_.empty()) {
+ LOG(WARNING) << "Checkpoint called while buffer is empty.";
+ return;
+ }
+ if (message_id > prev_message_id_) {
+ LOG(WARNING) << "Illegal checkpoint response: " << message_id;
+ return;
+ }
+
+ // Remove all acknowledged messages through |message_id| and invoke their
+ // write callbacks, if set.
+ while (!ack_buffer_.empty() &&
+ ack_buffer_.front()->message->message_id() <= message_id) {
+ const BufferEntry& ack_entry = *ack_buffer_.front();
+ current_buffer_size_bytes_ -= ack_entry.message->GetCachedSize();
+ DCHECK_GE(current_buffer_size_bytes_, 0);
+ VLOG(3) << "Buffer size: " << current_buffer_size_bytes_
+ << " (max=" << current_buffer_size_bytes_ << ")";
+
+ if (!ack_entry.callback.is_null()) {
+ base::MessageLoop::current()->PostTask(
+ FROM_HERE, base::Bind(ack_entry.callback, net::OK));
+ }
+
+ ack_buffer_.pop_front();
+ }
+
+ // An empty buffer should have a zero-byte footprint.
+ DCHECK(current_buffer_size_bytes_ > 0 ||
+ (ack_buffer_.empty() && write_buffer_.empty()))
+ << "Expected zero-length buffer size, was " << current_buffer_size_bytes_
+ << " bytes instead.";
+}
+
+BlimpMessageOutputBuffer::BufferEntry::BufferEntry(
+ scoped_ptr<BlimpMessage> message,
+ net::CompletionCallback callback)
+ : message(message.Pass()), callback(callback) {}
+
+BlimpMessageOutputBuffer::BufferEntry::~BufferEntry() {}
+
+void BlimpMessageOutputBuffer::WriteNextMessageIfReady() {
+ if (write_buffer_.empty()) {
+ VLOG(2) << "Nothing to write.";
+ return;
+ }
+
+ scoped_ptr<BlimpMessage> message_to_write(
+ new BlimpMessage(*write_buffer_.front()->message));
+ VLOG(3) << "Writing message (id="
+ << write_buffer_.front()->message->message_id()
+ << ", type=" << message_to_write->type() << ")";
+
+ output_processor_->ProcessMessage(message_to_write.Pass(),
+ write_complete_cb_.callback());
+ VLOG(3) << "Queue size: " << write_buffer_.size();
+}
+
+void BlimpMessageOutputBuffer::OnWriteComplete(int result) {
+ DCHECK_LE(result, net::OK);
+ VLOG(2) << "Write complete, result=" << result;
+
+ if (result == net::OK) {
+ ack_buffer_.push_back(std::move(write_buffer_.front()));
+ write_buffer_.pop_front();
+ WriteNextMessageIfReady();
+ } else {
+ // An error occurred while writing to the network connection.
+ // Stop writing more messages until a new connection is established.
+ DLOG(WARNING) << "Write error (result=" << result << ")";
+ }
}
} // namespace blimp
diff --git a/blimp/net/blimp_message_output_buffer.h b/blimp/net/blimp_message_output_buffer.h
index c1df574..0a7bddf 100644
--- a/blimp/net/blimp_message_output_buffer.h
+++ b/blimp/net/blimp_message_output_buffer.h
@@ -5,10 +5,15 @@
#ifndef BLIMP_NET_BLIMP_MESSAGE_OUTPUT_BUFFER_H_
#define BLIMP_NET_BLIMP_MESSAGE_OUTPUT_BUFFER_H_
+#include <list>
+#include <queue>
+#include <utility>
+
#include "base/macros.h"
#include "blimp/net/blimp_message_checkpoint_observer.h"
#include "blimp/net/blimp_message_processor.h"
#include "blimp/net/blimp_net_export.h"
+#include "net/base/completion_callback.h"
namespace blimp {
@@ -16,19 +21,23 @@ class BlimpConnection;
// Provides a FIFO buffer for reliable, ordered message delivery.
// Messages are retained for redelivery until they are acknowledged by the
-// receiving end (via BlimpMessageAckObserver).
+// receiving end (via BlimpMessageCheckpointObserver).
+// Messages can be paired with callbacks that are invoked on successful
+// message acknowledgement.
// (Redelivery will be used in a future CL to implement Fast Recovery
// of dropped connections.)
class BLIMP_NET_EXPORT BlimpMessageOutputBuffer
: public BlimpMessageProcessor,
public BlimpMessageCheckpointObserver {
public:
- BlimpMessageOutputBuffer();
+ explicit BlimpMessageOutputBuffer(int max_buffer_size_bytes);
~BlimpMessageOutputBuffer() override;
- // Sets the processor to receive messages from this buffer.
- // TODO(kmarshall): implement this.
- void set_output_processor(BlimpMessageProcessor* processor) {}
+ // Sets the processor that will be used for writing buffered messages.
+ void SetOutputProcessor(BlimpMessageProcessor* processor);
+
+ // Marks all messages in buffer for retransmission.
+ void RetransmitBufferedMessages();
// BlimpMessageProcessor implementation.
// |callback|, if set, will be called once the remote end has acknowledged the
@@ -37,13 +46,51 @@ class BLIMP_NET_EXPORT BlimpMessageOutputBuffer
const net::CompletionCallback& callback) override;
// MessageCheckpointObserver implementation.
- // Flushes acknowledged messages from the buffer and invokes their
- // |callbacks|, if any.
- // Callbacks should not destroy |this| so as to not interfere with the
- // processing of any other pending callbacks.
void OnMessageCheckpoint(int64 message_id) override;
+ int GetBufferByteSizeForTest() const;
+ int GetUnacknowledgedMessageCountForTest() const;
+
private:
+ struct BufferEntry {
+ BufferEntry(scoped_ptr<BlimpMessage> message,
+ net::CompletionCallback callback);
+ ~BufferEntry();
+
+ const scoped_ptr<BlimpMessage> message;
+ const net::CompletionCallback callback;
+ };
+
+ typedef std::list<scoped_ptr<BufferEntry>> MessageBuffer;
+
+ // Writes the next message in the buffer if an output processor is attached
+ // and the buffer contains a message.
+ void WriteNextMessageIfReady();
+
+ // Receives the completion status of a write operation.
+ void OnWriteComplete(int result);
+
+ BlimpMessageProcessor* output_processor_ = nullptr;
+ net::CancelableCompletionCallback write_complete_cb_;
+
+ // Maximum serialized footprint of buffered messages.
+ int max_buffer_size_bytes_;
+
+ // Serialized footprint of the messages contained in the write and ack
+ // buffers.
+ int current_buffer_size_bytes_ = 0;
+
+ // The ID used by the last outgoing message.
+ int64 prev_message_id_ = 0;
+
+ // List of unsent messages.
+ MessageBuffer write_buffer_;
+
+ // List of messages that are sent and awaiting acknoweldgement.
+ // The messages in |ack_buffer_| are contiguous with the messages in
+ // |write_buffer_|.
+ MessageBuffer ack_buffer_;
+
DISALLOW_COPY_AND_ASSIGN(BlimpMessageOutputBuffer);
};
diff --git a/blimp/net/blimp_message_output_buffer_unittest.cc b/blimp/net/blimp_message_output_buffer_unittest.cc
new file mode 100644
index 0000000..2278fb4
--- /dev/null
+++ b/blimp/net/blimp_message_output_buffer_unittest.cc
@@ -0,0 +1,262 @@
+// Copyright 2015 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "blimp/net/blimp_message_output_buffer.h"
+
+#include "base/callback_helpers.h"
+#include "base/logging.h"
+#include "base/message_loop/message_loop.h"
+#include "blimp/common/proto/blimp_message.pb.h"
+#include "blimp/net/test_common.h"
+#include "net/base/net_errors.h"
+#include "net/base/test_completion_callback.h"
+#include "testing/gmock/include/gmock/gmock.h"
+#include "testing/gtest/include/gtest/gtest.h"
+
+using testing::_;
+using testing::InvokeArgument;
+using testing::Ref;
+using testing::Return;
+using testing::SaveArg;
+
+namespace blimp {
+namespace {
+
+class BlimpMessageOutputBufferTest : public testing::Test {
+ public:
+ BlimpMessageOutputBufferTest() {}
+
+ void SetUp() override {
+ input_msg_.set_type(BlimpMessage::INPUT);
+ input_msg_.set_message_id(1);
+ compositor_msg_.set_type(BlimpMessage::COMPOSITOR);
+ compositor_msg_.set_message_id(2);
+
+ // Buffer should only have space for two unacknowledged messages
+ // (with message IDs).
+ ASSERT_EQ(input_msg_.ByteSize(), compositor_msg_.ByteSize());
+ buffer_.reset(new BlimpMessageOutputBuffer(2 * input_msg_.GetCachedSize()));
+ }
+
+ protected:
+ void AddOutputExpectation(const BlimpMessage& msg) {
+ EXPECT_CALL(output_processor_, MockableProcessMessage(EqualsProto(msg), _))
+ .WillOnce(SaveArg<1>(&captured_cb_))
+ .RetiresOnSaturation();
+ }
+
+ BlimpMessage WithMessageId(const BlimpMessage& message, int64 message_id) {
+ BlimpMessage output = message;
+ output.set_message_id(message_id);
+ return output;
+ }
+
+ BlimpMessage input_msg_;
+ BlimpMessage compositor_msg_;
+
+ base::MessageLoop message_loop_;
+ net::CompletionCallback captured_cb_;
+ MockBlimpMessageProcessor output_processor_;
+ scoped_ptr<BlimpMessageOutputBuffer> buffer_;
+ testing::InSequence s;
+};
+
+// Verify batched writes and acknowledgements.
+TEST_F(BlimpMessageOutputBufferTest, SeparatelyBufferWriteAck) {
+ net::TestCompletionCallback complete_cb_1;
+ net::TestCompletionCallback complete_cb_2;
+
+ AddOutputExpectation(input_msg_);
+ AddOutputExpectation(compositor_msg_);
+
+ // Accumulate two messages.
+ buffer_->ProcessMessage(make_scoped_ptr(new BlimpMessage(input_msg_)),
+ complete_cb_1.callback());
+ buffer_->ProcessMessage(make_scoped_ptr(new BlimpMessage(compositor_msg_)),
+ complete_cb_2.callback());
+ ASSERT_EQ(2, buffer_->GetBufferByteSizeForTest());
+
+ // Write two messages.
+ ASSERT_TRUE(captured_cb_.is_null());
+ buffer_->SetOutputProcessor(&output_processor_);
+ ASSERT_FALSE(captured_cb_.is_null());
+ base::ResetAndReturn(&captured_cb_).Run(net::OK);
+ ASSERT_EQ(2, buffer_->GetBufferByteSizeForTest());
+ ASSERT_EQ(1, buffer_->GetUnacknowledgedMessageCountForTest());
+ base::ResetAndReturn(&captured_cb_).Run(net::OK);
+ ASSERT_EQ(2, buffer_->GetBufferByteSizeForTest());
+ ASSERT_EQ(2, buffer_->GetUnacknowledgedMessageCountForTest());
+
+ // Both messages are acknowledged by separate checkpoints.
+ buffer_->OnMessageCheckpoint(1);
+ ASSERT_EQ(1, buffer_->GetBufferByteSizeForTest());
+ ASSERT_EQ(1, buffer_->GetUnacknowledgedMessageCountForTest());
+ EXPECT_EQ(net::OK, complete_cb_1.WaitForResult());
+ buffer_->OnMessageCheckpoint(2);
+ ASSERT_EQ(0, buffer_->GetBufferByteSizeForTest());
+ ASSERT_EQ(0, buffer_->GetUnacknowledgedMessageCountForTest());
+ EXPECT_EQ(net::OK, complete_cb_2.WaitForResult());
+}
+
+// Verify buffer writes from an empty state.
+TEST_F(BlimpMessageOutputBufferTest, WritesFromEmptyBuffer) {
+ net::TestCompletionCallback complete_cb_1;
+ net::TestCompletionCallback complete_cb_2;
+
+ AddOutputExpectation(input_msg_);
+ AddOutputExpectation(compositor_msg_);
+
+ ASSERT_TRUE(captured_cb_.is_null());
+ buffer_->SetOutputProcessor(&output_processor_);
+
+ // Message #0 is buffered, sent, acknowledged.
+ buffer_->ProcessMessage(make_scoped_ptr(new BlimpMessage(input_msg_)),
+ complete_cb_1.callback());
+ ASSERT_EQ(1, buffer_->GetBufferByteSizeForTest());
+ ASSERT_FALSE(captured_cb_.is_null());
+ base::ResetAndReturn(&captured_cb_).Run(net::OK);
+ ASSERT_EQ(1, buffer_->GetBufferByteSizeForTest());
+ ASSERT_EQ(1, buffer_->GetUnacknowledgedMessageCountForTest());
+ buffer_->OnMessageCheckpoint(1);
+ ASSERT_EQ(0, buffer_->GetBufferByteSizeForTest());
+ ASSERT_EQ(0, buffer_->GetUnacknowledgedMessageCountForTest());
+
+ buffer_->ProcessMessage(make_scoped_ptr(new BlimpMessage(compositor_msg_)),
+ complete_cb_2.callback());
+ ASSERT_EQ(1, buffer_->GetBufferByteSizeForTest());
+ ASSERT_FALSE(captured_cb_.is_null());
+ base::ResetAndReturn(&captured_cb_).Run(net::OK);
+ ASSERT_EQ(1, buffer_->GetBufferByteSizeForTest());
+ ASSERT_EQ(1, buffer_->GetUnacknowledgedMessageCountForTest());
+ buffer_->OnMessageCheckpoint(2);
+ ASSERT_EQ(0, buffer_->GetBufferByteSizeForTest());
+ ASSERT_EQ(0, buffer_->GetUnacknowledgedMessageCountForTest());
+}
+
+// Verify that a single checkpoint can be used to acknowledge two writes.
+TEST_F(BlimpMessageOutputBufferTest, SharedCheckpoint) {
+ net::TestCompletionCallback complete_cb_1;
+ net::TestCompletionCallback complete_cb_2;
+
+ AddOutputExpectation(input_msg_);
+ AddOutputExpectation(compositor_msg_);
+
+ // Message #1 is written but unacknowledged.
+ buffer_->ProcessMessage(make_scoped_ptr(new BlimpMessage(input_msg_)),
+ complete_cb_1.callback());
+ ASSERT_EQ(1, buffer_->GetBufferByteSizeForTest());
+ ASSERT_TRUE(captured_cb_.is_null());
+ buffer_->SetOutputProcessor(&output_processor_);
+ ASSERT_FALSE(captured_cb_.is_null());
+ base::ResetAndReturn(&captured_cb_).Run(net::OK);
+ ASSERT_EQ(1, buffer_->GetBufferByteSizeForTest());
+ ASSERT_EQ(1, buffer_->GetUnacknowledgedMessageCountForTest());
+
+ // Message #2 is written but unacknowledged.
+ buffer_->ProcessMessage(make_scoped_ptr(new BlimpMessage(compositor_msg_)),
+ complete_cb_2.callback());
+ ASSERT_EQ(2, buffer_->GetBufferByteSizeForTest());
+ ASSERT_FALSE(captured_cb_.is_null());
+ base::ResetAndReturn(&captured_cb_).Run(net::OK);
+ ASSERT_TRUE(captured_cb_.is_null());
+ ASSERT_EQ(2, buffer_->GetBufferByteSizeForTest());
+ ASSERT_EQ(2, buffer_->GetUnacknowledgedMessageCountForTest());
+
+ // Both messages are acknowledged in one checkpoint.
+ buffer_->OnMessageCheckpoint(2);
+ ASSERT_EQ(0, buffer_->GetBufferByteSizeForTest());
+ ASSERT_EQ(0, buffer_->GetUnacknowledgedMessageCountForTest());
+ EXPECT_EQ(net::OK, complete_cb_1.WaitForResult());
+ EXPECT_EQ(net::OK, complete_cb_2.WaitForResult());
+}
+
+// Verify that messages that fail to write are kept in a pending write state.
+TEST_F(BlimpMessageOutputBufferTest, WriteError) {
+ net::TestCompletionCallback complete_cb_1;
+ net::TestCompletionCallback complete_cb_2;
+
+ AddOutputExpectation(input_msg_);
+ AddOutputExpectation(input_msg_);
+
+ // Accumulate two messages.
+ buffer_->ProcessMessage(make_scoped_ptr(new BlimpMessage(input_msg_)),
+ complete_cb_1.callback());
+ ASSERT_EQ(1, buffer_->GetBufferByteSizeForTest());
+
+ // First write attempt, which fails.
+ ASSERT_TRUE(captured_cb_.is_null());
+ buffer_->SetOutputProcessor(&output_processor_);
+ ASSERT_FALSE(captured_cb_.is_null());
+ base::ResetAndReturn(&captured_cb_).Run(net::ERR_FAILED);
+ ASSERT_EQ(1, buffer_->GetBufferByteSizeForTest());
+ ASSERT_EQ(0, buffer_->GetUnacknowledgedMessageCountForTest());
+
+ // Simulate disconnect.
+ buffer_->SetOutputProcessor(nullptr);
+
+ // Reconnect. Should immediately try to write the contents of the buffer.
+ buffer_->SetOutputProcessor(&output_processor_);
+ ASSERT_FALSE(captured_cb_.is_null());
+ base::ResetAndReturn(&captured_cb_).Run(net::OK);
+ ASSERT_EQ(1, buffer_->GetBufferByteSizeForTest());
+ ASSERT_EQ(1, buffer_->GetUnacknowledgedMessageCountForTest());
+ buffer_->OnMessageCheckpoint(1);
+ ASSERT_EQ(0, buffer_->GetBufferByteSizeForTest());
+ ASSERT_EQ(0, buffer_->GetUnacknowledgedMessageCountForTest());
+ EXPECT_EQ(net::OK, complete_cb_1.WaitForResult());
+}
+
+// Verify that unacknowledged messages can be moved back to a pending write
+// state (recovery after a lost connection.)
+TEST_F(BlimpMessageOutputBufferTest, MessageRetransmit) {
+ net::TestCompletionCallback complete_cb_1;
+ net::TestCompletionCallback complete_cb_2;
+
+ AddOutputExpectation(input_msg_);
+ AddOutputExpectation(compositor_msg_);
+ AddOutputExpectation(compositor_msg_); // Retransmitted message.
+
+ // Accumulate two messages.
+ buffer_->ProcessMessage(make_scoped_ptr(new BlimpMessage(input_msg_)),
+ complete_cb_1.callback());
+ buffer_->ProcessMessage(make_scoped_ptr(new BlimpMessage(compositor_msg_)),
+ complete_cb_2.callback());
+ ASSERT_EQ(2, buffer_->GetBufferByteSizeForTest());
+
+ // Write two messages.
+ ASSERT_TRUE(captured_cb_.is_null());
+ buffer_->SetOutputProcessor(&output_processor_);
+ ASSERT_FALSE(captured_cb_.is_null());
+ base::ResetAndReturn(&captured_cb_).Run(net::OK);
+ ASSERT_EQ(2, buffer_->GetBufferByteSizeForTest());
+ ASSERT_EQ(1, buffer_->GetUnacknowledgedMessageCountForTest());
+ base::ResetAndReturn(&captured_cb_).Run(net::OK);
+ ASSERT_EQ(2, buffer_->GetBufferByteSizeForTest());
+ ASSERT_EQ(2, buffer_->GetUnacknowledgedMessageCountForTest());
+
+ // Simulate disconnect & reconnect.
+ buffer_->SetOutputProcessor(nullptr);
+ buffer_->SetOutputProcessor(&output_processor_);
+
+ // Remote end indicates that it only received message #0.
+ // Message #1 should be moved from an unacknowledged state to a pending write
+ // state.
+ ASSERT_TRUE(captured_cb_.is_null());
+ buffer_->OnMessageCheckpoint(1);
+ buffer_->RetransmitBufferedMessages();
+ ASSERT_FALSE(captured_cb_.is_null());
+ ASSERT_EQ(1, buffer_->GetBufferByteSizeForTest());
+ ASSERT_EQ(0, buffer_->GetUnacknowledgedMessageCountForTest());
+ base::ResetAndReturn(&captured_cb_).Run(net::OK);
+ ASSERT_EQ(1, buffer_->GetUnacknowledgedMessageCountForTest());
+
+ // Remote end acknowledges #1, buffer should be empty.
+ buffer_->OnMessageCheckpoint(2);
+ ASSERT_EQ(0, buffer_->GetBufferByteSizeForTest());
+ ASSERT_EQ(0, buffer_->GetUnacknowledgedMessageCountForTest());
+ EXPECT_EQ(net::OK, complete_cb_2.WaitForResult());
+}
+
+} // namespace
+} // namespace blimp
diff --git a/blimp/net/browser_connection_handler.cc b/blimp/net/browser_connection_handler.cc
index d47623e8..897ae62 100644
--- a/blimp/net/browser_connection_handler.cc
+++ b/blimp/net/browser_connection_handler.cc
@@ -13,10 +13,17 @@
#include "blimp/net/blimp_message_processor.h"
namespace blimp {
+namespace {
+
+// Maximum footprint of the output buffer.
+// TODO(kmarshall): Use a value that's computed from the platform.
+const int kMaxBufferSizeBytes = 1 << 24;
+
+} // namespace
BrowserConnectionHandler::BrowserConnectionHandler()
: demultiplexer_(new BlimpMessageDemultiplexer),
- output_buffer_(new BlimpMessageOutputBuffer),
+ output_buffer_(new BlimpMessageOutputBuffer(kMaxBufferSizeBytes)),
multiplexer_(new BlimpMessageMultiplexer(output_buffer_.get())) {}
BrowserConnectionHandler::~BrowserConnectionHandler() {}
@@ -37,7 +44,7 @@ void BrowserConnectionHandler::HandleConnection(
// Connect the incoming & outgoing message streams.
connection_->SetIncomingMessageProcessor(demultiplexer_.get());
- output_buffer_->set_output_processor(
+ output_buffer_->SetOutputProcessor(
connection_->GetOutgoingMessageProcessor());
}
@@ -45,7 +52,7 @@ void BrowserConnectionHandler::DropCurrentConnection() {
if (!connection_)
return;
connection_->SetIncomingMessageProcessor(nullptr);
- output_buffer_->set_output_processor(nullptr);
+ output_buffer_->SetOutputProcessor(nullptr);
connection_.reset();
}
diff --git a/blimp/net/browser_connection_handler.h b/blimp/net/browser_connection_handler.h
index d739e50..03b2cd0 100644
--- a/blimp/net/browser_connection_handler.h
+++ b/blimp/net/browser_connection_handler.h
@@ -49,9 +49,6 @@ class BLIMP_NET_EXPORT BrowserConnectionHandler
private:
void DropCurrentConnection();
- // Holds network resources while there is a Client connected.
- scoped_ptr<BlimpConnection> connection_;
-
// Routes incoming messages to the relevant feature-specific handlers.
scoped_ptr<BlimpMessageDemultiplexer> demultiplexer_;
@@ -62,6 +59,9 @@ class BLIMP_NET_EXPORT BrowserConnectionHandler
// message stream.
scoped_ptr<BlimpMessageMultiplexer> multiplexer_;
+ // Holds network resources while there is a Client connected.
+ scoped_ptr<BlimpConnection> connection_;
+
DISALLOW_COPY_AND_ASSIGN(BrowserConnectionHandler);
};
diff --git a/blimp/net/test_common.h b/blimp/net/test_common.h
index ca766a6..be66ab2 100644
--- a/blimp/net/test_common.h
+++ b/blimp/net/test_common.h
@@ -67,6 +67,10 @@ ACTION_TEMPLATE(FillBufferFromString,
memcpy(testing::get<buf_idx>(args)->data(), str.data(), str.size());
}
+// Returns true if |buf| has a prefix of |str|.
+// Behavior is undefined if len(buf) < len(str).
+bool BufferStartsWith(net::GrowableIOBuffer* buf, const std::string& str);
+
// GMock action that writes data from a blimp message to an IOBuffer .
//
// buf_idx (template parameter 0): 0-based index of the IOBuffer arg.
@@ -177,10 +181,6 @@ class MockBlimpMessageProcessor : public BlimpMessageProcessor {
const net::CompletionCallback& callback));
};
-// Returns true if |buf| has a prefix of |str|.
-// Behavior is undefined if len(buf) < len(str).
-bool BufferStartsWith(net::GrowableIOBuffer* buf, const std::string& str);
-
} // namespace blimp
#endif // BLIMP_NET_TEST_COMMON_H_