diff options
author | mbelshe@google.com <mbelshe@google.com@0039d316-1c4b-4281-b951-d872f2087c98> | 2010-11-24 10:19:22 +0000 |
---|---|---|
committer | mbelshe@google.com <mbelshe@google.com@0039d316-1c4b-4281-b951-d872f2087c98> | 2010-11-24 10:19:22 +0000 |
commit | b64abf63068267e8a3566cbcffc84cac54ee3e9d (patch) | |
tree | cff3a086a271027e61d9922349192963cc16b4d6 /net | |
parent | 011521d7190d56d3f48ebf42bba3a2e69cc16bfb (diff) | |
download | chromium_src-b64abf63068267e8a3566cbcffc84cac54ee3e9d.zip chromium_src-b64abf63068267e8a3566cbcffc84cac54ee3e9d.tar.gz chromium_src-b64abf63068267e8a3566cbcffc84cac54ee3e9d.tar.bz2 |
Landing fix for keindsay@gmail.com (Strangeloop Networks)
Added proxy and NPN support as well as basic command line configuration for the flip server.
Turn on building of flip server for linux by default.
BUG=none
TEST=test tool
Review URL: http://codereview.chromium.org/5255008
git-svn-id: svn://svn.chromium.org/chrome/trunk/src@67245 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'net')
-rw-r--r-- | net/net.gyp | 104 | ||||
-rw-r--r-- | net/tools/flip_server/balsa_headers.cc | 4 | ||||
-rw-r--r-- | net/tools/flip_server/create_listener.cc | 164 | ||||
-rw-r--r-- | net/tools/flip_server/create_listener.h | 32 | ||||
-rw-r--r-- | net/tools/flip_server/epoll_server.cc | 4 | ||||
-rw-r--r-- | net/tools/flip_server/flip_config.h | 150 | ||||
-rw-r--r-- | net/tools/flip_server/flip_in_mem_edsm_server.cc | 1828 | ||||
-rw-r--r-- | net/tools/flip_server/other_defines.h | 21 | ||||
-rw-r--r-- | net/tools/flip_server/ring_buffer.h | 1 | ||||
-rw-r--r-- | net/tools/flip_server/simple_buffer.h | 1 |
10 files changed, 1668 insertions, 641 deletions
diff --git a/net/net.gyp b/net/net.gyp index dd0fcde..a5b1948 100644 --- a/net/net.gyp +++ b/net/net.gyp @@ -1334,61 +1334,57 @@ }, ], 'conditions': [ - # ['OS=="linux"', { - # 'targets': [ - # { - # 'target_name': 'flip_in_mem_edsm_server', - # 'type': 'executable', - # 'dependencies': [ - # '../base/base.gyp:base', - # 'net.gyp:net', - # ], - # 'link_settings': { - # 'ldflags': [ - # '-lssl' - # ], - # 'libraries': [ - # '-lssl' - # ], - # }, - # 'sources': [ - # 'tools/dump_cache/url_to_filename_encoder.cc', - # 'tools/dump_cache/url_to_filename_encoder.h', - # 'tools/dump_cache/url_utilities.h', - # 'tools/dump_cache/url_utilities.cc', + ['OS=="linux"', { + 'targets': [ + { + 'target_name': 'flip_in_mem_edsm_server', + 'type': 'executable', + 'cflags': [ + '-Wno-deprecated', + ], + 'dependencies': [ + '../base/base.gyp:base', + 'net.gyp:net', + '../third_party/openssl/openssl.gyp:openssl', + ], + 'sources': [ + 'tools/dump_cache/url_to_filename_encoder.cc', + 'tools/dump_cache/url_to_filename_encoder.h', + 'tools/dump_cache/url_utilities.h', + 'tools/dump_cache/url_utilities.cc', - # 'tools/flip_server/balsa_enums.h', - # 'tools/flip_server/balsa_frame.cc', - # 'tools/flip_server/balsa_frame.h', - # 'tools/flip_server/balsa_headers.cc', - # 'tools/flip_server/balsa_headers.h', - # 'tools/flip_server/balsa_headers_token_utils.cc', - # 'tools/flip_server/balsa_headers_token_utils.h', - # 'tools/flip_server/balsa_visitor_interface.h', - # 'tools/flip_server/buffer_interface.h', - # 'tools/flip_server/create_listener.cc', - # 'tools/flip_server/create_listener.h', - # 'tools/flip_server/epoll_server.cc', - # 'tools/flip_server/epoll_server.h', - # 'tools/flip_server/flip_in_mem_edsm_server.cc', - # 'tools/flip_server/http_message_constants.cc', - # 'tools/flip_server/http_message_constants.h', - # 'tools/flip_server/loadtime_measurement.h', - # 'tools/flip_server/porting.txt', - # 'tools/flip_server/ring_buffer.cc', - # 'tools/flip_server/ring_buffer.h', - # 'tools/flip_server/simple_buffer.cc', - # 'tools/flip_server/simple_buffer.h', - # 'tools/flip_server/split.h', - # 'tools/flip_server/split.cc', - # 'tools/flip_server/string_piece_utils.h', - # 'tools/flip_server/thread.h', - # 'tools/flip_server/url_to_filename_encoder.h', - # 'tools/flip_server/url_utilities.h', - # ], - # }, - # ] - # }], + 'tools/flip_server/balsa_enums.h', + 'tools/flip_server/balsa_frame.cc', + 'tools/flip_server/balsa_frame.h', + 'tools/flip_server/balsa_headers.cc', + 'tools/flip_server/balsa_headers.h', + 'tools/flip_server/balsa_headers_token_utils.cc', + 'tools/flip_server/balsa_headers_token_utils.h', + 'tools/flip_server/balsa_visitor_interface.h', + 'tools/flip_server/buffer_interface.h', + 'tools/flip_server/create_listener.cc', + 'tools/flip_server/create_listener.h', + 'tools/flip_server/epoll_server.cc', + 'tools/flip_server/epoll_server.h', + 'tools/flip_server/flip_in_mem_edsm_server.cc', + 'tools/flip_server/http_message_constants.cc', + 'tools/flip_server/http_message_constants.h', + 'tools/flip_server/loadtime_measurement.h', + 'tools/flip_server/porting.txt', + 'tools/flip_server/ring_buffer.cc', + 'tools/flip_server/ring_buffer.h', + 'tools/flip_server/simple_buffer.cc', + 'tools/flip_server/simple_buffer.h', + 'tools/flip_server/split.h', + 'tools/flip_server/split.cc', + 'tools/flip_server/string_piece_utils.h', + 'tools/flip_server/thread.h', + 'tools/flip_server/url_to_filename_encoder.h', + 'tools/flip_server/url_utilities.h', + ], + }, + ] + }], ['OS=="linux"', { 'targets': [ { diff --git a/net/tools/flip_server/balsa_headers.cc b/net/tools/flip_server/balsa_headers.cc index 2196cd4..74364a2 100644 --- a/net/tools/flip_server/balsa_headers.cc +++ b/net/tools/flip_server/balsa_headers.cc @@ -616,7 +616,7 @@ void BalsaHeaders::SetContentLength(size_t length) { content_length_ = length; // FastUInt64ToBuffer is supposed to use a maximum of kFastToBufferSize bytes. char buffer[kFastToBufferSize]; - int len_converted = snprintf(buffer, sizeof(buffer), "%d", length); + int len_converted = snprintf(buffer, sizeof(buffer), "%zd", length); CHECK_GT(len_converted, 0); const base::StringPiece length_str(buffer, len_converted); AppendHeader(content_length, length_str); @@ -725,7 +725,7 @@ void BalsaHeaders::SetParsedResponseCodeAndUpdateFirstline( size_t parsed_response_code) { char buffer[kFastToBufferSize]; int len_converted = snprintf(buffer, sizeof(buffer), - "%d", parsed_response_code); + "%zd", parsed_response_code); CHECK_GT(len_converted, 0); SetResponseCode(base::StringPiece(buffer, len_converted)); } diff --git a/net/tools/flip_server/create_listener.cc b/net/tools/flip_server/create_listener.cc index 3538261..59a03a6 100644 --- a/net/tools/flip_server/create_listener.cc +++ b/net/tools/flip_server/create_listener.cc @@ -7,8 +7,10 @@ #include <netdb.h> // for getaddrinfo and getnameinfo #include <netinet/in.h> // for IPPROTO_*, etc. #include <stdlib.h> // for EXIT_FAILURE +#include <netinet/tcp.h> // For TCP_NODELAY #include <sys/socket.h> // for getaddrinfo and getnameinfo #include <sys/types.h> // " +#include <fcntl.h> #include <unistd.h> // for exit() #include <ostream> @@ -64,15 +66,47 @@ bool CloseSocket(int *fd, int tries) { //////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////// +// Sets an FD to be nonblocking. +void SetNonBlocking(int fd) { + DCHECK_GE(fd, 0); + + int fcntl_return = fcntl(fd, F_GETFL, 0); + CHECK_NE(fcntl_return, -1) + << "error doing fcntl(fd, F_GETFL, 0) fd: " << fd + << " errno=" << errno; + + if (fcntl_return & O_NONBLOCK) + return; + + fcntl_return = fcntl(fd, F_SETFL, fcntl_return | O_NONBLOCK); + CHECK_NE(fcntl_return, -1) + << "error doing fcntl(fd, F_SETFL, fcntl_return) fd: " << fd + << " errno=" << errno; +} + +int SetDisableNagle(int fd) { + int on = 1; + int rc; + rc = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, + reinterpret_cast<char*>(&on), sizeof(on)); + if (rc < 0) { + close(fd); + LOG(FATAL) << "setsockopt() TCP_NODELAY: failed on fd " << fd; + return 0; + } + return 1; +} + // see header for documentation of this function. -void CreateListeningSocket(const std::string& host, - const std::string& port, - bool is_numeric_host_address, - int backlog, - int * listen_fd, - bool reuseaddr, - bool reuseport, - std::ostream* error_stream) { +int CreateListeningSocket(const std::string& host, + const std::string& port, + bool is_numeric_host_address, + int backlog, + bool reuseaddr, + bool reuseport, + bool wait_for_iface, + bool disable_nagle, + int * listen_fd ) { // start out by assuming things will fail. *listen_fd = -1; @@ -91,16 +125,15 @@ void CreateListeningSocket(const std::string& host, } hints.ai_flags |= AI_PASSIVE; - hints.ai_family = PF_INET; // we know it'll be IPv4, but if we didn't - // hints.ai_family = PF_UNSPEC; // know we'd use this. <--- + hints.ai_family = PF_INET; hints.ai_socktype = SOCK_STREAM; int err = 0; if ((err=getaddrinfo(node, service, &hints, &results))) { // gai_strerror -is- threadsafe, so we get to use it here. - *error_stream << "getaddrinfo " << " for (" << host << ":" << port + LOG(ERROR) << "getaddrinfo " << " for (" << host << ":" << port << ") " << gai_strerror(err) << "\n"; - return; + return -1; } // this will delete the addrinfo memory when we return from this function. AddrinfoGuard addrinfo_guard(results); @@ -109,9 +142,9 @@ void CreateListeningSocket(const std::string& host, results->ai_socktype, results->ai_protocol); if (sock == -1) { - *error_stream << "Unable to create socket for (" << host << ":" + LOG(ERROR) << "Unable to create socket for (" << host << ":" << port << "): " << strerror(errno) << "\n"; - return; + return -1; } if (reuseaddr) { @@ -141,39 +174,126 @@ void CreateListeningSocket(const std::string& host, } if (bind(sock, results->ai_addr, results->ai_addrlen)) { - *error_stream << "Bind was unsuccessful for (" << host << ":" - << port << "): " << strerror(errno) << "\n"; + // If we are waiting for the interface to be raised, such as in an + // HA environment, ignore reporting any errors. + int saved_errno = errno; + if ( !wait_for_iface || errno != EADDRNOTAVAIL) { + LOG(ERROR) << "Bind was unsuccessful for (" << host << ":" + << port << "): " << strerror(errno) << "\n"; + } // if we knew that we were not multithreaded, we could do the following: // " : " << strerror(errno) << "\n"; if (CloseSocket(&sock, 100)) { - return; + if ( saved_errno == EADDRNOTAVAIL ) { + return -3; + } + return -2; } else { // couldn't even close the dang socket?! - *error_stream << "Unable to close the socket.. Considering this a fatal " + LOG(ERROR) << "Unable to close the socket.. Considering this a fatal " "error, and exiting\n"; exit(EXIT_FAILURE); + return -1; + } + } + + if (disable_nagle) { + if (!SetDisableNagle(sock)) { + return -1; } } if (listen(sock, backlog)) { // listen was unsuccessful. - *error_stream << "Listen was unsuccessful for (" << host << ":" + LOG(ERROR) << "Listen was unsuccessful for (" << host << ":" << port << "): " << strerror(errno) << "\n"; // if we knew that we were not multithreaded, we could do the following: // " : " << strerror(errno) << "\n"; if (CloseSocket(&sock, 100)) { sock = -1; - return; + return -1; } else { // couldn't even close the dang socket?! - *error_stream << "Unable to close the socket.. Considering this a fatal " + LOG(FATAL) << "Unable to close the socket.. Considering this a fatal " "error, and exiting\n"; - exit(EXIT_FAILURE); } } + // If we've gotten to here, Yeay! Success! *listen_fd = sock; + + return 0; +} + +int CreateConnectedSocket( int *connect_fd, + const std::string& host, + const std::string& port, + bool is_numeric_host_address, + bool disable_nagle ) { + const char* node = NULL; + const char* service = NULL; + + *connect_fd = -1; + if (!host.empty()) + node = host.c_str(); + if (!port.empty()) + service = port.c_str(); + + struct addrinfo *results = 0; + struct addrinfo hints; + memset(&hints, 0, sizeof(hints)); + + if (is_numeric_host_address) + hints.ai_flags = AI_NUMERICHOST; // iff you know the name is numeric. + hints.ai_flags |= AI_PASSIVE; + + hints.ai_family = PF_INET; + hints.ai_socktype = SOCK_STREAM; + + int err = 0; + if ((err=getaddrinfo(node, service, &hints, &results))) { + // gai_strerror -is- threadsafe, so we get to use it here. + LOG(ERROR) << "getaddrinfo for (" << node << ":" << service << "): " + << gai_strerror(err); + return -1; + } + // this will delete the addrinfo memory when we return from this function. + AddrinfoGuard addrinfo_guard(results); + + int sock = socket(results->ai_family, + results->ai_socktype, + results->ai_protocol); + if (sock == -1) { + LOG(ERROR) << "Unable to create socket for (" << node << ":" << service + << "): " << strerror( errno ); + return -1; + } + + SetNonBlocking( sock ); + + if (disable_nagle) { + if (!SetDisableNagle(sock)) { + return -1; + } + } + + int ret_val = 0; + if ( connect( sock, results->ai_addr, results->ai_addrlen ) ) { + if ( errno != EINPROGRESS ) { + LOG(ERROR) << "Connect was unsuccessful for (" << node << ":" << service + << "): " << strerror(errno); + close( sock ); + return -1; + } + } else { + ret_val = 1; + } + + // If we've gotten to here, Yeay! Success! + *connect_fd = sock; + + return ret_val; } } // namespace net diff --git a/net/tools/flip_server/create_listener.h b/net/tools/flip_server/create_listener.h index 4c0a277..3a7b16e 100644 --- a/net/tools/flip_server/create_listener.h +++ b/net/tools/flip_server/create_listener.h @@ -11,6 +11,8 @@ namespace net { +void SetNonBlocking(int fd); + // Summary: // creates a socket for listening, and bind()s and listen()s it. // Args: @@ -27,19 +29,29 @@ namespace net { // backlog - passed into listen. This is the number of pending incoming // connections a socket which is listening may have acquired before // the OS starts rejecting new incoming connections. +// reuseaddr - if true sets SO_REUSEADDR on the listening socket +// reuseport - if true sets SO_REUSEPORT on the listening socket +// wait_for_iface - A boolean indicating that CreateListeningSocket should +// block until the interface that it will bind to has been +// raised. This is intended for HA environments. +// disable_nagle - if true sets TCP_NODELAY on the listening socket. // listen_fd - this will be assigned a positive value if the socket is // successfully created, else it will be assigned -1. -// error_stream - in the case of errors, output describing the error will -// be written into error_stream. -void CreateListeningSocket(const std::string& host, - const std::string& port, - bool is_numeric_host_address, - int backlog, - int * listen_fd, - bool reuseaddr, - bool reuseport, - std::ostream* error_stream); +int CreateListeningSocket(const std::string& host, + const std::string& port, + bool is_numeric_host_address, + int backlog, + bool reuseaddr, + bool reuseport, + bool wait_for_iface, + bool disable_nagle, + int * listen_fd); +int CreateConnectedSocket(int *connect_fd, + const std::string& host, + const std::string& port, + bool is_numeric_host_address, + bool disable_nagle); } // namespace net #endif // NET_TOOLS_FLIP_SERVER_CREATE_LISTENER_H__ diff --git a/net/tools/flip_server/epoll_server.cc b/net/tools/flip_server/epoll_server.cc index a1d6ca1..f78663c 100644 --- a/net/tools/flip_server/epoll_server.cc +++ b/net/tools/flip_server/epoll_server.cc @@ -13,7 +13,6 @@ #include "base/logging.h" #include "base/timer.h" -#include "net/tools/flip_server/other_defines.h" // Design notes: An efficient implementation of ready list has the following // desirable properties: @@ -478,7 +477,8 @@ int EpollServer::NumFDsRegistered() const { void EpollServer::Wake() { char data = 'd'; // 'd' is for data. It's good enough for me. - write(write_fd_, &data, 1); + int rv = write(write_fd_, &data, 1); + DCHECK(rv == 1); } int64 EpollServer::NowInUsec() const { diff --git a/net/tools/flip_server/flip_config.h b/net/tools/flip_server/flip_config.h new file mode 100644 index 0000000..3f202f8 --- /dev/null +++ b/net/tools/flip_server/flip_config.h @@ -0,0 +1,150 @@ +// Copyright (c) 2010 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef NET_TOOLS_FLIP_PROXY_CONFIG_H +#define NET_TOOLS_FLIP_PROXY_CONFIG_H +#pragma once + +#include <arpa/inet.h> // in_addr_t + +#include "base/logging.h" +#include "net/tools/flip_server/create_listener.h" + +#include <vector> +#include <string> + +using std::string; +using std::vector; + +enum FlipHandlerType { + FLIP_HANDLER_PROXY, + FLIP_HANDLER_SPDY_SERVER, + FLIP_HANDLER_HTTP_SERVER +}; + +class FlipAcceptor { +public: + enum FlipHandlerType flip_handler_type_; + string listen_ip_; + string listen_port_; + string ssl_cert_filename_; + string ssl_key_filename_; + string server_ip_; + string server_port_; + int accept_backlog_size_; + bool disable_nagle_; + int accepts_per_wake_; + int listen_fd_; + void* memory_cache_; + + FlipAcceptor(enum FlipHandlerType flip_handler_type, + string listen_ip, + string listen_port, + string ssl_cert_filename, + string ssl_key_filename, + string server_ip, + string server_port, + int accept_backlog_size, + bool disable_nagle, + int accepts_per_wake, + bool reuseport, + bool wait_for_iface, + void *memory_cache) : + flip_handler_type_(flip_handler_type), + listen_ip_(listen_ip), + listen_port_(listen_port), + ssl_cert_filename_(ssl_cert_filename), + ssl_key_filename_(ssl_key_filename), + server_ip_(server_ip), + server_port_(server_port), + accept_backlog_size_(accept_backlog_size), + disable_nagle_(disable_nagle), + accepts_per_wake_(accepts_per_wake), + memory_cache_(memory_cache) + { + VLOG(1) << "Attempting to listen on " << listen_ip_.c_str() << ":" + << listen_port_.c_str(); + while (1) { + int ret = net::CreateListeningSocket(listen_ip_, + listen_port_, + true, + accept_backlog_size_, + true, + reuseport, + wait_for_iface, + disable_nagle_, + &listen_fd_); + if ( ret == 0 ) { + break; + } else if ( ret == -3 && wait_for_iface ) { + // Binding error EADDRNOTAVAIL was encounted. We need + // to wait for the interfaces to raised. try again. + usleep(200000); + } else { + LOG(ERROR) << "Unable to create listening socket for: ret = " << ret + << ": " << listen_ip_.c_str() << ":" + << listen_port_.c_str(); + return; + } + } + net::SetNonBlocking(listen_fd_); + VLOG(1) << "Listening for spdy on port: " << listen_ip_ << ":" + << listen_port_; + } + ~FlipAcceptor () {} +}; + +class FlipConfig { +public: + std::vector <FlipAcceptor*> acceptors_; + double server_think_time_in_s_; + enum logging::LoggingDestination log_destination_; + string log_filename_; + bool forward_ip_header_enabled_; + string forward_ip_header_; + bool wait_for_iface_; + int ssl_session_expiry_; + + FlipConfig() : + server_think_time_in_s_(0), + log_destination_(logging::LOG_ONLY_TO_SYSTEM_DEBUG_LOG), + forward_ip_header_enabled_(false), + wait_for_iface_(false), + ssl_session_expiry_(300) + {} + + ~FlipConfig() {} + + void AddAcceptor(enum FlipHandlerType flip_handler_type, + string listen_ip, + string listen_port, + string ssl_cert_filename, + string ssl_key_filename, + string server_ip, + string server_port, + int accept_backlog_size, + bool disable_nagle, + int accepts_per_wake, + bool reuseport, + bool wait_for_iface, + void *memory_cache) { + // TODO(mbelshe): create a struct FlipConfigArgs{} for the arguments. + acceptors_.push_back(new FlipAcceptor(flip_handler_type, + listen_ip, + listen_port, + ssl_cert_filename, + ssl_key_filename, + server_ip, + server_port, + accept_backlog_size, + disable_nagle, + accepts_per_wake, + reuseport, + wait_for_iface, + memory_cache)); + } + +}; + +#endif diff --git a/net/tools/flip_server/flip_in_mem_edsm_server.cc b/net/tools/flip_server/flip_in_mem_edsm_server.cc index 13658184..7848861 100644 --- a/net/tools/flip_server/flip_in_mem_edsm_server.cc +++ b/net/tools/flip_server/flip_in_mem_edsm_server.cc @@ -9,6 +9,7 @@ #include <unistd.h> #include <openssl/err.h> #include <openssl/ssl.h> +#include <signal.h> #include <deque> #include <iostream> @@ -16,6 +17,7 @@ #include <vector> #include <list> +#include "base/command_line.h" #include "base/logging.h" #include "base/simple_thread.h" #include "base/timer.h" @@ -30,16 +32,14 @@ #include "net/tools/flip_server/balsa_headers.h" #include "net/tools/flip_server/balsa_visitor_interface.h" #include "net/tools/flip_server/buffer_interface.h" -#include "net/tools/flip_server/create_listener.h" #include "net/tools/flip_server/epoll_server.h" -#include "net/tools/flip_server/other_defines.h" #include "net/tools/flip_server/ring_buffer.h" #include "net/tools/flip_server/simple_buffer.h" #include "net/tools/flip_server/split.h" +#include "net/tools/flip_server/flip_config.h" //////////////////////////////////////////////////////////////////////////////// -using std::cerr; using std::deque; using std::list; using std::map; @@ -47,26 +47,24 @@ using std::ostream; using std::pair; using std::string; using std::vector; +using std::cout; //////////////////////////////////////////////////////////////////////////////// +#define IPV4_PRINTABLE_FORMAT(IP) (((IP)>>0)&0xff),(((IP)>>8)&0xff), \ + (((IP)>>16)&0xff),(((IP)>>24)&0xff) -// If set to true, then the server will act as an SSL server for both -// HTTP and SPDY); -bool FLAGS_use_ssl = true; +#define ACCEPTOR_CLIENT_IDENT acceptor_->listen_ip_ << ":" \ + << acceptor_->listen_port_ << " " +#define ACCEPTOR_SERVER_IDENT acceptor_->server_ip_ << ":" \ + << acceptor_->server_port_ << " " -// The name of the cert .pem file); -string FLAGS_ssl_cert_name = "cert.pem"; +#define NEXT_PROTO_STRING "\x06spdy/2\x08http/1.1\x08http/1.0" -// The name of the key .pem file); -string FLAGS_ssl_key_name = "key.pem"; - -// The number of responses given before the server closes the -// connection); -int32 FLAGS_response_count_until_close = 1000*1000; +#define SSL_CTX_DEFAULT_CIPHER_LIST "RC4:!aNULL:!eNULL" // If true, then disables the nagle algorithm); -bool FLAGS_no_nagle = true; +bool FLAGS_disable_nagle = true; // The number of times that accept() will be called when the // alarm goes off when the accept_using_alarm flag is set to true. @@ -74,12 +72,6 @@ bool FLAGS_no_nagle = true; // is completely drained and the accept() call returns an error); int32 FLAGS_accepts_per_wake = 0; -// The port on which the spdy server listens); -int32 FLAGS_spdy_port = 10040; - -// The port on which the http server listens); -int32 FLAGS_port = 16002; - // The size of the TCP accept backlog); int32 FLAGS_accept_backlog_size = 1024; @@ -87,7 +79,7 @@ int32 FLAGS_accept_backlog_size = 1024; string FLAGS_cache_base_dir = "."; // If true, then encode url to filename); -bool FLAGS_need_to_encode_url = true; +bool FLAGS_need_to_encode_url = false; // If set to false a single socket will be used. If set to true // then a new socket will be created for each accept thread. @@ -105,9 +97,6 @@ bool FLAGS_use_xsub = false; // Does the server send X-Associated-Content headers); bool FLAGS_use_xac = false; -// Does the server advance cwnd by sending no-op packets); -bool FLAGS_use_cwnd_opener = false; - // Does the server compress data frames); bool FLAGS_use_compression = false; @@ -115,8 +104,6 @@ bool FLAGS_use_compression = false; using base::StringPiece; using base::SimpleThread; -// using base::Lock; // heh, this isn't in base namespace?! -// using base::AutoLock; // ditto! using net::BalsaFrame; using net::BalsaFrameEnums; using net::BalsaHeaders; @@ -137,6 +124,7 @@ using spdy::RST_STREAM; using spdy::SYN_REPLY; using spdy::SYN_STREAM; using spdy::SpdyControlFrame; +using spdy::SpdySettingsControlFrame; using spdy::SpdyDataFlags; using spdy::SpdyDataFrame; using spdy::SpdyRstStreamControlFrame; @@ -149,6 +137,7 @@ using spdy::SpdyStreamId; using spdy::SpdySynReplyControlFrame; using spdy::SpdySynStreamControlFrame; +FlipConfig g_proxy_config; //////////////////////////////////////////////////////////////////////////////// @@ -156,11 +145,21 @@ void PrintSslError() { char buf[128]; // this buffer must be at least 120 chars long. int error_num = ERR_get_error(); while (error_num != 0) { - LOG(INFO)<< ERR_error_string(error_num, buf); + LOG(ERROR) << ERR_error_string(error_num, buf); error_num = ERR_get_error(); } } +static int ssl_set_npn_callback(SSL *s, + const unsigned char **data, + unsigned int *len, + void *arg) +{ + VLOG(1) << "SSL NPN callback: advertising protocols."; + *data = (const unsigned char *) NEXT_PROTO_STRING; + *len = strlen(NEXT_PROTO_STRING); + return SSL_TLSEXT_ERR_OK; +} //////////////////////////////////////////////////////////////////////////////// // Creates a socket with domain, type and protocol parameters. @@ -172,26 +171,6 @@ int CreateSocket(int domain, int type, int protocol, int *fd) { return (*fd == -1) ? errno : 0; } -//////////////////////////////////////////////////////////////////////////////// - -// Sets an FD to be nonblocking. -void SetNonBlocking(int fd) { - DCHECK(fd >= 0); - - int fcntl_return = fcntl(fd, F_GETFL, 0); - CHECK_NE(fcntl_return, -1) - << "error doing fcntl(fd, F_GETFL, 0) fd: " << fd - << " errno=" << errno; - - if (fcntl_return & O_NONBLOCK) - return; - - fcntl_return = fcntl(fd, F_SETFL, fcntl_return | O_NONBLOCK); - CHECK_NE(fcntl_return, -1) - << "error doing fcntl(fd, F_SETFL, fcntl_return) fd: " << fd - << " errno=" << errno; -} - // Encode the URL. string EncodeURL(string uri, string host, string method) { if (!FLAGS_need_to_encode_url) { @@ -212,20 +191,16 @@ string EncodeURL(string uri, string host, string method) { //////////////////////////////////////////////////////////////////////////////// - -struct GlobalSSLState { +struct SSLState { SSL_METHOD* ssl_method; SSL_CTX* ssl_ctx; }; -//////////////////////////////////////////////////////////////////////////////// - -GlobalSSLState* global_ssl_state = NULL; - -//////////////////////////////////////////////////////////////////////////////// - // SSL stuff -void spdy_init_ssl(GlobalSSLState* state) { +void spdy_init_ssl(SSLState* state, + string ssl_cert_name, + string ssl_key_name, + bool use_npn) { SSL_library_init(); PrintSslError(); @@ -241,13 +216,13 @@ void spdy_init_ssl(GlobalSSLState* state) { // Disable SSLv2 support. SSL_CTX_set_options(state->ssl_ctx, SSL_OP_NO_SSLv2); if (SSL_CTX_use_certificate_file(state->ssl_ctx, - FLAGS_ssl_cert_name.c_str(), + ssl_cert_name.c_str(), SSL_FILETYPE_PEM) <= 0) { PrintSslError(); LOG(FATAL) << "Unable to use cert.pem as SSL cert."; } if (SSL_CTX_use_PrivateKey_file(state->ssl_ctx, - FLAGS_ssl_key_name.c_str(), + ssl_key_name.c_str(), SSL_FILETYPE_PEM) <= 0) { PrintSslError(); LOG(FATAL) << "Unable to use key.pem as SSL key."; @@ -256,6 +231,21 @@ void spdy_init_ssl(GlobalSSLState* state) { PrintSslError(); LOG(FATAL) << "The cert.pem and key.pem files don't match"; } + if (use_npn) { + SSL_CTX_set_next_protos_advertised_cb(state->ssl_ctx, + ssl_set_npn_callback, NULL); + } + VLOG(1) << "SSL CTX default cipher list: " << SSL_CTX_DEFAULT_CIPHER_LIST; + SSL_CTX_set_cipher_list(state->ssl_ctx, SSL_CTX_DEFAULT_CIPHER_LIST); + + VLOG(1) << "SSL CTX session expiry: " << g_proxy_config.ssl_session_expiry_ + << " seconds"; + SSL_CTX_set_timeout(state->ssl_ctx, g_proxy_config.ssl_session_expiry_); + +#ifdef SSL_MODE_RELEASE_BUFFERS + VLOG(1) << "SSL CTX: Setting Release Buffers mode."; + SSL_CTX_set_mode(state->ssl_ctx, SSL_MODE_RELEASE_BUFFERS); +#endif } SSL* spdy_new_ssl(SSL_CTX* ssl_ctx) { @@ -264,6 +254,7 @@ SSL* spdy_new_ssl(SSL_CTX* ssl_ctx) { SSL_set_accept_state(ssl); PrintSslError(); + return ssl; } @@ -404,7 +395,6 @@ class MemoryCache { } void AddFiles() { - LOG(INFO) << "Adding files!"; deque<string> paths; cwd_ = FLAGS_cache_base_dir; paths.push_back(cwd_ + "/GET_"); @@ -580,7 +570,6 @@ class MemoryCache { LOG(ERROR) << "Skipping subresource with unknown content-type"; return; } - // Now, lets see if this is the same host or not bool same_host = (UrlUtilities::GetUrlHost(referrer) == UrlUtilities::GetUrlHost(url)); @@ -673,8 +662,6 @@ class MemoryCache { } }; -//////////////////////////////////////////////////////////////////////////////// - class NotifierInterface { public: virtual ~NotifierInterface() {} @@ -683,16 +670,33 @@ class NotifierInterface { //////////////////////////////////////////////////////////////////////////////// +class SMConnectionPoolInterface; + class SMInterface { public: - virtual size_t ProcessInput(const char* data, size_t len) = 0; + virtual void InitSMInterface(SMInterface* sm_other_interface, + int32 server_idx) = 0; + virtual void InitSMConnection(SMConnectionPoolInterface* connection_pool, + SMInterface* sm_interface, + EpollServer* epoll_server, + int fd, + bool use_ssl) = 0; + virtual size_t ProcessReadInput(const char* data, size_t len) = 0; + virtual size_t ProcessWriteInput(const char* data, size_t len) = 0; + virtual void SetStreamID(uint32 stream_id) = 0; virtual bool MessageFullyRead() const = 0; virtual bool Error() const = 0; virtual const char* ErrorAsString() const = 0; virtual void Reset() = 0; + virtual void ResetForNewInterface(int32 server_idx) = 0; + // ResetForNewConnection is used for interfaces which control SMConnection + // objects. When called an interface may put its connection object into + // a reusable instance pool. Currently this is what the HttpSM interface + // does. virtual void ResetForNewConnection() = 0; + virtual void Cleanup() = 0; - virtual void PostAcceptHook() = 0; + virtual int PostAcceptHook() = 0; virtual void NewStream(uint32 stream_id, uint32 priority, const string& filename) = 0; @@ -709,65 +713,98 @@ class SMInterface { virtual ~SMInterface() {} }; -//////////////////////////////////////////////////////////////////////////////// +class SMConnectionInterface { + public: + virtual ~SMConnectionInterface() {} + virtual void ReadyToSend() = 0; + virtual EpollServer* epoll_server() = 0; +}; -class SMServerConnection; -typedef SMInterface*(SMInterfaceFactory)(SMServerConnection*); - -//////////////////////////////////////////////////////////////////////////////// +class HttpSM; +class SMConnection; typedef list<DataFrame> OutputList; -//////////////////////////////////////////////////////////////////////////////// - -class SMServerConnection; - -class SMServerConnectionPoolInterface { +class SMConnectionPoolInterface { public: - virtual ~SMServerConnectionPoolInterface() {} - // SMServerConnections will use this: - virtual void SMServerConnectionDone(SMServerConnection* connection) = 0; + virtual ~SMConnectionPoolInterface() {} + // SMConnections will use this: + virtual void SMConnectionDone(SMConnection* connection) = 0; }; -//////////////////////////////////////////////////////////////////////////////// - -class SMServerConnection: public EpollCallbackInterface, - public NotifierInterface { - private: - SMServerConnection(SMInterfaceFactory* sm_interface_factory, - MemoryCache* memory_cache, - EpollServer* epoll_server) : - fd_(-1), - events_(0), - - registered_in_epoll_server_(false), - initialized_(false), +SMInterface* NewStreamerSM(SMConnection* connection, + SMInterface* sm_interface, + EpollServer* epoll_server, + FlipAcceptor* acceptor); - connection_pool_(NULL), - epoll_server_(epoll_server), +SMInterface* NewSpdySM(SMConnection* connection, + SMInterface* sm_interface, + EpollServer* epoll_server, + MemoryCache* memory_cache, + FlipAcceptor* acceptor); - read_buffer_(4096*10), - memory_cache_(memory_cache), - sm_interface_(sm_interface_factory(this)), +SMInterface* NewHttpSM(SMConnection* connection, + SMInterface* sm_interface, + EpollServer* epoll_server, + MemoryCache* memory_cache, + FlipAcceptor* acceptor); - max_bytes_sent_per_dowrite_(4096), +//////////////////////////////////////////////////////////////////////////////// - ssl_(NULL) {} +class SMConnection: public SMConnectionInterface, + public EpollCallbackInterface, + public NotifierInterface { + private: + SMConnection(EpollServer* epoll_server, + SSLState* ssl_state, + MemoryCache* memory_cache, + FlipAcceptor* acceptor, + string log_prefix) + : fd_(-1), + events_(0), + registered_in_epoll_server_(false), + initialized_(false), + protocol_detected_(false), + connection_complete_(false), + connection_pool_(NULL), + epoll_server_(epoll_server), + ssl_state_(ssl_state), + memory_cache_(memory_cache), + acceptor_(acceptor), + read_buffer_(4096*10), + sm_spdy_interface_(NULL), + sm_http_interface_(NULL), + sm_streamer_interface_(NULL), + sm_interface_(NULL), + log_prefix_(log_prefix), + max_bytes_sent_per_dowrite_(4096), + ssl_(NULL) + {} int fd_; int events_; bool registered_in_epoll_server_; bool initialized_; + bool protocol_detected_; + bool connection_complete_; - SMServerConnectionPoolInterface* connection_pool_; - EpollServer* epoll_server_; + SMConnectionPoolInterface* connection_pool_; + + EpollServer *epoll_server_; + SSLState *ssl_state_; + MemoryCache* memory_cache_; + FlipAcceptor *acceptor_; + string client_ip_; RingBuffer read_buffer_; OutputList output_list_; - MemoryCache* memory_cache_; + SMInterface* sm_spdy_interface_; + SMInterface* sm_http_interface_; + SMInterface* sm_streamer_interface_; SMInterface* sm_interface_; + string log_prefix_; size_t max_bytes_sent_per_dowrite_; @@ -777,45 +814,101 @@ class SMServerConnection: public EpollCallbackInterface, OutputList* output_list() { return &output_list_; } MemoryCache* memory_cache() { return memory_cache_; } void ReadyToSend() { + VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT + << "Setting ready to send: EPOLLIN | EPOLLOUT"; epoll_server_->SetFDReady(fd_, EPOLLIN | EPOLLOUT); } void EnqueueDataFrame(const DataFrame& df) { output_list_.push_back(df); - VLOG(2) << "EnqueueDataFrame. Setting FD ready."; + VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT << "EnqueueDataFrame: " + << "size = " << df.size << ": Setting FD ready."; ReadyToSend(); } + int fd() { return fd_; } public: - ~SMServerConnection() { + ~SMConnection() { if (initialized()) { Reset(); } } - static SMServerConnection* NewSMServerConnection(SMInterfaceFactory* smif, - MemoryCache* memory_cache, - EpollServer* epoll_server) { - return new SMServerConnection(smif, memory_cache, epoll_server); + static SMConnection* NewSMConnection(EpollServer* epoll_server, + SSLState *ssl_state, + MemoryCache* memory_cache, + FlipAcceptor *acceptor, + string log_prefix) { + return new SMConnection(epoll_server, ssl_state, memory_cache, + acceptor, log_prefix); } bool initialized() const { return initialized_; } + string client_ip() const { return client_ip_; } - void InitSMServerConnection(SMServerConnectionPoolInterface* connection_pool, - EpollServer* epoll_server, - int fd) { + void InitSMConnection(SMConnectionPoolInterface* connection_pool, + SMInterface* sm_interface, + EpollServer* epoll_server, + int fd, + bool use_ssl) { if (initialized_) { LOG(FATAL) << "Attempted to initialize already initialized server"; return; } - if (epoll_server_ && registered_in_epoll_server_ && fd_ != -1) { - epoll_server_->UnregisterFD(fd_); - } - if (fd_ != -1) { - VLOG(2) << "Closing pre-existing fd"; - close(fd_); - fd_ = -1; - } - fd_ = fd; + if (fd == -1) { + // If fd == -1, then we are initializing a new connection that will + // connect to the backend. + // + // ret: -1 == error + // 0 == connection in progress + // 1 == connection complete + // TODO: is_numeric_host_address value needs to be detected + int ret = net::CreateConnectedSocket(&fd_, + acceptor_->server_ip_, + acceptor_->server_port_, + true, + acceptor_->disable_nagle_); + + if (ret < 0) { + LOG(ERROR) << "-1 Could not create connected socket"; + return; + } else if (ret == 1) { + DCHECK_NE(-1, fd_); + connection_complete_ = true; + VLOG(1) << log_prefix_ << ACCEPTOR_CLIENT_IDENT + << "Connection complete to: " << ACCEPTOR_SERVER_IDENT; + } + VLOG(1) << log_prefix_ << ACCEPTOR_CLIENT_IDENT + << "Connecting to server: " << ACCEPTOR_SERVER_IDENT; + } else { + // If fd != -1 then we are initializing a connection that has just been + // accepted from the listen socket. + connection_complete_ = true; + if (epoll_server_ && registered_in_epoll_server_ && fd_ != -1) { + epoll_server_->UnregisterFD(fd_); + } + if (fd_ != -1) { + VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT + << "Closing pre-existing fd"; + close(fd_); + fd_ = -1; + } + + fd_ = fd; + struct sockaddr sock_addr; + socklen_t addr_size = sizeof(sock_addr); + addr_size = sizeof(sock_addr); + int res = getsockname(fd_, &sock_addr, &addr_size); + if (res < 0) { + LOG(ERROR) << "Could not get socket address for fd " << fd_ + << ": getsockname: " << strerror(errno); + } else { + struct sockaddr_in *sock_addr_in = (struct sockaddr_in *)&sock_addr; + char ip[16]; + snprintf(ip, sizeof(ip), "%d.%d.%d.%d", + IPV4_PRINTABLE_FORMAT(sock_addr_in->sin_addr.s_addr)); + client_ip_ = ip; + } + } registered_in_epoll_server_ = false; initialized_ = true; @@ -823,21 +916,42 @@ class SMServerConnection: public EpollCallbackInterface, connection_pool_ = connection_pool; epoll_server_ = epoll_server; - sm_interface_->Reset(); + if (sm_interface) { + sm_interface_ = sm_interface; + protocol_detected_ = true; + } + read_buffer_.Clear(); epoll_server_->RegisterFD(fd_, this, EPOLLIN | EPOLLOUT | EPOLLET); - if (global_ssl_state) { - ssl_ = spdy_new_ssl(global_ssl_state->ssl_ctx); + if (use_ssl) { + ssl_ = spdy_new_ssl(ssl_state_->ssl_ctx); SSL_set_fd(ssl_, fd_); PrintSslError(); } - sm_interface_->PostAcceptHook(); } int Send(const char* bytes, int len, int flags) { - return send(fd_, bytes, len, flags); + ssize_t bytes_written = 0; + if (ssl_) { + bytes_written = SSL_write(ssl_, bytes, len); + if (bytes_written < 0) { + switch(SSL_get_error(ssl_, bytes_written)) { + case SSL_ERROR_WANT_READ: + case SSL_ERROR_WANT_WRITE: + case SSL_ERROR_WANT_ACCEPT: + case SSL_ERROR_WANT_CONNECT: + return -2; + default: + PrintSslError(); + break; + } + } + } else { + bytes_written = send(fd_, bytes, len, flags); + } + return bytes_written; } // the following are from the EpollCallbackInterface @@ -861,43 +975,95 @@ class SMServerConnection: public EpollCallbackInterface, return; } + void Cleanup(const char* cleanup) { + VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT << "Cleanup"; + if (!initialized_) { + return; + } + Reset(); + if (connection_pool_) { + connection_pool_->SMConnectionDone(this); + } + if (sm_interface_) { + sm_interface_->ResetForNewConnection(); + } + } + private: void HandleEvents() { - VLOG(1) << "Received: " << EpollServer::EventMaskToString(events_); + VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT << "Received: " + << EpollServer::EventMaskToString(events_).c_str(); + if (events_ & EPOLLIN) { if (!DoRead()) goto handle_close_or_error; } if (events_ & EPOLLOUT) { + // Check if we have connected or not + if (connection_complete_ == false) { + int sock_error; + socklen_t sock_error_len = sizeof(sock_error); + int ret = getsockopt(fd_, SOL_SOCKET, SO_ERROR, &sock_error, + &sock_error_len); + if (ret != 0) { + VLOG(1) << log_prefix_ << ACCEPTOR_CLIENT_IDENT + << "getsockopt error: " << errno << ": " << strerror(errno); + goto handle_close_or_error; + } + if (sock_error == 0) { + connection_complete_ = true; + VLOG(1) << log_prefix_ << ACCEPTOR_CLIENT_IDENT + << "Connection complete to " << ACCEPTOR_SERVER_IDENT; + } else if (sock_error == EINPROGRESS) { + return; + } else { + VLOG(1) << log_prefix_ << ACCEPTOR_CLIENT_IDENT + << "error connecting to server"; + goto handle_close_or_error; + } + } if (!DoWrite()) goto handle_close_or_error; } if (events_ & (EPOLLHUP | EPOLLERR)) { - VLOG(2) << "!!!! Got HUP or ERR"; + VLOG(1) << log_prefix_ << ACCEPTOR_CLIENT_IDENT << "!!! Got HUP or ERR"; goto handle_close_or_error; } return; - handle_close_or_error: + handle_close_or_error: Cleanup("HandleEvents"); } bool DoRead() { - VLOG(2) << "DoRead()"; - if (fd_ == -1) { - VLOG(2) << "DoRead(): fd_ == -1. Invalid FD. Returning false"; - return false; - } + VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT << "DoRead()"; while (!read_buffer_.Full()) { char* bytes; int size; + if (fd_ == -1) { + VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT + << "DoRead(): fd_ == -1. Invalid FD. Returning false"; + return false; + } read_buffer_.GetWritablePtr(&bytes, &size); ssize_t bytes_read = 0; if (ssl_) { bytes_read = SSL_read(ssl_, bytes, size); - PrintSslError(); + if (bytes_read < 0) { + switch(SSL_get_error(ssl_, bytes_read)) { + case SSL_ERROR_WANT_READ: + case SSL_ERROR_WANT_WRITE: + case SSL_ERROR_WANT_ACCEPT: + case SSL_ERROR_WANT_CONNECT: + events_ &= ~EPOLLIN; + goto done; + default: + PrintSslError(); + break; + } + } } else { bytes_read = recv(fd_, bytes, size, MSG_DONTWAIT); } @@ -906,33 +1072,107 @@ class SMServerConnection: public EpollCallbackInterface, switch (stored_errno) { case EAGAIN: events_ &= ~EPOLLIN; - VLOG(2) << "Got EAGAIN while reading"; + VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT + << "Got EAGAIN while reading"; goto done; case EINTR: - VLOG(2) << "Got EINTR while reading"; + VLOG(1) << log_prefix_ << ACCEPTOR_CLIENT_IDENT + << "Got EINTR while reading"; continue; default: - VLOG(2) << "While calling recv, got error: " << stored_errno - << " " << strerror(stored_errno); + VLOG(1) << log_prefix_ << ACCEPTOR_CLIENT_IDENT + << "While calling recv, got error: " + << (ssl_?"(ssl error)":strerror(stored_errno)); goto error_or_close; } } else if (bytes_read > 0) { - VLOG(2) << "Read: " << bytes_read << " bytes from fd: " << fd_; + VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT << "read " << bytes_read + << " bytes"; + if (!protocol_detected_) { + if (acceptor_->flip_handler_type_ == FLIP_HANDLER_HTTP_SERVER) { + // Http Server + protocol_detected_ = true; + if (!sm_http_interface_) { + VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT + << "Created HTTP interface."; + sm_http_interface_ = NewHttpSM(this, NULL, epoll_server_, + memory_cache_, acceptor_); + } else { + VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT + << "Reusing HTTP interface."; + } + sm_interface_ = sm_http_interface_; + } else if (ssl_) { + protocol_detected_ = true; + if (SSL_session_reused(ssl_) == 0) { + VLOG(1) << "Session status: renegotiated"; + } else { + VLOG(1) << "Session status: resumed"; + } + const unsigned char *npn_proto; + unsigned int npn_proto_len; + SSL_get0_next_proto_negotiated(ssl_, &npn_proto, &npn_proto_len); + if (npn_proto_len > 0) { + string npn_proto_str((const char *)npn_proto, npn_proto_len); + VLOG(1) << log_prefix_ << ACCEPTOR_CLIENT_IDENT + << "NPN protocol detected: " << npn_proto_str; + } else { + VLOG(1) << log_prefix_ << ACCEPTOR_CLIENT_IDENT + << "NPN protocol detected: none"; + if (acceptor_->flip_handler_type_ == FLIP_HANDLER_SPDY_SERVER) { + VLOG(1) << log_prefix_ << ACCEPTOR_CLIENT_IDENT + << "NPN protocol: Could not negotiate SPDY protocol."; + goto error_or_close; + } + } + if (npn_proto_len > 0 && + !strncmp((char *)npn_proto, "spdy/2", npn_proto_len)) { + if (!sm_spdy_interface_) { + sm_spdy_interface_ = NewSpdySM(this, NULL, epoll_server_, + memory_cache_, acceptor_); + VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT + << "Created SPDY interface."; + } else { + VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT + << "Reusing SPDY interface."; + } + sm_interface_ = sm_spdy_interface_; + } else { + if (!sm_streamer_interface_) { + sm_streamer_interface_ = NewStreamerSM(this, NULL, + epoll_server_, + acceptor_); + VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT + << "Created Streamer interface."; + } else { + VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT + << "Reusing Streamer interface: "; + } + sm_interface_ = sm_streamer_interface_; + } + } + if (sm_interface_->PostAcceptHook() == 0) { + goto error_or_close; + } + } read_buffer_.AdvanceWritablePtr(bytes_read); if (!DoConsumeReadData()) { goto error_or_close; } continue; } else { // bytes_read == 0 - VLOG(2) << "0 bytes read with recv call."; + VLOG(1) << log_prefix_ << ACCEPTOR_CLIENT_IDENT + << "0 bytes read with recv call."; } goto error_or_close; } done: return true; - error_or_close: - VLOG(2) << "DoRead(): error_or_close. Cleaning up, then returning false"; + error_or_close: + VLOG(1) << log_prefix_ << ACCEPTOR_CLIENT_IDENT + << "DoRead(): error_or_close. " + << "Cleaning up, then returning false"; Cleanup("DoRead"); return false; } @@ -942,19 +1182,21 @@ class SMServerConnection: public EpollCallbackInterface, int size; read_buffer_.GetReadablePtr(&bytes, &size); while (size != 0) { - size_t bytes_consumed = sm_interface_->ProcessInput(bytes, size); - VLOG(2) << "consumed: " << bytes_consumed << " from socket fd: " << fd_; + size_t bytes_consumed = sm_interface_->ProcessReadInput(bytes, size); + VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT << "consumed " + << bytes_consumed << " bytes"; if (bytes_consumed == 0) { break; } read_buffer_.AdvanceReadablePtr(bytes_consumed); if (sm_interface_->MessageFullyRead()) { - VLOG(2) << "HandleRequestFullyRead"; - HandleRequestFullyRead(); - sm_interface_->Reset(); + VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT + << "HandleRequestFullyRead: Setting EPOLLOUT"; + HandleResponseFullyRead(); events_ |= EPOLLOUT; } else if (sm_interface_->Error()) { - LOG(ERROR) << "Framer error detected: " + LOG(ERROR) << log_prefix_ << ACCEPTOR_CLIENT_IDENT + << "Framer error detected: Setting EPOLLOUT: " << sm_interface_->ErrorAsString(); // this causes everything to be closed/cleaned up. events_ |= EPOLLOUT; @@ -970,7 +1212,8 @@ class SMServerConnection: public EpollCallbackInterface, // feeding files into the output buffer. } - void HandleRequestFullyRead() { + void HandleResponseFullyRead() { + sm_interface_->Cleanup(); } void Notify() { @@ -980,20 +1223,29 @@ class SMServerConnection: public EpollCallbackInterface, size_t bytes_sent = 0; int flags = MSG_NOSIGNAL | MSG_DONTWAIT; if (fd_ == -1) { - VLOG(2) << "DoWrite: fd == -1. Returning false."; + VLOG(1) << log_prefix_ << ACCEPTOR_CLIENT_IDENT + << "DoWrite: fd == -1. Returning false."; return false; } if (output_list_.empty()) { - sm_interface_->GetOutput(); - if (output_list_.empty()) + VLOG(2) << log_prefix_ << "DoWrite: Output list empty."; + if (sm_interface_) { + sm_interface_->GetOutput(); + } + if (output_list_.empty()) { events_ &= ~EPOLLOUT; + } } while (!output_list_.empty()) { + VLOG(2) << log_prefix_ << "DoWrite: Items in output list: " + << output_list_.size(); if (bytes_sent >= max_bytes_sent_per_dowrite_) { + VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT + << " byte sent >= max bytes sent per write: Setting EPOLLOUT"; events_ |= EPOLLOUT; break; } - if (output_list_.size() < 2) { + if (sm_interface_ && output_list_.size() < 2) { sm_interface_->GetOutput(); } DataFrame& data_frame = output_list_.front(); @@ -1003,6 +1255,11 @@ class SMServerConnection: public EpollCallbackInterface, size -= data_frame.index; DCHECK_GE(size, 0); if (size <= 0) { + // Empty data frame. Indicates end of data from client. + // Uncork the socket. + int state = 0; + VLOG(2) << log_prefix_ << "Empty data frame, uncorking socket."; + setsockopt( fd_, IPPROTO_TCP, TCP_CORK, &state, sizeof( state ) ); data_frame.MaybeDelete(); output_list_.pop_front(); continue; @@ -1010,56 +1267,63 @@ class SMServerConnection: public EpollCallbackInterface, flags = MSG_NOSIGNAL | MSG_DONTWAIT; if (output_list_.size() > 1) { + VLOG(2) << log_prefix_ << "Outlist size: " << output_list_.size() + << ": Adding MSG_MORE flag"; flags |= MSG_MORE; } - ssize_t bytes_written = 0; - if (ssl_) { - bytes_written = SSL_write(ssl_, bytes, size); - PrintSslError(); - } else { - bytes_written = send(fd_, bytes, size, flags); - } + VLOG(2) << log_prefix_ << "Attempting to send " << size << " bytes."; + ssize_t bytes_written = Send(bytes, size, flags); int stored_errno = errno; if (bytes_written == -1) { switch (stored_errno) { case EAGAIN: events_ &= ~EPOLLOUT; - VLOG(2) << " Got EAGAIN while writing"; + VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT + << "Got EAGAIN while writing"; goto done; case EINTR: - VLOG(2) << " Got EINTR while writing"; + VLOG(1) << log_prefix_ << ACCEPTOR_CLIENT_IDENT + << "Got EINTR while writing"; continue; default: - VLOG(2) << "While calling send, got error: " << stored_errno - << " " << strerror(stored_errno); + VLOG(1) << log_prefix_ << ACCEPTOR_CLIENT_IDENT + << "While calling send, got error: " << stored_errno + << ": " << (ssl_?"":strerror(stored_errno)); goto error_or_close; } } else if (bytes_written > 0) { - VLOG(1) << "Wrote: " << bytes_written << " bytes to socket fd: " - << fd_; + VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT << "Wrote: " + << bytes_written << " bytes"; data_frame.index += bytes_written; bytes_sent += bytes_written; continue; + } else if (bytes_written == -2) { + // -2 handles SSL_ERROR_WANT_* errors + events_ &= ~EPOLLOUT; + goto done; } - VLOG(2) << "0 bytes written to socket " << fd_ << " with send call."; + VLOG(1) << log_prefix_ << ACCEPTOR_CLIENT_IDENT + << "0 bytes written with send call."; goto error_or_close; } done: return true; error_or_close: - VLOG(2) << "DoWrite: error_or_close. Returning false after cleaning up"; + VLOG(1) << log_prefix_ << ACCEPTOR_CLIENT_IDENT + << "DoWrite: error_or_close. Returning false " + << "after cleaning up"; Cleanup("DoWrite"); return false; } - friend ostream& operator<<(ostream& os, const SMServerConnection& c) { + friend ostream& operator<<(ostream& os, const SMConnection& c) { os << &c << "\n"; return os; } void Reset() { - VLOG(2) << "Resetting"; + VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT << "Resetting"; if (ssl_) { SSL_shutdown(ssl_); PrintSslError(); @@ -1071,25 +1335,17 @@ class SMServerConnection: public EpollCallbackInterface, registered_in_epoll_server_ = false; } if (fd_ >= 0) { - VLOG(2) << "Closing connection"; + VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT << "Closing connection"; close(fd_); fd_ = -1; } - sm_interface_->ResetForNewConnection(); read_buffer_.Clear(); initialized_ = false; + protocol_detected_ = false; events_ = 0; output_list_.clear(); } - void Cleanup(const char* cleanup) { - VLOG(2) << "Cleaning up: " << cleanup; - if (!initialized_) { - return; - } - Reset(); - connection_pool_->SMServerConnectionDone(this); - } }; //////////////////////////////////////////////////////////////////////////////// @@ -1114,13 +1370,15 @@ class OutputOrdering { PriorityRing first_data_senders_; uint32 first_data_senders_threshold_; // when you've passed this, you're no // longer a first_data_sender... - SMServerConnection* connection_; + SMConnectionInterface* connection_; EpollServer* epoll_server_; - explicit OutputOrdering(SMServerConnection* connection) : - first_data_senders_threshold_(kInitialDataSendersThreshold), - connection_(connection), - epoll_server_(connection->epoll_server()) { + explicit OutputOrdering(SMConnectionInterface* connection) : + first_data_senders_threshold_(kInitialDataSendersThreshold), + connection_(connection) { + if (connection) { + epoll_server_ = connection->epoll_server(); + } } void Reset() { @@ -1151,7 +1409,7 @@ class OutputOrdering { int64 OnAlarm() { OnUnregistration(); output_ordering_->MoveToActive(pmp_, mci_); - VLOG(1) << "ON ALARM! Should now start to output..."; + VLOG(2) << "ON ALARM! Should now start to output..."; delete this; return 0; } @@ -1179,7 +1437,7 @@ class OutputOrdering { }; void MoveToActive(PriorityMapPointer* pmp, MemCacheIter mci) { - VLOG(1) <<"Moving to active!"; + VLOG(2) << "Moving to active!"; first_data_senders_.push_back(mci); pmp->ring = &first_data_senders_; pmp->it = first_data_senders_.end(); @@ -1189,9 +1447,9 @@ class OutputOrdering { void AddToOutputOrder(const MemCacheIter& mci) { if (ExistsInPriorityMaps(mci.stream_id)) - LOG(FATAL) << "OOps, already was inserted here?!"; + LOG(ERROR) << "OOps, already was inserted here?!"; - double think_time_in_s = FLAGS_server_think_time_in_s; + double think_time_in_s = g_proxy_config.server_think_time_in_s_; string x_server_latency = mci.file_data->headers->GetHeader("X-Server-Latency").as_string(); if (x_server_latency.size() != 0) { @@ -1199,7 +1457,8 @@ class OutputOrdering { double tmp_think_time_in_s = strtod(x_server_latency.c_str(), &endp); if (endp != x_server_latency.c_str() + x_server_latency.size()) { LOG(ERROR) << "Unable to understand X-Server-Latency of: " - << x_server_latency << " for resource: " << mci.file_data->filename; + << x_server_latency << " for resource: " + << mci.file_data->filename.c_str(); } else { think_time_in_s = tmp_think_time_in_s; } @@ -1211,7 +1470,7 @@ class OutputOrdering { PriorityMapPointer& pmp = sitpmi->second; BeginOutputtingAlarm* boa = new BeginOutputtingAlarm(this, &pmp, mci); - VLOG(2) << "Server think time: " << think_time_in_s; + VLOG(1) << "Server think time: " << think_time_in_s; epoll_server_->RegisterAlarmApproximateDelta( think_time_in_s * 1000000, boa); } @@ -1275,34 +1534,169 @@ class OutputOrdering { } }; + //////////////////////////////////////////////////////////////////////////////// class SpdySM : public SpdyFramerVisitorInterface, public SMInterface { private: uint64 seq_num_; - SpdyFramer* framer_; + SpdyFramer* spdy_framer_; - SMServerConnection* connection_; - OutputList* output_list_; - OutputOrdering output_ordering_; - MemoryCache* memory_cache_; + SMConnection* connection_; + OutputList* client_output_list_; + OutputOrdering client_output_ordering_; uint32 next_outgoing_stream_id_; + EpollServer* epoll_server_; + FlipAcceptor* acceptor_; + MemoryCache* memory_cache_; + vector<SMInterface*> server_interface_list; + vector<int32> unused_server_interface_list; + typedef map<uint32,SMInterface*> StreamToSmif; + StreamToSmif stream_to_smif_; public: - explicit SpdySM(SMServerConnection* connection) : - seq_num_(0), - framer_(new SpdyFramer), - connection_(connection), - output_list_(connection->output_list()), - output_ordering_(connection), - memory_cache_(connection->memory_cache()), - next_outgoing_stream_id_(2) { - framer_->set_visitor(this); + SpdySM(SMConnection* connection, + SMInterface* sm_http_interface, + EpollServer* epoll_server, + MemoryCache* memory_cache, + FlipAcceptor* acceptor) + : seq_num_(0), + spdy_framer_(new SpdyFramer), + connection_(connection), + client_output_list_(connection->output_list()), + client_output_ordering_(connection), + next_outgoing_stream_id_(2), + epoll_server_(epoll_server), + acceptor_(acceptor), + memory_cache_(memory_cache) { + spdy_framer_->set_visitor(this); } + + ~SpdySM() { + delete spdy_framer_; + } + + void InitSMInterface(SMInterface* sm_http_interface, + int32 server_idx) { } + + void InitSMConnection(SMConnectionPoolInterface* connection_pool, + SMInterface* sm_interface, + EpollServer* epoll_server, + int fd, + bool use_ssl) { + VLOG(2) << ACCEPTOR_CLIENT_IDENT + << "SpdySM: Initializing server connection."; + connection_->InitSMConnection(connection_pool, sm_interface, + epoll_server, fd, use_ssl); + } + private: virtual void OnError(SpdyFramer* framer) { /* do nothing with this right now */ } + SMInterface* NewConnectionInterface() { + SMConnection* server_connection = + SMConnection::NewSMConnection(epoll_server_, NULL, + memory_cache_, acceptor_, + "http_conn: "); + if (server_connection == NULL) { + LOG(ERROR) << "SpdySM: Could not create server connection"; + return NULL; + } + VLOG(2) << ACCEPTOR_CLIENT_IDENT << "SpdySM: Creating new HTTP interface"; + SMInterface *sm_http_interface = NewHttpSM(server_connection, this, + epoll_server_, memory_cache_, + acceptor_); + return sm_http_interface; + } + + SMInterface* FindOrMakeNewSMConnectionInterface() { + SMInterface *sm_http_interface; + int32 server_idx; + if (unused_server_interface_list.empty()) { + sm_http_interface = NewConnectionInterface(); + server_idx = server_interface_list.size(); + server_interface_list.push_back(sm_http_interface); + VLOG(2) << ACCEPTOR_CLIENT_IDENT + << "SpdySM: Making new server connection on index: " + << server_idx; + } else { + server_idx = unused_server_interface_list.back(); + unused_server_interface_list.pop_back(); + sm_http_interface = server_interface_list.at(server_idx); + VLOG(2) << ACCEPTOR_CLIENT_IDENT << "SpdySM: Reusing connection on " + << "index: " << server_idx; + } + + sm_http_interface->InitSMInterface(this, server_idx); + sm_http_interface->InitSMConnection(NULL, sm_http_interface, + epoll_server_, -1, false); + + return sm_http_interface; + } + + int SpdyHandleNewStream(const SpdyControlFrame* frame, + string *http_data) + { + bool parsed_headers = false; + SpdyHeaderBlock headers; + const SpdySynStreamControlFrame* syn_stream = + reinterpret_cast<const SpdySynStreamControlFrame*>(frame); + + parsed_headers = spdy_framer_->ParseHeaderBlock(frame, &headers); + VLOG(2) << ACCEPTOR_CLIENT_IDENT << "SpdySM: OnSyn(" + << syn_stream->stream_id() << ")"; + VLOG(2) << ACCEPTOR_CLIENT_IDENT << "SpdySM: headers parsed?: " + << (parsed_headers? "yes": "no"); + if (parsed_headers) { + VLOG(2) << ACCEPTOR_CLIENT_IDENT << "SpdySM: # headers: " + << headers.size(); + } + SpdyHeaderBlock::iterator url = headers.find("url"); + SpdyHeaderBlock::iterator method = headers.find("method"); + if (url == headers.end() || method == headers.end()) { + VLOG(2) << ACCEPTOR_CLIENT_IDENT << "SpdySM: didn't find method or url " + << "or method. Not creating stream"; + return 0; + } + + string uri = UrlUtilities::GetUrlPath(url->second); + if (acceptor_->flip_handler_type_ == FLIP_HANDLER_SPDY_SERVER) { + SpdyHeaderBlock::iterator referer = headers.find("referer"); + if (referer != headers.end() && method->second == "GET") { + memory_cache_->UpdateHeaders(referer->second, url->second); + } + string host = UrlUtilities::GetUrlHost(url->second); + VLOG(1) << ACCEPTOR_CLIENT_IDENT << "Request: " << method->second + << " " << uri; + string filename = EncodeURL(uri, host, method->second); + NewStream(syn_stream->stream_id(), + reinterpret_cast<const SpdySynStreamControlFrame*>(frame)-> + priority(), + filename); + } else { + SpdyHeaderBlock::iterator version = headers.find("version"); + *http_data += method->second + " " + uri + " " + version->second + "\r\n"; + VLOG(1) << ACCEPTOR_CLIENT_IDENT << "Request: " << method->second << " " + << uri << " " << version->second; + for (SpdyHeaderBlock::iterator i = headers.begin(); + i != headers.end(); ++i) { + *http_data += i->first + ": " + i->second + "\r\n"; + VLOG(2) << ACCEPTOR_CLIENT_IDENT << i->first.c_str() << ":" + << i->second.c_str(); + } + if (g_proxy_config.forward_ip_header_enabled_) { + // X-Client-Cluster-IP header + *http_data += g_proxy_config.forward_ip_header_ + ": " + + connection_->client_ip() + "\r\n"; + } + *http_data += "\r\n"; + } + + VLOG(3) << ACCEPTOR_CLIENT_IDENT << "SpdySM: HTTP Request:\n" << http_data; + return 1; + } + virtual void OnControl(const SpdyControlFrame* frame) { SpdyHeaderBlock headers; bool parsed_headers = false; @@ -1311,122 +1705,126 @@ class SpdySM : public SpdyFramerVisitorInterface, public SMInterface { { const SpdySynStreamControlFrame* syn_stream = reinterpret_cast<const SpdySynStreamControlFrame*>(frame); - parsed_headers = framer_->ParseHeaderBlock(frame, &headers); - VLOG(2) << "OnSyn(" << syn_stream->stream_id() << ")"; - VLOG(2) << "headers parsed?: " << (parsed_headers? "yes": "no"); - if (parsed_headers) { - VLOG(2) << "# headers: " << headers.size(); - } - for (SpdyHeaderBlock::iterator i = headers.begin(); - i != headers.end(); - ++i) { - VLOG(2) << i->first << ": " << i->second; - } - SpdyHeaderBlock::iterator method = headers.find("method"); - SpdyHeaderBlock::iterator url = headers.find("url"); - if (url == headers.end() || method == headers.end()) { - VLOG(2) << "didn't find method or url or method. Not creating stream"; - break; - } + string http_data; + int ret = SpdyHandleNewStream(frame, &http_data); + if (!ret) { + LOG(ERROR) << "SpdySM: Could not convert spdy into http."; + break; + } - SpdyHeaderBlock::iterator referer = headers.find("referer"); - if (referer != headers.end() && method->second == "GET") { - memory_cache_->UpdateHeaders(referer->second, url->second); - } - string uri = UrlUtilities::GetUrlPath(url->second); - string host = UrlUtilities::GetUrlHost(url->second); - - string filename = EncodeURL(uri, host, method->second); - NewStream(syn_stream->stream_id(), - reinterpret_cast<const SpdySynStreamControlFrame*>(frame)-> - priority(), - filename); + if (acceptor_->flip_handler_type_ == FLIP_HANDLER_PROXY) { + SMInterface *sm_http_interface = + FindOrMakeNewSMConnectionInterface(); + stream_to_smif_[syn_stream->stream_id()] = sm_http_interface; + sm_http_interface->SetStreamID(syn_stream->stream_id()); + sm_http_interface->ProcessWriteInput(http_data.c_str(), + http_data.size()); + } } break; case SYN_REPLY: - parsed_headers = framer_->ParseHeaderBlock(frame, &headers); - VLOG(2) << "OnSynReply(" - << reinterpret_cast<const SpdySynReplyControlFrame*>( - frame)->stream_id() << ")"; + parsed_headers = spdy_framer_->ParseHeaderBlock(frame, &headers); + VLOG(2) << ACCEPTOR_CLIENT_IDENT << "SpdySM: OnSynReply(" << + reinterpret_cast<const SpdySynReplyControlFrame*>(frame)->stream_id() + << ")"; break; case RST_STREAM: { const SpdyRstStreamControlFrame* rst_stream = reinterpret_cast<const SpdyRstStreamControlFrame*>(frame); - VLOG(2) << "OnRst(" << rst_stream->stream_id() << ")"; - output_ordering_.RemoveStreamId(rst_stream ->stream_id()); + VLOG(2) << ACCEPTOR_CLIENT_IDENT << "SpdySM: OnRst(" + << rst_stream->stream_id() << ")"; + client_output_ordering_.RemoveStreamId(rst_stream ->stream_id()); } break; default: - LOG(DFATAL) << "Unknown control frame type"; + LOG(ERROR) << "SpdySM: Unknown control frame type"; } } - virtual void OnStreamFrameData( - SpdyStreamId stream_id, - const char* data, size_t len) { - VLOG(2) << "StreamData(" << stream_id << ", [" << len << "])"; - /* do nothing with this right now */ - } - virtual void OnLameDuck() { - /* do nothing with this right now */ + virtual void OnStreamFrameData(SpdyStreamId stream_id, + const char* data, size_t len) { + VLOG(2) << ACCEPTOR_CLIENT_IDENT << "SpdySM: StreamData(" << stream_id + << ", [" << len << "])"; + if (acceptor_->flip_handler_type_ == FLIP_HANDLER_PROXY) { + stream_to_smif_[stream_id]->ProcessWriteInput(data, len); + } } public: - ~SpdySM() { - Reset(); + size_t ProcessReadInput(const char* data, size_t len) { + return spdy_framer_->ProcessInput(data, len); } - size_t ProcessInput(const char* data, size_t len) { - return framer_->ProcessInput(data, len); + + size_t ProcessWriteInput(const char* data, size_t len) { + return 0; } bool MessageFullyRead() const { - return framer_->MessageFullyRead(); + return spdy_framer_->MessageFullyRead(); } + void SetStreamID(uint32 stream_id) {} + bool Error() const { - return framer_->HasError(); + return spdy_framer_->HasError(); } const char* ErrorAsString() const { - return SpdyFramer::ErrorCodeToString(framer_->error_code()); + DCHECK(Error()); + return SpdyFramer::ErrorCodeToString(spdy_framer_->error_code()); + } + + void Reset() { + } + + void ResetForNewInterface(int32 server_idx) { + VLOG(2) << ACCEPTOR_CLIENT_IDENT << "SpdySM: Reset for new interface: " + << "server_idx: " << server_idx; + unused_server_interface_list.push_back(server_idx); } - void Reset() {} void ResetForNewConnection() { // seq_num is not cleared, intentionally. - delete framer_; - framer_ = new SpdyFramer; - framer_->set_visitor(this); - output_ordering_.Reset(); + delete spdy_framer_; + spdy_framer_ = new SpdyFramer; + spdy_framer_->set_visitor(this); + client_output_ordering_.Reset(); next_outgoing_stream_id_ = 2; } - // Send a couple of NOOP packets to force opening of cwnd. - void PostAcceptHook() { - if (!FLAGS_use_cwnd_opener) - return; - - // We send 2 because that is the initial cwnd, and also because - // we have to in order to get an ACK back from the client due to - // delayed ACK. - const int kPkts = 2; - - LOG(ERROR) << "Sending NOP FRAMES"; - - scoped_ptr<SpdyControlFrame> frame(SpdyFramer::CreateNopFrame()); - for (int i = 0; i < kPkts; ++i) { - char* bytes = frame->data(); - size_t size = SpdyFrame::size(); - ssize_t bytes_written = connection_->Send(bytes, size, MSG_DONTWAIT); - if (static_cast<size_t>(bytes_written) != size) { - LOG(ERROR) << "Trouble sending Nop packet! (" << errno << ")"; - if (errno == EAGAIN) - break; + // SMInterface's Cleanup is currently only called by SMConnection after a + // protocol message as been fully read. Spdy's SMInterface does not need + // to do any cleanup at this time. + // TODO (klindsay) This method is probably not being used properly and + // some logic review and method renaming is probably in order. + void Cleanup() {} + + // Send a settings frame and possibly some NOOP packets to force + // opening of cwnd + int PostAcceptHook() { + ssize_t bytes_written; + spdy::SpdySettings settings; + spdy::SettingsFlagsAndId settings_id(0); + settings_id.set_id(spdy::SETTINGS_MAX_CONCURRENT_STREAMS); + settings.push_back(spdy::SpdySetting(settings_id, 100)); + scoped_ptr<SpdySettingsControlFrame> + settings_frame(spdy_framer_->CreateSettings(settings)); + + char* bytes = settings_frame->data(); + size_t size = SpdyFrame::size() + settings_frame->length(); + VLOG(1) << ACCEPTOR_CLIENT_IDENT << "Sending Settings Frame"; + bytes_written = connection_->Send(bytes, size, + MSG_NOSIGNAL | MSG_DONTWAIT); + if (static_cast<size_t>(bytes_written) != size) { + LOG(ERROR) << "Trouble sending SETTINGS frame! (" << errno << ")"; + if (errno == EAGAIN) { + return 0; } } + return 1; } void AddAssociatedContent(FileData* file_data) { @@ -1436,10 +1834,12 @@ class SpdySM : public SpdyFramerVisitorInterface, public SMInterface { string filename = "GET_"; filename += related_file.second; if (!memory_cache_->AssignFileData(filename, &mci)) { - VLOG(1) << "Unable to find associated content for: " << filename; + VLOG(1) << ACCEPTOR_CLIENT_IDENT << "Unable to find associated " + << "content for: " << filename; continue; } - VLOG(1) << "Adding associated content: " << filename; + VLOG(1) << ACCEPTOR_CLIENT_IDENT << "Adding associated content: " + << filename; mci.stream_id = next_outgoing_stream_id_; next_outgoing_stream_id_ += 2; mci.priority = related_file.first; @@ -1451,20 +1851,24 @@ class SpdySM : public SpdyFramerVisitorInterface, public SMInterface { MemCacheIter mci; mci.stream_id = stream_id; mci.priority = priority; - if (!memory_cache_->AssignFileData(filename, &mci)) { - // error creating new stream. - VLOG(2) << "Sending ErrorNotFound"; - SendErrorNotFound(stream_id); + if (acceptor_->flip_handler_type_ == FLIP_HANDLER_SPDY_SERVER) { + if (!memory_cache_->AssignFileData(filename, &mci)) { + // error creating new stream. + VLOG(1) << ACCEPTOR_CLIENT_IDENT << "Sending ErrorNotFound"; + SendErrorNotFound(stream_id); + } else { + AddToOutputOrder(mci); + if (FLAGS_use_xac) { + AddAssociatedContent(mci.file_data); + } + } } else { AddToOutputOrder(mci); - if (FLAGS_use_xac) { - AddAssociatedContent(mci.file_data); - } } } void AddToOutputOrder(const MemCacheIter& mci) { - output_ordering_.AddToOutputOrder(mci); + client_output_ordering_.AddToOutputOrder(mci); } void SendEOF(uint32 stream_id) { @@ -1493,12 +1897,12 @@ class SpdySM : public SpdyFramerVisitorInterface, public SMInterface { SendDataFrameImpl(stream_id, data, len, spdy_flags, compress); } - SpdyFramer* spdy_framer() { return framer_; } + SpdyFramer* spdy_framer() { return spdy_framer_; } private: void SendEOFImpl(uint32 stream_id) { SendDataFrame(stream_id, NULL, 0, DATA_FLAG_FIN, false); - VLOG(2) << "Sending EOF: " << stream_id; + VLOG(2) << ACCEPTOR_CLIENT_IDENT << "SpdySM: Sending EOF: " << stream_id; KillStream(stream_id); } @@ -1507,7 +1911,7 @@ class SpdySM : public SpdyFramerVisitorInterface, public SMInterface { my_headers.SetFirstlineFromStringPieces("HTTP/1.1", "404", "Not Found"); SendSynReplyImpl(stream_id, my_headers); SendDataFrame(stream_id, "wtf?", 4, DATA_FLAG_FIN, false); - output_ordering_.RemoveStreamId(stream_id); + client_output_ordering_.RemoveStreamId(stream_id); } void SendOKResponseImpl(uint32 stream_id, string* output) { @@ -1516,11 +1920,11 @@ class SpdySM : public SpdyFramerVisitorInterface, public SMInterface { SendSynReplyImpl(stream_id, my_headers); SendDataFrame( stream_id, output->c_str(), output->size(), DATA_FLAG_FIN, false); - output_ordering_.RemoveStreamId(stream_id); + client_output_ordering_.RemoveStreamId(stream_id); } void KillStream(uint32 stream_id) { - output_ordering_.RemoveStreamId(stream_id); + client_output_ordering_.RemoveStreamId(stream_id); } void CopyHeaders(SpdyHeaderBlock& dest, const BalsaHeaders& headers) { @@ -1559,8 +1963,8 @@ class SpdySM : public SpdyFramerVisitorInterface, public SMInterface { CopyHeaders(block, headers); SpdySynStreamControlFrame* fsrcf = - framer_->CreateSynStream(stream_id, 0, 0, CONTROL_FLAG_NONE, true, - &block); + spdy_framer_->CreateSynStream(stream_id, 0, 0, CONTROL_FLAG_NONE, true, + &block); DataFrame df; df.size = fsrcf->length() + SpdyFrame::size(); size_t df_size = df.size; @@ -1568,7 +1972,8 @@ class SpdySM : public SpdyFramerVisitorInterface, public SMInterface { df.delete_when_done = true; EnqueueDataFrame(df); - VLOG(2) << "Sending SynStreamheader " << stream_id; + VLOG(2) << ACCEPTOR_CLIENT_IDENT << "SpdySM: Sending SynStreamheader " + << stream_id; return df_size; } @@ -1580,7 +1985,7 @@ class SpdySM : public SpdyFramerVisitorInterface, public SMInterface { block["version"] = headers.response_version().as_string(); SpdySynReplyControlFrame* fsrcf = - framer_->CreateSynReply(stream_id, CONTROL_FLAG_NONE, true, &block); + spdy_framer_->CreateSynReply(stream_id, CONTROL_FLAG_NONE, true, &block); DataFrame df; df.size = fsrcf->length() + SpdyFrame::size(); size_t df_size = df.size; @@ -1588,7 +1993,8 @@ class SpdySM : public SpdyFramerVisitorInterface, public SMInterface { df.delete_when_done = true; EnqueueDataFrame(df); - VLOG(2) << "Sending SynReplyheader " << stream_id; + VLOG(2) << ACCEPTOR_CLIENT_IDENT << "SpdySM: Sending SynReplyheader " + << stream_id; return df_size; } @@ -1601,16 +2007,16 @@ class SpdySM : public SpdyFramerVisitorInterface, public SMInterface { // TODO(mbelshe): We can't compress here - before going into the // priority queue. Compression needs to be done // with late binding. - SpdyDataFrame* fdf = framer_->CreateDataFrame(stream_id, data, len, - flags); + SpdyDataFrame* fdf = spdy_framer_->CreateDataFrame(stream_id, data, len, + flags); DataFrame df; df.size = fdf->length() + SpdyFrame::size(); df.data = fdf->data(); df.delete_when_done = true; EnqueueDataFrame(df); - VLOG(2) << "Sending data frame" << stream_id << " [" << len << "]" - << " shrunk to " << fdf->length(); + VLOG(2) << ACCEPTOR_CLIENT_IDENT << "SpdySM: Sending data frame " + << stream_id << " [" << len << "] shrunk to " << fdf->length(); } void EnqueueDataFrame(const DataFrame& df) { @@ -1618,16 +2024,17 @@ class SpdySM : public SpdyFramerVisitorInterface, public SMInterface { } void GetOutput() { - while (output_list_->size() < 2) { - MemCacheIter* mci = output_ordering_.GetIter(); + while (client_output_list_->size() < 2) { + MemCacheIter* mci = client_output_ordering_.GetIter(); if (mci == NULL) { - VLOG(2) << "GetOutput: nothing to output!?"; + VLOG(2) << ACCEPTOR_CLIENT_IDENT + << "SpdySM: GetOutput: nothing to output!?"; return; } if (!mci->transformed_header) { mci->transformed_header = true; - VLOG(2) << "GetOutput transformed header stream_id: [" - << mci->stream_id << "]"; + VLOG(2) << ACCEPTOR_CLIENT_IDENT << "SpdySM: GetOutput transformed " + << "header stream_id: [" << mci->stream_id << "]"; if ((mci->stream_id % 2) == 0) { // this is a server initiated stream. // Ideally, we'd do a 'syn-push' here, instead of a syn-reply. @@ -1647,7 +2054,8 @@ class SpdySM : public SpdyFramerVisitorInterface, public SMInterface { return; } if (mci->body_bytes_consumed >= mci->file_data->body.size()) { - VLOG(2) << "GetOutput remove_stream_id: [" << mci->stream_id << "]"; + VLOG(2) << ACCEPTOR_CLIENT_IDENT << "SpdySM: GetOutput " + << "remove_stream_id: [" << mci->stream_id << "]"; SendEOF(mci->stream_id); return; } @@ -1669,8 +2077,8 @@ class SpdySM : public SpdyFramerVisitorInterface, public SMInterface { SendDataFrame(mci->stream_id, mci->file_data->body.data() + mci->body_bytes_consumed, num_to_write, 0, should_compress); - VLOG(2) << "GetOutput SendDataFrame[" << mci->stream_id - << "]: " << num_to_write; + VLOG(2) << ACCEPTOR_CLIENT_IDENT << "SpdySM: GetOutput SendDataFrame[" + << mci->stream_id << "]: " << num_to_write; mci->body_bytes_consumed += num_to_write; mci->bytes_sent += num_to_write; } @@ -1679,28 +2087,41 @@ class SpdySM : public SpdyFramerVisitorInterface, public SMInterface { //////////////////////////////////////////////////////////////////////////////// -class HTTPSM : public BalsaVisitorInterface, public SMInterface { +class HttpSM : public BalsaVisitorInterface, public SMInterface { private: uint64 seq_num_; - BalsaFrame* framer_; + BalsaFrame* http_framer_; BalsaHeaders headers_; uint32 stream_id_; + int32 server_idx_; - SMServerConnection* connection_; + SMConnection* connection_; + SMInterface* sm_spdy_interface_; OutputList* output_list_; OutputOrdering output_ordering_; MemoryCache* memory_cache_; + FlipAcceptor* acceptor_; public: - explicit HTTPSM(SMServerConnection* connection) : + explicit HttpSM(SMConnection* connection, + SMInterface* sm_spdy_interface, + EpollServer* epoll_server, + MemoryCache* memory_cache, + FlipAcceptor* acceptor) : seq_num_(0), - framer_(new BalsaFrame), - stream_id_(1), + http_framer_(new BalsaFrame), + stream_id_(0), + server_idx_(-1), connection_(connection), + sm_spdy_interface_(sm_spdy_interface), output_list_(connection->output_list()), output_ordering_(connection), - memory_cache_(connection->memory_cache()) { - framer_->set_balsa_visitor(this); - framer_->set_balsa_headers(&headers_); + memory_cache_(connection->memory_cache()), + acceptor_(acceptor) { + http_framer_->set_balsa_visitor(this); + http_framer_->set_balsa_headers(&headers_); + if (acceptor_->flip_handler_type_ == FLIP_HANDLER_PROXY) { + http_framer_->set_is_request(false); + } } private: typedef map<string, uint32> ClientTokenMap; @@ -1708,20 +2129,31 @@ class HTTPSM : public BalsaVisitorInterface, public SMInterface { virtual void ProcessBodyInput(const char *input, size_t size) { } virtual void ProcessBodyData(const char *input, size_t size) { - // ignoring this. + if (acceptor_->flip_handler_type_ == FLIP_HANDLER_PROXY) { + VLOG(2) << ACCEPTOR_CLIENT_IDENT << "HttpSM: Process Body Data: stream " + << stream_id_ << ": size " << size; + sm_spdy_interface_->SendDataFrame(stream_id_, input, size, 0, false); + } } virtual void ProcessHeaderInput(const char *input, size_t size) { } virtual void ProcessTrailerInput(const char *input, size_t size) {} virtual void ProcessHeaders(const BalsaHeaders& headers) { - VLOG(2) << "Got new request!"; - string host = UrlUtilities::GetUrlHost( - headers.GetHeader("Host").as_string()); - string method = headers.request_method().as_string(); - string filename = EncodeURL(headers.request_uri().as_string(), host, - method); - NewStream(stream_id_, 0, filename); - stream_id_ += 2; + if (acceptor_->flip_handler_type_ == FLIP_HANDLER_HTTP_SERVER) { + string host = + UrlUtilities::GetUrlHost(headers.GetHeader("Host").as_string()); + string method = headers.request_method().as_string(); + VLOG(1) << ACCEPTOR_CLIENT_IDENT << "Received Request: " + << headers.request_uri().as_string() << " " << method; + string filename = EncodeURL(headers.request_uri().as_string(), + host, method); + NewStream(stream_id_, 0, filename); + stream_id_ += 2; + } else { + VLOG(1) << ACCEPTOR_CLIENT_IDENT << "HttpSM: Received Response from " + << ACCEPTOR_SERVER_IDENT; + sm_spdy_interface_->SendSynReply(stream_id_, headers); + } } virtual void ProcessRequestFirstLine(const char* line_input, size_t line_length, @@ -1743,7 +2175,13 @@ class HTTPSM : public BalsaVisitorInterface, public SMInterface { virtual void ProcessChunkExtensions(const char *input, size_t size) {} virtual void HeaderDone() {} virtual void MessageDone() { - VLOG(2) << "MessageDone!"; + VLOG(2) << ACCEPTOR_CLIENT_IDENT << "HttpSM: MessageDone. Sending EOF: " + << "stream " << stream_id_; + if (acceptor_->flip_handler_type_ == FLIP_HANDLER_PROXY) { + sm_spdy_interface_->SendEOF(stream_id_); + } else { + VLOG(2) << ACCEPTOR_CLIENT_IDENT << "HttpSM: MessageDone."; + } } virtual void HandleHeaderError(BalsaFrame* framer) { HandleError(); @@ -1757,40 +2195,98 @@ class HTTPSM : public BalsaVisitorInterface, public SMInterface { } void HandleError() { - VLOG(2) << "Error detected"; + VLOG(1) << ACCEPTOR_CLIENT_IDENT << "Error detected"; } public: - ~HTTPSM() { + ~HttpSM() { Reset(); + delete http_framer_; } - size_t ProcessInput(const char* data, size_t len) { - return framer_->ProcessInput(data, len); + + void InitSMInterface(SMInterface* sm_spdy_interface, + int32 server_idx) + { + sm_spdy_interface_ = sm_spdy_interface; + server_idx_ = server_idx; + } + + void InitSMConnection(SMConnectionPoolInterface* connection_pool, + SMInterface* sm_interface, + EpollServer* epoll_server, + int fd, + bool use_ssl) + { + VLOG(2) << ACCEPTOR_CLIENT_IDENT << "HttpSM: Initializing server " + << "connection."; + connection_->InitSMConnection(connection_pool, sm_interface, + epoll_server, fd, use_ssl); + } + + size_t ProcessReadInput(const char* data, size_t len) { + VLOG(2) << ACCEPTOR_CLIENT_IDENT << "HttpSM: Process read input: stream " + << stream_id_; + return http_framer_->ProcessInput(data, len); + } + + size_t ProcessWriteInput(const char* data, size_t len) { + VLOG(2) << ACCEPTOR_CLIENT_IDENT << "HttpSM: Process write input: size " + << len << ": stream " << stream_id_; + char * dataPtr = new char[len]; + memcpy(dataPtr, data, len); + DataFrame data_frame; + data_frame.data = (const char *)dataPtr; + data_frame.size = len; + data_frame.delete_when_done = true; + connection_->EnqueueDataFrame(data_frame); + return len; } bool MessageFullyRead() const { - return framer_->MessageFullyRead(); + return http_framer_->MessageFullyRead(); + } + + void SetStreamID(uint32 stream_id) { + stream_id_ = stream_id; } bool Error() const { - return framer_->Error(); + return http_framer_->Error(); } const char* ErrorAsString() const { - return BalsaFrameEnums::ErrorCodeToString(framer_->ErrorCode()); + return BalsaFrameEnums::ErrorCodeToString(http_framer_->ErrorCode()); } void Reset() { - framer_->Reset(); + VLOG(1) << ACCEPTOR_CLIENT_IDENT << "HttpSM: Reset: stream %d " + << stream_id_; + http_framer_->Reset(); + } + + void ResetForNewInterface(int32 server_idx) { } void ResetForNewConnection() { + VLOG(1) << ACCEPTOR_CLIENT_IDENT << "HttpSM: Server connection closing " + << "to: " << ACCEPTOR_SERVER_IDENT; seq_num_ = 0; output_ordering_.Reset(); - framer_->Reset(); + http_framer_->Reset(); + if (sm_spdy_interface_) { + sm_spdy_interface_->ResetForNewInterface(server_idx_); + } + } + + void Cleanup() { + if (!(acceptor_->flip_handler_type_ == FLIP_HANDLER_HTTP_SERVER)) { + connection_->Cleanup("HttpSM Request Fully Read: stream_id " + + stream_id_); + } } - void PostAcceptHook() { + int PostAcceptHook() { + return 1; } void NewStream(uint32 stream_id, uint32 priority, const string& filename) { @@ -1798,6 +2294,8 @@ class HTTPSM : public BalsaVisitorInterface, public SMInterface { mci.stream_id = stream_id; mci.priority = priority; if (!memory_cache_->AssignFileData(filename, &mci)) { + // error creating new stream. + VLOG(2) << ACCEPTOR_CLIENT_IDENT << "Sending ErrorNotFound"; SendErrorNotFound(stream_id); } else { AddToOutputOrder(mci); @@ -1810,6 +2308,9 @@ class HTTPSM : public BalsaVisitorInterface, public SMInterface { void SendEOF(uint32 stream_id) { SendEOFImpl(stream_id); + if (acceptor_->flip_handler_type_ == FLIP_HANDLER_PROXY) { + sm_spdy_interface_->ResetForNewInterface(server_idx_); + } } void SendErrorNotFound(uint32 stream_id) { @@ -1833,7 +2334,7 @@ class HTTPSM : public BalsaVisitorInterface, public SMInterface { SendDataFrameImpl(stream_id, data, len, flags, compress); } - BalsaFrame* spdy_framer() { return framer_; } + BalsaFrame* spdy_framer() { return http_framer_; } private: void SendEOFImpl(uint32 stream_id) { @@ -1842,6 +2343,9 @@ class HTTPSM : public BalsaVisitorInterface, public SMInterface { df.size = 5; df.delete_when_done = false; EnqueueDataFrame(df); + if (acceptor_->flip_handler_type_ == FLIP_HANDLER_HTTP_SERVER) { + Reset(); + } } void SendErrorNotFoundImpl(uint32 stream_id) { @@ -1875,7 +2379,8 @@ class HTTPSM : public BalsaVisitorInterface, public SMInterface { df.data = buffer; df.delete_when_done = true; sb.Read(buffer, df.size); - VLOG(2) << "******************Sending HTTP Reply header " << stream_id; + VLOG(2) << ACCEPTOR_CLIENT_IDENT << "Sending HTTP Reply header " + << stream_id_; size_t df_size = df.size; EnqueueDataFrame(df); return df_size; @@ -1890,7 +2395,8 @@ class HTTPSM : public BalsaVisitorInterface, public SMInterface { df.data = buffer; df.delete_when_done = true; sb.Read(buffer, df.size); - VLOG(2) << "******************Sending HTTP Reply header " << stream_id; + VLOG(2) << ACCEPTOR_CLIENT_IDENT << "Sending HTTP Reply header " + << stream_id_; size_t df_size = df.size; EnqueueDataFrame(df); return df_size; @@ -1913,27 +2419,31 @@ class HTTPSM : public BalsaVisitorInterface, public SMInterface { } void EnqueueDataFrame(const DataFrame& df) { + VLOG(2) << ACCEPTOR_CLIENT_IDENT << "HttpSM: Enqueue data frame: stream " + << stream_id_; connection_->EnqueueDataFrame(df); } void GetOutput() { MemCacheIter* mci = output_ordering_.GetIter(); if (mci == NULL) { - VLOG(2) << "GetOutput: nothing to output!?"; + VLOG(2) << ACCEPTOR_CLIENT_IDENT << "HttpSM: GetOutput: nothing to " + << "output!?: stream " << stream_id_; return; } if (!mci->transformed_header) { mci->bytes_sent = SendSynReply(mci->stream_id, *(mci->file_data->headers)); mci->transformed_header = true; - VLOG(2) << "GetOutput transformed header stream_id: [" - << mci->stream_id << "]"; + VLOG(2) << ACCEPTOR_CLIENT_IDENT << "HttpSM: GetOutput transformed " + << "header stream_id: [" << mci->stream_id << "]"; return; } if (mci->body_bytes_consumed >= mci->file_data->body.size()) { SendEOF(mci->stream_id); output_ordering_.RemoveStreamId(mci->stream_id); - VLOG(2) << "GetOutput remove_stream_id: [" << mci->stream_id << "]"; + VLOG(2) << ACCEPTOR_CLIENT_IDENT << "GetOutput remove_stream_id: [" + << mci->stream_id << "]"; return; } size_t num_to_write = @@ -1943,8 +2453,8 @@ class HTTPSM : public BalsaVisitorInterface, public SMInterface { SendDataFrame(mci->stream_id, mci->file_data->body.data() + mci->body_bytes_consumed, num_to_write, 0, true); - VLOG(2) << "GetOutput SendDataFrame[" << mci->stream_id - << "]: " << num_to_write; + VLOG(2) << ACCEPTOR_CLIENT_IDENT << "HttpSM: GetOutput SendDataFrame[" + << mci->stream_id << "]: " << num_to_write; mci->body_bytes_consumed += num_to_write; mci->bytes_sent += num_to_write; } @@ -1952,119 +2462,312 @@ class HTTPSM : public BalsaVisitorInterface, public SMInterface { //////////////////////////////////////////////////////////////////////////////// -class Notification { +class StreamerSM : public SMInterface { + private: + SMConnection* connection_; + SMInterface* sm_other_interface_; + EpollServer* epoll_server_; + FlipAcceptor* acceptor_; public: - explicit Notification(bool value) : value_(value) {} + explicit StreamerSM(SMConnection* connection, + SMInterface* sm_other_interface, + EpollServer* epoll_server, + FlipAcceptor* acceptor) : + connection_(connection), + sm_other_interface_(sm_other_interface), + epoll_server_(epoll_server), + acceptor_(acceptor) + { + VLOG(2) << ACCEPTOR_CLIENT_IDENT << "Creating StreamerSM object"; + } + ~StreamerSM() { + VLOG(1) << ACCEPTOR_CLIENT_IDENT << "Destroying StreamerSM object"; + Reset(); + } + + void InitSMInterface(SMInterface* sm_other_interface, + int32 server_idx) + { + sm_other_interface_ = sm_other_interface; + } + + void InitSMConnection(SMConnectionPoolInterface* connection_pool, + SMInterface* sm_interface, + EpollServer* epoll_server, + int fd, + bool use_ssl) + { + VLOG(2) << ACCEPTOR_CLIENT_IDENT << "StreamerSM: Initializing server " + << "connection."; + connection_->InitSMConnection(connection_pool, sm_interface, + epoll_server, fd, use_ssl); + } + + size_t ProcessReadInput(const char* data, size_t len) { + return sm_other_interface_->ProcessWriteInput(data, len); + } + + size_t ProcessWriteInput(const char* data, size_t len) { + char * dataPtr = new char[len]; + memcpy(dataPtr, data, len); + DataFrame df; + df.data = (const char *)dataPtr; + df.size = len; + df.delete_when_done = true; + connection_->EnqueueDataFrame(df); + return len; + } + + bool MessageFullyRead() const { + return false; + } + + void SetStreamID(uint32 stream_id) {} + + bool Error() const { + return false; + } + + const char* ErrorAsString() const { + return "(none)"; + } + + void Reset() { + VLOG(1) << ACCEPTOR_CLIENT_IDENT << "StreamerSM: Reset"; + connection_->Cleanup("Server Reset"); + } + + void ResetForNewInterface(int32 server_idx) { + } + + void ResetForNewConnection() { + sm_other_interface_->Reset(); + } + + void Cleanup() { + } + + int PostAcceptHook() { + if (!sm_other_interface_) { + SMConnection *server_connection = + SMConnection::NewSMConnection(epoll_server_, NULL, NULL, + acceptor_, "server_conn: "); + if (server_connection == NULL) { + LOG(ERROR) << "StreamerSM: Could not create server conenction."; + return 0; + } + VLOG(2) << ACCEPTOR_CLIENT_IDENT << "StreamerSM: Creating new server " + << "connection."; + sm_other_interface_ = new StreamerSM(server_connection, this, + epoll_server_, acceptor_); + sm_other_interface_->InitSMInterface(this, 0); + } + sm_other_interface_->InitSMConnection(NULL, sm_other_interface_, + epoll_server_, -1, false); + + return 1; + } + + void NewStream(uint32 stream_id, uint32 priority, const string& filename) { + } + + void AddToOutputOrder(const MemCacheIter& mci) { + } + + void SendEOF(uint32 stream_id) { + } + + void SendErrorNotFound(uint32 stream_id) { + } + + void SendOKResponse(uint32 stream_id, string output) { + } + + size_t SendSynStream(uint32 stream_id, const BalsaHeaders& headers) { + return 0; + } + + size_t SendSynReply(uint32 stream_id, const BalsaHeaders& headers) { + return 0; + } + + void SendDataFrame(uint32 stream_id, const char* data, int64 len, + uint32 flags, bool compress) { + } - void Notify() { - AutoLock al(lock_); - value_ = true; - } - bool HasBeenNotified() { - AutoLock al(lock_); - return value_; - } - bool value_; - Lock lock_; + private: + void SendEOFImpl(uint32 stream_id) { + } + + void SendErrorNotFoundImpl(uint32 stream_id) { + } + + void SendOKResponseImpl(uint32 stream_id, string* output) { + } + + size_t SendSynReplyImpl(uint32 stream_id, const BalsaHeaders& headers) { + return 0; + } + + size_t SendSynStreamImpl(uint32 stream_id, const BalsaHeaders& headers) { + return 0; + } + + void SendDataFrameImpl(uint32 stream_id, const char* data, int64 len, + uint32 flags, bool compress) { + } + + void GetOutput() { + } +}; + +//////////////////////////////////////////////////////////////////////////////// + +class Notification { + public: + explicit Notification(bool value) : value_(value) {} + + void Notify() { + AutoLock al(lock_); + value_ = true; + } + bool HasBeenNotified() { + AutoLock al(lock_); + return value_; + } + bool value_; + Lock lock_; }; //////////////////////////////////////////////////////////////////////////////// class SMAcceptorThread : public SimpleThread, public EpollCallbackInterface, - public SMServerConnectionPoolInterface { + public SMConnectionPoolInterface { EpollServer epoll_server_; - int listen_fd_; - int accepts_per_wake_; + FlipAcceptor *acceptor_; + SSLState *ssl_state_; + bool use_ssl_; - vector<SMServerConnection*> unused_server_connections_; - vector<SMServerConnection*> tmp_unused_server_connections_; - vector<SMServerConnection*> allocated_server_connections_; + vector<SMConnection*> unused_server_connections_; + vector<SMConnection*> tmp_unused_server_connections_; + vector<SMConnection*> allocated_server_connections_; Notification quitting_; - SMInterfaceFactory* sm_interface_factory_; MemoryCache* memory_cache_; public: - SMAcceptorThread(int listen_fd, - int accepts_per_wake, - SMInterfaceFactory* smif, + SMAcceptorThread(FlipAcceptor *acceptor, MemoryCache* memory_cache) : SimpleThread("SMAcceptorThread"), - listen_fd_(listen_fd), - accepts_per_wake_(accepts_per_wake), + acceptor_(acceptor), + ssl_state_(NULL), + use_ssl_(false), quitting_(false), - sm_interface_factory_(smif), - memory_cache_(memory_cache) { + memory_cache_(memory_cache) + { + if (!acceptor->ssl_cert_filename_.empty() && + !acceptor->ssl_cert_filename_.empty()) { + ssl_state_ = new SSLState; + bool use_npn = true; + if (acceptor_->flip_handler_type_ == FLIP_HANDLER_HTTP_SERVER) { + use_npn = false; + } + spdy_init_ssl(ssl_state_, acceptor_->ssl_cert_filename_, + acceptor_->ssl_key_filename_, use_npn); + use_ssl_ = true; + } } ~SMAcceptorThread() { - for (vector<SMServerConnection*>::iterator i = - allocated_server_connections_.begin(); + for (vector<SMConnection*>::iterator i = + allocated_server_connections_.begin(); i != allocated_server_connections_.end(); ++i) { delete *i; } } - SMServerConnection* NewConnection() { - SMServerConnection* server = - SMServerConnection::NewSMServerConnection(sm_interface_factory_, - memory_cache_, - &epoll_server_); + SMConnection* NewConnection() { + SMConnection* server = + SMConnection::NewSMConnection(&epoll_server_, ssl_state_, + memory_cache_, acceptor_, + "client_conn: "); allocated_server_connections_.push_back(server); - VLOG(3) << "Making new server: " << server; + VLOG(2) << ACCEPTOR_CLIENT_IDENT << "Acceptor: Making new server."; return server; } - SMServerConnection* FindOrMakeNewSMServerConnection() { + SMConnection* FindOrMakeNewSMConnection() { if (unused_server_connections_.empty()) { return NewConnection(); } - SMServerConnection* retval = unused_server_connections_.back(); + SMConnection* server = unused_server_connections_.back(); unused_server_connections_.pop_back(); - return retval; + VLOG(2) << ACCEPTOR_CLIENT_IDENT << "Acceptor: Reusing server."; + return server; } - void InitWorker() { - epoll_server_.RegisterFD(listen_fd_, this, EPOLLIN | EPOLLET); + epoll_server_.RegisterFD(acceptor_->listen_fd_, this, EPOLLIN | EPOLLET); } - void HandleConnection(int client_fd) { - SMServerConnection* server_connection = FindOrMakeNewSMServerConnection(); + void HandleConnection(int server_fd) { + int on = 1; + int rc; + if (acceptor_->disable_nagle_) { + rc = setsockopt(server_fd, IPPROTO_TCP, TCP_NODELAY, + reinterpret_cast<char*>(&on), sizeof(on)); + if (rc < 0) { + close(server_fd); + LOG(ERROR) << "setsockopt() failed fd=" + server_fd; + return; + } + } + + SMConnection* server_connection = FindOrMakeNewSMConnection(); if (server_connection == NULL) { - VLOG(2) << "Closing " << client_fd; - close(client_fd); + VLOG(1) << ACCEPTOR_CLIENT_IDENT << "Acceptor: Closing fd " << server_fd; + close(server_fd); return; } - server_connection->InitSMServerConnection(this, - &epoll_server_, - client_fd); + server_connection->InitSMConnection(this, + NULL, + &epoll_server_, + server_fd, + use_ssl_); } void AcceptFromListenFD() { - if (accepts_per_wake_ > 0) { - for (int i = 0; i < accepts_per_wake_; ++i) { + if (acceptor_->accepts_per_wake_ > 0) { + for (int i = 0; i < acceptor_->accepts_per_wake_; ++i) { struct sockaddr address; socklen_t socklen = sizeof(address); - int fd = accept(listen_fd_, &address, &socklen); + int fd = accept(acceptor_->listen_fd_, &address, &socklen); if (fd == -1) { - VLOG(2) << "accept fail(" << listen_fd_ << "): " << errno; + if (errno != 11) { + VLOG(1) << ACCEPTOR_CLIENT_IDENT << "Acceptor: accept fail(" + << acceptor_->listen_fd_ << "): " << errno << ": " + << strerror(errno); + } break; } - VLOG(2) << "********************Accepted fd: " << fd << "\n\n\n"; + VLOG(1) << ACCEPTOR_CLIENT_IDENT << " Accepted connection"; HandleConnection(fd); } } else { while (true) { struct sockaddr address; socklen_t socklen = sizeof(address); - int fd = accept(listen_fd_, &address, &socklen); + int fd = accept(acceptor_->listen_fd_, &address, &socklen); if (fd == -1) { - VLOG(2) << "accept fail(" << listen_fd_ << "): " << errno; + if (errno != 11) { + VLOG(1) << ACCEPTOR_CLIENT_IDENT << "Acceptor: accept fail(" + << acceptor_->listen_fd_ << "): " << errno << ": " + << strerror(errno); + } break; } - VLOG(2) << "********************Accepted fd: " << fd << "\n\n\n"; + VLOG(1) << ACCEPTOR_CLIENT_IDENT << "Accepted connection"; HandleConnection(fd); } } @@ -2075,7 +2778,8 @@ class SMAcceptorThread : public SimpleThread, virtual void OnModification(int fd, int event_mask) { } virtual void OnEvent(int fd, EpollEvent* event) { if (event->in_events | EPOLLIN) { - VLOG(2) << "Accepting based upon epoll events"; + VLOG(2) << ACCEPTOR_CLIENT_IDENT + << "Acceptor: Accepting based upon epoll events"; AcceptFromListenFD(); } } @@ -2097,56 +2801,58 @@ class SMAcceptorThread : public SimpleThread, } } - // SMServerConnections will use this: - virtual void SMServerConnectionDone(SMServerConnection* sc) { - VLOG(3) << "Done with server connection: " << sc; + // SMConnections will use this: + virtual void SMConnectionDone(SMConnection* sc) { + VLOG(1) << ACCEPTOR_CLIENT_IDENT << "Done with connection."; tmp_unused_server_connections_.push_back(sc); } }; //////////////////////////////////////////////////////////////////////////////// -SMInterface* NewSpdySM(SMServerConnection* connection) { - return new SpdySM(connection); +SMInterface* NewStreamerSM(SMConnection* connection, + SMInterface* sm_interface, + EpollServer* epoll_server, + FlipAcceptor* acceptor) { + return new StreamerSM(connection, sm_interface, epoll_server, acceptor); } -SMInterface* NewHTTPSM(SMServerConnection* connection) { - return new HTTPSM(connection); + +SMInterface* NewSpdySM(SMConnection* connection, + SMInterface* sm_interface, + EpollServer* epoll_server, + MemoryCache* memory_cache, + FlipAcceptor* acceptor) { + return new SpdySM(connection, sm_interface, epoll_server, + memory_cache, acceptor); +} + +SMInterface* NewHttpSM(SMConnection* connection, + SMInterface* sm_interface, + EpollServer* epoll_server, + MemoryCache* memory_cache, + FlipAcceptor* acceptor) { + return new HttpSM(connection, sm_interface, epoll_server, + memory_cache, acceptor); } //////////////////////////////////////////////////////////////////////////////// -int CreateListeningSocket(int port, int backlog_size, - bool reuseport, bool no_nagle) { - int listening_socket = 0; - char port_buf[256]; - snprintf(port_buf, sizeof(port_buf), "%d", port); - cerr <<" Attempting to listen on port: " << port_buf << "\n"; - cerr <<" input port: " << port << "\n"; - net::CreateListeningSocket("", - port_buf, - true, - backlog_size, - &listening_socket, - true, - reuseport, - &cerr); - SetNonBlocking(listening_socket); - if (no_nagle) { - // set SO_REUSEADDR on the listening socket. - int on = 1; - int rc; - rc = setsockopt(listening_socket, IPPROTO_TCP, TCP_NODELAY, - reinterpret_cast<char*>(&on), sizeof(on)); - if (rc < 0) { - close(listening_socket); - LOG(FATAL) << "setsockopt() failed fd=" << listening_socket << "\n"; - } +std::vector<std::string> &split(const std::string &s, + char delim, + std::vector<std::string> &elems) { + std::stringstream ss(s); + std::string item; + while(std::getline(ss, item, delim)) { + elems.push_back(item); } - return listening_socket; + return elems; } -//////////////////////////////////////////////////////////////////////////////// +std::vector<std::string> split(const std::string &s, char delim) { + std::vector<std::string> elems; + return split(s, delim, elems); +} bool GotQuitFromStdin() { // Make stdin nonblocking. Yes this is done each time. Oh well. @@ -2157,16 +2863,13 @@ bool GotQuitFromStdin() { maybequit += c; } if (maybequit.size()) { - VLOG(2) << "scanning string: \"" << maybequit << "\""; + VLOG(1) << "scanning string: \"" << maybequit << "\""; } return (maybequit.size() > 1 && (maybequit.c_str()[0] == 'q' || maybequit.c_str()[0] == 'Q')); } - -//////////////////////////////////////////////////////////////////////////////// - const char* BoolToStr(bool b) { if (b) return "true"; @@ -2175,103 +2878,171 @@ const char* BoolToStr(bool b) { //////////////////////////////////////////////////////////////////////////////// -int main(int argc, char**argv) { - bool use_ssl = FLAGS_use_ssl; - int response_count_until_close = FLAGS_response_count_until_close; - int spdy_port = FLAGS_spdy_port; - int port = FLAGS_port; - int backlog_size = FLAGS_accept_backlog_size; - bool reuseport = FLAGS_reuseport; - bool no_nagle = FLAGS_no_nagle; - double server_think_time_in_s = FLAGS_server_think_time_in_s; - int accepts_per_wake = FLAGS_accepts_per_wake; - int num_threads = 1; +int main (int argc, char**argv) +{ + unsigned int i = 0; + bool wait_for_iface = false; + + CommandLine::Init(argc, argv); + CommandLine cl(argc, argv); + + if (cl.HasSwitch("--help") || argc < 2) { + cout << argv[0] << " <options>\n"; + cout << "\t--proxy<1..n>=\"<listen ip>,<listen port>,<ssl cert filename>," + << "<ssl key filename>,<server ip>,<server port>\"\n"; + cout << "\t--spdy-server=\"<listen ip>,<listen port>,<ssl cert filename>," + << "<ssl key filename>\"\n"; + cout << "\t--http-server=\"<listen ip>,<listen port>,<ssl cert filename>," + << "<ssl key filename>\"\n"; + cout << "\t--forward-ip-header=<header name>\n"; + cout << "\t--logdest=file|system|both\n"; + cout << "\t--logfile=<logfile>\n"; + cout << "\t--wait-for-iface\n"; + cout << "\t--ssl-session-expiry=<seconds> (default is 300)\n"; + cout << "\t--help\n"; + exit(0); + } + + g_proxy_config.server_think_time_in_s_ = FLAGS_server_think_time_in_s; + + if (cl.HasSwitch("forward-ip-header")) { + g_proxy_config.forward_ip_header_enabled_ = true; + g_proxy_config.forward_ip_header_ = + cl.GetSwitchValueASCII("forward-ip-header"); + } else { + g_proxy_config.forward_ip_header_enabled_ = false; + } + + if (cl.HasSwitch("logdest")) { + string log_dest_value = cl.GetSwitchValueASCII("logdest"); + if (log_dest_value.compare("file") == 0) { + g_proxy_config.log_destination_ = logging::LOG_ONLY_TO_FILE; + } else if (log_dest_value.compare("system") == 0) { + g_proxy_config.log_destination_ = logging::LOG_ONLY_TO_SYSTEM_DEBUG_LOG; + } else if (log_dest_value.compare("both") == 0) { + g_proxy_config.log_destination_ = + logging::LOG_TO_BOTH_FILE_AND_SYSTEM_DEBUG_LOG; + } else { + LOG(FATAL) << "Invalid logging destination value: " << log_dest_value; + } + } else { + g_proxy_config.log_destination_ = logging::LOG_NONE; + } + if (cl.HasSwitch("logfile")) { + g_proxy_config.log_filename_ = cl.GetSwitchValueASCII("logfile"); + if (g_proxy_config.log_destination_ == logging::LOG_NONE) { + g_proxy_config.log_destination_ = logging::LOG_ONLY_TO_FILE; + } + } else if (g_proxy_config.log_destination_ == logging::LOG_ONLY_TO_FILE || + g_proxy_config.log_destination_ == + logging::LOG_TO_BOTH_FILE_AND_SYSTEM_DEBUG_LOG) { + LOG(FATAL) << "Logging destination requires a log file to be specified."; + } - MemoryCache spdy_memory_cache; - spdy_memory_cache.AddFiles(); + if (cl.HasSwitch("wait-for-iface")) { + wait_for_iface = true; + } + if (cl.HasSwitch("ssl-session-expiry")) { + string session_expiry = cl.GetSwitchValueASCII("ssl-session-expiry"); + g_proxy_config.ssl_session_expiry_ = atoi( session_expiry.c_str() ); + } + + InitLogging(g_proxy_config.log_filename_.c_str(), + g_proxy_config.log_destination_, + logging::DONT_LOCK_LOG_FILE, + logging::APPEND_TO_OLD_LOG_FILE); + + LOG(INFO) << "Flip SPDY proxy started with configuration:"; + LOG(INFO) << "Logging destination : " << g_proxy_config.log_destination_; + LOG(INFO) << "Log file : " << g_proxy_config.log_filename_; + LOG(INFO) << "Forward IP Header : " + << (g_proxy_config.forward_ip_header_enabled_ ? + g_proxy_config.forward_ip_header_ : "(disabled)"); + LOG(INFO) << "Wait for interfaces : " << (wait_for_iface?"true":"false"); + LOG(INFO) << "Accept backlog size : " << FLAGS_accept_backlog_size; + LOG(INFO) << "Accepts per wake : " << FLAGS_accepts_per_wake; + LOG(INFO) << "Disable nagle : " + << (FLAGS_disable_nagle?"true":"false"); + LOG(INFO) << "Reuseport : " << (FLAGS_reuseport?"true":"false"); + + // Proxy Acceptors + while (true) { + i += 1; + std::stringstream name; + name << "proxy" << i; + if (!cl.HasSwitch(name.str())) { + break; + } + string value = cl.GetSwitchValueASCII(name.str()); + vector<std::string> valueArgs = split(value, ','); + CHECK_EQ((unsigned int)6, valueArgs.size()); + // If wait_for_iface is enabled, then this call will block + // indefinitely until the interface is raised. + g_proxy_config.AddAcceptor(FLIP_HANDLER_PROXY, + valueArgs[0], valueArgs[1], + valueArgs[2], valueArgs[3], + valueArgs[4], valueArgs[5], + FLAGS_accept_backlog_size, + FLAGS_disable_nagle, + FLAGS_accepts_per_wake, + FLAGS_reuseport, + wait_for_iface, + NULL); + } + + // Spdy Server Acceptor + MemoryCache spdy_memory_cache; + if (cl.HasSwitch("spdy-server")) { + spdy_memory_cache.AddFiles(); + string value = cl.GetSwitchValueASCII("spdy-server"); + vector<std::string> valueArgs = split(value, ','); + g_proxy_config.AddAcceptor(FLIP_HANDLER_SPDY_SERVER, + valueArgs[0], valueArgs[1], + valueArgs[2], valueArgs[3], + "", "", + FLAGS_accept_backlog_size, + FLAGS_disable_nagle, + FLAGS_accepts_per_wake, + FLAGS_reuseport, + wait_for_iface, + &spdy_memory_cache); + } + + // Spdy Server Acceptor MemoryCache http_memory_cache; - http_memory_cache.CloneFrom(spdy_memory_cache); - - LOG(INFO) << - "Starting up with the following state: \n" - " use_ssl: " << use_ssl << "\n" - " response_count_until_close: " << response_count_until_close << "\n" - " port: " << port << "\n" - " spdy_port: " << spdy_port << "\n" - " backlog_size: " << backlog_size << "\n" - " reuseport: " << BoolToStr(reuseport) << "\n" - " no_nagle: " << BoolToStr(no_nagle) << "\n" - " server_think_time_in_s: " << server_think_time_in_s << "\n" - " accepts_per_wake: " << accepts_per_wake << "\n" - " num_threads: " << num_threads << "\n" - " use_xsub: " << BoolToStr(FLAGS_use_xsub) << "\n" - " use_xac: " << BoolToStr(FLAGS_use_xac) << "\n"; - - if (use_ssl) { - global_ssl_state = new GlobalSSLState; - spdy_init_ssl(global_ssl_state); - } else { - global_ssl_state = NULL; + if (cl.HasSwitch("http-server")) { + http_memory_cache.AddFiles(); + string value = cl.GetSwitchValueASCII("http-server"); + vector<std::string> valueArgs = split(value, ','); + g_proxy_config.AddAcceptor(FLIP_HANDLER_HTTP_SERVER, + valueArgs[0], valueArgs[1], + valueArgs[2], valueArgs[3], + "", "", + FLAGS_accept_backlog_size, + FLAGS_disable_nagle, + FLAGS_accepts_per_wake, + FLAGS_reuseport, + wait_for_iface, + &http_memory_cache); } - EpollServer epoll_server; + vector<SMAcceptorThread*> sm_worker_threads_; - { - // spdy - int listen_fd = -1; - - if (reuseport || listen_fd == -1) { - listen_fd = CreateListeningSocket(spdy_port, backlog_size, - reuseport, no_nagle); - if (listen_fd < 0) { - LOG(FATAL) << "Unable to open listening socket on spdy_port: " - << spdy_port; - } else { - LOG(INFO) << "Listening for spdy on port: " << spdy_port; - } - } - sm_worker_threads_.push_back( - new SMAcceptorThread(listen_fd, - accepts_per_wake, - &NewSpdySM, - &spdy_memory_cache)); - // Note that spdy_memory_cache is not threadsafe, it is merely - // thread compatible. Thus, if ever we are to spawn multiple threads, - // we either must make the MemoryCache threadsafe, or use - // a separate MemoryCache for each thread. - // - // The latter is what is currently being done as we spawn - // two threads (one for spdy, one for http). - sm_worker_threads_.back()->InitWorker(); - sm_worker_threads_.back()->Start(); - } + for (i = 0; i < g_proxy_config.acceptors_.size(); i++) { + FlipAcceptor *acceptor = g_proxy_config.acceptors_[i]; - { - // http - int listen_fd = -1; - if (reuseport || listen_fd == -1) { - listen_fd = CreateListeningSocket(port, backlog_size, - reuseport, no_nagle); - if (listen_fd < 0) { - LOG(FATAL) << "Unable to open listening socket on port: " << port; - } else { - LOG(INFO) << "Listening for HTTP on port: " << port; - } - } - sm_worker_threads_.push_back( - new SMAcceptorThread(listen_fd, - accepts_per_wake, - &NewHTTPSM, - &http_memory_cache)); + sm_worker_threads_.push_back(new SMAcceptorThread(acceptor, + (MemoryCache *)acceptor->memory_cache_)); // Note that spdy_memory_cache is not threadsafe, it is merely // thread compatible. Thus, if ever we are to spawn multiple threads, // we either must make the MemoryCache threadsafe, or use // a separate MemoryCache for each thread. // // The latter is what is currently being done as we spawn - // two threads (one for spdy, one for http). + // a separate thread for each http and spdy server acceptor. + sm_worker_threads_.back()->InitWorker(); sm_worker_threads_.back()->Start(); } @@ -2288,5 +3059,6 @@ int main(int argc, char**argv) { } usleep(1000*10); // 10 ms } + return 0; } diff --git a/net/tools/flip_server/other_defines.h b/net/tools/flip_server/other_defines.h deleted file mode 100644 index dda2151..0000000 --- a/net/tools/flip_server/other_defines.h +++ /dev/null @@ -1,21 +0,0 @@ -// Copyright (c) 2010 The Chromium Authors. All rights reserved. -// Use of this source code is governed by a BSD-style license that can be -// found in the LICENSE file. - -#ifndef NET_TOOLS_FLIP_SERVER_OTHER_DEFINES -#define NET_TOOLS_FLIP_SERVER_OTHER_DEFINES -#pragma once - -class NullStream { - public: - NullStream() {} - template <typename T> - NullStream operator<<(T t) { return *this;} -}; - -#define VLOG(X) NullStream() -#define DVLOG(X) NullStream() - - -#endif // NET_TOOLS_FLIP_SERVER_OTHER_DEFINES - diff --git a/net/tools/flip_server/ring_buffer.h b/net/tools/flip_server/ring_buffer.h index ee6f360..75ad0ec3 100644 --- a/net/tools/flip_server/ring_buffer.h +++ b/net/tools/flip_server/ring_buffer.h @@ -8,7 +8,6 @@ #include "base/scoped_ptr.h" #include "net/tools/flip_server/buffer_interface.h" -#include "net/tools/flip_server/other_defines.h" namespace net { diff --git a/net/tools/flip_server/simple_buffer.h b/net/tools/flip_server/simple_buffer.h index 71fafc9..fee7d4b 100644 --- a/net/tools/flip_server/simple_buffer.h +++ b/net/tools/flip_server/simple_buffer.h @@ -9,7 +9,6 @@ #include <string> #include "net/tools/flip_server/buffer_interface.h" -#include "net/tools/flip_server/other_defines.h" namespace net { |