// Copyright (c) 2012 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "net/proxy/multi_threaded_proxy_resolver.h"

// NOTE(review): in this copy of the file the targets of the two #include
// lines below, and the template arguments throughout (e.g. on scoped_ptr,
// scoped_refptr, base::RefCountedThreadSafe, std::deque, std::vector and
// reinterpret_cast), appear to have been stripped. They need to be restored
// from upstream before this file can compile.
#include
#include

#include "base/bind.h"
#include "base/bind_helpers.h"
#include "base/single_thread_task_runner.h"
#include "base/stl_util.h"
#include "base/strings/string_util.h"
#include "base/strings/stringprintf.h"
#include "base/thread_task_runner_handle.h"
#include "base/threading/non_thread_safe.h"
#include "base/threading/thread.h"
#include "base/threading/thread_restrictions.h"
#include "net/base/net_errors.h"
#include "net/log/net_log.h"
#include "net/proxy/proxy_info.h"
#include "net/proxy/proxy_resolver.h"

namespace net {

namespace {

class Job;

// An "executor" is a job-runner for PAC requests. It encapsulates a worker
// thread and a synchronous ProxyResolver (which will be operated on said
// thread.) An executor has at most one outstanding Job at a time; StartJob()
// DCHECKs that it is idle.
class Executor : public base::RefCountedThreadSafe {
 public:
  // Receives OnExecutorReady() each time the executor finishes its
  // outstanding job and is free to accept another one.
  class Coordinator {
   public:
    virtual void OnExecutorReady(Executor* executor) = 0;

   protected:
    virtual ~Coordinator() = default;
  };

  // |coordinator| must remain valid throughout our lifetime (or until it is
  // replaced through set_coordinator()). It is used to signal when the
  // executor is ready to receive work by calling
  // |coordinator->OnExecutorReady()|.
  // |thread_number| is an identifier used when naming the worker thread.
  Executor(Coordinator* coordinator, int thread_number);

  // Submit a job to this executor. The executor must not already have an
  // outstanding job; the job's Run() is posted to the worker thread.
  void StartJob(Job* job);

  // Callback for when a job has completed running on the executor's thread.
  // Runs on the thread that called StartJob(); clears the outstanding job and
  // then notifies the coordinator.
  void OnJobCompleted(Job* job);

  // Cleanup the executor. Cancels all outstanding work, and frees the thread
  // and resolver. Must be called before the last reference is dropped
  // (~Executor() DCHECKs this).
  void Destroy();

  // Returns the outstanding job, or NULL.
  Job* outstanding_job() const { return outstanding_job_.get(); }

  // Returns the synchronous resolver, or NULL if none has been set yet (or it
  // has been freed by Destroy()).
  ProxyResolver* resolver() { return resolver_.get(); }

  // The identifier given at construction; used in the worker thread's name and
  // reported in NetLog events.
  int thread_number() const { return thread_number_; }

  // Takes ownership of the synchronous resolver (installed by
  // CreateResolverJob once resolver creation has run).
  void set_resolver(scoped_ptr resolver) {
    resolver_ = resolver.Pass();
  }

  // Redirects future OnExecutorReady() notifications to |coordinator|. Used
  // when an executor is handed to a new owner; both the current and the new
  // coordinator must be non-null.
  void set_coordinator(Coordinator* coordinator) {
    DCHECK(coordinator);
    DCHECK(coordinator_);
    coordinator_ = coordinator;
  }

 private:
  friend class base::RefCountedThreadSafe;
  ~Executor();

  // Notified whenever a job completes. Set to NULL by Destroy().
  Coordinator* coordinator_;
  const int thread_number_;

  // The currently active job for this executor (either a CreateProxyResolver or
  // GetProxyForURL task).
  scoped_refptr outstanding_job_;

  // The synchronous resolver implementation.
  scoped_ptr resolver_;

