// Copyright (c) 2011 The Chromium Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. #include "net/websockets/websocket_throttle.h" #include #include "base/hash_tables.h" #include "base/memory/ref_counted.h" #include "base/memory/singleton.h" #include "base/message_loop.h" #include "base/string_number_conversions.h" #include "base/string_util.h" #include "base/stringprintf.h" #include "net/base/io_buffer.h" #include "net/base/sys_addrinfo.h" #include "net/socket_stream/socket_stream.h" #include "net/websockets/websocket_job.h" namespace net { static std::string AddrinfoToHashkey(const struct addrinfo* addrinfo) { switch (addrinfo->ai_family) { case AF_INET: { const struct sockaddr_in* const addr = reinterpret_cast(addrinfo->ai_addr); return base::StringPrintf("%d:%s", addrinfo->ai_family, base::HexEncode(&addr->sin_addr, 4).c_str()); } case AF_INET6: { const struct sockaddr_in6* const addr6 = reinterpret_cast(addrinfo->ai_addr); return base::StringPrintf( "%d:%s", addrinfo->ai_family, base::HexEncode(&addr6->sin6_addr, sizeof(addr6->sin6_addr)).c_str()); } default: return base::StringPrintf("%d:%s", addrinfo->ai_family, base::HexEncode(addrinfo->ai_addr, addrinfo->ai_addrlen).c_str()); } } WebSocketThrottle::WebSocketThrottle() { } WebSocketThrottle::~WebSocketThrottle() { DCHECK(queue_.empty()); DCHECK(addr_map_.empty()); } // static WebSocketThrottle* WebSocketThrottle::GetInstance() { return Singleton::get(); } void WebSocketThrottle::PutInQueue(WebSocketJob* job) { queue_.push_back(job); const AddressList& address_list = job->address_list(); base::hash_set address_set; for (const struct addrinfo* addrinfo = address_list.head(); addrinfo != NULL; addrinfo = addrinfo->ai_next) { std::string addrkey = AddrinfoToHashkey(addrinfo); // If |addrkey| is already processed, don't do it again. if (address_set.find(addrkey) != address_set.end()) continue; address_set.insert(addrkey); ConnectingAddressMap::iterator iter = addr_map_.find(addrkey); if (iter == addr_map_.end()) { ConnectingQueue* queue = new ConnectingQueue(); queue->push_back(job); addr_map_[addrkey] = queue; } else { iter->second->push_back(job); job->SetWaiting(); DVLOG(1) << "Waiting on " << addrkey; } } } void WebSocketThrottle::RemoveFromQueue(WebSocketJob* job) { bool in_queue = false; for (ConnectingQueue::iterator iter = queue_.begin(); iter != queue_.end(); ++iter) { if (*iter == job) { queue_.erase(iter); in_queue = true; break; } } if (!in_queue) return; const AddressList& address_list = job->address_list(); base::hash_set address_set; for (const struct addrinfo* addrinfo = address_list.head(); addrinfo != NULL; addrinfo = addrinfo->ai_next) { std::string addrkey = AddrinfoToHashkey(addrinfo); // If |addrkey| is already processed, don't do it again. if (address_set.find(addrkey) != address_set.end()) continue; address_set.insert(addrkey); ConnectingAddressMap::iterator iter = addr_map_.find(addrkey); DCHECK(iter != addr_map_.end()); ConnectingQueue* queue = iter->second; // Job may not be front of queue when job is closed early while waiting. for (ConnectingQueue::iterator iter = queue->begin(); iter != queue->end(); ++iter) { if (*iter == job) { queue->erase(iter); break; } } if (queue->empty()) { delete queue; addr_map_.erase(iter); } } } void WebSocketThrottle::WakeupSocketIfNecessary() { for (ConnectingQueue::iterator iter = queue_.begin(); iter != queue_.end(); ++iter) { WebSocketJob* job = *iter; if (!job->IsWaiting()) continue; bool should_wakeup = true; const AddressList& address_list = job->address_list(); for (const struct addrinfo* addrinfo = address_list.head(); addrinfo != NULL; addrinfo = addrinfo->ai_next) { std::string addrkey = AddrinfoToHashkey(addrinfo); ConnectingAddressMap::iterator iter = addr_map_.find(addrkey); DCHECK(iter != addr_map_.end()); ConnectingQueue* queue = iter->second; if (job != queue->front()) { should_wakeup = false; break; } } if (should_wakeup) job->Wakeup(); } } } // namespace net