diff options
author | yzshen <yzshen@chromium.org> | 2015-05-20 16:43:37 -0700 |
---|---|---|
committer | Commit bot <commit-bot@chromium.org> | 2015-05-20 23:44:38 +0000 |
commit | b34518faaa7fa63d3b99b91ef28bd98bf7e725f1 (patch) | |
tree | 08e165da443b3a6d3ef709685df9fcfc2f6cdd3c | |
parent | 1fc345cd960f7efd4e6b07d1f050615239d82d60 (diff) | |
download | chromium_src-b34518faaa7fa63d3b99b91ef28bd98bf7e725f1.zip chromium_src-b34518faaa7fa63d3b99b91ef28bd98bf7e725f1.tar.gz chromium_src-b34518faaa7fa63d3b99b91ef28bd98bf7e725f1.tar.bz2 |
Fix WebSocket{Read,Write}Queue.
This CL fixes:
- WebSocketReadQueue may discard data unexpectedly.
- WebSocket{Read,Write}Queue: may stop processing queued operations.
BUG=490196,490200
TEST=None
Review URL: https://codereview.chromium.org/1148913002
Cr-Commit-Position: refs/heads/master@{#330846}
5 files changed, 101 insertions, 56 deletions
diff --git a/mojo/services/network/public/DEPS b/mojo/services/network/public/DEPS index b8b14d2..86a3623 100644 --- a/mojo/services/network/public/DEPS +++ b/mojo/services/network/public/DEPS @@ -4,7 +4,6 @@ include_rules = [ "-net", "-services", "-testing", - "!base", "!mojo/common", "+mojo/services/network/public", "+third_party/mojo/src/mojo/public", diff --git a/mojo/services/network/public/cpp/web_socket_read_queue.cc b/mojo/services/network/public/cpp/web_socket_read_queue.cc index 1bcf503..2d3499c 100644 --- a/mojo/services/network/public/cpp/web_socket_read_queue.cc +++ b/mojo/services/network/public/cpp/web_socket_read_queue.cc @@ -5,6 +5,7 @@ #include "network/public/cpp/web_socket_read_queue.h" #include "base/bind.h" +#include "base/logging.h" namespace mojo { @@ -14,7 +15,7 @@ struct WebSocketReadQueue::Operation { }; WebSocketReadQueue::WebSocketReadQueue(DataPipeConsumerHandle handle) - : handle_(handle), is_waiting_(false) { + : handle_(handle), is_busy_(false), weak_factory_(this) { } WebSocketReadQueue::~WebSocketReadQueue() { @@ -27,34 +28,57 @@ void WebSocketReadQueue::Read(uint32_t num_bytes, op->callback_ = callback; queue_.push_back(op); - if (!is_waiting_) - TryToRead(); + if (is_busy_) + return; + + is_busy_ = true; + TryToRead(); } void WebSocketReadQueue::TryToRead() { - Operation* op = queue_[0]; - const void* buffer = NULL; - uint32_t bytes_read = op->num_bytes_; - MojoResult result = BeginReadDataRaw( - handle_, &buffer, &bytes_read, MOJO_READ_DATA_FLAG_ALL_OR_NONE); - if (result == MOJO_RESULT_SHOULD_WAIT) { - EndReadDataRaw(handle_, bytes_read); - Wait(); - return; - } + DCHECK(is_busy_); + DCHECK(!queue_.empty()); + do { + Operation* op = queue_[0]; + const void* buffer = NULL; + uint32_t bytes_read = op->num_bytes_; + MojoResult result = BeginReadDataRaw( + handle_, &buffer, &bytes_read, MOJO_READ_DATA_FLAG_ALL_OR_NONE); + if (result == MOJO_RESULT_SHOULD_WAIT) { + Wait(); + return; + } - // Ensure |op| is deleted, whether or not |this| goes away. - scoped_ptr<Operation> op_deleter(op); - queue_.weak_erase(queue_.begin()); - if (result != MOJO_RESULT_OK) - return; - DataPipeConsumerHandle handle = handle_; - op->callback_.Run(static_cast<const char*>(buffer)); // may delete |this| - EndReadDataRaw(handle, bytes_read); + // Ensure |op| is deleted, whether or not |this| goes away. + scoped_ptr<Operation> op_deleter(op); + queue_.weak_erase(queue_.begin()); + + // http://crbug.com/490193 This should run callback as well. May need to + // change the callback signature. + if (result != MOJO_RESULT_OK) + return; + + uint32_t num_bytes = op_deleter->num_bytes_; + DCHECK_LE(num_bytes, bytes_read); + DataPipeConsumerHandle handle = handle_; + + base::WeakPtr<WebSocketReadQueue> self(weak_factory_.GetWeakPtr()); + + // This call may delete |this|. In that case, |self| will be invalidated. + // It may re-enter Read() too. Because |is_busy_| is true during the whole + // process, TryToRead() won't be re-entered. + op->callback_.Run(static_cast<const char*>(buffer)); + + EndReadDataRaw(handle, num_bytes); + + if (!self) + return; + } while (!queue_.empty()); + is_busy_ = false; } void WebSocketReadQueue::Wait() { - is_waiting_ = true; + DCHECK(is_busy_); handle_watcher_.Start( handle_, MOJO_HANDLE_SIGNAL_READABLE, @@ -63,7 +87,7 @@ void WebSocketReadQueue::Wait() { } void WebSocketReadQueue::OnHandleReady(MojoResult result) { - is_waiting_ = false; + DCHECK(is_busy_); TryToRead(); } diff --git a/mojo/services/network/public/cpp/web_socket_read_queue.h b/mojo/services/network/public/cpp/web_socket_read_queue.h index 2731510..287e91c 100644 --- a/mojo/services/network/public/cpp/web_socket_read_queue.h +++ b/mojo/services/network/public/cpp/web_socket_read_queue.h @@ -7,6 +7,7 @@ #include "base/callback.h" #include "base/memory/scoped_vector.h" +#include "base/memory/weak_ptr.h" #include "mojo/common/handle_watcher.h" #include "third_party/mojo/src/mojo/public/cpp/system/data_pipe.h" @@ -19,7 +20,7 @@ namespace mojo { // See also: WebSocketWriteQueue class WebSocketReadQueue { public: - WebSocketReadQueue(DataPipeConsumerHandle handle); + explicit WebSocketReadQueue(DataPipeConsumerHandle handle); ~WebSocketReadQueue(); void Read(uint32_t num_bytes, base::Callback<void(const char*)> callback); @@ -34,7 +35,8 @@ class WebSocketReadQueue { DataPipeConsumerHandle handle_; common::HandleWatcher handle_watcher_; ScopedVector<Operation> queue_; - bool is_waiting_; + bool is_busy_; + base::WeakPtrFactory<WebSocketReadQueue> weak_factory_; }; } // namespace mojo diff --git a/mojo/services/network/public/cpp/web_socket_write_queue.cc b/mojo/services/network/public/cpp/web_socket_write_queue.cc index 9f2d132..9a0079c 100644 --- a/mojo/services/network/public/cpp/web_socket_write_queue.cc +++ b/mojo/services/network/public/cpp/web_socket_write_queue.cc @@ -5,6 +5,7 @@ #include "network/public/cpp/web_socket_write_queue.h" #include "base/bind.h" +#include "base/logging.h" namespace mojo { @@ -19,7 +20,7 @@ struct WebSocketWriteQueue::Operation { }; WebSocketWriteQueue::WebSocketWriteQueue(DataPipeProducerHandle handle) - : handle_(handle), is_waiting_(false) { + : handle_(handle), is_busy_(false), weak_factory_(this) { } WebSocketWriteQueue::~WebSocketWriteQueue() { @@ -34,41 +35,58 @@ void WebSocketWriteQueue::Write(const char* data, op->data_ = data; queue_.push_back(op); - MojoResult result = MOJO_RESULT_SHOULD_WAIT; - if (!is_waiting_) - result = TryToWrite(); + if (!is_busy_) { + is_busy_ = true; + // This call may reset |is_busy_| to false. + TryToWrite(); + } - // If we have to wait, make a local copy of the data so we know it will - // live until we need it. - if (result == MOJO_RESULT_SHOULD_WAIT) { + if (is_busy_) { + // If we have to wait, make a local copy of the data so we know it will + // live until we need it. op->data_copy_.resize(num_bytes); memcpy(&op->data_copy_[0], data, num_bytes); op->data_ = &op->data_copy_[0]; } } -MojoResult WebSocketWriteQueue::TryToWrite() { - Operation* op = queue_[0]; - uint32_t bytes_written = op->num_bytes_; - MojoResult result = WriteDataRaw( - handle_, op->data_, &bytes_written, MOJO_WRITE_DATA_FLAG_ALL_OR_NONE); - if (result == MOJO_RESULT_SHOULD_WAIT) { - Wait(); - return result; - } - - // Ensure |op| is deleted, whether or not |this| goes away. - scoped_ptr<Operation> op_deleter(op); - queue_.weak_erase(queue_.begin()); - if (result != MOJO_RESULT_OK) - return result; - - op->callback_.Run(op->data_); // may delete |this| - return result; +void WebSocketWriteQueue::TryToWrite() { + DCHECK(is_busy_); + DCHECK(!queue_.empty()); + do { + Operation* op = queue_[0]; + uint32_t bytes_written = op->num_bytes_; + MojoResult result = WriteDataRaw( + handle_, op->data_, &bytes_written, MOJO_WRITE_DATA_FLAG_ALL_OR_NONE); + if (result == MOJO_RESULT_SHOULD_WAIT) { + Wait(); + return; + } + + // Ensure |op| is deleted, whether or not |this| goes away. + scoped_ptr<Operation> op_deleter(op); + queue_.weak_erase(queue_.begin()); + + // http://crbug.com/490193 This should run callback as well. May need to + // change the callback signature. + if (result != MOJO_RESULT_OK) + return; + + base::WeakPtr<WebSocketWriteQueue> self(weak_factory_.GetWeakPtr()); + + // This call may delete |this|. In that case, |self| will be invalidated. + // It may re-enter Write() too. Because |is_busy_| is true during the whole + // process, TryToWrite() won't be re-entered. + op->callback_.Run(op->data_); + + if (!self) + return; + } while (!queue_.empty()); + is_busy_ = false; } void WebSocketWriteQueue::Wait() { - is_waiting_ = true; + DCHECK(is_busy_); handle_watcher_.Start(handle_, MOJO_HANDLE_SIGNAL_WRITABLE, MOJO_DEADLINE_INDEFINITE, @@ -77,7 +95,7 @@ void WebSocketWriteQueue::Wait() { } void WebSocketWriteQueue::OnHandleReady(MojoResult result) { - is_waiting_ = false; + DCHECK(is_busy_); TryToWrite(); } diff --git a/mojo/services/network/public/cpp/web_socket_write_queue.h b/mojo/services/network/public/cpp/web_socket_write_queue.h index b2c48fc..91e7924 100644 --- a/mojo/services/network/public/cpp/web_socket_write_queue.h +++ b/mojo/services/network/public/cpp/web_socket_write_queue.h @@ -7,6 +7,7 @@ #include "base/callback.h" #include "base/memory/scoped_vector.h" +#include "base/memory/weak_ptr.h" #include "mojo/common/handle_watcher.h" #include "third_party/mojo/src/mojo/public/cpp/system/data_pipe.h" @@ -19,7 +20,7 @@ namespace mojo { // See also: WebSocketReadQueue class WebSocketWriteQueue { public: - WebSocketWriteQueue(DataPipeProducerHandle handle); + explicit WebSocketWriteQueue(DataPipeProducerHandle handle); ~WebSocketWriteQueue(); void Write(const char* data, @@ -29,14 +30,15 @@ class WebSocketWriteQueue { private: struct Operation; - MojoResult TryToWrite(); + void TryToWrite(); void Wait(); void OnHandleReady(MojoResult result); DataPipeProducerHandle handle_; common::HandleWatcher handle_watcher_; ScopedVector<Operation> queue_; - bool is_waiting_; + bool is_busy_; + base::WeakPtrFactory<WebSocketWriteQueue> weak_factory_; }; } // namespace mojo |