diff options
author | phajdan.jr@chromium.org <phajdan.jr@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2013-03-12 20:22:48 +0000 |
---|---|---|
committer | phajdan.jr@chromium.org <phajdan.jr@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2013-03-12 20:22:48 +0000 |
commit | 3a01162bc4f385c6353a603662721eb95530b526 (patch) | |
tree | 8a46fdd70745d81c24288705f2e1539b64047a5a /chromeos/process_proxy | |
parent | b0cc590aacd6c9908d154970eabe41cb370c2cbe (diff) | |
download | chromium_src-3a01162bc4f385c6353a603662721eb95530b526.zip chromium_src-3a01162bc4f385c6353a603662721eb95530b526.tar.gz chromium_src-3a01162bc4f385c6353a603662721eb95530b526.tar.bz2 |
Move chrome/browser/chromeos/process_proxy to chromeos
BUG=180711
Review URL: https://codereview.chromium.org/12433023
git-svn-id: svn://svn.chromium.org/chrome/trunk/src@187641 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'chromeos/process_proxy')
-rw-r--r-- | chromeos/process_proxy/OWNERS | 1 | ||||
-rw-r--r-- | chromeos/process_proxy/process_output_watcher.cc | 120 | ||||
-rw-r--r-- | chromeos/process_proxy/process_output_watcher.h | 73 | ||||
-rw-r--r-- | chromeos/process_proxy/process_output_watcher_unittest.cc | 171 | ||||
-rw-r--r-- | chromeos/process_proxy/process_proxy.cc | 265 | ||||
-rw-r--r-- | chromeos/process_proxy/process_proxy.h | 99 | ||||
-rw-r--r-- | chromeos/process_proxy/process_proxy_registry.cc | 159 | ||||
-rw-r--r-- | chromeos/process_proxy/process_proxy_registry.h | 75 | ||||
-rw-r--r-- | chromeos/process_proxy/process_proxy_unittest.cc | 257 |
9 files changed, 1220 insertions, 0 deletions
diff --git a/chromeos/process_proxy/OWNERS b/chromeos/process_proxy/OWNERS new file mode 100644 index 0000000..d063efc --- /dev/null +++ b/chromeos/process_proxy/OWNERS @@ -0,0 +1 @@ +tbarzic@chromium.org diff --git a/chromeos/process_proxy/process_output_watcher.cc b/chromeos/process_proxy/process_output_watcher.cc new file mode 100644 index 0000000..40ed8cf --- /dev/null +++ b/chromeos/process_proxy/process_output_watcher.cc @@ -0,0 +1,120 @@ +// Copyright (c) 2012 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "chromeos/process_proxy/process_output_watcher.h" + +#include <algorithm> +#include <cstdio> +#include <cstring> + +#include <sys/ioctl.h> +#include <sys/select.h> +#include <unistd.h> + +#include "base/logging.h" +#include "base/posix/eintr_wrapper.h" + +namespace { + +void InitReadFdSet(int out_fd, int stop_fd, fd_set* set) { + FD_ZERO(set); + if (out_fd != -1) + FD_SET(out_fd, set); + FD_SET(stop_fd, set); +} + +void CloseFd(int* fd) { + if (*fd >= 0) { + if (HANDLE_EINTR(close(*fd)) != 0) + DPLOG(WARNING) << "close fd " << *fd << " failed."; + } + *fd = -1; +} + +} // namespace + +namespace chromeos { + +ProcessOutputWatcher::ProcessOutputWatcher(int out_fd, int stop_fd, + const ProcessOutputCallback& callback) + : out_fd_(out_fd), + stop_fd_(stop_fd), + on_read_callback_(callback) { + VerifyFileDescriptor(out_fd_); + VerifyFileDescriptor(stop_fd_); + max_fd_ = std::max(out_fd_, stop_fd_); + // We want to be sure we will be able to add 0 at the end of the input, so -1. + read_buffer_size_ = arraysize(read_buffer_) - 1; +} + +void ProcessOutputWatcher::Start() { + WatchProcessOutput(); + OnStop(); +} + +ProcessOutputWatcher::~ProcessOutputWatcher() { + CloseFd(&out_fd_); + CloseFd(&stop_fd_); +} + +void ProcessOutputWatcher::WatchProcessOutput() { + while (true) { + // This has to be reset with every watch cycle. + fd_set rfds; + DCHECK(stop_fd_ >= 0); + InitReadFdSet(out_fd_, stop_fd_, &rfds); + + int select_result = + HANDLE_EINTR(select(max_fd_ + 1, &rfds, NULL, NULL, NULL)); + + if (select_result < 0) { + DPLOG(WARNING) << "select failed"; + return; + } + + // Check if we were stopped. + if (FD_ISSET(stop_fd_, &rfds)) { + return; + } + + if (out_fd_ != -1 && FD_ISSET(out_fd_, &rfds)) { + ReadFromFd(PROCESS_OUTPUT_TYPE_OUT, &out_fd_); + } + } +} + +void ProcessOutputWatcher::VerifyFileDescriptor(int fd) { + CHECK_LE(0, fd); + CHECK_GT(FD_SETSIZE, fd); +} + +void ProcessOutputWatcher::ReadFromFd(ProcessOutputType type, int* fd) { + // We don't want to necessary read pipe until it is empty so we don't starve + // other streams in case data is written faster than we read it. If there is + // more than read_buffer_size_ bytes in pipe, it will be read in the next + // iteration. + ssize_t bytes_read = HANDLE_EINTR(read(*fd, read_buffer_, read_buffer_size_)); + if (bytes_read < 0) + DPLOG(WARNING) << "read from buffer failed"; + + if (bytes_read > 0) { + on_read_callback_.Run(type, std::string(read_buffer_, bytes_read)); + } + + // If there is nothing on the output the watched process has exited (slave end + // of pty is closed). + if (bytes_read <= 0) { + // Slave pseudo terminal has been closed, we won't need master fd anymore. + CloseFd(fd); + + // We have lost contact with the process, so report it. + on_read_callback_.Run(PROCESS_OUTPUT_TYPE_EXIT, ""); + } +} + +void ProcessOutputWatcher::OnStop() { + delete this; +} + +} // namespace chromeos diff --git a/chromeos/process_proxy/process_output_watcher.h b/chromeos/process_proxy/process_output_watcher.h new file mode 100644 index 0000000..2a58f8a --- /dev/null +++ b/chromeos/process_proxy/process_output_watcher.h @@ -0,0 +1,73 @@ +// Copyright (c) 2012 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef CHROMEOS_PROCESS_PROXY_PROCESS_OUTPUT_WATCHER_H_ +#define CHROMEOS_PROCESS_PROXY_PROCESS_OUTPUT_WATCHER_H_ + +#include <string> + +#include "base/callback.h" +#include "chromeos/chromeos_export.h" + +namespace { + +const int kReadBufferSize = 256; + +} // namespace + +namespace chromeos { + +enum ProcessOutputType { + PROCESS_OUTPUT_TYPE_OUT, + PROCESS_OUTPUT_TYPE_ERR, + PROCESS_OUTPUT_TYPE_EXIT +}; + +typedef base::Callback<void(ProcessOutputType, const std::string&)> + ProcessOutputCallback; + +// This class should live on its own thread because running class makes +// underlying thread block. It deletes itself when watching is stopped. +class CHROMEOS_EXPORT ProcessOutputWatcher { + public: + ProcessOutputWatcher(int out_fd, int stop_fd, + const ProcessOutputCallback& callback); + + // This will block current thread!!!! + void Start(); + + private: + // The object will destroy itself when it stops watching process output. + ~ProcessOutputWatcher(); + + // Listens to output from supplied fds. It guarantees data written to one fd + // will be reported in order that it has been written (this is not true across + // fds, it would be nicer if it was). + void WatchProcessOutput(); + + // Verifies that fds that we got are properly set. + void VerifyFileDescriptor(int fd); + + // Reads data from fd, and when it's done, invokes callback function. + void ReadFromFd(ProcessOutputType type, int* fd); + + // It will just delete this. + void OnStop(); + + char read_buffer_[kReadBufferSize]; + ssize_t read_buffer_size_; + + int out_fd_; + int stop_fd_; + int max_fd_; + + // Callback that will be invoked when some output is detected. + ProcessOutputCallback on_read_callback_; + + DISALLOW_COPY_AND_ASSIGN(ProcessOutputWatcher); +}; + +} // namespace chromeos + +#endif // CHROMEOS_PROCESS_PROXY_PROCESS_OUTPUT_WATCHER_H_ diff --git a/chromeos/process_proxy/process_output_watcher_unittest.cc b/chromeos/process_proxy/process_output_watcher_unittest.cc new file mode 100644 index 0000000..eb0a969 --- /dev/null +++ b/chromeos/process_proxy/process_output_watcher_unittest.cc @@ -0,0 +1,171 @@ +// Copyright (c) 2012 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include <gtest/gtest.h> + +#include <queue> +#include <string> +#include <vector> + +#include <sys/wait.h> + +#include "base/bind.h" +#include "base/file_util.h" +#include "base/posix/eintr_wrapper.h" +#include "base/synchronization/waitable_event.h" +#include "base/threading/thread.h" +#include "chromeos/process_proxy/process_output_watcher.h" + +namespace chromeos { + +struct TestCase { + std::string str; + bool should_send_terminating_null; + + TestCase(const std::string& expected_string, + bool send_terminating_null) + : str(expected_string), + should_send_terminating_null(send_terminating_null) { + } +}; + +class ProcessWatcherExpectations { + public: + ProcessWatcherExpectations() {} + + void Init(const std::vector<TestCase>& expectations) { + received_from_out_ = 0; + + for (size_t i = 0; i < expectations.size(); i++) { + out_expectations_.append(expectations[i].str); + if (expectations[i].should_send_terminating_null) + out_expectations_.append(std::string("", 1)); + } + } + + bool CheckExpectations(const std::string& data, ProcessOutputType type) { + EXPECT_EQ(PROCESS_OUTPUT_TYPE_OUT, type); + if (!type == PROCESS_OUTPUT_TYPE_OUT) + return false; + + EXPECT_LT(received_from_out_, out_expectations_.length()); + if (received_from_out_ >= out_expectations_.length()) + return false; + + EXPECT_EQ(received_from_out_, + out_expectations_.find(data, received_from_out_)); + + received_from_out_ += data.length(); + return true; + } + + bool IsDone() { + return received_from_out_ >= out_expectations_.length(); + } + + private: + std::string out_expectations_; + size_t received_from_out_; +}; + +class ProcessOutputWatcherTest : public testing::Test { +public: + void StartWatch(int pt, int stop, + const std::vector<TestCase>& expectations) { + expectations_.Init(expectations); + + // This will delete itself. + ProcessOutputWatcher* crosh_watcher = new ProcessOutputWatcher(pt, stop, + base::Bind(&ProcessOutputWatcherTest::OnRead, base::Unretained(this))); + crosh_watcher->Start(); + } + + void OnRead(ProcessOutputType type, const std::string& output) { + bool success = expectations_.CheckExpectations(output, type); + if (!success || expectations_.IsDone()) + all_data_received_->Signal(); + } + + protected: + std::string VeryLongString() { + std::string result = "0123456789"; + for (int i = 0; i < 8; i++) + result = result.append(result); + return result; + } + + void RunTest(const std::vector<TestCase>& test_cases) { + all_data_received_.reset(new base::WaitableEvent(true, false)); + + base::Thread output_watch_thread("ProcessOutpuWatchThread"); + ASSERT_TRUE(output_watch_thread.Start()); + + int pt_pipe[2], stop_pipe[2]; + ASSERT_FALSE(HANDLE_EINTR(pipe(pt_pipe))); + ASSERT_FALSE(HANDLE_EINTR(pipe(stop_pipe))); + + output_watch_thread.message_loop()->PostTask(FROM_HERE, + base::Bind(&ProcessOutputWatcherTest::StartWatch, + base::Unretained(this), + pt_pipe[0], stop_pipe[0], test_cases)); + + for (size_t i = 0; i < test_cases.size(); i++) { + const std::string& test_str = test_cases[i].str; + // Let's make inputs not NULL terminated, unless other is specified in + // the test case. + ssize_t test_size = test_str.length() * sizeof(*test_str.c_str()); + if (test_cases[i].should_send_terminating_null) + test_size += sizeof(*test_str.c_str()); + EXPECT_EQ(test_size, + file_util::WriteFileDescriptor(pt_pipe[1], test_str.c_str(), + test_size)); + } + + all_data_received_->Wait(); + + // Send stop signal. It is not important which string we send. + EXPECT_EQ(1, file_util::WriteFileDescriptor(stop_pipe[1], "q", 1)); + + EXPECT_NE(-1, HANDLE_EINTR(close(stop_pipe[1]))); + EXPECT_NE(-1, HANDLE_EINTR(close(pt_pipe[1]))); + + output_watch_thread.Stop(); + } + + scoped_ptr<base::WaitableEvent> all_data_received_; + + private: + ProcessWatcherExpectations expectations_; + std::vector<TestCase> exp; +}; + + +TEST_F(ProcessOutputWatcherTest, OutputWatcher) { + std::vector<TestCase> test_cases; + test_cases.push_back(TestCase("testing output\n", false)); + test_cases.push_back(TestCase("testing error\n", false)); + test_cases.push_back(TestCase("testing error1\n", false)); + test_cases.push_back(TestCase("testing output1\n", false)); + test_cases.push_back(TestCase("testing output2\n", false)); + test_cases.push_back(TestCase("testing output3\n", false)); + test_cases.push_back(TestCase(VeryLongString(), false)); + test_cases.push_back(TestCase("testing error2\n", false)); + + RunTest(test_cases); +}; + +// Verifies that sending '\0' generates PROCESS_OUTPUT_TYPE_OUT event and does +// not terminate output watcher. +TEST_F(ProcessOutputWatcherTest, SendNull) { + std::vector<TestCase> test_cases; + // This will send '\0' to output wathcer. + test_cases.push_back(TestCase("", true)); + // Let's verify that next input also gets detected (i.e. output watcher does + // not exit after seeing '\0' from previous test case). + test_cases.push_back(TestCase("a", true)); + + RunTest(test_cases); +}; + +} // namespace chromeos diff --git a/chromeos/process_proxy/process_proxy.cc b/chromeos/process_proxy/process_proxy.cc new file mode 100644 index 0000000..3f67671 --- /dev/null +++ b/chromeos/process_proxy/process_proxy.cc @@ -0,0 +1,265 @@ +// Copyright (c) 2012 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "chromeos/process_proxy/process_proxy.h" + +#include <fcntl.h> +#include <stdlib.h> +#include <sys/ioctl.h> + +#include "base/bind.h" +#include "base/command_line.h" +#include "base/file_util.h" +#include "base/posix/eintr_wrapper.h" +#include "base/process_util.h" +#include "base/logging.h" +#include "base/threading/thread.h" +#include "chromeos/process_proxy/process_output_watcher.h" + +namespace { + +enum PipeEnd { + PIPE_END_READ, + PIPE_END_WRITE +}; + +enum PseudoTerminalFd { + PT_MASTER_FD, + PT_SLAVE_FD +}; + +const int kInvalidFd = -1; + +} // namespace + +namespace chromeos { + +ProcessProxy::ProcessProxy(): process_launched_(false), + callback_set_(false), + watcher_started_(false) { + // Set pipes to initial, invalid value so we can easily know if a pipe was + // opened by us. + ClearAllFdPairs(); +}; + +bool ProcessProxy::Open(const std::string& command, pid_t* pid) { + if (process_launched_) + return false; + + if (!CreatePseudoTerminalPair(pt_pair_)) { + return false; + } + + process_launched_ = LaunchProcess(command, pt_pair_[PT_SLAVE_FD], &pid_); + + if (process_launched_) { + // We won't need these anymore. These will be used by the launched process. + CloseFd(&pt_pair_[PT_SLAVE_FD]); + *pid = pid_; + LOG(WARNING) << "Process launched: " << pid_; + } else { + CloseFdPair(pt_pair_); + } + return process_launched_; +} + +bool ProcessProxy::StartWatchingOnThread( + base::Thread* watch_thread, + const ProcessOutputCallback& callback) { + DCHECK(process_launched_); + if (watcher_started_) + return false; + if (pipe(shutdown_pipe_)) + return false; + + // We give ProcessOutputWatcher a copy of master to make life easier during + // tear down. + // TODO(tbarzic): improve fd managment. + int master_copy = HANDLE_EINTR(dup(pt_pair_[PT_MASTER_FD])); + if (master_copy == -1) + return false; + + callback_set_ = true; + callback_ = callback; + callback_runner_ = base::MessageLoopProxy::current(); + + // This object will delete itself once watching is stopped. + // It also takes ownership of the passed fds. + ProcessOutputWatcher* output_watcher = + new ProcessOutputWatcher(master_copy, + shutdown_pipe_[PIPE_END_READ], + base::Bind(&ProcessProxy::OnProcessOutput, + this)); + + // Output watcher took ownership of the read end of shutdown pipe. + shutdown_pipe_[PIPE_END_READ] = -1; + + // |watch| thread is blocked by |output_watcher| from now on. + watch_thread->message_loop()->PostTask(FROM_HERE, + base::Bind(&ProcessOutputWatcher::Start, + base::Unretained(output_watcher))); + watcher_started_ = true; + return true; +} + +void ProcessProxy::OnProcessOutput(ProcessOutputType type, + const std::string& output) { + if (!callback_runner_) + return; + + callback_runner_->PostTask( + FROM_HERE, + base::Bind(&ProcessProxy::CallOnProcessOutputCallback, + this, type, output)); +} + +void ProcessProxy::CallOnProcessOutputCallback(ProcessOutputType type, + const std::string& output) { + // We may receive some output even after Close was called (crosh process does + // not have to quit instantly, or there may be some trailing data left in + // output stream fds). In that case owner of the callback may be gone so we + // don't want to send it anything. |callback_set_| is reset when this gets + // closed. + if (callback_set_) + callback_.Run(type, output); +} + +bool ProcessProxy::StopWatching() { + if (!watcher_started_) + return true; + // Signal Watcher that we are done. We use self-pipe trick to unblock watcher. + // Anything may be written to the pipe. + const char message[] = "q"; + return file_util::WriteFileDescriptor(shutdown_pipe_[PIPE_END_WRITE], + message, sizeof(message)); +} + +void ProcessProxy::Close() { + if (!process_launched_) + return; + + process_launched_ = false; + callback_set_ = false; + callback_ = ProcessOutputCallback(); + callback_runner_ = NULL; + + base::KillProcess(pid_, 0, true /* wait */); + + // TODO(tbarzic): What if this fails? + StopWatching(); + + CloseAllFdPairs(); +} + +bool ProcessProxy::Write(const std::string& text) { + if (!process_launched_) + return false; + + // We don't want to write '\0' to the pipe. + size_t data_size = text.length() * sizeof(*text.c_str()); + int bytes_written = + file_util::WriteFileDescriptor(pt_pair_[PT_MASTER_FD], + text.c_str(), data_size); + return (bytes_written == static_cast<int>(data_size)); +} + +bool ProcessProxy::OnTerminalResize(int width, int height) { + if (width < 0 || height < 0) + return false; + + winsize ws; + // Number of rows. + ws.ws_row = height; + // Number of columns. + ws.ws_col = width; + + return (HANDLE_EINTR(ioctl(pt_pair_[PT_MASTER_FD], TIOCSWINSZ, &ws)) != -1); +} + +ProcessProxy::~ProcessProxy() { + // In case watcher did not started, we may get deleted without calling Close. + // In that case we have to clean up created pipes. If watcher had been + // started, there will be a callback with our reference owned by + // process_output_watcher until Close is called, so we know Close has been + // called by now (and pipes have been cleaned). + if (!watcher_started_) + CloseAllFdPairs(); +} + +bool ProcessProxy::CreatePseudoTerminalPair(int *pt_pair) { + ClearFdPair(pt_pair); + + // Open Master. + pt_pair[PT_MASTER_FD] = HANDLE_EINTR(posix_openpt(O_RDWR | O_NOCTTY)); + if (pt_pair[PT_MASTER_FD] == -1) + return false; + + if (grantpt(pt_pair_[PT_MASTER_FD]) != 0 || + unlockpt(pt_pair_[PT_MASTER_FD]) != 0) { + CloseFd(&pt_pair[PT_MASTER_FD]); + return false; + } + char* slave_name = NULL; + // Per man page, slave_name must not be freed. + slave_name = ptsname(pt_pair_[PT_MASTER_FD]); + if (slave_name) + pt_pair_[PT_SLAVE_FD] = HANDLE_EINTR(open(slave_name, O_RDWR | O_NOCTTY)); + + if (pt_pair_[PT_SLAVE_FD] == -1) { + CloseFdPair(pt_pair); + return false; + } + + return true; +} + +bool ProcessProxy::LaunchProcess(const std::string& command, int slave_fd, + pid_t* pid) { + // Redirect crosh process' output and input so we can read it. + base::FileHandleMappingVector fds_mapping; + fds_mapping.push_back(std::make_pair(slave_fd, STDIN_FILENO)); + fds_mapping.push_back(std::make_pair(slave_fd, STDOUT_FILENO)); + fds_mapping.push_back(std::make_pair(slave_fd, STDERR_FILENO)); + base::LaunchOptions options; + options.fds_to_remap = &fds_mapping; + options.ctrl_terminal_fd = slave_fd; + + base::EnvironmentVector environ; + environ.push_back(std::make_pair("TERM", "xterm")); + options.environ = &environ; + + // Launch the process. + return base::LaunchProcess(CommandLine(base::FilePath(command)), options, + pid); +} + +void ProcessProxy::CloseAllFdPairs() { + CloseFdPair(pt_pair_); + CloseFdPair(shutdown_pipe_); +} + +void ProcessProxy::CloseFdPair(int* pipe) { + CloseFd(&(pipe[PIPE_END_READ])); + CloseFd(&(pipe[PIPE_END_WRITE])); +} + +void ProcessProxy::CloseFd(int* fd) { + if (*fd != kInvalidFd) { + if (HANDLE_EINTR(close(*fd)) != 0) + DPLOG(WARNING) << "close fd failed."; + } + *fd = kInvalidFd; +} + +void ProcessProxy::ClearAllFdPairs() { + ClearFdPair(pt_pair_); + ClearFdPair(shutdown_pipe_); +} + +void ProcessProxy::ClearFdPair(int* pipe) { + pipe[PIPE_END_READ] = kInvalidFd; + pipe[PIPE_END_WRITE] = kInvalidFd; +} + +} // namespace chromeos diff --git a/chromeos/process_proxy/process_proxy.h b/chromeos/process_proxy/process_proxy.h new file mode 100644 index 0000000..65f3d34 --- /dev/null +++ b/chromeos/process_proxy/process_proxy.h @@ -0,0 +1,99 @@ +// Copyright (c) 2012 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef CHROMEOS_PROCESS_PROXY_PROCESS_PROXY_H_ +#define CHROMEOS_PROCESS_PROXY_PROCESS_PROXY_H_ + +#include <fcntl.h> +#include <signal.h> + +#include <cstdio> +#include <string> + +#include "base/memory/ref_counted.h" +#include "chromeos/process_proxy/process_output_watcher.h" + +namespace base { +class TaskRunner; +class Thread; +} // namespace base + +namespace chromeos { + +// Proxy to a single ChromeOS process. +// This is refcounted. Note that output watcher, when it gets triggered owns a +// a callback with ref to this, so in order for this to be freed, the watcher +// must be destroyed. This is done in Close. +class ProcessProxy : public base::RefCountedThreadSafe<ProcessProxy> { + public: + ProcessProxy(); + + // Opens a process using command |command|. |pid| is set to new process' pid. + bool Open(const std::string& command, pid_t* pid); + + // Triggers watcher object on |watch_thread|. |watch_thread| gets blocked, so + // it should not be one of commonly used threads. It should be thread created + // specifically for running process output watcher. + bool StartWatchingOnThread(base::Thread* watch_thread, + const ProcessOutputCallback& callback); + + // Sends some data to the process. + bool Write(const std::string& text); + + // Closes the process. + // Must be called if we want this to be eventually deleted. + void Close(); + + // Notifies underlaying process of terminal size change. + bool OnTerminalResize(int width, int height); + + private: + friend class base::RefCountedThreadSafe<ProcessProxy>; + // We want this be used as ref counted object only. + ~ProcessProxy(); + + // Create master and slave end of pseudo terminal that will be used to + // communicate with process. + // pt_pair[0] -> master, pt_pair[1] -> slave. + // pt_pair must be allocated (to size at least 2). + bool CreatePseudoTerminalPair(int *pt_pair); + + bool LaunchProcess(const std::string& command, int slave_fd, pid_t* pid); + + // Gets called by output watcher when the process writes something to its + // output streams. + void OnProcessOutput(ProcessOutputType type, const std::string& output); + void CallOnProcessOutputCallback(ProcessOutputType type, + const std::string& output); + + bool StopWatching(); + + // Methods for cleaning up pipes. + void CloseAllFdPairs(); + // Expects array of 2 file descripters. + void CloseFdPair(int* pipe); + // Expects pointer to single file descriptor. + void CloseFd(int* fd); + void ClearAllFdPairs(); + // Expects array of 2 file descripters. + void ClearFdPair(int* pipe); + + bool process_launched_; + pid_t pid_; + + bool callback_set_; + ProcessOutputCallback callback_; + scoped_refptr<base::TaskRunner> callback_runner_; + + bool watcher_started_; + + int pt_pair_[2]; + int shutdown_pipe_[2]; + + DISALLOW_COPY_AND_ASSIGN(ProcessProxy); +}; + +} // namespace chromeos + +#endif // CHROMEOS_PROCESS_PROXY_PROCESS_PROXY_H_ diff --git a/chromeos/process_proxy/process_proxy_registry.cc b/chromeos/process_proxy/process_proxy_registry.cc new file mode 100644 index 0000000..aa2e9d8 --- /dev/null +++ b/chromeos/process_proxy/process_proxy_registry.cc @@ -0,0 +1,159 @@ +// Copyright (c) 2012 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "chromeos/process_proxy/process_proxy_registry.h" + +#include "base/bind.h" + +namespace chromeos { + +namespace { + +const char kWatcherThreadName[] = "ProcessWatcherThread"; + +const char kStdoutOutputType[] = "stdout"; +const char kStderrOutputType[] = "stderr"; +const char kExitOutputType[] = "exit"; + +const char* ProcessOutputTypeToString(ProcessOutputType type) { + switch (type) { + case PROCESS_OUTPUT_TYPE_OUT: + return kStdoutOutputType; + case PROCESS_OUTPUT_TYPE_ERR: + return kStderrOutputType; + case PROCESS_OUTPUT_TYPE_EXIT: + return kExitOutputType; + default: + return NULL; + } +} + +static base::LazyInstance<ProcessProxyRegistry> g_process_proxy_registry = + LAZY_INSTANCE_INITIALIZER; + +} // namespace + +ProcessProxyRegistry::ProcessProxyInfo::ProcessProxyInfo() { +} + +ProcessProxyRegistry::ProcessProxyInfo::ProcessProxyInfo( + const ProcessProxyInfo& other) { + // This should be called with empty info only. + DCHECK(!other.proxy.get() && !other.watcher_thread.get()); +} + +ProcessProxyRegistry::ProcessProxyInfo::~ProcessProxyInfo() { +} + +ProcessProxyRegistry::ProcessProxyRegistry() { +} + +ProcessProxyRegistry::~ProcessProxyRegistry() { + // TODO(tbarzic): Fix issue with ProcessProxyRegistry being destroyed + // on a different thread (it's a LazyInstance). + DetachFromThread(); + + // Close all proxies we own. + while (!proxy_map_.empty()) + CloseProcess(proxy_map_.begin()->first); +} + +// static +ProcessProxyRegistry* ProcessProxyRegistry::Get() { + return g_process_proxy_registry.Pointer(); +} + +bool ProcessProxyRegistry::OpenProcess( + const std::string& command, + pid_t* pid, + const ProcessOutputCallbackWithPid& callback) { + DCHECK(CalledOnValidThread()); + + // TODO(tbarzic): Instead of creating a new thread for each new process proxy, + // use one thread for all processes. + // We will need new thread for proxy's outpu watcher. + scoped_ptr<base::Thread> watcher_thread(new base::Thread(kWatcherThreadName)); + if (!watcher_thread->Start()) { + return false; + } + + // Create and open new proxy. + scoped_refptr<ProcessProxy> proxy(new ProcessProxy()); + if (!proxy->Open(command, pid)) + return false; + + // Kick off watcher. + // We can use Unretained because proxy will stop calling callback after it is + // closed, which is done befire this object goes away. + if (!proxy->StartWatchingOnThread(watcher_thread.get(), + base::Bind(&ProcessProxyRegistry::OnProcessOutput, + base::Unretained(this), *pid))) { + proxy->Close(); + watcher_thread->Stop(); + return false; + } + + DCHECK(proxy_map_.find(*pid) == proxy_map_.end()); + + // Save info for newly created proxy. We cannot do this before ProcessProxy is + // created because we don't know |pid| then. + ProcessProxyInfo& info = proxy_map_[*pid]; + info.proxy.swap(proxy); + info.watcher_thread.reset(watcher_thread.release()); + info.process_id = *pid; + info.callback = callback; + return true; +} + +bool ProcessProxyRegistry::SendInput(pid_t pid, const std::string& data) { + DCHECK(CalledOnValidThread()); + + std::map<pid_t, ProcessProxyInfo>::iterator it = proxy_map_.find(pid); + if (it == proxy_map_.end()) + return false; + return it->second.proxy->Write(data); +} + +bool ProcessProxyRegistry::CloseProcess(pid_t pid) { + DCHECK(CalledOnValidThread()); + + std::map<pid_t, ProcessProxyInfo>::iterator it = proxy_map_.find(pid); + if (it == proxy_map_.end()) + return false; + + it->second.proxy->Close(); + it->second.watcher_thread->Stop(); + proxy_map_.erase(it); + return true; +} + +bool ProcessProxyRegistry::OnTerminalResize(pid_t pid, int width, int height) { + DCHECK(CalledOnValidThread()); + + std::map<pid_t, ProcessProxyInfo>::iterator it = proxy_map_.find(pid); + if (it == proxy_map_.end()) + return false; + + return it->second.proxy->OnTerminalResize(width, height); +} + +void ProcessProxyRegistry::OnProcessOutput(pid_t pid, + ProcessOutputType type, const std::string& data) { + DCHECK(CalledOnValidThread()); + + const char* type_str = ProcessOutputTypeToString(type); + DCHECK(type_str); + + std::map<pid_t, ProcessProxyInfo>::iterator it = proxy_map_.find(pid); + if (it == proxy_map_.end()) + return; + it->second.callback.Run(pid, std::string(type_str), data); + + // Contact with the slave end of the terminal has been lost. We have to close + // the process. + if (type == PROCESS_OUTPUT_TYPE_EXIT) + CloseProcess(pid); +} + +} // namespace chromeos diff --git a/chromeos/process_proxy/process_proxy_registry.h b/chromeos/process_proxy/process_proxy_registry.h new file mode 100644 index 0000000..4280ebd --- /dev/null +++ b/chromeos/process_proxy/process_proxy_registry.h @@ -0,0 +1,75 @@ +// Copyright (c) 2012 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef CHROMEOS_PROCESS_PROXY_PROCESS_PROXY_REGISTRY_H_ +#define CHROMEOS_PROCESS_PROXY_PROCESS_PROXY_REGISTRY_H_ + +#include <map> + +#include "base/callback.h" +#include "base/lazy_instance.h" +#include "base/memory/scoped_ptr.h" +#include "base/memory/ref_counted.h" +#include "base/threading/non_thread_safe.h" +#include "base/threading/thread.h" +#include "chromeos/chromeos_export.h" +#include "chromeos/process_proxy/process_proxy.h" + +namespace chromeos { + +typedef base::Callback<void(pid_t, const std::string&, const std::string&)> + ProcessOutputCallbackWithPid; + +// Keeps track of all created ProcessProxies. It is created lazily and should +// live on a single thread (where all methods must be called). +class CHROMEOS_EXPORT ProcessProxyRegistry : public base::NonThreadSafe { + public: + // Info we need about a ProcessProxy instance. + struct ProcessProxyInfo { + scoped_refptr<ProcessProxy> proxy; + scoped_ptr<base::Thread> watcher_thread; + ProcessOutputCallbackWithPid callback; + pid_t process_id; + + ProcessProxyInfo(); + // This is to make map::insert happy, we don't init anything. + ProcessProxyInfo(const ProcessProxyInfo& other); + ~ProcessProxyInfo(); + }; + + static ProcessProxyRegistry* Get(); + + // Starts new ProcessProxy (which starts new process). + bool OpenProcess(const std::string& command, pid_t* pid, + const ProcessOutputCallbackWithPid& callback); + // Sends data to the process with id |pid|. + bool SendInput(pid_t pid, const std::string& data); + // Stops the process with id |pid|. + bool CloseProcess(pid_t pid); + // Reports terminal resize to process proxy. + bool OnTerminalResize(pid_t pid, int width, int height); + + // Currently used for testing. + void SetOutputCallback(const ProcessOutputCallback& callback); + + private: + friend struct ::base::DefaultLazyInstanceTraits<ProcessProxyRegistry>; + + ProcessProxyRegistry(); + ~ProcessProxyRegistry(); + + // Gets called when output gets detected. + void OnProcessOutput(pid_t pid, + ProcessOutputType type, + const std::string& data); + + // Map of all existing ProcessProxies. + std::map<pid_t, ProcessProxyInfo> proxy_map_; + + DISALLOW_COPY_AND_ASSIGN(ProcessProxyRegistry); +}; + +} // namespace chromeos + +#endif // CHROMEOS_PROCESS_PROXY_PROCESS_PROXY_REGISTRY_H_ diff --git a/chromeos/process_proxy/process_proxy_unittest.cc b/chromeos/process_proxy/process_proxy_unittest.cc new file mode 100644 index 0000000..ad0ba4b --- /dev/null +++ b/chromeos/process_proxy/process_proxy_unittest.cc @@ -0,0 +1,257 @@ +// Copyright (c) 2012 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include <gtest/gtest.h> + +#include <string> +#include <sys/wait.h> + +#include "base/bind.h" +#include "base/memory/scoped_ptr.h" +#include "base/message_loop.h" +#include "base/posix/eintr_wrapper.h" +#include "base/process_util.h" +#include "base/synchronization/waitable_event.h" +#include "base/threading/thread.h" +#include "chromeos/process_proxy/process_proxy_registry.h" + +namespace chromeos { + +namespace { + +// The test line must have all distinct characters. +const char kTestLineToSend[] = "abcdefgh\n"; +const char kTestLineExpected[] = "abcdefgh\r\n"; + +const char kCatCommand[] = "cat"; +const char kStdoutType[] = "stdout"; +const int kTestLineNum = 100; + +class TestRunner { + public: + virtual ~TestRunner() {} + virtual void SetupExpectations(pid_t pid) = 0; + virtual void OnSomeRead(pid_t pid, const std::string& type, + const std::string& output) = 0; + virtual void StartRegistryTest(ProcessProxyRegistry* registry) = 0; + + protected: + pid_t pid_; +}; + +class RegistryTestRunner : public TestRunner { + public: + virtual ~RegistryTestRunner() {} + + virtual void SetupExpectations(pid_t pid) OVERRIDE { + pid_ = pid; + left_to_check_index_[0] = 0; + left_to_check_index_[1] = 0; + // We consider that a line processing has started if a value in + // left_to_check__[index] is set to 0, thus -2. + lines_left_ = 2 * kTestLineNum - 2; + expected_line_ = kTestLineExpected; + } + + // Method to test validity of received input. We will receive two streams of + // the same data. (input will be echoed twice by the testing process). Each + // stream will contain the same string repeated |kTestLineNum| times. So we + // have to match 2 * |kTestLineNum| lines. The problem is the received lines + // from different streams may be interleaved (e.g. we may receive + // abc|abcdef|defgh|gh). To deal with that, we allow to test received text + // against two lines. The lines MUST NOT have two same characters for this + // algorithm to work. + virtual void OnSomeRead(pid_t pid, const std::string& type, + const std::string& output) OVERRIDE { + EXPECT_EQ(type, kStdoutType); + EXPECT_EQ(pid_, pid); + + bool valid = true; + for (size_t i = 0; i < output.length(); i++) { + // The character output[i] should be next in at least one of the lines we + // are testing. + valid = (ProcessReceivedCharacter(output[i], 0) || + ProcessReceivedCharacter(output[i], 1)); + EXPECT_TRUE(valid) << "Received: " << output; + } + + if (!valid || TestSucceeded()) { + MessageLoop::current()->PostTask(FROM_HERE, + MessageLoop::QuitClosure()); + } + } + + virtual void StartRegistryTest(ProcessProxyRegistry* registry) OVERRIDE { + for (int i = 0; i < kTestLineNum; i++) { + EXPECT_TRUE(registry->SendInput(pid_, kTestLineToSend)); + } + } + + private: + bool ProcessReceivedCharacter(char received, size_t stream) { + if (stream >= arraysize(left_to_check_index_)) + return false; + bool success = left_to_check_index_[stream] < expected_line_.length() && + expected_line_[left_to_check_index_[stream]] == received; + if (success) + left_to_check_index_[stream]++; + if (left_to_check_index_[stream] == expected_line_.length() && + lines_left_ > 0) { + // Take another line to test for this stream, if there are any lines left. + // If not, this stream is done. + left_to_check_index_[stream] = 0; + lines_left_--; + } + return success; + } + + bool TestSucceeded() { + return left_to_check_index_[0] == expected_line_.length() && + left_to_check_index_[1] == expected_line_.length() && + lines_left_ == 0; + } + + size_t left_to_check_index_[2]; + size_t lines_left_; + std::string expected_line_; +}; + +class RegistryNotifiedOnProcessExitTestRunner : public TestRunner { + public: + virtual ~RegistryNotifiedOnProcessExitTestRunner() {} + + virtual void SetupExpectations(pid_t pid) OVERRIDE { + output_received_ = false; + pid_ = pid; + } + + virtual void OnSomeRead(pid_t pid, const std::string& type, + const std::string& output) OVERRIDE { + EXPECT_EQ(pid_, pid); + if (!output_received_) { + output_received_ = true; + EXPECT_EQ(type, "stdout"); + EXPECT_EQ(output, "p"); + base::KillProcess(pid_, 0 , true); + return; + } + EXPECT_EQ("exit", type); + MessageLoop::current()->PostTask(FROM_HERE, + MessageLoop::QuitClosure()); + } + + virtual void StartRegistryTest(ProcessProxyRegistry* registry) OVERRIDE { + EXPECT_TRUE(registry->SendInput(pid_, "p")); + } + + private: + bool output_received_; +}; + +class SigIntTestRunner : public TestRunner { + public: + virtual ~SigIntTestRunner() {} + + virtual void SetupExpectations(pid_t pid) OVERRIDE { + pid_ = pid; + } + + virtual void OnSomeRead(pid_t pid, const std::string& type, + const std::string& output) OVERRIDE { + EXPECT_EQ(pid_, pid); + // We may receive ^C on stdout, but we don't care about that, as long as we + // eventually received exit event. + if (type == "exit") { + MessageLoop::current()->PostTask(FROM_HERE, + MessageLoop::QuitClosure()); + } + } + + virtual void StartRegistryTest(ProcessProxyRegistry* registry) OVERRIDE { + // Send SingInt and verify the process exited. + EXPECT_TRUE(registry->SendInput(pid_, "\003")); + } +}; + +} // namespace + +class ProcessProxyTest : public testing::Test { + public: + ProcessProxyTest() {} + virtual ~ProcessProxyTest() {} + + protected: + void InitRegistryTest() { + registry_ = ProcessProxyRegistry::Get(); + + EXPECT_TRUE(registry_->OpenProcess( + kCatCommand, &pid_, + base::Bind(&TestRunner::OnSomeRead, + base::Unretained(test_runner_.get())))); + + test_runner_->SetupExpectations(pid_); + test_runner_->StartRegistryTest(registry_); + } + + void EndRegistryTest() { + registry_->CloseProcess(pid_); + + base::TerminationStatus status = base::GetTerminationStatus(pid_, NULL); + EXPECT_NE(base::TERMINATION_STATUS_STILL_RUNNING, status); + if (status == base::TERMINATION_STATUS_STILL_RUNNING) + base::KillProcess(pid_, 0, true); + + MessageLoop::current()->PostTask(FROM_HERE, + MessageLoop::QuitClosure()); + } + + void RunTest() { + MessageLoop::current()->PostTask( + FROM_HERE, + base::Bind(&ProcessProxyTest::InitRegistryTest, + base::Unretained(this))); + + // Wait until all data from output watcher is received (QuitTask will be + // fired on watcher thread). + MessageLoop::current()->Run(); + + MessageLoop::current()->PostTask( + FROM_HERE, + base::Bind(&ProcessProxyTest::EndRegistryTest, + base::Unretained(this))); + + // Wait until we clean up the process proxy. + MessageLoop::current()->Run(); + } + + scoped_ptr<TestRunner> test_runner_; + + private: + ProcessProxyRegistry* registry_; + pid_t pid_; + + MessageLoop message_loop_; +}; + +// Test will open new process that will run cat command, and verify data we +// write to process gets echoed back. +TEST_F(ProcessProxyTest, RegistryTest) { + test_runner_.reset(new RegistryTestRunner()); + RunTest(); +} + +// Open new process, then kill it. Verifiy that we detect when the process dies. +TEST_F(ProcessProxyTest, RegistryNotifiedOnProcessExit) { + test_runner_.reset(new RegistryNotifiedOnProcessExitTestRunner()); + RunTest(); +} + +// Test verifies that \003 message send to process is processed as SigInt. +// Timing out on the waterfall: http://crbug.com/115064 +TEST_F(ProcessProxyTest, DISABLED_SigInt) { + test_runner_.reset(new SigIntTestRunner()); + RunTest(); +} + +} // namespace chromeos |