summaryrefslogtreecommitdiffstats
path: root/net
diff options
context:
space:
mode:
authormbelshe@google.com <mbelshe@google.com@0039d316-1c4b-4281-b951-d872f2087c98>2010-11-24 10:19:22 +0000
committermbelshe@google.com <mbelshe@google.com@0039d316-1c4b-4281-b951-d872f2087c98>2010-11-24 10:19:22 +0000
commitb64abf63068267e8a3566cbcffc84cac54ee3e9d (patch)
treecff3a086a271027e61d9922349192963cc16b4d6 /net
parent011521d7190d56d3f48ebf42bba3a2e69cc16bfb (diff)
downloadchromium_src-b64abf63068267e8a3566cbcffc84cac54ee3e9d.zip
chromium_src-b64abf63068267e8a3566cbcffc84cac54ee3e9d.tar.gz
chromium_src-b64abf63068267e8a3566cbcffc84cac54ee3e9d.tar.bz2
Landing fix for keindsay@gmail.com (Strangeloop Networks)
Added proxy and NPN support as well as basic command line configuration for the flip server. Turn on building of flip server for linux by default. BUG=none TEST=test tool Review URL: http://codereview.chromium.org/5255008 git-svn-id: svn://svn.chromium.org/chrome/trunk/src@67245 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'net')
-rw-r--r--net/net.gyp104
-rw-r--r--net/tools/flip_server/balsa_headers.cc4
-rw-r--r--net/tools/flip_server/create_listener.cc164
-rw-r--r--net/tools/flip_server/create_listener.h32
-rw-r--r--net/tools/flip_server/epoll_server.cc4
-rw-r--r--net/tools/flip_server/flip_config.h150
-rw-r--r--net/tools/flip_server/flip_in_mem_edsm_server.cc1828
-rw-r--r--net/tools/flip_server/other_defines.h21
-rw-r--r--net/tools/flip_server/ring_buffer.h1
-rw-r--r--net/tools/flip_server/simple_buffer.h1
10 files changed, 1668 insertions, 641 deletions
diff --git a/net/net.gyp b/net/net.gyp
index dd0fcde..a5b1948 100644
--- a/net/net.gyp
+++ b/net/net.gyp
@@ -1334,61 +1334,57 @@
},
],
'conditions': [
- # ['OS=="linux"', {
- # 'targets': [
- # {
- # 'target_name': 'flip_in_mem_edsm_server',
- # 'type': 'executable',
- # 'dependencies': [
- # '../base/base.gyp:base',
- # 'net.gyp:net',
- # ],
- # 'link_settings': {
- # 'ldflags': [
- # '-lssl'
- # ],
- # 'libraries': [
- # '-lssl'
- # ],
- # },
- # 'sources': [
- # 'tools/dump_cache/url_to_filename_encoder.cc',
- # 'tools/dump_cache/url_to_filename_encoder.h',
- # 'tools/dump_cache/url_utilities.h',
- # 'tools/dump_cache/url_utilities.cc',
+ ['OS=="linux"', {
+ 'targets': [
+ {
+ 'target_name': 'flip_in_mem_edsm_server',
+ 'type': 'executable',
+ 'cflags': [
+ '-Wno-deprecated',
+ ],
+ 'dependencies': [
+ '../base/base.gyp:base',
+ 'net.gyp:net',
+ '../third_party/openssl/openssl.gyp:openssl',
+ ],
+ 'sources': [
+ 'tools/dump_cache/url_to_filename_encoder.cc',
+ 'tools/dump_cache/url_to_filename_encoder.h',
+ 'tools/dump_cache/url_utilities.h',
+ 'tools/dump_cache/url_utilities.cc',
- # 'tools/flip_server/balsa_enums.h',
- # 'tools/flip_server/balsa_frame.cc',
- # 'tools/flip_server/balsa_frame.h',
- # 'tools/flip_server/balsa_headers.cc',
- # 'tools/flip_server/balsa_headers.h',
- # 'tools/flip_server/balsa_headers_token_utils.cc',
- # 'tools/flip_server/balsa_headers_token_utils.h',
- # 'tools/flip_server/balsa_visitor_interface.h',
- # 'tools/flip_server/buffer_interface.h',
- # 'tools/flip_server/create_listener.cc',
- # 'tools/flip_server/create_listener.h',
- # 'tools/flip_server/epoll_server.cc',
- # 'tools/flip_server/epoll_server.h',
- # 'tools/flip_server/flip_in_mem_edsm_server.cc',
- # 'tools/flip_server/http_message_constants.cc',
- # 'tools/flip_server/http_message_constants.h',
- # 'tools/flip_server/loadtime_measurement.h',
- # 'tools/flip_server/porting.txt',
- # 'tools/flip_server/ring_buffer.cc',
- # 'tools/flip_server/ring_buffer.h',
- # 'tools/flip_server/simple_buffer.cc',
- # 'tools/flip_server/simple_buffer.h',
- # 'tools/flip_server/split.h',
- # 'tools/flip_server/split.cc',
- # 'tools/flip_server/string_piece_utils.h',
- # 'tools/flip_server/thread.h',
- # 'tools/flip_server/url_to_filename_encoder.h',
- # 'tools/flip_server/url_utilities.h',
- # ],
- # },
- # ]
- # }],
+ 'tools/flip_server/balsa_enums.h',
+ 'tools/flip_server/balsa_frame.cc',
+ 'tools/flip_server/balsa_frame.h',
+ 'tools/flip_server/balsa_headers.cc',
+ 'tools/flip_server/balsa_headers.h',
+ 'tools/flip_server/balsa_headers_token_utils.cc',
+ 'tools/flip_server/balsa_headers_token_utils.h',
+ 'tools/flip_server/balsa_visitor_interface.h',
+ 'tools/flip_server/buffer_interface.h',
+ 'tools/flip_server/create_listener.cc',
+ 'tools/flip_server/create_listener.h',
+ 'tools/flip_server/epoll_server.cc',
+ 'tools/flip_server/epoll_server.h',
+ 'tools/flip_server/flip_in_mem_edsm_server.cc',
+ 'tools/flip_server/http_message_constants.cc',
+ 'tools/flip_server/http_message_constants.h',
+ 'tools/flip_server/loadtime_measurement.h',
+ 'tools/flip_server/porting.txt',
+ 'tools/flip_server/ring_buffer.cc',
+ 'tools/flip_server/ring_buffer.h',
+ 'tools/flip_server/simple_buffer.cc',
+ 'tools/flip_server/simple_buffer.h',
+ 'tools/flip_server/split.h',
+ 'tools/flip_server/split.cc',
+ 'tools/flip_server/string_piece_utils.h',
+ 'tools/flip_server/thread.h',
+ 'tools/flip_server/url_to_filename_encoder.h',
+ 'tools/flip_server/url_utilities.h',
+ ],
+ },
+ ]
+ }],
['OS=="linux"', {
'targets': [
{
diff --git a/net/tools/flip_server/balsa_headers.cc b/net/tools/flip_server/balsa_headers.cc
index 2196cd4..74364a2 100644
--- a/net/tools/flip_server/balsa_headers.cc
+++ b/net/tools/flip_server/balsa_headers.cc
@@ -616,7 +616,7 @@ void BalsaHeaders::SetContentLength(size_t length) {
content_length_ = length;
// FastUInt64ToBuffer is supposed to use a maximum of kFastToBufferSize bytes.
char buffer[kFastToBufferSize];
- int len_converted = snprintf(buffer, sizeof(buffer), "%d", length);
+ int len_converted = snprintf(buffer, sizeof(buffer), "%zd", length);
CHECK_GT(len_converted, 0);
const base::StringPiece length_str(buffer, len_converted);
AppendHeader(content_length, length_str);
@@ -725,7 +725,7 @@ void BalsaHeaders::SetParsedResponseCodeAndUpdateFirstline(
size_t parsed_response_code) {
char buffer[kFastToBufferSize];
int len_converted = snprintf(buffer, sizeof(buffer),
- "%d", parsed_response_code);
+ "%zd", parsed_response_code);
CHECK_GT(len_converted, 0);
SetResponseCode(base::StringPiece(buffer, len_converted));
}
diff --git a/net/tools/flip_server/create_listener.cc b/net/tools/flip_server/create_listener.cc
index 3538261..59a03a6 100644
--- a/net/tools/flip_server/create_listener.cc
+++ b/net/tools/flip_server/create_listener.cc
@@ -7,8 +7,10 @@
#include <netdb.h> // for getaddrinfo and getnameinfo
#include <netinet/in.h> // for IPPROTO_*, etc.
#include <stdlib.h> // for EXIT_FAILURE
+#include <netinet/tcp.h> // For TCP_NODELAY
#include <sys/socket.h> // for getaddrinfo and getnameinfo
#include <sys/types.h> // "
+#include <fcntl.h>
#include <unistd.h> // for exit()
#include <ostream>
@@ -64,15 +66,47 @@ bool CloseSocket(int *fd, int tries) {
////////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////////
+// Sets an FD to be nonblocking.
+void SetNonBlocking(int fd) {
+ DCHECK_GE(fd, 0);
+
+ int fcntl_return = fcntl(fd, F_GETFL, 0);
+ CHECK_NE(fcntl_return, -1)
+ << "error doing fcntl(fd, F_GETFL, 0) fd: " << fd
+ << " errno=" << errno;
+
+ if (fcntl_return & O_NONBLOCK)
+ return;
+
+ fcntl_return = fcntl(fd, F_SETFL, fcntl_return | O_NONBLOCK);
+ CHECK_NE(fcntl_return, -1)
+ << "error doing fcntl(fd, F_SETFL, fcntl_return) fd: " << fd
+ << " errno=" << errno;
+}
+
+int SetDisableNagle(int fd) {
+ int on = 1;
+ int rc;
+ rc = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY,
+ reinterpret_cast<char*>(&on), sizeof(on));
+ if (rc < 0) {
+ close(fd);
+ LOG(FATAL) << "setsockopt() TCP_NODELAY: failed on fd " << fd;
+ return 0;
+ }
+ return 1;
+}
+
// see header for documentation of this function.
-void CreateListeningSocket(const std::string& host,
- const std::string& port,
- bool is_numeric_host_address,
- int backlog,
- int * listen_fd,
- bool reuseaddr,
- bool reuseport,
- std::ostream* error_stream) {
+int CreateListeningSocket(const std::string& host,
+ const std::string& port,
+ bool is_numeric_host_address,
+ int backlog,
+ bool reuseaddr,
+ bool reuseport,
+ bool wait_for_iface,
+ bool disable_nagle,
+ int * listen_fd ) {
// start out by assuming things will fail.
*listen_fd = -1;
@@ -91,16 +125,15 @@ void CreateListeningSocket(const std::string& host,
}
hints.ai_flags |= AI_PASSIVE;
- hints.ai_family = PF_INET; // we know it'll be IPv4, but if we didn't
- // hints.ai_family = PF_UNSPEC; // know we'd use this. <---
+ hints.ai_family = PF_INET;
hints.ai_socktype = SOCK_STREAM;
int err = 0;
if ((err=getaddrinfo(node, service, &hints, &results))) {
// gai_strerror -is- threadsafe, so we get to use it here.
- *error_stream << "getaddrinfo " << " for (" << host << ":" << port
+ LOG(ERROR) << "getaddrinfo " << " for (" << host << ":" << port
<< ") " << gai_strerror(err) << "\n";
- return;
+ return -1;
}
// this will delete the addrinfo memory when we return from this function.
AddrinfoGuard addrinfo_guard(results);
@@ -109,9 +142,9 @@ void CreateListeningSocket(const std::string& host,
results->ai_socktype,
results->ai_protocol);
if (sock == -1) {
- *error_stream << "Unable to create socket for (" << host << ":"
+ LOG(ERROR) << "Unable to create socket for (" << host << ":"
<< port << "): " << strerror(errno) << "\n";
- return;
+ return -1;
}
if (reuseaddr) {
@@ -141,39 +174,126 @@ void CreateListeningSocket(const std::string& host,
}
if (bind(sock, results->ai_addr, results->ai_addrlen)) {
- *error_stream << "Bind was unsuccessful for (" << host << ":"
- << port << "): " << strerror(errno) << "\n";
+ // If we are waiting for the interface to be raised, such as in an
+ // HA environment, ignore reporting any errors.
+ int saved_errno = errno;
+ if ( !wait_for_iface || errno != EADDRNOTAVAIL) {
+ LOG(ERROR) << "Bind was unsuccessful for (" << host << ":"
+ << port << "): " << strerror(errno) << "\n";
+ }
// if we knew that we were not multithreaded, we could do the following:
// " : " << strerror(errno) << "\n";
if (CloseSocket(&sock, 100)) {
- return;
+ if ( saved_errno == EADDRNOTAVAIL ) {
+ return -3;
+ }
+ return -2;
} else {
// couldn't even close the dang socket?!
- *error_stream << "Unable to close the socket.. Considering this a fatal "
+ LOG(ERROR) << "Unable to close the socket.. Considering this a fatal "
"error, and exiting\n";
exit(EXIT_FAILURE);
+ return -1;
+ }
+ }
+
+ if (disable_nagle) {
+ if (!SetDisableNagle(sock)) {
+ return -1;
}
}
if (listen(sock, backlog)) {
// listen was unsuccessful.
- *error_stream << "Listen was unsuccessful for (" << host << ":"
+ LOG(ERROR) << "Listen was unsuccessful for (" << host << ":"
<< port << "): " << strerror(errno) << "\n";
// if we knew that we were not multithreaded, we could do the following:
// " : " << strerror(errno) << "\n";
if (CloseSocket(&sock, 100)) {
sock = -1;
- return;
+ return -1;
} else {
// couldn't even close the dang socket?!
- *error_stream << "Unable to close the socket.. Considering this a fatal "
+ LOG(FATAL) << "Unable to close the socket.. Considering this a fatal "
"error, and exiting\n";
- exit(EXIT_FAILURE);
}
}
+
// If we've gotten to here, Yeay! Success!
*listen_fd = sock;
+
+ return 0;
+}
+
+int CreateConnectedSocket( int *connect_fd,
+ const std::string& host,
+ const std::string& port,
+ bool is_numeric_host_address,
+ bool disable_nagle ) {
+ const char* node = NULL;
+ const char* service = NULL;
+
+ *connect_fd = -1;
+ if (!host.empty())
+ node = host.c_str();
+ if (!port.empty())
+ service = port.c_str();
+
+ struct addrinfo *results = 0;
+ struct addrinfo hints;
+ memset(&hints, 0, sizeof(hints));
+
+ if (is_numeric_host_address)
+ hints.ai_flags = AI_NUMERICHOST; // iff you know the name is numeric.
+ hints.ai_flags |= AI_PASSIVE;
+
+ hints.ai_family = PF_INET;
+ hints.ai_socktype = SOCK_STREAM;
+
+ int err = 0;
+ if ((err=getaddrinfo(node, service, &hints, &results))) {
+ // gai_strerror -is- threadsafe, so we get to use it here.
+ LOG(ERROR) << "getaddrinfo for (" << node << ":" << service << "): "
+ << gai_strerror(err);
+ return -1;
+ }
+ // this will delete the addrinfo memory when we return from this function.
+ AddrinfoGuard addrinfo_guard(results);
+
+ int sock = socket(results->ai_family,
+ results->ai_socktype,
+ results->ai_protocol);
+ if (sock == -1) {
+ LOG(ERROR) << "Unable to create socket for (" << node << ":" << service
+ << "): " << strerror( errno );
+ return -1;
+ }
+
+ SetNonBlocking( sock );
+
+ if (disable_nagle) {
+ if (!SetDisableNagle(sock)) {
+ return -1;
+ }
+ }
+
+ int ret_val = 0;
+ if ( connect( sock, results->ai_addr, results->ai_addrlen ) ) {
+ if ( errno != EINPROGRESS ) {
+ LOG(ERROR) << "Connect was unsuccessful for (" << node << ":" << service
+ << "): " << strerror(errno);
+ close( sock );
+ return -1;
+ }
+ } else {
+ ret_val = 1;
+ }
+
+ // If we've gotten to here, Yeay! Success!
+ *connect_fd = sock;
+
+ return ret_val;
}
} // namespace net
diff --git a/net/tools/flip_server/create_listener.h b/net/tools/flip_server/create_listener.h
index 4c0a277..3a7b16e 100644
--- a/net/tools/flip_server/create_listener.h
+++ b/net/tools/flip_server/create_listener.h
@@ -11,6 +11,8 @@
namespace net {
+void SetNonBlocking(int fd);
+
// Summary:
// creates a socket for listening, and bind()s and listen()s it.
// Args:
@@ -27,19 +29,29 @@ namespace net {
// backlog - passed into listen. This is the number of pending incoming
// connections a socket which is listening may have acquired before
// the OS starts rejecting new incoming connections.
+// reuseaddr - if true sets SO_REUSEADDR on the listening socket
+// reuseport - if true sets SO_REUSEPORT on the listening socket
+// wait_for_iface - A boolean indicating that CreateListeningSocket should
+// block until the interface that it will bind to has been
+// raised. This is intended for HA environments.
+// disable_nagle - if true sets TCP_NODELAY on the listening socket.
// listen_fd - this will be assigned a positive value if the socket is
// successfully created, else it will be assigned -1.
-// error_stream - in the case of errors, output describing the error will
-// be written into error_stream.
-void CreateListeningSocket(const std::string& host,
- const std::string& port,
- bool is_numeric_host_address,
- int backlog,
- int * listen_fd,
- bool reuseaddr,
- bool reuseport,
- std::ostream* error_stream);
+int CreateListeningSocket(const std::string& host,
+ const std::string& port,
+ bool is_numeric_host_address,
+ int backlog,
+ bool reuseaddr,
+ bool reuseport,
+ bool wait_for_iface,
+ bool disable_nagle,
+ int * listen_fd);
+int CreateConnectedSocket(int *connect_fd,
+ const std::string& host,
+ const std::string& port,
+ bool is_numeric_host_address,
+ bool disable_nagle);
} // namespace net
#endif // NET_TOOLS_FLIP_SERVER_CREATE_LISTENER_H__
diff --git a/net/tools/flip_server/epoll_server.cc b/net/tools/flip_server/epoll_server.cc
index a1d6ca1..f78663c 100644
--- a/net/tools/flip_server/epoll_server.cc
+++ b/net/tools/flip_server/epoll_server.cc
@@ -13,7 +13,6 @@
#include "base/logging.h"
#include "base/timer.h"
-#include "net/tools/flip_server/other_defines.h"
// Design notes: An efficient implementation of ready list has the following
// desirable properties:
@@ -478,7 +477,8 @@ int EpollServer::NumFDsRegistered() const {
void EpollServer::Wake() {
char data = 'd'; // 'd' is for data. It's good enough for me.
- write(write_fd_, &data, 1);
+ int rv = write(write_fd_, &data, 1);
+ DCHECK(rv == 1);
}
int64 EpollServer::NowInUsec() const {
diff --git a/net/tools/flip_server/flip_config.h b/net/tools/flip_server/flip_config.h
new file mode 100644
index 0000000..3f202f8
--- /dev/null
+++ b/net/tools/flip_server/flip_config.h
@@ -0,0 +1,150 @@
+// Copyright (c) 2010 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef NET_TOOLS_FLIP_PROXY_CONFIG_H
+#define NET_TOOLS_FLIP_PROXY_CONFIG_H
+#pragma once
+
+#include <arpa/inet.h> // in_addr_t
+
+#include "base/logging.h"
+#include "net/tools/flip_server/create_listener.h"
+
+#include <vector>
+#include <string>
+
+using std::string;
+using std::vector;
+
+enum FlipHandlerType {
+ FLIP_HANDLER_PROXY,
+ FLIP_HANDLER_SPDY_SERVER,
+ FLIP_HANDLER_HTTP_SERVER
+};
+
+class FlipAcceptor {
+public:
+ enum FlipHandlerType flip_handler_type_;
+ string listen_ip_;
+ string listen_port_;
+ string ssl_cert_filename_;
+ string ssl_key_filename_;
+ string server_ip_;
+ string server_port_;
+ int accept_backlog_size_;
+ bool disable_nagle_;
+ int accepts_per_wake_;
+ int listen_fd_;
+ void* memory_cache_;
+
+ FlipAcceptor(enum FlipHandlerType flip_handler_type,
+ string listen_ip,
+ string listen_port,
+ string ssl_cert_filename,
+ string ssl_key_filename,
+ string server_ip,
+ string server_port,
+ int accept_backlog_size,
+ bool disable_nagle,
+ int accepts_per_wake,
+ bool reuseport,
+ bool wait_for_iface,
+ void *memory_cache) :
+ flip_handler_type_(flip_handler_type),
+ listen_ip_(listen_ip),
+ listen_port_(listen_port),
+ ssl_cert_filename_(ssl_cert_filename),
+ ssl_key_filename_(ssl_key_filename),
+ server_ip_(server_ip),
+ server_port_(server_port),
+ accept_backlog_size_(accept_backlog_size),
+ disable_nagle_(disable_nagle),
+ accepts_per_wake_(accepts_per_wake),
+ memory_cache_(memory_cache)
+ {
+ VLOG(1) << "Attempting to listen on " << listen_ip_.c_str() << ":"
+ << listen_port_.c_str();
+ while (1) {
+ int ret = net::CreateListeningSocket(listen_ip_,
+ listen_port_,
+ true,
+ accept_backlog_size_,
+ true,
+ reuseport,
+ wait_for_iface,
+ disable_nagle_,
+ &listen_fd_);
+ if ( ret == 0 ) {
+ break;
+ } else if ( ret == -3 && wait_for_iface ) {
+ // Binding error EADDRNOTAVAIL was encounted. We need
+ // to wait for the interfaces to raised. try again.
+ usleep(200000);
+ } else {
+ LOG(ERROR) << "Unable to create listening socket for: ret = " << ret
+ << ": " << listen_ip_.c_str() << ":"
+ << listen_port_.c_str();
+ return;
+ }
+ }
+ net::SetNonBlocking(listen_fd_);
+ VLOG(1) << "Listening for spdy on port: " << listen_ip_ << ":"
+ << listen_port_;
+ }
+ ~FlipAcceptor () {}
+};
+
+class FlipConfig {
+public:
+ std::vector <FlipAcceptor*> acceptors_;
+ double server_think_time_in_s_;
+ enum logging::LoggingDestination log_destination_;
+ string log_filename_;
+ bool forward_ip_header_enabled_;
+ string forward_ip_header_;
+ bool wait_for_iface_;
+ int ssl_session_expiry_;
+
+ FlipConfig() :
+ server_think_time_in_s_(0),
+ log_destination_(logging::LOG_ONLY_TO_SYSTEM_DEBUG_LOG),
+ forward_ip_header_enabled_(false),
+ wait_for_iface_(false),
+ ssl_session_expiry_(300)
+ {}
+
+ ~FlipConfig() {}
+
+ void AddAcceptor(enum FlipHandlerType flip_handler_type,
+ string listen_ip,
+ string listen_port,
+ string ssl_cert_filename,
+ string ssl_key_filename,
+ string server_ip,
+ string server_port,
+ int accept_backlog_size,
+ bool disable_nagle,
+ int accepts_per_wake,
+ bool reuseport,
+ bool wait_for_iface,
+ void *memory_cache) {
+ // TODO(mbelshe): create a struct FlipConfigArgs{} for the arguments.
+ acceptors_.push_back(new FlipAcceptor(flip_handler_type,
+ listen_ip,
+ listen_port,
+ ssl_cert_filename,
+ ssl_key_filename,
+ server_ip,
+ server_port,
+ accept_backlog_size,
+ disable_nagle,
+ accepts_per_wake,
+ reuseport,
+ wait_for_iface,
+ memory_cache));
+ }
+
+};
+
+#endif
diff --git a/net/tools/flip_server/flip_in_mem_edsm_server.cc b/net/tools/flip_server/flip_in_mem_edsm_server.cc
index 13658184..7848861 100644
--- a/net/tools/flip_server/flip_in_mem_edsm_server.cc
+++ b/net/tools/flip_server/flip_in_mem_edsm_server.cc
@@ -9,6 +9,7 @@
#include <unistd.h>
#include <openssl/err.h>
#include <openssl/ssl.h>
+#include <signal.h>
#include <deque>
#include <iostream>
@@ -16,6 +17,7 @@
#include <vector>
#include <list>
+#include "base/command_line.h"
#include "base/logging.h"
#include "base/simple_thread.h"
#include "base/timer.h"
@@ -30,16 +32,14 @@
#include "net/tools/flip_server/balsa_headers.h"
#include "net/tools/flip_server/balsa_visitor_interface.h"
#include "net/tools/flip_server/buffer_interface.h"
-#include "net/tools/flip_server/create_listener.h"
#include "net/tools/flip_server/epoll_server.h"
-#include "net/tools/flip_server/other_defines.h"
#include "net/tools/flip_server/ring_buffer.h"
#include "net/tools/flip_server/simple_buffer.h"
#include "net/tools/flip_server/split.h"
+#include "net/tools/flip_server/flip_config.h"
////////////////////////////////////////////////////////////////////////////////
-using std::cerr;
using std::deque;
using std::list;
using std::map;
@@ -47,26 +47,24 @@ using std::ostream;
using std::pair;
using std::string;
using std::vector;
+using std::cout;
////////////////////////////////////////////////////////////////////////////////
+#define IPV4_PRINTABLE_FORMAT(IP) (((IP)>>0)&0xff),(((IP)>>8)&0xff), \
+ (((IP)>>16)&0xff),(((IP)>>24)&0xff)
-// If set to true, then the server will act as an SSL server for both
-// HTTP and SPDY);
-bool FLAGS_use_ssl = true;
+#define ACCEPTOR_CLIENT_IDENT acceptor_->listen_ip_ << ":" \
+ << acceptor_->listen_port_ << " "
+#define ACCEPTOR_SERVER_IDENT acceptor_->server_ip_ << ":" \
+ << acceptor_->server_port_ << " "
-// The name of the cert .pem file);
-string FLAGS_ssl_cert_name = "cert.pem";
+#define NEXT_PROTO_STRING "\x06spdy/2\x08http/1.1\x08http/1.0"
-// The name of the key .pem file);
-string FLAGS_ssl_key_name = "key.pem";
-
-// The number of responses given before the server closes the
-// connection);
-int32 FLAGS_response_count_until_close = 1000*1000;
+#define SSL_CTX_DEFAULT_CIPHER_LIST "RC4:!aNULL:!eNULL"
// If true, then disables the nagle algorithm);
-bool FLAGS_no_nagle = true;
+bool FLAGS_disable_nagle = true;
// The number of times that accept() will be called when the
// alarm goes off when the accept_using_alarm flag is set to true.
@@ -74,12 +72,6 @@ bool FLAGS_no_nagle = true;
// is completely drained and the accept() call returns an error);
int32 FLAGS_accepts_per_wake = 0;
-// The port on which the spdy server listens);
-int32 FLAGS_spdy_port = 10040;
-
-// The port on which the http server listens);
-int32 FLAGS_port = 16002;
-
// The size of the TCP accept backlog);
int32 FLAGS_accept_backlog_size = 1024;
@@ -87,7 +79,7 @@ int32 FLAGS_accept_backlog_size = 1024;
string FLAGS_cache_base_dir = ".";
// If true, then encode url to filename);
-bool FLAGS_need_to_encode_url = true;
+bool FLAGS_need_to_encode_url = false;
// If set to false a single socket will be used. If set to true
// then a new socket will be created for each accept thread.
@@ -105,9 +97,6 @@ bool FLAGS_use_xsub = false;
// Does the server send X-Associated-Content headers);
bool FLAGS_use_xac = false;
-// Does the server advance cwnd by sending no-op packets);
-bool FLAGS_use_cwnd_opener = false;
-
// Does the server compress data frames);
bool FLAGS_use_compression = false;
@@ -115,8 +104,6 @@ bool FLAGS_use_compression = false;
using base::StringPiece;
using base::SimpleThread;
-// using base::Lock; // heh, this isn't in base namespace?!
-// using base::AutoLock; // ditto!
using net::BalsaFrame;
using net::BalsaFrameEnums;
using net::BalsaHeaders;
@@ -137,6 +124,7 @@ using spdy::RST_STREAM;
using spdy::SYN_REPLY;
using spdy::SYN_STREAM;
using spdy::SpdyControlFrame;
+using spdy::SpdySettingsControlFrame;
using spdy::SpdyDataFlags;
using spdy::SpdyDataFrame;
using spdy::SpdyRstStreamControlFrame;
@@ -149,6 +137,7 @@ using spdy::SpdyStreamId;
using spdy::SpdySynReplyControlFrame;
using spdy::SpdySynStreamControlFrame;
+FlipConfig g_proxy_config;
////////////////////////////////////////////////////////////////////////////////
@@ -156,11 +145,21 @@ void PrintSslError() {
char buf[128]; // this buffer must be at least 120 chars long.
int error_num = ERR_get_error();
while (error_num != 0) {
- LOG(INFO)<< ERR_error_string(error_num, buf);
+ LOG(ERROR) << ERR_error_string(error_num, buf);
error_num = ERR_get_error();
}
}
+static int ssl_set_npn_callback(SSL *s,
+ const unsigned char **data,
+ unsigned int *len,
+ void *arg)
+{
+ VLOG(1) << "SSL NPN callback: advertising protocols.";
+ *data = (const unsigned char *) NEXT_PROTO_STRING;
+ *len = strlen(NEXT_PROTO_STRING);
+ return SSL_TLSEXT_ERR_OK;
+}
////////////////////////////////////////////////////////////////////////////////
// Creates a socket with domain, type and protocol parameters.
@@ -172,26 +171,6 @@ int CreateSocket(int domain, int type, int protocol, int *fd) {
return (*fd == -1) ? errno : 0;
}
-////////////////////////////////////////////////////////////////////////////////
-
-// Sets an FD to be nonblocking.
-void SetNonBlocking(int fd) {
- DCHECK(fd >= 0);
-
- int fcntl_return = fcntl(fd, F_GETFL, 0);
- CHECK_NE(fcntl_return, -1)
- << "error doing fcntl(fd, F_GETFL, 0) fd: " << fd
- << " errno=" << errno;
-
- if (fcntl_return & O_NONBLOCK)
- return;
-
- fcntl_return = fcntl(fd, F_SETFL, fcntl_return | O_NONBLOCK);
- CHECK_NE(fcntl_return, -1)
- << "error doing fcntl(fd, F_SETFL, fcntl_return) fd: " << fd
- << " errno=" << errno;
-}
-
// Encode the URL.
string EncodeURL(string uri, string host, string method) {
if (!FLAGS_need_to_encode_url) {
@@ -212,20 +191,16 @@ string EncodeURL(string uri, string host, string method) {
////////////////////////////////////////////////////////////////////////////////
-
-struct GlobalSSLState {
+struct SSLState {
SSL_METHOD* ssl_method;
SSL_CTX* ssl_ctx;
};
-////////////////////////////////////////////////////////////////////////////////
-
-GlobalSSLState* global_ssl_state = NULL;
-
-////////////////////////////////////////////////////////////////////////////////
-
// SSL stuff
-void spdy_init_ssl(GlobalSSLState* state) {
+void spdy_init_ssl(SSLState* state,
+ string ssl_cert_name,
+ string ssl_key_name,
+ bool use_npn) {
SSL_library_init();
PrintSslError();
@@ -241,13 +216,13 @@ void spdy_init_ssl(GlobalSSLState* state) {
// Disable SSLv2 support.
SSL_CTX_set_options(state->ssl_ctx, SSL_OP_NO_SSLv2);
if (SSL_CTX_use_certificate_file(state->ssl_ctx,
- FLAGS_ssl_cert_name.c_str(),
+ ssl_cert_name.c_str(),
SSL_FILETYPE_PEM) <= 0) {
PrintSslError();
LOG(FATAL) << "Unable to use cert.pem as SSL cert.";
}
if (SSL_CTX_use_PrivateKey_file(state->ssl_ctx,
- FLAGS_ssl_key_name.c_str(),
+ ssl_key_name.c_str(),
SSL_FILETYPE_PEM) <= 0) {
PrintSslError();
LOG(FATAL) << "Unable to use key.pem as SSL key.";
@@ -256,6 +231,21 @@ void spdy_init_ssl(GlobalSSLState* state) {
PrintSslError();
LOG(FATAL) << "The cert.pem and key.pem files don't match";
}
+ if (use_npn) {
+ SSL_CTX_set_next_protos_advertised_cb(state->ssl_ctx,
+ ssl_set_npn_callback, NULL);
+ }
+ VLOG(1) << "SSL CTX default cipher list: " << SSL_CTX_DEFAULT_CIPHER_LIST;
+ SSL_CTX_set_cipher_list(state->ssl_ctx, SSL_CTX_DEFAULT_CIPHER_LIST);
+
+ VLOG(1) << "SSL CTX session expiry: " << g_proxy_config.ssl_session_expiry_
+ << " seconds";
+ SSL_CTX_set_timeout(state->ssl_ctx, g_proxy_config.ssl_session_expiry_);
+
+#ifdef SSL_MODE_RELEASE_BUFFERS
+ VLOG(1) << "SSL CTX: Setting Release Buffers mode.";
+ SSL_CTX_set_mode(state->ssl_ctx, SSL_MODE_RELEASE_BUFFERS);
+#endif
}
SSL* spdy_new_ssl(SSL_CTX* ssl_ctx) {
@@ -264,6 +254,7 @@ SSL* spdy_new_ssl(SSL_CTX* ssl_ctx) {
SSL_set_accept_state(ssl);
PrintSslError();
+
return ssl;
}
@@ -404,7 +395,6 @@ class MemoryCache {
}
void AddFiles() {
- LOG(INFO) << "Adding files!";
deque<string> paths;
cwd_ = FLAGS_cache_base_dir;
paths.push_back(cwd_ + "/GET_");
@@ -580,7 +570,6 @@ class MemoryCache {
LOG(ERROR) << "Skipping subresource with unknown content-type";
return;
}
-
// Now, lets see if this is the same host or not
bool same_host = (UrlUtilities::GetUrlHost(referrer) ==
UrlUtilities::GetUrlHost(url));
@@ -673,8 +662,6 @@ class MemoryCache {
}
};
-////////////////////////////////////////////////////////////////////////////////
-
class NotifierInterface {
public:
virtual ~NotifierInterface() {}
@@ -683,16 +670,33 @@ class NotifierInterface {
////////////////////////////////////////////////////////////////////////////////
+class SMConnectionPoolInterface;
+
class SMInterface {
public:
- virtual size_t ProcessInput(const char* data, size_t len) = 0;
+ virtual void InitSMInterface(SMInterface* sm_other_interface,
+ int32 server_idx) = 0;
+ virtual void InitSMConnection(SMConnectionPoolInterface* connection_pool,
+ SMInterface* sm_interface,
+ EpollServer* epoll_server,
+ int fd,
+ bool use_ssl) = 0;
+ virtual size_t ProcessReadInput(const char* data, size_t len) = 0;
+ virtual size_t ProcessWriteInput(const char* data, size_t len) = 0;
+ virtual void SetStreamID(uint32 stream_id) = 0;
virtual bool MessageFullyRead() const = 0;
virtual bool Error() const = 0;
virtual const char* ErrorAsString() const = 0;
virtual void Reset() = 0;
+ virtual void ResetForNewInterface(int32 server_idx) = 0;
+ // ResetForNewConnection is used for interfaces which control SMConnection
+ // objects. When called an interface may put its connection object into
+ // a reusable instance pool. Currently this is what the HttpSM interface
+ // does.
virtual void ResetForNewConnection() = 0;
+ virtual void Cleanup() = 0;
- virtual void PostAcceptHook() = 0;
+ virtual int PostAcceptHook() = 0;
virtual void NewStream(uint32 stream_id, uint32 priority,
const string& filename) = 0;
@@ -709,65 +713,98 @@ class SMInterface {
virtual ~SMInterface() {}
};
-////////////////////////////////////////////////////////////////////////////////
+class SMConnectionInterface {
+ public:
+ virtual ~SMConnectionInterface() {}
+ virtual void ReadyToSend() = 0;
+ virtual EpollServer* epoll_server() = 0;
+};
-class SMServerConnection;
-typedef SMInterface*(SMInterfaceFactory)(SMServerConnection*);
-
-////////////////////////////////////////////////////////////////////////////////
+class HttpSM;
+class SMConnection;
typedef list<DataFrame> OutputList;
-////////////////////////////////////////////////////////////////////////////////
-
-class SMServerConnection;
-
-class SMServerConnectionPoolInterface {
+class SMConnectionPoolInterface {
public:
- virtual ~SMServerConnectionPoolInterface() {}
- // SMServerConnections will use this:
- virtual void SMServerConnectionDone(SMServerConnection* connection) = 0;
+ virtual ~SMConnectionPoolInterface() {}
+ // SMConnections will use this:
+ virtual void SMConnectionDone(SMConnection* connection) = 0;
};
-////////////////////////////////////////////////////////////////////////////////
-
-class SMServerConnection: public EpollCallbackInterface,
- public NotifierInterface {
- private:
- SMServerConnection(SMInterfaceFactory* sm_interface_factory,
- MemoryCache* memory_cache,
- EpollServer* epoll_server) :
- fd_(-1),
- events_(0),
-
- registered_in_epoll_server_(false),
- initialized_(false),
+SMInterface* NewStreamerSM(SMConnection* connection,
+ SMInterface* sm_interface,
+ EpollServer* epoll_server,
+ FlipAcceptor* acceptor);
- connection_pool_(NULL),
- epoll_server_(epoll_server),
+SMInterface* NewSpdySM(SMConnection* connection,
+ SMInterface* sm_interface,
+ EpollServer* epoll_server,
+ MemoryCache* memory_cache,
+ FlipAcceptor* acceptor);
- read_buffer_(4096*10),
- memory_cache_(memory_cache),
- sm_interface_(sm_interface_factory(this)),
+SMInterface* NewHttpSM(SMConnection* connection,
+ SMInterface* sm_interface,
+ EpollServer* epoll_server,
+ MemoryCache* memory_cache,
+ FlipAcceptor* acceptor);
- max_bytes_sent_per_dowrite_(4096),
+////////////////////////////////////////////////////////////////////////////////
- ssl_(NULL) {}
+class SMConnection: public SMConnectionInterface,
+ public EpollCallbackInterface,
+ public NotifierInterface {
+ private:
+ SMConnection(EpollServer* epoll_server,
+ SSLState* ssl_state,
+ MemoryCache* memory_cache,
+ FlipAcceptor* acceptor,
+ string log_prefix)
+ : fd_(-1),
+ events_(0),
+ registered_in_epoll_server_(false),
+ initialized_(false),
+ protocol_detected_(false),
+ connection_complete_(false),
+ connection_pool_(NULL),
+ epoll_server_(epoll_server),
+ ssl_state_(ssl_state),
+ memory_cache_(memory_cache),
+ acceptor_(acceptor),
+ read_buffer_(4096*10),
+ sm_spdy_interface_(NULL),
+ sm_http_interface_(NULL),
+ sm_streamer_interface_(NULL),
+ sm_interface_(NULL),
+ log_prefix_(log_prefix),
+ max_bytes_sent_per_dowrite_(4096),
+ ssl_(NULL)
+ {}
int fd_;
int events_;
bool registered_in_epoll_server_;
bool initialized_;
+ bool protocol_detected_;
+ bool connection_complete_;
- SMServerConnectionPoolInterface* connection_pool_;
- EpollServer* epoll_server_;
+ SMConnectionPoolInterface* connection_pool_;
+
+ EpollServer *epoll_server_;
+ SSLState *ssl_state_;
+ MemoryCache* memory_cache_;
+ FlipAcceptor *acceptor_;
+ string client_ip_;
RingBuffer read_buffer_;
OutputList output_list_;
- MemoryCache* memory_cache_;
+ SMInterface* sm_spdy_interface_;
+ SMInterface* sm_http_interface_;
+ SMInterface* sm_streamer_interface_;
SMInterface* sm_interface_;
+ string log_prefix_;
size_t max_bytes_sent_per_dowrite_;
@@ -777,45 +814,101 @@ class SMServerConnection: public EpollCallbackInterface,
OutputList* output_list() { return &output_list_; }
MemoryCache* memory_cache() { return memory_cache_; }
void ReadyToSend() {
+ VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT
+ << "Setting ready to send: EPOLLIN | EPOLLOUT";
epoll_server_->SetFDReady(fd_, EPOLLIN | EPOLLOUT);
}
void EnqueueDataFrame(const DataFrame& df) {
output_list_.push_back(df);
- VLOG(2) << "EnqueueDataFrame. Setting FD ready.";
+ VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT << "EnqueueDataFrame: "
+ << "size = " << df.size << ": Setting FD ready.";
ReadyToSend();
}
+ int fd() { return fd_; }
public:
- ~SMServerConnection() {
+ ~SMConnection() {
if (initialized()) {
Reset();
}
}
- static SMServerConnection* NewSMServerConnection(SMInterfaceFactory* smif,
- MemoryCache* memory_cache,
- EpollServer* epoll_server) {
- return new SMServerConnection(smif, memory_cache, epoll_server);
+ static SMConnection* NewSMConnection(EpollServer* epoll_server,
+ SSLState *ssl_state,
+ MemoryCache* memory_cache,
+ FlipAcceptor *acceptor,
+ string log_prefix) {
+ return new SMConnection(epoll_server, ssl_state, memory_cache,
+ acceptor, log_prefix);
}
bool initialized() const { return initialized_; }
+ string client_ip() const { return client_ip_; }
- void InitSMServerConnection(SMServerConnectionPoolInterface* connection_pool,
- EpollServer* epoll_server,
- int fd) {
+ void InitSMConnection(SMConnectionPoolInterface* connection_pool,
+ SMInterface* sm_interface,
+ EpollServer* epoll_server,
+ int fd,
+ bool use_ssl) {
if (initialized_) {
LOG(FATAL) << "Attempted to initialize already initialized server";
return;
}
- if (epoll_server_ && registered_in_epoll_server_ && fd_ != -1) {
- epoll_server_->UnregisterFD(fd_);
- }
- if (fd_ != -1) {
- VLOG(2) << "Closing pre-existing fd";
- close(fd_);
- fd_ = -1;
- }
- fd_ = fd;
+ if (fd == -1) {
+ // If fd == -1, then we are initializing a new connection that will
+ // connect to the backend.
+ //
+ // ret: -1 == error
+ // 0 == connection in progress
+ // 1 == connection complete
+ // TODO: is_numeric_host_address value needs to be detected
+ int ret = net::CreateConnectedSocket(&fd_,
+ acceptor_->server_ip_,
+ acceptor_->server_port_,
+ true,
+ acceptor_->disable_nagle_);
+
+ if (ret < 0) {
+ LOG(ERROR) << "-1 Could not create connected socket";
+ return;
+ } else if (ret == 1) {
+ DCHECK_NE(-1, fd_);
+ connection_complete_ = true;
+ VLOG(1) << log_prefix_ << ACCEPTOR_CLIENT_IDENT
+ << "Connection complete to: " << ACCEPTOR_SERVER_IDENT;
+ }
+ VLOG(1) << log_prefix_ << ACCEPTOR_CLIENT_IDENT
+ << "Connecting to server: " << ACCEPTOR_SERVER_IDENT;
+ } else {
+ // If fd != -1 then we are initializing a connection that has just been
+ // accepted from the listen socket.
+ connection_complete_ = true;
+ if (epoll_server_ && registered_in_epoll_server_ && fd_ != -1) {
+ epoll_server_->UnregisterFD(fd_);
+ }
+ if (fd_ != -1) {
+ VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT
+ << "Closing pre-existing fd";
+ close(fd_);
+ fd_ = -1;
+ }
+
+ fd_ = fd;
+ struct sockaddr sock_addr;
+ socklen_t addr_size = sizeof(sock_addr);
+ addr_size = sizeof(sock_addr);
+ int res = getsockname(fd_, &sock_addr, &addr_size);
+ if (res < 0) {
+ LOG(ERROR) << "Could not get socket address for fd " << fd_
+ << ": getsockname: " << strerror(errno);
+ } else {
+ struct sockaddr_in *sock_addr_in = (struct sockaddr_in *)&sock_addr;
+ char ip[16];
+ snprintf(ip, sizeof(ip), "%d.%d.%d.%d",
+ IPV4_PRINTABLE_FORMAT(sock_addr_in->sin_addr.s_addr));
+ client_ip_ = ip;
+ }
+ }
registered_in_epoll_server_ = false;
initialized_ = true;
@@ -823,21 +916,42 @@ class SMServerConnection: public EpollCallbackInterface,
connection_pool_ = connection_pool;
epoll_server_ = epoll_server;
- sm_interface_->Reset();
+ if (sm_interface) {
+ sm_interface_ = sm_interface;
+ protocol_detected_ = true;
+ }
+
read_buffer_.Clear();
epoll_server_->RegisterFD(fd_, this, EPOLLIN | EPOLLOUT | EPOLLET);
- if (global_ssl_state) {
- ssl_ = spdy_new_ssl(global_ssl_state->ssl_ctx);
+ if (use_ssl) {
+ ssl_ = spdy_new_ssl(ssl_state_->ssl_ctx);
SSL_set_fd(ssl_, fd_);
PrintSslError();
}
- sm_interface_->PostAcceptHook();
}
int Send(const char* bytes, int len, int flags) {
- return send(fd_, bytes, len, flags);
+ ssize_t bytes_written = 0;
+ if (ssl_) {
+ bytes_written = SSL_write(ssl_, bytes, len);
+ if (bytes_written < 0) {
+ switch(SSL_get_error(ssl_, bytes_written)) {
+ case SSL_ERROR_WANT_READ:
+ case SSL_ERROR_WANT_WRITE:
+ case SSL_ERROR_WANT_ACCEPT:
+ case SSL_ERROR_WANT_CONNECT:
+ return -2;
+ default:
+ PrintSslError();
+ break;
+ }
+ }
+ } else {
+ bytes_written = send(fd_, bytes, len, flags);
+ }
+ return bytes_written;
}
// the following are from the EpollCallbackInterface
@@ -861,43 +975,95 @@ class SMServerConnection: public EpollCallbackInterface,
return;
}
+ void Cleanup(const char* cleanup) {
+ VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT << "Cleanup";
+ if (!initialized_) {
+ return;
+ }
+ Reset();
+ if (connection_pool_) {
+ connection_pool_->SMConnectionDone(this);
+ }
+ if (sm_interface_) {
+ sm_interface_->ResetForNewConnection();
+ }
+ }
+
private:
void HandleEvents() {
- VLOG(1) << "Received: " << EpollServer::EventMaskToString(events_);
+ VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT << "Received: "
+ << EpollServer::EventMaskToString(events_).c_str();
+
if (events_ & EPOLLIN) {
if (!DoRead())
goto handle_close_or_error;
}
if (events_ & EPOLLOUT) {
+ // Check if we have connected or not
+ if (connection_complete_ == false) {
+ int sock_error;
+ socklen_t sock_error_len = sizeof(sock_error);
+ int ret = getsockopt(fd_, SOL_SOCKET, SO_ERROR, &sock_error,
+ &sock_error_len);
+ if (ret != 0) {
+ VLOG(1) << log_prefix_ << ACCEPTOR_CLIENT_IDENT
+ << "getsockopt error: " << errno << ": " << strerror(errno);
+ goto handle_close_or_error;
+ }
+ if (sock_error == 0) {
+ connection_complete_ = true;
+ VLOG(1) << log_prefix_ << ACCEPTOR_CLIENT_IDENT
+ << "Connection complete to " << ACCEPTOR_SERVER_IDENT;
+ } else if (sock_error == EINPROGRESS) {
+ return;
+ } else {
+ VLOG(1) << log_prefix_ << ACCEPTOR_CLIENT_IDENT
+ << "error connecting to server";
+ goto handle_close_or_error;
+ }
+ }
if (!DoWrite())
goto handle_close_or_error;
}
if (events_ & (EPOLLHUP | EPOLLERR)) {
- VLOG(2) << "!!!! Got HUP or ERR";
+ VLOG(1) << log_prefix_ << ACCEPTOR_CLIENT_IDENT << "!!! Got HUP or ERR";
goto handle_close_or_error;
}
return;
- handle_close_or_error:
+ handle_close_or_error:
Cleanup("HandleEvents");
}
bool DoRead() {
- VLOG(2) << "DoRead()";
- if (fd_ == -1) {
- VLOG(2) << "DoRead(): fd_ == -1. Invalid FD. Returning false";
- return false;
- }
+ VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT << "DoRead()";
while (!read_buffer_.Full()) {
char* bytes;
int size;
+ if (fd_ == -1) {
+ VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT
+ << "DoRead(): fd_ == -1. Invalid FD. Returning false";
+ return false;
+ }
read_buffer_.GetWritablePtr(&bytes, &size);
ssize_t bytes_read = 0;
if (ssl_) {
bytes_read = SSL_read(ssl_, bytes, size);
- PrintSslError();
+ if (bytes_read < 0) {
+ switch(SSL_get_error(ssl_, bytes_read)) {
+ case SSL_ERROR_WANT_READ:
+ case SSL_ERROR_WANT_WRITE:
+ case SSL_ERROR_WANT_ACCEPT:
+ case SSL_ERROR_WANT_CONNECT:
+ events_ &= ~EPOLLIN;
+ goto done;
+ default:
+ PrintSslError();
+ break;
+ }
+ }
} else {
bytes_read = recv(fd_, bytes, size, MSG_DONTWAIT);
}
@@ -906,33 +1072,107 @@ class SMServerConnection: public EpollCallbackInterface,
switch (stored_errno) {
case EAGAIN:
events_ &= ~EPOLLIN;
- VLOG(2) << "Got EAGAIN while reading";
+ VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT
+ << "Got EAGAIN while reading";
goto done;
case EINTR:
- VLOG(2) << "Got EINTR while reading";
+ VLOG(1) << log_prefix_ << ACCEPTOR_CLIENT_IDENT
+ << "Got EINTR while reading";
continue;
default:
- VLOG(2) << "While calling recv, got error: " << stored_errno
- << " " << strerror(stored_errno);
+ VLOG(1) << log_prefix_ << ACCEPTOR_CLIENT_IDENT
+ << "While calling recv, got error: "
+ << (ssl_?"(ssl error)":strerror(stored_errno));
goto error_or_close;
}
} else if (bytes_read > 0) {
- VLOG(2) << "Read: " << bytes_read << " bytes from fd: " << fd_;
+ VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT << "read " << bytes_read
+ << " bytes";
+ if (!protocol_detected_) {
+ if (acceptor_->flip_handler_type_ == FLIP_HANDLER_HTTP_SERVER) {
+ // Http Server
+ protocol_detected_ = true;
+ if (!sm_http_interface_) {
+ VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT
+ << "Created HTTP interface.";
+ sm_http_interface_ = NewHttpSM(this, NULL, epoll_server_,
+ memory_cache_, acceptor_);
+ } else {
+ VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT
+ << "Reusing HTTP interface.";
+ }
+ sm_interface_ = sm_http_interface_;
+ } else if (ssl_) {
+ protocol_detected_ = true;
+ if (SSL_session_reused(ssl_) == 0) {
+ VLOG(1) << "Session status: renegotiated";
+ } else {
+ VLOG(1) << "Session status: resumed";
+ }
+ const unsigned char *npn_proto;
+ unsigned int npn_proto_len;
+ SSL_get0_next_proto_negotiated(ssl_, &npn_proto, &npn_proto_len);
+ if (npn_proto_len > 0) {
+ string npn_proto_str((const char *)npn_proto, npn_proto_len);
+ VLOG(1) << log_prefix_ << ACCEPTOR_CLIENT_IDENT
+ << "NPN protocol detected: " << npn_proto_str;
+ } else {
+ VLOG(1) << log_prefix_ << ACCEPTOR_CLIENT_IDENT
+ << "NPN protocol detected: none";
+ if (acceptor_->flip_handler_type_ == FLIP_HANDLER_SPDY_SERVER) {
+ VLOG(1) << log_prefix_ << ACCEPTOR_CLIENT_IDENT
+ << "NPN protocol: Could not negotiate SPDY protocol.";
+ goto error_or_close;
+ }
+ }
+ if (npn_proto_len > 0 &&
+ !strncmp((char *)npn_proto, "spdy/2", npn_proto_len)) {
+ if (!sm_spdy_interface_) {
+ sm_spdy_interface_ = NewSpdySM(this, NULL, epoll_server_,
+ memory_cache_, acceptor_);
+ VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT
+ << "Created SPDY interface.";
+ } else {
+ VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT
+ << "Reusing SPDY interface.";
+ }
+ sm_interface_ = sm_spdy_interface_;
+ } else {
+ if (!sm_streamer_interface_) {
+ sm_streamer_interface_ = NewStreamerSM(this, NULL,
+ epoll_server_,
+ acceptor_);
+ VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT
+ << "Created Streamer interface.";
+ } else {
+ VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT
+ << "Reusing Streamer interface: ";
+ }
+ sm_interface_ = sm_streamer_interface_;
+ }
+ }
+ if (sm_interface_->PostAcceptHook() == 0) {
+ goto error_or_close;
+ }
+ }
read_buffer_.AdvanceWritablePtr(bytes_read);
if (!DoConsumeReadData()) {
goto error_or_close;
}
continue;
} else { // bytes_read == 0
- VLOG(2) << "0 bytes read with recv call.";
+ VLOG(1) << log_prefix_ << ACCEPTOR_CLIENT_IDENT
+ << "0 bytes read with recv call.";
}
goto error_or_close;
}
done:
return true;
- error_or_close:
- VLOG(2) << "DoRead(): error_or_close. Cleaning up, then returning false";
+ error_or_close:
+ VLOG(1) << log_prefix_ << ACCEPTOR_CLIENT_IDENT
+ << "DoRead(): error_or_close. "
+ << "Cleaning up, then returning false";
Cleanup("DoRead");
return false;
}
@@ -942,19 +1182,21 @@ class SMServerConnection: public EpollCallbackInterface,
int size;
read_buffer_.GetReadablePtr(&bytes, &size);
while (size != 0) {
- size_t bytes_consumed = sm_interface_->ProcessInput(bytes, size);
- VLOG(2) << "consumed: " << bytes_consumed << " from socket fd: " << fd_;
+ size_t bytes_consumed = sm_interface_->ProcessReadInput(bytes, size);
+ VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT << "consumed "
+ << bytes_consumed << " bytes";
if (bytes_consumed == 0) {
break;
}
read_buffer_.AdvanceReadablePtr(bytes_consumed);
if (sm_interface_->MessageFullyRead()) {
- VLOG(2) << "HandleRequestFullyRead";
- HandleRequestFullyRead();
- sm_interface_->Reset();
+ VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT
+ << "HandleRequestFullyRead: Setting EPOLLOUT";
+ HandleResponseFullyRead();
events_ |= EPOLLOUT;
} else if (sm_interface_->Error()) {
- LOG(ERROR) << "Framer error detected: "
+ LOG(ERROR) << log_prefix_ << ACCEPTOR_CLIENT_IDENT
+ << "Framer error detected: Setting EPOLLOUT: "
<< sm_interface_->ErrorAsString();
// this causes everything to be closed/cleaned up.
events_ |= EPOLLOUT;
@@ -970,7 +1212,8 @@ class SMServerConnection: public EpollCallbackInterface,
// feeding files into the output buffer.
}
- void HandleRequestFullyRead() {
+ void HandleResponseFullyRead() {
+ sm_interface_->Cleanup();
}
void Notify() {
@@ -980,20 +1223,29 @@ class SMServerConnection: public EpollCallbackInterface,
size_t bytes_sent = 0;
int flags = MSG_NOSIGNAL | MSG_DONTWAIT;
if (fd_ == -1) {
- VLOG(2) << "DoWrite: fd == -1. Returning false.";
+ VLOG(1) << log_prefix_ << ACCEPTOR_CLIENT_IDENT
+ << "DoWrite: fd == -1. Returning false.";
return false;
}
if (output_list_.empty()) {
- sm_interface_->GetOutput();
- if (output_list_.empty())
+ VLOG(2) << log_prefix_ << "DoWrite: Output list empty.";
+ if (sm_interface_) {
+ sm_interface_->GetOutput();
+ }
+ if (output_list_.empty()) {
events_ &= ~EPOLLOUT;
+ }
}
while (!output_list_.empty()) {
+ VLOG(2) << log_prefix_ << "DoWrite: Items in output list: "
+ << output_list_.size();
if (bytes_sent >= max_bytes_sent_per_dowrite_) {
+ VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT
+ << " byte sent >= max bytes sent per write: Setting EPOLLOUT";
events_ |= EPOLLOUT;
break;
}
- if (output_list_.size() < 2) {
+ if (sm_interface_ && output_list_.size() < 2) {
sm_interface_->GetOutput();
}
DataFrame& data_frame = output_list_.front();
@@ -1003,6 +1255,11 @@ class SMServerConnection: public EpollCallbackInterface,
size -= data_frame.index;
DCHECK_GE(size, 0);
if (size <= 0) {
+ // Empty data frame. Indicates end of data from client.
+ // Uncork the socket.
+ int state = 0;
+ VLOG(2) << log_prefix_ << "Empty data frame, uncorking socket.";
+ setsockopt( fd_, IPPROTO_TCP, TCP_CORK, &state, sizeof( state ) );
data_frame.MaybeDelete();
output_list_.pop_front();
continue;
@@ -1010,56 +1267,63 @@ class SMServerConnection: public EpollCallbackInterface,
flags = MSG_NOSIGNAL | MSG_DONTWAIT;
if (output_list_.size() > 1) {
+ VLOG(2) << log_prefix_ << "Outlist size: " << output_list_.size()
+ << ": Adding MSG_MORE flag";
flags |= MSG_MORE;
}
- ssize_t bytes_written = 0;
- if (ssl_) {
- bytes_written = SSL_write(ssl_, bytes, size);
- PrintSslError();
- } else {
- bytes_written = send(fd_, bytes, size, flags);
- }
+ VLOG(2) << log_prefix_ << "Attempting to send " << size << " bytes.";
+ ssize_t bytes_written = Send(bytes, size, flags);
int stored_errno = errno;
if (bytes_written == -1) {
switch (stored_errno) {
case EAGAIN:
events_ &= ~EPOLLOUT;
- VLOG(2) << " Got EAGAIN while writing";
+ VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT
+ << "Got EAGAIN while writing";
goto done;
case EINTR:
- VLOG(2) << " Got EINTR while writing";
+ VLOG(1) << log_prefix_ << ACCEPTOR_CLIENT_IDENT
+ << "Got EINTR while writing";
continue;
default:
- VLOG(2) << "While calling send, got error: " << stored_errno
- << " " << strerror(stored_errno);
+ VLOG(1) << log_prefix_ << ACCEPTOR_CLIENT_IDENT
+ << "While calling send, got error: " << stored_errno
+ << ": " << (ssl_?"":strerror(stored_errno));
goto error_or_close;
}
} else if (bytes_written > 0) {
- VLOG(1) << "Wrote: " << bytes_written << " bytes to socket fd: "
- << fd_;
+ VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT << "Wrote: "
+ << bytes_written << " bytes";
data_frame.index += bytes_written;
bytes_sent += bytes_written;
continue;
+ } else if (bytes_written == -2) {
+ // -2 handles SSL_ERROR_WANT_* errors
+ events_ &= ~EPOLLOUT;
+ goto done;
}
- VLOG(2) << "0 bytes written to socket " << fd_ << " with send call.";
+ VLOG(1) << log_prefix_ << ACCEPTOR_CLIENT_IDENT
+ << "0 bytes written with send call.";
goto error_or_close;
}
done:
return true;
error_or_close:
- VLOG(2) << "DoWrite: error_or_close. Returning false after cleaning up";
+ VLOG(1) << log_prefix_ << ACCEPTOR_CLIENT_IDENT
+ << "DoWrite: error_or_close. Returning false "
+ << "after cleaning up";
Cleanup("DoWrite");
return false;
}
- friend ostream& operator<<(ostream& os, const SMServerConnection& c) {
+ friend ostream& operator<<(ostream& os, const SMConnection& c) {
os << &c << "\n";
return os;
}
void Reset() {
- VLOG(2) << "Resetting";
+ VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT << "Resetting";
if (ssl_) {
SSL_shutdown(ssl_);
PrintSslError();
@@ -1071,25 +1335,17 @@ class SMServerConnection: public EpollCallbackInterface,
registered_in_epoll_server_ = false;
}
if (fd_ >= 0) {
- VLOG(2) << "Closing connection";
+ VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT << "Closing connection";
close(fd_);
fd_ = -1;
}
- sm_interface_->ResetForNewConnection();
read_buffer_.Clear();
initialized_ = false;
+ protocol_detected_ = false;
events_ = 0;
output_list_.clear();
}
- void Cleanup(const char* cleanup) {
- VLOG(2) << "Cleaning up: " << cleanup;
- if (!initialized_) {
- return;
- }
- Reset();
- connection_pool_->SMServerConnectionDone(this);
- }
};
////////////////////////////////////////////////////////////////////////////////
@@ -1114,13 +1370,15 @@ class OutputOrdering {
PriorityRing first_data_senders_;
uint32 first_data_senders_threshold_; // when you've passed this, you're no
// longer a first_data_sender...
- SMServerConnection* connection_;
+ SMConnectionInterface* connection_;
EpollServer* epoll_server_;
- explicit OutputOrdering(SMServerConnection* connection) :
- first_data_senders_threshold_(kInitialDataSendersThreshold),
- connection_(connection),
- epoll_server_(connection->epoll_server()) {
+ explicit OutputOrdering(SMConnectionInterface* connection) :
+ first_data_senders_threshold_(kInitialDataSendersThreshold),
+ connection_(connection) {
+ if (connection) {
+ epoll_server_ = connection->epoll_server();
+ }
}
void Reset() {
@@ -1151,7 +1409,7 @@ class OutputOrdering {
int64 OnAlarm() {
OnUnregistration();
output_ordering_->MoveToActive(pmp_, mci_);
- VLOG(1) << "ON ALARM! Should now start to output...";
+ VLOG(2) << "ON ALARM! Should now start to output...";
delete this;
return 0;
}
@@ -1179,7 +1437,7 @@ class OutputOrdering {
};
void MoveToActive(PriorityMapPointer* pmp, MemCacheIter mci) {
- VLOG(1) <<"Moving to active!";
+ VLOG(2) << "Moving to active!";
first_data_senders_.push_back(mci);
pmp->ring = &first_data_senders_;
pmp->it = first_data_senders_.end();
@@ -1189,9 +1447,9 @@ class OutputOrdering {
void AddToOutputOrder(const MemCacheIter& mci) {
if (ExistsInPriorityMaps(mci.stream_id))
- LOG(FATAL) << "OOps, already was inserted here?!";
+ LOG(ERROR) << "OOps, already was inserted here?!";
- double think_time_in_s = FLAGS_server_think_time_in_s;
+ double think_time_in_s = g_proxy_config.server_think_time_in_s_;
string x_server_latency =
mci.file_data->headers->GetHeader("X-Server-Latency").as_string();
if (x_server_latency.size() != 0) {
@@ -1199,7 +1457,8 @@ class OutputOrdering {
double tmp_think_time_in_s = strtod(x_server_latency.c_str(), &endp);
if (endp != x_server_latency.c_str() + x_server_latency.size()) {
LOG(ERROR) << "Unable to understand X-Server-Latency of: "
- << x_server_latency << " for resource: " << mci.file_data->filename;
+ << x_server_latency << " for resource: "
+ << mci.file_data->filename.c_str();
} else {
think_time_in_s = tmp_think_time_in_s;
}
@@ -1211,7 +1470,7 @@ class OutputOrdering {
PriorityMapPointer& pmp = sitpmi->second;
BeginOutputtingAlarm* boa = new BeginOutputtingAlarm(this, &pmp, mci);
- VLOG(2) << "Server think time: " << think_time_in_s;
+ VLOG(1) << "Server think time: " << think_time_in_s;
epoll_server_->RegisterAlarmApproximateDelta(
think_time_in_s * 1000000, boa);
}
@@ -1275,34 +1534,169 @@ class OutputOrdering {
}
};
+
////////////////////////////////////////////////////////////////////////////////
class SpdySM : public SpdyFramerVisitorInterface, public SMInterface {
private:
uint64 seq_num_;
- SpdyFramer* framer_;
+ SpdyFramer* spdy_framer_;
- SMServerConnection* connection_;
- OutputList* output_list_;
- OutputOrdering output_ordering_;
- MemoryCache* memory_cache_;
+ SMConnection* connection_;
+ OutputList* client_output_list_;
+ OutputOrdering client_output_ordering_;
uint32 next_outgoing_stream_id_;
+ EpollServer* epoll_server_;
+ FlipAcceptor* acceptor_;
+ MemoryCache* memory_cache_;
+ vector<SMInterface*> server_interface_list;
+ vector<int32> unused_server_interface_list;
+ typedef map<uint32,SMInterface*> StreamToSmif;
+ StreamToSmif stream_to_smif_;
public:
- explicit SpdySM(SMServerConnection* connection) :
- seq_num_(0),
- framer_(new SpdyFramer),
- connection_(connection),
- output_list_(connection->output_list()),
- output_ordering_(connection),
- memory_cache_(connection->memory_cache()),
- next_outgoing_stream_id_(2) {
- framer_->set_visitor(this);
+ SpdySM(SMConnection* connection,
+ SMInterface* sm_http_interface,
+ EpollServer* epoll_server,
+ MemoryCache* memory_cache,
+ FlipAcceptor* acceptor)
+ : seq_num_(0),
+ spdy_framer_(new SpdyFramer),
+ connection_(connection),
+ client_output_list_(connection->output_list()),
+ client_output_ordering_(connection),
+ next_outgoing_stream_id_(2),
+ epoll_server_(epoll_server),
+ acceptor_(acceptor),
+ memory_cache_(memory_cache) {
+ spdy_framer_->set_visitor(this);
}
+
+ ~SpdySM() {
+ delete spdy_framer_;
+ }
+
+ void InitSMInterface(SMInterface* sm_http_interface,
+ int32 server_idx) { }
+
+ void InitSMConnection(SMConnectionPoolInterface* connection_pool,
+ SMInterface* sm_interface,
+ EpollServer* epoll_server,
+ int fd,
+ bool use_ssl) {
+ VLOG(2) << ACCEPTOR_CLIENT_IDENT
+ << "SpdySM: Initializing server connection.";
+ connection_->InitSMConnection(connection_pool, sm_interface,
+ epoll_server, fd, use_ssl);
+ }
+
private:
virtual void OnError(SpdyFramer* framer) {
/* do nothing with this right now */
}
+ SMInterface* NewConnectionInterface() {
+ SMConnection* server_connection =
+ SMConnection::NewSMConnection(epoll_server_, NULL,
+ memory_cache_, acceptor_,
+ "http_conn: ");
+ if (server_connection == NULL) {
+ LOG(ERROR) << "SpdySM: Could not create server connection";
+ return NULL;
+ }
+ VLOG(2) << ACCEPTOR_CLIENT_IDENT << "SpdySM: Creating new HTTP interface";
+ SMInterface *sm_http_interface = NewHttpSM(server_connection, this,
+ epoll_server_, memory_cache_,
+ acceptor_);
+ return sm_http_interface;
+ }
+
+ SMInterface* FindOrMakeNewSMConnectionInterface() {
+ SMInterface *sm_http_interface;
+ int32 server_idx;
+ if (unused_server_interface_list.empty()) {
+ sm_http_interface = NewConnectionInterface();
+ server_idx = server_interface_list.size();
+ server_interface_list.push_back(sm_http_interface);
+ VLOG(2) << ACCEPTOR_CLIENT_IDENT
+ << "SpdySM: Making new server connection on index: "
+ << server_idx;
+ } else {
+ server_idx = unused_server_interface_list.back();
+ unused_server_interface_list.pop_back();
+ sm_http_interface = server_interface_list.at(server_idx);
+ VLOG(2) << ACCEPTOR_CLIENT_IDENT << "SpdySM: Reusing connection on "
+ << "index: " << server_idx;
+ }
+
+ sm_http_interface->InitSMInterface(this, server_idx);
+ sm_http_interface->InitSMConnection(NULL, sm_http_interface,
+ epoll_server_, -1, false);
+
+ return sm_http_interface;
+ }
+
+ int SpdyHandleNewStream(const SpdyControlFrame* frame,
+ string *http_data)
+ {
+ bool parsed_headers = false;
+ SpdyHeaderBlock headers;
+ const SpdySynStreamControlFrame* syn_stream =
+ reinterpret_cast<const SpdySynStreamControlFrame*>(frame);
+
+ parsed_headers = spdy_framer_->ParseHeaderBlock(frame, &headers);
+ VLOG(2) << ACCEPTOR_CLIENT_IDENT << "SpdySM: OnSyn("
+ << syn_stream->stream_id() << ")";
+ VLOG(2) << ACCEPTOR_CLIENT_IDENT << "SpdySM: headers parsed?: "
+ << (parsed_headers? "yes": "no");
+ if (parsed_headers) {
+ VLOG(2) << ACCEPTOR_CLIENT_IDENT << "SpdySM: # headers: "
+ << headers.size();
+ }
+ SpdyHeaderBlock::iterator url = headers.find("url");
+ SpdyHeaderBlock::iterator method = headers.find("method");
+ if (url == headers.end() || method == headers.end()) {
+ VLOG(2) << ACCEPTOR_CLIENT_IDENT << "SpdySM: didn't find method or url "
+ << "or method. Not creating stream";
+ return 0;
+ }
+
+ string uri = UrlUtilities::GetUrlPath(url->second);
+ if (acceptor_->flip_handler_type_ == FLIP_HANDLER_SPDY_SERVER) {
+ SpdyHeaderBlock::iterator referer = headers.find("referer");
+ if (referer != headers.end() && method->second == "GET") {
+ memory_cache_->UpdateHeaders(referer->second, url->second);
+ }
+ string host = UrlUtilities::GetUrlHost(url->second);
+ VLOG(1) << ACCEPTOR_CLIENT_IDENT << "Request: " << method->second
+ << " " << uri;
+ string filename = EncodeURL(uri, host, method->second);
+ NewStream(syn_stream->stream_id(),
+ reinterpret_cast<const SpdySynStreamControlFrame*>(frame)->
+ priority(),
+ filename);
+ } else {
+ SpdyHeaderBlock::iterator version = headers.find("version");
+ *http_data += method->second + " " + uri + " " + version->second + "\r\n";
+ VLOG(1) << ACCEPTOR_CLIENT_IDENT << "Request: " << method->second << " "
+ << uri << " " << version->second;
+ for (SpdyHeaderBlock::iterator i = headers.begin();
+ i != headers.end(); ++i) {
+ *http_data += i->first + ": " + i->second + "\r\n";
+ VLOG(2) << ACCEPTOR_CLIENT_IDENT << i->first.c_str() << ":"
+ << i->second.c_str();
+ }
+ if (g_proxy_config.forward_ip_header_enabled_) {
+ // X-Client-Cluster-IP header
+ *http_data += g_proxy_config.forward_ip_header_ + ": " +
+ connection_->client_ip() + "\r\n";
+ }
+ *http_data += "\r\n";
+ }
+
+ VLOG(3) << ACCEPTOR_CLIENT_IDENT << "SpdySM: HTTP Request:\n" << http_data;
+ return 1;
+ }
+
virtual void OnControl(const SpdyControlFrame* frame) {
SpdyHeaderBlock headers;
bool parsed_headers = false;
@@ -1311,122 +1705,126 @@ class SpdySM : public SpdyFramerVisitorInterface, public SMInterface {
{
const SpdySynStreamControlFrame* syn_stream =
reinterpret_cast<const SpdySynStreamControlFrame*>(frame);
- parsed_headers = framer_->ParseHeaderBlock(frame, &headers);
- VLOG(2) << "OnSyn(" << syn_stream->stream_id() << ")";
- VLOG(2) << "headers parsed?: " << (parsed_headers? "yes": "no");
- if (parsed_headers) {
- VLOG(2) << "# headers: " << headers.size();
- }
- for (SpdyHeaderBlock::iterator i = headers.begin();
- i != headers.end();
- ++i) {
- VLOG(2) << i->first << ": " << i->second;
- }
- SpdyHeaderBlock::iterator method = headers.find("method");
- SpdyHeaderBlock::iterator url = headers.find("url");
- if (url == headers.end() || method == headers.end()) {
- VLOG(2) << "didn't find method or url or method. Not creating stream";
- break;
- }
+ string http_data;
+ int ret = SpdyHandleNewStream(frame, &http_data);
+ if (!ret) {
+ LOG(ERROR) << "SpdySM: Could not convert spdy into http.";
+ break;
+ }
- SpdyHeaderBlock::iterator referer = headers.find("referer");
- if (referer != headers.end() && method->second == "GET") {
- memory_cache_->UpdateHeaders(referer->second, url->second);
- }
- string uri = UrlUtilities::GetUrlPath(url->second);
- string host = UrlUtilities::GetUrlHost(url->second);
-
- string filename = EncodeURL(uri, host, method->second);
- NewStream(syn_stream->stream_id(),
- reinterpret_cast<const SpdySynStreamControlFrame*>(frame)->
- priority(),
- filename);
+ if (acceptor_->flip_handler_type_ == FLIP_HANDLER_PROXY) {
+ SMInterface *sm_http_interface =
+ FindOrMakeNewSMConnectionInterface();
+ stream_to_smif_[syn_stream->stream_id()] = sm_http_interface;
+ sm_http_interface->SetStreamID(syn_stream->stream_id());
+ sm_http_interface->ProcessWriteInput(http_data.c_str(),
+ http_data.size());
+ }
}
break;
case SYN_REPLY:
- parsed_headers = framer_->ParseHeaderBlock(frame, &headers);
- VLOG(2) << "OnSynReply("
- << reinterpret_cast<const SpdySynReplyControlFrame*>(
- frame)->stream_id() << ")";
+ parsed_headers = spdy_framer_->ParseHeaderBlock(frame, &headers);
+ VLOG(2) << ACCEPTOR_CLIENT_IDENT << "SpdySM: OnSynReply(" <<
+ reinterpret_cast<const SpdySynReplyControlFrame*>(frame)->stream_id()
+ << ")";
break;
case RST_STREAM:
{
const SpdyRstStreamControlFrame* rst_stream =
reinterpret_cast<const SpdyRstStreamControlFrame*>(frame);
- VLOG(2) << "OnRst(" << rst_stream->stream_id() << ")";
- output_ordering_.RemoveStreamId(rst_stream ->stream_id());
+ VLOG(2) << ACCEPTOR_CLIENT_IDENT << "SpdySM: OnRst("
+ << rst_stream->stream_id() << ")";
+ client_output_ordering_.RemoveStreamId(rst_stream ->stream_id());
}
break;
default:
- LOG(DFATAL) << "Unknown control frame type";
+ LOG(ERROR) << "SpdySM: Unknown control frame type";
}
}
- virtual void OnStreamFrameData(
- SpdyStreamId stream_id,
- const char* data, size_t len) {
- VLOG(2) << "StreamData(" << stream_id << ", [" << len << "])";
- /* do nothing with this right now */
- }
- virtual void OnLameDuck() {
- /* do nothing with this right now */
+ virtual void OnStreamFrameData(SpdyStreamId stream_id,
+ const char* data, size_t len) {
+ VLOG(2) << ACCEPTOR_CLIENT_IDENT << "SpdySM: StreamData(" << stream_id
+ << ", [" << len << "])";
+ if (acceptor_->flip_handler_type_ == FLIP_HANDLER_PROXY) {
+ stream_to_smif_[stream_id]->ProcessWriteInput(data, len);
+ }
}
public:
- ~SpdySM() {
- Reset();
+ size_t ProcessReadInput(const char* data, size_t len) {
+ return spdy_framer_->ProcessInput(data, len);
}
- size_t ProcessInput(const char* data, size_t len) {
- return framer_->ProcessInput(data, len);
+
+ size_t ProcessWriteInput(const char* data, size_t len) {
+ return 0;
}
bool MessageFullyRead() const {
- return framer_->MessageFullyRead();
+ return spdy_framer_->MessageFullyRead();
}
+ void SetStreamID(uint32 stream_id) {}
+
bool Error() const {
- return framer_->HasError();
+ return spdy_framer_->HasError();
}
const char* ErrorAsString() const {
- return SpdyFramer::ErrorCodeToString(framer_->error_code());
+ DCHECK(Error());
+ return SpdyFramer::ErrorCodeToString(spdy_framer_->error_code());
+ }
+
+ void Reset() {
+ }
+
+ void ResetForNewInterface(int32 server_idx) {
+ VLOG(2) << ACCEPTOR_CLIENT_IDENT << "SpdySM: Reset for new interface: "
+ << "server_idx: " << server_idx;
+ unused_server_interface_list.push_back(server_idx);
}
- void Reset() {}
void ResetForNewConnection() {
// seq_num is not cleared, intentionally.
- delete framer_;
- framer_ = new SpdyFramer;
- framer_->set_visitor(this);
- output_ordering_.Reset();
+ delete spdy_framer_;
+ spdy_framer_ = new SpdyFramer;
+ spdy_framer_->set_visitor(this);
+ client_output_ordering_.Reset();
next_outgoing_stream_id_ = 2;
}
- // Send a couple of NOOP packets to force opening of cwnd.
- void PostAcceptHook() {
- if (!FLAGS_use_cwnd_opener)
- return;
-
- // We send 2 because that is the initial cwnd, and also because
- // we have to in order to get an ACK back from the client due to
- // delayed ACK.
- const int kPkts = 2;
-
- LOG(ERROR) << "Sending NOP FRAMES";
-
- scoped_ptr<SpdyControlFrame> frame(SpdyFramer::CreateNopFrame());
- for (int i = 0; i < kPkts; ++i) {
- char* bytes = frame->data();
- size_t size = SpdyFrame::size();
- ssize_t bytes_written = connection_->Send(bytes, size, MSG_DONTWAIT);
- if (static_cast<size_t>(bytes_written) != size) {
- LOG(ERROR) << "Trouble sending Nop packet! (" << errno << ")";
- if (errno == EAGAIN)
- break;
+ // SMInterface's Cleanup is currently only called by SMConnection after a
+ // protocol message as been fully read. Spdy's SMInterface does not need
+ // to do any cleanup at this time.
+ // TODO (klindsay) This method is probably not being used properly and
+ // some logic review and method renaming is probably in order.
+ void Cleanup() {}
+
+ // Send a settings frame and possibly some NOOP packets to force
+ // opening of cwnd
+ int PostAcceptHook() {
+ ssize_t bytes_written;
+ spdy::SpdySettings settings;
+ spdy::SettingsFlagsAndId settings_id(0);
+ settings_id.set_id(spdy::SETTINGS_MAX_CONCURRENT_STREAMS);
+ settings.push_back(spdy::SpdySetting(settings_id, 100));
+ scoped_ptr<SpdySettingsControlFrame>
+ settings_frame(spdy_framer_->CreateSettings(settings));
+
+ char* bytes = settings_frame->data();
+ size_t size = SpdyFrame::size() + settings_frame->length();
+ VLOG(1) << ACCEPTOR_CLIENT_IDENT << "Sending Settings Frame";
+ bytes_written = connection_->Send(bytes, size,
+ MSG_NOSIGNAL | MSG_DONTWAIT);
+ if (static_cast<size_t>(bytes_written) != size) {
+ LOG(ERROR) << "Trouble sending SETTINGS frame! (" << errno << ")";
+ if (errno == EAGAIN) {
+ return 0;
}
}
+ return 1;
}
void AddAssociatedContent(FileData* file_data) {
@@ -1436,10 +1834,12 @@ class SpdySM : public SpdyFramerVisitorInterface, public SMInterface {
string filename = "GET_";
filename += related_file.second;
if (!memory_cache_->AssignFileData(filename, &mci)) {
- VLOG(1) << "Unable to find associated content for: " << filename;
+ VLOG(1) << ACCEPTOR_CLIENT_IDENT << "Unable to find associated "
+ << "content for: " << filename;
continue;
}
- VLOG(1) << "Adding associated content: " << filename;
+ VLOG(1) << ACCEPTOR_CLIENT_IDENT << "Adding associated content: "
+ << filename;
mci.stream_id = next_outgoing_stream_id_;
next_outgoing_stream_id_ += 2;
mci.priority = related_file.first;
@@ -1451,20 +1851,24 @@ class SpdySM : public SpdyFramerVisitorInterface, public SMInterface {
MemCacheIter mci;
mci.stream_id = stream_id;
mci.priority = priority;
- if (!memory_cache_->AssignFileData(filename, &mci)) {
- // error creating new stream.
- VLOG(2) << "Sending ErrorNotFound";
- SendErrorNotFound(stream_id);
+ if (acceptor_->flip_handler_type_ == FLIP_HANDLER_SPDY_SERVER) {
+ if (!memory_cache_->AssignFileData(filename, &mci)) {
+ // error creating new stream.
+ VLOG(1) << ACCEPTOR_CLIENT_IDENT << "Sending ErrorNotFound";
+ SendErrorNotFound(stream_id);
+ } else {
+ AddToOutputOrder(mci);
+ if (FLAGS_use_xac) {
+ AddAssociatedContent(mci.file_data);
+ }
+ }
} else {
AddToOutputOrder(mci);
- if (FLAGS_use_xac) {
- AddAssociatedContent(mci.file_data);
- }
}
}
void AddToOutputOrder(const MemCacheIter& mci) {
- output_ordering_.AddToOutputOrder(mci);
+ client_output_ordering_.AddToOutputOrder(mci);
}
void SendEOF(uint32 stream_id) {
@@ -1493,12 +1897,12 @@ class SpdySM : public SpdyFramerVisitorInterface, public SMInterface {
SendDataFrameImpl(stream_id, data, len, spdy_flags, compress);
}
- SpdyFramer* spdy_framer() { return framer_; }
+ SpdyFramer* spdy_framer() { return spdy_framer_; }
private:
void SendEOFImpl(uint32 stream_id) {
SendDataFrame(stream_id, NULL, 0, DATA_FLAG_FIN, false);
- VLOG(2) << "Sending EOF: " << stream_id;
+ VLOG(2) << ACCEPTOR_CLIENT_IDENT << "SpdySM: Sending EOF: " << stream_id;
KillStream(stream_id);
}
@@ -1507,7 +1911,7 @@ class SpdySM : public SpdyFramerVisitorInterface, public SMInterface {
my_headers.SetFirstlineFromStringPieces("HTTP/1.1", "404", "Not Found");
SendSynReplyImpl(stream_id, my_headers);
SendDataFrame(stream_id, "wtf?", 4, DATA_FLAG_FIN, false);
- output_ordering_.RemoveStreamId(stream_id);
+ client_output_ordering_.RemoveStreamId(stream_id);
}
void SendOKResponseImpl(uint32 stream_id, string* output) {
@@ -1516,11 +1920,11 @@ class SpdySM : public SpdyFramerVisitorInterface, public SMInterface {
SendSynReplyImpl(stream_id, my_headers);
SendDataFrame(
stream_id, output->c_str(), output->size(), DATA_FLAG_FIN, false);
- output_ordering_.RemoveStreamId(stream_id);
+ client_output_ordering_.RemoveStreamId(stream_id);
}
void KillStream(uint32 stream_id) {
- output_ordering_.RemoveStreamId(stream_id);
+ client_output_ordering_.RemoveStreamId(stream_id);
}
void CopyHeaders(SpdyHeaderBlock& dest, const BalsaHeaders& headers) {
@@ -1559,8 +1963,8 @@ class SpdySM : public SpdyFramerVisitorInterface, public SMInterface {
CopyHeaders(block, headers);
SpdySynStreamControlFrame* fsrcf =
- framer_->CreateSynStream(stream_id, 0, 0, CONTROL_FLAG_NONE, true,
- &block);
+ spdy_framer_->CreateSynStream(stream_id, 0, 0, CONTROL_FLAG_NONE, true,
+ &block);
DataFrame df;
df.size = fsrcf->length() + SpdyFrame::size();
size_t df_size = df.size;
@@ -1568,7 +1972,8 @@ class SpdySM : public SpdyFramerVisitorInterface, public SMInterface {
df.delete_when_done = true;
EnqueueDataFrame(df);
- VLOG(2) << "Sending SynStreamheader " << stream_id;
+ VLOG(2) << ACCEPTOR_CLIENT_IDENT << "SpdySM: Sending SynStreamheader "
+ << stream_id;
return df_size;
}
@@ -1580,7 +1985,7 @@ class SpdySM : public SpdyFramerVisitorInterface, public SMInterface {
block["version"] = headers.response_version().as_string();
SpdySynReplyControlFrame* fsrcf =
- framer_->CreateSynReply(stream_id, CONTROL_FLAG_NONE, true, &block);
+ spdy_framer_->CreateSynReply(stream_id, CONTROL_FLAG_NONE, true, &block);
DataFrame df;
df.size = fsrcf->length() + SpdyFrame::size();
size_t df_size = df.size;
@@ -1588,7 +1993,8 @@ class SpdySM : public SpdyFramerVisitorInterface, public SMInterface {
df.delete_when_done = true;
EnqueueDataFrame(df);
- VLOG(2) << "Sending SynReplyheader " << stream_id;
+ VLOG(2) << ACCEPTOR_CLIENT_IDENT << "SpdySM: Sending SynReplyheader "
+ << stream_id;
return df_size;
}
@@ -1601,16 +2007,16 @@ class SpdySM : public SpdyFramerVisitorInterface, public SMInterface {
// TODO(mbelshe): We can't compress here - before going into the
// priority queue. Compression needs to be done
// with late binding.
- SpdyDataFrame* fdf = framer_->CreateDataFrame(stream_id, data, len,
- flags);
+ SpdyDataFrame* fdf = spdy_framer_->CreateDataFrame(stream_id, data, len,
+ flags);
DataFrame df;
df.size = fdf->length() + SpdyFrame::size();
df.data = fdf->data();
df.delete_when_done = true;
EnqueueDataFrame(df);
- VLOG(2) << "Sending data frame" << stream_id << " [" << len << "]"
- << " shrunk to " << fdf->length();
+ VLOG(2) << ACCEPTOR_CLIENT_IDENT << "SpdySM: Sending data frame "
+ << stream_id << " [" << len << "] shrunk to " << fdf->length();
}
void EnqueueDataFrame(const DataFrame& df) {
@@ -1618,16 +2024,17 @@ class SpdySM : public SpdyFramerVisitorInterface, public SMInterface {
}
void GetOutput() {
- while (output_list_->size() < 2) {
- MemCacheIter* mci = output_ordering_.GetIter();
+ while (client_output_list_->size() < 2) {
+ MemCacheIter* mci = client_output_ordering_.GetIter();
if (mci == NULL) {
- VLOG(2) << "GetOutput: nothing to output!?";
+ VLOG(2) << ACCEPTOR_CLIENT_IDENT
+ << "SpdySM: GetOutput: nothing to output!?";
return;
}
if (!mci->transformed_header) {
mci->transformed_header = true;
- VLOG(2) << "GetOutput transformed header stream_id: ["
- << mci->stream_id << "]";
+ VLOG(2) << ACCEPTOR_CLIENT_IDENT << "SpdySM: GetOutput transformed "
+ << "header stream_id: [" << mci->stream_id << "]";
if ((mci->stream_id % 2) == 0) {
// this is a server initiated stream.
// Ideally, we'd do a 'syn-push' here, instead of a syn-reply.
@@ -1647,7 +2054,8 @@ class SpdySM : public SpdyFramerVisitorInterface, public SMInterface {
return;
}
if (mci->body_bytes_consumed >= mci->file_data->body.size()) {
- VLOG(2) << "GetOutput remove_stream_id: [" << mci->stream_id << "]";
+ VLOG(2) << ACCEPTOR_CLIENT_IDENT << "SpdySM: GetOutput "
+ << "remove_stream_id: [" << mci->stream_id << "]";
SendEOF(mci->stream_id);
return;
}
@@ -1669,8 +2077,8 @@ class SpdySM : public SpdyFramerVisitorInterface, public SMInterface {
SendDataFrame(mci->stream_id,
mci->file_data->body.data() + mci->body_bytes_consumed,
num_to_write, 0, should_compress);
- VLOG(2) << "GetOutput SendDataFrame[" << mci->stream_id
- << "]: " << num_to_write;
+ VLOG(2) << ACCEPTOR_CLIENT_IDENT << "SpdySM: GetOutput SendDataFrame["
+ << mci->stream_id << "]: " << num_to_write;
mci->body_bytes_consumed += num_to_write;
mci->bytes_sent += num_to_write;
}
@@ -1679,28 +2087,41 @@ class SpdySM : public SpdyFramerVisitorInterface, public SMInterface {
////////////////////////////////////////////////////////////////////////////////
-class HTTPSM : public BalsaVisitorInterface, public SMInterface {
+class HttpSM : public BalsaVisitorInterface, public SMInterface {
private:
uint64 seq_num_;
- BalsaFrame* framer_;
+ BalsaFrame* http_framer_;
BalsaHeaders headers_;
uint32 stream_id_;
+ int32 server_idx_;
- SMServerConnection* connection_;
+ SMConnection* connection_;
+ SMInterface* sm_spdy_interface_;
OutputList* output_list_;
OutputOrdering output_ordering_;
MemoryCache* memory_cache_;
+ FlipAcceptor* acceptor_;
public:
- explicit HTTPSM(SMServerConnection* connection) :
+ explicit HttpSM(SMConnection* connection,
+ SMInterface* sm_spdy_interface,
+ EpollServer* epoll_server,
+ MemoryCache* memory_cache,
+ FlipAcceptor* acceptor) :
seq_num_(0),
- framer_(new BalsaFrame),
- stream_id_(1),
+ http_framer_(new BalsaFrame),
+ stream_id_(0),
+ server_idx_(-1),
connection_(connection),
+ sm_spdy_interface_(sm_spdy_interface),
output_list_(connection->output_list()),
output_ordering_(connection),
- memory_cache_(connection->memory_cache()) {
- framer_->set_balsa_visitor(this);
- framer_->set_balsa_headers(&headers_);
+ memory_cache_(connection->memory_cache()),
+ acceptor_(acceptor) {
+ http_framer_->set_balsa_visitor(this);
+ http_framer_->set_balsa_headers(&headers_);
+ if (acceptor_->flip_handler_type_ == FLIP_HANDLER_PROXY) {
+ http_framer_->set_is_request(false);
+ }
}
private:
typedef map<string, uint32> ClientTokenMap;
@@ -1708,20 +2129,31 @@ class HTTPSM : public BalsaVisitorInterface, public SMInterface {
virtual void ProcessBodyInput(const char *input, size_t size) {
}
virtual void ProcessBodyData(const char *input, size_t size) {
- // ignoring this.
+ if (acceptor_->flip_handler_type_ == FLIP_HANDLER_PROXY) {
+ VLOG(2) << ACCEPTOR_CLIENT_IDENT << "HttpSM: Process Body Data: stream "
+ << stream_id_ << ": size " << size;
+ sm_spdy_interface_->SendDataFrame(stream_id_, input, size, 0, false);
+ }
}
virtual void ProcessHeaderInput(const char *input, size_t size) {
}
virtual void ProcessTrailerInput(const char *input, size_t size) {}
virtual void ProcessHeaders(const BalsaHeaders& headers) {
- VLOG(2) << "Got new request!";
- string host = UrlUtilities::GetUrlHost(
- headers.GetHeader("Host").as_string());
- string method = headers.request_method().as_string();
- string filename = EncodeURL(headers.request_uri().as_string(), host,
- method);
- NewStream(stream_id_, 0, filename);
- stream_id_ += 2;
+ if (acceptor_->flip_handler_type_ == FLIP_HANDLER_HTTP_SERVER) {
+ string host =
+ UrlUtilities::GetUrlHost(headers.GetHeader("Host").as_string());
+ string method = headers.request_method().as_string();
+ VLOG(1) << ACCEPTOR_CLIENT_IDENT << "Received Request: "
+ << headers.request_uri().as_string() << " " << method;
+ string filename = EncodeURL(headers.request_uri().as_string(),
+ host, method);
+ NewStream(stream_id_, 0, filename);
+ stream_id_ += 2;
+ } else {
+ VLOG(1) << ACCEPTOR_CLIENT_IDENT << "HttpSM: Received Response from "
+ << ACCEPTOR_SERVER_IDENT;
+ sm_spdy_interface_->SendSynReply(stream_id_, headers);
+ }
}
virtual void ProcessRequestFirstLine(const char* line_input,
size_t line_length,
@@ -1743,7 +2175,13 @@ class HTTPSM : public BalsaVisitorInterface, public SMInterface {
virtual void ProcessChunkExtensions(const char *input, size_t size) {}
virtual void HeaderDone() {}
virtual void MessageDone() {
- VLOG(2) << "MessageDone!";
+ VLOG(2) << ACCEPTOR_CLIENT_IDENT << "HttpSM: MessageDone. Sending EOF: "
+ << "stream " << stream_id_;
+ if (acceptor_->flip_handler_type_ == FLIP_HANDLER_PROXY) {
+ sm_spdy_interface_->SendEOF(stream_id_);
+ } else {
+ VLOG(2) << ACCEPTOR_CLIENT_IDENT << "HttpSM: MessageDone.";
+ }
}
virtual void HandleHeaderError(BalsaFrame* framer) {
HandleError();
@@ -1757,40 +2195,98 @@ class HTTPSM : public BalsaVisitorInterface, public SMInterface {
}
void HandleError() {
- VLOG(2) << "Error detected";
+ VLOG(1) << ACCEPTOR_CLIENT_IDENT << "Error detected";
}
public:
- ~HTTPSM() {
+ ~HttpSM() {
Reset();
+ delete http_framer_;
}
- size_t ProcessInput(const char* data, size_t len) {
- return framer_->ProcessInput(data, len);
+
+ void InitSMInterface(SMInterface* sm_spdy_interface,
+ int32 server_idx)
+ {
+ sm_spdy_interface_ = sm_spdy_interface;
+ server_idx_ = server_idx;
+ }
+
+ void InitSMConnection(SMConnectionPoolInterface* connection_pool,
+ SMInterface* sm_interface,
+ EpollServer* epoll_server,
+ int fd,
+ bool use_ssl)
+ {
+ VLOG(2) << ACCEPTOR_CLIENT_IDENT << "HttpSM: Initializing server "
+ << "connection.";
+ connection_->InitSMConnection(connection_pool, sm_interface,
+ epoll_server, fd, use_ssl);
+ }
+
+ size_t ProcessReadInput(const char* data, size_t len) {
+ VLOG(2) << ACCEPTOR_CLIENT_IDENT << "HttpSM: Process read input: stream "
+ << stream_id_;
+ return http_framer_->ProcessInput(data, len);
+ }
+
+ size_t ProcessWriteInput(const char* data, size_t len) {
+ VLOG(2) << ACCEPTOR_CLIENT_IDENT << "HttpSM: Process write input: size "
+ << len << ": stream " << stream_id_;
+ char * dataPtr = new char[len];
+ memcpy(dataPtr, data, len);
+ DataFrame data_frame;
+ data_frame.data = (const char *)dataPtr;
+ data_frame.size = len;
+ data_frame.delete_when_done = true;
+ connection_->EnqueueDataFrame(data_frame);
+ return len;
}
bool MessageFullyRead() const {
- return framer_->MessageFullyRead();
+ return http_framer_->MessageFullyRead();
+ }
+
+ void SetStreamID(uint32 stream_id) {
+ stream_id_ = stream_id;
}
bool Error() const {
- return framer_->Error();
+ return http_framer_->Error();
}
const char* ErrorAsString() const {
- return BalsaFrameEnums::ErrorCodeToString(framer_->ErrorCode());
+ return BalsaFrameEnums::ErrorCodeToString(http_framer_->ErrorCode());
}
void Reset() {
- framer_->Reset();
+ VLOG(1) << ACCEPTOR_CLIENT_IDENT << "HttpSM: Reset: stream %d "
+ << stream_id_;
+ http_framer_->Reset();
+ }
+
+ void ResetForNewInterface(int32 server_idx) {
}
void ResetForNewConnection() {
+ VLOG(1) << ACCEPTOR_CLIENT_IDENT << "HttpSM: Server connection closing "
+ << "to: " << ACCEPTOR_SERVER_IDENT;
seq_num_ = 0;
output_ordering_.Reset();
- framer_->Reset();
+ http_framer_->Reset();
+ if (sm_spdy_interface_) {
+ sm_spdy_interface_->ResetForNewInterface(server_idx_);
+ }
+ }
+
+ void Cleanup() {
+ if (!(acceptor_->flip_handler_type_ == FLIP_HANDLER_HTTP_SERVER)) {
+ connection_->Cleanup("HttpSM Request Fully Read: stream_id " +
+ stream_id_);
+ }
}
- void PostAcceptHook() {
+ int PostAcceptHook() {
+ return 1;
}
void NewStream(uint32 stream_id, uint32 priority, const string& filename) {
@@ -1798,6 +2294,8 @@ class HTTPSM : public BalsaVisitorInterface, public SMInterface {
mci.stream_id = stream_id;
mci.priority = priority;
if (!memory_cache_->AssignFileData(filename, &mci)) {
+ // error creating new stream.
+ VLOG(2) << ACCEPTOR_CLIENT_IDENT << "Sending ErrorNotFound";
SendErrorNotFound(stream_id);
} else {
AddToOutputOrder(mci);
@@ -1810,6 +2308,9 @@ class HTTPSM : public BalsaVisitorInterface, public SMInterface {
void SendEOF(uint32 stream_id) {
SendEOFImpl(stream_id);
+ if (acceptor_->flip_handler_type_ == FLIP_HANDLER_PROXY) {
+ sm_spdy_interface_->ResetForNewInterface(server_idx_);
+ }
}
void SendErrorNotFound(uint32 stream_id) {
@@ -1833,7 +2334,7 @@ class HTTPSM : public BalsaVisitorInterface, public SMInterface {
SendDataFrameImpl(stream_id, data, len, flags, compress);
}
- BalsaFrame* spdy_framer() { return framer_; }
+ BalsaFrame* spdy_framer() { return http_framer_; }
private:
void SendEOFImpl(uint32 stream_id) {
@@ -1842,6 +2343,9 @@ class HTTPSM : public BalsaVisitorInterface, public SMInterface {
df.size = 5;
df.delete_when_done = false;
EnqueueDataFrame(df);
+ if (acceptor_->flip_handler_type_ == FLIP_HANDLER_HTTP_SERVER) {
+ Reset();
+ }
}
void SendErrorNotFoundImpl(uint32 stream_id) {
@@ -1875,7 +2379,8 @@ class HTTPSM : public BalsaVisitorInterface, public SMInterface {
df.data = buffer;
df.delete_when_done = true;
sb.Read(buffer, df.size);
- VLOG(2) << "******************Sending HTTP Reply header " << stream_id;
+ VLOG(2) << ACCEPTOR_CLIENT_IDENT << "Sending HTTP Reply header "
+ << stream_id_;
size_t df_size = df.size;
EnqueueDataFrame(df);
return df_size;
@@ -1890,7 +2395,8 @@ class HTTPSM : public BalsaVisitorInterface, public SMInterface {
df.data = buffer;
df.delete_when_done = true;
sb.Read(buffer, df.size);
- VLOG(2) << "******************Sending HTTP Reply header " << stream_id;
+ VLOG(2) << ACCEPTOR_CLIENT_IDENT << "Sending HTTP Reply header "
+ << stream_id_;
size_t df_size = df.size;
EnqueueDataFrame(df);
return df_size;
@@ -1913,27 +2419,31 @@ class HTTPSM : public BalsaVisitorInterface, public SMInterface {
}
void EnqueueDataFrame(const DataFrame& df) {
+ VLOG(2) << ACCEPTOR_CLIENT_IDENT << "HttpSM: Enqueue data frame: stream "
+ << stream_id_;
connection_->EnqueueDataFrame(df);
}
void GetOutput() {
MemCacheIter* mci = output_ordering_.GetIter();
if (mci == NULL) {
- VLOG(2) << "GetOutput: nothing to output!?";
+ VLOG(2) << ACCEPTOR_CLIENT_IDENT << "HttpSM: GetOutput: nothing to "
+ << "output!?: stream " << stream_id_;
return;
}
if (!mci->transformed_header) {
mci->bytes_sent = SendSynReply(mci->stream_id,
*(mci->file_data->headers));
mci->transformed_header = true;
- VLOG(2) << "GetOutput transformed header stream_id: ["
- << mci->stream_id << "]";
+ VLOG(2) << ACCEPTOR_CLIENT_IDENT << "HttpSM: GetOutput transformed "
+ << "header stream_id: [" << mci->stream_id << "]";
return;
}
if (mci->body_bytes_consumed >= mci->file_data->body.size()) {
SendEOF(mci->stream_id);
output_ordering_.RemoveStreamId(mci->stream_id);
- VLOG(2) << "GetOutput remove_stream_id: [" << mci->stream_id << "]";
+ VLOG(2) << ACCEPTOR_CLIENT_IDENT << "GetOutput remove_stream_id: ["
+ << mci->stream_id << "]";
return;
}
size_t num_to_write =
@@ -1943,8 +2453,8 @@ class HTTPSM : public BalsaVisitorInterface, public SMInterface {
SendDataFrame(mci->stream_id,
mci->file_data->body.data() + mci->body_bytes_consumed,
num_to_write, 0, true);
- VLOG(2) << "GetOutput SendDataFrame[" << mci->stream_id
- << "]: " << num_to_write;
+ VLOG(2) << ACCEPTOR_CLIENT_IDENT << "HttpSM: GetOutput SendDataFrame["
+ << mci->stream_id << "]: " << num_to_write;
mci->body_bytes_consumed += num_to_write;
mci->bytes_sent += num_to_write;
}
@@ -1952,119 +2462,312 @@ class HTTPSM : public BalsaVisitorInterface, public SMInterface {
////////////////////////////////////////////////////////////////////////////////
-class Notification {
+class StreamerSM : public SMInterface {
+ private:
+ SMConnection* connection_;
+ SMInterface* sm_other_interface_;
+ EpollServer* epoll_server_;
+ FlipAcceptor* acceptor_;
public:
- explicit Notification(bool value) : value_(value) {}
+ explicit StreamerSM(SMConnection* connection,
+ SMInterface* sm_other_interface,
+ EpollServer* epoll_server,
+ FlipAcceptor* acceptor) :
+ connection_(connection),
+ sm_other_interface_(sm_other_interface),
+ epoll_server_(epoll_server),
+ acceptor_(acceptor)
+ {
+ VLOG(2) << ACCEPTOR_CLIENT_IDENT << "Creating StreamerSM object";
+ }
+ ~StreamerSM() {
+ VLOG(1) << ACCEPTOR_CLIENT_IDENT << "Destroying StreamerSM object";
+ Reset();
+ }
+
+ void InitSMInterface(SMInterface* sm_other_interface,
+ int32 server_idx)
+ {
+ sm_other_interface_ = sm_other_interface;
+ }
+
+ void InitSMConnection(SMConnectionPoolInterface* connection_pool,
+ SMInterface* sm_interface,
+ EpollServer* epoll_server,
+ int fd,
+ bool use_ssl)
+ {
+ VLOG(2) << ACCEPTOR_CLIENT_IDENT << "StreamerSM: Initializing server "
+ << "connection.";
+ connection_->InitSMConnection(connection_pool, sm_interface,
+ epoll_server, fd, use_ssl);
+ }
+
+ size_t ProcessReadInput(const char* data, size_t len) {
+ return sm_other_interface_->ProcessWriteInput(data, len);
+ }
+
+ size_t ProcessWriteInput(const char* data, size_t len) {
+ char * dataPtr = new char[len];
+ memcpy(dataPtr, data, len);
+ DataFrame df;
+ df.data = (const char *)dataPtr;
+ df.size = len;
+ df.delete_when_done = true;
+ connection_->EnqueueDataFrame(df);
+ return len;
+ }
+
+ bool MessageFullyRead() const {
+ return false;
+ }
+
+ void SetStreamID(uint32 stream_id) {}
+
+ bool Error() const {
+ return false;
+ }
+
+ const char* ErrorAsString() const {
+ return "(none)";
+ }
+
+ void Reset() {
+ VLOG(1) << ACCEPTOR_CLIENT_IDENT << "StreamerSM: Reset";
+ connection_->Cleanup("Server Reset");
+ }
+
+ void ResetForNewInterface(int32 server_idx) {
+ }
+
+ void ResetForNewConnection() {
+ sm_other_interface_->Reset();
+ }
+
+ void Cleanup() {
+ }
+
+ int PostAcceptHook() {
+ if (!sm_other_interface_) {
+ SMConnection *server_connection =
+ SMConnection::NewSMConnection(epoll_server_, NULL, NULL,
+ acceptor_, "server_conn: ");
+ if (server_connection == NULL) {
+ LOG(ERROR) << "StreamerSM: Could not create server conenction.";
+ return 0;
+ }
+ VLOG(2) << ACCEPTOR_CLIENT_IDENT << "StreamerSM: Creating new server "
+ << "connection.";
+ sm_other_interface_ = new StreamerSM(server_connection, this,
+ epoll_server_, acceptor_);
+ sm_other_interface_->InitSMInterface(this, 0);
+ }
+ sm_other_interface_->InitSMConnection(NULL, sm_other_interface_,
+ epoll_server_, -1, false);
+
+ return 1;
+ }
+
+ void NewStream(uint32 stream_id, uint32 priority, const string& filename) {
+ }
+
+ void AddToOutputOrder(const MemCacheIter& mci) {
+ }
+
+ void SendEOF(uint32 stream_id) {
+ }
+
+ void SendErrorNotFound(uint32 stream_id) {
+ }
+
+ void SendOKResponse(uint32 stream_id, string output) {
+ }
+
+ size_t SendSynStream(uint32 stream_id, const BalsaHeaders& headers) {
+ return 0;
+ }
+
+ size_t SendSynReply(uint32 stream_id, const BalsaHeaders& headers) {
+ return 0;
+ }
+
+ void SendDataFrame(uint32 stream_id, const char* data, int64 len,
+ uint32 flags, bool compress) {
+ }
- void Notify() {
- AutoLock al(lock_);
- value_ = true;
- }
- bool HasBeenNotified() {
- AutoLock al(lock_);
- return value_;
- }
- bool value_;
- Lock lock_;
+ private:
+ void SendEOFImpl(uint32 stream_id) {
+ }
+
+ void SendErrorNotFoundImpl(uint32 stream_id) {
+ }
+
+ void SendOKResponseImpl(uint32 stream_id, string* output) {
+ }
+
+ size_t SendSynReplyImpl(uint32 stream_id, const BalsaHeaders& headers) {
+ return 0;
+ }
+
+ size_t SendSynStreamImpl(uint32 stream_id, const BalsaHeaders& headers) {
+ return 0;
+ }
+
+ void SendDataFrameImpl(uint32 stream_id, const char* data, int64 len,
+ uint32 flags, bool compress) {
+ }
+
+ void GetOutput() {
+ }
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+class Notification {
+ public:
+ explicit Notification(bool value) : value_(value) {}
+
+ void Notify() {
+ AutoLock al(lock_);
+ value_ = true;
+ }
+ bool HasBeenNotified() {
+ AutoLock al(lock_);
+ return value_;
+ }
+ bool value_;
+ Lock lock_;
};
////////////////////////////////////////////////////////////////////////////////
class SMAcceptorThread : public SimpleThread,
public EpollCallbackInterface,
- public SMServerConnectionPoolInterface {
+ public SMConnectionPoolInterface {
EpollServer epoll_server_;
- int listen_fd_;
- int accepts_per_wake_;
+ FlipAcceptor *acceptor_;
+ SSLState *ssl_state_;
+ bool use_ssl_;
- vector<SMServerConnection*> unused_server_connections_;
- vector<SMServerConnection*> tmp_unused_server_connections_;
- vector<SMServerConnection*> allocated_server_connections_;
+ vector<SMConnection*> unused_server_connections_;
+ vector<SMConnection*> tmp_unused_server_connections_;
+ vector<SMConnection*> allocated_server_connections_;
Notification quitting_;
- SMInterfaceFactory* sm_interface_factory_;
MemoryCache* memory_cache_;
public:
- SMAcceptorThread(int listen_fd,
- int accepts_per_wake,
- SMInterfaceFactory* smif,
+ SMAcceptorThread(FlipAcceptor *acceptor,
MemoryCache* memory_cache) :
SimpleThread("SMAcceptorThread"),
- listen_fd_(listen_fd),
- accepts_per_wake_(accepts_per_wake),
+ acceptor_(acceptor),
+ ssl_state_(NULL),
+ use_ssl_(false),
quitting_(false),
- sm_interface_factory_(smif),
- memory_cache_(memory_cache) {
+ memory_cache_(memory_cache)
+ {
+ if (!acceptor->ssl_cert_filename_.empty() &&
+ !acceptor->ssl_cert_filename_.empty()) {
+ ssl_state_ = new SSLState;
+ bool use_npn = true;
+ if (acceptor_->flip_handler_type_ == FLIP_HANDLER_HTTP_SERVER) {
+ use_npn = false;
+ }
+ spdy_init_ssl(ssl_state_, acceptor_->ssl_cert_filename_,
+ acceptor_->ssl_key_filename_, use_npn);
+ use_ssl_ = true;
+ }
}
~SMAcceptorThread() {
- for (vector<SMServerConnection*>::iterator i =
- allocated_server_connections_.begin();
+ for (vector<SMConnection*>::iterator i =
+ allocated_server_connections_.begin();
i != allocated_server_connections_.end();
++i) {
delete *i;
}
}
- SMServerConnection* NewConnection() {
- SMServerConnection* server =
- SMServerConnection::NewSMServerConnection(sm_interface_factory_,
- memory_cache_,
- &epoll_server_);
+ SMConnection* NewConnection() {
+ SMConnection* server =
+ SMConnection::NewSMConnection(&epoll_server_, ssl_state_,
+ memory_cache_, acceptor_,
+ "client_conn: ");
allocated_server_connections_.push_back(server);
- VLOG(3) << "Making new server: " << server;
+ VLOG(2) << ACCEPTOR_CLIENT_IDENT << "Acceptor: Making new server.";
return server;
}
- SMServerConnection* FindOrMakeNewSMServerConnection() {
+ SMConnection* FindOrMakeNewSMConnection() {
if (unused_server_connections_.empty()) {
return NewConnection();
}
- SMServerConnection* retval = unused_server_connections_.back();
+ SMConnection* server = unused_server_connections_.back();
unused_server_connections_.pop_back();
- return retval;
+ VLOG(2) << ACCEPTOR_CLIENT_IDENT << "Acceptor: Reusing server.";
+ return server;
}
-
void InitWorker() {
- epoll_server_.RegisterFD(listen_fd_, this, EPOLLIN | EPOLLET);
+ epoll_server_.RegisterFD(acceptor_->listen_fd_, this, EPOLLIN | EPOLLET);
}
- void HandleConnection(int client_fd) {
- SMServerConnection* server_connection = FindOrMakeNewSMServerConnection();
+ void HandleConnection(int server_fd) {
+ int on = 1;
+ int rc;
+ if (acceptor_->disable_nagle_) {
+ rc = setsockopt(server_fd, IPPROTO_TCP, TCP_NODELAY,
+ reinterpret_cast<char*>(&on), sizeof(on));
+ if (rc < 0) {
+ close(server_fd);
+ LOG(ERROR) << "setsockopt() failed fd=" + server_fd;
+ return;
+ }
+ }
+
+ SMConnection* server_connection = FindOrMakeNewSMConnection();
if (server_connection == NULL) {
- VLOG(2) << "Closing " << client_fd;
- close(client_fd);
+ VLOG(1) << ACCEPTOR_CLIENT_IDENT << "Acceptor: Closing fd " << server_fd;
+ close(server_fd);
return;
}
- server_connection->InitSMServerConnection(this,
- &epoll_server_,
- client_fd);
+ server_connection->InitSMConnection(this,
+ NULL,
+ &epoll_server_,
+ server_fd,
+ use_ssl_);
}
void AcceptFromListenFD() {
- if (accepts_per_wake_ > 0) {
- for (int i = 0; i < accepts_per_wake_; ++i) {
+ if (acceptor_->accepts_per_wake_ > 0) {
+ for (int i = 0; i < acceptor_->accepts_per_wake_; ++i) {
struct sockaddr address;
socklen_t socklen = sizeof(address);
- int fd = accept(listen_fd_, &address, &socklen);
+ int fd = accept(acceptor_->listen_fd_, &address, &socklen);
if (fd == -1) {
- VLOG(2) << "accept fail(" << listen_fd_ << "): " << errno;
+ if (errno != 11) {
+ VLOG(1) << ACCEPTOR_CLIENT_IDENT << "Acceptor: accept fail("
+ << acceptor_->listen_fd_ << "): " << errno << ": "
+ << strerror(errno);
+ }
break;
}
- VLOG(2) << "********************Accepted fd: " << fd << "\n\n\n";
+ VLOG(1) << ACCEPTOR_CLIENT_IDENT << " Accepted connection";
HandleConnection(fd);
}
} else {
while (true) {
struct sockaddr address;
socklen_t socklen = sizeof(address);
- int fd = accept(listen_fd_, &address, &socklen);
+ int fd = accept(acceptor_->listen_fd_, &address, &socklen);
if (fd == -1) {
- VLOG(2) << "accept fail(" << listen_fd_ << "): " << errno;
+ if (errno != 11) {
+ VLOG(1) << ACCEPTOR_CLIENT_IDENT << "Acceptor: accept fail("
+ << acceptor_->listen_fd_ << "): " << errno << ": "
+ << strerror(errno);
+ }
break;
}
- VLOG(2) << "********************Accepted fd: " << fd << "\n\n\n";
+ VLOG(1) << ACCEPTOR_CLIENT_IDENT << "Accepted connection";
HandleConnection(fd);
}
}
@@ -2075,7 +2778,8 @@ class SMAcceptorThread : public SimpleThread,
virtual void OnModification(int fd, int event_mask) { }
virtual void OnEvent(int fd, EpollEvent* event) {
if (event->in_events | EPOLLIN) {
- VLOG(2) << "Accepting based upon epoll events";
+ VLOG(2) << ACCEPTOR_CLIENT_IDENT
+ << "Acceptor: Accepting based upon epoll events";
AcceptFromListenFD();
}
}
@@ -2097,56 +2801,58 @@ class SMAcceptorThread : public SimpleThread,
}
}
- // SMServerConnections will use this:
- virtual void SMServerConnectionDone(SMServerConnection* sc) {
- VLOG(3) << "Done with server connection: " << sc;
+ // SMConnections will use this:
+ virtual void SMConnectionDone(SMConnection* sc) {
+ VLOG(1) << ACCEPTOR_CLIENT_IDENT << "Done with connection.";
tmp_unused_server_connections_.push_back(sc);
}
};
////////////////////////////////////////////////////////////////////////////////
-SMInterface* NewSpdySM(SMServerConnection* connection) {
- return new SpdySM(connection);
+SMInterface* NewStreamerSM(SMConnection* connection,
+ SMInterface* sm_interface,
+ EpollServer* epoll_server,
+ FlipAcceptor* acceptor) {
+ return new StreamerSM(connection, sm_interface, epoll_server, acceptor);
}
-SMInterface* NewHTTPSM(SMServerConnection* connection) {
- return new HTTPSM(connection);
+
+SMInterface* NewSpdySM(SMConnection* connection,
+ SMInterface* sm_interface,
+ EpollServer* epoll_server,
+ MemoryCache* memory_cache,
+ FlipAcceptor* acceptor) {
+ return new SpdySM(connection, sm_interface, epoll_server,
+ memory_cache, acceptor);
+}
+
+SMInterface* NewHttpSM(SMConnection* connection,
+ SMInterface* sm_interface,
+ EpollServer* epoll_server,
+ MemoryCache* memory_cache,
+ FlipAcceptor* acceptor) {
+ return new HttpSM(connection, sm_interface, epoll_server,
+ memory_cache, acceptor);
}
////////////////////////////////////////////////////////////////////////////////
-int CreateListeningSocket(int port, int backlog_size,
- bool reuseport, bool no_nagle) {
- int listening_socket = 0;
- char port_buf[256];
- snprintf(port_buf, sizeof(port_buf), "%d", port);
- cerr <<" Attempting to listen on port: " << port_buf << "\n";
- cerr <<" input port: " << port << "\n";
- net::CreateListeningSocket("",
- port_buf,
- true,
- backlog_size,
- &listening_socket,
- true,
- reuseport,
- &cerr);
- SetNonBlocking(listening_socket);
- if (no_nagle) {
- // set SO_REUSEADDR on the listening socket.
- int on = 1;
- int rc;
- rc = setsockopt(listening_socket, IPPROTO_TCP, TCP_NODELAY,
- reinterpret_cast<char*>(&on), sizeof(on));
- if (rc < 0) {
- close(listening_socket);
- LOG(FATAL) << "setsockopt() failed fd=" << listening_socket << "\n";
- }
+std::vector<std::string> &split(const std::string &s,
+ char delim,
+ std::vector<std::string> &elems) {
+ std::stringstream ss(s);
+ std::string item;
+ while(std::getline(ss, item, delim)) {
+ elems.push_back(item);
}
- return listening_socket;
+ return elems;
}
-////////////////////////////////////////////////////////////////////////////////
+std::vector<std::string> split(const std::string &s, char delim) {
+ std::vector<std::string> elems;
+ return split(s, delim, elems);
+}
bool GotQuitFromStdin() {
// Make stdin nonblocking. Yes this is done each time. Oh well.
@@ -2157,16 +2863,13 @@ bool GotQuitFromStdin() {
maybequit += c;
}
if (maybequit.size()) {
- VLOG(2) << "scanning string: \"" << maybequit << "\"";
+ VLOG(1) << "scanning string: \"" << maybequit << "\"";
}
return (maybequit.size() > 1 &&
(maybequit.c_str()[0] == 'q' ||
maybequit.c_str()[0] == 'Q'));
}
-
-////////////////////////////////////////////////////////////////////////////////
-
const char* BoolToStr(bool b) {
if (b)
return "true";
@@ -2175,103 +2878,171 @@ const char* BoolToStr(bool b) {
////////////////////////////////////////////////////////////////////////////////
-int main(int argc, char**argv) {
- bool use_ssl = FLAGS_use_ssl;
- int response_count_until_close = FLAGS_response_count_until_close;
- int spdy_port = FLAGS_spdy_port;
- int port = FLAGS_port;
- int backlog_size = FLAGS_accept_backlog_size;
- bool reuseport = FLAGS_reuseport;
- bool no_nagle = FLAGS_no_nagle;
- double server_think_time_in_s = FLAGS_server_think_time_in_s;
- int accepts_per_wake = FLAGS_accepts_per_wake;
- int num_threads = 1;
+int main (int argc, char**argv)
+{
+ unsigned int i = 0;
+ bool wait_for_iface = false;
+
+ CommandLine::Init(argc, argv);
+ CommandLine cl(argc, argv);
+
+ if (cl.HasSwitch("--help") || argc < 2) {
+ cout << argv[0] << " <options>\n";
+ cout << "\t--proxy<1..n>=\"<listen ip>,<listen port>,<ssl cert filename>,"
+ << "<ssl key filename>,<server ip>,<server port>\"\n";
+ cout << "\t--spdy-server=\"<listen ip>,<listen port>,<ssl cert filename>,"
+ << "<ssl key filename>\"\n";
+ cout << "\t--http-server=\"<listen ip>,<listen port>,<ssl cert filename>,"
+ << "<ssl key filename>\"\n";
+ cout << "\t--forward-ip-header=<header name>\n";
+ cout << "\t--logdest=file|system|both\n";
+ cout << "\t--logfile=<logfile>\n";
+ cout << "\t--wait-for-iface\n";
+ cout << "\t--ssl-session-expiry=<seconds> (default is 300)\n";
+ cout << "\t--help\n";
+ exit(0);
+ }
+
+ g_proxy_config.server_think_time_in_s_ = FLAGS_server_think_time_in_s;
+
+ if (cl.HasSwitch("forward-ip-header")) {
+ g_proxy_config.forward_ip_header_enabled_ = true;
+ g_proxy_config.forward_ip_header_ =
+ cl.GetSwitchValueASCII("forward-ip-header");
+ } else {
+ g_proxy_config.forward_ip_header_enabled_ = false;
+ }
+
+ if (cl.HasSwitch("logdest")) {
+ string log_dest_value = cl.GetSwitchValueASCII("logdest");
+ if (log_dest_value.compare("file") == 0) {
+ g_proxy_config.log_destination_ = logging::LOG_ONLY_TO_FILE;
+ } else if (log_dest_value.compare("system") == 0) {
+ g_proxy_config.log_destination_ = logging::LOG_ONLY_TO_SYSTEM_DEBUG_LOG;
+ } else if (log_dest_value.compare("both") == 0) {
+ g_proxy_config.log_destination_ =
+ logging::LOG_TO_BOTH_FILE_AND_SYSTEM_DEBUG_LOG;
+ } else {
+ LOG(FATAL) << "Invalid logging destination value: " << log_dest_value;
+ }
+ } else {
+ g_proxy_config.log_destination_ = logging::LOG_NONE;
+ }
+ if (cl.HasSwitch("logfile")) {
+ g_proxy_config.log_filename_ = cl.GetSwitchValueASCII("logfile");
+ if (g_proxy_config.log_destination_ == logging::LOG_NONE) {
+ g_proxy_config.log_destination_ = logging::LOG_ONLY_TO_FILE;
+ }
+ } else if (g_proxy_config.log_destination_ == logging::LOG_ONLY_TO_FILE ||
+ g_proxy_config.log_destination_ ==
+ logging::LOG_TO_BOTH_FILE_AND_SYSTEM_DEBUG_LOG) {
+ LOG(FATAL) << "Logging destination requires a log file to be specified.";
+ }
- MemoryCache spdy_memory_cache;
- spdy_memory_cache.AddFiles();
+ if (cl.HasSwitch("wait-for-iface")) {
+ wait_for_iface = true;
+ }
+ if (cl.HasSwitch("ssl-session-expiry")) {
+ string session_expiry = cl.GetSwitchValueASCII("ssl-session-expiry");
+ g_proxy_config.ssl_session_expiry_ = atoi( session_expiry.c_str() );
+ }
+
+ InitLogging(g_proxy_config.log_filename_.c_str(),
+ g_proxy_config.log_destination_,
+ logging::DONT_LOCK_LOG_FILE,
+ logging::APPEND_TO_OLD_LOG_FILE);
+
+ LOG(INFO) << "Flip SPDY proxy started with configuration:";
+ LOG(INFO) << "Logging destination : " << g_proxy_config.log_destination_;
+ LOG(INFO) << "Log file : " << g_proxy_config.log_filename_;
+ LOG(INFO) << "Forward IP Header : "
+ << (g_proxy_config.forward_ip_header_enabled_ ?
+ g_proxy_config.forward_ip_header_ : "(disabled)");
+ LOG(INFO) << "Wait for interfaces : " << (wait_for_iface?"true":"false");
+ LOG(INFO) << "Accept backlog size : " << FLAGS_accept_backlog_size;
+ LOG(INFO) << "Accepts per wake : " << FLAGS_accepts_per_wake;
+ LOG(INFO) << "Disable nagle : "
+ << (FLAGS_disable_nagle?"true":"false");
+ LOG(INFO) << "Reuseport : " << (FLAGS_reuseport?"true":"false");
+
+ // Proxy Acceptors
+ while (true) {
+ i += 1;
+ std::stringstream name;
+ name << "proxy" << i;
+ if (!cl.HasSwitch(name.str())) {
+ break;
+ }
+ string value = cl.GetSwitchValueASCII(name.str());
+ vector<std::string> valueArgs = split(value, ',');
+ CHECK_EQ((unsigned int)6, valueArgs.size());
+ // If wait_for_iface is enabled, then this call will block
+ // indefinitely until the interface is raised.
+ g_proxy_config.AddAcceptor(FLIP_HANDLER_PROXY,
+ valueArgs[0], valueArgs[1],
+ valueArgs[2], valueArgs[3],
+ valueArgs[4], valueArgs[5],
+ FLAGS_accept_backlog_size,
+ FLAGS_disable_nagle,
+ FLAGS_accepts_per_wake,
+ FLAGS_reuseport,
+ wait_for_iface,
+ NULL);
+ }
+
+ // Spdy Server Acceptor
+ MemoryCache spdy_memory_cache;
+ if (cl.HasSwitch("spdy-server")) {
+ spdy_memory_cache.AddFiles();
+ string value = cl.GetSwitchValueASCII("spdy-server");
+ vector<std::string> valueArgs = split(value, ',');
+ g_proxy_config.AddAcceptor(FLIP_HANDLER_SPDY_SERVER,
+ valueArgs[0], valueArgs[1],
+ valueArgs[2], valueArgs[3],
+ "", "",
+ FLAGS_accept_backlog_size,
+ FLAGS_disable_nagle,
+ FLAGS_accepts_per_wake,
+ FLAGS_reuseport,
+ wait_for_iface,
+ &spdy_memory_cache);
+ }
+
+ // Spdy Server Acceptor
MemoryCache http_memory_cache;
- http_memory_cache.CloneFrom(spdy_memory_cache);
-
- LOG(INFO) <<
- "Starting up with the following state: \n"
- " use_ssl: " << use_ssl << "\n"
- " response_count_until_close: " << response_count_until_close << "\n"
- " port: " << port << "\n"
- " spdy_port: " << spdy_port << "\n"
- " backlog_size: " << backlog_size << "\n"
- " reuseport: " << BoolToStr(reuseport) << "\n"
- " no_nagle: " << BoolToStr(no_nagle) << "\n"
- " server_think_time_in_s: " << server_think_time_in_s << "\n"
- " accepts_per_wake: " << accepts_per_wake << "\n"
- " num_threads: " << num_threads << "\n"
- " use_xsub: " << BoolToStr(FLAGS_use_xsub) << "\n"
- " use_xac: " << BoolToStr(FLAGS_use_xac) << "\n";
-
- if (use_ssl) {
- global_ssl_state = new GlobalSSLState;
- spdy_init_ssl(global_ssl_state);
- } else {
- global_ssl_state = NULL;
+ if (cl.HasSwitch("http-server")) {
+ http_memory_cache.AddFiles();
+ string value = cl.GetSwitchValueASCII("http-server");
+ vector<std::string> valueArgs = split(value, ',');
+ g_proxy_config.AddAcceptor(FLIP_HANDLER_HTTP_SERVER,
+ valueArgs[0], valueArgs[1],
+ valueArgs[2], valueArgs[3],
+ "", "",
+ FLAGS_accept_backlog_size,
+ FLAGS_disable_nagle,
+ FLAGS_accepts_per_wake,
+ FLAGS_reuseport,
+ wait_for_iface,
+ &http_memory_cache);
}
- EpollServer epoll_server;
+
vector<SMAcceptorThread*> sm_worker_threads_;
- {
- // spdy
- int listen_fd = -1;
-
- if (reuseport || listen_fd == -1) {
- listen_fd = CreateListeningSocket(spdy_port, backlog_size,
- reuseport, no_nagle);
- if (listen_fd < 0) {
- LOG(FATAL) << "Unable to open listening socket on spdy_port: "
- << spdy_port;
- } else {
- LOG(INFO) << "Listening for spdy on port: " << spdy_port;
- }
- }
- sm_worker_threads_.push_back(
- new SMAcceptorThread(listen_fd,
- accepts_per_wake,
- &NewSpdySM,
- &spdy_memory_cache));
- // Note that spdy_memory_cache is not threadsafe, it is merely
- // thread compatible. Thus, if ever we are to spawn multiple threads,
- // we either must make the MemoryCache threadsafe, or use
- // a separate MemoryCache for each thread.
- //
- // The latter is what is currently being done as we spawn
- // two threads (one for spdy, one for http).
- sm_worker_threads_.back()->InitWorker();
- sm_worker_threads_.back()->Start();
- }
+ for (i = 0; i < g_proxy_config.acceptors_.size(); i++) {
+ FlipAcceptor *acceptor = g_proxy_config.acceptors_[i];
- {
- // http
- int listen_fd = -1;
- if (reuseport || listen_fd == -1) {
- listen_fd = CreateListeningSocket(port, backlog_size,
- reuseport, no_nagle);
- if (listen_fd < 0) {
- LOG(FATAL) << "Unable to open listening socket on port: " << port;
- } else {
- LOG(INFO) << "Listening for HTTP on port: " << port;
- }
- }
- sm_worker_threads_.push_back(
- new SMAcceptorThread(listen_fd,
- accepts_per_wake,
- &NewHTTPSM,
- &http_memory_cache));
+ sm_worker_threads_.push_back(new SMAcceptorThread(acceptor,
+ (MemoryCache *)acceptor->memory_cache_));
// Note that spdy_memory_cache is not threadsafe, it is merely
// thread compatible. Thus, if ever we are to spawn multiple threads,
// we either must make the MemoryCache threadsafe, or use
// a separate MemoryCache for each thread.
//
// The latter is what is currently being done as we spawn
- // two threads (one for spdy, one for http).
+ // a separate thread for each http and spdy server acceptor.
+
sm_worker_threads_.back()->InitWorker();
sm_worker_threads_.back()->Start();
}
@@ -2288,5 +3059,6 @@ int main(int argc, char**argv) {
}
usleep(1000*10); // 10 ms
}
+
return 0;
}
diff --git a/net/tools/flip_server/other_defines.h b/net/tools/flip_server/other_defines.h
deleted file mode 100644
index dda2151..0000000
--- a/net/tools/flip_server/other_defines.h
+++ /dev/null
@@ -1,21 +0,0 @@
-// Copyright (c) 2010 The Chromium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style license that can be
-// found in the LICENSE file.
-
-#ifndef NET_TOOLS_FLIP_SERVER_OTHER_DEFINES
-#define NET_TOOLS_FLIP_SERVER_OTHER_DEFINES
-#pragma once
-
-class NullStream {
- public:
- NullStream() {}
- template <typename T>
- NullStream operator<<(T t) { return *this;}
-};
-
-#define VLOG(X) NullStream()
-#define DVLOG(X) NullStream()
-
-
-#endif // NET_TOOLS_FLIP_SERVER_OTHER_DEFINES
-
diff --git a/net/tools/flip_server/ring_buffer.h b/net/tools/flip_server/ring_buffer.h
index ee6f360..75ad0ec3 100644
--- a/net/tools/flip_server/ring_buffer.h
+++ b/net/tools/flip_server/ring_buffer.h
@@ -8,7 +8,6 @@
#include "base/scoped_ptr.h"
#include "net/tools/flip_server/buffer_interface.h"
-#include "net/tools/flip_server/other_defines.h"
namespace net {
diff --git a/net/tools/flip_server/simple_buffer.h b/net/tools/flip_server/simple_buffer.h
index 71fafc9..fee7d4b 100644
--- a/net/tools/flip_server/simple_buffer.h
+++ b/net/tools/flip_server/simple_buffer.h
@@ -9,7 +9,6 @@
#include <string>
#include "net/tools/flip_server/buffer_interface.h"
-#include "net/tools/flip_server/other_defines.h"
namespace net {