  // The thread where |resolver_| is run on.
  // Note that declaration ordering is important here. |thread_| needs to be
  // destroyed *before* |resolver_|, in case |resolver_| is currently
  // executing on |thread_|. (Members are destroyed in reverse declaration
  // order, so |thread_| must stay declared after |resolver_|.)
  scoped_ptr thread_;
};

// A ProxyResolver that services GetProxyForURL() requests asynchronously by
// dispatching them to a pool of Executors, each with its own worker thread and
// synchronous ProxyResolver. Must be used from a single thread
// (base::NonThreadSafe; methods DCHECK CalledOnValidThread()).
class MultiThreadedProxyResolver : public ProxyResolver,
                                   public Executor::Coordinator,
                                   public base::NonThreadSafe {
 public:
  // Creates an asynchronous ProxyResolver that runs requests on up to
  // |max_num_threads|.
  //
  // For each thread that is created, an accompanying synchronous ProxyResolver
  // will be provisioned using |resolver_factory|. All methods on these
  // ProxyResolvers will be called on the one thread.
  //
  // |executor| is an executor whose resolver has already been created; it is
  // adopted as the first thread and its coordinator is switched to this
  // object. |script_data| is kept so that additional executors can create
  // their own resolvers.
  MultiThreadedProxyResolver(
      scoped_ptr resolver_factory,
      size_t max_num_threads,
      const scoped_refptr& script_data,
      scoped_refptr executor);

  // Drops queued requests and destroys every executor.
  ~MultiThreadedProxyResolver() override;

  // ProxyResolver implementation:
  int GetProxyForURL(const GURL& url,
                     ProxyInfo* results,
                     const CompletionCallback& callback,
                     RequestHandle* request,
                     const BoundNetLog& net_log) override;
  void CancelRequest(RequestHandle request) override;
  LoadState GetLoadState(RequestHandle request) const override;
  // The PAC script is fixed at construction; the next two methods are not
  // supported and hit NOTREACHED().
  void CancelSetPacScript() override;
  int SetPacScript(const scoped_refptr& script_data,
                   const CompletionCallback& callback) override;

 private:
  class GetProxyForURLJob;
  // FIFO queue of pending jobs waiting to be started.
  // TODO(eroman): Make this priority queue.
  typedef std::deque> PendingJobsQueue;
  typedef std::vector> ExecutorList;

  // Returns an idle worker thread which is ready to receive GetProxyForURL()
  // requests. If all threads are occupied, returns NULL.
  Executor* FindIdleExecutor();

  // Creates a new worker thread, and appends it to |executors_|.
  void AddNewExecutor();

  // Called when an executor becomes idle. Starts the next job from
  // |pending_jobs_| if possible.
  void OnExecutorReady(Executor* executor) override;

  // Used to create the synchronous resolver of every additional executor.
  const scoped_ptr resolver_factory_;
  const size_t max_num_threads_;
  // Requests that could not be handed to an idle executor yet.
  PendingJobsQueue pending_jobs_;
  // Every executor created so far, busy or idle; executors are only appended.
  ExecutorList executors_;
  // PAC script passed to each new executor's CreateResolverJob.
  scoped_refptr script_data_;
};

// Job ---------------------------------------------

// A unit of work that an Executor posts to its worker thread. It is
// reference counted (thread-safe) because it is bound into tasks that travel
// between the origin thread and the worker thread.
class Job : public base::RefCountedThreadSafe {
 public:
  // Identifies the subclass of Job (only being used for debugging purposes).
  enum Type {
    TYPE_GET_PROXY_FOR_URL,
    TYPE_CREATE_RESOLVER,
  };

  // |callback| is the user's completion callback; it is null for internal
  // helper jobs (see has_user_callback()).
  Job(Type type, const CompletionCallback& callback)
      : type_(type),
        callback_(callback),
        executor_(NULL),
        was_cancelled_(false) {
  }

  // Set by Executor::StartJob(); reset to NULL by Executor::Destroy() to
  // orphan the job.
  void set_executor(Executor* executor) {
    executor_ = executor;
  }

