#!/usr/bin/python2.4 # Copyright (c) 2006-2010 The Chromium Authors. All rights reserved. # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. """This is a simple HTTP server used for testing Chrome. It supports several test URLs, as specified by the handlers in TestPageHandler. It defaults to living on localhost:8888. It can use https if you specify the flag --https=CERT where CERT is the path to a pem file containing the certificate and private key that should be used. To shut it down properly, visit localhost:8888/kill. """ import base64 import BaseHTTPServer import cgi import optparse import os import re import shutil import SocketServer import sys import time import urllib2 import pyftpdlib.ftpserver import tlslite import tlslite.api import chromiumsync try: import hashlib _new_md5 = hashlib.md5 except ImportError: import md5 _new_md5 = md5.new SERVER_HTTP = 0 SERVER_FTP = 1 debug_output = sys.stderr def debug(str): debug_output.write(str + "\n") debug_output.flush() class StoppableHTTPServer(BaseHTTPServer.HTTPServer): """This is a specialization of of BaseHTTPServer to allow it to be exited cleanly (by setting its "stop" member to True).""" def serve_forever(self): self.stop = False self.nonce_time = None while not self.stop: self.handle_request() self.socket.close() class HTTPSServer(tlslite.api.TLSSocketServerMixIn, StoppableHTTPServer): """This is a specialization of StoppableHTTPerver that add https support.""" def __init__(self, server_address, request_hander_class, cert_path): s = open(cert_path).read() x509 = tlslite.api.X509() x509.parse(s) self.cert_chain = tlslite.api.X509CertChain([x509]) s = open(cert_path).read() self.private_key = tlslite.api.parsePEMKey(s, private=True) self.session_cache = tlslite.api.SessionCache() StoppableHTTPServer.__init__(self, server_address, request_hander_class) def handshake(self, tlsConnection): """Creates the SSL connection.""" try: tlsConnection.handshakeServer(certChain=self.cert_chain, privateKey=self.private_key, sessionCache=self.session_cache) tlsConnection.ignoreAbruptClose = True return True except tlslite.api.TLSError, error: print "Handshake failure:", str(error) return False class ForkingHTTPServer(SocketServer.ForkingMixIn, StoppableHTTPServer): """This is a specialization of of StoppableHTTPServer which serves each request in a separate process""" pass class ForkingHTTPSServer(SocketServer.ForkingMixIn, HTTPSServer): """This is a specialization of of HTTPSServer which serves each request in a separate process""" pass class TestPageHandler(BaseHTTPServer.BaseHTTPRequestHandler): def __init__(self, request, client_address, socket_server): self._connect_handlers = [ self.RedirectConnectHandler, self.ServerAuthConnectHandler, self.DefaultConnectResponseHandler] self._get_handlers = [ self.KillHandler, self.NoCacheMaxAgeTimeHandler, self.NoCacheTimeHandler, self.CacheTimeHandler, self.CacheExpiresHandler, self.CacheProxyRevalidateHandler, self.CachePrivateHandler, self.CachePublicHandler, self.CacheSMaxAgeHandler, self.CacheMustRevalidateHandler, self.CacheMustRevalidateMaxAgeHandler, self.CacheNoStoreHandler, self.CacheNoStoreMaxAgeHandler, self.CacheNoTransformHandler, self.DownloadHandler, self.DownloadFinishHandler, self.EchoHeader, self.EchoHeaderOverride, self.EchoAllHandler, self.FileHandler, self.RealFileWithCommonHeaderHandler, self.RealBZ2FileWithCommonHeaderHandler, self.SetCookieHandler, self.AuthBasicHandler, self.AuthDigestHandler, self.SlowServerHandler, self.ContentTypeHandler, self.ServerRedirectHandler, self.ClientRedirectHandler, self.ChromiumSyncTimeHandler, self.MultipartHandler, self.DefaultResponseHandler] self._post_handlers = [ self.WriteFile, self.EchoTitleHandler, self.EchoAllHandler, self.ChromiumSyncCommandHandler, self.EchoHandler] + self._get_handlers self._put_handlers = [ self.WriteFile, self.EchoTitleHandler, self.EchoAllHandler, self.EchoHandler] + self._get_handlers self._mime_types = { 'gif': 'image/gif', 'jpeg' : 'image/jpeg', 'jpg' : 'image/jpeg', 'xml' : 'text/xml' } self._default_mime_type = 'text/html' BaseHTTPServer.BaseHTTPRequestHandler.__init__(self, request, client_address, socket_server) # Class variable; shared across requests. _sync_handler = chromiumsync.TestServer() def _ShouldHandleRequest(self, handler_name): """Determines if the path can be handled by the handler. We consider a handler valid if the path begins with the handler name. It can optionally be followed by "?*", "/*". """ pattern = re.compile('%s($|\?|/).*' % handler_name) return pattern.match(self.path) def GetMIMETypeFromName(self, file_name): """Returns the mime type for the specified file_name. So far it only looks at the file extension.""" (shortname, extension) = os.path.splitext(file_name) if len(extension) == 0: # no extension. return self._default_mime_type # extension starts with a dot, so we need to remove it return self._mime_types.get(extension[1:], self._default_mime_type) def KillHandler(self): """This request handler kills the server, for use when we're done" with the a particular test.""" if (self.path.find("kill") < 0): return False self.send_response(200) self.send_header('Content-type', 'text/html') self.send_header('Cache-Control', 'max-age=0') self.end_headers() if options.never_die: self.wfile.write('I cannot die!! BWAHAHA') else: self.wfile.write('Goodbye cruel world!') self.server.stop = True return True def NoCacheMaxAgeTimeHandler(self): """This request handler yields a page with the title set to the current system time, and no caching requested.""" if not self._ShouldHandleRequest("/nocachetime/maxage"): return False self.send_response(200) self.send_header('Cache-Control', 'max-age=0') self.send_header('Content-type', 'text/html') self.end_headers() self.wfile.write('
') if self.command == 'POST' or self.command == 'PUT': length = int(self.headers.getheader('content-length')) qs = self.rfile.read(length) params = cgi.parse_qs(qs, keep_blank_values=1) for param in params: self.wfile.write('%s=%s\n' % (param, params[param][0])) self.wfile.write('') self.wfile.write('
%s' % self.headers) self.wfile.write('') return True def DownloadHandler(self): """This handler sends a downloadable file with or without reporting the size (6K).""" if self.path.startswith("/download-unknown-size"): send_length = False elif self.path.startswith("/download-known-size"): send_length = True else: return False # # The test which uses this functionality is attempting to send # small chunks of data to the client. Use a fairly large buffer # so that we'll fill chrome's IO buffer enough to force it to # actually write the data. # See also the comments in the client-side of this test in # download_uitest.cc # size_chunk1 = 35*1024 size_chunk2 = 10*1024 self.send_response(200) self.send_header('Content-type', 'application/octet-stream') self.send_header('Cache-Control', 'max-age=0') if send_length: self.send_header('Content-Length', size_chunk1 + size_chunk2) self.end_headers() # First chunk of data: self.wfile.write("*" * size_chunk1) self.wfile.flush() # handle requests until one of them clears this flag. self.server.waitForDownload = True while self.server.waitForDownload: self.server.handle_request() # Second chunk of data: self.wfile.write("*" * size_chunk2) return True def DownloadFinishHandler(self): """This handler just tells the server to finish the current download.""" if not self._ShouldHandleRequest("/download-finish"): return False self.server.waitForDownload = False self.send_response(200) self.send_header('Content-type', 'text/html') self.send_header('Cache-Control', 'max-age=0') self.end_headers() return True def FileHandler(self): """This handler sends the contents of the requested file. Wow, it's like a real webserver!""" prefix = self.server.file_root_url if not self.path.startswith(prefix): return False # Consume a request body if present. if self.command == 'POST' or self.command == 'PUT' : self.rfile.read(int(self.headers.getheader('content-length'))) file = self.path[len(prefix):] if file.find('?') > -1: # Ignore the query parameters entirely. url, querystring = file.split('?') else: url = file entries = url.split('/') path = os.path.join(self.server.data_dir, *entries) if os.path.isdir(path): path = os.path.join(path, 'index.html') if not os.path.isfile(path): print "File not found " + file + " full path:" + path self.send_error(404) return True f = open(path, "rb") data = f.read() f.close() # If file.mock-http-headers exists, it contains the headers we # should send. Read them in and parse them. headers_path = path + '.mock-http-headers' if os.path.isfile(headers_path): f = open(headers_path, "r") # "HTTP/1.1 200 OK" response = f.readline() status_code = re.findall('HTTP/\d+.\d+ (\d+)', response)[0] self.send_response(int(status_code)) for line in f: header_values = re.findall('(\S+):\s*(.*)', line) if len(header_values) > 0: # "name: value" name, value = header_values[0] self.send_header(name, value) f.close() else: # Could be more generic once we support mime-type sniffing, but for # now we need to set it explicitly. self.send_response(200) self.send_header('Content-type', self.GetMIMETypeFromName(file)) self.send_header('Content-Length', len(data)) self.end_headers() self.wfile.write(data) return True def RealFileWithCommonHeaderHandler(self): """This handler sends the contents of the requested file without the pseudo http head!""" prefix='/realfiles/' if not self.path.startswith(prefix): return False file = self.path[len(prefix):] path = os.path.join(self.server.data_dir, file) try: f = open(path, "rb") data = f.read() f.close() # just simply set the MIME as octal stream self.send_response(200) self.send_header('Content-type', 'application/octet-stream') self.end_headers() self.wfile.write(data) except: self.send_error(404) return True def RealBZ2FileWithCommonHeaderHandler(self): """This handler sends the bzip2 contents of the requested file with corresponding Content-Encoding field in http head!""" prefix='/realbz2files/' if not self.path.startswith(prefix): return False parts = self.path.split('?') file = parts[0][len(prefix):] path = os.path.join(self.server.data_dir, file) + '.bz2' if len(parts) > 1: options = parts[1] else: options = '' try: self.send_response(200) accept_encoding = self.headers.get("Accept-Encoding") if accept_encoding.find("bzip2") != -1: f = open(path, "rb") data = f.read() f.close() self.send_header('Content-Encoding', 'bzip2') self.send_header('Content-type', 'application/x-bzip2') self.end_headers() if options == 'incremental-header': self.wfile.write(data[:1]) self.wfile.flush() time.sleep(1.0) self.wfile.write(data[1:]) else: self.wfile.write(data) else: """client do not support bzip2 format, send pseudo content """ self.send_header('Content-type', 'text/html; charset=ISO-8859-1') self.end_headers() self.wfile.write("you do not support bzip2 encoding") except: self.send_error(404) return True def SetCookieHandler(self): """This handler just sets a cookie, for testing cookie handling.""" if not self._ShouldHandleRequest("/set-cookie"): return False query_char = self.path.find('?') if query_char != -1: cookie_values = self.path[query_char + 1:].split('&') else: cookie_values = ("",) self.send_response(200) self.send_header('Content-type', 'text/html') for cookie_value in cookie_values: self.send_header('Set-Cookie', '%s' % cookie_value) self.end_headers() for cookie_value in cookie_values: self.wfile.write('%s' % cookie_value) return True def AuthBasicHandler(self): """This handler tests 'Basic' authentication. It just sends a page with title 'user/pass' if you succeed.""" if not self._ShouldHandleRequest("/auth-basic"): return False username = userpass = password = b64str = "" set_cookie_if_challenged = self.path.find('?set-cookie-if-challenged') > 0 auth = self.headers.getheader('authorization') try: if not auth: raise Exception('no auth') b64str = re.findall(r'Basic (\S+)', auth)[0] userpass = base64.b64decode(b64str) username, password = re.findall(r'([^:]+):(\S+)', userpass)[0] if password != 'secret': raise Exception('wrong password') except Exception, e: # Authentication failed. self.send_response(401) self.send_header('WWW-Authenticate', 'Basic realm="testrealm"') self.send_header('Content-type', 'text/html') if set_cookie_if_challenged: self.send_header('Set-Cookie', 'got_challenged=true') self.end_headers() self.wfile.write('') self.wfile.write('
' % auth) self.wfile.write('b64str=%s
' % b64str) self.wfile.write('username: %s
' % username) self.wfile.write('userpass: %s
' % userpass) self.wfile.write('password: %s
' % password)
self.wfile.write('You sent:
%s
' % self.headers) self.wfile.write('') return True # Authentication successful. (Return a cachable response to allow for # testing cached pages that require authentication.) if_none_match = self.headers.getheader('if-none-match') if if_none_match == "abc": self.send_response(304) self.end_headers() else: self.send_response(200) self.send_header('Content-type', 'text/html') self.send_header('Cache-control', 'max-age=60000') self.send_header('Etag', 'abc') self.end_headers() self.wfile.write('
') self.wfile.write('' % auth)
self.wfile.write('You sent:
%s
' % self.headers) self.wfile.write('') return True def GetNonce(self, force_reset=False): """Returns a nonce that's stable per request path for the server's lifetime. This is a fake implementation. A real implementation would only use a given nonce a single time (hence the name n-once). However, for the purposes of unittesting, we don't care about the security of the nonce. Args: force_reset: Iff set, the nonce will be changed. Useful for testing the "stale" response. """ if force_reset or not self.server.nonce_time: self.server.nonce_time = time.time() return _new_md5('privatekey%s%d' % (self.path, self.server.nonce_time)).hexdigest() def AuthDigestHandler(self): """This handler tests 'Digest' authentication. It just sends a page with title 'user/pass' if you succeed. A stale response is sent iff "stale" is present in the request path. """ if not self._ShouldHandleRequest("/auth-digest"): return False stale = 'stale' in self.path nonce = self.GetNonce(force_reset=stale) opaque = _new_md5('opaque').hexdigest() password = 'secret' realm = 'testrealm' auth = self.headers.getheader('authorization') pairs = {} try: if not auth: raise Exception('no auth') if not auth.startswith('Digest'): raise Exception('not digest') # Pull out all the name="value" pairs as a dictionary. pairs = dict(re.findall(r'(\b[^ ,=]+)="?([^",]+)"?', auth)) # Make sure it's all valid. if pairs['nonce'] != nonce: raise Exception('wrong nonce') if pairs['opaque'] != opaque: raise Exception('wrong opaque') # Check the 'response' value and make sure it matches our magic hash. # See http://www.ietf.org/rfc/rfc2617.txt hash_a1 = _new_md5( ':'.join([pairs['username'], realm, password])).hexdigest() hash_a2 = _new_md5(':'.join([self.command, pairs['uri']])).hexdigest() if 'qop' in pairs and 'nc' in pairs and 'cnonce' in pairs: response = _new_md5(':'.join([hash_a1, nonce, pairs['nc'], pairs['cnonce'], pairs['qop'], hash_a2])).hexdigest() else: response = _new_md5(':'.join([hash_a1, nonce, hash_a2])).hexdigest() if pairs['response'] != response: raise Exception('wrong password') except Exception, e: # Authentication failed. self.send_response(401) hdr = ('Digest ' 'realm="%s", ' 'domain="/", ' 'qop="auth", ' 'algorithm=MD5, ' 'nonce="%s", ' 'opaque="%s"') % (realm, nonce, opaque) if stale: hdr += ', stale="TRUE"' self.send_header('WWW-Authenticate', hdr) self.send_header('Content-type', 'text/html') self.end_headers() self.wfile.write('
') self.wfile.write('' % auth) self.wfile.write('pairs=%s
' % pairs)
self.wfile.write('You sent:
%s
' % self.headers)
self.wfile.write('We are replying:
%s
' % hdr) self.wfile.write('') return True # Authentication successful. self.send_response(200) self.send_header('Content-type', 'text/html') self.end_headers() self.wfile.write('
') self.wfile.write('' % auth) self.wfile.write('pairs=%s
' % pairs) self.wfile.write('') return True def SlowServerHandler(self): """Wait for the user suggested time before responding. The syntax is /slow?0.5 to wait for half a second.""" if not self._ShouldHandleRequest("/slow"): return False query_char = self.path.find('?') wait_sec = 1.0 if query_char >= 0: try: wait_sec = int(self.path[query_char + 1:]) except ValueError: pass time.sleep(wait_sec) self.send_response(200) self.send_header('Content-type', 'text/plain') self.end_headers() self.wfile.write("waited %d seconds" % wait_sec) return True def ContentTypeHandler(self): """Returns a string of html with the given content type. E.g., /contenttype?text/css returns an html file with the Content-Type header set to text/css.""" if not self._ShouldHandleRequest("/contenttype"): return False query_char = self.path.find('?') content_type = self.path[query_char + 1:].strip() if not content_type: content_type = 'text/html' self.send_response(200) self.send_header('Content-Type', content_type) self.end_headers() self.wfile.write("\n
\nHTML text
\n\n\n"); return True def ServerRedirectHandler(self): """Sends a server redirect to the given URL. The syntax is '/server-redirect?http://foo.bar/asdf' to redirect to 'http://foo.bar/asdf'""" test_name = "/server-redirect" if not self._ShouldHandleRequest(test_name): return False query_char = self.path.find('?') if query_char < 0 or len(self.path) <= query_char + 1: self.sendRedirectHelp(test_name) return True dest = self.path[query_char + 1:] self.send_response(301) # moved permanently self.send_header('Location', dest) self.send_header('Content-type', 'text/html') self.end_headers() self.wfile.write('') self.wfile.write('Redirecting to %s' % dest) return True def ClientRedirectHandler(self): """Sends a client redirect to the given URL. The syntax is '/client-redirect?http://foo.bar/asdf' to redirect to 'http://foo.bar/asdf'""" test_name = "/client-redirect" if not self._ShouldHandleRequest(test_name): return False query_char = self.path.find('?'); if query_char < 0 or len(self.path) <= query_char + 1: self.sendRedirectHelp(test_name) return True dest = self.path[query_char + 1:] self.send_response(200) self.send_header('Content-type', 'text/html') self.end_headers() self.wfile.write('') self.wfile.write('' % dest) self.wfile.write('Redirecting to %s' % dest) return True def ChromiumSyncTimeHandler(self): """Handle Chromium sync .../time requests. The syncer sometimes checks server reachability by examining /time. """ test_name = "/chromiumsync/time" if not self._ShouldHandleRequest(test_name): return False self.send_response(200) self.send_header('Content-type', 'text/html') self.end_headers() return True def ChromiumSyncCommandHandler(self): """Handle a chromiumsync command arriving via http. This covers all sync protocol commands: authentication, getupdates, and commit. """ test_name = "/chromiumsync/command" if not self._ShouldHandleRequest(test_name): return False length = int(self.headers.getheader('content-length')) raw_request = self.rfile.read(length) http_response, raw_reply = self._sync_handler.HandleCommand(raw_request) self.send_response(http_response) self.end_headers() self.wfile.write(raw_reply) return True def MultipartHandler(self): """Send a multipart response (10 text/html pages).""" test_name = "/multipart" if not self._ShouldHandleRequest(test_name): return False num_frames = 10 bound = '12345' self.send_response(200) self.send_header('Content-type', 'multipart/x-mixed-replace;boundary=' + bound) self.end_headers() for i in xrange(num_frames): self.wfile.write('--' + bound + '\r\n') self.wfile.write('Content-type: text/html\r\n\r\n') self.wfile.write('%s?http://dest...' % redirect_name) self.wfile.write('') def MakeDumpDir(data_dir): """Create directory named 'dump' where uploaded data via HTTP POST/PUT requests will be stored. If the directory already exists all files and subdirectories will be deleted.""" dump_dir = os.path.join(data_dir, 'dump'); if os.path.isdir(dump_dir): shutil.rmtree(dump_dir) os.mkdir(dump_dir) def MakeDataDir(): if options.data_dir: if not os.path.isdir(options.data_dir): print 'specified data dir not found: ' + options.data_dir + ' exiting...' return None my_data_dir = options.data_dir else: # Create the default path to our data dir, relative to the exe dir. my_data_dir = os.path.dirname(sys.argv[0]) my_data_dir = os.path.join(my_data_dir, "..", "..", "..", "..", "test", "data") #TODO(ibrar): Must use Find* funtion defined in google\tools #i.e my_data_dir = FindUpward(my_data_dir, "test", "data") return my_data_dir def TryKillingOldServer(port): # Note that an HTTP /kill request to the FTP server has the effect of # killing it. for protocol in ["http", "https"]: try: urllib2.urlopen("%s://localhost:%d/kill" % (protocol, port)).read() print "Killed old server instance on port %d (via %s)" % (port, protocol) except urllib2.URLError: # Common case, indicates no server running. pass def main(options, args): # redirect output to a log file so it doesn't spam the unit test output logfile = open('testserver.log', 'w') sys.stderr = sys.stdout = logfile port = options.port # Try to free up the port if there's an orphaned old instance. TryKillingOldServer(port) if options.server_type == SERVER_HTTP: if options.cert: # let's make sure the cert file exists. if not os.path.isfile(options.cert): print 'specified cert file not found: ' + options.cert + ' exiting...' return if options.forking: server_class = ForkingHTTPSServer else: server_class = HTTPSServer server = server_class(('127.0.0.1', port), TestPageHandler, options.cert) print 'HTTPS server started on port %d...' % port else: if options.forking: server_class = ForkingHTTPServer else: server_class = StoppableHTTPServer server = server_class(('127.0.0.1', port), TestPageHandler) print 'HTTP server started on port %d...' % port server.data_dir = MakeDataDir() server.file_root_url = options.file_root_url MakeDumpDir(server.data_dir) # means FTP Server else: my_data_dir = MakeDataDir() def line_logger(msg): if (msg.find("kill") >= 0): server.stop = True print 'shutting down server' sys.exit(0) # Instantiate a dummy authorizer for managing 'virtual' users authorizer = pyftpdlib.ftpserver.DummyAuthorizer() # Define a new user having full r/w permissions and a read-only # anonymous user authorizer.add_user('chrome', 'chrome', my_data_dir, perm='elradfmw') authorizer.add_anonymous(my_data_dir) # Instantiate FTP handler class ftp_handler = pyftpdlib.ftpserver.FTPHandler ftp_handler.authorizer = authorizer pyftpdlib.ftpserver.logline = line_logger # Define a customized banner (string returned when client connects) ftp_handler.banner = ("pyftpdlib %s based ftpd ready." % pyftpdlib.ftpserver.__ver__) # Instantiate FTP server class and listen to 127.0.0.1:port address = ('127.0.0.1', port) server = pyftpdlib.ftpserver.FTPServer(address, ftp_handler) print 'FTP server started on port %d...' % port try: server.serve_forever() except KeyboardInterrupt: print 'shutting down server' server.stop = True if __name__ == '__main__': option_parser = optparse.OptionParser() option_parser.add_option("-f", '--ftp', action='store_const', const=SERVER_FTP, default=SERVER_HTTP, dest='server_type', help='FTP or HTTP server: default is HTTP.') option_parser.add_option('--forking', action='store_true', default=False, dest='forking', help='Serve each request in a separate process.') option_parser.add_option('', '--port', default='8888', type='int', help='Port used by the server.') option_parser.add_option('', '--data-dir', dest='data_dir', help='Directory from which to read the files.') option_parser.add_option('', '--https', dest='cert', help='Specify that https should be used, specify ' 'the path to the cert containing the private key ' 'the server should use.') option_parser.add_option('', '--file-root-url', default='/files/', help='Specify a root URL for files served.') option_parser.add_option('', '--never-die', default=False, action="store_true", help='Prevent the server from dying when visiting ' 'a /kill URL. Useful for manually running some ' 'tests.') options, args = option_parser.parse_args() sys.exit(main(options, args))