diff options
Diffstat (limited to 'net/proxy/multi_threaded_proxy_resolver.cc')
-rw-r--r-- | net/proxy/multi_threaded_proxy_resolver.cc | 588 |
1 files changed, 0 insertions, 588 deletions
diff --git a/net/proxy/multi_threaded_proxy_resolver.cc b/net/proxy/multi_threaded_proxy_resolver.cc deleted file mode 100644 index a55fef9..0000000 --- a/net/proxy/multi_threaded_proxy_resolver.cc +++ /dev/null @@ -1,588 +0,0 @@ -// Copyright (c) 2010 The Chromium Authors. All rights reserved. -// Use of this source code is governed by a BSD-style license that can be -// found in the LICENSE file. - -#include "net/proxy/multi_threaded_proxy_resolver.h" - -#include "base/message_loop.h" -#include "base/string_util.h" -#include "base/thread.h" -#include "net/base/capturing_net_log.h" -#include "net/base/net_errors.h" -#include "net/proxy/proxy_info.h" - -// TODO(eroman): Have the MultiThreadedProxyResolver clear its PAC script -// data when SetPacScript fails. That will reclaim memory when -// testing bogus scripts. - -namespace net { - -namespace { - -class PurgeMemoryTask : public base::RefCountedThreadSafe<PurgeMemoryTask> { - public: - explicit PurgeMemoryTask(ProxyResolver* resolver) : resolver_(resolver) {} - void PurgeMemory() { resolver_->PurgeMemory(); } - private: - friend class base::RefCountedThreadSafe<PurgeMemoryTask>; - ~PurgeMemoryTask() {} - ProxyResolver* resolver_; -}; - -} // namespace - -// An "executor" is a job-runner for PAC requests. It encapsulates a worker -// thread and a synchronous ProxyResolver (which will be operated on said -// thread.) -class MultiThreadedProxyResolver::Executor - : public base::RefCountedThreadSafe<MultiThreadedProxyResolver::Executor > { - public: - // |coordinator| must remain valid throughout our lifetime. It is used to - // signal when the executor is ready to receive work by calling - // |coordinator->OnExecutorReady()|. - // The constructor takes ownership of |resolver|. - // |thread_number| is an identifier used when naming the worker thread. - Executor(MultiThreadedProxyResolver* coordinator, - ProxyResolver* resolver, - int thread_number); - - // Submit a job to this executor. - void StartJob(Job* job); - - // Callback for when a job has completed running on the executor's thread. - void OnJobCompleted(Job* job); - - // Cleanup the executor. Cancels all outstanding work, and frees the thread - // and resolver. - void Destroy(); - - void PurgeMemory(); - - // Returns the outstanding job, or NULL. - Job* outstanding_job() const { return outstanding_job_.get(); } - - ProxyResolver* resolver() { return resolver_.get(); } - - int thread_number() const { return thread_number_; } - - private: - friend class base::RefCountedThreadSafe<Executor>; - ~Executor(); - - MultiThreadedProxyResolver* coordinator_; - const int thread_number_; - - // The currently active job for this executor (either a SetPacScript or - // GetProxyForURL task). - scoped_refptr<Job> outstanding_job_; - - // The synchronous resolver implementation. - scoped_ptr<ProxyResolver> resolver_; - - // The thread where |resolver_| is run on. - // Note that declaration ordering is important here. |thread_| needs to be - // destroyed *before* |resolver_|, in case |resolver_| is currently - // executing on |thread_|. - scoped_ptr<base::Thread> thread_; -}; - -// MultiThreadedProxyResolver::Job --------------------------------------------- - -class MultiThreadedProxyResolver::Job - : public base::RefCountedThreadSafe<MultiThreadedProxyResolver::Job> { - public: - // Identifies the subclass of Job (only being used for debugging purposes). - enum Type { - TYPE_GET_PROXY_FOR_URL, - TYPE_SET_PAC_SCRIPT, - TYPE_SET_PAC_SCRIPT_INTERNAL, - }; - - Job(Type type, CompletionCallback* user_callback) - : type_(type), - user_callback_(user_callback), - executor_(NULL), - was_cancelled_(false) { - } - - void set_executor(Executor* executor) { - executor_ = executor; - } - - // The "executor" is the job runner that is scheduling this job. If - // this job has not been submitted to an executor yet, this will be - // NULL (and we know it hasn't started yet). - Executor* executor() { - return executor_; - } - - // Mark the job as having been cancelled. - void Cancel() { - was_cancelled_ = true; - } - - // Returns true if Cancel() has been called. - bool was_cancelled() const { return was_cancelled_; } - - Type type() const { return type_; } - - // Returns true if this job still has a user callback. Some jobs - // do not have a user callback, because they were helper jobs - // scheduled internally (for example TYPE_SET_PAC_SCRIPT_INTERNAL). - // - // Otherwise jobs that correspond with user-initiated work will - // have a non-NULL callback up until the callback is run. - bool has_user_callback() const { return user_callback_ != NULL; } - - // This method is called when the job is inserted into a wait queue - // because no executors were ready to accept it. - virtual void WaitingForThread() {} - - // This method is called just before the job is posted to the work thread. - virtual void FinishedWaitingForThread() {} - - // This method is called on the worker thread to do the job's work. On - // completion, implementors are expected to call OnJobCompleted() on - // |origin_loop|. - virtual void Run(MessageLoop* origin_loop) = 0; - - protected: - void OnJobCompleted() { - // |executor_| will be NULL if the executor has already been deleted. - if (executor_) - executor_->OnJobCompleted(this); - } - - void RunUserCallback(int result) { - DCHECK(has_user_callback()); - CompletionCallback* callback = user_callback_; - // Null the callback so has_user_callback() will now return false. - user_callback_ = NULL; - callback->Run(result); - } - - friend class base::RefCountedThreadSafe<MultiThreadedProxyResolver::Job>; - - virtual ~Job() {} - - private: - const Type type_; - CompletionCallback* user_callback_; - Executor* executor_; - bool was_cancelled_; -}; - -// MultiThreadedProxyResolver::SetPacScriptJob --------------------------------- - -// Runs on the worker thread to call ProxyResolver::SetPacScript. -class MultiThreadedProxyResolver::SetPacScriptJob - : public MultiThreadedProxyResolver::Job { - public: - SetPacScriptJob(const GURL& pac_url, - const string16& pac_script, - CompletionCallback* callback) - : Job(callback ? TYPE_SET_PAC_SCRIPT : TYPE_SET_PAC_SCRIPT_INTERNAL, - callback), - pac_url_(pac_url), - pac_script_(pac_script) { - } - - // Runs on the worker thread. - virtual void Run(MessageLoop* origin_loop) { - ProxyResolver* resolver = executor()->resolver(); - int rv = resolver->expects_pac_bytes() ? - resolver->SetPacScriptByData(pac_script_, NULL) : - resolver->SetPacScriptByUrl(pac_url_, NULL); - - DCHECK_NE(rv, ERR_IO_PENDING); - origin_loop->PostTask( - FROM_HERE, - NewRunnableMethod(this, &SetPacScriptJob::RequestComplete, rv)); - } - - private: - // Runs the completion callback on the origin thread. - void RequestComplete(int result_code) { - // The task may have been cancelled after it was started. - if (!was_cancelled() && has_user_callback()) { - RunUserCallback(result_code); - } - OnJobCompleted(); - } - - const GURL pac_url_; - const string16 pac_script_; -}; - -// MultiThreadedProxyResolver::GetProxyForURLJob ------------------------------ - -class MultiThreadedProxyResolver::GetProxyForURLJob - : public MultiThreadedProxyResolver::Job { - public: - // |url| -- the URL of the query. - // |results| -- the structure to fill with proxy resolve results. - GetProxyForURLJob(const GURL& url, - ProxyInfo* results, - CompletionCallback* callback, - const BoundNetLog& net_log) - : Job(TYPE_GET_PROXY_FOR_URL, callback), - results_(results), - net_log_(net_log), - url_(url), - was_waiting_for_thread_(false) { - DCHECK(callback); - } - - BoundNetLog* net_log() { return &net_log_; } - - virtual void WaitingForThread() { - was_waiting_for_thread_ = true; - net_log_.BeginEvent( - NetLog::TYPE_WAITING_FOR_PROXY_RESOLVER_THREAD, NULL); - } - - virtual void FinishedWaitingForThread() { - DCHECK(executor()); - - if (was_waiting_for_thread_) { - net_log_.EndEvent( - NetLog::TYPE_WAITING_FOR_PROXY_RESOLVER_THREAD, NULL); - } - - net_log_.AddEvent( - NetLog::TYPE_SUBMITTED_TO_RESOLVER_THREAD, - new NetLogIntegerParameter( - "thread_number", executor()->thread_number())); - } - - // Runs on the worker thread. - virtual void Run(MessageLoop* origin_loop) { - const size_t kNetLogBound = 50u; - worker_log_.reset(new CapturingNetLog(kNetLogBound)); - BoundNetLog bound_worker_log(NetLog::Source(), worker_log_.get()); - - ProxyResolver* resolver = executor()->resolver(); - int rv = resolver->GetProxyForURL( - url_, &results_buf_, NULL, NULL, bound_worker_log); - DCHECK_NE(rv, ERR_IO_PENDING); - - origin_loop->PostTask( - FROM_HERE, - NewRunnableMethod(this, &GetProxyForURLJob::QueryComplete, rv)); - } - - private: - // Runs the completion callback on the origin thread. - void QueryComplete(int result_code) { - // The Job may have been cancelled after it was started. - if (!was_cancelled()) { - // Merge the load log that was generated on the worker thread, into the - // main log. - CapturingBoundNetLog bound_worker_log(NetLog::Source(), - worker_log_.release()); - bound_worker_log.AppendTo(net_log_); - - if (result_code >= OK) { // Note: unit-tests use values > 0. - results_->Use(results_buf_); - } - RunUserCallback(result_code); - } - OnJobCompleted(); - } - - // Must only be used on the "origin" thread. - ProxyInfo* results_; - BoundNetLog net_log_; - const GURL url_; - - // Usable from within DoQuery on the worker thread. - ProxyInfo results_buf_; - - // Used to pass the captured events between DoQuery [worker thread] and - // QueryComplete [origin thread]. - scoped_ptr<CapturingNetLog> worker_log_; - - bool was_waiting_for_thread_; -}; - -// MultiThreadedProxyResolver::Executor ---------------------------------------- - -MultiThreadedProxyResolver::Executor::Executor( - MultiThreadedProxyResolver* coordinator, - ProxyResolver* resolver, - int thread_number) - : coordinator_(coordinator), - thread_number_(thread_number), - resolver_(resolver) { - DCHECK(coordinator); - DCHECK(resolver); - // Start up the thread. - // Note that it is safe to pass a temporary C-String to Thread(), as it will - // make a copy. - std::string thread_name = - StringPrintf("PAC thread #%d", thread_number); - thread_.reset(new base::Thread(thread_name.c_str())); - thread_->Start(); -} - -void MultiThreadedProxyResolver::Executor::StartJob(Job* job) { - DCHECK(!outstanding_job_); - outstanding_job_ = job; - - // Run the job. Once it has completed (regardless of whether it was - // cancelled), it will invoke OnJobCompleted() on this thread. - job->set_executor(this); - job->FinishedWaitingForThread(); - thread_->message_loop()->PostTask( - FROM_HERE, - NewRunnableMethod(job, &Job::Run, MessageLoop::current())); -} - -void MultiThreadedProxyResolver::Executor::OnJobCompleted(Job* job) { - DCHECK_EQ(job, outstanding_job_.get()); - outstanding_job_ = NULL; - coordinator_->OnExecutorReady(this); -} - -void MultiThreadedProxyResolver::Executor::Destroy() { - DCHECK(coordinator_); - - // Give the resolver an opportunity to shutdown from THIS THREAD before - // joining on the resolver thread. This allows certain implementations - // to avoid deadlocks. - resolver_->Shutdown(); - - // Join the worker thread. - thread_.reset(); - - // Cancel any outstanding job. - if (outstanding_job_) { - outstanding_job_->Cancel(); - // Orphan the job (since this executor may be deleted soon). - outstanding_job_->set_executor(NULL); - } - - // It is now safe to free the ProxyResolver, since all the tasks that - // were using it on the resolver thread have completed. - resolver_.reset(); - - // Null some stuff as a precaution. - coordinator_ = NULL; - outstanding_job_ = NULL; -} - -void MultiThreadedProxyResolver::Executor::PurgeMemory() { - scoped_refptr<PurgeMemoryTask> helper(new PurgeMemoryTask(resolver_.get())); - thread_->message_loop()->PostTask( - FROM_HERE, - NewRunnableMethod(helper.get(), &PurgeMemoryTask::PurgeMemory)); -} - -MultiThreadedProxyResolver::Executor::~Executor() { - // The important cleanup happens as part of Destroy(), which should always be - // called first. - DCHECK(!coordinator_) << "Destroy() was not called"; - DCHECK(!thread_.get()); - DCHECK(!resolver_.get()); - DCHECK(!outstanding_job_); -} - -// MultiThreadedProxyResolver -------------------------------------------------- - -MultiThreadedProxyResolver::MultiThreadedProxyResolver( - ProxyResolverFactory* resolver_factory, - size_t max_num_threads) - : ProxyResolver(resolver_factory->resolvers_expect_pac_bytes()), - resolver_factory_(resolver_factory), - max_num_threads_(max_num_threads), - was_set_pac_script_called_(false) { - DCHECK_GE(max_num_threads, 1u); -} - -MultiThreadedProxyResolver::~MultiThreadedProxyResolver() { - // We will cancel all outstanding requests. - pending_jobs_.clear(); - ReleaseAllExecutors(); -} - -int MultiThreadedProxyResolver::GetProxyForURL(const GURL& url, - ProxyInfo* results, - CompletionCallback* callback, - RequestHandle* request, - const BoundNetLog& net_log) { - DCHECK(CalledOnValidThread()); - DCHECK(callback); - DCHECK(was_set_pac_script_called_) - << "Resolver is un-initialized. Must call SetPacScript() first!"; - - scoped_refptr<GetProxyForURLJob> job = - new GetProxyForURLJob(url, results, callback, net_log); - - // Completion will be notified through |callback|, unless the caller cancels - // the request using |request|. - if (request) - *request = reinterpret_cast<RequestHandle>(job.get()); - - // If there is an executor that is ready to run this request, submit it! - Executor* executor = FindIdleExecutor(); - if (executor) { - DCHECK_EQ(0u, pending_jobs_.size()); - executor->StartJob(job); - return ERR_IO_PENDING; - } - - // Otherwise queue this request. (We will schedule it to a thread once one - // becomes available). - job->WaitingForThread(); - pending_jobs_.push_back(job); - - // If we haven't already reached the thread limit, provision a new thread to - // drain the requests more quickly. - if (executors_.size() < max_num_threads_) { - executor = AddNewExecutor(); - executor->StartJob( - new SetPacScriptJob(current_pac_url_, current_pac_script_, NULL)); - } - - return ERR_IO_PENDING; -} - -void MultiThreadedProxyResolver::CancelRequest(RequestHandle req) { - DCHECK(CalledOnValidThread()); - DCHECK(req); - - Job* job = reinterpret_cast<Job*>(req); - DCHECK_EQ(Job::TYPE_GET_PROXY_FOR_URL, job->type()); - - if (job->executor()) { - // If the job was already submitted to the executor, just mark it - // as cancelled so the user callback isn't run on completion. - job->Cancel(); - } else { - // Otherwise the job is just sitting in a queue. - PendingJobsQueue::iterator it = - std::find(pending_jobs_.begin(), pending_jobs_.end(), job); - DCHECK(it != pending_jobs_.end()); - pending_jobs_.erase(it); - } -} - -void MultiThreadedProxyResolver::CancelSetPacScript() { - DCHECK(CalledOnValidThread()); - DCHECK_EQ(0u, pending_jobs_.size()); - DCHECK_EQ(1u, executors_.size()); - DCHECK_EQ(Job::TYPE_SET_PAC_SCRIPT, - executors_[0]->outstanding_job()->type()); - - // Defensively clear some data which shouldn't be getting used - // anymore. - was_set_pac_script_called_ = false; - current_pac_url_ = GURL(); - current_pac_script_ = string16(); - - ReleaseAllExecutors(); -} - -void MultiThreadedProxyResolver::PurgeMemory() { - DCHECK(CalledOnValidThread()); - for (ExecutorList::iterator it = executors_.begin(); - it != executors_.end(); ++it) { - Executor* executor = *it; - executor->PurgeMemory(); - } -} - -int MultiThreadedProxyResolver::SetPacScript( - const GURL& pac_url, - const string16& pac_script, - CompletionCallback* callback) { - DCHECK(CalledOnValidThread()); - DCHECK(callback); - - // Save the script details, so we can provision new executors later. - // (We rely on internal reference counting of strings to avoid this memory - // being duplicated by each of the resolver threads). - was_set_pac_script_called_ = true; - current_pac_url_ = pac_url; - current_pac_script_ = pac_script; - - // The user should not have any outstanding requests when they call - // SetPacScript(). - CheckNoOutstandingUserRequests(); - - // Destroy all of the current threads and their proxy resolvers. - ReleaseAllExecutors(); - - // Provision a new executor, and run the SetPacScript request. On completion - // notification will be sent through |callback|. - Executor* executor = AddNewExecutor(); - executor->StartJob(new SetPacScriptJob(pac_url, pac_script, callback)); - return ERR_IO_PENDING; -} - -void MultiThreadedProxyResolver::CheckNoOutstandingUserRequests() const { - DCHECK(CalledOnValidThread()); - CHECK_EQ(0u, pending_jobs_.size()); - - for (ExecutorList::const_iterator it = executors_.begin(); - it != executors_.end(); ++it) { - const Executor* executor = *it; - Job* job = executor->outstanding_job(); - // The "has_user_callback()" is to exclude jobs for which the callback - // has already been invoked, or was not user-initiated (as in the case of - // lazy thread provisions). User-initiated jobs may !has_user_callback() - // when the callback has already been run. (Since we only clear the - // outstanding job AFTER the callback has been invoked, it is possible - // for a new request to be started from within the callback). - CHECK(!job || job->was_cancelled() || !job->has_user_callback()); - } -} - -void MultiThreadedProxyResolver::ReleaseAllExecutors() { - DCHECK(CalledOnValidThread()); - for (ExecutorList::iterator it = executors_.begin(); - it != executors_.end(); ++it) { - Executor* executor = *it; - executor->Destroy(); - } - executors_.clear(); -} - -MultiThreadedProxyResolver::Executor* -MultiThreadedProxyResolver::FindIdleExecutor() { - DCHECK(CalledOnValidThread()); - for (ExecutorList::iterator it = executors_.begin(); - it != executors_.end(); ++it) { - Executor* executor = *it; - if (!executor->outstanding_job()) - return executor; - } - return NULL; -} - -MultiThreadedProxyResolver::Executor* -MultiThreadedProxyResolver::AddNewExecutor() { - DCHECK(CalledOnValidThread()); - DCHECK_LT(executors_.size(), max_num_threads_); - // The "thread number" is used to give the thread a unique name. - int thread_number = executors_.size(); - ProxyResolver* resolver = resolver_factory_->CreateProxyResolver(); - Executor* executor = new Executor( - this, resolver, thread_number); - executors_.push_back(executor); - return executor; -} - -void MultiThreadedProxyResolver::OnExecutorReady(Executor* executor) { - DCHECK(CalledOnValidThread()); - if (pending_jobs_.empty()) - return; - - // Get the next job to process (FIFO). Transfer it from the pending queue - // to the executor. - scoped_refptr<Job> job = pending_jobs_.front(); - pending_jobs_.pop_front(); - executor->StartJob(job); -} - -} // namespace net |