summaryrefslogtreecommitdiffstats
path: root/net/socket_stream
diff options
context:
space:
mode:
Diffstat (limited to 'net/socket_stream')
-rw-r--r--net/socket_stream/socket_stream.cc623
-rw-r--r--net/socket_stream/socket_stream.h254
2 files changed, 877 insertions, 0 deletions
diff --git a/net/socket_stream/socket_stream.cc b/net/socket_stream/socket_stream.cc
new file mode 100644
index 0000000..7308b53
--- /dev/null
+++ b/net/socket_stream/socket_stream.cc
@@ -0,0 +1,623 @@
+// Copyright (c) 2009 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+//
+// TODO(ukai): code is similar with http_network_transaction.cc. We should
+// think about ways to share code, if possible.
+
+#include "net/socket_stream/socket_stream.h"
+
+#include <string>
+
+#include "base/compiler_specific.h"
+#include "base/logging.h"
+#include "base/message_loop.h"
+#include "base/string_util.h"
+#include "net/base/host_resolver.h"
+#include "net/base/io_buffer.h"
+#include "net/base/net_errors.h"
+#include "net/base/net_util.h"
+#include "net/http/http_response_headers.h"
+#include "net/http/http_util.h"
+#include "net/socket/client_socket_factory.h"
+#include "net/socket/ssl_client_socket.h"
+#include "net/socket/socks5_client_socket.h"
+#include "net/socket/socks_client_socket.h"
+#include "net/socket/tcp_client_socket.h"
+#include "net/url_request/url_request.h"
+
+static const int kMaxPendingSendAllowed = 32768; // 32 kilobytes.
+static const int kReadBufferSize = 4096;
+
+namespace net {
+
+void SocketStream::ResponseHeaders::Realloc(size_t new_size) {
+ headers_.reset(static_cast<char*>(realloc(headers_.release(), new_size)));
+}
+
+SocketStream::SocketStream(const GURL& url, Delegate* delegate)
+ : url_(url),
+ delegate_(delegate),
+ max_pending_send_allowed_(kMaxPendingSendAllowed),
+ next_state_(STATE_NONE),
+ host_resolver_(CreateSystemHostResolver()),
+ factory_(ClientSocketFactory::GetDefaultFactory()),
+ proxy_mode_(kDirectConnection),
+ pac_request_(NULL),
+ ALLOW_THIS_IN_INITIALIZER_LIST(
+ io_callback_(this, &SocketStream::OnIOCompleted)),
+ ALLOW_THIS_IN_INITIALIZER_LIST(
+ read_callback_(this, &SocketStream::OnReadCompleted)),
+ ALLOW_THIS_IN_INITIALIZER_LIST(
+ write_callback_(this, &SocketStream::OnWriteCompleted)),
+ read_buf_(NULL),
+ write_buf_(NULL),
+ current_write_buf_(NULL),
+ write_buf_offset_(0),
+ write_buf_size_(0) {
+ DCHECK(MessageLoop::current()) <<
+ "The current MessageLoop must exist";
+ DCHECK_EQ(MessageLoop::TYPE_IO, MessageLoop::current()->type()) <<
+ "The current MessageLoop must be TYPE_IO";
+ DCHECK(delegate_);
+}
+
+SocketStream::~SocketStream() {
+ DCHECK(!delegate_);
+}
+
+SocketStream::UserData* SocketStream::GetUserData(
+ const void* key) const {
+ UserDataMap::const_iterator found = user_data_.find(key);
+ if (found != user_data_.end())
+ return found->second.get();
+ return NULL;
+}
+
+void SocketStream::SetUserData(const void* key, UserData* data) {
+ user_data_[key] = linked_ptr<UserData>(data);
+}
+
+void SocketStream::set_context(URLRequestContext* context) {
+ context_ = context;
+}
+
+void SocketStream::Connect() {
+ DCHECK(MessageLoop::current()) <<
+ "The current MessageLoop must exist";
+ DCHECK_EQ(MessageLoop::TYPE_IO, MessageLoop::current()->type()) <<
+ "The current MessageLoop must be TYPE_IO";
+ ssl_config_service()->GetSSLConfig(&ssl_config_);
+
+ AddRef(); // Released in Finish()
+ // Open a connection asynchronously, so that delegate won't be called
+ // back before returning Connect().
+ next_state_ = STATE_RESOLVE_PROXY;
+ MessageLoop::current()->PostTask(
+ FROM_HERE,
+ NewRunnableMethod(this, &SocketStream::DoLoop, OK));
+}
+
+bool SocketStream::SendData(const char* data, int len) {
+ DCHECK(MessageLoop::current()) <<
+ "The current MessageLoop must exist";
+ DCHECK_EQ(MessageLoop::TYPE_IO, MessageLoop::current()->type()) <<
+ "The current MessageLoop must be TYPE_IO";
+ if (!socket_.get() || !socket_->IsConnected())
+ return false;
+ if (write_buf_) {
+ int current_amount_send = write_buf_size_ - write_buf_offset_;
+ for (PendingDataQueue::const_iterator iter = pending_write_bufs_.begin();
+ iter != pending_write_bufs_.end();
+ ++iter)
+ current_amount_send += (*iter)->size();
+
+ current_amount_send += len;
+ if (current_amount_send > max_pending_send_allowed_)
+ return false;
+
+ pending_write_bufs_.push_back(new IOBufferWithSize(len));
+ memcpy(pending_write_bufs_.back()->data(), data, len);
+ return true;
+ }
+ DCHECK(!current_write_buf_);
+ write_buf_ = new IOBuffer(len);
+ memcpy(write_buf_->data(), data, len);
+ write_buf_size_ = len;
+ write_buf_offset_ = 0;
+ // Send pending data asynchronously, so that delegate won't be called
+ // back before returning SendData().
+ MessageLoop::current()->PostTask(
+ FROM_HERE,
+ NewRunnableMethod(this, &SocketStream::DoLoop, OK));
+ return true;
+}
+
+void SocketStream::Close() {
+ DCHECK(MessageLoop::current()) <<
+ "The current MessageLoop must exist";
+ DCHECK_EQ(MessageLoop::TYPE_IO, MessageLoop::current()->type()) <<
+ "The current MessageLoop must be TYPE_IO";
+ if (!socket_.get())
+ return;
+ if (socket_->IsConnected())
+ socket_->Disconnect();
+ // Close asynchronously, so that delegate won't be called
+ // back before returning Close().
+ MessageLoop::current()->PostTask(
+ FROM_HERE,
+ NewRunnableMethod(this, &SocketStream::DoLoop, OK));
+}
+
+void SocketStream::DetachDelegate() {
+ if (!delegate_)
+ return;
+ delegate_ = NULL;
+ Close();
+}
+
+void SocketStream::Finish() {
+ DCHECK(MessageLoop::current()) <<
+ "The current MessageLoop must exist";
+ DCHECK_EQ(MessageLoop::TYPE_IO, MessageLoop::current()->type()) <<
+ "The current MessageLoop must be TYPE_IO";
+ Delegate* delegate = delegate_;
+ delegate_ = NULL;
+ if (delegate) {
+ delegate->OnClose(this);
+ Release();
+ }
+}
+
+void SocketStream::SetHostResolver(HostResolver* host_resolver) {
+ DCHECK(host_resolver);
+ host_resolver_ = host_resolver;
+}
+
+void SocketStream::SetClientSocketFactory(
+ ClientSocketFactory* factory) {
+ DCHECK(factory);
+ factory_ = factory;
+}
+
+void SocketStream::DidEstablishConnection() {
+ if (!socket_.get() || !socket_->IsConnected()) {
+ Finish();
+ return;
+ }
+ next_state_ = STATE_READ_WRITE;
+
+ if (delegate_)
+ delegate_->OnConnected(this, max_pending_send_allowed_);
+
+ return;
+}
+
+void SocketStream::DidReceiveData(int result) {
+ DCHECK(read_buf_);
+ DCHECK(result > 0);
+ if (!delegate_)
+ return;
+ // Notify recevied data to delegate.
+ delegate_->OnReceivedData(this, read_buf_->data(), result);
+ read_buf_ = NULL;
+}
+
+void SocketStream::DidSendData(int result) {
+ current_write_buf_ = NULL;
+ DCHECK(result > 0);
+ if (!delegate_)
+ return;
+
+ delegate_->OnSentData(this, result);
+ int remaining_size = write_buf_size_ - write_buf_offset_ - result;
+ if (remaining_size == 0) {
+ if (!pending_write_bufs_.empty()) {
+ write_buf_size_ = pending_write_bufs_.front()->size();
+ write_buf_ = pending_write_bufs_.front();
+ pending_write_bufs_.pop_front();
+ } else {
+ write_buf_size_ = 0;
+ write_buf_ = NULL;
+ }
+ write_buf_offset_ = 0;
+ } else {
+ write_buf_offset_ += result;
+ }
+}
+
+void SocketStream::OnIOCompleted(int result) {
+ DoLoop(result);
+ // TODO(ukai): notify error.
+}
+
+void SocketStream::OnReadCompleted(int result) {
+ // TODO(ukai): notify error.
+ if (result >= 0 && read_buf_) {
+ DidReceiveData(result);
+ result = OK;
+ }
+ DoLoop(result);
+}
+
+void SocketStream::OnWriteCompleted(int result) {
+ // TODO(ukai): notify error.
+ if (result >= 0 && write_buf_) {
+ DidSendData(result);
+ result = OK;
+ }
+ DoLoop(result);
+}
+
+int SocketStream::DoLoop(int result) {
+ if (next_state_ == STATE_NONE) {
+ Finish();
+ return ERR_CONNECTION_CLOSED;
+ }
+
+ do {
+ State state = next_state_;
+ next_state_ = STATE_NONE;
+ switch (state) {
+ case STATE_RESOLVE_PROXY:
+ DCHECK_EQ(OK, result);
+ result = DoResolveProxy();
+ break;
+ case STATE_RESOLVE_PROXY_COMPLETE:
+ result = DoResolveProxyComplete(result);
+ break;
+ case STATE_RESOLVE_HOST:
+ DCHECK_EQ(OK, result);
+ result = DoResolveHost();
+ break;
+ case STATE_RESOLVE_HOST_COMPLETE:
+ result = DoResolveHostComplete(result);
+ break;
+ case STATE_TCP_CONNECT:
+ DCHECK_EQ(OK, result);
+ result = DoTcpConnect();
+ break;
+ case STATE_TCP_CONNECT_COMPLETE:
+ result = DoTcpConnectComplete(result);
+ break;
+ case STATE_WRITE_TUNNEL_HEADERS:
+ DCHECK_EQ(OK, result);
+ result = DoWriteTunnelHeaders();
+ break;
+ case STATE_WRITE_TUNNEL_HEADERS_COMPLETE:
+ result = DoWriteTunnelHeadersComplete(result);
+ break;
+ case STATE_READ_TUNNEL_HEADERS:
+ DCHECK_EQ(OK, result);
+ result = DoReadTunnelHeaders();
+ break;
+ case STATE_READ_TUNNEL_HEADERS_COMPLETE:
+ result = DoReadTunnelHeadersComplete(result);
+ break;
+ case STATE_SOCKS_CONNECT:
+ DCHECK_EQ(OK, result);
+ result = DoSOCKSConnect();
+ break;
+ case STATE_SOCKS_CONNECT_COMPLETE:
+ result = DoSOCKSConnectComplete(result);
+ break;
+ case STATE_SSL_CONNECT:
+ DCHECK_EQ(OK, result);
+ result = DoSSLConnect();
+ break;
+ case STATE_SSL_CONNECT_COMPLETE:
+ result = DoSSLConnectComplete(result);
+ break;
+ case STATE_READ_WRITE:
+ result = DoReadWrite(result);
+ break;
+ default:
+ NOTREACHED() << "bad state";
+ result = ERR_UNEXPECTED;
+ break;
+ }
+ } while (result != ERR_IO_PENDING && next_state_ != STATE_NONE);
+
+ if (result != ERR_IO_PENDING)
+ Finish();
+
+ return result;
+}
+
+int SocketStream::DoResolveProxy() {
+ DCHECK(!pac_request_);
+ next_state_ = STATE_RESOLVE_PROXY_COMPLETE;
+
+ return proxy_service()->ResolveProxy(
+ url_, &proxy_info_, &io_callback_, &pac_request_, NULL);
+}
+
+int SocketStream::DoResolveProxyComplete(int result) {
+ next_state_ = STATE_RESOLVE_HOST;
+
+ pac_request_ = NULL;
+ if (result != OK) {
+ LOG(ERROR) << "Failed to resolve proxy: " << result;
+ proxy_info_.UseDirect();
+ }
+
+ return OK;
+}
+
+int SocketStream::DoResolveHost() {
+ next_state_ = STATE_RESOLVE_HOST_COMPLETE;
+
+ if (proxy_info_.is_direct())
+ proxy_mode_ = kDirectConnection;
+ else if (proxy_info_.proxy_server().is_socks())
+ proxy_mode_ = kSOCKSProxy;
+ else
+ proxy_mode_ = kTunnelProxy;
+
+ // Determine the host and port to connect to.
+ std::string host;
+ int port;
+ if (proxy_mode_ != kDirectConnection) {
+ ProxyServer proxy_server = proxy_info_.proxy_server();
+ host = proxy_server.HostNoBrackets();
+ port = proxy_server.port();
+ } else {
+ host = url_.HostNoBrackets();
+ port = url_.EffectiveIntPort();
+ }
+
+ HostResolver::RequestInfo resolve_info(host, port);
+
+ resolver_.reset(new SingleRequestHostResolver(host_resolver_.get()));
+ return resolver_->Resolve(resolve_info, &addresses_, &io_callback_, NULL);
+}
+
+int SocketStream::DoResolveHostComplete(int result) {
+ if (result == OK)
+ next_state_ = STATE_TCP_CONNECT;
+ return result;
+}
+
+int SocketStream::DoTcpConnect() {
+ next_state_ = STATE_TCP_CONNECT_COMPLETE;
+ DCHECK(factory_);
+ socket_.reset(factory_->CreateTCPClientSocket(addresses_));
+ return socket_->Connect(&io_callback_);
+}
+
+int SocketStream::DoTcpConnectComplete(int result) {
+ if (result != OK)
+ return result;
+
+ if (proxy_mode_ == kTunnelProxy)
+ next_state_ = STATE_WRITE_TUNNEL_HEADERS;
+ else if (proxy_mode_ == kSOCKSProxy)
+ next_state_ = STATE_SOCKS_CONNECT;
+ else if (is_secure()) {
+ next_state_ = STATE_SSL_CONNECT;
+ } else {
+ DidEstablishConnection();
+ }
+ return OK;
+}
+
+int SocketStream::DoWriteTunnelHeaders() {
+ DCHECK_EQ(kTunnelProxy, proxy_mode_);
+
+ next_state_ = STATE_WRITE_TUNNEL_HEADERS_COMPLETE;
+
+ if (!tunnel_request_headers_.get()) {
+ tunnel_request_headers_ = new RequestHeaders();
+ tunnel_request_headers_bytes_sent_ = 0;
+ }
+ if (tunnel_request_headers_->headers_.empty()) {
+ tunnel_request_headers_->headers_ = StringPrintf(
+ "CONNECT %s HTTP/1.1\r\n"
+ "Host: %s\r\n"
+ "Proxy-Connection: keep-alive\r\n",
+ GetHostAndPort(url_).c_str(),
+ GetHostAndOptionalPort(url_).c_str());
+ // TODO(ukai): set proxy auth if necessary.
+ tunnel_request_headers_->headers_ += "\r\n";
+ }
+ tunnel_request_headers_->SetDataOffset(tunnel_request_headers_bytes_sent_);
+ int buf_len = static_cast<int>(tunnel_request_headers_->headers_.size() -
+ tunnel_request_headers_bytes_sent_);
+ DCHECK_GT(buf_len, 0);
+ return socket_->Write(tunnel_request_headers_, buf_len, &io_callback_);
+}
+
+int SocketStream::DoWriteTunnelHeadersComplete(int result) {
+ DCHECK_EQ(kTunnelProxy, proxy_mode_);
+
+ if (result < 0)
+ return result;
+
+ tunnel_request_headers_bytes_sent_ += result;
+ if (tunnel_request_headers_bytes_sent_ <
+ tunnel_request_headers_->headers_.size())
+ next_state_ = STATE_WRITE_TUNNEL_HEADERS;
+ else
+ next_state_ = STATE_READ_TUNNEL_HEADERS;
+ return OK;
+}
+
+int SocketStream::DoReadTunnelHeaders() {
+ DCHECK_EQ(kTunnelProxy, proxy_mode_);
+
+ next_state_ = STATE_READ_TUNNEL_HEADERS_COMPLETE;
+
+ if (!tunnel_response_headers_.get()) {
+ tunnel_response_headers_ = new ResponseHeaders();
+ tunnel_response_headers_capacity_ = kMaxTunnelResponseHeadersSize;
+ tunnel_response_headers_->Realloc(tunnel_response_headers_capacity_);
+ tunnel_response_headers_len_ = 0;
+ }
+
+ int buf_len = tunnel_response_headers_capacity_ -
+ tunnel_response_headers_len_;
+ tunnel_response_headers_->SetDataOffset(tunnel_response_headers_len_);
+ CHECK(tunnel_response_headers_->data());
+
+ return socket_->Read(tunnel_response_headers_, buf_len, &io_callback_);
+}
+
+int SocketStream::DoReadTunnelHeadersComplete(int result) {
+ DCHECK_EQ(kTunnelProxy, proxy_mode_);
+
+ if (result < 0)
+ return result;
+
+ tunnel_response_headers_len_ += result;
+ DCHECK(tunnel_response_headers_len_ <= tunnel_response_headers_capacity_);
+
+ int eoh = HttpUtil::LocateEndOfHeaders(
+ tunnel_response_headers_->headers(), tunnel_response_headers_len_, 0);
+ if (eoh == -1) {
+ if (tunnel_response_headers_len_ >= kMaxTunnelResponseHeadersSize)
+ return ERR_RESPONSE_HEADERS_TOO_BIG;
+
+ next_state_ = STATE_READ_TUNNEL_HEADERS;
+ return OK;
+ }
+ // DidReadResponseHeaders
+ scoped_refptr<HttpResponseHeaders> headers;
+ headers = new HttpResponseHeaders(
+ HttpUtil::AssembleRawHeaders(tunnel_response_headers_->headers(), eoh));
+ if (headers->GetParsedHttpVersion() < HttpVersion(1, 0)) {
+ // Require the "HTTP/1.x" status line.
+ return ERR_TUNNEL_CONNECTION_FAILED;
+ }
+ switch (headers->response_code()) {
+ case 200: // OK
+ if (is_secure()) {
+ DCHECK_EQ(eoh, tunnel_response_headers_len_);
+ next_state_ = STATE_SSL_CONNECT;
+ } else {
+ DidEstablishConnection();
+ if ((eoh < tunnel_response_headers_len_) && delegate_)
+ delegate_->OnReceivedData(
+ this, tunnel_response_headers_->headers() + eoh,
+ tunnel_response_headers_len_ - eoh);
+ }
+ return OK;
+ case 407: // Proxy Authentication Required.
+ // TODO(ukai): handle Proxy Authentication.
+ break;
+ default:
+ break;
+ }
+ return ERR_TUNNEL_CONNECTION_FAILED;
+}
+
+int SocketStream::DoSOCKSConnect() {
+ DCHECK_EQ(kSOCKSProxy, proxy_mode_);
+
+ next_state_ = STATE_SOCKS_CONNECT_COMPLETE;
+
+ ClientSocket* s = socket_.release();
+ HostResolver::RequestInfo req_info(url_.HostNoBrackets(),
+ url_.EffectiveIntPort());
+
+ if (proxy_info_.proxy_server().scheme() == ProxyServer::SCHEME_SOCKS5)
+ s = new SOCKS5ClientSocket(s, req_info, host_resolver_.get());
+ else
+ s = new SOCKSClientSocket(s, req_info, host_resolver_.get());
+ socket_.reset(s);
+ return socket_->Connect(&io_callback_);
+}
+
+int SocketStream::DoSOCKSConnectComplete(int result) {
+ DCHECK_EQ(kSOCKSProxy, proxy_mode_);
+
+ if (result == OK) {
+ if (is_secure())
+ next_state_ = STATE_SSL_CONNECT;
+ else
+ DidEstablishConnection();
+ }
+ return result;
+}
+
+int SocketStream::DoSSLConnect() {
+ DCHECK(factory_);
+ socket_.reset(factory_->CreateSSLClientSocket(
+ socket_.release(), url_.HostNoBrackets(), ssl_config_));
+ next_state_ = STATE_SSL_CONNECT_COMPLETE;
+ return socket_->Connect(&io_callback_);
+}
+
+int SocketStream::DoSSLConnectComplete(int result) {
+ if (IsCertificateError(result))
+ result = HandleCertificateError(result);
+
+ if (result == OK)
+ DidEstablishConnection();
+ return result;
+}
+
+int SocketStream::DoReadWrite(int result) {
+ if (result < OK) {
+ Finish();
+ return result;
+ }
+ if (!socket_.get() || !socket_->IsConnected()) {
+ Finish();
+ return ERR_CONNECTION_CLOSED;
+ }
+
+ next_state_ = STATE_READ_WRITE;
+
+ if (!read_buf_) {
+ read_buf_ = new IOBuffer(kReadBufferSize);
+ result = socket_->Read(read_buf_, kReadBufferSize, &read_callback_);
+ if (result > 0) {
+ DidReceiveData(result);
+ result = OK;
+ }
+ }
+ if (write_buf_ && !current_write_buf_) {
+ current_write_buf_ = new ReusedIOBuffer(write_buf_, write_buf_size_);
+ current_write_buf_->SetOffset(write_buf_offset_);
+ result = socket_->Write(current_write_buf_,
+ write_buf_size_ - write_buf_offset_,
+ &write_callback_);
+ if (result > 0) {
+ DidSendData(result);
+ result = OK;
+ }
+ }
+
+ // We arrived here when Write is performed and finished.
+ if (result == OK)
+ return ERR_IO_PENDING;
+ return result;
+}
+
+int SocketStream::HandleCertificateError(int result) {
+ // TODO(ukai): handle cert error properly.
+ switch (result) {
+ case ERR_CERT_COMMON_NAME_INVALID:
+ case ERR_CERT_DATE_INVALID:
+ case ERR_CERT_AUTHORITY_INVALID:
+ result = OK;
+ break;
+ default:
+ break;
+ }
+ return result;
+}
+
+bool SocketStream::is_secure() const {
+ return url_.SchemeIs("wss");
+}
+
+SSLConfigService* SocketStream::ssl_config_service() const {
+ return context_->ssl_config_service();
+}
+
+ProxyService* SocketStream::proxy_service() const {
+ return context_->proxy_service();
+}
+
+} // namespace net
diff --git a/net/socket_stream/socket_stream.h b/net/socket_stream/socket_stream.h
new file mode 100644
index 0000000..13f8680
--- /dev/null
+++ b/net/socket_stream/socket_stream.h
@@ -0,0 +1,254 @@
+// Copyright (c) 2009 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef NET_SOCKET_STREAM_SOCKET_STREAM_H_
+#define NET_SOCKET_STREAM_SOCKET_STREAM_H_
+
+#include <deque>
+#include <string>
+#include <vector>
+
+#include "base/linked_ptr.h"
+#include "base/ref_counted.h"
+#include "base/scoped_ptr.h"
+#include "base/task.h"
+#include "net/base/address_list.h"
+#include "net/base/completion_callback.h"
+#include "net/base/io_buffer.h"
+#include "net/proxy/proxy_service.h"
+#include "net/socket/tcp_client_socket.h"
+#include "net/url_request/url_request_context.h"
+
+namespace net {
+
+class ClientSocketFactory;
+class HostResolver;
+class SSLConfigService;
+class SingleRequestHostResolver;
+
+class SocketStream : public base::RefCountedThreadSafe<SocketStream> {
+ public:
+ // Derive from this class and add your own data members to associate extra
+ // information with a SocketStream. Use GetUserData(key) and
+ // SetUserData(key, data).
+ class UserData {
+ public:
+ UserData() {}
+ virtual ~UserData() {}
+ };
+
+ class Delegate {
+ public:
+ virtual ~Delegate() {}
+
+ // Called when socket stream has been connected. The socket stream accepts
+ // at most |max_pending_send_allowed| so that a client of the socket stream
+ // should keep track of how much it has pending and shouldn't go over
+ // |max_pending_send_allowed| bytes.
+ virtual void OnConnected(SocketStream* socket,
+ int max_pending_send_allowed) = 0;
+
+ // Called when |amount_sent| bytes of data are sent.
+ virtual void OnSentData(SocketStream* socket,
+ int amount_sent) = 0;
+
+ // Called when |len| bytes of |data| are received.
+ virtual void OnReceivedData(SocketStream* socket,
+ const char* data, int len) = 0;
+
+ // Called when the socket stream has been closed.
+ virtual void OnClose(SocketStream* socket) = 0;
+ };
+
+ SocketStream(const GURL& url, Delegate* delegate);
+
+ // The user data allows the clients to associate data with this job.
+ // Multiple user data values can be stored under different keys.
+ // This job will TAKE OWNERSHIP of the given data pointer, and will
+ // delete the object if it is changed or the job is destroyed.
+ UserData* GetUserData(const void* key) const;
+ void SetUserData(const void* key, UserData* data);
+
+ const GURL& url() const { return url_; }
+ Delegate* delegate() const { return delegate_; }
+ int max_pending_send_allowed() const { return max_pending_send_allowed_; }
+
+ URLRequestContext* context() const { return context_.get(); }
+ void set_context(URLRequestContext* context);
+
+ // Opens the connection on the IO thread.
+ // Once the connection is established, calls delegate's OnConnected.
+ void Connect();
+
+ // Requests to send |len| bytes of |data| on the connection.
+ // Returns true if |data| is buffered in the job.
+ // Returns false if size of buffered data would exceeds
+ // |max_pending_send_allowed_| and |data| is not sent at all.
+ bool SendData(const char* data, int len);
+
+ // Requests to close the connection.
+ // Once the connection is closed, calls delegate's OnClose.
+ void Close();
+
+ // Detach delegate. Call before delegate is deleted.
+ // Once delegate is detached, close the socket stream and never call delegate
+ // back.
+ void DetachDelegate();
+
+ // Sets an alternative HostResolver. For testing purposes only.
+ void SetHostResolver(HostResolver* host_resolver);
+
+ // Sets an alternative ClientSocketFactory. Doesn't take ownership of
+ // |factory|. For testing purposes only.
+ void SetClientSocketFactory(ClientSocketFactory* factory);
+
+ private:
+ class RequestHeaders : public IOBuffer {
+ public:
+ RequestHeaders() : IOBuffer() {}
+ ~RequestHeaders() { data_ = NULL; }
+
+ void SetDataOffset(size_t offset) {
+ data_ = const_cast<char*>(headers_.data()) + offset;
+ }
+ std::string headers_;
+ };
+
+ class ResponseHeaders : public IOBuffer {
+ public:
+ ResponseHeaders() : IOBuffer() {}
+ ~ResponseHeaders() { data_ = NULL; }
+
+ void SetDataOffset(size_t offset) { data_ = headers_.get() + offset; }
+ char* headers() const { return headers_.get(); }
+ void Reset() { headers_.reset(); }
+ void Realloc(size_t new_size);
+
+ private:
+ scoped_ptr_malloc<char> headers_;
+ };
+
+ enum State {
+ STATE_NONE,
+ STATE_RESOLVE_PROXY,
+ STATE_RESOLVE_PROXY_COMPLETE,
+ STATE_RESOLVE_HOST,
+ STATE_RESOLVE_HOST_COMPLETE,
+ STATE_TCP_CONNECT,
+ STATE_TCP_CONNECT_COMPLETE,
+ STATE_WRITE_TUNNEL_HEADERS,
+ STATE_WRITE_TUNNEL_HEADERS_COMPLETE,
+ STATE_READ_TUNNEL_HEADERS,
+ STATE_READ_TUNNEL_HEADERS_COMPLETE,
+ STATE_SOCKS_CONNECT,
+ STATE_SOCKS_CONNECT_COMPLETE,
+ STATE_SSL_CONNECT,
+ STATE_SSL_CONNECT_COMPLETE,
+ STATE_CONNECTION_ESTABLISHED,
+ STATE_READ_WRITE
+ };
+
+ enum ProxyMode {
+ kDirectConnection, // If using a direct connection
+ kTunnelProxy, // If using a tunnel (CONNECT method as HTTPS)
+ kSOCKSProxy, // If using a SOCKS proxy
+ };
+
+ typedef std::deque< scoped_refptr<IOBufferWithSize> > PendingDataQueue;
+ friend class base::RefCountedThreadSafe<SocketStream>;
+ ~SocketStream();
+
+ // Finish the job. Once finished, calls OnClose of delegate, and no more
+ // notifications will be sent to delegate.
+ void Finish();
+
+ void DidEstablishConnection();
+ void DidReceiveData(int result);
+ void DidSendData(int result);
+
+ void OnIOCompleted(int result);
+ void OnReadCompleted(int result);
+ void OnWriteCompleted(int result);
+
+ int DoLoop(int result);
+
+ int DoResolveProxy();
+ int DoResolveProxyComplete(int result);
+ int DoResolveHost();
+ int DoResolveHostComplete(int result);
+ int DoTcpConnect();
+ int DoTcpConnectComplete(int result);
+ int DoWriteTunnelHeaders();
+ int DoWriteTunnelHeadersComplete(int result);
+ int DoReadTunnelHeaders();
+ int DoReadTunnelHeadersComplete(int result);
+ int DoSOCKSConnect();
+ int DoSOCKSConnectComplete(int result);
+ int DoSSLConnect();
+ int DoSSLConnectComplete(int result);
+ int DoReadWrite(int result);
+
+ int HandleCertificateError(int result);
+
+ bool is_secure() const;
+ SSLConfigService* ssl_config_service() const;
+ ProxyService* proxy_service() const;
+
+ GURL url_;
+ Delegate* delegate_;
+ int max_pending_send_allowed_;
+ scoped_refptr<URLRequestContext> context_;
+
+ typedef std::map<const void*, linked_ptr<UserData> > UserDataMap;
+ UserDataMap user_data_;
+
+ State next_state_;
+ scoped_refptr<HostResolver> host_resolver_;
+ ClientSocketFactory* factory_;
+
+ ProxyMode proxy_mode_;
+
+ ProxyService::PacRequest* pac_request_;
+ ProxyInfo proxy_info_;
+
+ scoped_refptr<RequestHeaders> tunnel_request_headers_;
+ size_t tunnel_request_headers_bytes_sent_;
+ scoped_refptr<ResponseHeaders> tunnel_response_headers_;
+ int tunnel_response_headers_capacity_;
+ int tunnel_response_headers_len_;
+
+ // Use the same number as HttpNetworkTransaction::kMaxHeaderBufSize.
+ enum { kMaxTunnelResponseHeadersSize = 32768 }; // 32 kilobytes.
+
+ scoped_ptr<SingleRequestHostResolver> resolver_;
+ AddressList addresses_;
+ scoped_ptr<ClientSocket> socket_;
+
+ SSLConfig ssl_config_;
+
+ CompletionCallbackImpl<SocketStream> io_callback_;
+ CompletionCallbackImpl<SocketStream> read_callback_;
+ CompletionCallbackImpl<SocketStream> write_callback_;
+
+ scoped_refptr<IOBuffer> read_buf_;
+ int read_buf_size_;
+
+ // Total amount of buffer (|write_buf_size_| - |write_buf_offset_| +
+ // sum of size of |pending_write_bufs_|) should not exceed
+ // |max_pending_send_allowed_|.
+ // |write_buf_| holds requested data and |current_write_buf_| is used
+ // for Write operation, that is, |current_write_buf_| is
+ // |write_buf_| + |write_buf_offset_|.
+ scoped_refptr<IOBuffer> write_buf_;
+ scoped_refptr<ReusedIOBuffer> current_write_buf_;
+ int write_buf_offset_;
+ int write_buf_size_;
+ PendingDataQueue pending_write_bufs_;
+
+ DISALLOW_COPY_AND_ASSIGN(SocketStream);
+};
+
+} // namespace net
+
+#endif // NET_SOCKET_STREAM_SOCKET_STREAM_H_