forked from lnbits/bleskomat
-
Notifications
You must be signed in to change notification settings - Fork 0
/
helpers.py
153 lines (128 loc) · 4.88 KB
/
helpers.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
import base64
import hashlib
import hmac
from http import HTTPStatus
from typing import Dict
from urllib import parse
from fastapi import Request
def generate_bleskomat_lnurl_hash(secret: str):
m = hashlib.sha256()
m.update(f"{secret}".encode())
return m.hexdigest()
def generate_bleskomat_lnurl_signature(
payload: str, api_key_secret: str, api_key_encoding: str = "hex"
):
if api_key_encoding == "hex":
key = bytes.fromhex(api_key_secret)
elif api_key_encoding == "base64":
key = base64.b64decode(api_key_secret)
else:
key = bytes.fromhex(api_key_secret)
return hmac.new(key=key, msg=payload.encode(), digestmod=hashlib.sha256).hexdigest()
def generate_bleskomat_lnurl_secret(api_key_id: str, signature: str):
# The secret is not randomly generated by the server.
# Instead it is the hash of the API key ID and signature concatenated together.
m = hashlib.sha256()
m.update(f"{api_key_id}-{signature}".encode())
return m.hexdigest()
def get_callback_url(req: Request):
return str(req.url_for("bleskomat.api_bleskomat_lnurl"))
def is_supported_lnurl_subprotocol(tag: str) -> bool:
return tag == "withdrawRequest"
class LnurlHttpError(Exception):
def __init__(
self,
message: str = "",
http_status: HTTPStatus = HTTPStatus.INTERNAL_SERVER_ERROR,
):
self.message = message
self.http_status = http_status
super().__init__(self.message)
class LnurlValidationError(Exception):
pass
def prepare_lnurl_params(tag: str, query: dict) -> dict:
params: dict = {}
if not is_supported_lnurl_subprotocol(tag):
raise LnurlValidationError(f'Unsupported subprotocol: "{tag}"')
if tag == "withdrawRequest":
params["minWithdrawable"] = float(query["minWithdrawable"])
params["maxWithdrawable"] = float(query["maxWithdrawable"])
params["defaultDescription"] = query["defaultDescription"]
if not params["minWithdrawable"] > 0:
raise LnurlValidationError('"minWithdrawable" must be greater than zero')
if not params["maxWithdrawable"] >= params["minWithdrawable"]:
raise LnurlValidationError(
'"maxWithdrawable" must be greater than or equal to "minWithdrawable"'
)
return params
encode_uri_component_safe_chars = (
"abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789-_.!~*'()"
)
def query_to_signing_payload(query: Dict[str, str]) -> str:
# Sort the query by key, then stringify it to create the payload.
sorted_keys = sorted(query.keys(), key=str.lower)
payload = []
for key in sorted_keys:
if not key == "signature":
encoded_key = parse.quote(key, safe=encode_uri_component_safe_chars)
encoded_value = parse.quote(
query[key], safe=encode_uri_component_safe_chars
)
payload.append(f"{encoded_key}={encoded_value}")
return "&".join(payload)
unshorten_rules: dict[str, dict] = {
"query": {"n": "nonce", "s": "signature", "t": "tag"},
"tags": {
"c": "channelRequest",
"l": "login",
"p": "payRequest",
"w": "withdrawRequest",
},
"params": {
"channelRequest": {"pl": "localAmt", "pp": "pushAmt"},
"login": {},
"payRequest": {"pn": "minSendable", "px": "maxSendable", "pm": "metadata"},
"withdrawRequest": {
"pn": "minWithdrawable",
"px": "maxWithdrawable",
"pd": "defaultDescription",
},
},
}
def unshorten_lnurl_query(query: dict) -> Dict[str, str]:
new_query = {}
rules = unshorten_rules
if "tag" in query:
tag = query["tag"]
elif "t" in query:
tag = query["t"]
else:
raise LnurlValidationError('Missing required query parameter: "tag"')
# Unshorten tag:
if tag in rules["tags"]:
long_tag = rules["tags"][tag]
new_query["tag"] = long_tag
tag = long_tag
if tag not in rules["params"]:
raise LnurlValidationError(f'Unknown tag: "{tag}"')
for key in query:
if key in rules["params"][str(tag)]:
short_param_key = key
long_param_key = rules["params"][str(tag)][short_param_key]
if short_param_key in query:
new_query[long_param_key] = query[short_param_key]
else:
new_query[long_param_key] = query[long_param_key]
elif key in rules["query"]:
# Unshorten general keys:
short_key = key
long_key = rules["query"][short_key]
if long_key not in new_query:
if short_key in query:
new_query[long_key] = query[short_key]
else:
new_query[long_key] = query[str(long_key)]
else:
# Keep unknown key/value pairs unchanged:
new_query[key] = query[key]
return new_query