#!/usr/bin/env python # Authors: # Trevor Perrin # Marcelo Fernandez - bugfix and NPN support # Martin von Loewis - python 3 port # # See the LICENSE file for legal information regarding use of this file. from __future__ import print_function import sys import os import os.path import socket import time import getopt try: import httplib from SocketServer import * from BaseHTTPServer import * from SimpleHTTPServer import * except ImportError: # Python 3.x from http import client as httplib from socketserver import * from http.server import * if __name__ != "__main__": raise "This must be run as a command, not used as a module!" from tlslite.api import * from tlslite import __version__ try: from tack.structures.Tack import Tack except ImportError: pass def printUsage(s=None): if s: print("ERROR: %s" % s) print("") print("Version: %s" % __version__) print("") print("RNG: %s" % prngName) print("") print("Modules:") if tackpyLoaded: print(" tackpy : Loaded") else: print(" tackpy : Not Loaded") if m2cryptoLoaded: print(" M2Crypto : Loaded") else: print(" M2Crypto : Not Loaded") if pycryptoLoaded: print(" pycrypto : Loaded") else: print(" pycrypto : Not Loaded") if gmpyLoaded: print(" GMPY : Loaded") else: print(" GMPY : Not Loaded") print("") print("""Commands: server [-k KEY] [-c CERT] [-t TACK] [-v VERIFIERDB] [-d DIR] [--reqcert] HOST:PORT client [-k KEY] [-c CERT] [-u USER] [-p PASS] HOST:PORT """) sys.exit(-1) def printError(s): """Print error message and exit""" sys.stderr.write("ERROR: %s\n" % s) sys.exit(-1) def handleArgs(argv, argString, flagsList=[]): # Convert to getopt argstring format: # Add ":" after each arg, ie "abc" -> "a:b:c:" getOptArgString = ":".join(argString) + ":" try: opts, argv = getopt.getopt(argv, getOptArgString, flagsList) except getopt.GetoptError as e: printError(e) # Default values if arg not present privateKey = None certChain = None username = None password = None tacks = None verifierDB = None reqCert = False directory = None for opt, arg in opts: if opt == "-k": s = open(arg, "rb").read() privateKey = parsePEMKey(s, private=True) elif opt == "-c": s = open(arg, "rb").read() x509 = X509() x509.parse(s) certChain = X509CertChain([x509]) elif opt == "-u": username = arg elif opt == "-p": password = arg elif opt == "-t": if tackpyLoaded: s = open(arg, "rU").read() tacks = Tack.createFromPemList(s) elif opt == "-v": verifierDB = VerifierDB(arg) verifierDB.open() elif opt == "-d": directory = arg elif opt == "--reqcert": reqCert = True else: assert(False) if not argv: printError("Missing address") if len(argv)>1: printError("Too many arguments") #Split address into hostname/port tuple address = argv[0] address = address.split(":") if len(address) != 2: raise SyntaxError("Must specify :") address = ( address[0], int(address[1]) ) # Populate the return list retList = [address] if "k" in argString: retList.append(privateKey) if "c" in argString: retList.append(certChain) if "u" in argString: retList.append(username) if "p" in argString: retList.append(password) if "t" in argString: retList.append(tacks) if "v" in argString: retList.append(verifierDB) if "d" in argString: retList.append(directory) if "reqcert" in flagsList: retList.append(reqCert) return retList def printGoodConnection(connection, seconds): print(" Handshake time: %.3f seconds" % seconds) print(" Version: %s" % connection.getVersionName()) print(" Cipher: %s %s" % (connection.getCipherName(), connection.getCipherImplementation())) if connection.session.srpUsername: print(" Client SRP username: %s" % connection.session.srpUsername) if connection.session.clientCertChain: print(" Client X.509 SHA1 fingerprint: %s" % connection.session.clientCertChain.getFingerprint()) if connection.session.serverCertChain: print(" Server X.509 SHA1 fingerprint: %s" % connection.session.serverCertChain.getFingerprint()) if connection.session.serverName: print(" SNI: %s" % connection.session.serverName) if connection.session.tackExt: if connection.session.tackInHelloExt: emptyStr = "\n (via TLS Extension)" else: emptyStr = "\n (via TACK Certificate)" print(" TACK: %s" % emptyStr) print(str(connection.session.tackExt)) print(" Next-Protocol Negotiated: %s" % connection.next_proto) def clientCmd(argv): (address, privateKey, certChain, username, password) = \ handleArgs(argv, "kcup") if (certChain and not privateKey) or (not certChain and privateKey): raise SyntaxError("Must specify CERT and KEY together") if (username and not password) or (not username and password): raise SyntaxError("Must specify USER with PASS") if certChain and username: raise SyntaxError("Can use SRP or client cert for auth, not both") #Connect to server sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.settimeout(5) sock.connect(address) connection = TLSConnection(sock) settings = HandshakeSettings() settings.useExperimentalTackExtension = True try: start = time.clock() if username and password: connection.handshakeClientSRP(username, password, settings=settings, serverName=address[0]) else: connection.handshakeClientCert(certChain, privateKey, settings=settings, serverName=address[0]) stop = time.clock() print("Handshake success") except TLSLocalAlert as a: if a.description == AlertDescription.user_canceled: print(str(a)) else: raise sys.exit(-1) except TLSRemoteAlert as a: if a.description == AlertDescription.unknown_psk_identity: if username: print("Unknown username") else: raise elif a.description == AlertDescription.bad_record_mac: if username: print("Bad username or password") else: raise elif a.description == AlertDescription.handshake_failure: print("Unable to negotiate mutually acceptable parameters") else: raise sys.exit(-1) printGoodConnection(connection, stop-start) connection.close() def serverCmd(argv): (address, privateKey, certChain, tacks, verifierDB, directory, reqCert) = handleArgs(argv, "kctbvd", ["reqcert"]) if (certChain and not privateKey) or (not certChain and privateKey): raise SyntaxError("Must specify CERT and KEY together") if tacks and not certChain: raise SyntaxError("Must specify CERT with Tacks") print("I am an HTTPS test server, I will listen on %s:%d" % (address[0], address[1])) if directory: os.chdir(directory) print("Serving files from %s" % os.getcwd()) if certChain and privateKey: print("Using certificate and private key...") if verifierDB: print("Using verifier DB...") if tacks: print("Using Tacks...") ############# sessionCache = SessionCache() class MyHTTPServer(ThreadingMixIn, TLSSocketServerMixIn, HTTPServer): def handshake(self, connection): print("About to handshake...") activationFlags = 0 if tacks: if len(tacks) == 1: activationFlags = 1 elif len(tacks) == 2: activationFlags = 3 try: start = time.clock() settings = HandshakeSettings() settings.useExperimentalTackExtension=True connection.handshakeServer(certChain=certChain, privateKey=privateKey, verifierDB=verifierDB, tacks=tacks, activationFlags=activationFlags, sessionCache=sessionCache, settings=settings, nextProtos=[b"http/1.1"]) # As an example (does not work here): #nextProtos=[b"spdy/3", b"spdy/2", b"http/1.1"]) stop = time.clock() except TLSRemoteAlert as a: if a.description == AlertDescription.user_canceled: print(str(a)) return False else: raise except TLSLocalAlert as a: if a.description == AlertDescription.unknown_psk_identity: if username: print("Unknown username") return False else: raise elif a.description == AlertDescription.bad_record_mac: if username: print("Bad username or password") return False else: raise elif a.description == AlertDescription.handshake_failure: print("Unable to negotiate mutually acceptable parameters") return False else: raise connection.ignoreAbruptClose = True printGoodConnection(connection, stop-start) return True httpd = MyHTTPServer(address, SimpleHTTPRequestHandler) httpd.serve_forever() if __name__ == '__main__': if len(sys.argv) < 2: printUsage("Missing command") elif sys.argv[1] == "client"[:len(sys.argv[1])]: clientCmd(sys.argv[2:]) elif sys.argv[1] == "server"[:len(sys.argv[1])]: serverCmd(sys.argv[2:]) else: printUsage("Unknown command: %s" % sys.argv[1])