1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
|
// Copyright (c) 2012 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "chromeos/process_proxy/process_output_watcher.h"
#include <algorithm>
#include <cstdio>
#include <cstring>
#include "base/bind.h"
#include "base/location.h"
#include "base/logging.h"
#include "base/posix/eintr_wrapper.h"
#include "base/single_thread_task_runner.h"
#include "base/third_party/icu/icu_utf.h"
#include "base/thread_task_runner_handle.h"
#include "base/time/time.h"
namespace {
// Gets byte size for a UTF8 character given it's leading byte. The character
// size is encoded as number of leading '1' bits in the character's leading
// byte. If the most significant bit is '0', the character is a valid ASCII
// and it's byte size is 1.
// The method returns 1 if the provided byte is invalid leading byte.
size_t UTF8SizeFromLeadingByte(uint8 leading_byte) {
size_t byte_count = 0;
uint8 mask = 1 << 7;
uint8 error_mask = 1 << (7 - CBU8_MAX_LENGTH);
while (leading_byte & mask) {
if (mask & error_mask)
return 1;
mask >>= 1;
++byte_count;
}
return byte_count ? byte_count : 1;
}
} // namespace
namespace chromeos {
ProcessOutputWatcher::ProcessOutputWatcher(
int out_fd,
const ProcessOutputCallback& callback)
: read_buffer_size_(0),
process_output_file_(out_fd),
on_read_callback_(callback),
weak_factory_(this) {
CHECK_GE(out_fd, 0);
// We want to be sure we will be able to add 0 at the end of the input, so -1.
read_buffer_capacity_ = arraysize(read_buffer_) - 1;
}
ProcessOutputWatcher::~ProcessOutputWatcher() {}
void ProcessOutputWatcher::Start() {
WatchProcessOutput();
}
void ProcessOutputWatcher::OnFileCanReadWithoutBlocking(int fd) {
DCHECK_EQ(process_output_file_.GetPlatformFile(), fd);
output_file_watcher_.StopWatchingFileDescriptor();
ReadFromFd(fd);
}
void ProcessOutputWatcher::OnFileCanWriteWithoutBlocking(int fd) {
NOTREACHED();
}
void ProcessOutputWatcher::WatchProcessOutput() {
base::MessageLoopForIO::current()->WatchFileDescriptor(
process_output_file_.GetPlatformFile(), false,
base::MessageLoopForIO::WATCH_READ, &output_file_watcher_, this);
}
void ProcessOutputWatcher::ReadFromFd(int fd) {
// We don't want to necessary read pipe until it is empty so we don't starve
// other streams in case data is written faster than we read it. If there is
// more than read_buffer_size_ bytes in pipe, it will be read in the next
// iteration.
DCHECK_GT(read_buffer_capacity_, read_buffer_size_);
ssize_t bytes_read =
HANDLE_EINTR(read(fd, &read_buffer_[read_buffer_size_],
read_buffer_capacity_ - read_buffer_size_));
if (bytes_read > 0) {
ReportOutput(PROCESS_OUTPUT_TYPE_OUT, bytes_read);
// Delay next read to make the process less likely to flood IPC channel
// when output is reported to terminal extension via terminalPrivate API
// (which is the only client of this code).
// TODO(tbarzic): Properly fix this!! Provide a mechanism for clients to
// ack reported output and continue watching the process when ack is
// received. https://crbug.com/398901
base::ThreadTaskRunnerHandle::Get()->PostDelayedTask(
FROM_HERE, base::Bind(&ProcessOutputWatcher::WatchProcessOutput,
weak_factory_.GetWeakPtr()),
base::TimeDelta::FromMilliseconds(10));
return;
}
if (bytes_read < 0)
DPLOG(WARNING) << "read from buffer failed";
// If there is nothing on the output the watched process has exited (slave end
// of pty is closed).
on_read_callback_.Run(PROCESS_OUTPUT_TYPE_EXIT, "");
// Cancel pending |WatchProcessOutput| calls.
weak_factory_.InvalidateWeakPtrs();
}
size_t ProcessOutputWatcher::OutputSizeWithoutIncompleteUTF8() {
// Find the last non-trailing character byte. This byte should be used to
// infer the last UTF8 character length.
int last_lead_byte = read_buffer_size_ - 1;
while (true) {
// If the series of trailing bytes is too long, something's not right.
// Report the whole output, without waiting for further character bytes.
if (read_buffer_size_ - last_lead_byte > CBU8_MAX_LENGTH)
return read_buffer_size_;
// If there are trailing characters, there must be a leading one in the
// buffer for a valid UTF8 character. Getting past the buffer begining
// signals something's wrong, or the buffer is empty. In both cases return
// the whole current buffer.
if (last_lead_byte < 0)
return read_buffer_size_;
// Found the starting character byte; stop searching.
if (!CBU8_IS_TRAIL(read_buffer_[last_lead_byte]))
break;
--last_lead_byte;
}
size_t last_length = UTF8SizeFromLeadingByte(read_buffer_[last_lead_byte]);
// Note that if |last_length| == 0 or
// |last_length| + |last_read_byte| < |read_buffer_size_|, the string is
// invalid UTF8. In that case, send the whole read buffer to the observer
// immediately, just as if there is no trailing incomplete UTF8 bytes.
if (!last_length || last_length + last_lead_byte <= read_buffer_size_)
return read_buffer_size_;
return last_lead_byte;
}
void ProcessOutputWatcher::ReportOutput(ProcessOutputType type,
size_t new_bytes_count) {
read_buffer_size_ += new_bytes_count;
size_t output_to_report = OutputSizeWithoutIncompleteUTF8();
on_read_callback_.Run(type, std::string(read_buffer_, output_to_report));
// Move the bytes that were left behind to the beginning of the buffer and
// update the buffer size accordingly.
if (output_to_report < read_buffer_size_) {
for (size_t i = output_to_report; i < read_buffer_size_; ++i) {
read_buffer_[i - output_to_report] = read_buffer_[i];
}
}
read_buffer_size_ -= output_to_report;
}
} // namespace chromeos
|