diff options
author | hansmuller <hansmuller@chromium.org> | 2014-09-25 11:41:18 -0700 |
---|---|---|
committer | Commit bot <commit-bot@chromium.org> | 2014-09-25 18:42:04 +0000 |
commit | 5a5cb9e791961a2491c4269fc0f2144f385dc173 (patch) | |
tree | 14b352dc925a64dc7ef52df6b38991cb1ee79dde | |
parent | 1c863aa414026f2ece4ce5b930bcbce1cd07d6f0 (diff) | |
download | chromium_src-5a5cb9e791961a2491c4269fc0f2144f385dc173.zip chromium_src-5a5cb9e791961a2491c4269fc0f2144f385dc173.tar.gz chromium_src-5a5cb9e791961a2491c4269fc0f2144f385dc173.tar.bz2 |
Mojo JS bindings: draining a DataPipe
Add a drainData(dataPipeHandle) utility function to the Mojo JS core module.
The drainData() function asynchronously reads from the data pipe until the remote handle is closed or an error occurs. A Promise is returned whose settled value is an object like this: {result: core.RESULT_OK, buffer: dataArrayBuffer}. If the read failed, then the result will be the actual error code and the buffer will contain whatever was read before the error occurred. The drainData dataPipeHandle argument is closed automatically.
BUG=414338
Review URL: https://codereview.chromium.org/577733002
Cr-Commit-Position: refs/heads/master@{#296750}
-rw-r--r-- | gin/BUILD.gn | 2 | ||||
-rw-r--r-- | gin/gin.gyp | 2 | ||||
-rw-r--r-- | gin/isolate_holder.cc | 16 | ||||
-rw-r--r-- | gin/public/isolate_holder.h | 14 | ||||
-rw-r--r-- | gin/run_microtasks_observer.cc | 21 | ||||
-rw-r--r-- | gin/run_microtasks_observer.h | 30 | ||||
-rw-r--r-- | mojo/apps/js/js_app.cc | 4 | ||||
-rw-r--r-- | mojo/apps/js/main.js | 45 | ||||
-rw-r--r-- | mojo/bindings/js/BUILD.gn | 2 | ||||
-rw-r--r-- | mojo/bindings/js/core.cc | 14 | ||||
-rw-r--r-- | mojo/bindings/js/drain_data.cc | 131 | ||||
-rw-r--r-- | mojo/bindings/js/drain_data.h | 64 | ||||
-rw-r--r-- | mojo/mojo_base.gyp | 2 |
13 files changed, 317 insertions, 30 deletions
diff --git a/gin/BUILD.gn b/gin/BUILD.gn index 21807ae..b352c3a 100644 --- a/gin/BUILD.gn +++ b/gin/BUILD.gn @@ -47,6 +47,8 @@ component("gin") { "public/wrapper_info.h", "runner.cc", "runner.h", + "run_microtasks_observer.cc", + "run_microtasks_observer.h", "shell_runner.cc", "shell_runner.h", "try_catch.cc", diff --git a/gin/gin.gyp b/gin/gin.gyp index caa2ba4..b38dc85 100644 --- a/gin/gin.gyp +++ b/gin/gin.gyp @@ -67,6 +67,8 @@ 'public/wrapper_info.h', 'runner.cc', 'runner.h', + 'run_microtasks_observer.cc', + 'run_microtasks_observer.h', 'shell_runner.cc', 'shell_runner.h', 'try_catch.cc', diff --git a/gin/isolate_holder.cc b/gin/isolate_holder.cc index db4473e..00e9d84 100644 --- a/gin/isolate_holder.cc +++ b/gin/isolate_holder.cc @@ -8,6 +8,7 @@ #include <string.h> #include "base/logging.h" +#include "base/message_loop/message_loop.h" #include "base/rand_util.h" #include "base/sys_info.h" #include "gin/array_buffer.h" @@ -15,6 +16,7 @@ #include "gin/function_template.h" #include "gin/per_isolate_data.h" #include "gin/public/v8_platform.h" +#include "gin/run_microtasks_observer.h" namespace gin { @@ -43,6 +45,8 @@ IsolateHolder::IsolateHolder() { } IsolateHolder::~IsolateHolder() { + if (task_observer_.get()) + base::MessageLoop::current()->RemoveTaskObserver(task_observer_.get()); isolate_data_.reset(); isolate_->Dispose(); } @@ -66,4 +70,16 @@ void IsolateHolder::Initialize(ScriptMode mode, v8_is_initialized = true; } +void IsolateHolder::AddRunMicrotasksObserver() { + DCHECK(!task_observer_.get()); + task_observer_.reset(new RunMicrotasksObserver(isolate_));; + base::MessageLoop::current()->AddTaskObserver(task_observer_.get()); +} + +void IsolateHolder::RemoveRunMicrotasksObserver() { + DCHECK(task_observer_.get()); + base::MessageLoop::current()->RemoveTaskObserver(task_observer_.get()); + task_observer_.reset(); +} + } // namespace gin diff --git a/gin/public/isolate_holder.h b/gin/public/isolate_holder.h index da65fac..29cc208 100644 --- a/gin/public/isolate_holder.h +++ b/gin/public/isolate_holder.h @@ -13,6 +13,7 @@ namespace gin { class PerIsolateData; +class RunMicrotasksObserver; // To embed Gin, first initialize gin using IsolateHolder::Initialize and then // create an instance of IsolateHolder to hold the v8::Isolate in which you @@ -36,9 +37,22 @@ class GIN_EXPORT IsolateHolder { v8::Isolate* isolate() { return isolate_; } + // The implementations of Object.observe() and Promise enqueue v8 Microtasks + // that should be executed just before control is returned to the message + // loop. This method adds a MessageLoop TaskObserver which runs any pending + // Microtasks each time a Task is completed. This method should be called + // once, when a MessageLoop is created and it should be called on the + // MessageLoop's thread. + void AddRunMicrotasksObserver(); + + // This method should also only be called once, and on the MessageLoop's + // thread. + void RemoveRunMicrotasksObserver(); + private: v8::Isolate* isolate_; scoped_ptr<PerIsolateData> isolate_data_; + scoped_ptr<RunMicrotasksObserver> task_observer_; DISALLOW_COPY_AND_ASSIGN(IsolateHolder); }; diff --git a/gin/run_microtasks_observer.cc b/gin/run_microtasks_observer.cc new file mode 100644 index 0000000..f453a66 --- /dev/null +++ b/gin/run_microtasks_observer.cc @@ -0,0 +1,21 @@ +// Copyright 2014 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "gin/run_microtasks_observer.h" + +namespace gin { + +RunMicrotasksObserver::RunMicrotasksObserver(v8::Isolate* isolate) + : isolate_(isolate) { +} + +void RunMicrotasksObserver::WillProcessTask(const base::PendingTask& task) { +} + +void RunMicrotasksObserver::DidProcessTask(const base::PendingTask& task) { + v8::Isolate::Scope scope(isolate_); + isolate_->RunMicrotasks(); +} + +} // namespace gin diff --git a/gin/run_microtasks_observer.h b/gin/run_microtasks_observer.h new file mode 100644 index 0000000..e848239 --- /dev/null +++ b/gin/run_microtasks_observer.h @@ -0,0 +1,30 @@ +// Copyright 2014 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef GIN_RUN_MICROTASKS_OBSERVER_H_ +#define GIN_RUN_MICROTASKS_OBSERVER_H_ + +#include "base/message_loop/message_loop.h" +#include "v8/include/v8.h" + +namespace gin { + +// Runs any pending v8 Microtasks each time a task is completed. +// TODO(hansmuller); At some point perhaps this can be replaced with +// the (currently experimental) Isolate::SetAutorunMicrotasks() method. + +class RunMicrotasksObserver : public base::MessageLoop::TaskObserver { + public: + RunMicrotasksObserver(v8::Isolate* isolate); + + virtual void WillProcessTask(const base::PendingTask& pending_task) OVERRIDE; + virtual void DidProcessTask(const base::PendingTask& pending_task) OVERRIDE; + + private: + v8::Isolate* isolate_; +}; + +} // namespace gin + +#endif // GIN_RUN_MICROTASKS_OBSERVER_H_ diff --git a/mojo/apps/js/js_app.cc b/mojo/apps/js/js_app.cc index c6d3b91..db51c09 100644 --- a/mojo/apps/js/js_app.cc +++ b/mojo/apps/js/js_app.cc @@ -47,7 +47,7 @@ bool JSApp::Start() { void JSApp::Quit() { CHECK(on_js_app_thread()); - // The the terminate operation is posted to the message_loop so that + // The terminate operation is posted to the message_loop so that // the shell_runner isn't destroyed before this JS function returns. thread_.message_loop()->PostTask( FROM_HERE, base::Bind(&JSApp::Terminate, base::Unretained(this))); @@ -80,6 +80,7 @@ void JSApp::Run() { gin::IsolateHolder::Initialize(gin::IsolateHolder::kStrictMode, gin::ArrayBufferAllocator::SharedInstance()); isolate_holder_.reset(new gin::IsolateHolder()); + isolate_holder_->AddRunMicrotasksObserver(); shell_runner_.reset( new gin::ShellRunner(&runner_delegate_, isolate_holder_->isolate())); @@ -90,6 +91,7 @@ void JSApp::Run() { } void JSApp::Terminate() { + isolate_holder_->RemoveRunMicrotasksObserver(); shell_runner_.reset(NULL); // This JSApp's thread must be stopped on the thread that started it. Ask the diff --git a/mojo/apps/js/main.js b/mojo/apps/js/main.js index 8915f2e..c8eaac5 100644 --- a/mojo/apps/js/main.js +++ b/mojo/apps/js/main.js @@ -14,50 +14,37 @@ define("test", [ "mojo/public/js/bindings/core", "mojo/public/js/bindings/connection", + "mojo/public/js/bindings/support", "mojo/services/public/interfaces/network/network_service.mojom", "mojo/services/public/interfaces/network/url_loader.mojom", "mojo", "console" -], function(core, connection, network, loader, mojo, console) { +], function(core, connection, support, net, loader, mojo, console) { - function NetworkServiceImpl(remote) { } - NetworkServiceImpl.prototype = - Object.create(network.NetworkServiceStub.prototype); - - function URLLoaderImpl(remote) { } - URLLoaderImpl.prototype = - Object.create(loader.URLLoaderStub.prototype); - - var networkServiceHandle = mojo.connectToService( + var netServiceHandle = mojo.connectToService( "mojo:mojo_network_service", "mojo::NetworkService"); - var networkConnection = new connection.Connection( - networkServiceHandle, NetworkServiceImpl, network.NetworkServiceProxy); + var netConnection = new connection.Connection( + netServiceHandle, net.NetworkServiceStub, net.NetworkServiceProxy); var urlLoaderPipe = new core.createMessagePipe(); - networkConnection.remote.createURLLoader(urlLoaderPipe.handle1); + netConnection.remote.createURLLoader(urlLoaderPipe.handle1); var urlLoaderConnection = new connection.Connection( - urlLoaderPipe.handle0, URLLoaderImpl, loader.URLLoaderProxy); + urlLoaderPipe.handle0, loader.URLLoaderStub, loader.URLLoaderProxy); var urlRequest = new loader.URLRequest(); urlRequest.url = "http://www.cnn.com"; urlRequest.method = "GET"; urlRequest.auto_follow_redirects = true; + var urlRequestPromise = urlLoaderConnection.remote.start(urlRequest); - urlRequestPromise.then( - function(result) { - var body = core.readData(result.response.body, - core.READ_DATA_FLAG_ALL_OR_NONE); - if (body.result == core.RESULT_OK) - console.log("body.buffer.byteLength=" + body.buffer.byteLength); - else - console.log("core.readData() failed err=" + body.result); - for(var key in result.response) - console.log(key + " => " + result.response[key]); - mojo.quit(); - }, - function(error) { - console.log("FAIL " + error.toString()); + urlRequestPromise.then(function(result) { + for(var key in result.response) + console.log(key + " => " + result.response[key]); + var drainDataPromise = core.drainData(result.response.body); + drainDataPromise.then(function(result) { + console.log("read " + result.buffer.byteLength + " bytes"); + }).then(function() { mojo.quit(); }); - + }); }); diff --git a/mojo/bindings/js/BUILD.gn b/mojo/bindings/js/BUILD.gn index ab75546..7e64c0f 100644 --- a/mojo/bindings/js/BUILD.gn +++ b/mojo/bindings/js/BUILD.gn @@ -7,6 +7,8 @@ source_set("js") { sources = [ "core.cc", "core.h", + "drain_data.cc", + "drain_data.h", "handle.cc", "handle.h", "handle_close_observer.h", diff --git a/mojo/bindings/js/core.cc b/mojo/bindings/js/core.cc index ccabe87..9ab2f19 100644 --- a/mojo/bindings/js/core.cc +++ b/mojo/bindings/js/core.cc @@ -16,6 +16,7 @@ #include "gin/per_isolate_data.h" #include "gin/public/wrapper_info.h" #include "gin/wrappable.h" +#include "mojo/bindings/js/drain_data.h" #include "mojo/bindings/js/handle.h" namespace mojo { @@ -221,6 +222,18 @@ gin::Dictionary ReadData(const gin::Arguments& args, return dictionary; } +// Asynchronously read all of the data available for the specified data pipe +// consumer handle until the remote handle is closed or an error occurs. A +// Promise is returned whose settled value is an object like this: +// {result: core.RESULT_OK, buffer: dataArrayBuffer}. If the read failed, +// then the Promise is rejected, the result will be the actual error code, +// and the buffer will contain whatever was read before the error occurred. +// The drainData data pipe handle argument is closed automatically. + +v8::Handle<v8::Value> DoDrainData(gin::Arguments* args, mojo::Handle handle) { + return (new DrainData(args->isolate(), handle))->GetPromise(); +} + gin::WrapperInfo g_wrapper_info = { gin::kEmbedderNativeGin }; } // namespace @@ -245,6 +258,7 @@ v8::Local<v8::Value> Core::GetModule(v8::Isolate* isolate) { .SetMethod("createDataPipe", CreateDataPipe) .SetMethod("writeData", WriteData) .SetMethod("readData", ReadData) + .SetMethod("drainData", DoDrainData) .SetValue("RESULT_OK", MOJO_RESULT_OK) .SetValue("RESULT_CANCELLED", MOJO_RESULT_CANCELLED) diff --git a/mojo/bindings/js/drain_data.cc b/mojo/bindings/js/drain_data.cc new file mode 100644 index 0000000..a615cd6 --- /dev/null +++ b/mojo/bindings/js/drain_data.cc @@ -0,0 +1,131 @@ +// Copyright 2014 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "mojo/bindings/js/drain_data.h" + +#include "gin/array_buffer.h" +#include "gin/converter.h" +#include "gin/dictionary.h" +#include "gin/per_context_data.h" +#include "gin/per_isolate_data.h" +#include "mojo/public/cpp/environment/environment.h" +#include "mojo/public/cpp/system/core.h" + +namespace mojo { +namespace js { + +DrainData::DrainData(v8::Isolate* isolate, mojo::Handle handle) + : isolate_(isolate), + handle_(DataPipeConsumerHandle(handle.value())), + wait_id_(0) { + + v8::Handle<v8::Context> context(isolate_->GetCurrentContext()); + runner_ = gin::PerContextData::From(context)->runner()->GetWeakPtr(); + + WaitForData(); +} + +v8::Handle<v8::Value> DrainData::GetPromise() { + CHECK(resolver_.IsEmpty()); + v8::Handle<v8::Promise::Resolver> resolver( + v8::Promise::Resolver::New(isolate_)); + resolver_.Reset(isolate_, resolver); + return resolver->GetPromise(); +} + +DrainData::~DrainData() { + if (wait_id_) + Environment::GetDefaultAsyncWaiter()->CancelWait(wait_id_); + resolver_.Reset(); +} + +void DrainData::WaitForData() { + wait_id_ = Environment::GetDefaultAsyncWaiter()->AsyncWait( + handle_.get().value(), + MOJO_HANDLE_SIGNAL_READABLE, + MOJO_DEADLINE_INDEFINITE, + &DrainData::WaitCompleted, + this); +} + +void DrainData::DataReady(MojoResult result) { + wait_id_ = 0; + if (result != MOJO_RESULT_OK) { + DeliverData(result); + return; + } + while (result == MOJO_RESULT_OK) { + result = ReadData(); + if (result == MOJO_RESULT_SHOULD_WAIT) + WaitForData(); + else if (result != MOJO_RESULT_OK) + DeliverData(result); + } +} + +MojoResult DrainData::ReadData() { + const void* buffer; + uint32_t num_bytes = 0; + MojoResult result = BeginReadDataRaw( + handle_.get(), &buffer, &num_bytes, MOJO_READ_DATA_FLAG_NONE); + if (result != MOJO_RESULT_OK) + return result; + const char* p = static_cast<const char*>(buffer); + DataBuffer* data_buffer = new DataBuffer(p, p + num_bytes); + data_buffers_.push_back(data_buffer); + return EndReadDataRaw(handle_.get(), num_bytes); +} + +void DrainData::DeliverData(MojoResult result) { + if (!runner_) { + delete this; + return; + } + + size_t total_bytes = 0; + for (unsigned i = 0; i < data_buffers_.size(); i++) + total_bytes += data_buffers_[i]->size(); + + // Create a total_bytes length ArrayBuffer return value. + gin::Runner::Scope scope(runner_.get()); + v8::Handle<v8::ArrayBuffer> array_buffer = + v8::ArrayBuffer::New(isolate_, total_bytes); + gin::ArrayBuffer buffer; + ConvertFromV8(isolate_, array_buffer, &buffer); + CHECK_EQ(total_bytes, buffer.num_bytes()); + + // Copy the data_buffers into the ArrayBuffer. + char* array_buffer_ptr = static_cast<char*>(buffer.bytes()); + size_t offset = 0; + for (size_t i = 0; i < data_buffers_.size(); i++) { + size_t num_bytes = data_buffers_[i]->size(); + if (num_bytes == 0) + continue; + const char* data_buffer_ptr = &((*data_buffers_[i])[0]); + memcpy(array_buffer_ptr + offset, data_buffer_ptr, num_bytes); + offset += num_bytes; + } + + // The "settled" value of the promise always includes all of the data + // that was read before either an error occurred or the remote pipe handle + // was closed. The latter is indicated by MOJO_RESULT_FAILED_PRECONDITION. + + v8::Handle<v8::Promise::Resolver> resolver( + v8::Local<v8::Promise::Resolver>::New(isolate_, resolver_)); + + gin::Dictionary dictionary = gin::Dictionary::CreateEmpty(isolate_); + dictionary.Set("result", result); + dictionary.Set("buffer", array_buffer); + v8::Handle<v8::Value> settled_value(ConvertToV8(isolate_, dictionary)); + + if (result == MOJO_RESULT_FAILED_PRECONDITION) + resolver->Resolve(settled_value); + else + resolver->Reject(settled_value); + + delete this; +} + +} // namespace js +} // namespace mojo diff --git a/mojo/bindings/js/drain_data.h b/mojo/bindings/js/drain_data.h new file mode 100644 index 0000000..27d8f52 --- /dev/null +++ b/mojo/bindings/js/drain_data.h @@ -0,0 +1,64 @@ +// Copyright 2014 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef MOJO_BINDINGS_JS_DRAIN_DATA_H_ +#define MOJO_BINDINGS_JS_DRAIN_DATA_H_ + +#include "base/memory/scoped_vector.h" +#include "gin/runner.h" +#include "mojo/public/c/environment/async_waiter.h" +#include "mojo/public/cpp/system/core.h" +#include "v8/include/v8.h" + +namespace mojo { +namespace js { + +// This class is the implementation of the Mojo JavaScript core module's +// drainData() method. It is not intended to be used directly. The caller +// allocates a DrainData on the heap and returns GetPromise() to JS. The +// implementation deletes itself after reading as much data as possible +// and rejecting or resolving the Promise. + +class DrainData { + public: + // Starts waiting for data on the specified data pipe consumer handle. + // See WaitForData(). The constructor does not block. + DrainData(v8::Isolate* isolate, mojo::Handle handle); + + // Returns a Promise that will be settled when no more data can be read. + // Should be called just once on a newly allocated DrainData object. + v8::Handle<v8::Value> GetPromise(); + + private: + ~DrainData(); + + // Registers an "async waiter" that calls DataReady() via WaitCompleted(). + void WaitForData(); + static void WaitCompleted(void* self, MojoResult result) { + static_cast<DrainData*>(self)->DataReady(result); + } + + // Use ReadData() to read whatever is availble now on handle_ and save + // it in data_buffers_. + void DataReady(MojoResult result); + MojoResult ReadData(); + + // When the remote data pipe handle is closed, or an error occurs, deliver + // all of the buffered data to the JS Promise and then delete this. + void DeliverData(MojoResult result); + + typedef std::vector<char> DataBuffer; + + v8::Isolate* isolate_; + ScopedDataPipeConsumerHandle handle_; + MojoAsyncWaitID wait_id_; + base::WeakPtr<gin::Runner> runner_; + v8::UniquePersistent<v8::Promise::Resolver> resolver_; + ScopedVector<DataBuffer> data_buffers_; +}; + +} // namespace js +} // namespace mojo + +#endif // MOJO_BINDINGS_JS_DRAIN_DATA_H_ diff --git a/mojo/mojo_base.gyp b/mojo/mojo_base.gyp index b7f2354..9f1127a 100644 --- a/mojo/mojo_base.gyp +++ b/mojo/mojo_base.gyp @@ -505,6 +505,8 @@ # Sources list duplicated in GN build. 'bindings/js/core.cc', 'bindings/js/core.h', + 'bindings/js/drain_data.cc', + 'bindings/js/drain_data.h', 'bindings/js/handle.cc', 'bindings/js/handle.h', 'bindings/js/handle_close_observer.h', |