-
Notifications
You must be signed in to change notification settings - Fork 0
/
sod.py
195 lines (148 loc) · 6.11 KB
/
sod.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
import asn1crypto.core as asn1
from asn1crypto.algos import DigestAlgorithm
from asn1crypto.cms import SignerIdentifier
from asn1crypto.util import int_from_bytes
from .base import ElementaryFile, LDSVersionInfo
from .dg import DataGroup, DataGroupNumber
from pymrtd.pki import algo_utils, cms, oids, x509
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
from typing import List, Optional, Union
class LDSSecurityObjectVersion(asn1.Integer):
    ''' ASN.1 INTEGER carrying the LDSSecurityObject version; 0 and 1 are named 'v0' and 'v1'. '''

    _map = {0: 'v0', 1: 'v1'}

    @property
    def value(self):
        ''' Returns the version as a plain signed int decoded from the raw content bytes. '''
        rawContents = self.contents
        return int_from_bytes(rawContents, signed=True)
class DataGroupHash(asn1.Sequence):
    ''' ASN.1 SEQUENCE pairing a data group number with the hash value stored for it. '''

    _fields = [
        ('dataGroupNumber', DataGroupNumber),
        ('dataGroupHashValue', asn1.OctetString),
    ]

    @property
    def number(self) -> DataGroupNumber:
        ''' Returns the 'dataGroupNumber' field of this entry. '''
        dgNumber = self['dataGroupNumber']
        return dgNumber

    @property
    def hash(self) -> bytes:
        ''' Returns the native value of the 'dataGroupHashValue' field. '''
        hashValue = self['dataGroupHashValue']
        return hashValue.native
class DataGroupHashValues(asn1.SequenceOf):
    ''' ASN.1 SEQUENCE OF DataGroupHash entries, searchable by data group number. '''

    _child_spec = DataGroupHash

    def contains(self, dgNumber: DataGroupNumber) -> bool:
        '''
        Returns True if any entry has the given data group number, else False.
        :param dgNumber:
            Data group number to look for.
        '''
        assert isinstance(dgNumber, DataGroupNumber)
        return any(entry.number == dgNumber for entry in self)

    def find(self, dgNumber: DataGroupNumber) -> Optional[DataGroupHash]:
        '''
        Returns the first entry with the given data group number, or None if there is no such entry.
        :param dgNumber:
            Data group number to look for.
        '''
        assert isinstance(dgNumber, DataGroupNumber)
        matching = (entry for entry in self if entry.number == dgNumber)
        return next(matching, None)
class LDSSecurityObject(asn1.Sequence):
    '''
    ASN.1 SEQUENCE holding the version, the digest algorithm, the list of
    data group hash values and, optionally, the LDS version info.
    '''

    _fields = [
        ('version', LDSSecurityObjectVersion),
        ('hashAlgorithm', DigestAlgorithm),
        ('dataGroupHashValues', DataGroupHashValues),
        ('ldsVersionInfo', LDSVersionInfo, {'optional': True})
    ]

    @property
    def version(self) -> LDSSecurityObjectVersion:
        ''' Returns the version of this LDS security object. '''
        return self['version']

    @property
    def dgHashAlgo(self) -> DigestAlgorithm:
        ''' Returns the hash algorithm that the hash values of data groups were produced with. '''
        return self['hashAlgorithm']

    @property
    def dgHashes(self) -> DataGroupHashValues:
        ''' Returns hash values of data groups. '''
        return self['dataGroupHashValues']

    @property
    def ldsVersion(self) -> Union[LDSVersionInfo, None]:
        '''
        Returns the version of LDS. The field is optional and is expected to be absent
        when the version of this object is 0.
        NOTE(review): for an absent optional field asn1crypto may hand back its Void
        placeholder rather than None - confirm before relying on an `is None` check.
        '''
        return self['ldsVersionInfo']

    @property
    def ldsVerion(self) -> Union[LDSVersionInfo, None]:
        ''' Misspelled alias of ldsVersion, kept so that existing callers keep working. '''
        return self.ldsVersion

    def getDgHasher(self) -> hashes.Hash:
        ''' Returns a fresh hashes.Hash object for the algorithm named by dgHashAlgo. '''
        h = algo_utils.get_hash_algo_by_name(self.dgHashAlgo['algorithm'].native)
        return hashes.Hash(h, backend=default_backend())

    def find(self, dgNumber: DataGroupNumber) -> Union[DataGroupHash, None]:
        '''
        Returns DataGroupHash if DataGroupHashValues contains specific data group number, else None
        :param dgNumber:
            Data group number to find DataGroupHash object
        '''
        assert isinstance(dgNumber, DataGroupNumber)
        return self.dgHashes.find(dgNumber)

    def contains(self, dg: DataGroup) -> bool:
        '''
        Returns True if DataGroupHashValues has matching hash of data group, else False
        :param dg:
            Data group to find and compare hash value of
        '''
        assert isinstance(dg, DataGroup)

        dgh = self.find(dg.number)
        if dgh is None:
            # No hash is stored for this data group number, so nothing can match.
            return False

        # Hash the encoded data group with the declared algorithm and compare
        # the digest against the stored value.
        h = self.getDgHasher()
        h.update(dg.dump())
        return h.finalize() == dgh.hash
