diff --git a/httpsig_cffi/sign.py b/httpsig_cffi/sign.py index dd5726e..9672016 100644 --- a/httpsig_cffi/sign.py +++ b/httpsig_cffi/sign.py @@ -1,9 +1,6 @@ -import base64 -import six - from cryptography.hazmat.backends import default_backend -from cryptography.hazmat.primitives import hashes, hmac, serialization -from cryptography.hazmat.primitives.asymmetric import rsa, padding +from cryptography.hazmat.primitives import hmac, serialization +from cryptography.hazmat.primitives.asymmetric import padding from .utils import * @@ -18,7 +15,7 @@ class Signer(object): Password-protected keyfiles are not supported. """ - def __init__(self, secret, algorithm=None): + def __init__(self, secret, algorithm=None, password=None): if algorithm is None: algorithm = DEFAULT_SIGN_ALGORITHM @@ -35,7 +32,7 @@ def __init__(self, secret, algorithm=None): try: self._rsahash = HASHES[self.hash_algorithm] self._rsa_private = serialization.load_pem_private_key(secret, - None, + password, backend=default_backend()) self._rsa_public = self._rsa_private.public_key() except ValueError as e: @@ -80,7 +77,7 @@ def _sign(self, data): class HeaderSigner(Signer): - ''' + """ Generic object that will sign headers as a dictionary using the http-signature scheme. https://github.com/joyent/node-http-signature/blob/master/http_signing.md @@ -88,12 +85,13 @@ class HeaderSigner(Signer): :arg secret: a PEM-encoded RSA private key or an HMAC secret (must match the algorithm) :arg algorithm: one of the six specified algorithms :arg headers: a list of http headers to be included in the signing string, defaulting to ['date']. - ''' - def __init__(self, key_id, secret, algorithm=None, headers=None): + :arg password: password for an encrypted private key, defaulting to None. + """ + def __init__(self, key_id, secret, algorithm=None, headers=None, password=None): if algorithm is None: algorithm = DEFAULT_SIGN_ALGORITHM - super(HeaderSigner, self).__init__(secret=secret, algorithm=algorithm) + super(HeaderSigner, self).__init__(secret=secret, algorithm=algorithm, password=password) self.headers = headers or ['date'] self.signature_template = build_signature_template(key_id, algorithm, headers) diff --git a/httpsig_cffi/tests/rsa_private_pass.pem b/httpsig_cffi/tests/rsa_private_pass.pem new file mode 100644 index 0000000..a1e1c81 --- /dev/null +++ b/httpsig_cffi/tests/rsa_private_pass.pem @@ -0,0 +1,18 @@ +-----BEGIN RSA PRIVATE KEY----- +Proc-Type: 4,ENCRYPTED +DEK-Info: AES-128-CBC,6BA4E0F6773E283292C483980349199A + +/DgQVvifkEJ6oCs3RnvFSQlbI+aQGAz8soq98mKbPAOA3fxUJ3NyTRxUi/vLmu5L +j3WHc9lbxMggLYTHMclXZ/VTWYU5xXAXxOZSSDFpJ25SGXMYEHJVX8C9wkCRVESk +uAodi1rA/jyg096SvAwrpsZIflqlTcefiNlnfSVZ0XgJwHAdiccqxYXYWqBN+pIH +l1jXtH0HhdcEe9iHqMQlqX79TPck4PNQGon1F70Tuh+d04LTYgkeJD4qZvXen9Ik +LBCYC9/UuK5dBtADoBPKtm2AAE71LjraZxbDJXfVOyaDgKqMkiZcWRKhrqX012Me +vDOI/UOiWoi4Rwf21nMU94FEERHwfW8sYv9Av+dncsWwuvq4mwEdl44QXWiPl+yY +Gh3mRzOsH3p33Goh8ikSI4JuXBk8V1IIqTMBSn9FFy32YTgx5k8K68j9Qwafj/H9 +ci0i1NkIufaCKhYO3GSVuxWir/vytqUdqfGo7sEh7rQm2emD8SQrEQUeU8STRIBa +dSZpnzPuNhW+UCKmavFLm1DdL350IREsE2h6rj6XXwY7Maby59m9+VzR8m5edyti +nzgR3LG1RSHy9e8RWHcVWHL92GlJDwnZ4Y6GpoLVCuS6zXiA1pbVQXyGkvtd/O4+ +4KOR5jFZK8X6uG4U9aovienLSd9/YZZEip5TbfbOjsG6R4KKYJMi+RsvCLjUssx4 +CEtIlqwWv/ONvhLUvnh7ER+cw3TGtkGJl02/h5ezWLdligE9uonBkqECyF2jXjUE +ArUqqgHh2CBt0RjbCA+rpysY8NfB6dO14eAAhjI2gGT07KE/D7t1FyCW2aZMA1lo +-----END RSA PRIVATE KEY----- diff --git a/httpsig_cffi/tests/test_signature.py b/httpsig_cffi/tests/test_signature.py index 6c0a735..e04725b 100755 --- a/httpsig_cffi/tests/test_signature.py +++ b/httpsig_cffi/tests/test_signature.py @@ -3,7 +3,6 @@ import os sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..')) -import json import unittest import httpsig_cffi.sign as sign @@ -37,6 +36,29 @@ def test_default(self): self.assertEqual(params['algorithm'], 'rsa-sha256') self.assertEqual(params['signature'], 'ATp0r26dbMIxOopqw0OfABDT7CKMIoENumuruOtarj8n/97Q3htHFYpH8yOSQk3Z5zh8UxUym6FYTb5+A0Nz3NRsXJibnYi7brE/4tx5But9kkFGzG+xpUmimN4c3TMN7OFH//+r8hBf7BT9/GmHDUVZT2JzWGLZES2xDOUuMtA=') + def test_password(self): + """Identical to test_default, but with a password on the private key.""" + key_path = os.path.join(os.path.dirname(__file__), 'rsa_private_pass.pem') + key = open(key_path, 'rb').read() + password = b"hunter2" + + hs = sign.HeaderSigner(key_id='Test', secret=key, password=password) + unsigned = { + 'Date': 'Thu, 05 Jan 2012 21:31:40 GMT' + } + signed = hs.sign(unsigned) + self.assertIn('Date', signed) + self.assertEqual(unsigned['Date'], signed['Date']) + self.assertIn('Authorization', signed) + auth = parse_authorization_header(signed['authorization']) + params = auth[1] + self.assertIn('keyId', params) + self.assertIn('algorithm', params) + self.assertIn('signature', params) + self.assertEqual(params['keyId'], 'Test') + self.assertEqual(params['algorithm'], 'rsa-sha256') + self.assertEqual(params['signature'], 'ATp0r26dbMIxOopqw0OfABDT7CKMIoENumuruOtarj8n/97Q3htHFYpH8yOSQk3Z5zh8UxUym6FYTb5+A0Nz3NRsXJibnYi7brE/4tx5But9kkFGzG+xpUmimN4c3TMN7OFH//+r8hBf7BT9/GmHDUVZT2JzWGLZES2xDOUuMtA=') + def test_all(self): hs = sign.HeaderSigner(key_id='Test', secret=self.key, headers=[ '(request-target)', diff --git a/httpsig_cffi/tests/test_verify.py b/httpsig_cffi/tests/test_verify.py index e62aff0..b2e4091 100755 --- a/httpsig_cffi/tests/test_verify.py +++ b/httpsig_cffi/tests/test_verify.py @@ -3,12 +3,12 @@ import os sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..')) -import json import unittest from httpsig_cffi.sign import HeaderSigner, Signer from httpsig_cffi.verify import HeaderVerifier, Verifier + class BaseTestCase(unittest.TestCase): def _parse_auth(self, auth): """Basic Authorization header parsing.""" @@ -135,6 +135,7 @@ def setUp(self): super(TestVerifyHMACSHA256, self).setUp() self.algorithm = "hmac-sha256" + class TestVerifyHMACSHA512(TestVerifyHMACSHA1): def setUp(self): super(TestVerifyHMACSHA512, self).setUp() @@ -154,11 +155,13 @@ def setUp(self): self.sign_secret = private_key self.verify_secret = public_key + class TestVerifyRSASHA256(TestVerifyRSASHA1): def setUp(self): super(TestVerifyRSASHA256, self).setUp() self.algorithm = "rsa-sha256" + class TestVerifyRSASHA512(TestVerifyRSASHA1): def setUp(self): super(TestVerifyRSASHA512, self).setUp() diff --git a/httpsig_cffi/utils.py b/httpsig_cffi/utils.py index fc5de71..09b9cc6 100644 --- a/httpsig_cffi/utils.py +++ b/httpsig_cffi/utils.py @@ -22,12 +22,13 @@ class HttpSigException(Exception): pass + def generate_message(required_headers, headers, host=None, method=None, path=None): headers = CaseInsensitiveDict(headers) - + if not required_headers: required_headers = ['date'] - + signable_list = [] for h in required_headers: h = h.lower() @@ -35,7 +36,7 @@ def generate_message(required_headers, headers, host=None, method=None, path=Non if not method or not path: raise Exception('method and path arguments required when using "(request-target)"') signable_list.append('%s: %s %s' % (h, method.lower(), path)) - + elif h == 'host': # 'host' special case due to requests lib restrictions # 'host' is not available when adding auth so must use a param @@ -58,12 +59,12 @@ def generate_message(required_headers, headers, host=None, method=None, path=Non def parse_authorization_header(header): if not isinstance(header, six.string_types): - header = header.decode("ascii") #HTTP headers cannot be Unicode. - + header = header.decode("ascii") # HTTP headers cannot be Unicode. + auth = header.split(" ", 1) if len(auth) > 2: raise ValueError('Invalid authorization header. (eg. Method key1=value1,key2="value, \"2\"")') - + # Split up any args into a dictionary. values = {} if len(auth) == 2: @@ -71,7 +72,7 @@ def parse_authorization_header(header): if auth_value and len(auth_value): # This is tricky string magic. Let urllib do it. fields = parse_http_list(auth_value) - + for item in fields: # Only include keypairs. if '=' in item: @@ -79,15 +80,16 @@ def parse_authorization_header(header): key, value = item.split('=', 1) if not (len(key) and len(value)): continue - + # Unquote values, if quoted. if value[0] == '"': value = value[1:-1] - + values[key] = value - + # ("Signature", {"headers": "date", "algorithm": "hmac-sha256", ... }) - return (auth[0], CaseInsensitiveDict(values)) + return auth[0], CaseInsensitiveDict(values) + def build_signature_template(key_id, algorithm, headers): """ @@ -120,12 +122,15 @@ def lkv(d): d = d[len+4:] return parts + def sig(d): return lkv(d)[1] + def is_rsa(keyobj): return lkv(keyobj.blob)[0] == "ssh-rsa" + # based on http://stackoverflow.com/a/2082169/151401 class CaseInsensitiveDict(dict): def __init__(self, d=None, **kwargs): @@ -142,6 +147,7 @@ def __getitem__(self, key): def __contains__(self, key): return super(CaseInsensitiveDict, self).__contains__(key.lower()) + # currently busted... def get_fingerprint(key): """ diff --git a/httpsig_cffi/verify.py b/httpsig_cffi/verify.py index 1cd46b2..620d73b 100644 --- a/httpsig_cffi/verify.py +++ b/httpsig_cffi/verify.py @@ -1,11 +1,7 @@ """ Module to assist in verifying a signed header. """ -import six - -from cryptography.hazmat.backends import default_backend -from cryptography.hazmat.primitives import hashes, hmac, serialization -from cryptography.hazmat.primitives.asymmetric import rsa, padding +from cryptography.hazmat.primitives.asymmetric import padding from cryptography.exceptions import InvalidSignature from base64 import b64decode @@ -45,7 +41,7 @@ def _verify(self, data, signature): elif self.sign_algorithm == 'hmac': h = self._sign_hmac(data) s = b64decode(signature) - return (h == s) + return h == s else: raise HttpSigException("Unsupported algorithm.")