diff options
author | tbarzic <tbarzic@chromium.org> | 2015-08-10 15:32:45 -0700 |
---|---|---|
committer | Commit bot <commit-bot@chromium.org> | 2015-08-10 22:33:29 +0000 |
commit | 6ca50550a8136fdde5fe87d9fe483826f2f0c74e (patch) | |
tree | 6f21df570dd970bdcaa53f306f831b9e987e5af8 /chromeos | |
parent | a79d317a3e68bfa96b1d3552148c21170395a8c9 (diff) | |
download | chromium_src-6ca50550a8136fdde5fe87d9fe483826f2f0c74e.zip chromium_src-6ca50550a8136fdde5fe87d9fe483826f2f0c74e.tar.gz chromium_src-6ca50550a8136fdde5fe87d9fe483826f2f0c74e.tar.bz2 |
User MessageLoopForIO::WatchFileDescriptor in proces_output_watcher
The main point is to avoid usage of select for observing process'
pseudo-terminal fd. Main problems with select is that it doesn't work
for fd > 1024, which started to be passed to ProcessOutputWatcher.
Added benefit of MessageLoopForIO::WatchFileDescriptor is that it's async,
so it enables us to do some clean up for process proxy thread management.
BUG=495165
TEST=Open a lot of crosh tabs (e.g. ~30). Crosh is started in all of them.
Review URL: https://codereview.chromium.org/1258193002
Cr-Commit-Position: refs/heads/master@{#342716}
Diffstat (limited to 'chromeos')
-rw-r--r-- | chromeos/process_proxy/process_output_watcher.cc | 121 | ||||
-rw-r--r-- | chromeos/process_proxy/process_output_watcher.h | 42 | ||||
-rw-r--r-- | chromeos/process_proxy/process_output_watcher_unittest.cc | 36 | ||||
-rw-r--r-- | chromeos/process_proxy/process_proxy.cc | 93 | ||||
-rw-r--r-- | chromeos/process_proxy/process_proxy.h | 23 | ||||
-rw-r--r-- | chromeos/process_proxy/process_proxy_registry.cc | 45 | ||||
-rw-r--r-- | chromeos/process_proxy/process_proxy_registry.h | 9 | ||||
-rw-r--r-- | chromeos/process_proxy/process_proxy_unittest.cc | 2 |
8 files changed, 162 insertions, 209 deletions
diff --git a/chromeos/process_proxy/process_output_watcher.cc b/chromeos/process_proxy/process_output_watcher.cc index 17c1c94..92868c0 100644 --- a/chromeos/process_proxy/process_output_watcher.cc +++ b/chromeos/process_proxy/process_output_watcher.cc @@ -4,35 +4,20 @@ #include "chromeos/process_proxy/process_output_watcher.h" -#include <sys/ioctl.h> -#include <sys/select.h> -#include <unistd.h> - #include <algorithm> #include <cstdio> #include <cstring> - +#include "base/bind.h" +#include "base/location.h" #include "base/logging.h" #include "base/posix/eintr_wrapper.h" +#include "base/single_thread_task_runner.h" #include "base/third_party/icu/icu_utf.h" +#include "base/thread_task_runner_handle.h" +#include "base/time/time.h" namespace { -void InitReadFdSet(int out_fd, int stop_fd, fd_set* set) { - FD_ZERO(set); - if (out_fd != -1) - FD_SET(out_fd, set); - FD_SET(stop_fd, set); -} - -void CloseFd(int* fd) { - if (*fd >= 0) { - if (IGNORE_EINTR(close(*fd)) != 0) - DPLOG(WARNING) << "close fd " << *fd << " failed."; - } - *fd = -1; -} - // Gets byte size for a UTF8 character given it's leading byte. The character // size is encoded as number of leading '1' bits in the character's leading // byte. If the most significant bit is '0', the character is a valid ASCII @@ -55,87 +40,77 @@ size_t UTF8SizeFromLeadingByte(uint8 leading_byte) { namespace chromeos { -// static -bool ProcessOutputWatcher::VerifyFileDescriptor(int fd) { - return (fd >= 0) && (fd < FD_SETSIZE); -} - ProcessOutputWatcher::ProcessOutputWatcher( int out_fd, - int stop_fd, const ProcessOutputCallback& callback) : read_buffer_size_(0), - out_fd_(out_fd), - stop_fd_(stop_fd), - on_read_callback_(callback) { - CHECK(VerifyFileDescriptor(out_fd_)); - CHECK(VerifyFileDescriptor(stop_fd_)); - max_fd_ = std::max(out_fd_, stop_fd_); + process_output_file_(out_fd), + on_read_callback_(callback), + weak_factory_(this) { + CHECK_GE(out_fd, 0); // We want to be sure we will be able to add 0 at the end of the input, so -1. read_buffer_capacity_ = arraysize(read_buffer_) - 1; } +ProcessOutputWatcher::~ProcessOutputWatcher() {} + void ProcessOutputWatcher::Start() { WatchProcessOutput(); - OnStop(); -} - -ProcessOutputWatcher::~ProcessOutputWatcher() { - CloseFd(&out_fd_); - CloseFd(&stop_fd_); } -void ProcessOutputWatcher::WatchProcessOutput() { - while (true) { - // This has to be reset with every watch cycle. - fd_set rfds; - DCHECK_GE(stop_fd_, 0); - InitReadFdSet(out_fd_, stop_fd_, &rfds); - - int select_result = - HANDLE_EINTR(select(max_fd_ + 1, &rfds, NULL, NULL, NULL)); +void ProcessOutputWatcher::OnFileCanReadWithoutBlocking(int fd) { + DCHECK_EQ(process_output_file_.GetPlatformFile(), fd); - if (select_result < 0) { - DPLOG(WARNING) << "select failed"; - return; - } + output_file_watcher_.StopWatchingFileDescriptor(); + ReadFromFd(fd); +} - // Check if we were stopped. - if (FD_ISSET(stop_fd_, &rfds)) { - return; - } +void ProcessOutputWatcher::OnFileCanWriteWithoutBlocking(int fd) { + NOTREACHED(); +} - if (out_fd_ != -1 && FD_ISSET(out_fd_, &rfds)) { - ReadFromFd(PROCESS_OUTPUT_TYPE_OUT, &out_fd_); - } - } +void ProcessOutputWatcher::WatchProcessOutput() { + base::MessageLoopForIO::current()->WatchFileDescriptor( + process_output_file_.GetPlatformFile(), false, + base::MessageLoopForIO::WATCH_READ, &output_file_watcher_, this); } -void ProcessOutputWatcher::ReadFromFd(ProcessOutputType type, int* fd) { +void ProcessOutputWatcher::ReadFromFd(int fd) { // We don't want to necessary read pipe until it is empty so we don't starve // other streams in case data is written faster than we read it. If there is // more than read_buffer_size_ bytes in pipe, it will be read in the next // iteration. DCHECK_GT(read_buffer_capacity_, read_buffer_size_); ssize_t bytes_read = - HANDLE_EINTR(read(*fd, - &read_buffer_[read_buffer_size_], + HANDLE_EINTR(read(fd, &read_buffer_[read_buffer_size_], read_buffer_capacity_ - read_buffer_size_)); + + if (bytes_read > 0) { + ReportOutput(PROCESS_OUTPUT_TYPE_OUT, bytes_read); + + // Delay next read to make the process less likely to flood IPC channel + // when output is reported to terminal extension via terminalPrivate API + // (which is the only client of this code). + // TODO(tbarzic): Properly fix this!! Provide a mechanism for clients to + // ack reported output and continue watching the process when ack is + // received. https://crbug.com/398901 + base::ThreadTaskRunnerHandle::Get()->PostDelayedTask( + FROM_HERE, base::Bind(&ProcessOutputWatcher::WatchProcessOutput, + weak_factory_.GetWeakPtr()), + base::TimeDelta::FromMilliseconds(10)); + + return; + } + if (bytes_read < 0) DPLOG(WARNING) << "read from buffer failed"; - if (bytes_read > 0) - ReportOutput(type, bytes_read); - // If there is nothing on the output the watched process has exited (slave end // of pty is closed). - if (bytes_read <= 0) { - // Slave pseudo terminal has been closed, we won't need master fd anymore. - CloseFd(fd); + on_read_callback_.Run(PROCESS_OUTPUT_TYPE_EXIT, ""); - // We have lost contact with the process, so report it. - on_read_callback_.Run(PROCESS_OUTPUT_TYPE_EXIT, ""); - } + // Cancel pending |WatchProcessOutput| calls. + weak_factory_.InvalidateWeakPtrs(); } size_t ProcessOutputWatcher::OutputSizeWithoutIncompleteUTF8() { @@ -191,8 +166,4 @@ void ProcessOutputWatcher::ReportOutput(ProcessOutputType type, read_buffer_size_ -= output_to_report; } -void ProcessOutputWatcher::OnStop() { - delete this; -} - } // namespace chromeos diff --git a/chromeos/process_proxy/process_output_watcher.h b/chromeos/process_proxy/process_output_watcher.h index eb0d046..a5aefb5 100644 --- a/chromeos/process_proxy/process_output_watcher.h +++ b/chromeos/process_proxy/process_output_watcher.h @@ -8,13 +8,15 @@ #include <string> #include "base/callback.h" +#include "base/files/file.h" +#include "base/memory/weak_ptr.h" +#include "base/message_loop/message_loop.h" #include "chromeos/chromeos_export.h" namespace chromeos { enum ProcessOutputType { PROCESS_OUTPUT_TYPE_OUT, - PROCESS_OUTPUT_TYPE_ERR, PROCESS_OUTPUT_TYPE_EXIT }; @@ -23,31 +25,24 @@ typedef base::Callback<void(ProcessOutputType, const std::string&)> // Observes output on |out_fd| and invokes |callback| when some output is // detected. It assumes UTF8 output. -// If output is detected in |stop_fd|, the watcher is stopped. -// This class should live on its own thread because running class makes -// underlying thread block. It deletes itself when watching is stopped. -class CHROMEOS_EXPORT ProcessOutputWatcher { +class CHROMEOS_EXPORT ProcessOutputWatcher + : public base::MessageLoopForIO::Watcher { public: - // Verifies that fds that we got are properly set. - static bool VerifyFileDescriptor(int fd); + ProcessOutputWatcher(int out_fd, const ProcessOutputCallback& callback); + ~ProcessOutputWatcher() override; - ProcessOutputWatcher(int out_fd, int stop_fd, - const ProcessOutputCallback& callback); - - // This will block current thread!!!! void Start(); private: - // The object will destroy itself when it stops watching process output. - ~ProcessOutputWatcher(); + // MessageLoopForIO::Watcher overrides: + void OnFileCanReadWithoutBlocking(int fd) override; + void OnFileCanWriteWithoutBlocking(int fd) override; - // Listens to output from supplied fds. It guarantees data written to one fd - // will be reported in order that it has been written (this is not true across - // fds, it would be nicer if it was). + // Listens to output from fd passed to the constructor. void WatchProcessOutput(); - // Reads data from fd, and when it's done, invokes callback function. - void ReadFromFd(ProcessOutputType type, int* fd); + // Reads data from fd and invokes callback |on_read_callback_| with read data. + void ReadFromFd(int fd); // Checks if the read buffer has any trailing incomplete UTF8 characters and // returns the read buffer size without them. @@ -57,22 +52,21 @@ class CHROMEOS_EXPORT ProcessOutputWatcher { // output. void ReportOutput(ProcessOutputType type, size_t new_bytes_count); - // It will just delete this. - void OnStop(); - char read_buffer_[256]; // Maximum read buffer content size. size_t read_buffer_capacity_; // Current read bufferi content size. size_t read_buffer_size_; - int out_fd_; - int stop_fd_; - int max_fd_; + // Contains file descsriptor to which watched process output is written. + base::File process_output_file_; + base::MessageLoopForIO::FileDescriptorWatcher output_file_watcher_; // Callback that will be invoked when some output is detected. ProcessOutputCallback on_read_callback_; + base::WeakPtrFactory<ProcessOutputWatcher> weak_factory_; + DISALLOW_COPY_AND_ASSIGN(ProcessOutputWatcher); }; diff --git a/chromeos/process_proxy/process_output_watcher_unittest.cc b/chromeos/process_proxy/process_output_watcher_unittest.cc index 529ba18..2f2eb99 100644 --- a/chromeos/process_proxy/process_output_watcher_unittest.cc +++ b/chromeos/process_proxy/process_output_watcher_unittest.cc @@ -21,6 +21,8 @@ namespace chromeos { +namespace { + struct TestCase { TestCase(const std::string& input, bool send_terminating_null) : input(input), @@ -83,6 +85,12 @@ class ProcessWatcherExpectations { size_t received_from_out_; }; +void StopProcessOutputWatcher(scoped_ptr<ProcessOutputWatcher> watcher) { + // Just deleting |watcher| if sufficient. +} + +} // namespace + class ProcessOutputWatcherTest : public testing::Test { public: ProcessOutputWatcherTest() : output_watch_thread_started_(false), @@ -96,13 +104,6 @@ class ProcessOutputWatcherTest : public testing::Test { output_watch_thread_->Stop(); } - void StartWatch(int pt, int stop) { - // This will delete itself. - ProcessOutputWatcher* crosh_watcher = new ProcessOutputWatcher(pt, stop, - base::Bind(&ProcessOutputWatcherTest::OnRead, base::Unretained(this))); - crosh_watcher->Start(); - } - void OnRead(ProcessOutputType type, const std::string& output) { ASSERT_FALSE(failed_); // There may be an EXIT signal sent during test tear down (which is sent @@ -134,17 +135,20 @@ class ProcessOutputWatcherTest : public testing::Test { void RunTest(const std::vector<TestCase>& test_cases) { ASSERT_FALSE(output_watch_thread_started_); output_watch_thread_.reset(new base::Thread("ProcessOutpuWatchThread")); - output_watch_thread_started_ = output_watch_thread_->Start(); + output_watch_thread_started_ = output_watch_thread_->StartWithOptions( + base::Thread::Options(base::MessageLoop::TYPE_IO, 0)); ASSERT_TRUE(output_watch_thread_started_); - int pt_pipe[2], stop_pipe[2]; + int pt_pipe[2]; ASSERT_FALSE(HANDLE_EINTR(pipe(pt_pipe))); - ASSERT_FALSE(HANDLE_EINTR(pipe(stop_pipe))); + + scoped_ptr<ProcessOutputWatcher> crosh_watcher(new ProcessOutputWatcher( + pt_pipe[0], + base::Bind(&ProcessOutputWatcherTest::OnRead, base::Unretained(this)))); output_watch_thread_->task_runner()->PostTask( - FROM_HERE, - base::Bind(&ProcessOutputWatcherTest::StartWatch, - base::Unretained(this), pt_pipe[0], stop_pipe[0])); + FROM_HERE, base::Bind(&ProcessOutputWatcher::Start, + base::Unretained(crosh_watcher.get()))); for (size_t i = 0; i < test_cases.size(); i++) { expectations_.SetTestCase(test_cases[i]); @@ -168,10 +172,10 @@ class ProcessOutputWatcherTest : public testing::Test { break; } - // Send stop signal. It is not important which string we send. - EXPECT_TRUE(base::WriteFileDescriptor(stop_pipe[1], "q", 1)); + output_watch_thread_->task_runner()->PostTask( + FROM_HERE, + base::Bind(&StopProcessOutputWatcher, base::Passed(&crosh_watcher))); - EXPECT_NE(-1, IGNORE_EINTR(close(stop_pipe[1]))); EXPECT_NE(-1, IGNORE_EINTR(close(pt_pipe[1]))); } diff --git a/chromeos/process_proxy/process_proxy.cc b/chromeos/process_proxy/process_proxy.cc index 4907d8b..8ec9ceb 100644 --- a/chromeos/process_proxy/process_proxy.cc +++ b/chromeos/process_proxy/process_proxy.cc @@ -17,16 +17,10 @@ #include "base/process/process.h" #include "base/single_thread_task_runner.h" #include "base/thread_task_runner_handle.h" -#include "base/threading/thread.h" #include "third_party/cros_system_api/switches/chrome_switches.h" namespace { -enum PipeEnd { - PIPE_END_READ, - PIPE_END_WRITE -}; - enum PseudoTerminalFd { PT_MASTER_FD, PT_SLAVE_FD @@ -34,16 +28,18 @@ enum PseudoTerminalFd { const int kInvalidFd = -1; +void StopOutputWatcher(scoped_ptr<chromeos::ProcessOutputWatcher> watcher) { + // Just deleting |watcher| if sufficient to stop it. +} + } // namespace namespace chromeos { -ProcessProxy::ProcessProxy(): process_launched_(false), - callback_set_(false), - watcher_started_(false) { +ProcessProxy::ProcessProxy() : process_launched_(false), callback_set_(false) { // Set pipes to initial, invalid value so we can easily know if a pipe was // opened by us. - ClearAllFdPairs(); + ClearFdPair(pt_pair_); } bool ProcessProxy::Open(const std::string& command, pid_t* pid) { @@ -67,45 +63,33 @@ bool ProcessProxy::Open(const std::string& command, pid_t* pid) { return process_launched_; } -bool ProcessProxy::StartWatchingOnThread( - base::Thread* watch_thread, +bool ProcessProxy::StartWatchingOutput( + const scoped_refptr<base::SingleThreadTaskRunner>& watcher_runner, const ProcessOutputCallback& callback) { DCHECK(process_launched_); - if (watcher_started_) - return false; - if (pipe(shutdown_pipe_) || - !ProcessOutputWatcher::VerifyFileDescriptor( - shutdown_pipe_[PIPE_END_READ])) { - return false; - } + CHECK(!output_watcher_.get()); // We give ProcessOutputWatcher a copy of master to make life easier during // tear down. // TODO(tbarzic): improve fd managment. int master_copy = HANDLE_EINTR(dup(pt_pair_[PT_MASTER_FD])); - if (!ProcessOutputWatcher::VerifyFileDescriptor(master_copy)) + if (master_copy < 0) return false; callback_set_ = true; callback_ = callback; callback_runner_ = base::ThreadTaskRunnerHandle::Get(); + watcher_runner_ = watcher_runner; // This object will delete itself once watching is stopped. // It also takes ownership of the passed fds. - ProcessOutputWatcher* output_watcher = - new ProcessOutputWatcher(master_copy, - shutdown_pipe_[PIPE_END_READ], - base::Bind(&ProcessProxy::OnProcessOutput, - this)); - - // Output watcher took ownership of the read end of shutdown pipe. - shutdown_pipe_[PIPE_END_READ] = -1; + output_watcher_.reset(new ProcessOutputWatcher( + master_copy, base::Bind(&ProcessProxy::OnProcessOutput, this))); - // |watch| thread is blocked by |output_watcher| from now on. - watch_thread->task_runner()->PostTask( + watcher_runner_->PostTask( FROM_HERE, base::Bind(&ProcessOutputWatcher::Start, - base::Unretained(output_watcher))); - watcher_started_ = true; + base::Unretained(output_watcher_.get()))); + return true; } @@ -131,14 +115,13 @@ void ProcessProxy::CallOnProcessOutputCallback(ProcessOutputType type, callback_.Run(type, output); } -bool ProcessProxy::StopWatching() { - if (!watcher_started_) - return true; - // Signal Watcher that we are done. We use self-pipe trick to unblock watcher. - // Anything may be written to the pipe. - const char message[] = "q"; - return base::WriteFileDescriptor(shutdown_pipe_[PIPE_END_WRITE], - message, sizeof(message)); +void ProcessProxy::StopWatching() { + if (!output_watcher_.get()) + return; + + watcher_runner_->PostTask( + FROM_HERE, + base::Bind(&StopOutputWatcher, base::Passed(&output_watcher_))); } void ProcessProxy::Close() { @@ -153,10 +136,8 @@ void ProcessProxy::Close() { base::Process process = base::Process::DeprecatedGetProcessFromHandle(pid_); process.Terminate(0, true /* wait */); - // TODO(tbarzic): What if this fails? StopWatching(); - - CloseAllFdPairs(); + CloseFdPair(pt_pair_); } bool ProcessProxy::Write(const std::string& text) { @@ -183,13 +164,7 @@ bool ProcessProxy::OnTerminalResize(int width, int height) { } ProcessProxy::~ProcessProxy() { - // In case watcher did not started, we may get deleted without calling Close. - // In that case we have to clean up created pipes. If watcher had been - // started, there will be a callback with our reference owned by - // process_output_watcher until Close is called, so we know Close has been - // called by now (and pipes have been cleaned). - if (!watcher_started_) - CloseAllFdPairs(); + Close(); } bool ProcessProxy::CreatePseudoTerminalPair(int *pt_pair) { @@ -246,14 +221,9 @@ bool ProcessProxy::LaunchProcess(const std::string& command, int slave_fd, return process.IsValid(); } -void ProcessProxy::CloseAllFdPairs() { - CloseFdPair(pt_pair_); - CloseFdPair(shutdown_pipe_); -} - void ProcessProxy::CloseFdPair(int* pipe) { - CloseFd(&(pipe[PIPE_END_READ])); - CloseFd(&(pipe[PIPE_END_WRITE])); + CloseFd(&(pipe[PT_MASTER_FD])); + CloseFd(&(pipe[PT_SLAVE_FD])); } void ProcessProxy::CloseFd(int* fd) { @@ -264,14 +234,9 @@ void ProcessProxy::CloseFd(int* fd) { *fd = kInvalidFd; } -void ProcessProxy::ClearAllFdPairs() { - ClearFdPair(pt_pair_); - ClearFdPair(shutdown_pipe_); -} - void ProcessProxy::ClearFdPair(int* pipe) { - pipe[PIPE_END_READ] = kInvalidFd; - pipe[PIPE_END_WRITE] = kInvalidFd; + pipe[PT_MASTER_FD] = kInvalidFd; + pipe[PT_SLAVE_FD] = kInvalidFd; } } // namespace chromeos diff --git a/chromeos/process_proxy/process_proxy.h b/chromeos/process_proxy/process_proxy.h index 65f3d34..d49ba80 100644 --- a/chromeos/process_proxy/process_proxy.h +++ b/chromeos/process_proxy/process_proxy.h @@ -15,11 +15,15 @@ #include "chromeos/process_proxy/process_output_watcher.h" namespace base { +class SingleThreadTaskRunner; class TaskRunner; -class Thread; } // namespace base namespace chromeos { +class ProcessOutputWatcher; +} // namespace chromeos + +namespace chromeos { // Proxy to a single ChromeOS process. // This is refcounted. Note that output watcher, when it gets triggered owns a @@ -32,11 +36,9 @@ class ProcessProxy : public base::RefCountedThreadSafe<ProcessProxy> { // Opens a process using command |command|. |pid| is set to new process' pid. bool Open(const std::string& command, pid_t* pid); - // Triggers watcher object on |watch_thread|. |watch_thread| gets blocked, so - // it should not be one of commonly used threads. It should be thread created - // specifically for running process output watcher. - bool StartWatchingOnThread(base::Thread* watch_thread, - const ProcessOutputCallback& callback); + bool StartWatchingOutput( + const scoped_refptr<base::SingleThreadTaskRunner>& watcher_runner, + const ProcessOutputCallback& callback); // Sends some data to the process. bool Write(const std::string& text); @@ -67,15 +69,12 @@ class ProcessProxy : public base::RefCountedThreadSafe<ProcessProxy> { void CallOnProcessOutputCallback(ProcessOutputType type, const std::string& output); - bool StopWatching(); + void StopWatching(); - // Methods for cleaning up pipes. - void CloseAllFdPairs(); // Expects array of 2 file descripters. void CloseFdPair(int* pipe); // Expects pointer to single file descriptor. void CloseFd(int* fd); - void ClearAllFdPairs(); // Expects array of 2 file descripters. void ClearFdPair(int* pipe); @@ -85,11 +84,11 @@ class ProcessProxy : public base::RefCountedThreadSafe<ProcessProxy> { bool callback_set_; ProcessOutputCallback callback_; scoped_refptr<base::TaskRunner> callback_runner_; + scoped_refptr<base::SingleThreadTaskRunner> watcher_runner_; - bool watcher_started_; + scoped_ptr<ProcessOutputWatcher> output_watcher_; int pt_pair_[2]; - int shutdown_pipe_[2]; DISALLOW_COPY_AND_ASSIGN(ProcessProxy); }; diff --git a/chromeos/process_proxy/process_proxy_registry.cc b/chromeos/process_proxy/process_proxy_registry.cc index aa2e9d8..3619720 100644 --- a/chromeos/process_proxy/process_proxy_registry.cc +++ b/chromeos/process_proxy/process_proxy_registry.cc @@ -13,15 +13,12 @@ namespace { const char kWatcherThreadName[] = "ProcessWatcherThread"; const char kStdoutOutputType[] = "stdout"; -const char kStderrOutputType[] = "stderr"; const char kExitOutputType[] = "exit"; const char* ProcessOutputTypeToString(ProcessOutputType type) { switch (type) { case PROCESS_OUTPUT_TYPE_OUT: return kStdoutOutputType; - case PROCESS_OUTPUT_TYPE_ERR: - return kStderrOutputType; case PROCESS_OUTPUT_TYPE_EXIT: return kExitOutputType; default: @@ -40,7 +37,7 @@ ProcessProxyRegistry::ProcessProxyInfo::ProcessProxyInfo() { ProcessProxyRegistry::ProcessProxyInfo::ProcessProxyInfo( const ProcessProxyInfo& other) { // This should be called with empty info only. - DCHECK(!other.proxy.get() && !other.watcher_thread.get()); + DCHECK(!other.proxy.get()); } ProcessProxyRegistry::ProcessProxyInfo::~ProcessProxyInfo() { @@ -54,9 +51,18 @@ ProcessProxyRegistry::~ProcessProxyRegistry() { // on a different thread (it's a LazyInstance). DetachFromThread(); + ShutDown(); +} + +void ProcessProxyRegistry::ShutDown() { // Close all proxies we own. while (!proxy_map_.empty()) CloseProcess(proxy_map_.begin()->first); + + if (watcher_thread_) { + watcher_thread_->Stop(); + watcher_thread_.reset(); + } } // static @@ -70,13 +76,8 @@ bool ProcessProxyRegistry::OpenProcess( const ProcessOutputCallbackWithPid& callback) { DCHECK(CalledOnValidThread()); - // TODO(tbarzic): Instead of creating a new thread for each new process proxy, - // use one thread for all processes. - // We will need new thread for proxy's outpu watcher. - scoped_ptr<base::Thread> watcher_thread(new base::Thread(kWatcherThreadName)); - if (!watcher_thread->Start()) { + if (!EnsureWatcherThreadStarted()) return false; - } // Create and open new proxy. scoped_refptr<ProcessProxy> proxy(new ProcessProxy()); @@ -85,12 +86,12 @@ bool ProcessProxyRegistry::OpenProcess( // Kick off watcher. // We can use Unretained because proxy will stop calling callback after it is - // closed, which is done befire this object goes away. - if (!proxy->StartWatchingOnThread(watcher_thread.get(), - base::Bind(&ProcessProxyRegistry::OnProcessOutput, - base::Unretained(this), *pid))) { + // closed, which is done before this object goes away. + if (!proxy->StartWatchingOutput( + watcher_thread_->task_runner(), + base::Bind(&ProcessProxyRegistry::OnProcessOutput, + base::Unretained(this), *pid))) { proxy->Close(); - watcher_thread->Stop(); return false; } @@ -100,7 +101,6 @@ bool ProcessProxyRegistry::OpenProcess( // created because we don't know |pid| then. ProcessProxyInfo& info = proxy_map_[*pid]; info.proxy.swap(proxy); - info.watcher_thread.reset(watcher_thread.release()); info.process_id = *pid; info.callback = callback; return true; @@ -123,7 +123,6 @@ bool ProcessProxyRegistry::CloseProcess(pid_t pid) { return false; it->second.proxy->Close(); - it->second.watcher_thread->Stop(); proxy_map_.erase(it); return true; } @@ -156,4 +155,16 @@ void ProcessProxyRegistry::OnProcessOutput(pid_t pid, CloseProcess(pid); } +bool ProcessProxyRegistry::EnsureWatcherThreadStarted() { + if (watcher_thread_.get()) + return true; + + // TODO(tbarzic): Change process output watcher to watch for fd readability on + // FILE thread, and move output reading to worker thread instead of + // spinning a new thread. + watcher_thread_.reset(new base::Thread(kWatcherThreadName)); + return watcher_thread_->StartWithOptions( + base::Thread::Options(base::MessageLoop::TYPE_IO, 0)); +} + } // namespace chromeos diff --git a/chromeos/process_proxy/process_proxy_registry.h b/chromeos/process_proxy/process_proxy_registry.h index 4280ebd..554af9e 100644 --- a/chromeos/process_proxy/process_proxy_registry.h +++ b/chromeos/process_proxy/process_proxy_registry.h @@ -6,6 +6,7 @@ #define CHROMEOS_PROCESS_PROXY_PROCESS_PROXY_REGISTRY_H_ #include <map> +#include <string> #include "base/callback.h" #include "base/lazy_instance.h" @@ -28,7 +29,6 @@ class CHROMEOS_EXPORT ProcessProxyRegistry : public base::NonThreadSafe { // Info we need about a ProcessProxy instance. struct ProcessProxyInfo { scoped_refptr<ProcessProxy> proxy; - scoped_ptr<base::Thread> watcher_thread; ProcessOutputCallbackWithPid callback; pid_t process_id; @@ -50,6 +50,9 @@ class CHROMEOS_EXPORT ProcessProxyRegistry : public base::NonThreadSafe { // Reports terminal resize to process proxy. bool OnTerminalResize(pid_t pid, int width, int height); + // Shuts down registry, closing all associated processed. + void ShutDown(); + // Currently used for testing. void SetOutputCallback(const ProcessOutputCallback& callback); @@ -64,9 +67,13 @@ class CHROMEOS_EXPORT ProcessProxyRegistry : public base::NonThreadSafe { ProcessOutputType type, const std::string& data); + bool EnsureWatcherThreadStarted(); + // Map of all existing ProcessProxies. std::map<pid_t, ProcessProxyInfo> proxy_map_; + scoped_ptr<base::Thread> watcher_thread_; + DISALLOW_COPY_AND_ASSIGN(ProcessProxyRegistry); }; diff --git a/chromeos/process_proxy/process_proxy_unittest.cc b/chromeos/process_proxy/process_proxy_unittest.cc index 20221c2..b7b7c7f 100644 --- a/chromeos/process_proxy/process_proxy_unittest.cc +++ b/chromeos/process_proxy/process_proxy_unittest.cc @@ -208,6 +208,8 @@ class ProcessProxyTest : public testing::Test { process.Terminate(0, true); } + registry_->ShutDown(); + base::ThreadTaskRunnerHandle::Get()->PostTask( FROM_HERE, base::MessageLoop::QuitClosure()); } |