  // The "executor" is the job runner that is scheduling this job. If
  // this job has not been submitted to an executor yet, this will be
  // NULL (and we know it hasn't started yet).
  Executor* executor() {
    return executor_;
  }

  // Mark the job as having been cancelled.
  void Cancel() {
    was_cancelled_ = true;
  }

  // Returns true if Cancel() has been called.
  bool was_cancelled() const { return was_cancelled_; }

  Type type() const { return type_; }

  // Returns true if this job still has a user callback. Some jobs
  // do not have a user callback, because they were helper jobs
  // scheduled internally (for example TYPE_CREATE_RESOLVER).
  //
  // Otherwise jobs that correspond with user-initiated work will
  // have a non-null callback up until the callback is run.
  bool has_user_callback() const { return !callback_.is_null(); }

  // This method is called when the job is inserted into a wait queue
  // because no executors were ready to accept it.
  virtual void WaitingForThread() {}

  // This method is called just before the job is posted to the work thread.
  virtual void FinishedWaitingForThread() {}

  // This method is called on the worker thread to do the job's work. On
  // completion, implementors are expected to post a task to |origin_runner|
  // that ends up calling OnJobCompleted() there.
  virtual void Run(
      scoped_refptr origin_runner) = 0;

 protected:
  // Tells the executor (if any) that this job is finished.
  void OnJobCompleted() {
    // |executor_| will be NULL if the executor has already been deleted.
    if (executor_)
      executor_->OnJobCompleted(this);
  }

  // Runs the user callback exactly once with |result|.
  void RunUserCallback(int result) {
    DCHECK(has_user_callback());
    CompletionCallback callback = callback_;
    // Reset the callback so has_user_callback() will now return false.
    callback_.Reset();
    callback.Run(result);
  }

  friend class base::RefCountedThreadSafe;

  virtual ~Job() {}

 private:
  const Type type_;
  CompletionCallback callback_;
  // Non-owning; NULL until submitted, and NULL again once orphaned.
  Executor* executor_;
  bool was_cancelled_;
};

// CreateResolverJob -----------------------------------------------------------

// Runs on the worker thread to call ProxyResolverFactory::CreateProxyResolver.
class CreateResolverJob : public Job {
 public:
  // NOTE(review): template arguments (e.g. on scoped_refptr, scoped_ptr and
  // reinterpret_cast) appear to have been stripped from this copy of the
  // file; restore them from upstream.
  //
  // |factory| is not owned; it is dereferenced on the worker thread in Run().
  CreateResolverJob(const scoped_refptr& script_data,
                    ProxyResolverFactory* factory)
      : Job(TYPE_CREATE_RESOLVER, CompletionCallback()),
        script_data_(script_data),
        factory_(factory) {}

  // Runs on the worker thread. Creates the resolver synchronously (a pending
  // result is not expected, see the DCHECK) and posts the result back to
  // |origin_runner|.
  void Run(scoped_refptr origin_runner) override {
    scoped_ptr request;
    int rv = factory_->CreateProxyResolver(script_data_, &resolver_,
                                           CompletionCallback(), &request);

    DCHECK_NE(rv, ERR_IO_PENDING);
    origin_runner->PostTask(
        FROM_HERE, base::Bind(&CreateResolverJob::RequestComplete, this, rv));
  }

 protected:
  ~CreateResolverJob() override {}

 private:
  // Runs the completion callback on the origin thread.
  // |result_code| is not inspected here; whatever ended up in |resolver_|
  // (possibly NULL) is handed to the executor, and callers detect failure via
  // Executor::resolver().
  void RequestComplete(int result_code) {
    // The task may have been cancelled after it was started.
    if (!was_cancelled()) {
      DCHECK(executor());
      executor()->set_resolver(resolver_.Pass());
    }
    OnJobCompleted();
  }

