diff options
author | mbelshe@chromium.org <mbelshe@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2011-02-02 20:03:02 +0000 |
---|---|---|
committer | mbelshe@chromium.org <mbelshe@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2011-02-02 20:03:02 +0000 |
commit | 878984e6a994d308543f3ce93b2d5a511c81ba00 (patch) | |
tree | 755e2c50ee764ad05e5600f97bad1bd7ef972f78 /net | |
parent | b3c57fe514e2b3cf10278004abe1d4b189cdb7f9 (diff) | |
download | chromium_src-878984e6a994d308543f3ce93b2d5a511c81ba00.zip chromium_src-878984e6a994d308543f3ce93b2d5a511c81ba00.tar.gz chromium_src-878984e6a994d308543f3ce93b2d5a511c81ba00.tar.bz2 |
Split flip_in_mem_edsm_server into a gazillion pieces.
For the most part, this is a straight refactor. I'm sure I broke something.
BUG=monolithic code
TEST=none
Review URL: http://codereview.chromium.org/6392011
git-svn-id: svn://svn.chromium.org/chrome/trunk/src@73491 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'net')
25 files changed, 3612 insertions, 3216 deletions
diff --git a/net/net.gyp b/net/net.gyp index 5ffda1b..c242f27 100644 --- a/net/net.gyp +++ b/net/net.gyp @@ -1349,6 +1349,8 @@ 'tools/dump_cache/url_utilities.h', 'tools/dump_cache/url_utilities.cc', + 'tools/flip_server/acceptor_thread.h', + 'tools/flip_server/acceptor_thread.cc', 'tools/flip_server/balsa_enums.h', 'tools/flip_server/balsa_frame.cc', 'tools/flip_server/balsa_frame.h', @@ -1358,21 +1360,39 @@ 'tools/flip_server/balsa_headers_token_utils.h', 'tools/flip_server/balsa_visitor_interface.h', 'tools/flip_server/buffer_interface.h', + 'tools/flip_server/constants.h', 'tools/flip_server/create_listener.cc', 'tools/flip_server/create_listener.h', 'tools/flip_server/epoll_server.cc', 'tools/flip_server/epoll_server.h', 'tools/flip_server/flip_in_mem_edsm_server.cc', + 'tools/flip_server/http_interface.cc', + 'tools/flip_server/http_interface.h', 'tools/flip_server/http_message_constants.cc', 'tools/flip_server/http_message_constants.h', 'tools/flip_server/loadtime_measurement.h', + 'tools/flip_server/mem_cache.h', + 'tools/flip_server/mem_cache.cc', 'tools/flip_server/porting.txt', + 'tools/flip_server/output_ordering.cc', + 'tools/flip_server/output_ordering.h', 'tools/flip_server/ring_buffer.cc', 'tools/flip_server/ring_buffer.h', 'tools/flip_server/simple_buffer.cc', 'tools/flip_server/simple_buffer.h', + 'tools/flip_server/sm_connection.cc', + 'tools/flip_server/sm_connection.h', + 'tools/flip_server/sm_interface.h', 'tools/flip_server/split.h', 'tools/flip_server/split.cc', + 'tools/flip_server/spdy_ssl.cc', + 'tools/flip_server/spdy_ssl.h', + 'tools/flip_server/spdy_interface.cc', + 'tools/flip_server/spdy_interface.h', + 'tools/flip_server/spdy_util.cc', + 'tools/flip_server/spdy_util.h', + 'tools/flip_server/streamer_interface.cc', + 'tools/flip_server/streamer_interface.h', 'tools/flip_server/string_piece_utils.h', 'tools/flip_server/thread.h', 'tools/flip_server/url_to_filename_encoder.h', diff --git a/net/tools/flip_server/acceptor_thread.cc b/net/tools/flip_server/acceptor_thread.cc new file mode 100644 index 0000000..89aaa75 --- /dev/null +++ b/net/tools/flip_server/acceptor_thread.cc @@ -0,0 +1,210 @@ +// Copyright (c) 2009 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "net/tools/flip_server/acceptor_thread.h" + +#include <netinet/in.h> +#include <netinet/tcp.h> // For TCP_NODELAY +#include <sys/socket.h> +#include <sys/types.h> + +#include <string> + +#include "net/tools/flip_server/constants.h" +#include "net/tools/flip_server/flip_config.h" +#include "net/tools/flip_server/sm_connection.h" +#include "net/tools/flip_server/spdy_ssl.h" +#include "openssl/err.h" +#include "openssl/ssl.h" + +namespace net { + +SMAcceptorThread::SMAcceptorThread(FlipAcceptor *acceptor, + MemoryCache* memory_cache) + : SimpleThread("SMAcceptorThread"), + acceptor_(acceptor), + ssl_state_(NULL), + use_ssl_(false), + idle_socket_timeout_s_(acceptor->idle_socket_timeout_s_), + quitting_(false), + memory_cache_(memory_cache) { + if (!acceptor->ssl_cert_filename_.empty() && + !acceptor->ssl_key_filename_.empty()) { + ssl_state_ = new SSLState; + bool use_npn = true; + if (acceptor_->flip_handler_type_ == FLIP_HANDLER_HTTP_SERVER) { + use_npn = false; + } + InitSSL(ssl_state_, + acceptor_->ssl_cert_filename_, + acceptor_->ssl_key_filename_, + use_npn, + acceptor_->ssl_session_expiry_, + acceptor_->ssl_disable_compression_); + use_ssl_ = true; + } +} + +SMAcceptorThread::~SMAcceptorThread() { + for (std::vector<SMConnection*>::iterator i = + allocated_server_connections_.begin(); + i != allocated_server_connections_.end(); + ++i) { + delete *i; + } + delete ssl_state_; +} + +SMConnection* SMAcceptorThread::NewConnection() { + SMConnection* server = + SMConnection::NewSMConnection(&epoll_server_, ssl_state_, + memory_cache_, acceptor_, + "client_conn: "); + allocated_server_connections_.push_back(server); + VLOG(2) << ACCEPTOR_CLIENT_IDENT << "Acceptor: Making new server."; + return server; +} + +SMConnection* SMAcceptorThread::FindOrMakeNewSMConnection() { + if (unused_server_connections_.empty()) { + return NewConnection(); + } + SMConnection* server = unused_server_connections_.back(); + unused_server_connections_.pop_back(); + VLOG(2) << ACCEPTOR_CLIENT_IDENT << "Acceptor: Reusing server."; + return server; +} + +void SMAcceptorThread::InitWorker() { + epoll_server_.RegisterFD(acceptor_->listen_fd_, this, EPOLLIN | EPOLLET); +} + +void SMAcceptorThread::HandleConnection(int server_fd, + struct sockaddr_in *remote_addr) { + int on = 1; + int rc; + if (acceptor_->disable_nagle_) { + rc = setsockopt(server_fd, IPPROTO_TCP, TCP_NODELAY, + reinterpret_cast<char*>(&on), sizeof(on)); + if (rc < 0) { + close(server_fd); + LOG(ERROR) << "setsockopt() failed fd=" + server_fd; + return; + } + } + + SMConnection* server_connection = FindOrMakeNewSMConnection(); + if (server_connection == NULL) { + VLOG(1) << ACCEPTOR_CLIENT_IDENT << "Acceptor: Closing fd " << server_fd; + close(server_fd); + return; + } + std::string remote_ip = inet_ntoa(remote_addr->sin_addr); + server_connection->InitSMConnection(this, + NULL, + &epoll_server_, + server_fd, + "", "", remote_ip, + use_ssl_); + if (server_connection->initialized()) + active_server_connections_.push_back(server_connection); +} + +void SMAcceptorThread::AcceptFromListenFD() { + if (acceptor_->accepts_per_wake_ > 0) { + for (int i = 0; i < acceptor_->accepts_per_wake_; ++i) { + struct sockaddr address; + socklen_t socklen = sizeof(address); + int fd = accept(acceptor_->listen_fd_, &address, &socklen); + if (fd == -1) { + if (errno != 11) { + VLOG(1) << ACCEPTOR_CLIENT_IDENT << "Acceptor: accept fail(" + << acceptor_->listen_fd_ << "): " << errno << ": " + << strerror(errno); + } + break; + } + VLOG(1) << ACCEPTOR_CLIENT_IDENT << " Accepted connection"; + HandleConnection(fd, (struct sockaddr_in *)&address); + } + } else { + while (true) { + struct sockaddr address; + socklen_t socklen = sizeof(address); + int fd = accept(acceptor_->listen_fd_, &address, &socklen); + if (fd == -1) { + if (errno != 11) { + VLOG(1) << ACCEPTOR_CLIENT_IDENT << "Acceptor: accept fail(" + << acceptor_->listen_fd_ << "): " << errno << ": " + << strerror(errno); + } + break; + } + VLOG(1) << ACCEPTOR_CLIENT_IDENT << "Accepted connection"; + HandleConnection(fd, (struct sockaddr_in *)&address); + } + } +} + +void SMAcceptorThread::HandleConnectionIdleTimeout() { + static time_t oldest_time = time(NULL); + + int cur_time = time(NULL); + // Only iterate the list if we speculate that a connection is ready to be + // expired + if ((cur_time - oldest_time) < idle_socket_timeout_s_) + return; + + // TODO(mbelshe): This code could be optimized, active_server_connections_ + // is already in-order. + std::list<SMConnection*>::iterator iter = active_server_connections_.begin(); + while (iter != active_server_connections_.end()) { + SMConnection *conn = *iter; + int elapsed_time = (cur_time - conn->last_read_time_); + if (elapsed_time > idle_socket_timeout_s_) { + conn->Cleanup("Connection idle timeout reached."); + iter = active_server_connections_.erase(iter); + continue; + } + if (conn->last_read_time_ < oldest_time) + oldest_time = conn->last_read_time_; + iter++; + } + if ((cur_time - oldest_time) >= idle_socket_timeout_s_) + oldest_time = cur_time; +} + +void SMAcceptorThread::Run() { + while (!quitting_.HasBeenNotified()) { + epoll_server_.set_timeout_in_us(10 * 1000); // 10 ms + epoll_server_.WaitForEventsAndExecuteCallbacks(); + if (tmp_unused_server_connections_.size()) { + VLOG(2) << "have " << tmp_unused_server_connections_.size() + << " additional unused connections. Total = " + << unused_server_connections_.size(); + unused_server_connections_.insert(unused_server_connections_.end(), + tmp_unused_server_connections_.begin(), + tmp_unused_server_connections_.end()); + tmp_unused_server_connections_.clear(); + } + HandleConnectionIdleTimeout(); + } +} + +void SMAcceptorThread::OnEvent(int fd, EpollEvent* event) { + if (event->in_events | EPOLLIN) { + VLOG(2) << ACCEPTOR_CLIENT_IDENT + << "Acceptor: Accepting based upon epoll events"; + AcceptFromListenFD(); + } +} + +void SMAcceptorThread::SMConnectionDone(SMConnection* sc) { + VLOG(1) << ACCEPTOR_CLIENT_IDENT << "Done with connection."; + tmp_unused_server_connections_.push_back(sc); +} + +} // namespace net + + diff --git a/net/tools/flip_server/acceptor_thread.h b/net/tools/flip_server/acceptor_thread.h new file mode 100644 index 0000000..df161d7 --- /dev/null +++ b/net/tools/flip_server/acceptor_thread.h @@ -0,0 +1,95 @@ +// Copyright (c) 2009 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef NET_TOOLS_FLIP_SERVER_ACCEPTOR_THREAD_H_ +#define NET_TOOLS_FLIP_SERVER_ACCEPTOR_THREAD_H_ + +#include <list> +#include <string> +#include <vector> + +#include "base/threading/simple_thread.h" +#include "net/tools/flip_server/epoll_server.h" +#include "net/tools/flip_server/sm_interface.h" +#include "openssl/ssl.h" + +struct sockaddr_in; + +namespace net { + +class FlipAcceptor; +class MemoryCache; +class SMConnection; +class SSLState; + +// TODO(mbelshe): Get rid of this class; we don't need a lock just to set +// a bool cross threads - especially one which only is set once... +class Notification { + public: + explicit Notification(bool value) : value_(value) {} + + void Notify() { + base::AutoLock al(lock_); + value_ = true; + } + bool HasBeenNotified() { + base::AutoLock al(lock_); + return value_; + } + bool value_; + base::Lock lock_; +}; + +class SMAcceptorThread : public base::SimpleThread, + public EpollCallbackInterface, + public SMConnectionPoolInterface { + public: + SMAcceptorThread(FlipAcceptor *acceptor, MemoryCache* memory_cache); + ~SMAcceptorThread(); + + // EpollCallbackInteface interface + virtual void OnRegistration(EpollServer* eps, int fd, int event_mask) {} + virtual void OnModification(int fd, int event_mask) {} + virtual void OnEvent(int fd, EpollEvent* event); + virtual void OnUnregistration(int fd, bool replaced) {} + virtual void OnShutdown(EpollServer* eps, int fd) {} + + // SMConnectionPool interface + virtual void SMConnectionDone(SMConnection* sc); + + // TODO(mbelshe): figure out if we can move these to private functions. + SMConnection* NewConnection(); + SMConnection* FindOrMakeNewSMConnection(); + void InitWorker(); + void HandleConnection(int server_fd, struct sockaddr_in *remote_addr); + void AcceptFromListenFD(); + + // Notify the Accept thread that it is time to terminate. + void Quit() { quitting_.Notify(); } + + // Iterates through a list of active connections expiring any that have been + // idle longer than the configured timeout. + void HandleConnectionIdleTimeout(); + + void Run(); + + private: + EpollServer epoll_server_; + FlipAcceptor* acceptor_; + SSLState* ssl_state_; + bool use_ssl_; + int idle_socket_timeout_s_; + + std::vector<SMConnection*> unused_server_connections_; + std::vector<SMConnection*> tmp_unused_server_connections_; + std::vector<SMConnection*> allocated_server_connections_; + std::list<SMConnection*> active_server_connections_; + Notification quitting_; + MemoryCache* memory_cache_; +}; + +} // namespace net + +#endif // NET_TOOLS_FLIP_SERVER_ACCEPTOR_THREAD_H_ + diff --git a/net/tools/flip_server/constants.h b/net/tools/flip_server/constants.h new file mode 100644 index 0000000..47ed535 --- /dev/null +++ b/net/tools/flip_server/constants.h @@ -0,0 +1,31 @@ +// Copyright (c) 2009 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef NET_TOOLS_FLIP_SERVER_CONSTANTS_H_ +#define NET_TOOLS_FLIP_SERVER_CONSTANTS_H_ + +#include "net/spdy/spdy_protocol.h" + +const int kMSS = 1460; +const int kSSLOverhead = 25; +const int kSpdyOverhead = spdy::SpdyFrame::size(); +const int kInitialDataSendersThreshold = (2 * kMSS) - kSpdyOverhead; +const int kSSLSegmentSize = (1 * kMSS) - kSSLOverhead; +const int kSpdySegmentSize = kSSLSegmentSize - kSpdyOverhead; + +#define ACCEPTOR_CLIENT_IDENT \ + acceptor_->listen_ip_ << ":" \ + << acceptor_->listen_port_ << " " + +#define NEXT_PROTO_STRING "\x06spdy/2\x08http/1.1\x08http/1.0" + +#define SSL_CIPHER_LIST "!aNULL:!ADH:!eNull:!LOW:!EXP:RC4+RSA:MEDIUM:HIGH" + +#define IPV4_PRINTABLE_FORMAT(IP) (((IP)>>0)&0xff), (((IP)>>8)&0xff), \ + (((IP)>>16)&0xff), (((IP)>>24)&0xff) + +#define PIDFILE "/var/run/flip-server.pid" + +#endif // NET_TOOLS_FLIP_SERVER_CONSTANTS_H_ + diff --git a/net/tools/flip_server/create_listener.cc b/net/tools/flip_server/create_listener.cc index 59a03a6..4676912 100644 --- a/net/tools/flip_server/create_listener.cc +++ b/net/tools/flip_server/create_listener.cc @@ -2,6 +2,8 @@ // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. +#include "net/tools/flip_server/create_listener.h" + #include <arpa/inet.h> // for inet_ntop #include <errno.h> // for strerror #include <netdb.h> // for getaddrinfo and getnameinfo @@ -14,8 +16,6 @@ #include <unistd.h> // for exit() #include <ostream> -#include "net/tools/flip_server/create_listener.h" - #include "base/logging.h" namespace net { diff --git a/net/tools/flip_server/flip_config.h b/net/tools/flip_server/flip_config.h index e214259..fb59e69 100644 --- a/net/tools/flip_server/flip_config.h +++ b/net/tools/flip_server/flip_config.h @@ -8,14 +8,13 @@ #include <arpa/inet.h> // in_addr_t +#include <string> +#include <vector> + #include "base/logging.h" #include "net/tools/flip_server/create_listener.h" -#include <vector> -#include <string> - -using std::string; -using std::vector; +namespace net { enum FlipHandlerType { FLIP_HANDLER_PROXY, @@ -24,167 +23,166 @@ enum FlipHandlerType { }; class FlipAcceptor { -public: - enum FlipHandlerType flip_handler_type_; - string listen_ip_; - string listen_port_; - string ssl_cert_filename_; - string ssl_key_filename_; - string http_server_ip_; - string http_server_port_; - string https_server_ip_; - string https_server_port_; - int spdy_only_; - int accept_backlog_size_; - bool disable_nagle_; - int accepts_per_wake_; - int listen_fd_; - void* memory_cache_; - - FlipAcceptor(enum FlipHandlerType flip_handler_type, - string listen_ip, - string listen_port, - string ssl_cert_filename, - string ssl_key_filename, - string http_server_ip, - string http_server_port, - string https_server_ip, - string https_server_port, - int spdy_only, - int accept_backlog_size, - bool disable_nagle, - int accepts_per_wake, - bool reuseport, - bool wait_for_iface, - void *memory_cache) : - flip_handler_type_(flip_handler_type), - listen_ip_(listen_ip), - listen_port_(listen_port), - ssl_cert_filename_(ssl_cert_filename), - ssl_key_filename_(ssl_key_filename), - http_server_ip_(http_server_ip), - http_server_port_(http_server_port), - https_server_ip_(https_server_ip), - https_server_port_(https_server_port), - spdy_only_(spdy_only), - accept_backlog_size_(accept_backlog_size), - disable_nagle_(disable_nagle), - accepts_per_wake_(accepts_per_wake), - memory_cache_(memory_cache) - { - VLOG(1) << "Attempting to listen on " << listen_ip_.c_str() << ":" - << listen_port_.c_str(); - if (!https_server_ip_.size()) - https_server_ip_ = http_server_ip_; - if (!https_server_port_.size()) - https_server_port_ = http_server_port_; - - while (1) { - int ret = net::CreateListeningSocket(listen_ip_, - listen_port_, - true, - accept_backlog_size_, - true, - reuseport, - wait_for_iface, - disable_nagle_, - &listen_fd_); - if ( ret == 0 ) { - break; - } else if ( ret == -3 && wait_for_iface ) { - // Binding error EADDRNOTAVAIL was encounted. We need - // to wait for the interfaces to raised. try again. - usleep(200000); - } else { - LOG(ERROR) << "Unable to create listening socket for: ret = " << ret - << ": " << listen_ip_.c_str() << ":" - << listen_port_.c_str(); - return; - } - } - net::SetNonBlocking(listen_fd_); - VLOG(1) << "Listening on socket: "; - if (flip_handler_type == FLIP_HANDLER_PROXY) - VLOG(1) << "\tType : Proxy"; - else if (FLIP_HANDLER_SPDY_SERVER) - VLOG(1) << "\tType : SPDY Server"; - else if (FLIP_HANDLER_HTTP_SERVER) - VLOG(1) << "\tType : HTTP Server"; - VLOG(1) << "\tIP : " << listen_ip_; - VLOG(1) << "\tPort : " << listen_port_; - VLOG(1) << "\tHTTP Server : " << http_server_ip_ << ":" - << http_server_port_; - VLOG(1) << "\tHTTPS Server : " << https_server_ip_ << ":" - << https_server_port_; - VLOG(1) << "\tSSL : " - << (ssl_cert_filename.size()?"true":"false"); - VLOG(1) << "\tCertificate : " << ssl_cert_filename; - VLOG(1) << "\tKey : " << ssl_key_filename; - VLOG(1) << "\tSpdy Only : " << (spdy_only?"true":"flase"); - } - ~FlipAcceptor () {} + public: + enum FlipHandlerType flip_handler_type_; + std::string listen_ip_; + std::string listen_port_; + std::string ssl_cert_filename_; + std::string ssl_key_filename_; + std::string http_server_ip_; + std::string http_server_port_; + std::string https_server_ip_; + std::string https_server_port_; + int spdy_only_; + int accept_backlog_size_; + bool disable_nagle_; + int accepts_per_wake_; + int listen_fd_; + void* memory_cache_; + int ssl_session_expiry_; + bool ssl_disable_compression_; + int idle_socket_timeout_s_; + + FlipAcceptor(enum FlipHandlerType flip_handler_type, + std::string listen_ip, + std::string listen_port, + std::string ssl_cert_filename, + std::string ssl_key_filename, + std::string http_server_ip, + std::string http_server_port, + std::string https_server_ip, + std::string https_server_port, + int spdy_only, + int accept_backlog_size, + bool disable_nagle, + int accepts_per_wake, + bool reuseport, + bool wait_for_iface, + void *memory_cache) + : flip_handler_type_(flip_handler_type), + listen_ip_(listen_ip), + listen_port_(listen_port), + ssl_cert_filename_(ssl_cert_filename), + ssl_key_filename_(ssl_key_filename), + http_server_ip_(http_server_ip), + http_server_port_(http_server_port), + https_server_ip_(https_server_ip), + https_server_port_(https_server_port), + spdy_only_(spdy_only), + accept_backlog_size_(accept_backlog_size), + disable_nagle_(disable_nagle), + accepts_per_wake_(accepts_per_wake), + memory_cache_(memory_cache), + ssl_session_expiry_(300), // TODO(mbelshe): Hook these up! + ssl_disable_compression_(false), + idle_socket_timeout_s_(300) { + VLOG(1) << "Attempting to listen on " << listen_ip_.c_str() << ":" + << listen_port_.c_str(); + if (!https_server_ip_.size()) + https_server_ip_ = http_server_ip_; + if (!https_server_port_.size()) + https_server_port_ = http_server_port_; + + while (1) { + int ret = CreateListeningSocket(listen_ip_, + listen_port_, + true, + accept_backlog_size_, + true, + reuseport, + wait_for_iface, + disable_nagle_, + &listen_fd_); + if ( ret == 0 ) { + break; + } else if ( ret == -3 && wait_for_iface ) { + // Binding error EADDRNOTAVAIL was encounted. We need + // to wait for the interfaces to raised. try again. + usleep(200000); + } else { + LOG(ERROR) << "Unable to create listening socket for: ret = " << ret + << ": " << listen_ip_.c_str() << ":" + << listen_port_.c_str(); + return; + } + } + + SetNonBlocking(listen_fd_); + VLOG(1) << "Listening on socket: "; + if (flip_handler_type == FLIP_HANDLER_PROXY) + VLOG(1) << "\tType : Proxy"; + else if (FLIP_HANDLER_SPDY_SERVER) + VLOG(1) << "\tType : SPDY Server"; + else if (FLIP_HANDLER_HTTP_SERVER) + VLOG(1) << "\tType : HTTP Server"; + VLOG(1) << "\tIP : " << listen_ip_; + VLOG(1) << "\tPort : " << listen_port_; + VLOG(1) << "\tHTTP Server : " << http_server_ip_ << ":" + << http_server_port_; + VLOG(1) << "\tHTTPS Server : " << https_server_ip_ << ":" + << https_server_port_; + VLOG(1) << "\tSSL : " + << (ssl_cert_filename.size()?"true":"false"); + VLOG(1) << "\tCertificate : " << ssl_cert_filename; + VLOG(1) << "\tKey : " << ssl_key_filename; + VLOG(1) << "\tSpdy Only : " << (spdy_only?"true":"flase"); + } }; class FlipConfig { -public: - std::vector <FlipAcceptor*> acceptors_; - double server_think_time_in_s_; - enum logging::LoggingDestination log_destination_; - string log_filename_; - bool forward_ip_header_enabled_; - string forward_ip_header_; - bool wait_for_iface_; - int ssl_session_expiry_; - bool ssl_disable_compression_; - int idle_timeout_s_; - - FlipConfig() : - server_think_time_in_s_(0), - log_destination_(logging::LOG_ONLY_TO_SYSTEM_DEBUG_LOG), - forward_ip_header_enabled_(false), - wait_for_iface_(false), - ssl_session_expiry_(300), - ssl_disable_compression_(false), - idle_timeout_s_(300) - {} - - ~FlipConfig() {} - - void AddAcceptor(enum FlipHandlerType flip_handler_type, - string listen_ip, - string listen_port, - string ssl_cert_filename, - string ssl_key_filename, - string http_server_ip, - string http_server_port, - string https_server_ip, - string https_server_port, - int spdy_only, - int accept_backlog_size, - bool disable_nagle, - int accepts_per_wake, - bool reuseport, - bool wait_for_iface, - void *memory_cache) { - // TODO(mbelshe): create a struct FlipConfigArgs{} for the arguments. - acceptors_.push_back(new FlipAcceptor(flip_handler_type, - listen_ip, - listen_port, - ssl_cert_filename, - ssl_key_filename, - http_server_ip, - http_server_port, - https_server_ip, - https_server_port, - spdy_only, - accept_backlog_size, - disable_nagle, - accepts_per_wake, - reuseport, - wait_for_iface, - memory_cache)); - } + public: + std::vector <FlipAcceptor*> acceptors_; + double server_think_time_in_s_; + enum logging::LoggingDestination log_destination_; + std::string log_filename_; + bool wait_for_iface_; + int ssl_session_expiry_; + bool ssl_disable_compression_; + int idle_socket_timeout_s_; + FlipConfig() + : server_think_time_in_s_(0), + log_destination_(logging::LOG_ONLY_TO_SYSTEM_DEBUG_LOG), + wait_for_iface_(false) {} + ~FlipConfig() {} + + void AddAcceptor(enum FlipHandlerType flip_handler_type, + std::string listen_ip, + std::string listen_port, + std::string ssl_cert_filename, + std::string ssl_key_filename, + std::string http_server_ip, + std::string http_server_port, + std::string https_server_ip, + std::string https_server_port, + int spdy_only, + int accept_backlog_size, + bool disable_nagle, + int accepts_per_wake, + bool reuseport, + bool wait_for_iface, + void *memory_cache) { + // TODO(mbelshe): create a struct FlipConfigArgs{} for the arguments. + acceptors_.push_back(new FlipAcceptor(flip_handler_type, + listen_ip, + listen_port, + ssl_cert_filename, + ssl_key_filename, + http_server_ip, + http_server_port, + https_server_ip, + https_server_port, + spdy_only, + accept_backlog_size, + disable_nagle, + accepts_per_wake, + reuseport, + wait_for_iface, + memory_cache)); + } }; -#endif +} // namespace + +#endif // NET_TOOLS_FLIP_PROXY_CONFIG_H + diff --git a/net/tools/flip_server/flip_in_mem_edsm_server.cc b/net/tools/flip_server/flip_in_mem_edsm_server.cc index 5f55341..ef27afa 100644 --- a/net/tools/flip_server/flip_in_mem_edsm_server.cc +++ b/net/tools/flip_server/flip_in_mem_edsm_server.cc @@ -2,70 +2,30 @@ // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. -#include <dirent.h> -#include <netinet/tcp.h> // For TCP_NODELAY -#include <sys/socket.h> -#include <sys/types.h> -#include <sys/file.h> -#include <sys/stat.h> -#include <unistd.h> -#include <openssl/err.h> -#include <openssl/ssl.h> +#include <errno.h> #include <signal.h> +#include <sys/file.h> -#include <deque> #include <iostream> -#include <limits> +#include <string> #include <vector> -#include <list> #include "base/command_line.h" #include "base/logging.h" #include "base/synchronization/lock.h" -#include "base/threading/simple_thread.h" #include "base/timer.h" -#include "net/spdy/spdy_frame_builder.h" -#include "net/spdy/spdy_framer.h" -#include "net/spdy/spdy_protocol.h" -#include "net/tools/dump_cache/url_to_filename_encoder.h" -#include "net/tools/dump_cache/url_utilities.h" -#include "net/tools/flip_server/balsa_enums.h" -#include "net/tools/flip_server/balsa_frame.h" -#include "net/tools/flip_server/balsa_headers.h" -#include "net/tools/flip_server/balsa_visitor_interface.h" -#include "net/tools/flip_server/buffer_interface.h" -#include "net/tools/flip_server/epoll_server.h" -#include "net/tools/flip_server/ring_buffer.h" -#include "net/tools/flip_server/simple_buffer.h" -#include "net/tools/flip_server/split.h" +#include "net/tools/flip_server/acceptor_thread.h" +#include "net/tools/flip_server/constants.h" #include "net/tools/flip_server/flip_config.h" +#include "net/tools/flip_server/output_ordering.h" +#include "net/tools/flip_server/sm_connection.h" +#include "net/tools/flip_server/sm_interface.h" +#include "net/tools/flip_server/spdy_interface.h" +#include "net/tools/flip_server/split.h" -//////////////////////////////////////////////////////////////////////////////// - -using std::deque; -using std::list; -using std::map; -using std::ostream; -using std::pair; -using std::string; -using std::vector; using std::cout; using std::cerr; -//////////////////////////////////////////////////////////////////////////////// - -#define IPV4_PRINTABLE_FORMAT(IP) (((IP)>>0)&0xff),(((IP)>>8)&0xff), \ - (((IP)>>16)&0xff),(((IP)>>24)&0xff) - -#define ACCEPTOR_CLIENT_IDENT acceptor_->listen_ip_ << ":" \ - << acceptor_->listen_port_ << " " - -#define NEXT_PROTO_STRING "\x06spdy/2\x08http/1.1\x08http/1.0" - -#define SSL_CTX_DEFAULT_CIPHER_LIST "!aNULL:!ADH:!eNull:!LOW:!EXP:RC4+RSA:MEDIUM:HIGH" - -#define PIDFILE "/var/run/flip-server.pid" - // If true, then disables the nagle algorithm); bool FLAGS_disable_nagle = true; @@ -78,12 +38,6 @@ int32 FLAGS_accepts_per_wake = 0; // The size of the TCP accept backlog); int32 FLAGS_accept_backlog_size = 1024; -// The directory where cache locates); -string FLAGS_cache_base_dir = "."; - -// If true, then encode url to filename); -bool FLAGS_need_to_encode_url = false; - // If set to false a single socket will be used. If set to true // then a new socket will be created for each accept thread. // Note that this only works with kernels that support @@ -97,2972 +51,7 @@ bool FLAGS_force_spdy = false; // reply); double FLAGS_server_think_time_in_s = 0; -// Does the server send X-Subresource headers); -bool FLAGS_use_xsub = false; - -// Does the server send X-Associated-Content headers); -bool FLAGS_use_xac = false; - -// Does the server compress data frames); -bool FLAGS_use_compression = false; - -//////////////////////////////////////////////////////////////////////////////// - -using base::StringPiece; -using base::SimpleThread; -using net::BalsaFrame; -using net::BalsaFrameEnums; -using net::BalsaHeaders; -using net::BalsaHeadersEnums; -using net::BalsaVisitorInterface; -using net::EpollAlarmCallbackInterface; -using net::EpollCallbackInterface; -using net::EpollEvent; -using net::EpollServer; -using net::RingBuffer; -using net::SimpleBuffer; -using net::SplitStringPieceToVector; -using net::UrlUtilities; -using spdy::CONTROL_FLAG_NONE; -using spdy::DATA_FLAG_COMPRESSED; -using spdy::DATA_FLAG_FIN; -using spdy::RST_STREAM; -using spdy::SYN_REPLY; -using spdy::SYN_STREAM; -using spdy::SpdyControlFrame; -using spdy::SpdySettingsControlFrame; -using spdy::SpdyDataFlags; -using spdy::SpdyDataFrame; -using spdy::SpdyRstStreamControlFrame; -using spdy::SpdyFrame; -using spdy::SpdyFrameBuilder; -using spdy::SpdyFramer; -using spdy::SpdyFramerVisitorInterface; -using spdy::SpdyHeaderBlock; -using spdy::SpdyStreamId; -using spdy::SpdySynReplyControlFrame; -using spdy::SpdySynStreamControlFrame; - -FlipConfig g_proxy_config; - -//////////////////////////////////////////////////////////////////////////////// - -void PrintSslError() { - char buf[128]; // this buffer must be at least 120 chars long. - int error_num = ERR_get_error(); - while (error_num != 0) { - ERR_error_string_n(error_num, buf, sizeof(buf)); - //LOG(ERROR) << buf; - error_num = ERR_get_error(); - } -} - -static int ssl_set_npn_callback(SSL *s, - const unsigned char **data, - unsigned int *len, - void *arg) -{ - VLOG(1) << "SSL NPN callback: advertising protocols."; - *data = (const unsigned char *) NEXT_PROTO_STRING; - *len = strlen(NEXT_PROTO_STRING); - return SSL_TLSEXT_ERR_OK; -} -//////////////////////////////////////////////////////////////////////////////// - -// Creates a socket with domain, type and protocol parameters. -// Assigns the return value of socket() to *fd. -// Returns errno if an error occurs, else returns zero. -int CreateSocket(int domain, int type, int protocol, int *fd) { - CHECK(fd != NULL); - *fd = ::socket(domain, type, protocol); - return (*fd == -1) ? errno : 0; -} - -// Encode the URL. -string EncodeURL(string uri, string host, string method) { - if (!FLAGS_need_to_encode_url) { - // TODO(mbelshe): if uri is fully qualified, need to strip protocol/host. - return string(method + "_" + uri); - } - - string filename; - if (uri[0] == '/') { - // uri is not fully qualified. - filename = net::UrlToFilenameEncoder::Encode( - "http://" + host + uri, method + "_/", false); - } else { - filename = net::UrlToFilenameEncoder::Encode(uri, method + "_/", false); - } - return filename; -} - -//////////////////////////////////////////////////////////////////////////////// - -struct SSLState { - SSL_METHOD* ssl_method; - SSL_CTX* ssl_ctx; -}; - -// SSL stuff -void spdy_init_ssl(SSLState* state, - string ssl_cert_name, - string ssl_key_name, - bool use_npn) { - SSL_library_init(); - PrintSslError(); - - SSL_load_error_strings(); - PrintSslError(); - - state->ssl_method = SSLv23_method(); - state->ssl_ctx = SSL_CTX_new(state->ssl_method); - if (!state->ssl_ctx) { - PrintSslError(); - LOG(FATAL) << "Unable to create SSL context"; - } - // Disable SSLv2 support. - SSL_CTX_set_options(state->ssl_ctx, - SSL_OP_NO_SSLv2 | SSL_OP_CIPHER_SERVER_PREFERENCE); - if (SSL_CTX_use_certificate_file(state->ssl_ctx, - ssl_cert_name.c_str(), - SSL_FILETYPE_PEM) <= 0) { - PrintSslError(); - LOG(FATAL) << "Unable to use cert.pem as SSL cert."; - } - if (SSL_CTX_use_PrivateKey_file(state->ssl_ctx, - ssl_key_name.c_str(), - SSL_FILETYPE_PEM) <= 0) { - PrintSslError(); - LOG(FATAL) << "Unable to use key.pem as SSL key."; - } - if (!SSL_CTX_check_private_key(state->ssl_ctx)) { - PrintSslError(); - LOG(FATAL) << "The cert.pem and key.pem files don't match"; - } - if (use_npn) { - SSL_CTX_set_next_protos_advertised_cb(state->ssl_ctx, - ssl_set_npn_callback, NULL); - } - VLOG(1) << "SSL CTX default cipher list: " << SSL_CTX_DEFAULT_CIPHER_LIST; - SSL_CTX_set_cipher_list(state->ssl_ctx, SSL_CTX_DEFAULT_CIPHER_LIST); - - VLOG(1) << "SSL CTX session expiry: " << g_proxy_config.ssl_session_expiry_ - << " seconds"; - SSL_CTX_set_timeout(state->ssl_ctx, g_proxy_config.ssl_session_expiry_); - -#ifdef SSL_MODE_RELEASE_BUFFERS - VLOG(1) << "SSL CTX: Setting Release Buffers mode."; - SSL_CTX_set_mode(state->ssl_ctx, SSL_MODE_RELEASE_BUFFERS); -#endif - - // Proper methods to disable compression don't exist until 0.9.9+. For now - // we must manipulate the stack of compression methods directly. - if (g_proxy_config.ssl_disable_compression_) { - STACK_OF(SSL_COMP) *ssl_comp_methods = SSL_COMP_get_compression_methods(); - int num_methods = sk_SSL_COMP_num(ssl_comp_methods); - int i; - for (i = 0; i < num_methods; i++) { - static_cast<void>(sk_SSL_COMP_delete(ssl_comp_methods, i)); - } - } -} - -SSL* spdy_new_ssl(SSL_CTX* ssl_ctx) { - SSL* ssl = SSL_new(ssl_ctx); - PrintSslError(); - - SSL_set_accept_state(ssl); - PrintSslError(); - - return ssl; -} - -//////////////////////////////////////////////////////////////////////////////// - -const int kMSS = 1460; -const int kSSLOverhead = 25; -const int kSpdyOverhead = SpdyFrame::size(); -const int kInitialDataSendersThreshold = (2 * kMSS) - kSpdyOverhead; -const int kSSLSegmentSize = (1 * kMSS) - kSSLOverhead; -const int kSpdySegmentSize = kSSLSegmentSize - kSpdyOverhead; - -//////////////////////////////////////////////////////////////////////////////// - -class DataFrame { - public: - const char* data; - size_t size; - bool delete_when_done; - size_t index; - DataFrame() : data(NULL), size(0), delete_when_done(false), index(0) {} - virtual ~DataFrame() { - if (delete_when_done) - delete[] data; - } -}; - -class SpdyFrameDataFrame : public DataFrame { - public: - SpdyFrameDataFrame(SpdyFrame* spdy_frame) - : frame(spdy_frame) { - data = spdy_frame->data(); - size = spdy_frame->length() + SpdyFrame::size(); - } - - virtual ~SpdyFrameDataFrame() { - delete frame; - } - - const SpdyFrame* frame; -}; - -//////////////////////////////////////////////////////////////////////////////// - -class StoreBodyAndHeadersVisitor: public BalsaVisitorInterface { - public: - BalsaHeaders headers; - string body; - bool error_; - - virtual void ProcessBodyInput(const char *input, size_t size) {} - virtual void ProcessBodyData(const char *input, size_t size) { - body.append(input, size); - } - virtual void ProcessHeaderInput(const char *input, size_t size) {} - virtual void ProcessTrailerInput(const char *input, size_t size) {} - virtual void ProcessHeaders(const BalsaHeaders& headers) { - // nothing to do here-- we're assuming that the BalsaFrame has - // been handed our headers. - } - virtual void ProcessRequestFirstLine(const char* line_input, - size_t line_length, - const char* method_input, - size_t method_length, - const char* request_uri_input, - size_t request_uri_length, - const char* version_input, - size_t version_length) {} - virtual void ProcessResponseFirstLine(const char *line_input, - size_t line_length, - const char *version_input, - size_t version_length, - const char *status_input, - size_t status_length, - const char *reason_input, - size_t reason_length) {} - virtual void ProcessChunkLength(size_t chunk_length) {} - virtual void ProcessChunkExtensions(const char *input, size_t size) {} - virtual void HeaderDone() {} - virtual void MessageDone() {} - virtual void HandleHeaderError(BalsaFrame* framer) { HandleError(); } - virtual void HandleHeaderWarning(BalsaFrame* framer) { HandleError(); } - virtual void HandleChunkingError(BalsaFrame* framer) { HandleError(); } - virtual void HandleBodyError(BalsaFrame* framer) { HandleError(); } - - void HandleError() { error_ = true; } -}; - -//////////////////////////////////////////////////////////////////////////////// - -struct FileData { - void CopyFrom(const FileData& file_data) { - headers = new BalsaHeaders; - headers->CopyFrom(*(file_data.headers)); - filename = file_data.filename; - related_files = file_data.related_files; - body = file_data.body; - } - FileData(BalsaHeaders* h, const string& b) : headers(h), body(b) {} - FileData() {} - BalsaHeaders* headers; - string filename; - vector< pair<int, string> > related_files; // priority, filename - string body; -}; - -//////////////////////////////////////////////////////////////////////////////// - -class MemCacheIter { - public: - MemCacheIter() : - file_data(NULL), - priority(0), - transformed_header(false), - body_bytes_consumed(0), - stream_id(0), - max_segment_size(kInitialDataSendersThreshold), - bytes_sent(0) {} - explicit MemCacheIter(FileData* fd) : - file_data(fd), - priority(0), - transformed_header(false), - body_bytes_consumed(0), - stream_id(0), - max_segment_size(kInitialDataSendersThreshold), - bytes_sent(0) {} - FileData* file_data; - int priority; - bool transformed_header; - size_t body_bytes_consumed; - uint32 stream_id; - uint32 max_segment_size; - size_t bytes_sent; -}; - -//////////////////////////////////////////////////////////////////////////////// - -class MemoryCache { - public: - typedef map<string, FileData> Files; - - public: - Files files_; - string cwd_; - - void CloneFrom(const MemoryCache& mc) { - for (Files::const_iterator i = mc.files_.begin(); - i != mc.files_.end(); - ++i) { - Files::iterator out_i = - files_.insert(make_pair(i->first, FileData())).first; - out_i->second.CopyFrom(i->second); - cwd_ = mc.cwd_; - } - } - - void AddFiles() { - deque<string> paths; - cwd_ = FLAGS_cache_base_dir; - paths.push_back(cwd_ + "/GET_"); - DIR* current_dir = NULL; - while (!paths.empty()) { - while (current_dir == NULL && !paths.empty()) { - string current_dir_name = paths.front(); - VLOG(1) << "Attempting to open dir: \"" << current_dir_name << "\""; - current_dir = opendir(current_dir_name.c_str()); - paths.pop_front(); - - if (current_dir == NULL) { - perror("Unable to open directory. "); - current_dir_name.clear(); - continue; - } - - if (current_dir) { - VLOG(1) << "Succeeded opening"; - for (struct dirent* dir_data = readdir(current_dir); - dir_data != NULL; - dir_data = readdir(current_dir)) { - string current_entry_name = - current_dir_name + "/" + dir_data->d_name; - if (dir_data->d_type == DT_REG) { - VLOG(1) << "Found file: " << current_entry_name; - ReadAndStoreFileContents(current_entry_name.c_str()); - } else if (dir_data->d_type == DT_DIR) { - VLOG(1) << "Found subdir: " << current_entry_name; - if (string(dir_data->d_name) != "." && - string(dir_data->d_name) != "..") { - VLOG(1) << "Adding to search path: " << current_entry_name; - paths.push_front(current_entry_name); - } - } - } - VLOG(1) << "Oops, no data left. Closing dir."; - closedir(current_dir); - current_dir = NULL; - } - } - } - } - - void ReadToString(const char* filename, string* output) { - output->clear(); - int fd = open(filename, 0, "r"); - if (fd == -1) - return; - char buffer[4096]; - ssize_t read_status = read(fd, buffer, sizeof(buffer)); - while (read_status > 0) { - output->append(buffer, static_cast<size_t>(read_status)); - do { - read_status = read(fd, buffer, sizeof(buffer)); - } while (read_status <= 0 && errno == EINTR); - } - close(fd); - } - - void ReadAndStoreFileContents(const char* filename) { - StoreBodyAndHeadersVisitor visitor; - BalsaFrame framer; - framer.set_balsa_visitor(&visitor); - framer.set_balsa_headers(&(visitor.headers)); - string filename_contents; - ReadToString(filename, &filename_contents); - - // Ugly hack to make everything look like 1.1. - if (filename_contents.find("HTTP/1.0") == 0) - filename_contents[7] = '1'; - - size_t pos = 0; - size_t old_pos = 0; - while (true) { - old_pos = pos; - pos += framer.ProcessInput(filename_contents.data() + pos, - filename_contents.size() - pos); - if (framer.Error() || pos == old_pos) { - LOG(ERROR) << "Unable to make forward progress, or error" - " framing file: " << filename; - if (framer.Error()) { - LOG(INFO) << "********************************************ERROR!"; - return; - } - return; - } - if (framer.MessageFullyRead()) { - // If no Content-Length or Transfer-Encoding was captured in the - // file, then the rest of the data is the body. Many of the captures - // from within Chrome don't have content-lengths. - if (!visitor.body.length()) - visitor.body = filename_contents.substr(pos); - break; - } - } - visitor.headers.RemoveAllOfHeader("content-length"); - visitor.headers.RemoveAllOfHeader("transfer-encoding"); - visitor.headers.RemoveAllOfHeader("connection"); - visitor.headers.AppendHeader("transfer-encoding", "chunked"); - visitor.headers.AppendHeader("connection", "keep-alive"); - - // Experiment with changing headers for forcing use of cached - // versions of content. - // TODO(mbelshe) REMOVE ME -#if 0 - // TODO(mbelshe) append current date. - visitor.headers.RemoveAllOfHeader("date"); - if (visitor.headers.HasHeader("expires")) { - visitor.headers.RemoveAllOfHeader("expires"); - visitor.headers.AppendHeader("expires", - "Fri, 30 Aug, 2019 12:00:00 GMT"); - } -#endif - BalsaHeaders* headers = new BalsaHeaders; - headers->CopyFrom(visitor.headers); - string filename_stripped = string(filename).substr(cwd_.size() + 1); -// LOG(INFO) << "Adding file (" << visitor.body.length() << " bytes): " -// << filename_stripped; - files_[filename_stripped] = FileData(); - FileData& fd = files_[filename_stripped]; - fd = FileData(headers, visitor.body); - fd.filename = string(filename_stripped, - filename_stripped.find_first_of('/')); - if (headers->HasHeader("X-Associated-Content")) { - string content = headers->GetHeader("X-Associated-Content").as_string(); - vector<StringPiece> urls_and_priorities; - SplitStringPieceToVector(content, "||", &urls_and_priorities, true); - VLOG(1) << "Examining X-Associated-Content header"; - for (unsigned int i = 0; i < urls_and_priorities.size(); ++i) { - const StringPiece& url_and_priority_pair = urls_and_priorities[i]; - vector<StringPiece> url_and_priority; - SplitStringPieceToVector(url_and_priority_pair, "??", - &url_and_priority, true); - if (url_and_priority.size() >= 2) { - string priority_string(url_and_priority[0].data(), - url_and_priority[0].size()); - string filename_string(url_and_priority[1].data(), - url_and_priority[1].size()); - long priority; - char* last_eaten_char; - priority = strtol(priority_string.c_str(), &last_eaten_char, 0); - if (last_eaten_char == - priority_string.c_str() + priority_string.size()) { - pair<int, string> entry(priority, filename_string); - VLOG(1) << "Adding associated content: " << filename_string; - fd.related_files.push_back(entry); - } - } - } - } - } - - // Called at runtime to update learned headers - // |url| is a url which contains a referrer header. - // |referrer| is the referring URL - // Adds an X-Subresource or X-Associated-Content to |referer| for |url| - void UpdateHeaders(string referrer, string file_url) { - if (!FLAGS_use_xac && !FLAGS_use_xsub) - return; - - string referrer_host_path = - net::UrlToFilenameEncoder::Encode(referrer, "GET_/", false); - - FileData* fd1 = GetFileData(string("GET_") + file_url); - if (!fd1) { - LOG(ERROR) << "Updating headers for unknown url: " << file_url; - return; - } - string url = fd1->headers->GetHeader("X-Original-Url").as_string(); - string content_type = fd1->headers->GetHeader("Content-Type").as_string(); - if (content_type.length() == 0) { - LOG(ERROR) << "Skipping subresource with unknown content-type"; - return; - } - // Now, lets see if this is the same host or not - bool same_host = (UrlUtilities::GetUrlHost(referrer) == - UrlUtilities::GetUrlHost(url)); - - // This is a hacked algorithm for figuring out what priority - // to use with pushed content. - int priority = 4; - if (content_type.find("css") != string::npos) - priority = 1; - else if (content_type.find("cript") != string::npos) - priority = 1; - else if (content_type.find("html") != string::npos) - priority = 2; - - LOG(ERROR) << "Attempting update for " << referrer_host_path; - - FileData* fd2 = GetFileData(referrer_host_path); - if (fd2 != NULL) { - // If they are on the same host, we'll use X-Associated-Content - string header_name; - string new_value; - string delimiter; - bool related_files = false; - if (same_host && FLAGS_use_xac) { - header_name = "X-Associated-Content"; - char pri_ch = priority + '0'; - new_value = pri_ch + string("??") + url; - delimiter = "||"; - related_files = true; - } else { - if (!FLAGS_use_xsub) - return; - header_name = "X-Subresource"; - new_value = content_type + "!!" + url; - delimiter = "!!"; - } - - if (fd2->headers->HasNonEmptyHeader(header_name)) { - string existing_header = - fd2->headers->GetHeader(header_name).as_string(); - if (existing_header.find(url) != string::npos) - return; // header already recorded - - // Don't let these lists grow too long for low pri stuff. - // TODO(mbelshe) We need better algorithms for this. - if (existing_header.length() > 256 && priority > 2) - return; - - new_value = existing_header + delimiter + new_value; - } - - LOG(INFO) << "Recording " << header_name << " for " << new_value; - fd2->headers->ReplaceOrAppendHeader(header_name, new_value); - - // Add it to the related files so that it will actually get sent out. - if (related_files) { - pair<int, string> entry(4, file_url); - fd2->related_files.push_back(entry); - } - } else { - LOG(ERROR) << "Failed to update headers:"; - LOG(ERROR) << "FAIL url: " << url; - LOG(ERROR) << "FAIL ref: " << referrer_host_path; - } - } - - FileData* GetFileData(const string& filename) { - Files::iterator fi = files_.end(); - if (filename.compare(filename.length() - 5, 5, ".html", 5) == 0) { - string new_filename(filename.data(), filename.size() - 5); - new_filename += ".http"; - fi = files_.find(new_filename); - } - if (fi == files_.end()) - fi = files_.find(filename); - - if (fi == files_.end()) { - return NULL; - } - return &(fi->second); - } - - bool AssignFileData(const string& filename, MemCacheIter* mci) { - mci->file_data = GetFileData(filename); - if (mci->file_data == NULL) { - LOG(ERROR) << "Could not find file data for " << filename; - return false; - } - return true; - } -}; - -class NotifierInterface { - public: - virtual ~NotifierInterface() {} - virtual void Notify() = 0; -}; - -//////////////////////////////////////////////////////////////////////////////// - -class SMConnectionPoolInterface; - -class SMInterface { - public: - virtual void InitSMInterface(SMInterface* sm_other_interface, - int32 server_idx) = 0; - virtual void InitSMConnection(SMConnectionPoolInterface* connection_pool, - SMInterface* sm_interface, - EpollServer* epoll_server, - int fd, - string server_ip, - string server_port, - string remote_ip, - bool use_ssl) = 0; - virtual size_t ProcessReadInput(const char* data, size_t len) = 0; - virtual size_t ProcessWriteInput(const char* data, size_t len) = 0; - virtual void SetStreamID(uint32 stream_id) = 0; - virtual bool MessageFullyRead() const = 0; - virtual bool Error() const = 0; - virtual const char* ErrorAsString() const = 0; - virtual void Reset() = 0; - virtual void ResetForNewInterface(int32 server_idx) = 0; - // ResetForNewConnection is used for interfaces which control SMConnection - // objects. When called an interface may put its connection object into - // a reusable instance pool. Currently this is what the HttpSM interface - // does. - virtual void ResetForNewConnection() = 0; - virtual void Cleanup() = 0; - - virtual int PostAcceptHook() = 0; - - virtual void NewStream(uint32 stream_id, uint32 priority, - const string& filename) = 0; - virtual void SendEOF(uint32 stream_id) = 0; - virtual void SendErrorNotFound(uint32 stream_id) = 0; - virtual size_t SendSynStream(uint32 stream_id, - const BalsaHeaders& headers) = 0; - virtual size_t SendSynReply(uint32 stream_id, - const BalsaHeaders& headers) = 0; - virtual void SendDataFrame(uint32 stream_id, const char* data, int64 len, - uint32 flags, bool compress) = 0; - virtual void GetOutput() = 0; - - virtual ~SMInterface() {} -}; - -class SMConnectionInterface { - public: - virtual ~SMConnectionInterface() {} - virtual void ReadyToSend() = 0; - virtual EpollServer* epoll_server() = 0; -}; - -class HttpSM; -class SMConnection; - -typedef list<DataFrame*> OutputList; - -class SMConnectionPoolInterface { - public: - virtual ~SMConnectionPoolInterface() {} - // SMConnections will use this: - virtual void SMConnectionDone(SMConnection* connection) = 0; -}; - -SMInterface* NewStreamerSM(SMConnection* connection, - SMInterface* sm_interface, - EpollServer* epoll_server, - FlipAcceptor* acceptor); - -SMInterface* NewSpdySM(SMConnection* connection, - SMInterface* sm_interface, - EpollServer* epoll_server, - MemoryCache* memory_cache, - FlipAcceptor* acceptor); - -SMInterface* NewHttpSM(SMConnection* connection, - SMInterface* sm_interface, - EpollServer* epoll_server, - MemoryCache* memory_cache, - FlipAcceptor* acceptor); - -//////////////////////////////////////////////////////////////////////////////// - -class SMConnection: public SMConnectionInterface, - public EpollCallbackInterface, - public NotifierInterface { - private: - SMConnection(EpollServer* epoll_server, - SSLState* ssl_state, - MemoryCache* memory_cache, - FlipAcceptor* acceptor, - string log_prefix) - : fd_(-1), - events_(0), - registered_in_epoll_server_(false), - initialized_(false), - protocol_detected_(false), - connection_complete_(false), - connection_pool_(NULL), - epoll_server_(epoll_server), - ssl_state_(ssl_state), - memory_cache_(memory_cache), - acceptor_(acceptor), - read_buffer_(kSpdySegmentSize * 40), - sm_spdy_interface_(NULL), - sm_http_interface_(NULL), - sm_streamer_interface_(NULL), - sm_interface_(NULL), - log_prefix_(log_prefix), - max_bytes_sent_per_dowrite_(4096), - ssl_(NULL), - last_read_time_(0) - {} - - int fd_; - int events_; - - bool registered_in_epoll_server_; - bool initialized_; - bool protocol_detected_; - bool connection_complete_; - - SMConnectionPoolInterface* connection_pool_; - - EpollServer *epoll_server_; - SSLState *ssl_state_; - MemoryCache* memory_cache_; - FlipAcceptor *acceptor_; - string client_ip_; - - RingBuffer read_buffer_; - - OutputList output_list_; - SMInterface* sm_spdy_interface_; - SMInterface* sm_http_interface_; - SMInterface* sm_streamer_interface_; - SMInterface* sm_interface_; - string log_prefix_; - - size_t max_bytes_sent_per_dowrite_; - - SSL* ssl_; - public: - time_t last_read_time_; - string server_ip_; - string server_port_; - - EpollServer* epoll_server() { return epoll_server_; } - OutputList* output_list() { return &output_list_; } - MemoryCache* memory_cache() { return memory_cache_; } - void ReadyToSend() { - VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT - << "Setting ready to send: EPOLLIN | EPOLLOUT"; - epoll_server_->SetFDReady(fd_, EPOLLIN | EPOLLOUT); - } - void EnqueueDataFrame(DataFrame* df) { - output_list_.push_back(df); - VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT << "EnqueueDataFrame: " - << "size = " << df->size << ": Setting FD ready."; - ReadyToSend(); - } - int fd() { return fd_; } - - public: - ~SMConnection() { - if (initialized()) { - Reset(); - } - } - static SMConnection* NewSMConnection(EpollServer* epoll_server, - SSLState *ssl_state, - MemoryCache* memory_cache, - FlipAcceptor *acceptor, - string log_prefix) { - return new SMConnection(epoll_server, ssl_state, memory_cache, - acceptor, log_prefix); - } - - bool initialized() const { return initialized_; } - string client_ip() const { return client_ip_; } - - void InitSMConnection(SMConnectionPoolInterface* connection_pool, - SMInterface* sm_interface, - EpollServer* epoll_server, - int fd, - string server_ip, - string server_port, - string remote_ip, - bool use_ssl) { - if (initialized_) { - LOG(FATAL) << "Attempted to initialize already initialized server"; - return; - } - - client_ip_ = remote_ip; - - if (fd == -1) { - // If fd == -1, then we are initializing a new connection that will - // connect to the backend. - // - // ret: -1 == error - // 0 == connection in progress - // 1 == connection complete - // TODO: is_numeric_host_address value needs to be detected - server_ip_ = server_ip; - server_port_ = server_port; - int ret = net::CreateConnectedSocket(&fd_, - server_ip, - server_port, - true, - acceptor_->disable_nagle_); - - if (ret < 0) { - LOG(ERROR) << "-1 Could not create connected socket"; - return; - } else if (ret == 1) { - DCHECK_NE(-1, fd_); - connection_complete_ = true; - VLOG(1) << log_prefix_ << ACCEPTOR_CLIENT_IDENT - << "Connection complete to: " << server_ip_ << ":" - << server_port_ << " "; - } - VLOG(1) << log_prefix_ << ACCEPTOR_CLIENT_IDENT - << "Connecting to server: " << server_ip_ << ":" - << server_port_ << " "; - } else { - // If fd != -1 then we are initializing a connection that has just been - // accepted from the listen socket. - connection_complete_ = true; - if (epoll_server_ && registered_in_epoll_server_ && fd_ != -1) { - epoll_server_->UnregisterFD(fd_); - } - if (fd_ != -1) { - VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT - << "Closing pre-existing fd"; - close(fd_); - fd_ = -1; - } - - fd_ = fd; - } - - registered_in_epoll_server_ = false; - // Set the last read time here as the idle checker will start from - // now. - last_read_time_ = time(NULL); - initialized_ = true; - - connection_pool_ = connection_pool; - epoll_server_ = epoll_server; - - if (sm_interface) { - sm_interface_ = sm_interface; - protocol_detected_ = true; - } - - read_buffer_.Clear(); - - epoll_server_->RegisterFD(fd_, this, EPOLLIN | EPOLLOUT | EPOLLET); - - if (use_ssl) { - ssl_ = spdy_new_ssl(ssl_state_->ssl_ctx); - SSL_set_fd(ssl_, fd_); - PrintSslError(); - } - } - - void CorkSocket() { - int state = 1; - int rv = setsockopt(fd_, IPPROTO_TCP, TCP_CORK, &state, sizeof(state)); - if (rv < 0) - VLOG(1) << "setsockopt(CORK): " << errno; - } - - void UncorkSocket() { - int state = 0; - int rv = setsockopt(fd_, IPPROTO_TCP, TCP_CORK, &state, sizeof(state)); - if (rv < 0) - VLOG(1) << "setsockopt(CORK): " << errno; - } - - int Send(const char* data, int len, int flags) { - int rv = 0; - CorkSocket(); - if (ssl_) { - ssize_t bytes_written = 0; - // Write smallish chunks to SSL so that we don't have large - // multi-packet TLS records to receive before being able to handle - // the data. We don't have to be too careful here, because our data - // frames are already getting chunked appropriately, and those are - // the most likely "big" frames. - while(len > 0) { - const int kMaxTLSRecordSize = 1500; - const char* ptr = &(data[bytes_written]); - int chunksize = std::min(len, kMaxTLSRecordSize); - rv = SSL_write(ssl_, ptr, chunksize); - VLOG(2) << "SSLWrite(" << chunksize << " bytes): " << rv; - if (rv <= 0) { - switch(SSL_get_error(ssl_, rv)) { - case SSL_ERROR_WANT_READ: - case SSL_ERROR_WANT_WRITE: - case SSL_ERROR_WANT_ACCEPT: - case SSL_ERROR_WANT_CONNECT: - rv = -2; - break; - default: - PrintSslError(); - break; - } - break; - } - bytes_written += rv; - len -= rv; - if (rv != chunksize) - break; // If we couldn't write everything, we're implicitly stalled - } - // If we wrote some data, return that count. Otherwise - // return the stall error. - if (bytes_written > 0) - rv = bytes_written; - } else { - rv = send(fd_, data, len, flags); - } - if (!(flags & MSG_MORE)) - UncorkSocket(); - return rv; - } - - // the following are from the EpollCallbackInterface - virtual void OnRegistration(EpollServer* eps, int fd, int event_mask) { - registered_in_epoll_server_ = true; - } - virtual void OnModification(int fd, int event_mask) { } - virtual void OnEvent(int fd, EpollEvent* event) { - events_ |= event->in_events; - HandleEvents(); - if (events_) { - event->out_ready_mask = events_; - events_ = 0; - } - } - virtual void OnUnregistration(int fd, bool replaced) { - registered_in_epoll_server_ = false; - } - virtual void OnShutdown(EpollServer* eps, int fd) { - Cleanup("OnShutdown"); - return; - } - - void Cleanup(const char* cleanup) { - VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT << "Cleanup: " << cleanup; - if (!initialized_) { - return; - } - Reset(); - if (connection_pool_) - connection_pool_->SMConnectionDone(this); - if (sm_interface_) - sm_interface_->ResetForNewConnection(); - last_read_time_ = 0; - } - - private: - void HandleEvents() { - VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT << "Received: " - << EpollServer::EventMaskToString(events_).c_str(); - - if (events_ & EPOLLIN) { - if (!DoRead()) - goto handle_close_or_error; - } - - if (events_ & EPOLLOUT) { - // Check if we have connected or not - if (connection_complete_ == false) { - int sock_error; - socklen_t sock_error_len = sizeof(sock_error); - int ret = getsockopt(fd_, SOL_SOCKET, SO_ERROR, &sock_error, - &sock_error_len); - if (ret != 0) { - VLOG(1) << log_prefix_ << ACCEPTOR_CLIENT_IDENT - << "getsockopt error: " << errno << ": " << strerror(errno); - goto handle_close_or_error; - } - if (sock_error == 0) { - connection_complete_ = true; - VLOG(1) << log_prefix_ << ACCEPTOR_CLIENT_IDENT - << "Connection complete to " << server_ip_ << ":" - << server_port_ << " "; - } else if (sock_error == EINPROGRESS) { - return; - } else { - VLOG(1) << log_prefix_ << ACCEPTOR_CLIENT_IDENT - << "error connecting to server"; - goto handle_close_or_error; - } - } - if (!DoWrite()) - goto handle_close_or_error; - } - - if (events_ & (EPOLLHUP | EPOLLERR)) { - VLOG(1) << log_prefix_ << ACCEPTOR_CLIENT_IDENT << "!!! Got HUP or ERR"; - goto handle_close_or_error; - } - return; - - handle_close_or_error: - Cleanup("HandleEvents"); - } - - // Decide if SPDY was negotiated. - bool WasSpdyNegotiated() { - if (FLAGS_force_spdy) - return true; - - // If this is an SSL connection, check if NPN specifies SPDY. - if (ssl_) { - const unsigned char *npn_proto; - unsigned int npn_proto_len; - SSL_get0_next_proto_negotiated(ssl_, &npn_proto, &npn_proto_len); - if (npn_proto_len > 0) { - string npn_proto_str((const char *)npn_proto, npn_proto_len); - VLOG(1) << log_prefix_ << ACCEPTOR_CLIENT_IDENT - << "NPN protocol detected: " << npn_proto_str; - if (!strncmp(reinterpret_cast<const char*>(npn_proto), - "spdy/2", npn_proto_len)) - return true; - } - } - - return false; - } - - // Initialize the protocol interfaces we'll need for this connection. - // Returns true if successful, false otherwise. - bool SetupProtocolInterfaces() { - DCHECK(!protocol_detected_); - protocol_detected_ = true; - - bool spdy_negotiated = WasSpdyNegotiated(); - bool using_ssl = ssl_ != NULL; - - if (using_ssl) - VLOG(1) << (SSL_session_reused(ssl_) ? "Resumed" : "Renegotiated") - << " SSL Session."; - - if (acceptor_->spdy_only_ && !spdy_negotiated) { - VLOG(1) << log_prefix_ << ACCEPTOR_CLIENT_IDENT - << "SPDY proxy only, closing HTTPS connection."; - return false; - } - - switch (acceptor_->flip_handler_type_) { - case FLIP_HANDLER_HTTP_SERVER: - { - DCHECK(!spdy_negotiated); - VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT - << (sm_http_interface_ ? "Creating" : "Reusing") - << " HTTP interface."; - if (!sm_http_interface_) - sm_http_interface_ = NewHttpSM(this, NULL, epoll_server_, - memory_cache_, acceptor_); - sm_interface_ = sm_http_interface_; - } - break; - case FLIP_HANDLER_PROXY: - { - VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT - << (sm_streamer_interface_ ? "Creating" : "Reusing") - << " PROXY Streamer interface."; - if (!sm_streamer_interface_) - sm_streamer_interface_ = NewStreamerSM(this, NULL, - epoll_server_, - acceptor_); - sm_interface_ = sm_streamer_interface_; - // If spdy is not negotiated, the streamer interface will proxy all - // data to the origin server. - if (!spdy_negotiated) - break; - } - // Otherwise fall through into the case below. - case FLIP_HANDLER_SPDY_SERVER: - { - DCHECK(spdy_negotiated); - VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT - << (sm_spdy_interface_ ? "Creating" : "Reusing") - << " SPDY interface."; - if (!sm_spdy_interface_) - sm_spdy_interface_ = NewSpdySM(this, NULL, epoll_server_, - memory_cache_, acceptor_); - sm_interface_ = sm_spdy_interface_; - } - break; - } - - CorkSocket(); - if (!sm_interface_->PostAcceptHook()) - return false; - - return true; - } - - bool DoRead() { - VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT << "DoRead()"; - while (!read_buffer_.Full()) { - char* bytes; - int size; - if (fd_ == -1) { - VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT - << "DoRead(): fd_ == -1. Invalid FD. Returning false"; - return false; - } - read_buffer_.GetWritablePtr(&bytes, &size); - ssize_t bytes_read = 0; - if (ssl_) { - bytes_read = SSL_read(ssl_, bytes, size); - if (bytes_read < 0) { - int err = SSL_get_error(ssl_, bytes_read); - switch(err) { - case SSL_ERROR_WANT_READ: - case SSL_ERROR_WANT_WRITE: - case SSL_ERROR_WANT_ACCEPT: - case SSL_ERROR_WANT_CONNECT: - events_ &= ~EPOLLIN; - VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT - << "DoRead: SSL WANT_XXX: " << err; - goto done; - default: - PrintSslError(); - goto error_or_close; - } - } - } else { - bytes_read = recv(fd_, bytes, size, MSG_DONTWAIT); - } - int stored_errno = errno; - if (bytes_read == -1) { - switch (stored_errno) { - case EAGAIN: - events_ &= ~EPOLLIN; - VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT - << "Got EAGAIN while reading"; - goto done; - case EINTR: - VLOG(1) << log_prefix_ << ACCEPTOR_CLIENT_IDENT - << "Got EINTR while reading"; - continue; - default: - VLOG(1) << log_prefix_ << ACCEPTOR_CLIENT_IDENT - << "While calling recv, got error: " - << (ssl_?"(ssl error)":strerror(stored_errno)); - goto error_or_close; - } - } else if (bytes_read > 0) { - VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT << "read " << bytes_read - << " bytes"; - last_read_time_ = time(NULL); - // If the protocol hasn't been detected yet, set up the handlers - // we'll need. - if (!protocol_detected_) { - if (!SetupProtocolInterfaces()) { - LOG(ERROR) << "Error setting up protocol interfaces."; - goto error_or_close; - } - } - read_buffer_.AdvanceWritablePtr(bytes_read); - if (!DoConsumeReadData()) - goto error_or_close; - continue; - } else { // bytes_read == 0 - VLOG(1) << log_prefix_ << ACCEPTOR_CLIENT_IDENT - << "0 bytes read with recv call."; - } - goto error_or_close; - } - done: - VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT << "DoRead done!"; - return true; - - error_or_close: - VLOG(1) << log_prefix_ << ACCEPTOR_CLIENT_IDENT - << "DoRead(): error_or_close. " - << "Cleaning up, then returning false"; - Cleanup("DoRead"); - return false; - } - - bool DoConsumeReadData() { - char* bytes; - int size; - read_buffer_.GetReadablePtr(&bytes, &size); - while (size != 0) { - size_t bytes_consumed = sm_interface_->ProcessReadInput(bytes, size); - VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT << "consumed " - << bytes_consumed << " bytes"; - if (bytes_consumed == 0) { - break; - } - read_buffer_.AdvanceReadablePtr(bytes_consumed); - if (sm_interface_->MessageFullyRead()) { - VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT - << "HandleRequestFullyRead: Setting EPOLLOUT"; - HandleResponseFullyRead(); - events_ |= EPOLLOUT; - } else if (sm_interface_->Error()) { - LOG(ERROR) << log_prefix_ << ACCEPTOR_CLIENT_IDENT - << "Framer error detected: Setting EPOLLOUT: " - << sm_interface_->ErrorAsString(); - // this causes everything to be closed/cleaned up. - events_ |= EPOLLOUT; - return false; - } - read_buffer_.GetReadablePtr(&bytes, &size); - } - return true; - } - - void WriteResponse() { - // this happens asynchronously from separate threads - // feeding files into the output buffer. - } - - void HandleResponseFullyRead() { - sm_interface_->Cleanup(); - } - - void Notify() { - } - - bool DoWrite() { - size_t bytes_sent = 0; - int flags = MSG_NOSIGNAL | MSG_DONTWAIT; - if (fd_ == -1) { - VLOG(1) << log_prefix_ << ACCEPTOR_CLIENT_IDENT - << "DoWrite: fd == -1. Returning false."; - return false; - } - if (output_list_.empty()) { - VLOG(2) << log_prefix_ << "DoWrite: Output list empty."; - if (sm_interface_) { - sm_interface_->GetOutput(); - } - if (output_list_.empty()) { - events_ &= ~EPOLLOUT; - } - } - while (!output_list_.empty()) { - VLOG(2) << log_prefix_ << "DoWrite: Items in output list: " - << output_list_.size(); - if (bytes_sent >= max_bytes_sent_per_dowrite_) { - VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT - << " byte sent >= max bytes sent per write: Setting EPOLLOUT: " - << bytes_sent; - events_ |= EPOLLOUT; - break; - } - if (sm_interface_ && output_list_.size() < 2) { - sm_interface_->GetOutput(); - } - DataFrame* data_frame = output_list_.front(); - const char* bytes = data_frame->data; - int size = data_frame->size; - bytes += data_frame->index; - size -= data_frame->index; - DCHECK_GE(size, 0); - if (size <= 0) { - output_list_.pop_front(); - delete data_frame; - continue; - } - - flags = MSG_NOSIGNAL | MSG_DONTWAIT; - // Look for a queue size > 1 because |this| frame is remains on the list - // until it has finished sending. - if (output_list_.size() > 1) { - VLOG(2) << log_prefix_ << "Outlist size: " << output_list_.size() - << ": Adding MSG_MORE flag"; - flags |= MSG_MORE; - } - VLOG(2) << log_prefix_ << "Attempting to send " << size << " bytes."; - ssize_t bytes_written = Send(bytes, size, flags); - int stored_errno = errno; - if (bytes_written == -1) { - switch (stored_errno) { - case EAGAIN: - events_ &= ~EPOLLOUT; - VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT - << "Got EAGAIN while writing"; - goto done; - case EINTR: - VLOG(1) << log_prefix_ << ACCEPTOR_CLIENT_IDENT - << "Got EINTR while writing"; - continue; - default: - VLOG(1) << log_prefix_ << ACCEPTOR_CLIENT_IDENT - << "While calling send, got error: " << stored_errno - << ": " << (ssl_?"":strerror(stored_errno)); - goto error_or_close; - } - } else if (bytes_written > 0) { - VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT << "Wrote: " - << bytes_written << " bytes"; - data_frame->index += bytes_written; - bytes_sent += bytes_written; - continue; - } else if (bytes_written == -2) { - // -2 handles SSL_ERROR_WANT_* errors - events_ &= ~EPOLLOUT; - goto done; - } - VLOG(1) << log_prefix_ << ACCEPTOR_CLIENT_IDENT - << "0 bytes written with send call."; - goto error_or_close; - } - done: - UncorkSocket(); - return true; - - error_or_close: - VLOG(1) << log_prefix_ << ACCEPTOR_CLIENT_IDENT - << "DoWrite: error_or_close. Returning false " - << "after cleaning up"; - Cleanup("DoWrite"); - UncorkSocket(); - return false; - } - - friend ostream& operator<<(ostream& os, const SMConnection& c) { - os << &c << "\n"; - return os; - } - - void Reset() { - VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT << "Resetting"; - if (ssl_) { - SSL_shutdown(ssl_); - PrintSslError(); - SSL_free(ssl_); - PrintSslError(); - ssl_ = NULL; - } - if (registered_in_epoll_server_) { - epoll_server_->UnregisterFD(fd_); - registered_in_epoll_server_ = false; - } - if (fd_ >= 0) { - VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT << "Closing connection"; - close(fd_); - fd_ = -1; - } - read_buffer_.Clear(); - initialized_ = false; - protocol_detected_ = false; - events_ = 0; - for (list<DataFrame*>::iterator i = - output_list_.begin(); - i != output_list_.end(); - ++i) { - delete *i; - } - output_list_.clear(); - } -}; - -//////////////////////////////////////////////////////////////////////////////// - -class OutputOrdering { - public: - typedef list<MemCacheIter> PriorityRing; - - typedef map<uint32, PriorityRing> PriorityMap; - - struct PriorityMapPointer { - PriorityMapPointer(): ring(NULL), alarm_enabled(false) {} - PriorityRing* ring; - PriorityRing::iterator it; - bool alarm_enabled; - EpollServer::AlarmRegToken alarm_token; - }; - typedef map<uint32, PriorityMapPointer> StreamIdToPriorityMap; - - StreamIdToPriorityMap stream_ids_; - PriorityMap priority_map_; - PriorityRing first_data_senders_; - uint32 first_data_senders_threshold_; // when you've passed this, you're no - // longer a first_data_sender... - SMConnectionInterface* connection_; - EpollServer* epoll_server_; - - explicit OutputOrdering(SMConnectionInterface* connection) : - first_data_senders_threshold_(kInitialDataSendersThreshold), - connection_(connection) { - if (connection) { - epoll_server_ = connection->epoll_server(); - } - } - - void Reset() { - while (!stream_ids_.empty()) { - StreamIdToPriorityMap::iterator sitpmi = stream_ids_.begin(); - PriorityMapPointer& pmp = sitpmi->second; - if (pmp.alarm_enabled) { - epoll_server_->UnregisterAlarm(pmp.alarm_token); - } - stream_ids_.erase(sitpmi); - } - priority_map_.clear(); - first_data_senders_.clear(); - } - - bool ExistsInPriorityMaps(uint32 stream_id) { - StreamIdToPriorityMap::iterator sitpmi = stream_ids_.find(stream_id); - return sitpmi != stream_ids_.end(); - } - - struct BeginOutputtingAlarm : public EpollAlarmCallbackInterface { - public: - BeginOutputtingAlarm(OutputOrdering* oo, - OutputOrdering::PriorityMapPointer* pmp, - const MemCacheIter& mci) : - output_ordering_(oo), pmp_(pmp), mci_(mci), epoll_server_(NULL) {} - - int64 OnAlarm() { - OnUnregistration(); - output_ordering_->MoveToActive(pmp_, mci_); - VLOG(2) << "ON ALARM! Should now start to output..."; - delete this; - return 0; - } - void OnRegistration(const EpollServer::AlarmRegToken& tok, - EpollServer* eps) { - epoll_server_ = eps; - pmp_->alarm_token = tok; - pmp_->alarm_enabled = true; - } - void OnUnregistration() { - pmp_->alarm_enabled = false; - } - void OnShutdown(EpollServer* eps) { - OnUnregistration(); - } - ~BeginOutputtingAlarm() { - if (epoll_server_ && pmp_->alarm_enabled) - epoll_server_->UnregisterAlarm(pmp_->alarm_token); - } - private: - OutputOrdering* output_ordering_; - OutputOrdering::PriorityMapPointer* pmp_; - MemCacheIter mci_; - EpollServer* epoll_server_; - }; - - void MoveToActive(PriorityMapPointer* pmp, MemCacheIter mci) { - VLOG(2) << "Moving to active!"; - first_data_senders_.push_back(mci); - pmp->ring = &first_data_senders_; - pmp->it = first_data_senders_.end(); - --pmp->it; - connection_->ReadyToSend(); - } - - void AddToOutputOrder(const MemCacheIter& mci) { - if (ExistsInPriorityMaps(mci.stream_id)) - LOG(ERROR) << "OOps, already was inserted here?!"; - - double think_time_in_s = g_proxy_config.server_think_time_in_s_; - string x_server_latency = - mci.file_data->headers->GetHeader("X-Server-Latency").as_string(); - if (x_server_latency.size() != 0) { - char* endp; - double tmp_think_time_in_s = strtod(x_server_latency.c_str(), &endp); - if (endp != x_server_latency.c_str() + x_server_latency.size()) { - LOG(ERROR) << "Unable to understand X-Server-Latency of: " - << x_server_latency << " for resource: " - << mci.file_data->filename.c_str(); - } else { - think_time_in_s = tmp_think_time_in_s; - } - } - StreamIdToPriorityMap::iterator sitpmi; - sitpmi = stream_ids_.insert( - pair<uint32, PriorityMapPointer>(mci.stream_id, - PriorityMapPointer())).first; - PriorityMapPointer& pmp = sitpmi->second; - - BeginOutputtingAlarm* boa = new BeginOutputtingAlarm(this, &pmp, mci); - VLOG(1) << "Server think time: " << think_time_in_s; - epoll_server_->RegisterAlarmApproximateDelta( - think_time_in_s * 1000000, boa); - } - - void SpliceToPriorityRing(PriorityRing::iterator pri) { - MemCacheIter& mci = *pri; - PriorityMap::iterator pmi = priority_map_.find(mci.priority); - if (pmi == priority_map_.end()) { - pmi = priority_map_.insert( - pair<uint32, PriorityRing>(mci.priority, PriorityRing())).first; - } - - pmi->second.splice(pmi->second.end(), - first_data_senders_, - pri); - StreamIdToPriorityMap::iterator sitpmi = stream_ids_.find(mci.stream_id); - sitpmi->second.ring = &(pmi->second); - } - - MemCacheIter* GetIter() { - while (!first_data_senders_.empty()) { - MemCacheIter& mci = first_data_senders_.front(); - if (mci.bytes_sent >= first_data_senders_threshold_) { - SpliceToPriorityRing(first_data_senders_.begin()); - } else { - first_data_senders_.splice(first_data_senders_.end(), - first_data_senders_, - first_data_senders_.begin()); - mci.max_segment_size = kInitialDataSendersThreshold; - return &mci; - } - } - while (!priority_map_.empty()) { - PriorityRing& first_ring = priority_map_.begin()->second; - if (first_ring.empty()) { - priority_map_.erase(priority_map_.begin()); - continue; - } - MemCacheIter& mci = first_ring.front(); - first_ring.splice(first_ring.end(), - first_ring, - first_ring.begin()); - mci.max_segment_size = kSpdySegmentSize; - return &mci; - } - return NULL; - } - - void RemoveStreamId(uint32 stream_id) { - StreamIdToPriorityMap::iterator sitpmi = stream_ids_.find(stream_id); - if (sitpmi == stream_ids_.end()) - return; - PriorityMapPointer& pmp = sitpmi->second; - if (pmp.alarm_enabled) { - epoll_server_->UnregisterAlarm(pmp.alarm_token); - } else { - pmp.ring->erase(pmp.it); - } - - stream_ids_.erase(sitpmi); - } -}; - - -//////////////////////////////////////////////////////////////////////////////// - -class SpdySM : public SpdyFramerVisitorInterface, public SMInterface { - private: - uint64 seq_num_; - SpdyFramer* spdy_framer_; - bool valid_spdy_session_; // True if we have seen valid data on this session. - // Use this to fail fast when junk is sent to our - // port. - - SMConnection* connection_; - OutputList* client_output_list_; - OutputOrdering client_output_ordering_; - uint32 next_outgoing_stream_id_; - EpollServer* epoll_server_; - FlipAcceptor* acceptor_; - MemoryCache* memory_cache_; - vector<SMInterface*> server_interface_list; - vector<int32> unused_server_interface_list; - typedef map<uint32,SMInterface*> StreamToSmif; - StreamToSmif stream_to_smif_; - bool close_on_error_; - public: - SpdySM(SMConnection* connection, - SMInterface* sm_http_interface, - EpollServer* epoll_server, - MemoryCache* memory_cache, - FlipAcceptor* acceptor) - : seq_num_(0), - spdy_framer_(new SpdyFramer), - valid_spdy_session_(false), - connection_(connection), - client_output_list_(connection->output_list()), - client_output_ordering_(connection), - next_outgoing_stream_id_(2), - epoll_server_(epoll_server), - acceptor_(acceptor), - memory_cache_(memory_cache), - close_on_error_(false) { - spdy_framer_->set_visitor(this); - } - - ~SpdySM() { - delete spdy_framer_; - } - - void InitSMInterface(SMInterface* sm_http_interface, - int32 server_idx) { } - - void InitSMConnection(SMConnectionPoolInterface* connection_pool, - SMInterface* sm_interface, - EpollServer* epoll_server, - int fd, - string server_ip, - string server_port, - string remote_ip, - bool use_ssl) { - VLOG(2) << ACCEPTOR_CLIENT_IDENT - << "SpdySM: Initializing server connection."; - connection_->InitSMConnection(connection_pool, sm_interface, - epoll_server, fd, server_ip, server_port, - remote_ip, use_ssl); - } - - private: - virtual void OnError(SpdyFramer* framer) { - /* do nothing with this right now */ - } - - SMInterface* NewConnectionInterface() { - SMConnection* server_connection = - SMConnection::NewSMConnection(epoll_server_, NULL, - memory_cache_, acceptor_, - "http_conn: "); - if (server_connection == NULL) { - LOG(ERROR) << "SpdySM: Could not create server connection"; - return NULL; - } - VLOG(2) << ACCEPTOR_CLIENT_IDENT << "SpdySM: Creating new HTTP interface"; - SMInterface *sm_http_interface = NewHttpSM(server_connection, this, - epoll_server_, memory_cache_, - acceptor_); - return sm_http_interface; - } - - SMInterface* FindOrMakeNewSMConnectionInterface(string server_ip, - string server_port) { - SMInterface *sm_http_interface; - int32 server_idx; - if (unused_server_interface_list.empty()) { - sm_http_interface = NewConnectionInterface(); - server_idx = server_interface_list.size(); - server_interface_list.push_back(sm_http_interface); - VLOG(2) << ACCEPTOR_CLIENT_IDENT - << "SpdySM: Making new server connection on index: " - << server_idx; - } else { - server_idx = unused_server_interface_list.back(); - unused_server_interface_list.pop_back(); - sm_http_interface = server_interface_list.at(server_idx); - VLOG(2) << ACCEPTOR_CLIENT_IDENT << "SpdySM: Reusing connection on " - << "index: " << server_idx; - } - - sm_http_interface->InitSMInterface(this, server_idx); - sm_http_interface->InitSMConnection(NULL, sm_http_interface, - epoll_server_, -1, - server_ip, server_port, "", false); - - return sm_http_interface; - } - - int SpdyHandleNewStream(const SpdyControlFrame* frame, - string &http_data, - bool *is_https_scheme) - { - bool parsed_headers = false; - SpdyHeaderBlock headers; - const SpdySynStreamControlFrame* syn_stream = - reinterpret_cast<const SpdySynStreamControlFrame*>(frame); - - *is_https_scheme = false; - parsed_headers = spdy_framer_->ParseHeaderBlock(frame, &headers); - VLOG(2) << ACCEPTOR_CLIENT_IDENT << "SpdySM: OnSyn(" - << syn_stream->stream_id() << ")"; - VLOG(2) << ACCEPTOR_CLIENT_IDENT << "SpdySM: headers parsed?: " - << (parsed_headers? "yes": "no"); - if (parsed_headers) { - VLOG(2) << ACCEPTOR_CLIENT_IDENT << "SpdySM: # headers: " - << headers.size(); - } - SpdyHeaderBlock::iterator url = headers.find("url"); - SpdyHeaderBlock::iterator method = headers.find("method"); - if (url == headers.end() || method == headers.end()) { - VLOG(2) << ACCEPTOR_CLIENT_IDENT << "SpdySM: didn't find method or url " - << "or method. Not creating stream"; - return 0; - } - - SpdyHeaderBlock::iterator scheme = headers.find("scheme"); - if (scheme->second.compare("https") == 0) { - *is_https_scheme = true; - } - - string uri = UrlUtilities::GetUrlPath(url->second); - if (acceptor_->flip_handler_type_ == FLIP_HANDLER_SPDY_SERVER) { - SpdyHeaderBlock::iterator referer = headers.find("referer"); - if (referer != headers.end() && method->second == "GET") { - memory_cache_->UpdateHeaders(referer->second, url->second); - } - string host = UrlUtilities::GetUrlHost(url->second); - VLOG(1) << ACCEPTOR_CLIENT_IDENT << "Request: " << method->second - << " " << uri; - string filename = EncodeURL(uri, host, method->second); - NewStream(syn_stream->stream_id(), - reinterpret_cast<const SpdySynStreamControlFrame*>(frame)-> - priority(), - filename); - } else { - SpdyHeaderBlock::iterator version = headers.find("version"); - http_data += method->second + " " + uri + " " + version->second + "\r\n"; - VLOG(1) << ACCEPTOR_CLIENT_IDENT << "Request: " << method->second << " " - << uri << " " << version->second; - for (SpdyHeaderBlock::iterator i = headers.begin(); - i != headers.end(); ++i) { - http_data += i->first + ": " + i->second + "\r\n"; - VLOG(2) << ACCEPTOR_CLIENT_IDENT << i->first.c_str() << ":" - << i->second.c_str(); - } - if (g_proxy_config.forward_ip_header_enabled_) { - // X-Client-Cluster-IP header - http_data += g_proxy_config.forward_ip_header_ + ": " + - connection_->client_ip() + "\r\n"; - } - http_data += "\r\n"; - } - - VLOG(3) << ACCEPTOR_CLIENT_IDENT << "SpdySM: HTTP Request:\n" << http_data; - return 1; - } - - virtual void OnControl(const SpdyControlFrame* frame) { - SpdyHeaderBlock headers; - bool parsed_headers = false; - switch (frame->type()) { - case SYN_STREAM: - { - const SpdySynStreamControlFrame* syn_stream = - reinterpret_cast<const SpdySynStreamControlFrame*>(frame); - - string http_data; - bool is_https_scheme; - int ret = SpdyHandleNewStream(frame, http_data, &is_https_scheme); - if (!ret) { - LOG(ERROR) << "SpdySM: Could not convert spdy into http."; - break; - } - // We've seen a valid looking SYN_STREAM, consider this to have - // been a real spdy session. - valid_spdy_session_ = true; - - if (acceptor_->flip_handler_type_ == FLIP_HANDLER_PROXY) { - string server_ip; - string server_port; - if (is_https_scheme) { - server_ip = acceptor_->https_server_ip_; - server_port = acceptor_->https_server_port_; - } else { - server_ip = acceptor_->http_server_ip_; - server_port = acceptor_->http_server_port_; - } - SMInterface *sm_http_interface = - FindOrMakeNewSMConnectionInterface(server_ip, server_port); - stream_to_smif_[syn_stream->stream_id()] = sm_http_interface; - sm_http_interface->SetStreamID(syn_stream->stream_id()); - sm_http_interface->ProcessWriteInput(http_data.c_str(), - http_data.size()); - } - } - break; - - case SYN_REPLY: - parsed_headers = spdy_framer_->ParseHeaderBlock(frame, &headers); - VLOG(2) << ACCEPTOR_CLIENT_IDENT << "SpdySM: OnSynReply(" << - reinterpret_cast<const SpdySynReplyControlFrame*>(frame)->stream_id() - << ")"; - break; - case RST_STREAM: - { - const SpdyRstStreamControlFrame* rst_stream = - reinterpret_cast<const SpdyRstStreamControlFrame*>(frame); - VLOG(2) << ACCEPTOR_CLIENT_IDENT << "SpdySM: OnRst(" - << rst_stream->stream_id() << ")"; - client_output_ordering_.RemoveStreamId(rst_stream ->stream_id()); - } - break; - - default: - LOG(ERROR) << "SpdySM: Unknown control frame type"; - } - } - virtual void OnStreamFrameData(SpdyStreamId stream_id, - const char* data, size_t len) { - VLOG(2) << ACCEPTOR_CLIENT_IDENT << "SpdySM: StreamData(" << stream_id - << ", [" << len << "])"; - StreamToSmif::iterator it = stream_to_smif_.find(stream_id); - if (it == stream_to_smif_.end()) { - VLOG(2) << "Dropping frame from unknown stream " << stream_id; - if (!valid_spdy_session_) - close_on_error_ = true; - return; - } - - SMInterface* interface = it->second; - if (acceptor_->flip_handler_type_ == FLIP_HANDLER_PROXY) - interface->ProcessWriteInput(data, len); - } - - public: - size_t ProcessReadInput(const char* data, size_t len) { - return spdy_framer_->ProcessInput(data, len); - } - - size_t ProcessWriteInput(const char* data, size_t len) { - return 0; - } - - bool MessageFullyRead() const { - return spdy_framer_->MessageFullyRead(); - } - - void SetStreamID(uint32 stream_id) {} - - bool Error() const { - return close_on_error_ || spdy_framer_->HasError(); - } - - const char* ErrorAsString() const { - DCHECK(Error()); - return SpdyFramer::ErrorCodeToString(spdy_framer_->error_code()); - } - - void Reset() { - } - - void ResetForNewInterface(int32 server_idx) { - VLOG(2) << ACCEPTOR_CLIENT_IDENT << "SpdySM: Reset for new interface: " - << "server_idx: " << server_idx; - unused_server_interface_list.push_back(server_idx); - } - - void ResetForNewConnection() { - // seq_num is not cleared, intentionally. - delete spdy_framer_; - spdy_framer_ = new SpdyFramer; - spdy_framer_->set_visitor(this); - valid_spdy_session_ = false; - client_output_ordering_.Reset(); - next_outgoing_stream_id_ = 2; - } - - // SMInterface's Cleanup is currently only called by SMConnection after a - // protocol message as been fully read. Spdy's SMInterface does not need - // to do any cleanup at this time. - // TODO (klindsay) This method is probably not being used properly and - // some logic review and method renaming is probably in order. - void Cleanup() {} - - // Send a settings frame - int PostAcceptHook() { - spdy::SpdySettings settings; - spdy::SettingsFlagsAndId settings_id(spdy::SETTINGS_MAX_CONCURRENT_STREAMS); - settings.push_back(spdy::SpdySetting(settings_id, 100)); - SpdySettingsControlFrame* settings_frame = - spdy_framer_->CreateSettings(settings); - - VLOG(1) << ACCEPTOR_CLIENT_IDENT << "Sending Settings Frame"; - EnqueueDataFrame(new SpdyFrameDataFrame(settings_frame)); - return 1; - } - - void AddAssociatedContent(FileData* file_data) { - for (unsigned int i = 0; i < file_data->related_files.size(); ++i) { - pair<int, string>& related_file = file_data->related_files[i]; - MemCacheIter mci; - string filename = "GET_"; - filename += related_file.second; - if (!memory_cache_->AssignFileData(filename, &mci)) { - VLOG(1) << ACCEPTOR_CLIENT_IDENT << "Unable to find associated " - << "content for: " << filename; - continue; - } - VLOG(1) << ACCEPTOR_CLIENT_IDENT << "Adding associated content: " - << filename; - mci.stream_id = next_outgoing_stream_id_; - next_outgoing_stream_id_ += 2; - mci.priority = related_file.first; - AddToOutputOrder(mci); - } - } - - void NewStream(uint32 stream_id, uint32 priority, const string& filename) { - MemCacheIter mci; - mci.stream_id = stream_id; - mci.priority = priority; - if (acceptor_->flip_handler_type_ == FLIP_HANDLER_SPDY_SERVER) { - if (!memory_cache_->AssignFileData(filename, &mci)) { - // error creating new stream. - VLOG(1) << ACCEPTOR_CLIENT_IDENT << "Sending ErrorNotFound"; - SendErrorNotFound(stream_id); - } else { - AddToOutputOrder(mci); - if (FLAGS_use_xac) { - AddAssociatedContent(mci.file_data); - } - } - } else { - AddToOutputOrder(mci); - } - } - - void AddToOutputOrder(const MemCacheIter& mci) { - client_output_ordering_.AddToOutputOrder(mci); - } - - void SendEOF(uint32 stream_id) { - SendEOFImpl(stream_id); - } - - void SendErrorNotFound(uint32 stream_id) { - SendErrorNotFoundImpl(stream_id); - } - - void SendOKResponse(uint32 stream_id, string* output) { - SendOKResponseImpl(stream_id, output); - } - - size_t SendSynStream(uint32 stream_id, const BalsaHeaders& headers) { - return SendSynStreamImpl(stream_id, headers); - } - - size_t SendSynReply(uint32 stream_id, const BalsaHeaders& headers) { - return SendSynReplyImpl(stream_id, headers); - } - - void SendDataFrame(uint32 stream_id, const char* data, int64 len, - uint32 flags, bool compress) { - SpdyDataFlags spdy_flags = static_cast<SpdyDataFlags>(flags); - SendDataFrameImpl(stream_id, data, len, spdy_flags, compress); - } - - SpdyFramer* spdy_framer() { return spdy_framer_; } - - private: - void SendEOFImpl(uint32 stream_id) { - SendDataFrame(stream_id, NULL, 0, DATA_FLAG_FIN, false); - VLOG(2) << ACCEPTOR_CLIENT_IDENT << "SpdySM: Sending EOF: " << stream_id; - KillStream(stream_id); - } - - void SendErrorNotFoundImpl(uint32 stream_id) { - BalsaHeaders my_headers; - my_headers.SetFirstlineFromStringPieces("HTTP/1.1", "404", "Not Found"); - SendSynReplyImpl(stream_id, my_headers); - SendDataFrame(stream_id, "wtf?", 4, DATA_FLAG_FIN, false); - client_output_ordering_.RemoveStreamId(stream_id); - } - - void SendOKResponseImpl(uint32 stream_id, string* output) { - BalsaHeaders my_headers; - my_headers.SetFirstlineFromStringPieces("HTTP/1.1", "200", "OK"); - SendSynReplyImpl(stream_id, my_headers); - SendDataFrame( - stream_id, output->c_str(), output->size(), DATA_FLAG_FIN, false); - client_output_ordering_.RemoveStreamId(stream_id); - } - - void KillStream(uint32 stream_id) { - client_output_ordering_.RemoveStreamId(stream_id); - } - - void CopyHeaders(SpdyHeaderBlock& dest, const BalsaHeaders& headers) { - for (BalsaHeaders::const_header_lines_iterator hi = - headers.header_lines_begin(); - hi != headers.header_lines_end(); - ++hi) { - // It is illegal to send SPDY headers with empty value or header - // names. - if (!hi->first.length() || !hi->second.length()) - continue; - - SpdyHeaderBlock::iterator fhi = dest.find(hi->first.as_string()); - if (fhi == dest.end()) { - dest[hi->first.as_string()] = hi->second.as_string(); - } else { - dest[hi->first.as_string()] = ( - string(fhi->second.data(), fhi->second.size()) + "," + - string(hi->second.data(), hi->second.size())); - } - } - - // These headers have no value - dest.erase("X-Associated-Content"); // TODO(mbelshe): case-sensitive - dest.erase("X-Original-Url"); // TODO(mbelshe): case-sensitive - } - - size_t SendSynStreamImpl(uint32 stream_id, const BalsaHeaders& headers) { - SpdyHeaderBlock block; - block["method"] = headers.request_method().as_string(); - if (!headers.HasHeader("status")) - block["status"] = headers.response_code().as_string(); - if (!headers.HasHeader("version")) - block["version"] =headers.response_version().as_string(); - if (headers.HasHeader("X-Original-Url")) { - string original_url = headers.GetHeader("X-Original-Url").as_string(); - block["path"] = UrlUtilities::GetUrlPath(original_url); - } else { - block["path"] = headers.request_uri().as_string(); - } - CopyHeaders(block, headers); - - SpdySynStreamControlFrame* fsrcf = - spdy_framer_->CreateSynStream(stream_id, 0, 0, CONTROL_FLAG_NONE, true, - &block); - size_t df_size = fsrcf->length() + SpdyFrame::size(); - EnqueueDataFrame(new SpdyFrameDataFrame(fsrcf)); - - VLOG(2) << ACCEPTOR_CLIENT_IDENT << "SpdySM: Sending SynStreamheader " - << stream_id; - return df_size; - } - - size_t SendSynReplyImpl(uint32 stream_id, const BalsaHeaders& headers) { - SpdyHeaderBlock block; - CopyHeaders(block, headers); - block["status"] = headers.response_code().as_string() + " " + - headers.response_reason_phrase().as_string(); - block["version"] = headers.response_version().as_string(); - - SpdySynReplyControlFrame* fsrcf = - spdy_framer_->CreateSynReply(stream_id, CONTROL_FLAG_NONE, true, &block); - size_t df_size = fsrcf->length() + SpdyFrame::size(); - EnqueueDataFrame(new SpdyFrameDataFrame(fsrcf)); - - VLOG(2) << ACCEPTOR_CLIENT_IDENT << "SpdySM: Sending SynReplyheader " - << stream_id; - return df_size; - } - - void SendDataFrameImpl(uint32 stream_id, const char* data, int64 len, - SpdyDataFlags flags, bool compress) { - // Force compression off if disabled via command line. - if (!FLAGS_use_compression) - flags = static_cast<SpdyDataFlags>(flags & ~DATA_FLAG_COMPRESSED); - - // TODO(mbelshe): We can't compress here - before going into the - // priority queue. Compression needs to be done - // with late binding. - if (len == 0) { - SpdyDataFrame* fdf = spdy_framer_->CreateDataFrame(stream_id, data, len, - flags); - EnqueueDataFrame(new SpdyFrameDataFrame(fdf)); - return; - } - - // Chop data frames into chunks so that one stream can't monopolize the - // output channel. - while(len > 0) { - int64 size = std::min(len, static_cast<int64>(kSpdySegmentSize)); - SpdyDataFlags chunk_flags = flags; - - // If we chunked this block, and the FIN flag was set, there is more - // data coming. So, remove the flag. - if ((size < len) && (flags & DATA_FLAG_FIN)) - chunk_flags = static_cast<SpdyDataFlags>(chunk_flags & ~DATA_FLAG_FIN); - - SpdyDataFrame* fdf = spdy_framer_->CreateDataFrame(stream_id, data, size, - chunk_flags); - EnqueueDataFrame(new SpdyFrameDataFrame(fdf)); - - VLOG(2) << ACCEPTOR_CLIENT_IDENT << "SpdySM: Sending data frame " - << stream_id << " [" << size << "] shrunk to " << fdf->length() - << ", flags=" << flags; - - data += size; - len -= size; - } - } - - void EnqueueDataFrame(DataFrame* df) { - connection_->EnqueueDataFrame(df); - } - - void GetOutput() { - while (client_output_list_->size() < 2) { - MemCacheIter* mci = client_output_ordering_.GetIter(); - if (mci == NULL) { - VLOG(2) << ACCEPTOR_CLIENT_IDENT - << "SpdySM: GetOutput: nothing to output!?"; - return; - } - if (!mci->transformed_header) { - mci->transformed_header = true; - VLOG(2) << ACCEPTOR_CLIENT_IDENT << "SpdySM: GetOutput transformed " - << "header stream_id: [" << mci->stream_id << "]"; - if ((mci->stream_id % 2) == 0) { - // this is a server initiated stream. - // Ideally, we'd do a 'syn-push' here, instead of a syn-reply. - BalsaHeaders headers; - headers.CopyFrom(*(mci->file_data->headers)); - headers.ReplaceOrAppendHeader("status", "200"); - headers.ReplaceOrAppendHeader("version", "http/1.1"); - headers.SetRequestFirstlineFromStringPieces("PUSH", - mci->file_data->filename, - ""); - mci->bytes_sent = SendSynStream(mci->stream_id, headers); - } else { - BalsaHeaders headers; - headers.CopyFrom(*(mci->file_data->headers)); - mci->bytes_sent = SendSynReply(mci->stream_id, headers); - } - return; - } - if (mci->body_bytes_consumed >= mci->file_data->body.size()) { - VLOG(2) << ACCEPTOR_CLIENT_IDENT << "SpdySM: GetOutput " - << "remove_stream_id: [" << mci->stream_id << "]"; - SendEOF(mci->stream_id); - return; - } - size_t num_to_write = - mci->file_data->body.size() - mci->body_bytes_consumed; - if (num_to_write > mci->max_segment_size) - num_to_write = mci->max_segment_size; - - bool should_compress = false; - if (!mci->file_data->headers->HasHeader("content-encoding")) { - if (mci->file_data->headers->HasHeader("content-type")) { - string content_type = - mci->file_data->headers->GetHeader("content-type").as_string(); - if (content_type.find("image") == content_type.npos) - should_compress = true; - } - } - - SendDataFrame(mci->stream_id, - mci->file_data->body.data() + mci->body_bytes_consumed, - num_to_write, 0, should_compress); - VLOG(2) << ACCEPTOR_CLIENT_IDENT << "SpdySM: GetOutput SendDataFrame[" - << mci->stream_id << "]: " << num_to_write; - mci->body_bytes_consumed += num_to_write; - mci->bytes_sent += num_to_write; - } - } -}; - -//////////////////////////////////////////////////////////////////////////////// - -class HttpSM : public BalsaVisitorInterface, public SMInterface { - private: - uint64 seq_num_; - BalsaFrame* http_framer_; - BalsaHeaders headers_; - uint32 stream_id_; - int32 server_idx_; - - SMConnection* connection_; - SMInterface* sm_spdy_interface_; - OutputList* output_list_; - OutputOrdering output_ordering_; - MemoryCache* memory_cache_; - FlipAcceptor* acceptor_; - public: - explicit HttpSM(SMConnection* connection, - SMInterface* sm_spdy_interface, - EpollServer* epoll_server, - MemoryCache* memory_cache, - FlipAcceptor* acceptor) : - seq_num_(0), - http_framer_(new BalsaFrame), - stream_id_(0), - server_idx_(-1), - connection_(connection), - sm_spdy_interface_(sm_spdy_interface), - output_list_(connection->output_list()), - output_ordering_(connection), - memory_cache_(connection->memory_cache()), - acceptor_(acceptor) { - http_framer_->set_balsa_visitor(this); - http_framer_->set_balsa_headers(&headers_); - if (acceptor_->flip_handler_type_ == FLIP_HANDLER_PROXY) { - http_framer_->set_is_request(false); - } - } - private: - typedef map<string, uint32> ClientTokenMap; - private: - virtual void ProcessBodyInput(const char *input, size_t size) { - } - virtual void ProcessBodyData(const char *input, size_t size) { - if (acceptor_->flip_handler_type_ == FLIP_HANDLER_PROXY) { - VLOG(2) << ACCEPTOR_CLIENT_IDENT << "HttpSM: Process Body Data: stream " - << stream_id_ << ": size " << size; - sm_spdy_interface_->SendDataFrame(stream_id_, input, size, 0, false); - } - } - virtual void ProcessHeaderInput(const char *input, size_t size) { - } - virtual void ProcessTrailerInput(const char *input, size_t size) {} - virtual void ProcessHeaders(const BalsaHeaders& headers) { - if (acceptor_->flip_handler_type_ == FLIP_HANDLER_HTTP_SERVER) { - string host = - UrlUtilities::GetUrlHost(headers.GetHeader("Host").as_string()); - string method = headers.request_method().as_string(); - VLOG(1) << ACCEPTOR_CLIENT_IDENT << "Received Request: " - << headers.request_uri().as_string() << " " << method; - string filename = EncodeURL(headers.request_uri().as_string(), - host, method); - NewStream(stream_id_, 0, filename); - stream_id_ += 2; - } else { - VLOG(1) << ACCEPTOR_CLIENT_IDENT << "HttpSM: Received Response from " - << connection_->server_ip_ << ":" - << connection_->server_port_ << " "; - sm_spdy_interface_->SendSynReply(stream_id_, headers); - } - } - virtual void ProcessRequestFirstLine(const char* line_input, - size_t line_length, - const char* method_input, - size_t method_length, - const char* request_uri_input, - size_t request_uri_length, - const char* version_input, - size_t version_length) {} - virtual void ProcessResponseFirstLine(const char *line_input, - size_t line_length, - const char *version_input, - size_t version_length, - const char *status_input, - size_t status_length, - const char *reason_input, - size_t reason_length) {} - virtual void ProcessChunkLength(size_t chunk_length) {} - virtual void ProcessChunkExtensions(const char *input, size_t size) {} - virtual void HeaderDone() {} - virtual void MessageDone() { - if (acceptor_->flip_handler_type_ == FLIP_HANDLER_PROXY) { - VLOG(2) << ACCEPTOR_CLIENT_IDENT << "HttpSM: MessageDone. Sending EOF: " - << "stream " << stream_id_; - sm_spdy_interface_->SendEOF(stream_id_); - } else { - VLOG(2) << ACCEPTOR_CLIENT_IDENT << "HttpSM: MessageDone."; - } - } - virtual void HandleHeaderError(BalsaFrame* framer) { - HandleError(); - } - virtual void HandleHeaderWarning(BalsaFrame* framer) {} - virtual void HandleChunkingError(BalsaFrame* framer) { - HandleError(); - } - virtual void HandleBodyError(BalsaFrame* framer) { - HandleError(); - } - - void HandleError() { - VLOG(1) << ACCEPTOR_CLIENT_IDENT << "Error detected"; - } - - public: - ~HttpSM() { - Reset(); - delete http_framer_; - } - - void InitSMInterface(SMInterface* sm_spdy_interface, - int32 server_idx) - { - sm_spdy_interface_ = sm_spdy_interface; - server_idx_ = server_idx; - } - - void InitSMConnection(SMConnectionPoolInterface* connection_pool, - SMInterface* sm_interface, - EpollServer* epoll_server, - int fd, - string server_ip, - string server_port, - string remote_ip, - bool use_ssl) - { - VLOG(2) << ACCEPTOR_CLIENT_IDENT << "HttpSM: Initializing server " - << "connection."; - connection_->InitSMConnection(connection_pool, sm_interface, - epoll_server, fd, server_ip, server_port, - remote_ip, use_ssl); - } - - size_t ProcessReadInput(const char* data, size_t len) { - VLOG(2) << ACCEPTOR_CLIENT_IDENT << "HttpSM: Process read input: stream " - << stream_id_; - return http_framer_->ProcessInput(data, len); - } - - size_t ProcessWriteInput(const char* data, size_t len) { - VLOG(2) << ACCEPTOR_CLIENT_IDENT << "HttpSM: Process write input: size " - << len << ": stream " << stream_id_; - char * dataPtr = new char[len]; - memcpy(dataPtr, data, len); - DataFrame* data_frame = new DataFrame; - data_frame->data = (const char *)dataPtr; - data_frame->size = len; - data_frame->delete_when_done = true; - connection_->EnqueueDataFrame(data_frame); - return len; - } - - bool MessageFullyRead() const { - return http_framer_->MessageFullyRead(); - } - - void SetStreamID(uint32 stream_id) { - stream_id_ = stream_id; - } - - bool Error() const { - return http_framer_->Error(); - } - - const char* ErrorAsString() const { - return BalsaFrameEnums::ErrorCodeToString(http_framer_->ErrorCode()); - } - - void Reset() { - VLOG(1) << ACCEPTOR_CLIENT_IDENT << "HttpSM: Reset: stream " - << stream_id_; - http_framer_->Reset(); - } - - void ResetForNewInterface(int32 server_idx) { - } - - void ResetForNewConnection() { - if (acceptor_->flip_handler_type_ == FLIP_HANDLER_PROXY) { - VLOG(1) << ACCEPTOR_CLIENT_IDENT << "HttpSM: Server connection closing " - << "to: " << connection_->server_ip_ << ":" - << connection_->server_port_ << " "; - } - seq_num_ = 0; - output_ordering_.Reset(); - http_framer_->Reset(); - if (sm_spdy_interface_) { - sm_spdy_interface_->ResetForNewInterface(server_idx_); - } - } - - void Cleanup() { - if (!(acceptor_->flip_handler_type_ == FLIP_HANDLER_HTTP_SERVER)) { - VLOG(2) << "HttpSM Request Fully Read; stream_id: " << stream_id_; - connection_->Cleanup("request complete"); - } - } - - int PostAcceptHook() { - return 1; - } - - void NewStream(uint32 stream_id, uint32 priority, const string& filename) { - MemCacheIter mci; - mci.stream_id = stream_id; - mci.priority = priority; - if (!memory_cache_->AssignFileData(filename, &mci)) { - // error creating new stream. - VLOG(2) << ACCEPTOR_CLIENT_IDENT << "Sending ErrorNotFound"; - SendErrorNotFound(stream_id); - } else { - AddToOutputOrder(mci); - } - } - - void AddToOutputOrder(const MemCacheIter& mci) { - output_ordering_.AddToOutputOrder(mci); - } - - void SendEOF(uint32 stream_id) { - SendEOFImpl(stream_id); - if (acceptor_->flip_handler_type_ == FLIP_HANDLER_PROXY) { - sm_spdy_interface_->ResetForNewInterface(server_idx_); - } - } - - void SendErrorNotFound(uint32 stream_id) { - SendErrorNotFoundImpl(stream_id); - } - - void SendOKResponse(uint32 stream_id, string* output) { - SendOKResponseImpl(stream_id, output); - } - - size_t SendSynStream(uint32 stream_id, const BalsaHeaders& headers) { - return 0; - } - - size_t SendSynReply(uint32 stream_id, const BalsaHeaders& headers) { - return SendSynReplyImpl(stream_id, headers); - } - - void SendDataFrame(uint32 stream_id, const char* data, int64 len, - uint32 flags, bool compress) { - SendDataFrameImpl(stream_id, data, len, flags, compress); - } - - BalsaFrame* spdy_framer() { return http_framer_; } - - private: - void SendEOFImpl(uint32 stream_id) { - DataFrame* df = new DataFrame; - df->data = "0\r\n\r\n"; - df->size = 5; - df->delete_when_done = false; - EnqueueDataFrame(df); - if (acceptor_->flip_handler_type_ == FLIP_HANDLER_HTTP_SERVER) { - Reset(); - } - } - - void SendErrorNotFoundImpl(uint32 stream_id) { - BalsaHeaders my_headers; - my_headers.SetFirstlineFromStringPieces("HTTP/1.1", "404", "Not Found"); - my_headers.RemoveAllOfHeader("content-length"); - my_headers.AppendHeader("transfer-encoding", "chunked"); - SendSynReplyImpl(stream_id, my_headers); - SendDataFrame(stream_id, "wtf?", 4, 0, false); - SendEOFImpl(stream_id); - output_ordering_.RemoveStreamId(stream_id); - } - - void SendOKResponseImpl(uint32 stream_id, string* output) { - BalsaHeaders my_headers; - my_headers.SetFirstlineFromStringPieces("HTTP/1.1", "200", "OK"); - my_headers.RemoveAllOfHeader("content-length"); - my_headers.AppendHeader("transfer-encoding", "chunked"); - SendSynReplyImpl(stream_id, my_headers); - SendDataFrame(stream_id, output->c_str(), output->size(), 0, false); - SendEOFImpl(stream_id); - output_ordering_.RemoveStreamId(stream_id); - } - - size_t SendSynReplyImpl(uint32 stream_id, const BalsaHeaders& headers) { - SimpleBuffer sb; - headers.WriteHeaderAndEndingToBuffer(&sb); - DataFrame* df = new DataFrame; - df->size = sb.ReadableBytes(); - char* buffer = new char[df->size]; - df->data = buffer; - df->delete_when_done = true; - sb.Read(buffer, df->size); - VLOG(2) << ACCEPTOR_CLIENT_IDENT << "Sending HTTP Reply header " - << stream_id_; - size_t df_size = df->size; - EnqueueDataFrame(df); - return df_size; - } - - size_t SendSynStreamImpl(uint32 stream_id, const BalsaHeaders& headers) { - SimpleBuffer sb; - headers.WriteHeaderAndEndingToBuffer(&sb); - DataFrame* df = new DataFrame; - df->size = sb.ReadableBytes(); - char* buffer = new char[df->size]; - df->data = buffer; - df->delete_when_done = true; - sb.Read(buffer, df->size); - VLOG(2) << ACCEPTOR_CLIENT_IDENT << "Sending HTTP Reply header " - << stream_id_; - size_t df_size = df->size; - EnqueueDataFrame(df); - return df_size; - } - - void SendDataFrameImpl(uint32 stream_id, const char* data, int64 len, - uint32 flags, bool compress) { - char chunk_buf[128]; - snprintf(chunk_buf, sizeof(chunk_buf), "%x\r\n", (unsigned int)len); - string chunk_description(chunk_buf); - DataFrame* df = new DataFrame; - df->size = chunk_description.size() + len + 2; - char* buffer = new char[df->size]; - df->data = buffer; - df->delete_when_done = true; - memcpy(buffer, chunk_description.data(), chunk_description.size()); - memcpy(buffer + chunk_description.size(), data, len); - memcpy(buffer + chunk_description.size() + len, "\r\n", 2); - EnqueueDataFrame(df); - } - - void EnqueueDataFrame(DataFrame* df) { - VLOG(2) << ACCEPTOR_CLIENT_IDENT << "HttpSM: Enqueue data frame: stream " - << stream_id_; - connection_->EnqueueDataFrame(df); - } - - void GetOutput() { - MemCacheIter* mci = output_ordering_.GetIter(); - if (mci == NULL) { - VLOG(2) << ACCEPTOR_CLIENT_IDENT << "HttpSM: GetOutput: nothing to " - << "output!?: stream " << stream_id_; - return; - } - if (!mci->transformed_header) { - mci->bytes_sent = SendSynReply(mci->stream_id, - *(mci->file_data->headers)); - mci->transformed_header = true; - VLOG(2) << ACCEPTOR_CLIENT_IDENT << "HttpSM: GetOutput transformed " - << "header stream_id: [" << mci->stream_id << "]"; - return; - } - if (mci->body_bytes_consumed >= mci->file_data->body.size()) { - SendEOF(mci->stream_id); - output_ordering_.RemoveStreamId(mci->stream_id); - VLOG(2) << ACCEPTOR_CLIENT_IDENT << "GetOutput remove_stream_id: [" - << mci->stream_id << "]"; - return; - } - size_t num_to_write = - mci->file_data->body.size() - mci->body_bytes_consumed; - if (num_to_write > mci->max_segment_size) - num_to_write = mci->max_segment_size; - - SendDataFrame(mci->stream_id, - mci->file_data->body.data() + mci->body_bytes_consumed, - num_to_write, 0, true); - VLOG(2) << ACCEPTOR_CLIENT_IDENT << "HttpSM: GetOutput SendDataFrame[" - << mci->stream_id << "]: " << num_to_write; - mci->body_bytes_consumed += num_to_write; - mci->bytes_sent += num_to_write; - } -}; - -//////////////////////////////////////////////////////////////////////////////// - -class StreamerSM : public SMInterface { - private: - SMConnection* connection_; - SMInterface* sm_other_interface_; - EpollServer* epoll_server_; - FlipAcceptor* acceptor_; - public: - explicit StreamerSM(SMConnection* connection, - SMInterface* sm_other_interface, - EpollServer* epoll_server, - FlipAcceptor* acceptor) : - connection_(connection), - sm_other_interface_(sm_other_interface), - epoll_server_(epoll_server), - acceptor_(acceptor) - { - VLOG(2) << ACCEPTOR_CLIENT_IDENT << "Creating StreamerSM object"; - } - ~StreamerSM() { - VLOG(1) << ACCEPTOR_CLIENT_IDENT << "Destroying StreamerSM object"; - Reset(); - } - - void InitSMInterface(SMInterface* sm_other_interface, - int32 server_idx) - { - sm_other_interface_ = sm_other_interface; - } - - void InitSMConnection(SMConnectionPoolInterface* connection_pool, - SMInterface* sm_interface, - EpollServer* epoll_server, - int fd, - string server_ip, - string server_port, - string remote_ip, - bool use_ssl) - { - VLOG(2) << ACCEPTOR_CLIENT_IDENT << "StreamerSM: Initializing server " - << "connection."; - connection_->InitSMConnection(connection_pool, sm_interface, - epoll_server, fd, server_ip, - server_port, remote_ip, use_ssl); - } - - size_t ProcessReadInput(const char* data, size_t len) { - return sm_other_interface_->ProcessWriteInput(data, len); - } - - size_t ProcessWriteInput(const char* data, size_t len) { - char * dataPtr = new char[len]; - memcpy(dataPtr, data, len); - DataFrame* df = new DataFrame; - df->data = (const char *)dataPtr; - df->size = len; - df->delete_when_done = true; - connection_->EnqueueDataFrame(df); - return len; - } - - bool MessageFullyRead() const { - return false; - } - - void SetStreamID(uint32 stream_id) {} - - bool Error() const { - return false; - } - - const char* ErrorAsString() const { - return "(none)"; - } - - void Reset() { - VLOG(1) << ACCEPTOR_CLIENT_IDENT << "StreamerSM: Reset"; - connection_->Cleanup("Server Reset"); - } - - void ResetForNewInterface(int32 server_idx) { - } - - void ResetForNewConnection() { - sm_other_interface_->Reset(); - } - - void Cleanup() { - } - - int PostAcceptHook() { - if (!sm_other_interface_) { - SMConnection *server_connection = - SMConnection::NewSMConnection(epoll_server_, NULL, NULL, - acceptor_, "server_conn: "); - if (server_connection == NULL) { - LOG(ERROR) << "StreamerSM: Could not create server conenction."; - return 0; - } - VLOG(2) << ACCEPTOR_CLIENT_IDENT << "StreamerSM: Creating new server " - << "connection."; - sm_other_interface_ = new StreamerSM(server_connection, this, - epoll_server_, acceptor_); - sm_other_interface_->InitSMInterface(this, 0); - } - // The Streamer interface is used to stream HTTPS connections, so we - // will always use the https_server_ip/port here. - sm_other_interface_->InitSMConnection(NULL, sm_other_interface_, - epoll_server_, -1, - acceptor_->https_server_ip_, - acceptor_->https_server_port_, - "", - false); - - return 1; - } - - void NewStream(uint32 stream_id, uint32 priority, const string& filename) { - } - - void AddToOutputOrder(const MemCacheIter& mci) { - } - - void SendEOF(uint32 stream_id) { - } - - void SendErrorNotFound(uint32 stream_id) { - } - - void SendOKResponse(uint32 stream_id, string output) { - } - - size_t SendSynStream(uint32 stream_id, const BalsaHeaders& headers) { - return 0; - } - - size_t SendSynReply(uint32 stream_id, const BalsaHeaders& headers) { - return 0; - } - - void SendDataFrame(uint32 stream_id, const char* data, int64 len, - uint32 flags, bool compress) { - } - - private: - void SendEOFImpl(uint32 stream_id) { - } - - void SendErrorNotFoundImpl(uint32 stream_id) { - } - - void SendOKResponseImpl(uint32 stream_id, string* output) { - } - - size_t SendSynReplyImpl(uint32 stream_id, const BalsaHeaders& headers) { - return 0; - } - - size_t SendSynStreamImpl(uint32 stream_id, const BalsaHeaders& headers) { - return 0; - } - - void SendDataFrameImpl(uint32 stream_id, const char* data, int64 len, - uint32 flags, bool compress) { - } - - void GetOutput() { - } -}; - -//////////////////////////////////////////////////////////////////////////////// - -class Notification { - public: - explicit Notification(bool value) : value_(value) {} - - void Notify() { - base::AutoLock al(lock_); - value_ = true; - } - bool HasBeenNotified() { - base::AutoLock al(lock_); - return value_; - } - bool value_; - base::Lock lock_; -}; - -//////////////////////////////////////////////////////////////////////////////// - -class SMAcceptorThread : public SimpleThread, - public EpollCallbackInterface, - public SMConnectionPoolInterface { - EpollServer epoll_server_; - FlipAcceptor *acceptor_; - SSLState *ssl_state_; - bool use_ssl_; - - vector<SMConnection*> unused_server_connections_; - vector<SMConnection*> tmp_unused_server_connections_; - vector<SMConnection*> allocated_server_connections_; - list<SMConnection*> active_server_connections_; - Notification quitting_; - MemoryCache* memory_cache_; - public: - - SMAcceptorThread(FlipAcceptor *acceptor, - MemoryCache* memory_cache) : - SimpleThread("SMAcceptorThread"), - acceptor_(acceptor), - ssl_state_(NULL), - use_ssl_(false), - quitting_(false), - memory_cache_(memory_cache) - { - if (!acceptor->ssl_cert_filename_.empty() && - !acceptor->ssl_key_filename_.empty()) { - ssl_state_ = new SSLState; - bool use_npn = true; - if (acceptor_->flip_handler_type_ == FLIP_HANDLER_HTTP_SERVER) { - use_npn = false; - } - spdy_init_ssl(ssl_state_, acceptor_->ssl_cert_filename_, - acceptor_->ssl_key_filename_, use_npn); - use_ssl_ = true; - } - } - - ~SMAcceptorThread() { - for (vector<SMConnection*>::iterator i = - allocated_server_connections_.begin(); - i != allocated_server_connections_.end(); - ++i) { - delete *i; - } - delete ssl_state_; - } - - SMConnection* NewConnection() { - SMConnection* server = - SMConnection::NewSMConnection(&epoll_server_, ssl_state_, - memory_cache_, acceptor_, - "client_conn: "); - allocated_server_connections_.push_back(server); - VLOG(2) << ACCEPTOR_CLIENT_IDENT << "Acceptor: Making new server."; - return server; - } - - SMConnection* FindOrMakeNewSMConnection() { - if (unused_server_connections_.empty()) { - return NewConnection(); - } - SMConnection* server = unused_server_connections_.back(); - unused_server_connections_.pop_back(); - VLOG(2) << ACCEPTOR_CLIENT_IDENT << "Acceptor: Reusing server."; - return server; - } - - void InitWorker() { - epoll_server_.RegisterFD(acceptor_->listen_fd_, this, EPOLLIN | EPOLLET); - } - - void HandleConnection(int server_fd, struct sockaddr_in *remote_addr) { - int on = 1; - int rc; - if (acceptor_->disable_nagle_) { - rc = setsockopt(server_fd, IPPROTO_TCP, TCP_NODELAY, - reinterpret_cast<char*>(&on), sizeof(on)); - if (rc < 0) { - close(server_fd); - LOG(ERROR) << "setsockopt() failed fd=" + server_fd; - return; - } - } - - SMConnection* server_connection = FindOrMakeNewSMConnection(); - if (server_connection == NULL) { - VLOG(1) << ACCEPTOR_CLIENT_IDENT << "Acceptor: Closing fd " << server_fd; - close(server_fd); - return; - } - string remote_ip = inet_ntoa(remote_addr->sin_addr); - server_connection->InitSMConnection(this, - NULL, - &epoll_server_, - server_fd, - "", "", remote_ip, - use_ssl_); - if (server_connection->initialized()) - active_server_connections_.push_back(server_connection); - } - - void AcceptFromListenFD() { - if (acceptor_->accepts_per_wake_ > 0) { - for (int i = 0; i < acceptor_->accepts_per_wake_; ++i) { - struct sockaddr address; - socklen_t socklen = sizeof(address); - int fd = accept(acceptor_->listen_fd_, &address, &socklen); - if (fd == -1) { - if (errno != 11) { - VLOG(1) << ACCEPTOR_CLIENT_IDENT << "Acceptor: accept fail(" - << acceptor_->listen_fd_ << "): " << errno << ": " - << strerror(errno); - } - break; - } - VLOG(1) << ACCEPTOR_CLIENT_IDENT << " Accepted connection"; - HandleConnection(fd, (struct sockaddr_in *)&address); - } - } else { - while (true) { - struct sockaddr address; - socklen_t socklen = sizeof(address); - int fd = accept(acceptor_->listen_fd_, &address, &socklen); - if (fd == -1) { - if (errno != 11) { - VLOG(1) << ACCEPTOR_CLIENT_IDENT << "Acceptor: accept fail(" - << acceptor_->listen_fd_ << "): " << errno << ": " - << strerror(errno); - } - break; - } - VLOG(1) << ACCEPTOR_CLIENT_IDENT << "Accepted connection"; - HandleConnection(fd, (struct sockaddr_in *)&address); - } - } - } - - // EpollCallbackInteface virtual functions. - virtual void OnRegistration(EpollServer* eps, int fd, int event_mask) { } - virtual void OnModification(int fd, int event_mask) { } - virtual void OnEvent(int fd, EpollEvent* event) { - if (event->in_events | EPOLLIN) { - VLOG(2) << ACCEPTOR_CLIENT_IDENT - << "Acceptor: Accepting based upon epoll events"; - AcceptFromListenFD(); - } - } - virtual void OnUnregistration(int fd, bool replaced) { } - virtual void OnShutdown(EpollServer* eps, int fd) { } - - void Quit() { - quitting_.Notify(); - } - - // Iterates through a list of active connections expiring any that have been - // idle longer than the configured timeout. - void HandleConnectionIdleTimeout() { - static time_t oldest_time = time(NULL); - - int cur_time = time(NULL); - // Only iterate the list if we speculate that a connection is ready to be - // expired - if ((cur_time - oldest_time) < g_proxy_config.idle_timeout_s_) - return; - list<SMConnection*>::iterator iter = active_server_connections_.begin(); - while (iter != active_server_connections_.end()) { - SMConnection *conn = *iter; - int elapsed_time = (cur_time - conn->last_read_time_); - if (elapsed_time > g_proxy_config.idle_timeout_s_) { - conn->Cleanup("Connection idle timeout reached."); - iter = active_server_connections_.erase(iter); - delete(conn); - continue; - } - if (conn->last_read_time_ < oldest_time) - oldest_time = conn->last_read_time_; - iter++; - } - if ((cur_time - oldest_time) >= g_proxy_config.idle_timeout_s_) - oldest_time = cur_time; - } - - void Run() { - while (!quitting_.HasBeenNotified()) { - epoll_server_.set_timeout_in_us(10 * 1000); // 10 ms - epoll_server_.WaitForEventsAndExecuteCallbacks(); - if (tmp_unused_server_connections_.size()) { - VLOG(2) << "have " << tmp_unused_server_connections_.size() - << " additional unused connections. Total = " - << unused_server_connections_.size(); - unused_server_connections_.insert(unused_server_connections_.end(), - tmp_unused_server_connections_.begin(), - tmp_unused_server_connections_.end()); - tmp_unused_server_connections_.clear(); - } - HandleConnectionIdleTimeout(); - } - } - - // SMConnections will use this: - virtual void SMConnectionDone(SMConnection* sc) { - VLOG(1) << ACCEPTOR_CLIENT_IDENT << "Done with connection."; - tmp_unused_server_connections_.push_back(sc); - } -}; - -//////////////////////////////////////////////////////////////////////////////// - -SMInterface* NewStreamerSM(SMConnection* connection, - SMInterface* sm_interface, - EpollServer* epoll_server, - FlipAcceptor* acceptor) { - return new StreamerSM(connection, sm_interface, epoll_server, acceptor); -} - - -SMInterface* NewSpdySM(SMConnection* connection, - SMInterface* sm_interface, - EpollServer* epoll_server, - MemoryCache* memory_cache, - FlipAcceptor* acceptor) { - return new SpdySM(connection, sm_interface, epoll_server, - memory_cache, acceptor); -} - -SMInterface* NewHttpSM(SMConnection* connection, - SMInterface* sm_interface, - EpollServer* epoll_server, - MemoryCache* memory_cache, - FlipAcceptor* acceptor) { - return new HttpSM(connection, sm_interface, epoll_server, - memory_cache, acceptor); -} +net::FlipConfig g_proxy_config; //////////////////////////////////////////////////////////////////////////////// @@ -3086,7 +75,7 @@ bool GotQuitFromStdin() { // Make stdin nonblocking. Yes this is done each time. Oh well. fcntl(0, F_SETFL, O_NONBLOCK); char c; - string maybequit; + std::string maybequit; while (read(0, &c, 1) > 0) { maybequit += c; } @@ -3224,18 +213,15 @@ int main (int argc, char**argv) pidfile_fd = OpenPidFile(PIDFILE); } - g_proxy_config.server_think_time_in_s_ = FLAGS_server_think_time_in_s; + net::OutputOrdering::set_server_think_time_in_s(FLAGS_server_think_time_in_s); if (cl.HasSwitch("forward-ip-header")) { - g_proxy_config.forward_ip_header_enabled_ = true; - g_proxy_config.forward_ip_header_ = - cl.GetSwitchValueASCII("forward-ip-header"); - } else { - g_proxy_config.forward_ip_header_enabled_ = false; + net::SpdySM::set_forward_ip_header( + cl.GetSwitchValueASCII("forward-ip-header")); } if (cl.HasSwitch("logdest")) { - string log_dest_value = cl.GetSwitchValueASCII("logdest"); + std::string log_dest_value = cl.GetSwitchValueASCII("logdest"); if (log_dest_value.compare("file") == 0) { g_proxy_config.log_destination_ = logging::LOG_ONLY_TO_FILE; } else if (log_dest_value.compare("system") == 0) { @@ -3266,7 +252,7 @@ int main (int argc, char**argv) } if (cl.HasSwitch("ssl-session-expiry")) { - string session_expiry = cl.GetSwitchValueASCII("ssl-session-expiry"); + std::string session_expiry = cl.GetSwitchValueASCII("ssl-session-expiry"); g_proxy_config.ssl_session_expiry_ = atoi(session_expiry.c_str()); } @@ -3275,12 +261,12 @@ int main (int argc, char**argv) } if (cl.HasSwitch("idle-timeout")) { - g_proxy_config.idle_timeout_s_ = + g_proxy_config.idle_socket_timeout_s_ = atoi(cl.GetSwitchValueASCII("idle-timeout").c_str()); } if (cl.HasSwitch("force_spdy")) - FLAGS_force_spdy = true; + net::SMConnection::set_force_spdy(true); InitLogging(g_proxy_config.log_filename_.c_str(), g_proxy_config.log_destination_, @@ -3292,8 +278,8 @@ int main (int argc, char**argv) LOG(INFO) << "Logging destination : " << g_proxy_config.log_destination_; LOG(INFO) << "Log file : " << g_proxy_config.log_filename_; LOG(INFO) << "Forward IP Header : " - << (g_proxy_config.forward_ip_header_enabled_ ? - g_proxy_config.forward_ip_header_ : "(disabled)"); + << (net::SpdySM::forward_ip_header().length() ? + net::SpdySM::forward_ip_header() : "<disabled>"); LOG(INFO) << "Wait for interfaces : " << (wait_for_iface?"true":"false"); LOG(INFO) << "Accept backlog size : " << FLAGS_accept_backlog_size; LOG(INFO) << "Accepts per wake : " << FLAGS_accepts_per_wake; @@ -3307,7 +293,8 @@ int main (int argc, char**argv) << g_proxy_config.ssl_session_expiry_; LOG(INFO) << "SSL disable compression : " << g_proxy_config.ssl_disable_compression_; - LOG(INFO) << "Connection idle timeout : " << g_proxy_config.idle_timeout_s_; + LOG(INFO) << "Connection idle timeout : " + << g_proxy_config.idle_socket_timeout_s_; // Proxy Acceptors while (true) { @@ -3317,13 +304,13 @@ int main (int argc, char**argv) if (!cl.HasSwitch(name.str())) { break; } - string value = cl.GetSwitchValueASCII(name.str()); - vector<std::string> valueArgs = split(value, ','); + std::string value = cl.GetSwitchValueASCII(name.str()); + std::vector<std::string> valueArgs = split(value, ','); CHECK_EQ((unsigned int)9, valueArgs.size()); int spdy_only = atoi(valueArgs[8].c_str()); // If wait_for_iface is enabled, then this call will block // indefinitely until the interface is raised. - g_proxy_config.AddAcceptor(FLIP_HANDLER_PROXY, + g_proxy_config.AddAcceptor(net::FLIP_HANDLER_PROXY, valueArgs[0], valueArgs[1], valueArgs[2], valueArgs[3], valueArgs[4], valueArgs[5], @@ -3338,12 +325,12 @@ int main (int argc, char**argv) } // Spdy Server Acceptor - MemoryCache spdy_memory_cache; + net::MemoryCache spdy_memory_cache; if (cl.HasSwitch("spdy-server")) { spdy_memory_cache.AddFiles(); - string value = cl.GetSwitchValueASCII("spdy-server"); - vector<std::string> valueArgs = split(value, ','); - g_proxy_config.AddAcceptor(FLIP_HANDLER_SPDY_SERVER, + std::string value = cl.GetSwitchValueASCII("spdy-server"); + std::vector<std::string> valueArgs = split(value, ','); + g_proxy_config.AddAcceptor(net::FLIP_HANDLER_SPDY_SERVER, valueArgs[0], valueArgs[1], valueArgs[2], valueArgs[3], "", "", "", "", @@ -3357,12 +344,12 @@ int main (int argc, char**argv) } // Spdy Server Acceptor - MemoryCache http_memory_cache; + net::MemoryCache http_memory_cache; if (cl.HasSwitch("http-server")) { http_memory_cache.AddFiles(); - string value = cl.GetSwitchValueASCII("http-server"); - vector<std::string> valueArgs = split(value, ','); - g_proxy_config.AddAcceptor(FLIP_HANDLER_HTTP_SERVER, + std::string value = cl.GetSwitchValueASCII("http-server"); + std::vector<std::string> valueArgs = split(value, ','); + g_proxy_config.AddAcceptor(net::FLIP_HANDLER_HTTP_SERVER, valueArgs[0], valueArgs[1], valueArgs[2], valueArgs[3], "", "", "", "", @@ -3375,13 +362,14 @@ int main (int argc, char**argv) &http_memory_cache); } - vector<SMAcceptorThread*> sm_worker_threads_; + std::vector<net::SMAcceptorThread*> sm_worker_threads_; for (i = 0; i < g_proxy_config.acceptors_.size(); i++) { - FlipAcceptor *acceptor = g_proxy_config.acceptors_[i]; + net::FlipAcceptor *acceptor = g_proxy_config.acceptors_[i]; - sm_worker_threads_.push_back(new SMAcceptorThread(acceptor, - (MemoryCache *)acceptor->memory_cache_)); + sm_worker_threads_.push_back( + new net::SMAcceptorThread(acceptor, + (net::MemoryCache *)acceptor->memory_cache_)); // Note that spdy_memory_cache is not threadsafe, it is merely // thread compatible. Thus, if ever we are to spawn multiple threads, // we either must make the MemoryCache threadsafe, or use @@ -3418,3 +406,4 @@ int main (int argc, char**argv) close(pidfile_fd); return 0; } + diff --git a/net/tools/flip_server/http_interface.cc b/net/tools/flip_server/http_interface.cc new file mode 100644 index 0000000..1202336 --- /dev/null +++ b/net/tools/flip_server/http_interface.cc @@ -0,0 +1,336 @@ +// Copyright (c) 2009 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "net/tools/flip_server/http_interface.h" + +#include "net/tools/dump_cache/url_utilities.h" +#include "net/tools/flip_server/balsa_frame.h" +#include "net/tools/flip_server/flip_config.h" +#include "net/tools/flip_server/sm_connection.h" +#include "net/tools/flip_server/spdy_util.h" + +namespace net { + +HttpSM::HttpSM(SMConnection* connection, + SMInterface* sm_spdy_interface, + EpollServer* epoll_server, + MemoryCache* memory_cache, + FlipAcceptor* acceptor) + : seq_num_(0), + http_framer_(new BalsaFrame), + stream_id_(0), + server_idx_(-1), + connection_(connection), + sm_spdy_interface_(sm_spdy_interface), + output_list_(connection->output_list()), + output_ordering_(connection), + memory_cache_(connection->memory_cache()), + acceptor_(acceptor) { + http_framer_->set_balsa_visitor(this); + http_framer_->set_balsa_headers(&headers_); + if (acceptor_->flip_handler_type_ == FLIP_HANDLER_PROXY) + http_framer_->set_is_request(false); +} +HttpSM::~HttpSM() { + Reset(); + delete http_framer_; +} + +void HttpSM::ProcessBodyData(const char *input, size_t size) { + if (acceptor_->flip_handler_type_ == FLIP_HANDLER_PROXY) { + VLOG(2) << ACCEPTOR_CLIENT_IDENT << "HttpSM: Process Body Data: stream " + << stream_id_ << ": size " << size; + sm_spdy_interface_->SendDataFrame(stream_id_, input, size, 0, false); + } +} + +void HttpSM::ProcessHeaders(const BalsaHeaders& headers) { + if (acceptor_->flip_handler_type_ == FLIP_HANDLER_HTTP_SERVER) { + std::string host = + UrlUtilities::GetUrlHost(headers.GetHeader("Host").as_string()); + std::string method = headers.request_method().as_string(); + VLOG(1) << ACCEPTOR_CLIENT_IDENT << "Received Request: " + << headers.request_uri().as_string() << " " << method; + std::string filename = EncodeURL(headers.request_uri().as_string(), + host, method); + NewStream(stream_id_, 0, filename); + stream_id_ += 2; + } else { + VLOG(1) << ACCEPTOR_CLIENT_IDENT << "HttpSM: Received Response from " + << connection_->server_ip_ << ":" + << connection_->server_port_ << " "; + sm_spdy_interface_->SendSynReply(stream_id_, headers); + } +} + +void HttpSM::MessageDone() { + if (acceptor_->flip_handler_type_ == FLIP_HANDLER_PROXY) { + VLOG(2) << ACCEPTOR_CLIENT_IDENT << "HttpSM: MessageDone. Sending EOF: " + << "stream " << stream_id_; + sm_spdy_interface_->SendEOF(stream_id_); + } else { + VLOG(2) << ACCEPTOR_CLIENT_IDENT << "HttpSM: MessageDone."; + } +} + +void HttpSM::HandleError() { + VLOG(1) << ACCEPTOR_CLIENT_IDENT << "Error detected"; +} + +void HttpSM::InitSMInterface(SMInterface* sm_spdy_interface, + int32 server_idx) { + sm_spdy_interface_ = sm_spdy_interface; + server_idx_ = server_idx; +} + +void HttpSM::InitSMConnection(SMConnectionPoolInterface* connection_pool, + SMInterface* sm_interface, + EpollServer* epoll_server, + int fd, + std::string server_ip, + std::string server_port, + std::string remote_ip, + bool use_ssl) { + VLOG(2) << ACCEPTOR_CLIENT_IDENT << "HttpSM: Initializing server " + << "connection."; + connection_->InitSMConnection(connection_pool, + sm_interface, + epoll_server, + fd, + server_ip, + server_port, + remote_ip, + use_ssl); +} + +size_t HttpSM::ProcessReadInput(const char* data, size_t len) { + VLOG(2) << ACCEPTOR_CLIENT_IDENT << "HttpSM: Process read input: stream " + << stream_id_; + return http_framer_->ProcessInput(data, len); +} + +size_t HttpSM::ProcessWriteInput(const char* data, size_t len) { + VLOG(2) << ACCEPTOR_CLIENT_IDENT << "HttpSM: Process write input: size " + << len << ": stream " << stream_id_; + char * dataPtr = new char[len]; + memcpy(dataPtr, data, len); + DataFrame* data_frame = new DataFrame; + data_frame->data = (const char *)dataPtr; + data_frame->size = len; + data_frame->delete_when_done = true; + connection_->EnqueueDataFrame(data_frame); + return len; +} + +bool HttpSM::MessageFullyRead() const { + return http_framer_->MessageFullyRead(); +} + +bool HttpSM::Error() const { + return http_framer_->Error(); +} + +const char* HttpSM::ErrorAsString() const { + return BalsaFrameEnums::ErrorCodeToString(http_framer_->ErrorCode()); +} + +void HttpSM::Reset() { + VLOG(1) << ACCEPTOR_CLIENT_IDENT << "HttpSM: Reset: stream " + << stream_id_; + http_framer_->Reset(); +} + +void HttpSM::ResetForNewConnection() { + if (acceptor_->flip_handler_type_ == FLIP_HANDLER_PROXY) { + VLOG(1) << ACCEPTOR_CLIENT_IDENT << "HttpSM: Server connection closing " + << "to: " << connection_->server_ip_ << ":" + << connection_->server_port_ << " "; + } + seq_num_ = 0; + output_ordering_.Reset(); + http_framer_->Reset(); + if (sm_spdy_interface_) { + sm_spdy_interface_->ResetForNewInterface(server_idx_); + } +} + +void HttpSM::Cleanup() { + if (!(acceptor_->flip_handler_type_ == FLIP_HANDLER_HTTP_SERVER)) { + VLOG(2) << "HttpSM Request Fully Read; stream_id: " << stream_id_; + connection_->Cleanup("request complete"); + } +} + +void HttpSM::NewStream(uint32 stream_id, uint32 priority, + const std::string& filename) { + MemCacheIter mci; + mci.stream_id = stream_id; + mci.priority = priority; + if (!memory_cache_->AssignFileData(filename, &mci)) { + // error creating new stream. + VLOG(2) << ACCEPTOR_CLIENT_IDENT << "Sending ErrorNotFound"; + SendErrorNotFound(stream_id); + } else { + AddToOutputOrder(mci); + } +} + +void HttpSM::AddToOutputOrder(const MemCacheIter& mci) { + output_ordering_.AddToOutputOrder(mci); +} + +void HttpSM::SendEOF(uint32 stream_id) { + SendEOFImpl(stream_id); + if (acceptor_->flip_handler_type_ == FLIP_HANDLER_PROXY) { + sm_spdy_interface_->ResetForNewInterface(server_idx_); + } +} + +void HttpSM::SendErrorNotFound(uint32 stream_id) { + SendErrorNotFoundImpl(stream_id); +} + +void HttpSM::SendOKResponse(uint32 stream_id, std::string* output) { + SendOKResponseImpl(stream_id, output); +} + +size_t HttpSM::SendSynStream(uint32 stream_id, const BalsaHeaders& headers) { + return 0; +} + +size_t HttpSM::SendSynReply(uint32 stream_id, const BalsaHeaders& headers) { + return SendSynReplyImpl(stream_id, headers); +} + +void HttpSM::SendDataFrame(uint32 stream_id, const char* data, int64 len, + uint32 flags, bool compress) { + SendDataFrameImpl(stream_id, data, len, flags, compress); +} + +void HttpSM::SendEOFImpl(uint32 stream_id) { + DataFrame* df = new DataFrame; + df->data = "0\r\n\r\n"; + df->size = 5; + df->delete_when_done = false; + EnqueueDataFrame(df); + if (acceptor_->flip_handler_type_ == FLIP_HANDLER_HTTP_SERVER) { + Reset(); + } +} + +void HttpSM::SendErrorNotFoundImpl(uint32 stream_id) { + BalsaHeaders my_headers; + my_headers.SetFirstlineFromStringPieces("HTTP/1.1", "404", "Not Found"); + my_headers.RemoveAllOfHeader("content-length"); + my_headers.AppendHeader("transfer-encoding", "chunked"); + SendSynReplyImpl(stream_id, my_headers); + SendDataFrame(stream_id, "page not found", 14, 0, false); + SendEOFImpl(stream_id); + output_ordering_.RemoveStreamId(stream_id); +} + +void HttpSM::SendOKResponseImpl(uint32 stream_id, std::string* output) { + BalsaHeaders my_headers; + my_headers.SetFirstlineFromStringPieces("HTTP/1.1", "200", "OK"); + my_headers.RemoveAllOfHeader("content-length"); + my_headers.AppendHeader("transfer-encoding", "chunked"); + SendSynReplyImpl(stream_id, my_headers); + SendDataFrame(stream_id, output->c_str(), output->size(), 0, false); + SendEOFImpl(stream_id); + output_ordering_.RemoveStreamId(stream_id); +} + +size_t HttpSM::SendSynReplyImpl(uint32 stream_id, const BalsaHeaders& headers) { + SimpleBuffer sb; + headers.WriteHeaderAndEndingToBuffer(&sb); + DataFrame* df = new DataFrame; + df->size = sb.ReadableBytes(); + char* buffer = new char[df->size]; + df->data = buffer; + df->delete_when_done = true; + sb.Read(buffer, df->size); + VLOG(2) << ACCEPTOR_CLIENT_IDENT << "Sending HTTP Reply header " + << stream_id_; + size_t df_size = df->size; + EnqueueDataFrame(df); + return df_size; +} + +size_t HttpSM::SendSynStreamImpl(uint32 stream_id, + const BalsaHeaders& headers) { + SimpleBuffer sb; + headers.WriteHeaderAndEndingToBuffer(&sb); + DataFrame* df = new DataFrame; + df->size = sb.ReadableBytes(); + char* buffer = new char[df->size]; + df->data = buffer; + df->delete_when_done = true; + sb.Read(buffer, df->size); + VLOG(2) << ACCEPTOR_CLIENT_IDENT << "Sending HTTP Reply header " + << stream_id_; + size_t df_size = df->size; + EnqueueDataFrame(df); + return df_size; +} + +void HttpSM::SendDataFrameImpl(uint32 stream_id, const char* data, int64 len, + uint32 flags, bool compress) { + char chunk_buf[128]; + snprintf(chunk_buf, sizeof(chunk_buf), "%x\r\n", (unsigned int)len); + std::string chunk_description(chunk_buf); + DataFrame* df = new DataFrame; + df->size = chunk_description.size() + len + 2; + char* buffer = new char[df->size]; + df->data = buffer; + df->delete_when_done = true; + memcpy(buffer, chunk_description.data(), chunk_description.size()); + memcpy(buffer + chunk_description.size(), data, len); + memcpy(buffer + chunk_description.size() + len, "\r\n", 2); + EnqueueDataFrame(df); +} + +void HttpSM::EnqueueDataFrame(DataFrame* df) { + VLOG(2) << ACCEPTOR_CLIENT_IDENT << "HttpSM: Enqueue data frame: stream " + << stream_id_; + connection_->EnqueueDataFrame(df); +} + +void HttpSM::GetOutput() { + MemCacheIter* mci = output_ordering_.GetIter(); + if (mci == NULL) { + VLOG(2) << ACCEPTOR_CLIENT_IDENT << "HttpSM: GetOutput: nothing to " + << "output!?: stream " << stream_id_; + return; + } + if (!mci->transformed_header) { + mci->bytes_sent = SendSynReply(mci->stream_id, + *(mci->file_data->headers)); + mci->transformed_header = true; + VLOG(2) << ACCEPTOR_CLIENT_IDENT << "HttpSM: GetOutput transformed " + << "header stream_id: [" << mci->stream_id << "]"; + return; + } + if (mci->body_bytes_consumed >= mci->file_data->body.size()) { + SendEOF(mci->stream_id); + output_ordering_.RemoveStreamId(mci->stream_id); + VLOG(2) << ACCEPTOR_CLIENT_IDENT << "GetOutput remove_stream_id: [" + << mci->stream_id << "]"; + return; + } + size_t num_to_write = + mci->file_data->body.size() - mci->body_bytes_consumed; + if (num_to_write > mci->max_segment_size) + num_to_write = mci->max_segment_size; + + SendDataFrame(mci->stream_id, + mci->file_data->body.data() + mci->body_bytes_consumed, + num_to_write, 0, true); + VLOG(2) << ACCEPTOR_CLIENT_IDENT << "HttpSM: GetOutput SendDataFrame[" + << mci->stream_id << "]: " << num_to_write; + mci->body_bytes_consumed += num_to_write; + mci->bytes_sent += num_to_write; +} + +} // namespace net + diff --git a/net/tools/flip_server/http_interface.h b/net/tools/flip_server/http_interface.h new file mode 100644 index 0000000..282923f --- /dev/null +++ b/net/tools/flip_server/http_interface.h @@ -0,0 +1,132 @@ +// Copyright (c) 2009 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef NET_TOOLS_FLIP_SERVER_HTTP_INTERFACE_ +#define NET_TOOLS_FLIP_SERVER_HTTP_INTERFACE_ + +#include <string> + +#include "net/tools/flip_server/balsa_headers.h" +#include "net/tools/flip_server/balsa_visitor_interface.h" +#include "net/tools/flip_server/output_ordering.h" +#include "net/tools/flip_server/sm_connection.h" +#include "net/tools/flip_server/sm_interface.h" + +namespace net { + +class BalsaFrame; +class DataFrame; +class EpollServer; +class FlipAcceptor; +class MemoryCache; + +class HttpSM : public BalsaVisitorInterface, + public SMInterface { + public: + HttpSM(SMConnection* connection, + SMInterface* sm_spdy_interface, + EpollServer* epoll_server, + MemoryCache* memory_cache, + FlipAcceptor* acceptor); + virtual ~HttpSM(); + + private: + virtual void ProcessBodyInput(const char *input, size_t size) {} + virtual void ProcessBodyData(const char *input, size_t size); + virtual void ProcessHeaderInput(const char *input, size_t size) {} + virtual void ProcessTrailerInput(const char *input, size_t size) {} + virtual void ProcessHeaders(const BalsaHeaders& headers); + virtual void ProcessRequestFirstLine(const char* line_input, + size_t line_length, + const char* method_input, + size_t method_length, + const char* request_uri_input, + size_t request_uri_length, + const char* version_input, + size_t version_length) {} + virtual void ProcessResponseFirstLine(const char *line_input, + size_t line_length, + const char *version_input, + size_t version_length, + const char *status_input, + size_t status_length, + const char *reason_input, + size_t reason_length) {} + virtual void ProcessChunkLength(size_t chunk_length) {} + virtual void ProcessChunkExtensions(const char *input, size_t size) {} + virtual void HeaderDone() {} + virtual void MessageDone(); + virtual void HandleHeaderError(BalsaFrame* framer) { HandleError(); } + virtual void HandleHeaderWarning(BalsaFrame* framer) {} + virtual void HandleChunkingError(BalsaFrame* framer) { HandleError(); } + virtual void HandleBodyError(BalsaFrame* framer) { HandleError(); } + + void HandleError(); + + public: + void InitSMInterface(SMInterface* sm_spdy_interface, + int32 server_idx); + + void InitSMConnection(SMConnectionPoolInterface* connection_pool, + SMInterface* sm_interface, + EpollServer* epoll_server, + int fd, + std::string server_ip, + std::string server_port, + std::string remote_ip, + bool use_ssl); + + size_t ProcessReadInput(const char* data, size_t len); + size_t ProcessWriteInput(const char* data, size_t len); + bool MessageFullyRead() const; + void SetStreamID(uint32 stream_id) { stream_id_ = stream_id; } + bool Error() const; + const char* ErrorAsString() const; + void Reset(); + void ResetForNewInterface(int32 server_idx) {} + void ResetForNewConnection(); + void Cleanup(); + int PostAcceptHook() { return 1; } + + void NewStream(uint32 stream_id, uint32 priority, + const std::string& filename); + void AddToOutputOrder(const MemCacheIter& mci); + void SendEOF(uint32 stream_id); + void SendErrorNotFound(uint32 stream_id); + void SendOKResponse(uint32 stream_id, std::string* output); + size_t SendSynStream(uint32 stream_id, const BalsaHeaders& headers); + size_t SendSynReply(uint32 stream_id, const BalsaHeaders& headers); + void SendDataFrame(uint32 stream_id, const char* data, int64 len, + uint32 flags, bool compress); + BalsaFrame* spdy_framer() { return http_framer_; } + + private: + void SendEOFImpl(uint32 stream_id); + void SendErrorNotFoundImpl(uint32 stream_id); + void SendOKResponseImpl(uint32 stream_id, std::string* output); + size_t SendSynReplyImpl(uint32 stream_id, const BalsaHeaders& headers); + size_t SendSynStreamImpl(uint32 stream_id, const BalsaHeaders& headers); + void SendDataFrameImpl(uint32 stream_id, const char* data, int64 len, + uint32 flags, bool compress); + void EnqueueDataFrame(DataFrame* df); + void GetOutput(); + private: + uint64 seq_num_; + BalsaFrame* http_framer_; + BalsaHeaders headers_; + uint32 stream_id_; + int32 server_idx_; + + SMConnection* connection_; + SMInterface* sm_spdy_interface_; + OutputList* output_list_; + OutputOrdering output_ordering_; + MemoryCache* memory_cache_; + FlipAcceptor* acceptor_; +}; + +} // namespace + +#endif // NET_TOOLS_FLIP_SERVER_HTTP_INTERFACE_ + diff --git a/net/tools/flip_server/mem_cache.cc b/net/tools/flip_server/mem_cache.cc new file mode 100644 index 0000000..bf06263 --- /dev/null +++ b/net/tools/flip_server/mem_cache.cc @@ -0,0 +1,181 @@ +// Copyright (c) 2009 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "net/tools/flip_server/mem_cache.h" + +#include <dirent.h> +#include <errno.h> +#include <fcntl.h> +#include <stdio.h> +#include <sys/stat.h> +#include <sys/types.h> + +#include <deque> + +#include "base/string_piece.h" +#include "net/tools/dump_cache/url_to_filename_encoder.h" +#include "net/tools/dump_cache/url_utilities.h" +#include "net/tools/flip_server/balsa_frame.h" +#include "net/tools/flip_server/balsa_headers.h" + +// The directory where cache locates); +std::string FLAGS_cache_base_dir = "."; + +namespace net { + +void MemoryCache::AddFiles() { + std::deque<std::string> paths; + cwd_ = FLAGS_cache_base_dir; + paths.push_back(cwd_ + "/GET_"); + DIR* current_dir = NULL; + while (!paths.empty()) { + while (current_dir == NULL && !paths.empty()) { + std::string current_dir_name = paths.front(); + VLOG(1) << "Attempting to open dir: \"" << current_dir_name << "\""; + current_dir = opendir(current_dir_name.c_str()); + paths.pop_front(); + + if (current_dir == NULL) { + perror("Unable to open directory. "); + current_dir_name.clear(); + continue; + } + + if (current_dir) { + VLOG(1) << "Succeeded opening"; + for (struct dirent* dir_data = readdir(current_dir); + dir_data != NULL; + dir_data = readdir(current_dir)) { + std::string current_entry_name = + current_dir_name + "/" + dir_data->d_name; + if (dir_data->d_type == DT_REG) { + VLOG(1) << "Found file: " << current_entry_name; + ReadAndStoreFileContents(current_entry_name.c_str()); + } else if (dir_data->d_type == DT_DIR) { + VLOG(1) << "Found subdir: " << current_entry_name; + if (std::string(dir_data->d_name) != "." && + std::string(dir_data->d_name) != "..") { + VLOG(1) << "Adding to search path: " << current_entry_name; + paths.push_front(current_entry_name); + } + } + } + VLOG(1) << "Oops, no data left. Closing dir."; + closedir(current_dir); + current_dir = NULL; + } + } + } +} + +void MemoryCache::ReadToString(const char* filename, std::string* output) { + output->clear(); + int fd = open(filename, 0, "r"); + if (fd == -1) + return; + char buffer[4096]; + ssize_t read_status = read(fd, buffer, sizeof(buffer)); + while (read_status > 0) { + output->append(buffer, static_cast<size_t>(read_status)); + do { + read_status = read(fd, buffer, sizeof(buffer)); + } while (read_status <= 0 && errno == EINTR); + } + close(fd); +} + +void MemoryCache::ReadAndStoreFileContents(const char* filename) { + StoreBodyAndHeadersVisitor visitor; + BalsaFrame framer; + framer.set_balsa_visitor(&visitor); + framer.set_balsa_headers(&(visitor.headers)); + std::string filename_contents; + ReadToString(filename, &filename_contents); + + // Ugly hack to make everything look like 1.1. + if (filename_contents.find("HTTP/1.0") == 0) + filename_contents[7] = '1'; + + size_t pos = 0; + size_t old_pos = 0; + while (true) { + old_pos = pos; + pos += framer.ProcessInput(filename_contents.data() + pos, + filename_contents.size() - pos); + if (framer.Error() || pos == old_pos) { + LOG(ERROR) << "Unable to make forward progress, or error" + " framing file: " << filename; + if (framer.Error()) { + LOG(INFO) << "********************************************ERROR!"; + return; + } + return; + } + if (framer.MessageFullyRead()) { + // If no Content-Length or Transfer-Encoding was captured in the + // file, then the rest of the data is the body. Many of the captures + // from within Chrome don't have content-lengths. + if (!visitor.body.length()) + visitor.body = filename_contents.substr(pos); + break; + } + } + visitor.headers.RemoveAllOfHeader("content-length"); + visitor.headers.RemoveAllOfHeader("transfer-encoding"); + visitor.headers.RemoveAllOfHeader("connection"); + visitor.headers.AppendHeader("transfer-encoding", "chunked"); + visitor.headers.AppendHeader("connection", "keep-alive"); + + // Experiment with changing headers for forcing use of cached + // versions of content. + // TODO(mbelshe) REMOVE ME +#if 0 + // TODO(mbelshe) append current date. + visitor.headers.RemoveAllOfHeader("date"); + if (visitor.headers.HasHeader("expires")) { + visitor.headers.RemoveAllOfHeader("expires"); + visitor.headers.AppendHeader("expires", + "Fri, 30 Aug, 2019 12:00:00 GMT"); + } +#endif + BalsaHeaders* headers = new BalsaHeaders; + headers->CopyFrom(visitor.headers); + std::string filename_stripped = std::string(filename).substr(cwd_.size() + 1); + LOG(INFO) << "Adding file (" << visitor.body.length() << " bytes): " + << filename_stripped; + files_[filename_stripped] = FileData(); + FileData& fd = files_[filename_stripped]; + fd = FileData(headers, visitor.body); + fd.filename = std::string(filename_stripped, + filename_stripped.find_first_of('/')); +} + +FileData* MemoryCache::GetFileData(const std::string& filename) { + Files::iterator fi = files_.end(); + if (filename.compare(filename.length() - 5, 5, ".html", 5) == 0) { + std::string new_filename(filename.data(), filename.size() - 5); + new_filename += ".http"; + fi = files_.find(new_filename); + } + if (fi == files_.end()) + fi = files_.find(filename); + + if (fi == files_.end()) { + return NULL; + } + return &(fi->second); +} + +bool MemoryCache::AssignFileData(const std::string& filename, + MemCacheIter* mci) { + mci->file_data = GetFileData(filename); + if (mci->file_data == NULL) { + LOG(ERROR) << "Could not find file data for " << filename; + return false; + } + return true; +} + +} // namespace net + diff --git a/net/tools/flip_server/mem_cache.h b/net/tools/flip_server/mem_cache.h new file mode 100644 index 0000000..11450ec --- /dev/null +++ b/net/tools/flip_server/mem_cache.h @@ -0,0 +1,152 @@ +// Copyright (c) 2009 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef NET_TOOLS_FLIP_SERVER_MEM_CACHE_H_ +#define NET_TOOLS_FLIP_SERVER_MEM_CACHE_H_ + +#include <map> +#include <string> +#include <vector> + +#include "net/tools/flip_server/balsa_headers.h" +#include "net/tools/flip_server/balsa_visitor_interface.h" +#include "net/tools/flip_server/constants.h" + +namespace net { + +class StoreBodyAndHeadersVisitor: public BalsaVisitorInterface { + public: + BalsaHeaders headers; + std::string body; + bool error_; + + virtual void ProcessBodyInput(const char *input, size_t size) {} + virtual void ProcessBodyData(const char *input, size_t size) { + body.append(input, size); + } + virtual void ProcessHeaderInput(const char *input, size_t size) {} + virtual void ProcessTrailerInput(const char *input, size_t size) {} + virtual void ProcessHeaders(const BalsaHeaders& headers) { + // nothing to do here-- we're assuming that the BalsaFrame has + // been handed our headers. + } + virtual void ProcessRequestFirstLine(const char* line_input, + size_t line_length, + const char* method_input, + size_t method_length, + const char* request_uri_input, + size_t request_uri_length, + const char* version_input, + size_t version_length) {} + virtual void ProcessResponseFirstLine(const char *line_input, + size_t line_length, + const char *version_input, + size_t version_length, + const char *status_input, + size_t status_length, + const char *reason_input, + size_t reason_length) {} + virtual void ProcessChunkLength(size_t chunk_length) {} + virtual void ProcessChunkExtensions(const char *input, size_t size) {} + virtual void HeaderDone() {} + virtual void MessageDone() {} + virtual void HandleHeaderError(BalsaFrame* framer) { HandleError(); } + virtual void HandleHeaderWarning(BalsaFrame* framer) { HandleError(); } + virtual void HandleChunkingError(BalsaFrame* framer) { HandleError(); } + virtual void HandleBodyError(BalsaFrame* framer) { HandleError(); } + + void HandleError() { error_ = true; } +}; + +//////////////////////////////////////////////////////////////////////////////// + +struct FileData { + void CopyFrom(const FileData& file_data) { + headers = new BalsaHeaders; + headers->CopyFrom(*(file_data.headers)); + filename = file_data.filename; + related_files = file_data.related_files; + body = file_data.body; + } + FileData(BalsaHeaders* h, const std::string& b) : headers(h), body(b) {} + FileData() {} + + BalsaHeaders* headers; + std::string filename; + // priority, filename + std::vector< std::pair<int, std::string> > related_files; + std::string body; +}; + +//////////////////////////////////////////////////////////////////////////////// + +class MemCacheIter { + public: + MemCacheIter() : + file_data(NULL), + priority(0), + transformed_header(false), + body_bytes_consumed(0), + stream_id(0), + max_segment_size(kInitialDataSendersThreshold), + bytes_sent(0) {} + explicit MemCacheIter(FileData* fd) : + file_data(fd), + priority(0), + transformed_header(false), + body_bytes_consumed(0), + stream_id(0), + max_segment_size(kInitialDataSendersThreshold), + bytes_sent(0) {} + FileData* file_data; + int priority; + bool transformed_header; + size_t body_bytes_consumed; + uint32 stream_id; + uint32 max_segment_size; + size_t bytes_sent; +}; + +//////////////////////////////////////////////////////////////////////////////// + +class MemoryCache { + public: + typedef std::map<std::string, FileData> Files; + + public: + Files files_; + std::string cwd_; + + void CloneFrom(const MemoryCache& mc) { + for (Files::const_iterator i = mc.files_.begin(); + i != mc.files_.end(); + ++i) { + Files::iterator out_i = + files_.insert(make_pair(i->first, FileData())).first; + out_i->second.CopyFrom(i->second); + cwd_ = mc.cwd_; + } + } + + void AddFiles(); + + void ReadToString(const char* filename, std::string* output); + + void ReadAndStoreFileContents(const char* filename); + + FileData* GetFileData(const std::string& filename); + + bool AssignFileData(const std::string& filename, MemCacheIter* mci); +}; + +class NotifierInterface { + public: + virtual ~NotifierInterface() {} + virtual void Notify() = 0; +}; + +} // namespace net + +#endif // NET_TOOLS_FLIP_SERVER_MEM_CACHE_H_ + diff --git a/net/tools/flip_server/output_ordering.cc b/net/tools/flip_server/output_ordering.cc new file mode 100644 index 0000000..101f668 --- /dev/null +++ b/net/tools/flip_server/output_ordering.cc @@ -0,0 +1,138 @@ +// Copyright (c) 2009 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "net/tools/flip_server/output_ordering.h" + +#include "net/tools/flip_server/flip_config.h" +#include "net/tools/flip_server/sm_connection.h" + + +namespace net { + +// static +double OutputOrdering::server_think_time_in_s_ = 0.0; + +OutputOrdering::OutputOrdering(SMConnectionInterface* connection) + : first_data_senders_threshold_(kInitialDataSendersThreshold), + connection_(connection) { + if (connection) + epoll_server_ = connection->epoll_server(); +} + +void OutputOrdering::Reset() { + while (!stream_ids_.empty()) { + StreamIdToPriorityMap::iterator sitpmi = stream_ids_.begin(); + PriorityMapPointer& pmp = sitpmi->second; + if (pmp.alarm_enabled) { + epoll_server_->UnregisterAlarm(pmp.alarm_token); + } + stream_ids_.erase(sitpmi); + } + priority_map_.clear(); + first_data_senders_.clear(); +} + +bool OutputOrdering::ExistsInPriorityMaps(uint32 stream_id) { + StreamIdToPriorityMap::iterator sitpmi = stream_ids_.find(stream_id); + return sitpmi != stream_ids_.end(); +} + +void OutputOrdering::MoveToActive(PriorityMapPointer* pmp, MemCacheIter mci) { + VLOG(2) << "Moving to active!"; + first_data_senders_.push_back(mci); + pmp->ring = &first_data_senders_; + pmp->it = first_data_senders_.end(); + --pmp->it; + connection_->ReadyToSend(); +} + +void OutputOrdering::AddToOutputOrder(const MemCacheIter& mci) { + if (ExistsInPriorityMaps(mci.stream_id)) + LOG(ERROR) << "OOps, already was inserted here?!"; + + double think_time_in_s = server_think_time_in_s_; + std::string x_server_latency = + mci.file_data->headers->GetHeader("X-Server-Latency").as_string(); + if (x_server_latency.size() != 0) { + char* endp; + double tmp_think_time_in_s = strtod(x_server_latency.c_str(), &endp); + if (endp != x_server_latency.c_str() + x_server_latency.size()) { + LOG(ERROR) << "Unable to understand X-Server-Latency of: " + << x_server_latency << " for resource: " + << mci.file_data->filename.c_str(); + } else { + think_time_in_s = tmp_think_time_in_s; + } + } + StreamIdToPriorityMap::iterator sitpmi; + sitpmi = stream_ids_.insert( + std::pair<uint32, PriorityMapPointer>(mci.stream_id, + PriorityMapPointer())).first; + PriorityMapPointer& pmp = sitpmi->second; + + BeginOutputtingAlarm* boa = new BeginOutputtingAlarm(this, &pmp, mci); + VLOG(1) << "Server think time: " << think_time_in_s; + epoll_server_->RegisterAlarmApproximateDelta( + think_time_in_s * 1000000, boa); +} + +void OutputOrdering::SpliceToPriorityRing(PriorityRing::iterator pri) { + MemCacheIter& mci = *pri; + PriorityMap::iterator pmi = priority_map_.find(mci.priority); + if (pmi == priority_map_.end()) { + pmi = priority_map_.insert( + std::pair<uint32, PriorityRing>(mci.priority, PriorityRing())).first; + } + + pmi->second.splice(pmi->second.end(), + first_data_senders_, + pri); + StreamIdToPriorityMap::iterator sitpmi = stream_ids_.find(mci.stream_id); + sitpmi->second.ring = &(pmi->second); +} + +MemCacheIter* OutputOrdering::GetIter() { + while (!first_data_senders_.empty()) { + MemCacheIter& mci = first_data_senders_.front(); + if (mci.bytes_sent >= first_data_senders_threshold_) { + SpliceToPriorityRing(first_data_senders_.begin()); + } else { + first_data_senders_.splice(first_data_senders_.end(), + first_data_senders_, + first_data_senders_.begin()); + mci.max_segment_size = kInitialDataSendersThreshold; + return &mci; + } + } + while (!priority_map_.empty()) { + PriorityRing& first_ring = priority_map_.begin()->second; + if (first_ring.empty()) { + priority_map_.erase(priority_map_.begin()); + continue; + } + MemCacheIter& mci = first_ring.front(); + first_ring.splice(first_ring.end(), + first_ring, + first_ring.begin()); + mci.max_segment_size = kSpdySegmentSize; + return &mci; + } + return NULL; +} + +void OutputOrdering::RemoveStreamId(uint32 stream_id) { + StreamIdToPriorityMap::iterator sitpmi = stream_ids_.find(stream_id); + if (sitpmi == stream_ids_.end()) + return; + + PriorityMapPointer& pmp = sitpmi->second; + if (pmp.alarm_enabled) + epoll_server_->UnregisterAlarm(pmp.alarm_token); + else + pmp.ring->erase(pmp.it); + stream_ids_.erase(sitpmi); +} + +} // namespace net + diff --git a/net/tools/flip_server/output_ordering.h b/net/tools/flip_server/output_ordering.h new file mode 100644 index 0000000..14c1587 --- /dev/null +++ b/net/tools/flip_server/output_ordering.h @@ -0,0 +1,103 @@ +// Copyright (c) 2009 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef NET_TOOLS_FLIP_SERVER_OUTPUT_ORDERING_H_ +#define NET_TOOLS_FLIP_SERVER_OUTPUT_ORDERING_H_ + +#include <list> +#include <map> +#include <string> + +#include "base/basictypes.h" +#include "net/tools/flip_server/constants.h" +#include "net/tools/flip_server/epoll_server.h" +#include "net/tools/flip_server/mem_cache.h" + +namespace net { + +class SMConnectionInterface; + +class OutputOrdering { + public: + typedef std::list<MemCacheIter> PriorityRing; + typedef std::map<uint32, PriorityRing> PriorityMap; + + struct PriorityMapPointer { + PriorityMapPointer(): ring(NULL), alarm_enabled(false) {} + PriorityRing* ring; + PriorityRing::iterator it; + bool alarm_enabled; + EpollServer::AlarmRegToken alarm_token; + }; + + typedef std::map<uint32, PriorityMapPointer> StreamIdToPriorityMap; + + StreamIdToPriorityMap stream_ids_; + PriorityMap priority_map_; + PriorityRing first_data_senders_; + uint32 first_data_senders_threshold_; // when you've passed this, you're no + // longer a first_data_sender... + SMConnectionInterface* connection_; + EpollServer* epoll_server_; + + explicit OutputOrdering(SMConnectionInterface* connection); + void Reset(); + bool ExistsInPriorityMaps(uint32 stream_id); + + struct BeginOutputtingAlarm : public EpollAlarmCallbackInterface { + public: + BeginOutputtingAlarm(OutputOrdering* oo, + OutputOrdering::PriorityMapPointer* pmp, + const MemCacheIter& mci) : + output_ordering_(oo), pmp_(pmp), mci_(mci), epoll_server_(NULL) {} + + int64 OnAlarm() { + OnUnregistration(); + output_ordering_->MoveToActive(pmp_, mci_); + VLOG(2) << "ON ALARM! Should now start to output..."; + delete this; + return 0; + } + void OnRegistration(const EpollServer::AlarmRegToken& tok, + EpollServer* eps) { + epoll_server_ = eps; + pmp_->alarm_token = tok; + pmp_->alarm_enabled = true; + } + void OnUnregistration() { + pmp_->alarm_enabled = false; + } + void OnShutdown(EpollServer* eps) { + OnUnregistration(); + } + ~BeginOutputtingAlarm() { + if (epoll_server_ && pmp_->alarm_enabled) + epoll_server_->UnregisterAlarm(pmp_->alarm_token); + } + private: + OutputOrdering* output_ordering_; + OutputOrdering::PriorityMapPointer* pmp_; + MemCacheIter mci_; + EpollServer* epoll_server_; + }; + + void MoveToActive(PriorityMapPointer* pmp, MemCacheIter mci); + void AddToOutputOrder(const MemCacheIter& mci); + void SpliceToPriorityRing(PriorityRing::iterator pri); + MemCacheIter* GetIter(); + void RemoveStreamId(uint32 stream_id); + + static double server_think_time_in_s() { return server_think_time_in_s_; } + static void set_server_think_time_in_s(double value) { + server_think_time_in_s_ = value; + } + + private: + static double server_think_time_in_s_; +}; + +} // namespace net + +#endif // NET_TOOLS_FLIP_SERVER_OUTPUT_ORDERING_H_ + diff --git a/net/tools/flip_server/sm_connection.cc b/net/tools/flip_server/sm_connection.cc new file mode 100644 index 0000000..be1e8db --- /dev/null +++ b/net/tools/flip_server/sm_connection.cc @@ -0,0 +1,648 @@ +// Copyright (c) 2009 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "net/tools/flip_server/sm_connection.h" + +#include <errno.h> +#include <netinet/tcp.h> +#include <sys/socket.h> + +#include <list> +#include <string> + +#include "net/tools/flip_server/constants.h" +#include "net/tools/flip_server/flip_config.h" +#include "net/tools/flip_server/http_interface.h" +#include "net/tools/flip_server/spdy_interface.h" +#include "net/tools/flip_server/spdy_ssl.h" +#include "net/tools/flip_server/streamer_interface.h" + +namespace net { + +// static +bool SMConnection::force_spdy_ = false; + +SMConnection::SMConnection(EpollServer* epoll_server, + SSLState* ssl_state, + MemoryCache* memory_cache, + FlipAcceptor* acceptor, + std::string log_prefix) + : last_read_time_(0), + fd_(-1), + events_(0), + registered_in_epoll_server_(false), + initialized_(false), + protocol_detected_(false), + connection_complete_(false), + connection_pool_(NULL), + epoll_server_(epoll_server), + ssl_state_(ssl_state), + memory_cache_(memory_cache), + acceptor_(acceptor), + read_buffer_(kSpdySegmentSize * 40), + sm_spdy_interface_(NULL), + sm_http_interface_(NULL), + sm_streamer_interface_(NULL), + sm_interface_(NULL), + log_prefix_(log_prefix), + max_bytes_sent_per_dowrite_(4096), + ssl_(NULL) { +} + +SMConnection::~SMConnection() { + if (initialized()) + Reset(); +} + +void SMConnection::ReadyToSend() { + VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT + << "Setting ready to send: EPOLLIN | EPOLLOUT"; + epoll_server_->SetFDReady(fd_, EPOLLIN | EPOLLOUT); +} + +void SMConnection::EnqueueDataFrame(DataFrame* df) { + output_list_.push_back(df); + VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT << "EnqueueDataFrame: " + << "size = " << df->size << ": Setting FD ready."; + ReadyToSend(); +} + +void SMConnection::InitSMConnection(SMConnectionPoolInterface* connection_pool, + SMInterface* sm_interface, + EpollServer* epoll_server, + int fd, + std::string server_ip, + std::string server_port, + std::string remote_ip, + bool use_ssl) { + if (initialized_) { + LOG(FATAL) << "Attempted to initialize already initialized server"; + return; + } + + client_ip_ = remote_ip; + + if (fd == -1) { + // If fd == -1, then we are initializing a new connection that will + // connect to the backend. + // + // ret: -1 == error + // 0 == connection in progress + // 1 == connection complete + // TODO(kelindsay): is_numeric_host_address value needs to be detected + server_ip_ = server_ip; + server_port_ = server_port; + int ret = CreateConnectedSocket(&fd_, + server_ip, + server_port, + true, + acceptor_->disable_nagle_); + + if (ret < 0) { + LOG(ERROR) << "-1 Could not create connected socket"; + return; + } else if (ret == 1) { + DCHECK_NE(-1, fd_); + connection_complete_ = true; + VLOG(1) << log_prefix_ << ACCEPTOR_CLIENT_IDENT + << "Connection complete to: " << server_ip_ << ":" + << server_port_ << " "; + } + VLOG(1) << log_prefix_ << ACCEPTOR_CLIENT_IDENT + << "Connecting to server: " << server_ip_ << ":" + << server_port_ << " "; + } else { + // If fd != -1 then we are initializing a connection that has just been + // accepted from the listen socket. + connection_complete_ = true; + if (epoll_server_ && registered_in_epoll_server_ && fd_ != -1) { + epoll_server_->UnregisterFD(fd_); + } + if (fd_ != -1) { + VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT + << "Closing pre-existing fd"; + close(fd_); + fd_ = -1; + } + + fd_ = fd; + } + + registered_in_epoll_server_ = false; + // Set the last read time here as the idle checker will start from + // now. + last_read_time_ = time(NULL); + initialized_ = true; + + connection_pool_ = connection_pool; + epoll_server_ = epoll_server; + + if (sm_interface) { + sm_interface_ = sm_interface; + protocol_detected_ = true; + } + + read_buffer_.Clear(); + + epoll_server_->RegisterFD(fd_, this, EPOLLIN | EPOLLOUT | EPOLLET); + + if (use_ssl) { + ssl_ = CreateSSLContext(ssl_state_->ssl_ctx); + SSL_set_fd(ssl_, fd_); + PrintSslError(); + } +} + +void SMConnection::CorkSocket() { + int state = 1; + int rv = setsockopt(fd_, IPPROTO_TCP, TCP_CORK, &state, sizeof(state)); + if (rv < 0) + VLOG(1) << "setsockopt(CORK): " << errno; +} + +void SMConnection::UncorkSocket() { + int state = 0; + int rv = setsockopt(fd_, IPPROTO_TCP, TCP_CORK, &state, sizeof(state)); + if (rv < 0) + VLOG(1) << "setsockopt(CORK): " << errno; +} + +int SMConnection::Send(const char* data, int len, int flags) { + int rv = 0; + CorkSocket(); + if (ssl_) { + ssize_t bytes_written = 0; + // Write smallish chunks to SSL so that we don't have large + // multi-packet TLS records to receive before being able to handle + // the data. We don't have to be too careful here, because our data + // frames are already getting chunked appropriately, and those are + // the most likely "big" frames. + while (len > 0) { + const int kMaxTLSRecordSize = 1500; + const char* ptr = &(data[bytes_written]); + int chunksize = std::min(len, kMaxTLSRecordSize); + rv = SSL_write(ssl_, ptr, chunksize); + VLOG(2) << "SSLWrite(" << chunksize << " bytes): " << rv; + if (rv <= 0) { + switch (SSL_get_error(ssl_, rv)) { + case SSL_ERROR_WANT_READ: + case SSL_ERROR_WANT_WRITE: + case SSL_ERROR_WANT_ACCEPT: + case SSL_ERROR_WANT_CONNECT: + rv = -2; + break; + default: + PrintSslError(); + break; + } + break; + } + bytes_written += rv; + len -= rv; + if (rv != chunksize) + break; // If we couldn't write everything, we're implicitly stalled + } + // If we wrote some data, return that count. Otherwise + // return the stall error. + if (bytes_written > 0) + rv = bytes_written; + } else { + rv = send(fd_, data, len, flags); + } + if (!(flags & MSG_MORE)) + UncorkSocket(); + return rv; +} + +void SMConnection::OnEvent(int fd, EpollEvent* event) { + events_ |= event->in_events; + HandleEvents(); + if (events_) { + event->out_ready_mask = events_; + events_ = 0; + } +} +void SMConnection::OnShutdown(EpollServer* eps, int fd) { + Cleanup("OnShutdown"); + return; +} + +void SMConnection::Cleanup(const char* cleanup) { + VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT << "Cleanup: " << cleanup; + if (!initialized_) + return; + Reset(); + if (connection_pool_) + connection_pool_->SMConnectionDone(this); + if (sm_interface_) + sm_interface_->ResetForNewConnection(); + last_read_time_ = 0; +} + +void SMConnection::HandleEvents() { + VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT << "Received: " + << EpollServer::EventMaskToString(events_).c_str(); + + if (events_ & EPOLLIN) { + if (!DoRead()) + goto handle_close_or_error; + } + + if (events_ & EPOLLOUT) { + // Check if we have connected or not + if (connection_complete_ == false) { + int sock_error; + socklen_t sock_error_len = sizeof(sock_error); + int ret = getsockopt(fd_, SOL_SOCKET, SO_ERROR, &sock_error, + &sock_error_len); + if (ret != 0) { + VLOG(1) << log_prefix_ << ACCEPTOR_CLIENT_IDENT + << "getsockopt error: " << errno << ": " << strerror(errno); + goto handle_close_or_error; + } + if (sock_error == 0) { + connection_complete_ = true; + VLOG(1) << log_prefix_ << ACCEPTOR_CLIENT_IDENT + << "Connection complete to " << server_ip_ << ":" + << server_port_ << " "; + } else if (sock_error == EINPROGRESS) { + return; + } else { + VLOG(1) << log_prefix_ << ACCEPTOR_CLIENT_IDENT + << "error connecting to server"; + goto handle_close_or_error; + } + } + if (!DoWrite()) + goto handle_close_or_error; + } + + if (events_ & (EPOLLHUP | EPOLLERR)) { + VLOG(1) << log_prefix_ << ACCEPTOR_CLIENT_IDENT << "!!! Got HUP or ERR"; + goto handle_close_or_error; + } + return; + + handle_close_or_error: + Cleanup("HandleEvents"); +} + +// Decide if SPDY was negotiated. +bool SMConnection::WasSpdyNegotiated() { + if (force_spdy()) + return true; + + // If this is an SSL connection, check if NPN specifies SPDY. + if (ssl_) { + const unsigned char *npn_proto; + unsigned int npn_proto_len; + SSL_get0_next_proto_negotiated(ssl_, &npn_proto, &npn_proto_len); + if (npn_proto_len > 0) { + std::string npn_proto_str((const char *)npn_proto, npn_proto_len); + VLOG(1) << log_prefix_ << ACCEPTOR_CLIENT_IDENT + << "NPN protocol detected: " << npn_proto_str; + if (!strncmp(reinterpret_cast<const char*>(npn_proto), + "spdy/2", npn_proto_len)) + return true; + } + } + + return false; +} + +bool SMConnection::SetupProtocolInterfaces() { + DCHECK(!protocol_detected_); + protocol_detected_ = true; + + bool spdy_negotiated = WasSpdyNegotiated(); + bool using_ssl = ssl_ != NULL; + + if (using_ssl) + VLOG(1) << (SSL_session_reused(ssl_) ? "Resumed" : "Renegotiated") + << " SSL Session."; + + if (acceptor_->spdy_only_ && !spdy_negotiated) { + VLOG(1) << log_prefix_ << ACCEPTOR_CLIENT_IDENT + << "SPDY proxy only, closing HTTPS connection."; + return false; + } + + switch (acceptor_->flip_handler_type_) { + case FLIP_HANDLER_HTTP_SERVER: + { + DCHECK(!spdy_negotiated); + VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT + << (sm_http_interface_ ? "Creating" : "Reusing") + << " HTTP interface."; + if (!sm_http_interface_) + sm_http_interface_ = new HttpSM(this, + NULL, + epoll_server_, + memory_cache_, + acceptor_); + sm_interface_ = sm_http_interface_; + } + break; + case FLIP_HANDLER_PROXY: + { + VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT + << (sm_streamer_interface_ ? "Creating" : "Reusing") + << " PROXY Streamer interface."; + if (!sm_streamer_interface_) + sm_streamer_interface_ = new StreamerSM(this, + NULL, + epoll_server_, + acceptor_); + sm_interface_ = sm_streamer_interface_; + // If spdy is not negotiated, the streamer interface will proxy all + // data to the origin server. + if (!spdy_negotiated) + break; + } + // Otherwise fall through into the case below. + case FLIP_HANDLER_SPDY_SERVER: + { + DCHECK(spdy_negotiated); + VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT + << (sm_spdy_interface_ ? "Creating" : "Reusing") + << " SPDY interface."; + if (!sm_spdy_interface_) + sm_spdy_interface_ = new SpdySM(this, + NULL, + epoll_server_, + memory_cache_, + acceptor_); + sm_interface_ = sm_spdy_interface_; + } + break; + } + + CorkSocket(); + if (!sm_interface_->PostAcceptHook()) + return false; + + return true; +} + +bool SMConnection::DoRead() { + VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT << "DoRead()"; + while (!read_buffer_.Full()) { + char* bytes; + int size; + if (fd_ == -1) { + VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT + << "DoRead(): fd_ == -1. Invalid FD. Returning false"; + return false; + } + read_buffer_.GetWritablePtr(&bytes, &size); + ssize_t bytes_read = 0; + if (ssl_) { + bytes_read = SSL_read(ssl_, bytes, size); + if (bytes_read < 0) { + int err = SSL_get_error(ssl_, bytes_read); + switch (err) { + case SSL_ERROR_WANT_READ: + case SSL_ERROR_WANT_WRITE: + case SSL_ERROR_WANT_ACCEPT: + case SSL_ERROR_WANT_CONNECT: + events_ &= ~EPOLLIN; + VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT + << "DoRead: SSL WANT_XXX: " << err; + goto done; + default: + PrintSslError(); + goto error_or_close; + } + } + } else { + bytes_read = recv(fd_, bytes, size, MSG_DONTWAIT); + } + int stored_errno = errno; + if (bytes_read == -1) { + switch (stored_errno) { + case EAGAIN: + events_ &= ~EPOLLIN; + VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT + << "Got EAGAIN while reading"; + goto done; + case EINTR: + VLOG(1) << log_prefix_ << ACCEPTOR_CLIENT_IDENT + << "Got EINTR while reading"; + continue; + default: + VLOG(1) << log_prefix_ << ACCEPTOR_CLIENT_IDENT + << "While calling recv, got error: " + << (ssl_?"(ssl error)":strerror(stored_errno)); + goto error_or_close; + } + } else if (bytes_read > 0) { + VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT << "read " << bytes_read + << " bytes"; + last_read_time_ = time(NULL); + // If the protocol hasn't been detected yet, set up the handlers + // we'll need. + if (!protocol_detected_) { + if (!SetupProtocolInterfaces()) { + LOG(ERROR) << "Error setting up protocol interfaces."; + goto error_or_close; + } + } + read_buffer_.AdvanceWritablePtr(bytes_read); + if (!DoConsumeReadData()) + goto error_or_close; + continue; + } else { // bytes_read == 0 + VLOG(1) << log_prefix_ << ACCEPTOR_CLIENT_IDENT + << "0 bytes read with recv call."; + } + goto error_or_close; + } + done: + VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT << "DoRead done!"; + return true; + + error_or_close: + VLOG(1) << log_prefix_ << ACCEPTOR_CLIENT_IDENT + << "DoRead(): error_or_close. " + << "Cleaning up, then returning false"; + Cleanup("DoRead"); + return false; +} + +bool SMConnection::DoConsumeReadData() { + char* bytes; + int size; + read_buffer_.GetReadablePtr(&bytes, &size); + while (size != 0) { + size_t bytes_consumed = sm_interface_->ProcessReadInput(bytes, size); + VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT << "consumed " + << bytes_consumed << " bytes"; + if (bytes_consumed == 0) { + break; + } + read_buffer_.AdvanceReadablePtr(bytes_consumed); + if (sm_interface_->MessageFullyRead()) { + VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT + << "HandleRequestFullyRead: Setting EPOLLOUT"; + HandleResponseFullyRead(); + events_ |= EPOLLOUT; + } else if (sm_interface_->Error()) { + LOG(ERROR) << log_prefix_ << ACCEPTOR_CLIENT_IDENT + << "Framer error detected: Setting EPOLLOUT: " + << sm_interface_->ErrorAsString(); + // this causes everything to be closed/cleaned up. + events_ |= EPOLLOUT; + return false; + } + read_buffer_.GetReadablePtr(&bytes, &size); + } + return true; +} + +void SMConnection::HandleResponseFullyRead() { + sm_interface_->Cleanup(); +} + +bool SMConnection::DoWrite() { + size_t bytes_sent = 0; + int flags = MSG_NOSIGNAL | MSG_DONTWAIT; + if (fd_ == -1) { + VLOG(1) << log_prefix_ << ACCEPTOR_CLIENT_IDENT + << "DoWrite: fd == -1. Returning false."; + return false; + } + if (output_list_.empty()) { + VLOG(2) << log_prefix_ << "DoWrite: Output list empty."; + if (sm_interface_) { + sm_interface_->GetOutput(); + } + if (output_list_.empty()) { + events_ &= ~EPOLLOUT; + } + } + while (!output_list_.empty()) { + VLOG(2) << log_prefix_ << "DoWrite: Items in output list: " + << output_list_.size(); + if (bytes_sent >= max_bytes_sent_per_dowrite_) { + VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT + << " byte sent >= max bytes sent per write: Setting EPOLLOUT: " + << bytes_sent; + events_ |= EPOLLOUT; + break; + } + if (sm_interface_ && output_list_.size() < 2) { + sm_interface_->GetOutput(); + } + DataFrame* data_frame = output_list_.front(); + const char* bytes = data_frame->data; + int size = data_frame->size; + bytes += data_frame->index; + size -= data_frame->index; + DCHECK_GE(size, 0); + if (size <= 0) { + output_list_.pop_front(); + delete data_frame; + continue; + } + + flags = MSG_NOSIGNAL | MSG_DONTWAIT; + // Look for a queue size > 1 because |this| frame is remains on the list + // until it has finished sending. + if (output_list_.size() > 1) { + VLOG(2) << log_prefix_ << "Outlist size: " << output_list_.size() + << ": Adding MSG_MORE flag"; + flags |= MSG_MORE; + } + VLOG(2) << log_prefix_ << "Attempting to send " << size << " bytes."; + ssize_t bytes_written = Send(bytes, size, flags); + int stored_errno = errno; + if (bytes_written == -1) { + switch (stored_errno) { + case EAGAIN: + events_ &= ~EPOLLOUT; + VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT + << "Got EAGAIN while writing"; + goto done; + case EINTR: + VLOG(1) << log_prefix_ << ACCEPTOR_CLIENT_IDENT + << "Got EINTR while writing"; + continue; + default: + VLOG(1) << log_prefix_ << ACCEPTOR_CLIENT_IDENT + << "While calling send, got error: " << stored_errno + << ": " << (ssl_?"":strerror(stored_errno)); + goto error_or_close; + } + } else if (bytes_written > 0) { + VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT << "Wrote: " + << bytes_written << " bytes"; + data_frame->index += bytes_written; + bytes_sent += bytes_written; + continue; + } else if (bytes_written == -2) { + // -2 handles SSL_ERROR_WANT_* errors + events_ &= ~EPOLLOUT; + goto done; + } + VLOG(1) << log_prefix_ << ACCEPTOR_CLIENT_IDENT + << "0 bytes written with send call."; + goto error_or_close; + } + done: + UncorkSocket(); + return true; + + error_or_close: + VLOG(1) << log_prefix_ << ACCEPTOR_CLIENT_IDENT + << "DoWrite: error_or_close. Returning false " + << "after cleaning up"; + Cleanup("DoWrite"); + UncorkSocket(); + return false; +} + +void SMConnection::Reset() { + VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT << "Resetting"; + if (ssl_) { + SSL_shutdown(ssl_); + PrintSslError(); + SSL_free(ssl_); + PrintSslError(); + ssl_ = NULL; + } + if (registered_in_epoll_server_) { + epoll_server_->UnregisterFD(fd_); + registered_in_epoll_server_ = false; + } + if (fd_ >= 0) { + VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT << "Closing connection"; + close(fd_); + fd_ = -1; + } + read_buffer_.Clear(); + initialized_ = false; + protocol_detected_ = false; + events_ = 0; + for (std::list<DataFrame*>::iterator i = + output_list_.begin(); + i != output_list_.end(); + ++i) { + delete *i; + } + output_list_.clear(); +} + +// static +SMConnection* SMConnection::NewSMConnection(EpollServer* epoll_server, + SSLState *ssl_state, + MemoryCache* memory_cache, + FlipAcceptor *acceptor, + std::string log_prefix) { + return new SMConnection(epoll_server, ssl_state, memory_cache, + acceptor, log_prefix); +} + +} // namespace net + + diff --git a/net/tools/flip_server/sm_connection.h b/net/tools/flip_server/sm_connection.h new file mode 100644 index 0000000..f62e86c --- /dev/null +++ b/net/tools/flip_server/sm_connection.h @@ -0,0 +1,167 @@ +// Copyright (c) 2009 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef NET_TOOLS_FLIP_SERVER_SM_CONNECTION_H_ +#define NET_TOOLS_FLIP_SERVER_SM_CONNECTION_H_ + +#include <arpa/inet.h> // in_addr_t +#include <time.h> + +#include <list> +#include <string> + +#include "net/tools/flip_server/create_listener.h" +#include "net/tools/flip_server/epoll_server.h" +#include "net/tools/flip_server/mem_cache.h" +#include "net/tools/flip_server/ring_buffer.h" +#include "net/tools/flip_server/sm_interface.h" +#include "openssl/ssl.h" + +namespace net { + +class FlipAcceptor; +class MemoryCache; +class SSLState; + +// A frame of data to be sent. +class DataFrame { + public: + const char* data; + size_t size; + bool delete_when_done; + size_t index; + DataFrame() : data(NULL), size(0), delete_when_done(false), index(0) {} + virtual ~DataFrame() { + if (delete_when_done) + delete[] data; + } +}; + +typedef std::list<DataFrame*> OutputList; + +class SMConnection : public SMConnectionInterface, + public EpollCallbackInterface, + public NotifierInterface { + public: + virtual ~SMConnection(); + + static SMConnection* NewSMConnection(EpollServer* epoll_server, + SSLState *ssl_state, + MemoryCache* memory_cache, + FlipAcceptor *acceptor, + std::string log_prefix); + + // TODO(mbelshe): Make these private. + time_t last_read_time_; + std::string server_ip_; + std::string server_port_; + + EpollServer* epoll_server() { return epoll_server_; } + OutputList* output_list() { return &output_list_; } + MemoryCache* memory_cache() { return memory_cache_; } + void ReadyToSend(); + void EnqueueDataFrame(DataFrame* df); + + int fd() const { return fd_; } + bool initialized() const { return initialized_; } + std::string client_ip() const { return client_ip_; } + + void InitSMConnection(SMConnectionPoolInterface* connection_pool, + SMInterface* sm_interface, + EpollServer* epoll_server, + int fd, + std::string server_ip, + std::string server_port, + std::string remote_ip, + bool use_ssl); + + void CorkSocket(); + void UncorkSocket(); + + int Send(const char* data, int len, int flags); + + // EpollCallbackInterface interface. + virtual void OnRegistration(EpollServer* eps, int fd, int event_mask) { + registered_in_epoll_server_ = true; + } + virtual void OnModification(int fd, int event_mask) {} + virtual void OnEvent(int fd, EpollEvent* event); + virtual void OnUnregistration(int fd, bool replaced) { + registered_in_epoll_server_ = false; + } + virtual void OnShutdown(EpollServer* eps, int fd); + + // NotifierInterface interface. + virtual void Notify() {} + + void Cleanup(const char* cleanup); + + // Flag indicating if we should force spdy on all connections. + static bool force_spdy() { return force_spdy_; } + static void set_force_spdy(bool value) { force_spdy_ = value; } + + private: + // Decide if SPDY was negotiated. + bool WasSpdyNegotiated(); + + // Initialize the protocol interfaces we'll need for this connection. + // Returns true if successful, false otherwise. + bool SetupProtocolInterfaces(); + + bool DoRead(); + bool DoWrite(); + bool DoConsumeReadData(); + void Reset(); + + void HandleEvents(); + void HandleResponseFullyRead(); + + protected: + friend std::ostream& operator<<(std::ostream& os, const SMConnection& c) { + os << &c << "\n"; + return os; + } + + private: + SMConnection(EpollServer* epoll_server, + SSLState* ssl_state, + MemoryCache* memory_cache, + FlipAcceptor* acceptor, + std::string log_prefix); + int fd_; + int events_; + + bool registered_in_epoll_server_; + bool initialized_; + bool protocol_detected_; + bool connection_complete_; + + SMConnectionPoolInterface* connection_pool_; + + EpollServer *epoll_server_; + SSLState *ssl_state_; + MemoryCache* memory_cache_; + FlipAcceptor *acceptor_; + std::string client_ip_; + + RingBuffer read_buffer_; + + OutputList output_list_; + SMInterface* sm_spdy_interface_; + SMInterface* sm_http_interface_; + SMInterface* sm_streamer_interface_; + SMInterface* sm_interface_; + std::string log_prefix_; + + size_t max_bytes_sent_per_dowrite_; + + SSL* ssl_; + + static bool force_spdy_; +}; + +} // namespace net + +#endif // NET_TOOLS_FLIP_SERVER_SM_CONNECTION_H_ + diff --git a/net/tools/flip_server/sm_interface.h b/net/tools/flip_server/sm_interface.h new file mode 100644 index 0000000..29196cf --- /dev/null +++ b/net/tools/flip_server/sm_interface.h @@ -0,0 +1,80 @@ +// Copyright (c) 2009 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef NET_TOOLS_FLIP_SERVER_SM_INTERFACE_H_ +#define NET_TOOLS_FLIP_SERVER_SM_INTERFACE_H_ + +// State Machine Interfaces + +#include <string> + +#include "net/tools/flip_server/balsa_headers.h" + +namespace net { + +class EpollServer; +class SMConnectionPoolInterface; +class SMConnection; + +class SMInterface { + public: + virtual void InitSMInterface(SMInterface* sm_other_interface, + int32 server_idx) = 0; + virtual void InitSMConnection(SMConnectionPoolInterface* connection_pool, + SMInterface* sm_interface, + EpollServer* epoll_server, + int fd, + std::string server_ip, + std::string server_port, + std::string remote_ip, + bool use_ssl) = 0; + virtual size_t ProcessReadInput(const char* data, size_t len) = 0; + virtual size_t ProcessWriteInput(const char* data, size_t len) = 0; + virtual void SetStreamID(uint32 stream_id) = 0; + virtual bool MessageFullyRead() const = 0; + virtual bool Error() const = 0; + virtual const char* ErrorAsString() const = 0; + virtual void Reset() = 0; + virtual void ResetForNewInterface(int32 server_idx) = 0; + // ResetForNewConnection is used for interfaces which control SMConnection + // objects. When called an interface may put its connection object into + // a reusable instance pool. Currently this is what the HttpSM interface + // does. + virtual void ResetForNewConnection() = 0; + virtual void Cleanup() = 0; + + virtual int PostAcceptHook() = 0; + + virtual void NewStream(uint32 stream_id, uint32 priority, + const std::string& filename) = 0; + virtual void SendEOF(uint32 stream_id) = 0; + virtual void SendErrorNotFound(uint32 stream_id) = 0; + virtual size_t SendSynStream(uint32 stream_id, + const BalsaHeaders& headers) = 0; + virtual size_t SendSynReply(uint32 stream_id, + const BalsaHeaders& headers) = 0; + virtual void SendDataFrame(uint32 stream_id, const char* data, int64 len, + uint32 flags, bool compress) = 0; + virtual void GetOutput() = 0; + + virtual ~SMInterface() {} +}; + +class SMConnectionInterface { + public: + virtual ~SMConnectionInterface() {} + virtual void ReadyToSend() = 0; + virtual EpollServer* epoll_server() = 0; +}; + +class SMConnectionPoolInterface { + public: + virtual ~SMConnectionPoolInterface() {} + virtual void SMConnectionDone(SMConnection* connection) = 0; +}; + +} // namespace net + +#endif // NET_TOOLS_FLIP_SERVER_SM_INTERFACE_H_ + diff --git a/net/tools/flip_server/spdy_interface.cc b/net/tools/flip_server/spdy_interface.cc new file mode 100644 index 0000000..c6149b8 --- /dev/null +++ b/net/tools/flip_server/spdy_interface.cc @@ -0,0 +1,591 @@ +// Copyright (c) 2009 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "net/tools/flip_server/spdy_interface.h" + +#include <string> + +#include "net/spdy/spdy_framer.h" +#include "net/spdy/spdy_protocol.h" +#include "net/tools/dump_cache/url_utilities.h" +#include "net/tools/flip_server/flip_config.h" +#include "net/tools/flip_server/http_interface.h" +#include "net/tools/flip_server/spdy_util.h" + +using spdy::kSpdyStreamMaximumWindowSize; +using spdy::CONTROL_FLAG_NONE; +using spdy::DATA_FLAG_COMPRESSED; +using spdy::DATA_FLAG_FIN; +using spdy::RST_STREAM; +using spdy::SETTINGS_MAX_CONCURRENT_STREAMS; +using spdy::SYN_REPLY; +using spdy::SYN_STREAM; +using spdy::SettingsFlagsAndId; +using spdy::SpdyControlFrame; +using spdy::SpdySettingsControlFrame; +using spdy::SpdyDataFlags; +using spdy::SpdyDataFrame; +using spdy::SpdyRstStreamControlFrame; +using spdy::SpdyFrame; +using spdy::SpdyFramer; +using spdy::SpdyFramerVisitorInterface; +using spdy::SpdyHeaderBlock; +using spdy::SpdySetting; +using spdy::SpdySettings; +using spdy::SpdyStreamId; +using spdy::SpdySynReplyControlFrame; +using spdy::SpdySynStreamControlFrame; + +namespace net { + +// static +bool SpdySM::disable_data_compression_ = true; +// static +std::string SpdySM::forward_ip_header_; + +class SpdyFrameDataFrame : public DataFrame { + public: + SpdyFrameDataFrame(SpdyFrame* spdy_frame) + : frame(spdy_frame) { + data = spdy_frame->data(); + size = spdy_frame->length() + SpdyFrame::size(); + } + + virtual ~SpdyFrameDataFrame() { + delete frame; + } + + const SpdyFrame* frame; +}; + +SpdySM::SpdySM(SMConnection* connection, + SMInterface* sm_http_interface, + EpollServer* epoll_server, + MemoryCache* memory_cache, + FlipAcceptor* acceptor) + : seq_num_(0), + spdy_framer_(new SpdyFramer), + valid_spdy_session_(false), + connection_(connection), + client_output_list_(connection->output_list()), + client_output_ordering_(connection), + next_outgoing_stream_id_(2), + epoll_server_(epoll_server), + acceptor_(acceptor), + memory_cache_(memory_cache), + close_on_error_(false) { + spdy_framer_->set_visitor(this); +} + +SpdySM::~SpdySM() { + delete spdy_framer_; +} + +void SpdySM::InitSMConnection(SMConnectionPoolInterface* connection_pool, + SMInterface* sm_interface, + EpollServer* epoll_server, + int fd, + std::string server_ip, + std::string server_port, + std::string remote_ip, + bool use_ssl) { + VLOG(2) << ACCEPTOR_CLIENT_IDENT + << "SpdySM: Initializing server connection."; + connection_->InitSMConnection(connection_pool, sm_interface, + epoll_server, fd, server_ip, server_port, + remote_ip, use_ssl); +} + +SMInterface* SpdySM::NewConnectionInterface() { + SMConnection* server_connection = + SMConnection::NewSMConnection(epoll_server_, + NULL, + memory_cache_, + acceptor_, + "http_conn: "); + if (server_connection == NULL) { + LOG(ERROR) << "SpdySM: Could not create server connection"; + return NULL; + } + VLOG(2) << ACCEPTOR_CLIENT_IDENT << "SpdySM: Creating new HTTP interface"; + SMInterface *sm_http_interface = new HttpSM(server_connection, + this, + epoll_server_, + memory_cache_, + acceptor_); + return sm_http_interface; +} + +SMInterface* SpdySM::FindOrMakeNewSMConnectionInterface( + std::string server_ip, std::string server_port) { + SMInterface *sm_http_interface; + int32 server_idx; + if (unused_server_interface_list.empty()) { + sm_http_interface = NewConnectionInterface(); + server_idx = server_interface_list.size(); + server_interface_list.push_back(sm_http_interface); + VLOG(2) << ACCEPTOR_CLIENT_IDENT + << "SpdySM: Making new server connection on index: " + << server_idx; + } else { + server_idx = unused_server_interface_list.back(); + unused_server_interface_list.pop_back(); + sm_http_interface = server_interface_list.at(server_idx); + VLOG(2) << ACCEPTOR_CLIENT_IDENT << "SpdySM: Reusing connection on " + << "index: " << server_idx; + } + + sm_http_interface->InitSMInterface(this, server_idx); + sm_http_interface->InitSMConnection(NULL, sm_http_interface, + epoll_server_, -1, + server_ip, server_port, "", false); + + return sm_http_interface; +} + +int SpdySM::SpdyHandleNewStream(const SpdyControlFrame* frame, + std::string &http_data, + bool *is_https_scheme) { + bool parsed_headers = false; + SpdyHeaderBlock headers; + const SpdySynStreamControlFrame* syn_stream = + reinterpret_cast<const SpdySynStreamControlFrame*>(frame); + + *is_https_scheme = false; + parsed_headers = spdy_framer_->ParseHeaderBlock(frame, &headers); + VLOG(2) << ACCEPTOR_CLIENT_IDENT << "SpdySM: OnSyn(" + << syn_stream->stream_id() << ")"; + VLOG(2) << ACCEPTOR_CLIENT_IDENT << "SpdySM: headers parsed?: " + << (parsed_headers? "yes": "no"); + if (parsed_headers) { + VLOG(2) << ACCEPTOR_CLIENT_IDENT << "SpdySM: # headers: " + << headers.size(); + } + SpdyHeaderBlock::iterator url = headers.find("url"); + SpdyHeaderBlock::iterator method = headers.find("method"); + if (url == headers.end() || method == headers.end()) { + VLOG(2) << ACCEPTOR_CLIENT_IDENT << "SpdySM: didn't find method or url " + << "or method. Not creating stream"; + return 0; + } + + SpdyHeaderBlock::iterator scheme = headers.find("scheme"); + if (scheme->second.compare("https") == 0) { + *is_https_scheme = true; + } + + std::string uri = UrlUtilities::GetUrlPath(url->second); + if (acceptor_->flip_handler_type_ == FLIP_HANDLER_SPDY_SERVER) { + SpdyHeaderBlock::iterator referer = headers.find("referer"); + std::string host = UrlUtilities::GetUrlHost(url->second); + VLOG(1) << ACCEPTOR_CLIENT_IDENT << "Request: " << method->second + << " " << uri; + std::string filename = EncodeURL(uri, host, method->second); + NewStream(syn_stream->stream_id(), + reinterpret_cast<const SpdySynStreamControlFrame*> + (frame)->priority(), + filename); + } else { + SpdyHeaderBlock::iterator version = headers.find("version"); + http_data += method->second + " " + uri + " " + version->second + "\r\n"; + VLOG(1) << ACCEPTOR_CLIENT_IDENT << "Request: " << method->second << " " + << uri << " " << version->second; + for (SpdyHeaderBlock::iterator i = headers.begin(); + i != headers.end(); ++i) { + http_data += i->first + ": " + i->second + "\r\n"; + VLOG(2) << ACCEPTOR_CLIENT_IDENT << i->first.c_str() << ":" + << i->second.c_str(); + } + if (forward_ip_header_.length()) { + // X-Client-Cluster-IP header + http_data += forward_ip_header_ + ": " + + connection_->client_ip() + "\r\n"; + } + http_data += "\r\n"; + } + + VLOG(3) << ACCEPTOR_CLIENT_IDENT << "SpdySM: HTTP Request:\n" << http_data; + return 1; +} + +void SpdySM::OnControl(const SpdyControlFrame* frame) { + SpdyHeaderBlock headers; + bool parsed_headers = false; + switch (frame->type()) { + case SYN_STREAM: + { + const SpdySynStreamControlFrame* syn_stream = + reinterpret_cast<const SpdySynStreamControlFrame*>(frame); + + std::string http_data; + bool is_https_scheme; + int ret = SpdyHandleNewStream(frame, http_data, &is_https_scheme); + if (!ret) { + LOG(ERROR) << "SpdySM: Could not convert spdy into http."; + break; + } + // We've seen a valid looking SYN_STREAM, consider this to have + // been a real spdy session. + valid_spdy_session_ = true; + + if (acceptor_->flip_handler_type_ == FLIP_HANDLER_PROXY) { + std::string server_ip; + std::string server_port; + if (is_https_scheme) { + server_ip = acceptor_->https_server_ip_; + server_port = acceptor_->https_server_port_; + } else { + server_ip = acceptor_->http_server_ip_; + server_port = acceptor_->http_server_port_; + } + SMInterface *sm_http_interface = + FindOrMakeNewSMConnectionInterface(server_ip, server_port); + stream_to_smif_[syn_stream->stream_id()] = sm_http_interface; + sm_http_interface->SetStreamID(syn_stream->stream_id()); + sm_http_interface->ProcessWriteInput(http_data.c_str(), + http_data.size()); + } + } + break; + + case SYN_REPLY: + parsed_headers = spdy_framer_->ParseHeaderBlock(frame, &headers); + VLOG(2) << ACCEPTOR_CLIENT_IDENT << "SpdySM: OnSynReply(" << + reinterpret_cast<const SpdySynReplyControlFrame*>(frame)->stream_id() + << ")"; + break; + case RST_STREAM: + { + const SpdyRstStreamControlFrame* rst_stream = + reinterpret_cast<const SpdyRstStreamControlFrame*>(frame); + VLOG(2) << ACCEPTOR_CLIENT_IDENT << "SpdySM: OnRst(" + << rst_stream->stream_id() << ")"; + client_output_ordering_.RemoveStreamId(rst_stream ->stream_id()); + } + break; + + default: + LOG(ERROR) << "SpdySM: Unknown control frame type"; + } +} + +void SpdySM::OnStreamFrameData(SpdyStreamId stream_id, + const char* data, size_t len) { + VLOG(2) << ACCEPTOR_CLIENT_IDENT << "SpdySM: StreamData(" << stream_id + << ", [" << len << "])"; + StreamToSmif::iterator it = stream_to_smif_.find(stream_id); + if (it == stream_to_smif_.end()) { + VLOG(2) << "Dropping frame from unknown stream " << stream_id; + if (!valid_spdy_session_) + close_on_error_ = true; + return; + } + + SMInterface* interface = it->second; + if (acceptor_->flip_handler_type_ == FLIP_HANDLER_PROXY) + interface->ProcessWriteInput(data, len); +} + +size_t SpdySM::ProcessReadInput(const char* data, size_t len) { + return spdy_framer_->ProcessInput(data, len); +} + +size_t SpdySM::ProcessWriteInput(const char* data, size_t len) { + return 0; +} + +bool SpdySM::MessageFullyRead() const { + return spdy_framer_->MessageFullyRead(); +} + +bool SpdySM::Error() const { + return close_on_error_ || spdy_framer_->HasError(); +} + +const char* SpdySM::ErrorAsString() const { + DCHECK(Error()); + return SpdyFramer::ErrorCodeToString(spdy_framer_->error_code()); +} + +void SpdySM::ResetForNewInterface(int32 server_idx) { + VLOG(2) << ACCEPTOR_CLIENT_IDENT << "SpdySM: Reset for new interface: " + << "server_idx: " << server_idx; + unused_server_interface_list.push_back(server_idx); +} + +void SpdySM::ResetForNewConnection() { + // seq_num is not cleared, intentionally. + delete spdy_framer_; + spdy_framer_ = new SpdyFramer; + spdy_framer_->set_visitor(this); + valid_spdy_session_ = false; + client_output_ordering_.Reset(); + next_outgoing_stream_id_ = 2; +} + +// Send a settings frame +int SpdySM::PostAcceptHook() { + SpdySettings settings; + SettingsFlagsAndId settings_id(SETTINGS_MAX_CONCURRENT_STREAMS); + settings.push_back(SpdySetting(settings_id, 100)); + SpdySettingsControlFrame* settings_frame = + spdy_framer_->CreateSettings(settings); + + VLOG(1) << ACCEPTOR_CLIENT_IDENT << "Sending Settings Frame"; + EnqueueDataFrame(new SpdyFrameDataFrame(settings_frame)); + return 1; +} + +void SpdySM::NewStream(uint32 stream_id, + uint32 priority, + const std::string& filename) { + MemCacheIter mci; + mci.stream_id = stream_id; + mci.priority = priority; + if (acceptor_->flip_handler_type_ == FLIP_HANDLER_SPDY_SERVER) { + if (!memory_cache_->AssignFileData(filename, &mci)) { + // error creating new stream. + VLOG(1) << ACCEPTOR_CLIENT_IDENT << "Sending ErrorNotFound"; + SendErrorNotFound(stream_id); + } else { + AddToOutputOrder(mci); + } + } else { + AddToOutputOrder(mci); + } +} + +void SpdySM::AddToOutputOrder(const MemCacheIter& mci) { + client_output_ordering_.AddToOutputOrder(mci); +} + +void SpdySM::SendEOF(uint32 stream_id) { + SendEOFImpl(stream_id); +} + +void SpdySM::SendErrorNotFound(uint32 stream_id) { + SendErrorNotFoundImpl(stream_id); +} + +void SpdySM::SendOKResponse(uint32 stream_id, std::string* output) { + SendOKResponseImpl(stream_id, output); +} + +size_t SpdySM::SendSynStream(uint32 stream_id, const BalsaHeaders& headers) { + return SendSynStreamImpl(stream_id, headers); +} + +size_t SpdySM::SendSynReply(uint32 stream_id, const BalsaHeaders& headers) { + return SendSynReplyImpl(stream_id, headers); +} + +void SpdySM::SendDataFrame(uint32 stream_id, const char* data, int64 len, + uint32 flags, bool compress) { + SpdyDataFlags spdy_flags = static_cast<SpdyDataFlags>(flags); + SendDataFrameImpl(stream_id, data, len, spdy_flags, compress); +} + +void SpdySM::SendEOFImpl(uint32 stream_id) { + SendDataFrame(stream_id, NULL, 0, DATA_FLAG_FIN, false); + VLOG(2) << ACCEPTOR_CLIENT_IDENT << "SpdySM: Sending EOF: " << stream_id; + KillStream(stream_id); +} + +void SpdySM::SendErrorNotFoundImpl(uint32 stream_id) { + BalsaHeaders my_headers; + my_headers.SetFirstlineFromStringPieces("HTTP/1.1", "404", "Not Found"); + SendSynReplyImpl(stream_id, my_headers); + SendDataFrame(stream_id, "wtf?", 4, DATA_FLAG_FIN, false); + client_output_ordering_.RemoveStreamId(stream_id); +} + +void SpdySM::SendOKResponseImpl(uint32 stream_id, std::string* output) { + BalsaHeaders my_headers; + my_headers.SetFirstlineFromStringPieces("HTTP/1.1", "200", "OK"); + SendSynReplyImpl(stream_id, my_headers); + SendDataFrame( + stream_id, output->c_str(), output->size(), DATA_FLAG_FIN, false); + client_output_ordering_.RemoveStreamId(stream_id); +} + +void SpdySM::KillStream(uint32 stream_id) { + client_output_ordering_.RemoveStreamId(stream_id); +} + +void SpdySM::CopyHeaders(SpdyHeaderBlock& dest, const BalsaHeaders& headers) { + for (BalsaHeaders::const_header_lines_iterator hi = + headers.header_lines_begin(); + hi != headers.header_lines_end(); + ++hi) { + // It is illegal to send SPDY headers with empty value or header + // names. + if (!hi->first.length() || !hi->second.length()) + continue; + + SpdyHeaderBlock::iterator fhi = dest.find(hi->first.as_string()); + if (fhi == dest.end()) { + dest[hi->first.as_string()] = hi->second.as_string(); + } else { + dest[hi->first.as_string()] = ( + std::string(fhi->second.data(), fhi->second.size()) + "," + + std::string(hi->second.data(), hi->second.size())); + } + } + + // These headers have no value + dest.erase("X-Associated-Content"); // TODO(mbelshe): case-sensitive + dest.erase("X-Original-Url"); // TODO(mbelshe): case-sensitive +} + +size_t SpdySM::SendSynStreamImpl(uint32 stream_id, + const BalsaHeaders& headers) { + SpdyHeaderBlock block; + block["method"] = headers.request_method().as_string(); + if (!headers.HasHeader("status")) + block["status"] = headers.response_code().as_string(); + if (!headers.HasHeader("version")) + block["version"] =headers.response_version().as_string(); + if (headers.HasHeader("X-Original-Url")) { + std::string original_url = headers.GetHeader("X-Original-Url").as_string(); + block["path"] = UrlUtilities::GetUrlPath(original_url); + } else { + block["path"] = headers.request_uri().as_string(); + } + CopyHeaders(block, headers); + + SpdySynStreamControlFrame* fsrcf = + spdy_framer_->CreateSynStream(stream_id, 0, 0, CONTROL_FLAG_NONE, true, + &block); + size_t df_size = fsrcf->length() + SpdyFrame::size(); + EnqueueDataFrame(new SpdyFrameDataFrame(fsrcf)); + + VLOG(2) << ACCEPTOR_CLIENT_IDENT << "SpdySM: Sending SynStreamheader " + << stream_id; + return df_size; +} + +size_t SpdySM::SendSynReplyImpl(uint32 stream_id, const BalsaHeaders& headers) { + SpdyHeaderBlock block; + CopyHeaders(block, headers); + block["status"] = headers.response_code().as_string() + " " + + headers.response_reason_phrase().as_string(); + block["version"] = headers.response_version().as_string(); + + SpdySynReplyControlFrame* fsrcf = + spdy_framer_->CreateSynReply(stream_id, CONTROL_FLAG_NONE, true, &block); + size_t df_size = fsrcf->length() + SpdyFrame::size(); + EnqueueDataFrame(new SpdyFrameDataFrame(fsrcf)); + + VLOG(2) << ACCEPTOR_CLIENT_IDENT << "SpdySM: Sending SynReplyheader " + << stream_id; + return df_size; +} + +void SpdySM::SendDataFrameImpl(uint32 stream_id, const char* data, int64 len, + SpdyDataFlags flags, bool compress) { + // Force compression off if disabled via command line. + if (disable_data_compression()) + flags = static_cast<SpdyDataFlags>(flags & ~DATA_FLAG_COMPRESSED); + + // TODO(mbelshe): We can't compress here - before going into the + // priority queue. Compression needs to be done + // with late binding. + if (len == 0) { + SpdyDataFrame* fdf = spdy_framer_->CreateDataFrame(stream_id, data, len, + flags); + EnqueueDataFrame(new SpdyFrameDataFrame(fdf)); + return; + } + + // Chop data frames into chunks so that one stream can't monopolize the + // output channel. + while (len > 0) { + int64 size = std::min(len, static_cast<int64>(kSpdySegmentSize)); + SpdyDataFlags chunk_flags = flags; + + // If we chunked this block, and the FIN flag was set, there is more + // data coming. So, remove the flag. + if ((size < len) && (flags & DATA_FLAG_FIN)) + chunk_flags = static_cast<SpdyDataFlags>(chunk_flags & ~DATA_FLAG_FIN); + + SpdyDataFrame* fdf = spdy_framer_->CreateDataFrame(stream_id, data, size, + chunk_flags); + EnqueueDataFrame(new SpdyFrameDataFrame(fdf)); + + VLOG(2) << ACCEPTOR_CLIENT_IDENT << "SpdySM: Sending data frame " + << stream_id << " [" << size << "] shrunk to " << fdf->length() + << ", flags=" << flags; + + data += size; + len -= size; + } +} + +void SpdySM::EnqueueDataFrame(DataFrame* df) { + connection_->EnqueueDataFrame(df); +} + +void SpdySM::GetOutput() { + while (client_output_list_->size() < 2) { + MemCacheIter* mci = client_output_ordering_.GetIter(); + if (mci == NULL) { + VLOG(2) << ACCEPTOR_CLIENT_IDENT + << "SpdySM: GetOutput: nothing to output!?"; + return; + } + if (!mci->transformed_header) { + mci->transformed_header = true; + VLOG(2) << ACCEPTOR_CLIENT_IDENT << "SpdySM: GetOutput transformed " + << "header stream_id: [" << mci->stream_id << "]"; + if ((mci->stream_id % 2) == 0) { + // this is a server initiated stream. + // Ideally, we'd do a 'syn-push' here, instead of a syn-reply. + BalsaHeaders headers; + headers.CopyFrom(*(mci->file_data->headers)); + headers.ReplaceOrAppendHeader("status", "200"); + headers.ReplaceOrAppendHeader("version", "http/1.1"); + headers.SetRequestFirstlineFromStringPieces("PUSH", + mci->file_data->filename, + ""); + mci->bytes_sent = SendSynStream(mci->stream_id, headers); + } else { + BalsaHeaders headers; + headers.CopyFrom(*(mci->file_data->headers)); + mci->bytes_sent = SendSynReply(mci->stream_id, headers); + } + return; + } + if (mci->body_bytes_consumed >= mci->file_data->body.size()) { + VLOG(2) << ACCEPTOR_CLIENT_IDENT << "SpdySM: GetOutput " + << "remove_stream_id: [" << mci->stream_id << "]"; + SendEOF(mci->stream_id); + return; + } + size_t num_to_write = + mci->file_data->body.size() - mci->body_bytes_consumed; + if (num_to_write > mci->max_segment_size) + num_to_write = mci->max_segment_size; + + bool should_compress = false; + if (!mci->file_data->headers->HasHeader("content-encoding")) { + if (mci->file_data->headers->HasHeader("content-type")) { + std::string content_type = + mci->file_data->headers->GetHeader("content-type").as_string(); + if (content_type.find("image") == content_type.npos) + should_compress = true; + } + } + + SendDataFrame(mci->stream_id, + mci->file_data->body.data() + mci->body_bytes_consumed, + num_to_write, 0, should_compress); + VLOG(2) << ACCEPTOR_CLIENT_IDENT << "SpdySM: GetOutput SendDataFrame[" + << mci->stream_id << "]: " << num_to_write; + mci->body_bytes_consumed += num_to_write; + mci->bytes_sent += num_to_write; + } +} + +} // namespace net + diff --git a/net/tools/flip_server/spdy_interface.h b/net/tools/flip_server/spdy_interface.h new file mode 100644 index 0000000..34f2716 --- /dev/null +++ b/net/tools/flip_server/spdy_interface.h @@ -0,0 +1,142 @@ +// Copyright (c) 2009 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef NET_TOOLS_FLIP_SERVER_SPDY_INTERFACE_ +#define NET_TOOLS_FLIP_SERVER_SPDY_INTERFACE_ + +#include <map> +#include <string> +#include <vector> + +#include "net/spdy/spdy_framer.h" +#include "net/spdy/spdy_protocol.h" +#include "net/tools/flip_server/balsa_headers.h" +#include "net/tools/flip_server/balsa_visitor_interface.h" +#include "net/tools/flip_server/output_ordering.h" +#include "net/tools/flip_server/sm_connection.h" +#include "net/tools/flip_server/sm_interface.h" + +namespace net { + +class BalsaFrame; +class FlipAcceptor; +class MemoryCache; + +class SpdySM : public spdy::SpdyFramerVisitorInterface, + public SMInterface { + public: + SpdySM(SMConnection* connection, + SMInterface* sm_http_interface, + EpollServer* epoll_server, + MemoryCache* memory_cache, + FlipAcceptor* acceptor); + virtual ~SpdySM(); + + void InitSMInterface(SMInterface* sm_http_interface, + int32 server_idx) { } + + void InitSMConnection(SMConnectionPoolInterface* connection_pool, + SMInterface* sm_interface, + EpollServer* epoll_server, + int fd, + std::string server_ip, + std::string server_port, + std::string remote_ip, + bool use_ssl); + + static bool disable_data_compression() { return disable_data_compression_; } + static void set_disable_data_compression(bool value) { + disable_data_compression_ = value; + } + + private: + virtual void OnError(spdy::SpdyFramer* framer) {} + SMInterface* NewConnectionInterface(); + SMInterface* FindOrMakeNewSMConnectionInterface(std::string server_ip, + std::string server_port); + int SpdyHandleNewStream(const spdy::SpdyControlFrame* frame, + std::string &http_data, + bool *is_https_scheme); + + // SpdyFramerVisitor interface. + virtual void OnControl(const spdy::SpdyControlFrame* frame); + virtual void OnStreamFrameData(spdy::SpdyStreamId stream_id, + const char* data, size_t len); + + public: + size_t ProcessReadInput(const char* data, size_t len); + size_t ProcessWriteInput(const char* data, size_t len); + bool MessageFullyRead() const; + void SetStreamID(uint32 stream_id) {} + bool Error() const; + const char* ErrorAsString() const; + void Reset() { } + void ResetForNewInterface(int32 server_idx); + void ResetForNewConnection(); + // SMInterface's Cleanup is currently only called by SMConnection after a + // protocol message as been fully read. Spdy's SMInterface does not need + // to do any cleanup at this time. + // TODO (klindsay) This method is probably not being used properly and + // some logic review and method renaming is probably in order. + void Cleanup() {} + // Send a settings frame + int PostAcceptHook(); + void NewStream(uint32 stream_id, + uint32 priority, + const std::string& filename); + void AddToOutputOrder(const MemCacheIter& mci); + void SendEOF(uint32 stream_id); + void SendErrorNotFound(uint32 stream_id); + void SendOKResponse(uint32 stream_id, std::string* output); + size_t SendSynStream(uint32 stream_id, const BalsaHeaders& headers); + size_t SendSynReply(uint32 stream_id, const BalsaHeaders& headers); + void SendDataFrame(uint32 stream_id, const char* data, int64 len, + uint32 flags, bool compress); + spdy::SpdyFramer* spdy_framer() { return spdy_framer_; } + + static std::string forward_ip_header() { return forward_ip_header_; } + static void set_forward_ip_header(std::string value) { + forward_ip_header_ = value; + } + + private: + void SendEOFImpl(uint32 stream_id); + void SendErrorNotFoundImpl(uint32 stream_id); + void SendOKResponseImpl(uint32 stream_id, std::string* output); + void KillStream(uint32 stream_id); + void CopyHeaders(spdy::SpdyHeaderBlock& dest, const BalsaHeaders& headers); + size_t SendSynStreamImpl(uint32 stream_id, const BalsaHeaders& headers); + size_t SendSynReplyImpl(uint32 stream_id, const BalsaHeaders& headers); + void SendDataFrameImpl(uint32 stream_id, const char* data, int64 len, + spdy::SpdyDataFlags flags, bool compress); + void EnqueueDataFrame(DataFrame* df); + void GetOutput(); + private: + uint64 seq_num_; + spdy::SpdyFramer* spdy_framer_; + bool valid_spdy_session_; // True if we have seen valid data on this session. + // Use this to fail fast when junk is sent to our + // port. + + SMConnection* connection_; + OutputList* client_output_list_; + OutputOrdering client_output_ordering_; + uint32 next_outgoing_stream_id_; + EpollServer* epoll_server_; + FlipAcceptor* acceptor_; + MemoryCache* memory_cache_; + std::vector<SMInterface*> server_interface_list; + std::vector<int32> unused_server_interface_list; + typedef std::map<uint32, SMInterface*> StreamToSmif; + StreamToSmif stream_to_smif_; + bool close_on_error_; + + static bool disable_data_compression_; + static std::string forward_ip_header_; +}; + +} // namespace net + +#endif // NET_TOOLS_FLIP_SERVER_SPDY_INTERFACE_ + diff --git a/net/tools/flip_server/spdy_ssl.cc b/net/tools/flip_server/spdy_ssl.cc new file mode 100644 index 0000000..79559aa --- /dev/null +++ b/net/tools/flip_server/spdy_ssl.cc @@ -0,0 +1,109 @@ +// Copyright (c) 2009 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "net/tools/flip_server/spdy_ssl.h" + +#include "base/logging.h" +#include "openssl/err.h" +#include "openssl/ssl.h" + +namespace net { + +#define NEXT_PROTO_STRING "\x06spdy/2\x08http/1.1\x08http/1.0" +#define SSL_CIPHER_LIST "!aNULL:!ADH:!eNull:!LOW:!EXP:RC4+RSA:MEDIUM:HIGH" + +int ssl_set_npn_callback(SSL *s, + const unsigned char **data, + unsigned int *len, + void *arg) { + VLOG(1) << "SSL NPN callback: advertising protocols."; + *data = (const unsigned char *) NEXT_PROTO_STRING; + *len = strlen(NEXT_PROTO_STRING); + return SSL_TLSEXT_ERR_OK; +} + +void InitSSL(SSLState* state, + std::string ssl_cert_name, + std::string ssl_key_name, + bool use_npn, + int session_expiration_time, + bool disable_ssl_compression) { + SSL_library_init(); + PrintSslError(); + + SSL_load_error_strings(); + PrintSslError(); + + state->ssl_method = SSLv23_method(); + state->ssl_ctx = SSL_CTX_new(state->ssl_method); + if (!state->ssl_ctx) { + PrintSslError(); + LOG(FATAL) << "Unable to create SSL context"; + } + // Disable SSLv2 support. + SSL_CTX_set_options(state->ssl_ctx, + SSL_OP_NO_SSLv2 | SSL_OP_CIPHER_SERVER_PREFERENCE); + if (SSL_CTX_use_certificate_file(state->ssl_ctx, + ssl_cert_name.c_str(), + SSL_FILETYPE_PEM) <= 0) { + PrintSslError(); + LOG(FATAL) << "Unable to use cert.pem as SSL cert."; + } + if (SSL_CTX_use_PrivateKey_file(state->ssl_ctx, + ssl_key_name.c_str(), + SSL_FILETYPE_PEM) <= 0) { + PrintSslError(); + LOG(FATAL) << "Unable to use key.pem as SSL key."; + } + if (!SSL_CTX_check_private_key(state->ssl_ctx)) { + PrintSslError(); + LOG(FATAL) << "The cert.pem and key.pem files don't match"; + } + if (use_npn) { + SSL_CTX_set_next_protos_advertised_cb(state->ssl_ctx, + ssl_set_npn_callback, NULL); + } + VLOG(1) << "SSL CTX default cipher list: " << SSL_CIPHER_LIST; + SSL_CTX_set_cipher_list(state->ssl_ctx, SSL_CIPHER_LIST); + + VLOG(1) << "SSL CTX session expiry: " << session_expiration_time + << " seconds"; + SSL_CTX_set_timeout(state->ssl_ctx, session_expiration_time); + +#ifdef SSL_MODE_RELEASE_BUFFERS + VLOG(1) << "SSL CTX: Setting Release Buffers mode."; + SSL_CTX_set_mode(state->ssl_ctx, SSL_MODE_RELEASE_BUFFERS); +#endif + + // Proper methods to disable compression don't exist until 0.9.9+. For now + // we must manipulate the stack of compression methods directly. + if (disable_ssl_compression) { + STACK_OF(SSL_COMP) *ssl_comp_methods = SSL_COMP_get_compression_methods(); + int num_methods = sk_SSL_COMP_num(ssl_comp_methods); + int i; + for (i = 0; i < num_methods; i++) { + static_cast<void>(sk_SSL_COMP_delete(ssl_comp_methods, i)); + } + } +} + +SSL* CreateSSLContext(SSL_CTX* ssl_ctx) { + SSL* ssl = SSL_new(ssl_ctx); + SSL_set_accept_state(ssl); + PrintSslError(); + return ssl; +} + +void PrintSslError() { + char buf[128]; // this buffer must be at least 120 chars long. + int error_num = ERR_get_error(); + while (error_num != 0) { + ERR_error_string_n(error_num, buf, sizeof(buf)); + LOG(ERROR) << buf; + error_num = ERR_get_error(); + } +} + +} // namespace net + diff --git a/net/tools/flip_server/spdy_ssl.h b/net/tools/flip_server/spdy_ssl.h new file mode 100644 index 0000000..eac6ad2 --- /dev/null +++ b/net/tools/flip_server/spdy_ssl.h @@ -0,0 +1,31 @@ +// Copyright (c) 2009 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef NET_TOOLS_FLIP_SERVER_SPDY_SSL_H_ +#define NET_TOOLS_FLIP_SERVER_SPDY_SSL_H_ + +#include <string> + +#include "openssl/ssl.h" + +namespace net { + +struct SSLState { + SSL_METHOD* ssl_method; + SSL_CTX* ssl_ctx; +}; + +void InitSSL(SSLState* state, + std::string ssl_cert_name, + std::string ssl_key_name, + bool use_npn, + int session_expiration_time, + bool disable_ssl_compression); +SSL* CreateSSLContext(SSL_CTX* ssl_ctx); +void PrintSslError(); + +} // namespace net + +#endif // NET_TOOLS_FLIP_SERVER_SPDY_SSL_H_ + diff --git a/net/tools/flip_server/spdy_util.cc b/net/tools/flip_server/spdy_util.cc new file mode 100644 index 0000000..d4a789a --- /dev/null +++ b/net/tools/flip_server/spdy_util.cc @@ -0,0 +1,34 @@ +// Copyright (c) 2009 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "net/tools/flip_server/spdy_util.h" + +#include <string> + +#include "net/tools/dump_cache/url_to_filename_encoder.h" + +namespace net { + +bool g_need_to_encode_url = false; + +// Encode the URL. +std::string EncodeURL(std::string uri, std::string host, std::string method) { + if (!g_need_to_encode_url) { + // TODO(mbelshe): if uri is fully qualified, need to strip protocol/host. + return std::string(method + "_" + uri); + } + + std::string filename; + if (uri[0] == '/') { + // uri is not fully qualified. + filename = UrlToFilenameEncoder::Encode( + "http://" + host + uri, method + "_/", false); + } else { + filename = UrlToFilenameEncoder::Encode(uri, method + "_/", false); + } + return filename; +} + +} // namespace net + diff --git a/net/tools/flip_server/spdy_util.h b/net/tools/flip_server/spdy_util.h new file mode 100644 index 0000000..537053d --- /dev/null +++ b/net/tools/flip_server/spdy_util.h @@ -0,0 +1,21 @@ +// Copyright (c) 2009 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef NET_TOOLS_FLIP_SERVER_SPDY_UTIL_H_ +#define NET_TOOLS_FLIP_SERVER_SPDY_UTIL_H_ + +#include <string> + +namespace net { + +// Flag indicating if we need to encode urls into filenames (legacy). +extern bool g_need_to_encode_url; + +// Encode the URL. +std::string EncodeURL(std::string uri, std::string host, std::string method); + +} // namespace net + +#endif // NET_TOOLS_FLIP_SERVER_SPDY_UTIL_H_ + diff --git a/net/tools/flip_server/split.cc b/net/tools/flip_server/split.cc index 7e3329a..6ab50cf 100644 --- a/net/tools/flip_server/split.cc +++ b/net/tools/flip_server/split.cc @@ -2,6 +2,8 @@ // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. +#include "net/tools/flip_server/split.h" + #include <string.h> #include <vector> diff --git a/net/tools/flip_server/streamer_interface.cc b/net/tools/flip_server/streamer_interface.cc new file mode 100644 index 0000000..e29ff19 --- /dev/null +++ b/net/tools/flip_server/streamer_interface.cc @@ -0,0 +1,99 @@ +// Copyright (c) 2009 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "net/tools/flip_server/streamer_interface.h" + +#include <string> + +#include "net/tools/flip_server/constants.h" +#include "net/tools/flip_server/flip_config.h" +#include "net/tools/flip_server/sm_connection.h" + +namespace net { + +StreamerSM::StreamerSM(SMConnection* connection, + SMInterface* sm_other_interface, + EpollServer* epoll_server, + FlipAcceptor* acceptor) + : connection_(connection), + sm_other_interface_(sm_other_interface), + epoll_server_(epoll_server), + acceptor_(acceptor) { + VLOG(2) << ACCEPTOR_CLIENT_IDENT << "Creating StreamerSM object"; +} + +StreamerSM::~StreamerSM() { + VLOG(1) << ACCEPTOR_CLIENT_IDENT << "Destroying StreamerSM object"; + Reset(); +} + +void StreamerSM::InitSMInterface(SMInterface* sm_other_interface, + int32 server_idx) { + sm_other_interface_ = sm_other_interface; +} + +void StreamerSM::InitSMConnection(SMConnectionPoolInterface* connection_pool, + SMInterface* sm_interface, + EpollServer* epoll_server, + int fd, + std::string server_ip, + std::string server_port, + std::string remote_ip, + bool use_ssl) { + VLOG(2) << ACCEPTOR_CLIENT_IDENT << "StreamerSM: Initializing server " + << "connection."; + connection_->InitSMConnection(connection_pool, sm_interface, + epoll_server, fd, server_ip, + server_port, remote_ip, use_ssl); +} + +size_t StreamerSM::ProcessReadInput(const char* data, size_t len) { + return sm_other_interface_->ProcessWriteInput(data, len); +} + +size_t StreamerSM::ProcessWriteInput(const char* data, size_t len) { + char * dataPtr = new char[len]; + memcpy(dataPtr, data, len); + DataFrame* df = new DataFrame; + df->data = (const char *)dataPtr; + df->size = len; + df->delete_when_done = true; + connection_->EnqueueDataFrame(df); + return len; +} + +void StreamerSM::Reset() { + VLOG(1) << ACCEPTOR_CLIENT_IDENT << "StreamerSM: Reset"; + connection_->Cleanup("Server Reset"); +} + +int StreamerSM::PostAcceptHook() { + if (!sm_other_interface_) { + SMConnection *server_connection = + SMConnection::NewSMConnection(epoll_server_, NULL, NULL, + acceptor_, "server_conn: "); + if (server_connection == NULL) { + LOG(ERROR) << "StreamerSM: Could not create server conenction."; + return 0; + } + VLOG(2) << ACCEPTOR_CLIENT_IDENT << "StreamerSM: Creating new server " + << "connection."; + sm_other_interface_ = new StreamerSM(server_connection, this, + epoll_server_, acceptor_); + sm_other_interface_->InitSMInterface(this, 0); + } + // The Streamer interface is used to stream HTTPS connections, so we + // will always use the https_server_ip/port here. + sm_other_interface_->InitSMConnection(NULL, sm_other_interface_, + epoll_server_, -1, + acceptor_->https_server_ip_, + acceptor_->https_server_port_, + "", + false); + + return 1; +} + +} // namespace net + diff --git a/net/tools/flip_server/streamer_interface.h b/net/tools/flip_server/streamer_interface.h new file mode 100644 index 0000000..67b218b --- /dev/null +++ b/net/tools/flip_server/streamer_interface.h @@ -0,0 +1,87 @@ +// Copyright (c) 2009 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef NET_TOOLS_FLIP_SERVER_STREAMER_INTERFACE_ +#define NET_TOOLS_FLIP_SERVER_STREAMER_INTERFACE_ + +#include <string> + +#include "net/tools/flip_server/sm_interface.h" + +namespace net { + +class FlipAcceptor; +class MemCacheIter; +class SMConnection; +class EpollServer; + +class StreamerSM : public SMInterface { + public: + StreamerSM(SMConnection* connection, + SMInterface* sm_other_interface, + EpollServer* epoll_server, + FlipAcceptor* acceptor); + virtual ~StreamerSM(); + + void InitSMInterface(SMInterface* sm_other_interface, + int32 server_idx); + void InitSMConnection(SMConnectionPoolInterface* connection_pool, + SMInterface* sm_interface, + EpollServer* epoll_server, + int fd, + std::string server_ip, + std::string server_port, + std::string remote_ip, + bool use_ssl); + + size_t ProcessReadInput(const char* data, size_t len); + size_t ProcessWriteInput(const char* data, size_t len); + bool MessageFullyRead() const { return false; } + void SetStreamID(uint32 stream_id) {} + bool Error() const { return false; } + const char* ErrorAsString() const { return "(none)"; } + void Reset(); + void ResetForNewInterface(int32 server_idx) {} + void ResetForNewConnection() { sm_other_interface_->Reset(); } + void Cleanup() {} + int PostAcceptHook(); + void NewStream(uint32 stream_id, uint32 priority, + const std::string& filename) {} + void AddToOutputOrder(const MemCacheIter& mci) {} + void SendEOF(uint32 stream_id) {} + void SendErrorNotFound(uint32 stream_id) {} + void SendOKResponse(uint32 stream_id, std::string output) {} + size_t SendSynStream(uint32 stream_id, const BalsaHeaders& headers) { + return 0; + } + size_t SendSynReply(uint32 stream_id, const BalsaHeaders& headers) { + return 0; + } + void SendDataFrame(uint32 stream_id, const char* data, int64 len, + uint32 flags, bool compress) {} + + private: + void SendEOFImpl(uint32 stream_id) {} + void SendErrorNotFoundImpl(uint32 stream_id) {} + void SendOKResponseImpl(uint32 stream_id, std::string* output) {} + size_t SendSynReplyImpl(uint32 stream_id, const BalsaHeaders& headers) { + return 0; + } + size_t SendSynStreamImpl(uint32 stream_id, const BalsaHeaders& headers) { + return 0; + } + void SendDataFrameImpl(uint32 stream_id, const char* data, int64 len, + uint32 flags, bool compress) {} + void GetOutput() {} + + SMConnection* connection_; + SMInterface* sm_other_interface_; + EpollServer* epoll_server_; + FlipAcceptor* acceptor_; +}; + +} // namespace net + +#endif // NET_TOOLS_FLIP_SERVER_STREAMER_INTERFACE_ + |