summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoryzshen <yzshen@chromium.org>2015-05-20 16:43:37 -0700
committerCommit bot <commit-bot@chromium.org>2015-05-20 23:44:38 +0000
commitb34518faaa7fa63d3b99b91ef28bd98bf7e725f1 (patch)
tree08e165da443b3a6d3ef709685df9fcfc2f6cdd3c
parent1fc345cd960f7efd4e6b07d1f050615239d82d60 (diff)
downloadchromium_src-b34518faaa7fa63d3b99b91ef28bd98bf7e725f1.zip
chromium_src-b34518faaa7fa63d3b99b91ef28bd98bf7e725f1.tar.gz
chromium_src-b34518faaa7fa63d3b99b91ef28bd98bf7e725f1.tar.bz2
Fix WebSocket{Read,Write}Queue.
This CL fixes: - WebSocketReadQueue may discard data unexpectedly. - WebSocket{Read,Write}Queue: may stop processing queued operations. BUG=490196,490200 TEST=None Review URL: https://codereview.chromium.org/1148913002 Cr-Commit-Position: refs/heads/master@{#330846}
-rw-r--r--mojo/services/network/public/DEPS1
-rw-r--r--mojo/services/network/public/cpp/web_socket_read_queue.cc70
-rw-r--r--mojo/services/network/public/cpp/web_socket_read_queue.h6
-rw-r--r--mojo/services/network/public/cpp/web_socket_write_queue.cc72
-rw-r--r--mojo/services/network/public/cpp/web_socket_write_queue.h8
5 files changed, 101 insertions, 56 deletions
diff --git a/mojo/services/network/public/DEPS b/mojo/services/network/public/DEPS
index b8b14d2..86a3623 100644
--- a/mojo/services/network/public/DEPS
+++ b/mojo/services/network/public/DEPS
@@ -4,7 +4,6 @@ include_rules = [
"-net",
"-services",
"-testing",
- "!base",
"!mojo/common",
"+mojo/services/network/public",
"+third_party/mojo/src/mojo/public",
diff --git a/mojo/services/network/public/cpp/web_socket_read_queue.cc b/mojo/services/network/public/cpp/web_socket_read_queue.cc
index 1bcf503..2d3499c 100644
--- a/mojo/services/network/public/cpp/web_socket_read_queue.cc
+++ b/mojo/services/network/public/cpp/web_socket_read_queue.cc
@@ -5,6 +5,7 @@
#include "network/public/cpp/web_socket_read_queue.h"
#include "base/bind.h"
+#include "base/logging.h"
namespace mojo {
@@ -14,7 +15,7 @@ struct WebSocketReadQueue::Operation {
};
WebSocketReadQueue::WebSocketReadQueue(DataPipeConsumerHandle handle)
- : handle_(handle), is_waiting_(false) {
+ : handle_(handle), is_busy_(false), weak_factory_(this) {
}
WebSocketReadQueue::~WebSocketReadQueue() {
@@ -27,34 +28,57 @@ void WebSocketReadQueue::Read(uint32_t num_bytes,
op->callback_ = callback;
queue_.push_back(op);
- if (!is_waiting_)
- TryToRead();
+ if (is_busy_)
+ return;
+
+ is_busy_ = true;
+ TryToRead();
}
void WebSocketReadQueue::TryToRead() {
- Operation* op = queue_[0];
- const void* buffer = NULL;
- uint32_t bytes_read = op->num_bytes_;
- MojoResult result = BeginReadDataRaw(
- handle_, &buffer, &bytes_read, MOJO_READ_DATA_FLAG_ALL_OR_NONE);
- if (result == MOJO_RESULT_SHOULD_WAIT) {
- EndReadDataRaw(handle_, bytes_read);
- Wait();
- return;
- }
+ DCHECK(is_busy_);
+ DCHECK(!queue_.empty());
+ do {
+ Operation* op = queue_[0];
+ const void* buffer = NULL;
+ uint32_t bytes_read = op->num_bytes_;
+ MojoResult result = BeginReadDataRaw(
+ handle_, &buffer, &bytes_read, MOJO_READ_DATA_FLAG_ALL_OR_NONE);
+ if (result == MOJO_RESULT_SHOULD_WAIT) {
+ Wait();
+ return;
+ }
- // Ensure |op| is deleted, whether or not |this| goes away.
- scoped_ptr<Operation> op_deleter(op);
- queue_.weak_erase(queue_.begin());
- if (result != MOJO_RESULT_OK)
- return;
- DataPipeConsumerHandle handle = handle_;
- op->callback_.Run(static_cast<const char*>(buffer)); // may delete |this|
- EndReadDataRaw(handle, bytes_read);
+ // Ensure |op| is deleted, whether or not |this| goes away.
+ scoped_ptr<Operation> op_deleter(op);
+ queue_.weak_erase(queue_.begin());
+
+ // http://crbug.com/490193 This should run callback as well. May need to
+ // change the callback signature.
+ if (result != MOJO_RESULT_OK)
+ return;
+
+ uint32_t num_bytes = op_deleter->num_bytes_;
+ DCHECK_LE(num_bytes, bytes_read);
+ DataPipeConsumerHandle handle = handle_;
+
+ base::WeakPtr<WebSocketReadQueue> self(weak_factory_.GetWeakPtr());
+
+ // This call may delete |this|. In that case, |self| will be invalidated.
+ // It may re-enter Read() too. Because |is_busy_| is true during the whole
+ // process, TryToRead() won't be re-entered.
+ op->callback_.Run(static_cast<const char*>(buffer));
+
+ EndReadDataRaw(handle, num_bytes);
+
+ if (!self)
+ return;
+ } while (!queue_.empty());
+ is_busy_ = false;
}
void WebSocketReadQueue::Wait() {
- is_waiting_ = true;
+ DCHECK(is_busy_);
handle_watcher_.Start(
handle_,
MOJO_HANDLE_SIGNAL_READABLE,
@@ -63,7 +87,7 @@ void WebSocketReadQueue::Wait() {
}
void WebSocketReadQueue::OnHandleReady(MojoResult result) {
- is_waiting_ = false;
+ DCHECK(is_busy_);
TryToRead();
}
diff --git a/mojo/services/network/public/cpp/web_socket_read_queue.h b/mojo/services/network/public/cpp/web_socket_read_queue.h
index 2731510..287e91c 100644
--- a/mojo/services/network/public/cpp/web_socket_read_queue.h
+++ b/mojo/services/network/public/cpp/web_socket_read_queue.h
@@ -7,6 +7,7 @@
#include "base/callback.h"
#include "base/memory/scoped_vector.h"
+#include "base/memory/weak_ptr.h"
#include "mojo/common/handle_watcher.h"
#include "third_party/mojo/src/mojo/public/cpp/system/data_pipe.h"
@@ -19,7 +20,7 @@ namespace mojo {
// See also: WebSocketWriteQueue
class WebSocketReadQueue {
public:
- WebSocketReadQueue(DataPipeConsumerHandle handle);
+ explicit WebSocketReadQueue(DataPipeConsumerHandle handle);
~WebSocketReadQueue();
void Read(uint32_t num_bytes, base::Callback<void(const char*)> callback);
@@ -34,7 +35,8 @@ class WebSocketReadQueue {
DataPipeConsumerHandle handle_;
common::HandleWatcher handle_watcher_;
ScopedVector<Operation> queue_;
- bool is_waiting_;
+ bool is_busy_;
+ base::WeakPtrFactory<WebSocketReadQueue> weak_factory_;
};
} // namespace mojo
diff --git a/mojo/services/network/public/cpp/web_socket_write_queue.cc b/mojo/services/network/public/cpp/web_socket_write_queue.cc
index 9f2d132..9a0079c 100644
--- a/mojo/services/network/public/cpp/web_socket_write_queue.cc
+++ b/mojo/services/network/public/cpp/web_socket_write_queue.cc
@@ -5,6 +5,7 @@
#include "network/public/cpp/web_socket_write_queue.h"
#include "base/bind.h"
+#include "base/logging.h"
namespace mojo {
@@ -19,7 +20,7 @@ struct WebSocketWriteQueue::Operation {
};
WebSocketWriteQueue::WebSocketWriteQueue(DataPipeProducerHandle handle)
- : handle_(handle), is_waiting_(false) {
+ : handle_(handle), is_busy_(false), weak_factory_(this) {
}
WebSocketWriteQueue::~WebSocketWriteQueue() {
@@ -34,41 +35,58 @@ void WebSocketWriteQueue::Write(const char* data,
op->data_ = data;
queue_.push_back(op);
- MojoResult result = MOJO_RESULT_SHOULD_WAIT;
- if (!is_waiting_)
- result = TryToWrite();
+ if (!is_busy_) {
+ is_busy_ = true;
+ // This call may reset |is_busy_| to false.
+ TryToWrite();
+ }
- // If we have to wait, make a local copy of the data so we know it will
- // live until we need it.
- if (result == MOJO_RESULT_SHOULD_WAIT) {
+ if (is_busy_) {
+ // If we have to wait, make a local copy of the data so we know it will
+ // live until we need it.
op->data_copy_.resize(num_bytes);
memcpy(&op->data_copy_[0], data, num_bytes);
op->data_ = &op->data_copy_[0];
}
}
-MojoResult WebSocketWriteQueue::TryToWrite() {
- Operation* op = queue_[0];
- uint32_t bytes_written = op->num_bytes_;
- MojoResult result = WriteDataRaw(
- handle_, op->data_, &bytes_written, MOJO_WRITE_DATA_FLAG_ALL_OR_NONE);
- if (result == MOJO_RESULT_SHOULD_WAIT) {
- Wait();
- return result;
- }
-
- // Ensure |op| is deleted, whether or not |this| goes away.
- scoped_ptr<Operation> op_deleter(op);
- queue_.weak_erase(queue_.begin());
- if (result != MOJO_RESULT_OK)
- return result;
-
- op->callback_.Run(op->data_); // may delete |this|
- return result;
+void WebSocketWriteQueue::TryToWrite() {
+ DCHECK(is_busy_);
+ DCHECK(!queue_.empty());
+ do {
+ Operation* op = queue_[0];
+ uint32_t bytes_written = op->num_bytes_;
+ MojoResult result = WriteDataRaw(
+ handle_, op->data_, &bytes_written, MOJO_WRITE_DATA_FLAG_ALL_OR_NONE);
+ if (result == MOJO_RESULT_SHOULD_WAIT) {
+ Wait();
+ return;
+ }
+
+ // Ensure |op| is deleted, whether or not |this| goes away.
+ scoped_ptr<Operation> op_deleter(op);
+ queue_.weak_erase(queue_.begin());
+
+ // http://crbug.com/490193 This should run callback as well. May need to
+ // change the callback signature.
+ if (result != MOJO_RESULT_OK)
+ return;
+
+ base::WeakPtr<WebSocketWriteQueue> self(weak_factory_.GetWeakPtr());
+
+ // This call may delete |this|. In that case, |self| will be invalidated.
+ // It may re-enter Write() too. Because |is_busy_| is true during the whole
+ // process, TryToWrite() won't be re-entered.
+ op->callback_.Run(op->data_);
+
+ if (!self)
+ return;
+ } while (!queue_.empty());
+ is_busy_ = false;
}
void WebSocketWriteQueue::Wait() {
- is_waiting_ = true;
+ DCHECK(is_busy_);
handle_watcher_.Start(handle_,
MOJO_HANDLE_SIGNAL_WRITABLE,
MOJO_DEADLINE_INDEFINITE,
@@ -77,7 +95,7 @@ void WebSocketWriteQueue::Wait() {
}
void WebSocketWriteQueue::OnHandleReady(MojoResult result) {
- is_waiting_ = false;
+ DCHECK(is_busy_);
TryToWrite();
}
diff --git a/mojo/services/network/public/cpp/web_socket_write_queue.h b/mojo/services/network/public/cpp/web_socket_write_queue.h
index b2c48fc..91e7924 100644
--- a/mojo/services/network/public/cpp/web_socket_write_queue.h
+++ b/mojo/services/network/public/cpp/web_socket_write_queue.h
@@ -7,6 +7,7 @@
#include "base/callback.h"
#include "base/memory/scoped_vector.h"
+#include "base/memory/weak_ptr.h"
#include "mojo/common/handle_watcher.h"
#include "third_party/mojo/src/mojo/public/cpp/system/data_pipe.h"
@@ -19,7 +20,7 @@ namespace mojo {
// See also: WebSocketReadQueue
class WebSocketWriteQueue {
public:
- WebSocketWriteQueue(DataPipeProducerHandle handle);
+ explicit WebSocketWriteQueue(DataPipeProducerHandle handle);
~WebSocketWriteQueue();
void Write(const char* data,
@@ -29,14 +30,15 @@ class WebSocketWriteQueue {
private:
struct Operation;
- MojoResult TryToWrite();
+ void TryToWrite();
void Wait();
void OnHandleReady(MojoResult result);
DataPipeProducerHandle handle_;
common::HandleWatcher handle_watcher_;
ScopedVector<Operation> queue_;
- bool is_waiting_;
+ bool is_busy_;
+ base::WeakPtrFactory<WebSocketWriteQueue> weak_factory_;
};
} // namespace mojo