  const scoped_refptr script_data_;
  ProxyResolverFactory* factory_;
  // Filled in on the worker thread by Run(), handed off in RequestComplete().
  scoped_ptr resolver_;
};

// MultiThreadedProxyResolver::GetProxyForURLJob ------------------------------

// Resolves a single URL on an executor's worker thread and reports the result
// to the user's callback on the origin thread.
class MultiThreadedProxyResolver::GetProxyForURLJob : public Job {
 public:
  // |url| -- the URL of the query.
  // |results| -- the structure to fill with proxy resolve results.
  // |callback| -- must be non-null; run on completion unless cancelled.
  GetProxyForURLJob(const GURL& url,
                    ProxyInfo* results,
                    const CompletionCallback& callback,
                    const BoundNetLog& net_log)
      : Job(TYPE_GET_PROXY_FOR_URL, callback),
        results_(results),
        net_log_(net_log),
        url_(url),
        was_waiting_for_thread_(false) {
    DCHECK(!callback.is_null());
  }

  BoundNetLog* net_log() { return &net_log_; }

  // Opens the "waiting for thread" NetLog event while the job is queued.
  void WaitingForThread() override {
    was_waiting_for_thread_ = true;
    net_log_.BeginEvent(NetLog::TYPE_WAITING_FOR_PROXY_RESOLVER_THREAD);
  }

  // Closes the waiting event (if one was opened) and logs which thread the
  // job was submitted to.
  void FinishedWaitingForThread() override {
    DCHECK(executor());

    if (was_waiting_for_thread_) {
      net_log_.EndEvent(NetLog::TYPE_WAITING_FOR_PROXY_RESOLVER_THREAD);
    }

    net_log_.AddEvent(
        NetLog::TYPE_SUBMITTED_TO_RESOLVER_THREAD,
        NetLog::IntegerCallback("thread_number", executor()->thread_number()));
  }

  // Runs on the worker thread. Resolves into |results_buf_| and posts the
  // result code back to |origin_runner|.
  void Run(scoped_refptr origin_runner) override {
    // NOTE(review): executors added by AddNewExecutor() never check whether
    // their CreateResolverJob succeeded, so |resolver| could be NULL here if
    // resolver creation failed on that thread.
    ProxyResolver* resolver = executor()->resolver();
    DCHECK(resolver);
    int rv = resolver->GetProxyForURL(
        url_, &results_buf_, CompletionCallback(), NULL, net_log_);
    DCHECK_NE(rv, ERR_IO_PENDING);

    origin_runner->PostTask(
        FROM_HERE, base::Bind(&GetProxyForURLJob::QueryComplete, this, rv));
  }

 protected:
  ~GetProxyForURLJob() override {}

 private:
  // Runs the completion callback on the origin thread.
  void QueryComplete(int result_code) {
    // The Job may have been cancelled after it was started.
    if (!was_cancelled()) {
      if (result_code >= OK) {  // Note: unit-tests use values > 0.
        results_->Use(results_buf_);
      }
      RunUserCallback(result_code);
    }
    // Always release the executor, even for cancelled jobs.
    OnJobCompleted();
  }

  // Must only be used on the "origin" thread.
  ProxyInfo* results_;

  // Can be used on either "origin" or worker thread.
  BoundNetLog net_log_;
  const GURL url_;

  // Written by Run() on the worker thread; copied into |results_| by
  // QueryComplete() on the origin thread.
  ProxyInfo results_buf_;

