-
Notifications
You must be signed in to change notification settings - Fork 1
/
main.py
193 lines (149 loc) · 5.64 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
import machineid
import threading
import signal
import requests
import json
import sys
import os
def to_error_message(errs):
    """
    Builds one human-readable message from a list of error dicts.

    Every entry is rendered as "<title>: <detail>" and the rendered entries
    are joined with ", ". An empty list produces an empty string.
    """
    rendered = (f"{err['title']}: {err['detail']}" for err in errs)
    return ', '.join(rendered)
def validate_license_key_with_fingerprint(license_key, machine_fingerprint):
    """
    Validates a license key scoped to a machine fingerprint.

    Posts the key and fingerprint to the account's validate-key action and
    decodes the JSON response. The account is read from the
    KEYGEN_ACCOUNT_ID environment variable.

    Returns a (validation_code, license_id) tuple:
      - validation_code is None when the response contains errors (the
        errors are printed to stderr), otherwise the response's meta code.
      - license_id is None when the response carries no license data.
    """
    validation = requests.post(
        f"https://api.keygen.sh/v1/accounts/{os.environ['KEYGEN_ACCOUNT_ID']}/licenses/actions/validate-key",
        headers={
            'Content-Type': 'application/vnd.api+json',
            'Accept': 'application/vnd.api+json'
        },
        data=json.dumps({
            'meta': {
                'scope': { 'fingerprint': machine_fingerprint },
                'key': license_key
            }
        })
    ).json()

    # 'data' may be missing or null in the response, in which case there is
    # no license ID to report.
    data = validation.get('data')
    license_id = data['id'] if data is not None else None

    if 'errors' in validation:
        errs = validation['errors']

        print(f'[keygen.validate_license_key_with_fingerprint] license_id={license_id} machine_fingerprint={machine_fingerprint} errors={to_error_message(errs)}',
              file=sys.stderr)

        return None, license_id

    validation_code = validation['meta']['code']

    print(f'[keygen.validate_license_key_with_fingerprint] validation_code={validation_code} license_id={license_id} machine_fingerprint={machine_fingerprint}')

    return validation_code, license_id
def activate_machine_for_license(license_id, machine_fingerprint):
    """
    Registers a machine, identified by its fingerprint, under a license.

    Reads KEYGEN_ACCOUNT_ID and KEYGEN_ACTIVATION_TOKEN from the environment.
    Returns the new machine's ID, or None when the response contains errors
    (the errors are printed to stderr).
    """
    url = f"https://api.keygen.sh/v1/accounts/{os.environ['KEYGEN_ACCOUNT_ID']}/machines"
    headers = {
        'Authorization': f"Bearer {os.environ['KEYGEN_ACTIVATION_TOKEN']}",
        'Content-Type': 'application/vnd.api+json',
        'Accept': 'application/vnd.api+json'
    }
    payload = {
        'data': {
            'type': 'machines',
            'attributes': {
                'fingerprint': machine_fingerprint
            },
            'relationships': {
                'license': {
                    'data': { 'type': 'licenses', 'id': license_id }
                }
            }
        }
    }

    response = requests.post(url, headers=headers, data=json.dumps(payload))
    activation = response.json()

    # Success path: the created machine resource carries its ID.
    if 'errors' not in activation:
        machine_id = activation['data']['id']
        print(f'[keygen.activate_machine_for_license] license_id={license_id} machine_id={machine_id} machine_fingerprint={machine_fingerprint}')
        return machine_id

    failures = activation['errors']
    print(f'[keygen.activate_machine_for_license] license_id={license_id} machine_fingerprint={machine_fingerprint} errors={to_error_message(failures)}',
          file=sys.stderr)
    return None
def deactivate_machine(machine_id):
    """
    Deletes a machine through the API.

    Reads KEYGEN_ACCOUNT_ID and KEYGEN_ACTIVATION_TOKEN from the environment.
    Returns True when the API answers with HTTP 204; otherwise prints the
    errors from the response body to stderr and returns False.
    """
    endpoint = f"https://api.keygen.sh/v1/accounts/{os.environ['KEYGEN_ACCOUNT_ID']}/machines/{machine_id}"
    request_headers = {
        'Authorization': f"Bearer {os.environ['KEYGEN_ACTIVATION_TOKEN']}",
        'Accept': 'application/vnd.api+json'
    }
    response = requests.delete(endpoint, headers=request_headers)

    # 204 means the machine was removed and there is no body to inspect.
    if response.status_code == 204:
        print(f'[keygen.deactivate_machine] machine_id={machine_id}')
        return True

    failures = response.json()['errors']
    print(f'[keygen.deactivate_machine] machine_id={machine_id} errors={to_error_message(failures)}',
          file=sys.stderr)
    return False
