Skip to content
120 changes: 120 additions & 0 deletions TpmApi.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
from pytss import tspi_defines
import pytss
from pytss import *
import pytss.tspi_exceptions as tspi_exceptions
import uuid
from pytss.tspi_defines import *
import binascii

srk_uuid = uuid.UUID('{00000000-0000-0000-0000-000000000001}')
keyFlags = TSS_KEY_TYPE_BIND | TSS_KEY_SIZE_2048 | TSS_KEY_NO_AUTHORIZATION | TSS_KEY_NOT_MIGRATABLE
srkSecret = bytearray([0] * 20)
ownerSecret = bytearray([0] * 20)

def idxToUUID(idx):
return uuid.UUID('{'+str(idx).zfill(8)+'-0000-0000-0000-000000000001}')

def getSrkKey(context):
srk= context.load_key_by_uuid(TSS_PS_TYPE_SYSTEM, srk_uuid)
srkpolicy = srk.get_policy_object(TSS_POLICY_USAGE)
srkpolicy.set_secret(TSS_SECRET_MODE_SHA1, srkSecret)
return srk

def getMasterkeyNumberArray():
return [77, 65, 83, 84, 69, 82, 75, 69, 89]

def getMasterkeyNumberArrayOne():
return [77, 65, 83, 84, 69, 82, 75, 69, 89,49]

def clearKeys(context):
try:
k1 = context.load_key_by_uuid(tss_lib.TSS_PS_TYPE_SYSTEM,old_uuid)
k1.unregisterKey()
k2 = context.load_key_by_uuid(tss_lib.TSS_PS_TYPE_SYSTEM,new_uuid)
k2.unregisterKey()
except:
pass

def get_current_key(idx):
context=TspiContext()
context.connect()
srk = getSrkKey(context)
k= context.load_key_by_uuid(tss_lib.TSS_PS_TYPE_SYSTEM,idxToUUID(idx))
return k

def get_new_key_and_replace_current(idx,first_run=False):

context=TspiContext()
context.connect()
srk = getSrkKey(context)

if first_run==True:
k=context.create_wrap_key(keyFlags,srk.get_handle())
k.load_key()
k.registerKey(idxToUUID(idx),srk_uuid)
return k
else:
kOld=context.load_key_by_uuid(tss_lib.TSS_PS_TYPE_SYSTEM,idxToUUID(idx))
kNew=context.create_wrap_key(keyFlags,srk.get_handle())
kOld.unregisterKey()
kNew.registerKey(idxToUUID(idx),srk_uuid)
kNew.load_key()
return kNew

def get_registered_keys():

context=TspiContext()
context.connect()
keys=context.list_keys()
keys.remove(str(srk_uuid))
indexes=[]
for k in keys:
#cut away leading 0
indexes.append(str(int(k.split("-")[0])))
return indexes

def is_key_registered_to_idx(idx):
return str(idx) in get_registered_keys()

def get_status():
context=TspiContext()
context.connect()
take_ownership(context)
srk = getSrkKey(context)
tpm = context.get_tpm_object()
versionInfo = binascii.b2a_qp(tpm.get_capability(tss_lib.TSS_TPMCAP_VERSION_VAL,0)).decode("ascii").split("=")
chipVer=".".join(versionInfo[2:7])
specLvl=versionInfo[7]
vendor=versionInfo[8]
statusStr=""
statusStr+=("ChipVersion={},SpecLevel={},SpecRevision={},Vendor={}".format(chipVer,specLvl,vendor[0:2],vendor[2:]))
tpmver=binascii.hexlify(tpm.get_capability(tss_lib.TSS_TPMCAP_VERSION,0)).decode("ascii")
manufactInfo=binascii.hexlify(tpm.get_capability(tss_lib.TSS_TPMCAP_PROP_MANUFACTURER,tss_lib.TSS_TCSCAP_PROP_MANUFACTURER_STR)).decode("ascii")
statusStr+=(",TPMVer={},ManufacturInfo={}".format(tpmver,manufactInfo))

