Skip to content
This repository was archived by the owner on Dec 15, 2019. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 9 additions & 11 deletions httpsig_cffi/sign.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
import base64
import six

from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes, hmac, serialization
from cryptography.hazmat.primitives.asymmetric import rsa, padding
from cryptography.hazmat.primitives import hmac, serialization
from cryptography.hazmat.primitives.asymmetric import padding

from .utils import *

Expand All @@ -18,7 +15,7 @@ class Signer(object):

Password-protected keyfiles are not supported.
"""
def __init__(self, secret, algorithm=None):
def __init__(self, secret, algorithm=None, password=None):
if algorithm is None:
algorithm = DEFAULT_SIGN_ALGORITHM

Expand All @@ -35,7 +32,7 @@ def __init__(self, secret, algorithm=None):
try:
self._rsahash = HASHES[self.hash_algorithm]
self._rsa_private = serialization.load_pem_private_key(secret,
None,
password,
backend=default_backend())
self._rsa_public = self._rsa_private.public_key()
except ValueError as e:
Expand Down Expand Up @@ -80,20 +77,21 @@ def _sign(self, data):


class HeaderSigner(Signer):
'''
"""
Generic object that will sign headers as a dictionary using the http-signature scheme.
https://github.com/joyent/node-http-signature/blob/master/http_signing.md

