summaryrefslogtreecommitdiffstats
path: root/net
diff options
context:
space:
mode:
authortyoshino@chromium.org <tyoshino@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2013-04-03 17:13:02 +0000
committertyoshino@chromium.org <tyoshino@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2013-04-03 17:13:02 +0000
commit21dc4c2c1747a71efebb36067577df3af367d01f (patch)
treea5416176d140c293448e43c736b31a44f840d2ab /net
parent31782c03a4a6746e8fc194ecf036b203a1942e55 (diff)
downloadchromium_src-21dc4c2c1747a71efebb36067577df3af367d01f.zip
chromium_src-21dc4c2c1747a71efebb36067577df3af367d01f.tar.gz
chromium_src-21dc4c2c1747a71efebb36067577df3af367d01f.tar.bz2
Clean up SocketStream send operation by utilizing DrainableIOBuffer.
- Remove unnecessary variables, write_buf_, write_buf_offset_ and write_buf_size_, and utilize the DrainableIOBuffer |current_write_buf_| to do what they were doing. - Use waiting_for_write_completion_ instead of current_write_buf_ to remember whether Write operation is ongoing or not. - Make DidSendData void and return OK outside it. - In SendData(), now max_pending_send_allowed_ check is done even if current_write_buf_ (formerly write_buf_) is NULL - Rewrite comments. TEST=net_unittests --gtest_filter='SocketStreamTest.*' Review URL: https://chromiumcodereview.appspot.com/12742018 git-svn-id: svn://svn.chromium.org/chrome/trunk/src@192080 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'net')
-rw-r--r--net/socket_stream/socket_stream.cc169
-rw-r--r--net/socket_stream/socket_stream.h45
-rw-r--r--net/socket_stream/socket_stream_unittest.cc44
3 files changed, 170 insertions, 88 deletions
diff --git a/net/socket_stream/socket_stream.cc b/net/socket_stream/socket_stream.cc
index 0b1bc5c..84b58bc 100644
--- a/net/socket_stream/socket_stream.cc
+++ b/net/socket_stream/socket_stream.cc
@@ -105,10 +105,8 @@ SocketStream::SocketStream(const GURL& url, Delegate* delegate)
io_callback_(base::Bind(&SocketStream::OnIOCompleted,
base::Unretained(this)))),
read_buf_(NULL),
- write_buf_(NULL),
current_write_buf_(NULL),
- write_buf_offset_(0),
- write_buf_size_(0),
+ waiting_for_write_completion_(false),
closing_(false),
server_closed_(false),
metrics_(new SocketStreamMetrics(url)) {
@@ -188,39 +186,58 @@ void SocketStream::Connect() {
base::Bind(&SocketStream::DoLoop, this, OK));
}
+size_t SocketStream::GetTotalSizeOfPendingWriteBufs() const {
+ size_t total_size = 0;
+ for (PendingDataQueue::const_iterator iter = pending_write_bufs_.begin();
+ iter != pending_write_bufs_.end();
+ ++iter)
+ total_size += (*iter)->size();
+ return total_size;
+}
+
bool SocketStream::SendData(const char* data, int len) {
DCHECK(MessageLoop::current()) <<
"The current MessageLoop must exist";
DCHECK_EQ(MessageLoop::TYPE_IO, MessageLoop::current()->type()) <<
"The current MessageLoop must be TYPE_IO";
+ DCHECK_GT(len, 0);
+
if (!socket_.get() || !socket_->IsConnected() || next_state_ == STATE_NONE)
return false;
- if (write_buf_) {
- int current_amount_send = write_buf_size_ - write_buf_offset_;
- for (PendingDataQueue::const_iterator iter = pending_write_bufs_.begin();
- iter != pending_write_bufs_.end();
- ++iter)
- current_amount_send += (*iter)->size();
-
- current_amount_send += len;
- if (current_amount_send > max_pending_send_allowed_)
- return false;
-
- pending_write_bufs_.push_back(make_scoped_refptr(
- new IOBufferWithSize(len)));
- memcpy(pending_write_bufs_.back()->data(), data, len);
- return true;
+
+ int total_buffered_bytes = len;
+ if (current_write_buf_) {
+ // Since
+ // - the purpose of this check is to limit the amount of buffer used by
+ // this instance.
+ // - the DrainableIOBuffer doesn't release consumed memory.
+ // we need to use not BytesRemaining() but size() here.
+ total_buffered_bytes += current_write_buf_->size();
}
- DCHECK(!current_write_buf_);
- write_buf_ = new IOBuffer(len);
- memcpy(write_buf_->data(), data, len);
- write_buf_size_ = len;
- write_buf_offset_ = 0;
- // Send pending data asynchronously, so that delegate won't be called
- // back before returning SendData().
- MessageLoop::current()->PostTask(
- FROM_HERE,
- base::Bind(&SocketStream::DoLoop, this, OK));
+ total_buffered_bytes += GetTotalSizeOfPendingWriteBufs();
+ if (total_buffered_bytes > max_pending_send_allowed_)
+ return false;
+
+ // TODO(tyoshino): Split data into smaller chunks e.g. 8KiB to free consumed
+ // buffer progressively
+ pending_write_bufs_.push_back(make_scoped_refptr(
+ new IOBufferWithSize(len)));
+ memcpy(pending_write_bufs_.back()->data(), data, len);
+
+ // If current_write_buf_ is not NULL, it means that a) there's ongoing write
+ // operation or b) the connection is being closed. If a), the buffer we just
+ // pushed will be automatically handled when the completion callback runs
+ // the loop, and therefore we don't need to enqueue DoLoop(). If b), it's ok
+ // to do nothing. If current_write_buf_ is NULL, to make sure DoLoop() is
+ // ran soon, enequeue it.
+ if (!current_write_buf_) {
+ // Send pending data asynchronously, so that delegate won't be called
+ // back before returning from SendData().
+ MessageLoop::current()->PostTask(
+ FROM_HERE,
+ base::Bind(&SocketStream::DoLoop, this, OK));
+ }
+
return true;
}
@@ -378,30 +395,29 @@ int SocketStream::DidReceiveData(int result) {
return OK;
}
-int SocketStream::DidSendData(int result) {
+void SocketStream::DidSendData(int result) {
DCHECK_GT(result, 0);
+ DCHECK(current_write_buf_);
net_log_.AddEvent(NetLog::TYPE_SOCKET_STREAM_SENT);
- int len = result;
- metrics_->OnWrite(len);
+
+ int bytes_sent = result;
+
+ metrics_->OnWrite(bytes_sent);
+
+ current_write_buf_->DidConsume(result);
+
+ if (current_write_buf_->BytesRemaining())
+ return;
+
+ size_t bytes_freed = current_write_buf_->size();
+
current_write_buf_ = NULL;
+
+ // We freed current_write_buf_ and this instance is now able to accept more
+ // data via SendData() (note that DidConsume() doesn't free consumed memory).
+ // We can tell that to delegate_ by calling OnSentData().
if (delegate_)
- delegate_->OnSentData(this, len);
-
- int remaining_size = write_buf_size_ - write_buf_offset_ - len;
- if (remaining_size == 0) {
- if (!pending_write_bufs_.empty()) {
- write_buf_size_ = pending_write_bufs_.front()->size();
- write_buf_ = pending_write_bufs_.front();
- pending_write_bufs_.pop_front();
- } else {
- write_buf_size_ = 0;
- write_buf_ = NULL;
- }
- write_buf_offset_ = 0;
- } else {
- write_buf_offset_ += len;
- }
- return OK;
+ delegate_->OnSentData(this, bytes_freed);
}
void SocketStream::OnIOCompleted(int result) {
@@ -420,8 +436,10 @@ void SocketStream::OnReadCompleted(int result) {
}
void SocketStream::OnWriteCompleted(int result) {
- if (result > 0 && write_buf_) {
- result = DidSendData(result);
+ waiting_for_write_completion_ = false;
+ if (result > 0) {
+ DidSendData(result);
+ result = OK;
}
DoLoop(result);
}
@@ -1077,7 +1095,7 @@ int SocketStream::DoReadWrite(int result) {
// If client has requested close(), and there's nothing to write, then
// let's close the socket.
// We don't care about receiving data after the socket is closed.
- if (closing_ && !write_buf_ && pending_write_bufs_.empty()) {
+ if (closing_ && !current_write_buf_ && pending_write_bufs_.empty()) {
socket_->Disconnect();
next_state_ = STATE_CLOSE;
return OK;
@@ -1114,28 +1132,39 @@ int SocketStream::DoReadWrite(int result) {
DCHECK(read_buf_);
}
- if (write_buf_ && !current_write_buf_) {
- // No write pending.
- current_write_buf_ = new DrainableIOBuffer(write_buf_, write_buf_size_);
- current_write_buf_->SetOffset(write_buf_offset_);
- result = socket_->Write(current_write_buf_,
- current_write_buf_->BytesRemaining(),
- base::Bind(&SocketStream::OnWriteCompleted,
- base::Unretained(this)));
- if (result > 0) {
- return DidSendData(result);
- }
- // If write is not pending, return the result and do next loop (to close
- // the connection).
- if (result != 0 && result != ERR_IO_PENDING) {
- next_state_ = STATE_CLOSE;
- return result;
+ if (waiting_for_write_completion_)
+ return ERR_IO_PENDING;
+
+ if (!current_write_buf_) {
+ if (pending_write_bufs_.empty()) {
+ // Nothing buffered for send.
+ return ERR_IO_PENDING;
}
- return result;
+
+ current_write_buf_ =
+ new DrainableIOBuffer(pending_write_bufs_.front(),
+ pending_write_bufs_.front()->size());
+ pending_write_bufs_.pop_front();
}
- // We arrived here when both operation is pending.
- return ERR_IO_PENDING;
+ result = socket_->Write(current_write_buf_,
+ current_write_buf_->BytesRemaining(),
+ base::Bind(&SocketStream::OnWriteCompleted,
+ base::Unretained(this)));
+
+ if (result == ERR_IO_PENDING) {
+ waiting_for_write_completion_ = true;
+ } else if (result < 0) {
+ // Shortcut. Enter STATE_CLOSE now by changing next_state_ here than by
+ // calling DoReadWrite() again with the error code.
+ next_state_ = STATE_CLOSE;
+ } else if (result > 0) {
+ // Write is not pending. Return OK and do next loop.
+ DidSendData(result);
+ result = OK;
+ }
+
+ return result;
}
GURL SocketStream::ProxyAuthOrigin() const {
diff --git a/net/socket_stream/socket_stream.h b/net/socket_stream/socket_stream.h
index ebe175a..50386c3e 100644
--- a/net/socket_stream/socket_stream.h
+++ b/net/socket_stream/socket_stream.h
@@ -60,10 +60,11 @@ class NET_EXPORT SocketStream
virtual int OnStartOpenConnection(SocketStream* socket,
const CompletionCallback& callback);
- // Called when socket stream has been connected. The socket stream accepts
- // at most |max_pending_send_allowed| so that a client of the socket stream
- // should keep track of how much it has pending and shouldn't go over
- // |max_pending_send_allowed| bytes.
+ // Called when a socket stream has been connected. The socket stream is
+ // allowed to buffer pending send data at most |max_pending_send_allowed|
+ // bytes. A client of the socket stream should keep track of how much
+ // pending send data it has and must not call SendData() if the pending
+ // data goes over |max_pending_send_allowed| bytes.
virtual void OnConnected(SocketStream* socket,
int max_pending_send_allowed) = 0;
@@ -135,10 +136,9 @@ class NET_EXPORT SocketStream
// Once the connection is established, calls delegate's OnConnected.
virtual void Connect();
- // Requests to send |len| bytes of |data| on the connection.
- // Returns true if |data| is buffered in the job.
- // Returns false if size of buffered data would exceeds
- // |max_pending_send_allowed_| and |data| is not sent at all.
+ // Buffers |data| of |len| bytes for send and returns true if successful.
+ // If size of buffered data exceeds |max_pending_send_allowed_|, sends no
+ // data and returns false. |len| must be positive.
virtual bool SendData(const char* data, int len);
// Requests to close the connection.
@@ -270,7 +270,13 @@ class NET_EXPORT SocketStream
int DidEstablishConnection();
int DidReceiveData(int result);
- int DidSendData(int result);
+ // Given the number of bytes sent,
+ // - notifies the |delegate_| and |metrics_| of this event.
+ // - drains sent data from |current_write_buf_|.
+ // - if |current_write_buf_| has been fully sent, sets NULL to
+ // |current_write_buf_| to get ready for next write.
+ // and then, returns OK.
+ void DidSendData(int result);
void OnIOCompleted(int result);
void OnReadCompleted(int result);
@@ -318,9 +324,17 @@ class NET_EXPORT SocketStream
SSLConfigService* ssl_config_service() const;
ProxyService* proxy_service() const;
+ // Returns the sum of the size of buffers in |pending_write_bufs_|.
+ size_t GetTotalSizeOfPendingWriteBufs() const;
+
BoundNetLog net_log_;
GURL url_;
+ // The number of bytes allowed to be buffered in this object. If the size of
+ // buffered data which is
+ // current_write_buf_.BytesRemaining() +
+ // sum of the size of buffers in |pending_write_bufs_|
+ // exceeds this limit, SendData() fails.
int max_pending_send_allowed_;
const URLRequestContext* context_;
@@ -358,16 +372,11 @@ class NET_EXPORT SocketStream
scoped_refptr<IOBuffer> read_buf_;
int read_buf_size_;
- // Total amount of buffer (|write_buf_size_| - |write_buf_offset_| +
- // sum of size of |pending_write_bufs_|) should not exceed
- // |max_pending_send_allowed_|.
- // |write_buf_| holds requested data and |current_write_buf_| is used
- // for Write operation, that is, |current_write_buf_| is
- // |write_buf_| + |write_buf_offset_|.
- scoped_refptr<IOBuffer> write_buf_;
+ // Buffer to hold data to pass to socket_.
scoped_refptr<DrainableIOBuffer> current_write_buf_;
- int write_buf_offset_;
- int write_buf_size_;
+ // True iff there's no error and this instance is waiting for completion of
+ // Write operation by socket_.
+ bool waiting_for_write_completion_;
PendingDataQueue pending_write_bufs_;
bool closing_;
diff --git a/net/socket_stream/socket_stream_unittest.cc b/net/socket_stream/socket_stream_unittest.cc
index ed3aa9b..e98e0df 100644
--- a/net/socket_stream/socket_stream_unittest.cc
+++ b/net/socket_stream/socket_stream_unittest.cc
@@ -258,6 +258,13 @@ class SocketStreamTest : public PlatformTest {
event->socket->Close();
}
+ virtual void DoFailByTooBigDataAndClose(SocketStreamEvent* event) {
+ std::string frame(event->number + 1, 0x00);
+ VLOG(1) << event->number;
+ EXPECT_FALSE(event->socket->SendData(&frame[0], frame.size()));
+ event->socket->Close();
+ }
+
virtual int DoSwitchToSpdyTest(SocketStreamEvent* event) {
return ERR_PROTOCOL_SWITCHED;
}
@@ -363,6 +370,43 @@ TEST_F(SocketStreamTest, CloseFlushPendingWrite) {
EXPECT_EQ(SocketStreamEvent::EVENT_CLOSE, events[7].event_type);
}
+TEST_F(SocketStreamTest, ExceedMaxPendingSendAllowed) {
+ TestCompletionCallback test_callback;
+
+ scoped_ptr<SocketStreamEventRecorder> delegate(
+ new SocketStreamEventRecorder(test_callback.callback()));
+ delegate->SetOnConnected(base::Bind(
+ &SocketStreamTest::DoFailByTooBigDataAndClose, base::Unretained(this)));
+
+ TestURLRequestContext context;
+
+ scoped_refptr<SocketStream> socket_stream(
+ new SocketStream(GURL("ws://example.com/demo"), delegate.get()));
+
+ socket_stream->set_context(&context);
+
+ DelayedSocketData data_provider(1, NULL, 0, NULL, 0);
+
+ MockClientSocketFactory* mock_socket_factory =
+ GetMockClientSocketFactory();
+ mock_socket_factory->AddSocketDataProvider(&data_provider);
+
+ socket_stream->SetClientSocketFactory(mock_socket_factory);
+
+ socket_stream->Connect();
+
+ test_callback.WaitForResult();
+
+ const std::vector<SocketStreamEvent>& events = delegate->GetSeenEvents();
+ ASSERT_EQ(4U, events.size());
+
+ EXPECT_EQ(SocketStreamEvent::EVENT_START_OPEN_CONNECTION,
+ events[0].event_type);
+ EXPECT_EQ(SocketStreamEvent::EVENT_CONNECTED, events[1].event_type);
+ EXPECT_EQ(SocketStreamEvent::EVENT_ERROR, events[2].event_type);
+ EXPECT_EQ(SocketStreamEvent::EVENT_CLOSE, events[3].event_type);
+}
+
TEST_F(SocketStreamTest, BasicAuthProxy) {
MockClientSocketFactory mock_socket_factory;
MockWrite data_writes1[] = {