maxkeyslots = binascii.hexlify(tpm.get_capability(tss_lib.TSS_TPMCAP_PROPERTY,tss_lib.TSS_TPMCAP_PROP_SLOTS)).decode("ascii")
maxKeys=binascii.hexlify(tpm.get_capability(tss_lib.TSS_TPMCAP_PROPERTY,tss_lib.TSS_TPMCAP_PROP_MAXKEYS)).decode("ascii")
maxSess=binascii.hexlify(tpm.get_capability(tss_lib.TSS_TPMCAP_PROPERTY,tss_lib.TSS_TPMCAP_PROP_MAXSESSIONS)).decode("ascii")
maxContexts=binascii.hexlify(tpm.get_capability(tss_lib.TSS_TPMCAP_PROPERTY,tss_lib.TSS_TPMCAP_PROP_MAXCONTEXTS)).decode("ascii")
maxInputBuffer=binascii.hexlify(tpm.get_capability(tss_lib.TSS_TPMCAP_PROPERTY,tss_lib.TSS_TPMCAP_PROP_INPUTBUFFERSIZE)).decode("ascii")
maxNVavail=binascii.hexlify(tpm.get_capability(tss_lib.TSS_TPMCAP_PROPERTY,tss_lib.TSS_TPMCAP_PROP_MAXNVAVAILABLE)).decode("ascii")
statusStr+=(",KeySlots={},MaxKeys={},MaxSess={},MaxContexts={},InputBufferSize={},MaxNVSpace={}".format(maxkeyslots,maxKeys,maxSess,maxContexts,maxInputBuffer,maxNVavail))

#nvIndices=binascii.hexlify(tpm.get_capability(tss_lib.TSS_TPMCAP_NV_LIST,0)).decode("ascii")
algsrsa=binascii.hexlify(tpm.get_capability(tss_lib.TSS_TPMCAP_ALG,tss_lib.TSS_ALG_RSA)).decode("ascii")
algsdes=binascii.hexlify(tpm.get_capability(tss_lib.TSS_TPMCAP_ALG,tss_lib.TSS_ALG_DES)).decode("ascii")
algs3des=binascii.hexlify(tpm.get_capability(tss_lib.TSS_TPMCAP_ALG,tss_lib.TSS_ALG_3DES)).decode("ascii")
algssha=binascii.hexlify(tpm.get_capability(tss_lib.TSS_TPMCAP_ALG,tss_lib.TSS_ALG_SHA)).decode("ascii")
#algssha256=binascii.hexlify(tpm.get_capability(tss_lib.TSS_TPMCAP_ALG,tss_lib.TSS_ALG_SHA256)).decode("ascii")
algshmac=binascii.hexlify(tpm.get_capability(tss_lib.TSS_TPMCAP_ALG,tss_lib.TSS_ALG_HMAC)).decode("ascii")
algsaes128=binascii.hexlify(tpm.get_capability(tss_lib.TSS_TPMCAP_ALG,tss_lib.TSS_ALG_AES128)).decode("ascii")
algsmgf1=binascii.hexlify(tpm.get_capability(tss_lib.TSS_TPMCAP_ALG,tss_lib.TSS_ALG_MGF1)).decode("ascii")
algsaes192=binascii.hexlify(tpm.get_capability(tss_lib.TSS_TPMCAP_ALG,tss_lib.TSS_ALG_AES192)).decode("ascii")
algsaes256=binascii.hexlify(tpm.get_capability(tss_lib.TSS_TPMCAP_ALG,tss_lib.TSS_ALG_AES256)).decode("ascii")
algsxor=binascii.hexlify(tpm.get_capability(tss_lib.TSS_TPMCAP_ALG,tss_lib.TSS_ALG_XOR)).decode("ascii")
algsaes=binascii.hexlify(tpm.get_capability(tss_lib.TSS_TPMCAP_ALG,tss_lib.TSS_ALG_AES)).decode("ascii")
statusStr+=(",RSA={},DES={},3DES={},SHA-1={},HMAC={},AES128={},MGF1={},AES192={},AES256={},XOR={},AES={}".format(algsrsa,algsdes,algs3des,algssha,algshmac,algsaes128,algsmgf1,algsaes192,algsaes256,algsxor,algsaes))
flags=binascii.hexlify(tpm.get_capability(tss_lib.TSS_TPMCAP_FLAG,0)).decode("ascii")
statusStr+=(",Flags={}".format(flags))
statusStr+=",RegisteredKeys={}".format(get_registered_keys(context))
return statusStr
30 changes: 30 additions & 0 deletions init_tpm.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
from pytss import tspi_defines
import pytss
from pytss import *
import pytss.tspi_exceptions as tspi_exceptions
from pytss.tspi_defines import *

