diff options
author | willchan@chromium.org <willchan@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2009-05-21 22:31:53 +0000 |
---|---|---|
committer | willchan@chromium.org <willchan@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2009-05-21 22:31:53 +0000 |
commit | 182ac532081125b65bc169b56eb8230063a8d3a6 (patch) | |
tree | 902958892f112aeb15481879bf2a5c3aabfbad3c /net | |
parent | 7747b6426a1c173b8274d5fe347a5b6d218a8f44 (diff) | |
download | chromium_src-182ac532081125b65bc169b56eb8230063a8d3a6.zip chromium_src-182ac532081125b65bc169b56eb8230063a8d3a6.tar.gz chromium_src-182ac532081125b65bc169b56eb8230063a8d3a6.tar.bz2 |
Implement full duplex mode for TCPClientSocketLibevent.
The main thing is not to share the same MessageLoopForIO::FileDescriptorWatcher object for both reads and writes.
Fixed tcp_client_socket_unittest.cc's full duplex test. In doing so, I also had to fix up some other stuff.
(1) Put everything in the net::(anonymous) namespace.
(2) Keep calling Write() until ERR_IO_PENDING happens.
(3) Make the server socket pause its reads so that the buffers (both client and server side) eventually fill up.
(4) Discovered that after the first Read(), the server socket will close. Further Write() calls will lead to a TCP RST, which breaks the FullDuplex_WriteFirst test, since it wants to do a Write() at the end. Fix this by changing DidRead() not to automatically close the server socket. Added a CloseServerSocket() member function to make this explicit. Fixed up the Read() tests. Made them more exact too.
BUG=http://www.crbug.com/11888
TEST=Go to gmail, compose a message, attach a large file (say 1MB or so). It shouldn't crash.
Review URL: http://codereview.chromium.org/115504
git-svn-id: svn://svn.chromium.org/chrome/trunk/src@16675 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'net')
-rw-r--r-- | net/base/listen_socket.cc | 42 | ||||
-rw-r--r-- | net/base/listen_socket.h | 15 | ||||
-rw-r--r-- | net/base/tcp_client_socket_libevent.cc | 53 | ||||
-rw-r--r-- | net/base/tcp_client_socket_libevent.h | 59 | ||||
-rw-r--r-- | net/base/tcp_client_socket_unittest.cc | 258 |
5 files changed, 273 insertions, 154 deletions
diff --git a/net/base/listen_socket.cc b/net/base/listen_socket.cc index a9d16ef..d31c66d 100644 --- a/net/base/listen_socket.cc +++ b/net/base/listen_socket.cc @@ -23,7 +23,7 @@ #if defined(OS_WIN) #define socklen_t int #elif defined(OS_POSIX) -const int INVALID_SOCKET = -1; // Used same name as in Windows to avoid #ifdef +const int INVALID_SOCKET = -1; // Used same name as in Windows to avoid #ifdef const int SOCKET_ERROR = -1; #endif @@ -31,7 +31,9 @@ const int kReadBufSize = 200; ListenSocket::ListenSocket(SOCKET s, ListenSocketDelegate *del) : socket_(s), - socket_delegate_(del) { + socket_delegate_(del), + reads_paused_(false), + has_pending_reads_(false) { #if defined(OS_WIN) socket_event_ = WSACreateEvent(); // TODO(ibrar): error handling in case of socket_event_ == WSA_INVALID_EVENT @@ -83,7 +85,7 @@ ListenSocket* ListenSocket::Listen(std::string ip, int port, } void ListenSocket::Listen() { - int backlog = 10; // TODO(erikkay): maybe don't allow any backlog? + int backlog = 10; // TODO(erikkay): maybe don't allow any backlog? listen(socket_, backlog); // TODO(erikkay): error handling #if defined(OS_POSIX) @@ -118,7 +120,7 @@ void ListenSocket::Accept() { } void ListenSocket::Read() { - char buf[kReadBufSize + 1]; // +1 for null termination + char buf[kReadBufSize + 1]; // +1 for null termination int len; do { len = HANDLE_EINTR(recv(socket_, buf, kReadBufSize, 0)); @@ -143,7 +145,7 @@ void ListenSocket::Read() { } else { // TODO(ibrar): maybe change DidRead to take a length instead DCHECK(len > 0 && len <= kReadBufSize); - buf[len] = 0; // already create a buffer with +1 length + buf[len] = 0; // already create a buffer with +1 length socket_delegate_->DidRead(this, buf); } } while (len == kReadBufSize); @@ -198,7 +200,7 @@ void ListenSocket::SendInternal(const char* bytes, int len) { #elif defined(OS_POSIX) if (errno == EWOULDBLOCK || errno == EAGAIN) { #endif - // TODO (ibrar): there should be logic here to handle this because + // TODO(ibrar): there should be logic here to handle this because // it is not an error } } else if (sent != len) { @@ -217,7 +219,21 @@ void ListenSocket::Send(const std::string& str, bool append_linefeed) { Send(str.data(), static_cast<int>(str.length()), append_linefeed); } -// TODO (ibrar): We can add these functions into OS dependent files +void ListenSocket::PauseReads() { + DCHECK(!reads_paused_); + reads_paused_ = true; +} + +void ListenSocket::ResumeReads() { + DCHECK(reads_paused_); + reads_paused_ = false; + if (has_pending_reads_) { + has_pending_reads_ = false; + Read(); + } +} + +// TODO(ibrar): We can add these functions into OS dependent files #if defined(OS_WIN) // MessageLoop watcher callback void ListenSocket::OnObjectSignaled(HANDLE object) { @@ -239,7 +255,11 @@ void ListenSocket::OnObjectSignaled(HANDLE object) { Accept(); } if (ev.lNetworkEvents & FD_READ) { - Read(); + if (reads_paused_) { + has_pending_reads_ = true; + } else { + Read(); + } } if (ev.lNetworkEvents & FD_CLOSE) { Close(); @@ -251,7 +271,11 @@ void ListenSocket::OnFileCanReadWithoutBlocking(int fd) { Accept(); } if (wait_state_ == WAITING_READ) { - Read(); + if (reads_paused_) { + has_pending_reads_ = true; + } else { + Read(); + } } if (wait_state_ == WAITING_CLOSE) { // Close() is called by Read() in the Linux case. diff --git a/net/base/listen_socket.h b/net/base/listen_socket.h index c6650ac..cadc962 100644 --- a/net/base/listen_socket.h +++ b/net/base/listen_socket.h @@ -45,6 +45,8 @@ class ListenSocket : public base::RefCountedThreadSafe<ListenSocket>, // should be split up similarly. class ListenSocketDelegate { public: + virtual ~ListenSocketDelegate() {} + // server is the original listening Socket, connection is the new // Socket that was created. Ownership of connection is transferred // to the delegate with this call. @@ -59,10 +61,16 @@ class ListenSocket : public base::RefCountedThreadSafe<ListenSocket>, ListenSocketDelegate* del); virtual ~ListenSocket(); - // send data to the socket + // Send data to the socket. void Send(const char* bytes, int len, bool append_linefeed = false); void Send(const std::string& str, bool append_linefeed = false); + // NOTE: This is for unit test use only! + // Pause/Resume calling Read(). Note that ResumeReads() will also call + // Read() if there is anything to read. + void PauseReads(); + void ResumeReads(); + protected: ListenSocket(SOCKET s, ListenSocketDelegate* del); static SOCKET Listen(std::string ip, int port); @@ -106,7 +114,10 @@ class ListenSocket : public base::RefCountedThreadSafe<ListenSocket>, ListenSocketDelegate *socket_delegate_; private: - DISALLOW_EVIL_CONSTRUCTORS(ListenSocket); + bool reads_paused_; + bool has_pending_reads_; + + DISALLOW_COPY_AND_ASSIGN(ListenSocket); }; #endif // NET_BASE_LISTEN_SOCKET_H_ diff --git a/net/base/tcp_client_socket_libevent.cc b/net/base/tcp_client_socket_libevent.cc index 3e23f1c..6ad3d2d 100644 --- a/net/base/tcp_client_socket_libevent.cc +++ b/net/base/tcp_client_socket_libevent.cc @@ -19,11 +19,13 @@ namespace net { +namespace { + const int kInvalidSocket = -1; // Return 0 on success, -1 on failure. // Too small a function to bother putting in a library? -static int SetNonBlocking(int fd) { +int SetNonBlocking(int fd) { int flags = fcntl(fd, F_GETFL, 0); if (-1 == flags) return flags; @@ -31,7 +33,7 @@ static int SetNonBlocking(int fd) { } // Convert values from <errno.h> to values from "net/base/net_errors.h" -static int MapPosixError(int err) { +int MapPosixError(int err) { // There are numerous posix error codes, but these are the ones we thus far // find interesting. switch (err) { @@ -64,6 +66,8 @@ static int MapPosixError(int err) { } } +} // namespace + //----------------------------------------------------------------------------- TCPClientSocketLibevent::TCPClientSocketLibevent(const AddressList& addresses) @@ -71,6 +75,8 @@ TCPClientSocketLibevent::TCPClientSocketLibevent(const AddressList& addresses) addresses_(addresses), current_ai_(addresses_.head()), waiting_connect_(false), + read_watcher_(this), + write_watcher_(this), read_callback_(NULL), write_callback_(NULL) { } @@ -111,12 +117,12 @@ int TCPClientSocketLibevent::Connect(CompletionCallback* callback) { return MapPosixError(errno); } - // Initialize socket_watcher_ and link it to our MessagePump. + // Initialize write_socket_watcher_ and link it to our MessagePump. // POLLOUT is set if the connection is established. // POLLIN is set if the connection fails. if (!MessageLoopForIO::current()->WatchFileDescriptor( - socket_, true, MessageLoopForIO::WATCH_WRITE, &socket_watcher_, - this)) { + socket_, true, MessageLoopForIO::WATCH_WRITE, &write_socket_watcher_, + &write_watcher_)) { DLOG(INFO) << "WatchFileDescriptor failed: " << errno; close(socket_); socket_ = kInvalidSocket; @@ -124,7 +130,7 @@ int TCPClientSocketLibevent::Connect(CompletionCallback* callback) { } waiting_connect_ = true; - read_callback_ = callback; + write_callback_ = callback; return ERR_IO_PENDING; } @@ -134,7 +140,10 @@ void TCPClientSocketLibevent::Disconnect() { TRACE_EVENT_INSTANT("socket.disconnect", this, ""); - socket_watcher_.StopWatchingFileDescriptor(); + bool ok = read_socket_watcher_.StopWatchingFileDescriptor(); + DCHECK(ok); + ok = write_socket_watcher_.StopWatchingFileDescriptor(); + DCHECK(ok); close(socket_); socket_ = kInvalidSocket; waiting_connect_ = false; @@ -197,7 +206,7 @@ int TCPClientSocketLibevent::Read(IOBuffer* buf, if (!MessageLoopForIO::current()->WatchFileDescriptor( socket_, true, MessageLoopForIO::WATCH_READ, - &socket_watcher_, this)) { + &read_socket_watcher_, &read_watcher_)) { DLOG(INFO) << "WatchFileDescriptor failed on read, errno " << errno; return MapPosixError(errno); } @@ -229,7 +238,7 @@ int TCPClientSocketLibevent::Write(IOBuffer* buf, if (!MessageLoopForIO::current()->WatchFileDescriptor( socket_, true, MessageLoopForIO::WATCH_WRITE, - &socket_watcher_, this)) { + &write_socket_watcher_, &write_watcher_)) { DLOG(INFO) << "WatchFileDescriptor failed on write, errno " << errno; return MapPosixError(errno); } @@ -300,12 +309,13 @@ void TCPClientSocketLibevent::DidCompleteConnect() { result = Connect(read_callback_); } else { result = MapPosixError(error_code); - socket_watcher_.StopWatchingFileDescriptor(); + bool ok = write_socket_watcher_.StopWatchingFileDescriptor(); + DCHECK(ok); waiting_connect_ = false; } if (result != ERR_IO_PENDING) { - DoReadCallback(result); + DoWriteCallback(result); } } @@ -326,7 +336,8 @@ void TCPClientSocketLibevent::DidCompleteRead() { if (result != ERR_IO_PENDING) { read_buf_ = NULL; read_buf_len_ = 0; - socket_watcher_.StopWatchingFileDescriptor(); + bool ok = read_socket_watcher_.StopWatchingFileDescriptor(); + DCHECK(ok); DoReadCallback(result); } } @@ -348,27 +359,11 @@ void TCPClientSocketLibevent::DidCompleteWrite() { if (result != ERR_IO_PENDING) { write_buf_ = NULL; write_buf_len_ = 0; - socket_watcher_.StopWatchingFileDescriptor(); + write_socket_watcher_.StopWatchingFileDescriptor(); DoWriteCallback(result); } } -void TCPClientSocketLibevent::OnFileCanReadWithoutBlocking(int fd) { - // When a socket connects it signals both Read and Write, we handle - // DidCompleteConnect() in the write handler. - if (!waiting_connect_ && read_callback_) { - DidCompleteRead(); - } -} - -void TCPClientSocketLibevent::OnFileCanWriteWithoutBlocking(int fd) { - if (waiting_connect_) { - DidCompleteConnect(); - } else if (write_callback_) { - DidCompleteWrite(); - } -} - int TCPClientSocketLibevent::GetPeerName(struct sockaddr *name, socklen_t *namelen) { return ::getpeername(socket_, name, namelen); diff --git a/net/base/tcp_client_socket_libevent.h b/net/base/tcp_client_socket_libevent.h index 4b3dd45..041ca7c 100644 --- a/net/base/tcp_client_socket_libevent.h +++ b/net/base/tcp_client_socket_libevent.h @@ -8,6 +8,8 @@ #include <sys/socket.h> // for struct sockaddr #include "base/message_loop.h" +#include "base/ref_counted.h" +#include "base/scoped_ptr.h" #include "net/base/address_list.h" #include "net/base/client_socket.h" #include "net/base/completion_callback.h" @@ -17,15 +19,14 @@ struct event; // From libevent namespace net { // A client socket that uses TCP as the transport layer. -class TCPClientSocketLibevent : public ClientSocket, - public MessageLoopForIO::Watcher { +class TCPClientSocketLibevent : public ClientSocket { public: // The IP address(es) and port number to connect to. The TCP socket will try // each IP address in the list until it succeeds in establishing a // connection. explicit TCPClientSocketLibevent(const AddressList& addresses); - ~TCPClientSocketLibevent(); + virtual ~TCPClientSocketLibevent(); // ClientSocket methods: virtual int Connect(CompletionCallback* callback); @@ -44,9 +45,46 @@ class TCPClientSocketLibevent : public ClientSocket, virtual int GetPeerName(struct sockaddr *name, socklen_t *namelen); private: - // Called by MessagePumpLibevent when the socket is ready to do I/O - void OnFileCanReadWithoutBlocking(int fd); - void OnFileCanWriteWithoutBlocking(int fd); + class ReadWatcher : public MessageLoopForIO::Watcher { + public: + explicit ReadWatcher(TCPClientSocketLibevent* socket) : socket_(socket) {} + + // MessageLoopForIO::Watcher methods + + virtual void OnFileCanReadWithoutBlocking(int /* fd */) { + if (socket_->read_callback_) + socket_->DidCompleteRead(); + } + + virtual void OnFileCanWriteWithoutBlocking(int /* fd */) {} + + private: + TCPClientSocketLibevent* const socket_; + + DISALLOW_COPY_AND_ASSIGN(ReadWatcher); + }; + + class WriteWatcher : public MessageLoopForIO::Watcher { + public: + explicit WriteWatcher(TCPClientSocketLibevent* socket) : socket_(socket) {} + + // MessageLoopForIO::Watcher methods + + virtual void OnFileCanReadWithoutBlocking(int /* fd */) {} + + virtual void OnFileCanWriteWithoutBlocking(int /* fd */) { + if (socket_->waiting_connect_) { + socket_->DidCompleteConnect(); + } else if (socket_->write_callback_) { + socket_->DidCompleteWrite(); + } + } + + private: + TCPClientSocketLibevent* const socket_; + + DISALLOW_COPY_AND_ASSIGN(WriteWatcher); + }; void DoReadCallback(int rv); void DoWriteCallback(int rv); @@ -67,8 +105,13 @@ class TCPClientSocketLibevent : public ClientSocket, // Whether we're currently waiting for connect() to complete bool waiting_connect_; - // The socket's libevent wrapper - MessageLoopForIO::FileDescriptorWatcher socket_watcher_; + // The socket's libevent wrappers + MessageLoopForIO::FileDescriptorWatcher read_socket_watcher_; + MessageLoopForIO::FileDescriptorWatcher write_socket_watcher_; + + // The corresponding watchers for reads and writes. + ReadWatcher read_watcher_; + WriteWatcher write_watcher_; // The buffer used by OnSocketReady to retry Read requests scoped_refptr<IOBuffer> read_buf_; diff --git a/net/base/tcp_client_socket_unittest.cc b/net/base/tcp_client_socket_unittest.cc index 3fbb02c..fc364645 100644 --- a/net/base/tcp_client_socket_unittest.cc +++ b/net/base/tcp_client_socket_unittest.cc @@ -2,16 +2,24 @@ // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. +#include "net/base/tcp_client_socket.h" + +#include "base/basictypes.h" #include "net/base/address_list.h" #include "net/base/host_resolver.h" #include "net/base/listen_socket.h" #include "net/base/net_errors.h" -#include "net/base/tcp_client_socket.h" #include "net/base/test_completion_callback.h" #include "net/base/winsock_init.h" #include "testing/gtest/include/gtest/gtest.h" #include "testing/platform_test.h" +namespace net { + +namespace { + +const char kServerReply[] = "HTTP/1.1 404 Not Found"; + class TCPClientSocketTest : public PlatformTest, public ListenSocket::ListenSocketDelegate { public: @@ -24,18 +32,31 @@ class TCPClientSocketTest } virtual void DidRead(ListenSocket*, const std::string& s) { // TODO(dkegel): this might not be long enough to tickle some bugs. - connected_sock_->Send(std::string("HTTP/1.1 404 Not Found"), true); - // Close socket by destroying it, else read test below will hang. - connected_sock_ = NULL; + connected_sock_->Send(kServerReply, + arraysize(kServerReply) - 1, + false /* don't append line feed */); } virtual void DidClose(ListenSocket* sock) {} // Testcase hooks virtual void SetUp(); + void CloseServerSocket() { + // delete the connected_sock_, which will close it. + connected_sock_ = NULL; + } + + void PauseServerReads() { + connected_sock_->PauseReads(); + } + + void ResumeServerReads() { + connected_sock_->ResumeReads(); + } + protected: int listen_port_; - scoped_ptr<net::TCPClientSocket> sock_; + scoped_ptr<TCPClientSocket> sock_; private: scoped_refptr<ListenSocket> listen_sock_; @@ -52,7 +73,7 @@ void TCPClientSocketTest::SetUp() { const int kMinPort = 10100; const int kMaxPort = 10200; #if defined(OS_WIN) - net::EnsureWinsockInit(); + EnsureWinsockInit(); #endif for (port = kMinPort; port < kMaxPort; port++) { sock = ListenSocket::Listen("127.0.0.1", port, this); @@ -63,11 +84,11 @@ void TCPClientSocketTest::SetUp() { listen_sock_ = sock; listen_port_ = port; - net::AddressList addr; - net::HostResolver resolver; + AddressList addr; + HostResolver resolver; int rv = resolver.Resolve("localhost", listen_port_, &addr, NULL); - CHECK(rv == net::OK); - sock_.reset(new net::TCPClientSocket(addr)); + CHECK(rv == OK); + sock_.reset(new TCPClientSocket(addr)); } TEST_F(TCPClientSocketTest, Connect) { @@ -75,11 +96,11 @@ TEST_F(TCPClientSocketTest, Connect) { EXPECT_FALSE(sock_->IsConnected()); int rv = sock_->Connect(&callback); - if (rv != net::OK) { - ASSERT_EQ(rv, net::ERR_IO_PENDING); + if (rv != OK) { + ASSERT_EQ(rv, ERR_IO_PENDING); rv = callback.WaitForResult(); - EXPECT_EQ(rv, net::OK); + EXPECT_EQ(rv, OK); } EXPECT_TRUE(sock_->IsConnected()); @@ -95,197 +116,222 @@ TEST_F(TCPClientSocketTest, Connect) { TEST_F(TCPClientSocketTest, Read) { TestCompletionCallback callback; int rv = sock_->Connect(&callback); - if (rv != net::OK) { - ASSERT_EQ(rv, net::ERR_IO_PENDING); + if (rv != OK) { + ASSERT_EQ(rv, ERR_IO_PENDING); rv = callback.WaitForResult(); - EXPECT_EQ(rv, net::OK); + EXPECT_EQ(rv, OK); } const char request_text[] = "GET / HTTP/1.0\r\n\r\n"; - scoped_refptr<net::IOBuffer> request_buffer = - new net::IOBuffer(arraysize(request_text) - 1); + scoped_refptr<IOBuffer> request_buffer = + new IOBuffer(arraysize(request_text) - 1); memcpy(request_buffer->data(), request_text, arraysize(request_text) - 1); rv = sock_->Write(request_buffer, arraysize(request_text) - 1, &callback); - EXPECT_TRUE(rv >= 0 || rv == net::ERR_IO_PENDING); + EXPECT_TRUE(rv >= 0 || rv == ERR_IO_PENDING); - if (rv == net::ERR_IO_PENDING) { + if (rv == ERR_IO_PENDING) { rv = callback.WaitForResult(); EXPECT_EQ(rv, static_cast<int>(arraysize(request_text) - 1)); } - scoped_refptr<net::IOBuffer> buf = new net::IOBuffer(4096); - for (;;) { + scoped_refptr<IOBuffer> buf = new IOBuffer(4096); + uint32 bytes_read = 0; + while (bytes_read < arraysize(kServerReply) - 1) { rv = sock_->Read(buf, 4096, &callback); - EXPECT_TRUE(rv >= 0 || rv == net::ERR_IO_PENDING); + EXPECT_TRUE(rv >= 0 || rv == ERR_IO_PENDING); - if (rv == net::ERR_IO_PENDING) + if (rv == ERR_IO_PENDING) rv = callback.WaitForResult(); - EXPECT_GE(rv, 0); - if (rv <= 0) - break; + ASSERT_GE(rv, 0); + bytes_read += rv; } + + // All data has been read now. Read once more to force an ERR_IO_PENDING, and + // then close the server socket, and note the close. + + rv = sock_->Read(buf, 4096, &callback); + ASSERT_EQ(ERR_IO_PENDING, rv); + CloseServerSocket(); + EXPECT_EQ(0, callback.WaitForResult()); } TEST_F(TCPClientSocketTest, Read_SmallChunks) { TestCompletionCallback callback; int rv = sock_->Connect(&callback); - if (rv != net::OK) { - ASSERT_EQ(rv, net::ERR_IO_PENDING); + if (rv != OK) { + ASSERT_EQ(rv, ERR_IO_PENDING); rv = callback.WaitForResult(); - EXPECT_EQ(rv, net::OK); + EXPECT_EQ(rv, OK); } const char request_text[] = "GET / HTTP/1.0\r\n\r\n"; - scoped_refptr<net::IOBuffer> request_buffer = - new net::IOBuffer(arraysize(request_text) - 1); + scoped_refptr<IOBuffer> request_buffer = + new IOBuffer(arraysize(request_text) - 1); memcpy(request_buffer->data(), request_text, arraysize(request_text) - 1); rv = sock_->Write(request_buffer, arraysize(request_text) - 1, &callback); - EXPECT_TRUE(rv >= 0 || rv == net::ERR_IO_PENDING); + EXPECT_TRUE(rv >= 0 || rv == ERR_IO_PENDING); - if (rv == net::ERR_IO_PENDING) { + if (rv == ERR_IO_PENDING) { rv = callback.WaitForResult(); EXPECT_EQ(rv, static_cast<int>(arraysize(request_text) - 1)); } - scoped_refptr<net::IOBuffer> buf = new net::IOBuffer(1); - for (;;) { + scoped_refptr<IOBuffer> buf = new IOBuffer(1); + uint32 bytes_read = 0; + while (bytes_read < arraysize(kServerReply) - 1) { rv = sock_->Read(buf, 1, &callback); - EXPECT_TRUE(rv >= 0 || rv == net::ERR_IO_PENDING); + EXPECT_TRUE(rv >= 0 || rv == ERR_IO_PENDING); - if (rv == net::ERR_IO_PENDING) + if (rv == ERR_IO_PENDING) rv = callback.WaitForResult(); - EXPECT_GE(rv, 0); - if (rv <= 0) - break; + ASSERT_EQ(1, rv); + bytes_read += rv; } + + // All data has been read now. Read once more to force an ERR_IO_PENDING, and + // then close the server socket, and note the close. + + rv = sock_->Read(buf, 1, &callback); + ASSERT_EQ(ERR_IO_PENDING, rv); + CloseServerSocket(); + EXPECT_EQ(0, callback.WaitForResult()); } TEST_F(TCPClientSocketTest, Read_Interrupted) { TestCompletionCallback callback; int rv = sock_->Connect(&callback); - if (rv != net::OK) { - ASSERT_EQ(rv, net::ERR_IO_PENDING); + if (rv != OK) { + ASSERT_EQ(ERR_IO_PENDING, rv); rv = callback.WaitForResult(); - EXPECT_EQ(rv, net::OK); + EXPECT_EQ(rv, OK); } const char request_text[] = "GET / HTTP/1.0\r\n\r\n"; - scoped_refptr<net::IOBuffer> request_buffer = - new net::IOBuffer(arraysize(request_text) - 1); + scoped_refptr<IOBuffer> request_buffer = + new IOBuffer(arraysize(request_text) - 1); memcpy(request_buffer->data(), request_text, arraysize(request_text) - 1); rv = sock_->Write(request_buffer, arraysize(request_text) - 1, &callback); - EXPECT_TRUE(rv >= 0 || rv == net::ERR_IO_PENDING); + EXPECT_TRUE(rv >= 0 || rv == ERR_IO_PENDING); - if (rv == net::ERR_IO_PENDING) { + if (rv == ERR_IO_PENDING) { rv = callback.WaitForResult(); EXPECT_EQ(rv, static_cast<int>(arraysize(request_text) - 1)); } // Do a partial read and then exit. This test should not crash! - scoped_refptr<net::IOBuffer> buf = new net::IOBuffer(512); - rv = sock_->Read(buf, 512, &callback); - EXPECT_TRUE(rv >= 0 || rv == net::ERR_IO_PENDING); + scoped_refptr<IOBuffer> buf = new IOBuffer(16); + rv = sock_->Read(buf, 16, &callback); + EXPECT_TRUE(rv >= 0 || rv == ERR_IO_PENDING); - if (rv == net::ERR_IO_PENDING) + if (rv == ERR_IO_PENDING) rv = callback.WaitForResult(); - EXPECT_NE(rv, 0); + EXPECT_NE(0, rv); } TEST_F(TCPClientSocketTest, FullDuplex_ReadFirst) { TestCompletionCallback callback; int rv = sock_->Connect(&callback); - if (rv != net::OK) { - ASSERT_EQ(rv, net::ERR_IO_PENDING); + if (rv != OK) { + ASSERT_EQ(rv, ERR_IO_PENDING); rv = callback.WaitForResult(); - EXPECT_EQ(rv, net::OK); + EXPECT_EQ(rv, OK); } + // Read first. There's no data, so it should return ERR_IO_PENDING. const int kBufLen = 4096; - scoped_refptr<net::IOBuffer> buf = new net::IOBuffer(kBufLen); + scoped_refptr<IOBuffer> buf = new IOBuffer(kBufLen); rv = sock_->Read(buf, kBufLen, &callback); - EXPECT_EQ(net::ERR_IO_PENDING, rv); + EXPECT_EQ(ERR_IO_PENDING, rv); - const char request_text[] = "GET / HTTP/1.0\r\n\r\n"; - scoped_refptr<net::IOBuffer> request_buffer = - new net::IOBuffer(arraysize(request_text) - 1); - memcpy(request_buffer->data(), request_text, arraysize(request_text) - 1); + PauseServerReads(); + const int kWriteBufLen = 64 * 1024; + scoped_refptr<IOBuffer> request_buffer = new IOBuffer(kWriteBufLen); + char* request_data = request_buffer->data(); + memset(request_data, 'A', kWriteBufLen); TestCompletionCallback write_callback; - rv = sock_->Write(request_buffer, arraysize(request_text) - 1, - &write_callback); - EXPECT_TRUE(rv >= 0 || rv == net::ERR_IO_PENDING); + while (true) { + rv = sock_->Write(request_buffer, kWriteBufLen, &write_callback); + ASSERT_TRUE(rv >= 0 || rv == ERR_IO_PENDING); - if (rv == net::ERR_IO_PENDING) { - rv = write_callback.WaitForResult(); - EXPECT_EQ(rv, static_cast<int>(arraysize(request_text) - 1)); + if (rv == ERR_IO_PENDING) { + ResumeServerReads(); + rv = write_callback.WaitForResult(); + break; + } } + // At this point, both read and write have returned ERR_IO_PENDING, and the + // write callback has executed. We wait for the read callback to run now to + // make sure that the socket can handle full duplex communications. + rv = callback.WaitForResult(); EXPECT_GE(rv, 0); - while (rv > 0) { - rv = sock_->Read(buf, kBufLen, &callback); - EXPECT_TRUE(rv >= 0 || rv == net::ERR_IO_PENDING); - - if (rv == net::ERR_IO_PENDING) - rv = callback.WaitForResult(); - - EXPECT_GE(rv, 0); - if (rv <= 0) - break; - } } TEST_F(TCPClientSocketTest, FullDuplex_WriteFirst) { TestCompletionCallback callback; int rv = sock_->Connect(&callback); - if (rv != net::OK) { - ASSERT_EQ(rv, net::ERR_IO_PENDING); + if (rv != OK) { + ASSERT_EQ(ERR_IO_PENDING, rv); rv = callback.WaitForResult(); - EXPECT_EQ(rv, net::OK); + EXPECT_EQ(OK, rv); } - const char request_text[] = "GET / HTTP/1.0\r\n\r\n"; - scoped_refptr<net::IOBuffer> request_buffer = - new net::IOBuffer(arraysize(request_text) - 1); - memcpy(request_buffer->data(), request_text, arraysize(request_text) - 1); + PauseServerReads(); + const int kWriteBufLen = 64 * 1024; + scoped_refptr<IOBuffer> request_buffer = new IOBuffer(kWriteBufLen); + char* request_data = request_buffer->data(); + memset(request_data, 'A', kWriteBufLen); TestCompletionCallback write_callback; - rv = sock_->Write(request_buffer, arraysize(request_text) - 1, - &write_callback); - EXPECT_TRUE(rv >= 0 || rv == net::ERR_IO_PENDING); + while (true) { + rv = sock_->Write(request_buffer, kWriteBufLen, &write_callback); + ASSERT_TRUE(rv >= 0 || rv == ERR_IO_PENDING); - const int kBufLen = 4096; - scoped_refptr<net::IOBuffer> buf = new net::IOBuffer(kBufLen); - int read_rv = sock_->Read(buf, kBufLen, &callback); - EXPECT_TRUE(read_rv >= 0 || read_rv == net::ERR_IO_PENDING); + if (rv == ERR_IO_PENDING) + break; + } + + // Now we have the Write() blocked on ERR_IO_PENDING. It's time to force the + // Read() to block on ERR_IO_PENDING too. - if (rv == net::ERR_IO_PENDING) { - rv = write_callback.WaitForResult(); - EXPECT_EQ(static_cast<int>(arraysize(request_text) - 1), rv); + const int kBufLen = 4096; + scoped_refptr<IOBuffer> buf = new IOBuffer(kBufLen); + while (true) { + rv = sock_->Read(buf, kBufLen, &callback); + ASSERT_TRUE(rv >= 0 || rv == ERR_IO_PENDING); + if (rv == ERR_IO_PENDING) + break; } - rv = callback.WaitForResult(); + // At this point, both read and write have returned ERR_IO_PENDING. Now we + // run the write and read callbacks to make sure they can handle full duplex + // communications. + + ResumeServerReads(); + rv = write_callback.WaitForResult(); EXPECT_GE(rv, 0); - while (rv > 0) { - rv = sock_->Read(buf, kBufLen, &callback); - EXPECT_TRUE(rv >= 0 || rv == net::ERR_IO_PENDING); - if (rv == net::ERR_IO_PENDING) - rv = callback.WaitForResult(); + // It's possible the read is blocked because it's already read all the data. + // Close the server socket, so there will at least be a 0-byte read. + CloseServerSocket(); - EXPECT_GE(rv, 0); - if (rv <= 0) - break; - } + rv = callback.WaitForResult(); + EXPECT_GE(rv, 0); } + +} // namespace + +} // namespace net |