// Copyright 2014 The Chromium Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. #include "base/location.h" #include "base/memory/weak_ptr.h" #include "base/rand_util.h" #include "base/single_thread_task_runner.h" #include "base/thread_task_runner_handle.h" #include "chrome/browser/devtools/device/android_device_manager.h" #include "content/public/browser/browser_thread.h" #include "net/base/io_buffer.h" #include "net/base/net_errors.h" #include "net/server/web_socket_encoder.h" #include "net/socket/stream_socket.h" using content::BrowserThread; using net::WebSocket; namespace { const int kBufferSize = 16 * 1024; } // namespace class AndroidDeviceManager::AndroidWebSocket::WebSocketImpl { public: WebSocketImpl( scoped_refptr response_task_runner, base::WeakPtr weak_socket, const std::string& extensions, const std::string& body_head, scoped_ptr socket) : response_task_runner_(response_task_runner), weak_socket_(weak_socket), socket_(socket.Pass()), encoder_(net::WebSocketEncoder::CreateClient(extensions)), response_buffer_(body_head) { thread_checker_.DetachFromThread(); } void StartListening() { DCHECK(thread_checker_.CalledOnValidThread()); DCHECK(socket_); scoped_refptr buffer(new net::IOBuffer(kBufferSize)); if (response_buffer_.size() > 0) ProcessResponseBuffer(buffer); else Read(buffer); } void SendFrame(const std::string& message) { DCHECK(thread_checker_.CalledOnValidThread()); if (!socket_) return; int mask = base::RandInt(0, 0x7FFFFFFF); std::string encoded_frame; encoder_->EncodeFrame(message, mask, &encoded_frame); request_buffer_ += encoded_frame; if (request_buffer_.length() == encoded_frame.length()) SendPendingRequests(0); } private: void Read(scoped_refptr io_buffer) { int result = socket_->Read( io_buffer.get(), kBufferSize, base::Bind(&WebSocketImpl::OnBytesRead, base::Unretained(this), io_buffer)); if (result != net::ERR_IO_PENDING) OnBytesRead(io_buffer, result); } void OnBytesRead(scoped_refptr io_buffer, int result) { DCHECK(thread_checker_.CalledOnValidThread()); if (result <= 0) { Disconnect(); return; } response_buffer_.append(io_buffer->data(), result); ProcessResponseBuffer(io_buffer); } void ProcessResponseBuffer(scoped_refptr io_buffer) { int bytes_consumed; std::string output; WebSocket::ParseResult parse_result = encoder_->DecodeFrame( response_buffer_, &bytes_consumed, &output); while (parse_result == WebSocket::FRAME_OK) { response_buffer_ = response_buffer_.substr(bytes_consumed); response_task_runner_->PostTask( FROM_HERE, base::Bind(&AndroidWebSocket::OnFrameRead, weak_socket_, output)); parse_result = encoder_->DecodeFrame( response_buffer_, &bytes_consumed, &output); } if (parse_result == WebSocket::FRAME_ERROR || parse_result == WebSocket::FRAME_CLOSE) { Disconnect(); return; } Read(io_buffer); } void SendPendingRequests(int result) { DCHECK(thread_checker_.CalledOnValidThread()); if (result < 0) { Disconnect(); return; } request_buffer_ = request_buffer_.substr(result); if (request_buffer_.empty()) return; scoped_refptr buffer = new net::StringIOBuffer(request_buffer_); result = socket_->Write(buffer.get(), buffer->size(), base::Bind(&WebSocketImpl::SendPendingRequests, base::Unretained(this))); if (result != net::ERR_IO_PENDING) SendPendingRequests(result); } void Disconnect() { DCHECK(thread_checker_.CalledOnValidThread()); socket_.reset(); response_task_runner_->PostTask( FROM_HERE, base::Bind(&AndroidWebSocket::OnSocketClosed, weak_socket_)); } scoped_refptr response_task_runner_; base::WeakPtr weak_socket_; scoped_ptr socket_; scoped_ptr encoder_; std::string response_buffer_; std::string request_buffer_; base::ThreadChecker thread_checker_; DISALLOW_COPY_AND_ASSIGN(WebSocketImpl); }; AndroidDeviceManager::AndroidWebSocket::AndroidWebSocket( scoped_refptr device, const std::string& socket_name, const std::string& path, Delegate* delegate) : device_(device.get()), socket_impl_(nullptr), delegate_(delegate), weak_factory_(this) { DCHECK_CURRENTLY_ON(BrowserThread::UI); DCHECK(delegate_); DCHECK(device_); device_->sockets_.insert(this); device_->HttpUpgrade( socket_name, path, net::WebSocketEncoder::kClientExtensions, base::Bind(&AndroidWebSocket::Connected, weak_factory_.GetWeakPtr())); } AndroidDeviceManager::AndroidWebSocket::~AndroidWebSocket() { DCHECK_CURRENTLY_ON(BrowserThread::UI); Terminate(); } void AndroidDeviceManager::AndroidWebSocket::SendFrame( const std::string& message) { DCHECK_CURRENTLY_ON(BrowserThread::UI); DCHECK(socket_impl_); DCHECK(device_); device_->task_runner_->PostTask( FROM_HERE, base::Bind(&WebSocketImpl::SendFrame, base::Unretained(socket_impl_), message)); } void AndroidDeviceManager::AndroidWebSocket::Connected( int result, const std::string& extensions, const std::string& body_head, scoped_ptr socket) { DCHECK_CURRENTLY_ON(BrowserThread::UI); if (result != net::OK || !socket.get()) { OnSocketClosed(); return; } socket_impl_ = new WebSocketImpl(base::ThreadTaskRunnerHandle::Get(), weak_factory_.GetWeakPtr(), extensions, body_head, socket.Pass()); device_->task_runner_->PostTask(FROM_HERE, base::Bind(&WebSocketImpl::StartListening, base::Unretained(socket_impl_))); delegate_->OnSocketOpened(); } void AndroidDeviceManager::AndroidWebSocket::OnFrameRead( const std::string& message) { DCHECK_CURRENTLY_ON(BrowserThread::UI); delegate_->OnFrameRead(message); } void AndroidDeviceManager::AndroidWebSocket::OnSocketClosed() { DCHECK_CURRENTLY_ON(BrowserThread::UI); Terminate(); delegate_->OnSocketClosed(); } void AndroidDeviceManager::AndroidWebSocket::Terminate() { DCHECK_CURRENTLY_ON(BrowserThread::UI); if (socket_impl_) { DCHECK(device_); device_->task_runner_->DeleteSoon(FROM_HERE, socket_impl_); socket_impl_ = nullptr; } if (device_) { device_->sockets_.erase(this); device_ = nullptr; } } AndroidDeviceManager::AndroidWebSocket* AndroidDeviceManager::Device::CreateWebSocket( const std::string& socket_name, const std::string& path, AndroidWebSocket::Delegate* delegate) { return new AndroidWebSocket(this, socket_name, path, delegate); }