srkSecret = bytearray([0] * 20)
ownerSecret = bytearray([0] * 20)

def take_ownership():
"""Take ownership of a TPM
:param context: The TSS context to use
:returns: True on ownership being taken, False if the TPM is already owned
"""
context=TspiContext()
context.connect()
tpm = context.get_tpm_object()
tpmpolicy = tpm.get_policy_object(TSS_POLICY_USAGE)
tpmpolicy.set_secret(TSS_SECRET_MODE_SHA1, srkSecret)

srk = context.create_rsa_key(TSS_KEY_TSP_SRK | TSS_KEY_AUTHORIZATION)
srkpolicy = srk.get_policy_object(TSS_POLICY_USAGE)
srkpolicy.set_secret(TSS_SECRET_MODE_SHA1, ownerSecret)

try:
tpm.take_ownership(srk)
except tspi_exceptions.TPM_E_DISABLED_CMD:
return False

return True
132 changes: 123 additions & 9 deletions pytss/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
from interface import tss_lib, ffi
import tspi_exceptions
import hashlib

from pytss.interface import tss_lib, ffi
import pytss.tspi_exceptions
import hashlib
import struct
import uuid

def uuid_to_tss_uuid(uuid):
"""Converts a Python UUID into a TSS UUID"""
Expand All @@ -20,7 +22,13 @@ def uuid_to_tss_uuid(uuid):

return tss_uuid


def tss_uuid_to_uuid(tss_uuid):
"""Converts a TSS UUID to a Python UUID"""
node = struct.unpack(">Q", ffi.buffer(tss_uuid.rgbNode, 6)[:].rjust(8, '\00'.encode("ascii")))[0]
return uuid.UUID(
fields=(tss_uuid.ulTimeLow, tss_uuid.usTimeMid, tss_uuid.usTimeHigh,
tss_uuid.bClockSeqHigh, tss_uuid.bClockSeqLow, node)
)
class TspiObject(object):
def __init__(self, context, ctype, tss_type, flags, handle=None):
"""
Expand Down Expand Up @@ -221,7 +229,6 @@ def sign(self, key):
tss_lib.Tspi_Hash_Sign(self.get_handle(), key.get_handle(), csig_size, csig_data)
return ffi.buffer(csig_data[0], csig_size[0])


class TspiKey(TspiObject):
def __init__(self, context, flags, handle=None):
self.context = context
Expand All @@ -236,6 +243,76 @@ def __del__(self):
# operation
except tspi_exceptions.TSS_E_INVALID_HANDLE:
pass
except tspi_exceptions.TSS_E_KEY_NOT_LOADED:
pass
except tspi_exceptions.TPM_E_FAIL:
pass
def registerKey(self,keyUUID,parentUUID):
'''TSS_RESULT Tspi_Context_RegisterKey(TSS_HCONTEXT hContext, TSS_HKEY hKey,
TSS_FLAG persistentStorageType, TSS_UUID uuidKey,
TSS_FLAG persistentStorageTypeParent, TSS_UUID uuidParentKey);'''
self.uuid=uuid_to_tss_uuid(keyUUID)
tss_lib.Tspi_Context_RegisterKey(self.context,self.get_handle(),
tss_lib.TSS_PS_TYPE_SYSTEM, self.uuid,
tss_lib.TSS_PS_TYPE_SYSTEM, uuid_to_tss_uuid(parentUUID))
return
def unregisterKey(self,uuid=None):
'''TSS_RESULT Tspi_Context_UnregisterKey(TSS_HCONTEXT hContext, TSS_FLAG persistentStorageType,
TSS_UUID uuidKey, TSS_HKEY* phKey);'''
if uuid is None:
tss_lib.Tspi_Context_UnregisterKey(self.context, tss_lib.TSS_PS_TYPE_SYSTEM,
self.uuid, self.handle)
else:
tss_lib.Tspi_Context_UnregisterKey(self.context, tss_lib.TSS_PS_TYPE_SYSTEM,
uuid_to_tss_uuid(uuid), self.handle)
#tss_lib.Tspi_Context_UnRegisterKey
return

def bind(self,data):
"""
Seal data to the local TPM using this key

