forked from 0-duke/wdpassport-utils
-
Notifications
You must be signed in to change notification settings - Fork 1
/
wdpassport-utils.py
executable file
·428 lines (372 loc) · 13.4 KB
/
wdpassport-utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
#!/usr/bin/env python3
import sys
import os
import struct
import getpass
from hashlib import sha256
from random import randint
import argparse
import subprocess
try:
import py3_sg
except ImportError as e:
print("You need to install the \"py3_sg\" module.")
sys.exit(1)
BLOCK_SIZE = 512
HANDSTORESECURITYBLOCK = 1
dev = None
## Print fail message with red leading characters
def fail(str):
return "\033[91m" + "[!]" + "\033[0m" + " " + str
## Print fail message with green leading characters
def success(str):
return "\033[92m" + "[*]" + "\033[0m" + " " + str
## Print fail message with blue leading characters
def question(str):
return "\033[94m" + "[+]" + "\033[0m" + " " + str
def title(str):
return "\033[93m" + str + "\033[0m"
### Return if the current user is root
def is_root_user():
if os.geteuid() != 0: return False
else: return True
## Convert an integer to his human-readable secure status
def sec_status_to_str(security_status):
if security_status == 0x00:
return "No lock"
elif security_status == 0x01:
return "Locked";
elif security_status == 0x02:
return "Unlocked";
elif security_status == 0x06:
return "Locked, unlock blocked";
elif security_status == 0x07:
return "No keys";
else:
return "unknown";
## Convert an integer to his human-readable cipher algorithm
def cipher_id_to_str(cipher_id):
if cipher_id == 0x10:
return "AES_128_ECB";
elif cipher_id == 0x12:
return "AES_128_CBC";
elif cipher_id == 0x18:
return "AES_128_XTS";
elif cipher_id == 0x20:
return "AES_256_ECB";
elif cipher_id == 0x22:
return "AES_256_CBC";
elif cipher_id == 0x28:
return "AES_256_XTS";
elif cipher_id == 0x30:
return "Full Disk Encryption";
else:
return "unknown";
## Transform "cdb" in char[]
def _scsi_pack_cdb(cdb):
return struct.pack('{0}B'.format(len(cdb)), *cdb)
## Convert int from host byte order to network byte order
def htonl(num):
return struct.pack('!I', num)
## Convert int from host byte order to network byte order
def htons(num):
return struct.pack('!H', num)
## Call the device and get the selected block of Handy Store.
def read_handy_store(page):
cdb = [0xD8,0x00,0x00,0x00,0x00,0x01,0x00,0x00,0x01,0x00]
i = 2
for c in htonl(page):
cdb[i] = c
i+=1
data = py3_sg.read_as_bin_str(dev, _scsi_pack_cdb(cdb), BLOCK_SIZE)
return data
## Calculate checksum on the returned data
def hsb_checksum(data):
c = 0
for i in range(510):
c = c + data[i]
c = c + data[0] ## Some WD Utils count data[0] twice, some other not ...
r = (c * -1) & 0xFF
return hex(r)
## Call the device and get the encryption status.
## The function returns three values:
##
## SecurityStatus:
## 0x00 => No lock
## 0x01 => Locked
## 0x02 => Unlocked
## 0x06 => Locked, unlock blocked
## 0x07 => No keys
## CurrentChiperID
## 0x10 => AES_128_ECB
## 0x12 => AES_128_CBC
## 0x18 => AES_128_XTS
## 0x20 => AES_256_ECB
## 0x22 => AES_256_CBC
## 0x28 => AES_256_XTS
## 0x30 => Full Disk Encryption
## KeyResetEnabler (4 bytes that change every time)
##
def get_encryption_status():
cdb = [0xC0, 0x45, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x30, 0x00]
data = py3_sg.read_as_bin_str(dev, _scsi_pack_cdb(cdb), BLOCK_SIZE)
if data[0] != 0x45:
print(fail("Wrong encryption status signature %s" % hex(data[0])))
sys.exit(1)
## SecurityStatus, CurrentChiperID, KeyResetEnabler
return (data[3], data[4], data[8:12])
## Call the device and get the first block of Handy Store.
## The function returns three values:
##
## Iteration - number of iteration (hashing) in password generation
## Salt - salt used in password generation
## Hint - hint of the password if used. TODO.
def read_handy_store_block1():
signature = [0x00, 0x01, 0x44, 0x57]
sector_data = read_handy_store(1)
## Check if retrieved Checksum is correct
if hsb_checksum(sector_data) != hex(sector_data[511]):
print(fail("Wrong HSB1 checksum"))
sys.exit(1)
## Check if retrieved Signature is correct
for i in range(0,4):
if signature[i] != sector_data[i]:
print(fail("Wrong HSB1 signature."))
sys.exit(1);
iteration = struct.unpack_from("<I",sector_data[8:])
salt = sector_data[12:20] + bytes([0x00, 0x00])
hint = sector_data[24:226] + bytes([0x00, 0x00])
return (iteration[0],salt,hint)
## Perform password hashing with requirements obtained from the device
def mk_password_block(passwd, iteration, salt):
clean_salt = ""
for i in range(int(len(salt)/2)):
if salt[2 * i] == 0x00 and salt[2 * i + 1] == 0x00:
break
clean_salt = clean_salt + chr(salt[2 * i])
password = clean_salt + passwd
password = password.encode("utf-16")[2:]
for i in range(iteration):
password = sha256(password).digest()
return password
## Unlock the device
def unlock():
cdb = [0xC1,0xE1,0x00,0x00,0x00,0x00,0x00,0x00,0x28,0x00]
sec_status, cipher_id, key_reset = get_encryption_status()
## Device should be in the correct state
if (sec_status == 0x00 or sec_status == 0x02):
print(fail("Your device is already unlocked!"))
return
elif (sec_status != 0x01):
print(fail("Wrong device status!"))
sys.exit(1)
if cipher_id == 0x10 or cipher_id == 0x12 or cipher_id == 0x18:
pwblen = 16;
elif cipher_id == 0x20 or cipher_id == 0x22 or cipher_id == 0x28:
pwblen = 32;
elif cipher_id == 0x30:
pwblen = 32;
else:
print(fail("Unsupported cipher %s" % cipher_id))
sys.exit(1)
## Get password from user
print(question("Insert password to Unlock the device"))
passwd = getpass.getpass()
iteration,salt,hint = read_handy_store_block1()
pwd_hashed = mk_password_block(passwd, iteration, salt)
pw_block = [0x45,0x00,0x00,0x00,0x00,0x00]
for c in htons(pwblen):
pw_block.append(c)
pwblen = pwblen + 8
cdb[8] = pwblen
try:
## If there aren't exceptions the unlock operation is OK.
py3_sg.write(dev, _scsi_pack_cdb(cdb), _scsi_pack_cdb(pw_block) + pwd_hashed)
print(success("Device unlocked."))
except:
## Wrong password or something bad is happened.
print(fail("Wrong password."))
pass
## Change device password
## If the new password is empty the device state change and become "0x00 - No lock" meaning encryption is no more used.
## If the device is unencrypted a user can choose a password and make the whole device encrypted.
##
## DEVICE HAS TO BE UNLOCKED TO PERFORM THIS OPERATION
##
def change_password():
cdb = [0xC1, 0xE2, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x48, 0x00]
sec_status, cipher_id, key_reset = get_encryption_status()
if (sec_status != 0x02 and sec_status != 0x00):
print(fail("Device has to be unlocked or without encryption to perform this operation"))
sys.exit(1)
if cipher_id == 0x10 or cipher_id == 0x12 or cipher_id == 0x18:
pwblen = 16;
elif cipher_id == 0x20 or cipher_id == 0x22 or cipher_id == 0x28:
pwblen = 32;
elif cipher_id == 0x30:
pwblen = 32;
else:
print(fail("Unsupported cipher %s" % cipher_id))
sys.exit(1)
print(question("Insert the OLD password"))
old_passwd = getpass.getpass()
print(question("Insert the NEW password"))
new_passwd = getpass.getpass()
print(question("Confirm the NEW password"))
new_passwd2 = getpass.getpass()
if new_passwd != new_passwd2:
print(fail("Password confirmation doesn't match the given password"))
sys.exit(1)
## Both passwords shouldn't be empty
if (len(old_passwd) <= 0 and len(new_passwd) <= 0):
print(fail("Both passwords shouldn't be empty"))
sys.exit(1)
iteration,salt,hint = read_handy_store_block1()
pw_block = [0x45,0x00,0x00,0x00,0x00,0x00]
for c in htons(pwblen):
pw_block.append(ord(c))
if (len(old_passwd) > 0):
old_passwd_hashed = mk_password_block(old_passwd, iteration, salt)
pw_block[3] = pw_block[3] | 0x10
else:
old_passwd_hashed = ""
for i in range(32):
old_passwd_hashed = old_passwd_hashed + chr(0x00)
if (len(new_passwd) > 0):
new_passwd_hashed = mk_password_block(new_passwd, iteration, salt)
pw_block[3] = pw_block[3] | 0x01
else:
new_passwd_hashed = ""
for i in range(32):
new_passwd_hashed = new_passwd_hashed + chr(0x00)
if pw_block[3] & 0x11 == 0x11:
pw_block[3] = pw_block[3] & 0xEE
pwblen = 8 + 2 * pwblen
cdb[8] = pwblen
try:
## If exception isn't raised the unlock operation gone ok.
py3_sg.write(dev, _scsi_pack_cdb(cdb), _scsi_pack_cdb(pw_block) + old_passwd_hashed + new_passwd_hashed)
print(success("Password changed."))
except:
## Wrong password or something bad is happened.
print(fail("Error changing password"))
pass
## Change the internal key used for encryption, every data on the device would be permanently unaccessible.
## Device forgets even the partition table so you have to make a new one.
def secure_erase(cipher_id = 0):
cdb = [0xC1, 0xE3, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x08, 0x00]
status, current_cipher_id, key_reset = get_encryption_status()
if cipher_id == 0:
cipher_id = current_cipher_id
pw_block = [0x45,0x00,0x00,0x00,0x30,0x00,0x00,0x00]
if cipher_id == 0x10 or cipher_id == 0x12 or cipher_id == 0x18:
pwblen = 16;
pw_block[3] = 0x01
elif cipher_id == 0x20 or cipher_id == 0x22 or cipher_id == 0x28:
pwblen = 32;
pw_block[3] = 0x01
elif cipher_id == 0x30:
pwblen = 32;
# pw_block[3] = 0x00
else:
print(fail("Unsupported cipher %s" % cipher_id))
sys.exit(1)
## Set the actual lenght of pw_block (8 bytes + pwblen pseudorandom data)
cdb[8] = pwblen + 8
## Fill pw_block with random data
for rand_byte in os.urandom(pwblen):
pw_block.append(ord(rand_byte))
## key_reset needs to be retrieved immidiatly before the reset request
#status, current_cipher_id, key_reset = get_encryption_status()
key_reset = get_encryption_status()[2]
i = 2
for c in key_reset:
cdb[i] = ord(c)
i += 1
try:
py3_sg.write(dev, _scsi_pack_cdb(cdb), _scsi_pack_cdb(pw_block))
print(success("Device erased. You need to create a new partition on the device (Hint: fdisk and mkfs)"))
except:
## Something bad is happened.
print(fail("Something wrong."))
pass
## Get device info through "lsscsi" command
def get_device_info(device = None):
if device == None: grep_string = "Passport"
else: grep_string = device
## Ex. from the following string
## "[23:0:0:0] disk WD My Passport 0820 1012 /dev/sdb"
## We extract
p = subprocess.Popen("lsscsi | grep " + grep_string + " | grep -oP \"\/([a-zA-Z]+)\/([a-zA-Z0-9]+)\"",shell=True,stdout=subprocess.PIPE)
## /dev/sdb
complete_path = p.stdout.read().rstrip()
p = subprocess.Popen("lsscsi | grep " + grep_string + " | grep -oP \"\/([a-zA-Z]+)\/([a-zA-Z0-9]+)\" | cut -d '/' -f 3",shell=True,stdout=subprocess.PIPE)
## sdb
relative_path = p.stdout.read().rstrip()
p = subprocess.Popen("lsscsi -d|grep " + grep_string + "|cut -d ':' -f 1|cut -d '[' -f 2",shell=True,stdout=subprocess.PIPE)
## 23
host_number = p.stdout.read().rstrip()
return (complete_path, relative_path, host_number)
## Enable mount operations
## Tells the system to scan the "new" (unlocked) device
def enable_mount(device):
sec_status, cipher_id, key_reset = get_encryption_status()
## Device should be in the correct state
if (sec_status == 0x00 or sec_status == 0x02):
rp,hn = get_device_info(device)[1:]
p = subprocess.Popen("echo 1 > /sys/block/" + rp + "/device/delete",shell=True)
p = subprocess.Popen("echo \"- - -\" > /sys/class/scsi_host/host" + hn + "/scan",shell=True)
print(success("Now depending on your system you can mount your device or it will be automagically mounted."))
else:
print(fail("Device needs to be unlocked in order to mount it."))
## Main function, get parameters and manage operations
def main(argv):
global dev
print(title("WD Passport Ultra linux utility v0.1 by duke"))
parser = argparse.ArgumentParser()
parser.add_argument("-s", "--status", required=False, action="store_true", help="Check device status and encryption type")
parser.add_argument("-u", "--unlock", required=False, action="store_true", help="Unlock")
parser.add_argument("-m", "--mount", required=False, action="store_true", help="Enable mount point for an unlocked device")
parser.add_argument("-c", "--change_passwd", required=False, action="store_true", help="Change (or disable) password")
parser.add_argument("-e", "--erase", required=False, action="store_true", help="Secure erase device")
parser.add_argument("-d", "--device", dest="device", required=False, help="Force device path (ex. /dev/sdb). Usually you don't need this option.")
args = parser.parse_args()
if not is_root_user():
print(fail("You need to have root privileges to run this script."))
sys.exit(1)
if len(sys.argv) == 1:
args.status = True
if args.device:
DEVICE = args.device
else:
## Get occurrences of "Passport" devices
p = subprocess.Popen("lsscsi | grep Passport | wc -l",shell=True,stdout=subprocess.PIPE)
if int(p.stdout.read().rstrip()) > 1:
print(fail("Multiple occurences of \"My Passport\" detected. You should specify a device manually (with -d option)."))
sys.exit(1)
DEVICE = get_device_info()[0]
try:
dev = open(DEVICE,"r+b")
except:
print(fail("Something wrong opening device \"%s\"" % (DEVICE)))
sys.exit(1)
if args.status:
status, cipher_id, key_reset = get_encryption_status()
print(success("Device state"))
print("\tSecurity status: %s" % sec_status_to_str(status))
print("\tEncryption type: %s" % cipher_id_to_str(cipher_id))
if args.unlock:
unlock()
if args.change_passwd:
change_password()
if args.erase:
print(question("Any data on the device will be lost. Are you sure you want to continue? [y/N]"))
r = sys.stdin.read(1)
if r.lower() == 'y':
secure_erase(0)
else:
print(success("Ok. Bye."))
if args.mount:
enable_mount(DEVICE)
if __name__ == "__main__":
main(sys.argv[1:])