diff options
author | tyoshino@chromium.org <tyoshino@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2013-04-03 17:13:02 +0000 |
---|---|---|
committer | tyoshino@chromium.org <tyoshino@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2013-04-03 17:13:02 +0000 |
commit | 21dc4c2c1747a71efebb36067577df3af367d01f (patch) | |
tree | a5416176d140c293448e43c736b31a44f840d2ab /net | |
parent | 31782c03a4a6746e8fc194ecf036b203a1942e55 (diff) | |
download | chromium_src-21dc4c2c1747a71efebb36067577df3af367d01f.zip chromium_src-21dc4c2c1747a71efebb36067577df3af367d01f.tar.gz chromium_src-21dc4c2c1747a71efebb36067577df3af367d01f.tar.bz2 |
Clean up SocketStream send operation by utilizing DrainableIOBuffer.
- Remove unnecessary variables, write_buf_, write_buf_offset_ and write_buf_size_, and utilize the DrainableIOBuffer |current_write_buf_| to do what they were doing.
- Use waiting_for_write_completion_ instead of current_write_buf_ to remember
whether Write operation is ongoing or not.
- Make DidSendData void and return OK outside it.
- In SendData(), now max_pending_send_allowed_ check is done even if current_write_buf_ (formerly write_buf_) is NULL
- Rewrite comments.
TEST=net_unittests --gtest_filter='SocketStreamTest.*'
Review URL: https://chromiumcodereview.appspot.com/12742018
git-svn-id: svn://svn.chromium.org/chrome/trunk/src@192080 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'net')
-rw-r--r-- | net/socket_stream/socket_stream.cc | 169 | ||||
-rw-r--r-- | net/socket_stream/socket_stream.h | 45 | ||||
-rw-r--r-- | net/socket_stream/socket_stream_unittest.cc | 44 |
3 files changed, 170 insertions, 88 deletions
diff --git a/net/socket_stream/socket_stream.cc b/net/socket_stream/socket_stream.cc index 0b1bc5c..84b58bc 100644 --- a/net/socket_stream/socket_stream.cc +++ b/net/socket_stream/socket_stream.cc @@ -105,10 +105,8 @@ SocketStream::SocketStream(const GURL& url, Delegate* delegate) io_callback_(base::Bind(&SocketStream::OnIOCompleted, base::Unretained(this)))), read_buf_(NULL), - write_buf_(NULL), current_write_buf_(NULL), - write_buf_offset_(0), - write_buf_size_(0), + waiting_for_write_completion_(false), closing_(false), server_closed_(false), metrics_(new SocketStreamMetrics(url)) { @@ -188,39 +186,58 @@ void SocketStream::Connect() { base::Bind(&SocketStream::DoLoop, this, OK)); } +size_t SocketStream::GetTotalSizeOfPendingWriteBufs() const { + size_t total_size = 0; + for (PendingDataQueue::const_iterator iter = pending_write_bufs_.begin(); + iter != pending_write_bufs_.end(); + ++iter) + total_size += (*iter)->size(); + return total_size; +} + bool SocketStream::SendData(const char* data, int len) { DCHECK(MessageLoop::current()) << "The current MessageLoop must exist"; DCHECK_EQ(MessageLoop::TYPE_IO, MessageLoop::current()->type()) << "The current MessageLoop must be TYPE_IO"; + DCHECK_GT(len, 0); + if (!socket_.get() || !socket_->IsConnected() || next_state_ == STATE_NONE) return false; - if (write_buf_) { - int current_amount_send = write_buf_size_ - write_buf_offset_; - for (PendingDataQueue::const_iterator iter = pending_write_bufs_.begin(); - iter != pending_write_bufs_.end(); - ++iter) - current_amount_send += (*iter)->size(); - - current_amount_send += len; - if (current_amount_send > max_pending_send_allowed_) - return false; - - pending_write_bufs_.push_back(make_scoped_refptr( - new IOBufferWithSize(len))); - memcpy(pending_write_bufs_.back()->data(), data, len); - return true; + + int total_buffered_bytes = len; + if (current_write_buf_) { + // Since + // - the purpose of this check is to limit the amount of buffer used by + // this instance. + // - the DrainableIOBuffer doesn't release consumed memory. + // we need to use not BytesRemaining() but size() here. + total_buffered_bytes += current_write_buf_->size(); } - DCHECK(!current_write_buf_); - write_buf_ = new IOBuffer(len); - memcpy(write_buf_->data(), data, len); - write_buf_size_ = len; - write_buf_offset_ = 0; - // Send pending data asynchronously, so that delegate won't be called - // back before returning SendData(). - MessageLoop::current()->PostTask( - FROM_HERE, - base::Bind(&SocketStream::DoLoop, this, OK)); + total_buffered_bytes += GetTotalSizeOfPendingWriteBufs(); + if (total_buffered_bytes > max_pending_send_allowed_) + return false; + + // TODO(tyoshino): Split data into smaller chunks e.g. 8KiB to free consumed + // buffer progressively + pending_write_bufs_.push_back(make_scoped_refptr( + new IOBufferWithSize(len))); + memcpy(pending_write_bufs_.back()->data(), data, len); + + // If current_write_buf_ is not NULL, it means that a) there's ongoing write + // operation or b) the connection is being closed. If a), the buffer we just + // pushed will be automatically handled when the completion callback runs + // the loop, and therefore we don't need to enqueue DoLoop(). If b), it's ok + // to do nothing. If current_write_buf_ is NULL, to make sure DoLoop() is + // ran soon, enequeue it. + if (!current_write_buf_) { + // Send pending data asynchronously, so that delegate won't be called + // back before returning from SendData(). + MessageLoop::current()->PostTask( + FROM_HERE, + base::Bind(&SocketStream::DoLoop, this, OK)); + } + return true; } @@ -378,30 +395,29 @@ int SocketStream::DidReceiveData(int result) { return OK; } -int SocketStream::DidSendData(int result) { +void SocketStream::DidSendData(int result) { DCHECK_GT(result, 0); + DCHECK(current_write_buf_); net_log_.AddEvent(NetLog::TYPE_SOCKET_STREAM_SENT); - int len = result; - metrics_->OnWrite(len); + + int bytes_sent = result; + + metrics_->OnWrite(bytes_sent); + + current_write_buf_->DidConsume(result); + + if (current_write_buf_->BytesRemaining()) + return; + + size_t bytes_freed = current_write_buf_->size(); + current_write_buf_ = NULL; + + // We freed current_write_buf_ and this instance is now able to accept more + // data via SendData() (note that DidConsume() doesn't free consumed memory). + // We can tell that to delegate_ by calling OnSentData(). if (delegate_) - delegate_->OnSentData(this, len); - - int remaining_size = write_buf_size_ - write_buf_offset_ - len; - if (remaining_size == 0) { - if (!pending_write_bufs_.empty()) { - write_buf_size_ = pending_write_bufs_.front()->size(); - write_buf_ = pending_write_bufs_.front(); - pending_write_bufs_.pop_front(); - } else { - write_buf_size_ = 0; - write_buf_ = NULL; - } - write_buf_offset_ = 0; - } else { - write_buf_offset_ += len; - } - return OK; + delegate_->OnSentData(this, bytes_freed); } void SocketStream::OnIOCompleted(int result) { @@ -420,8 +436,10 @@ void SocketStream::OnReadCompleted(int result) { } void SocketStream::OnWriteCompleted(int result) { - if (result > 0 && write_buf_) { - result = DidSendData(result); + waiting_for_write_completion_ = false; + if (result > 0) { + DidSendData(result); + result = OK; } DoLoop(result); } @@ -1077,7 +1095,7 @@ int SocketStream::DoReadWrite(int result) { // If client has requested close(), and there's nothing to write, then // let's close the socket. // We don't care about receiving data after the socket is closed. - if (closing_ && !write_buf_ && pending_write_bufs_.empty()) { + if (closing_ && !current_write_buf_ && pending_write_bufs_.empty()) { socket_->Disconnect(); next_state_ = STATE_CLOSE; return OK; @@ -1114,28 +1132,39 @@ int SocketStream::DoReadWrite(int result) { DCHECK(read_buf_); } - if (write_buf_ && !current_write_buf_) { - // No write pending. - current_write_buf_ = new DrainableIOBuffer(write_buf_, write_buf_size_); - current_write_buf_->SetOffset(write_buf_offset_); - result = socket_->Write(current_write_buf_, - current_write_buf_->BytesRemaining(), - base::Bind(&SocketStream::OnWriteCompleted, - base::Unretained(this))); - if (result > 0) { - return DidSendData(result); - } - // If write is not pending, return the result and do next loop (to close - // the connection). - if (result != 0 && result != ERR_IO_PENDING) { - next_state_ = STATE_CLOSE; - return result; + if (waiting_for_write_completion_) + return ERR_IO_PENDING; + + if (!current_write_buf_) { + if (pending_write_bufs_.empty()) { + // Nothing buffered for send. + return ERR_IO_PENDING; } - return result; + + current_write_buf_ = + new DrainableIOBuffer(pending_write_bufs_.front(), + pending_write_bufs_.front()->size()); + pending_write_bufs_.pop_front(); } - // We arrived here when both operation is pending. - return ERR_IO_PENDING; + result = socket_->Write(current_write_buf_, + current_write_buf_->BytesRemaining(), + base::Bind(&SocketStream::OnWriteCompleted, + base::Unretained(this))); + + if (result == ERR_IO_PENDING) { + waiting_for_write_completion_ = true; + } else if (result < 0) { + // Shortcut. Enter STATE_CLOSE now by changing next_state_ here than by + // calling DoReadWrite() again with the error code. + next_state_ = STATE_CLOSE; + } else if (result > 0) { + // Write is not pending. Return OK and do next loop. + DidSendData(result); + result = OK; + } + + return result; } GURL SocketStream::ProxyAuthOrigin() const { diff --git a/net/socket_stream/socket_stream.h b/net/socket_stream/socket_stream.h index ebe175a..50386c3e 100644 --- a/net/socket_stream/socket_stream.h +++ b/net/socket_stream/socket_stream.h @@ -60,10 +60,11 @@ class NET_EXPORT SocketStream virtual int OnStartOpenConnection(SocketStream* socket, const CompletionCallback& callback); - // Called when socket stream has been connected. The socket stream accepts - // at most |max_pending_send_allowed| so that a client of the socket stream - // should keep track of how much it has pending and shouldn't go over - // |max_pending_send_allowed| bytes. + // Called when a socket stream has been connected. The socket stream is + // allowed to buffer pending send data at most |max_pending_send_allowed| + // bytes. A client of the socket stream should keep track of how much + // pending send data it has and must not call SendData() if the pending + // data goes over |max_pending_send_allowed| bytes. virtual void OnConnected(SocketStream* socket, int max_pending_send_allowed) = 0; @@ -135,10 +136,9 @@ class NET_EXPORT SocketStream // Once the connection is established, calls delegate's OnConnected. virtual void Connect(); - // Requests to send |len| bytes of |data| on the connection. - // Returns true if |data| is buffered in the job. - // Returns false if size of buffered data would exceeds - // |max_pending_send_allowed_| and |data| is not sent at all. + // Buffers |data| of |len| bytes for send and returns true if successful. + // If size of buffered data exceeds |max_pending_send_allowed_|, sends no + // data and returns false. |len| must be positive. virtual bool SendData(const char* data, int len); // Requests to close the connection. @@ -270,7 +270,13 @@ class NET_EXPORT SocketStream int DidEstablishConnection(); int DidReceiveData(int result); - int DidSendData(int result); + // Given the number of bytes sent, + // - notifies the |delegate_| and |metrics_| of this event. + // - drains sent data from |current_write_buf_|. + // - if |current_write_buf_| has been fully sent, sets NULL to + // |current_write_buf_| to get ready for next write. + // and then, returns OK. + void DidSendData(int result); void OnIOCompleted(int result); void OnReadCompleted(int result); @@ -318,9 +324,17 @@ class NET_EXPORT SocketStream SSLConfigService* ssl_config_service() const; ProxyService* proxy_service() const; + // Returns the sum of the size of buffers in |pending_write_bufs_|. + size_t GetTotalSizeOfPendingWriteBufs() const; + BoundNetLog net_log_; GURL url_; + // The number of bytes allowed to be buffered in this object. If the size of + // buffered data which is + // current_write_buf_.BytesRemaining() + + // sum of the size of buffers in |pending_write_bufs_| + // exceeds this limit, SendData() fails. int max_pending_send_allowed_; const URLRequestContext* context_; @@ -358,16 +372,11 @@ class NET_EXPORT SocketStream scoped_refptr<IOBuffer> read_buf_; int read_buf_size_; - // Total amount of buffer (|write_buf_size_| - |write_buf_offset_| + - // sum of size of |pending_write_bufs_|) should not exceed - // |max_pending_send_allowed_|. - // |write_buf_| holds requested data and |current_write_buf_| is used - // for Write operation, that is, |current_write_buf_| is - // |write_buf_| + |write_buf_offset_|. - scoped_refptr<IOBuffer> write_buf_; + // Buffer to hold data to pass to socket_. scoped_refptr<DrainableIOBuffer> current_write_buf_; - int write_buf_offset_; - int write_buf_size_; + // True iff there's no error and this instance is waiting for completion of + // Write operation by socket_. + bool waiting_for_write_completion_; PendingDataQueue pending_write_bufs_; bool closing_; diff --git a/net/socket_stream/socket_stream_unittest.cc b/net/socket_stream/socket_stream_unittest.cc index ed3aa9b..e98e0df 100644 --- a/net/socket_stream/socket_stream_unittest.cc +++ b/net/socket_stream/socket_stream_unittest.cc @@ -258,6 +258,13 @@ class SocketStreamTest : public PlatformTest { event->socket->Close(); } + virtual void DoFailByTooBigDataAndClose(SocketStreamEvent* event) { + std::string frame(event->number + 1, 0x00); + VLOG(1) << event->number; + EXPECT_FALSE(event->socket->SendData(&frame[0], frame.size())); + event->socket->Close(); + } + virtual int DoSwitchToSpdyTest(SocketStreamEvent* event) { return ERR_PROTOCOL_SWITCHED; } @@ -363,6 +370,43 @@ TEST_F(SocketStreamTest, CloseFlushPendingWrite) { EXPECT_EQ(SocketStreamEvent::EVENT_CLOSE, events[7].event_type); } +TEST_F(SocketStreamTest, ExceedMaxPendingSendAllowed) { + TestCompletionCallback test_callback; + + scoped_ptr<SocketStreamEventRecorder> delegate( + new SocketStreamEventRecorder(test_callback.callback())); + delegate->SetOnConnected(base::Bind( + &SocketStreamTest::DoFailByTooBigDataAndClose, base::Unretained(this))); + + TestURLRequestContext context; + + scoped_refptr<SocketStream> socket_stream( + new SocketStream(GURL("ws://example.com/demo"), delegate.get())); + + socket_stream->set_context(&context); + + DelayedSocketData data_provider(1, NULL, 0, NULL, 0); + + MockClientSocketFactory* mock_socket_factory = + GetMockClientSocketFactory(); + mock_socket_factory->AddSocketDataProvider(&data_provider); + + socket_stream->SetClientSocketFactory(mock_socket_factory); + + socket_stream->Connect(); + + test_callback.WaitForResult(); + + const std::vector<SocketStreamEvent>& events = delegate->GetSeenEvents(); + ASSERT_EQ(4U, events.size()); + + EXPECT_EQ(SocketStreamEvent::EVENT_START_OPEN_CONNECTION, + events[0].event_type); + EXPECT_EQ(SocketStreamEvent::EVENT_CONNECTED, events[1].event_type); + EXPECT_EQ(SocketStreamEvent::EVENT_ERROR, events[2].event_type); + EXPECT_EQ(SocketStreamEvent::EVENT_CLOSE, events[3].event_type); +} + TEST_F(SocketStreamTest, BasicAuthProxy) { MockClientSocketFactory mock_socket_factory; MockWrite data_writes1[] = { |