summaryrefslogtreecommitdiffstats
path: root/chromeos
diff options
context:
space:
mode:
authortbarzic <tbarzic@chromium.org>2015-08-10 15:32:45 -0700
committerCommit bot <commit-bot@chromium.org>2015-08-10 22:33:29 +0000
commit6ca50550a8136fdde5fe87d9fe483826f2f0c74e (patch)
tree6f21df570dd970bdcaa53f306f831b9e987e5af8 /chromeos
parenta79d317a3e68bfa96b1d3552148c21170395a8c9 (diff)
downloadchromium_src-6ca50550a8136fdde5fe87d9fe483826f2f0c74e.zip
chromium_src-6ca50550a8136fdde5fe87d9fe483826f2f0c74e.tar.gz
chromium_src-6ca50550a8136fdde5fe87d9fe483826f2f0c74e.tar.bz2
User MessageLoopForIO::WatchFileDescriptor in proces_output_watcher
The main point is to avoid usage of select for observing process' pseudo-terminal fd. Main problems with select is that it doesn't work for fd > 1024, which started to be passed to ProcessOutputWatcher. Added benefit of MessageLoopForIO::WatchFileDescriptor is that it's async, so it enables us to do some clean up for process proxy thread management. BUG=495165 TEST=Open a lot of crosh tabs (e.g. ~30). Crosh is started in all of them. Review URL: https://codereview.chromium.org/1258193002 Cr-Commit-Position: refs/heads/master@{#342716}
Diffstat (limited to 'chromeos')
-rw-r--r--chromeos/process_proxy/process_output_watcher.cc121
-rw-r--r--chromeos/process_proxy/process_output_watcher.h42
-rw-r--r--chromeos/process_proxy/process_output_watcher_unittest.cc36
-rw-r--r--chromeos/process_proxy/process_proxy.cc93
-rw-r--r--chromeos/process_proxy/process_proxy.h23
-rw-r--r--chromeos/process_proxy/process_proxy_registry.cc45
-rw-r--r--chromeos/process_proxy/process_proxy_registry.h9
-rw-r--r--chromeos/process_proxy/process_proxy_unittest.cc2
8 files changed, 162 insertions, 209 deletions
diff --git a/chromeos/process_proxy/process_output_watcher.cc b/chromeos/process_proxy/process_output_watcher.cc
index 17c1c94..92868c0 100644
--- a/chromeos/process_proxy/process_output_watcher.cc
+++ b/chromeos/process_proxy/process_output_watcher.cc
@@ -4,35 +4,20 @@
#include "chromeos/process_proxy/process_output_watcher.h"
-#include <sys/ioctl.h>
-#include <sys/select.h>
-#include <unistd.h>
-
#include <algorithm>
#include <cstdio>
#include <cstring>
-
+#include "base/bind.h"
+#include "base/location.h"
#include "base/logging.h"
#include "base/posix/eintr_wrapper.h"
+#include "base/single_thread_task_runner.h"
#include "base/third_party/icu/icu_utf.h"
+#include "base/thread_task_runner_handle.h"
+#include "base/time/time.h"
namespace {
-void InitReadFdSet(int out_fd, int stop_fd, fd_set* set) {
- FD_ZERO(set);
- if (out_fd != -1)
- FD_SET(out_fd, set);
- FD_SET(stop_fd, set);
-}
-
-void CloseFd(int* fd) {
- if (*fd >= 0) {
- if (IGNORE_EINTR(close(*fd)) != 0)
- DPLOG(WARNING) << "close fd " << *fd << " failed.";
- }
- *fd = -1;
-}
-
// Gets byte size for a UTF8 character given it's leading byte. The character
// size is encoded as number of leading '1' bits in the character's leading
// byte. If the most significant bit is '0', the character is a valid ASCII
@@ -55,87 +40,77 @@ size_t UTF8SizeFromLeadingByte(uint8 leading_byte) {
namespace chromeos {
-// static
-bool ProcessOutputWatcher::VerifyFileDescriptor(int fd) {
- return (fd >= 0) && (fd < FD_SETSIZE);
-}
-
ProcessOutputWatcher::ProcessOutputWatcher(
int out_fd,
- int stop_fd,
const ProcessOutputCallback& callback)
: read_buffer_size_(0),
- out_fd_(out_fd),
- stop_fd_(stop_fd),
- on_read_callback_(callback) {
- CHECK(VerifyFileDescriptor(out_fd_));
- CHECK(VerifyFileDescriptor(stop_fd_));
- max_fd_ = std::max(out_fd_, stop_fd_);
+ process_output_file_(out_fd),
+ on_read_callback_(callback),
+ weak_factory_(this) {
+ CHECK_GE(out_fd, 0);
// We want to be sure we will be able to add 0 at the end of the input, so -1.
read_buffer_capacity_ = arraysize(read_buffer_) - 1;
}
+ProcessOutputWatcher::~ProcessOutputWatcher() {}
+
void ProcessOutputWatcher::Start() {
WatchProcessOutput();
- OnStop();
-}
-
-ProcessOutputWatcher::~ProcessOutputWatcher() {
- CloseFd(&out_fd_);
- CloseFd(&stop_fd_);
}
-void ProcessOutputWatcher::WatchProcessOutput() {
- while (true) {
- // This has to be reset with every watch cycle.
- fd_set rfds;
- DCHECK_GE(stop_fd_, 0);
- InitReadFdSet(out_fd_, stop_fd_, &rfds);
-
- int select_result =
- HANDLE_EINTR(select(max_fd_ + 1, &rfds, NULL, NULL, NULL));
+void ProcessOutputWatcher::OnFileCanReadWithoutBlocking(int fd) {
+ DCHECK_EQ(process_output_file_.GetPlatformFile(), fd);
- if (select_result < 0) {
- DPLOG(WARNING) << "select failed";
- return;
- }
+ output_file_watcher_.StopWatchingFileDescriptor();
+ ReadFromFd(fd);
+}
- // Check if we were stopped.
- if (FD_ISSET(stop_fd_, &rfds)) {
- return;
- }
+void ProcessOutputWatcher::OnFileCanWriteWithoutBlocking(int fd) {
+ NOTREACHED();
+}
- if (out_fd_ != -1 && FD_ISSET(out_fd_, &rfds)) {
- ReadFromFd(PROCESS_OUTPUT_TYPE_OUT, &out_fd_);
- }
- }
+void ProcessOutputWatcher::WatchProcessOutput() {
+ base::MessageLoopForIO::current()->WatchFileDescriptor(
+ process_output_file_.GetPlatformFile(), false,
+ base::MessageLoopForIO::WATCH_READ, &output_file_watcher_, this);
}
-void ProcessOutputWatcher::ReadFromFd(ProcessOutputType type, int* fd) {
+void ProcessOutputWatcher::ReadFromFd(int fd) {
// We don't want to necessary read pipe until it is empty so we don't starve
// other streams in case data is written faster than we read it. If there is
// more than read_buffer_size_ bytes in pipe, it will be read in the next
// iteration.
DCHECK_GT(read_buffer_capacity_, read_buffer_size_);
ssize_t bytes_read =
- HANDLE_EINTR(read(*fd,
- &read_buffer_[read_buffer_size_],
+ HANDLE_EINTR(read(fd, &read_buffer_[read_buffer_size_],
read_buffer_capacity_ - read_buffer_size_));
+
+ if (bytes_read > 0) {
+ ReportOutput(PROCESS_OUTPUT_TYPE_OUT, bytes_read);
+
+ // Delay next read to make the process less likely to flood IPC channel
+ // when output is reported to terminal extension via terminalPrivate API
+ // (which is the only client of this code).
+ // TODO(tbarzic): Properly fix this!! Provide a mechanism for clients to
+ // ack reported output and continue watching the process when ack is
+ // received. https://crbug.com/398901
+ base::ThreadTaskRunnerHandle::Get()->PostDelayedTask(
+ FROM_HERE, base::Bind(&ProcessOutputWatcher::WatchProcessOutput,
+ weak_factory_.GetWeakPtr()),
+ base::TimeDelta::FromMilliseconds(10));
+
+ return;
+ }
+
if (bytes_read < 0)
DPLOG(WARNING) << "read from buffer failed";
- if (bytes_read > 0)
- ReportOutput(type, bytes_read);
-
// If there is nothing on the output the watched process has exited (slave end
// of pty is closed).
- if (bytes_read <= 0) {
- // Slave pseudo terminal has been closed, we won't need master fd anymore.
- CloseFd(fd);
+ on_read_callback_.Run(PROCESS_OUTPUT_TYPE_EXIT, "");
- // We have lost contact with the process, so report it.
- on_read_callback_.Run(PROCESS_OUTPUT_TYPE_EXIT, "");
- }
+ // Cancel pending |WatchProcessOutput| calls.
+ weak_factory_.InvalidateWeakPtrs();
}
size_t ProcessOutputWatcher::OutputSizeWithoutIncompleteUTF8() {
@@ -191,8 +166,4 @@ void ProcessOutputWatcher::ReportOutput(ProcessOutputType type,
read_buffer_size_ -= output_to_report;
}
-void ProcessOutputWatcher::OnStop() {
- delete this;
-}
-
} // namespace chromeos
diff --git a/chromeos/process_proxy/process_output_watcher.h b/chromeos/process_proxy/process_output_watcher.h
index eb0d046..a5aefb5 100644
--- a/chromeos/process_proxy/process_output_watcher.h
+++ b/chromeos/process_proxy/process_output_watcher.h
@@ -8,13 +8,15 @@
#include <string>
#include "base/callback.h"
+#include "base/files/file.h"
+#include "base/memory/weak_ptr.h"
+#include "base/message_loop/message_loop.h"
#include "chromeos/chromeos_export.h"
namespace chromeos {
enum ProcessOutputType {
PROCESS_OUTPUT_TYPE_OUT,
- PROCESS_OUTPUT_TYPE_ERR,
PROCESS_OUTPUT_TYPE_EXIT
};
@@ -23,31 +25,24 @@ typedef base::Callback<void(ProcessOutputType, const std::string&)>
// Observes output on |out_fd| and invokes |callback| when some output is
// detected. It assumes UTF8 output.
-// If output is detected in |stop_fd|, the watcher is stopped.
-// This class should live on its own thread because running class makes
-// underlying thread block. It deletes itself when watching is stopped.
-class CHROMEOS_EXPORT ProcessOutputWatcher {
+class CHROMEOS_EXPORT ProcessOutputWatcher
+ : public base::MessageLoopForIO::Watcher {
public:
- // Verifies that fds that we got are properly set.
- static bool VerifyFileDescriptor(int fd);
+ ProcessOutputWatcher(int out_fd, const ProcessOutputCallback& callback);
+ ~ProcessOutputWatcher() override;
- ProcessOutputWatcher(int out_fd, int stop_fd,
- const ProcessOutputCallback& callback);
-
- // This will block current thread!!!!
void Start();
private:
- // The object will destroy itself when it stops watching process output.
- ~ProcessOutputWatcher();
+ // MessageLoopForIO::Watcher overrides:
+ void OnFileCanReadWithoutBlocking(int fd) override;
+ void OnFileCanWriteWithoutBlocking(int fd) override;
- // Listens to output from supplied fds. It guarantees data written to one fd
- // will be reported in order that it has been written (this is not true across
- // fds, it would be nicer if it was).
+ // Listens to output from fd passed to the constructor.
void WatchProcessOutput();
- // Reads data from fd, and when it's done, invokes callback function.
- void ReadFromFd(ProcessOutputType type, int* fd);
+ // Reads data from fd and invokes callback |on_read_callback_| with read data.
+ void ReadFromFd(int fd);
// Checks if the read buffer has any trailing incomplete UTF8 characters and
// returns the read buffer size without them.
@@ -57,22 +52,21 @@ class CHROMEOS_EXPORT ProcessOutputWatcher {
// output.
void ReportOutput(ProcessOutputType type, size_t new_bytes_count);
- // It will just delete this.
- void OnStop();
-
char read_buffer_[256];
// Maximum read buffer content size.
size_t read_buffer_capacity_;
// Current read bufferi content size.
size_t read_buffer_size_;
- int out_fd_;
- int stop_fd_;
- int max_fd_;
+ // Contains file descsriptor to which watched process output is written.
+ base::File process_output_file_;
+ base::MessageLoopForIO::FileDescriptorWatcher output_file_watcher_;
// Callback that will be invoked when some output is detected.
ProcessOutputCallback on_read_callback_;
+ base::WeakPtrFactory<ProcessOutputWatcher> weak_factory_;
+
DISALLOW_COPY_AND_ASSIGN(ProcessOutputWatcher);
};
diff --git a/chromeos/process_proxy/process_output_watcher_unittest.cc b/chromeos/process_proxy/process_output_watcher_unittest.cc
index 529ba18..2f2eb99 100644
--- a/chromeos/process_proxy/process_output_watcher_unittest.cc
+++ b/chromeos/process_proxy/process_output_watcher_unittest.cc
@@ -21,6 +21,8 @@
namespace chromeos {
+namespace {
+
struct TestCase {
TestCase(const std::string& input, bool send_terminating_null)
: input(input),
@@ -83,6 +85,12 @@ class ProcessWatcherExpectations {
size_t received_from_out_;
};
+void StopProcessOutputWatcher(scoped_ptr<ProcessOutputWatcher> watcher) {
+ // Just deleting |watcher| if sufficient.
+}
+
+} // namespace
+
class ProcessOutputWatcherTest : public testing::Test {
public:
ProcessOutputWatcherTest() : output_watch_thread_started_(false),
@@ -96,13 +104,6 @@ class ProcessOutputWatcherTest : public testing::Test {
output_watch_thread_->Stop();
}
- void StartWatch(int pt, int stop) {
- // This will delete itself.
- ProcessOutputWatcher* crosh_watcher = new ProcessOutputWatcher(pt, stop,
- base::Bind(&ProcessOutputWatcherTest::OnRead, base::Unretained(this)));
- crosh_watcher->Start();
- }
-
void OnRead(ProcessOutputType type, const std::string& output) {
ASSERT_FALSE(failed_);
// There may be an EXIT signal sent during test tear down (which is sent
@@ -134,17 +135,20 @@ class ProcessOutputWatcherTest : public testing::Test {
void RunTest(const std::vector<TestCase>& test_cases) {
ASSERT_FALSE(output_watch_thread_started_);
output_watch_thread_.reset(new base::Thread("ProcessOutpuWatchThread"));
- output_watch_thread_started_ = output_watch_thread_->Start();
+ output_watch_thread_started_ = output_watch_thread_->StartWithOptions(
+ base::Thread::Options(base::MessageLoop::TYPE_IO, 0));
ASSERT_TRUE(output_watch_thread_started_);
- int pt_pipe[2], stop_pipe[2];
+ int pt_pipe[2];
ASSERT_FALSE(HANDLE_EINTR(pipe(pt_pipe)));
- ASSERT_FALSE(HANDLE_EINTR(pipe(stop_pipe)));
+
+ scoped_ptr<ProcessOutputWatcher> crosh_watcher(new ProcessOutputWatcher(
+ pt_pipe[0],
+ base::Bind(&ProcessOutputWatcherTest::OnRead, base::Unretained(this))));
output_watch_thread_->task_runner()->PostTask(
- FROM_HERE,
- base::Bind(&ProcessOutputWatcherTest::StartWatch,
- base::Unretained(this), pt_pipe[0], stop_pipe[0]));
+ FROM_HERE, base::Bind(&ProcessOutputWatcher::Start,
+ base::Unretained(crosh_watcher.get())));
for (size_t i = 0; i < test_cases.size(); i++) {
expectations_.SetTestCase(test_cases[i]);
@@ -168,10 +172,10 @@ class ProcessOutputWatcherTest : public testing::Test {
break;
}
- // Send stop signal. It is not important which string we send.
- EXPECT_TRUE(base::WriteFileDescriptor(stop_pipe[1], "q", 1));
+ output_watch_thread_->task_runner()->PostTask(
+ FROM_HERE,
+ base::Bind(&StopProcessOutputWatcher, base::Passed(&crosh_watcher)));
- EXPECT_NE(-1, IGNORE_EINTR(close(stop_pipe[1])));
EXPECT_NE(-1, IGNORE_EINTR(close(pt_pipe[1])));
}
diff --git a/chromeos/process_proxy/process_proxy.cc b/chromeos/process_proxy/process_proxy.cc
index 4907d8b..8ec9ceb 100644
--- a/chromeos/process_proxy/process_proxy.cc
+++ b/chromeos/process_proxy/process_proxy.cc
@@ -17,16 +17,10 @@
#include "base/process/process.h"
#include "base/single_thread_task_runner.h"
#include "base/thread_task_runner_handle.h"
-#include "base/threading/thread.h"
#include "third_party/cros_system_api/switches/chrome_switches.h"
namespace {
-enum PipeEnd {
- PIPE_END_READ,
- PIPE_END_WRITE
-};
-
enum PseudoTerminalFd {
PT_MASTER_FD,
PT_SLAVE_FD
@@ -34,16 +28,18 @@ enum PseudoTerminalFd {
const int kInvalidFd = -1;
+void StopOutputWatcher(scoped_ptr<chromeos::ProcessOutputWatcher> watcher) {
+ // Just deleting |watcher| if sufficient to stop it.
+}
+
} // namespace
namespace chromeos {
-ProcessProxy::ProcessProxy(): process_launched_(false),
- callback_set_(false),
- watcher_started_(false) {
+ProcessProxy::ProcessProxy() : process_launched_(false), callback_set_(false) {
// Set pipes to initial, invalid value so we can easily know if a pipe was
// opened by us.
- ClearAllFdPairs();
+ ClearFdPair(pt_pair_);
}
bool ProcessProxy::Open(const std::string& command, pid_t* pid) {
@@ -67,45 +63,33 @@ bool ProcessProxy::Open(const std::string& command, pid_t* pid) {
return process_launched_;
}
-bool ProcessProxy::StartWatchingOnThread(
- base::Thread* watch_thread,
+bool ProcessProxy::StartWatchingOutput(
+ const scoped_refptr<base::SingleThreadTaskRunner>& watcher_runner,
const ProcessOutputCallback& callback) {
DCHECK(process_launched_);
- if (watcher_started_)
- return false;
- if (pipe(shutdown_pipe_) ||
- !ProcessOutputWatcher::VerifyFileDescriptor(
- shutdown_pipe_[PIPE_END_READ])) {
- return false;
- }
+ CHECK(!output_watcher_.get());
// We give ProcessOutputWatcher a copy of master to make life easier during
// tear down.
// TODO(tbarzic): improve fd managment.
int master_copy = HANDLE_EINTR(dup(pt_pair_[PT_MASTER_FD]));
- if (!ProcessOutputWatcher::VerifyFileDescriptor(master_copy))
+ if (master_copy < 0)
return false;
callback_set_ = true;
callback_ = callback;
callback_runner_ = base::ThreadTaskRunnerHandle::Get();
+ watcher_runner_ = watcher_runner;
// This object will delete itself once watching is stopped.
// It also takes ownership of the passed fds.
- ProcessOutputWatcher* output_watcher =
- new ProcessOutputWatcher(master_copy,
- shutdown_pipe_[PIPE_END_READ],
- base::Bind(&ProcessProxy::OnProcessOutput,
- this));
-
- // Output watcher took ownership of the read end of shutdown pipe.
- shutdown_pipe_[PIPE_END_READ] = -1;
+ output_watcher_.reset(new ProcessOutputWatcher(
+ master_copy, base::Bind(&ProcessProxy::OnProcessOutput, this)));
- // |watch| thread is blocked by |output_watcher| from now on.
- watch_thread->task_runner()->PostTask(
+ watcher_runner_->PostTask(
FROM_HERE, base::Bind(&ProcessOutputWatcher::Start,
- base::Unretained(output_watcher)));
- watcher_started_ = true;
+ base::Unretained(output_watcher_.get())));
+
return true;
}
@@ -131,14 +115,13 @@ void ProcessProxy::CallOnProcessOutputCallback(ProcessOutputType type,
callback_.Run(type, output);
}
-bool ProcessProxy::StopWatching() {
- if (!watcher_started_)
- return true;
- // Signal Watcher that we are done. We use self-pipe trick to unblock watcher.
- // Anything may be written to the pipe.
- const char message[] = "q";
- return base::WriteFileDescriptor(shutdown_pipe_[PIPE_END_WRITE],
- message, sizeof(message));
+void ProcessProxy::StopWatching() {
+ if (!output_watcher_.get())
+ return;
+
+ watcher_runner_->PostTask(
+ FROM_HERE,
+ base::Bind(&StopOutputWatcher, base::Passed(&output_watcher_)));
}
void ProcessProxy::Close() {
@@ -153,10 +136,8 @@ void ProcessProxy::Close() {
base::Process process = base::Process::DeprecatedGetProcessFromHandle(pid_);
process.Terminate(0, true /* wait */);
- // TODO(tbarzic): What if this fails?
StopWatching();
-
- CloseAllFdPairs();
+ CloseFdPair(pt_pair_);
}
bool ProcessProxy::Write(const std::string& text) {
@@ -183,13 +164,7 @@ bool ProcessProxy::OnTerminalResize(int width, int height) {
}
ProcessProxy::~ProcessProxy() {
- // In case watcher did not started, we may get deleted without calling Close.
- // In that case we have to clean up created pipes. If watcher had been
- // started, there will be a callback with our reference owned by
- // process_output_watcher until Close is called, so we know Close has been
- // called by now (and pipes have been cleaned).
- if (!watcher_started_)
- CloseAllFdPairs();
+ Close();
}
bool ProcessProxy::CreatePseudoTerminalPair(int *pt_pair) {
@@ -246,14 +221,9 @@ bool ProcessProxy::LaunchProcess(const std::string& command, int slave_fd,
return process.IsValid();
}
-void ProcessProxy::CloseAllFdPairs() {
- CloseFdPair(pt_pair_);
- CloseFdPair(shutdown_pipe_);
-}
-
void ProcessProxy::CloseFdPair(int* pipe) {
- CloseFd(&(pipe[PIPE_END_READ]));
- CloseFd(&(pipe[PIPE_END_WRITE]));
+ CloseFd(&(pipe[PT_MASTER_FD]));
+ CloseFd(&(pipe[PT_SLAVE_FD]));
}
void ProcessProxy::CloseFd(int* fd) {
@@ -264,14 +234,9 @@ void ProcessProxy::CloseFd(int* fd) {
*fd = kInvalidFd;
}
-void ProcessProxy::ClearAllFdPairs() {
- ClearFdPair(pt_pair_);
- ClearFdPair(shutdown_pipe_);
-}
-
void ProcessProxy::ClearFdPair(int* pipe) {
- pipe[PIPE_END_READ] = kInvalidFd;
- pipe[PIPE_END_WRITE] = kInvalidFd;
+ pipe[PT_MASTER_FD] = kInvalidFd;
+ pipe[PT_SLAVE_FD] = kInvalidFd;
}
} // namespace chromeos
diff --git a/chromeos/process_proxy/process_proxy.h b/chromeos/process_proxy/process_proxy.h
index 65f3d34..d49ba80 100644
--- a/chromeos/process_proxy/process_proxy.h
+++ b/chromeos/process_proxy/process_proxy.h
@@ -15,11 +15,15 @@
#include "chromeos/process_proxy/process_output_watcher.h"
namespace base {
+class SingleThreadTaskRunner;
class TaskRunner;
-class Thread;
} // namespace base
namespace chromeos {
+class ProcessOutputWatcher;
+} // namespace chromeos
+
+namespace chromeos {
// Proxy to a single ChromeOS process.
// This is refcounted. Note that output watcher, when it gets triggered owns a
@@ -32,11 +36,9 @@ class ProcessProxy : public base::RefCountedThreadSafe<ProcessProxy> {
// Opens a process using command |command|. |pid| is set to new process' pid.
bool Open(const std::string& command, pid_t* pid);
- // Triggers watcher object on |watch_thread|. |watch_thread| gets blocked, so
- // it should not be one of commonly used threads. It should be thread created
- // specifically for running process output watcher.
- bool StartWatchingOnThread(base::Thread* watch_thread,
- const ProcessOutputCallback& callback);
+ bool StartWatchingOutput(
+ const scoped_refptr<base::SingleThreadTaskRunner>& watcher_runner,
+ const ProcessOutputCallback& callback);
// Sends some data to the process.
bool Write(const std::string& text);
@@ -67,15 +69,12 @@ class ProcessProxy : public base::RefCountedThreadSafe<ProcessProxy> {
void CallOnProcessOutputCallback(ProcessOutputType type,
const std::string& output);
- bool StopWatching();
+ void StopWatching();
- // Methods for cleaning up pipes.
- void CloseAllFdPairs();
// Expects array of 2 file descripters.
void CloseFdPair(int* pipe);
// Expects pointer to single file descriptor.
void CloseFd(int* fd);
- void ClearAllFdPairs();
// Expects array of 2 file descripters.
void ClearFdPair(int* pipe);
@@ -85,11 +84,11 @@ class ProcessProxy : public base::RefCountedThreadSafe<ProcessProxy> {
bool callback_set_;
ProcessOutputCallback callback_;
scoped_refptr<base::TaskRunner> callback_runner_;
+ scoped_refptr<base::SingleThreadTaskRunner> watcher_runner_;
- bool watcher_started_;
+ scoped_ptr<ProcessOutputWatcher> output_watcher_;
int pt_pair_[2];
- int shutdown_pipe_[2];
DISALLOW_COPY_AND_ASSIGN(ProcessProxy);
};
diff --git a/chromeos/process_proxy/process_proxy_registry.cc b/chromeos/process_proxy/process_proxy_registry.cc
index aa2e9d8..3619720 100644
--- a/chromeos/process_proxy/process_proxy_registry.cc
+++ b/chromeos/process_proxy/process_proxy_registry.cc
@@ -13,15 +13,12 @@ namespace {
const char kWatcherThreadName[] = "ProcessWatcherThread";
const char kStdoutOutputType[] = "stdout";
-const char kStderrOutputType[] = "stderr";
const char kExitOutputType[] = "exit";
const char* ProcessOutputTypeToString(ProcessOutputType type) {
switch (type) {
case PROCESS_OUTPUT_TYPE_OUT:
return kStdoutOutputType;
- case PROCESS_OUTPUT_TYPE_ERR:
- return kStderrOutputType;
case PROCESS_OUTPUT_TYPE_EXIT:
return kExitOutputType;
default:
@@ -40,7 +37,7 @@ ProcessProxyRegistry::ProcessProxyInfo::ProcessProxyInfo() {
ProcessProxyRegistry::ProcessProxyInfo::ProcessProxyInfo(
const ProcessProxyInfo& other) {
// This should be called with empty info only.
- DCHECK(!other.proxy.get() && !other.watcher_thread.get());
+ DCHECK(!other.proxy.get());
}
ProcessProxyRegistry::ProcessProxyInfo::~ProcessProxyInfo() {
@@ -54,9 +51,18 @@ ProcessProxyRegistry::~ProcessProxyRegistry() {
// on a different thread (it's a LazyInstance).
DetachFromThread();
+ ShutDown();
+}
+
+void ProcessProxyRegistry::ShutDown() {
// Close all proxies we own.
while (!proxy_map_.empty())
CloseProcess(proxy_map_.begin()->first);
+
+ if (watcher_thread_) {
+ watcher_thread_->Stop();
+ watcher_thread_.reset();
+ }
}
// static
@@ -70,13 +76,8 @@ bool ProcessProxyRegistry::OpenProcess(
const ProcessOutputCallbackWithPid& callback) {
DCHECK(CalledOnValidThread());
- // TODO(tbarzic): Instead of creating a new thread for each new process proxy,
- // use one thread for all processes.
- // We will need new thread for proxy's outpu watcher.
- scoped_ptr<base::Thread> watcher_thread(new base::Thread(kWatcherThreadName));
- if (!watcher_thread->Start()) {
+ if (!EnsureWatcherThreadStarted())
return false;
- }
// Create and open new proxy.
scoped_refptr<ProcessProxy> proxy(new ProcessProxy());
@@ -85,12 +86,12 @@ bool ProcessProxyRegistry::OpenProcess(
// Kick off watcher.
// We can use Unretained because proxy will stop calling callback after it is
- // closed, which is done befire this object goes away.
- if (!proxy->StartWatchingOnThread(watcher_thread.get(),
- base::Bind(&ProcessProxyRegistry::OnProcessOutput,
- base::Unretained(this), *pid))) {
+ // closed, which is done before this object goes away.
+ if (!proxy->StartWatchingOutput(
+ watcher_thread_->task_runner(),
+ base::Bind(&ProcessProxyRegistry::OnProcessOutput,
+ base::Unretained(this), *pid))) {
proxy->Close();
- watcher_thread->Stop();
return false;
}
@@ -100,7 +101,6 @@ bool ProcessProxyRegistry::OpenProcess(
// created because we don't know |pid| then.
ProcessProxyInfo& info = proxy_map_[*pid];
info.proxy.swap(proxy);
- info.watcher_thread.reset(watcher_thread.release());
info.process_id = *pid;
info.callback = callback;
return true;
@@ -123,7 +123,6 @@ bool ProcessProxyRegistry::CloseProcess(pid_t pid) {
return false;
it->second.proxy->Close();
- it->second.watcher_thread->Stop();
proxy_map_.erase(it);
return true;
}
@@ -156,4 +155,16 @@ void ProcessProxyRegistry::OnProcessOutput(pid_t pid,
CloseProcess(pid);
}
+bool ProcessProxyRegistry::EnsureWatcherThreadStarted() {
+ if (watcher_thread_.get())
+ return true;
+
+ // TODO(tbarzic): Change process output watcher to watch for fd readability on
+ // FILE thread, and move output reading to worker thread instead of
+ // spinning a new thread.
+ watcher_thread_.reset(new base::Thread(kWatcherThreadName));
+ return watcher_thread_->StartWithOptions(
+ base::Thread::Options(base::MessageLoop::TYPE_IO, 0));
+}
+
} // namespace chromeos
diff --git a/chromeos/process_proxy/process_proxy_registry.h b/chromeos/process_proxy/process_proxy_registry.h
index 4280ebd..554af9e 100644
--- a/chromeos/process_proxy/process_proxy_registry.h
+++ b/chromeos/process_proxy/process_proxy_registry.h
@@ -6,6 +6,7 @@
#define CHROMEOS_PROCESS_PROXY_PROCESS_PROXY_REGISTRY_H_
#include <map>
+#include <string>
#include "base/callback.h"
#include "base/lazy_instance.h"
@@ -28,7 +29,6 @@ class CHROMEOS_EXPORT ProcessProxyRegistry : public base::NonThreadSafe {
// Info we need about a ProcessProxy instance.
struct ProcessProxyInfo {
scoped_refptr<ProcessProxy> proxy;
- scoped_ptr<base::Thread> watcher_thread;
ProcessOutputCallbackWithPid callback;
pid_t process_id;
@@ -50,6 +50,9 @@ class CHROMEOS_EXPORT ProcessProxyRegistry : public base::NonThreadSafe {
// Reports terminal resize to process proxy.
bool OnTerminalResize(pid_t pid, int width, int height);
+ // Shuts down registry, closing all associated processed.
+ void ShutDown();
+
// Currently used for testing.
void SetOutputCallback(const ProcessOutputCallback& callback);
@@ -64,9 +67,13 @@ class CHROMEOS_EXPORT ProcessProxyRegistry : public base::NonThreadSafe {
ProcessOutputType type,
const std::string& data);
+ bool EnsureWatcherThreadStarted();
+
// Map of all existing ProcessProxies.
std::map<pid_t, ProcessProxyInfo> proxy_map_;
+ scoped_ptr<base::Thread> watcher_thread_;
+
DISALLOW_COPY_AND_ASSIGN(ProcessProxyRegistry);
};
diff --git a/chromeos/process_proxy/process_proxy_unittest.cc b/chromeos/process_proxy/process_proxy_unittest.cc
index 20221c2..b7b7c7f 100644
--- a/chromeos/process_proxy/process_proxy_unittest.cc
+++ b/chromeos/process_proxy/process_proxy_unittest.cc
@@ -208,6 +208,8 @@ class ProcessProxyTest : public testing::Test {
process.Terminate(0, true);
}
+ registry_->ShutDown();
+
base::ThreadTaskRunnerHandle::Get()->PostTask(
FROM_HERE, base::MessageLoop::QuitClosure());
}