diff options
Diffstat (limited to 'net/proxy/multi_threaded_proxy_resolver.cc')
-rw-r--r-- | net/proxy/multi_threaded_proxy_resolver.cc | 365 |
1 files changed, 235 insertions, 130 deletions
diff --git a/net/proxy/multi_threaded_proxy_resolver.cc b/net/proxy/multi_threaded_proxy_resolver.cc index b8fea44..a579e79 100644 --- a/net/proxy/multi_threaded_proxy_resolver.cc +++ b/net/proxy/multi_threaded_proxy_resolver.cc @@ -4,38 +4,45 @@ #include "net/proxy/multi_threaded_proxy_resolver.h" +#include <deque> +#include <vector> + #include "base/bind.h" #include "base/bind_helpers.h" #include "base/message_loop/message_loop_proxy.h" +#include "base/stl_util.h" #include "base/strings/string_util.h" #include "base/strings/stringprintf.h" +#include "base/threading/non_thread_safe.h" #include "base/threading/thread.h" #include "base/threading/thread_restrictions.h" #include "net/base/net_errors.h" #include "net/log/net_log.h" #include "net/proxy/proxy_info.h" -#include "net/proxy/proxy_resolver_factory.h" - -// TODO(eroman): Have the MultiThreadedProxyResolver clear its PAC script -// data when SetPacScript fails. That will reclaim memory when -// testing bogus scripts. +#include "net/proxy/proxy_resolver.h" namespace net { +namespace { +class Job; // An "executor" is a job-runner for PAC requests. It encapsulates a worker // thread and a synchronous ProxyResolver (which will be operated on said // thread.) -class MultiThreadedProxyResolver::Executor - : public base::RefCountedThreadSafe<MultiThreadedProxyResolver::Executor > { +class Executor : public base::RefCountedThreadSafe<Executor> { public: + class Coordinator { + public: + virtual void OnExecutorReady(Executor* executor) = 0; + + protected: + virtual ~Coordinator() = default; + }; + // |coordinator| must remain valid throughout our lifetime. It is used to // signal when the executor is ready to receive work by calling // |coordinator->OnExecutorReady()|. - // The constructor takes ownership of |resolver|. // |thread_number| is an identifier used when naming the worker thread. - Executor(MultiThreadedProxyResolver* coordinator, - scoped_ptr<ProxyResolver> resolver, - int thread_number); + Executor(Coordinator* coordinator, int thread_number); // Submit a job to this executor. void StartJob(Job* job); @@ -54,14 +61,24 @@ class MultiThreadedProxyResolver::Executor int thread_number() const { return thread_number_; } + void set_resolver(scoped_ptr<ProxyResolver> resolver) { + resolver_ = resolver.Pass(); + } + + void set_coordinator(Coordinator* coordinator) { + DCHECK(coordinator); + DCHECK(coordinator_); + coordinator_ = coordinator; + } + private: friend class base::RefCountedThreadSafe<Executor>; ~Executor(); - MultiThreadedProxyResolver* coordinator_; + Coordinator* coordinator_; const int thread_number_; - // The currently active job for this executor (either a SetPacScript or + // The currently active job for this executor (either a CreateProxyResolver or // GetProxyForURL task). scoped_refptr<Job> outstanding_job_; @@ -75,16 +92,68 @@ class MultiThreadedProxyResolver::Executor scoped_ptr<base::Thread> thread_; }; -// MultiThreadedProxyResolver::Job --------------------------------------------- +class MultiThreadedProxyResolver : public ProxyResolver, + public Executor::Coordinator, + public base::NonThreadSafe { + public: + // Creates an asynchronous ProxyResolver that runs requests on up to + // |max_num_threads|. + // + // For each thread that is created, an accompanying synchronous ProxyResolver + // will be provisioned using |resolver_factory|. All methods on these + // ProxyResolvers will be called on the one thread. + MultiThreadedProxyResolver( + scoped_ptr<ProxyResolverFactory> resolver_factory, + size_t max_num_threads, + const scoped_refptr<ProxyResolverScriptData>& script_data, + scoped_refptr<Executor> executor); + + ~MultiThreadedProxyResolver() override; + + // ProxyResolver implementation: + int GetProxyForURL(const GURL& url, + ProxyInfo* results, + const CompletionCallback& callback, + RequestHandle* request, + const BoundNetLog& net_log) override; + void CancelRequest(RequestHandle request) override; + LoadState GetLoadState(RequestHandle request) const override; + void CancelSetPacScript() override; + int SetPacScript(const scoped_refptr<ProxyResolverScriptData>& script_data, + const CompletionCallback& callback) override; -class MultiThreadedProxyResolver::Job - : public base::RefCountedThreadSafe<MultiThreadedProxyResolver::Job> { + private: + class GetProxyForURLJob; + // FIFO queue of pending jobs waiting to be started. + // TODO(eroman): Make this priority queue. + typedef std::deque<scoped_refptr<Job>> PendingJobsQueue; + typedef std::vector<scoped_refptr<Executor>> ExecutorList; + + // Returns an idle worker thread which is ready to receive GetProxyForURL() + // requests. If all threads are occupied, returns NULL. + Executor* FindIdleExecutor(); + + // Creates a new worker thread, and appends it to |executors_|. + void AddNewExecutor(); + + // Starts the next job from |pending_jobs_| if possible. + void OnExecutorReady(Executor* executor) override; + + const scoped_ptr<ProxyResolverFactory> resolver_factory_; + const size_t max_num_threads_; + PendingJobsQueue pending_jobs_; + ExecutorList executors_; + scoped_refptr<ProxyResolverScriptData> script_data_; +}; + +// Job --------------------------------------------- + +class Job : public base::RefCountedThreadSafe<Job> { public: // Identifies the subclass of Job (only being used for debugging purposes). enum Type { TYPE_GET_PROXY_FOR_URL, - TYPE_SET_PAC_SCRIPT, - TYPE_SET_PAC_SCRIPT_INTERNAL, + TYPE_CREATE_RESOLVER, }; Job(Type type, const CompletionCallback& callback) @@ -117,7 +186,7 @@ class MultiThreadedProxyResolver::Job // Returns true if this job still has a user callback. Some jobs // do not have a user callback, because they were helper jobs - // scheduled internally (for example TYPE_SET_PAC_SCRIPT_INTERNAL). + // scheduled internally (for example TYPE_CREATE_RESOLVER). // // Otherwise jobs that correspond with user-initiated work will // have a non-null callback up until the callback is run. @@ -150,7 +219,7 @@ class MultiThreadedProxyResolver::Job callback.Run(result); } - friend class base::RefCountedThreadSafe<MultiThreadedProxyResolver::Job>; + friend class base::RefCountedThreadSafe<Job>; virtual ~Job() {} @@ -161,51 +230,50 @@ class MultiThreadedProxyResolver::Job bool was_cancelled_; }; -// MultiThreadedProxyResolver::SetPacScriptJob --------------------------------- +// CreateResolverJob ----------------------------------------------------------- -// Runs on the worker thread to call ProxyResolver::SetPacScript. -class MultiThreadedProxyResolver::SetPacScriptJob - : public MultiThreadedProxyResolver::Job { +// Runs on the worker thread to call ProxyResolverFactory::CreateProxyResolver. +class CreateResolverJob : public Job { public: - SetPacScriptJob(const scoped_refptr<ProxyResolverScriptData>& script_data, - const CompletionCallback& callback) - : Job(!callback.is_null() ? TYPE_SET_PAC_SCRIPT : - TYPE_SET_PAC_SCRIPT_INTERNAL, - callback), - script_data_(script_data) { - } + CreateResolverJob(const scoped_refptr<ProxyResolverScriptData>& script_data, + ProxyResolverFactory* factory) + : Job(TYPE_CREATE_RESOLVER, CompletionCallback()), + script_data_(script_data), + factory_(factory) {} // Runs on the worker thread. void Run(scoped_refptr<base::MessageLoopProxy> origin_loop) override { - ProxyResolver* resolver = executor()->resolver(); - int rv = resolver->SetPacScript(script_data_, CompletionCallback()); + scoped_ptr<ProxyResolverFactory::Request> request; + int rv = factory_->CreateProxyResolver(script_data_, &resolver_, + CompletionCallback(), &request); DCHECK_NE(rv, ERR_IO_PENDING); origin_loop->PostTask( - FROM_HERE, - base::Bind(&SetPacScriptJob::RequestComplete, this, rv)); + FROM_HERE, base::Bind(&CreateResolverJob::RequestComplete, this, rv)); } protected: - ~SetPacScriptJob() override {} + ~CreateResolverJob() override {} private: // Runs the completion callback on the origin thread. void RequestComplete(int result_code) { // The task may have been cancelled after it was started. - if (!was_cancelled() && has_user_callback()) { - RunUserCallback(result_code); + if (!was_cancelled()) { + DCHECK(executor()); + executor()->set_resolver(resolver_.Pass()); } OnJobCompleted(); } const scoped_refptr<ProxyResolverScriptData> script_data_; + ProxyResolverFactory* factory_; + scoped_ptr<ProxyResolver> resolver_; }; // MultiThreadedProxyResolver::GetProxyForURLJob ------------------------------ -class MultiThreadedProxyResolver::GetProxyForURLJob - : public MultiThreadedProxyResolver::Job { +class MultiThreadedProxyResolver::GetProxyForURLJob : public Job { public: // |url| -- the URL of the query. // |results| -- the structure to fill with proxy resolve results. @@ -243,6 +311,7 @@ class MultiThreadedProxyResolver::GetProxyForURLJob // Runs on the worker thread. void Run(scoped_refptr<base::MessageLoopProxy> origin_loop) override { ProxyResolver* resolver = executor()->resolver(); + DCHECK(resolver); int rv = resolver->GetProxyForURL( url_, &results_buf_, CompletionCallback(), NULL, net_log_); DCHECK_NE(rv, ERR_IO_PENDING); @@ -281,24 +350,18 @@ class MultiThreadedProxyResolver::GetProxyForURLJob bool was_waiting_for_thread_; }; -// MultiThreadedProxyResolver::Executor ---------------------------------------- +// Executor ---------------------------------------- -MultiThreadedProxyResolver::Executor::Executor( - MultiThreadedProxyResolver* coordinator, - scoped_ptr<ProxyResolver> resolver, - int thread_number) - : coordinator_(coordinator), - thread_number_(thread_number), - resolver_(resolver.Pass()) { +Executor::Executor(Executor::Coordinator* coordinator, int thread_number) + : coordinator_(coordinator), thread_number_(thread_number) { DCHECK(coordinator); - DCHECK(resolver_); // Start up the thread. thread_.reset(new base::Thread(base::StringPrintf("PAC thread #%d", thread_number))); CHECK(thread_->Start()); } -void MultiThreadedProxyResolver::Executor::StartJob(Job* job) { +void Executor::StartJob(Job* job) { DCHECK(!outstanding_job_.get()); outstanding_job_ = job; @@ -311,13 +374,13 @@ void MultiThreadedProxyResolver::Executor::StartJob(Job* job) { base::Bind(&Job::Run, job, base::MessageLoopProxy::current())); } -void MultiThreadedProxyResolver::Executor::OnJobCompleted(Job* job) { +void Executor::OnJobCompleted(Job* job) { DCHECK_EQ(job, outstanding_job_.get()); outstanding_job_ = NULL; coordinator_->OnExecutorReady(this); } -void MultiThreadedProxyResolver::Executor::Destroy() { +void Executor::Destroy() { DCHECK(coordinator_); { @@ -344,7 +407,7 @@ void MultiThreadedProxyResolver::Executor::Destroy() { outstanding_job_ = NULL; } -MultiThreadedProxyResolver::Executor::~Executor() { +Executor::~Executor() { // The important cleanup happens as part of Destroy(), which should always be // called first. DCHECK(!coordinator_) << "Destroy() was not called"; @@ -356,18 +419,27 @@ MultiThreadedProxyResolver::Executor::~Executor() { // MultiThreadedProxyResolver -------------------------------------------------- MultiThreadedProxyResolver::MultiThreadedProxyResolver( - LegacyProxyResolverFactory* resolver_factory, - size_t max_num_threads) + scoped_ptr<ProxyResolverFactory> resolver_factory, + size_t max_num_threads, + const scoped_refptr<ProxyResolverScriptData>& script_data, + scoped_refptr<Executor> executor) : ProxyResolver(resolver_factory->expects_pac_bytes()), - resolver_factory_(resolver_factory), - max_num_threads_(max_num_threads) { - DCHECK_GE(max_num_threads, 1u); + resolver_factory_(resolver_factory.Pass()), + max_num_threads_(max_num_threads), + script_data_(script_data) { + DCHECK(script_data_); + executor->set_coordinator(this); + executors_.push_back(executor); } MultiThreadedProxyResolver::~MultiThreadedProxyResolver() { + DCHECK(CalledOnValidThread()); // We will cancel all outstanding requests. pending_jobs_.clear(); - ReleaseAllExecutors(); + + for (auto& executor : executors_) { + executor->Destroy(); + } } int MultiThreadedProxyResolver::GetProxyForURL( @@ -375,8 +447,6 @@ int MultiThreadedProxyResolver::GetProxyForURL( RequestHandle* request, const BoundNetLog& net_log) { DCHECK(CalledOnValidThread()); DCHECK(!callback.is_null()); - DCHECK(current_script_data_.get()) - << "Resolver is un-initialized. Must call SetPacScript() first!"; scoped_refptr<GetProxyForURLJob> job( new GetProxyForURLJob(url, results, callback, net_log)); @@ -401,11 +471,8 @@ int MultiThreadedProxyResolver::GetProxyForURL( // If we haven't already reached the thread limit, provision a new thread to // drain the requests more quickly. - if (executors_.size() < max_num_threads_) { - executor = AddNewExecutor(); - executor->StartJob( - new SetPacScriptJob(current_script_data_, CompletionCallback())); - } + if (executors_.size() < max_num_threads_) + AddNewExecutor(); return ERR_IO_PENDING; } @@ -437,72 +504,17 @@ LoadState MultiThreadedProxyResolver::GetLoadState(RequestHandle req) const { } void MultiThreadedProxyResolver::CancelSetPacScript() { - DCHECK(CalledOnValidThread()); - DCHECK_EQ(0u, pending_jobs_.size()); - DCHECK_EQ(1u, executors_.size()); - DCHECK_EQ(Job::TYPE_SET_PAC_SCRIPT, - executors_[0]->outstanding_job()->type()); - - // Defensively clear some data which shouldn't be getting used - // anymore. - current_script_data_ = NULL; - - ReleaseAllExecutors(); + NOTREACHED(); } int MultiThreadedProxyResolver::SetPacScript( const scoped_refptr<ProxyResolverScriptData>& script_data, const CompletionCallback&callback) { - DCHECK(CalledOnValidThread()); - DCHECK(!callback.is_null()); - - // Save the script details, so we can provision new executors later. - current_script_data_ = script_data; - - // The user should not have any outstanding requests when they call - // SetPacScript(). - CheckNoOutstandingUserRequests(); - - // Destroy all of the current threads and their proxy resolvers. - ReleaseAllExecutors(); - - // Provision a new executor, and run the SetPacScript request. On completion - // notification will be sent through |callback|. - Executor* executor = AddNewExecutor(); - executor->StartJob(new SetPacScriptJob(script_data, callback)); - return ERR_IO_PENDING; + NOTREACHED(); + return ERR_NOT_IMPLEMENTED; } -void MultiThreadedProxyResolver::CheckNoOutstandingUserRequests() const { - DCHECK(CalledOnValidThread()); - CHECK_EQ(0u, pending_jobs_.size()); - - for (ExecutorList::const_iterator it = executors_.begin(); - it != executors_.end(); ++it) { - const Executor* executor = it->get(); - Job* job = executor->outstanding_job(); - // The "has_user_callback()" is to exclude jobs for which the callback - // has already been invoked, or was not user-initiated (as in the case of - // lazy thread provisions). User-initiated jobs may !has_user_callback() - // when the callback has already been run. (Since we only clear the - // outstanding job AFTER the callback has been invoked, it is possible - // for a new request to be started from within the callback). - CHECK(!job || job->was_cancelled() || !job->has_user_callback()); - } -} - -void MultiThreadedProxyResolver::ReleaseAllExecutors() { - DCHECK(CalledOnValidThread()); - for (ExecutorList::iterator it = executors_.begin(); - it != executors_.end(); ++it) { - Executor* executor = it->get(); - executor->Destroy(); - } - executors_.clear(); -} - -MultiThreadedProxyResolver::Executor* -MultiThreadedProxyResolver::FindIdleExecutor() { +Executor* MultiThreadedProxyResolver::FindIdleExecutor() { DCHECK(CalledOnValidThread()); for (ExecutorList::iterator it = executors_.begin(); it != executors_.end(); ++it) { @@ -513,16 +525,15 @@ MultiThreadedProxyResolver::FindIdleExecutor() { return NULL; } -MultiThreadedProxyResolver::Executor* -MultiThreadedProxyResolver::AddNewExecutor() { +void MultiThreadedProxyResolver::AddNewExecutor() { DCHECK(CalledOnValidThread()); DCHECK_LT(executors_.size(), max_num_threads_); // The "thread number" is used to give the thread a unique name. int thread_number = executors_.size(); - scoped_ptr<ProxyResolver> resolver = resolver_factory_->CreateProxyResolver(); - Executor* executor = new Executor(this, resolver.Pass(), thread_number); + Executor* executor = new Executor(this, thread_number); + executor->StartJob( + new CreateResolverJob(script_data_, resolver_factory_.get())); executors_.push_back(make_scoped_refptr(executor)); - return executor; } void MultiThreadedProxyResolver::OnExecutorReady(Executor* executor) { @@ -537,4 +548,98 @@ void MultiThreadedProxyResolver::OnExecutorReady(Executor* executor) { executor->StartJob(job.get()); } +} // namespace + +class MultiThreadedProxyResolverFactory::Job + : public ProxyResolverFactory::Request, + public Executor::Coordinator { + public: + Job(MultiThreadedProxyResolverFactory* factory, + const scoped_refptr<ProxyResolverScriptData>& script_data, + scoped_ptr<ProxyResolver>* resolver, + scoped_ptr<ProxyResolverFactory> resolver_factory, + size_t max_num_threads, + const CompletionCallback& callback) + : factory_(factory), + resolver_out_(resolver), + resolver_factory_(resolver_factory.Pass()), + max_num_threads_(max_num_threads), + script_data_(script_data), + executor_(new Executor(this, 0)), + callback_(callback) { + executor_->StartJob( + new CreateResolverJob(script_data_, resolver_factory_.get())); + } + + ~Job() override { + if (factory_) { + executor_->Destroy(); + factory_->RemoveJob(this); + } + } + + void FactoryDestroyed() { + executor_->Destroy(); + executor_ = nullptr; + factory_ = nullptr; + } + + private: + void OnExecutorReady(Executor* executor) override { + int error = OK; + if (executor->resolver()) { + resolver_out_->reset(new MultiThreadedProxyResolver( + resolver_factory_.Pass(), max_num_threads_, script_data_.Pass(), + executor_)); + } else { + error = ERR_PAC_SCRIPT_FAILED; + executor_->Destroy(); + } + factory_->RemoveJob(this); + factory_ = nullptr; + callback_.Run(error); + } + + MultiThreadedProxyResolverFactory* factory_; + scoped_ptr<ProxyResolver>* const resolver_out_; + scoped_ptr<ProxyResolverFactory> resolver_factory_; + const size_t max_num_threads_; + scoped_refptr<ProxyResolverScriptData> script_data_; + scoped_refptr<Executor> executor_; + const CompletionCallback callback_; +}; + +MultiThreadedProxyResolverFactory::MultiThreadedProxyResolverFactory( + size_t max_num_threads, + bool factory_expects_bytes) + : ProxyResolverFactory(factory_expects_bytes), + max_num_threads_(max_num_threads) { + DCHECK_GE(max_num_threads, 1u); +} + +MultiThreadedProxyResolverFactory::~MultiThreadedProxyResolverFactory() { + for (auto job : jobs_) { + job->FactoryDestroyed(); + } +} + +int MultiThreadedProxyResolverFactory::CreateProxyResolver( + const scoped_refptr<ProxyResolverScriptData>& pac_script, + scoped_ptr<ProxyResolver>* resolver, + const CompletionCallback& callback, + scoped_ptr<Request>* request) { + scoped_ptr<Job> job(new Job(this, pac_script, resolver, + CreateProxyResolverFactory(), max_num_threads_, + callback)); + jobs_.insert(job.get()); + *request = job.Pass(); + return ERR_IO_PENDING; +} + +void MultiThreadedProxyResolverFactory::RemoveJob( + MultiThreadedProxyResolverFactory::Job* job) { + size_t erased = jobs_.erase(job); + DCHECK_EQ(1u, erased); +} + } // namespace net |