summaryrefslogtreecommitdiffstats
path: root/native_client_sdk
diff options
context:
space:
mode:
authornoelallen@chromium.org <noelallen@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2013-09-21 17:16:08 +0000
committernoelallen@chromium.org <noelallen@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2013-09-21 17:16:08 +0000
commit41977ddd243018d25356cf0bc0f3b39574728751 (patch)
tree3a2f817ff2910cea2ebf5b512a0ca87ad9a81d62 /native_client_sdk
parent1d573ab20fd7517a3b726b1a44ac854b14a70d1b (diff)
downloadchromium_src-41977ddd243018d25356cf0bc0f3b39574728751.zip
chromium_src-41977ddd243018d25356cf0bc0f3b39574728751.tar.gz
chromium_src-41977ddd243018d25356cf0bc0f3b39574728751.tar.bz2
[NaCl SDK] Support non blocking TCP/UDP for NaCl IO library in NaCl SDK.
Does not affect chrome itself. This change bring in support for non-blocking TCP and UDP send and recv. This requires rework of EventEmitter, EventListner splitting them out of the MountNode so that the MountNode (socket) could be de-ref'd and closed, while the Emitter (fifo) was still available to accept IO cancellations. Part of the split out, also simplified EventListener use. Now listeners are per thread and RefCount emitters (not Nodes) significantly simplifying lifetime issues and reducing emitter/listener code complexity. In addition, the simplified locking allows for one-to-one Listeners to share locks with the emitter, for cases like read from UDP, where multiple threads could legally block on RecvFrom, and must only make progress one at a time to ensure correct timeout. Adds 'pipe', 'packet' and 'signal' Emitters to support pipes, udp sockets, and IO interruption (EINTR) via 'kill'. * split out EventEmitter and EventListener from MountNode * Remove ScopedRef from EventListener * Remove EventInfo * Create EventEmitterPipe (Emitter + FIFOChar) * Create EventEmitterTCP (Emitter + FIFOChar * 2) * Create EventEmitterUDP (Emitter + FIFOPacket * 2) * Create MountNodeStream base class * Create MountNodeSocket from MountNodeStream * Create MountNodePipe * Rename MountSocket to MountStream * Add plumbing for pipe * select uses poll functionality * move all send/recv on sockets to MountStream thread. BUG=257723 R=binji@chromium.org Review URL: https://codereview.chromium.org/23498015 git-svn-id: svn://svn.chromium.org/chrome/trunk/src@224603 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'native_client_sdk')
-rw-r--r--native_client_sdk/src/libraries/nacl_io/event_emitter.cc60
-rw-r--r--native_client_sdk/src/libraries/nacl_io/event_emitter.h85
-rw-r--r--native_client_sdk/src/libraries/nacl_io/event_emitter_pipe.cc33
-rw-r--r--native_client_sdk/src/libraries/nacl_io/event_emitter_pipe.h41
-rw-r--r--native_client_sdk/src/libraries/nacl_io/event_emitter_stream.cc45
-rw-r--r--native_client_sdk/src/libraries/nacl_io/event_emitter_stream.h43
-rw-r--r--native_client_sdk/src/libraries/nacl_io/event_emitter_tcp.cc53
-rw-r--r--native_client_sdk/src/libraries/nacl_io/event_emitter_tcp.h41
-rw-r--r--native_client_sdk/src/libraries/nacl_io/event_emitter_tty.cc33
-rw-r--r--native_client_sdk/src/libraries/nacl_io/event_emitter_tty.h43
-rw-r--r--native_client_sdk/src/libraries/nacl_io/event_emitter_udp.cc49
-rw-r--r--native_client_sdk/src/libraries/nacl_io/event_emitter_udp.h42
-rw-r--r--native_client_sdk/src/libraries/nacl_io/event_listener.cc265
-rw-r--r--native_client_sdk/src/libraries/nacl_io/event_listener.h183
-rw-r--r--native_client_sdk/src/libraries/nacl_io/fifo_char.cc120
-rw-r--r--native_client_sdk/src/libraries/nacl_io/fifo_char.h54
-rw-r--r--native_client_sdk/src/libraries/nacl_io/fifo_interface.h32
-rw-r--r--native_client_sdk/src/libraries/nacl_io/fifo_null.h29
-rw-r--r--native_client_sdk/src/libraries/nacl_io/fifo_packet.cc72
-rw-r--r--native_client_sdk/src/libraries/nacl_io/fifo_packet.h59
-rw-r--r--native_client_sdk/src/libraries/nacl_io/kernel_intercept.cc5
-rw-r--r--native_client_sdk/src/libraries/nacl_io/kernel_intercept.h1
-rw-r--r--native_client_sdk/src/libraries/nacl_io/kernel_object.h1
-rw-r--r--native_client_sdk/src/libraries/nacl_io/kernel_proxy.cc321
-rw-r--r--native_client_sdk/src/libraries/nacl_io/kernel_proxy.h11
-rw-r--r--native_client_sdk/src/libraries/nacl_io/library.dsc25
-rw-r--r--native_client_sdk/src/libraries/nacl_io/mount_node.cc11
-rw-r--r--native_client_sdk/src/libraries/nacl_io/mount_node.h6
-rw-r--r--native_client_sdk/src/libraries/nacl_io/mount_node_char.h4
-rw-r--r--native_client_sdk/src/libraries/nacl_io/mount_node_pipe.cc61
-rw-r--r--native_client_sdk/src/libraries/nacl_io/mount_node_pipe.h35
-rw-r--r--native_client_sdk/src/libraries/nacl_io/mount_node_socket.cc154
-rw-r--r--native_client_sdk/src/libraries/nacl_io/mount_node_socket.h52
-rw-r--r--native_client_sdk/src/libraries/nacl_io/mount_node_stream.cc58
-rw-r--r--native_client_sdk/src/libraries/nacl_io/mount_node_stream.h62
-rw-r--r--native_client_sdk/src/libraries/nacl_io/mount_node_tcp.cc254
-rw-r--r--native_client_sdk/src/libraries/nacl_io/mount_node_tcp.h37
-rw-r--r--native_client_sdk/src/libraries/nacl_io/mount_node_tty.cc58
-rw-r--r--native_client_sdk/src/libraries/nacl_io/mount_node_tty.h14
-rw-r--r--native_client_sdk/src/libraries/nacl_io/mount_node_udp.cc319
-rw-r--r--native_client_sdk/src/libraries/nacl_io/mount_node_udp.h49
-rw-r--r--native_client_sdk/src/libraries/nacl_io/mount_socket.cc28
-rw-r--r--native_client_sdk/src/libraries/nacl_io/mount_socket.h37
-rw-r--r--native_client_sdk/src/libraries/nacl_io/mount_stream.cc102
-rw-r--r--native_client_sdk/src/libraries/nacl_io/mount_stream.h81
-rw-r--r--native_client_sdk/src/libraries/nacl_io/packet.cc41
-rw-r--r--native_client_sdk/src/libraries/nacl_io/packet.h45
-rw-r--r--native_client_sdk/src/libraries/nacl_io/pepper/all_interfaces.h12
-rw-r--r--native_client_sdk/src/libraries/nacl_io/syscalls/pipe.c10
-rw-r--r--native_client_sdk/src/libraries/sdk_util/simple_lock.h3
-rw-r--r--native_client_sdk/src/tests/nacl_io_socket_test/event_test.cc480
-rw-r--r--native_client_sdk/src/tests/nacl_io_socket_test/socket_test.cc53
-rw-r--r--native_client_sdk/src/tests/nacl_io_test/event_test.cc610
-rw-r--r--native_client_sdk/src/tests/nacl_io_test/example.dsc1
-rw-r--r--native_client_sdk/src/tests/nacl_io_test/fifo_test.cc130
-rw-r--r--native_client_sdk/src/tests/nacl_io_test/kernel_proxy_mock.h1
-rw-r--r--native_client_sdk/src/tests/nacl_io_test/kernel_wrap_test.cc7
-rw-r--r--native_client_sdk/src/tests/nacl_io_test/mount_node_tty_test.cc132
58 files changed, 2926 insertions, 1767 deletions
diff --git a/native_client_sdk/src/libraries/nacl_io/event_emitter.cc b/native_client_sdk/src/libraries/nacl_io/event_emitter.cc
index a785c59..b3db06f 100644
--- a/native_client_sdk/src/libraries/nacl_io/event_emitter.cc
+++ b/native_client_sdk/src/libraries/nacl_io/event_emitter.cc
@@ -3,57 +3,57 @@
* found in the LICENSE file.
*/
#include <assert.h>
+#include <poll.h>
#include "nacl_io/event_emitter.h"
#include "nacl_io/event_listener.h"
+#include "nacl_io/fifo_interface.h"
#include "sdk_util/auto_lock.h"
namespace nacl_io {
-bool operator<(const ScopedEventInfo& src_a, const ScopedEventInfo& src_b) {
+bool operator<(const ScopedEventEmitter& src_a,
+ const ScopedEventEmitter& src_b) {
return src_a.get() < src_b.get();
}
-void EventEmitter::Destroy() {
- // We can not grab the EmitterLock prior to grabbing the EventListener lock,
- // however the ref count proves this is the only thread which has a
- // reference to the emitter at this point so accessing events_ is safe.
- EventInfoSet_t::iterator it;
- for (it = events_.begin(); it != events_.end(); it++) {
- ScopedEventInfo info = *it;
- info->listener->AbandonedEventInfo(info);
- }
+EventEmitter::EventEmitter() : event_status_(0) {}
+
+void EventEmitter::RegisterListener(EventListener* listener, uint32_t events) {
+ AUTO_LOCK(emitter_lock_);
+ RegisterListener_Locked(listener, events);
}
-void EventEmitter::RegisterEventInfo(const ScopedEventInfo& info) {
+void EventEmitter::UnregisterListener(EventListener* listener) {
AUTO_LOCK(emitter_lock_);
+ UnregisterListener_Locked(listener);
+}
- events_.insert(info);
- ChainRegisterEventInfo(info);
+void EventEmitter::RegisterListener_Locked(EventListener* listener,
+ uint32_t events) {
+ assert(listeners_.count(listener) == 0);
+ listeners_[listener] = events;
}
-void EventEmitter::UnregisterEventInfo(const ScopedEventInfo& info) {
- AUTO_LOCK(emitter_lock_);
+void EventEmitter::UnregisterListener_Locked(EventListener* listener) {
+ assert(listeners_.count(listener) == 1);
+ listeners_.erase(listener);
+}
- ChainUnregisterEventInfo(info);
- events_.erase(info);
+void EventEmitter::ClearEvents_Locked(uint32_t event_bits) {
+ event_status_ &= ~event_bits;
}
-void EventEmitter::RaiseEvent(uint32_t event_bits) {
- AUTO_LOCK(emitter_lock_);
- EventInfoSet_t::iterator it;
- for (it = events_.begin(); it != events_.end(); it++) {
- // If this event is allowed by the filter, signal it
- ScopedEventInfo info = *it;
- if (info->filter & event_bits) {
- info->events |= event_bits & info->filter;
- info->listener->Signal(info);
- }
+void EventEmitter::RaiseEvents_Locked(uint32_t event_bits) {
+ event_status_ |= event_bits;
+
+ EventListenerMap_t::iterator it;
+ for (it = listeners_.begin(); it != listeners_.end(); it++) {
+ uint32_t bits = it->second & event_bits;
+ if (0 != bits)
+ it->first->ReceiveEvents(this, bits);
}
}
-void EventEmitter::ChainRegisterEventInfo(const ScopedEventInfo& info) {}
-void EventEmitter::ChainUnregisterEventInfo(const ScopedEventInfo& info) {}
-
} // namespace nacl_io
diff --git a/native_client_sdk/src/libraries/nacl_io/event_emitter.h b/native_client_sdk/src/libraries/nacl_io/event_emitter.h
index 96670b6..402a518 100644
--- a/native_client_sdk/src/libraries/nacl_io/event_emitter.h
+++ b/native_client_sdk/src/libraries/nacl_io/event_emitter.h
@@ -12,42 +12,22 @@
#include "nacl_io/error.h"
+#include "sdk_util/macros.h"
#include "sdk_util/ref_object.h"
#include "sdk_util/scoped_ref.h"
#include "sdk_util/simple_lock.h"
-
namespace nacl_io {
class EventEmitter;
class EventListener;
-// A ref counted object (non POD derived from RefObject) for storing the
-// state of a single signal request. Requests are unique to any
-// FD/EventListener pair.
-struct EventInfo : public sdk_util::RefObject {
- // User provied data to be returned on EventListener::Wait
- uint64_t user_data;
-
- // Bitfield of POLL events currently signaled.
- uint32_t events;
-
- // Bitfield of POLL events of interest.
- uint32_t filter;
-
- // We do not use a ScopedRef to prevent circular references.
- EventEmitter* emitter;
- EventListener* listener;
- uint32_t id;
-};
-
-typedef sdk_util::ScopedRef<EventInfo> ScopedEventInfo;
-// Provide comparison for std::map and std::set
-bool operator<(const ScopedEventInfo& src_a, const ScopedEventInfo& src_b);
+typedef sdk_util::ScopedRef<EventEmitter> ScopedEventEmitter;
+typedef std::map<EventListener*, uint32_t> EventListenerMap_t;
-typedef std::map<int, ScopedEventInfo> EventInfoMap_t;
-typedef std::set<ScopedEventInfo> EventInfoSet_t;
+bool operator<(const ScopedEventEmitter& src_a,
+ const ScopedEventEmitter& src_b);
// EventEmitter
//
@@ -56,47 +36,40 @@ typedef std::set<ScopedEventInfo> EventInfoSet_t;
// whenever thier state is changed.
//
// See "Kernel Events" in event_listener.h for additional information.
-class EventEmitter : public sdk_util::RefObject {
- protected:
- // Called automatically prior to delete to inform the EventListeners that
- // this EventEmitter is abandoning an associated EventInfo.
- virtual void Destroy();
-
- private:
- // Register or unregister an EventInfo. The lock of the EventListener
- // associated with this EventInfo must be held prior to calling these
- // functions. These functions are private to ensure they are called by the
- // EventListener.
- void RegisterEventInfo(const ScopedEventInfo& info);
- void UnregisterEventInfo(const ScopedEventInfo& info);
+class EventEmitter : public sdk_util::RefObject {
public:
- // Returns the current state of the emitter as POLL events bitfield.
- virtual uint32_t GetEventStatus() = 0;
+ EventEmitter();
- // Returns the type of the emitter (compatible with st_mode in stat)
- virtual int GetType() = 0;
+ // This returns a snapshot, to ensure the status doesn't change from
+ // fetch to use, hold the lock.
+ uint32_t GetEventStatus() { return event_status_; }
- protected:
- // Called by the thread causing the Event.
- void RaiseEvent(uint32_t events);
+ sdk_util::SimpleLock& GetLock() { return emitter_lock_; }
- // Provided to allow one EventEmitter to register the same EventInfo with
- // a child EventEmitter so that they can both signal the EventListener.
- // Called after registering locally, but while lock is still held.
- virtual void ChainRegisterEventInfo(const ScopedEventInfo& event);
+ // Updates the specified bits in the event status, and signals any
+ // listeners waiting on those bits.
+ void RaiseEvents_Locked(uint32_t events);
- // Called before unregistering locally, but while lock is still held.
- virtual void ChainUnregisterEventInfo(const ScopedEventInfo& event);
+ // Clears the specified bits in the event status.
+ void ClearEvents_Locked(uint32_t events);
-private:
+ // Register or unregister an EventInfo. The lock of the EventListener
+ // associated with this EventInfo must be held prior to calling these
+ // functions. These functions are private to ensure they are called by the
+ // EventListener.
+ void RegisterListener(EventListener* listener, uint32_t events);
+ void UnregisterListener(EventListener* listener);
+ void RegisterListener_Locked(EventListener* listener, uint32_t events);
+ void UnregisterListener_Locked(EventListener* listener);
+
+ private:
+ uint32_t event_status_;
sdk_util::SimpleLock emitter_lock_;
- EventInfoSet_t events_;
- friend class EventListener;
+ EventListenerMap_t listeners_;
+ DISALLOW_COPY_AND_ASSIGN(EventEmitter);
};
-typedef sdk_util::ScopedRef<EventEmitter> ScopedEventEmitter;
-
} // namespace nacl_io
diff --git a/native_client_sdk/src/libraries/nacl_io/event_emitter_pipe.cc b/native_client_sdk/src/libraries/nacl_io/event_emitter_pipe.cc
new file mode 100644
index 0000000..2c5308e
--- /dev/null
+++ b/native_client_sdk/src/libraries/nacl_io/event_emitter_pipe.cc
@@ -0,0 +1,33 @@
+// Copyright (c) 2013 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include <poll.h>
+#include <stdint.h>
+#include <stdlib.h>
+
+#include "nacl_io/event_emitter_pipe.h"
+
+namespace nacl_io {
+
+EventEmitterPipe::EventEmitterPipe(size_t size)
+ : fifo_(std::max<size_t>(1, size)) {
+ UpdateStatus_Locked();
+}
+
+size_t EventEmitterPipe::Read_Locked(char* data, size_t len) {
+ size_t out_len = fifo_.Read(data, len);
+
+ UpdateStatus_Locked();
+ return out_len;
+}
+
+size_t EventEmitterPipe::Write_Locked(const char* data, size_t len) {
+ size_t out_len = fifo_.Write(data, len);
+
+ UpdateStatus_Locked();
+ return out_len;
+}
+
+} // namespace nacl_io
+
diff --git a/native_client_sdk/src/libraries/nacl_io/event_emitter_pipe.h b/native_client_sdk/src/libraries/nacl_io/event_emitter_pipe.h
new file mode 100644
index 0000000..27a5ba4
--- /dev/null
+++ b/native_client_sdk/src/libraries/nacl_io/event_emitter_pipe.h
@@ -0,0 +1,41 @@
+// Copyright (c) 2013 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef LIBRARIES_NACL_IO_EVENT_EMITTER_PIPE_H_
+#define LIBRARIES_NACL_IO_EVENT_EMITTER_PIPE_H_
+
+#include <poll.h>
+#include <stdint.h>
+#include <stdlib.h>
+
+#include "nacl_io/event_emitter_stream.h"
+#include "nacl_io/fifo_char.h"
+
+#include "sdk_util/auto_lock.h"
+#include "sdk_util/macros.h"
+
+namespace nacl_io {
+
+class EventEmitterPipe;
+typedef sdk_util::ScopedRef<EventEmitterPipe> ScopedEventEmitterPipe;
+
+class EventEmitterPipe : public EventEmitterStream {
+ public:
+ EventEmitterPipe(size_t size);
+
+ size_t Read_Locked(char* data, size_t len);
+ size_t Write_Locked(const char* data, size_t len);
+
+ virtual FIFOChar* in_fifo() { return &fifo_; }
+ virtual FIFOChar* out_fifo() { return &fifo_; }
+
+private:
+ FIFOChar fifo_;
+ DISALLOW_COPY_AND_ASSIGN(EventEmitterPipe);
+};
+
+} // namespace nacl_io
+
+#endif // LIBRARIES_NACL_IO_EVENT_EMITTER_PIPE_H_
+
diff --git a/native_client_sdk/src/libraries/nacl_io/event_emitter_stream.cc b/native_client_sdk/src/libraries/nacl_io/event_emitter_stream.cc
new file mode 100644
index 0000000..600f234
--- /dev/null
+++ b/native_client_sdk/src/libraries/nacl_io/event_emitter_stream.cc
@@ -0,0 +1,45 @@
+// Copyright (c) 2013 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "nacl_io/event_emitter_stream.h"
+
+#include <poll.h>
+#include <stdint.h>
+#include <stdlib.h>
+
+#include "nacl_io/fifo_interface.h"
+#include "sdk_util/auto_lock.h"
+
+namespace nacl_io {
+
+EventEmitterStream::EventEmitterStream() : stream_(NULL) {}
+
+void EventEmitterStream::AttachStream(MountNodeStream* stream) {
+ AUTO_LOCK(GetLock());
+ stream_ = stream;
+}
+
+void EventEmitterStream::DetachStream() {
+ AUTO_LOCK(GetLock());
+
+ RaiseEvents_Locked(POLLHUP);
+ stream_ = NULL;
+}
+
+void EventEmitterStream::UpdateStatus_Locked() {
+ uint32_t status = 0;
+ if (!in_fifo()->IsEmpty())
+ status |= POLLIN;
+
+ if (!out_fifo()->IsFull())
+ status |= POLLOUT;
+
+ ClearEvents_Locked(~status);
+ RaiseEvents_Locked(status);
+}
+
+
+} // namespace nacl_io
+
+
diff --git a/native_client_sdk/src/libraries/nacl_io/event_emitter_stream.h b/native_client_sdk/src/libraries/nacl_io/event_emitter_stream.h
new file mode 100644
index 0000000..34afec2
--- /dev/null
+++ b/native_client_sdk/src/libraries/nacl_io/event_emitter_stream.h
@@ -0,0 +1,43 @@
+// Copyright (c) 2013 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef LIBRARIES_NACL_IO_EVENT_EMITTER_STREAM_H_
+#define LIBRARIES_NACL_IO_EVENT_EMITTER_STREAM_H_
+
+#include "nacl_io/event_emitter.h"
+
+#include "sdk_util/macros.h"
+#include "sdk_util/scoped_ref.h"
+
+namespace nacl_io {
+
+class EventEmitterStream;
+class FIFOInterface;
+class MountNodeStream;
+
+typedef sdk_util::ScopedRef<EventEmitterStream> ScopedEventEmitterStream;
+
+class EventEmitterStream : public EventEmitter {
+ public:
+ EventEmitterStream();
+
+ void AttachStream(MountNodeStream* stream);
+ void DetachStream();
+
+ MountNodeStream* stream() { return stream_; }
+ virtual FIFOInterface* in_fifo() = 0;
+ virtual FIFOInterface* out_fifo() = 0;
+
+protected:
+ void UpdateStatus_Locked();
+
+protected:
+ MountNodeStream* stream_;
+ DISALLOW_COPY_AND_ASSIGN(EventEmitterStream);
+};
+
+} // namespace nacl_io
+
+#endif // LIBRARIES_NACL_IO_EVENT_EMITTER_STREAM_H_
+
diff --git a/native_client_sdk/src/libraries/nacl_io/event_emitter_tcp.cc b/native_client_sdk/src/libraries/nacl_io/event_emitter_tcp.cc
new file mode 100644
index 0000000..24d7cdd
--- /dev/null
+++ b/native_client_sdk/src/libraries/nacl_io/event_emitter_tcp.cc
@@ -0,0 +1,53 @@
+// Copyright (c) 2013 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "nacl_io/event_emitter_tcp.h"
+
+#include <poll.h>
+#include <stdint.h>
+#include <stdlib.h>
+
+#include "nacl_io/fifo_char.h"
+#include "sdk_util/auto_lock.h"
+
+namespace nacl_io {
+
+EventEmitterTCP::EventEmitterTCP(size_t rsize, size_t wsize)
+ : in_fifo_(std::max<size_t>(65536, rsize)),
+ out_fifo_(std::max<size_t>(65536, wsize)) {
+ UpdateStatus_Locked();
+}
+
+uint32_t EventEmitterTCP::ReadIn_Locked(char* data, uint32_t len) {
+ uint32_t count = in_fifo_.Read(data, len);
+
+ UpdateStatus_Locked();
+ return count;
+}
+
+uint32_t EventEmitterTCP::WriteIn_Locked(const char* data, uint32_t len) {
+ uint32_t count = in_fifo_.Write(data, len);
+
+ UpdateStatus_Locked();
+ return count;
+}
+
+uint32_t EventEmitterTCP::ReadOut_Locked(char* data, uint32_t len) {
+ uint32_t count = out_fifo_.Read(data, len);
+
+ UpdateStatus_Locked();
+ return count;
+}
+
+uint32_t EventEmitterTCP::WriteOut_Locked(const char* data, uint32_t len) {
+ uint32_t count = out_fifo_.Write(data, len);
+
+ UpdateStatus_Locked();
+ return count;
+}
+
+
+} // namespace nacl_io
+
+
diff --git a/native_client_sdk/src/libraries/nacl_io/event_emitter_tcp.h b/native_client_sdk/src/libraries/nacl_io/event_emitter_tcp.h
new file mode 100644
index 0000000..99a559a
--- /dev/null
+++ b/native_client_sdk/src/libraries/nacl_io/event_emitter_tcp.h
@@ -0,0 +1,41 @@
+// Copyright (c) 2013 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef LIBRARIES_NACL_IO_EVENT_EMITTER_TCP_H_
+#define LIBRARIES_NACL_IO_EVENT_EMITTER_TCP_H_
+
+#include "nacl_io/event_emitter_stream.h"
+#include "nacl_io/fifo_char.h"
+
+#include "sdk_util/macros.h"
+#include "sdk_util/scoped_ref.h"
+
+namespace nacl_io {
+
+class EventEmitterTCP;
+typedef sdk_util::ScopedRef<EventEmitterTCP> ScopedEventEmitterTCP;
+
+class EventEmitterTCP : public EventEmitterStream {
+ public:
+ EventEmitterTCP(size_t rsize, size_t wsize);
+
+ uint32_t ReadIn_Locked(char* buffer, uint32_t len);
+ uint32_t WriteIn_Locked(const char* buffer, uint32_t len);
+
+ uint32_t ReadOut_Locked(char* buffer, uint32_t len);
+ uint32_t WriteOut_Locked(const char* buffer, uint32_t len);
+
+ virtual FIFOChar* in_fifo() { return &in_fifo_; }
+ virtual FIFOChar* out_fifo() { return &out_fifo_; }
+
+protected:
+ FIFOChar in_fifo_;
+ FIFOChar out_fifo_;
+ DISALLOW_COPY_AND_ASSIGN(EventEmitterTCP);
+};
+
+} // namespace nacl_io
+
+#endif // LIBRARIES_NACL_IO_EVENT_EMITTER_TCP_H_
+
diff --git a/native_client_sdk/src/libraries/nacl_io/event_emitter_tty.cc b/native_client_sdk/src/libraries/nacl_io/event_emitter_tty.cc
new file mode 100644
index 0000000..7df065a
--- /dev/null
+++ b/native_client_sdk/src/libraries/nacl_io/event_emitter_tty.cc
@@ -0,0 +1,33 @@
+// Copyright (c) 2013 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include <poll.h>
+#include <stdint.h>
+#include <stdlib.h>
+
+#include "nacl_io/event_emitter_tty.h"
+
+namespace nacl_io {
+
+EventEmitterTTY::EventEmitterTTY(size_t size)
+ : fifo_(std::max<size_t>(1, size)) {
+ UpdateStatus_Locked();
+}
+
+size_t EventEmitterPipe::Read_Locked(char* data, size_t len) {
+ size_t out_len = fifo_.Read(data, len);
+
+ UpdateStatus_Locked();
+ return out_len;
+}
+
+size_t EventEmitterPipe::Write_Locked(const char* data, size_t len) {
+ size_t out_len = fifo_.Write(data, len);
+
+ UpdateStatus_Locked();
+ return out_len;
+}
+
+} // namespace nacl_io
+
diff --git a/native_client_sdk/src/libraries/nacl_io/event_emitter_tty.h b/native_client_sdk/src/libraries/nacl_io/event_emitter_tty.h
new file mode 100644
index 0000000..c0ef09c
--- /dev/null
+++ b/native_client_sdk/src/libraries/nacl_io/event_emitter_tty.h
@@ -0,0 +1,43 @@
+// Copyright (c) 2013 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef LIBRARIES_NACL_IO_EVENT_EMITTER_TTY_H_
+#define LIBRARIES_NACL_IO_EVENT_EMITTER_TTY_H_
+
+#include <poll.h>
+#include <stdint.h>
+#include <stdlib.h>
+
+#include "nacl_io/event_emitter_stream.h"
+#include "nacl_io/fifo_char.h"
+#include "nacl_io/fifo_null.h"
+
+#include "sdk_util/auto_lock.h"
+#include "sdk_util/macros.h"
+
+namespace nacl_io {
+
+class EventEmitterTTY;
+typedef sdk_util::ScopedRef<EventEmitterTTY> ScopedEventEmitterTTY;
+
+class EventEmitterTTY : public EventEmitterStream {
+ public:
+ explicit EventEmitterTTY(size_t size);
+
+ size_t Read_Locked(char* data, size_t len);
+ size_t Write_Locked(const char* data, size_t len);
+
+ virtual FIFOChar* in_fifo() { return &fifo_; }
+ virtual FIFONull* out_fifo() { return &null_; }
+
+ private:
+ FIFOChar fifo_;
+ FIFONull null_;
+ DISALLOW_COPY_AND_ASSIGN(EventEmitterTTY);
+};
+
+} // namespace nacl_io
+
+#endif // LIBRARIES_NACL_IO_EVENT_EMITTER_PIPE_H_
+
diff --git a/native_client_sdk/src/libraries/nacl_io/event_emitter_udp.cc b/native_client_sdk/src/libraries/nacl_io/event_emitter_udp.cc
new file mode 100644
index 0000000..af9f160
--- /dev/null
+++ b/native_client_sdk/src/libraries/nacl_io/event_emitter_udp.cc
@@ -0,0 +1,49 @@
+// Copyright (c) 2013 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "nacl_io/event_emitter_udp.h"
+
+#include <poll.h>
+#include <stdint.h>
+#include <stdlib.h>
+
+#include "sdk_util/auto_lock.h"
+
+namespace nacl_io {
+
+EventEmitterUDP::EventEmitterUDP(size_t rsize, size_t wsize)
+ : in_fifo_(std::max<size_t>(1, rsize)),
+ out_fifo_(std::max<size_t>(1, wsize)) {
+ UpdateStatus_Locked();
+}
+
+Packet* EventEmitterUDP::ReadRXPacket_Locked() {
+ Packet* packet = in_fifo_.ReadPacket();
+
+ UpdateStatus_Locked();
+ return packet;
+}
+
+void EventEmitterUDP::WriteRXPacket_Locked(Packet* packet) {
+ in_fifo_.WritePacket(packet);
+
+ UpdateStatus_Locked();
+}
+
+Packet* EventEmitterUDP::ReadTXPacket_Locked() {
+ Packet* packet = out_fifo_.ReadPacket();
+
+ UpdateStatus_Locked();
+ return packet;
+}
+
+void EventEmitterUDP::WriteTXPacket_Locked(Packet* packet) {
+ out_fifo_.WritePacket(packet);
+
+ UpdateStatus_Locked();
+}
+
+} // namespace nacl_io
+
+
diff --git a/native_client_sdk/src/libraries/nacl_io/event_emitter_udp.h b/native_client_sdk/src/libraries/nacl_io/event_emitter_udp.h
new file mode 100644
index 0000000..02d64a4
--- /dev/null
+++ b/native_client_sdk/src/libraries/nacl_io/event_emitter_udp.h
@@ -0,0 +1,42 @@
+// Copyright (c) 2013 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef LIBRARIES_NACL_IO_EVENT_EMITTER_UDP_H_
+#define LIBRARIES_NACL_IO_EVENT_EMITTER_UDP_H_
+
+#include "nacl_io/event_emitter_stream.h"
+#include "nacl_io/fifo_packet.h"
+
+#include "sdk_util/macros.h"
+#include "sdk_util/scoped_ref.h"
+
+namespace nacl_io {
+
+class EventEmitterUDP;
+typedef sdk_util::ScopedRef<EventEmitterUDP> ScopedEventEmitterUDP;
+
+class EventEmitterUDP : public EventEmitterStream {
+ public:
+ EventEmitterUDP(size_t rsize, size_t wsize);
+
+ // Takes or gives away ownership of the packet.
+ Packet* ReadRXPacket_Locked();
+ void WriteRXPacket_Locked(Packet* packet);
+
+ Packet* ReadTXPacket_Locked();
+ void WriteTXPacket_Locked(Packet* packet);
+
+ virtual FIFOPacket* in_fifo() { return &in_fifo_; }
+ virtual FIFOPacket* out_fifo() { return &out_fifo_; }
+
+protected:
+ FIFOPacket in_fifo_;
+ FIFOPacket out_fifo_;
+ DISALLOW_COPY_AND_ASSIGN(EventEmitterUDP);
+};
+
+} // namespace nacl_io
+
+#endif // LIBRARIES_NACL_IO_EVENT_EMITTER_UDP_H_
+
diff --git a/native_client_sdk/src/libraries/nacl_io/event_listener.cc b/native_client_sdk/src/libraries/nacl_io/event_listener.cc
index 11297b6..ce679f8 100644
--- a/native_client_sdk/src/libraries/nacl_io/event_listener.cc
+++ b/native_client_sdk/src/libraries/nacl_io/event_listener.cc
@@ -27,43 +27,6 @@ EventListener::~EventListener() {
pthread_cond_destroy(&signal_cond_);
}
-// Before we can destroy ourselves, we must first unregister all the
-// EventInfo objects from the various EventListeners
-void EventListener::Destroy() {
- EventInfoMap_t::iterator it;
-
- // We do not take the lock since this is the only reference to this object.
- for (it = event_info_map_.begin(); it != event_info_map_.end(); it++) {
- if (it->second->emitter) {
- it->second->emitter->UnregisterEventInfo(it->second);
- }
- }
-
- EventEmitter::Destroy();
-}
-
-uint32_t EventListener::GetEventStatus() {
- // Always writable, but we can only assume it to be readable if there
- // is an event waiting.
- return signaled_.empty() ? POLLOUT : POLLIN | POLLOUT;
-}
-
-int EventListener::GetType() {
- // For lack of a better type, report socket to signify it can be in an
- // used to signal.
- return S_IFSOCK;
-}
-
-// Called by EventEmitter, wakes up any blocking threads to verify if the wait
-// conditions have been met.
-void EventListener::Signal(const ScopedEventInfo& info) {
- AUTO_LOCK(signal_lock_);
- if (waiting_) {
- signaled_.insert(info);
- pthread_cond_broadcast(&signal_cond_);
- }
-}
-
static void AbsoluteFromDeltaMS(struct timespec* timeout, int ms_timeout) {
if (ms_timeout >= 0) {
uint64_t usec = usec_since_epoch();
@@ -77,168 +40,124 @@ static void AbsoluteFromDeltaMS(struct timespec* timeout, int ms_timeout) {
}
}
-Error EventListener::Wait(EventData* events,
- int max,
- int ms_timeout,
- int* out_count) {
- *out_count = 0;
-
- if (max <= 0)
- return EINVAL;
- if (NULL == events)
- return EFAULT;
-
- {
- AUTO_LOCK(info_lock_);
-
- // Go through the "live" event infos and see if they are in a signaled state
- EventInfoMap_t::iterator it = event_info_map_.begin();
- while ((it != event_info_map_.end()) && (*out_count < max)) {
- ScopedEventInfo& info = it->second;
- uint32_t event_bits = info->emitter->GetEventStatus() & info->filter;
-
- if (event_bits) {
- events[*out_count].events = event_bits;
- events[*out_count].user_data = info->user_data;
- (*out_count)++;
- }
+EventListenerLock::EventListenerLock(EventEmitter* emitter)
+ : EventListener(),
+ emitter_(emitter),
+ lock_(new sdk_util::AutoLock(emitter->GetLock())),
+ events_(0) {
+}
- it++;
- }
- } // End of info_lock scope.
+EventListenerLock::~EventListenerLock() {
+ delete lock_;
+}
- // We are done if we have a signal or no timeout specified.
- if ((*out_count > 0) || (0 == ms_timeout))
- return 0;
+void EventListenerLock::ReceiveEvents(EventEmitter* emitter,
+ uint32_t events) {
+ // We are using the emitter's mutex, which is already locked.
+ pthread_cond_signal(&signal_cond_);
+}
- // Compute the absolute time we can wait until.
+Error EventListenerLock::WaitOnEvent(uint32_t events, int ms_timeout) {
struct timespec timeout;
AbsoluteFromDeltaMS(&timeout, ms_timeout);
- // Keep looking if until we receive something.
- while (0 == *out_count) {
- // We are now officially waiting.
- AUTO_LOCK(signal_lock_);
- waiting_++;
-
- // If we don't have any signals yet, wait for any Emitter to Signal.
- while (signaled_.empty()) {
- int return_code;
- if (ms_timeout >= 0) {
- return_code = pthread_cond_timedwait(&signal_cond_,
- signal_lock_.mutex(),
- &timeout);
- } else {
- return_code = pthread_cond_wait(&signal_cond_, signal_lock_.mutex());
- }
-
- Error error(return_code);
-
- // If there is no error, then we may have been signaled.
- if (0 == error)
- break;
-
- // For any error case:
- if (ETIMEDOUT == error) {
- // A "TIMEOUT" is not an error.
- error = 0;
- } else {
- // Otherwise this has gone bad, so return EBADF.
- error = EBADF;
- }
-
- waiting_--;
- return error;
+ emitter_->RegisterListener_Locked(this, events);
+ while ((emitter_->GetEventStatus() & events) == 0) {
+ int return_code;
+ if (ms_timeout >= 0) {
+ return_code = pthread_cond_timedwait(&signal_cond_,
+ emitter_->GetLock().mutex(),
+ &timeout);
+ } else {
+ return_code = pthread_cond_wait(&signal_cond_,
+ emitter_->GetLock().mutex());
}
- // Copy signals over as long as we have room
- while (!signaled_.empty() && (*out_count < max)) {
- EventInfoSet_t::iterator it = signaled_.begin();
+ if (emitter_->GetEventStatus() & POLLERR)
+ return_code = EINTR;
- events[*out_count].events = (*it)->events;
- events[*out_count].user_data = (*it)->user_data;
- (*out_count)++;
-
- signaled_.erase(it);
+ // Return the failure, unlocked
+ if (return_code != 0) {
+ emitter_->UnregisterListener_Locked(this);
+ return Error(return_code);
}
-
- // If we are the last thread waiting, clear out the signalled set
- if (1 == waiting_)
- signaled_.clear();
-
- // We are done waiting.
- waiting_--;
}
+ emitter_->UnregisterListener_Locked(this);
return 0;
}
-Error EventListener::Track(int id,
- const ScopedEventEmitter& emitter,
- uint32_t filter,
- uint64_t user_data) {
- AUTO_LOCK(info_lock_);
- EventInfoMap_t::iterator it = event_info_map_.find(id);
-
- // If it's not a streaming type, then it can not be added.
- if ((emitter->GetType() & (S_IFIFO | S_IFSOCK | S_IFCHR)) == 0)
- return EPERM;
-
- if (it != event_info_map_.end())
- return EEXIST;
-
- if (emitter.get() == this)
- return EINVAL;
-
- ScopedEventInfo info(new EventInfo);
- info->emitter = emitter.get();
- info->listener = this;
- info->id = id;
- info->filter = filter;
- info->user_data = user_data;
- info->events = 0;
-
- emitter->RegisterEventInfo(info);
- event_info_map_[id] = info;
- return 0;
+void EventListenerPoll::ReceiveEvents(EventEmitter* emitter,
+ uint32_t events) {
+ AUTO_LOCK(signal_lock_);
+ emitters_[emitter]->events |= events;
+ signaled_++;
+ pthread_cond_signal(&signal_cond_);
}
-Error EventListener::Update(int id, uint32_t filter, uint64_t user_data) {
- AUTO_LOCK(info_lock_);
- EventInfoMap_t::iterator it = event_info_map_.find(id);
- if (it == event_info_map_.end())
- return ENOENT;
+Error EventListenerPoll::WaitOnAny(EventRequest* requests,
+ size_t cnt,
+ int ms_timeout) {
- ScopedEventInfo& info = it->second;
- info->filter = filter;
- info->user_data = user_data;
- return 0;
-}
+ signaled_ = 0;
-Error EventListener::Free(int id) {
- AUTO_LOCK(info_lock_);
- EventInfoMap_t::iterator it = event_info_map_.find(id);
- if (event_info_map_.end() == it)
- return ENOENT;
+ // Build a map of request emitters to request data before
+ // emitters can access them.
+ for (size_t index = 0; index < cnt; index++) {
+ EventRequest* request = requests + index;
+ emitters_[request->emitter.get()] = request;
+ request->events = 0;
+ }
- it->second->emitter->UnregisterEventInfo(it->second);
- event_info_map_.erase(it);
- return 0;
-}
+ // Emitters can now accessed the unlocked set, since each emitter is
+ // responsible for it's own request.
+ for (size_t index = 0; index < cnt; index++) {
+ EventRequest* request = requests + index;
+ requests->emitter->RegisterListener(this, request->filter);
+ uint32_t events = requests->emitter->GetEventStatus() & request->filter;
+
+ if (events) {
+ AUTO_LOCK(signal_lock_);
+ request->events |= events;
+ signaled_++;
+ }
+ }
+
+ struct timespec timeout;
+ AbsoluteFromDeltaMS(&timeout, ms_timeout);
+ int return_code = 0;
-void EventListener::AbandonedEventInfo(const ScopedEventInfo& event) {
{
- AUTO_LOCK(info_lock_);
+ AUTO_LOCK(signal_lock_)
+ while (0 == signaled_) {
+ if (ms_timeout >= 0) {
+ return_code = pthread_cond_timedwait(&signal_cond_,
+ signal_lock_.mutex(),
+ &timeout);
+ } else {
+ return_code = pthread_cond_wait(&signal_cond_,
+ signal_lock_.mutex());
+ }
- event->emitter = NULL;
- event_info_map_.erase(event->id);
+ if (return_code != 0)
+ signaled_++;
+ }
}
- // EventInfos abandoned by the destroyed emitter must still be kept in
- // signaled_ set for POLLHUP.
- event->events = POLLHUP;
- Signal(event);
+ // Unregister first to prevent emitters from modifying the set any further
+ for (size_t index = 0; index < cnt; index++) {
+ EventRequest* request = requests + index;
+ request->emitter->UnregisterListener(this);
+
+ if (request->events & POLLERR)
+ return_code = EINTR;
+ }
+
+ // We can now release the map.
+ emitters_.clear();
+
+ return Error(return_code);
}
} // namespace nacl_io
diff --git a/native_client_sdk/src/libraries/nacl_io/event_listener.h b/native_client_sdk/src/libraries/nacl_io/event_listener.h
index cb93f11..4b9647a 100644
--- a/native_client_sdk/src/libraries/nacl_io/event_listener.h
+++ b/native_client_sdk/src/libraries/nacl_io/event_listener.h
@@ -15,6 +15,8 @@
#include "nacl_io/error.h"
#include "nacl_io/event_emitter.h"
+#include "sdk_util/auto_lock.h"
+#include "sdk_util/macros.h"
#include "sdk_util/scoped_ref.h"
// Kernel Events
@@ -24,43 +26,43 @@
// down. EventListener provides a mechanism for a thread to wait on
// specific events from these objects which are derived from EventEmitters.
//
-// EventEmitter and EventListener together provide support for an "epoll"
-// like interface. See:
-// http://man7.org/linux/man-pages/man7/epoll.7.html
+// Calling RegisterListener_Locked on an event emitter, will cause all
+// Listeners matching the event mask are signaled so they may try to make
+// progress. In the case of "select" or "poll", multiple threads could be
+// notified that one or more emitters are signaled and allowed to make
+// progress. In the case of "read" or "write", only one thread at a time
+// should make progress, to ensure that if one thread consumes the signal,
+// the other can correctly timeout.
//
-// Such that we map the arguments at behavior of
-// epoll_wait maps to Wait, and
-// epoll_ctl maps to Track, Update, Free.
+// Events Listeners requirements:
+// 1- Must reference counting Emitters to ensure they are not destroyed
+// while waiting for a signal.
+// 2- Must unregister themselves from all emitters prior to being destoryed.
+// 3- Must never be shared between threads since interals may not be locked
+// to prevent dead-locks with emitter signals.
+// 4- Must never lock themselves before locking an emitter to prevent
+// dead-locks
//
-// Behavior of EventListeners
-// FDs are automatically removed when closed.
-// KE_SHUTDOWN can not be masked.
-// KE_SHUTDOWN is only seen if the hangup happens after Wait starts.
-// Dup'd FDs get their own event info which must also get signaled.
-// Adding a non streaming FD will fail.
-// EventEmitters can also be waited on.
-// It is illegal for an a EventListener to add itself.
+// There are two types of listeners, EventListenerSingle and EventListenerGroup
+// For Single listeners, all listeners are unblocked by the Emitter, but
+// they individually take the emitters lock and test against the current
+// status to ensure another listener didn't consume the signal.
//
// Locking:
-// EventListener::{Track/Update/Free}
-// AUTO_LOCK(EventListener::info_lock_)
-// EventEmitter::RegisterEventInfo
-// AUTO_LOCK(EventEmitter::emitter_lock_)
+// EventEmitter::<Backgroun IO>
+// *LOCK* EventEmitter::emitter_lock_
+// EventEmitter::RaiseEvent_Locked
+// EventListenerSingle::ReceiveEvents
+// <no locking, using emitter's lock>
+// EventListenerGroup::ReceiveEvents
+// *LOCK* EventListenerGroup::signal_lock_
//
-// EventEmitter::Destroy
-// EventListener::AbandonedEventInfo
-// AUTO_LOCK(EventListener::info_lock_)
+// EventListenerSingle::WaitOnLock
+// *LOCK* EventEmitter::emitter_lock_
//
-// EventListener::RaiseEvent
-// AUTO_LOCK(EventEmitter::emitter_lock_)
-// EventListener::Signal
-// AUTO_LOCK(EventListener::signal_lock_)
+// EventListenerGroup::WaitOnAny
+// *LOCK* EventListenerGroup::signal_lock_
//
-// EventListener::Wait
-// AUTO_LOCK(EventListener::info_lock_)
-// ...
-// AUTO_LOCK(EventListener::signal_lock_)
-// ...
namespace nacl_io {
@@ -70,74 +72,95 @@ struct EventData {
uint64_t user_data;
};
+struct EventRequest {
+ ScopedEventEmitter emitter;
+ uint32_t filter;
+ uint32_t events;
+};
+
+
+class EventListener;
+class EventListenerGroup;
+class EventListenerSingle;
+
+typedef std::map<EventEmitter*, EventRequest*> EmitterRequestMap_t;
// EventListener
//
// The EventListener class provides an object to wait on for specific events
// from EventEmitter objects. The EventListener becomes signalled for
// read when events are waiting, making it is also an Emitter.
-class EventListener : public EventEmitter {
+class EventListener {
public:
- EventListener();
- ~EventListener();
+ EventListener();
+ ~EventListener();
+
+ // Called by EventEmitter to signal the Listener that a new event is
+ // available.
+ virtual void ReceiveEvents(EventEmitter* emitter, uint32_t events) = 0;
protected:
- // Called prior to free to unregister all EventInfos from the EventEmitters.
- void Destroy();
+ pthread_cond_t signal_cond_;
+ DISALLOW_COPY_AND_ASSIGN(EventListener);
+};
+
+// EventListenerLock
+//
+// On construction, references and locks the emitter. WaitOnEvent will
+// temporarily unlock waiting for any event in |events| to become signaled.
+// The functione exits with the lock taken. The destructor will automatically
+// unlock the emitter.
+class EventListenerLock : public EventListener {
public:
- // Declared in EventEmitter
- virtual uint32_t GetEventStatus();
- virtual int GetType();
-
- // Called by EventEmitter to signal the Listener that a new event is
- // available.
- void Signal(const ScopedEventInfo& info);
-
- // Wait for one or more previously Tracked events to take place
- // or until ms_timeout expires, and fills |events| up to |max| limit.
- // The number of events recored is returned in |count|.
- Error Wait(EventData* events, int max, int ms_timeout, int* out_count);
-
- // Tracks a new set of POLL events for a given unique |id|. The
- // |user_data| will be returned in the Wait when an event of type |filter|
- // is received with that |id|.
- Error Track(int id,
- const ScopedEventEmitter& emitter,
- uint32_t filter,
- uint64_t user_data);
-
- // Updates the tracking of events for |id|, replacing the |user_data|
- // that's returned, as well as which events will signal.
- Error Update(int id, uint32_t filter, uint64_t user_data);
-
- // Unregisters the existing |id|.
- Error Free(int id);
-
- // Notification by EventEmitter that it is abandoning the event. Do not
- // access the emitter after this.
- void AbandonedEventInfo(const ScopedEventInfo& event);
+ explicit EventListenerLock(EventEmitter* emitter);
+ ~EventListenerLock();
+
+ // Called by EventEmitter to signal the Listener that a new event is
+ // available.
+ virtual void ReceiveEvents(EventEmitter* emitter, uint32_t events);
+
+ // Called with the emitters lock held (which happens in the constructor).
+ // Waits in a condvar until one of the events in |events| is raised or
+ // or the timeout expired. Returns with the emitter lock held, which
+ // will be release when the destructor is called.
+ //
+ // On Error:
+ // ETIMEOUT if the timeout is exceeded.
+ // EINTR if the wait was interrupted.
+ Error WaitOnEvent(uint32_t events, int ms_max);
+
+private:
+ EventEmitter* emitter_;
+ sdk_util::AutoLock* lock_;
+ uint32_t events_;
+ DISALLOW_COPY_AND_ASSIGN(EventListenerLock);
+};
- private:
- // Protects the data in the EventInfo map.
- sdk_util::SimpleLock info_lock_;
- // Map from ID to live a event info.
- EventInfoMap_t event_info_map_;
+class EventListenerPoll : public EventListener {
+ public:
+ EventListenerPoll() : EventListener(), signaled_(0) {}
- // Protects waiting_, signaled_ and used with the signal_cond_.
- sdk_util::SimpleLock signal_lock_;
- pthread_cond_t signal_cond_;
+ // Called by EventEmitter to signal the Listener that a new event is
+ // available.
+ virtual void ReceiveEvents(EventEmitter* emitter, uint32_t events);
- // The number of threads currently waiting on this Listener.
- uint32_t waiting_;
+ // Wait for the any requested emitter/filter pairs to emit one of the
+ // events in the matching filter. Returns 0 on success.
+ //
+ // On Error:
+ // ETIMEOUT if the timeout is exceeded.
+ // EINTR if the wait was interrupted.
+ Error WaitOnAny(EventRequest* requests, size_t cnt, int ms_max);
- // Set of event infos signaled during a wait.
- EventInfoSet_t signaled_;
+ private:
+ sdk_util::SimpleLock signal_lock_;
+ EmitterRequestMap_t emitters_;
+ size_t signaled_;
+ DISALLOW_COPY_AND_ASSIGN(EventListenerPoll);
};
-typedef sdk_util::ScopedRef<EventListener> ScopedEventListener;
-
} // namespace nacl_io
#endif /* LIBRARIES_NACL_IO_EVENT_LISTENER_H_ */
diff --git a/native_client_sdk/src/libraries/nacl_io/fifo_char.cc b/native_client_sdk/src/libraries/nacl_io/fifo_char.cc
new file mode 100644
index 0000000..db7956a
--- /dev/null
+++ b/native_client_sdk/src/libraries/nacl_io/fifo_char.cc
@@ -0,0 +1,120 @@
+// Copyright (c) 2013 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "nacl_io/fifo_char.h"
+
+#include <stdlib.h>
+#include <string.h>
+
+#include <algorithm>
+
+namespace nacl_io {
+
+FIFOChar::FIFOChar(size_t size)
+ : buffer_(NULL),
+ size_(size),
+ avail_(0),
+ tail_(0) {
+ if (size) {
+ buffer_ = new char[size];
+ } else {
+ buffer_ = NULL;
+ }
+}
+
+FIFOChar::~FIFOChar() {
+ delete[] buffer_;
+}
+
+bool FIFOChar::IsEmpty() {
+ return avail_ == 0;
+}
+
+bool FIFOChar::IsFull() {
+ return avail_ >= size_;
+}
+
+bool FIFOChar::Resize(size_t len) {
+ // Can not resize smaller than the current size
+ if (len < avail_)
+ return false;
+
+ // Read current data into new buffer
+ char* data = new char[len];
+ avail_ = Read(data, avail_);
+
+ // Replace buffer
+ delete[] buffer_;
+ buffer_ = data;
+ size_ = len;
+ return true;
+}
+
+
+size_t FIFOChar::ReadAvailable() {
+ return avail_;
+}
+
+size_t FIFOChar::WriteAvailable() {
+ return size_ - avail_;
+}
+
+size_t FIFOChar::Peek(void* buf, size_t len) {
+ char* ptr = static_cast<char*>(buf);
+
+ size_t out = 0;
+ len = std::min(len, avail_);
+
+ size_t offs = tail_;
+ while (len > 0) {
+ size_t read_len = std::min(len, size_ - offs);
+ memcpy(ptr, &buffer_[offs], read_len);
+
+ ptr += read_len;
+ offs += read_len;
+ if (static_cast<size_t>(offs) == size_)
+ offs = 0;
+
+ out += read_len;
+ len -= read_len;
+ }
+
+ return out;
+}
+
+size_t FIFOChar::Read(void* buf, size_t len) {
+ size_t read_len = Peek(buf, len);
+ if (read_len > 0) {
+ avail_ -= read_len;
+ tail_ = (tail_ + read_len) % size_;
+ }
+ return read_len;
+}
+
+size_t FIFOChar::Write(const void* buf, size_t len) {
+ const char* ptr = static_cast<const char*>(buf);
+ size_t out = 0;
+ size_t room = size_ - avail_;
+ len = std::min(len, room);
+
+ size_t offs = tail_ + avail_;
+ while (len > 0) {
+ size_t write_len = std::min(len, size_ - offs);
+ memcpy(&buffer_[offs], ptr, write_len);
+
+ ptr += write_len;
+ offs += write_len;
+ if (offs == size_)
+ offs = 0;
+
+ out += write_len;
+ len -= write_len;
+ }
+
+ avail_ += out;
+ return out;
+}
+
+
+} // namespace nacl_io
diff --git a/native_client_sdk/src/libraries/nacl_io/fifo_char.h b/native_client_sdk/src/libraries/nacl_io/fifo_char.h
new file mode 100644
index 0000000..e38034a
--- /dev/null
+++ b/native_client_sdk/src/libraries/nacl_io/fifo_char.h
@@ -0,0 +1,54 @@
+// Copyright (c) 2013 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef LIBRARIES_NACL_IO_FIFO_CHAR_H_
+#define LIBRARIES_NACL_IO_FIFO_CHAR_H_
+
+#include <vector>
+
+#include "nacl_io/fifo_interface.h"
+
+#include "sdk_util/macros.h"
+
+namespace nacl_io {
+
+// FIFOChar
+//
+// A FIFOChar is a circular buffer, signalling FULL and EMPTY as appropriate.
+class FIFOChar : public FIFOInterface {
+ public:
+ explicit FIFOChar(size_t size);
+ virtual ~FIFOChar();
+
+ virtual bool IsEmpty();
+ virtual bool IsFull();
+ virtual bool Resize(size_t len);
+
+ size_t ReadAvailable();
+ size_t WriteAvailable();
+
+ // Reads out no more than the requested len without updating the tail.
+ // Returns actual amount read.
+ size_t Peek(void* buf, size_t len);
+
+ // Reads out the data making room in the FIFO. Returns actual amount
+ // read.
+ size_t Read(void* buf, size_t len);
+
+ // Writes into the FIFO no more than len bytes, returns actual amount
+ // written.
+ size_t Write(const void* buf, size_t len);
+
+private:
+ char* buffer_;
+ size_t size_; // Size of the FIFO
+ size_t avail_; // How much data is currently available
+ size_t tail_; // Next read location
+
+ DISALLOW_COPY_AND_ASSIGN(FIFOChar);
+};
+
+} // namespace nacl_io
+
+#endif // LIBRARIES_NACL_IO_FIFO_CHAR_H_
diff --git a/native_client_sdk/src/libraries/nacl_io/fifo_interface.h b/native_client_sdk/src/libraries/nacl_io/fifo_interface.h
new file mode 100644
index 0000000..46d4e82
--- /dev/null
+++ b/native_client_sdk/src/libraries/nacl_io/fifo_interface.h
@@ -0,0 +1,32 @@
+// Copyright (c) 2013 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef LIBRARIES_NACL_IO_FIFO_INTERFACE_H_
+#define LIBRARIES_NACL_IO_FIFO_INTERFACE_H_
+
+#include <stdint.h>
+#include <stdlib.h>
+
+namespace nacl_io {
+
+// FIFOInterface
+//
+// FIFOInterface provides a common interface for Emitters to update their
+// signalled state. FIFOs do not have any internal locking and instead
+// reply on a parent (usually an emitter) to lock for them as appropriate.
+class FIFOInterface {
+ public:
+ virtual ~FIFOInterface() {};
+
+ virtual bool IsEmpty() = 0;
+ virtual bool IsFull() = 0;
+ virtual bool Resize(size_t len) = 0;
+
+ virtual uint32_t ReadAvailable() = 0;
+ virtual uint32_t WriteAvailable() = 0;
+};
+
+} // namespace nacl_io
+
+#endif // LIBRARIES_NACL_IO_FIFO_INTERFACE_H_
diff --git a/native_client_sdk/src/libraries/nacl_io/fifo_null.h b/native_client_sdk/src/libraries/nacl_io/fifo_null.h
new file mode 100644
index 0000000..88bdbe5
--- /dev/null
+++ b/native_client_sdk/src/libraries/nacl_io/fifo_null.h
@@ -0,0 +1,29 @@
+// Copyright (c) 2013 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef LIBRARIES_NACL_IO_FIFO_NULL_H_
+#define LIBRARIES_NACL_IO_FIFO_NULL_H_
+
+#include <stdint.h>
+#include <stdlib.h>
+
+namespace nacl_io {
+
+// FIFONull
+//
+// A null fifo is always ready to read or write, but never actually
+// provides or stores data.
+class FIFONull : public FIFOInterface {
+ public:
+ virtual bool IsEmpty() { return false; }
+ virtual bool IsFull() { return false; }
+ virtual bool Resize(size_t) { return false; }
+
+ virtual uint32_t ReadAvailable() { return 1; }
+ virtual uint32_t WriteAvailable() { return 1; }
+};
+
+} // namespace nacl_io
+
+#endif // LIBRARIES_NACL_IO_FIFO_NULL_H_
diff --git a/native_client_sdk/src/libraries/nacl_io/fifo_packet.cc b/native_client_sdk/src/libraries/nacl_io/fifo_packet.cc
new file mode 100644
index 0000000..94354af
--- /dev/null
+++ b/native_client_sdk/src/libraries/nacl_io/fifo_packet.cc
@@ -0,0 +1,72 @@
+// Copyright (c) 2013 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "nacl_io/fifo_packet.h"
+
+#include <stdlib.h>
+#include <string.h>
+
+#include <algorithm>
+
+#include "nacl_io/packet.h"
+
+namespace nacl_io {
+
+FIFOPacket::FIFOPacket(size_t size)
+ : max_bytes_(size),
+ cur_bytes_(0) {}
+
+FIFOPacket::~FIFOPacket() {
+ while (!IsEmpty())
+ delete ReadPacket();
+}
+
+bool FIFOPacket::IsEmpty() {
+ return packets_.empty();
+}
+
+bool FIFOPacket::Resize(size_t len) {
+ max_bytes_ = len;
+ return true;
+}
+
+size_t FIFOPacket::ReadAvailable() {
+ return cur_bytes_;
+}
+
+size_t FIFOPacket::WriteAvailable() {
+ if (cur_bytes_ > max_bytes_)
+ return 0;
+
+ return max_bytes_ - cur_bytes_;
+}
+
+bool FIFOPacket::IsFull() {
+ return cur_bytes_ >= max_bytes_;
+}
+
+Packet* FIFOPacket::PeekPacket() {
+ if (packets_.empty())
+ return NULL;
+
+ return packets_.back();
+}
+
+Packet* FIFOPacket::ReadPacket() {
+ if (packets_.empty())
+ return NULL;
+
+ Packet* out = packets_.back();
+ packets_.pop_back();
+
+ cur_bytes_ -= out->len();
+ return out;
+}
+
+void FIFOPacket::WritePacket(Packet* packet) {
+ cur_bytes_ += packet->len();
+ packets_.push_front(packet);
+}
+
+} // namespace nacl_io
diff --git a/native_client_sdk/src/libraries/nacl_io/fifo_packet.h b/native_client_sdk/src/libraries/nacl_io/fifo_packet.h
new file mode 100644
index 0000000..b9e5c44
--- /dev/null
+++ b/native_client_sdk/src/libraries/nacl_io/fifo_packet.h
@@ -0,0 +1,59 @@
+// Copyright (c) 2013 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef LIBRARIES_NACL_IO_FIFO_PACKET_H_
+#define LIBRARIES_NACL_IO_FIFO_PACKET_H_
+
+#include <string.h>
+
+#include <list>
+#include <vector>
+
+#include "nacl_io/fifo_interface.h"
+#include "ppapi/c/pp_resource.h"
+
+#include "sdk_util/macros.h"
+
+namespace nacl_io {
+
+class Packet;
+
+// FIFOPacket
+//
+// A FIFOPackiet is linked list of packets. Data is stored and returned
+// in packet size increments. FIFOPacket signals EMPTY where there are
+// no packets, and FULL when the total bytes of all packets meets or
+// exceeds the max size hint.
+class FIFOPacket : public FIFOInterface {
+ public:
+ explicit FIFOPacket(size_t size);
+ virtual ~FIFOPacket();
+
+ virtual bool IsEmpty();
+ virtual bool IsFull();
+ virtual bool Resize(size_t len);
+
+ size_t ReadAvailable();
+ size_t WriteAvailable();
+
+ // Return a pointer to the top packet without releasing ownership.
+ Packet* PeekPacket();
+
+ // Relinquish top packet, and remove it from the FIFO.
+ Packet* ReadPacket();
+
+ // Take ownership of packet and place it in the FIFO.
+ void WritePacket(Packet* packet);
+
+ private:
+ std::list<Packet*> packets_;
+ uint32_t max_bytes_;
+ uint32_t cur_bytes_;
+
+ DISALLOW_COPY_AND_ASSIGN(FIFOPacket);
+};
+
+} // namespace nacl_io
+
+#endif // LIBRARIES_NACL_IO_FIFO_PACKET_H_
diff --git a/native_client_sdk/src/libraries/nacl_io/kernel_intercept.cc b/native_client_sdk/src/libraries/nacl_io/kernel_intercept.cc
index b378e86..fc59036 100644
--- a/native_client_sdk/src/libraries/nacl_io/kernel_intercept.cc
+++ b/native_client_sdk/src/libraries/nacl_io/kernel_intercept.cc
@@ -131,6 +131,11 @@ int ki_open(const char *path, int oflag) {
return s_kp->open(path, oflag);
}
+int ki_pipe(int pipefds[2]) {
+ ON_NOSYS_RETURN(-1);
+ return s_kp->pipe(pipefds);
+}
+
ssize_t ki_read(int fd, void *buf, size_t nbyte) {
ON_NOSYS_RETURN(-1);
return s_kp->read(fd, buf, nbyte);
diff --git a/native_client_sdk/src/libraries/nacl_io/kernel_intercept.h b/native_client_sdk/src/libraries/nacl_io/kernel_intercept.h
index c17b61c..713fcbd 100644
--- a/native_client_sdk/src/libraries/nacl_io/kernel_intercept.h
+++ b/native_client_sdk/src/libraries/nacl_io/kernel_intercept.h
@@ -44,6 +44,7 @@ int ki_mount(const char* source, const char* target, const char* filesystemtype,
unsigned long mountflags, const void *data);
int ki_umount(const char* path);
int ki_open(const char* path, int oflag);
+int ki_pipe(int pipefds[2]);
ssize_t ki_read(int fd, void* buf, size_t nbyte);
ssize_t ki_write(int fd, const void* buf, size_t nbyte);
int ki_fstat(int fd, struct stat *buf);
diff --git a/native_client_sdk/src/libraries/nacl_io/kernel_object.h b/native_client_sdk/src/libraries/nacl_io/kernel_object.h
index 036ff64..0426f15 100644
--- a/native_client_sdk/src/libraries/nacl_io/kernel_object.h
+++ b/native_client_sdk/src/libraries/nacl_io/kernel_object.h
@@ -17,6 +17,7 @@
#include "nacl_io/mount_node.h"
#include "nacl_io/path.h"
+#include "sdk_util/macros.h"
#include "sdk_util/simple_lock.h"
namespace nacl_io {
diff --git a/native_client_sdk/src/libraries/nacl_io/kernel_proxy.cc b/native_client_sdk/src/libraries/nacl_io/kernel_proxy.cc
index a929a6a..4445d39 100644
--- a/native_client_sdk/src/libraries/nacl_io/kernel_proxy.cc
+++ b/native_client_sdk/src/libraries/nacl_io/kernel_proxy.cc
@@ -29,9 +29,11 @@
#include "nacl_io/mount_http.h"
#include "nacl_io/mount_mem.h"
#include "nacl_io/mount_node.h"
+#include "nacl_io/mount_node_pipe.h"
#include "nacl_io/mount_node_tcp.h"
#include "nacl_io/mount_node_udp.h"
#include "nacl_io/mount_passthrough.h"
+#include "nacl_io/mount_stream.h"
#include "nacl_io/osmman.h"
#include "nacl_io/ossocket.h"
#include "nacl_io/osstat.h"
@@ -48,30 +50,10 @@
namespace nacl_io {
-class SignalEmitter : public EventEmitter {
- public:
- // From EventEmitter. The SignalEmitter exists in order
- // to inturrupt anything waiting in select()/poll() when kill()
- // is called. It is an edge trigger only and therefore has no
- // persistent readable/wriable/error state.
- uint32_t GetEventStatus() {
- return 0;
- }
-
- int GetType() {
- // For lack of a better type, report socket to signify it can be in an
- // used to signal.
- return S_IFSOCK;
- }
-
- void SignalOccurred() {
- RaiseEvent(POLLERR);
- }
-};
KernelProxy::KernelProxy() : dev_(0), ppapi_(NULL),
sigwinch_handler_(SIG_IGN),
- signal_emitter_(new SignalEmitter) {
+ signal_emitter_(new EventEmitter) {
}
@@ -132,8 +114,8 @@ Error KernelProxy::Init(PepperInterface* ppapi) {
#endif
StringMap_t args;
- socket_mount_.reset(new MountSocket());
- result = socket_mount_->Init(0, args, ppapi);
+ stream_mount_.reset(new MountStream());
+ result = stream_mount_->Init(0, args, ppapi);
if (result != 0) {
assert(false);
rtn = result;
@@ -193,6 +175,29 @@ int KernelProxy::open(const char* path, int oflags) {
return AllocateFD(handle);
}
+int KernelProxy::pipe(int pipefds[2]) {
+ MountNodePipe* pipe = new MountNodePipe(stream_mount_.get());
+ ScopedMountNode node(pipe);
+
+ if (pipe->Init(S_IREAD | S_IWRITE) == 0) {
+ ScopedKernelHandle handle0(new KernelHandle(stream_mount_, node));
+ ScopedKernelHandle handle1(new KernelHandle(stream_mount_, node));
+
+ // Should never fail, but...
+ if (handle0->Init(S_IREAD) || handle1->Init(S_IWRITE)) {
+ errno = EACCES;
+ return -1;
+ }
+
+ pipefds[0] = AllocateFD(handle0);
+ pipefds[1] = AllocateFD(handle1);
+ return 0;
+ }
+
+ errno = ENOSYS;
+ return -1;
+}
+
int KernelProxy::close(int fd) {
ScopedKernelHandle handle;
Error error = AcquireHandle(fd, &handle);
@@ -773,7 +778,8 @@ int KernelProxy::kill(pid_t pid, int sig) {
}
// Raise an event so that select/poll get interrupted.
- signal_emitter_->SignalOccurred();
+ AUTO_LOCK(signal_emitter_->GetLock())
+ signal_emitter_->RaiseEvents_Locked(POLLERR);
switch (sig) {
case SIGWINCH:
if (sigwinch_handler_ != SIG_IGN)
@@ -788,7 +794,6 @@ int KernelProxy::kill(pid_t pid, int sig) {
errno = EINVAL;
return -1;
}
-
return 0;
}
@@ -831,204 +836,166 @@ sighandler_t KernelProxy::sigset(int signum, sighandler_t handler) {
int KernelProxy::select(int nfds, fd_set* readfds, fd_set* writefds,
fd_set* exceptfds, struct timeval* timeout) {
- ScopedEventListener listener(new EventListener);
-
- std::vector<struct pollfd> fds;
-
- fd_set readout, writeout, exceptout;
-
- FD_ZERO(&readout);
- FD_ZERO(&writeout);
- FD_ZERO(&exceptout);
-
- int fd;
- size_t event_cnt = 0;
- int event_track = 0;
- for (fd = 0; fd < nfds; fd++) {
+ fd_set ignore;
+ std::vector<pollfd> pollfds;
+
+ // Simplify logic, by using an IGNORE set for any undefined set
+ FD_ZERO(&ignore);
+ if (NULL == readfds)
+ readfds = &ignore;
+ if (NULL == writefds)
+ writefds = &ignore;
+ if (NULL == exceptfds)
+ exceptfds = &ignore;
+
+ for (int fd = 0; fd < nfds; fd++) {
int events = 0;
-
- if (readfds != NULL && FD_ISSET(fd, readfds))
+ if (FD_ISSET(fd, readfds))
events |= POLLIN;
- if (writefds != NULL && FD_ISSET(fd, writefds))
+ if (FD_ISSET(fd, writefds))
events |= POLLOUT;
- if (exceptfds != NULL && FD_ISSET(fd, exceptfds))
+ if (FD_ISSET(fd, exceptfds))
events |= POLLERR | POLLHUP;
- // If we are not interested in this FD, skip it
- if (0 == events) continue;
-
- ScopedKernelHandle handle;
- Error err = AcquireHandle(fd, &handle);
-
- // Select will return immediately if there are bad FDs.
- if (err != 0) {
- errno = EBADF;
- return -1;
+ if (events) {
+ pollfd info;
+ info.fd = fd;
+ info.events = events;
+ pollfds.push_back(info);
}
+ }
- int status = handle->node()->GetEventStatus() & events;
- if (status & POLLIN) {
- FD_SET(fd, &readout);
- event_cnt++;
- }
+ FD_ZERO(readfds);
+ FD_ZERO(writefds);
+ FD_ZERO(exceptfds);
- if (status & POLLOUT) {
- FD_SET(fd, &writeout);
- event_cnt++;
- }
+ // NULL timeout signals wait forever.
+ int ms_timeout = -1;
+ if (timeout != NULL) {
+ int64_t ms = timeout->tv_sec * 1000 + ((timeout->tv_usec + 500) / 1000);
- if (status & (POLLERR | POLLHUP)) {
- FD_SET(fd, &exceptout);
- event_cnt++;
+ // If the timeout is invalid or too long (larger than signed 32 bit).
+ if ((timeout->tv_sec < 0) || (timeout->tv_sec >= (INT_MAX / 1000)) ||
+ (timeout->tv_usec < 0) || (timeout->tv_usec >= 1000000) ||
+ (ms < 0) || (ms >= INT_MAX)) {
+ errno = EINVAL;
+ return -1;
}
- // Otherwise track it.
- if (0 == status) {
- err = listener->Track(fd, handle->node(), events, fd);
- if (err != 0) {
- errno = EBADF;
- return -1;
- }
- event_track++;
- }
+ ms_timeout = static_cast<int>(ms);
}
- // If nothing is signaled, then we must wait.
- if (event_cnt == 0) {
- std::vector<EventData> events;
- int ready_cnt;
- int ms_timeout;
-
- // NULL timeout signals wait forever.
- if (timeout == NULL) {
- ms_timeout = -1;
- } else {
- int64_t ms = timeout->tv_sec * 1000 + ((timeout->tv_usec + 500) / 1000);
-
- // If the timeout is invalid or too long (larger than signed 32 bit).
- if ((timeout->tv_sec < 0) || (timeout->tv_sec >= (INT_MAX / 1000)) ||
- (timeout->tv_usec < 0) || (timeout->tv_usec >= 1000000) ||
- (ms < 0) || (ms >= INT_MAX)) {
- errno = EINVAL;
- return -1;
- }
+ int result = poll(&pollfds[0], pollfds.size(), ms_timeout);
+ if (result == -1)
+ return -1;
- ms_timeout = static_cast<int>(ms);
+ int event_cnt = 0;
+ for (size_t index = 0; index < pollfds.size(); index++) {
+ pollfd* info = &pollfds[index];
+ if (info->revents & POLLIN) {
+ FD_SET(info->fd, readfds);
+ event_cnt++;
}
-
- // Add a special node to listen for events
- // coming from the KernelProxy itself (kill will
- // generated a SIGERR event).
- listener->Track(-1, signal_emitter_, POLLERR, -1);
- event_track += 1;
-
- events.resize(event_track);
-
- bool interrupted = false;
- listener->Wait(events.data(), event_track, ms_timeout, &ready_cnt);
- for (fd = 0; static_cast<int>(fd) < ready_cnt; fd++) {
- if (events[fd].user_data == static_cast<uint64_t>(-1)) {
- if (events[fd].events & POLLERR) {
- interrupted = true;
- }
- continue;
- }
-
- if (events[fd].events & POLLIN) {
- FD_SET(events[fd].user_data, &readout);
- event_cnt++;
- }
-
- if (events[fd].events & POLLOUT) {
- FD_SET(events[fd].user_data, &writeout);
- event_cnt++;
- }
-
- if (events[fd].events & (POLLERR | POLLHUP)) {
- FD_SET(events[fd].user_data, &exceptout);
- event_cnt++;
- }
+ if (info->revents & POLLOUT) {
+ FD_SET(info->fd, writefds);
+ event_cnt++;
}
-
- if (0 == event_cnt && interrupted) {
- errno = EINTR;
- return -1;
+ if (info->revents & (POLLHUP | POLLERR)) {
+ FD_SET(info->fd, exceptfds);
+ event_cnt++;
}
}
- // Copy out the results
- if (readfds != NULL)
- *readfds = readout;
+ return event_cnt;
+}
- if (writefds != NULL)
- *writefds = writeout;
+struct PollInfo {
+ PollInfo() : index(-1) {};
- if (exceptfds != NULL)
- *exceptfds = exceptout;
+ std::vector<struct pollfd*> fds;
+ int index;
+};
- return event_cnt;
-}
+typedef std::map<EventEmitter*, PollInfo> EventPollMap_t;
int KernelProxy::poll(struct pollfd *fds, nfds_t nfds, int timeout) {
- ScopedEventListener listener(new EventListener);
- listener->Track(-1, signal_emitter_, POLLERR, 0);
+ EventPollMap_t event_map;
- int index;
+ std::vector<EventRequest> requests;
size_t event_cnt = 0;
- size_t event_track = 1;
- for (index = 0; static_cast<nfds_t>(index) < nfds; index++) {
+
+ for (int index = 0; static_cast<nfds_t>(index) < nfds; index++) {
ScopedKernelHandle handle;
- struct pollfd* info = &fds[index];
- Error err = AcquireHandle(info->fd, &handle);
+ struct pollfd* fd_info = &fds[index];
+ Error err = AcquireHandle(fd_info->fd, &handle);
+
+ fd_info->revents = 0;
// If the node isn't open, or somehow invalid, mark it so.
if (err != 0) {
- info->revents = POLLNVAL;
+ fd_info->revents = POLLNVAL;
event_cnt++;
continue;
}
// If it's already signaled, then just capture the event
- if (handle->node()->GetEventStatus() & info->events) {
- info->revents = info->events & handle->node()->GetEventStatus();
+ ScopedEventEmitter emitter(handle->node()->GetEventEmitter());
+ int events = POLLIN | POLLOUT;
+ if (emitter)
+ events = emitter->GetEventStatus();
+
+ if (events & fd_info->events) {
+ fd_info->revents = events & fd_info->events;
event_cnt++;
continue;
}
- // Otherwise try to track it.
- err = listener->Track(info->fd, handle->node(), info->events, index);
- if (err != 0) {
- info->revents = POLLNVAL;
+ if (NULL == emitter) {
+ fd_info->revents = POLLNVAL;
event_cnt++;
continue;
}
- event_track++;
+
+ // Otherwise try to track it.
+ PollInfo* info = &event_map[emitter.get()];
+ if (info->index == -1) {
+ EventRequest request;
+ request.emitter = emitter;
+ request.filter = fd_info->events;
+ request.events = 0;
+
+ info->index = requests.size();
+ requests.push_back(request);
+ }
+ info->fds.push_back(fd_info);
+ requests[info->index].filter |= fd_info->events;
}
- // If nothing is signaled, then we must wait.
+ // If nothing is signaled, then we must wait on the event map
if (0 == event_cnt) {
- std::vector<EventData> events;
- int ready_cnt;
-
- bool interrupted = false;
- events.resize(event_track);
- listener->Wait(events.data(), event_track, timeout, &ready_cnt);
- for (index = 0; index < ready_cnt; index++) {
- struct pollfd* info = &fds[events[index].user_data];
- if (!info) {
- interrupted = true;
- continue;
- }
-
- info->revents = events[index].events;
- event_cnt++;
- }
- if (0 == event_cnt && interrupted) {
- errno = EINTR;
+ EventListenerPoll wait;
+ Error err = wait.WaitOnAny(&requests[0], requests.size(), timeout);
+ if ((err != 0) && (err != ETIMEDOUT)) {
+ errno = err;
return -1;
}
+
+ for (size_t rindex = 0; rindex < requests.size(); rindex++) {
+ EventRequest* request = &requests[rindex];
+ if (request->events) {
+ PollInfo* poll_info = &event_map[request->emitter.get()];
+ for (size_t findex = 0; findex < poll_info->fds.size(); findex++) {
+ struct pollfd* fd_info = poll_info->fds[findex];
+ uint32_t events = fd_info->events & request->events;
+ if (events) {
+ fd_info->revents = events;
+ event_cnt++;
+ }
+ }
+ }
+ }
}
return event_cnt;
@@ -1337,11 +1304,11 @@ int KernelProxy::socket(int domain, int type, int protocol) {
MountNodeSocket* sock = NULL;
switch (type) {
case SOCK_DGRAM:
- sock = new MountNodeUDP(socket_mount_.get());
+ sock = new MountNodeUDP(stream_mount_.get());
break;
case SOCK_STREAM:
- sock = new MountNodeTCP(socket_mount_.get());
+ sock = new MountNodeTCP(stream_mount_.get());
break;
default:
@@ -1351,7 +1318,7 @@ int KernelProxy::socket(int domain, int type, int protocol) {
ScopedMountNode node(sock);
if (sock->Init(S_IREAD | S_IWRITE) == 0) {
- ScopedKernelHandle handle(new KernelHandle(socket_mount_, node));
+ ScopedKernelHandle handle(new KernelHandle(stream_mount_, node));
return AllocateFD(handle);
}
diff --git a/native_client_sdk/src/libraries/nacl_io/kernel_proxy.h b/native_client_sdk/src/libraries/nacl_io/kernel_proxy.h
index 038b726..27739a9 100644
--- a/native_client_sdk/src/libraries/nacl_io/kernel_proxy.h
+++ b/native_client_sdk/src/libraries/nacl_io/kernel_proxy.h
@@ -8,10 +8,11 @@
#include <map>
#include <string>
+#include "nacl_io/event_emitter.h"
#include "nacl_io/host_resolver.h"
#include "nacl_io/kernel_object.h"
#include "nacl_io/mount_factory.h"
-#include "nacl_io/mount_socket.h"
+#include "nacl_io/mount_stream.h"
#include "nacl_io/ossignal.h"
#include "nacl_io/ossocket.h"
#include "nacl_io/ostypes.h"
@@ -22,9 +23,7 @@ struct timeval;
namespace nacl_io {
class PepperInterface;
-class SignalEmitter;
-typedef sdk_util::ScopedRef<SignalEmitter> ScopedSignalEmitter;
// KernelProxy provide one-to-one mapping for libc kernel calls. Calls to the
// proxy will result in IO access to the provided Mount and MountNode objects.
@@ -50,6 +49,8 @@ class KernelProxy : protected KernelObject {
// |ppapi| may be NULL. If so, no mount that uses pepper calls can be mounted.
virtual Error Init(PepperInterface* ppapi);
+ virtual int pipe(int pipefds[2]);
+
// NaCl-only function to read resources specified in the NMF file.
virtual int open_resource(const char* file);
@@ -182,7 +183,7 @@ class KernelProxy : protected KernelObject {
protected:
MountFactoryMap_t factories_;
- sdk_util::ScopedRef<MountSocket> socket_mount_;
+ sdk_util::ScopedRef<MountStream> stream_mount_;
int dev_;
PepperInterface* ppapi_;
static KernelProxy *s_instance_;
@@ -195,7 +196,7 @@ class KernelProxy : protected KernelObject {
virtual int AcquireSocketHandle(int fd, ScopedKernelHandle* handle);
#endif
- ScopedSignalEmitter signal_emitter_;
+ ScopedEventEmitter signal_emitter_;
DISALLOW_COPY_AND_ASSIGN(KernelProxy);
};
diff --git a/native_client_sdk/src/libraries/nacl_io/library.dsc b/native_client_sdk/src/libraries/nacl_io/library.dsc
index 533bcf4..08523c6c 100644
--- a/native_client_sdk/src/libraries/nacl_io/library.dsc
+++ b/native_client_sdk/src/libraries/nacl_io/library.dsc
@@ -13,7 +13,13 @@
'SOURCES' : [
'dbgprint.c',
"event_emitter.cc",
+ "event_emitter_pipe.cc",
+ "event_emitter_stream.cc",
+ "event_emitter_tcp.cc",
+ "event_emitter_udp.cc",
"event_listener.cc",
+ "fifo_char.cc",
+ "fifo_packet.cc",
"h_errno.cc",
"host_resolver.cc",
"kernel_handle.cc",
@@ -33,13 +39,16 @@
"mount_node_html5fs.cc",
"mount_node_http.cc",
"mount_node_mem.cc",
+ "mount_node_pipe.cc",
"mount_node_socket.cc",
+ "mount_node_stream.cc",
"mount_node_tcp.cc",
"mount_node_tty.cc",
"mount_node_udp.cc",
"mount_passthrough.cc",
- "mount_socket.cc",
+ "mount_stream.cc",
"nacl_io.cc",
+ "packet.cc",
"path.cc",
"pepper_interface.cc",
"pepper_interface_delegate.cc",
@@ -81,6 +90,7 @@
"syscalls/mount.c",
"syscalls/ntohl.c",
"syscalls/ntohs.c",
+ "syscalls/pipe.c",
"syscalls/poll.c",
"syscalls/rmdir.c",
"syscalls/recv.c",
@@ -117,6 +127,14 @@
"error.h",
"event_emitter.h",
"event_listener.h",
+ "event_emitter_pipe.h",
+ "event_emitter_stream.h",
+ "event_emitter_tcp.h",
+ "event_emitter_udp.h",
+ "fifo_char.h",
+ "fifo_interface.h",
+ "fifo_null.h",
+ "fifo_packet.h",
"host_resolver.h",
"inode_pool.h",
"ioctl.h",
@@ -138,12 +156,14 @@
"mount_node_html5fs.h",
"mount_node_http.h",
"mount_node_mem.h",
+ "mount_node_pipe.h",
"mount_node_socket.h",
+ "mount_node_stream.h",
"mount_node_tcp.h",
"mount_node_tty.h",
"mount_node_udp.h",
"mount_passthrough.h",
- "mount_socket.h",
+ "mount_stream.h",
"nacl_io.h",
"osdirent.h",
"osinttypes.h",
@@ -156,6 +176,7 @@
"osunistd.h",
"osutime.h",
"ostermios.h",
+ "packet.h",
"path.h",
"pepper_interface_delegate.h",
"pepper_interface_dummy.h",
diff --git a/native_client_sdk/src/libraries/nacl_io/mount_node.cc b/native_client_sdk/src/libraries/nacl_io/mount_node.cc
index 4a0c065..5598c27 100644
--- a/native_client_sdk/src/libraries/nacl_io/mount_node.cc
+++ b/native_client_sdk/src/libraries/nacl_io/mount_node.cc
@@ -49,13 +49,14 @@ void MountNode::Destroy() {
}
}
-// Declared in EventEmitter, default to regular files which always return
-// a ready of TRUE for read, write, or error.
+EventEmitter* MountNode::GetEventEmitter() { return NULL; }
+
uint32_t MountNode::GetEventStatus() {
- uint32_t val = POLLIN | POLLOUT | POLLERR;
- return val;
-}
+ if (GetEventEmitter())
+ return GetEventEmitter()->GetEventStatus();
+ return POLLIN | POLLOUT;
+}
Error MountNode::FSync() { return 0; }
diff --git a/native_client_sdk/src/libraries/nacl_io/mount_node.h b/native_client_sdk/src/libraries/nacl_io/mount_node.h
index 661a194..37847c9 100644
--- a/native_client_sdk/src/libraries/nacl_io/mount_node.h
+++ b/native_client_sdk/src/libraries/nacl_io/mount_node.h
@@ -26,7 +26,7 @@ typedef sdk_util::ScopedRef<MountNode> ScopedMountNode;
// NOTE: The KernelProxy is the only class that should be setting errno. All
// other classes should return Error (as defined by nacl_io/error.h).
-class MountNode : public EventListener {
+class MountNode : public sdk_util::RefObject {
protected:
explicit MountNode(Mount* mount);
virtual ~MountNode();
@@ -37,7 +37,9 @@ class MountNode : public EventListener {
virtual void Destroy();
public:
- // Declared in EventEmitter. defaults to signalled.
+ // Returns the emitter for this Node if it has one, if not, assume this
+ // object can not block.
+ virtual EventEmitter* GetEventEmitter();
virtual uint32_t GetEventStatus();
// Normal OS operations on a node (file), can be called by the kernel
diff --git a/native_client_sdk/src/libraries/nacl_io/mount_node_char.h b/native_client_sdk/src/libraries/nacl_io/mount_node_char.h
index ebb8616..48ad86f 100644
--- a/native_client_sdk/src/libraries/nacl_io/mount_node_char.h
+++ b/native_client_sdk/src/libraries/nacl_io/mount_node_char.h
@@ -14,10 +14,6 @@ class MountNodeCharDevice : public MountNode {
explicit MountNodeCharDevice(Mount* mount) : MountNode(mount) {
stat_.st_mode = S_IFCHR;
}
-
- virtual uint32_t GetEventStatus() {
- return 0;
- }
};
}
diff --git a/native_client_sdk/src/libraries/nacl_io/mount_node_pipe.cc b/native_client_sdk/src/libraries/nacl_io/mount_node_pipe.cc
new file mode 100644
index 0000000..20d3ab2
--- /dev/null
+++ b/native_client_sdk/src/libraries/nacl_io/mount_node_pipe.cc
@@ -0,0 +1,61 @@
+// Copyright (c) 2013 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "nacl_io/mount_node_pipe.h"
+
+#include <errno.h>
+#include <fcntl.h>
+#include <pthread.h>
+#include <string.h>
+
+#include "nacl_io/event_emitter_pipe.h"
+#include "nacl_io/ioctl.h"
+
+namespace {
+ const size_t kDefaultPipeSize = 512 * 1024;
+}
+
+namespace nacl_io {
+
+MountNodePipe::MountNodePipe(Mount* mnt)
+ : MountNodeStream(mnt),
+ pipe_(new EventEmitterPipe(kDefaultPipeSize)) {
+}
+
+EventEmitter* MountNodePipe::GetEventEmitter() {
+ return pipe_.get();
+}
+
+Error MountNodePipe::Read(size_t offs,
+ void *buf,
+ size_t count,
+ int* out_bytes) {
+ int ms = (GetMode() & O_NONBLOCK) ? 0 : read_timeout_;
+
+ EventListenerLock wait(GetEventEmitter());
+ Error err = wait.WaitOnEvent(POLLIN, ms);
+ if (err)
+ return err;
+
+ *out_bytes = pipe_->Read_Locked(static_cast<char *>(buf), count);
+ return 0;
+}
+
+Error MountNodePipe::Write(size_t offs,
+ const void *buf,
+ size_t count,
+ int* out_bytes) {
+ int ms = (GetMode() & O_NONBLOCK) ? 0 : write_timeout_;
+
+ EventListenerLock wait(GetEventEmitter());
+ Error err = wait.WaitOnEvent(POLLOUT, ms);
+ if (err)
+ return err;
+
+ *out_bytes = pipe_->Write_Locked(static_cast<const char *>(buf), count);
+ return 0;
+}
+
+} // namespace nacl_io
+
diff --git a/native_client_sdk/src/libraries/nacl_io/mount_node_pipe.h b/native_client_sdk/src/libraries/nacl_io/mount_node_pipe.h
new file mode 100644
index 0000000..b7ae7f5
--- /dev/null
+++ b/native_client_sdk/src/libraries/nacl_io/mount_node_pipe.h
@@ -0,0 +1,35 @@
+// Copyright (c) 2013 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef LIBRARIES_NACL_IO_MOUNT_NODE_PIPE_H_
+#define LIBRARIES_NACL_IO_MOUNT_NODE_PIPE_H_
+
+#include <map>
+#include <string>
+
+#include "nacl_io/event_emitter_pipe.h"
+#include "nacl_io/mount_node_stream.h"
+
+namespace nacl_io {
+
+class MountNodePipe : public MountNodeStream {
+ public:
+ explicit MountNodePipe(Mount* mnt);
+
+ virtual EventEmitter* GetEventEmitter();
+ virtual Error Read(size_t offs, void *buf, size_t count, int* out_bytes);
+ virtual Error Write(size_t offs, const void *buf,
+ size_t count, int* out_bytes);
+
+ protected:
+ ScopedEventEmitterPipe pipe_;
+
+ friend class KernelProxy;
+ friend class MountStream;
+};
+
+
+} // namespace nacl_io
+
+#endif // LIBRARIES_NACL_IO_MOUNT_NODE_PIPE_H_
diff --git a/native_client_sdk/src/libraries/nacl_io/mount_node_socket.cc b/native_client_sdk/src/libraries/nacl_io/mount_node_socket.cc
index be524d5..abaf55b 100644
--- a/native_client_sdk/src/libraries/nacl_io/mount_node_socket.cc
+++ b/native_client_sdk/src/libraries/nacl_io/mount_node_socket.cc
@@ -7,6 +7,7 @@
#include <errno.h>
#include <string.h>
+#include <sys/fcntl.h>
#include "nacl_io/mount.h"
#include "nacl_io/mount_node_socket.h"
@@ -18,10 +19,12 @@
namespace nacl_io {
MountNodeSocket::MountNodeSocket(Mount* mount)
- : MountNode(mount),
+ : MountNodeStream(mount),
socket_resource_(0),
local_addr_(0),
- remote_addr_(0) {
+ remote_addr_(0),
+ socket_flags_(0),
+ last_errno_(0) {
stat_.st_mode |= S_IFSOCK;
}
@@ -32,11 +35,10 @@ void MountNodeSocket::Destroy() {
mount_->ppapi()->ReleaseResource(local_addr_);
if (remote_addr_)
mount_->ppapi()->ReleaseResource(remote_addr_);
-}
-// Default to always signaled, until socket select support is added.
-uint32_t MountNodeSocket::GetEventStatus() {
- return POLLIN | POLLOUT;
+ socket_resource_ = 0;
+ local_addr_ = 0;
+ remote_addr_ = 0;
}
// Assume that |addr| and |out_addr| are non-NULL.
@@ -49,7 +51,8 @@ Error MountNodeSocket::MMap(void* addr,
return EACCES;
}
-// Normal read/write operations on a file
+// Normal read/write operations on a Socket are equivalent to
+// send/recv with a flag value of 0.
Error MountNodeSocket::Read(size_t offs,
void* buf,
size_t count,
@@ -61,18 +64,36 @@ Error MountNodeSocket::Write(size_t offs,
const void* buf,
size_t count,
int* out_bytes) {
- if (0 == remote_addr_)
- return EDESTADDRREQ;
-
return Send(buf, count, 0, out_bytes);
}
-NetAddressInterface* MountNodeSocket::NetAddress() {
+
+NetAddressInterface* MountNodeSocket::NetInterface() {
+ if (mount_->ppapi() == NULL)
+ return NULL;
+
return mount_->ppapi()->GetNetAddressInterface();
}
+TCPSocketInterface* MountNodeSocket::TCPInterface() {
+ if (mount_->ppapi() == NULL)
+ return NULL;
+
+ return mount_->ppapi()->GetTCPSocketInterface();
+}
+
+UDPSocketInterface* MountNodeSocket::UDPInterface() {
+ if (mount_->ppapi() == NULL)
+ return NULL;
+
+ return mount_->ppapi()->GetUDPSocketInterface();
+}
+
PP_Resource MountNodeSocket::SockAddrToResource(const struct sockaddr* addr,
socklen_t len) {
+ if (NULL == addr)
+ return 0;
+
if (AF_INET == addr->sa_family) {
PP_NetAddress_IPv4 addr4;
const sockaddr_in* sin = reinterpret_cast<const sockaddr_in*>(addr);
@@ -115,7 +136,7 @@ socklen_t MountNodeSocket::ResourceToSockAddr(PP_Resource addr,
PP_NetAddress_IPv4 ipv4;
PP_NetAddress_IPv6 ipv6;
- if (PP_TRUE == NetAddress()->DescribeAsIPv4Address(addr, &ipv4)) {
+ if (PP_TRUE == NetInterface()->DescribeAsIPv4Address(addr, &ipv4)) {
sockaddr_in addr4;
addr4.sin_family = AF_INET;
addr4.sin_port = ipv4.port;
@@ -126,7 +147,7 @@ socklen_t MountNodeSocket::ResourceToSockAddr(PP_Resource addr,
return sizeof(sockaddr_in);
}
- if (PP_TRUE == NetAddress()->DescribeAsIPv6Address(addr, &ipv6)) {
+ if (PP_TRUE == NetInterface()->DescribeAsIPv6Address(addr, &ipv6)) {
sockaddr_in6 addr6;
addr6.sin6_family = AF_INET6;
addr6.sin6_port = ipv6.port;
@@ -197,8 +218,9 @@ Error MountNodeSocket::Bind(const struct sockaddr* addr, socklen_t len) {
return EINVAL;
}
+
Error MountNodeSocket::Recv(void* buf, size_t len, int flags, int* out_len) {
- return EINVAL;
+ return RecvFrom(buf, len, flags, NULL, 0, out_len);
}
Error MountNodeSocket::RecvFrom(void* buf,
@@ -207,14 +229,56 @@ Error MountNodeSocket::RecvFrom(void* buf,
struct sockaddr* src_addr,
socklen_t* addrlen,
int* out_len) {
- return EOPNOTSUPP;
+ PP_Resource addr;
+ Error err = RecvHelper(buf, len, flags, &addr, out_len);
+ if (0 != addr) {
+ if (src_addr)
+ *addrlen = ResourceToSockAddr(addr, *addrlen, src_addr);
+
+ mount_->ppapi()->ReleaseResource(addr);
+ }
+
+ return err;
}
+Error MountNodeSocket::RecvHelper(void* buf,
+ size_t len,
+ int flags,
+ PP_Resource* addr,
+ int* out_len) {
+ if (0 == socket_resource_)
+ return EBADF;
+
+ int ms = read_timeout_;
+ if ((flags & MSG_DONTWAIT) || (GetMode() & O_NONBLOCK))
+ ms = 0;
+
+ //TODO(noelallen) BUG=295177
+ //For UDP we should support filtering packets when using connect
+ EventListenerLock wait(GetEventEmitter());
+ Error err = wait.WaitOnEvent(POLLIN, ms);
+
+ // Timeout is treated as a would block for sockets.
+ if (ETIMEDOUT == err)
+ return EWOULDBLOCK;
+
+ if (err)
+ return err;
+
+ err = Recv_Locked(buf, len, addr, out_len);
+
+ // We must have read from then inputbuffer, so Q up some receive work.
+ if ((err == 0) && *out_len)
+ QueueInput();
+ return err;
+}
+
+
Error MountNodeSocket::Send(const void* buf,
- size_t len,
- int flags,
- int* out_len) {
- return EOPNOTSUPP;
+ size_t len,
+ int flags,
+ int* out_len) {
+ return SendHelper(buf, len, flags, remote_addr_, out_len);
}
Error MountNodeSocket::SendTo(const void* buf,
@@ -223,7 +287,56 @@ Error MountNodeSocket::SendTo(const void* buf,
const struct sockaddr* dest_addr,
socklen_t addrlen,
int* out_len) {
- return EOPNOTSUPP;
+ if ((NULL == dest_addr) && (0 == remote_addr_))
+ return EDESTADDRREQ;
+
+ PP_Resource addr = SockAddrToResource(dest_addr, addrlen);
+ if (addr) {
+ Error err = SendHelper(buf, len, flags, addr, out_len);
+ mount_->ppapi()->ReleaseResource(addr);
+ return err;
+ }
+
+ return EINVAL;
+}
+
+Error MountNodeSocket::SendHelper(const void* buf,
+ size_t len,
+ int flags,
+ PP_Resource addr,
+ int* out_len) {
+ if (0 == socket_resource_)
+ return EBADF;
+
+ if (0 == addr)
+ return ENOTCONN;
+
+ int ms = write_timeout_;
+ if ((flags & MSG_DONTWAIT) || (GetMode() & O_NONBLOCK))
+ ms = 0;
+
+ EventListenerLock wait(GetEventEmitter());
+ Error err = wait.WaitOnEvent(POLLOUT, ms);
+
+ // Timeout is treated as a would block for sockets.
+ if (ETIMEDOUT == err)
+ return EWOULDBLOCK;
+
+ if (err)
+ return err;
+
+ err = Send_Locked(buf, len, addr, out_len);
+
+ // We must have added to the output buffer, so Q up some transmit work.
+ if ((err == 0) && *out_len)
+ QueueOutput();
+ return err;
+}
+
+void MountNodeSocket::SetError_Locked(int pp_error_num) {
+ SetStreamFlags(SSF_ERROR | SSF_CLOSED);
+ ClearStreamFlags(SSF_CAN_SEND | SSF_CAN_RECV);
+ last_errno_ = PPErrorToErrno(pp_error_num);
}
Error MountNodeSocket::Shutdown(int how) {
@@ -257,7 +370,6 @@ Error MountNodeSocket::GetSockName(struct sockaddr* addr, socklen_t* len) {
return ENOTCONN;
}
-
} // namespace nacl_io
#endif // PROVIDES_SOCKET_API \ No newline at end of file
diff --git a/native_client_sdk/src/libraries/nacl_io/mount_node_socket.h b/native_client_sdk/src/libraries/nacl_io/mount_node_socket.h
index a133c19..5cd4ba2 100644
--- a/native_client_sdk/src/libraries/nacl_io/mount_node_socket.h
+++ b/native_client_sdk/src/libraries/nacl_io/mount_node_socket.h
@@ -13,6 +13,8 @@
#include <ppapi/c/ppb_net_address.h>
#include "nacl_io/mount.h"
+#include "nacl_io/mount_node.h"
+#include "nacl_io/mount_node_stream.h"
#include "nacl_io/pepper_interface.h"
namespace nacl_io {
@@ -21,9 +23,9 @@ namespace nacl_io {
* should be looping on Send/Recv size. */
static const size_t MAX_SOCK_TRANSFER = 65536;
-class MountSocket;
+class MountStream;
-class MountNodeSocket : public MountNode {
+class MountNodeSocket : public MountNodeStream {
public:
explicit MountNodeSocket(Mount* mount);
@@ -32,8 +34,6 @@ class MountNodeSocket : public MountNode {
virtual Error Init(int flags) = 0;
public:
- virtual uint32_t GetEventStatus();
-
// Normal read/write operations on a file (recv/send).
virtual Error Read(size_t offs, void* buf, size_t count, int* out_bytes);
virtual Error Write(size_t offs,
@@ -57,7 +57,7 @@ class MountNodeSocket : public MountNode {
size_t offset,
void** out_addr);
- // Normal Functions.
+ // Socket Functions.
virtual Error Bind(const struct sockaddr* addr, socklen_t len);
virtual Error Connect(const struct sockaddr* addr, socklen_t len);
virtual Error Recv(void* buf, size_t len, int flags, int* out_len);
@@ -79,8 +79,44 @@ class MountNodeSocket : public MountNode {
virtual Error GetPeerName(struct sockaddr* addr, socklen_t* len);
virtual Error GetSockName(struct sockaddr* addr, socklen_t* len);
+ PP_Resource socket_resource() { return socket_resource_; }
+
+ // Updates socket's state, recording last error.
+ void SetError_Locked(int pp_error_num);
+
protected:
- NetAddressInterface* NetAddress();
+
+ // Wraps common error checks, timeouts, work pump for send.
+ Error SendHelper(const void* buf,
+ size_t len,
+ int flags,
+ PP_Resource addr,
+ int* out_len);
+
+ // Wraps common error checks, timeouts, work pump for recv.
+ Error RecvHelper(void* buf,
+ size_t len,
+ int flags,
+ PP_Resource* addr,
+ int* out_len);
+
+
+ // Per socket type send and recv
+ virtual Error Recv_Locked(void* buffer,
+ size_t len,
+ PP_Resource* out_addr,
+ int* out_len) = 0;
+
+ virtual Error Send_Locked(const void* buffer,
+ size_t len,
+ PP_Resource addr,
+ int* out_len) = 0;
+
+
+ NetAddressInterface* NetInterface();
+ TCPSocketInterface* TCPInterface();
+ UDPSocketInterface* UDPInterface();
+
PP_Resource SockAddrToResource(const struct sockaddr* addr, socklen_t len);
socklen_t ResourceToSockAddr(PP_Resource addr,
socklen_t len,
@@ -92,9 +128,11 @@ class MountNodeSocket : public MountNode {
PP_Resource socket_resource_;
PP_Resource local_addr_;
PP_Resource remote_addr_;
+ uint32_t socket_flags_;
+ int last_errno_;
friend class KernelProxy;
- friend class MountSocket;
+ friend class MountStream;
};
diff --git a/native_client_sdk/src/libraries/nacl_io/mount_node_stream.cc b/native_client_sdk/src/libraries/nacl_io/mount_node_stream.cc
new file mode 100644
index 0000000..c3cbc23
--- /dev/null
+++ b/native_client_sdk/src/libraries/nacl_io/mount_node_stream.cc
@@ -0,0 +1,58 @@
+// Copyright (c) 2013 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "nacl_io/mount_node_stream.h"
+
+#include <errno.h>
+#include <fcntl.h>
+#include <pthread.h>
+#include <string.h>
+
+#include "nacl_io/ioctl.h"
+#include "nacl_io/mount_stream.h"
+#include "sdk_util/atomicops.h"
+
+
+namespace nacl_io {
+
+MountNodeStream::MountNodeStream(Mount* mnt)
+ : MountNode(mnt),
+ read_timeout_(-1),
+ write_timeout_(-1),
+ stream_state_flags_(0) {
+}
+
+Error MountNodeStream::Init(int perm) {
+ MountNode::Init(perm);
+ if (perm & O_NONBLOCK)
+ SetStreamFlags(SSF_NON_BLOCK);
+
+ return 0;
+}
+
+void MountNodeStream::SetStreamFlags(uint32_t bits) {
+ sdk_util::AtomicOrFetch(&stream_state_flags_, bits);
+}
+
+void MountNodeStream::ClearStreamFlags(uint32_t bits) {
+ sdk_util::AtomicAndFetch(&stream_state_flags_, ~bits);
+}
+
+uint32_t MountNodeStream::GetStreamFlags() {
+ return stream_state_flags_;
+}
+
+bool MountNodeStream::TestStreamFlags(uint32_t bits) {
+ return (stream_state_flags_ & bits) == bits;
+}
+
+
+void MountNodeStream::QueueInput() {}
+void MountNodeStream::QueueOutput() {}
+
+MountStream* MountNodeStream::mount_stream() {
+ return static_cast<MountStream*>(mount_);
+}
+
+} // namespace nacl_io
diff --git a/native_client_sdk/src/libraries/nacl_io/mount_node_stream.h b/native_client_sdk/src/libraries/nacl_io/mount_node_stream.h
new file mode 100644
index 0000000..c855aff
--- /dev/null
+++ b/native_client_sdk/src/libraries/nacl_io/mount_node_stream.h
@@ -0,0 +1,62 @@
+// Copyright (c) 2013 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef LIBRARIES_NACL_IO_MOUNT_NODE_STREAM_H_
+#define LIBRARIES_NACL_IO_MOUNT_NODE_STREAM_H_
+
+#include <map>
+#include <string>
+
+#include "nacl_io/event_emitter_pipe.h"
+#include "nacl_io/mount_node.h"
+#include "sdk_util/atomicops.h"
+
+namespace nacl_io {
+
+class MountNodeStream;
+class MountStream;
+
+typedef sdk_util::ScopedRef<MountNodeStream> ScopedMountNodeStream;
+
+enum StreamStateFlags {
+ SSF_CONNECTING = 0x0001,
+ SSF_SENDING = 0x0002,
+ SSF_RECVING = 0x0004,
+ SSF_CLOSING = 0x0008,
+ SSF_CAN_SEND = 0x0020,
+ SSF_CAN_RECV = 0x0040,
+ SSF_NON_BLOCK = 0x1000,
+ SSF_ERROR = 0x4000,
+ SSF_CLOSED = 0x8000
+};
+
+
+class MountNodeStream : public MountNode {
+ public:
+ explicit MountNodeStream(Mount* mnt);
+
+ virtual Error Init(int perm);
+
+ // Attempts to pump input and output
+ virtual void QueueInput();
+ virtual void QueueOutput();
+
+ void SetStreamFlags(uint32_t bits);
+ void ClearStreamFlags(uint32_t bits);
+ uint32_t GetStreamFlags();
+ bool TestStreamFlags(uint32_t bits);
+
+ MountStream* mount_stream();
+
+ protected:
+ int read_timeout_;
+ int write_timeout_;
+
+ private:
+ sdk_util::Atomic32 stream_state_flags_;
+};
+
+} // namespace nacl_io
+
+#endif // LIBRARIES_NACL_IO_MOUNT_NODE_STREAM_H_
diff --git a/native_client_sdk/src/libraries/nacl_io/mount_node_tcp.cc b/native_client_sdk/src/libraries/nacl_io/mount_node_tcp.cc
index 5a5f9f1..bdc4b10 100644
--- a/native_client_sdk/src/libraries/nacl_io/mount_node_tcp.cc
+++ b/native_client_sdk/src/libraries/nacl_io/mount_node_tcp.cc
@@ -10,34 +10,191 @@
#include <string.h>
#include <algorithm>
-#include "nacl_io/mount.h"
-#include "nacl_io/mount_node_socket.h"
#include "nacl_io/mount_node_tcp.h"
+#include "nacl_io/mount_stream.h"
#include "nacl_io/pepper_interface.h"
+namespace {
+ const size_t kMaxPacketSize = 65536;
+ const size_t kDefaultFifoSize = kMaxPacketSize * 8;
+}
+
namespace nacl_io {
-MountNodeTCP::MountNodeTCP(Mount* mount) : MountNodeSocket(mount) {}
+class TCPWork : public MountStream::Work {
+ public:
+ explicit TCPWork(const ScopedEventEmitterTCP& emitter)
+ : MountStream::Work(emitter->stream()->mount_stream()),
+ emitter_(emitter),
+ data_(NULL) {
+ }
+
+ ~TCPWork() {
+ delete[] data_;
+ }
+
+ TCPSocketInterface* TCPInterface() {
+ return mount()->ppapi()->GetTCPSocketInterface();
+ }
+
+ protected:
+ ScopedEventEmitterTCP emitter_;
+ char* data_;
+};
+
+
+class TCPSendWork : public TCPWork {
+ public:
+ explicit TCPSendWork(const ScopedEventEmitterTCP& emitter)
+ : TCPWork(emitter) {}
+
+ virtual bool Start(int32_t val) {
+ AUTO_LOCK(emitter_->GetLock());
+ MountNodeTCP* stream = static_cast<MountNodeTCP*>(emitter_->stream());
+
+ // Does the stream exist, and can it send?
+ if (NULL == stream || !stream->TestStreamFlags(SSF_CAN_SEND))
+ return false;
+
+ // If not currently sending...
+ if (!stream->TestStreamFlags(SSF_SENDING)) {
+ size_t tx_data_avail = emitter_->out_fifo()->ReadAvailable();
+ int capped_len =
+ static_cast<int32_t>(std::min(tx_data_avail, kMaxPacketSize));
+
+ if (capped_len == 0)
+ return false;
+
+ data_ = new char[capped_len];
+ emitter_->ReadOut_Locked(data_, capped_len);
+
+ stream->SetStreamFlags(SSF_SENDING);
+ int err = TCPInterface()->Write(stream->socket_resource(),
+ data_,
+ capped_len,
+ mount()->GetRunCompletion(this));
+ if (err == PP_OK_COMPLETIONPENDING)
+ return true;
+
+ // Anything else, we should assume the socket has gone bad.
+ stream->SetError_Locked(err);
+ }
+ return false;
+ }
+
+ virtual void Run(int32_t length_error) {
+ AUTO_LOCK(emitter_->GetLock());
+ MountNodeTCP* stream = static_cast<MountNodeTCP*>(emitter_->stream());
+
+ // If the stream is still there...
+ if (stream) {
+ // And we did send, then Q more work.
+ if (length_error >= 0) {
+ stream->ClearStreamFlags(SSF_SENDING);
+ stream->QueueOutput();
+ } else {
+ // Otherwise this socket has gone bad.
+ stream->SetError_Locked(length_error);
+ }
+ }
+ }
+};
+
+class TCPRecvWork : public TCPWork {
+ public:
+ explicit TCPRecvWork(const ScopedEventEmitterTCP& emitter)
+ : TCPWork(emitter) {}
+
+ virtual bool Start(int32_t val) {
+ AUTO_LOCK(emitter_->GetLock());
+ MountNodeTCP* stream = static_cast<MountNodeTCP*>(emitter_->stream());
+
+ // Does the stream exist, and can it recv?
+ if (NULL == stream || !stream->TestStreamFlags(SSF_CAN_RECV))
+ return false;
+
+ // If we are not currently receiving
+ if (!stream->TestStreamFlags(SSF_RECVING)) {
+ size_t rx_space_avail = emitter_->in_fifo()->WriteAvailable();
+ int capped_len =
+ static_cast<int32_t>(std::min(rx_space_avail, kMaxPacketSize));
+
+ if (capped_len == 0)
+ return false;
+
+ stream->SetStreamFlags(SSF_RECVING);
+ data_ = new char[capped_len];
+ int err = TCPInterface()->Read(stream->socket_resource(),
+ data_,
+ capped_len,
+ mount()->GetRunCompletion(this));
+ if (err == PP_OK_COMPLETIONPENDING)
+ return true;
+
+ // Anything else, we should assume the socket has gone bad.
+ stream->SetError_Locked(err);
+ }
+ return false;
+ }
+ virtual void Run(int32_t length_error) {
+ AUTO_LOCK(emitter_->GetLock());
+ MountNodeTCP* stream = static_cast<MountNodeTCP*>(emitter_->stream());
+
+ // If the stream is still there, see if we can queue more input
+ if (stream) {
+ if (length_error > 0) {
+ emitter_->WriteIn_Locked(data_, length_error);
+ stream->QueueInput();
+ } else {
+ stream->SetError_Locked(length_error);
+ }
+ }
+ }
+};
-TCPSocketInterface* MountNodeTCP::TCPSocket() {
- if (mount_->ppapi() == NULL)
- return NULL;
- return mount_->ppapi()->GetTCPSocketInterface();
+MountNodeTCP::MountNodeTCP(Mount* mount)
+ : MountNodeSocket(mount),
+ emitter_(new EventEmitterTCP(kDefaultFifoSize, kDefaultFifoSize)) {
+ emitter_->AttachStream(this);
+}
+
+void MountNodeTCP::Destroy() {
+ emitter_->DetachStream();
+ MountNodeSocket::Destroy();
}
Error MountNodeTCP::Init(int flags) {
- if (TCPSocket() == NULL)
+ if (TCPInterface() == NULL)
return EACCES;
- socket_resource_ = TCPSocket()->Create(mount_->ppapi()->GetInstance());
+ socket_resource_ = TCPInterface()->Create(mount_->ppapi()->GetInstance());
if (0 == socket_resource_)
return EACCES;
return 0;
}
+EventEmitterTCP* MountNodeTCP::GetEventEmitter() {
+ return emitter_.get();
+}
+
+void MountNodeTCP::QueueInput() {
+ TCPRecvWork* work = new TCPRecvWork(emitter_);
+ mount_stream()->EnqueueWork(work);
+}
+
+void MountNodeTCP::QueueOutput() {
+ TCPSendWork* work = new TCPSendWork(emitter_);
+ mount_stream()->EnqueueWork(work);
+}
+
+
+// We can not bind a client socket with PPAPI. For now we ignore the
+// bind but report the correct address later, just in case someone is
+// binding without really caring what the address is (for example to
+// select a more optimized interface/route.)
Error MountNodeTCP::Bind(const struct sockaddr* addr, socklen_t len) {
AUTO_LOCK(node_lock_);
@@ -65,9 +222,9 @@ Error MountNodeTCP::Connect(const struct sockaddr* addr, socklen_t len) {
if (0 == remote_addr_)
return EINVAL;
- int err = TCPSocket()->Connect(socket_resource_,
- remote_addr_,
- PP_BlockUntilComplete());
+ int err = TCPInterface()->Connect(socket_resource_,
+ remote_addr_,
+ PP_BlockUntilComplete());
// If we fail, release the dest addr resource
if (err != PP_OK) {
@@ -76,71 +233,40 @@ Error MountNodeTCP::Connect(const struct sockaddr* addr, socklen_t len) {
return PPErrorToErrno(err);
}
- local_addr_ = TCPSocket()->GetLocalAddress(socket_resource_);
+ local_addr_ = TCPInterface()->GetLocalAddress(socket_resource_);
mount_->ppapi()->AddRefResource(local_addr_);
- return 0;
-}
-
-Error MountNodeTCP::Recv(void* buf, size_t len, int flags, int* out_len) {
- AUTO_LOCK(node_lock_);
- if (0 == socket_resource_)
- return EBADF;
- int capped_len = static_cast<int32_t>(std::min(len, MAX_SOCK_TRANSFER));
- int err = TCPSocket()->Read(socket_resource_,
- static_cast<char*>(buf),
- capped_len,
- PP_BlockUntilComplete());
- if (err < 0)
- return PPErrorToErrno(err);
+ // Now that we are connected, we can start sending and receiving.
+ SetStreamFlags(SSF_CAN_SEND | SSF_CAN_RECV);
- *out_len = err;
+ // Begin the input pump
+ QueueInput();
return 0;
}
-Error MountNodeTCP::RecvFrom(void* buf,
- size_t len,
- int flags,
- struct sockaddr* src_addr,
- socklen_t* addrlen,
- int* out_len) {
- Error err = Recv(buf, len, flags, out_len);
- if (err == 0)
- GetPeerName(src_addr, addrlen);
- return err;
-}
-
-
-Error MountNodeTCP::Send(const void* buf, size_t len, int flags, int* out_len) {
- AUTO_LOCK(node_lock_);
-
- if (0 == socket_resource_)
- return EBADF;
- if (0 == remote_addr_)
- return ENOTCONN;
-
- int capped_len = static_cast<int32_t>(std::min(len, MAX_SOCK_TRANSFER));
- int err = TCPSocket()->Write(socket_resource_,
- static_cast<const char*>(buf),
- capped_len,
- PP_BlockUntilComplete());
- if (err < 0)
- return PPErrorToErrno(err);
+Error MountNodeTCP::Recv_Locked(void* buf,
+ size_t len,
+ PP_Resource* out_addr,
+ int* out_len) {
+ *out_len = emitter_->in_fifo()->Read(buf, len);
+ *out_addr = remote_addr_;
- *out_len = err;
+ // Ref the address copy we pass back.
+ mount_->ppapi()->AddRefResource(remote_addr_);
return 0;
}
-Error MountNodeTCP::SendTo(const void* buf,
- size_t len,
- int flags,
- const struct sockaddr* dest_addr,
- socklen_t addrlen,
- int* out_len) {
- return Send(buf, len, flags, out_len);
+// TCP ignores dst addr passed to send_to, and always uses bound address
+Error MountNodeTCP::Send_Locked(const void* buf,
+ size_t len,
+ PP_Resource,
+ int* out_len) {
+ *out_len = emitter_->out_fifo()->Write(buf, len);
+ return 0;
}
+
} // namespace nacl_io
#endif // PROVIDES_SOCKET_API \ No newline at end of file
diff --git a/native_client_sdk/src/libraries/nacl_io/mount_node_tcp.h b/native_client_sdk/src/libraries/nacl_io/mount_node_tcp.h
index 860e93b..077276f 100644
--- a/native_client_sdk/src/libraries/nacl_io/mount_node_tcp.h
+++ b/native_client_sdk/src/libraries/nacl_io/mount_node_tcp.h
@@ -11,6 +11,7 @@
#include <ppapi/c/pp_resource.h>
#include <ppapi/c/ppb_tcp_socket.h>
+#include "nacl_io/event_emitter_tcp.h"
#include "nacl_io/mount_node.h"
#include "nacl_io/mount_node_socket.h"
@@ -20,29 +21,31 @@ class MountNodeTCP : public MountNodeSocket {
public:
explicit MountNodeTCP(Mount* mount);
+ protected:
virtual Error Init(int flags);
+ virtual void Destroy();
+
+ public:
+ virtual EventEmitterTCP* GetEventEmitter();
+
+ virtual void QueueInput();
+ virtual void QueueOutput();
virtual Error Bind(const struct sockaddr* addr, socklen_t len);
virtual Error Connect(const struct sockaddr* addr, socklen_t len);
- virtual Error Recv(void* buf, size_t len, int flags, int* out_len);
- virtual Error RecvFrom(void* buf,
- size_t len,
- int flags,
- struct sockaddr* src_addr,
- socklen_t* addrlen,
- int* out_len);
-
- virtual Error Send(const void* buf, size_t len, int flags, int* out_len);
- virtual Error SendTo(const void* buf,
- size_t len,
- int flags,
- const struct sockaddr* dest_addr,
- socklen_t addrlen,
- int* out_len);
-
protected:
- TCPSocketInterface* TCPSocket();
+ virtual Error Recv_Locked(void* buf,
+ size_t len,
+ PP_Resource* out_addr,
+ int* out_len);
+
+ virtual Error Send_Locked(const void* buf,
+ size_t len,
+ PP_Resource addr,
+ int* out_len);
+
+ ScopedEventEmitterTCP emitter_;
};
diff --git a/native_client_sdk/src/libraries/nacl_io/mount_node_tty.cc b/native_client_sdk/src/libraries/nacl_io/mount_node_tty.cc
index 5a2fc7a..4b6c565 100644
--- a/native_client_sdk/src/libraries/nacl_io/mount_node_tty.cc
+++ b/native_client_sdk/src/libraries/nacl_io/mount_node_tty.cc
@@ -32,13 +32,16 @@
namespace nacl_io {
-MountNodeTty::MountNodeTty(Mount* mount) : MountNodeCharDevice(mount),
- is_readable_(false),
- rows_(DEFAULT_TTY_ROWS),
- cols_(DEFAULT_TTY_COLS) {
+MountNodeTty::MountNodeTty(Mount* mount)
+ : MountNodeCharDevice(mount),
+ emitter_(new EventEmitter),
+ rows_(DEFAULT_TTY_ROWS),
+ cols_(DEFAULT_TTY_COLS) {
output_handler_.handler = NULL;
- pthread_cond_init(&is_readable_cond_, NULL);
InitTermios();
+
+ // Output will never block
+ emitter_->RaiseEvents_Locked(POLLOUT);
}
void MountNodeTty::InitTermios() {
@@ -69,14 +72,15 @@ void MountNodeTty::InitTermios() {
termios_.c_cc[VEOL2] = 0;
}
-MountNodeTty::~MountNodeTty() {
- pthread_cond_destroy(&is_readable_cond_);
+EventEmitter* MountNodeTty::GetEventEmitter() {
+ return emitter_.get();
}
Error MountNodeTty::Write(size_t offs,
const void* buf,
size_t count,
int* out_bytes) {
+
AUTO_LOCK(output_lock_);
*out_bytes = 0;
@@ -97,14 +101,17 @@ Error MountNodeTty::Write(size_t offs,
return 0;
}
+
Error MountNodeTty::Read(size_t offs, void* buf, size_t count, int* out_bytes) {
- AUTO_LOCK(node_lock_);
- while (!is_readable_) {
- pthread_cond_wait(&is_readable_cond_, node_lock_.mutex());
- }
+ EventListenerLock wait(GetEventEmitter());
+ *out_bytes = 0;
- size_t bytes_to_copy = std::min(count, input_buffer_.size());
+ // If interrupted, return
+ Error err = wait.WaitOnEvent(POLLIN, -1);
+ if (err != 0)
+ return err;
+ size_t bytes_to_copy = std::min(count, input_buffer_.size());
if (IS_ICANON) {
// Only read up to (and including) the first newline
std::deque<char>::iterator nl = std::find(input_buffer_.begin(),
@@ -129,12 +136,15 @@ Error MountNodeTty::Read(size_t offs, void* buf, size_t count, int* out_bytes) {
// mark input as no longer readable if we consumed
// the entire buffer or, in the case of buffered input,
// we consumed the final \n char.
+ bool avail;
if (IS_ICANON)
- is_readable_ =
- std::find(input_buffer_.begin(),
- input_buffer_.end(), '\n') != input_buffer_.end();
+ avail = std::find(input_buffer_.begin(),
+ input_buffer_.end(), '\n') != input_buffer_.end();
else
- is_readable_ = input_buffer_.size() > 0;
+ avail = input_buffer_.size() > 0;
+
+ if (!avail)
+ emitter_->ClearEvents_Locked(POLLIN);
return 0;
}
@@ -152,7 +162,7 @@ Error MountNodeTty::Echo(const char* string, int count) {
}
Error MountNodeTty::ProcessInput(struct tioc_nacl_input_string* message) {
- AUTO_LOCK(node_lock_);
+ AUTO_LOCK(emitter_->GetLock())
const char* buffer = message->buffer;
size_t num_bytes = message->length;
@@ -212,12 +222,7 @@ Error MountNodeTty::ProcessInput(struct tioc_nacl_input_string* message) {
input_buffer_.push_back(c);
if (c == '\n' || c == termios_.c_cc[VEOF] || !IS_ICANON)
- is_readable_ = true;
- }
-
- if (is_readable_) {
- RaiseEvent(POLLIN);
- pthread_cond_broadcast(&is_readable_cond_);
+ emitter_->RaiseEvents_Locked(POLLIN);
}
return 0;
@@ -251,6 +256,13 @@ Error MountNodeTty::Ioctl(int request, char* arg) {
cols_ = size->ws_col;
}
kill(getpid(), SIGWINCH);
+ {
+ // Wake up any thread waiting on Read with POLLERR then immediate
+ // clear it to signal EINTR.
+ AUTO_LOCK(emitter_->GetLock())
+ emitter_->RaiseEvents_Locked(POLLERR);
+ emitter_->ClearEvents_Locked(POLLERR);
+ }
return 0;
}
case TIOCGWINSZ: {
diff --git a/native_client_sdk/src/libraries/nacl_io/mount_node_tty.h b/native_client_sdk/src/libraries/nacl_io/mount_node_tty.h
index c8070b9..1c8e65d 100644
--- a/native_client_sdk/src/libraries/nacl_io/mount_node_tty.h
+++ b/native_client_sdk/src/libraries/nacl_io/mount_node_tty.h
@@ -20,7 +20,8 @@ namespace nacl_io {
class MountNodeTty : public MountNodeCharDevice {
public:
explicit MountNodeTty(Mount* mount);
- ~MountNodeTty();
+
+ virtual EventEmitter* GetEventEmitter();
virtual Error Ioctl(int request,
char* arg);
@@ -39,21 +40,14 @@ class MountNodeTty : public MountNodeCharDevice {
virtual Error Tcsetattr(int optional_actions,
const struct termios *termios_p);
- virtual uint32_t GetEventStatus() {
- uint32_t status = POLLOUT;
- if (is_readable_)
- status |= POLLIN;
- return status;
- }
-
private:
+ ScopedEventEmitter emitter_;
+
Error ProcessInput(struct tioc_nacl_input_string* message);
Error Echo(const char* string, int count);
void InitTermios();
std::deque<char> input_buffer_;
- bool is_readable_;
- pthread_cond_t is_readable_cond_;
struct termios termios_;
/// Current height of terminal in rows. Set via ioctl(2).
diff --git a/native_client_sdk/src/libraries/nacl_io/mount_node_udp.cc b/native_client_sdk/src/libraries/nacl_io/mount_node_udp.cc
index 310a2bd..cb8c23e 100644
--- a/native_client_sdk/src/libraries/nacl_io/mount_node_udp.cc
+++ b/native_client_sdk/src/libraries/nacl_io/mount_node_udp.cc
@@ -3,41 +3,194 @@
// found in the LICENSE file.
-#include "nacl_io/ossocket.h"
-#ifdef PROVIDES_SOCKET_API
+#include "nacl_io/mount_node_udp.h"
#include <errno.h>
#include <string.h>
+
#include <algorithm>
-#include "nacl_io/mount.h"
-#include "nacl_io/mount_node_socket.h"
-#include "nacl_io/mount_node_udp.h"
+#include "nacl_io/event_emitter_udp.h"
+#include "nacl_io/mount_stream.h"
+#include "nacl_io/packet.h"
#include "nacl_io/pepper_interface.h"
+namespace {
+ const size_t kMaxPacketSize = 65536;
+ const size_t kDefaultFifoSize = kMaxPacketSize * 8;
+}
+
namespace nacl_io {
-MountNodeUDP::MountNodeUDP(Mount* mount) : MountNodeSocket(mount) {}
+class UDPWork : public MountStream::Work {
+ public:
+ explicit UDPWork(const ScopedEventEmitterUDP& emitter)
+ : MountStream::Work(emitter->stream()->mount_stream()),
+ emitter_(emitter),
+ packet_(NULL) {
+ }
+
+ ~UDPWork() {
+ delete packet_;
+ }
+
+ UDPSocketInterface* UDPInterface() {
+ return mount()->ppapi()->GetUDPSocketInterface();
+ }
+
+ protected:
+ ScopedEventEmitterUDP emitter_;
+ Packet* packet_;
+};
+
+
+class UDPSendWork : public UDPWork {
+ public:
+ explicit UDPSendWork(const ScopedEventEmitterUDP& emitter)
+ : UDPWork(emitter) {}
+
+ virtual bool Start(int32_t val) {
+ AUTO_LOCK(emitter_->GetLock());
+ MountNodeUDP* stream = static_cast<MountNodeUDP*>(emitter_->stream());
+
+ // Does the stream exist, and can it send?
+ if (NULL == stream || !stream->TestStreamFlags(SSF_CAN_SEND))
+ return false;
+
+ // If not currently sending...
+ if (!stream->TestStreamFlags(SSF_SENDING)) {
+ packet_ = emitter_->ReadTXPacket_Locked();
+ if (packet_) {
+ stream->SetStreamFlags(SSF_SENDING);
+ int err = UDPInterface()->SendTo(stream->socket_resource(),
+ packet_->buffer(),
+ packet_->len(),
+ packet_->addr(),
+ mount()->GetRunCompletion(this));
+ if (err == PP_OK_COMPLETIONPENDING)
+ return true;
+
+ // Anything else, we should assume the socket has gone bad.
+ stream->SetError_Locked(err);
+ }
+ }
+ return false;
+ }
+
+ virtual void Run(int32_t length_error) {
+ AUTO_LOCK(emitter_->GetLock());
+ MountNodeUDP* stream = static_cast<MountNodeUDP*>(emitter_->stream());
+
+ // If the stream is still there...
+ if (stream) {
+ // And we did send, then Q more work.
+ if (length_error >= 0) {
+ stream->ClearStreamFlags(SSF_SENDING);
+ stream->QueueOutput();
+ } else {
+ // Otherwise this socket has gone bad.
+ stream->SetError_Locked(length_error);
+ }
+ }
+ }
+};
+
+
+class UDPRecvWork : public UDPWork {
+ public:
+ explicit UDPRecvWork(const ScopedEventEmitterUDP& emitter)
+ : UDPWork(emitter) {
+ data_ = new char[kMaxPacketSize];
+ }
+
+ ~UDPRecvWork() {
+ delete[] data_;
+ }
+
+ virtual bool Start(int32_t val) {
+ AUTO_LOCK(emitter_->GetLock());
+ MountNodeUDP* stream = static_cast<MountNodeUDP*>(emitter_->stream());
+
+ // Does the stream exist, and can it recv?
+ if (NULL == stream || !stream->TestStreamFlags(SSF_CAN_RECV))
+ return false;
+
+ // If the stream is valid and we are not currently receiving
+ if (!stream->TestStreamFlags(SSF_RECVING)) {
+ stream->SetStreamFlags(SSF_RECVING);
+ int err = UDPInterface()->RecvFrom(stream->socket_resource(),
+ data_,
+ kMaxPacketSize,
+ &addr_,
+ mount()->GetRunCompletion(this));
+ if (err == PP_OK_COMPLETIONPENDING)
+ return true;
+
+ stream->SetError_Locked(err);
+ }
+ return false;
+ }
+
+ virtual void Run(int32_t length_error) {
+ AUTO_LOCK(emitter_->GetLock());
+ MountNodeUDP* stream = static_cast<MountNodeUDP*>(emitter_->stream());
+
+ // If the stream is still there, see if we can queue more input
+ if (stream) {
+ if (length_error > 0) {
+ Packet* packet = new Packet(mount()->ppapi());
+ packet->Copy(data_, length_error, addr_);
+ emitter_->WriteRXPacket_Locked(packet);
+ stream->ClearStreamFlags(SSF_RECVING);
+ stream->QueueInput();
+ } else {
+ stream->SetError_Locked(length_error);
+ }
+ }
+ }
+
+ private:
+ char* data_;
+ PP_Resource addr_;
+};
-UDPSocketInterface* MountNodeUDP::UDPSocket() {
- if (mount_->ppapi() == NULL)
- return NULL;
+MountNodeUDP::MountNodeUDP(Mount* mount)
+ : MountNodeSocket(mount),
+ emitter_(new EventEmitterUDP(kDefaultFifoSize, kDefaultFifoSize)) {
+ emitter_->AttachStream(this);
+}
+
+void MountNodeUDP::Destroy() {
+ emitter_->DetachStream();
+ MountNodeSocket::Destroy();
+}
- return mount_->ppapi()->GetUDPSocketInterface();
+EventEmitterUDP* MountNodeUDP::GetEventEmitter() {
+ return emitter_.get();
}
Error MountNodeUDP::Init(int flags) {
- if (UDPSocket() == NULL)
+ if (UDPInterface() == NULL)
return EACCES;
- socket_resource_ = UDPSocket()->Create(mount_->ppapi()->GetInstance());
+ socket_resource_ = UDPInterface()->Create(mount_->ppapi()->GetInstance());
if (0 == socket_resource_)
return EACCES;
return 0;
}
+void MountNodeUDP::QueueInput() {
+ UDPRecvWork* work = new UDPRecvWork(emitter_);
+ mount_stream()->EnqueueWork(work);
+}
+
+void MountNodeUDP::QueueOutput() {
+ UDPSendWork* work = new UDPSendWork(emitter_);
+ mount_stream()->EnqueueWork(work);
+}
+
Error MountNodeUDP::Bind(const struct sockaddr* addr, socklen_t len) {
if (0 == socket_resource_)
return EBADF;
@@ -50,14 +203,18 @@ Error MountNodeUDP::Bind(const struct sockaddr* addr, socklen_t len) {
if (0 == out_addr)
return EINVAL;
- int err = UDPSocket()->Bind(socket_resource_,
- out_addr,
- PP_BlockUntilComplete());
+ int err = UDPInterface()->Bind(socket_resource_,
+ out_addr,
+ PP_BlockUntilComplete());
if (err != 0) {
mount_->ppapi()->ReleaseResource(out_addr);
return PPErrorToErrno(err);
}
+ // Now that we are bound, we can start sending and receiving.
+ SetStreamFlags(SSF_CAN_SEND | SSF_CAN_RECV);
+ QueueInput();
+
local_addr_ = out_addr;
return 0;
}
@@ -79,110 +236,48 @@ Error MountNodeUDP::Connect(const struct sockaddr* addr, socklen_t len) {
return 0;
}
-Error MountNodeUDP::RecvFromHelper(void* buf,
- size_t len,
- int flags,
- PP_Resource* out_addr,
- int* out_len) {
- if (0 == socket_resource_)
- return EBADF;
-
- int capped_len = static_cast<int32_t>(std::min(len, MAX_SOCK_TRANSFER));
- int err = UDPSocket()->RecvFrom(socket_resource_,
- static_cast<char*>(buf),
- capped_len,
- out_addr,
- PP_BlockUntilComplete());
- if (err < 0)
- return PPErrorToErrno(err);
-
- *out_len = err;
- return 0;
-}
-
-Error MountNodeUDP::Recv(void* buf, size_t len, int flags, int* out_len) {
- while (1) {
- int local_len = 0;
- PP_Resource addr = 0;
-
- int err = RecvFromHelper(buf, len, flags, &addr, &local_len);
- if (err < 0)
- return PPErrorToErrno(err);
-
- /* If "connected" then only receive packets from the given remote. */
- bool same = IsEquivalentAddress(addr, remote_addr_);
- mount_->ppapi()->ReleaseResource(addr);
-
- if (remote_addr_ != 0 && same)
- continue;
-
- *out_len = local_len;
+Error MountNodeUDP::Recv_Locked(void* buf,
+ size_t len,
+ PP_Resource* out_addr,
+ int* out_len) {
+ Packet* packet = emitter_->ReadRXPacket_Locked();
+ *out_len = 0;
+ *out_addr = 0;
+
+ if (packet) {
+ int capped_len =
+ static_cast<int32_t>(std::min<int>(len, packet->len()));
+ memcpy(buf, packet->buffer(), capped_len);
+
+ if (packet->addr() != 0) {
+ mount_->ppapi()->AddRefResource(packet->addr());
+ *out_addr = packet->addr();
+ }
+
+ *out_len = capped_len;
+ delete packet;
return 0;
}
-}
-
-Error MountNodeUDP::RecvFrom(void* buf,
- size_t len,
- int flags,
- struct sockaddr* src_addr,
- socklen_t* addrlen,
- int* out_len) {
- PP_Resource addr = 0;
- int err = RecvFromHelper(buf, len, flags, &addr, out_len);
- if (err < 0)
- return PPErrorToErrno(err);
-
- if (src_addr)
- *addrlen = ResourceToSockAddr(addr, *addrlen, src_addr);
- mount_->ppapi()->ReleaseResource(addr);
- return 0;
+ // Should never happen, Recv_Locked should not be called
+ // unless already in a POLLIN state.
+ return EBADF;
}
-
-Error MountNodeUDP::SendToHelper(const void* buf,
- size_t len,
- int flags,
- PP_Resource addr,
- int* out_len) {
- if (0 == socket_resource_)
- return EBADF;
-
- if (0 == addr)
- return ENOTCONN;
-
- int capped_len = static_cast<int32_t>(std::min(len, MAX_SOCK_TRANSFER));
- int err = UDPSocket()->SendTo(socket_resource_,
- static_cast<const char*>(buf),
- capped_len,
- addr,
- PP_BlockUntilComplete());
- if (err < 0)
- return PPErrorToErrno(err);
-
- *out_len = err;
+Error MountNodeUDP::Send_Locked(const void* buf,
+ size_t len,
+ PP_Resource addr,
+ int* out_len) {
+ *out_len = 0;
+ int capped_len =
+ static_cast<int32_t>(std::min<int>(len, kMaxPacketSize));
+ Packet* packet = new Packet(mount_->ppapi());
+ packet->Copy(buf, capped_len, addr);
+
+ emitter_->WriteTXPacket_Locked(packet);
+ *out_len = capped_len;
return 0;
}
-Error MountNodeUDP::Send(const void* buf, size_t len, int flags, int* out_len) {
- return SendToHelper(buf, len, flags, remote_addr_, out_len);
-}
-
-Error MountNodeUDP::SendTo(const void* buf,
- size_t len,
- int flags,
- const struct sockaddr* dest_addr,
- socklen_t addrlen,
- int* out_len) {
- PP_Resource out_addr = SockAddrToResource(dest_addr, addrlen);
- if (0 == out_addr)
- return EINVAL;
-
- Error err = SendToHelper(buf, len, flags, out_addr, out_len);
- mount_->ppapi()->ReleaseResource(out_addr);
- return err;
-}
-
} // namespace nacl_io
-#endif // PROVIDES_SOCKET_API \ No newline at end of file
diff --git a/native_client_sdk/src/libraries/nacl_io/mount_node_udp.h b/native_client_sdk/src/libraries/nacl_io/mount_node_udp.h
index ac095f1..2fef513 100644
--- a/native_client_sdk/src/libraries/nacl_io/mount_node_udp.h
+++ b/native_client_sdk/src/libraries/nacl_io/mount_node_udp.h
@@ -11,6 +11,7 @@
#include <ppapi/c/pp_resource.h>
#include <ppapi/c/ppb_udp_socket.h>
+#include "nacl_io/event_emitter_udp.h"
#include "nacl_io/mount_node.h"
#include "nacl_io/mount_node_socket.h"
@@ -20,41 +21,31 @@ class MountNodeUDP : public MountNodeSocket {
public:
explicit MountNodeUDP(Mount* mount);
+ protected:
virtual Error Init(int flags);
+ virtual void Destroy();
+
+ public:
+ virtual EventEmitterUDP* GetEventEmitter();
+
+ virtual void QueueInput();
+ virtual void QueueOutput();
virtual Error Bind(const struct sockaddr* addr, socklen_t len);
virtual Error Connect(const struct sockaddr* addr, socklen_t len);
- virtual Error Recv(void* buf, size_t len, int flags, int* out_len);
- virtual Error RecvFrom(void* buf,
- size_t len,
- int flags,
- struct sockaddr* src_addr,
- socklen_t* addrlen,
- int* out_len);
-
- virtual Error Send(const void* buf, size_t len, int flags, int* out_len);
- virtual Error SendTo(const void* buf,
- size_t len,
- int flags,
- const struct sockaddr* dest_addr,
- socklen_t addrlen,
- int* out_len);
-
protected:
- UDPSocketInterface* UDPSocket();
-
- Error RecvFromHelper(void* buf,
- size_t len,
- int flags,
- PP_Resource* addr,
- int* out_len);
-
- Error SendToHelper(const void* buf,
- size_t len,
- int flags,
- PP_Resource dest_addr,
- int* out_len);
+ virtual Error Recv_Locked(void* buf,
+ size_t len,
+ PP_Resource* addr,
+ int* out_len);
+
+ virtual Error Send_Locked(const void* buf,
+ size_t len,
+ PP_Resource addr,
+ int* out_len);
+
+ ScopedEventEmitterUDP emitter_;
};
diff --git a/native_client_sdk/src/libraries/nacl_io/mount_socket.cc b/native_client_sdk/src/libraries/nacl_io/mount_socket.cc
deleted file mode 100644
index 141c930..0000000
--- a/native_client_sdk/src/libraries/nacl_io/mount_socket.cc
+++ /dev/null
@@ -1,28 +0,0 @@
-// Copyright (c) 2013 The Chromium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style license that can be
-// found in the LICENSE file.
-
-#include "nacl_io/ossocket.h"
-#ifdef PROVIDES_SOCKET_API
-
-#include <errno.h>
-
-#include "nacl_io/mount_node_socket.h"
-#include "nacl_io/mount_socket.h"
-
-namespace nacl_io {
-
-MountSocket::MountSocket() {}
-
-Error MountSocket::Access(const Path& path, int a_mode) { return EACCES; }
-Error MountSocket::Open(const Path& path,
- int o_flags,
- ScopedMountNode* out_node) { return EACCES; }
-
-Error MountSocket::Unlink(const Path& path) { return EACCES; }
-Error MountSocket::Mkdir(const Path& path, int permissions) { return EACCES; }
-Error MountSocket::Rmdir(const Path& path) { return EACCES; }
-Error MountSocket::Remove(const Path& path) { return EACCES; }
-
-} // namespace nacl_io
-#endif // PROVIDES_SOCKET_API \ No newline at end of file
diff --git a/native_client_sdk/src/libraries/nacl_io/mount_socket.h b/native_client_sdk/src/libraries/nacl_io/mount_socket.h
deleted file mode 100644
index f235232..0000000
--- a/native_client_sdk/src/libraries/nacl_io/mount_socket.h
+++ /dev/null
@@ -1,37 +0,0 @@
-// Copyright (c) 2013 The Chromium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style license that can be
-// found in the LICENSE file.
-
-#ifndef LIBRARIES_NACL_IO_MOUNT_SOCKET_H_
-#define LIBRARIES_NACL_IO_MOUNT_SOCKET_H_
-
-#include "nacl_io/ossocket.h"
-#ifdef PROVIDES_SOCKET_API
-
-#include "nacl_io/mount.h"
-
-namespace nacl_io {
-
-class MountSocket : public Mount {
- protected:
- MountSocket();
-
- private:
- virtual Error Access(const Path& path, int a_mode);
- virtual Error Open(const Path& path,
- int o_flags,
- ScopedMountNode* out_node);
- virtual Error Unlink(const Path& path);
- virtual Error Mkdir(const Path& path, int permissions);
- virtual Error Rmdir(const Path& path);
- virtual Error Remove(const Path& path);
-
- private:
- friend class KernelProxy;
- DISALLOW_COPY_AND_ASSIGN(MountSocket);
-};
-
-} // namespace nacl_io
-
-#endif // PROVIDES_SOCKET_API
-#endif // LIBRARIES_NACL_IO_MOUNT_SOCKET_H_
diff --git a/native_client_sdk/src/libraries/nacl_io/mount_stream.cc b/native_client_sdk/src/libraries/nacl_io/mount_stream.cc
new file mode 100644
index 0000000..71f21847
--- /dev/null
+++ b/native_client_sdk/src/libraries/nacl_io/mount_stream.cc
@@ -0,0 +1,102 @@
+// Copyright (c) 2013 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "nacl_io/ossocket.h"
+#ifdef PROVIDES_SOCKET_API
+
+#include <errno.h>
+
+#include "nacl_io/mount_node_socket.h"
+#include "nacl_io/mount_stream.h"
+
+namespace nacl_io {
+
+
+void DispatchStart(void* work_ptr, int32_t val) {
+ MountStream::Work* work = static_cast<MountStream::Work*>(work_ptr);
+
+ // Delete if it fails to Start, Run will never get called.
+ if (!work->Start(val))
+ delete work;
+}
+
+void DispatchRun(void* work_ptr, int32_t val) {
+ MountStream::Work* work = static_cast<MountStream::Work*>(work_ptr);
+
+ work->Run(val);
+ delete work;
+}
+
+void* MountStream::StreamThreadThunk(void* mount_ptr) {
+ MountStream* mount = static_cast<MountStream*>(mount_ptr);
+ mount->StreamThread();
+ return NULL;
+}
+
+// All work is done via completions callbacks from posted work.
+void MountStream::StreamThread() {
+ {
+ AUTO_LOCK(message_lock_)
+ message_loop_ =
+ ppapi_->GetMessageLoopInterface()->Create(ppapi()->GetInstance());
+ ppapi_->GetMessageLoopInterface()->AttachToCurrentThread(message_loop_);
+ pthread_cond_broadcast(&message_cond_);
+ }
+
+ // Run loop until Quit is posted.
+ ppapi_->GetMessageLoopInterface()->Run(message_loop_);
+}
+
+PP_CompletionCallback MountStream::GetStartCompletion(Work* work) {
+ return PP_MakeCompletionCallback(DispatchStart, work);
+}
+
+PP_CompletionCallback MountStream::GetRunCompletion(Work* work) {
+ return PP_MakeCompletionCallback(DispatchRun, work);
+}
+
+// Place enqueue onto the socket thread.
+void MountStream::EnqueueWork(Work* work) {
+ if (message_loop_ == 0) {
+ AUTO_LOCK(message_lock_);
+
+ if (message_loop_ == 0) {
+ pthread_t thread;
+ pthread_create(&thread, NULL, StreamThreadThunk, this);
+ }
+
+ while (message_loop_ == 0)
+ pthread_cond_wait(&message_cond_, message_lock_.mutex());
+ }
+
+ PP_CompletionCallback cb = PP_MakeCompletionCallback(DispatchStart, work);
+ ppapi_->GetMessageLoopInterface()->PostWork(message_loop_, cb, 0);
+}
+
+
+MountStream::MountStream()
+ : message_loop_(0) {
+ pthread_cond_init(&message_cond_, NULL);
+}
+
+MountStream::~MountStream() {
+ if (message_loop_) {
+ ppapi_->GetMessageLoopInterface()->PostQuit(message_loop_, PP_TRUE);
+ ppapi_->ReleaseResource(message_loop_);
+ }
+ pthread_cond_destroy(&message_cond_);
+}
+
+Error MountStream::Access(const Path& path, int a_mode) { return EACCES; }
+Error MountStream::Open(const Path& path,
+ int o_flags,
+ ScopedMountNode* out_node) { return EACCES; }
+
+Error MountStream::Unlink(const Path& path) { return EACCES; }
+Error MountStream::Mkdir(const Path& path, int permissions) { return EACCES; }
+Error MountStream::Rmdir(const Path& path) { return EACCES; }
+Error MountStream::Remove(const Path& path) { return EACCES; }
+
+} // namespace nacl_io
+#endif // PROVIDES_SOCKET_API \ No newline at end of file
diff --git a/native_client_sdk/src/libraries/nacl_io/mount_stream.h b/native_client_sdk/src/libraries/nacl_io/mount_stream.h
new file mode 100644
index 0000000..757e132
--- /dev/null
+++ b/native_client_sdk/src/libraries/nacl_io/mount_stream.h
@@ -0,0 +1,81 @@
+// Copyright (c) 2013 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef LIBRARIES_NACL_IO_MOUNT_STREAM_H_
+#define LIBRARIES_NACL_IO_MOUNT_STREAM_H_
+
+#include "nacl_io/mount.h"
+
+#include "ppapi/c/pp_completion_callback.h"
+#include "ppapi/c/pp_resource.h"
+
+
+namespace nacl_io {
+
+// MountStreams provides a "mount point" for stream objects which do not
+// provide a path, such as FDs returned by pipe, socket, and sockpair. It
+// also provides a background thread for dispatching completion callbacks.
+
+class MountStream;
+class MountNodeStream;
+
+class MountStream : public Mount {
+ public:
+ class Work {
+ public:
+ explicit Work(MountStream* mount) : mount_(mount) {}
+ virtual ~Work() {}
+
+ // Called by adding work the queue, val should be safe to ignore.
+ virtual bool Start(int32_t val) = 0;
+
+ // Called as a completion of work in Start. Value of val depend on
+ // the function invoked in Start.
+ virtual void Run(int32_t val) = 0;
+ MountStream* mount() { return mount_; }
+
+ private:
+ MountStream* mount_;
+ };
+
+ protected:
+ MountStream();
+ virtual ~MountStream();
+
+ public:
+ // Enqueue a work object onto this MountStream's thread
+ void EnqueueWork(Work* work);
+
+ // Returns a completion callback which will execute the StartWork member
+ // of a MountSocketWork object.
+ static PP_CompletionCallback GetStartCompletion(Work* work);
+
+ // Returns a completion callback which will execute the RunCallback member
+ // of a MountSocketWork object.
+ static PP_CompletionCallback GetRunCompletion(Work* work);
+
+ virtual Error Access(const Path& path, int a_mode);
+ virtual Error Open(const Path& path,
+ int o_flags,
+ ScopedMountNode* out_node);
+ virtual Error Unlink(const Path& path);
+ virtual Error Mkdir(const Path& path, int permissions);
+ virtual Error Rmdir(const Path& path);
+ virtual Error Remove(const Path& path);
+
+ static void* StreamThreadThunk(void*);
+ void StreamThread();
+
+ private:
+ PP_Resource message_loop_;
+ pthread_cond_t message_cond_;
+ sdk_util::SimpleLock message_lock_;
+
+ friend class KernelProxy;
+ DISALLOW_COPY_AND_ASSIGN(MountStream);
+};
+
+} // namespace nacl_io
+
+#endif // LIBRARIES_NACL_IO_MOUNT_STREAM_H_
diff --git a/native_client_sdk/src/libraries/nacl_io/packet.cc b/native_client_sdk/src/libraries/nacl_io/packet.cc
new file mode 100644
index 0000000..c75fb11
--- /dev/null
+++ b/native_client_sdk/src/libraries/nacl_io/packet.cc
@@ -0,0 +1,41 @@
+// Copyright (c) 2013 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "nacl_io/packet.h"
+
+#include <string.h>
+
+#include "nacl_io/pepper_interface.h"
+
+namespace nacl_io {
+
+Packet::Packet(PepperInterface* ppapi)
+ : ppapi_(ppapi),
+ addr_(0),
+ buffer_(NULL),
+ len_(0) {}
+
+Packet:: ~Packet() {
+ if ((NULL != ppapi_) && addr_)
+ ppapi_->ReleaseResource(addr_);
+ delete[] buffer_;
+}
+
+void Packet::Take(const void *buffer, size_t len, PP_Resource addr) {
+ addr_ = addr;
+ len_ = len;
+ buffer_ = static_cast<char*>(const_cast<void*>(buffer));
+}
+
+void Packet::Copy(const void *buffer, size_t len, PP_Resource addr) {
+ addr_ = addr;
+ len_ = len;
+ buffer_ = new char[len];
+
+ memcpy(buffer_, buffer, len);
+ if (addr && (NULL != ppapi_))
+ ppapi_->AddRefResource(addr);
+}
+
+} // namespace nacl_io
diff --git a/native_client_sdk/src/libraries/nacl_io/packet.h b/native_client_sdk/src/libraries/nacl_io/packet.h
new file mode 100644
index 0000000..46f3b3b
--- /dev/null
+++ b/native_client_sdk/src/libraries/nacl_io/packet.h
@@ -0,0 +1,45 @@
+// Copyright (c) 2013 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef LIBRARIES_NACL_IO_PACKET_H_
+#define LIBRARIES_NACL_IO_PACKET_H_
+
+#include "nacl_io/fifo_interface.h"
+#include "ppapi/c/pp_resource.h"
+
+#include "sdk_util/macros.h"
+
+namespace nacl_io {
+
+class PepperInterface;
+
+// NOTE: The Packet class always owns the buffer and address, either by 'Copy'
+// or by 'Take' ownership.
+class Packet {
+ public:
+ explicit Packet(PepperInterface* ppapi);
+ ~Packet();
+
+ // Copy the buffer, and address reference
+ void Copy(const void *buffer, size_t len, PP_Resource addr);
+
+ // Take ownership the buffer, and address reference
+ void Take(const void *buffer, size_t len, PP_Resource addr);
+
+ char* buffer() { return buffer_; }
+ PP_Resource addr() { return addr_; }
+ size_t len() { return len_; }
+
+ private:
+ PepperInterface* ppapi_;
+ PP_Resource addr_;
+ char* buffer_;
+ size_t len_;
+
+ DISALLOW_COPY_AND_ASSIGN(Packet);
+};
+
+} // namespace nacl_io
+
+#endif // LIBRARIES_NACL_IO_PACKET_H_
diff --git a/native_client_sdk/src/libraries/nacl_io/pepper/all_interfaces.h b/native_client_sdk/src/libraries/nacl_io/pepper/all_interfaces.h
index f1fae29..8cdacb44 100644
--- a/native_client_sdk/src/libraries/nacl_io/pepper/all_interfaces.h
+++ b/native_client_sdk/src/libraries/nacl_io/pepper/all_interfaces.h
@@ -65,6 +65,18 @@ BEGIN_INTERFACE(FileSystemInterface, PPB_FileSystem,
PP_CompletionCallback)
END_INTERFACE(FileSystemInterface, PPB_FileSystem)
+BEGIN_INTERFACE(MessageLoopInterface, PPB_MessageLoop,
+ PPB_MESSAGELOOP_INTERFACE_1_0)
+ METHOD1(MessageLoopInterface, PP_Resource, Create, PP_Instance)
+ METHOD1(MessageLoopInterface, int32_t, AttachToCurrentThread, PP_Resource)
+ METHOD1(MessageLoopInterface, int32_t, Run, PP_Resource)
+ METHOD3(MessageLoopInterface, int32_t, PostWork, PP_Resource,
+ struct PP_CompletionCallback, int64_t)
+ METHOD2(MessageLoopInterface, int32_t, PostQuit, PP_Resource, PP_Bool)
+ METHOD0(MessageLoopInterface, PP_Resource, GetCurrent)
+ METHOD0(MessageLoopInterface, PP_Resource, GetForMainThread)
+END_INTERFACE(MessageLoopInterface, PPB_MessageLoop)
+
BEGIN_INTERFACE(MessagingInterface, PPB_Messaging, PPB_MESSAGING_INTERFACE_1_0)
METHOD2(MessagingInterface, void, PostMessage, PP_Instance, PP_Var)
END_INTERFACE(MessagingInterface, PPB_Messaging)
diff --git a/native_client_sdk/src/libraries/nacl_io/syscalls/pipe.c b/native_client_sdk/src/libraries/nacl_io/syscalls/pipe.c
new file mode 100644
index 0000000..18809a1
--- /dev/null
+++ b/native_client_sdk/src/libraries/nacl_io/syscalls/pipe.c
@@ -0,0 +1,10 @@
+/* Copyright 2013 The Chromium Authors. All rights reserved.
+ * Use of this source code is governed by a BSD-style license that can be
+ * found in the LICENSE file. */
+
+#include "nacl_io/kernel_intercept.h"
+#include "nacl_io/kernel_wrap.h"
+
+int pipe(int pipefds[2]) {
+ return ki_pipe(pipefds);
+}
diff --git a/native_client_sdk/src/libraries/sdk_util/simple_lock.h b/native_client_sdk/src/libraries/sdk_util/simple_lock.h
index 90f0ad9..1f38185 100644
--- a/native_client_sdk/src/libraries/sdk_util/simple_lock.h
+++ b/native_client_sdk/src/libraries/sdk_util/simple_lock.h
@@ -26,6 +26,9 @@ class SimpleLock {
pthread_mutex_destroy(&lock_);
}
+ void Lock() const { pthread_mutex_lock(&lock_); }
+ void Unlock() const { pthread_mutex_unlock(&lock_); }
+
pthread_mutex_t* mutex() const { return &lock_; }
private:
diff --git a/native_client_sdk/src/tests/nacl_io_socket_test/event_test.cc b/native_client_sdk/src/tests/nacl_io_socket_test/event_test.cc
deleted file mode 100644
index 7a854d7..0000000
--- a/native_client_sdk/src/tests/nacl_io_socket_test/event_test.cc
+++ /dev/null
@@ -1,480 +0,0 @@
-/* Copyright (c) 2013 The Chromium Authors. All rights reserved.
- * Use of this source code is governed by a BSD-style license that can be
- * found in the LICENSE file.
- */
-
-#include <errno.h>
-#include <fcntl.h>
-#include <stdio.h>
-#include <sys/stat.h>
-#include <sys/time.h>
-
-#include "gtest/gtest.h"
-
-#include "nacl_io/event_emitter.h"
-#include "nacl_io/event_listener.h"
-#include "nacl_io/kernel_intercept.h"
-#include "nacl_io/kernel_proxy.h"
-#include "nacl_io/kernel_wrap.h"
-
-
-using namespace nacl_io;
-using namespace sdk_util;
-
-class EventEmitterTester : public MountNode {
- public:
- EventEmitterTester() : MountNode(NULL), event_status_(0), event_cnt_(0) {}
-
- void SetEventStatus(uint32_t bits) { event_status_ = bits; }
- uint32_t GetEventStatus() { return event_status_; }
-
- Error Ioctl(int request, char* arg) {
- event_status_ = static_cast<uint32_t>(request);
- return 0;
- }
-
- int GetType() { return S_IFSOCK; }
- int NumEvents() { return event_cnt_; }
-
- public:
- // Make this function public for testing
- void RaiseEvent(uint32_t events) {
- EventEmitter::RaiseEvent(events);
- }
-
- // Called after registering locally, but while lock is still held.
- void ChainRegisterEventInfo(const ScopedEventInfo& event) {
- event_cnt_++;
- }
-
- // Called before unregistering locally, but while lock is still held.
- void ChainUnregisterEventInfo(const ScopedEventInfo& event) {
- event_cnt_--;
- }
-
- protected:
- uint32_t event_status_;
- uint32_t event_cnt_;
-};
-
-
-const int MAX_EVENTS = 8;
-
-// IDs for Emitters
-const int ID_EMITTER = 5;
-const int ID_LISTENER = 6;
-const int ID_EMITTER_DUP = 7;
-
-// Kernel Event values
-const uint32_t KE_EXPECTED = 4;
-const uint32_t KE_FILTERED = 2;
-const uint32_t KE_NONE = 0;
-
-// User Data values
-const uint64_t USER_DATA_A = 1;
-const uint64_t USER_DATA_B = 5;
-
-// Timeout durations
-const int TIMEOUT_IMMEDIATE = 0;
-const int TIMEOUT_SHORT= 100;
-const int TIMEOUT_LONG = 500;
-const int TIMEOUT_NEVER = -1;
-const int TIMEOUT_VERY_LONG = 1000;
-
-// We subtract TIMEOUT_SLOP from the expected minimum timed due to rounding
-// and clock drift converting between absolute and relative time. This should
-// only be 1 for Less Than, and 1 for rounding, but we use 10 since we don't
-// care about real precision, aren't testing of the underlying
-// implementations and don't want flakiness.
-const int TIMEOUT_SLOP = 10;
-
-TEST(EventTest, EmitterBasic) {
- ScopedRef<EventEmitterTester> emitter(new EventEmitterTester());
- ScopedRef<EventEmitter> null_emitter;
-
- ScopedEventListener listener(new EventListener);
-
- // Verify construction
- EXPECT_EQ(0, emitter->NumEvents());
- EXPECT_EQ(0, emitter->GetEventStatus());
-
- // Verify status
- emitter->SetEventStatus(KE_EXPECTED);
- EXPECT_EQ(KE_EXPECTED, emitter->GetEventStatus());
-
- // Fail to update or free an ID not in the set
- EXPECT_EQ(ENOENT, listener->Update(ID_EMITTER, KE_EXPECTED, USER_DATA_A));
- EXPECT_EQ(ENOENT, listener->Free(ID_EMITTER));
-
- // Fail to Track self
- EXPECT_EQ(EINVAL, listener->Track(ID_LISTENER,
- listener,
- KE_EXPECTED,
- USER_DATA_A));
-
- // Set the emitter filter and data
- EXPECT_EQ(0, listener->Track(ID_EMITTER, emitter, KE_EXPECTED, USER_DATA_A));
- EXPECT_EQ(1, emitter->NumEvents());
-
- // Fail to add the same ID
- EXPECT_EQ(EEXIST,
- listener->Track(ID_EMITTER, emitter, KE_EXPECTED, USER_DATA_A));
- EXPECT_EQ(1, emitter->NumEvents());
-
- int event_cnt = 0;
- EventData ev[MAX_EVENTS];
-
- // Do not allow a wait with a zero events count.
- EXPECT_EQ(EINVAL, listener->Wait(ev, 0, TIMEOUT_IMMEDIATE, &event_cnt));
-
- // Do not allow a wait with a negative events count.
- EXPECT_EQ(EINVAL, listener->Wait(ev, -1, TIMEOUT_IMMEDIATE, &event_cnt));
-
- // Do not allow a wait with a NULL EventData pointer
- EXPECT_EQ(EFAULT,
- listener->Wait(NULL, MAX_EVENTS, TIMEOUT_IMMEDIATE, &event_cnt));
-
- // Return with no events if the Emitter has no signals set.
- memset(ev, 0, sizeof(ev));
- event_cnt = 100;
- emitter->SetEventStatus(KE_NONE);
- EXPECT_EQ(0, listener->Wait(ev, MAX_EVENTS, TIMEOUT_IMMEDIATE, &event_cnt));
- EXPECT_EQ(0, event_cnt);
-
- // Return with no events if the Emitter has a filtered signals set.
- memset(ev, 0, sizeof(ev));
- event_cnt = 100;
- emitter->SetEventStatus(KE_FILTERED);
- EXPECT_EQ(0, listener->Wait(ev, MAX_EVENTS, TIMEOUT_IMMEDIATE, &event_cnt));
- EXPECT_EQ(0, event_cnt);
-
- // Return with one event if the Emitter has the expected signal set.
- memset(ev, 0, sizeof(ev));
- event_cnt = 100;
- emitter->SetEventStatus(KE_EXPECTED);
- EXPECT_EQ(0, listener->Wait(ev, MAX_EVENTS, TIMEOUT_IMMEDIATE, &event_cnt));
- EXPECT_EQ(1, event_cnt);
- EXPECT_EQ(USER_DATA_A, ev[0].user_data);
- EXPECT_EQ(KE_EXPECTED, ev[0].events);
-
- // Return with one event containing only the expected signal.
- memset(ev, 0, sizeof(ev));
- event_cnt = 100;
- emitter->SetEventStatus(KE_EXPECTED | KE_FILTERED);
- EXPECT_EQ(0, listener->Wait(ev, MAX_EVENTS, TIMEOUT_IMMEDIATE, &event_cnt));
- EXPECT_EQ(1, event_cnt);
- EXPECT_EQ(USER_DATA_A, ev[0].user_data);
- EXPECT_EQ(KE_EXPECTED, ev[0].events);
-
- // Change the USER_DATA on an existing event
- EXPECT_EQ(0, listener->Update(ID_EMITTER, KE_EXPECTED, USER_DATA_B));
-
- // Return with one event signaled with the alternate USER DATA
- memset(ev, 0, sizeof(ev));
- event_cnt = 100;
- emitter->SetEventStatus(KE_EXPECTED | KE_FILTERED);
- EXPECT_EQ(0, listener->Wait(ev, MAX_EVENTS, 0, &event_cnt));
- EXPECT_EQ(1, event_cnt);
- EXPECT_EQ(USER_DATA_B, ev[0].user_data);
- EXPECT_EQ(KE_EXPECTED, ev[0].events);
-
- // Reset the USER_DATA.
- EXPECT_EQ(0, listener->Update(ID_EMITTER, KE_EXPECTED, USER_DATA_A));
-
- // Support adding a DUP.
- EXPECT_EQ(0, listener->Track(ID_EMITTER_DUP,
- emitter,
- KE_EXPECTED,
- USER_DATA_A));
- EXPECT_EQ(2, emitter->NumEvents());
-
- // Return unsignaled.
- memset(ev, 0, sizeof(ev));
- emitter->SetEventStatus(KE_NONE);
- event_cnt = 100;
- EXPECT_EQ(0, listener->Wait(ev, MAX_EVENTS, TIMEOUT_IMMEDIATE, &event_cnt));
- EXPECT_EQ(0, event_cnt);
-
- // Return with two event signaled with expected data.
- memset(ev, 0, sizeof(ev));
- emitter->SetEventStatus(KE_EXPECTED);
- event_cnt = 100;
- EXPECT_EQ(0, listener->Wait(ev, MAX_EVENTS, TIMEOUT_IMMEDIATE, &event_cnt));
- EXPECT_EQ(2, event_cnt);
- EXPECT_EQ(USER_DATA_A, ev[0].user_data);
- EXPECT_EQ(KE_EXPECTED, ev[0].events);
- EXPECT_EQ(USER_DATA_A, ev[1].user_data);
- EXPECT_EQ(KE_EXPECTED, ev[1].events);
-}
-
-long Duration(struct timeval* start, struct timeval* end) {
- if (start->tv_usec > end->tv_usec) {
- end->tv_sec -= 1;
- end->tv_usec += 1000000;
- }
- long cur_time = 1000 * (end->tv_sec - start->tv_sec);
- cur_time += (end->tv_usec - start->tv_usec) / 1000;
- return cur_time;
-}
-
-
-// Run a timed wait, and return the average of 8 iterations to reduce
-// chance of false negative on outlier.
-const int TRIES_TO_AVERAGE = 8;
-bool TimedListen(ScopedEventListener& listen,
- EventData* ev,
- int ev_max,
- int ev_expect,
- int ms_wait,
- long* duration) {
-
- struct timeval start;
- struct timeval end;
- long total_time = 0;
-
- for (int a=0; a < TRIES_TO_AVERAGE; a++) {
- gettimeofday(&start, NULL);
-
- int signaled;
-
- EXPECT_EQ(0, listen->Wait(ev, ev_max, ms_wait, &signaled));
- EXPECT_EQ(signaled, ev_expect);
-
- if (signaled != ev_expect) {
- return false;
- }
-
- gettimeofday(&end, NULL);
-
- long cur_time = Duration(&start, &end);
- total_time += cur_time;
- }
-
- *duration = total_time / TRIES_TO_AVERAGE;
- return true;
-}
-
-
-// NOTE: These timing tests are potentially flaky, the real test is
-// for the zero timeout should be, has the ConditionVariable been waited on?
-// Once we provide a debuggable SimpleCond and SimpleLock we can actually test
-// the correct thing.
-
-// Normal scheduling would expect us to see ~10ms accuracy, but we'll
-// use a much bigger number (yet smaller than the MAX_MS_TIMEOUT).
-const int SCHEDULING_GRANULARITY = 100;
-
-const int EXPECT_ONE_EVENT = 1;
-const int EXPECT_NO_EVENT = 0;
-
-TEST(EventTest, EmitterTimeout) {
- ScopedRef<EventEmitterTester> emitter(new EventEmitterTester());
- ScopedEventListener listener(new EventListener());
- long duration;
-
- EventData ev[MAX_EVENTS];
- memset(ev, 0, sizeof(ev));
- EXPECT_EQ(0, listener->Track(ID_EMITTER, emitter, KE_EXPECTED, USER_DATA_A));
-
- // Return immediately when emitter is signaled, with no timeout
- emitter->SetEventStatus(KE_EXPECTED);
- memset(ev, 0, sizeof(ev));
- EXPECT_TRUE(TimedListen(listener, ev, MAX_EVENTS, EXPECT_ONE_EVENT,
- TIMEOUT_IMMEDIATE, &duration));
- EXPECT_EQ(USER_DATA_A, ev[0].user_data);
- EXPECT_EQ(KE_EXPECTED, ev[0].events);
- EXPECT_EQ(0, duration);
-
- // Return immediately when emitter is signaled, even with timeout
- emitter->SetEventStatus(KE_EXPECTED);
- memset(ev, 0, sizeof(ev));
- EXPECT_TRUE(TimedListen(listener, ev, MAX_EVENTS, EXPECT_ONE_EVENT,
- TIMEOUT_LONG, &duration));
- EXPECT_EQ(USER_DATA_A, ev[0].user_data);
- EXPECT_EQ(KE_EXPECTED, ev[0].events);
- EXPECT_GT(SCHEDULING_GRANULARITY, duration);
-
- // Return immediately if Emiiter is already signaled when blocking forever.
- emitter->SetEventStatus(KE_EXPECTED);
- memset(ev, 0, sizeof(ev));
- EXPECT_TRUE(TimedListen(listener, ev, MAX_EVENTS, EXPECT_ONE_EVENT,
- TIMEOUT_NEVER, &duration));
- EXPECT_EQ(USER_DATA_A, ev[0].user_data);
- EXPECT_EQ(KE_EXPECTED, ev[0].events);
- EXPECT_GT(SCHEDULING_GRANULARITY, duration);
-
- // Return immediately if Emitter is no signaled when not blocking.
- emitter->SetEventStatus(KE_NONE);
- memset(ev, 0, sizeof(ev));
- EXPECT_TRUE(TimedListen(listener, ev, MAX_EVENTS, EXPECT_NO_EVENT,
- TIMEOUT_IMMEDIATE, &duration));
- EXPECT_EQ(0, duration);
-
- // Wait TIMEOUT_LONG if the emitter is not in a signaled state.
- emitter->SetEventStatus(KE_NONE);
- memset(ev, 0, sizeof(ev));
- EXPECT_TRUE(TimedListen(listener, ev, MAX_EVENTS, EXPECT_NO_EVENT,
- TIMEOUT_LONG, &duration));
- EXPECT_LT(TIMEOUT_LONG - TIMEOUT_SLOP, duration);
- EXPECT_GT(TIMEOUT_LONG + SCHEDULING_GRANULARITY, duration);
-}
-
-struct SignalInfo {
- EventEmitterTester* em;
- unsigned int ms_wait;
- uint32_t events;
-};
-
-static void *SignalEmitterThread(void *ptr) {
- SignalInfo* info = (SignalInfo*) ptr;
- struct timespec ts;
- ts.tv_sec = 0;
- ts.tv_nsec = info->ms_wait * 1000000;
-
- nanosleep(&ts, NULL);
-
- info->em->RaiseEvent(info->events);
- return NULL;
-}
-
-TEST(EventTest, EmitterSignalling) {
- ScopedRef<EventEmitterTester> emitter(new EventEmitterTester());
- ScopedEventListener listener(new EventListener);
-
- SignalInfo siginfo;
- struct timeval start;
- struct timeval end;
- long duration;
-
- EventData ev[MAX_EVENTS];
- memset(ev, 0, sizeof(ev));
- EXPECT_EQ(0, listener->Track(ID_EMITTER, emitter, KE_EXPECTED, USER_DATA_A));
-
- // Setup another thread to wait 1/4 of the max time, and signal both
- // an expected, and unexpected value.
- siginfo.em = emitter.get();
- siginfo.ms_wait = TIMEOUT_SHORT;
- siginfo.events = KE_EXPECTED | KE_FILTERED;
- pthread_t tid;
- pthread_create(&tid, NULL, SignalEmitterThread, &siginfo);
-
- // Wait for the signal from the other thread and time it.
- gettimeofday(&start, NULL);
- int cnt = 0;
- EXPECT_EQ(0, listener->Wait(ev, MAX_EVENTS, TIMEOUT_VERY_LONG, &cnt));
- EXPECT_EQ(1, cnt);
- gettimeofday(&end, NULL);
-
- // Verify the wait duration, and that we only recieved the expected signal.
- duration = Duration(&start, &end);
- EXPECT_GT(TIMEOUT_SHORT + SCHEDULING_GRANULARITY, duration);
- EXPECT_LT(TIMEOUT_SHORT - TIMEOUT_SLOP, duration);
- EXPECT_EQ(USER_DATA_A, ev[0].user_data);
- EXPECT_EQ(KE_EXPECTED, ev[0].events);
-}
-
-
-namespace {
-
-class KernelProxyPolling : public KernelProxy {
- public:
- virtual int socket(int domain, int type, int protocol) {
- ScopedMount mnt;
- ScopedMountNode node(new EventEmitterTester());
- ScopedKernelHandle handle(new KernelHandle(mnt, node));
-
- Error error = handle->Init(0);
- if (error) {
- errno = error;
- return -1;
- }
-
- return AllocateFD(handle);
- }
-};
-
-class KernelProxyPollingTest : public ::testing::Test {
- public:
- KernelProxyPollingTest() : kp_(new KernelProxyPolling) {
- ki_init(kp_);
- }
-
- ~KernelProxyPollingTest() {
- ki_uninit();
- delete kp_;
- }
-
- KernelProxyPolling* kp_;
-};
-
-} // namespace
-
-
-#define SOCKET_CNT 4
-void SetFDs(fd_set* set, int* fds) {
- FD_ZERO(set);
-
- FD_SET(0, set);
- FD_SET(1, set);
- FD_SET(2, set);
-
- for (int index = 0; index < SOCKET_CNT; index++)
- FD_SET(fds[index], set);
-}
-
-TEST_F(KernelProxyPollingTest, Select) {
- int fds[SOCKET_CNT];
-
- fd_set rd_set;
- fd_set wr_set;
-
- FD_ZERO(&rd_set);
- FD_ZERO(&wr_set);
-
- FD_SET(0, &rd_set);
- FD_SET(1, &rd_set);
- FD_SET(2, &rd_set);
-
- FD_SET(0, &wr_set);
- FD_SET(1, &wr_set);
- FD_SET(2, &wr_set);
-
- // Expect normal files to select as read, write, and error
- int cnt = select(4, &rd_set, &rd_set, &rd_set, NULL);
- EXPECT_EQ(3 * 3, cnt);
- EXPECT_NE(0, FD_ISSET(0, &rd_set));
- EXPECT_NE(0, FD_ISSET(1, &rd_set));
- EXPECT_NE(0, FD_ISSET(2, &rd_set));
-
- for (int index = 0 ; index < SOCKET_CNT; index++) {
- fds[index] = socket(0, 0, 0);
- EXPECT_NE(-1, fds[index]);
- }
-
- // Highest numbered fd
- const int fdnum = fds[SOCKET_CNT - 1] + 1;
-
- // Expect only the normal files to select
- SetFDs(&rd_set, fds);
- cnt = select(fds[SOCKET_CNT-1] + 1, &rd_set, NULL, NULL, NULL);
- EXPECT_EQ(3, cnt);
- EXPECT_NE(0, FD_ISSET(0, &rd_set));
- EXPECT_NE(0, FD_ISSET(1, &rd_set));
- EXPECT_NE(0, FD_ISSET(2, &rd_set));
- for (int index = 0 ; index < SOCKET_CNT; index++) {
- EXPECT_EQ(0, FD_ISSET(fds[index], &rd_set));
- }
-
- // Poke one of the pollable nodes to be READ ready
- ioctl(fds[0], POLLIN, NULL);
-
- // Expect normal files to be read/write and one pollable node to be read.
- SetFDs(&rd_set, fds);
- SetFDs(&wr_set, fds);
- cnt = select(fdnum, &rd_set, &wr_set, NULL, NULL);
- EXPECT_EQ(7, cnt);
- EXPECT_NE(0, FD_ISSET(fds[0], &rd_set));
- EXPECT_EQ(0, FD_ISSET(fds[0], &wr_set));
-}
-
-
diff --git a/native_client_sdk/src/tests/nacl_io_socket_test/socket_test.cc b/native_client_sdk/src/tests/nacl_io_socket_test/socket_test.cc
index fe22ae1..e9391d0 100644
--- a/native_client_sdk/src/tests/nacl_io_socket_test/socket_test.cc
+++ b/native_client_sdk/src/tests/nacl_io_socket_test/socket_test.cc
@@ -174,8 +174,47 @@ TEST_F(SocketTestUDP, SendRcv) {
EXPECT_EQ(0, memcmp(outbuf, inbuf, sizeof(outbuf)));
}
-#if 0
-TEST_F(SocketTestTCP, Connect) {
+const size_t queue_size = 65536 * 8;
+TEST_F(SocketTestUDP, FullFifo) {
+ char outbuf[16 * 1024];
+
+ EXPECT_EQ(Bind(sock1, LOCAL_HOST, PORT1), ENONE);
+ EXPECT_EQ(Bind(sock2, LOCAL_HOST, PORT2), ENONE);
+
+ sockaddr_in addr;
+ socklen_t addrlen = sizeof(addr);
+ IP4ToSockAddr(LOCAL_HOST, PORT2, &addr);
+
+ size_t total = 0;
+ while (total < queue_size * 8) {
+ int len = sendto(sock1, outbuf, sizeof(outbuf), MSG_DONTWAIT,
+ (sockaddr *) &addr, addrlen);
+
+ if (len <= 0) {
+ EXPECT_EQ(-1, len);
+ EXPECT_EQ(errno, EWOULDBLOCK);
+ break;
+ }
+
+ if (len >= 0) {
+ EXPECT_EQ(sizeof(outbuf), len);
+ total += len;
+ }
+
+ }
+ EXPECT_GT(total, queue_size -1);
+ EXPECT_LT(total, queue_size * 8);
+}
+
+// TODO(noelallen) BUG=294412
+// Re-enable testing on bots when server sockets are available.
+TEST_F(SocketTestTCP, DISABLED_Connect) {
+ char outbuf[256];
+ char inbuf[512];
+
+ memset(outbuf, 1, sizeof(outbuf));
+ memset(inbuf, 0, sizeof(inbuf));
+
int sock = socket(AF_INET, SOCK_STREAM, 0);
EXPECT_NE(-1, sock);
@@ -184,8 +223,14 @@ TEST_F(SocketTestTCP, Connect) {
IP4ToSockAddr(LOCAL_HOST, PORT1, &addr);
int err = connect(sock, (sockaddr*) &addr, addrlen);
+
EXPECT_EQ(ENONE, err) << "Failed with errno: " << errno << "\n";
+
+ EXPECT_EQ(sizeof(outbuf), write(sock, outbuf, sizeof(outbuf)));
+ EXPECT_EQ(sizeof(outbuf), read(sock, inbuf, sizeof(inbuf)));
+
+ // Now they should be the same
+ EXPECT_EQ(0, memcmp(outbuf, inbuf, sizeof(outbuf)));
}
-#endif
-#endif // PROVIDES_SOCKETPAIR_API
+#endif // PROVIDES_SOCKET_API
diff --git a/native_client_sdk/src/tests/nacl_io_test/event_test.cc b/native_client_sdk/src/tests/nacl_io_test/event_test.cc
index 11326bf..3ba14c5 100644
--- a/native_client_sdk/src/tests/nacl_io_test/event_test.cc
+++ b/native_client_sdk/src/tests/nacl_io_test/event_test.cc
@@ -5,6 +5,7 @@
#include <errno.h>
#include <fcntl.h>
+#include <pthread.h>
#include <stdio.h>
#include <sys/ioctl.h>
#include <sys/stat.h>
@@ -14,468 +15,297 @@
#include "nacl_io/event_emitter.h"
#include "nacl_io/event_listener.h"
+#include "nacl_io/event_listener.h"
+#include "nacl_io/event_listener.h"
#include "nacl_io/kernel_intercept.h"
#include "nacl_io/kernel_proxy.h"
#include "nacl_io/kernel_wrap.h"
+#include "nacl_io/mount_node_pipe.h"
+#include "nacl_io/mount_stream.h"
+
+#include "ppapi_simple/ps.h"
using namespace nacl_io;
using namespace sdk_util;
-class EventEmitterTester : public MountNode {
+
+class EventListenerTester : public EventListener {
public:
- EventEmitterTester() : MountNode(NULL), event_status_(0), event_cnt_(0) {}
+ EventListenerTester() : EventListener(), events_(0) {};
- void SetEventStatus(uint32_t bits) { event_status_ = bits; }
- uint32_t GetEventStatus() { return event_status_; }
+ virtual void ReceiveEvents(EventEmitter* emitter, uint32_t events) {
+ events_ |= events;
+ }
- Error Ioctl(int request, char* arg) {
- event_status_ = static_cast<uint32_t>(request);
- return 0;
+ uint32_t Events() {
+ return events_;
}
- int GetType() { return S_IFSOCK; }
- int NumEvents() { return event_cnt_; }
+ void Clear() {
+ events_ = 0;
+ }
- public:
- // Make this function public for testing
- void RaiseEvent(uint32_t events) {
- EventEmitter::RaiseEvent(events);
+ uint32_t events_;
+};
+
+
+TEST(EmitterBasic, SingleThread) {
+ EventListenerTester listener_a;
+ EventListenerTester listener_b;
+ EventEmitter emitter;
+
+ emitter.RegisterListener(&listener_a, POLLIN | POLLOUT | POLLERR);
+ emitter.RegisterListener(&listener_b, POLLIN | POLLOUT | POLLERR);
+
+ EXPECT_EQ(0, emitter.GetEventStatus());
+ EXPECT_EQ(0, listener_a.Events());
+
+ {
+ AUTO_LOCK(emitter.GetLock())
+ emitter.RaiseEvents_Locked(POLLIN);
+ }
+ EXPECT_EQ(POLLIN, listener_a.Events());
+
+ listener_a.Clear();
+
+ {
+ AUTO_LOCK(emitter.GetLock())
+ emitter.RaiseEvents_Locked(POLLOUT);
}
+ EXPECT_EQ(POLLOUT, listener_a.Events());
+ EXPECT_EQ(POLLIN | POLLOUT, listener_b.Events());
+}
- // Called after registering locally, but while lock is still held.
- void ChainRegisterEventInfo(const ScopedEventInfo& event) {
- event_cnt_++;
+class EmitterTest : public ::testing::Test {
+ public:
+ void SetUp() {
+ pthread_cond_init(&multi_cond_, NULL);
+ waiting_ = 0;
+ signaled_ = 0;
}
- // Called before unregistering locally, but while lock is still held.
- void ChainUnregisterEventInfo(const ScopedEventInfo& event) {
- event_cnt_--;
+ void TearDown() {
+ pthread_cond_destroy(&multi_cond_);
}
- protected:
- uint32_t event_status_;
- uint32_t event_cnt_;
-};
+ void CreateThread() {
+ pthread_t id;
+ EXPECT_EQ(0, pthread_create(&id, NULL, ThreadThunk, this));
+ }
+ static void* ThreadThunk(void *ptr) {
+ return static_cast<EmitterTest*>(ptr)->ThreadEntry();
+ }
-const int MAX_EVENTS = 8;
-
-// IDs for Emitters
-const int ID_EMITTER = 5;
-const int ID_LISTENER = 6;
-const int ID_EMITTER_DUP = 7;
-
-// Kernel Event values
-const uint32_t KE_EXPECTED = 4;
-const uint32_t KE_FILTERED = 2;
-const uint32_t KE_NONE = 0;
-
-// User Data values
-const uint64_t USER_DATA_A = 1;
-const uint64_t USER_DATA_B = 5;
-
-// Timeout durations
-const int TIMEOUT_IMMEDIATE = 0;
-const int TIMEOUT_SHORT= 100;
-const int TIMEOUT_LONG = 500;
-const int TIMEOUT_NEVER = -1;
-const int TIMEOUT_VERY_LONG = 1000;
-
-// We subtract TIMEOUT_SLOP from the expected minimum timed due to rounding
-// and clock drift converting between absolute and relative time. This should
-// only be 1 for Less Than, and 1 for rounding, but we use 10 since we don't
-// care about real precision, aren't testing of the underlying
-// implementations and don't want flakiness.
-const int TIMEOUT_SLOP = 10;
-
-TEST(EventTest, EmitterBasic) {
- ScopedRef<EventEmitterTester> emitter(new EventEmitterTester());
- ScopedRef<EventEmitter> null_emitter;
-
- ScopedEventListener listener(new EventListener);
-
- // Verify construction
- EXPECT_EQ(0, emitter->NumEvents());
- EXPECT_EQ(0, emitter->GetEventStatus());
-
- // Verify status
- emitter->SetEventStatus(KE_EXPECTED);
- EXPECT_EQ(KE_EXPECTED, emitter->GetEventStatus());
-
- // Fail to update or free an ID not in the set
- EXPECT_EQ(ENOENT, listener->Update(ID_EMITTER, KE_EXPECTED, USER_DATA_A));
- EXPECT_EQ(ENOENT, listener->Free(ID_EMITTER));
-
- // Fail to Track self
- EXPECT_EQ(EINVAL, listener->Track(ID_LISTENER,
- listener,
- KE_EXPECTED,
- USER_DATA_A));
-
- // Set the emitter filter and data
- EXPECT_EQ(0, listener->Track(ID_EMITTER, emitter, KE_EXPECTED, USER_DATA_A));
- EXPECT_EQ(1, emitter->NumEvents());
-
- // Fail to add the same ID
- EXPECT_EQ(EEXIST,
- listener->Track(ID_EMITTER, emitter, KE_EXPECTED, USER_DATA_A));
- EXPECT_EQ(1, emitter->NumEvents());
-
- int event_cnt = 0;
- EventData ev[MAX_EVENTS];
-
- // Do not allow a wait with a zero events count.
- EXPECT_EQ(EINVAL, listener->Wait(ev, 0, TIMEOUT_IMMEDIATE, &event_cnt));
-
- // Do not allow a wait with a negative events count.
- EXPECT_EQ(EINVAL, listener->Wait(ev, -1, TIMEOUT_IMMEDIATE, &event_cnt));
-
- // Do not allow a wait with a NULL EventData pointer
- EXPECT_EQ(EFAULT,
- listener->Wait(NULL, MAX_EVENTS, TIMEOUT_IMMEDIATE, &event_cnt));
-
- // Return with no events if the Emitter has no signals set.
- memset(ev, 0, sizeof(ev));
- event_cnt = 100;
- emitter->SetEventStatus(KE_NONE);
- EXPECT_EQ(0, listener->Wait(ev, MAX_EVENTS, TIMEOUT_IMMEDIATE, &event_cnt));
- EXPECT_EQ(0, event_cnt);
-
- // Return with no events if the Emitter has a filtered signals set.
- memset(ev, 0, sizeof(ev));
- event_cnt = 100;
- emitter->SetEventStatus(KE_FILTERED);
- EXPECT_EQ(0, listener->Wait(ev, MAX_EVENTS, TIMEOUT_IMMEDIATE, &event_cnt));
- EXPECT_EQ(0, event_cnt);
-
- // Return with one event if the Emitter has the expected signal set.
- memset(ev, 0, sizeof(ev));
- event_cnt = 100;
- emitter->SetEventStatus(KE_EXPECTED);
- EXPECT_EQ(0, listener->Wait(ev, MAX_EVENTS, TIMEOUT_IMMEDIATE, &event_cnt));
- EXPECT_EQ(1, event_cnt);
- EXPECT_EQ(USER_DATA_A, ev[0].user_data);
- EXPECT_EQ(KE_EXPECTED, ev[0].events);
-
- // Return with one event containing only the expected signal.
- memset(ev, 0, sizeof(ev));
- event_cnt = 100;
- emitter->SetEventStatus(KE_EXPECTED | KE_FILTERED);
- EXPECT_EQ(0, listener->Wait(ev, MAX_EVENTS, TIMEOUT_IMMEDIATE, &event_cnt));
- EXPECT_EQ(1, event_cnt);
- EXPECT_EQ(USER_DATA_A, ev[0].user_data);
- EXPECT_EQ(KE_EXPECTED, ev[0].events);
-
- // Change the USER_DATA on an existing event
- EXPECT_EQ(0, listener->Update(ID_EMITTER, KE_EXPECTED, USER_DATA_B));
-
- // Return with one event signaled with the alternate USER DATA
- memset(ev, 0, sizeof(ev));
- event_cnt = 100;
- emitter->SetEventStatus(KE_EXPECTED | KE_FILTERED);
- EXPECT_EQ(0, listener->Wait(ev, MAX_EVENTS, 0, &event_cnt));
- EXPECT_EQ(1, event_cnt);
- EXPECT_EQ(USER_DATA_B, ev[0].user_data);
- EXPECT_EQ(KE_EXPECTED, ev[0].events);
-
- // Reset the USER_DATA.
- EXPECT_EQ(0, listener->Update(ID_EMITTER, KE_EXPECTED, USER_DATA_A));
-
- // Support adding a DUP.
- EXPECT_EQ(0, listener->Track(ID_EMITTER_DUP,
- emitter,
- KE_EXPECTED,
- USER_DATA_A));
- EXPECT_EQ(2, emitter->NumEvents());
-
- // Return unsignaled.
- memset(ev, 0, sizeof(ev));
- emitter->SetEventStatus(KE_NONE);
- event_cnt = 100;
- EXPECT_EQ(0, listener->Wait(ev, MAX_EVENTS, TIMEOUT_IMMEDIATE, &event_cnt));
- EXPECT_EQ(0, event_cnt);
-
- // Return with two event signaled with expected data.
- memset(ev, 0, sizeof(ev));
- emitter->SetEventStatus(KE_EXPECTED);
- event_cnt = 100;
- EXPECT_EQ(0, listener->Wait(ev, MAX_EVENTS, TIMEOUT_IMMEDIATE, &event_cnt));
- EXPECT_EQ(2, event_cnt);
- EXPECT_EQ(USER_DATA_A, ev[0].user_data);
- EXPECT_EQ(KE_EXPECTED, ev[0].events);
- EXPECT_EQ(USER_DATA_A, ev[1].user_data);
- EXPECT_EQ(KE_EXPECTED, ev[1].events);
-}
+ void* ThreadEntry() {
+ EventListenerLock listener(&emitter_);
-long Duration(struct timeval* start, struct timeval* end) {
- if (start->tv_usec > end->tv_usec) {
- end->tv_sec -= 1;
- end->tv_usec += 1000000;
+ pthread_cond_signal(&multi_cond_);
+ waiting_++;
+ EXPECT_EQ(0, listener.WaitOnEvent(POLLIN, -1));
+ emitter_.ClearEvents_Locked(POLLIN);
+ signaled_ ++;
+ return NULL;
}
- long cur_time = 1000 * (end->tv_sec - start->tv_sec);
- cur_time += (end->tv_usec - start->tv_usec) / 1000;
- return cur_time;
-}
+ protected:
+ pthread_cond_t multi_cond_;
+ EventEmitter emitter_;
+
+ uint32_t waiting_;
+ uint32_t signaled_;
+};
-// Run a timed wait, and return the average of 8 iterations to reduce
-// chance of false negative on outlier.
-const int TRIES_TO_AVERAGE = 8;
-bool TimedListen(ScopedEventListener& listen,
- EventData* ev,
- int ev_max,
- int ev_expect,
- int ms_wait,
- long* duration) {
- struct timeval start;
- struct timeval end;
- long total_time = 0;
+const int NUM_THREADS = 10;
+TEST_F(EmitterTest, MultiThread) {
+ for (int a=0; a <NUM_THREADS; a++)
+ CreateThread();
- for (int a=0; a < TRIES_TO_AVERAGE; a++) {
- gettimeofday(&start, NULL);
+ sleep(1);
+ EXPECT_EQ(0, signaled_);
- int signaled;
+ {
+ AUTO_LOCK(emitter_.GetLock());
- EXPECT_EQ(0, listen->Wait(ev, ev_max, ms_wait, &signaled));
- EXPECT_EQ(signaled, ev_expect);
+ // Wait for all threads to wait
+ while(waiting_ < NUM_THREADS)
+ pthread_cond_wait(&multi_cond_, emitter_.GetLock().mutex());
- if (signaled != ev_expect) {
- return false;
- }
+ emitter_.RaiseEvents_Locked(POLLIN);
+ }
- gettimeofday(&end, NULL);
+ sleep(1);
+ EXPECT_EQ(1, signaled_);
- long cur_time = Duration(&start, &end);
- total_time += cur_time;
+ {
+ AUTO_LOCK(emitter_.GetLock());
+ emitter_.RaiseEvents_Locked(POLLIN);
}
- *duration = total_time / TRIES_TO_AVERAGE;
- return true;
+ sleep(1);
+ EXPECT_EQ(2, signaled_);
+
+ // Clean up remaining threads.
+ while (signaled_ < waiting_) {
+ AUTO_LOCK(emitter_.GetLock());
+ emitter_.RaiseEvents_Locked(POLLIN);
+ }
}
-// NOTE: These timing tests are potentially flaky, the real test is
-// for the zero timeout should be, has the ConditionVariable been waited on?
-// Once we provide a debuggable SimpleCond and SimpleLock we can actually test
-// the correct thing.
-
-// Normal scheduling would expect us to see ~10ms accuracy, but we'll
-// use a much bigger number (yet smaller than the MAX_MS_TIMEOUT).
-const int SCHEDULING_GRANULARITY = 100;
-
-const int EXPECT_ONE_EVENT = 1;
-const int EXPECT_NO_EVENT = 0;
-
-TEST(EventTest, EmitterTimeout) {
- ScopedRef<EventEmitterTester> emitter(new EventEmitterTester());
- ScopedEventListener listener(new EventListener());
- long duration;
-
- EventData ev[MAX_EVENTS];
- memset(ev, 0, sizeof(ev));
- EXPECT_EQ(0, listener->Track(ID_EMITTER, emitter, KE_EXPECTED, USER_DATA_A));
-
- // Return immediately when emitter is signaled, with no timeout
- emitter->SetEventStatus(KE_EXPECTED);
- memset(ev, 0, sizeof(ev));
- EXPECT_TRUE(TimedListen(listener, ev, MAX_EVENTS, EXPECT_ONE_EVENT,
- TIMEOUT_IMMEDIATE, &duration));
- EXPECT_EQ(USER_DATA_A, ev[0].user_data);
- EXPECT_EQ(KE_EXPECTED, ev[0].events);
- EXPECT_EQ(0, duration);
-
- // Return immediately when emitter is signaled, even with timeout
- emitter->SetEventStatus(KE_EXPECTED);
- memset(ev, 0, sizeof(ev));
- EXPECT_TRUE(TimedListen(listener, ev, MAX_EVENTS, EXPECT_ONE_EVENT,
- TIMEOUT_LONG, &duration));
- EXPECT_EQ(USER_DATA_A, ev[0].user_data);
- EXPECT_EQ(KE_EXPECTED, ev[0].events);
- EXPECT_GT(SCHEDULING_GRANULARITY, duration);
-
- // Return immediately if Emiiter is already signaled when blocking forever.
- emitter->SetEventStatus(KE_EXPECTED);
- memset(ev, 0, sizeof(ev));
- EXPECT_TRUE(TimedListen(listener, ev, MAX_EVENTS, EXPECT_ONE_EVENT,
- TIMEOUT_NEVER, &duration));
- EXPECT_EQ(USER_DATA_A, ev[0].user_data);
- EXPECT_EQ(KE_EXPECTED, ev[0].events);
- EXPECT_GT(SCHEDULING_GRANULARITY, duration);
-
- // Return immediately if Emitter is no signaled when not blocking.
- emitter->SetEventStatus(KE_NONE);
- memset(ev, 0, sizeof(ev));
- EXPECT_TRUE(TimedListen(listener, ev, MAX_EVENTS, EXPECT_NO_EVENT,
- TIMEOUT_IMMEDIATE, &duration));
- EXPECT_EQ(0, duration);
-
- // Wait TIMEOUT_LONG if the emitter is not in a signaled state.
- emitter->SetEventStatus(KE_NONE);
- memset(ev, 0, sizeof(ev));
- EXPECT_TRUE(TimedListen(listener, ev, MAX_EVENTS, EXPECT_NO_EVENT,
- TIMEOUT_LONG, &duration));
- EXPECT_LT(TIMEOUT_LONG - TIMEOUT_SLOP, duration);
- EXPECT_GT(TIMEOUT_LONG + SCHEDULING_GRANULARITY, duration);
-}
+TEST(PipeTest, Listener) {
+ const char hello[] = "Hello World.";
+ char tmp[64] = "Goodbye";
-struct SignalInfo {
- EventEmitterTester* em;
- unsigned int ms_wait;
- uint32_t events;
-};
+ EventEmitterPipe pipe(32);
-static void *SignalEmitterThread(void *ptr) {
- SignalInfo* info = (SignalInfo*) ptr;
- struct timespec ts;
- ts.tv_sec = 0;
- ts.tv_nsec = info->ms_wait * 1000000;
+ // Expect to time out on input.
+ {
+ EventListenerLock locker(&pipe);
+ EXPECT_EQ(ETIMEDOUT, locker.WaitOnEvent(POLLIN, 0));
+ }
- nanosleep(&ts, NULL);
+ // Output should be ready to go.
+ {
+ EventListenerLock locker(&pipe);
+ EXPECT_EQ(0, locker.WaitOnEvent(POLLOUT, 0));
+ EXPECT_EQ(sizeof(hello), pipe.Write_Locked(hello, sizeof(hello)));
+ }
- info->em->RaiseEvent(info->events);
- return NULL;
-}
+ // We should now be able to poll
+ {
+ EventListenerLock locker(&pipe);
+ EXPECT_EQ(0, locker.WaitOnEvent(POLLIN, 0));
+ EXPECT_EQ(sizeof(hello), pipe.Read_Locked(tmp, sizeof(tmp)));
+ }
-TEST(EventTest, EmitterSignalling) {
- ScopedRef<EventEmitterTester> emitter(new EventEmitterTester());
- ScopedEventListener listener(new EventListener);
-
- SignalInfo siginfo;
- struct timeval start;
- struct timeval end;
- long duration;
-
- EventData ev[MAX_EVENTS];
- memset(ev, 0, sizeof(ev));
- EXPECT_EQ(0, listener->Track(ID_EMITTER, emitter, KE_EXPECTED, USER_DATA_A));
-
- // Setup another thread to wait 1/4 of the max time, and signal both
- // an expected, and unexpected value.
- siginfo.em = emitter.get();
- siginfo.ms_wait = TIMEOUT_SHORT;
- siginfo.events = KE_EXPECTED | KE_FILTERED;
- pthread_t tid;
- pthread_create(&tid, NULL, SignalEmitterThread, &siginfo);
-
- // Wait for the signal from the other thread and time it.
- gettimeofday(&start, NULL);
- int cnt = 0;
- EXPECT_EQ(0, listener->Wait(ev, MAX_EVENTS, TIMEOUT_VERY_LONG, &cnt));
- EXPECT_EQ(1, cnt);
- gettimeofday(&end, NULL);
-
- // Verify the wait duration, and that we only recieved the expected signal.
- duration = Duration(&start, &end);
- EXPECT_GT(TIMEOUT_SHORT + SCHEDULING_GRANULARITY, duration);
- EXPECT_LT(TIMEOUT_SHORT - TIMEOUT_SLOP, duration);
- EXPECT_EQ(USER_DATA_A, ev[0].user_data);
- EXPECT_EQ(KE_EXPECTED, ev[0].events);
+ // Verify we can read it correctly.
+ EXPECT_EQ(0, strcmp(hello, tmp));
}
-namespace {
-
-class KernelProxyPolling : public KernelProxy {
+class TestMountStream : public MountStream {
public:
- virtual int socket(int domain, int type, int protocol) {
- ScopedMount mnt;
- ScopedMountNode node(new EventEmitterTester());
- ScopedKernelHandle handle(new KernelHandle(mnt, node));
-
- Error error = handle->Init(0);
- if (error) {
- errno = error;
- return -1;
- }
-
- return AllocateFD(handle);
- }
+ TestMountStream() {}
};
-class KernelProxyPollingTest : public ::testing::Test {
+TEST(PipeNodeTest, Basic) {
+ ScopedMount mnt(new TestMountStream());
+
+ MountNodePipe* pipe_node = new MountNodePipe(mnt.get());
+ ScopedRef<MountNodePipe> pipe(pipe_node);
+
+ EXPECT_EQ(POLLOUT, pipe_node->GetEventStatus());
+}
+
+const int MAX_FDS = 32;
+class SelectPollTest : public ::testing::Test {
public:
void SetUp() {
- ki_init(&kp_);
+ kp = new KernelProxy();
+ kp->Init(NULL);
+ EXPECT_EQ(0, kp->umount("/"));
+ EXPECT_EQ(0, kp->mount("", "/", "memfs", 0, NULL));
+
+ memset(&tv, 0, sizeof(tv));
}
void TearDown() {
- ki_uninit();
+ delete kp;
}
- protected:
- KernelProxyPolling kp_;
-};
-
-} // namespace
-
+ void SetFDs(int* fds, int cnt) {
+ FD_ZERO(&rd_set);
+ FD_ZERO(&wr_set);
+ FD_ZERO(&ex_set);
-#define SOCKET_CNT 4
-void SetFDs(fd_set* set, int* fds) {
- FD_ZERO(set);
+ for (int index = 0; index < cnt; index++) {
+ EXPECT_NE(-1, fds[index]);
+ FD_SET(fds[index], &rd_set);
+ FD_SET(fds[index], &wr_set);
+ FD_SET(fds[index], &ex_set);
- FD_SET(0, set);
- FD_SET(1, set);
- FD_SET(2, set);
+ pollfds[index].fd = fds[index];
+ pollfds[index].events = POLLIN | POLLOUT;
+ pollfds[index].revents = -1;
+ }
+ }
- for (int index = 0; index < SOCKET_CNT; index++)
- FD_SET(fds[index], set);
-}
+ void CloseFDs(int* fds, int cnt) {
+ for (int index = 0; index < cnt; index++)
+ kp->close(fds[index]);
+ }
-TEST_F(KernelProxyPollingTest, Select) {
- int fds[SOCKET_CNT];
+ protected:
+ KernelProxy* kp;
+ timeval tv;
fd_set rd_set;
fd_set wr_set;
+ fd_set ex_set;
+ struct pollfd pollfds[MAX_FDS];
+};
- FD_ZERO(&rd_set);
- FD_ZERO(&wr_set);
+TEST_F(SelectPollTest, PollMemPipe) {
+ int fds[2];
- FD_SET(0, &rd_set);
- FD_SET(1, &rd_set);
- FD_SET(2, &rd_set);
+ // Both FDs for regular files should be read/write but not exception.
+ fds[0] = kp->open("/test.txt", O_CREAT | O_WRONLY);
+ fds[1] = kp->open("/test.txt", O_RDONLY);
- FD_SET(0, &wr_set);
- FD_SET(1, &wr_set);
- FD_SET(2, &wr_set);
+ SetFDs(fds, 2);
- // Expect normal files to select as read, write, and error
- int cnt = select(4, &rd_set, &rd_set, &rd_set, NULL);
- EXPECT_EQ(3 * 3, cnt);
- EXPECT_NE(0, FD_ISSET(0, &rd_set));
- EXPECT_NE(0, FD_ISSET(1, &rd_set));
- EXPECT_NE(0, FD_ISSET(2, &rd_set));
+ EXPECT_EQ(2, kp->poll(pollfds, 2, 0));
+ EXPECT_EQ(POLLIN | POLLOUT, pollfds[0].revents);
+ EXPECT_EQ(POLLIN | POLLOUT, pollfds[1].revents);
+ CloseFDs(fds, 2);
- for (int index = 0 ; index < SOCKET_CNT; index++) {
- fds[index] = socket(0, 0, 0);
- EXPECT_NE(-1, fds[index]);
- }
+ // The write FD should select for write-only, read FD should not select
+ EXPECT_EQ(0, kp->pipe(fds));
+ SetFDs(fds, 2);
- // Highest numbered fd
- const int fdnum = fds[SOCKET_CNT - 1] + 1;
-
- // Expect only the normal files to select
- SetFDs(&rd_set, fds);
- cnt = select(fds[SOCKET_CNT-1] + 1, &rd_set, NULL, NULL, NULL);
- EXPECT_EQ(3, cnt);
- EXPECT_NE(0, FD_ISSET(0, &rd_set));
- EXPECT_NE(0, FD_ISSET(1, &rd_set));
- EXPECT_NE(0, FD_ISSET(2, &rd_set));
- for (int index = 0 ; index < SOCKET_CNT; index++) {
- EXPECT_EQ(0, FD_ISSET(fds[index], &rd_set));
- }
+ EXPECT_EQ(2, kp->poll(pollfds, 2, 0));
+ // TODO(noelallen) fix poll based on open mode
+ // EXPECT_EQ(0, pollfds[0].revents);
+ // Bug 291018
+ EXPECT_EQ(POLLOUT, pollfds[1].revents);
+
+ CloseFDs(fds, 2);
+}
+
+TEST_F(SelectPollTest, SelectMemPipe) {
+ int fds[2];
- // Poke one of the pollable nodes to be READ ready
- ioctl(fds[0], POLLIN, NULL);
+ // Both FDs for regular files should be read/write but not exception.
+ fds[0] = kp->open("/test.txt", O_CREAT | O_WRONLY);
+ fds[1] = kp->open("/test.txt", O_RDONLY);
+ SetFDs(fds, 2);
- // Expect normal files to be read/write and one pollable node to be read.
- SetFDs(&rd_set, fds);
- SetFDs(&wr_set, fds);
- cnt = select(fdnum, &rd_set, &wr_set, NULL, NULL);
- EXPECT_EQ(7, cnt);
+ EXPECT_EQ(4, kp->select(fds[1] + 1, &rd_set, &wr_set, &ex_set, &tv));
EXPECT_NE(0, FD_ISSET(fds[0], &rd_set));
- EXPECT_EQ(0, FD_ISSET(fds[0], &wr_set));
+ EXPECT_NE(0, FD_ISSET(fds[1], &rd_set));
+ EXPECT_NE(0, FD_ISSET(fds[0], &wr_set));
+ EXPECT_NE(0, FD_ISSET(fds[1], &wr_set));
+ EXPECT_EQ(0, FD_ISSET(fds[0], &ex_set));
+ EXPECT_EQ(0, FD_ISSET(fds[1], &ex_set));
+
+ CloseFDs(fds, 2);
+
+ // The write FD should select for write-only, read FD should not select
+ EXPECT_EQ(0, kp->pipe(fds));
+ SetFDs(fds, 2);
+
+ EXPECT_EQ(2, kp->select(fds[1] + 1, &rd_set, &wr_set, &ex_set, &tv));
+ EXPECT_EQ(0, FD_ISSET(fds[0], &rd_set));
+ EXPECT_EQ(0, FD_ISSET(fds[1], &rd_set));
+ // TODO(noelallen) fix poll based on open mode
+ // EXPECT_EQ(0, FD_ISSET(fds[0], &wr_set));
+ // Bug 291018
+ EXPECT_NE(0, FD_ISSET(fds[1], &wr_set));
+ EXPECT_EQ(0, FD_ISSET(fds[0], &ex_set));
+ EXPECT_EQ(0, FD_ISSET(fds[1], &ex_set));
}
diff --git a/native_client_sdk/src/tests/nacl_io_test/example.dsc b/native_client_sdk/src/tests/nacl_io_test/example.dsc
index 7dbbfbc..d257027 100644
--- a/native_client_sdk/src/tests/nacl_io_test/example.dsc
+++ b/native_client_sdk/src/tests/nacl_io_test/example.dsc
@@ -16,6 +16,7 @@
'fake_resource_manager.h',
'fake_var_interface.cc',
'fake_var_interface.h',
+ 'fifo_test.cc',
'kernel_object_test.cc',
'kernel_proxy_mock.cc',
'kernel_proxy_mock.h',
diff --git a/native_client_sdk/src/tests/nacl_io_test/fifo_test.cc b/native_client_sdk/src/tests/nacl_io_test/fifo_test.cc
new file mode 100644
index 0000000..5c7f0e5
--- /dev/null
+++ b/native_client_sdk/src/tests/nacl_io_test/fifo_test.cc
@@ -0,0 +1,130 @@
+/* Copyright (c) 2013 The Chromium Authors. All rights reserved.
+ * Use of this source code is governed by a BSD-style license that can be
+ * found in the LICENSE file.
+ */
+
+#include <errno.h>
+#include <fcntl.h>
+#include <pthread.h>
+#include <stdio.h>
+#include <sys/ioctl.h>
+#include <sys/stat.h>
+#include <sys/time.h>
+
+#include "gtest/gtest.h"
+
+#include "nacl_io/fifo_char.h"
+#include "nacl_io/fifo_null.h"
+#include "nacl_io/fifo_packet.h"
+#include "nacl_io/packet.h"
+
+#include "ppapi_simple/ps.h"
+
+using namespace nacl_io;
+
+namespace {
+const size_t kTestSize = 32;
+const size_t kHalfSize = 16;
+const size_t kQuarterSize = 8;
+};
+
+TEST(FIFONull, Basic) {
+ FIFONull fifo;
+ EXPECT_FALSE(fifo.IsFull());
+ EXPECT_FALSE(fifo.IsEmpty());
+
+ EXPECT_LT(0, fifo.ReadAvailable());
+ EXPECT_LT(0, fifo.WriteAvailable());
+}
+
+TEST(FIFOChar, Wrap) {
+ char temp_wr[kTestSize * 2];
+ char temp_rd[kTestSize * 2];
+ size_t wr_offs = 0;
+ size_t rd_offs = 0;
+
+ FIFOChar fifo(kTestSize);
+
+ memset(temp_rd, 0, sizeof(temp_rd));
+ for (size_t index = 0; index < sizeof(temp_wr); index++)
+ temp_wr[index] = index;
+
+ EXPECT_TRUE(fifo.IsEmpty());
+ EXPECT_FALSE(fifo.IsFull());
+
+ // Wrap read and write differently, and verify copy is correct
+ EXPECT_EQ(0, fifo.ReadAvailable());
+ EXPECT_EQ(kTestSize, fifo.WriteAvailable());
+
+ wr_offs += fifo.Write(temp_wr, kHalfSize);
+ EXPECT_EQ(kHalfSize, wr_offs);
+
+ EXPECT_FALSE(fifo.IsEmpty());
+ EXPECT_FALSE(fifo.IsFull());
+
+ rd_offs += fifo.Read(temp_rd, kQuarterSize);
+ EXPECT_EQ(kQuarterSize, rd_offs);
+
+ EXPECT_FALSE(fifo.IsEmpty());
+ EXPECT_FALSE(fifo.IsFull());
+
+ wr_offs += fifo.Write(&temp_wr[wr_offs], kTestSize);
+ EXPECT_EQ(kTestSize + kQuarterSize, wr_offs);
+
+ EXPECT_FALSE(fifo.IsEmpty());
+
+ rd_offs += fifo.Read(&temp_rd[rd_offs], kTestSize);
+ EXPECT_EQ(kTestSize + kQuarterSize, rd_offs);
+
+ EXPECT_TRUE(fifo.IsEmpty());
+ EXPECT_FALSE(fifo.IsFull());
+
+ for (size_t index = 0; index < kQuarterSize + kTestSize; index++)
+ EXPECT_EQ((char) index, temp_rd[index]);
+}
+
+TEST(FIFOPacket, Packets) {
+ char temp_wr[kTestSize];
+ FIFOPacket fifo(kTestSize);
+
+ Packet* pkt0 = new Packet(NULL);
+ Packet* pkt1 = new Packet(NULL);
+ pkt0->Copy(temp_wr, kHalfSize, 0);
+ pkt1->Copy(temp_wr, kTestSize, 0);
+
+ EXPECT_TRUE(fifo.IsEmpty());
+ EXPECT_FALSE(fifo.IsFull());
+
+ EXPECT_EQ(0, fifo.ReadAvailable());
+ EXPECT_EQ(kTestSize, fifo.WriteAvailable());
+
+ fifo.WritePacket(pkt0);
+ EXPECT_FALSE(fifo.IsEmpty());
+ EXPECT_FALSE(fifo.IsFull());
+
+ EXPECT_EQ(kHalfSize, fifo.ReadAvailable());
+ EXPECT_EQ(kHalfSize, fifo.WriteAvailable());
+
+ fifo.WritePacket(pkt1);
+ EXPECT_FALSE(fifo.IsEmpty());
+ EXPECT_TRUE(fifo.IsFull());
+
+ EXPECT_EQ(kHalfSize + kTestSize, fifo.ReadAvailable());
+ EXPECT_EQ(0, fifo.WriteAvailable());
+
+ EXPECT_EQ(pkt0, fifo.ReadPacket());
+ EXPECT_FALSE(fifo.IsEmpty());
+ EXPECT_TRUE(fifo.IsFull());
+
+ EXPECT_EQ(kTestSize, fifo.ReadAvailable());
+ EXPECT_EQ(0, fifo.WriteAvailable());
+
+ EXPECT_EQ(pkt1, fifo.ReadPacket());
+
+ EXPECT_TRUE(fifo.IsEmpty());
+ EXPECT_FALSE(fifo.IsFull());
+
+ EXPECT_EQ(0, fifo.ReadAvailable());
+ EXPECT_EQ(kTestSize, fifo.WriteAvailable());
+
+}
diff --git a/native_client_sdk/src/tests/nacl_io_test/kernel_proxy_mock.h b/native_client_sdk/src/tests/nacl_io_test/kernel_proxy_mock.h
index 8a301ac..78b2c31 100644
--- a/native_client_sdk/src/tests/nacl_io_test/kernel_proxy_mock.h
+++ b/native_client_sdk/src/tests/nacl_io_test/kernel_proxy_mock.h
@@ -44,6 +44,7 @@ class KernelProxyMock : public nacl_io::KernelProxy {
MOCK_METHOD5(mount, int(const char*, const char*, const char*, unsigned long,
const void*));
MOCK_METHOD2(open, int(const char*, int));
+ MOCK_METHOD1(pipe, int(int[2]));
MOCK_METHOD3(read, ssize_t(int, void*, size_t));
MOCK_METHOD1(remove, int(const char*));
MOCK_METHOD1(rmdir, int(const char*));
diff --git a/native_client_sdk/src/tests/nacl_io_test/kernel_wrap_test.cc b/native_client_sdk/src/tests/nacl_io_test/kernel_wrap_test.cc
index 8f9957c..75469c7 100644
--- a/native_client_sdk/src/tests/nacl_io_test/kernel_wrap_test.cc
+++ b/native_client_sdk/src/tests/nacl_io_test/kernel_wrap_test.cc
@@ -252,6 +252,13 @@ TEST_F(KernelWrapTest, open) {
open("open", 3456);
}
+TEST_F(KernelWrapTest, pipe) {
+ int fds[2] = { 1, 2 };
+
+ EXPECT_CALL(mock, pipe(fds)).Times(1);
+ pipe(fds);
+}
+
TEST_F(KernelWrapTest, read) {
EXPECT_CALL(mock, read(4567, NULL, 5678)).Times(1);
read(4567, NULL, 5678);
diff --git a/native_client_sdk/src/tests/nacl_io_test/mount_node_tty_test.cc b/native_client_sdk/src/tests/nacl_io_test/mount_node_tty_test.cc
index c9a67cd..dc4798b 100644
--- a/native_client_sdk/src/tests/nacl_io_test/mount_node_tty_test.cc
+++ b/native_client_sdk/src/tests/nacl_io_test/mount_node_tty_test.cc
@@ -126,6 +126,14 @@ TEST_F(TtyTest, TtyOutput) {
EXPECT_EQ(0, strncmp(user_data.output_buf, message, message_len));
}
+static int TtyWrite(int fd, const char* string) {
+ struct tioc_nacl_input_string input;
+ input.buffer =string;
+ input.length = strlen(input.buffer);
+ char* ioctl_arg = reinterpret_cast<char*>(&input);
+ return ki_ioctl(fd, TIOCNACLINPUT, ioctl_arg);
+}
+
// Returns:
// 0 -> Not readable
// 1 -> Readable
@@ -144,9 +152,9 @@ static int IsReadable(int fd) {
if (rtn != 1)
return -1; // error
if (FD_ISSET(fd, &errorfds))
- return -1; // error
+ return -2; // error
if (!FD_ISSET(fd, &readfds))
- return -1; // error
+ return -3; // error
return 1; // readable
}
@@ -186,18 +194,11 @@ TEST_F(TtyTest, TtySelect) {
ASSERT_FALSE(FD_ISSET(tty_fd, &errorfds));
// Send 4 bytes to TTY input
- struct tioc_nacl_input_string input;
- input.buffer = "input:test";
- input.length = strlen(input.buffer);
- char* ioctl_arg = reinterpret_cast<char*>(&input);
- ASSERT_EQ(0, ki_ioctl(tty_fd, TIOCNACLINPUT, ioctl_arg));
+ ASSERT_EQ(0, TtyWrite(tty_fd, "input:test"));
// TTY should not be readable until newline in written
ASSERT_EQ(IsReadable(tty_fd), 0);
-
- input.buffer = "input:\n";
- input.length = strlen(input.buffer);
- ASSERT_EQ(0, ki_ioctl(tty_fd, TIOCNACLINPUT, ioctl_arg));
+ ASSERT_EQ(0, TtyWrite(tty_fd, "input:\n"));
// TTY should now be readable
ASSERT_EQ(IsReadable(tty_fd), 1);
@@ -205,6 +206,33 @@ TEST_F(TtyTest, TtySelect) {
ki_close(tty_fd);
}
+TEST_F(TtyTest, TtyICANON) {
+ int tty_fd = ki_open("/dev/tty", O_RDONLY);
+
+ ASSERT_EQ(IsReadable(tty_fd), 0);
+
+ struct termios tattr;
+ tcgetattr(tty_fd, &tattr);
+ tattr.c_lflag &= ~(ICANON|ECHO); /* Clear ICANON and ECHO. */
+ tcsetattr(tty_fd, TCSAFLUSH, &tattr);
+
+ ASSERT_EQ(IsReadable(tty_fd), 0);
+
+ // Set some bytes to the TTY, not including newline
+ ASSERT_EQ(0, TtyWrite(tty_fd, "a"));
+
+ // Since we are not in canonical mode the bytes should be
+ // immediately readable.
+ ASSERT_EQ(IsReadable(tty_fd), 1);
+
+ // Read byte from tty.
+ char c;
+ ASSERT_EQ(1, read(tty_fd, &c, 1));
+ ASSERT_EQ('a', c);
+
+ ASSERT_EQ(IsReadable(tty_fd), 0);
+}
+
int g_recieved_signal = 0;
void sighandler(int sig) {
@@ -246,4 +274,86 @@ TEST_F(TtyTest, WindowSize) {
reinterpret_cast<char*>(&old_winsize)));
}
+/*
+ * Sleep for 50ms then send a resize event to /dev/tty.
+ */
+static void* resize_thread_main(void* arg) {
+ usleep(50 * 1000);
+
+ int* tty_fd = static_cast<int*>(arg);
+ struct winsize winsize;
+ winsize.ws_col = 100;
+ winsize.ws_row = 200;
+ ki_ioctl(*tty_fd, TIOCSWINSZ, reinterpret_cast<char*>(&winsize));
+ return NULL;
+}
+
+TEST_F(TtyTest, ResizeDuringSelect) {
+ // Test that a window resize during a call
+ // to select(3) will cause it to fail with EINTR.
+ int tty_fd = ki_open("/dev/tty", O_RDONLY);
+
+ fd_set readfds;
+ fd_set errorfds;
+ FD_ZERO(&readfds);
+ FD_ZERO(&errorfds);
+ FD_SET(tty_fd, &readfds);
+ FD_SET(tty_fd, &errorfds);
+
+ pthread_t resize_thread;
+ pthread_create(&resize_thread, NULL, resize_thread_main,
+ &tty_fd);
+
+ struct timeval timeout;
+ timeout.tv_sec = 20;
+ timeout.tv_usec = 0;
+
+ // TTY should not be readable either before or after the
+ // call to select(3).
+ ASSERT_EQ(IsReadable(tty_fd), 0);
+
+ int rtn = ki_select(tty_fd + 1, &readfds, NULL, &errorfds, &timeout);
+ pthread_join(resize_thread, NULL);
+ ASSERT_EQ(-1, rtn);
+ ASSERT_EQ(EINTR, errno);
+ ASSERT_EQ(IsReadable(tty_fd), 0);
+}
+
+/*
+ * Sleep for 50ms then send some input to the /dev/tty.
+ */
+static void* input_thread_main(void* arg) {
+ usleep(50 * 1000);
+
+ int fd = ki_open("/dev/tty", O_RDONLY);
+ TtyWrite(fd, "test\n");
+ return NULL;
+}
+
+TEST_F(TtyTest, InputDuringSelect) {
+ // Test that input which occurs while in select causes
+ // select to return.
+ int tty_fd = ki_open("/dev/tty", O_RDONLY);
+
+ fd_set readfds;
+ fd_set errorfds;
+ FD_ZERO(&readfds);
+ FD_ZERO(&errorfds);
+ FD_SET(tty_fd, &readfds);
+ FD_SET(tty_fd, &errorfds);
+
+ pthread_t resize_thread;
+ pthread_create(&resize_thread, NULL, input_thread_main,
+ &dev_tty_);
+
+ struct timeval timeout;
+ timeout.tv_sec = 20;
+ timeout.tv_usec = 0;
+
+ int rtn = ki_select(tty_fd + 1, &readfds, NULL, &errorfds, &timeout);
+ pthread_join(resize_thread, NULL);
+
+ ASSERT_EQ(1, rtn);
+}
+
}