diff options
Diffstat (limited to 'chromium/third_party/tlslite/scripts/tls.py')
-rwxr-xr-x[-rw-r--r--] | chromium/third_party/tlslite/scripts/tls.py | 1324 |
1 files changed, 293 insertions, 1031 deletions
diff --git a/chromium/third_party/tlslite/scripts/tls.py b/chromium/third_party/tlslite/scripts/tls.py index fa2c663f3b0..48035ce2cd1 100644..100755 --- a/chromium/third_party/tlslite/scripts/tls.py +++ b/chromium/third_party/tlslite/scripts/tls.py @@ -1,1074 +1,336 @@ -#! python - +#!/usr/bin/env python + +# Authors: +# Trevor Perrin +# Marcelo Fernandez - bugfix and NPN support +# Martin von Loewis - python 3 port +# +# See the LICENSE file for legal information regarding use of this file. +from __future__ import print_function import sys import os import os.path import socket -import thread import time -import httplib -import BaseHTTPServer -import SimpleHTTPServer - - +import getopt try: - from cryptoIDlib.api import * - cryptoIDlibLoaded = True -except: - cryptoIDlibLoaded = False + import httplib + from SocketServer import * + from BaseHTTPServer import * + from SimpleHTTPServer import * +except ImportError: + # Python 3.x + from http import client as httplib + from socketserver import * + from http.server import * if __name__ != "__main__": raise "This must be run as a command, not used as a module!" -#import tlslite -#from tlslite.constants import AlertDescription, Fault - -#from tlslite.utils.jython_compat import formatExceptionTrace -#from tlslite.X509 import X509, X509CertChain - from tlslite.api import * +from tlslite import __version__ -def parsePrivateKey(s): - try: - return parsePEMKey(s, private=True) - except Exception, e: - print e - return parseXMLKey(s, private=True) - - -def clientTest(address, dir): - - #Split address into hostname/port tuple - address = address.split(":") - if len(address)==1: - address.append("4443") - address = ( address[0], int(address[1]) ) - - def connect(): - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - if hasattr(sock, 'settimeout'): #It's a python 2.3 feature - sock.settimeout(5) - sock.connect(address) - c = TLSConnection(sock) - return c - - test = 0 - - badFault = False - - print "Test 1 - good shared key" - connection = connect() - connection.handshakeClientSharedKey("shared", "key") - connection.close() - connection.sock.close() - - print "Test 2 - shared key faults" - for fault in Fault.clientSharedKeyFaults + Fault.genericFaults: - connection = connect() - connection.fault = fault - try: - connection.handshakeClientSharedKey("shared", "key") - print " Good Fault %s" % (Fault.faultNames[fault]) - except TLSFaultError, e: - print " BAD FAULT %s: %s" % (Fault.faultNames[fault], str(e)) - badFault = True - connection.sock.close() - - print "Test 3 - good SRP" - connection = connect() - connection.handshakeClientSRP("test", "password") - connection.close() - - print "Test 4 - SRP faults" - for fault in Fault.clientSrpFaults + Fault.genericFaults: - connection = connect() - connection.fault = fault - try: - connection.handshakeClientSRP("test", "password") - print " Good Fault %s" % (Fault.faultNames[fault]) - except TLSFaultError, e: - print " BAD FAULT %s: %s" % (Fault.faultNames[fault], str(e)) - badFault = True - connection.sock.close() - - print "Test 5 - good SRP: unknown_srp_username idiom" - def srpCallback(): - return ("test", "password") - connection = connect() - connection.handshakeClientUnknown(srpCallback=srpCallback) - connection.close() - connection.sock.close() - - print "Test 6 - good SRP: with X.509 certificate" - connection = connect() - connection.handshakeClientSRP("test", "password") - assert(isinstance(connection.session.serverCertChain, X509CertChain)) - connection.close() - connection.sock.close() - - print "Test 7 - X.509 with SRP faults" - for fault in Fault.clientSrpFaults + Fault.genericFaults: - connection = connect() - connection.fault = fault - try: - connection.handshakeClientSRP("test", "password") - print " Good Fault %s" % (Fault.faultNames[fault]) - except TLSFaultError, e: - print " BAD FAULT %s: %s" % (Fault.faultNames[fault], str(e)) - badFault = True - connection.sock.close() - - if cryptoIDlibLoaded: - print "Test 8 - good SRP: with cryptoID certificate chain" - connection = connect() - connection.handshakeClientSRP("test", "password") - assert(isinstance(connection.session.serverCertChain, CertChain)) - if not (connection.session.serverCertChain.validate()): - print connection.session.serverCertChain.validate(listProblems=True) - - connection.close() - connection.sock.close() - - print "Test 9 - CryptoID with SRP faults" - for fault in Fault.clientSrpFaults + Fault.genericFaults: - connection = connect() - connection.fault = fault - try: - connection.handshakeClientSRP("test", "password") - print " Good Fault %s" % (Fault.faultNames[fault]) - except TLSFaultError, e: - print " BAD FAULT %s: %s" % (Fault.faultNames[fault], str(e)) - badFault = True - connection.sock.close() - - print "Test 10 - good X509" - connection = connect() - connection.handshakeClientCert() - assert(isinstance(connection.session.serverCertChain, X509CertChain)) - connection.close() - connection.sock.close() - - print "Test 10.a - good X509, SSLv3" - connection = connect() - settings = HandshakeSettings() - settings.minVersion = (3,0) - settings.maxVersion = (3,0) - connection.handshakeClientCert(settings=settings) - assert(isinstance(connection.session.serverCertChain, X509CertChain)) - connection.close() - connection.sock.close() - - print "Test 11 - X.509 faults" - for fault in Fault.clientNoAuthFaults + Fault.genericFaults: - connection = connect() - connection.fault = fault - try: - connection.handshakeClientCert() - print " Good Fault %s" % (Fault.faultNames[fault]) - except TLSFaultError, e: - print " BAD FAULT %s: %s" % (Fault.faultNames[fault], str(e)) - badFault = True - connection.sock.close() - - if cryptoIDlibLoaded: - print "Test 12 - good cryptoID" - connection = connect() - connection.handshakeClientCert() - assert(isinstance(connection.session.serverCertChain, CertChain)) - assert(connection.session.serverCertChain.validate()) - connection.close() - connection.sock.close() - - print "Test 13 - cryptoID faults" - for fault in Fault.clientNoAuthFaults + Fault.genericFaults: - connection = connect() - connection.fault = fault - try: - connection.handshakeClientCert() - print " Good Fault %s" % (Fault.faultNames[fault]) - except TLSFaultError, e: - print " BAD FAULT %s: %s" % (Fault.faultNames[fault], str(e)) - badFault = True - connection.sock.close() - - print "Test 14 - good mutual X509" - x509Cert = X509().parse(open(os.path.join(dir, "clientX509Cert.pem")).read()) - x509Chain = X509CertChain([x509Cert]) - s = open(os.path.join(dir, "clientX509Key.pem")).read() - x509Key = parsePEMKey(s, private=True) - - connection = connect() - connection.handshakeClientCert(x509Chain, x509Key) - assert(isinstance(connection.session.serverCertChain, X509CertChain)) - connection.close() - connection.sock.close() - - print "Test 14.a - good mutual X509, SSLv3" - connection = connect() - settings = HandshakeSettings() - settings.minVersion = (3,0) - settings.maxVersion = (3,0) - connection.handshakeClientCert(x509Chain, x509Key, settings=settings) - assert(isinstance(connection.session.serverCertChain, X509CertChain)) - connection.close() - connection.sock.close() - - print "Test 15 - mutual X.509 faults" - for fault in Fault.clientCertFaults + Fault.genericFaults: - connection = connect() - connection.fault = fault - try: - connection.handshakeClientCert(x509Chain, x509Key) - print " Good Fault %s" % (Fault.faultNames[fault]) - except TLSFaultError, e: - print " BAD FAULT %s: %s" % (Fault.faultNames[fault], str(e)) - badFault = True - connection.sock.close() - - if cryptoIDlibLoaded: - print "Test 16 - good mutual cryptoID" - cryptoIDChain = CertChain().parse(open(os.path.join(dir, "serverCryptoIDChain.xml"), "r").read()) - cryptoIDKey = parseXMLKey(open(os.path.join(dir, "serverCryptoIDKey.xml"), "r").read(), private=True) - - connection = connect() - connection.handshakeClientCert(cryptoIDChain, cryptoIDKey) - assert(isinstance(connection.session.serverCertChain, CertChain)) - assert(connection.session.serverCertChain.validate()) - connection.close() - connection.sock.close() - - print "Test 17 - mutual cryptoID faults" - for fault in Fault.clientCertFaults + Fault.genericFaults: - connection = connect() - connection.fault = fault - try: - connection.handshakeClientCert(cryptoIDChain, cryptoIDKey) - print " Good Fault %s" % (Fault.faultNames[fault]) - except TLSFaultError, e: - print " BAD FAULT %s: %s" % (Fault.faultNames[fault], str(e)) - badFault = True - connection.sock.close() - - print "Test 18 - good SRP, prepare to resume..." - connection = connect() - connection.handshakeClientSRP("test", "password") - connection.close() - connection.sock.close() - session = connection.session - - print "Test 19 - resumption" - connection = connect() - connection.handshakeClientSRP("test", "garbage", session=session) - #Don't close! -- see below - - print "Test 20 - invalidated resumption" - connection.sock.close() #Close the socket without a close_notify! - connection = connect() - try: - connection.handshakeClientSRP("test", "garbage", session=session) - assert() - except TLSRemoteAlert, alert: - if alert.description != AlertDescription.bad_record_mac: - raise - connection.sock.close() - - print "Test 21 - HTTPS test X.509" - address = address[0], address[1]+1 - if hasattr(socket, "timeout"): - timeoutEx = socket.timeout +try: + from tack.structures.Tack import Tack + +except ImportError: + pass + +def printUsage(s=None): + if s: + print("ERROR: %s" % s) + + print("") + print("Version: %s" % __version__) + print("") + print("RNG: %s" % prngName) + print("") + print("Modules:") + if tackpyLoaded: + print(" tackpy : Loaded") else: - timeoutEx = socket.error - while 1: - try: - time.sleep(2) - htmlBody = open(os.path.join(dir, "index.html")).read() - fingerprint = None - for y in range(2): - h = HTTPTLSConnection(\ - address[0], address[1], x509Fingerprint=fingerprint) - for x in range(3): - h.request("GET", "/index.html") - r = h.getresponse() - assert(r.status == 200) - s = r.read() - assert(s == htmlBody) - fingerprint = h.tlsSession.serverCertChain.getFingerprint() - assert(fingerprint) - time.sleep(2) - break - except timeoutEx: - print "timeout, retrying..." - pass - - if cryptoIDlibLoaded: - print "Test 21a - HTTPS test SRP+cryptoID" - address = address[0], address[1]+1 - if hasattr(socket, "timeout"): - timeoutEx = socket.timeout - else: - timeoutEx = socket.error - while 1: - try: - time.sleep(2) #Time to generate key and cryptoID - htmlBody = open(os.path.join(dir, "index.html")).read() - fingerprint = None - protocol = None - for y in range(2): - h = HTTPTLSConnection(\ - address[0], address[1], - username="test", password="password", - cryptoID=fingerprint, protocol=protocol) - for x in range(3): - h.request("GET", "/index.html") - r = h.getresponse() - assert(r.status == 200) - s = r.read() - assert(s == htmlBody) - fingerprint = h.tlsSession.serverCertChain.cryptoID - assert(fingerprint) - protocol = "urn:whatever" - time.sleep(2) - break - except timeoutEx: - print "timeout, retrying..." - pass - - address = address[0], address[1]+1 - - implementations = [] - if cryptlibpyLoaded: - implementations.append("cryptlib") + print(" tackpy : Not Loaded") if m2cryptoLoaded: - implementations.append("openssl") + print(" M2Crypto : Loaded") + else: + print(" M2Crypto : Not Loaded") if pycryptoLoaded: - implementations.append("pycrypto") - implementations.append("python") - - print "Test 22 - different ciphers" - for implementation in implementations: - for cipher in ["aes128", "aes256", "rc4"]: - - print "Test 22:", - connection = connect() - - settings = HandshakeSettings() - settings.cipherNames = [cipher] - settings.cipherImplementations = [implementation, "python"] - connection.handshakeClientSharedKey("shared", "key", settings=settings) - print ("%s %s" % (connection.getCipherName(), connection.getCipherImplementation())) - - connection.write("hello") - h = connection.read(min=5, max=5) - assert(h == "hello") - connection.close() - connection.sock.close() - - print "Test 23 - throughput test" - for implementation in implementations: - for cipher in ["aes128", "aes256", "3des", "rc4"]: - if cipher == "3des" and implementation not in ("openssl", "cryptlib", "pycrypto"): - continue - - print "Test 23:", - connection = connect() - - settings = HandshakeSettings() - settings.cipherNames = [cipher] - settings.cipherImplementations = [implementation, "python"] - connection.handshakeClientSharedKey("shared", "key", settings=settings) - print ("%s %s:" % (connection.getCipherName(), connection.getCipherImplementation())), - - startTime = time.clock() - connection.write("hello"*10000) - h = connection.read(min=50000, max=50000) - stopTime = time.clock() - print "100K exchanged at rate of %d bytes/sec" % int(100000/(stopTime-startTime)) - - assert(h == "hello"*10000) - connection.close() - connection.sock.close() - - print "Test 24 - Internet servers test" - try: - i = IMAP4_TLS("cyrus.andrew.cmu.edu") - i.login("anonymous", "anonymous@anonymous.net") - i.logout() - print "Test 24: IMAP4 good" - p = POP3_TLS("pop.gmail.com") - p.quit() - print "Test 24: POP3 good" - except socket.error, e: - print "Non-critical error: socket error trying to reach internet server: ", e - - if not badFault: - print "Test succeeded" + print(" pycrypto : Loaded") else: - print "Test failed" - - -def serverTest(address, dir): + print(" pycrypto : Not Loaded") + if gmpyLoaded: + print(" GMPY : Loaded") + else: + print(" GMPY : Not Loaded") + + print("") + print("""Commands: + + server + [-k KEY] [-c CERT] [-t TACK] [-v VERIFIERDB] [-d DIR] + [--reqcert] HOST:PORT + + client + [-k KEY] [-c CERT] [-u USER] [-p PASS] + HOST:PORT +""") + sys.exit(-1) + +def printError(s): + """Print error message and exit""" + sys.stderr.write("ERROR: %s\n" % s) + sys.exit(-1) + + +def handleArgs(argv, argString, flagsList=[]): + # Convert to getopt argstring format: + # Add ":" after each arg, ie "abc" -> "a:b:c:" + getOptArgString = ":".join(argString) + ":" + try: + opts, argv = getopt.getopt(argv, getOptArgString, flagsList) + except getopt.GetoptError as e: + printError(e) + # Default values if arg not present + privateKey = None + certChain = None + username = None + password = None + tacks = None + verifierDB = None + reqCert = False + directory = None + + for opt, arg in opts: + if opt == "-k": + s = open(arg, "rb").read() + privateKey = parsePEMKey(s, private=True) + elif opt == "-c": + s = open(arg, "rb").read() + x509 = X509() + x509.parse(s) + certChain = X509CertChain([x509]) + elif opt == "-u": + username = arg + elif opt == "-p": + password = arg + elif opt == "-t": + if tackpyLoaded: + s = open(arg, "rU").read() + tacks = Tack.createFromPemList(s) + elif opt == "-v": + verifierDB = VerifierDB(arg) + verifierDB.open() + elif opt == "-d": + directory = arg + elif opt == "--reqcert": + reqCert = True + else: + assert(False) + + if not argv: + printError("Missing address") + if len(argv)>1: + printError("Too many arguments") #Split address into hostname/port tuple + address = argv[0] address = address.split(":") - if len(address)==1: - address.append("4443") + if len(address) != 2: + raise SyntaxError("Must specify <host>:<port>") address = ( address[0], int(address[1]) ) - #Connect to server - lsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - lsock.bind(address) - lsock.listen(5) - - def connect(): - return TLSConnection(lsock.accept()[0]) - - print "Test 1 - good shared key" - sharedKeyDB = SharedKeyDB() - sharedKeyDB["shared"] = "key" - sharedKeyDB["shared2"] = "key2" - connection = connect() - connection.handshakeServer(sharedKeyDB=sharedKeyDB) - connection.close() - connection.sock.close() - - print "Test 2 - shared key faults" - for fault in Fault.clientSharedKeyFaults + Fault.genericFaults: - connection = connect() - connection.fault = fault - try: - connection.handshakeServer(sharedKeyDB=sharedKeyDB) - assert() - except: - pass - connection.sock.close() - - print "Test 3 - good SRP" - #verifierDB = tlslite.VerifierDB(os.path.join(dir, "verifierDB")) - #verifierDB.open() - verifierDB = VerifierDB() - verifierDB.create() - entry = VerifierDB.makeVerifier("test", "password", 1536) - verifierDB["test"] = entry - - connection = connect() - connection.handshakeServer(verifierDB=verifierDB) - connection.close() - connection.sock.close() - - print "Test 4 - SRP faults" - for fault in Fault.clientSrpFaults + Fault.genericFaults: - connection = connect() - connection.fault = fault - try: - connection.handshakeServer(verifierDB=verifierDB) - assert() - except: - pass - connection.sock.close() - - print "Test 5 - good SRP: unknown_srp_username idiom" - connection = connect() - connection.handshakeServer(verifierDB=verifierDB) - connection.close() - connection.sock.close() - - print "Test 6 - good SRP: with X.509 cert" - x509Cert = X509().parse(open(os.path.join(dir, "serverX509Cert.pem")).read()) - x509Chain = X509CertChain([x509Cert]) - s = open(os.path.join(dir, "serverX509Key.pem")).read() - x509Key = parsePEMKey(s, private=True) - - connection = connect() - connection.handshakeServer(verifierDB=verifierDB, \ - certChain=x509Chain, privateKey=x509Key) - connection.close() - connection.sock.close() - - print "Test 7 - X.509 with SRP faults" - for fault in Fault.clientSrpFaults + Fault.genericFaults: - connection = connect() - connection.fault = fault - try: - connection.handshakeServer(verifierDB=verifierDB, \ - certChain=x509Chain, privateKey=x509Key) - assert() - except: - pass - connection.sock.close() - - if cryptoIDlibLoaded: - print "Test 8 - good SRP: with cryptoID certs" - cryptoIDChain = CertChain().parse(open(os.path.join(dir, "serverCryptoIDChain.xml"), "r").read()) - cryptoIDKey = parseXMLKey(open(os.path.join(dir, "serverCryptoIDKey.xml"), "r").read(), private=True) - connection = connect() - connection.handshakeServer(verifierDB=verifierDB, \ - certChain=cryptoIDChain, privateKey=cryptoIDKey) - connection.close() - connection.sock.close() - - print "Test 9 - cryptoID with SRP faults" - for fault in Fault.clientSrpFaults + Fault.genericFaults: - connection = connect() - connection.fault = fault - try: - connection.handshakeServer(verifierDB=verifierDB, \ - certChain=cryptoIDChain, privateKey=cryptoIDKey) - assert() - except: - pass - connection.sock.close() - - print "Test 10 - good X.509" - connection = connect() - connection.handshakeServer(certChain=x509Chain, privateKey=x509Key) - connection.close() - connection.sock.close() - - print "Test 10.a - good X.509, SSL v3" - connection = connect() - settings = HandshakeSettings() - settings.minVersion = (3,0) - settings.maxVersion = (3,0) - connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, settings=settings) - connection.close() - connection.sock.close() - - print "Test 11 - X.509 faults" - for fault in Fault.clientNoAuthFaults + Fault.genericFaults: - connection = connect() - connection.fault = fault - try: - connection.handshakeServer(certChain=x509Chain, privateKey=x509Key) - assert() - except: - pass - connection.sock.close() - - if cryptoIDlibLoaded: - print "Test 12 - good cryptoID" - connection = connect() - connection.handshakeServer(certChain=cryptoIDChain, privateKey=cryptoIDKey) - connection.close() - connection.sock.close() - - print "Test 13 - cryptoID faults" - for fault in Fault.clientNoAuthFaults + Fault.genericFaults: - connection = connect() - connection.fault = fault - try: - connection.handshakeServer(certChain=cryptoIDChain, privateKey=cryptoIDKey) - assert() - except: - pass - connection.sock.close() - - print "Test 14 - good mutual X.509" - connection = connect() - connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, reqCert=True) - assert(isinstance(connection.session.serverCertChain, X509CertChain)) - connection.close() - connection.sock.close() + # Populate the return list + retList = [address] + if "k" in argString: + retList.append(privateKey) + if "c" in argString: + retList.append(certChain) + if "u" in argString: + retList.append(username) + if "p" in argString: + retList.append(password) + if "t" in argString: + retList.append(tacks) + if "v" in argString: + retList.append(verifierDB) + if "d" in argString: + retList.append(directory) + if "reqcert" in flagsList: + retList.append(reqCert) + return retList + + +def printGoodConnection(connection, seconds): + print(" Handshake time: %.3f seconds" % seconds) + print(" Version: %s" % connection.getVersionName()) + print(" Cipher: %s %s" % (connection.getCipherName(), + connection.getCipherImplementation())) + if connection.session.srpUsername: + print(" Client SRP username: %s" % connection.session.srpUsername) + if connection.session.clientCertChain: + print(" Client X.509 SHA1 fingerprint: %s" % + connection.session.clientCertChain.getFingerprint()) + if connection.session.serverCertChain: + print(" Server X.509 SHA1 fingerprint: %s" % + connection.session.serverCertChain.getFingerprint()) + if connection.session.serverName: + print(" SNI: %s" % connection.session.serverName) + if connection.session.tackExt: + if connection.session.tackInHelloExt: + emptyStr = "\n (via TLS Extension)" + else: + emptyStr = "\n (via TACK Certificate)" + print(" TACK: %s" % emptyStr) + print(str(connection.session.tackExt)) + print(" Next-Protocol Negotiated: %s" % connection.next_proto) + + +def clientCmd(argv): + (address, privateKey, certChain, username, password) = \ + handleArgs(argv, "kcup") + + if (certChain and not privateKey) or (not certChain and privateKey): + raise SyntaxError("Must specify CERT and KEY together") + if (username and not password) or (not username and password): + raise SyntaxError("Must specify USER with PASS") + if certChain and username: + raise SyntaxError("Can use SRP or client cert for auth, not both") - print "Test 14a - good mutual X.509, SSLv3" - connection = connect() + #Connect to server + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.settimeout(5) + sock.connect(address) + connection = TLSConnection(sock) + settings = HandshakeSettings() - settings.minVersion = (3,0) - settings.maxVersion = (3,0) - connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, reqCert=True, settings=settings) - assert(isinstance(connection.session.serverCertChain, X509CertChain)) - connection.close() - connection.sock.close() - - print "Test 15 - mutual X.509 faults" - for fault in Fault.clientCertFaults + Fault.genericFaults: - connection = connect() - connection.fault = fault - try: - connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, reqCert=True) - assert() - except: - pass - connection.sock.close() - - if cryptoIDlibLoaded: - print "Test 16 - good mutual cryptoID" - connection = connect() - connection.handshakeServer(certChain=cryptoIDChain, privateKey=cryptoIDKey, reqCert=True) - assert(isinstance(connection.session.serverCertChain, CertChain)) - assert(connection.session.serverCertChain.validate()) - connection.close() - connection.sock.close() - - print "Test 17 - mutual cryptoID faults" - for fault in Fault.clientCertFaults + Fault.genericFaults: - connection = connect() - connection.fault = fault - try: - connection.handshakeServer(certChain=cryptoIDChain, privateKey=cryptoIDKey, reqCert=True) - assert() - except: - pass - connection.sock.close() - - print "Test 18 - good SRP, prepare to resume" - sessionCache = SessionCache() - connection = connect() - connection.handshakeServer(verifierDB=verifierDB, sessionCache=sessionCache) - connection.close() - connection.sock.close() - - print "Test 19 - resumption" - connection = connect() - connection.handshakeServer(verifierDB=verifierDB, sessionCache=sessionCache) - #Don't close! -- see next test - - print "Test 20 - invalidated resumption" - try: - connection.read(min=1, max=1) - assert() #Client is going to close the socket without a close_notify - except TLSAbruptCloseError, e: - pass - connection = connect() + settings.useExperimentalTackExtension = True + try: - connection.handshakeServer(verifierDB=verifierDB, sessionCache=sessionCache) - except TLSLocalAlert, alert: - if alert.description != AlertDescription.bad_record_mac: + start = time.clock() + if username and password: + connection.handshakeClientSRP(username, password, + settings=settings, serverName=address[0]) + else: + connection.handshakeClientCert(certChain, privateKey, + settings=settings, serverName=address[0]) + stop = time.clock() + print("Handshake success") + except TLSLocalAlert as a: + if a.description == AlertDescription.user_canceled: + print(str(a)) + else: raise - connection.sock.close() - - print "Test 21 - HTTPS test X.509" - - #Close the current listening socket - lsock.close() - - #Create and run an HTTP Server using TLSSocketServerMixIn - class MyHTTPServer(TLSSocketServerMixIn, - BaseHTTPServer.HTTPServer): - def handshake(self, tlsConnection): - tlsConnection.handshakeServer(certChain=x509Chain, privateKey=x509Key) - return True - cd = os.getcwd() - os.chdir(dir) - address = address[0], address[1]+1 - httpd = MyHTTPServer(address, SimpleHTTPServer.SimpleHTTPRequestHandler) - for x in range(6): - httpd.handle_request() - httpd.server_close() - cd = os.chdir(cd) - - if cryptoIDlibLoaded: - print "Test 21a - HTTPS test SRP+cryptoID" - - #Create and run an HTTP Server using TLSSocketServerMixIn - class MyHTTPServer(TLSSocketServerMixIn, - BaseHTTPServer.HTTPServer): - def handshake(self, tlsConnection): - tlsConnection.handshakeServer(certChain=cryptoIDChain, privateKey=cryptoIDKey, - verifierDB=verifierDB) - return True - cd = os.getcwd() - os.chdir(dir) - address = address[0], address[1]+1 - httpd = MyHTTPServer(address, SimpleHTTPServer.SimpleHTTPRequestHandler) - for x in range(6): - httpd.handle_request() - httpd.server_close() - cd = os.chdir(cd) - - #Re-connect the listening socket - lsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - address = address[0], address[1]+1 - lsock.bind(address) - lsock.listen(5) - - def connect(): - return TLSConnection(lsock.accept()[0]) - - implementations = [] - if cryptlibpyLoaded: - implementations.append("cryptlib") - if m2cryptoLoaded: - implementations.append("openssl") - if pycryptoLoaded: - implementations.append("pycrypto") - implementations.append("python") - - print "Test 22 - different ciphers" - for implementation in ["python"] * len(implementations): - for cipher in ["aes128", "aes256", "rc4"]: - - print "Test 22:", - connection = connect() - - settings = HandshakeSettings() - settings.cipherNames = [cipher] - settings.cipherImplementations = [implementation, "python"] - - connection.handshakeServer(sharedKeyDB=sharedKeyDB, settings=settings) - print connection.getCipherName(), connection.getCipherImplementation() - h = connection.read(min=5, max=5) - assert(h == "hello") - connection.write(h) - connection.close() - connection.sock.close() - - print "Test 23 - throughput test" - for implementation in implementations: - for cipher in ["aes128", "aes256", "3des", "rc4"]: - if cipher == "3des" and implementation not in ("openssl", "cryptlib", "pycrypto"): - continue - - print "Test 23:", - connection = connect() - - settings = HandshakeSettings() - settings.cipherNames = [cipher] - settings.cipherImplementations = [implementation, "python"] - - connection.handshakeServer(sharedKeyDB=sharedKeyDB, settings=settings) - print connection.getCipherName(), connection.getCipherImplementation() - h = connection.read(min=50000, max=50000) - assert(h == "hello"*10000) - connection.write(h) - connection.close() - connection.sock.close() - - print "Test succeeded" - - - - - - - - - - - - -if len(sys.argv) == 1 or (len(sys.argv)==2 and sys.argv[1].lower().endswith("help")): - print "" - print "Version: 0.3.8" - print "" - print "RNG: %s" % prngName - print "" - print "Modules:" - if cryptlibpyLoaded: - print " cryptlib_py : Loaded" - else: - print " cryptlib_py : Not Loaded" - if m2cryptoLoaded: - print " M2Crypto : Loaded" - else: - print " M2Crypto : Not Loaded" - if pycryptoLoaded: - print " pycrypto : Loaded" - else: - print " pycrypto : Not Loaded" - if gmpyLoaded: - print " GMPY : Loaded" - else: - print " GMPY : Not Loaded" - if cryptoIDlibLoaded: - print " cryptoIDlib : Loaded" - else: - print " cryptoIDlib : Not Loaded" - print "" - print "Commands:" - print "" - print " clientcert <server> [<chain> <key>]" - print " clientsharedkey <server> <user> <pass>" - print " clientsrp <server> <user> <pass>" - print " clienttest <server> <dir>" - print "" - print " serversrp <server> <verifierDB>" - print " servercert <server> <chain> <key> [req]" - print " serversrpcert <server> <verifierDB> <chain> <key>" - print " serversharedkey <server> <sharedkeyDB>" - print " servertest <server> <dir>" - sys.exit() - -cmd = sys.argv[1].lower() - -class Args: - def __init__(self, argv): - self.argv = argv - def get(self, index): - if len(self.argv)<=index: - raise SyntaxError("Not enough arguments") - return self.argv[index] - def getLast(self, index): - if len(self.argv)>index+1: - raise SyntaxError("Too many arguments") - return self.get(index) - -args = Args(sys.argv) - -def reformatDocString(s): - lines = s.splitlines() - newLines = [] - for line in lines: - newLines.append(" " + line.strip()) - return "\n".join(newLines) - -try: - if cmd == "clienttest": - address = args.get(2) - dir = args.getLast(3) - clientTest(address, dir) - sys.exit() - - elif cmd.startswith("client"): - address = args.get(2) - - #Split address into hostname/port tuple - address = address.split(":") - if len(address)==1: - address.append("4443") - address = ( address[0], int(address[1]) ) - - def connect(): - #Connect to server - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - if hasattr(sock, "settimeout"): - sock.settimeout(5) - sock.connect(address) - - #Instantiate TLSConnections - return TLSConnection(sock) - - try: - if cmd == "clientsrp": - username = args.get(3) - password = args.getLast(4) - connection = connect() - start = time.clock() - connection.handshakeClientSRP(username, password) - elif cmd == "clientsharedkey": - username = args.get(3) - password = args.getLast(4) - connection = connect() - start = time.clock() - connection.handshakeClientSharedKey(username, password) - elif cmd == "clientcert": - certChain = None - privateKey = None - if len(sys.argv) > 3: - certFilename = args.get(3) - keyFilename = args.getLast(4) - - s1 = open(certFilename, "rb").read() - s2 = open(keyFilename, "rb").read() - - #Try to create cryptoID cert chain - if cryptoIDlibLoaded: - try: - certChain = CertChain().parse(s1) - privateKey = parsePrivateKey(s2) - except: - certChain = None - privateKey = None - - #Try to create X.509 cert chain - if not certChain: - x509 = X509() - x509.parse(s1) - certChain = X509CertChain([x509]) - privateKey = parsePrivateKey(s2) - - connection = connect() - start = time.clock() - connection.handshakeClientCert(certChain, privateKey) - else: - raise SyntaxError("Unknown command") - - except TLSLocalAlert, a: - if a.description == AlertDescription.bad_record_mac: - if cmd == "clientsharedkey": - print "Bad sharedkey password" - else: - raise - elif a.description == AlertDescription.user_canceled: - print str(a) + sys.exit(-1) + except TLSRemoteAlert as a: + if a.description == AlertDescription.unknown_psk_identity: + if username: + print("Unknown username") else: raise - sys.exit() - except TLSRemoteAlert, a: - if a.description == AlertDescription.unknown_srp_username: - if cmd == "clientsrp": - print "Unknown username" - else: - raise - elif a.description == AlertDescription.bad_record_mac: - if cmd == "clientsrp": - print "Bad username or password" - else: - raise - elif a.description == AlertDescription.handshake_failure: - print "Unable to negotiate mutually acceptable parameters" + elif a.description == AlertDescription.bad_record_mac: + if username: + print("Bad username or password") else: raise - sys.exit() - - stop = time.clock() - print "Handshake success" - print " Handshake time: %.4f seconds" % (stop - start) - print " Version: %s.%s" % connection.version - print " Cipher: %s %s" % (connection.getCipherName(), connection.getCipherImplementation()) - if connection.session.srpUsername: - print " Client SRP username: %s" % connection.session.srpUsername - if connection.session.sharedKeyUsername: - print " Client shared key username: %s" % connection.session.sharedKeyUsername - if connection.session.clientCertChain: - print " Client fingerprint: %s" % connection.session.clientCertChain.getFingerprint() - if connection.session.serverCertChain: - print " Server fingerprint: %s" % connection.session.serverCertChain.getFingerprint() - connection.close() - connection.sock.close() - - elif cmd.startswith("server"): - address = args.get(2) - - #Split address into hostname/port tuple - address = address.split(":") - if len(address)==1: - address.append("4443") - address = ( address[0], int(address[1]) ) - - verifierDBFilename = None - sharedKeyDBFilename = None - certFilename = None - keyFilename = None - sharedKeyDB = None - reqCert = False - - if cmd == "serversrp": - verifierDBFilename = args.getLast(3) - elif cmd == "servercert": - certFilename = args.get(3) - keyFilename = args.get(4) - if len(sys.argv)>=6: - req = args.getLast(5) - if req.lower() != "req": - raise SyntaxError() - reqCert = True - elif cmd == "serversrpcert": - verifierDBFilename = args.get(3) - certFilename = args.get(4) - keyFilename = args.getLast(5) - elif cmd == "serversharedkey": - sharedKeyDBFilename = args.getLast(3) - elif cmd == "servertest": - address = args.get(2) - dir = args.getLast(3) - serverTest(address, dir) - sys.exit() - - verifierDB = None - if verifierDBFilename: - verifierDB = VerifierDB(verifierDBFilename) - verifierDB.open() - - sharedKeyDB = None - if sharedKeyDBFilename: - sharedKeyDB = SharedKeyDB(sharedKeyDBFilename) - sharedKeyDB.open() - - certChain = None - privateKey = None - if certFilename: - s1 = open(certFilename, "rb").read() - s2 = open(keyFilename, "rb").read() - - #Try to create cryptoID cert chain - if cryptoIDlibLoaded: - try: - certChain = CertChain().parse(s1) - privateKey = parsePrivateKey(s2) - except: - certChain = None - privateKey = None + elif a.description == AlertDescription.handshake_failure: + print("Unable to negotiate mutually acceptable parameters") + else: + raise + sys.exit(-1) + printGoodConnection(connection, stop-start) + connection.close() - #Try to create X.509 cert chain - if not certChain: - x509 = X509() - x509.parse(s1) - certChain = X509CertChain([x509]) - privateKey = parsePrivateKey(s2) +def serverCmd(argv): + (address, privateKey, certChain, tacks, + verifierDB, directory, reqCert) = handleArgs(argv, "kctbvd", ["reqcert"]) + + + if (certChain and not privateKey) or (not certChain and privateKey): + raise SyntaxError("Must specify CERT and KEY together") + if tacks and not certChain: + raise SyntaxError("Must specify CERT with Tacks") + + print("I am an HTTPS test server, I will listen on %s:%d" % + (address[0], address[1])) + if directory: + os.chdir(directory) + print("Serving files from %s" % os.getcwd()) + + if certChain and privateKey: + print("Using certificate and private key...") + if verifierDB: + print("Using verifier DB...") + if tacks: + print("Using Tacks...") + + ############# + sessionCache = SessionCache() + class MyHTTPServer(ThreadingMixIn, TLSSocketServerMixIn, HTTPServer): + def handshake(self, connection): + print("About to handshake...") + activationFlags = 0 + if tacks: + if len(tacks) == 1: + activationFlags = 1 + elif len(tacks) == 2: + activationFlags = 3 - #Create handler function - performs handshake, then echos all bytes received - def handler(sock): try: - connection = TLSConnection(sock) + start = time.clock() settings = HandshakeSettings() - connection.handshakeServer(sharedKeyDB=sharedKeyDB, verifierDB=verifierDB, \ - certChain=certChain, privateKey=privateKey, \ - reqCert=reqCert, settings=settings) - print "Handshake success" - print " Version: %s.%s" % connection.version - print " Cipher: %s %s" % (connection.getCipherName(), connection.getCipherImplementation()) - if connection.session.srpUsername: - print " Client SRP username: %s" % connection.session.srpUsername - if connection.session.sharedKeyUsername: - print " Client shared key username: %s" % connection.session.sharedKeyUsername - if connection.session.clientCertChain: - print " Client fingerprint: %s" % connection.session.clientCertChain.getFingerprint() - if connection.session.serverCertChain: - print " Server fingerprint: %s" % connection.session.serverCertChain.getFingerprint() - - s = "" - while 1: - newS = connection.read() - if not newS: - break - s += newS - if s[-1]=='\n': - connection.write(s) - s = "" - except TLSLocalAlert, a: - if a.description == AlertDescription.unknown_srp_username: - print "Unknown SRP username" - elif a.description == AlertDescription.bad_record_mac: - if cmd == "serversrp" or cmd == "serversrpcert": - print "Bad SRP password for:", connection.allegedSrpUsername - else: - raise - elif a.description == AlertDescription.handshake_failure: - print "Unable to negotiate mutually acceptable parameters" + settings.useExperimentalTackExtension=True + connection.handshakeServer(certChain=certChain, + privateKey=privateKey, + verifierDB=verifierDB, + tacks=tacks, + activationFlags=activationFlags, + sessionCache=sessionCache, + settings=settings, + nextProtos=[b"http/1.1"]) + # As an example (does not work here): + #nextProtos=[b"spdy/3", b"spdy/2", b"http/1.1"]) + stop = time.clock() + except TLSRemoteAlert as a: + if a.description == AlertDescription.user_canceled: + print(str(a)) + return False else: raise - except TLSRemoteAlert, a: - if a.description == AlertDescription.bad_record_mac: - if cmd == "serversharedkey": - print "Bad sharedkey password for:", connection.allegedSharedKeyUsername + except TLSLocalAlert as a: + if a.description == AlertDescription.unknown_psk_identity: + if username: + print("Unknown username") + return False + else: + raise + elif a.description == AlertDescription.bad_record_mac: + if username: + print("Bad username or password") + return False else: raise - elif a.description == AlertDescription.user_canceled: - print "Handshake cancelled" elif a.description == AlertDescription.handshake_failure: - print "Unable to negotiate mutually acceptable parameters" - elif a.description == AlertDescription.close_notify: - pass + print("Unable to negotiate mutually acceptable parameters") + return False else: raise - - #Run multi-threaded server - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - sock.bind(address) - sock.listen(5) - while 1: - (newsock, cliAddress) = sock.accept() - thread.start_new_thread(handler, (newsock,)) - - + + connection.ignoreAbruptClose = True + printGoodConnection(connection, stop-start) + return True + + httpd = MyHTTPServer(address, SimpleHTTPRequestHandler) + httpd.serve_forever() + + +if __name__ == '__main__': + if len(sys.argv) < 2: + printUsage("Missing command") + elif sys.argv[1] == "client"[:len(sys.argv[1])]: + clientCmd(sys.argv[2:]) + elif sys.argv[1] == "server"[:len(sys.argv[1])]: + serverCmd(sys.argv[2:]) else: - print "Bad command: '%s'" % cmd -except TLSRemoteAlert, a: - print str(a) - raise - - - - + printUsage("Unknown command: %s" % sys.argv[1]) |