summaryrefslogtreecommitdiffstats
path: root/chromium/third_party/tlslite/scripts/tls.py
diff options
context:
space:
mode:
Diffstat (limited to 'chromium/third_party/tlslite/scripts/tls.py')
-rwxr-xr-x[-rw-r--r--]chromium/third_party/tlslite/scripts/tls.py1324
1 files changed, 293 insertions, 1031 deletions
diff --git a/chromium/third_party/tlslite/scripts/tls.py b/chromium/third_party/tlslite/scripts/tls.py
index fa2c663f3b0..48035ce2cd1 100644..100755
--- a/chromium/third_party/tlslite/scripts/tls.py
+++ b/chromium/third_party/tlslite/scripts/tls.py
@@ -1,1074 +1,336 @@
-#! python
-
+#!/usr/bin/env python
+
+# Authors:
+# Trevor Perrin
+# Marcelo Fernandez - bugfix and NPN support
+# Martin von Loewis - python 3 port
+#
+# See the LICENSE file for legal information regarding use of this file.
+from __future__ import print_function
import sys
import os
import os.path
import socket
-import thread
import time
-import httplib
-import BaseHTTPServer
-import SimpleHTTPServer
-
-
+import getopt
try:
- from cryptoIDlib.api import *
- cryptoIDlibLoaded = True
-except:
- cryptoIDlibLoaded = False
+ import httplib
+ from SocketServer import *
+ from BaseHTTPServer import *
+ from SimpleHTTPServer import *
+except ImportError:
+ # Python 3.x
+ from http import client as httplib
+ from socketserver import *
+ from http.server import *
if __name__ != "__main__":
raise "This must be run as a command, not used as a module!"
-#import tlslite
-#from tlslite.constants import AlertDescription, Fault
-
-#from tlslite.utils.jython_compat import formatExceptionTrace
-#from tlslite.X509 import X509, X509CertChain
-
from tlslite.api import *
+from tlslite import __version__
-def parsePrivateKey(s):
- try:
- return parsePEMKey(s, private=True)
- except Exception, e:
- print e
- return parseXMLKey(s, private=True)
-
-
-def clientTest(address, dir):
-
- #Split address into hostname/port tuple
- address = address.split(":")
- if len(address)==1:
- address.append("4443")
- address = ( address[0], int(address[1]) )
-
- def connect():
- sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- if hasattr(sock, 'settimeout'): #It's a python 2.3 feature
- sock.settimeout(5)
- sock.connect(address)
- c = TLSConnection(sock)
- return c
-
- test = 0
-
- badFault = False
-
- print "Test 1 - good shared key"
- connection = connect()
- connection.handshakeClientSharedKey("shared", "key")
- connection.close()
- connection.sock.close()
-
- print "Test 2 - shared key faults"
- for fault in Fault.clientSharedKeyFaults + Fault.genericFaults:
- connection = connect()
- connection.fault = fault
- try:
- connection.handshakeClientSharedKey("shared", "key")
- print " Good Fault %s" % (Fault.faultNames[fault])
- except TLSFaultError, e:
- print " BAD FAULT %s: %s" % (Fault.faultNames[fault], str(e))
- badFault = True
- connection.sock.close()
-
- print "Test 3 - good SRP"
- connection = connect()
- connection.handshakeClientSRP("test", "password")
- connection.close()
-
- print "Test 4 - SRP faults"
- for fault in Fault.clientSrpFaults + Fault.genericFaults:
- connection = connect()
- connection.fault = fault
- try:
- connection.handshakeClientSRP("test", "password")
- print " Good Fault %s" % (Fault.faultNames[fault])
- except TLSFaultError, e:
- print " BAD FAULT %s: %s" % (Fault.faultNames[fault], str(e))
- badFault = True
- connection.sock.close()
-
- print "Test 5 - good SRP: unknown_srp_username idiom"
- def srpCallback():
- return ("test", "password")
- connection = connect()
- connection.handshakeClientUnknown(srpCallback=srpCallback)
- connection.close()
- connection.sock.close()
-
- print "Test 6 - good SRP: with X.509 certificate"
- connection = connect()
- connection.handshakeClientSRP("test", "password")
- assert(isinstance(connection.session.serverCertChain, X509CertChain))
- connection.close()
- connection.sock.close()
-
- print "Test 7 - X.509 with SRP faults"
- for fault in Fault.clientSrpFaults + Fault.genericFaults:
- connection = connect()
- connection.fault = fault
- try:
- connection.handshakeClientSRP("test", "password")
- print " Good Fault %s" % (Fault.faultNames[fault])
- except TLSFaultError, e:
- print " BAD FAULT %s: %s" % (Fault.faultNames[fault], str(e))
- badFault = True
- connection.sock.close()
-
- if cryptoIDlibLoaded:
- print "Test 8 - good SRP: with cryptoID certificate chain"
- connection = connect()
- connection.handshakeClientSRP("test", "password")
- assert(isinstance(connection.session.serverCertChain, CertChain))
- if not (connection.session.serverCertChain.validate()):
- print connection.session.serverCertChain.validate(listProblems=True)
-
- connection.close()
- connection.sock.close()
-
- print "Test 9 - CryptoID with SRP faults"
- for fault in Fault.clientSrpFaults + Fault.genericFaults:
- connection = connect()
- connection.fault = fault
- try:
- connection.handshakeClientSRP("test", "password")
- print " Good Fault %s" % (Fault.faultNames[fault])
- except TLSFaultError, e:
- print " BAD FAULT %s: %s" % (Fault.faultNames[fault], str(e))
- badFault = True
- connection.sock.close()
-
- print "Test 10 - good X509"
- connection = connect()
- connection.handshakeClientCert()
- assert(isinstance(connection.session.serverCertChain, X509CertChain))
- connection.close()
- connection.sock.close()
-
- print "Test 10.a - good X509, SSLv3"
- connection = connect()
- settings = HandshakeSettings()
- settings.minVersion = (3,0)
- settings.maxVersion = (3,0)
- connection.handshakeClientCert(settings=settings)
- assert(isinstance(connection.session.serverCertChain, X509CertChain))
- connection.close()
- connection.sock.close()
-
- print "Test 11 - X.509 faults"
- for fault in Fault.clientNoAuthFaults + Fault.genericFaults:
- connection = connect()
- connection.fault = fault
- try:
- connection.handshakeClientCert()
- print " Good Fault %s" % (Fault.faultNames[fault])
- except TLSFaultError, e:
- print " BAD FAULT %s: %s" % (Fault.faultNames[fault], str(e))
- badFault = True
- connection.sock.close()
-
- if cryptoIDlibLoaded:
- print "Test 12 - good cryptoID"
- connection = connect()
- connection.handshakeClientCert()
- assert(isinstance(connection.session.serverCertChain, CertChain))
- assert(connection.session.serverCertChain.validate())
- connection.close()
- connection.sock.close()
-
- print "Test 13 - cryptoID faults"
- for fault in Fault.clientNoAuthFaults + Fault.genericFaults:
- connection = connect()
- connection.fault = fault
- try:
- connection.handshakeClientCert()
- print " Good Fault %s" % (Fault.faultNames[fault])
- except TLSFaultError, e:
- print " BAD FAULT %s: %s" % (Fault.faultNames[fault], str(e))
- badFault = True
- connection.sock.close()
-
- print "Test 14 - good mutual X509"
- x509Cert = X509().parse(open(os.path.join(dir, "clientX509Cert.pem")).read())
- x509Chain = X509CertChain([x509Cert])
- s = open(os.path.join(dir, "clientX509Key.pem")).read()
- x509Key = parsePEMKey(s, private=True)
-
- connection = connect()
- connection.handshakeClientCert(x509Chain, x509Key)
- assert(isinstance(connection.session.serverCertChain, X509CertChain))
- connection.close()
- connection.sock.close()
-
- print "Test 14.a - good mutual X509, SSLv3"
- connection = connect()
- settings = HandshakeSettings()
- settings.minVersion = (3,0)
- settings.maxVersion = (3,0)
- connection.handshakeClientCert(x509Chain, x509Key, settings=settings)
- assert(isinstance(connection.session.serverCertChain, X509CertChain))
- connection.close()
- connection.sock.close()
-
- print "Test 15 - mutual X.509 faults"
- for fault in Fault.clientCertFaults + Fault.genericFaults:
- connection = connect()
- connection.fault = fault
- try:
- connection.handshakeClientCert(x509Chain, x509Key)
- print " Good Fault %s" % (Fault.faultNames[fault])
- except TLSFaultError, e:
- print " BAD FAULT %s: %s" % (Fault.faultNames[fault], str(e))
- badFault = True
- connection.sock.close()
-
- if cryptoIDlibLoaded:
- print "Test 16 - good mutual cryptoID"
- cryptoIDChain = CertChain().parse(open(os.path.join(dir, "serverCryptoIDChain.xml"), "r").read())
- cryptoIDKey = parseXMLKey(open(os.path.join(dir, "serverCryptoIDKey.xml"), "r").read(), private=True)
-
- connection = connect()
- connection.handshakeClientCert(cryptoIDChain, cryptoIDKey)
- assert(isinstance(connection.session.serverCertChain, CertChain))
- assert(connection.session.serverCertChain.validate())
- connection.close()
- connection.sock.close()
-
- print "Test 17 - mutual cryptoID faults"
- for fault in Fault.clientCertFaults + Fault.genericFaults:
- connection = connect()
- connection.fault = fault
- try:
- connection.handshakeClientCert(cryptoIDChain, cryptoIDKey)
- print " Good Fault %s" % (Fault.faultNames[fault])
- except TLSFaultError, e:
- print " BAD FAULT %s: %s" % (Fault.faultNames[fault], str(e))
- badFault = True
- connection.sock.close()
-
- print "Test 18 - good SRP, prepare to resume..."
- connection = connect()
- connection.handshakeClientSRP("test", "password")
- connection.close()
- connection.sock.close()
- session = connection.session
-
- print "Test 19 - resumption"
- connection = connect()
- connection.handshakeClientSRP("test", "garbage", session=session)
- #Don't close! -- see below
-
- print "Test 20 - invalidated resumption"
- connection.sock.close() #Close the socket without a close_notify!
- connection = connect()
- try:
- connection.handshakeClientSRP("test", "garbage", session=session)
- assert()
- except TLSRemoteAlert, alert:
- if alert.description != AlertDescription.bad_record_mac:
- raise
- connection.sock.close()
-
- print "Test 21 - HTTPS test X.509"
- address = address[0], address[1]+1
- if hasattr(socket, "timeout"):
- timeoutEx = socket.timeout
+try:
+ from tack.structures.Tack import Tack
+
+except ImportError:
+ pass
+
+def printUsage(s=None):
+ if s:
+ print("ERROR: %s" % s)
+
+ print("")
+ print("Version: %s" % __version__)
+ print("")
+ print("RNG: %s" % prngName)
+ print("")
+ print("Modules:")
+ if tackpyLoaded:
+ print(" tackpy : Loaded")
else:
- timeoutEx = socket.error
- while 1:
- try:
- time.sleep(2)
- htmlBody = open(os.path.join(dir, "index.html")).read()
- fingerprint = None
- for y in range(2):
- h = HTTPTLSConnection(\
- address[0], address[1], x509Fingerprint=fingerprint)
- for x in range(3):
- h.request("GET", "/index.html")
- r = h.getresponse()
- assert(r.status == 200)
- s = r.read()
- assert(s == htmlBody)
- fingerprint = h.tlsSession.serverCertChain.getFingerprint()
- assert(fingerprint)
- time.sleep(2)
- break
- except timeoutEx:
- print "timeout, retrying..."
- pass
-
- if cryptoIDlibLoaded:
- print "Test 21a - HTTPS test SRP+cryptoID"
- address = address[0], address[1]+1
- if hasattr(socket, "timeout"):
- timeoutEx = socket.timeout
- else:
- timeoutEx = socket.error
- while 1:
- try:
- time.sleep(2) #Time to generate key and cryptoID
- htmlBody = open(os.path.join(dir, "index.html")).read()
- fingerprint = None
- protocol = None
- for y in range(2):
- h = HTTPTLSConnection(\
- address[0], address[1],
- username="test", password="password",
- cryptoID=fingerprint, protocol=protocol)
- for x in range(3):
- h.request("GET", "/index.html")
- r = h.getresponse()
- assert(r.status == 200)
- s = r.read()
- assert(s == htmlBody)
- fingerprint = h.tlsSession.serverCertChain.cryptoID
- assert(fingerprint)
- protocol = "urn:whatever"
- time.sleep(2)
- break
- except timeoutEx:
- print "timeout, retrying..."
- pass
-
- address = address[0], address[1]+1
-
- implementations = []
- if cryptlibpyLoaded:
- implementations.append("cryptlib")
+ print(" tackpy : Not Loaded")
if m2cryptoLoaded:
- implementations.append("openssl")
+ print(" M2Crypto : Loaded")
+ else:
+ print(" M2Crypto : Not Loaded")
if pycryptoLoaded:
- implementations.append("pycrypto")
- implementations.append("python")
-
- print "Test 22 - different ciphers"
- for implementation in implementations:
- for cipher in ["aes128", "aes256", "rc4"]:
-
- print "Test 22:",
- connection = connect()
-
- settings = HandshakeSettings()
- settings.cipherNames = [cipher]
- settings.cipherImplementations = [implementation, "python"]
- connection.handshakeClientSharedKey("shared", "key", settings=settings)
- print ("%s %s" % (connection.getCipherName(), connection.getCipherImplementation()))
-
- connection.write("hello")
- h = connection.read(min=5, max=5)
- assert(h == "hello")
- connection.close()
- connection.sock.close()
-
- print "Test 23 - throughput test"
- for implementation in implementations:
- for cipher in ["aes128", "aes256", "3des", "rc4"]:
- if cipher == "3des" and implementation not in ("openssl", "cryptlib", "pycrypto"):
- continue
-
- print "Test 23:",
- connection = connect()
-
- settings = HandshakeSettings()
- settings.cipherNames = [cipher]
- settings.cipherImplementations = [implementation, "python"]
- connection.handshakeClientSharedKey("shared", "key", settings=settings)
- print ("%s %s:" % (connection.getCipherName(), connection.getCipherImplementation())),
-
- startTime = time.clock()
- connection.write("hello"*10000)
- h = connection.read(min=50000, max=50000)
- stopTime = time.clock()
- print "100K exchanged at rate of %d bytes/sec" % int(100000/(stopTime-startTime))
-
- assert(h == "hello"*10000)
- connection.close()
- connection.sock.close()
-
- print "Test 24 - Internet servers test"
- try:
- i = IMAP4_TLS("cyrus.andrew.cmu.edu")
- i.login("anonymous", "anonymous@anonymous.net")
- i.logout()
- print "Test 24: IMAP4 good"
- p = POP3_TLS("pop.gmail.com")
- p.quit()
- print "Test 24: POP3 good"
- except socket.error, e:
- print "Non-critical error: socket error trying to reach internet server: ", e
-
- if not badFault:
- print "Test succeeded"
+ print(" pycrypto : Loaded")
else:
- print "Test failed"
-
-
-def serverTest(address, dir):
+ print(" pycrypto : Not Loaded")
+ if gmpyLoaded:
+ print(" GMPY : Loaded")
+ else:
+ print(" GMPY : Not Loaded")
+
+ print("")
+ print("""Commands:
+
+ server
+ [-k KEY] [-c CERT] [-t TACK] [-v VERIFIERDB] [-d DIR]
+ [--reqcert] HOST:PORT
+
+ client
+ [-k KEY] [-c CERT] [-u USER] [-p PASS]
+ HOST:PORT
+""")
+ sys.exit(-1)
+
+def printError(s):
+ """Print error message and exit"""
+ sys.stderr.write("ERROR: %s\n" % s)
+ sys.exit(-1)
+
+
+def handleArgs(argv, argString, flagsList=[]):
+ # Convert to getopt argstring format:
+ # Add ":" after each arg, ie "abc" -> "a:b:c:"
+ getOptArgString = ":".join(argString) + ":"
+ try:
+ opts, argv = getopt.getopt(argv, getOptArgString, flagsList)
+ except getopt.GetoptError as e:
+ printError(e)
+ # Default values if arg not present
+ privateKey = None
+ certChain = None
+ username = None
+ password = None
+ tacks = None
+ verifierDB = None
+ reqCert = False
+ directory = None
+
+ for opt, arg in opts:
+ if opt == "-k":
+ s = open(arg, "rb").read()
+ privateKey = parsePEMKey(s, private=True)
+ elif opt == "-c":
+ s = open(arg, "rb").read()
+ x509 = X509()
+ x509.parse(s)
+ certChain = X509CertChain([x509])
+ elif opt == "-u":
+ username = arg
+ elif opt == "-p":
+ password = arg
+ elif opt == "-t":
+ if tackpyLoaded:
+ s = open(arg, "rU").read()
+ tacks = Tack.createFromPemList(s)
+ elif opt == "-v":
+ verifierDB = VerifierDB(arg)
+ verifierDB.open()
+ elif opt == "-d":
+ directory = arg
+ elif opt == "--reqcert":
+ reqCert = True
+ else:
+ assert(False)
+
+ if not argv:
+ printError("Missing address")
+ if len(argv)>1:
+ printError("Too many arguments")
#Split address into hostname/port tuple
+ address = argv[0]
address = address.split(":")
- if len(address)==1:
- address.append("4443")
+ if len(address) != 2:
+ raise SyntaxError("Must specify <host>:<port>")
address = ( address[0], int(address[1]) )
- #Connect to server
- lsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- lsock.bind(address)
- lsock.listen(5)
-
- def connect():
- return TLSConnection(lsock.accept()[0])
-
- print "Test 1 - good shared key"
- sharedKeyDB = SharedKeyDB()
- sharedKeyDB["shared"] = "key"
- sharedKeyDB["shared2"] = "key2"
- connection = connect()
- connection.handshakeServer(sharedKeyDB=sharedKeyDB)
- connection.close()
- connection.sock.close()
-
- print "Test 2 - shared key faults"
- for fault in Fault.clientSharedKeyFaults + Fault.genericFaults:
- connection = connect()
- connection.fault = fault
- try:
- connection.handshakeServer(sharedKeyDB=sharedKeyDB)
- assert()
- except:
- pass
- connection.sock.close()
-
- print "Test 3 - good SRP"
- #verifierDB = tlslite.VerifierDB(os.path.join(dir, "verifierDB"))
- #verifierDB.open()
- verifierDB = VerifierDB()
- verifierDB.create()
- entry = VerifierDB.makeVerifier("test", "password", 1536)
- verifierDB["test"] = entry
-
- connection = connect()
- connection.handshakeServer(verifierDB=verifierDB)
- connection.close()
- connection.sock.close()
-
- print "Test 4 - SRP faults"
- for fault in Fault.clientSrpFaults + Fault.genericFaults:
- connection = connect()
- connection.fault = fault
- try:
- connection.handshakeServer(verifierDB=verifierDB)
- assert()
- except:
- pass
- connection.sock.close()
-
- print "Test 5 - good SRP: unknown_srp_username idiom"
- connection = connect()
- connection.handshakeServer(verifierDB=verifierDB)
- connection.close()
- connection.sock.close()
-
- print "Test 6 - good SRP: with X.509 cert"
- x509Cert = X509().parse(open(os.path.join(dir, "serverX509Cert.pem")).read())
- x509Chain = X509CertChain([x509Cert])
- s = open(os.path.join(dir, "serverX509Key.pem")).read()
- x509Key = parsePEMKey(s, private=True)
-
- connection = connect()
- connection.handshakeServer(verifierDB=verifierDB, \
- certChain=x509Chain, privateKey=x509Key)
- connection.close()
- connection.sock.close()
-
- print "Test 7 - X.509 with SRP faults"
- for fault in Fault.clientSrpFaults + Fault.genericFaults:
- connection = connect()
- connection.fault = fault
- try:
- connection.handshakeServer(verifierDB=verifierDB, \
- certChain=x509Chain, privateKey=x509Key)
- assert()
- except:
- pass
- connection.sock.close()
-
- if cryptoIDlibLoaded:
- print "Test 8 - good SRP: with cryptoID certs"
- cryptoIDChain = CertChain().parse(open(os.path.join(dir, "serverCryptoIDChain.xml"), "r").read())
- cryptoIDKey = parseXMLKey(open(os.path.join(dir, "serverCryptoIDKey.xml"), "r").read(), private=True)
- connection = connect()
- connection.handshakeServer(verifierDB=verifierDB, \
- certChain=cryptoIDChain, privateKey=cryptoIDKey)
- connection.close()
- connection.sock.close()
-
- print "Test 9 - cryptoID with SRP faults"
- for fault in Fault.clientSrpFaults + Fault.genericFaults:
- connection = connect()
- connection.fault = fault
- try:
- connection.handshakeServer(verifierDB=verifierDB, \
- certChain=cryptoIDChain, privateKey=cryptoIDKey)
- assert()
- except:
- pass
- connection.sock.close()
-
- print "Test 10 - good X.509"
- connection = connect()
- connection.handshakeServer(certChain=x509Chain, privateKey=x509Key)
- connection.close()
- connection.sock.close()
-
- print "Test 10.a - good X.509, SSL v3"
- connection = connect()
- settings = HandshakeSettings()
- settings.minVersion = (3,0)
- settings.maxVersion = (3,0)
- connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, settings=settings)
- connection.close()
- connection.sock.close()
-
- print "Test 11 - X.509 faults"
- for fault in Fault.clientNoAuthFaults + Fault.genericFaults:
- connection = connect()
- connection.fault = fault
- try:
- connection.handshakeServer(certChain=x509Chain, privateKey=x509Key)
- assert()
- except:
- pass
- connection.sock.close()
-
- if cryptoIDlibLoaded:
- print "Test 12 - good cryptoID"
- connection = connect()
- connection.handshakeServer(certChain=cryptoIDChain, privateKey=cryptoIDKey)
- connection.close()
- connection.sock.close()
-
- print "Test 13 - cryptoID faults"
- for fault in Fault.clientNoAuthFaults + Fault.genericFaults:
- connection = connect()
- connection.fault = fault
- try:
- connection.handshakeServer(certChain=cryptoIDChain, privateKey=cryptoIDKey)
- assert()
- except:
- pass
- connection.sock.close()
-
- print "Test 14 - good mutual X.509"
- connection = connect()
- connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, reqCert=True)
- assert(isinstance(connection.session.serverCertChain, X509CertChain))
- connection.close()
- connection.sock.close()
+ # Populate the return list
+ retList = [address]
+ if "k" in argString:
+ retList.append(privateKey)
+ if "c" in argString:
+ retList.append(certChain)
+ if "u" in argString:
+ retList.append(username)
+ if "p" in argString:
+ retList.append(password)
+ if "t" in argString:
+ retList.append(tacks)
+ if "v" in argString:
+ retList.append(verifierDB)
+ if "d" in argString:
+ retList.append(directory)
+ if "reqcert" in flagsList:
+ retList.append(reqCert)
+ return retList
+
+
+def printGoodConnection(connection, seconds):
+ print(" Handshake time: %.3f seconds" % seconds)
+ print(" Version: %s" % connection.getVersionName())
+ print(" Cipher: %s %s" % (connection.getCipherName(),
+ connection.getCipherImplementation()))
+ if connection.session.srpUsername:
+ print(" Client SRP username: %s" % connection.session.srpUsername)
+ if connection.session.clientCertChain:
+ print(" Client X.509 SHA1 fingerprint: %s" %
+ connection.session.clientCertChain.getFingerprint())
+ if connection.session.serverCertChain:
+ print(" Server X.509 SHA1 fingerprint: %s" %
+ connection.session.serverCertChain.getFingerprint())
+ if connection.session.serverName:
+ print(" SNI: %s" % connection.session.serverName)
+ if connection.session.tackExt:
+ if connection.session.tackInHelloExt:
+ emptyStr = "\n (via TLS Extension)"
+ else:
+ emptyStr = "\n (via TACK Certificate)"
+ print(" TACK: %s" % emptyStr)
+ print(str(connection.session.tackExt))
+ print(" Next-Protocol Negotiated: %s" % connection.next_proto)
+
+
+def clientCmd(argv):
+ (address, privateKey, certChain, username, password) = \
+ handleArgs(argv, "kcup")
+
+ if (certChain and not privateKey) or (not certChain and privateKey):
+ raise SyntaxError("Must specify CERT and KEY together")
+ if (username and not password) or (not username and password):
+ raise SyntaxError("Must specify USER with PASS")
+ if certChain and username:
+ raise SyntaxError("Can use SRP or client cert for auth, not both")
- print "Test 14a - good mutual X.509, SSLv3"
- connection = connect()
+ #Connect to server
+ sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ sock.settimeout(5)
+ sock.connect(address)
+ connection = TLSConnection(sock)
+
settings = HandshakeSettings()
- settings.minVersion = (3,0)
- settings.maxVersion = (3,0)
- connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, reqCert=True, settings=settings)
- assert(isinstance(connection.session.serverCertChain, X509CertChain))
- connection.close()
- connection.sock.close()
-
- print "Test 15 - mutual X.509 faults"
- for fault in Fault.clientCertFaults + Fault.genericFaults:
- connection = connect()
- connection.fault = fault
- try:
- connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, reqCert=True)
- assert()
- except:
- pass
- connection.sock.close()
-
- if cryptoIDlibLoaded:
- print "Test 16 - good mutual cryptoID"
- connection = connect()
- connection.handshakeServer(certChain=cryptoIDChain, privateKey=cryptoIDKey, reqCert=True)
- assert(isinstance(connection.session.serverCertChain, CertChain))
- assert(connection.session.serverCertChain.validate())
- connection.close()
- connection.sock.close()
-
- print "Test 17 - mutual cryptoID faults"
- for fault in Fault.clientCertFaults + Fault.genericFaults:
- connection = connect()
- connection.fault = fault
- try:
- connection.handshakeServer(certChain=cryptoIDChain, privateKey=cryptoIDKey, reqCert=True)
- assert()
- except:
- pass
- connection.sock.close()
-
- print "Test 18 - good SRP, prepare to resume"
- sessionCache = SessionCache()
- connection = connect()
- connection.handshakeServer(verifierDB=verifierDB, sessionCache=sessionCache)
- connection.close()
- connection.sock.close()
-
- print "Test 19 - resumption"
- connection = connect()
- connection.handshakeServer(verifierDB=verifierDB, sessionCache=sessionCache)
- #Don't close! -- see next test
-
- print "Test 20 - invalidated resumption"
- try:
- connection.read(min=1, max=1)
- assert() #Client is going to close the socket without a close_notify
- except TLSAbruptCloseError, e:
- pass
- connection = connect()
+ settings.useExperimentalTackExtension = True
+
try:
- connection.handshakeServer(verifierDB=verifierDB, sessionCache=sessionCache)
- except TLSLocalAlert, alert:
- if alert.description != AlertDescription.bad_record_mac:
+ start = time.clock()
+ if username and password:
+ connection.handshakeClientSRP(username, password,
+ settings=settings, serverName=address[0])
+ else:
+ connection.handshakeClientCert(certChain, privateKey,
+ settings=settings, serverName=address[0])
+ stop = time.clock()
+ print("Handshake success")
+ except TLSLocalAlert as a:
+ if a.description == AlertDescription.user_canceled:
+ print(str(a))
+ else:
raise
- connection.sock.close()
-
- print "Test 21 - HTTPS test X.509"
-
- #Close the current listening socket
- lsock.close()
-
- #Create and run an HTTP Server using TLSSocketServerMixIn
- class MyHTTPServer(TLSSocketServerMixIn,
- BaseHTTPServer.HTTPServer):
- def handshake(self, tlsConnection):
- tlsConnection.handshakeServer(certChain=x509Chain, privateKey=x509Key)
- return True
- cd = os.getcwd()
- os.chdir(dir)
- address = address[0], address[1]+1
- httpd = MyHTTPServer(address, SimpleHTTPServer.SimpleHTTPRequestHandler)
- for x in range(6):
- httpd.handle_request()
- httpd.server_close()
- cd = os.chdir(cd)
-
- if cryptoIDlibLoaded:
- print "Test 21a - HTTPS test SRP+cryptoID"
-
- #Create and run an HTTP Server using TLSSocketServerMixIn
- class MyHTTPServer(TLSSocketServerMixIn,
- BaseHTTPServer.HTTPServer):
- def handshake(self, tlsConnection):
- tlsConnection.handshakeServer(certChain=cryptoIDChain, privateKey=cryptoIDKey,
- verifierDB=verifierDB)
- return True
- cd = os.getcwd()
- os.chdir(dir)
- address = address[0], address[1]+1
- httpd = MyHTTPServer(address, SimpleHTTPServer.SimpleHTTPRequestHandler)
- for x in range(6):
- httpd.handle_request()
- httpd.server_close()
- cd = os.chdir(cd)
-
- #Re-connect the listening socket
- lsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- address = address[0], address[1]+1
- lsock.bind(address)
- lsock.listen(5)
-
- def connect():
- return TLSConnection(lsock.accept()[0])
-
- implementations = []
- if cryptlibpyLoaded:
- implementations.append("cryptlib")
- if m2cryptoLoaded:
- implementations.append("openssl")
- if pycryptoLoaded:
- implementations.append("pycrypto")
- implementations.append("python")
-
- print "Test 22 - different ciphers"
- for implementation in ["python"] * len(implementations):
- for cipher in ["aes128", "aes256", "rc4"]:
-
- print "Test 22:",
- connection = connect()
-
- settings = HandshakeSettings()
- settings.cipherNames = [cipher]
- settings.cipherImplementations = [implementation, "python"]
-
- connection.handshakeServer(sharedKeyDB=sharedKeyDB, settings=settings)
- print connection.getCipherName(), connection.getCipherImplementation()
- h = connection.read(min=5, max=5)
- assert(h == "hello")
- connection.write(h)
- connection.close()
- connection.sock.close()
-
- print "Test 23 - throughput test"
- for implementation in implementations:
- for cipher in ["aes128", "aes256", "3des", "rc4"]:
- if cipher == "3des" and implementation not in ("openssl", "cryptlib", "pycrypto"):
- continue
-
- print "Test 23:",
- connection = connect()
-
- settings = HandshakeSettings()
- settings.cipherNames = [cipher]
- settings.cipherImplementations = [implementation, "python"]
-
- connection.handshakeServer(sharedKeyDB=sharedKeyDB, settings=settings)
- print connection.getCipherName(), connection.getCipherImplementation()
- h = connection.read(min=50000, max=50000)
- assert(h == "hello"*10000)
- connection.write(h)
- connection.close()
- connection.sock.close()
-
- print "Test succeeded"
-
-
-
-
-
-
-
-
-
-
-
-
-if len(sys.argv) == 1 or (len(sys.argv)==2 and sys.argv[1].lower().endswith("help")):
- print ""
- print "Version: 0.3.8"
- print ""
- print "RNG: %s" % prngName
- print ""
- print "Modules:"
- if cryptlibpyLoaded:
- print " cryptlib_py : Loaded"
- else:
- print " cryptlib_py : Not Loaded"
- if m2cryptoLoaded:
- print " M2Crypto : Loaded"
- else:
- print " M2Crypto : Not Loaded"
- if pycryptoLoaded:
- print " pycrypto : Loaded"
- else:
- print " pycrypto : Not Loaded"
- if gmpyLoaded:
- print " GMPY : Loaded"
- else:
- print " GMPY : Not Loaded"
- if cryptoIDlibLoaded:
- print " cryptoIDlib : Loaded"
- else:
- print " cryptoIDlib : Not Loaded"
- print ""
- print "Commands:"
- print ""
- print " clientcert <server> [<chain> <key>]"
- print " clientsharedkey <server> <user> <pass>"
- print " clientsrp <server> <user> <pass>"
- print " clienttest <server> <dir>"
- print ""
- print " serversrp <server> <verifierDB>"
- print " servercert <server> <chain> <key> [req]"
- print " serversrpcert <server> <verifierDB> <chain> <key>"
- print " serversharedkey <server> <sharedkeyDB>"
- print " servertest <server> <dir>"
- sys.exit()
-
-cmd = sys.argv[1].lower()
-
-class Args:
- def __init__(self, argv):
- self.argv = argv
- def get(self, index):
- if len(self.argv)<=index:
- raise SyntaxError("Not enough arguments")
- return self.argv[index]
- def getLast(self, index):
- if len(self.argv)>index+1:
- raise SyntaxError("Too many arguments")
- return self.get(index)
-
-args = Args(sys.argv)
-
-def reformatDocString(s):
- lines = s.splitlines()
- newLines = []
- for line in lines:
- newLines.append(" " + line.strip())
- return "\n".join(newLines)
-
-try:
- if cmd == "clienttest":
- address = args.get(2)
- dir = args.getLast(3)
- clientTest(address, dir)
- sys.exit()
-
- elif cmd.startswith("client"):
- address = args.get(2)
-
- #Split address into hostname/port tuple
- address = address.split(":")
- if len(address)==1:
- address.append("4443")
- address = ( address[0], int(address[1]) )
-
- def connect():
- #Connect to server
- sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- if hasattr(sock, "settimeout"):
- sock.settimeout(5)
- sock.connect(address)
-
- #Instantiate TLSConnections
- return TLSConnection(sock)
-
- try:
- if cmd == "clientsrp":
- username = args.get(3)
- password = args.getLast(4)
- connection = connect()
- start = time.clock()
- connection.handshakeClientSRP(username, password)
- elif cmd == "clientsharedkey":
- username = args.get(3)
- password = args.getLast(4)
- connection = connect()
- start = time.clock()
- connection.handshakeClientSharedKey(username, password)
- elif cmd == "clientcert":
- certChain = None
- privateKey = None
- if len(sys.argv) > 3:
- certFilename = args.get(3)
- keyFilename = args.getLast(4)
-
- s1 = open(certFilename, "rb").read()
- s2 = open(keyFilename, "rb").read()
-
- #Try to create cryptoID cert chain
- if cryptoIDlibLoaded:
- try:
- certChain = CertChain().parse(s1)
- privateKey = parsePrivateKey(s2)
- except:
- certChain = None
- privateKey = None
-
- #Try to create X.509 cert chain
- if not certChain:
- x509 = X509()
- x509.parse(s1)
- certChain = X509CertChain([x509])
- privateKey = parsePrivateKey(s2)
-
- connection = connect()
- start = time.clock()
- connection.handshakeClientCert(certChain, privateKey)
- else:
- raise SyntaxError("Unknown command")
-
- except TLSLocalAlert, a:
- if a.description == AlertDescription.bad_record_mac:
- if cmd == "clientsharedkey":
- print "Bad sharedkey password"
- else:
- raise
- elif a.description == AlertDescription.user_canceled:
- print str(a)
+ sys.exit(-1)
+ except TLSRemoteAlert as a:
+ if a.description == AlertDescription.unknown_psk_identity:
+ if username:
+ print("Unknown username")
else:
raise
- sys.exit()
- except TLSRemoteAlert, a:
- if a.description == AlertDescription.unknown_srp_username:
- if cmd == "clientsrp":
- print "Unknown username"
- else:
- raise
- elif a.description == AlertDescription.bad_record_mac:
- if cmd == "clientsrp":
- print "Bad username or password"
- else:
- raise
- elif a.description == AlertDescription.handshake_failure:
- print "Unable to negotiate mutually acceptable parameters"
+ elif a.description == AlertDescription.bad_record_mac:
+ if username:
+ print("Bad username or password")
else:
raise
- sys.exit()
-
- stop = time.clock()
- print "Handshake success"
- print " Handshake time: %.4f seconds" % (stop - start)
- print " Version: %s.%s" % connection.version
- print " Cipher: %s %s" % (connection.getCipherName(), connection.getCipherImplementation())
- if connection.session.srpUsername:
- print " Client SRP username: %s" % connection.session.srpUsername
- if connection.session.sharedKeyUsername:
- print " Client shared key username: %s" % connection.session.sharedKeyUsername
- if connection.session.clientCertChain:
- print " Client fingerprint: %s" % connection.session.clientCertChain.getFingerprint()
- if connection.session.serverCertChain:
- print " Server fingerprint: %s" % connection.session.serverCertChain.getFingerprint()
- connection.close()
- connection.sock.close()
-
- elif cmd.startswith("server"):
- address = args.get(2)
-
- #Split address into hostname/port tuple
- address = address.split(":")
- if len(address)==1:
- address.append("4443")
- address = ( address[0], int(address[1]) )
-
- verifierDBFilename = None
- sharedKeyDBFilename = None
- certFilename = None
- keyFilename = None
- sharedKeyDB = None
- reqCert = False
-
- if cmd == "serversrp":
- verifierDBFilename = args.getLast(3)
- elif cmd == "servercert":
- certFilename = args.get(3)
- keyFilename = args.get(4)
- if len(sys.argv)>=6:
- req = args.getLast(5)
- if req.lower() != "req":
- raise SyntaxError()
- reqCert = True
- elif cmd == "serversrpcert":
- verifierDBFilename = args.get(3)
- certFilename = args.get(4)
- keyFilename = args.getLast(5)
- elif cmd == "serversharedkey":
- sharedKeyDBFilename = args.getLast(3)
- elif cmd == "servertest":
- address = args.get(2)
- dir = args.getLast(3)
- serverTest(address, dir)
- sys.exit()
-
- verifierDB = None
- if verifierDBFilename:
- verifierDB = VerifierDB(verifierDBFilename)
- verifierDB.open()
-
- sharedKeyDB = None
- if sharedKeyDBFilename:
- sharedKeyDB = SharedKeyDB(sharedKeyDBFilename)
- sharedKeyDB.open()
-
- certChain = None
- privateKey = None
- if certFilename:
- s1 = open(certFilename, "rb").read()
- s2 = open(keyFilename, "rb").read()
-
- #Try to create cryptoID cert chain
- if cryptoIDlibLoaded:
- try:
- certChain = CertChain().parse(s1)
- privateKey = parsePrivateKey(s2)
- except:
- certChain = None
- privateKey = None
+ elif a.description == AlertDescription.handshake_failure:
+ print("Unable to negotiate mutually acceptable parameters")
+ else:
+ raise
+ sys.exit(-1)
+ printGoodConnection(connection, stop-start)
+ connection.close()
- #Try to create X.509 cert chain
- if not certChain:
- x509 = X509()
- x509.parse(s1)
- certChain = X509CertChain([x509])
- privateKey = parsePrivateKey(s2)
+def serverCmd(argv):
+ (address, privateKey, certChain, tacks,
+ verifierDB, directory, reqCert) = handleArgs(argv, "kctbvd", ["reqcert"])
+
+
+ if (certChain and not privateKey) or (not certChain and privateKey):
+ raise SyntaxError("Must specify CERT and KEY together")
+ if tacks and not certChain:
+ raise SyntaxError("Must specify CERT with Tacks")
+
+ print("I am an HTTPS test server, I will listen on %s:%d" %
+ (address[0], address[1]))
+ if directory:
+ os.chdir(directory)
+ print("Serving files from %s" % os.getcwd())
+
+ if certChain and privateKey:
+ print("Using certificate and private key...")
+ if verifierDB:
+ print("Using verifier DB...")
+ if tacks:
+ print("Using Tacks...")
+
+ #############
+ sessionCache = SessionCache()
+ class MyHTTPServer(ThreadingMixIn, TLSSocketServerMixIn, HTTPServer):
+ def handshake(self, connection):
+ print("About to handshake...")
+ activationFlags = 0
+ if tacks:
+ if len(tacks) == 1:
+ activationFlags = 1
+ elif len(tacks) == 2:
+ activationFlags = 3
- #Create handler function - performs handshake, then echos all bytes received
- def handler(sock):
try:
- connection = TLSConnection(sock)
+ start = time.clock()
settings = HandshakeSettings()
- connection.handshakeServer(sharedKeyDB=sharedKeyDB, verifierDB=verifierDB, \
- certChain=certChain, privateKey=privateKey, \
- reqCert=reqCert, settings=settings)
- print "Handshake success"
- print " Version: %s.%s" % connection.version
- print " Cipher: %s %s" % (connection.getCipherName(), connection.getCipherImplementation())
- if connection.session.srpUsername:
- print " Client SRP username: %s" % connection.session.srpUsername
- if connection.session.sharedKeyUsername:
- print " Client shared key username: %s" % connection.session.sharedKeyUsername
- if connection.session.clientCertChain:
- print " Client fingerprint: %s" % connection.session.clientCertChain.getFingerprint()
- if connection.session.serverCertChain:
- print " Server fingerprint: %s" % connection.session.serverCertChain.getFingerprint()
-
- s = ""
- while 1:
- newS = connection.read()
- if not newS:
- break
- s += newS
- if s[-1]=='\n':
- connection.write(s)
- s = ""
- except TLSLocalAlert, a:
- if a.description == AlertDescription.unknown_srp_username:
- print "Unknown SRP username"
- elif a.description == AlertDescription.bad_record_mac:
- if cmd == "serversrp" or cmd == "serversrpcert":
- print "Bad SRP password for:", connection.allegedSrpUsername
- else:
- raise
- elif a.description == AlertDescription.handshake_failure:
- print "Unable to negotiate mutually acceptable parameters"
+ settings.useExperimentalTackExtension=True
+ connection.handshakeServer(certChain=certChain,
+ privateKey=privateKey,
+ verifierDB=verifierDB,
+ tacks=tacks,
+ activationFlags=activationFlags,
+ sessionCache=sessionCache,
+ settings=settings,
+ nextProtos=[b"http/1.1"])
+ # As an example (does not work here):
+ #nextProtos=[b"spdy/3", b"spdy/2", b"http/1.1"])
+ stop = time.clock()
+ except TLSRemoteAlert as a:
+ if a.description == AlertDescription.user_canceled:
+ print(str(a))
+ return False
else:
raise
- except TLSRemoteAlert, a:
- if a.description == AlertDescription.bad_record_mac:
- if cmd == "serversharedkey":
- print "Bad sharedkey password for:", connection.allegedSharedKeyUsername
+ except TLSLocalAlert as a:
+ if a.description == AlertDescription.unknown_psk_identity:
+ if username:
+ print("Unknown username")
+ return False
+ else:
+ raise
+ elif a.description == AlertDescription.bad_record_mac:
+ if username:
+ print("Bad username or password")
+ return False
else:
raise
- elif a.description == AlertDescription.user_canceled:
- print "Handshake cancelled"
elif a.description == AlertDescription.handshake_failure:
- print "Unable to negotiate mutually acceptable parameters"
- elif a.description == AlertDescription.close_notify:
- pass
+ print("Unable to negotiate mutually acceptable parameters")
+ return False
else:
raise
-
- #Run multi-threaded server
- sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- sock.bind(address)
- sock.listen(5)
- while 1:
- (newsock, cliAddress) = sock.accept()
- thread.start_new_thread(handler, (newsock,))
-
-
+
+ connection.ignoreAbruptClose = True
+ printGoodConnection(connection, stop-start)
+ return True
+
+ httpd = MyHTTPServer(address, SimpleHTTPRequestHandler)
+ httpd.serve_forever()
+
+
+if __name__ == '__main__':
+ if len(sys.argv) < 2:
+ printUsage("Missing command")
+ elif sys.argv[1] == "client"[:len(sys.argv[1])]:
+ clientCmd(sys.argv[2:])
+ elif sys.argv[1] == "server"[:len(sys.argv[1])]:
+ serverCmd(sys.argv[2:])
else:
- print "Bad command: '%s'" % cmd
-except TLSRemoteAlert, a:
- print str(a)
- raise
-
-
-
-
+ printUsage("Unknown command: %s" % sys.argv[1])