summaryrefslogtreecommitdiffstats
path: root/net/websockets/websocket_channel.cc
diff options
context:
space:
mode:
authorricea@chromium.org <ricea@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2014-03-24 15:39:36 +0000
committerricea@chromium.org <ricea@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2014-03-24 15:39:36 +0000
commit4256dbb23c2a2c694cd85aab094e57dcdcd4e679 (patch)
treed9c87bce4101ede3c1f5855dc8d80021b49a63b7 /net/websockets/websocket_channel.cc
parentec24838a49bf0584feafcbe9c3b451c0d002ee84 (diff)
downloadchromium_src-4256dbb23c2a2c694cd85aab094e57dcdcd4e679.zip
chromium_src-4256dbb23c2a2c694cd85aab094e57dcdcd4e679.tar.gz
chromium_src-4256dbb23c2a2c694cd85aab094e57dcdcd4e679.tar.bz2
Obey renderer-supplied quota in the browser.
This allows back-pressure to be applied to the TCP/IP connection and ultimately the origin server when the renderer is not keeping up. This CL also adds a ScopedQueue type (like ScopedVector), because it needs one. There are some shortcomings to the approach implemented here. These are noted in the bug. In brief, net::WebSocketChannel's idea of how much it should read from the socket needs to be somewhat decoupled from whether the renderer is ready to receive data. This will be improved in future as we refine the flow control design for multiplexing. I am having difficulty quantifying the performance impact of this change, because the speeds reported by pywebsocket benchmark.html seem to vary dramatically depending on whether I look at the tab or not. However, if there is a performance regression it cannot be too large. BUG=283792 TEST=net_unittests, layout tests, pywebsocket benchmark Review URL: https://codereview.chromium.org/198723002 git-svn-id: svn://svn.chromium.org/chrome/trunk/src@258936 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'net/websockets/websocket_channel.cc')
-rw-r--r--net/websockets/websocket_channel.cc119
1 files changed, 103 insertions, 16 deletions
diff --git a/net/websockets/websocket_channel.cc b/net/websockets/websocket_channel.cc
index ed8a92b..18e1708 100644
--- a/net/websockets/websocket_channel.cc
+++ b/net/websockets/websocket_channel.cc
@@ -4,12 +4,16 @@
#include "net/websockets/websocket_channel.h"
+#include <limits.h> // for INT_MAX
+
#include <algorithm>
+#include <deque>
#include "base/basictypes.h" // for size_t
#include "base/big_endian.h"
#include "base/bind.h"
#include "base/compiler_specific.h"
+#include "base/memory/ref_counted.h"
#include "base/memory/weak_ptr.h"
#include "base/message_loop/message_loop.h"
#include "base/numerics/safe_conversions.h"
@@ -167,8 +171,7 @@ class WebSocketChannel::ConnectDelegate
}
virtual void OnFinishOpeningHandshake(
- scoped_ptr<WebSocketHandshakeResponseInfo> response)
- OVERRIDE {
+ scoped_ptr<WebSocketHandshakeResponseInfo> response) OVERRIDE {
creator_->OnFinishOpeningHandshake(response.Pass());
}
@@ -217,7 +220,8 @@ class WebSocketChannel::HandshakeNotificationSender
};
WebSocketChannel::HandshakeNotificationSender::HandshakeNotificationSender(
- WebSocketChannel* channel) : owner_(channel) {}
+ WebSocketChannel* channel)
+ : owner_(channel) {}
WebSocketChannel::HandshakeNotificationSender::~HandshakeNotificationSender() {}
@@ -235,13 +239,13 @@ ChannelState WebSocketChannel::HandshakeNotificationSender::SendImmediately(
if (handshake_request_info_.get()) {
if (CHANNEL_DELETED == event_interface->OnStartOpeningHandshake(
- handshake_request_info_.Pass()))
+ handshake_request_info_.Pass()))
return CHANNEL_DELETED;
}
if (handshake_response_info_.get()) {
if (CHANNEL_DELETED == event_interface->OnFinishOpeningHandshake(
- handshake_response_info_.Pass()))
+ handshake_response_info_.Pass()))
return CHANNEL_DELETED;
// TODO(yhirano): We can release |this| to save memory because
@@ -251,6 +255,31 @@ ChannelState WebSocketChannel::HandshakeNotificationSender::SendImmediately(
return CHANNEL_ALIVE;
}
+WebSocketChannel::PendingReceivedFrame::PendingReceivedFrame(
+ bool final,
+ WebSocketFrameHeader::OpCode opcode,
+ const scoped_refptr<IOBuffer>& data,
+ size_t offset,
+ size_t size)
+ : final_(final),
+ opcode_(opcode),
+ data_(data),
+ offset_(offset),
+ size_(size) {}
+
+WebSocketChannel::PendingReceivedFrame::~PendingReceivedFrame() {}
+
+void WebSocketChannel::PendingReceivedFrame::ResetOpcode() {
+ DCHECK(WebSocketFrameHeader::IsKnownDataOpCode(opcode_));
+ opcode_ = WebSocketFrameHeader::kOpCodeContinuation;
+}
+
+void WebSocketChannel::PendingReceivedFrame::DidConsume(size_t bytes) {
+ DCHECK_LE(offset_, size_);
+ DCHECK_LE(bytes, size_ - offset_);
+ offset_ += bytes;
+}
+
WebSocketChannel::WebSocketChannel(
scoped_ptr<WebSocketEventInterface> event_interface,
URLRequestContext* url_request_context)
@@ -259,6 +288,7 @@ WebSocketChannel::WebSocketChannel(
send_quota_low_water_mark_(kDefaultSendQuotaLowWaterMark),
send_quota_high_water_mark_(kDefaultSendQuotaHighWaterMark),
current_send_quota_(0),
+ current_receive_quota_(0),
timeout_(base::TimeDelta::FromSeconds(kClosingHandshakeTimeoutSeconds)),
received_close_code_(0),
state_(FRESHLY_CONSTRUCTED),
@@ -364,8 +394,47 @@ void WebSocketChannel::SendFrame(bool fin,
void WebSocketChannel::SendFlowControl(int64 quota) {
DCHECK(state_ == CONNECTING || state_ == CONNECTED || state_ == SEND_CLOSED ||
state_ == CLOSE_WAIT);
- // TODO(ricea): Add interface to WebSocketStream and implement.
- // stream_->SendFlowControl(quota);
+ // TODO(ricea): Kill the renderer if it tries to send us a negative quota
+ // value or > INT_MAX.
+ DCHECK_GE(quota, 0);
+ DCHECK_LE(quota, INT_MAX);
+ if (!pending_received_frames_.empty()) {
+ DCHECK_EQ(0, current_receive_quota_);
+ }
+ while (!pending_received_frames_.empty() && quota > 0) {
+ PendingReceivedFrame& front = pending_received_frames_.front();
+ const size_t data_size = front.size() - front.offset();
+ const size_t bytes_to_send =
+ std::min(base::checked_cast<size_t>(quota), data_size);
+ const bool final = front.final() && data_size == bytes_to_send;
+ const char* data = front.data()->data() + front.offset();
+ const std::vector<char> data_vector(data, data + bytes_to_send);
+ DVLOG(3) << "Sending frame previously split due to quota to the "
+ << "renderer: quota=" << quota << " data_size=" << data_size
+ << " bytes_to_send=" << bytes_to_send;
+ if (event_interface_->OnDataFrame(final, front.opcode(), data_vector) ==
+ CHANNEL_DELETED)
+ return;
+ if (bytes_to_send < data_size) {
+ front.DidConsume(bytes_to_send);
+ front.ResetOpcode();
+ return;
+ }
+ const int64 signed_bytes_to_send = base::checked_cast<int64>(bytes_to_send);
+ DCHECK_GE(quota, signed_bytes_to_send);
+ quota -= signed_bytes_to_send;
+
+ pending_received_frames_.pop();
+ }
+ // If current_receive_quota_ == 0 then there is no pending ReadFrames()
+ // operation.
+ const bool start_read =
+ current_receive_quota_ == 0 && quota > 0 &&
+ (state_ == CONNECTED || state_ == SEND_CLOSED || state_ == CLOSE_WAIT);
+ current_receive_quota_ += base::checked_cast<int>(quota);
+ if (start_read)
+ AllowUnused(ReadFrames());
+ // |this| may have been deleted.
}
void WebSocketChannel::StartClosingHandshake(uint16 code,
@@ -465,6 +534,7 @@ void WebSocketChannel::OnConnectSuccess(scoped_ptr<WebSocketStream> stream) {
// |stream_request_| is not used once the connection has succeeded.
stream_request_.reset();
+
AllowUnused(ReadFrames());
// |this| may have been deleted.
}
@@ -574,7 +644,7 @@ ChannelState WebSocketChannel::OnWriteDone(bool synchronous, int result) {
ChannelState WebSocketChannel::ReadFrames() {
int result = OK;
- do {
+ while (result == OK && current_receive_quota_ > 0) {
// This use of base::Unretained is safe because this object owns the
// WebSocketStream, and any pending reads will be cancelled when it is
// destroyed.
@@ -588,7 +658,7 @@ ChannelState WebSocketChannel::ReadFrames() {
return CHANNEL_DELETED;
}
DCHECK_NE(CLOSED, state_);
- } while (result == OK);
+ }
return CHANNEL_ALIVE;
}
@@ -642,8 +712,7 @@ ChannelState WebSocketChannel::OnReadDone(bool synchronous, int result) {
}
}
-ChannelState WebSocketChannel::HandleFrame(
- scoped_ptr<WebSocketFrame> frame) {
+ChannelState WebSocketChannel::HandleFrame(scoped_ptr<WebSocketFrame> frame) {
if (frame->header.masked) {
// RFC6455 Section 5.1 "A client MUST close a connection if it detects a
// masked frame."
@@ -691,7 +760,7 @@ ChannelState WebSocketChannel::HandleFrameByState(
frame_name + " received after close", kWebSocketErrorProtocolError, "");
}
switch (opcode) {
- case WebSocketFrameHeader::kOpCodeText: // fall-thru
+ case WebSocketFrameHeader::kOpCodeText: // fall-thru
case WebSocketFrameHeader::kOpCodeBinary:
case WebSocketFrameHeader::kOpCodeContinuation:
return HandleDataFrame(opcode, final, data_buffer, size);
@@ -710,6 +779,10 @@ ChannelState WebSocketChannel::HandleFrameByState(
return CHANNEL_ALIVE;
case WebSocketFrameHeader::kOpCodeClose: {
+ // TODO(ricea): If there is a message which is queued for transmission to
+ // the renderer, then the renderer should not receive an
+ // OnClosingHandshake or OnDropChannel IPC until the queued message has
+ // been completedly transmitted.
uint16 code = kWebSocketNormalClosure;
std::string reason;
std::string message;
@@ -807,14 +880,28 @@ ChannelState WebSocketChannel::HandleDataFrame(
return CHANNEL_ALIVE;
initial_frame_forwarded_ = !final;
+ if (size > base::checked_cast<size_t>(current_receive_quota_) ||
+ !pending_received_frames_.empty()) {
+ const bool no_quota = (current_receive_quota_ == 0);
+ DCHECK(no_quota || pending_received_frames_.empty());
+ DVLOG(3) << "Queueing frame to renderer due to quota. quota="
+ << current_receive_quota_ << " size=" << size;
+ WebSocketFrameHeader::OpCode opcode_to_queue =
+ no_quota ? opcode_to_send : WebSocketFrameHeader::kOpCodeContinuation;
+ pending_received_frames_.push(PendingReceivedFrame(
+ final, opcode_to_queue, data_buffer, current_receive_quota_, size));
+ if (no_quota)
+ return CHANNEL_ALIVE;
+ size = current_receive_quota_;
+ final = false;
+ }
+
// TODO(ricea): Can this copy be eliminated?
const char* const data_begin = size ? data_buffer->data() : NULL;
const char* const data_end = data_begin + size;
const std::vector<char> data(data_begin, data_end);
- // TODO(ricea): Handle the case when ReadFrames returns far
- // more data at once than should be sent in a single IPC. This needs to
- // be handled carefully, as an overloaded IO thread is one possible
- // cause of receiving very large chunks.
+ current_receive_quota_ -= size;
+ DCHECK_GE(current_receive_quota_, 0);
// Sends the received frame to the renderer process.
return event_interface_->OnDataFrame(final, opcode_to_send, data);