author    | eroman@chromium.org <eroman@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2010-07-09 03:32:00 +0000
committer | eroman@chromium.org <eroman@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2010-07-09 03:32:00 +0000
commit    | 61b84d5d212129231137839524448b6205f7e097 (patch)
tree      | 474727da6d37d02dac29dffb5edc192be0864a79 /net/proxy/multi_threaded_proxy_resolver.cc
parent    | edf769c871dd82fe83becdc115fcf96aa601f646 (diff)
download  | chromium_src-61b84d5d212129231137839524448b6205f7e097.zip
          | chromium_src-61b84d5d212129231137839524448b6205f7e097.tar.gz
          | chromium_src-61b84d5d212129231137839524448b6205f7e097.tar.bz2
Add the capability to run multiple proxy PAC scripts in parallel.
Refactors SingleThreadedProxyResolver into MultiThreadedProxyResolver.
New threads are created lazily on demand, up to a fixed maximum.
Note that this CL does NOT change the policy in Chrome -- it continues to use a single thread for proxy resolving, but now does so through the new code.
BUG=11079
Review URL: http://codereview.chromium.org/2822043
git-svn-id: svn://svn.chromium.org/chrome/trunk/src@51924 0039d316-1c4b-4281-b951-d872f2087c98
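A rough usage sketch of the new API, based on the signatures in the diff below. MyPacResolverFactory is a hypothetical ProxyResolverFactory subclass, and the TestCompletionCallback helper usage is an illustrative assumption, not part of this CL:

// Sketch only. Assumes a hypothetical ProxyResolverFactory subclass
// (MyPacResolverFactory) whose CreateProxyResolver() returns a synchronous
// resolver, and uses net's TestCompletionCallback test helper to block on
// the asynchronous completions.
#include "net/base/net_errors.h"
#include "net/base/test_completion_callback.h"
#include "net/proxy/multi_threaded_proxy_resolver.h"
#include "net/proxy/proxy_info.h"

void ResolveExample() {
  // Up to 4 worker threads; they are provisioned lazily as requests pile up.
  net::MultiThreadedProxyResolver resolver(new MyPacResolverFactory, 4u);

  // The PAC script must be set before the first GetProxyForURL() call
  // (the resolver DCHECKs this).
  TestCompletionCallback set_callback;
  int rv = resolver.SetPacScript(GURL("http://example.test/proxy.pac"),
                                 string16(), &set_callback);
  if (rv == net::ERR_IO_PENDING)
    rv = set_callback.WaitForResult();

  // Each in-flight request occupies one executor; extra requests queue FIFO.
  net::ProxyInfo info;
  TestCompletionCallback resolve_callback;
  rv = resolver.GetProxyForURL(GURL("http://www.chromium.org/"), &info,
                               &resolve_callback, NULL, net::BoundNetLog());
  if (rv == net::ERR_IO_PENDING)
    rv = resolve_callback.WaitForResult();  // OK on success.
}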
Diffstat (limited to 'net/proxy/multi_threaded_proxy_resolver.cc')
-rw-r--r-- | net/proxy/multi_threaded_proxy_resolver.cc | 588
1 file changed, 588 insertions(+), 0 deletions(-)
diff --git a/net/proxy/multi_threaded_proxy_resolver.cc b/net/proxy/multi_threaded_proxy_resolver.cc
new file mode 100644
index 0000000..a55fef9
--- /dev/null
+++ b/net/proxy/multi_threaded_proxy_resolver.cc
@@ -0,0 +1,588 @@
+// Copyright (c) 2010 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "net/proxy/multi_threaded_proxy_resolver.h"
+
+#include "base/message_loop.h"
+#include "base/string_util.h"
+#include "base/thread.h"
+#include "net/base/capturing_net_log.h"
+#include "net/base/net_errors.h"
+#include "net/proxy/proxy_info.h"
+
+// TODO(eroman): Have the MultiThreadedProxyResolver clear its PAC script
+// data when SetPacScript fails. That will reclaim memory when
+// testing bogus scripts.
+
+namespace net {
+
+namespace {
+
+class PurgeMemoryTask : public base::RefCountedThreadSafe<PurgeMemoryTask> {
+ public:
+  explicit PurgeMemoryTask(ProxyResolver* resolver) : resolver_(resolver) {}
+  void PurgeMemory() { resolver_->PurgeMemory(); }
+ private:
+  friend class base::RefCountedThreadSafe<PurgeMemoryTask>;
+  ~PurgeMemoryTask() {}
+  ProxyResolver* resolver_;
+};
+
+}  // namespace
+
+// An "executor" is a job-runner for PAC requests. It encapsulates a worker
+// thread and a synchronous ProxyResolver (which will be operated on said
+// thread).
+class MultiThreadedProxyResolver::Executor
+    : public base::RefCountedThreadSafe<MultiThreadedProxyResolver::Executor> {
+ public:
+  // |coordinator| must remain valid throughout our lifetime. It is used to
+  // signal when the executor is ready to receive work by calling
+  // |coordinator->OnExecutorReady()|.
+  // The constructor takes ownership of |resolver|.
+  // |thread_number| is an identifier used when naming the worker thread.
+  Executor(MultiThreadedProxyResolver* coordinator,
+           ProxyResolver* resolver,
+           int thread_number);
+
+  // Submit a job to this executor.
+  void StartJob(Job* job);
+
+  // Callback for when a job has completed running on the executor's thread.
+  void OnJobCompleted(Job* job);
+
+  // Clean up the executor. Cancels all outstanding work, and frees the
+  // thread and resolver.
+  void Destroy();
+
+  void PurgeMemory();
+
+  // Returns the outstanding job, or NULL.
+  Job* outstanding_job() const { return outstanding_job_.get(); }
+
+  ProxyResolver* resolver() { return resolver_.get(); }
+
+  int thread_number() const { return thread_number_; }
+
+ private:
+  friend class base::RefCountedThreadSafe<Executor>;
+  ~Executor();
+
+  MultiThreadedProxyResolver* coordinator_;
+  const int thread_number_;
+
+  // The currently active job for this executor (either a SetPacScript or
+  // GetProxyForURL task).
+  scoped_refptr<Job> outstanding_job_;
+
+  // The synchronous resolver implementation.
+  scoped_ptr<ProxyResolver> resolver_;
+
+  // The thread that |resolver_| runs on.
+  // Note that declaration ordering is important here. |thread_| needs to be
+  // destroyed *before* |resolver_|, in case |resolver_| is currently
+  // executing on |thread_|.
+  scoped_ptr<base::Thread> thread_;
+};
+
+// MultiThreadedProxyResolver::Job ---------------------------------------------
+
+class MultiThreadedProxyResolver::Job
+    : public base::RefCountedThreadSafe<MultiThreadedProxyResolver::Job> {
+ public:
+  // Identifies the subclass of Job (only used for debugging purposes).
+  enum Type {
+    TYPE_GET_PROXY_FOR_URL,
+    TYPE_SET_PAC_SCRIPT,
+    TYPE_SET_PAC_SCRIPT_INTERNAL,
+  };
+
+  Job(Type type, CompletionCallback* user_callback)
+      : type_(type),
+        user_callback_(user_callback),
+        executor_(NULL),
+        was_cancelled_(false) {
+  }
+
+  void set_executor(Executor* executor) {
+    executor_ = executor;
+  }
+
+  // The "executor" is the job runner that is scheduling this job. If
+  // this job has not been submitted to an executor yet, this will be
+  // NULL (and we know it hasn't started yet).
+  Executor* executor() {
+    return executor_;
+  }
+
+  // Mark the job as having been cancelled.
+  void Cancel() {
+    was_cancelled_ = true;
+  }
+
+  // Returns true if Cancel() has been called.
+  bool was_cancelled() const { return was_cancelled_; }
+
+  Type type() const { return type_; }
+
+  // Returns true if this job still has a user callback. Some jobs
+  // do not have a user callback, because they were helper jobs
+  // scheduled internally (for example TYPE_SET_PAC_SCRIPT_INTERNAL).
+  //
+  // Otherwise, jobs that correspond with user-initiated work will
+  // have a non-NULL callback up until the callback is run.
+  bool has_user_callback() const { return user_callback_ != NULL; }
+
+  // This method is called when the job is inserted into a wait queue
+  // because no executors were ready to accept it.
+  virtual void WaitingForThread() {}
+
+  // This method is called just before the job is posted to the work thread.
+  virtual void FinishedWaitingForThread() {}
+
+  // This method is called on the worker thread to do the job's work. On
+  // completion, implementors are expected to call OnJobCompleted() on
+  // |origin_loop|.
+  virtual void Run(MessageLoop* origin_loop) = 0;
+
+ protected:
+  void OnJobCompleted() {
+    // |executor_| will be NULL if the executor has already been deleted.
+    if (executor_)
+      executor_->OnJobCompleted(this);
+  }
+
+  void RunUserCallback(int result) {
+    DCHECK(has_user_callback());
+    CompletionCallback* callback = user_callback_;
+    // Null the callback so has_user_callback() will now return false.
+    user_callback_ = NULL;
+    callback->Run(result);
+  }
+
+  friend class base::RefCountedThreadSafe<MultiThreadedProxyResolver::Job>;
+
+  virtual ~Job() {}
+
+ private:
+  const Type type_;
+  CompletionCallback* user_callback_;
+  Executor* executor_;
+  bool was_cancelled_;
+};
+
+// MultiThreadedProxyResolver::SetPacScriptJob ---------------------------------
+
+// Runs on the worker thread to call ProxyResolver::SetPacScript.
+class MultiThreadedProxyResolver::SetPacScriptJob
+    : public MultiThreadedProxyResolver::Job {
+ public:
+  SetPacScriptJob(const GURL& pac_url,
+                  const string16& pac_script,
+                  CompletionCallback* callback)
+      : Job(callback ? TYPE_SET_PAC_SCRIPT : TYPE_SET_PAC_SCRIPT_INTERNAL,
+            callback),
+        pac_url_(pac_url),
+        pac_script_(pac_script) {
+  }
+
+  // Runs on the worker thread.
+  virtual void Run(MessageLoop* origin_loop) {
+    ProxyResolver* resolver = executor()->resolver();
+    int rv = resolver->expects_pac_bytes() ?
+        resolver->SetPacScriptByData(pac_script_, NULL) :
+        resolver->SetPacScriptByUrl(pac_url_, NULL);
+
+    DCHECK_NE(rv, ERR_IO_PENDING);
+    origin_loop->PostTask(
+        FROM_HERE,
+        NewRunnableMethod(this, &SetPacScriptJob::RequestComplete, rv));
+  }
+
+ private:
+  // Runs the completion callback on the origin thread.
+  void RequestComplete(int result_code) {
+    // The task may have been cancelled after it was started.
+    if (!was_cancelled() && has_user_callback()) {
+      RunUserCallback(result_code);
+    }
+    OnJobCompleted();
+  }
+
+  const GURL pac_url_;
+  const string16 pac_script_;
+};
+
+// MultiThreadedProxyResolver::GetProxyForURLJob ------------------------------
+
+class MultiThreadedProxyResolver::GetProxyForURLJob
+    : public MultiThreadedProxyResolver::Job {
+ public:
+  // |url|     -- the URL of the query.
+  // |results| -- the structure to fill with proxy resolve results.
+  GetProxyForURLJob(const GURL& url,
+                    ProxyInfo* results,
+                    CompletionCallback* callback,
+                    const BoundNetLog& net_log)
+      : Job(TYPE_GET_PROXY_FOR_URL, callback),
+        results_(results),
+        net_log_(net_log),
+        url_(url),
+        was_waiting_for_thread_(false) {
+    DCHECK(callback);
+  }
+
+  BoundNetLog* net_log() { return &net_log_; }
+
+  virtual void WaitingForThread() {
+    was_waiting_for_thread_ = true;
+    net_log_.BeginEvent(
+        NetLog::TYPE_WAITING_FOR_PROXY_RESOLVER_THREAD, NULL);
+  }
+
+  virtual void FinishedWaitingForThread() {
+    DCHECK(executor());
+
+    if (was_waiting_for_thread_) {
+      net_log_.EndEvent(
+          NetLog::TYPE_WAITING_FOR_PROXY_RESOLVER_THREAD, NULL);
+    }
+
+    net_log_.AddEvent(
+        NetLog::TYPE_SUBMITTED_TO_RESOLVER_THREAD,
+        new NetLogIntegerParameter(
+            "thread_number", executor()->thread_number()));
+  }
+
+  // Runs on the worker thread.
+  virtual void Run(MessageLoop* origin_loop) {
+    const size_t kNetLogBound = 50u;
+    worker_log_.reset(new CapturingNetLog(kNetLogBound));
+    BoundNetLog bound_worker_log(NetLog::Source(), worker_log_.get());
+
+    ProxyResolver* resolver = executor()->resolver();
+    int rv = resolver->GetProxyForURL(
+        url_, &results_buf_, NULL, NULL, bound_worker_log);
+    DCHECK_NE(rv, ERR_IO_PENDING);
+
+    origin_loop->PostTask(
+        FROM_HERE,
+        NewRunnableMethod(this, &GetProxyForURLJob::QueryComplete, rv));
+  }
+
+ private:
+  // Runs the completion callback on the origin thread.
+  void QueryComplete(int result_code) {
+    // The Job may have been cancelled after it was started.
+    if (!was_cancelled()) {
+      // Merge the load log that was generated on the worker thread into the
+      // main log.
+      CapturingBoundNetLog bound_worker_log(NetLog::Source(),
+                                            worker_log_.release());
+      bound_worker_log.AppendTo(net_log_);
+
+      if (result_code >= OK) {  // Note: unit-tests use values > 0.
+        results_->Use(results_buf_);
+      }
+      RunUserCallback(result_code);
+    }
+    OnJobCompleted();
+  }
+
+  // Must only be used on the "origin" thread.
+  ProxyInfo* results_;
+  BoundNetLog net_log_;
+  const GURL url_;
+
+  // Usable from within Run() on the worker thread.
+  ProxyInfo results_buf_;
+
+  // Used to pass the captured events between Run() [worker thread] and
+  // QueryComplete() [origin thread].
+  scoped_ptr<CapturingNetLog> worker_log_;
+
+  bool was_waiting_for_thread_;
+};
+
+// MultiThreadedProxyResolver::Executor ----------------------------------------
+
+MultiThreadedProxyResolver::Executor::Executor(
+    MultiThreadedProxyResolver* coordinator,
+    ProxyResolver* resolver,
+    int thread_number)
+    : coordinator_(coordinator),
+      thread_number_(thread_number),
+      resolver_(resolver) {
+  DCHECK(coordinator);
+  DCHECK(resolver);
+  // Start up the thread.
+  // Note that it is safe to pass a temporary C-String to Thread(), as it will
+  // make a copy.
+  std::string thread_name =
+      StringPrintf("PAC thread #%d", thread_number);
+  thread_.reset(new base::Thread(thread_name.c_str()));
+  thread_->Start();
+}
+
+void MultiThreadedProxyResolver::Executor::StartJob(Job* job) {
+  DCHECK(!outstanding_job_);
+  outstanding_job_ = job;
+
+  // Run the job. Once it has completed (regardless of whether it was
+  // cancelled), it will invoke OnJobCompleted() on this thread.
+  job->set_executor(this);
+  job->FinishedWaitingForThread();
+  thread_->message_loop()->PostTask(
+      FROM_HERE,
+      NewRunnableMethod(job, &Job::Run, MessageLoop::current()));
+}
+
+void MultiThreadedProxyResolver::Executor::OnJobCompleted(Job* job) {
+  DCHECK_EQ(job, outstanding_job_.get());
+  outstanding_job_ = NULL;
+  coordinator_->OnExecutorReady(this);
+}
+
+void MultiThreadedProxyResolver::Executor::Destroy() {
+  DCHECK(coordinator_);
+
+  // Give the resolver an opportunity to shut down from THIS THREAD before
+  // joining on the resolver thread. This allows certain implementations
+  // to avoid deadlocks.
+  resolver_->Shutdown();
+
+  // Join the worker thread.
+  thread_.reset();
+
+  // Cancel any outstanding job.
+  if (outstanding_job_) {
+    outstanding_job_->Cancel();
+    // Orphan the job (since this executor may be deleted soon).
+    outstanding_job_->set_executor(NULL);
+  }
+
+  // It is now safe to free the ProxyResolver, since all the tasks that
+  // were using it on the resolver thread have completed.
+  resolver_.reset();
+
+  // Null some stuff as a precaution.
+  coordinator_ = NULL;
+  outstanding_job_ = NULL;
+}
+
+void MultiThreadedProxyResolver::Executor::PurgeMemory() {
+  scoped_refptr<PurgeMemoryTask> helper(new PurgeMemoryTask(resolver_.get()));
+  thread_->message_loop()->PostTask(
+      FROM_HERE,
+      NewRunnableMethod(helper.get(), &PurgeMemoryTask::PurgeMemory));
+}
+
+MultiThreadedProxyResolver::Executor::~Executor() {
+  // The important cleanup happens as part of Destroy(), which should always be
+  // called first.
+  DCHECK(!coordinator_) << "Destroy() was not called";
+  DCHECK(!thread_.get());
+  DCHECK(!resolver_.get());
+  DCHECK(!outstanding_job_);
+}
+
+// MultiThreadedProxyResolver --------------------------------------------------
+
+MultiThreadedProxyResolver::MultiThreadedProxyResolver(
+    ProxyResolverFactory* resolver_factory,
+    size_t max_num_threads)
+    : ProxyResolver(resolver_factory->resolvers_expect_pac_bytes()),
+      resolver_factory_(resolver_factory),
+      max_num_threads_(max_num_threads),
+      was_set_pac_script_called_(false) {
+  DCHECK_GE(max_num_threads, 1u);
+}
+
+MultiThreadedProxyResolver::~MultiThreadedProxyResolver() {
+  // We will cancel all outstanding requests.
+  pending_jobs_.clear();
+  ReleaseAllExecutors();
+}
+
+int MultiThreadedProxyResolver::GetProxyForURL(const GURL& url,
+                                               ProxyInfo* results,
+                                               CompletionCallback* callback,
+                                               RequestHandle* request,
+                                               const BoundNetLog& net_log) {
+  DCHECK(CalledOnValidThread());
+  DCHECK(callback);
+  DCHECK(was_set_pac_script_called_)
+      << "Resolver is un-initialized. Must call SetPacScript() first!";
+
+  scoped_refptr<GetProxyForURLJob> job =
+      new GetProxyForURLJob(url, results, callback, net_log);
+
+  // Completion will be notified through |callback|, unless the caller cancels
+  // the request using |request|.
+  if (request)
+    *request = reinterpret_cast<RequestHandle>(job.get());
+
+  // If there is an executor that is ready to run this request, submit it!
+  Executor* executor = FindIdleExecutor();
+  if (executor) {
+    DCHECK_EQ(0u, pending_jobs_.size());
+    executor->StartJob(job);
+    return ERR_IO_PENDING;
+  }
+
+  // Otherwise queue this request. (We will schedule it to a thread once one
+  // becomes available).
+  job->WaitingForThread();
+  pending_jobs_.push_back(job);
+
+  // If we haven't already reached the thread limit, provision a new thread to
+  // drain the requests more quickly.
+  if (executors_.size() < max_num_threads_) {
+    executor = AddNewExecutor();
+    executor->StartJob(
+        new SetPacScriptJob(current_pac_url_, current_pac_script_, NULL));
+  }
+
+  return ERR_IO_PENDING;
+}
+
+void MultiThreadedProxyResolver::CancelRequest(RequestHandle req) {
+  DCHECK(CalledOnValidThread());
+  DCHECK(req);
+
+  Job* job = reinterpret_cast<Job*>(req);
+  DCHECK_EQ(Job::TYPE_GET_PROXY_FOR_URL, job->type());
+
+  if (job->executor()) {
+    // If the job was already submitted to the executor, just mark it
+    // as cancelled so the user callback isn't run on completion.
+    job->Cancel();
+  } else {
+    // Otherwise the job is just sitting in a queue.
+    PendingJobsQueue::iterator it =
+        std::find(pending_jobs_.begin(), pending_jobs_.end(), job);
+    DCHECK(it != pending_jobs_.end());
+    pending_jobs_.erase(it);
+  }
+}
+
+void MultiThreadedProxyResolver::CancelSetPacScript() {
+  DCHECK(CalledOnValidThread());
+  DCHECK_EQ(0u, pending_jobs_.size());
+  DCHECK_EQ(1u, executors_.size());
+  DCHECK_EQ(Job::TYPE_SET_PAC_SCRIPT,
+            executors_[0]->outstanding_job()->type());
+
+  // Defensively clear some data which shouldn't be getting used
+  // anymore.
+  was_set_pac_script_called_ = false;
+  current_pac_url_ = GURL();
+  current_pac_script_ = string16();
+
+  ReleaseAllExecutors();
+}
+
+void MultiThreadedProxyResolver::PurgeMemory() {
+  DCHECK(CalledOnValidThread());
+  for (ExecutorList::iterator it = executors_.begin();
+       it != executors_.end(); ++it) {
+    Executor* executor = *it;
+    executor->PurgeMemory();
+  }
+}
+
+int MultiThreadedProxyResolver::SetPacScript(
+    const GURL& pac_url,
+    const string16& pac_script,
+    CompletionCallback* callback) {
+  DCHECK(CalledOnValidThread());
+  DCHECK(callback);
+
+  // Save the script details, so we can provision new executors later.
+  // (We rely on internal reference counting of strings to avoid this memory
+  // being duplicated by each of the resolver threads).
+  was_set_pac_script_called_ = true;
+  current_pac_url_ = pac_url;
+  current_pac_script_ = pac_script;
+
+  // The user should not have any outstanding requests when they call
+  // SetPacScript().
+  CheckNoOutstandingUserRequests();
+
+  // Destroy all of the current threads and their proxy resolvers.
+  ReleaseAllExecutors();
+
+  // Provision a new executor, and run the SetPacScript request. On completion
+  // notification will be sent through |callback|.
+  Executor* executor = AddNewExecutor();
+  executor->StartJob(new SetPacScriptJob(pac_url, pac_script, callback));
+  return ERR_IO_PENDING;
+}
+
+void MultiThreadedProxyResolver::CheckNoOutstandingUserRequests() const {
+  DCHECK(CalledOnValidThread());
+  CHECK_EQ(0u, pending_jobs_.size());
+
+  for (ExecutorList::const_iterator it = executors_.begin();
+       it != executors_.end(); ++it) {
+    const Executor* executor = *it;
+    Job* job = executor->outstanding_job();
+    // The "has_user_callback()" is to exclude jobs for which the callback
+    // has already been invoked, or was not user-initiated (as in the case of
+    // lazy thread provisions). User-initiated jobs may !has_user_callback()
+    // when the callback has already been run. (Since we only clear the
+    // outstanding job AFTER the callback has been invoked, it is possible
+    // for a new request to be started from within the callback).
+    CHECK(!job || job->was_cancelled() || !job->has_user_callback());
+  }
+}
+
+void MultiThreadedProxyResolver::ReleaseAllExecutors() {
+  DCHECK(CalledOnValidThread());
+  for (ExecutorList::iterator it = executors_.begin();
+       it != executors_.end(); ++it) {
+    Executor* executor = *it;
+    executor->Destroy();
+  }
+  executors_.clear();
+}
+
+MultiThreadedProxyResolver::Executor*
+MultiThreadedProxyResolver::FindIdleExecutor() {
+  DCHECK(CalledOnValidThread());
+  for (ExecutorList::iterator it = executors_.begin();
+       it != executors_.end(); ++it) {
+    Executor* executor = *it;
+    if (!executor->outstanding_job())
+      return executor;
+  }
+  return NULL;
+}
+
+MultiThreadedProxyResolver::Executor*
+MultiThreadedProxyResolver::AddNewExecutor() {
+  DCHECK(CalledOnValidThread());
+  DCHECK_LT(executors_.size(), max_num_threads_);
+  // The "thread number" is used to give the thread a unique name.
+  int thread_number = executors_.size();
+  ProxyResolver* resolver = resolver_factory_->CreateProxyResolver();
+  Executor* executor = new Executor(
+      this, resolver, thread_number);
+  executors_.push_back(executor);
+  return executor;
+}
+
+void MultiThreadedProxyResolver::OnExecutorReady(Executor* executor) {
+  DCHECK(CalledOnValidThread());
+  if (pending_jobs_.empty())
+    return;
+
+  // Get the next job to process (FIFO). Transfer it from the pending queue
+  // to the executor.
+  scoped_refptr<Job> job = pending_jobs_.front();
+  pending_jobs_.pop_front();
+  executor->StartJob(job);
+}
+
+}  // namespace net
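For readers outside the Chromium tree, the scheduling policy above boils down to: each worker runs at most one job at a time, extra requests queue FIFO, and a new thread is provisioned only when every existing one is busy and the cap has not been reached. A simplified, self-contained approximation of that policy using std::thread (it omits the per-thread synchronous resolver and the post-back to the origin MessageLoop that the real code performs):

// Lazily grows a pool of worker threads up to |max_threads|, mirroring
// MultiThreadedProxyResolver's "provision only when all executors are busy"
// behavior. Sketch only; not the base::Thread/MessageLoop machinery above.
#include <condition_variable>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>
#include <vector>

class LazyThreadPool {
 public:
  explicit LazyThreadPool(size_t max_threads) : max_threads_(max_threads) {}

  ~LazyThreadPool() {
    {
      std::lock_guard<std::mutex> lock(mu_);
      shutting_down_ = true;
    }
    cv_.notify_all();
    for (std::thread& t : threads_)
      t.join();
  }

  void Post(std::function<void()> job) {
    std::lock_guard<std::mutex> lock(mu_);
    queue_.push_back(std::move(job));
    // Provision a new worker only when no idle worker exists and the cap
    // has not been reached -- the same policy GetProxyForURL() applies.
    if (idle_workers_ == 0 && threads_.size() < max_threads_)
      threads_.emplace_back([this] { WorkerLoop(); });
    else
      cv_.notify_one();
  }

 private:
  void WorkerLoop() {
    std::unique_lock<std::mutex> lock(mu_);
    for (;;) {
      ++idle_workers_;
      cv_.wait(lock, [this] { return shutting_down_ || !queue_.empty(); });
      --idle_workers_;
      if (shutting_down_ && queue_.empty())
        return;
      std::function<void()> job = std::move(queue_.front());
      queue_.pop_front();
      lock.unlock();
      job();  // Run outside the lock, like Job::Run() on the worker thread.
      lock.lock();
    }
  }

  const size_t max_threads_;
  std::mutex mu_;
  std::condition_variable cv_;
  std::deque<std::function<void()>> queue_;  // Pending jobs, FIFO order.
  std::vector<std::thread> threads_;
  size_t idle_workers_ = 0;
  bool shutting_down_ = false;
};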