:param data: The data to seal

:returns: a bytearray of the encrypted data
"""
encdata = TspiObject(self.context, 'TSS_HENCDATA *',
tss_lib.TSS_OBJECT_TYPE_ENCDATA,
tss_lib.TSS_ENCDATA_BIND)

cdata = ffi.new('BYTE[]', len(data))
for i in range(len(data)):
cdata[i] = data[i]

tss_lib.Tspi_Data_Bind(encdata.get_handle(), self.get_handle(),
len(data), cdata)
blob = encdata.get_attribute_data(tss_lib.TSS_TSPATTRIB_ENCDATA_BLOB,
tss_lib.TSS_TSPATTRIB_ENCDATABLOB_BLOB)
return bytearray(blob)

def unbind(self, data):
"""
Unbind data from the local TPM using this key

:param data: The data to unbind

:returns: a bytearray of the unencrypted data
"""
encdata = TspiObject(self.context, 'TSS_HENCDATA *',
tss_lib.TSS_OBJECT_TYPE_ENCDATA,
tss_lib.TSS_ENCDATA_BIND)

encdata.set_attribute_data(tss_lib.TSS_TSPATTRIB_ENCDATA_BLOB,
tss_lib.TSS_TSPATTRIB_ENCDATABLOB_BLOB, data)

bloblen = ffi.new('UINT32 *')
blob = ffi.new('BYTE **')

tss_lib.Tspi_Data_Unbind(encdata.get_handle(), self.get_handle(),
bloblen, blob)
ret = bytearray(blob[0][0:bloblen[0]])
tss_lib.Tspi_Context_FreeMemory(self.context, blob[0])
return ret

def set_modulus(self, n):
"""
Expand Down Expand Up @@ -336,6 +413,9 @@ def __init__(self, context):
self.handle = tpm
self.context = context

def load_key_by_handle(self, keyHandle, wrappingKeyHandle):
tss_lib.Tspi_Key_LoadKey(keyHandle, wrappingKeyHandle)

def collate_identity_request(self, srk, pubkey, aik):
"""
Generate everything required to authenticate the TPM to a third party
Expand Down Expand Up @@ -368,10 +448,15 @@ def get_capability(self, cap, sub):
"""
resp = ffi.new('BYTE **')
resplen = ffi.new('UINT32 *')
csub = ffi.new('BYTE []', len(sub))
''' csub = ffi.new('BYTE []', len(sub))
for i in range(len(sub)):
csub[i] = sub[i]
tss_lib.Tspi_TPM_Getcapability(self.handle[0], cap, len(sub), csub,
'''
csub = ffi.new('BYTE [4]')
bytearr = [hex(sub >> i & 0xff) for i in (0,8,16,24)]
for i in range(4):
csub[i] =int(bytearr[i],16)
tss_lib.Tspi_TPM_GetCapability(self.handle[0], cap,4, csub,
resplen, resp)
ret = bytearray(resp[0][0:resplen[0]])
tss_lib.Tspi_Context_FreeMemory(self.context, resp[0])
Expand Down Expand Up @@ -474,6 +559,18 @@ def extend_pcr(self, pcr, data, event):
tss_lib.Tspi_Context_FreeMemory(self.context, blob[0])
return ret

