summaryrefslogtreecommitdiffstats
path: root/net/proxy/multi_threaded_proxy_resolver.cc
diff options
context:
space:
mode:
Diffstat (limited to 'net/proxy/multi_threaded_proxy_resolver.cc')
-rw-r--r--net/proxy/multi_threaded_proxy_resolver.cc365
1 files changed, 235 insertions, 130 deletions
diff --git a/net/proxy/multi_threaded_proxy_resolver.cc b/net/proxy/multi_threaded_proxy_resolver.cc
index b8fea44..a579e79 100644
--- a/net/proxy/multi_threaded_proxy_resolver.cc
+++ b/net/proxy/multi_threaded_proxy_resolver.cc
@@ -4,38 +4,45 @@
#include "net/proxy/multi_threaded_proxy_resolver.h"
+#include <deque>
+#include <vector>
+
#include "base/bind.h"
#include "base/bind_helpers.h"
#include "base/message_loop/message_loop_proxy.h"
+#include "base/stl_util.h"
#include "base/strings/string_util.h"
#include "base/strings/stringprintf.h"
+#include "base/threading/non_thread_safe.h"
#include "base/threading/thread.h"
#include "base/threading/thread_restrictions.h"
#include "net/base/net_errors.h"
#include "net/log/net_log.h"
#include "net/proxy/proxy_info.h"
-#include "net/proxy/proxy_resolver_factory.h"
-
-// TODO(eroman): Have the MultiThreadedProxyResolver clear its PAC script
-// data when SetPacScript fails. That will reclaim memory when
-// testing bogus scripts.
+#include "net/proxy/proxy_resolver.h"
namespace net {
+namespace {
+class Job;
// An "executor" is a job-runner for PAC requests. It encapsulates a worker
// thread and a synchronous ProxyResolver (which will be operated on said
// thread.)
-class MultiThreadedProxyResolver::Executor
- : public base::RefCountedThreadSafe<MultiThreadedProxyResolver::Executor > {
+class Executor : public base::RefCountedThreadSafe<Executor> {
public:
+ class Coordinator {
+ public:
+ virtual void OnExecutorReady(Executor* executor) = 0;
+
+ protected:
+ virtual ~Coordinator() = default;
+ };
+
// |coordinator| must remain valid throughout our lifetime. It is used to
// signal when the executor is ready to receive work by calling
// |coordinator->OnExecutorReady()|.
- // The constructor takes ownership of |resolver|.
// |thread_number| is an identifier used when naming the worker thread.
- Executor(MultiThreadedProxyResolver* coordinator,
- scoped_ptr<ProxyResolver> resolver,
- int thread_number);
+ Executor(Coordinator* coordinator, int thread_number);
// Submit a job to this executor.
void StartJob(Job* job);
@@ -54,14 +61,24 @@ class MultiThreadedProxyResolver::Executor
int thread_number() const { return thread_number_; }
+ void set_resolver(scoped_ptr<ProxyResolver> resolver) {
+ resolver_ = resolver.Pass();
+ }
+
+ void set_coordinator(Coordinator* coordinator) {
+ DCHECK(coordinator);
+ DCHECK(coordinator_);
+ coordinator_ = coordinator;
+ }
+
private:
friend class base::RefCountedThreadSafe<Executor>;
~Executor();
- MultiThreadedProxyResolver* coordinator_;
+ Coordinator* coordinator_;
const int thread_number_;
- // The currently active job for this executor (either a SetPacScript or
+ // The currently active job for this executor (either a CreateProxyResolver or
// GetProxyForURL task).
scoped_refptr<Job> outstanding_job_;
@@ -75,16 +92,68 @@ class MultiThreadedProxyResolver::Executor
scoped_ptr<base::Thread> thread_;
};
-// MultiThreadedProxyResolver::Job ---------------------------------------------
+class MultiThreadedProxyResolver : public ProxyResolver,
+ public Executor::Coordinator,
+ public base::NonThreadSafe {
+ public:
+ // Creates an asynchronous ProxyResolver that runs requests on up to
+ // |max_num_threads|.
+ //
+ // For each thread that is created, an accompanying synchronous ProxyResolver
+ // will be provisioned using |resolver_factory|. All methods on these
+ // ProxyResolvers will be called on the one thread.
+ MultiThreadedProxyResolver(
+ scoped_ptr<ProxyResolverFactory> resolver_factory,
+ size_t max_num_threads,
+ const scoped_refptr<ProxyResolverScriptData>& script_data,
+ scoped_refptr<Executor> executor);
+
+ ~MultiThreadedProxyResolver() override;
+
+ // ProxyResolver implementation:
+ int GetProxyForURL(const GURL& url,
+ ProxyInfo* results,
+ const CompletionCallback& callback,
+ RequestHandle* request,
+ const BoundNetLog& net_log) override;
+ void CancelRequest(RequestHandle request) override;
+ LoadState GetLoadState(RequestHandle request) const override;
+ void CancelSetPacScript() override;
+ int SetPacScript(const scoped_refptr<ProxyResolverScriptData>& script_data,
+ const CompletionCallback& callback) override;
-class MultiThreadedProxyResolver::Job
- : public base::RefCountedThreadSafe<MultiThreadedProxyResolver::Job> {
+ private:
+ class GetProxyForURLJob;
+ // FIFO queue of pending jobs waiting to be started.
+ // TODO(eroman): Make this priority queue.
+ typedef std::deque<scoped_refptr<Job>> PendingJobsQueue;
+ typedef std::vector<scoped_refptr<Executor>> ExecutorList;
+
+ // Returns an idle worker thread which is ready to receive GetProxyForURL()
+ // requests. If all threads are occupied, returns NULL.
+ Executor* FindIdleExecutor();
+
+ // Creates a new worker thread, and appends it to |executors_|.
+ void AddNewExecutor();
+
+ // Starts the next job from |pending_jobs_| if possible.
+ void OnExecutorReady(Executor* executor) override;
+
+ const scoped_ptr<ProxyResolverFactory> resolver_factory_;
+ const size_t max_num_threads_;
+ PendingJobsQueue pending_jobs_;
+ ExecutorList executors_;
+ scoped_refptr<ProxyResolverScriptData> script_data_;
+};
+
+// Job ---------------------------------------------
+
+class Job : public base::RefCountedThreadSafe<Job> {
public:
// Identifies the subclass of Job (only being used for debugging purposes).
enum Type {
TYPE_GET_PROXY_FOR_URL,
- TYPE_SET_PAC_SCRIPT,
- TYPE_SET_PAC_SCRIPT_INTERNAL,
+ TYPE_CREATE_RESOLVER,
};
Job(Type type, const CompletionCallback& callback)
@@ -117,7 +186,7 @@ class MultiThreadedProxyResolver::Job
// Returns true if this job still has a user callback. Some jobs
// do not have a user callback, because they were helper jobs
- // scheduled internally (for example TYPE_SET_PAC_SCRIPT_INTERNAL).
+ // scheduled internally (for example TYPE_CREATE_RESOLVER).
//
// Otherwise jobs that correspond with user-initiated work will
// have a non-null callback up until the callback is run.
@@ -150,7 +219,7 @@ class MultiThreadedProxyResolver::Job
callback.Run(result);
}
- friend class base::RefCountedThreadSafe<MultiThreadedProxyResolver::Job>;
+ friend class base::RefCountedThreadSafe<Job>;
virtual ~Job() {}
@@ -161,51 +230,50 @@ class MultiThreadedProxyResolver::Job
bool was_cancelled_;
};
-// MultiThreadedProxyResolver::SetPacScriptJob ---------------------------------
+// CreateResolverJob -----------------------------------------------------------
-// Runs on the worker thread to call ProxyResolver::SetPacScript.
-class MultiThreadedProxyResolver::SetPacScriptJob
- : public MultiThreadedProxyResolver::Job {
+// Runs on the worker thread to call ProxyResolverFactory::CreateProxyResolver.
+class CreateResolverJob : public Job {
public:
- SetPacScriptJob(const scoped_refptr<ProxyResolverScriptData>& script_data,
- const CompletionCallback& callback)
- : Job(!callback.is_null() ? TYPE_SET_PAC_SCRIPT :
- TYPE_SET_PAC_SCRIPT_INTERNAL,
- callback),
- script_data_(script_data) {
- }
+ CreateResolverJob(const scoped_refptr<ProxyResolverScriptData>& script_data,
+ ProxyResolverFactory* factory)
+ : Job(TYPE_CREATE_RESOLVER, CompletionCallback()),
+ script_data_(script_data),
+ factory_(factory) {}
// Runs on the worker thread.
void Run(scoped_refptr<base::MessageLoopProxy> origin_loop) override {
- ProxyResolver* resolver = executor()->resolver();
- int rv = resolver->SetPacScript(script_data_, CompletionCallback());
+ scoped_ptr<ProxyResolverFactory::Request> request;
+ int rv = factory_->CreateProxyResolver(script_data_, &resolver_,
+ CompletionCallback(), &request);
DCHECK_NE(rv, ERR_IO_PENDING);
origin_loop->PostTask(
- FROM_HERE,
- base::Bind(&SetPacScriptJob::RequestComplete, this, rv));
+ FROM_HERE, base::Bind(&CreateResolverJob::RequestComplete, this, rv));
}
protected:
- ~SetPacScriptJob() override {}
+ ~CreateResolverJob() override {}
private:
// Runs the completion callback on the origin thread.
void RequestComplete(int result_code) {
// The task may have been cancelled after it was started.
- if (!was_cancelled() && has_user_callback()) {
- RunUserCallback(result_code);
+ if (!was_cancelled()) {
+ DCHECK(executor());
+ executor()->set_resolver(resolver_.Pass());
}
OnJobCompleted();
}
const scoped_refptr<ProxyResolverScriptData> script_data_;
+ ProxyResolverFactory* factory_;
+ scoped_ptr<ProxyResolver> resolver_;
};
// MultiThreadedProxyResolver::GetProxyForURLJob ------------------------------
-class MultiThreadedProxyResolver::GetProxyForURLJob
- : public MultiThreadedProxyResolver::Job {
+class MultiThreadedProxyResolver::GetProxyForURLJob : public Job {
public:
// |url| -- the URL of the query.
// |results| -- the structure to fill with proxy resolve results.
@@ -243,6 +311,7 @@ class MultiThreadedProxyResolver::GetProxyForURLJob
// Runs on the worker thread.
void Run(scoped_refptr<base::MessageLoopProxy> origin_loop) override {
ProxyResolver* resolver = executor()->resolver();
+ DCHECK(resolver);
int rv = resolver->GetProxyForURL(
url_, &results_buf_, CompletionCallback(), NULL, net_log_);
DCHECK_NE(rv, ERR_IO_PENDING);
@@ -281,24 +350,18 @@ class MultiThreadedProxyResolver::GetProxyForURLJob
bool was_waiting_for_thread_;
};
-// MultiThreadedProxyResolver::Executor ----------------------------------------
+// Executor ----------------------------------------
-MultiThreadedProxyResolver::Executor::Executor(
- MultiThreadedProxyResolver* coordinator,
- scoped_ptr<ProxyResolver> resolver,
- int thread_number)
- : coordinator_(coordinator),
- thread_number_(thread_number),
- resolver_(resolver.Pass()) {
+Executor::Executor(Executor::Coordinator* coordinator, int thread_number)
+ : coordinator_(coordinator), thread_number_(thread_number) {
DCHECK(coordinator);
- DCHECK(resolver_);
// Start up the thread.
thread_.reset(new base::Thread(base::StringPrintf("PAC thread #%d",
thread_number)));
CHECK(thread_->Start());
}
-void MultiThreadedProxyResolver::Executor::StartJob(Job* job) {
+void Executor::StartJob(Job* job) {
DCHECK(!outstanding_job_.get());
outstanding_job_ = job;
@@ -311,13 +374,13 @@ void MultiThreadedProxyResolver::Executor::StartJob(Job* job) {
base::Bind(&Job::Run, job, base::MessageLoopProxy::current()));
}
-void MultiThreadedProxyResolver::Executor::OnJobCompleted(Job* job) {
+void Executor::OnJobCompleted(Job* job) {
DCHECK_EQ(job, outstanding_job_.get());
outstanding_job_ = NULL;
coordinator_->OnExecutorReady(this);
}
-void MultiThreadedProxyResolver::Executor::Destroy() {
+void Executor::Destroy() {
DCHECK(coordinator_);
{
@@ -344,7 +407,7 @@ void MultiThreadedProxyResolver::Executor::Destroy() {
outstanding_job_ = NULL;
}
-MultiThreadedProxyResolver::Executor::~Executor() {
+Executor::~Executor() {
// The important cleanup happens as part of Destroy(), which should always be
// called first.
DCHECK(!coordinator_) << "Destroy() was not called";
@@ -356,18 +419,27 @@ MultiThreadedProxyResolver::Executor::~Executor() {
// MultiThreadedProxyResolver --------------------------------------------------
MultiThreadedProxyResolver::MultiThreadedProxyResolver(
- LegacyProxyResolverFactory* resolver_factory,
- size_t max_num_threads)
+ scoped_ptr<ProxyResolverFactory> resolver_factory,
+ size_t max_num_threads,
+ const scoped_refptr<ProxyResolverScriptData>& script_data,
+ scoped_refptr<Executor> executor)
: ProxyResolver(resolver_factory->expects_pac_bytes()),
- resolver_factory_(resolver_factory),
- max_num_threads_(max_num_threads) {
- DCHECK_GE(max_num_threads, 1u);
+ resolver_factory_(resolver_factory.Pass()),
+ max_num_threads_(max_num_threads),
+ script_data_(script_data) {
+ DCHECK(script_data_);
+ executor->set_coordinator(this);
+ executors_.push_back(executor);
}
MultiThreadedProxyResolver::~MultiThreadedProxyResolver() {
+ DCHECK(CalledOnValidThread());
// We will cancel all outstanding requests.
pending_jobs_.clear();
- ReleaseAllExecutors();
+
+ for (auto& executor : executors_) {
+ executor->Destroy();
+ }
}
int MultiThreadedProxyResolver::GetProxyForURL(
@@ -375,8 +447,6 @@ int MultiThreadedProxyResolver::GetProxyForURL(
RequestHandle* request, const BoundNetLog& net_log) {
DCHECK(CalledOnValidThread());
DCHECK(!callback.is_null());
- DCHECK(current_script_data_.get())
- << "Resolver is un-initialized. Must call SetPacScript() first!";
scoped_refptr<GetProxyForURLJob> job(
new GetProxyForURLJob(url, results, callback, net_log));
@@ -401,11 +471,8 @@ int MultiThreadedProxyResolver::GetProxyForURL(
// If we haven't already reached the thread limit, provision a new thread to
// drain the requests more quickly.
- if (executors_.size() < max_num_threads_) {
- executor = AddNewExecutor();
- executor->StartJob(
- new SetPacScriptJob(current_script_data_, CompletionCallback()));
- }
+ if (executors_.size() < max_num_threads_)
+ AddNewExecutor();
return ERR_IO_PENDING;
}
@@ -437,72 +504,17 @@ LoadState MultiThreadedProxyResolver::GetLoadState(RequestHandle req) const {
}
void MultiThreadedProxyResolver::CancelSetPacScript() {
- DCHECK(CalledOnValidThread());
- DCHECK_EQ(0u, pending_jobs_.size());
- DCHECK_EQ(1u, executors_.size());
- DCHECK_EQ(Job::TYPE_SET_PAC_SCRIPT,
- executors_[0]->outstanding_job()->type());
-
- // Defensively clear some data which shouldn't be getting used
- // anymore.
- current_script_data_ = NULL;
-
- ReleaseAllExecutors();
+ NOTREACHED();
}
int MultiThreadedProxyResolver::SetPacScript(
const scoped_refptr<ProxyResolverScriptData>& script_data,
const CompletionCallback&callback) {
- DCHECK(CalledOnValidThread());
- DCHECK(!callback.is_null());
-
- // Save the script details, so we can provision new executors later.
- current_script_data_ = script_data;
-
- // The user should not have any outstanding requests when they call
- // SetPacScript().
- CheckNoOutstandingUserRequests();
-
- // Destroy all of the current threads and their proxy resolvers.
- ReleaseAllExecutors();
-
- // Provision a new executor, and run the SetPacScript request. On completion
- // notification will be sent through |callback|.
- Executor* executor = AddNewExecutor();
- executor->StartJob(new SetPacScriptJob(script_data, callback));
- return ERR_IO_PENDING;
+ NOTREACHED();
+ return ERR_NOT_IMPLEMENTED;
}
-void MultiThreadedProxyResolver::CheckNoOutstandingUserRequests() const {
- DCHECK(CalledOnValidThread());
- CHECK_EQ(0u, pending_jobs_.size());
-
- for (ExecutorList::const_iterator it = executors_.begin();
- it != executors_.end(); ++it) {
- const Executor* executor = it->get();
- Job* job = executor->outstanding_job();
- // The "has_user_callback()" is to exclude jobs for which the callback
- // has already been invoked, or was not user-initiated (as in the case of
- // lazy thread provisions). User-initiated jobs may !has_user_callback()
- // when the callback has already been run. (Since we only clear the
- // outstanding job AFTER the callback has been invoked, it is possible
- // for a new request to be started from within the callback).
- CHECK(!job || job->was_cancelled() || !job->has_user_callback());
- }
-}
-
-void MultiThreadedProxyResolver::ReleaseAllExecutors() {
- DCHECK(CalledOnValidThread());
- for (ExecutorList::iterator it = executors_.begin();
- it != executors_.end(); ++it) {
- Executor* executor = it->get();
- executor->Destroy();
- }
- executors_.clear();
-}
-
-MultiThreadedProxyResolver::Executor*
-MultiThreadedProxyResolver::FindIdleExecutor() {
+Executor* MultiThreadedProxyResolver::FindIdleExecutor() {
DCHECK(CalledOnValidThread());
for (ExecutorList::iterator it = executors_.begin();
it != executors_.end(); ++it) {
@@ -513,16 +525,15 @@ MultiThreadedProxyResolver::FindIdleExecutor() {
return NULL;
}
-MultiThreadedProxyResolver::Executor*
-MultiThreadedProxyResolver::AddNewExecutor() {
+void MultiThreadedProxyResolver::AddNewExecutor() {
DCHECK(CalledOnValidThread());
DCHECK_LT(executors_.size(), max_num_threads_);
// The "thread number" is used to give the thread a unique name.
int thread_number = executors_.size();
- scoped_ptr<ProxyResolver> resolver = resolver_factory_->CreateProxyResolver();
- Executor* executor = new Executor(this, resolver.Pass(), thread_number);
+ Executor* executor = new Executor(this, thread_number);
+ executor->StartJob(
+ new CreateResolverJob(script_data_, resolver_factory_.get()));
executors_.push_back(make_scoped_refptr(executor));
- return executor;
}
void MultiThreadedProxyResolver::OnExecutorReady(Executor* executor) {
@@ -537,4 +548,98 @@ void MultiThreadedProxyResolver::OnExecutorReady(Executor* executor) {
executor->StartJob(job.get());
}
+} // namespace
+
+class MultiThreadedProxyResolverFactory::Job
+ : public ProxyResolverFactory::Request,
+ public Executor::Coordinator {
+ public:
+ Job(MultiThreadedProxyResolverFactory* factory,
+ const scoped_refptr<ProxyResolverScriptData>& script_data,
+ scoped_ptr<ProxyResolver>* resolver,
+ scoped_ptr<ProxyResolverFactory> resolver_factory,
+ size_t max_num_threads,
+ const CompletionCallback& callback)
+ : factory_(factory),
+ resolver_out_(resolver),
+ resolver_factory_(resolver_factory.Pass()),
+ max_num_threads_(max_num_threads),
+ script_data_(script_data),
+ executor_(new Executor(this, 0)),
+ callback_(callback) {
+ executor_->StartJob(
+ new CreateResolverJob(script_data_, resolver_factory_.get()));
+ }
+
+ ~Job() override {
+ if (factory_) {
+ executor_->Destroy();
+ factory_->RemoveJob(this);
+ }
+ }
+
+ void FactoryDestroyed() {
+ executor_->Destroy();
+ executor_ = nullptr;
+ factory_ = nullptr;
+ }
+
+ private:
+ void OnExecutorReady(Executor* executor) override {
+ int error = OK;
+ if (executor->resolver()) {
+ resolver_out_->reset(new MultiThreadedProxyResolver(
+ resolver_factory_.Pass(), max_num_threads_, script_data_.Pass(),
+ executor_));
+ } else {
+ error = ERR_PAC_SCRIPT_FAILED;
+ executor_->Destroy();
+ }
+ factory_->RemoveJob(this);
+ factory_ = nullptr;
+ callback_.Run(error);
+ }
+
+ MultiThreadedProxyResolverFactory* factory_;
+ scoped_ptr<ProxyResolver>* const resolver_out_;
+ scoped_ptr<ProxyResolverFactory> resolver_factory_;
+ const size_t max_num_threads_;
+ scoped_refptr<ProxyResolverScriptData> script_data_;
+ scoped_refptr<Executor> executor_;
+ const CompletionCallback callback_;
+};
+
+MultiThreadedProxyResolverFactory::MultiThreadedProxyResolverFactory(
+ size_t max_num_threads,
+ bool factory_expects_bytes)
+ : ProxyResolverFactory(factory_expects_bytes),
+ max_num_threads_(max_num_threads) {
+ DCHECK_GE(max_num_threads, 1u);
+}
+
+MultiThreadedProxyResolverFactory::~MultiThreadedProxyResolverFactory() {
+ for (auto job : jobs_) {
+ job->FactoryDestroyed();
+ }
+}
+
+int MultiThreadedProxyResolverFactory::CreateProxyResolver(
+ const scoped_refptr<ProxyResolverScriptData>& pac_script,
+ scoped_ptr<ProxyResolver>* resolver,
+ const CompletionCallback& callback,
+ scoped_ptr<Request>* request) {
+ scoped_ptr<Job> job(new Job(this, pac_script, resolver,
+ CreateProxyResolverFactory(), max_num_threads_,
+ callback));
+ jobs_.insert(job.get());
+ *request = job.Pass();
+ return ERR_IO_PENDING;
+}
+
+void MultiThreadedProxyResolverFactory::RemoveJob(
+ MultiThreadedProxyResolverFactory::Job* job) {
+ size_t erased = jobs_.erase(job);
+ DCHECK_EQ(1u, erased);
+}
+
} // namespace net