#!/usr/bin/python2.4 # Copyright (c) 2006-2008 The Chromium Authors. All rights reserved. # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. """This is a simple HTTP server used for testing Chrome. It supports several test URLs, as specified by the handlers in TestPageHandler. It defaults to living on localhost:8888. It can use https if you specify the flag --https=CERT where CERT is the path to a pem file containing the certificate and private key that should be used. To shut it down properly, visit localhost:8888/kill. """ import base64 import BaseHTTPServer import cgi import optparse import os import re import shutil import SocketServer import sys import time import tlslite import tlslite.api import pyftpdlib.ftpserver try: import hashlib _new_md5 = hashlib.md5 except ImportError: import md5 _new_md5 = md5.new SERVER_HTTP = 0 SERVER_FTP = 1 debug_output = sys.stderr def debug(str): debug_output.write(str + "\n") debug_output.flush() class StoppableHTTPServer(BaseHTTPServer.HTTPServer): """This is a specialization of of BaseHTTPServer to allow it to be exited cleanly (by setting its "stop" member to True).""" def serve_forever(self): self.stop = False self.nonce_time = None while not self.stop: self.handle_request() self.socket.close() class HTTPSServer(tlslite.api.TLSSocketServerMixIn, StoppableHTTPServer): """This is a specialization of StoppableHTTPerver that add https support.""" def __init__(self, server_address, request_hander_class, cert_path): s = open(cert_path).read() x509 = tlslite.api.X509() x509.parse(s) self.cert_chain = tlslite.api.X509CertChain([x509]) s = open(cert_path).read() self.private_key = tlslite.api.parsePEMKey(s, private=True) self.session_cache = tlslite.api.SessionCache() StoppableHTTPServer.__init__(self, server_address, request_hander_class) def handshake(self, tlsConnection): """Creates the SSL connection.""" try: tlsConnection.handshakeServer(certChain=self.cert_chain, privateKey=self.private_key, sessionCache=self.session_cache) tlsConnection.ignoreAbruptClose = True return True except tlslite.api.TLSError, error: print "Handshake failure:", str(error) return False class ForkingHTTPServer(SocketServer.ForkingMixIn, StoppableHTTPServer): """This is a specialization of of StoppableHTTPServer which serves each request in a separate process""" pass class ForkingHTTPSServer(SocketServer.ForkingMixIn, HTTPSServer): """This is a specialization of of HTTPSServer which serves each request in a separate process""" pass class TestPageHandler(BaseHTTPServer.BaseHTTPRequestHandler): def __init__(self, request, client_address, socket_server): self._connect_handlers = [ self.RedirectConnectHandler, self.ServerAuthConnectHandler, self.DefaultConnectResponseHandler] self._get_handlers = [ self.KillHandler, self.NoCacheMaxAgeTimeHandler, self.NoCacheTimeHandler, self.CacheTimeHandler, self.CacheExpiresHandler, self.CacheProxyRevalidateHandler, self.CachePrivateHandler, self.CachePublicHandler, self.CacheSMaxAgeHandler, self.CacheMustRevalidateHandler, self.CacheMustRevalidateMaxAgeHandler, self.CacheNoStoreHandler, self.CacheNoStoreMaxAgeHandler, self.CacheNoTransformHandler, self.DownloadHandler, self.DownloadFinishHandler, self.EchoHeader, self.EchoHeaderOverride, self.EchoAllHandler, self.FileHandler, self.RealFileWithCommonHeaderHandler, self.RealBZ2FileWithCommonHeaderHandler, self.SetCookieHandler, self.AuthBasicHandler, self.AuthDigestHandler, self.SlowServerHandler, self.ContentTypeHandler, self.ServerRedirectHandler, self.ClientRedirectHandler, self.MultipartHandler, self.DefaultResponseHandler] self._post_handlers = [ self.WriteFile, self.EchoTitleHandler, self.EchoAllHandler, self.EchoHandler] + self._get_handlers self._put_handlers = [ self.WriteFile, self.EchoTitleHandler, self.EchoAllHandler, self.EchoHandler] + self._get_handlers self._mime_types = { 'gif': 'image/gif', 'jpeg' : 'image/jpeg', 'jpg' : 'image/jpeg', 'xml' : 'text/xml' } self._default_mime_type = 'text/html' BaseHTTPServer.BaseHTTPRequestHandler.__init__(self, request, client_address, socket_server) def _ShouldHandleRequest(self, handler_name): """Determines if the path can be handled by the handler. We consider a handler valid if the path begins with the handler name. It can optionally be followed by "?*", "/*". """ pattern = re.compile('%s($|\?|/).*' % handler_name) return pattern.match(self.path) def GetMIMETypeFromName(self, file_name): """Returns the mime type for the specified file_name. So far it only looks at the file extension.""" (shortname, extension) = os.path.splitext(file_name) if len(extension) == 0: # no extension. return self._default_mime_type # extension starts with a dot, so we need to remove it return self._mime_types.get(extension[1:], self._default_mime_type) def KillHandler(self): """This request handler kills the server, for use when we're done" with the a particular test.""" if (self.path.find("kill") < 0): return False self.send_response(200) self.send_header('Content-type', 'text/html') self.send_header('Cache-Control', 'max-age=0') self.end_headers() if options.never_die: self.wfile.write('I cannot die!! BWAHAHA') else: self.wfile.write('Goodbye cruel world!') self.server.stop = True return True def NoCacheMaxAgeTimeHandler(self): """This request handler yields a page with the title set to the current system time, and no caching requested.""" if not self._ShouldHandleRequest("/nocachetime/maxage"): return False self.send_response(200) self.send_header('Cache-Control', 'max-age=0') self.send_header('Content-type', 'text/html') self.end_headers() self.wfile.write('
') if self.command == 'POST' or self.command == 'PUT': length = int(self.headers.getheader('content-length')) qs = self.rfile.read(length) params = cgi.parse_qs(qs, keep_blank_values=1) for param in params: self.wfile.write('%s=%s\n' % (param, params[param][0])) self.wfile.write('') self.wfile.write('
%s' % self.headers) self.wfile.write('') return True def DownloadHandler(self): """This handler sends a downloadable file with or without reporting the size (6K).""" if self.path.startswith("/download-unknown-size"): send_length = False elif self.path.startswith("/download-known-size"): send_length = True else: return False # # The test which uses this functionality is attempting to send # small chunks of data to the client. Use a fairly large buffer # so that we'll fill chrome's IO buffer enough to force it to # actually write the data. # See also the comments in the client-side of this test in # download_uitest.cc # size_chunk1 = 35*1024 size_chunk2 = 10*1024 self.send_response(200) self.send_header('Content-type', 'application/octet-stream') self.send_header('Cache-Control', 'max-age=0') if send_length: self.send_header('Content-Length', size_chunk1 + size_chunk2) self.end_headers() # First chunk of data: self.wfile.write("*" * size_chunk1) self.wfile.flush() # handle requests until one of them clears this flag. self.server.waitForDownload = True while self.server.waitForDownload: self.server.handle_request() # Second chunk of data: self.wfile.write("*" * size_chunk2) return True def DownloadFinishHandler(self): """This handler just tells the server to finish the current download.""" if not self._ShouldHandleRequest("/download-finish"): return False self.server.waitForDownload = False self.send_response(200) self.send_header('Content-type', 'text/html') self.send_header('Cache-Control', 'max-age=0') self.end_headers() return True def FileHandler(self): """This handler sends the contents of the requested file. Wow, it's like a real webserver!""" prefix = self.server.file_root_url if not self.path.startswith(prefix): return False # Consume a request body if present. if self.command == 'POST' or self.command == 'PUT' : self.rfile.read(int(self.headers.getheader('content-length'))) file = self.path[len(prefix):] if file.find('?') > -1: # Ignore the query parameters entirely. url, querystring = file.split('?') else: url = file entries = url.split('/') path = os.path.join(self.server.data_dir, *entries) if os.path.isdir(path): path = os.path.join(path, 'index.html') if not os.path.isfile(path): print "File not found " + file + " full path:" + path self.send_error(404) return True f = open(path, "rb") data = f.read() f.close() # If file.mock-http-headers exists, it contains the headers we # should send. Read them in and parse them. headers_path = path + '.mock-http-headers' if os.path.isfile(headers_path): f = open(headers_path, "r") # "HTTP/1.1 200 OK" response = f.readline() status_code = re.findall('HTTP/\d+.\d+ (\d+)', response)[0] self.send_response(int(status_code)) for line in f: header_values = re.findall('(\S+):\s*(.*)', line) if len(header_values) > 0: # "name: value" name, value = header_values[0] self.send_header(name, value) f.close() else: # Could be more generic once we support mime-type sniffing, but for # now we need to set it explicitly. self.send_response(200) self.send_header('Content-type', self.GetMIMETypeFromName(file)) self.send_header('Content-Length', len(data)) self.end_headers() self.wfile.write(data) return True def RealFileWithCommonHeaderHandler(self): """This handler sends the contents of the requested file without the pseudo http head!""" prefix='/realfiles/' if not self.path.startswith(prefix): return False file = self.path[len(prefix):] path = os.path.join(self.server.data_dir, file) try: f = open(path, "rb") data = f.read() f.close() # just simply set the MIME as octal stream self.send_response(200) self.send_header('Content-type', 'application/octet-stream') self.end_headers() self.wfile.write(data) except: self.send_error(404) return True def RealBZ2FileWithCommonHeaderHandler(self): """This handler sends the bzip2 contents of the requested file with corresponding Content-Encoding field in http head!""" prefix='/realbz2files/' if not self.path.startswith(prefix): return False parts = self.path.split('?') file = parts[0][len(prefix):] path = os.path.join(self.server.data_dir, file) + '.bz2' if len(parts) > 1: options = parts[1] else: options = '' try: self.send_response(200) accept_encoding = self.headers.get("Accept-Encoding") if accept_encoding.find("bzip2") != -1: f = open(path, "rb") data = f.read() f.close() self.send_header('Content-Encoding', 'bzip2') self.send_header('Content-type', 'application/x-bzip2') self.end_headers() if options == 'incremental-header': self.wfile.write(data[:1]) self.wfile.flush() time.sleep(1.0) self.wfile.write(data[1:]) else: self.wfile.write(data) else: """client do not support bzip2 format, send pseudo content """ self.send_header('Content-type', 'text/html; charset=ISO-8859-1') self.end_headers() self.wfile.write("you do not support bzip2 encoding") except: self.send_error(404) return True def SetCookieHandler(self): """This handler just sets a cookie, for testing cookie handling.""" if not self._ShouldHandleRequest("/set-cookie"): return False query_char = self.path.find('?') if query_char != -1: cookie_values = self.path[query_char + 1:].split('&') else: cookie_values = ("",) self.send_response(200) self.send_header('Content-type', 'text/html') for cookie_value in cookie_values: self.send_header('Set-Cookie', '%s' % cookie_value) self.end_headers() for cookie_value in cookie_values: self.wfile.write('%s' % cookie_value) return True def AuthBasicHandler(self): """This handler tests 'Basic' authentication. It just sends a page with title 'user/pass' if you succeed.""" if not self._ShouldHandleRequest("/auth-basic"): return False username = userpass = password = b64str = "" set_cookie_if_challenged = self.path.find('?set-cookie-if-challenged') > 0 auth = self.headers.getheader('authorization') try: if not auth: raise Exception('no auth') b64str = re.findall(r'Basic (\S+)', auth)[0] userpass = base64.b64decode(b64str) username, password = re.findall(r'([^:]+):(\S+)', userpass)[0] if password != 'secret': raise Exception('wrong password') except Exception, e: # Authentication failed. self.send_response(401) self.send_header('WWW-Authenticate', 'Basic realm="testrealm"') self.send_header('Content-type', 'text/html') if set_cookie_if_challenged: self.send_header('Set-Cookie', 'got_challenged=true') self.end_headers() self.wfile.write('') self.wfile.write('
' % auth) self.wfile.write('b64str=%s
' % b64str) self.wfile.write('username: %s
' % username) self.wfile.write('userpass: %s
' % userpass) self.wfile.write('password: %s
' % password)
self.wfile.write('You sent:
%s
' % self.headers) self.wfile.write('') return True # Authentication successful. (Return a cachable response to allow for # testing cached pages that require authentication.) if_none_match = self.headers.getheader('if-none-match') if if_none_match == "abc": self.send_response(304) self.end_headers() else: self.send_response(200) self.send_header('Content-type', 'text/html') self.send_header('Cache-control', 'max-age=60000') self.send_header('Etag', 'abc') self.end_headers() self.wfile.write('
') self.wfile.write('' % auth)
self.wfile.write('You sent:
%s
' % self.headers) self.wfile.write('') return True def GetNonce(self, force_reset=False): """Returns a nonce that's stable per request path for the server's lifetime. This is a fake implementation. A real implementation would only use a given nonce a single time (hence the name n-once). However, for the purposes of unittesting, we don't care about the security of the nonce. Args: force_reset: Iff set, the nonce will be changed. Useful for testing the "stale" response. """ if force_reset or not self.server.nonce_time: self.server.nonce_time = time.time() return _new_md5('privatekey%s%d' % (self.path, self.server.nonce_time)).hexdigest() def AuthDigestHandler(self): """This handler tests 'Digest' authentication. It just sends a page with title 'user/pass' if you succeed. A stale response is sent iff "stale" is present in the request path. """ if not self._ShouldHandleRequest("/auth-digest"): return False stale = 'stale' in self.path nonce = self.GetNonce(force_reset=stale) opaque = _new_md5('opaque').hexdigest() password = 'secret' realm = 'testrealm' auth = self.headers.getheader('authorization') pairs = {} try: if not auth: raise Exception('no auth') if not auth.startswith('Digest'): raise Exception('not digest') # Pull out all the name="value" pairs as a dictionary. pairs = dict(re.findall(r'(\b[^ ,=]+)="?([^",]+)"?', auth)) # Make sure it's all valid. if pairs['nonce'] != nonce: raise Exception('wrong nonce') if pairs['opaque'] != opaque: raise Exception('wrong opaque') # Check the 'response' value and make sure it matches our magic hash. # See http://www.ietf.org/rfc/rfc2617.txt hash_a1 = _new_md5( ':'.join([pairs['username'], realm, password])).hexdigest() hash_a2 = _new_md5(':'.join([self.command, pairs['uri']])).hexdigest() if 'qop' in pairs and 'nc' in pairs and 'cnonce' in pairs: response = _new_md5(':'.join([hash_a1, nonce, pairs['nc'], pairs['cnonce'], pairs['qop'], hash_a2])).hexdigest() else: response = _new_md5(':'.join([hash_a1, nonce, hash_a2])).hexdigest() if pairs['response'] != response: raise Exception('wrong password') except Exception, e: # Authentication failed. self.send_response(401) hdr = ('Digest ' 'realm="%s", ' 'domain="/", ' 'qop="auth", ' 'algorithm=MD5, ' 'nonce="%s", ' 'opaque="%s"') % (realm, nonce, opaque) if stale: hdr += ', stale="TRUE"' self.send_header('WWW-Authenticate', hdr) self.send_header('Content-type', 'text/html') self.end_headers() self.wfile.write('
') self.wfile.write('' % auth) self.wfile.write('pairs=%s
' % pairs)
self.wfile.write('You sent:
%s
' % self.headers)
self.wfile.write('We are replying:
%s
' % hdr) self.wfile.write('') return True # Authentication successful. self.send_response(200) self.send_header('Content-type', 'text/html') self.end_headers() self.wfile.write('
') self.wfile.write('' % auth) self.wfile.write('pairs=%s
' % pairs) self.wfile.write('') return True def SlowServerHandler(self): """Wait for the user suggested time before responding. The syntax is /slow?0.5 to wait for half a second.""" if not self._ShouldHandleRequest("/slow"): return False query_char = self.path.find('?') wait_sec = 1.0 if query_char >= 0: try: wait_sec = int(self.path[query_char + 1:]) except ValueError: pass time.sleep(wait_sec) self.send_response(200) self.send_header('Content-type', 'text/plain') self.end_headers() self.wfile.write("waited %d seconds" % wait_sec) return True def ContentTypeHandler(self): """Returns a string of html with the given content type. E.g., /contenttype?text/css returns an html file with the Content-Type header set to text/css.""" if not self._ShouldHandleRequest("/contenttype"): return False query_char = self.path.find('?') content_type = self.path[query_char + 1:].strip() if not content_type: content_type = 'text/html' self.send_response(200) self.send_header('Content-Type', content_type) self.end_headers() self.wfile.write("\n
\nHTML text
\n\n\n"); return True def ServerRedirectHandler(self): """Sends a server redirect to the given URL. The syntax is '/server-redirect?http://foo.bar/asdf' to redirect to 'http://foo.bar/asdf'""" test_name = "/server-redirect" if not self._ShouldHandleRequest(test_name): return False query_char = self.path.find('?') if query_char < 0 or len(self.path) <= query_char + 1: self.sendRedirectHelp(test_name) return True dest = self.path[query_char + 1:] self.send_response(301) # moved permanently self.send_header('Location', dest) self.send_header('Content-type', 'text/html') self.end_headers() self.wfile.write('') self.wfile.write('Redirecting to %s' % dest) return True def ClientRedirectHandler(self): """Sends a client redirect to the given URL. The syntax is '/client-redirect?http://foo.bar/asdf' to redirect to 'http://foo.bar/asdf'""" test_name = "/client-redirect" if not self._ShouldHandleRequest(test_name): return False query_char = self.path.find('?'); if query_char < 0 or len(self.path) <= query_char + 1: self.sendRedirectHelp(test_name) return True dest = self.path[query_char + 1:] self.send_response(200) self.send_header('Content-type', 'text/html') self.end_headers() self.wfile.write('') self.wfile.write('' % dest) self.wfile.write('Redirecting to %s' % dest) return True def MultipartHandler(self): """Send a multipart response (10 text/html pages).""" test_name = "/multipart" if not self._ShouldHandleRequest(test_name): return False num_frames = 10 bound = '12345' self.send_response(200) self.send_header('Content-type', 'multipart/x-mixed-replace;boundary=' + bound) self.end_headers() for i in xrange(num_frames): self.wfile.write('--' + bound + '\r\n') self.wfile.write('Content-type: text/html\r\n\r\n') self.wfile.write('%s?http://dest...' % redirect_name) self.wfile.write('') def MakeDumpDir(data_dir): """Create directory named 'dump' where uploaded data via HTTP POST/PUT requests will be stored. If the directory already exists all files and subdirectories will be deleted.""" dump_dir = os.path.join(data_dir, 'dump'); if os.path.isdir(dump_dir): shutil.rmtree(dump_dir) os.mkdir(dump_dir) def MakeDataDir(): if options.data_dir: if not os.path.isdir(options.data_dir): print 'specified data dir not found: ' + options.data_dir + ' exiting...' return None my_data_dir = options.data_dir else: # Create the default path to our data dir, relative to the exe dir. my_data_dir = os.path.dirname(sys.argv[0]) my_data_dir = os.path.join(my_data_dir, "..", "..", "..", "..", "test", "data") #TODO(ibrar): Must use Find* funtion defined in google\tools #i.e my_data_dir = FindUpward(my_data_dir, "test", "data") return my_data_dir def main(options, args): # redirect output to a log file so it doesn't spam the unit test output logfile = open('testserver.log', 'w') sys.stderr = sys.stdout = logfile port = options.port if options.server_type == SERVER_HTTP: if options.cert: # let's make sure the cert file exists. if not os.path.isfile(options.cert): print 'specified cert file not found: ' + options.cert + ' exiting...' return if options.forking: server_class = ForkingHTTPSServer else: server_class = HTTPSServer server = server_class(('127.0.0.1', port), TestPageHandler, options.cert) print 'HTTPS server started on port %d...' % port else: if options.forking: server_class = ForkingHTTPServer else: server_class = StoppableHTTPServer server = server_class(('127.0.0.1', port), TestPageHandler) print 'HTTP server started on port %d...' % port server.data_dir = MakeDataDir() server.file_root_url = options.file_root_url MakeDumpDir(server.data_dir) # means FTP Server else: my_data_dir = MakeDataDir() def line_logger(msg): if (msg.find("kill") >= 0): server.stop = True print 'shutting down server' sys.exit(0) # Instantiate a dummy authorizer for managing 'virtual' users authorizer = pyftpdlib.ftpserver.DummyAuthorizer() # Define a new user having full r/w permissions and a read-only # anonymous user authorizer.add_user('chrome', 'chrome', my_data_dir, perm='elradfmw') authorizer.add_anonymous(my_data_dir) # Instantiate FTP handler class ftp_handler = pyftpdlib.ftpserver.FTPHandler ftp_handler.authorizer = authorizer pyftpdlib.ftpserver.logline = line_logger # Define a customized banner (string returned when client connects) ftp_handler.banner = ("pyftpdlib %s based ftpd ready." % pyftpdlib.ftpserver.__ver__) # Instantiate FTP server class and listen to 127.0.0.1:port address = ('127.0.0.1', port) server = pyftpdlib.ftpserver.FTPServer(address, ftp_handler) print 'FTP server started on port %d...' % port try: server.serve_forever() except KeyboardInterrupt: print 'shutting down server' server.stop = True if __name__ == '__main__': option_parser = optparse.OptionParser() option_parser.add_option("-f", '--ftp', action='store_const', const=SERVER_FTP, default=SERVER_HTTP, dest='server_type', help='FTP or HTTP server: default is HTTP.') option_parser.add_option('--forking', action='store_true', default=False, dest='forking', help='Serve each request in a separate process.') option_parser.add_option('', '--port', default='8888', type='int', help='Port used by the server.') option_parser.add_option('', '--data-dir', dest='data_dir', help='Directory from which to read the files.') option_parser.add_option('', '--https', dest='cert', help='Specify that https should be used, specify ' 'the path to the cert containing the private key ' 'the server should use.') option_parser.add_option('', '--file-root-url', default='/files/', help='Specify a root URL for files served.') option_parser.add_option('', '--never-die', default=False, action="store_true", help='Prevent the server from dying when visiting ' 'a /kill URL. Useful for manually running some ' 'tests.') options, args = option_parser.parse_args() sys.exit(main(options, args))