class TspiWrapKey(TspiKey):
def __init__(self, context, flags, wrappingKeyHandle):
self.context = context
self.parent=wrappingKeyHandle
super(TspiWrapKey, self).__init__(self.context, flags)
blaa = self.get_handle()
tss_lib.Tspi_Key_CreateKey(self.get_handle(),self.parent,0)
self.load_key()

def load_key(self):
tss_lib.Tspi_Key_LoadKey(self.get_handle(), self.parent)

class TspiContext():
def __init__(self):
self.context = ffi.new('TSS_HCONTEXT *')
Expand Down Expand Up @@ -536,6 +633,10 @@ def create_hash(self, flags):
obj = TspiHash(self.context, flags)
return obj

def create_wrap_key(self,flags,wrappingkeyHandle):
obj = TspiWrapKey(self.context,flags,wrappingkeyHandle)
return obj

def create_rsa_key(self, flags):
"""
Create a Tspi key object associated with this context
Expand All @@ -559,6 +660,7 @@ def load_key_by_uuid(self, storagetype, uuid):
tss_lib.Tspi_Context_LoadKeyByUUID(self.context, storagetype, tss_uuid,
tss_key)
key = TspiKey(self.context, None, handle=tss_key)
key.uuid=tss_uuid
return key

def load_key_by_blob(self, srk, blob):
Expand All @@ -582,7 +684,19 @@ def load_key_by_blob(self, srk, blob):
def get_tpm_object(self):
"""Returns the TspiTPM associated with this context"""
return self.tpm

def list_keys(self):
"""
Return a tuple of uuid.UUID instances and storagetype values for all
available keys on the TPM. from @nresare
"""
count = ffi.new('UINT32 *')
key_info_value = ffi.new('TSS_KM_KEYINFO2**')
tss_lib.Tspi_Context_GetRegisteredKeysByUUID2(self.context, 1, ffi.NULL, count, key_info_value)
values = []
for i in range(count[0]):
key_info = key_info_value[0][i]
values.append(str(tss_uuid_to_uuid(key_info.keyUUID)))
return values

def _c_byte_array(data):
"""
Expand All @@ -592,7 +706,7 @@ def _c_byte_array(data):
the contents of data
"""
cdata = ffi.new('BYTE []', len(data))
if isinstance(data, basestring):
if isinstance(data, str):
data = bytearray(data)
for i in range(len(data)):
cdata[i] = data[i]
Expand Down
8 changes: 4 additions & 4 deletions pytss/attestationutils.py
Original file line number Diff line number Diff line change
Expand Up @@ -437,28 +437,28 @@ def get_ekcert(context):
# Verify that the certificate is well formed
tag = blob[0] << 8 | blob[1]
if tag != 0x1001:
print "Invalid tag %x %x\n" % (blob[0], blob[1])
print("Invalid tag %x %x\n" % (blob[0], blob[1]))
return None

certtype = blob[2]
if certtype != 0:
print "Not a full certificate\n"
print("Not a full certificate\n")
return None

ekbuflen = blob[3] << 8 | blob[4]
offset = 5

blob = nv.read_value(offset, 2)
if len(blob) < 2:
print "Invalid length"
print("Invalid length")
return None

tag = blob[0] << 8 | blob[1]
if tag == 0x1002:
offset += 2
ekbuflen -= 2
elif blob[0] != 0x30:
print "Invalid header %x %x" % (blob[0], blob[1])
print("Invalid header %x %x" % (blob[0], blob[1]))
return None

ekbuf = bytearray()
Expand Down
Loading