diff options
author | avallee <avallee@chromium.org> | 2016-02-18 21:53:57 -0800 |
---|---|---|
committer | Commit bot <commit-bot@chromium.org> | 2016-02-19 05:55:04 +0000 |
commit | 1454f2cb0491a006cc81166fe56de55c848323b4 (patch) | |
tree | c46670c14f175b24136fb1c8a699ed9dd884c45d /native_client_sdk | |
parent | a504e48570e35e6229fdcbdf7967510507a843b8 (diff) | |
download | chromium_src-1454f2cb0491a006cc81166fe56de55c848323b4.zip chromium_src-1454f2cb0491a006cc81166fe56de55c848323b4.tar.gz chromium_src-1454f2cb0491a006cc81166fe56de55c848323b4.tar.bz2 |
[NaCL SDK] nacl_io: Add Threaded Unix Socket Test.
Ensure we test actual multithreaded use of the unix domain sockets
available through socketpair(). There was a crash that occurred when
emitters were updated without holding their lock.
+ Add UnixSocketMultithreadedTest.SendRecv.
+ Implement shutdown for Unix sockets (necessary to implement a sane
test).
BUG=584997
CQ_INCLUDE_TRYBOTS=tryserver.chromium.linux:linux_nacl_sdk;tryserver.chromium.mac:mac_nacl_sdk;tryserver.chromium.win:win_nacl_sdk
Review URL: https://codereview.chromium.org/1708613002
Cr-Commit-Position: refs/heads/master@{#376401}
Diffstat (limited to 'native_client_sdk')
5 files changed, 217 insertions, 3 deletions
diff --git a/native_client_sdk/src/libraries/nacl_io/socket/unix_event_emitter.cc b/native_client_sdk/src/libraries/nacl_io/socket/unix_event_emitter.cc index a90039c..153a86d 100644 --- a/native_client_sdk/src/libraries/nacl_io/socket/unix_event_emitter.cc +++ b/native_client_sdk/src/libraries/nacl_io/socket/unix_event_emitter.cc @@ -4,6 +4,7 @@ #include "nacl_io/socket/unix_event_emitter.h" +#include <poll.h> #include <stdlib.h> #include <sys/socket.h> @@ -22,7 +23,10 @@ typedef sdk_util::ScopedRef<UnixMasterEventEmitter> class UnixMasterEventEmitter : public UnixEventEmitter { public: explicit UnixMasterEventEmitter(size_t size, int type) - : child_emitter_created_(false), child_emitter_(NULL) { + : in_shutdown_(false), + out_shutdown_(false), + child_emitter_created_(false), + child_emitter_(NULL) { if (type == SOCK_STREAM) { in_fifo_ = new FIFOChar(size); out_fifo_ = new FIFOChar(size); @@ -38,6 +42,14 @@ class UnixMasterEventEmitter : public UnixEventEmitter { delete out_fifo_; } + virtual void Shutdown_Locked(bool read, bool write) { + in_shutdown_ |= read; + out_shutdown_ |= write; + } + + virtual bool IsShutdownRead() const { return in_shutdown_; } + virtual bool IsShutdownWrite() const { return out_shutdown_; } + virtual ScopedUnixEventEmitter GetPeerEmitter(); protected: @@ -47,6 +59,8 @@ class UnixMasterEventEmitter : public UnixEventEmitter { private: FIFOInterface* in_fifo_; FIFOInterface* out_fifo_; + bool in_shutdown_; + bool out_shutdown_; bool child_emitter_created_; UnixChildEventEmitter* child_emitter_; @@ -61,6 +75,15 @@ class UnixChildEventEmitter : public UnixEventEmitter { } virtual ScopedUnixEventEmitter GetPeerEmitter() { return parent_emitter_; } virtual sdk_util::SimpleLock& GetLock() { return parent_emitter_->GetLock(); } + virtual void Shutdown_Locked(bool read, bool write) { + parent_emitter_->Shutdown_Locked(write, read); + } + virtual bool IsShutdownRead() const { + return parent_emitter_->IsShutdownWrite(); + } + virtual bool IsShutdownWrite() const { + return parent_emitter_->IsShutdownRead(); + } protected: virtual void Destroy() { parent_emitter_->child_emitter_ = NULL; } @@ -100,6 +123,18 @@ uint32_t UnixEventEmitter::WriteOut_Locked(const char* data, uint32_t len) { return count; } +void UnixEventEmitter::UpdateStatus_Locked() { + uint32_t status = 0; + if (!in_fifo()->IsEmpty() || IsShutdownRead()) + status |= POLLIN; + + if (!out_fifo()->IsFull() && !IsShutdownWrite()) + status |= POLLOUT; + + ClearEvents_Locked(~status); + RaiseEvents_Locked(status); +} + ScopedUnixEventEmitter UnixEventEmitter::MakeUnixEventEmitter(size_t size, int type) { return ScopedUnixEventEmitter(new UnixMasterEventEmitter(size, type)); diff --git a/native_client_sdk/src/libraries/nacl_io/socket/unix_event_emitter.h b/native_client_sdk/src/libraries/nacl_io/socket/unix_event_emitter.h index 378402e..29af1b5 100644 --- a/native_client_sdk/src/libraries/nacl_io/socket/unix_event_emitter.h +++ b/native_client_sdk/src/libraries/nacl_io/socket/unix_event_emitter.h @@ -28,6 +28,9 @@ class UnixEventEmitter : public StreamEventEmitter { uint32_t SpaceInInputFIFO(); virtual ScopedUnixEventEmitter GetPeerEmitter() = 0; + virtual void Shutdown_Locked(bool read, bool write) = 0; + virtual bool IsShutdownRead() const = 0; + virtual bool IsShutdownWrite() const = 0; static ScopedUnixEventEmitter MakeUnixEventEmitter(size_t size, int type); @@ -36,6 +39,7 @@ class UnixEventEmitter : public StreamEventEmitter { virtual FIFOInterface* in_fifo() = 0; virtual FIFOInterface* out_fifo() = 0; + void UpdateStatus_Locked(); private: DISALLOW_COPY_AND_ASSIGN(UnixEventEmitter); diff --git a/native_client_sdk/src/libraries/nacl_io/socket/unix_node.cc b/native_client_sdk/src/libraries/nacl_io/socket/unix_node.cc index 968cdd7..f84d92f 100644 --- a/native_client_sdk/src/libraries/nacl_io/socket/unix_node.cc +++ b/native_client_sdk/src/libraries/nacl_io/socket/unix_node.cc @@ -6,6 +6,7 @@ #ifdef PROVIDES_SOCKET_API #include <assert.h> +#include <errno.h> #include <string.h> #include <algorithm> @@ -35,7 +36,7 @@ Error UnixNode::Recv_Locked(void* buffer, PP_Resource* out_addr, int* out_len) { assert(emitter_.get()); - *out_len = emitter_->ReadIn_Locked((char*)buffer, len); + *out_len = emitter_->ReadIn_Locked(static_cast<char*>(buffer), len); *out_addr = 0; return 0; } @@ -45,7 +46,10 @@ Error UnixNode::Send_Locked(const void* buffer, PP_Resource out_addr, int* out_len) { assert(emitter_.get()); - *out_len = emitter_->WriteOut_Locked((char*)buffer, len); + if (emitter_->IsShutdownWrite()) { + return EPIPE; + } + *out_len = emitter_->WriteOut_Locked(static_cast<const char*>(buffer), len); return 0; } @@ -90,6 +94,29 @@ Error UnixNode::SendTo(const HandleAttr& attr, return SendHelper(attr, buf, len, flags, addr, out_len); } +Error UnixNode::Shutdown(int how) { + bool read; + bool write; + switch (how) { + case SHUT_RDWR: + read = write = true; + break; + case SHUT_RD: + read = true; + write = false; + break; + case SHUT_WR: + read = false; + write = true; + break; + default: + return EINVAL; + } + AUTO_LOCK(emitter_->GetLock()); + emitter_->Shutdown_Locked(read, write); + return 0; +} + } // namespace nacl_io #endif // PROVIDES_SOCKET_API diff --git a/native_client_sdk/src/libraries/nacl_io/socket/unix_node.h b/native_client_sdk/src/libraries/nacl_io/socket/unix_node.h index 881261a..7d644d3 100644 --- a/native_client_sdk/src/libraries/nacl_io/socket/unix_node.h +++ b/native_client_sdk/src/libraries/nacl_io/socket/unix_node.h @@ -49,6 +49,7 @@ class UnixNode : public SocketNode { const struct sockaddr* dest_addr, socklen_t addrlen, int* out_len); + virtual Error Shutdown(int how); private: ScopedUnixEventEmitter emitter_; diff --git a/native_client_sdk/src/tests/nacl_io_test/socket_test.cc b/native_client_sdk/src/tests/nacl_io_test/socket_test.cc index e739f74..60d9f30 100644 --- a/native_client_sdk/src/tests/nacl_io_test/socket_test.cc +++ b/native_client_sdk/src/tests/nacl_io_test/socket_test.cc @@ -11,7 +11,9 @@ #include <sys/socket.h> #include <sys/stat.h> +#include <iterator> #include <map> +#include <vector> #include "gmock/gmock.h" #include "gtest/gtest.h" @@ -422,6 +424,151 @@ TEST_F(UnixSocketTest, RecvNonBlockingDgram) { EXPECT_NE(POLLIN, pollfd.revents & POLLIN); } +namespace { +using std::vector; +using std::min; +using std::advance; +using std::distance; + +typedef vector<uint8_t> Buffer; +typedef Buffer::iterator BufferIterator; +typedef Buffer::const_iterator BufferConstIterator; + +const size_t kReceiveBufferSize = 2 * 1024 * 1024; +const size_t kThreadSendSize = 512 * 1024; +const size_t kMainSendSize = 1024 * 1024; + +const uint8_t kThreadPattern[] = {0xAA, 0x12, 0x55, 0x34, 0xCC, 0x33}; + +// Exercises the implementation of an AF_UNIX socket. Will read from the socket +// into read_buf until EOF (or read_buf is full) whenever the socket is readable +// and write send_size number of bytes into the socket according to pattern. +// The test UnixSocketMultithreadedTest.SendRecv uses this function to quickly +// push about 1 Meg of data between two threads over a socketpair. +void ReadWriteSocket(int fd, + const uint8_t* pattern, + const size_t pattern_size, + const size_t send_size, + Buffer* read_vector) { + Buffer send; + while (send.size() != send_size) { + size_t s = min(pattern_size, send_size - send.size()); + send.insert(send.end(), &pattern[0], &pattern[s]); + } + + bool read_complete = false, write_complete = false; + + size_t received_count = 0; + read_vector->resize(kReceiveBufferSize); + + BufferConstIterator send_iterator(send.begin()); + BufferConstIterator send_end(send.end()); + struct timeval timeout; + timeout.tv_sec = 10; + timeout.tv_usec = 0; + while (!read_complete || !write_complete) { + fd_set rfd; + FD_ZERO(&rfd); + if (!read_complete) { + FD_SET(fd, &rfd); + } + fd_set wfd; + FD_ZERO(&wfd); + if (!write_complete) { + FD_SET(fd, &wfd); + } + struct timeval tv = timeout; + + // Should not timeout, but added to fail test and allow to proceed. + EXPECT_LT(0, select(fd + 1, &rfd, &wfd, NULL, &tv)); + if (!FD_ISSET(fd, &rfd) && !FD_ISSET(fd, &wfd)) { + FAIL() << "Select returned with neither readable nor writable fd."; + } + + if (!read_complete && FD_ISSET(fd, &rfd)) { + if (received_count == read_vector->size()) { + read_vector->resize(read_vector->size() + kReceiveBufferSize); + } + ssize_t len = + ki_recv(fd, read_vector->data() + received_count, + read_vector->size() - received_count, /* flags */ 0); + ASSERT_LE(0, len) << "Read should succeed"; + if (len == 0) { + read_complete = true; + read_vector->resize(received_count); + } + received_count += len; + } + if (!write_complete && FD_ISSET(fd, &wfd)) { + ssize_t len = ki_send(fd, &(*send_iterator), + distance(send_iterator, send_end), /* flags */ 0); + ASSERT_LE(0, len) << "Write should succeed"; + advance(send_iterator, len); + if (send_iterator == send_end) { + EXPECT_EQ(0, ki_shutdown(fd, SHUT_WR)); + write_complete = true; + } + } + } +} + +class UnixSocketMultithreadedTest : public UnixSocketTest { + public: + void SetUp() { + UnixSocketTest::SetUp(); + EXPECT_EQ(0, ki_socketpair(AF_UNIX, SOCK_STREAM, 0, sv_)); + } + + void TearDown() { UnixSocketTest::TearDown(); } + + pthread_t CreateThread() { + pthread_t id; + EXPECT_EQ(0, pthread_create(&id, NULL, ThreadThunk, this)); + return id; + } + + private: + static void* ThreadThunk(void* ptr) { + return static_cast<UnixSocketMultithreadedTest*>(ptr)->ThreadEntry(); + } + + void* ThreadEntry() { + int fd = sv_[1]; + + ReadWriteSocket(fd, kThreadPattern, sizeof(kThreadPattern), kThreadSendSize, + &thread_buffer_); + return NULL; + } + + protected: + Buffer thread_buffer_; +}; + +} // namespace + +TEST_F(UnixSocketMultithreadedTest, SendRecv) { + pthread_t thread = CreateThread(); + + uint8_t pattern[] = {0xA5, 0x00, 0xC3, 0xFF}; + size_t pattern_size = sizeof(pattern); + Buffer main_read_buf; + + ReadWriteSocket(sv_[0], pattern, pattern_size, kMainSendSize, &main_read_buf); + + pthread_join(thread, NULL); + + EXPECT_EQ(kMainSendSize, thread_buffer_.size()); + EXPECT_EQ(kThreadSendSize, main_read_buf.size()); + for (size_t i = 0; i != thread_buffer_.size(); ++i) { + EXPECT_EQ(pattern[i % pattern_size], thread_buffer_[i]) + << "Invalid result at position " << i << "in data received by thread"; + } + for (size_t i = 0; i != main_read_buf.size(); ++i) { + EXPECT_EQ(kThreadPattern[i % sizeof(kThreadPattern)], main_read_buf[i]) + << "Invalid result at position " << i << "in data received by main"; + } +} + TEST(SocketUtilityFunctions, Htonl) { uint32_t host_long = 0x44332211; uint32_t network_long = htonl(host_long); |