def deactivate_machine_on_exit(machine_id):
    """
    Deactivates a machine on exit signal, then terminates the program.

    machine_id is the identifier handed to deactivate_machine. The program
    exits with code 0 when deactivation succeeded and 1 when it failed.
    """
    # Use the identifier passed by the caller rather than a module-level
    # global, so the function works for whichever machine it is given.
    ok = deactivate_machine(machine_id)

    sys.exit(0 if ok else 1)
def ping_heartbeat_for_machine(machine_id):
    """
    Sends one heartbeat ping for a machine.

    Reads KEYGEN_ACCOUNT_ID and KEYGEN_ACTIVATION_TOKEN from the environment.
    Returns True when the response contains no errors; otherwise prints the
    errors to stderr and returns False.
    """
    endpoint = f"https://api.keygen.sh/v1/accounts/{os.environ['KEYGEN_ACCOUNT_ID']}/machines/{machine_id}/actions/ping-heartbeat"
    request_headers = {
        'Authorization': f"Bearer {os.environ['KEYGEN_ACTIVATION_TOKEN']}",
        'Accept': 'application/vnd.api+json'
    }
    reply = requests.post(endpoint, headers=request_headers).json()

    if 'errors' not in reply:
        print(f'[keygen.ping_heartbeat_for_machine] machine_id={machine_id}')
        return True

    failures = reply['errors']
    print(f'[keygen.ping_heartbeat_for_machine] machine_id={machine_id} errors={to_error_message(failures)}',
          file=sys.stderr)
    return False
def maintain_hearbeat_for_machine(machine_id):
    """
    Keeps a machine's heartbeat alive by pinging once and rescheduling itself.

    Each call sends one ping. On success a 60-second timer is started that
    calls this function again; on failure sys.exit(1) is called and no
    further ping is scheduled.
    """
    if not ping_heartbeat_for_machine(machine_id):
        # NOTE(review): sys.exit raises SystemExit; when this runs on a Timer
        # thread (every call after the first) that ends only the thread, not
        # the process - confirm this is the intended shutdown behavior.
        sys.exit(1)

    # Schedule the next ping one minute from now.
    next_ping = threading.Timer(60.0, maintain_hearbeat_for_machine, args=(machine_id,))
    next_ping.start()
# The license key is the first command-line argument; fail with a usage
# message instead of an IndexError traceback when it is missing.
if len(sys.argv) < 2:
    print(f'usage: {sys.argv[0]} <license-key>', file=sys.stderr)
    sys.exit(1)

# Fingerprint the current device and get the license key
machine_fingerprint = machineid.hashed_id('example-app')
license_key = sys.argv[1]

# Validate the license key scoped to the current machine fingerprint
validation_code, license_id = validate_license_key_with_fingerprint(license_key, machine_fingerprint)

# A None code means the validation request came back with errors (already
# printed to stderr by the helper); NOT_FOUND means the key does not exist.
# Neither case leaves anything to activate or keep alive.
if validation_code is None or validation_code == 'NOT_FOUND':
    sys.exit(1)

# Attempt to activate the machine if it's not already activated
activation_is_required = validation_code in (
    'NO_MACHINE',
    'NO_MACHINES',
    'FINGERPRINT_SCOPE_MISMATCH',
)

if activation_is_required:
    machine_id = activate_machine_for_license(license_id, machine_fingerprint)
    if machine_id is None:
        sys.exit(1)

# NOTE(review): the fingerprint, not the machine ID returned by activation,
# is passed as the machine identifier below - presumably the API accepts a
# fingerprint in place of an ID; confirm against the API documentation.

# Attempt to deactivate machine on process exit
signal.signal(signal.SIGINT, lambda _s, _f: deactivate_machine_on_exit(machine_fingerprint))

# Start a heartbeat ping loop
maintain_hearbeat_for_machine(machine_fingerprint)