  // True if WaitingForThread() opened a NetLog event that must be closed.
  bool was_waiting_for_thread_;
};

// Executor ----------------------------------------

// Starts a worker thread named "PAC thread #<thread_number>"; CHECK-fails if
// the thread cannot be started.
Executor::Executor(Executor::Coordinator* coordinator, int thread_number)
    : coordinator_(coordinator), thread_number_(thread_number) {
  DCHECK(coordinator);
  // Start up the thread.
  thread_.reset(new base::Thread(base::StringPrintf("PAC thread #%d",
                                                    thread_number)));
  CHECK(thread_->Start());
}

void Executor::StartJob(Job* job) {
  DCHECK(!outstanding_job_.get());
  outstanding_job_ = job;

  // Run the job. Once it has completed (regardless of whether it was
  // cancelled), it will invoke OnJobCompleted() on this thread -- unless
  // Destroy() has orphaned it first, in which case nothing is called.
  job->set_executor(this);
  job->FinishedWaitingForThread();
  thread_->message_loop()->PostTask(
      FROM_HERE,
      base::Bind(&Job::Run, job, base::ThreadTaskRunnerHandle::Get()));
}

void Executor::OnJobCompleted(Job* job) {
  DCHECK_EQ(job, outstanding_job_.get());
  // Clear the job before notifying, so the coordinator sees this executor as
  // idle and may immediately call StartJob() again.
  outstanding_job_ = NULL;
  coordinator_->OnExecutorReady(this);
}

void Executor::Destroy() {
  DCHECK(coordinator_);

  {
    // See http://crbug.com/69710.
    base::ThreadRestrictions::ScopedAllowIO allow_io;

    // Join the worker thread. This must happen before |resolver_| is reset
    // below, since a job may be using the resolver on that thread.
    thread_.reset();
  }

  // Cancel any outstanding job.
  if (outstanding_job_.get()) {
    outstanding_job_->Cancel();
    // Orphan the job (since this executor may be deleted soon).
    outstanding_job_->set_executor(NULL);
  }

  // It is now safe to free the ProxyResolver, since all the tasks that
  // were using it on the resolver thread have completed.
  resolver_.reset();

  // Null some stuff as a precaution.
  coordinator_ = NULL;
  outstanding_job_ = NULL;
}

Executor::~Executor() {
  // The important cleanup happens as part of Destroy(), which should always be
  // called first.
  DCHECK(!coordinator_) << "Destroy() was not called";
  DCHECK(!thread_.get());
  DCHECK(!resolver_.get());
  DCHECK(!outstanding_job_.get());
}

// MultiThreadedProxyResolver --------------------------------------------------

// Adopts |executor| as the first worker and takes over its ready
// notifications.
MultiThreadedProxyResolver::MultiThreadedProxyResolver(
    scoped_ptr resolver_factory,
    size_t max_num_threads,
    const scoped_refptr& script_data,
    scoped_refptr executor)
    : ProxyResolver(resolver_factory->expects_pac_bytes()),
      resolver_factory_(resolver_factory.Pass()),
      max_num_threads_(max_num_threads),
      script_data_(script_data) {
  DCHECK(script_data_);
  executor->set_coordinator(this);
  executors_.push_back(executor);
}

MultiThreadedProxyResolver::~MultiThreadedProxyResolver() {
  DCHECK(CalledOnValidThread());
  // We will cancel all outstanding requests. Queued jobs are dropped without
  // running their callbacks; jobs already on an executor are cancelled and
  // orphaned by Executor::Destroy().
  pending_jobs_.clear();

  for (auto& executor : executors_) {
    executor->Destroy();
  }
}

// Always completes asynchronously (returns ERR_IO_PENDING). The job is either
// started on an idle executor right away or queued; queuing may also add a new
// executor while below |max_num_threads_|.
int MultiThreadedProxyResolver::GetProxyForURL(
    const GURL& url,
    ProxyInfo* results,
    const CompletionCallback& callback,
    RequestHandle* request,
    const BoundNetLog& net_log) {
  DCHECK(CalledOnValidThread());
  DCHECK(!callback.is_null());

  scoped_refptr job(
      new GetProxyForURLJob(url, results, callback, net_log));

  // Completion will be notified through |callback|, unless the caller cancels
  // the request using |request|. The handle is the job pointer itself.
  if (request)
    *request = reinterpret_cast(job.get());

  // If there is an executor that is ready to run this request, submit it!
  Executor* executor = FindIdleExecutor();
  if (executor) {
    DCHECK_EQ(0u, pending_jobs_.size());
    executor->StartJob(job.get());
    return ERR_IO_PENDING;
  }

  // Otherwise queue this request. (We will schedule it to a thread once one
  // becomes available).
  job->WaitingForThread();
  pending_jobs_.push_back(job);

  // If we haven't already reached the thread limit, provision a new thread to
  // drain the requests more quickly.
  if (executors_.size() < max_num_threads_)
    AddNewExecutor();

  return ERR_IO_PENDING;
}

// |req| must be a handle returned by GetProxyForURL() whose callback has not
// run yet.
void MultiThreadedProxyResolver::CancelRequest(RequestHandle req) {
  DCHECK(CalledOnValidThread());
  DCHECK(req);

  Job* job = reinterpret_cast(req);
  DCHECK_EQ(Job::TYPE_GET_PROXY_FOR_URL, job->type());

  if (job->executor()) {
    // If the job was already submitted to the executor, just mark it
    // as cancelled so the user callback isn't run on completion.
    job->Cancel();
  } else {
    // Otherwise the job is just sitting in a queue.
    PendingJobsQueue::iterator it =
        std::find(pending_jobs_.begin(), pending_jobs_.end(), job);
    DCHECK(it != pending_jobs_.end());
    pending_jobs_.erase(it);
  }
}

// Reports the same state whether the request is queued or running.
LoadState MultiThreadedProxyResolver::GetLoadState(RequestHandle req) const {
  DCHECK(CalledOnValidThread());
  DCHECK(req);
  return LOAD_STATE_RESOLVING_PROXY_FOR_URL;
}

// Not supported: the PAC script is fixed at construction.
void MultiThreadedProxyResolver::CancelSetPacScript() {
  NOTREACHED();
}

// Not supported: the PAC script is fixed at construction.
int MultiThreadedProxyResolver::SetPacScript(
    const scoped_refptr& script_data,
    const CompletionCallback& callback) {
  NOTREACHED();
  return ERR_NOT_IMPLEMENTED;
}

// Returns the first executor without an outstanding job. An executor that is
// still running its CreateResolverJob counts as busy.
Executor* MultiThreadedProxyResolver::FindIdleExecutor() {
  DCHECK(CalledOnValidThread());
  for (ExecutorList::iterator it = executors_.begin();
       it != executors_.end(); ++it) {
    Executor* executor = it->get();
    if (!executor->outstanding_job())
      return executor;
  }
  return NULL;
}

// Creates an executor and starts its CreateResolverJob. The executor becomes
// eligible for GetProxyForURL jobs once that job completes and
// OnExecutorReady() fires.
void MultiThreadedProxyResolver::AddNewExecutor() {
  DCHECK(CalledOnValidThread());
  DCHECK_LT(executors_.size(), max_num_threads_);
  // The "thread number" is used to give the thread a unique name. It is the
  // executor's index in |executors_| (narrowed from size_t to int).
  int thread_number = executors_.size();
  Executor* executor = new Executor(this, thread_number);
  executor->StartJob(
      new CreateResolverJob(script_data_, resolver_factory_.get()));
  executors_.push_back(make_scoped_refptr(executor));
}

void MultiThreadedProxyResolver::OnExecutorReady(Executor* executor) {
  DCHECK(CalledOnValidThread());
  if (pending_jobs_.empty())
    return;

  // Get the next job to process (FIFO). Transfer it from the pending queue
  // to the executor.
  scoped_refptr job = pending_jobs_.front();
  pending_jobs_.pop_front();
  executor->StartJob(job.get());
}

}  // namespace

// The request object returned by
// MultiThreadedProxyResolverFactory::CreateProxyResolver(). It provisions a
// first executor (thread 0); when that executor's CreateResolverJob finishes it
// either wraps the executor in a new MultiThreadedProxyResolver or reports
// ERR_PAC_SCRIPT_FAILED. Destroying the Job before then cancels the request.
class MultiThreadedProxyResolverFactory::Job
    : public ProxyResolverFactory::Request,
      public Executor::Coordinator {
 public:
  // |resolver| receives the new resolver on success; |callback| is run with
  // the result.
  Job(MultiThreadedProxyResolverFactory* factory,
      const scoped_refptr& script_data,
      scoped_ptr* resolver,
      scoped_ptr resolver_factory,
      size_t max_num_threads,
      const CompletionCallback& callback)
      : factory_(factory),
        resolver_out_(resolver),
        resolver_factory_(resolver_factory.Pass()),
        max_num_threads_(max_num_threads),
        script_data_(script_data),
        executor_(new Executor(this, 0)),
        callback_(callback) {
    executor_->StartJob(
        new CreateResolverJob(script_data_, resolver_factory_.get()));
  }

  // |factory_| is non-null only while the request is still pending (it is
  // cleared on completion and by FactoryDestroyed()), so this path handles
  // cancellation by the request's owner.
  ~Job() override {
    if (factory_) {
      executor_->Destroy();
      factory_->RemoveJob(this);
    }
  }

  // Called from the factory's destructor: tears down the executor and
  // detaches from the factory so ~Job() will not call back into it.
  void FactoryDestroyed() {
    executor_->Destroy();
    executor_ = nullptr;
    factory_ = nullptr;
  }

 private:
  // Invoked when the initial CreateResolverJob completes. On success the
  // executor is handed to the new MultiThreadedProxyResolver (which becomes
  // its coordinator); on failure it is destroyed here.
  void OnExecutorReady(Executor* executor) override {
    int error = OK;
    if (executor->resolver()) {
      resolver_out_->reset(new MultiThreadedProxyResolver(
          resolver_factory_.Pass(), max_num_threads_, script_data_.Pass(),
          executor_));
    } else {
      error = ERR_PAC_SCRIPT_FAILED;
      executor_->Destroy();
    }
    // Detach from the factory before running the callback, so a later ~Job()
    // does not destroy the executor or call RemoveJob() again.
    factory_->RemoveJob(this);
    factory_ = nullptr;
    callback_.Run(error);
  }

  // Owning factory; NULL once completed or once the factory is destroyed.
  MultiThreadedProxyResolverFactory* factory_;
  scoped_ptr* const resolver_out_;
  scoped_ptr resolver_factory_;
  const size_t max_num_threads_;
  scoped_refptr script_data_;
  scoped_refptr executor_;
  const CompletionCallback callback_;
};

MultiThreadedProxyResolverFactory::MultiThreadedProxyResolverFactory(
    size_t max_num_threads,
    bool factory_expects_bytes)
    : ProxyResolverFactory(factory_expects_bytes),
      max_num_threads_(max_num_threads) {
  DCHECK_GE(max_num_threads, 1u);
}

// Outstanding jobs are detached, not deleted: ownership of each Job was given
// to the caller through |request| in CreateProxyResolver().
MultiThreadedProxyResolverFactory::~MultiThreadedProxyResolverFactory() {
  for (auto job : jobs_) {
    job->FactoryDestroyed();
  }
}

// Always completes asynchronously. CreateProxyResolverFactory() (declared
// outside this file) supplies the per-thread synchronous resolver factory.
int MultiThreadedProxyResolverFactory::CreateProxyResolver(
    const scoped_refptr& pac_script,
    scoped_ptr* resolver,
    const CompletionCallback& callback,
    scoped_ptr* request) {
  scoped_ptr job(new Job(this, pac_script, resolver,
                         CreateProxyResolverFactory(),
                         max_num_threads_, callback));
  jobs_.insert(job.get());
  *request = job.Pass();
  return ERR_IO_PENDING;
}

// Unregisters |job|; it must currently be registered.
void MultiThreadedProxyResolverFactory::RemoveJob(
    MultiThreadedProxyResolverFactory::Job* job) {
  size_t erased = jobs_.erase(job);
  DCHECK_EQ(1u, erased);
}

}  // namespace net