:arg key_id: the mandatory label indicating to the server which secret to use
:arg secret: a PEM-encoded RSA private key or an HMAC secret (must match the algorithm)
:arg algorithm: one of the six specified algorithms
:arg headers: a list of http headers to be included in the signing string, defaulting to ['date'].
'''
def __init__(self, key_id, secret, algorithm=None, headers=None):
:arg password: password for an encrypted private key, defaulting to None.
"""
def __init__(self, key_id, secret, algorithm=None, headers=None, password=None):
if algorithm is None:
algorithm = DEFAULT_SIGN_ALGORITHM

super(HeaderSigner, self).__init__(secret=secret, algorithm=algorithm)
super(HeaderSigner, self).__init__(secret=secret, algorithm=algorithm, password=password)
self.headers = headers or ['date']
self.signature_template = build_signature_template(key_id, algorithm, headers)

Expand Down
18 changes: 18 additions & 0 deletions httpsig_cffi/tests/rsa_private_pass.pem
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
-----BEGIN RSA PRIVATE KEY-----
Proc-Type: 4,ENCRYPTED
DEK-Info: AES-128-CBC,6BA4E0F6773E283292C483980349199A

/DgQVvifkEJ6oCs3RnvFSQlbI+aQGAz8soq98mKbPAOA3fxUJ3NyTRxUi/vLmu5L
j3WHc9lbxMggLYTHMclXZ/VTWYU5xXAXxOZSSDFpJ25SGXMYEHJVX8C9wkCRVESk
uAodi1rA/jyg096SvAwrpsZIflqlTcefiNlnfSVZ0XgJwHAdiccqxYXYWqBN+pIH
l1jXtH0HhdcEe9iHqMQlqX79TPck4PNQGon1F70Tuh+d04LTYgkeJD4qZvXen9Ik
LBCYC9/UuK5dBtADoBPKtm2AAE71LjraZxbDJXfVOyaDgKqMkiZcWRKhrqX012Me
vDOI/UOiWoi4Rwf21nMU94FEERHwfW8sYv9Av+dncsWwuvq4mwEdl44QXWiPl+yY
Gh3mRzOsH3p33Goh8ikSI4JuXBk8V1IIqTMBSn9FFy32YTgx5k8K68j9Qwafj/H9
ci0i1NkIufaCKhYO3GSVuxWir/vytqUdqfGo7sEh7rQm2emD8SQrEQUeU8STRIBa
dSZpnzPuNhW+UCKmavFLm1DdL350IREsE2h6rj6XXwY7Maby59m9+VzR8m5edyti
nzgR3LG1RSHy9e8RWHcVWHL92GlJDwnZ4Y6GpoLVCuS6zXiA1pbVQXyGkvtd/O4+
4KOR5jFZK8X6uG4U9aovienLSd9/YZZEip5TbfbOjsG6R4KKYJMi+RsvCLjUssx4
CEtIlqwWv/ONvhLUvnh7ER+cw3TGtkGJl02/h5ezWLdligE9uonBkqECyF2jXjUE
ArUqqgHh2CBt0RjbCA+rpysY8NfB6dO14eAAhjI2gGT07KE/D7t1FyCW2aZMA1lo
-----END RSA PRIVATE KEY-----
24 changes: 23 additions & 1 deletion httpsig_cffi/tests/test_signature.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import os
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))

import json
import unittest

import httpsig_cffi.sign as sign
Expand Down Expand Up @@ -37,6 +36,29 @@ def test_default(self):
self.assertEqual(params['algorithm'], 'rsa-sha256')
self.assertEqual(params['signature'], 'ATp0r26dbMIxOopqw0OfABDT7CKMIoENumuruOtarj8n/97Q3htHFYpH8yOSQk3Z5zh8UxUym6FYTb5+A0Nz3NRsXJibnYi7brE/4tx5But9kkFGzG+xpUmimN4c3TMN7OFH//+r8hBf7BT9/GmHDUVZT2JzWGLZES2xDOUuMtA=')

def test_password(self):
"""Identical to test_default, but with a password on the private key."""
key_path = os.path.join(os.path.dirname(__file__), 'rsa_private_pass.pem')
key = open(key_path, 'rb').read()
password = b"hunter2"

hs = sign.HeaderSigner(key_id='Test', secret=key, password=password)
unsigned = {
'Date': 'Thu, 05 Jan 2012 21:31:40 GMT'
}
signed = hs.sign(unsigned)
self.assertIn('Date', signed)
self.assertEqual(unsigned['Date'], signed['Date'])
self.assertIn('Authorization', signed)
auth = parse_authorization_header(signed['authorization'])
params = auth[1]
self.assertIn('keyId', params)
self.assertIn('algorithm', params)
self.assertIn('signature', params)
self.assertEqual(params['keyId'], 'Test')
self.assertEqual(params['algorithm'], 'rsa-sha256')
self.assertEqual(params['signature'], 'ATp0r26dbMIxOopqw0OfABDT7CKMIoENumuruOtarj8n/97Q3htHFYpH8yOSQk3Z5zh8UxUym6FYTb5+A0Nz3NRsXJibnYi7brE/4tx5But9kkFGzG+xpUmimN4c3TMN7OFH//+r8hBf7BT9/GmHDUVZT2JzWGLZES2xDOUuMtA=')

def test_all(self):
hs = sign.HeaderSigner(key_id='Test', secret=self.key, headers=[
'(request-target)',
Expand Down
5 changes: 4 additions & 1 deletion httpsig_cffi/tests/test_verify.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@
import os
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))

import json
import unittest

from httpsig_cffi.sign import HeaderSigner, Signer
from httpsig_cffi.verify import HeaderVerifier, Verifier


class BaseTestCase(unittest.TestCase):
def _parse_auth(self, auth):
"""Basic Authorization header parsing."""
Expand Down Expand Up @@ -135,6 +135,7 @@ def setUp(self):
super(TestVerifyHMACSHA256, self).setUp()
self.algorithm = "hmac-sha256"


class TestVerifyHMACSHA512(TestVerifyHMACSHA1):
def setUp(self):
super(TestVerifyHMACSHA512, self).setUp()
Expand All @@ -154,11 +155,13 @@ def setUp(self):
self.sign_secret = private_key
self.verify_secret = public_key


class TestVerifyRSASHA256(TestVerifyRSASHA1):
def setUp(self):
super(TestVerifyRSASHA256, self).setUp()
self.algorithm = "rsa-sha256"


class TestVerifyRSASHA512(TestVerifyRSASHA1):
def setUp(self):
super(TestVerifyRSASHA512, self).setUp()
Expand Down
28 changes: 17 additions & 11 deletions httpsig_cffi/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,21 @@
class HttpSigException(Exception):
pass


def generate_message(required_headers, headers, host=None, method=None, path=None):
headers = CaseInsensitiveDict(headers)

if not required_headers:
required_headers = ['date']

signable_list = []
for h in required_headers:
h = h.lower()
if h == '(request-target)':
if not method or not path:
raise Exception('method and path arguments required when using "(request-target)"')
signable_list.append('%s: %s %s' % (h, method.lower(), path))

elif h == 'host':
# 'host' special case due to requests lib restrictions
# 'host' is not available when adding auth so must use a param
Expand All @@ -58,36 +59,37 @@ def generate_message(required_headers, headers, host=None, method=None, path=Non

def parse_authorization_header(header):
if not isinstance(header, six.string_types):
header = header.decode("ascii") #HTTP headers cannot be Unicode.
header = header.decode("ascii") # HTTP headers cannot be Unicode.

auth = header.split(" ", 1)
if len(auth) > 2:
raise ValueError('Invalid authorization header. (eg. Method key1=value1,key2="value, \"2\"")')

# Split up any args into a dictionary.
values = {}
if len(auth) == 2:
auth_value = auth[1]
if auth_value and len(auth_value):
# This is tricky string magic. Let urllib do it.
fields = parse_http_list(auth_value)

for item in fields:
# Only include keypairs.
if '=' in item:
# Split on the first '=' only.
key, value = item.split('=', 1)
if not (len(key) and len(value)):
continue

# Unquote values, if quoted.
if value[0] == '"':
value = value[1:-1]

values[key] = value

# ("Signature", {"headers": "date", "algorithm": "hmac-sha256", ... })
return (auth[0], CaseInsensitiveDict(values))
return auth[0], CaseInsensitiveDict(values)


def build_signature_template(key_id, algorithm, headers):
"""
Expand Down Expand Up @@ -120,12 +122,15 @@ def lkv(d):
d = d[len+4:]
return parts


def sig(d):
return lkv(d)[1]


def is_rsa(keyobj):
return lkv(keyobj.blob)[0] == "ssh-rsa"


# based on http://stackoverflow.com/a/2082169/151401
class CaseInsensitiveDict(dict):
def __init__(self, d=None, **kwargs):
Expand All @@ -142,6 +147,7 @@ def __getitem__(self, key):
def __contains__(self, key):
return super(CaseInsensitiveDict, self).__contains__(key.lower())


# currently busted...
def get_fingerprint(key):
"""
Expand Down
8 changes: 2 additions & 6 deletions httpsig_cffi/verify.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,7 @@
"""
Module to assist in verifying a signed header.
"""
import six

from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes, hmac, serialization
from cryptography.hazmat.primitives.asymmetric import rsa, padding
from cryptography.hazmat.primitives.asymmetric import padding
from cryptography.exceptions import InvalidSignature

from base64 import b64decode
Expand Down Expand Up @@ -45,7 +41,7 @@ def _verify(self, data, signature):
elif self.sign_algorithm == 'hmac':
h = self._sign_hmac(data)
s = b64decode(signature)
return (h == s)
return h == s

else:
raise HttpSigException("Unsupported algorithm.")
Expand Down