diff options
Diffstat (limited to 'net/tools/flip_server/flip_in_mem_edsm_server.cc')
-rw-r--r-- | net/tools/flip_server/flip_in_mem_edsm_server.cc | 3091 |
1 files changed, 40 insertions, 3051 deletions
diff --git a/net/tools/flip_server/flip_in_mem_edsm_server.cc b/net/tools/flip_server/flip_in_mem_edsm_server.cc index 5f55341..ef27afa 100644 --- a/net/tools/flip_server/flip_in_mem_edsm_server.cc +++ b/net/tools/flip_server/flip_in_mem_edsm_server.cc @@ -2,70 +2,30 @@ // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. -#include <dirent.h> -#include <netinet/tcp.h> // For TCP_NODELAY -#include <sys/socket.h> -#include <sys/types.h> -#include <sys/file.h> -#include <sys/stat.h> -#include <unistd.h> -#include <openssl/err.h> -#include <openssl/ssl.h> +#include <errno.h> #include <signal.h> +#include <sys/file.h> -#include <deque> #include <iostream> -#include <limits> +#include <string> #include <vector> -#include <list> #include "base/command_line.h" #include "base/logging.h" #include "base/synchronization/lock.h" -#include "base/threading/simple_thread.h" #include "base/timer.h" -#include "net/spdy/spdy_frame_builder.h" -#include "net/spdy/spdy_framer.h" -#include "net/spdy/spdy_protocol.h" -#include "net/tools/dump_cache/url_to_filename_encoder.h" -#include "net/tools/dump_cache/url_utilities.h" -#include "net/tools/flip_server/balsa_enums.h" -#include "net/tools/flip_server/balsa_frame.h" -#include "net/tools/flip_server/balsa_headers.h" -#include "net/tools/flip_server/balsa_visitor_interface.h" -#include "net/tools/flip_server/buffer_interface.h" -#include "net/tools/flip_server/epoll_server.h" -#include "net/tools/flip_server/ring_buffer.h" -#include "net/tools/flip_server/simple_buffer.h" -#include "net/tools/flip_server/split.h" +#include "net/tools/flip_server/acceptor_thread.h" +#include "net/tools/flip_server/constants.h" #include "net/tools/flip_server/flip_config.h" +#include "net/tools/flip_server/output_ordering.h" +#include "net/tools/flip_server/sm_connection.h" +#include "net/tools/flip_server/sm_interface.h" +#include "net/tools/flip_server/spdy_interface.h" +#include "net/tools/flip_server/split.h" -//////////////////////////////////////////////////////////////////////////////// - -using std::deque; -using std::list; -using std::map; -using std::ostream; -using std::pair; -using std::string; -using std::vector; using std::cout; using std::cerr; -//////////////////////////////////////////////////////////////////////////////// - -#define IPV4_PRINTABLE_FORMAT(IP) (((IP)>>0)&0xff),(((IP)>>8)&0xff), \ - (((IP)>>16)&0xff),(((IP)>>24)&0xff) - -#define ACCEPTOR_CLIENT_IDENT acceptor_->listen_ip_ << ":" \ - << acceptor_->listen_port_ << " " - -#define NEXT_PROTO_STRING "\x06spdy/2\x08http/1.1\x08http/1.0" - -#define SSL_CTX_DEFAULT_CIPHER_LIST "!aNULL:!ADH:!eNull:!LOW:!EXP:RC4+RSA:MEDIUM:HIGH" - -#define PIDFILE "/var/run/flip-server.pid" - // If true, then disables the nagle algorithm); bool FLAGS_disable_nagle = true; @@ -78,12 +38,6 @@ int32 FLAGS_accepts_per_wake = 0; // The size of the TCP accept backlog); int32 FLAGS_accept_backlog_size = 1024; -// The directory where cache locates); -string FLAGS_cache_base_dir = "."; - -// If true, then encode url to filename); -bool FLAGS_need_to_encode_url = false; - // If set to false a single socket will be used. If set to true // then a new socket will be created for each accept thread. // Note that this only works with kernels that support @@ -97,2972 +51,7 @@ bool FLAGS_force_spdy = false; // reply); double FLAGS_server_think_time_in_s = 0; -// Does the server send X-Subresource headers); -bool FLAGS_use_xsub = false; - -// Does the server send X-Associated-Content headers); -bool FLAGS_use_xac = false; - -// Does the server compress data frames); -bool FLAGS_use_compression = false; - -//////////////////////////////////////////////////////////////////////////////// - -using base::StringPiece; -using base::SimpleThread; -using net::BalsaFrame; -using net::BalsaFrameEnums; -using net::BalsaHeaders; -using net::BalsaHeadersEnums; -using net::BalsaVisitorInterface; -using net::EpollAlarmCallbackInterface; -using net::EpollCallbackInterface; -using net::EpollEvent; -using net::EpollServer; -using net::RingBuffer; -using net::SimpleBuffer; -using net::SplitStringPieceToVector; -using net::UrlUtilities; -using spdy::CONTROL_FLAG_NONE; -using spdy::DATA_FLAG_COMPRESSED; -using spdy::DATA_FLAG_FIN; -using spdy::RST_STREAM; -using spdy::SYN_REPLY; -using spdy::SYN_STREAM; -using spdy::SpdyControlFrame; -using spdy::SpdySettingsControlFrame; -using spdy::SpdyDataFlags; -using spdy::SpdyDataFrame; -using spdy::SpdyRstStreamControlFrame; -using spdy::SpdyFrame; -using spdy::SpdyFrameBuilder; -using spdy::SpdyFramer; -using spdy::SpdyFramerVisitorInterface; -using spdy::SpdyHeaderBlock; -using spdy::SpdyStreamId; -using spdy::SpdySynReplyControlFrame; -using spdy::SpdySynStreamControlFrame; - -FlipConfig g_proxy_config; - -//////////////////////////////////////////////////////////////////////////////// - -void PrintSslError() { - char buf[128]; // this buffer must be at least 120 chars long. - int error_num = ERR_get_error(); - while (error_num != 0) { - ERR_error_string_n(error_num, buf, sizeof(buf)); - //LOG(ERROR) << buf; - error_num = ERR_get_error(); - } -} - -static int ssl_set_npn_callback(SSL *s, - const unsigned char **data, - unsigned int *len, - void *arg) -{ - VLOG(1) << "SSL NPN callback: advertising protocols."; - *data = (const unsigned char *) NEXT_PROTO_STRING; - *len = strlen(NEXT_PROTO_STRING); - return SSL_TLSEXT_ERR_OK; -} -//////////////////////////////////////////////////////////////////////////////// - -// Creates a socket with domain, type and protocol parameters. -// Assigns the return value of socket() to *fd. -// Returns errno if an error occurs, else returns zero. -int CreateSocket(int domain, int type, int protocol, int *fd) { - CHECK(fd != NULL); - *fd = ::socket(domain, type, protocol); - return (*fd == -1) ? errno : 0; -} - -// Encode the URL. -string EncodeURL(string uri, string host, string method) { - if (!FLAGS_need_to_encode_url) { - // TODO(mbelshe): if uri is fully qualified, need to strip protocol/host. - return string(method + "_" + uri); - } - - string filename; - if (uri[0] == '/') { - // uri is not fully qualified. - filename = net::UrlToFilenameEncoder::Encode( - "http://" + host + uri, method + "_/", false); - } else { - filename = net::UrlToFilenameEncoder::Encode(uri, method + "_/", false); - } - return filename; -} - -//////////////////////////////////////////////////////////////////////////////// - -struct SSLState { - SSL_METHOD* ssl_method; - SSL_CTX* ssl_ctx; -}; - -// SSL stuff -void spdy_init_ssl(SSLState* state, - string ssl_cert_name, - string ssl_key_name, - bool use_npn) { - SSL_library_init(); - PrintSslError(); - - SSL_load_error_strings(); - PrintSslError(); - - state->ssl_method = SSLv23_method(); - state->ssl_ctx = SSL_CTX_new(state->ssl_method); - if (!state->ssl_ctx) { - PrintSslError(); - LOG(FATAL) << "Unable to create SSL context"; - } - // Disable SSLv2 support. - SSL_CTX_set_options(state->ssl_ctx, - SSL_OP_NO_SSLv2 | SSL_OP_CIPHER_SERVER_PREFERENCE); - if (SSL_CTX_use_certificate_file(state->ssl_ctx, - ssl_cert_name.c_str(), - SSL_FILETYPE_PEM) <= 0) { - PrintSslError(); - LOG(FATAL) << "Unable to use cert.pem as SSL cert."; - } - if (SSL_CTX_use_PrivateKey_file(state->ssl_ctx, - ssl_key_name.c_str(), - SSL_FILETYPE_PEM) <= 0) { - PrintSslError(); - LOG(FATAL) << "Unable to use key.pem as SSL key."; - } - if (!SSL_CTX_check_private_key(state->ssl_ctx)) { - PrintSslError(); - LOG(FATAL) << "The cert.pem and key.pem files don't match"; - } - if (use_npn) { - SSL_CTX_set_next_protos_advertised_cb(state->ssl_ctx, - ssl_set_npn_callback, NULL); - } - VLOG(1) << "SSL CTX default cipher list: " << SSL_CTX_DEFAULT_CIPHER_LIST; - SSL_CTX_set_cipher_list(state->ssl_ctx, SSL_CTX_DEFAULT_CIPHER_LIST); - - VLOG(1) << "SSL CTX session expiry: " << g_proxy_config.ssl_session_expiry_ - << " seconds"; - SSL_CTX_set_timeout(state->ssl_ctx, g_proxy_config.ssl_session_expiry_); - -#ifdef SSL_MODE_RELEASE_BUFFERS - VLOG(1) << "SSL CTX: Setting Release Buffers mode."; - SSL_CTX_set_mode(state->ssl_ctx, SSL_MODE_RELEASE_BUFFERS); -#endif - - // Proper methods to disable compression don't exist until 0.9.9+. For now - // we must manipulate the stack of compression methods directly. - if (g_proxy_config.ssl_disable_compression_) { - STACK_OF(SSL_COMP) *ssl_comp_methods = SSL_COMP_get_compression_methods(); - int num_methods = sk_SSL_COMP_num(ssl_comp_methods); - int i; - for (i = 0; i < num_methods; i++) { - static_cast<void>(sk_SSL_COMP_delete(ssl_comp_methods, i)); - } - } -} - -SSL* spdy_new_ssl(SSL_CTX* ssl_ctx) { - SSL* ssl = SSL_new(ssl_ctx); - PrintSslError(); - - SSL_set_accept_state(ssl); - PrintSslError(); - - return ssl; -} - -//////////////////////////////////////////////////////////////////////////////// - -const int kMSS = 1460; -const int kSSLOverhead = 25; -const int kSpdyOverhead = SpdyFrame::size(); -const int kInitialDataSendersThreshold = (2 * kMSS) - kSpdyOverhead; -const int kSSLSegmentSize = (1 * kMSS) - kSSLOverhead; -const int kSpdySegmentSize = kSSLSegmentSize - kSpdyOverhead; - -//////////////////////////////////////////////////////////////////////////////// - -class DataFrame { - public: - const char* data; - size_t size; - bool delete_when_done; - size_t index; - DataFrame() : data(NULL), size(0), delete_when_done(false), index(0) {} - virtual ~DataFrame() { - if (delete_when_done) - delete[] data; - } -}; - -class SpdyFrameDataFrame : public DataFrame { - public: - SpdyFrameDataFrame(SpdyFrame* spdy_frame) - : frame(spdy_frame) { - data = spdy_frame->data(); - size = spdy_frame->length() + SpdyFrame::size(); - } - - virtual ~SpdyFrameDataFrame() { - delete frame; - } - - const SpdyFrame* frame; -}; - -//////////////////////////////////////////////////////////////////////////////// - -class StoreBodyAndHeadersVisitor: public BalsaVisitorInterface { - public: - BalsaHeaders headers; - string body; - bool error_; - - virtual void ProcessBodyInput(const char *input, size_t size) {} - virtual void ProcessBodyData(const char *input, size_t size) { - body.append(input, size); - } - virtual void ProcessHeaderInput(const char *input, size_t size) {} - virtual void ProcessTrailerInput(const char *input, size_t size) {} - virtual void ProcessHeaders(const BalsaHeaders& headers) { - // nothing to do here-- we're assuming that the BalsaFrame has - // been handed our headers. - } - virtual void ProcessRequestFirstLine(const char* line_input, - size_t line_length, - const char* method_input, - size_t method_length, - const char* request_uri_input, - size_t request_uri_length, - const char* version_input, - size_t version_length) {} - virtual void ProcessResponseFirstLine(const char *line_input, - size_t line_length, - const char *version_input, - size_t version_length, - const char *status_input, - size_t status_length, - const char *reason_input, - size_t reason_length) {} - virtual void ProcessChunkLength(size_t chunk_length) {} - virtual void ProcessChunkExtensions(const char *input, size_t size) {} - virtual void HeaderDone() {} - virtual void MessageDone() {} - virtual void HandleHeaderError(BalsaFrame* framer) { HandleError(); } - virtual void HandleHeaderWarning(BalsaFrame* framer) { HandleError(); } - virtual void HandleChunkingError(BalsaFrame* framer) { HandleError(); } - virtual void HandleBodyError(BalsaFrame* framer) { HandleError(); } - - void HandleError() { error_ = true; } -}; - -//////////////////////////////////////////////////////////////////////////////// - -struct FileData { - void CopyFrom(const FileData& file_data) { - headers = new BalsaHeaders; - headers->CopyFrom(*(file_data.headers)); - filename = file_data.filename; - related_files = file_data.related_files; - body = file_data.body; - } - FileData(BalsaHeaders* h, const string& b) : headers(h), body(b) {} - FileData() {} - BalsaHeaders* headers; - string filename; - vector< pair<int, string> > related_files; // priority, filename - string body; -}; - -//////////////////////////////////////////////////////////////////////////////// - -class MemCacheIter { - public: - MemCacheIter() : - file_data(NULL), - priority(0), - transformed_header(false), - body_bytes_consumed(0), - stream_id(0), - max_segment_size(kInitialDataSendersThreshold), - bytes_sent(0) {} - explicit MemCacheIter(FileData* fd) : - file_data(fd), - priority(0), - transformed_header(false), - body_bytes_consumed(0), - stream_id(0), - max_segment_size(kInitialDataSendersThreshold), - bytes_sent(0) {} - FileData* file_data; - int priority; - bool transformed_header; - size_t body_bytes_consumed; - uint32 stream_id; - uint32 max_segment_size; - size_t bytes_sent; -}; - -//////////////////////////////////////////////////////////////////////////////// - -class MemoryCache { - public: - typedef map<string, FileData> Files; - - public: - Files files_; - string cwd_; - - void CloneFrom(const MemoryCache& mc) { - for (Files::const_iterator i = mc.files_.begin(); - i != mc.files_.end(); - ++i) { - Files::iterator out_i = - files_.insert(make_pair(i->first, FileData())).first; - out_i->second.CopyFrom(i->second); - cwd_ = mc.cwd_; - } - } - - void AddFiles() { - deque<string> paths; - cwd_ = FLAGS_cache_base_dir; - paths.push_back(cwd_ + "/GET_"); - DIR* current_dir = NULL; - while (!paths.empty()) { - while (current_dir == NULL && !paths.empty()) { - string current_dir_name = paths.front(); - VLOG(1) << "Attempting to open dir: \"" << current_dir_name << "\""; - current_dir = opendir(current_dir_name.c_str()); - paths.pop_front(); - - if (current_dir == NULL) { - perror("Unable to open directory. "); - current_dir_name.clear(); - continue; - } - - if (current_dir) { - VLOG(1) << "Succeeded opening"; - for (struct dirent* dir_data = readdir(current_dir); - dir_data != NULL; - dir_data = readdir(current_dir)) { - string current_entry_name = - current_dir_name + "/" + dir_data->d_name; - if (dir_data->d_type == DT_REG) { - VLOG(1) << "Found file: " << current_entry_name; - ReadAndStoreFileContents(current_entry_name.c_str()); - } else if (dir_data->d_type == DT_DIR) { - VLOG(1) << "Found subdir: " << current_entry_name; - if (string(dir_data->d_name) != "." && - string(dir_data->d_name) != "..") { - VLOG(1) << "Adding to search path: " << current_entry_name; - paths.push_front(current_entry_name); - } - } - } - VLOG(1) << "Oops, no data left. Closing dir."; - closedir(current_dir); - current_dir = NULL; - } - } - } - } - - void ReadToString(const char* filename, string* output) { - output->clear(); - int fd = open(filename, 0, "r"); - if (fd == -1) - return; - char buffer[4096]; - ssize_t read_status = read(fd, buffer, sizeof(buffer)); - while (read_status > 0) { - output->append(buffer, static_cast<size_t>(read_status)); - do { - read_status = read(fd, buffer, sizeof(buffer)); - } while (read_status <= 0 && errno == EINTR); - } - close(fd); - } - - void ReadAndStoreFileContents(const char* filename) { - StoreBodyAndHeadersVisitor visitor; - BalsaFrame framer; - framer.set_balsa_visitor(&visitor); - framer.set_balsa_headers(&(visitor.headers)); - string filename_contents; - ReadToString(filename, &filename_contents); - - // Ugly hack to make everything look like 1.1. - if (filename_contents.find("HTTP/1.0") == 0) - filename_contents[7] = '1'; - - size_t pos = 0; - size_t old_pos = 0; - while (true) { - old_pos = pos; - pos += framer.ProcessInput(filename_contents.data() + pos, - filename_contents.size() - pos); - if (framer.Error() || pos == old_pos) { - LOG(ERROR) << "Unable to make forward progress, or error" - " framing file: " << filename; - if (framer.Error()) { - LOG(INFO) << "********************************************ERROR!"; - return; - } - return; - } - if (framer.MessageFullyRead()) { - // If no Content-Length or Transfer-Encoding was captured in the - // file, then the rest of the data is the body. Many of the captures - // from within Chrome don't have content-lengths. - if (!visitor.body.length()) - visitor.body = filename_contents.substr(pos); - break; - } - } - visitor.headers.RemoveAllOfHeader("content-length"); - visitor.headers.RemoveAllOfHeader("transfer-encoding"); - visitor.headers.RemoveAllOfHeader("connection"); - visitor.headers.AppendHeader("transfer-encoding", "chunked"); - visitor.headers.AppendHeader("connection", "keep-alive"); - - // Experiment with changing headers for forcing use of cached - // versions of content. - // TODO(mbelshe) REMOVE ME -#if 0 - // TODO(mbelshe) append current date. - visitor.headers.RemoveAllOfHeader("date"); - if (visitor.headers.HasHeader("expires")) { - visitor.headers.RemoveAllOfHeader("expires"); - visitor.headers.AppendHeader("expires", - "Fri, 30 Aug, 2019 12:00:00 GMT"); - } -#endif - BalsaHeaders* headers = new BalsaHeaders; - headers->CopyFrom(visitor.headers); - string filename_stripped = string(filename).substr(cwd_.size() + 1); -// LOG(INFO) << "Adding file (" << visitor.body.length() << " bytes): " -// << filename_stripped; - files_[filename_stripped] = FileData(); - FileData& fd = files_[filename_stripped]; - fd = FileData(headers, visitor.body); - fd.filename = string(filename_stripped, - filename_stripped.find_first_of('/')); - if (headers->HasHeader("X-Associated-Content")) { - string content = headers->GetHeader("X-Associated-Content").as_string(); - vector<StringPiece> urls_and_priorities; - SplitStringPieceToVector(content, "||", &urls_and_priorities, true); - VLOG(1) << "Examining X-Associated-Content header"; - for (unsigned int i = 0; i < urls_and_priorities.size(); ++i) { - const StringPiece& url_and_priority_pair = urls_and_priorities[i]; - vector<StringPiece> url_and_priority; - SplitStringPieceToVector(url_and_priority_pair, "??", - &url_and_priority, true); - if (url_and_priority.size() >= 2) { - string priority_string(url_and_priority[0].data(), - url_and_priority[0].size()); - string filename_string(url_and_priority[1].data(), - url_and_priority[1].size()); - long priority; - char* last_eaten_char; - priority = strtol(priority_string.c_str(), &last_eaten_char, 0); - if (last_eaten_char == - priority_string.c_str() + priority_string.size()) { - pair<int, string> entry(priority, filename_string); - VLOG(1) << "Adding associated content: " << filename_string; - fd.related_files.push_back(entry); - } - } - } - } - } - - // Called at runtime to update learned headers - // |url| is a url which contains a referrer header. - // |referrer| is the referring URL - // Adds an X-Subresource or X-Associated-Content to |referer| for |url| - void UpdateHeaders(string referrer, string file_url) { - if (!FLAGS_use_xac && !FLAGS_use_xsub) - return; - - string referrer_host_path = - net::UrlToFilenameEncoder::Encode(referrer, "GET_/", false); - - FileData* fd1 = GetFileData(string("GET_") + file_url); - if (!fd1) { - LOG(ERROR) << "Updating headers for unknown url: " << file_url; - return; - } - string url = fd1->headers->GetHeader("X-Original-Url").as_string(); - string content_type = fd1->headers->GetHeader("Content-Type").as_string(); - if (content_type.length() == 0) { - LOG(ERROR) << "Skipping subresource with unknown content-type"; - return; - } - // Now, lets see if this is the same host or not - bool same_host = (UrlUtilities::GetUrlHost(referrer) == - UrlUtilities::GetUrlHost(url)); - - // This is a hacked algorithm for figuring out what priority - // to use with pushed content. - int priority = 4; - if (content_type.find("css") != string::npos) - priority = 1; - else if (content_type.find("cript") != string::npos) - priority = 1; - else if (content_type.find("html") != string::npos) - priority = 2; - - LOG(ERROR) << "Attempting update for " << referrer_host_path; - - FileData* fd2 = GetFileData(referrer_host_path); - if (fd2 != NULL) { - // If they are on the same host, we'll use X-Associated-Content - string header_name; - string new_value; - string delimiter; - bool related_files = false; - if (same_host && FLAGS_use_xac) { - header_name = "X-Associated-Content"; - char pri_ch = priority + '0'; - new_value = pri_ch + string("??") + url; - delimiter = "||"; - related_files = true; - } else { - if (!FLAGS_use_xsub) - return; - header_name = "X-Subresource"; - new_value = content_type + "!!" + url; - delimiter = "!!"; - } - - if (fd2->headers->HasNonEmptyHeader(header_name)) { - string existing_header = - fd2->headers->GetHeader(header_name).as_string(); - if (existing_header.find(url) != string::npos) - return; // header already recorded - - // Don't let these lists grow too long for low pri stuff. - // TODO(mbelshe) We need better algorithms for this. - if (existing_header.length() > 256 && priority > 2) - return; - - new_value = existing_header + delimiter + new_value; - } - - LOG(INFO) << "Recording " << header_name << " for " << new_value; - fd2->headers->ReplaceOrAppendHeader(header_name, new_value); - - // Add it to the related files so that it will actually get sent out. - if (related_files) { - pair<int, string> entry(4, file_url); - fd2->related_files.push_back(entry); - } - } else { - LOG(ERROR) << "Failed to update headers:"; - LOG(ERROR) << "FAIL url: " << url; - LOG(ERROR) << "FAIL ref: " << referrer_host_path; - } - } - - FileData* GetFileData(const string& filename) { - Files::iterator fi = files_.end(); - if (filename.compare(filename.length() - 5, 5, ".html", 5) == 0) { - string new_filename(filename.data(), filename.size() - 5); - new_filename += ".http"; - fi = files_.find(new_filename); - } - if (fi == files_.end()) - fi = files_.find(filename); - - if (fi == files_.end()) { - return NULL; - } - return &(fi->second); - } - - bool AssignFileData(const string& filename, MemCacheIter* mci) { - mci->file_data = GetFileData(filename); - if (mci->file_data == NULL) { - LOG(ERROR) << "Could not find file data for " << filename; - return false; - } - return true; - } -}; - -class NotifierInterface { - public: - virtual ~NotifierInterface() {} - virtual void Notify() = 0; -}; - -//////////////////////////////////////////////////////////////////////////////// - -class SMConnectionPoolInterface; - -class SMInterface { - public: - virtual void InitSMInterface(SMInterface* sm_other_interface, - int32 server_idx) = 0; - virtual void InitSMConnection(SMConnectionPoolInterface* connection_pool, - SMInterface* sm_interface, - EpollServer* epoll_server, - int fd, - string server_ip, - string server_port, - string remote_ip, - bool use_ssl) = 0; - virtual size_t ProcessReadInput(const char* data, size_t len) = 0; - virtual size_t ProcessWriteInput(const char* data, size_t len) = 0; - virtual void SetStreamID(uint32 stream_id) = 0; - virtual bool MessageFullyRead() const = 0; - virtual bool Error() const = 0; - virtual const char* ErrorAsString() const = 0; - virtual void Reset() = 0; - virtual void ResetForNewInterface(int32 server_idx) = 0; - // ResetForNewConnection is used for interfaces which control SMConnection - // objects. When called an interface may put its connection object into - // a reusable instance pool. Currently this is what the HttpSM interface - // does. - virtual void ResetForNewConnection() = 0; - virtual void Cleanup() = 0; - - virtual int PostAcceptHook() = 0; - - virtual void NewStream(uint32 stream_id, uint32 priority, - const string& filename) = 0; - virtual void SendEOF(uint32 stream_id) = 0; - virtual void SendErrorNotFound(uint32 stream_id) = 0; - virtual size_t SendSynStream(uint32 stream_id, - const BalsaHeaders& headers) = 0; - virtual size_t SendSynReply(uint32 stream_id, - const BalsaHeaders& headers) = 0; - virtual void SendDataFrame(uint32 stream_id, const char* data, int64 len, - uint32 flags, bool compress) = 0; - virtual void GetOutput() = 0; - - virtual ~SMInterface() {} -}; - -class SMConnectionInterface { - public: - virtual ~SMConnectionInterface() {} - virtual void ReadyToSend() = 0; - virtual EpollServer* epoll_server() = 0; -}; - -class HttpSM; -class SMConnection; - -typedef list<DataFrame*> OutputList; - -class SMConnectionPoolInterface { - public: - virtual ~SMConnectionPoolInterface() {} - // SMConnections will use this: - virtual void SMConnectionDone(SMConnection* connection) = 0; -}; - -SMInterface* NewStreamerSM(SMConnection* connection, - SMInterface* sm_interface, - EpollServer* epoll_server, - FlipAcceptor* acceptor); - -SMInterface* NewSpdySM(SMConnection* connection, - SMInterface* sm_interface, - EpollServer* epoll_server, - MemoryCache* memory_cache, - FlipAcceptor* acceptor); - -SMInterface* NewHttpSM(SMConnection* connection, - SMInterface* sm_interface, - EpollServer* epoll_server, - MemoryCache* memory_cache, - FlipAcceptor* acceptor); - -//////////////////////////////////////////////////////////////////////////////// - -class SMConnection: public SMConnectionInterface, - public EpollCallbackInterface, - public NotifierInterface { - private: - SMConnection(EpollServer* epoll_server, - SSLState* ssl_state, - MemoryCache* memory_cache, - FlipAcceptor* acceptor, - string log_prefix) - : fd_(-1), - events_(0), - registered_in_epoll_server_(false), - initialized_(false), - protocol_detected_(false), - connection_complete_(false), - connection_pool_(NULL), - epoll_server_(epoll_server), - ssl_state_(ssl_state), - memory_cache_(memory_cache), - acceptor_(acceptor), - read_buffer_(kSpdySegmentSize * 40), - sm_spdy_interface_(NULL), - sm_http_interface_(NULL), - sm_streamer_interface_(NULL), - sm_interface_(NULL), - log_prefix_(log_prefix), - max_bytes_sent_per_dowrite_(4096), - ssl_(NULL), - last_read_time_(0) - {} - - int fd_; - int events_; - - bool registered_in_epoll_server_; - bool initialized_; - bool protocol_detected_; - bool connection_complete_; - - SMConnectionPoolInterface* connection_pool_; - - EpollServer *epoll_server_; - SSLState *ssl_state_; - MemoryCache* memory_cache_; - FlipAcceptor *acceptor_; - string client_ip_; - - RingBuffer read_buffer_; - - OutputList output_list_; - SMInterface* sm_spdy_interface_; - SMInterface* sm_http_interface_; - SMInterface* sm_streamer_interface_; - SMInterface* sm_interface_; - string log_prefix_; - - size_t max_bytes_sent_per_dowrite_; - - SSL* ssl_; - public: - time_t last_read_time_; - string server_ip_; - string server_port_; - - EpollServer* epoll_server() { return epoll_server_; } - OutputList* output_list() { return &output_list_; } - MemoryCache* memory_cache() { return memory_cache_; } - void ReadyToSend() { - VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT - << "Setting ready to send: EPOLLIN | EPOLLOUT"; - epoll_server_->SetFDReady(fd_, EPOLLIN | EPOLLOUT); - } - void EnqueueDataFrame(DataFrame* df) { - output_list_.push_back(df); - VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT << "EnqueueDataFrame: " - << "size = " << df->size << ": Setting FD ready."; - ReadyToSend(); - } - int fd() { return fd_; } - - public: - ~SMConnection() { - if (initialized()) { - Reset(); - } - } - static SMConnection* NewSMConnection(EpollServer* epoll_server, - SSLState *ssl_state, - MemoryCache* memory_cache, - FlipAcceptor *acceptor, - string log_prefix) { - return new SMConnection(epoll_server, ssl_state, memory_cache, - acceptor, log_prefix); - } - - bool initialized() const { return initialized_; } - string client_ip() const { return client_ip_; } - - void InitSMConnection(SMConnectionPoolInterface* connection_pool, - SMInterface* sm_interface, - EpollServer* epoll_server, - int fd, - string server_ip, - string server_port, - string remote_ip, - bool use_ssl) { - if (initialized_) { - LOG(FATAL) << "Attempted to initialize already initialized server"; - return; - } - - client_ip_ = remote_ip; - - if (fd == -1) { - // If fd == -1, then we are initializing a new connection that will - // connect to the backend. - // - // ret: -1 == error - // 0 == connection in progress - // 1 == connection complete - // TODO: is_numeric_host_address value needs to be detected - server_ip_ = server_ip; - server_port_ = server_port; - int ret = net::CreateConnectedSocket(&fd_, - server_ip, - server_port, - true, - acceptor_->disable_nagle_); - - if (ret < 0) { - LOG(ERROR) << "-1 Could not create connected socket"; - return; - } else if (ret == 1) { - DCHECK_NE(-1, fd_); - connection_complete_ = true; - VLOG(1) << log_prefix_ << ACCEPTOR_CLIENT_IDENT - << "Connection complete to: " << server_ip_ << ":" - << server_port_ << " "; - } - VLOG(1) << log_prefix_ << ACCEPTOR_CLIENT_IDENT - << "Connecting to server: " << server_ip_ << ":" - << server_port_ << " "; - } else { - // If fd != -1 then we are initializing a connection that has just been - // accepted from the listen socket. - connection_complete_ = true; - if (epoll_server_ && registered_in_epoll_server_ && fd_ != -1) { - epoll_server_->UnregisterFD(fd_); - } - if (fd_ != -1) { - VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT - << "Closing pre-existing fd"; - close(fd_); - fd_ = -1; - } - - fd_ = fd; - } - - registered_in_epoll_server_ = false; - // Set the last read time here as the idle checker will start from - // now. - last_read_time_ = time(NULL); - initialized_ = true; - - connection_pool_ = connection_pool; - epoll_server_ = epoll_server; - - if (sm_interface) { - sm_interface_ = sm_interface; - protocol_detected_ = true; - } - - read_buffer_.Clear(); - - epoll_server_->RegisterFD(fd_, this, EPOLLIN | EPOLLOUT | EPOLLET); - - if (use_ssl) { - ssl_ = spdy_new_ssl(ssl_state_->ssl_ctx); - SSL_set_fd(ssl_, fd_); - PrintSslError(); - } - } - - void CorkSocket() { - int state = 1; - int rv = setsockopt(fd_, IPPROTO_TCP, TCP_CORK, &state, sizeof(state)); - if (rv < 0) - VLOG(1) << "setsockopt(CORK): " << errno; - } - - void UncorkSocket() { - int state = 0; - int rv = setsockopt(fd_, IPPROTO_TCP, TCP_CORK, &state, sizeof(state)); - if (rv < 0) - VLOG(1) << "setsockopt(CORK): " << errno; - } - - int Send(const char* data, int len, int flags) { - int rv = 0; - CorkSocket(); - if (ssl_) { - ssize_t bytes_written = 0; - // Write smallish chunks to SSL so that we don't have large - // multi-packet TLS records to receive before being able to handle - // the data. We don't have to be too careful here, because our data - // frames are already getting chunked appropriately, and those are - // the most likely "big" frames. - while(len > 0) { - const int kMaxTLSRecordSize = 1500; - const char* ptr = &(data[bytes_written]); - int chunksize = std::min(len, kMaxTLSRecordSize); - rv = SSL_write(ssl_, ptr, chunksize); - VLOG(2) << "SSLWrite(" << chunksize << " bytes): " << rv; - if (rv <= 0) { - switch(SSL_get_error(ssl_, rv)) { - case SSL_ERROR_WANT_READ: - case SSL_ERROR_WANT_WRITE: - case SSL_ERROR_WANT_ACCEPT: - case SSL_ERROR_WANT_CONNECT: - rv = -2; - break; - default: - PrintSslError(); - break; - } - break; - } - bytes_written += rv; - len -= rv; - if (rv != chunksize) - break; // If we couldn't write everything, we're implicitly stalled - } - // If we wrote some data, return that count. Otherwise - // return the stall error. - if (bytes_written > 0) - rv = bytes_written; - } else { - rv = send(fd_, data, len, flags); - } - if (!(flags & MSG_MORE)) - UncorkSocket(); - return rv; - } - - // the following are from the EpollCallbackInterface - virtual void OnRegistration(EpollServer* eps, int fd, int event_mask) { - registered_in_epoll_server_ = true; - } - virtual void OnModification(int fd, int event_mask) { } - virtual void OnEvent(int fd, EpollEvent* event) { - events_ |= event->in_events; - HandleEvents(); - if (events_) { - event->out_ready_mask = events_; - events_ = 0; - } - } - virtual void OnUnregistration(int fd, bool replaced) { - registered_in_epoll_server_ = false; - } - virtual void OnShutdown(EpollServer* eps, int fd) { - Cleanup("OnShutdown"); - return; - } - - void Cleanup(const char* cleanup) { - VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT << "Cleanup: " << cleanup; - if (!initialized_) { - return; - } - Reset(); - if (connection_pool_) - connection_pool_->SMConnectionDone(this); - if (sm_interface_) - sm_interface_->ResetForNewConnection(); - last_read_time_ = 0; - } - - private: - void HandleEvents() { - VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT << "Received: " - << EpollServer::EventMaskToString(events_).c_str(); - - if (events_ & EPOLLIN) { - if (!DoRead()) - goto handle_close_or_error; - } - - if (events_ & EPOLLOUT) { - // Check if we have connected or not - if (connection_complete_ == false) { - int sock_error; - socklen_t sock_error_len = sizeof(sock_error); - int ret = getsockopt(fd_, SOL_SOCKET, SO_ERROR, &sock_error, - &sock_error_len); - if (ret != 0) { - VLOG(1) << log_prefix_ << ACCEPTOR_CLIENT_IDENT - << "getsockopt error: " << errno << ": " << strerror(errno); - goto handle_close_or_error; - } - if (sock_error == 0) { - connection_complete_ = true; - VLOG(1) << log_prefix_ << ACCEPTOR_CLIENT_IDENT - << "Connection complete to " << server_ip_ << ":" - << server_port_ << " "; - } else if (sock_error == EINPROGRESS) { - return; - } else { - VLOG(1) << log_prefix_ << ACCEPTOR_CLIENT_IDENT - << "error connecting to server"; - goto handle_close_or_error; - } - } - if (!DoWrite()) - goto handle_close_or_error; - } - - if (events_ & (EPOLLHUP | EPOLLERR)) { - VLOG(1) << log_prefix_ << ACCEPTOR_CLIENT_IDENT << "!!! Got HUP or ERR"; - goto handle_close_or_error; - } - return; - - handle_close_or_error: - Cleanup("HandleEvents"); - } - - // Decide if SPDY was negotiated. - bool WasSpdyNegotiated() { - if (FLAGS_force_spdy) - return true; - - // If this is an SSL connection, check if NPN specifies SPDY. - if (ssl_) { - const unsigned char *npn_proto; - unsigned int npn_proto_len; - SSL_get0_next_proto_negotiated(ssl_, &npn_proto, &npn_proto_len); - if (npn_proto_len > 0) { - string npn_proto_str((const char *)npn_proto, npn_proto_len); - VLOG(1) << log_prefix_ << ACCEPTOR_CLIENT_IDENT - << "NPN protocol detected: " << npn_proto_str; - if (!strncmp(reinterpret_cast<const char*>(npn_proto), - "spdy/2", npn_proto_len)) - return true; - } - } - - return false; - } - - // Initialize the protocol interfaces we'll need for this connection. - // Returns true if successful, false otherwise. - bool SetupProtocolInterfaces() { - DCHECK(!protocol_detected_); - protocol_detected_ = true; - - bool spdy_negotiated = WasSpdyNegotiated(); - bool using_ssl = ssl_ != NULL; - - if (using_ssl) - VLOG(1) << (SSL_session_reused(ssl_) ? "Resumed" : "Renegotiated") - << " SSL Session."; - - if (acceptor_->spdy_only_ && !spdy_negotiated) { - VLOG(1) << log_prefix_ << ACCEPTOR_CLIENT_IDENT - << "SPDY proxy only, closing HTTPS connection."; - return false; - } - - switch (acceptor_->flip_handler_type_) { - case FLIP_HANDLER_HTTP_SERVER: - { - DCHECK(!spdy_negotiated); - VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT - << (sm_http_interface_ ? "Creating" : "Reusing") - << " HTTP interface."; - if (!sm_http_interface_) - sm_http_interface_ = NewHttpSM(this, NULL, epoll_server_, - memory_cache_, acceptor_); - sm_interface_ = sm_http_interface_; - } - break; - case FLIP_HANDLER_PROXY: - { - VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT - << (sm_streamer_interface_ ? "Creating" : "Reusing") - << " PROXY Streamer interface."; - if (!sm_streamer_interface_) - sm_streamer_interface_ = NewStreamerSM(this, NULL, - epoll_server_, - acceptor_); - sm_interface_ = sm_streamer_interface_; - // If spdy is not negotiated, the streamer interface will proxy all - // data to the origin server. - if (!spdy_negotiated) - break; - } - // Otherwise fall through into the case below. - case FLIP_HANDLER_SPDY_SERVER: - { - DCHECK(spdy_negotiated); - VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT - << (sm_spdy_interface_ ? "Creating" : "Reusing") - << " SPDY interface."; - if (!sm_spdy_interface_) - sm_spdy_interface_ = NewSpdySM(this, NULL, epoll_server_, - memory_cache_, acceptor_); - sm_interface_ = sm_spdy_interface_; - } - break; - } - - CorkSocket(); - if (!sm_interface_->PostAcceptHook()) - return false; - - return true; - } - - bool DoRead() { - VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT << "DoRead()"; - while (!read_buffer_.Full()) { - char* bytes; - int size; - if (fd_ == -1) { - VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT - << "DoRead(): fd_ == -1. Invalid FD. Returning false"; - return false; - } - read_buffer_.GetWritablePtr(&bytes, &size); - ssize_t bytes_read = 0; - if (ssl_) { - bytes_read = SSL_read(ssl_, bytes, size); - if (bytes_read < 0) { - int err = SSL_get_error(ssl_, bytes_read); - switch(err) { - case SSL_ERROR_WANT_READ: - case SSL_ERROR_WANT_WRITE: - case SSL_ERROR_WANT_ACCEPT: - case SSL_ERROR_WANT_CONNECT: - events_ &= ~EPOLLIN; - VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT - << "DoRead: SSL WANT_XXX: " << err; - goto done; - default: - PrintSslError(); - goto error_or_close; - } - } - } else { - bytes_read = recv(fd_, bytes, size, MSG_DONTWAIT); - } - int stored_errno = errno; - if (bytes_read == -1) { - switch (stored_errno) { - case EAGAIN: - events_ &= ~EPOLLIN; - VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT - << "Got EAGAIN while reading"; - goto done; - case EINTR: - VLOG(1) << log_prefix_ << ACCEPTOR_CLIENT_IDENT - << "Got EINTR while reading"; - continue; - default: - VLOG(1) << log_prefix_ << ACCEPTOR_CLIENT_IDENT - << "While calling recv, got error: " - << (ssl_?"(ssl error)":strerror(stored_errno)); - goto error_or_close; - } - } else if (bytes_read > 0) { - VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT << "read " << bytes_read - << " bytes"; - last_read_time_ = time(NULL); - // If the protocol hasn't been detected yet, set up the handlers - // we'll need. - if (!protocol_detected_) { - if (!SetupProtocolInterfaces()) { - LOG(ERROR) << "Error setting up protocol interfaces."; - goto error_or_close; - } - } - read_buffer_.AdvanceWritablePtr(bytes_read); - if (!DoConsumeReadData()) - goto error_or_close; - continue; - } else { // bytes_read == 0 - VLOG(1) << log_prefix_ << ACCEPTOR_CLIENT_IDENT - << "0 bytes read with recv call."; - } - goto error_or_close; - } - done: - VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT << "DoRead done!"; - return true; - - error_or_close: - VLOG(1) << log_prefix_ << ACCEPTOR_CLIENT_IDENT - << "DoRead(): error_or_close. " - << "Cleaning up, then returning false"; - Cleanup("DoRead"); - return false; - } - - bool DoConsumeReadData() { - char* bytes; - int size; - read_buffer_.GetReadablePtr(&bytes, &size); - while (size != 0) { - size_t bytes_consumed = sm_interface_->ProcessReadInput(bytes, size); - VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT << "consumed " - << bytes_consumed << " bytes"; - if (bytes_consumed == 0) { - break; - } - read_buffer_.AdvanceReadablePtr(bytes_consumed); - if (sm_interface_->MessageFullyRead()) { - VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT - << "HandleRequestFullyRead: Setting EPOLLOUT"; - HandleResponseFullyRead(); - events_ |= EPOLLOUT; - } else if (sm_interface_->Error()) { - LOG(ERROR) << log_prefix_ << ACCEPTOR_CLIENT_IDENT - << "Framer error detected: Setting EPOLLOUT: " - << sm_interface_->ErrorAsString(); - // this causes everything to be closed/cleaned up. - events_ |= EPOLLOUT; - return false; - } - read_buffer_.GetReadablePtr(&bytes, &size); - } - return true; - } - - void WriteResponse() { - // this happens asynchronously from separate threads - // feeding files into the output buffer. - } - - void HandleResponseFullyRead() { - sm_interface_->Cleanup(); - } - - void Notify() { - } - - bool DoWrite() { - size_t bytes_sent = 0; - int flags = MSG_NOSIGNAL | MSG_DONTWAIT; - if (fd_ == -1) { - VLOG(1) << log_prefix_ << ACCEPTOR_CLIENT_IDENT - << "DoWrite: fd == -1. Returning false."; - return false; - } - if (output_list_.empty()) { - VLOG(2) << log_prefix_ << "DoWrite: Output list empty."; - if (sm_interface_) { - sm_interface_->GetOutput(); - } - if (output_list_.empty()) { - events_ &= ~EPOLLOUT; - } - } - while (!output_list_.empty()) { - VLOG(2) << log_prefix_ << "DoWrite: Items in output list: " - << output_list_.size(); - if (bytes_sent >= max_bytes_sent_per_dowrite_) { - VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT - << " byte sent >= max bytes sent per write: Setting EPOLLOUT: " - << bytes_sent; - events_ |= EPOLLOUT; - break; - } - if (sm_interface_ && output_list_.size() < 2) { - sm_interface_->GetOutput(); - } - DataFrame* data_frame = output_list_.front(); - const char* bytes = data_frame->data; - int size = data_frame->size; - bytes += data_frame->index; - size -= data_frame->index; - DCHECK_GE(size, 0); - if (size <= 0) { - output_list_.pop_front(); - delete data_frame; - continue; - } - - flags = MSG_NOSIGNAL | MSG_DONTWAIT; - // Look for a queue size > 1 because |this| frame is remains on the list - // until it has finished sending. - if (output_list_.size() > 1) { - VLOG(2) << log_prefix_ << "Outlist size: " << output_list_.size() - << ": Adding MSG_MORE flag"; - flags |= MSG_MORE; - } - VLOG(2) << log_prefix_ << "Attempting to send " << size << " bytes."; - ssize_t bytes_written = Send(bytes, size, flags); - int stored_errno = errno; - if (bytes_written == -1) { - switch (stored_errno) { - case EAGAIN: - events_ &= ~EPOLLOUT; - VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT - << "Got EAGAIN while writing"; - goto done; - case EINTR: - VLOG(1) << log_prefix_ << ACCEPTOR_CLIENT_IDENT - << "Got EINTR while writing"; - continue; - default: - VLOG(1) << log_prefix_ << ACCEPTOR_CLIENT_IDENT - << "While calling send, got error: " << stored_errno - << ": " << (ssl_?"":strerror(stored_errno)); - goto error_or_close; - } - } else if (bytes_written > 0) { - VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT << "Wrote: " - << bytes_written << " bytes"; - data_frame->index += bytes_written; - bytes_sent += bytes_written; - continue; - } else if (bytes_written == -2) { - // -2 handles SSL_ERROR_WANT_* errors - events_ &= ~EPOLLOUT; - goto done; - } - VLOG(1) << log_prefix_ << ACCEPTOR_CLIENT_IDENT - << "0 bytes written with send call."; - goto error_or_close; - } - done: - UncorkSocket(); - return true; - - error_or_close: - VLOG(1) << log_prefix_ << ACCEPTOR_CLIENT_IDENT - << "DoWrite: error_or_close. Returning false " - << "after cleaning up"; - Cleanup("DoWrite"); - UncorkSocket(); - return false; - } - - friend ostream& operator<<(ostream& os, const SMConnection& c) { - os << &c << "\n"; - return os; - } - - void Reset() { - VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT << "Resetting"; - if (ssl_) { - SSL_shutdown(ssl_); - PrintSslError(); - SSL_free(ssl_); - PrintSslError(); - ssl_ = NULL; - } - if (registered_in_epoll_server_) { - epoll_server_->UnregisterFD(fd_); - registered_in_epoll_server_ = false; - } - if (fd_ >= 0) { - VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT << "Closing connection"; - close(fd_); - fd_ = -1; - } - read_buffer_.Clear(); - initialized_ = false; - protocol_detected_ = false; - events_ = 0; - for (list<DataFrame*>::iterator i = - output_list_.begin(); - i != output_list_.end(); - ++i) { - delete *i; - } - output_list_.clear(); - } -}; - -//////////////////////////////////////////////////////////////////////////////// - -class OutputOrdering { - public: - typedef list<MemCacheIter> PriorityRing; - - typedef map<uint32, PriorityRing> PriorityMap; - - struct PriorityMapPointer { - PriorityMapPointer(): ring(NULL), alarm_enabled(false) {} - PriorityRing* ring; - PriorityRing::iterator it; - bool alarm_enabled; - EpollServer::AlarmRegToken alarm_token; - }; - typedef map<uint32, PriorityMapPointer> StreamIdToPriorityMap; - - StreamIdToPriorityMap stream_ids_; - PriorityMap priority_map_; - PriorityRing first_data_senders_; - uint32 first_data_senders_threshold_; // when you've passed this, you're no - // longer a first_data_sender... - SMConnectionInterface* connection_; - EpollServer* epoll_server_; - - explicit OutputOrdering(SMConnectionInterface* connection) : - first_data_senders_threshold_(kInitialDataSendersThreshold), - connection_(connection) { - if (connection) { - epoll_server_ = connection->epoll_server(); - } - } - - void Reset() { - while (!stream_ids_.empty()) { - StreamIdToPriorityMap::iterator sitpmi = stream_ids_.begin(); - PriorityMapPointer& pmp = sitpmi->second; - if (pmp.alarm_enabled) { - epoll_server_->UnregisterAlarm(pmp.alarm_token); - } - stream_ids_.erase(sitpmi); - } - priority_map_.clear(); - first_data_senders_.clear(); - } - - bool ExistsInPriorityMaps(uint32 stream_id) { - StreamIdToPriorityMap::iterator sitpmi = stream_ids_.find(stream_id); - return sitpmi != stream_ids_.end(); - } - - struct BeginOutputtingAlarm : public EpollAlarmCallbackInterface { - public: - BeginOutputtingAlarm(OutputOrdering* oo, - OutputOrdering::PriorityMapPointer* pmp, - const MemCacheIter& mci) : - output_ordering_(oo), pmp_(pmp), mci_(mci), epoll_server_(NULL) {} - - int64 OnAlarm() { - OnUnregistration(); - output_ordering_->MoveToActive(pmp_, mci_); - VLOG(2) << "ON ALARM! Should now start to output..."; - delete this; - return 0; - } - void OnRegistration(const EpollServer::AlarmRegToken& tok, - EpollServer* eps) { - epoll_server_ = eps; - pmp_->alarm_token = tok; - pmp_->alarm_enabled = true; - } - void OnUnregistration() { - pmp_->alarm_enabled = false; - } - void OnShutdown(EpollServer* eps) { - OnUnregistration(); - } - ~BeginOutputtingAlarm() { - if (epoll_server_ && pmp_->alarm_enabled) - epoll_server_->UnregisterAlarm(pmp_->alarm_token); - } - private: - OutputOrdering* output_ordering_; - OutputOrdering::PriorityMapPointer* pmp_; - MemCacheIter mci_; - EpollServer* epoll_server_; - }; - - void MoveToActive(PriorityMapPointer* pmp, MemCacheIter mci) { - VLOG(2) << "Moving to active!"; - first_data_senders_.push_back(mci); - pmp->ring = &first_data_senders_; - pmp->it = first_data_senders_.end(); - --pmp->it; - connection_->ReadyToSend(); - } - - void AddToOutputOrder(const MemCacheIter& mci) { - if (ExistsInPriorityMaps(mci.stream_id)) - LOG(ERROR) << "OOps, already was inserted here?!"; - - double think_time_in_s = g_proxy_config.server_think_time_in_s_; - string x_server_latency = - mci.file_data->headers->GetHeader("X-Server-Latency").as_string(); - if (x_server_latency.size() != 0) { - char* endp; - double tmp_think_time_in_s = strtod(x_server_latency.c_str(), &endp); - if (endp != x_server_latency.c_str() + x_server_latency.size()) { - LOG(ERROR) << "Unable to understand X-Server-Latency of: " - << x_server_latency << " for resource: " - << mci.file_data->filename.c_str(); - } else { - think_time_in_s = tmp_think_time_in_s; - } - } - StreamIdToPriorityMap::iterator sitpmi; - sitpmi = stream_ids_.insert( - pair<uint32, PriorityMapPointer>(mci.stream_id, - PriorityMapPointer())).first; - PriorityMapPointer& pmp = sitpmi->second; - - BeginOutputtingAlarm* boa = new BeginOutputtingAlarm(this, &pmp, mci); - VLOG(1) << "Server think time: " << think_time_in_s; - epoll_server_->RegisterAlarmApproximateDelta( - think_time_in_s * 1000000, boa); - } - - void SpliceToPriorityRing(PriorityRing::iterator pri) { - MemCacheIter& mci = *pri; - PriorityMap::iterator pmi = priority_map_.find(mci.priority); - if (pmi == priority_map_.end()) { - pmi = priority_map_.insert( - pair<uint32, PriorityRing>(mci.priority, PriorityRing())).first; - } - - pmi->second.splice(pmi->second.end(), - first_data_senders_, - pri); - StreamIdToPriorityMap::iterator sitpmi = stream_ids_.find(mci.stream_id); - sitpmi->second.ring = &(pmi->second); - } - - MemCacheIter* GetIter() { - while (!first_data_senders_.empty()) { - MemCacheIter& mci = first_data_senders_.front(); - if (mci.bytes_sent >= first_data_senders_threshold_) { - SpliceToPriorityRing(first_data_senders_.begin()); - } else { - first_data_senders_.splice(first_data_senders_.end(), - first_data_senders_, - first_data_senders_.begin()); - mci.max_segment_size = kInitialDataSendersThreshold; - return &mci; - } - } - while (!priority_map_.empty()) { - PriorityRing& first_ring = priority_map_.begin()->second; - if (first_ring.empty()) { - priority_map_.erase(priority_map_.begin()); - continue; - } - MemCacheIter& mci = first_ring.front(); - first_ring.splice(first_ring.end(), - first_ring, - first_ring.begin()); - mci.max_segment_size = kSpdySegmentSize; - return &mci; - } - return NULL; - } - - void RemoveStreamId(uint32 stream_id) { - StreamIdToPriorityMap::iterator sitpmi = stream_ids_.find(stream_id); - if (sitpmi == stream_ids_.end()) - return; - PriorityMapPointer& pmp = sitpmi->second; - if (pmp.alarm_enabled) { - epoll_server_->UnregisterAlarm(pmp.alarm_token); - } else { - pmp.ring->erase(pmp.it); - } - - stream_ids_.erase(sitpmi); - } -}; - - -//////////////////////////////////////////////////////////////////////////////// - -class SpdySM : public SpdyFramerVisitorInterface, public SMInterface { - private: - uint64 seq_num_; - SpdyFramer* spdy_framer_; - bool valid_spdy_session_; // True if we have seen valid data on this session. - // Use this to fail fast when junk is sent to our - // port. - - SMConnection* connection_; - OutputList* client_output_list_; - OutputOrdering client_output_ordering_; - uint32 next_outgoing_stream_id_; - EpollServer* epoll_server_; - FlipAcceptor* acceptor_; - MemoryCache* memory_cache_; - vector<SMInterface*> server_interface_list; - vector<int32> unused_server_interface_list; - typedef map<uint32,SMInterface*> StreamToSmif; - StreamToSmif stream_to_smif_; - bool close_on_error_; - public: - SpdySM(SMConnection* connection, - SMInterface* sm_http_interface, - EpollServer* epoll_server, - MemoryCache* memory_cache, - FlipAcceptor* acceptor) - : seq_num_(0), - spdy_framer_(new SpdyFramer), - valid_spdy_session_(false), - connection_(connection), - client_output_list_(connection->output_list()), - client_output_ordering_(connection), - next_outgoing_stream_id_(2), - epoll_server_(epoll_server), - acceptor_(acceptor), - memory_cache_(memory_cache), - close_on_error_(false) { - spdy_framer_->set_visitor(this); - } - - ~SpdySM() { - delete spdy_framer_; - } - - void InitSMInterface(SMInterface* sm_http_interface, - int32 server_idx) { } - - void InitSMConnection(SMConnectionPoolInterface* connection_pool, - SMInterface* sm_interface, - EpollServer* epoll_server, - int fd, - string server_ip, - string server_port, - string remote_ip, - bool use_ssl) { - VLOG(2) << ACCEPTOR_CLIENT_IDENT - << "SpdySM: Initializing server connection."; - connection_->InitSMConnection(connection_pool, sm_interface, - epoll_server, fd, server_ip, server_port, - remote_ip, use_ssl); - } - - private: - virtual void OnError(SpdyFramer* framer) { - /* do nothing with this right now */ - } - - SMInterface* NewConnectionInterface() { - SMConnection* server_connection = - SMConnection::NewSMConnection(epoll_server_, NULL, - memory_cache_, acceptor_, - "http_conn: "); - if (server_connection == NULL) { - LOG(ERROR) << "SpdySM: Could not create server connection"; - return NULL; - } - VLOG(2) << ACCEPTOR_CLIENT_IDENT << "SpdySM: Creating new HTTP interface"; - SMInterface *sm_http_interface = NewHttpSM(server_connection, this, - epoll_server_, memory_cache_, - acceptor_); - return sm_http_interface; - } - - SMInterface* FindOrMakeNewSMConnectionInterface(string server_ip, - string server_port) { - SMInterface *sm_http_interface; - int32 server_idx; - if (unused_server_interface_list.empty()) { - sm_http_interface = NewConnectionInterface(); - server_idx = server_interface_list.size(); - server_interface_list.push_back(sm_http_interface); - VLOG(2) << ACCEPTOR_CLIENT_IDENT - << "SpdySM: Making new server connection on index: " - << server_idx; - } else { - server_idx = unused_server_interface_list.back(); - unused_server_interface_list.pop_back(); - sm_http_interface = server_interface_list.at(server_idx); - VLOG(2) << ACCEPTOR_CLIENT_IDENT << "SpdySM: Reusing connection on " - << "index: " << server_idx; - } - - sm_http_interface->InitSMInterface(this, server_idx); - sm_http_interface->InitSMConnection(NULL, sm_http_interface, - epoll_server_, -1, - server_ip, server_port, "", false); - - return sm_http_interface; - } - - int SpdyHandleNewStream(const SpdyControlFrame* frame, - string &http_data, - bool *is_https_scheme) - { - bool parsed_headers = false; - SpdyHeaderBlock headers; - const SpdySynStreamControlFrame* syn_stream = - reinterpret_cast<const SpdySynStreamControlFrame*>(frame); - - *is_https_scheme = false; - parsed_headers = spdy_framer_->ParseHeaderBlock(frame, &headers); - VLOG(2) << ACCEPTOR_CLIENT_IDENT << "SpdySM: OnSyn(" - << syn_stream->stream_id() << ")"; - VLOG(2) << ACCEPTOR_CLIENT_IDENT << "SpdySM: headers parsed?: " - << (parsed_headers? "yes": "no"); - if (parsed_headers) { - VLOG(2) << ACCEPTOR_CLIENT_IDENT << "SpdySM: # headers: " - << headers.size(); - } - SpdyHeaderBlock::iterator url = headers.find("url"); - SpdyHeaderBlock::iterator method = headers.find("method"); - if (url == headers.end() || method == headers.end()) { - VLOG(2) << ACCEPTOR_CLIENT_IDENT << "SpdySM: didn't find method or url " - << "or method. Not creating stream"; - return 0; - } - - SpdyHeaderBlock::iterator scheme = headers.find("scheme"); - if (scheme->second.compare("https") == 0) { - *is_https_scheme = true; - } - - string uri = UrlUtilities::GetUrlPath(url->second); - if (acceptor_->flip_handler_type_ == FLIP_HANDLER_SPDY_SERVER) { - SpdyHeaderBlock::iterator referer = headers.find("referer"); - if (referer != headers.end() && method->second == "GET") { - memory_cache_->UpdateHeaders(referer->second, url->second); - } - string host = UrlUtilities::GetUrlHost(url->second); - VLOG(1) << ACCEPTOR_CLIENT_IDENT << "Request: " << method->second - << " " << uri; - string filename = EncodeURL(uri, host, method->second); - NewStream(syn_stream->stream_id(), - reinterpret_cast<const SpdySynStreamControlFrame*>(frame)-> - priority(), - filename); - } else { - SpdyHeaderBlock::iterator version = headers.find("version"); - http_data += method->second + " " + uri + " " + version->second + "\r\n"; - VLOG(1) << ACCEPTOR_CLIENT_IDENT << "Request: " << method->second << " " - << uri << " " << version->second; - for (SpdyHeaderBlock::iterator i = headers.begin(); - i != headers.end(); ++i) { - http_data += i->first + ": " + i->second + "\r\n"; - VLOG(2) << ACCEPTOR_CLIENT_IDENT << i->first.c_str() << ":" - << i->second.c_str(); - } - if (g_proxy_config.forward_ip_header_enabled_) { - // X-Client-Cluster-IP header - http_data += g_proxy_config.forward_ip_header_ + ": " + - connection_->client_ip() + "\r\n"; - } - http_data += "\r\n"; - } - - VLOG(3) << ACCEPTOR_CLIENT_IDENT << "SpdySM: HTTP Request:\n" << http_data; - return 1; - } - - virtual void OnControl(const SpdyControlFrame* frame) { - SpdyHeaderBlock headers; - bool parsed_headers = false; - switch (frame->type()) { - case SYN_STREAM: - { - const SpdySynStreamControlFrame* syn_stream = - reinterpret_cast<const SpdySynStreamControlFrame*>(frame); - - string http_data; - bool is_https_scheme; - int ret = SpdyHandleNewStream(frame, http_data, &is_https_scheme); - if (!ret) { - LOG(ERROR) << "SpdySM: Could not convert spdy into http."; - break; - } - // We've seen a valid looking SYN_STREAM, consider this to have - // been a real spdy session. - valid_spdy_session_ = true; - - if (acceptor_->flip_handler_type_ == FLIP_HANDLER_PROXY) { - string server_ip; - string server_port; - if (is_https_scheme) { - server_ip = acceptor_->https_server_ip_; - server_port = acceptor_->https_server_port_; - } else { - server_ip = acceptor_->http_server_ip_; - server_port = acceptor_->http_server_port_; - } - SMInterface *sm_http_interface = - FindOrMakeNewSMConnectionInterface(server_ip, server_port); - stream_to_smif_[syn_stream->stream_id()] = sm_http_interface; - sm_http_interface->SetStreamID(syn_stream->stream_id()); - sm_http_interface->ProcessWriteInput(http_data.c_str(), - http_data.size()); - } - } - break; - - case SYN_REPLY: - parsed_headers = spdy_framer_->ParseHeaderBlock(frame, &headers); - VLOG(2) << ACCEPTOR_CLIENT_IDENT << "SpdySM: OnSynReply(" << - reinterpret_cast<const SpdySynReplyControlFrame*>(frame)->stream_id() - << ")"; - break; - case RST_STREAM: - { - const SpdyRstStreamControlFrame* rst_stream = - reinterpret_cast<const SpdyRstStreamControlFrame*>(frame); - VLOG(2) << ACCEPTOR_CLIENT_IDENT << "SpdySM: OnRst(" - << rst_stream->stream_id() << ")"; - client_output_ordering_.RemoveStreamId(rst_stream ->stream_id()); - } - break; - - default: - LOG(ERROR) << "SpdySM: Unknown control frame type"; - } - } - virtual void OnStreamFrameData(SpdyStreamId stream_id, - const char* data, size_t len) { - VLOG(2) << ACCEPTOR_CLIENT_IDENT << "SpdySM: StreamData(" << stream_id - << ", [" << len << "])"; - StreamToSmif::iterator it = stream_to_smif_.find(stream_id); - if (it == stream_to_smif_.end()) { - VLOG(2) << "Dropping frame from unknown stream " << stream_id; - if (!valid_spdy_session_) - close_on_error_ = true; - return; - } - - SMInterface* interface = it->second; - if (acceptor_->flip_handler_type_ == FLIP_HANDLER_PROXY) - interface->ProcessWriteInput(data, len); - } - - public: - size_t ProcessReadInput(const char* data, size_t len) { - return spdy_framer_->ProcessInput(data, len); - } - - size_t ProcessWriteInput(const char* data, size_t len) { - return 0; - } - - bool MessageFullyRead() const { - return spdy_framer_->MessageFullyRead(); - } - - void SetStreamID(uint32 stream_id) {} - - bool Error() const { - return close_on_error_ || spdy_framer_->HasError(); - } - - const char* ErrorAsString() const { - DCHECK(Error()); - return SpdyFramer::ErrorCodeToString(spdy_framer_->error_code()); - } - - void Reset() { - } - - void ResetForNewInterface(int32 server_idx) { - VLOG(2) << ACCEPTOR_CLIENT_IDENT << "SpdySM: Reset for new interface: " - << "server_idx: " << server_idx; - unused_server_interface_list.push_back(server_idx); - } - - void ResetForNewConnection() { - // seq_num is not cleared, intentionally. - delete spdy_framer_; - spdy_framer_ = new SpdyFramer; - spdy_framer_->set_visitor(this); - valid_spdy_session_ = false; - client_output_ordering_.Reset(); - next_outgoing_stream_id_ = 2; - } - - // SMInterface's Cleanup is currently only called by SMConnection after a - // protocol message as been fully read. Spdy's SMInterface does not need - // to do any cleanup at this time. - // TODO (klindsay) This method is probably not being used properly and - // some logic review and method renaming is probably in order. - void Cleanup() {} - - // Send a settings frame - int PostAcceptHook() { - spdy::SpdySettings settings; - spdy::SettingsFlagsAndId settings_id(spdy::SETTINGS_MAX_CONCURRENT_STREAMS); - settings.push_back(spdy::SpdySetting(settings_id, 100)); - SpdySettingsControlFrame* settings_frame = - spdy_framer_->CreateSettings(settings); - - VLOG(1) << ACCEPTOR_CLIENT_IDENT << "Sending Settings Frame"; - EnqueueDataFrame(new SpdyFrameDataFrame(settings_frame)); - return 1; - } - - void AddAssociatedContent(FileData* file_data) { - for (unsigned int i = 0; i < file_data->related_files.size(); ++i) { - pair<int, string>& related_file = file_data->related_files[i]; - MemCacheIter mci; - string filename = "GET_"; - filename += related_file.second; - if (!memory_cache_->AssignFileData(filename, &mci)) { - VLOG(1) << ACCEPTOR_CLIENT_IDENT << "Unable to find associated " - << "content for: " << filename; - continue; - } - VLOG(1) << ACCEPTOR_CLIENT_IDENT << "Adding associated content: " - << filename; - mci.stream_id = next_outgoing_stream_id_; - next_outgoing_stream_id_ += 2; - mci.priority = related_file.first; - AddToOutputOrder(mci); - } - } - - void NewStream(uint32 stream_id, uint32 priority, const string& filename) { - MemCacheIter mci; - mci.stream_id = stream_id; - mci.priority = priority; - if (acceptor_->flip_handler_type_ == FLIP_HANDLER_SPDY_SERVER) { - if (!memory_cache_->AssignFileData(filename, &mci)) { - // error creating new stream. - VLOG(1) << ACCEPTOR_CLIENT_IDENT << "Sending ErrorNotFound"; - SendErrorNotFound(stream_id); - } else { - AddToOutputOrder(mci); - if (FLAGS_use_xac) { - AddAssociatedContent(mci.file_data); - } - } - } else { - AddToOutputOrder(mci); - } - } - - void AddToOutputOrder(const MemCacheIter& mci) { - client_output_ordering_.AddToOutputOrder(mci); - } - - void SendEOF(uint32 stream_id) { - SendEOFImpl(stream_id); - } - - void SendErrorNotFound(uint32 stream_id) { - SendErrorNotFoundImpl(stream_id); - } - - void SendOKResponse(uint32 stream_id, string* output) { - SendOKResponseImpl(stream_id, output); - } - - size_t SendSynStream(uint32 stream_id, const BalsaHeaders& headers) { - return SendSynStreamImpl(stream_id, headers); - } - - size_t SendSynReply(uint32 stream_id, const BalsaHeaders& headers) { - return SendSynReplyImpl(stream_id, headers); - } - - void SendDataFrame(uint32 stream_id, const char* data, int64 len, - uint32 flags, bool compress) { - SpdyDataFlags spdy_flags = static_cast<SpdyDataFlags>(flags); - SendDataFrameImpl(stream_id, data, len, spdy_flags, compress); - } - - SpdyFramer* spdy_framer() { return spdy_framer_; } - - private: - void SendEOFImpl(uint32 stream_id) { - SendDataFrame(stream_id, NULL, 0, DATA_FLAG_FIN, false); - VLOG(2) << ACCEPTOR_CLIENT_IDENT << "SpdySM: Sending EOF: " << stream_id; - KillStream(stream_id); - } - - void SendErrorNotFoundImpl(uint32 stream_id) { - BalsaHeaders my_headers; - my_headers.SetFirstlineFromStringPieces("HTTP/1.1", "404", "Not Found"); - SendSynReplyImpl(stream_id, my_headers); - SendDataFrame(stream_id, "wtf?", 4, DATA_FLAG_FIN, false); - client_output_ordering_.RemoveStreamId(stream_id); - } - - void SendOKResponseImpl(uint32 stream_id, string* output) { - BalsaHeaders my_headers; - my_headers.SetFirstlineFromStringPieces("HTTP/1.1", "200", "OK"); - SendSynReplyImpl(stream_id, my_headers); - SendDataFrame( - stream_id, output->c_str(), output->size(), DATA_FLAG_FIN, false); - client_output_ordering_.RemoveStreamId(stream_id); - } - - void KillStream(uint32 stream_id) { - client_output_ordering_.RemoveStreamId(stream_id); - } - - void CopyHeaders(SpdyHeaderBlock& dest, const BalsaHeaders& headers) { - for (BalsaHeaders::const_header_lines_iterator hi = - headers.header_lines_begin(); - hi != headers.header_lines_end(); - ++hi) { - // It is illegal to send SPDY headers with empty value or header - // names. - if (!hi->first.length() || !hi->second.length()) - continue; - - SpdyHeaderBlock::iterator fhi = dest.find(hi->first.as_string()); - if (fhi == dest.end()) { - dest[hi->first.as_string()] = hi->second.as_string(); - } else { - dest[hi->first.as_string()] = ( - string(fhi->second.data(), fhi->second.size()) + "," + - string(hi->second.data(), hi->second.size())); - } - } - - // These headers have no value - dest.erase("X-Associated-Content"); // TODO(mbelshe): case-sensitive - dest.erase("X-Original-Url"); // TODO(mbelshe): case-sensitive - } - - size_t SendSynStreamImpl(uint32 stream_id, const BalsaHeaders& headers) { - SpdyHeaderBlock block; - block["method"] = headers.request_method().as_string(); - if (!headers.HasHeader("status")) - block["status"] = headers.response_code().as_string(); - if (!headers.HasHeader("version")) - block["version"] =headers.response_version().as_string(); - if (headers.HasHeader("X-Original-Url")) { - string original_url = headers.GetHeader("X-Original-Url").as_string(); - block["path"] = UrlUtilities::GetUrlPath(original_url); - } else { - block["path"] = headers.request_uri().as_string(); - } - CopyHeaders(block, headers); - - SpdySynStreamControlFrame* fsrcf = - spdy_framer_->CreateSynStream(stream_id, 0, 0, CONTROL_FLAG_NONE, true, - &block); - size_t df_size = fsrcf->length() + SpdyFrame::size(); - EnqueueDataFrame(new SpdyFrameDataFrame(fsrcf)); - - VLOG(2) << ACCEPTOR_CLIENT_IDENT << "SpdySM: Sending SynStreamheader " - << stream_id; - return df_size; - } - - size_t SendSynReplyImpl(uint32 stream_id, const BalsaHeaders& headers) { - SpdyHeaderBlock block; - CopyHeaders(block, headers); - block["status"] = headers.response_code().as_string() + " " + - headers.response_reason_phrase().as_string(); - block["version"] = headers.response_version().as_string(); - - SpdySynReplyControlFrame* fsrcf = - spdy_framer_->CreateSynReply(stream_id, CONTROL_FLAG_NONE, true, &block); - size_t df_size = fsrcf->length() + SpdyFrame::size(); - EnqueueDataFrame(new SpdyFrameDataFrame(fsrcf)); - - VLOG(2) << ACCEPTOR_CLIENT_IDENT << "SpdySM: Sending SynReplyheader " - << stream_id; - return df_size; - } - - void SendDataFrameImpl(uint32 stream_id, const char* data, int64 len, - SpdyDataFlags flags, bool compress) { - // Force compression off if disabled via command line. - if (!FLAGS_use_compression) - flags = static_cast<SpdyDataFlags>(flags & ~DATA_FLAG_COMPRESSED); - - // TODO(mbelshe): We can't compress here - before going into the - // priority queue. Compression needs to be done - // with late binding. - if (len == 0) { - SpdyDataFrame* fdf = spdy_framer_->CreateDataFrame(stream_id, data, len, - flags); - EnqueueDataFrame(new SpdyFrameDataFrame(fdf)); - return; - } - - // Chop data frames into chunks so that one stream can't monopolize the - // output channel. - while(len > 0) { - int64 size = std::min(len, static_cast<int64>(kSpdySegmentSize)); - SpdyDataFlags chunk_flags = flags; - - // If we chunked this block, and the FIN flag was set, there is more - // data coming. So, remove the flag. - if ((size < len) && (flags & DATA_FLAG_FIN)) - chunk_flags = static_cast<SpdyDataFlags>(chunk_flags & ~DATA_FLAG_FIN); - - SpdyDataFrame* fdf = spdy_framer_->CreateDataFrame(stream_id, data, size, - chunk_flags); - EnqueueDataFrame(new SpdyFrameDataFrame(fdf)); - - VLOG(2) << ACCEPTOR_CLIENT_IDENT << "SpdySM: Sending data frame " - << stream_id << " [" << size << "] shrunk to " << fdf->length() - << ", flags=" << flags; - - data += size; - len -= size; - } - } - - void EnqueueDataFrame(DataFrame* df) { - connection_->EnqueueDataFrame(df); - } - - void GetOutput() { - while (client_output_list_->size() < 2) { - MemCacheIter* mci = client_output_ordering_.GetIter(); - if (mci == NULL) { - VLOG(2) << ACCEPTOR_CLIENT_IDENT - << "SpdySM: GetOutput: nothing to output!?"; - return; - } - if (!mci->transformed_header) { - mci->transformed_header = true; - VLOG(2) << ACCEPTOR_CLIENT_IDENT << "SpdySM: GetOutput transformed " - << "header stream_id: [" << mci->stream_id << "]"; - if ((mci->stream_id % 2) == 0) { - // this is a server initiated stream. - // Ideally, we'd do a 'syn-push' here, instead of a syn-reply. - BalsaHeaders headers; - headers.CopyFrom(*(mci->file_data->headers)); - headers.ReplaceOrAppendHeader("status", "200"); - headers.ReplaceOrAppendHeader("version", "http/1.1"); - headers.SetRequestFirstlineFromStringPieces("PUSH", - mci->file_data->filename, - ""); - mci->bytes_sent = SendSynStream(mci->stream_id, headers); - } else { - BalsaHeaders headers; - headers.CopyFrom(*(mci->file_data->headers)); - mci->bytes_sent = SendSynReply(mci->stream_id, headers); - } - return; - } - if (mci->body_bytes_consumed >= mci->file_data->body.size()) { - VLOG(2) << ACCEPTOR_CLIENT_IDENT << "SpdySM: GetOutput " - << "remove_stream_id: [" << mci->stream_id << "]"; - SendEOF(mci->stream_id); - return; - } - size_t num_to_write = - mci->file_data->body.size() - mci->body_bytes_consumed; - if (num_to_write > mci->max_segment_size) - num_to_write = mci->max_segment_size; - - bool should_compress = false; - if (!mci->file_data->headers->HasHeader("content-encoding")) { - if (mci->file_data->headers->HasHeader("content-type")) { - string content_type = - mci->file_data->headers->GetHeader("content-type").as_string(); - if (content_type.find("image") == content_type.npos) - should_compress = true; - } - } - - SendDataFrame(mci->stream_id, - mci->file_data->body.data() + mci->body_bytes_consumed, - num_to_write, 0, should_compress); - VLOG(2) << ACCEPTOR_CLIENT_IDENT << "SpdySM: GetOutput SendDataFrame[" - << mci->stream_id << "]: " << num_to_write; - mci->body_bytes_consumed += num_to_write; - mci->bytes_sent += num_to_write; - } - } -}; - -//////////////////////////////////////////////////////////////////////////////// - -class HttpSM : public BalsaVisitorInterface, public SMInterface { - private: - uint64 seq_num_; - BalsaFrame* http_framer_; - BalsaHeaders headers_; - uint32 stream_id_; - int32 server_idx_; - - SMConnection* connection_; - SMInterface* sm_spdy_interface_; - OutputList* output_list_; - OutputOrdering output_ordering_; - MemoryCache* memory_cache_; - FlipAcceptor* acceptor_; - public: - explicit HttpSM(SMConnection* connection, - SMInterface* sm_spdy_interface, - EpollServer* epoll_server, - MemoryCache* memory_cache, - FlipAcceptor* acceptor) : - seq_num_(0), - http_framer_(new BalsaFrame), - stream_id_(0), - server_idx_(-1), - connection_(connection), - sm_spdy_interface_(sm_spdy_interface), - output_list_(connection->output_list()), - output_ordering_(connection), - memory_cache_(connection->memory_cache()), - acceptor_(acceptor) { - http_framer_->set_balsa_visitor(this); - http_framer_->set_balsa_headers(&headers_); - if (acceptor_->flip_handler_type_ == FLIP_HANDLER_PROXY) { - http_framer_->set_is_request(false); - } - } - private: - typedef map<string, uint32> ClientTokenMap; - private: - virtual void ProcessBodyInput(const char *input, size_t size) { - } - virtual void ProcessBodyData(const char *input, size_t size) { - if (acceptor_->flip_handler_type_ == FLIP_HANDLER_PROXY) { - VLOG(2) << ACCEPTOR_CLIENT_IDENT << "HttpSM: Process Body Data: stream " - << stream_id_ << ": size " << size; - sm_spdy_interface_->SendDataFrame(stream_id_, input, size, 0, false); - } - } - virtual void ProcessHeaderInput(const char *input, size_t size) { - } - virtual void ProcessTrailerInput(const char *input, size_t size) {} - virtual void ProcessHeaders(const BalsaHeaders& headers) { - if (acceptor_->flip_handler_type_ == FLIP_HANDLER_HTTP_SERVER) { - string host = - UrlUtilities::GetUrlHost(headers.GetHeader("Host").as_string()); - string method = headers.request_method().as_string(); - VLOG(1) << ACCEPTOR_CLIENT_IDENT << "Received Request: " - << headers.request_uri().as_string() << " " << method; - string filename = EncodeURL(headers.request_uri().as_string(), - host, method); - NewStream(stream_id_, 0, filename); - stream_id_ += 2; - } else { - VLOG(1) << ACCEPTOR_CLIENT_IDENT << "HttpSM: Received Response from " - << connection_->server_ip_ << ":" - << connection_->server_port_ << " "; - sm_spdy_interface_->SendSynReply(stream_id_, headers); - } - } - virtual void ProcessRequestFirstLine(const char* line_input, - size_t line_length, - const char* method_input, - size_t method_length, - const char* request_uri_input, - size_t request_uri_length, - const char* version_input, - size_t version_length) {} - virtual void ProcessResponseFirstLine(const char *line_input, - size_t line_length, - const char *version_input, - size_t version_length, - const char *status_input, - size_t status_length, - const char *reason_input, - size_t reason_length) {} - virtual void ProcessChunkLength(size_t chunk_length) {} - virtual void ProcessChunkExtensions(const char *input, size_t size) {} - virtual void HeaderDone() {} - virtual void MessageDone() { - if (acceptor_->flip_handler_type_ == FLIP_HANDLER_PROXY) { - VLOG(2) << ACCEPTOR_CLIENT_IDENT << "HttpSM: MessageDone. Sending EOF: " - << "stream " << stream_id_; - sm_spdy_interface_->SendEOF(stream_id_); - } else { - VLOG(2) << ACCEPTOR_CLIENT_IDENT << "HttpSM: MessageDone."; - } - } - virtual void HandleHeaderError(BalsaFrame* framer) { - HandleError(); - } - virtual void HandleHeaderWarning(BalsaFrame* framer) {} - virtual void HandleChunkingError(BalsaFrame* framer) { - HandleError(); - } - virtual void HandleBodyError(BalsaFrame* framer) { - HandleError(); - } - - void HandleError() { - VLOG(1) << ACCEPTOR_CLIENT_IDENT << "Error detected"; - } - - public: - ~HttpSM() { - Reset(); - delete http_framer_; - } - - void InitSMInterface(SMInterface* sm_spdy_interface, - int32 server_idx) - { - sm_spdy_interface_ = sm_spdy_interface; - server_idx_ = server_idx; - } - - void InitSMConnection(SMConnectionPoolInterface* connection_pool, - SMInterface* sm_interface, - EpollServer* epoll_server, - int fd, - string server_ip, - string server_port, - string remote_ip, - bool use_ssl) - { - VLOG(2) << ACCEPTOR_CLIENT_IDENT << "HttpSM: Initializing server " - << "connection."; - connection_->InitSMConnection(connection_pool, sm_interface, - epoll_server, fd, server_ip, server_port, - remote_ip, use_ssl); - } - - size_t ProcessReadInput(const char* data, size_t len) { - VLOG(2) << ACCEPTOR_CLIENT_IDENT << "HttpSM: Process read input: stream " - << stream_id_; - return http_framer_->ProcessInput(data, len); - } - - size_t ProcessWriteInput(const char* data, size_t len) { - VLOG(2) << ACCEPTOR_CLIENT_IDENT << "HttpSM: Process write input: size " - << len << ": stream " << stream_id_; - char * dataPtr = new char[len]; - memcpy(dataPtr, data, len); - DataFrame* data_frame = new DataFrame; - data_frame->data = (const char *)dataPtr; - data_frame->size = len; - data_frame->delete_when_done = true; - connection_->EnqueueDataFrame(data_frame); - return len; - } - - bool MessageFullyRead() const { - return http_framer_->MessageFullyRead(); - } - - void SetStreamID(uint32 stream_id) { - stream_id_ = stream_id; - } - - bool Error() const { - return http_framer_->Error(); - } - - const char* ErrorAsString() const { - return BalsaFrameEnums::ErrorCodeToString(http_framer_->ErrorCode()); - } - - void Reset() { - VLOG(1) << ACCEPTOR_CLIENT_IDENT << "HttpSM: Reset: stream " - << stream_id_; - http_framer_->Reset(); - } - - void ResetForNewInterface(int32 server_idx) { - } - - void ResetForNewConnection() { - if (acceptor_->flip_handler_type_ == FLIP_HANDLER_PROXY) { - VLOG(1) << ACCEPTOR_CLIENT_IDENT << "HttpSM: Server connection closing " - << "to: " << connection_->server_ip_ << ":" - << connection_->server_port_ << " "; - } - seq_num_ = 0; - output_ordering_.Reset(); - http_framer_->Reset(); - if (sm_spdy_interface_) { - sm_spdy_interface_->ResetForNewInterface(server_idx_); - } - } - - void Cleanup() { - if (!(acceptor_->flip_handler_type_ == FLIP_HANDLER_HTTP_SERVER)) { - VLOG(2) << "HttpSM Request Fully Read; stream_id: " << stream_id_; - connection_->Cleanup("request complete"); - } - } - - int PostAcceptHook() { - return 1; - } - - void NewStream(uint32 stream_id, uint32 priority, const string& filename) { - MemCacheIter mci; - mci.stream_id = stream_id; - mci.priority = priority; - if (!memory_cache_->AssignFileData(filename, &mci)) { - // error creating new stream. - VLOG(2) << ACCEPTOR_CLIENT_IDENT << "Sending ErrorNotFound"; - SendErrorNotFound(stream_id); - } else { - AddToOutputOrder(mci); - } - } - - void AddToOutputOrder(const MemCacheIter& mci) { - output_ordering_.AddToOutputOrder(mci); - } - - void SendEOF(uint32 stream_id) { - SendEOFImpl(stream_id); - if (acceptor_->flip_handler_type_ == FLIP_HANDLER_PROXY) { - sm_spdy_interface_->ResetForNewInterface(server_idx_); - } - } - - void SendErrorNotFound(uint32 stream_id) { - SendErrorNotFoundImpl(stream_id); - } - - void SendOKResponse(uint32 stream_id, string* output) { - SendOKResponseImpl(stream_id, output); - } - - size_t SendSynStream(uint32 stream_id, const BalsaHeaders& headers) { - return 0; - } - - size_t SendSynReply(uint32 stream_id, const BalsaHeaders& headers) { - return SendSynReplyImpl(stream_id, headers); - } - - void SendDataFrame(uint32 stream_id, const char* data, int64 len, - uint32 flags, bool compress) { - SendDataFrameImpl(stream_id, data, len, flags, compress); - } - - BalsaFrame* spdy_framer() { return http_framer_; } - - private: - void SendEOFImpl(uint32 stream_id) { - DataFrame* df = new DataFrame; - df->data = "0\r\n\r\n"; - df->size = 5; - df->delete_when_done = false; - EnqueueDataFrame(df); - if (acceptor_->flip_handler_type_ == FLIP_HANDLER_HTTP_SERVER) { - Reset(); - } - } - - void SendErrorNotFoundImpl(uint32 stream_id) { - BalsaHeaders my_headers; - my_headers.SetFirstlineFromStringPieces("HTTP/1.1", "404", "Not Found"); - my_headers.RemoveAllOfHeader("content-length"); - my_headers.AppendHeader("transfer-encoding", "chunked"); - SendSynReplyImpl(stream_id, my_headers); - SendDataFrame(stream_id, "wtf?", 4, 0, false); - SendEOFImpl(stream_id); - output_ordering_.RemoveStreamId(stream_id); - } - - void SendOKResponseImpl(uint32 stream_id, string* output) { - BalsaHeaders my_headers; - my_headers.SetFirstlineFromStringPieces("HTTP/1.1", "200", "OK"); - my_headers.RemoveAllOfHeader("content-length"); - my_headers.AppendHeader("transfer-encoding", "chunked"); - SendSynReplyImpl(stream_id, my_headers); - SendDataFrame(stream_id, output->c_str(), output->size(), 0, false); - SendEOFImpl(stream_id); - output_ordering_.RemoveStreamId(stream_id); - } - - size_t SendSynReplyImpl(uint32 stream_id, const BalsaHeaders& headers) { - SimpleBuffer sb; - headers.WriteHeaderAndEndingToBuffer(&sb); - DataFrame* df = new DataFrame; - df->size = sb.ReadableBytes(); - char* buffer = new char[df->size]; - df->data = buffer; - df->delete_when_done = true; - sb.Read(buffer, df->size); - VLOG(2) << ACCEPTOR_CLIENT_IDENT << "Sending HTTP Reply header " - << stream_id_; - size_t df_size = df->size; - EnqueueDataFrame(df); - return df_size; - } - - size_t SendSynStreamImpl(uint32 stream_id, const BalsaHeaders& headers) { - SimpleBuffer sb; - headers.WriteHeaderAndEndingToBuffer(&sb); - DataFrame* df = new DataFrame; - df->size = sb.ReadableBytes(); - char* buffer = new char[df->size]; - df->data = buffer; - df->delete_when_done = true; - sb.Read(buffer, df->size); - VLOG(2) << ACCEPTOR_CLIENT_IDENT << "Sending HTTP Reply header " - << stream_id_; - size_t df_size = df->size; - EnqueueDataFrame(df); - return df_size; - } - - void SendDataFrameImpl(uint32 stream_id, const char* data, int64 len, - uint32 flags, bool compress) { - char chunk_buf[128]; - snprintf(chunk_buf, sizeof(chunk_buf), "%x\r\n", (unsigned int)len); - string chunk_description(chunk_buf); - DataFrame* df = new DataFrame; - df->size = chunk_description.size() + len + 2; - char* buffer = new char[df->size]; - df->data = buffer; - df->delete_when_done = true; - memcpy(buffer, chunk_description.data(), chunk_description.size()); - memcpy(buffer + chunk_description.size(), data, len); - memcpy(buffer + chunk_description.size() + len, "\r\n", 2); - EnqueueDataFrame(df); - } - - void EnqueueDataFrame(DataFrame* df) { - VLOG(2) << ACCEPTOR_CLIENT_IDENT << "HttpSM: Enqueue data frame: stream " - << stream_id_; - connection_->EnqueueDataFrame(df); - } - - void GetOutput() { - MemCacheIter* mci = output_ordering_.GetIter(); - if (mci == NULL) { - VLOG(2) << ACCEPTOR_CLIENT_IDENT << "HttpSM: GetOutput: nothing to " - << "output!?: stream " << stream_id_; - return; - } - if (!mci->transformed_header) { - mci->bytes_sent = SendSynReply(mci->stream_id, - *(mci->file_data->headers)); - mci->transformed_header = true; - VLOG(2) << ACCEPTOR_CLIENT_IDENT << "HttpSM: GetOutput transformed " - << "header stream_id: [" << mci->stream_id << "]"; - return; - } - if (mci->body_bytes_consumed >= mci->file_data->body.size()) { - SendEOF(mci->stream_id); - output_ordering_.RemoveStreamId(mci->stream_id); - VLOG(2) << ACCEPTOR_CLIENT_IDENT << "GetOutput remove_stream_id: [" - << mci->stream_id << "]"; - return; - } - size_t num_to_write = - mci->file_data->body.size() - mci->body_bytes_consumed; - if (num_to_write > mci->max_segment_size) - num_to_write = mci->max_segment_size; - - SendDataFrame(mci->stream_id, - mci->file_data->body.data() + mci->body_bytes_consumed, - num_to_write, 0, true); - VLOG(2) << ACCEPTOR_CLIENT_IDENT << "HttpSM: GetOutput SendDataFrame[" - << mci->stream_id << "]: " << num_to_write; - mci->body_bytes_consumed += num_to_write; - mci->bytes_sent += num_to_write; - } -}; - -//////////////////////////////////////////////////////////////////////////////// - -class StreamerSM : public SMInterface { - private: - SMConnection* connection_; - SMInterface* sm_other_interface_; - EpollServer* epoll_server_; - FlipAcceptor* acceptor_; - public: - explicit StreamerSM(SMConnection* connection, - SMInterface* sm_other_interface, - EpollServer* epoll_server, - FlipAcceptor* acceptor) : - connection_(connection), - sm_other_interface_(sm_other_interface), - epoll_server_(epoll_server), - acceptor_(acceptor) - { - VLOG(2) << ACCEPTOR_CLIENT_IDENT << "Creating StreamerSM object"; - } - ~StreamerSM() { - VLOG(1) << ACCEPTOR_CLIENT_IDENT << "Destroying StreamerSM object"; - Reset(); - } - - void InitSMInterface(SMInterface* sm_other_interface, - int32 server_idx) - { - sm_other_interface_ = sm_other_interface; - } - - void InitSMConnection(SMConnectionPoolInterface* connection_pool, - SMInterface* sm_interface, - EpollServer* epoll_server, - int fd, - string server_ip, - string server_port, - string remote_ip, - bool use_ssl) - { - VLOG(2) << ACCEPTOR_CLIENT_IDENT << "StreamerSM: Initializing server " - << "connection."; - connection_->InitSMConnection(connection_pool, sm_interface, - epoll_server, fd, server_ip, - server_port, remote_ip, use_ssl); - } - - size_t ProcessReadInput(const char* data, size_t len) { - return sm_other_interface_->ProcessWriteInput(data, len); - } - - size_t ProcessWriteInput(const char* data, size_t len) { - char * dataPtr = new char[len]; - memcpy(dataPtr, data, len); - DataFrame* df = new DataFrame; - df->data = (const char *)dataPtr; - df->size = len; - df->delete_when_done = true; - connection_->EnqueueDataFrame(df); - return len; - } - - bool MessageFullyRead() const { - return false; - } - - void SetStreamID(uint32 stream_id) {} - - bool Error() const { - return false; - } - - const char* ErrorAsString() const { - return "(none)"; - } - - void Reset() { - VLOG(1) << ACCEPTOR_CLIENT_IDENT << "StreamerSM: Reset"; - connection_->Cleanup("Server Reset"); - } - - void ResetForNewInterface(int32 server_idx) { - } - - void ResetForNewConnection() { - sm_other_interface_->Reset(); - } - - void Cleanup() { - } - - int PostAcceptHook() { - if (!sm_other_interface_) { - SMConnection *server_connection = - SMConnection::NewSMConnection(epoll_server_, NULL, NULL, - acceptor_, "server_conn: "); - if (server_connection == NULL) { - LOG(ERROR) << "StreamerSM: Could not create server conenction."; - return 0; - } - VLOG(2) << ACCEPTOR_CLIENT_IDENT << "StreamerSM: Creating new server " - << "connection."; - sm_other_interface_ = new StreamerSM(server_connection, this, - epoll_server_, acceptor_); - sm_other_interface_->InitSMInterface(this, 0); - } - // The Streamer interface is used to stream HTTPS connections, so we - // will always use the https_server_ip/port here. - sm_other_interface_->InitSMConnection(NULL, sm_other_interface_, - epoll_server_, -1, - acceptor_->https_server_ip_, - acceptor_->https_server_port_, - "", - false); - - return 1; - } - - void NewStream(uint32 stream_id, uint32 priority, const string& filename) { - } - - void AddToOutputOrder(const MemCacheIter& mci) { - } - - void SendEOF(uint32 stream_id) { - } - - void SendErrorNotFound(uint32 stream_id) { - } - - void SendOKResponse(uint32 stream_id, string output) { - } - - size_t SendSynStream(uint32 stream_id, const BalsaHeaders& headers) { - return 0; - } - - size_t SendSynReply(uint32 stream_id, const BalsaHeaders& headers) { - return 0; - } - - void SendDataFrame(uint32 stream_id, const char* data, int64 len, - uint32 flags, bool compress) { - } - - private: - void SendEOFImpl(uint32 stream_id) { - } - - void SendErrorNotFoundImpl(uint32 stream_id) { - } - - void SendOKResponseImpl(uint32 stream_id, string* output) { - } - - size_t SendSynReplyImpl(uint32 stream_id, const BalsaHeaders& headers) { - return 0; - } - - size_t SendSynStreamImpl(uint32 stream_id, const BalsaHeaders& headers) { - return 0; - } - - void SendDataFrameImpl(uint32 stream_id, const char* data, int64 len, - uint32 flags, bool compress) { - } - - void GetOutput() { - } -}; - -//////////////////////////////////////////////////////////////////////////////// - -class Notification { - public: - explicit Notification(bool value) : value_(value) {} - - void Notify() { - base::AutoLock al(lock_); - value_ = true; - } - bool HasBeenNotified() { - base::AutoLock al(lock_); - return value_; - } - bool value_; - base::Lock lock_; -}; - -//////////////////////////////////////////////////////////////////////////////// - -class SMAcceptorThread : public SimpleThread, - public EpollCallbackInterface, - public SMConnectionPoolInterface { - EpollServer epoll_server_; - FlipAcceptor *acceptor_; - SSLState *ssl_state_; - bool use_ssl_; - - vector<SMConnection*> unused_server_connections_; - vector<SMConnection*> tmp_unused_server_connections_; - vector<SMConnection*> allocated_server_connections_; - list<SMConnection*> active_server_connections_; - Notification quitting_; - MemoryCache* memory_cache_; - public: - - SMAcceptorThread(FlipAcceptor *acceptor, - MemoryCache* memory_cache) : - SimpleThread("SMAcceptorThread"), - acceptor_(acceptor), - ssl_state_(NULL), - use_ssl_(false), - quitting_(false), - memory_cache_(memory_cache) - { - if (!acceptor->ssl_cert_filename_.empty() && - !acceptor->ssl_key_filename_.empty()) { - ssl_state_ = new SSLState; - bool use_npn = true; - if (acceptor_->flip_handler_type_ == FLIP_HANDLER_HTTP_SERVER) { - use_npn = false; - } - spdy_init_ssl(ssl_state_, acceptor_->ssl_cert_filename_, - acceptor_->ssl_key_filename_, use_npn); - use_ssl_ = true; - } - } - - ~SMAcceptorThread() { - for (vector<SMConnection*>::iterator i = - allocated_server_connections_.begin(); - i != allocated_server_connections_.end(); - ++i) { - delete *i; - } - delete ssl_state_; - } - - SMConnection* NewConnection() { - SMConnection* server = - SMConnection::NewSMConnection(&epoll_server_, ssl_state_, - memory_cache_, acceptor_, - "client_conn: "); - allocated_server_connections_.push_back(server); - VLOG(2) << ACCEPTOR_CLIENT_IDENT << "Acceptor: Making new server."; - return server; - } - - SMConnection* FindOrMakeNewSMConnection() { - if (unused_server_connections_.empty()) { - return NewConnection(); - } - SMConnection* server = unused_server_connections_.back(); - unused_server_connections_.pop_back(); - VLOG(2) << ACCEPTOR_CLIENT_IDENT << "Acceptor: Reusing server."; - return server; - } - - void InitWorker() { - epoll_server_.RegisterFD(acceptor_->listen_fd_, this, EPOLLIN | EPOLLET); - } - - void HandleConnection(int server_fd, struct sockaddr_in *remote_addr) { - int on = 1; - int rc; - if (acceptor_->disable_nagle_) { - rc = setsockopt(server_fd, IPPROTO_TCP, TCP_NODELAY, - reinterpret_cast<char*>(&on), sizeof(on)); - if (rc < 0) { - close(server_fd); - LOG(ERROR) << "setsockopt() failed fd=" + server_fd; - return; - } - } - - SMConnection* server_connection = FindOrMakeNewSMConnection(); - if (server_connection == NULL) { - VLOG(1) << ACCEPTOR_CLIENT_IDENT << "Acceptor: Closing fd " << server_fd; - close(server_fd); - return; - } - string remote_ip = inet_ntoa(remote_addr->sin_addr); - server_connection->InitSMConnection(this, - NULL, - &epoll_server_, - server_fd, - "", "", remote_ip, - use_ssl_); - if (server_connection->initialized()) - active_server_connections_.push_back(server_connection); - } - - void AcceptFromListenFD() { - if (acceptor_->accepts_per_wake_ > 0) { - for (int i = 0; i < acceptor_->accepts_per_wake_; ++i) { - struct sockaddr address; - socklen_t socklen = sizeof(address); - int fd = accept(acceptor_->listen_fd_, &address, &socklen); - if (fd == -1) { - if (errno != 11) { - VLOG(1) << ACCEPTOR_CLIENT_IDENT << "Acceptor: accept fail(" - << acceptor_->listen_fd_ << "): " << errno << ": " - << strerror(errno); - } - break; - } - VLOG(1) << ACCEPTOR_CLIENT_IDENT << " Accepted connection"; - HandleConnection(fd, (struct sockaddr_in *)&address); - } - } else { - while (true) { - struct sockaddr address; - socklen_t socklen = sizeof(address); - int fd = accept(acceptor_->listen_fd_, &address, &socklen); - if (fd == -1) { - if (errno != 11) { - VLOG(1) << ACCEPTOR_CLIENT_IDENT << "Acceptor: accept fail(" - << acceptor_->listen_fd_ << "): " << errno << ": " - << strerror(errno); - } - break; - } - VLOG(1) << ACCEPTOR_CLIENT_IDENT << "Accepted connection"; - HandleConnection(fd, (struct sockaddr_in *)&address); - } - } - } - - // EpollCallbackInteface virtual functions. - virtual void OnRegistration(EpollServer* eps, int fd, int event_mask) { } - virtual void OnModification(int fd, int event_mask) { } - virtual void OnEvent(int fd, EpollEvent* event) { - if (event->in_events | EPOLLIN) { - VLOG(2) << ACCEPTOR_CLIENT_IDENT - << "Acceptor: Accepting based upon epoll events"; - AcceptFromListenFD(); - } - } - virtual void OnUnregistration(int fd, bool replaced) { } - virtual void OnShutdown(EpollServer* eps, int fd) { } - - void Quit() { - quitting_.Notify(); - } - - // Iterates through a list of active connections expiring any that have been - // idle longer than the configured timeout. - void HandleConnectionIdleTimeout() { - static time_t oldest_time = time(NULL); - - int cur_time = time(NULL); - // Only iterate the list if we speculate that a connection is ready to be - // expired - if ((cur_time - oldest_time) < g_proxy_config.idle_timeout_s_) - return; - list<SMConnection*>::iterator iter = active_server_connections_.begin(); - while (iter != active_server_connections_.end()) { - SMConnection *conn = *iter; - int elapsed_time = (cur_time - conn->last_read_time_); - if (elapsed_time > g_proxy_config.idle_timeout_s_) { - conn->Cleanup("Connection idle timeout reached."); - iter = active_server_connections_.erase(iter); - delete(conn); - continue; - } - if (conn->last_read_time_ < oldest_time) - oldest_time = conn->last_read_time_; - iter++; - } - if ((cur_time - oldest_time) >= g_proxy_config.idle_timeout_s_) - oldest_time = cur_time; - } - - void Run() { - while (!quitting_.HasBeenNotified()) { - epoll_server_.set_timeout_in_us(10 * 1000); // 10 ms - epoll_server_.WaitForEventsAndExecuteCallbacks(); - if (tmp_unused_server_connections_.size()) { - VLOG(2) << "have " << tmp_unused_server_connections_.size() - << " additional unused connections. Total = " - << unused_server_connections_.size(); - unused_server_connections_.insert(unused_server_connections_.end(), - tmp_unused_server_connections_.begin(), - tmp_unused_server_connections_.end()); - tmp_unused_server_connections_.clear(); - } - HandleConnectionIdleTimeout(); - } - } - - // SMConnections will use this: - virtual void SMConnectionDone(SMConnection* sc) { - VLOG(1) << ACCEPTOR_CLIENT_IDENT << "Done with connection."; - tmp_unused_server_connections_.push_back(sc); - } -}; - -//////////////////////////////////////////////////////////////////////////////// - -SMInterface* NewStreamerSM(SMConnection* connection, - SMInterface* sm_interface, - EpollServer* epoll_server, - FlipAcceptor* acceptor) { - return new StreamerSM(connection, sm_interface, epoll_server, acceptor); -} - - -SMInterface* NewSpdySM(SMConnection* connection, - SMInterface* sm_interface, - EpollServer* epoll_server, - MemoryCache* memory_cache, - FlipAcceptor* acceptor) { - return new SpdySM(connection, sm_interface, epoll_server, - memory_cache, acceptor); -} - -SMInterface* NewHttpSM(SMConnection* connection, - SMInterface* sm_interface, - EpollServer* epoll_server, - MemoryCache* memory_cache, - FlipAcceptor* acceptor) { - return new HttpSM(connection, sm_interface, epoll_server, - memory_cache, acceptor); -} +net::FlipConfig g_proxy_config; //////////////////////////////////////////////////////////////////////////////// @@ -3086,7 +75,7 @@ bool GotQuitFromStdin() { // Make stdin nonblocking. Yes this is done each time. Oh well. fcntl(0, F_SETFL, O_NONBLOCK); char c; - string maybequit; + std::string maybequit; while (read(0, &c, 1) > 0) { maybequit += c; } @@ -3224,18 +213,15 @@ int main (int argc, char**argv) pidfile_fd = OpenPidFile(PIDFILE); } - g_proxy_config.server_think_time_in_s_ = FLAGS_server_think_time_in_s; + net::OutputOrdering::set_server_think_time_in_s(FLAGS_server_think_time_in_s); if (cl.HasSwitch("forward-ip-header")) { - g_proxy_config.forward_ip_header_enabled_ = true; - g_proxy_config.forward_ip_header_ = - cl.GetSwitchValueASCII("forward-ip-header"); - } else { - g_proxy_config.forward_ip_header_enabled_ = false; + net::SpdySM::set_forward_ip_header( + cl.GetSwitchValueASCII("forward-ip-header")); } if (cl.HasSwitch("logdest")) { - string log_dest_value = cl.GetSwitchValueASCII("logdest"); + std::string log_dest_value = cl.GetSwitchValueASCII("logdest"); if (log_dest_value.compare("file") == 0) { g_proxy_config.log_destination_ = logging::LOG_ONLY_TO_FILE; } else if (log_dest_value.compare("system") == 0) { @@ -3266,7 +252,7 @@ int main (int argc, char**argv) } if (cl.HasSwitch("ssl-session-expiry")) { - string session_expiry = cl.GetSwitchValueASCII("ssl-session-expiry"); + std::string session_expiry = cl.GetSwitchValueASCII("ssl-session-expiry"); g_proxy_config.ssl_session_expiry_ = atoi(session_expiry.c_str()); } @@ -3275,12 +261,12 @@ int main (int argc, char**argv) } if (cl.HasSwitch("idle-timeout")) { - g_proxy_config.idle_timeout_s_ = + g_proxy_config.idle_socket_timeout_s_ = atoi(cl.GetSwitchValueASCII("idle-timeout").c_str()); } if (cl.HasSwitch("force_spdy")) - FLAGS_force_spdy = true; + net::SMConnection::set_force_spdy(true); InitLogging(g_proxy_config.log_filename_.c_str(), g_proxy_config.log_destination_, @@ -3292,8 +278,8 @@ int main (int argc, char**argv) LOG(INFO) << "Logging destination : " << g_proxy_config.log_destination_; LOG(INFO) << "Log file : " << g_proxy_config.log_filename_; LOG(INFO) << "Forward IP Header : " - << (g_proxy_config.forward_ip_header_enabled_ ? - g_proxy_config.forward_ip_header_ : "(disabled)"); + << (net::SpdySM::forward_ip_header().length() ? + net::SpdySM::forward_ip_header() : "<disabled>"); LOG(INFO) << "Wait for interfaces : " << (wait_for_iface?"true":"false"); LOG(INFO) << "Accept backlog size : " << FLAGS_accept_backlog_size; LOG(INFO) << "Accepts per wake : " << FLAGS_accepts_per_wake; @@ -3307,7 +293,8 @@ int main (int argc, char**argv) << g_proxy_config.ssl_session_expiry_; LOG(INFO) << "SSL disable compression : " << g_proxy_config.ssl_disable_compression_; - LOG(INFO) << "Connection idle timeout : " << g_proxy_config.idle_timeout_s_; + LOG(INFO) << "Connection idle timeout : " + << g_proxy_config.idle_socket_timeout_s_; // Proxy Acceptors while (true) { @@ -3317,13 +304,13 @@ int main (int argc, char**argv) if (!cl.HasSwitch(name.str())) { break; } - string value = cl.GetSwitchValueASCII(name.str()); - vector<std::string> valueArgs = split(value, ','); + std::string value = cl.GetSwitchValueASCII(name.str()); + std::vector<std::string> valueArgs = split(value, ','); CHECK_EQ((unsigned int)9, valueArgs.size()); int spdy_only = atoi(valueArgs[8].c_str()); // If wait_for_iface is enabled, then this call will block // indefinitely until the interface is raised. - g_proxy_config.AddAcceptor(FLIP_HANDLER_PROXY, + g_proxy_config.AddAcceptor(net::FLIP_HANDLER_PROXY, valueArgs[0], valueArgs[1], valueArgs[2], valueArgs[3], valueArgs[4], valueArgs[5], @@ -3338,12 +325,12 @@ int main (int argc, char**argv) } // Spdy Server Acceptor - MemoryCache spdy_memory_cache; + net::MemoryCache spdy_memory_cache; if (cl.HasSwitch("spdy-server")) { spdy_memory_cache.AddFiles(); - string value = cl.GetSwitchValueASCII("spdy-server"); - vector<std::string> valueArgs = split(value, ','); - g_proxy_config.AddAcceptor(FLIP_HANDLER_SPDY_SERVER, + std::string value = cl.GetSwitchValueASCII("spdy-server"); + std::vector<std::string> valueArgs = split(value, ','); + g_proxy_config.AddAcceptor(net::FLIP_HANDLER_SPDY_SERVER, valueArgs[0], valueArgs[1], valueArgs[2], valueArgs[3], "", "", "", "", @@ -3357,12 +344,12 @@ int main (int argc, char**argv) } // Spdy Server Acceptor - MemoryCache http_memory_cache; + net::MemoryCache http_memory_cache; if (cl.HasSwitch("http-server")) { http_memory_cache.AddFiles(); - string value = cl.GetSwitchValueASCII("http-server"); - vector<std::string> valueArgs = split(value, ','); - g_proxy_config.AddAcceptor(FLIP_HANDLER_HTTP_SERVER, + std::string value = cl.GetSwitchValueASCII("http-server"); + std::vector<std::string> valueArgs = split(value, ','); + g_proxy_config.AddAcceptor(net::FLIP_HANDLER_HTTP_SERVER, valueArgs[0], valueArgs[1], valueArgs[2], valueArgs[3], "", "", "", "", @@ -3375,13 +362,14 @@ int main (int argc, char**argv) &http_memory_cache); } - vector<SMAcceptorThread*> sm_worker_threads_; + std::vector<net::SMAcceptorThread*> sm_worker_threads_; for (i = 0; i < g_proxy_config.acceptors_.size(); i++) { - FlipAcceptor *acceptor = g_proxy_config.acceptors_[i]; + net::FlipAcceptor *acceptor = g_proxy_config.acceptors_[i]; - sm_worker_threads_.push_back(new SMAcceptorThread(acceptor, - (MemoryCache *)acceptor->memory_cache_)); + sm_worker_threads_.push_back( + new net::SMAcceptorThread(acceptor, + (net::MemoryCache *)acceptor->memory_cache_)); // Note that spdy_memory_cache is not threadsafe, it is merely // thread compatible. Thus, if ever we are to spawn multiple threads, // we either must make the MemoryCache threadsafe, or use @@ -3418,3 +406,4 @@ int main (int argc, char**argv) close(pidfile_fd); return 0; } + |