class SODSignedData(cms.MrtdSignedData):
    ''' MrtdSignedData variant whose certificate spec is x509.DocumentSignerCertificate. '''

    _certificate_spec = x509.DocumentSignerCertificate
# Register LDSSecurityObject with the cms module under the name 'ldsSecurityObject'
# and the MRTD ldsSecurityObject OID (presumably so that encapsulated content of
# that type is parsed as LDSSecurityObject - see cms module).
cms.cms_register_encap_content_info_type('ldsSecurityObject', oids.id_mrtd_ldsSecurityObject, LDSSecurityObject)
class SODContentInfo(cms.MrtdContentInfo):
    ''' MrtdContentInfo variant whose signed data spec is SODSignedData. '''

    _signed_data_spec = SODSignedData
class SODError(Exception):
    ''' Raised when loading or verifying a SOD fails. '''
class SOD(ElementaryFile):
    '''
    Elementary file wrapping a CMS ContentInfo whose SignedData content
    encapsulates an LDSSecurityObject.
    '''

    # ASN.1 tag parameters of this elementary file
    # (presumably application class, constructed, tag number 23, i.e. 0x77 - confirm).
    class_ = 1
    method = 1
    tag = 23
    _content_spec = SODContentInfo

    @classmethod
    def load(cls, encoded_bytes, strict=False):
        '''
        Parses and validates an encoded SOD.
        :param encoded_bytes:
            Encoded bytes of the SOD file.
        :param strict:
            Forwarded unchanged to the parent load().
        :return: The parsed SOD object.
        :raises: SODError - if the content type is not 'signed_data', the SignedData
                 version is not v1, v3 or v4, the encapsulated content type is not
                 ldsSecurityObject, or the LDSSecurityObject version is not 0 or 1.
        '''
        # Parse parent type
        value = super().load(encoded_bytes, strict=strict)

        ci = value.content
        ctype = ci['content_type'].native
        if ctype != 'signed_data':  # ICAO 9303-10-p21
            raise SODError("Invalid SOD content type: {}, should be 'signed_data'".format(ctype))

        value._sd = ci['content']
        cver = value._sd.version.native
        if cver not in ('v1', 'v3', 'v4'):  # RFC3369
            raise SODError("Invalid SignedData version: {}".format(cver))

        if value._sd.contentType.dotted != oids.id_mrtd_ldsSecurityObject:
            raise SODError("Invalid encapContentInfo type: {}, should be {}".format(value._sd.contentType.dotted, oids.id_mrtd_ldsSecurityObject))

        # Only versions 0 and 1 of LDSSecurityObject are supported.
        ldsVer = value.ldsSecurityObject.version.value
        if not 0 <= ldsVer <= 1:
            raise SODError("Unsupported LDSSecurityObject version: {}, should be 0 or 1".format(ldsVer))

        # Internal sanity checks: the content specs should already guarantee these types.
        certs = value._sd.certificates
        if len(certs):
            assert isinstance(certs[0], x509.DocumentSignerCertificate)
        assert isinstance(value._sd.content, LDSSecurityObject)
        return value

    @property
    def dsCertificates(self) -> Union[List[x509.DocumentSignerCertificate], None]:
        ''' Returns list of document signer certificates if present, otherwise None. '''
        return self._sd.certificates

    @property
    def ldsSecurityObject(self) -> LDSSecurityObject:
        ''' Returns the LDSSecurityObject encapsulated in the signed data. '''
        return self._sd.content

    @property
    def signers(self) -> List[SignerIdentifier]:
        ''' Returns list of signer identifiers which signed this document. '''
        return [si['sid'] for si in self._sd.signerInfos]

    def verify(self, issuer_dsc_certs: Optional[List[x509.DocumentSignerCertificate]] = None) -> None:
        '''
        Verifies every stored digital signature made over signed LdsSecurityObject.
        :param issuer_dsc_certs:
            Optional list of document signer certificates forwarded to the signed data
            verification. None is treated as an empty list.
        :raises: SODError - if verification fails or other error occurs.
        '''
        # A fresh list per call avoids sharing one mutable default between calls.
        if issuer_dsc_certs is None:
            issuer_dsc_certs = []
        try:
            self._sd.verify(issuer_dsc_certs)
        except cms.MrtdSignedDataError as e:
            raise SODError(str(e)) from e